1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

add the close_socket() function requested in #107

This commit is contained in:
Joel Bender 2017-03-03 23:43:05 -05:00
parent 6b93f606ee
commit 66c4eebf55
9 changed files with 78 additions and 0 deletions

View File

@ -503,6 +503,12 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
# bind the BIP stack to the network, no network number
self.nsap.bind(self.bip)
def close_socket(self):
if _debug: BIPSimpleApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
bacpypes_debugging(BIPSimpleApplication)
#
@ -550,6 +556,12 @@ class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWrite
# bind the NSAP to the stack, no network number
self.nsap.bind(self.bip)
def close_socket(self):
if _debug: BIPForeignApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
bacpypes_debugging(BIPForeignApplication)
#

View File

@ -102,6 +102,14 @@ class UDPMultiplexer:
self.annexH = _MultiplexServer(self)
self.annexJ = _MultiplexServer(self)
def close_socket(self):
if _debug: UDPMultiplexer._debug("close_socket")
# pass along the close to the director(s)
self.directPort.close_socket()
if self.broadcastPort:
self.broadcastPort.close_socket()
def indication(self, server, pdu):
if _debug: UDPMultiplexer._debug("indication %r %r", server, pdu)

View File

@ -248,6 +248,12 @@ class UDPDirector(asyncore.dispatcher, Server, ServiceAccessPoint):
# let the director handle the error
self.handle_error(err)
def close_socket(self):
"""Close the socket."""
if _debug: UDPDirector._debug("close_socket")
self.socket.close()
def handle_close(self):
"""Remove this from the monitor when it's closed."""
if _debug: UDPDirector._debug("handle_close")

View File

@ -500,6 +500,12 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
# bind the BIP stack to the network, no network number
self.nsap.bind(self.bip)
def close_socket(self):
if _debug: BIPSimpleApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
#
# BIPForeignApplication
#
@ -546,6 +552,12 @@ class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWrite
# bind the NSAP to the stack, no network number
self.nsap.bind(self.bip)
def close_socket(self):
if _debug: BIPForeignApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
#
# BIPNetworkApplication
#

View File

@ -103,6 +103,14 @@ class UDPMultiplexer:
self.annexH = _MultiplexServer(self)
self.annexJ = _MultiplexServer(self)
def close_socket(self):
if _debug: UDPMultiplexer._debug("close_socket")
# pass along the close to the director(s)
self.directPort.close_socket()
if self.broadcastPort:
self.broadcastPort.close_socket()
def indication(self, server, pdu):
if _debug: UDPMultiplexer._debug("indication %r %r", server, pdu)

View File

@ -247,6 +247,12 @@ class UDPDirector(asyncore.dispatcher, Server, ServiceAccessPoint):
# let the director handle the error
self.handle_error(err)
def close_socket(self):
"""Close the socket."""
if _debug: UDPDirector._debug("close_socket")
self.socket.close()
def handle_close(self):
"""Remove this from the monitor when it's closed."""
if _debug: UDPDirector._debug("handle_close")

View File

@ -500,6 +500,12 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
# bind the BIP stack to the network, no network number
self.nsap.bind(self.bip)
def close_socket(self):
if _debug: BIPSimpleApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
#
# BIPForeignApplication
#
@ -546,6 +552,12 @@ class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWrite
# bind the NSAP to the stack, no network number
self.nsap.bind(self.bip)
def close_socket(self):
if _debug: BIPForeignApplication._debug("close_socket")
# pass to the multiplexer, then down to the sockets
self.mux.close_socket()
#
# BIPNetworkApplication
#

View File

@ -102,6 +102,14 @@ class UDPMultiplexer:
self.annexH = _MultiplexServer(self)
self.annexJ = _MultiplexServer(self)
def close_socket(self):
if _debug: UDPMultiplexer._debug("close_socket")
# pass along the close to the director(s)
self.directPort.close_socket()
if self.broadcastPort:
self.broadcastPort.close_socket()
def indication(self, server, pdu):
if _debug: UDPMultiplexer._debug("indication %r %r", server, pdu)

View File

@ -247,6 +247,12 @@ class UDPDirector(asyncore.dispatcher, Server, ServiceAccessPoint):
# let the director handle the error
self.handle_error(err)
def close_socket(self):
"""Close the socket."""
if _debug: UDPDirector._debug("close_socket")
self.socket.close()
def handle_close(self):
"""Remove this from the monitor when it's closed."""
if _debug: UDPDirector._debug("handle_close")