From ac3d56f18eab6056080579a7afba9ecbca1ebfc7 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Wed, 26 Oct 2016 01:43:57 -0400 Subject: [PATCH] change from passing exceptions up the stack (which mever seemed right) to passing them to the application service element via the director --- py25/bacpypes/tcp.py | 121 +++++++++++++++++++++++++-------------- py27/bacpypes/tcp.py | 133 +++++++++++++++++++++++++++++-------------- py34/bacpypes/tcp.py | 119 +++++++++++++++++++++++++------------- samples/TCPClient.py | 2 +- samples/TCPServer.py | 2 +- 5 files changed, 249 insertions(+), 128 deletions(-) diff --git a/py25/bacpypes/tcp.py b/py25/bacpypes/tcp.py index 4ee2862..10d5679 100755 --- a/py25/bacpypes/tcp.py +++ b/py25/bacpypes/tcp.py @@ -111,8 +111,8 @@ class TCPClient(asyncore.dispatcher): except socket.error as err: if _debug: TCPClient._debug(" - connect socket error: %r", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to an error handler + self.handle_error(err) def handle_connect(self): if _debug: TCPClient._debug("handle_connect") @@ -126,24 +126,15 @@ class TCPClient(asyncore.dispatcher): # check for connection refused if (err == 0): - if _debug: TCPClient._debug(" - connection established") + if _debug: TCPClient._debug(" - no error") elif (err == 111): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - deferred(self.response, socket.error(111, "connection refused")) + self.handle_error(socket.error(111, "connection refused")) return # pass along asyncore.dispatcher.handle_connect_event(self) - def handle_expt(self): - if _debug: TCPClient._debug("handle_expt") - - err = self.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if _debug: TCPClient._debug(" - err: %r", err) - - # pass along - asyncore.dispatcher.handle_expt(self) - def readable(self): return self.connected @@ -158,7 +149,7 @@ class TCPClient(asyncore.dispatcher): if not self.socket: if _debug: TCPClient._debug(" - socket was closed") else: - # sent the data upstream + # send the data upstream deferred(self.response, PDU(msg)) except socket.error as err: @@ -167,8 +158,8 @@ class TCPClient(asyncore.dispatcher): else: TCPClient._debug(" - recv socket error: %r", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def writable(self): return (len(self.request) != 0) @@ -191,8 +182,8 @@ class TCPClient(asyncore.dispatcher): else: if _debug: TCPClient._debug(" - send socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def handle_write_event(self): if _debug: TCPClient._debug("handle_write_event") @@ -204,7 +195,9 @@ class TCPClient(asyncore.dispatcher): # check for connection refused if (err == 61): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - deferred(self.response, socket.error(61, "connection refused")) + self.handle_error(socket.error(61, "connection refused")) + self.handle_close() + return # pass along asyncore.dispatcher.handle_write_event(self) @@ -218,6 +211,13 @@ class TCPClient(asyncore.dispatcher): # make sure other routines know the socket is closed self.socket = None + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClient._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + def indication(self, pdu): """Requests are queued for delivery.""" if _debug: TCPClient._debug("indication %r", pdu) @@ -256,6 +256,16 @@ class TCPClientActor(TCPClient): # tell the director this is a new actor self.director.add_actor(self) + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClientActor._debug("handle_error %r", error) + + # pass along to the director + if error is not None: + self.director.actor_error(self, error) + else: + TCPClient.handle_error(self) + def handle_close(self): if _debug: TCPClientActor._debug("handle_close") @@ -388,6 +398,13 @@ class TCPClientDirector(Server, ServiceAccessPoint, DebugContents): connect_task = FunctionTask(self.connect, actor.peer) connect_task.install_task(_time() + self.reconnect[actor.peer]) + def actor_error(self, actor, error): + if _debug: TCPClientDirector._debug("actor_error %r %r", actor, error) + + # tell the ASE the actor had an error + if self.serviceElement: + self.sap_request(actor_error=actor, error=error) + def get_actor(self, address): """ Get the actor associated with an address or None. """ return self.clients.get(address, None) @@ -452,67 +469,75 @@ class TCPServer(asyncore.dispatcher): self.request = '' def handle_connect(self): - if _debug: deferred(TCPServer._debug, "handle_connect") + if _debug: TCPServer._debug("handle_connect") def readable(self): return 1 def handle_read(self): - if _debug: deferred(TCPServer._debug, "handle_read") + if _debug: TCPServer._debug("handle_read") try: msg = self.recv(65536) - if _debug: deferred(TCPServer._debug, " - received %d octets", len(msg)) + if _debug: TCPServer._debug(" - received %d octets", len(msg)) # no socket means it was closed if not self.socket: - if _debug: deferred(TCPServer._debug, " - socket was closed") + if _debug: TCPServer._debug(" - socket was closed") else: + # send the data upstream deferred(self.response, PDU(msg)) except socket.error as err: if (err.args[0] == 111): - deferred(TCPServer._debug, " - connection to %r refused", self.peer) + if _debug: TCPServer._debug(" - connection to %r refused", self.peer) else: - deferred(TCPServer._debug, " - recv socket error: %s", err) + if _debug: TCPServer._debug(" - recv socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def writable(self): return (len(self.request) != 0) def handle_write(self): - if _debug: deferred(TCPServer._debug, "handle_write") + if _debug: TCPServer._debug("handle_write") try: sent = self.send(self.request) - if _debug: deferred(TCPServer._debug, " - sent %d octets, %d remaining", sent, len(self.request) - sent) + if _debug: TCPServer._debug(" - sent %d octets, %d remaining", sent, len(self.request) - sent) self.request = self.request[sent:] except socket.error as err: if (err.args[0] == 111): - deferred(TCPServer._debug, " - connection to %r refused", self.peer) + if _debug: TCPServer._debug(" - connection to %r refused", self.peer) else: - deferred(TCPServer._debug, " - send socket error: %s", err) + if _debug: TCPServer._debug(" - send socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def handle_close(self): - if _debug: deferred(TCPServer._debug, "handle_close") + if _debug: TCPServer._debug("handle_close") if not self: - deferred(TCPServer._warning, "handle_close: self is None") + if _debug: TCPServer._debug(" - self is None") return if not self.socket: - deferred(TCPServer._warning, "handle_close: socket already closed") + if _debug: TCPServer._debug(" - socket already closed") return self.close() self.socket = None + def handle_error(self, error=None): + """Trap for TCPServer errors, otherwise continue.""" + if _debug: TCPServer._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + def indication(self, pdu): """Requests are queued for delivery.""" if _debug: TCPServer._debug("indication %r", pdu) @@ -548,6 +573,16 @@ class TCPServerActor(TCPServer): # tell the director this is a new actor self.director.add_actor(self) + def handle_error(self, error=None): + """Trap for TCPServer errors, otherwise continue.""" + if _debug: TCPServerActor._debug("handle_error %r", error) + + # pass along to the director + if error is not None: + self.director.actor_error(self, error) + else: + TCPServer.handle_error(self) + def handle_close(self): if _debug: TCPServerActor._debug("handle_close") @@ -728,6 +763,13 @@ class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugCo if self.serviceElement: self.sap_request(delPeer=actor.peer) + def actor_error(self, actor, error): + if _debug: TCPServerDirector._debug("actor_error %r %r", actor, error) + + # tell the ASE the actor had an error + if self.serviceElement: + self.sap_request(actor_error=actor, error=error) + def get_actor(self, address): """ Get the actor associated with an address or None. """ return self.servers.get(address, None) @@ -814,11 +856,6 @@ class StreamToPacket(Client, Server): """Message going upstream.""" if _debug: StreamToPacket._debug("StreamToPacket.confirmation %r", pdu) - # short circuit errors - if isinstance(pdu, IOError): - self.response(pdu) - return - # hack it up into chunks for packet in self.packetize(pdu, self.upstreamBuffer): self.response(packet) @@ -839,7 +876,7 @@ class StreamToPacketSAP(ApplicationServiceElement, ServiceAccessPoint): # save a reference to the StreamToPacket object self.stp = stp - def indication(self, addPeer=None, delPeer=None): + def indication(self, addPeer=None, delPeer=None, actor_error=None, error=None): if _debug: StreamToPacketSAP._debug("indication addPeer=%r delPeer=%r", addPeer, delPeer) if addPeer: diff --git a/py27/bacpypes/tcp.py b/py27/bacpypes/tcp.py index 5c2d39d..cafeebe 100755 --- a/py27/bacpypes/tcp.py +++ b/py27/bacpypes/tcp.py @@ -117,8 +117,11 @@ class TCPClient(asyncore.dispatcher): except socket.error as err: if _debug: TCPClient._debug(" - connect socket error: %r", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) + + def handle_accept(self): + if _debug: TCPClient._debug("handle_accept") def handle_connect(self): if _debug: TCPClient._debug("handle_connect") @@ -132,24 +135,15 @@ class TCPClient(asyncore.dispatcher): # check for connection refused if (err == 0): - if _debug: TCPClient._debug(" - connection established") + if _debug: TCPClient._debug(" - no error") elif (err == 111): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - deferred(self.response, socket.error(111, "connection refused")) + self.handle_error(socket.error(111, "connection refused")) return # pass along asyncore.dispatcher.handle_connect_event(self) - def handle_expt(self): - if _debug: TCPClient._debug("handle_expt") - - err = self.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if _debug: TCPClient._debug(" - err: %r", err) - - # pass along - asyncore.dispatcher.handle_expt(self) - def readable(self): return self.connected @@ -164,17 +158,17 @@ class TCPClient(asyncore.dispatcher): if not self.socket: if _debug: TCPClient._debug(" - socket was closed") else: - # sent the data upstream + # send the data upstream deferred(self.response, PDU(msg)) except socket.error as err: if (err.args[0] == 111): - deferred(TCPClient._debug, " - connection to %r refused", self.peer) + if _debug: TCPClient._debug(" - connection to %r refused", self.peer) else: - deferred(TCPClient._debug, " - recv socket error: %r", err) + if _debug: TCPClient._debug(" - recv socket error: %r", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def writable(self): return (len(self.request) != 0) @@ -197,8 +191,8 @@ class TCPClient(asyncore.dispatcher): else: if _debug: TCPClient._debug(" - send socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def handle_write_event(self): if _debug: TCPClient._debug("handle_write_event") @@ -210,7 +204,9 @@ class TCPClient(asyncore.dispatcher): # check for connection refused if (err == 61): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - deferred(self.response, socket.error(61, "connection refused")) + self.handle_error(socket.error(61, "connection refused")) + self.handle_close() + return # pass along asyncore.dispatcher.handle_write_event(self) @@ -224,6 +220,20 @@ class TCPClient(asyncore.dispatcher): # make sure other routines know the socket is closed self.socket = None + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClient._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClient._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + def indication(self, pdu): """Requests are queued for delivery.""" if _debug: TCPClient._debug("indication %r", pdu) @@ -261,6 +271,16 @@ class TCPClientActor(TCPClient): # tell the director this is a new actor self.director.add_actor(self) + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClientActor._debug("handle_error %r", error) + + # pass along to the director + if error is not None: + self.director.actor_error(self, error) + else: + TCPClient.handle_error(self) + def handle_close(self): if _debug: TCPClientActor._debug("handle_close") @@ -392,6 +412,13 @@ class TCPClientDirector(Server, ServiceAccessPoint, DebugContents): connect_task = FunctionTask(self.connect, actor.peer) connect_task.install_task(_time() + self.reconnect[actor.peer]) + def actor_error(self, actor, error): + if _debug: TCPClientDirector._debug("actor_error %r %r", actor, error) + + # tell the ASE the actor had an error + if self.serviceElement: + self.sap_request(actor_error=actor, error=error) + def get_actor(self, address): """ Get the actor associated with an address or None. """ return self.clients.get(address, None) @@ -455,67 +482,75 @@ class TCPServer(asyncore.dispatcher): self.request = '' def handle_connect(self): - if _debug: deferred(TCPServer._debug, "handle_connect") + if _debug: TCPServer._debug("handle_connect") def readable(self): return self.connected def handle_read(self): - if _debug: deferred(TCPServer._debug, "handle_read") + if _debug: TCPServer._debug("handle_read") try: msg = self.recv(65536) - if _debug: deferred(TCPServer._debug, " - received %d octets", len(msg)) + if _debug: TCPServer._debug(" - received %d octets", len(msg)) # no socket means it was closed if not self.socket: - if _debug: deferred(TCPServer._debug, " - socket was closed") + if _debug: TCPServer._debug(" - socket was closed") else: + # send the data upstream deferred(self.response, PDU(msg)) except socket.error as err: if (err.args[0] == 111): - deferred(TCPServer._debug, " - connection to %r refused", self.peer) + if _debug: TCPServer._debug(" - connection to %r refused", self.peer) else: - deferred(TCPServer._debug, " - recv socket error: %s", err) + if _debug: TCPServer._debug(" - recv socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def writable(self): return (len(self.request) != 0) def handle_write(self): - if _debug: deferred(TCPServer._debug, "handle_write") + if _debug: TCPServer._debug("handle_write") try: sent = self.send(self.request) - if _debug: deferred(TCPServer._debug, " - sent %d octets, %d remaining", sent, len(self.request) - sent) + if _debug: TCPServer._debug(" - sent %d octets, %d remaining", sent, len(self.request) - sent) self.request = self.request[sent:] except socket.error as err: if (err.args[0] == 111): - deferred(TCPServer._debug, " - connection to %r refused", self.peer) + if _debug: TCPServer._debug(" - connection to %r refused", self.peer) else: - deferred(TCPServer._debug, " - send socket error: %s", err) + if _debug: TCPServer._debug(" - send socket error: %s", err) # sent the exception upstream - deferred(self.response, err) + self.handle_error(err) def handle_close(self): - if _debug: deferred(TCPServer._debug, "handle_close") + if _debug: TCPServer._debug("handle_close") if not self: - deferred(TCPServer._warning, "handle_close: self is None") + if _debug: TCPServer._debug(" - self is None") return if not self.socket: - deferred(TCPServer._warning, "handle_close: socket already closed") + if _debug: TCPServer._debug(" - socket already closed") return self.close() self.socket = None + def handle_error(self, error=None): + """Trap for TCPServer errors, otherwise continue.""" + if _debug: TCPServer._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + def indication(self, pdu): """Requests are queued for delivery.""" if _debug: TCPServer._debug("indication %r", pdu) @@ -550,6 +585,16 @@ class TCPServerActor(TCPServer): # tell the director this is a new actor self.director.add_actor(self) + def handle_error(self, error=None): + """Trap for TCPServer errors, otherwise continue.""" + if _debug: TCPServerActor._debug("handle_error %r", error) + + # pass along to the director + if error is not None: + self.director.actor_error(self, error) + else: + TCPServer.handle_error(self) + def handle_close(self): if _debug: TCPServerActor._debug("handle_close") @@ -729,6 +774,13 @@ class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugCo if self.serviceElement: self.sap_request(delPeer=actor.peer) + def actor_error(self, actor, error): + if _debug: TCPServerDirector._debug("actor_error %r %r", actor, error) + + # tell the ASE the actor had an error + if self.serviceElement: + self.sap_request(actor_error=actor, error=error) + def get_actor(self, address): """ Get the actor associated with an address or None. """ return self.servers.get(address, None) @@ -814,11 +866,6 @@ class StreamToPacket(Client, Server): """Message going upstream.""" if _debug: StreamToPacket._debug("StreamToPacket.confirmation %r", pdu) - # short circuit errors - if isinstance(pdu, IOError): - self.response(pdu) - return - # hack it up into chunks for packet in self.packetize(pdu, self.upstreamBuffer): self.response(packet) @@ -838,7 +885,7 @@ class StreamToPacketSAP(ApplicationServiceElement, ServiceAccessPoint): # save a reference to the StreamToPacket object self.stp = stp - def indication(self, addPeer=None, delPeer=None): + def indication(self, addPeer=None, delPeer=None, actor_error=None, error=None): if _debug: StreamToPacketSAP._debug("indication addPeer=%r delPeer=%r", addPeer, delPeer) if addPeer: diff --git a/py34/bacpypes/tcp.py b/py34/bacpypes/tcp.py index 441b77d..e357409 100755 --- a/py34/bacpypes/tcp.py +++ b/py34/bacpypes/tcp.py @@ -111,8 +111,8 @@ class TCPClient(asyncore.dispatcher): except socket.error as err: if _debug: TCPClient._debug(" - connect socket error: %r", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def handle_connect(self): if _debug: TCPClient._debug("handle_connect") @@ -126,24 +126,15 @@ class TCPClient(asyncore.dispatcher): # check for connection refused if (err == 0): - if _debug: TCPClient._debug(" - connection established") + if _debug: TCPClient._debug(" - no error") elif (err == 111): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - deferred(self.response, socket.error(111, "connection refused")) + self.handle_error(socket.error(111, "connection refused")) return # pass along asyncore.dispatcher.handle_connect_event(self) - def handle_expt(self): - if _debug: TCPClient._debug("handle_expt") - - err = self.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - if _debug: TCPClient._debug(" - err: %r", err) - - # pass along - asyncore.dispatcher.handle_expt(self) - def readable(self): return self.connected @@ -167,8 +158,8 @@ class TCPClient(asyncore.dispatcher): else: if _debug: TCPClient._debug(" - recv socket error: %r", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def writable(self): return (len(self.request) != 0) @@ -191,8 +182,8 @@ class TCPClient(asyncore.dispatcher): else: if _debug: TCPClient._debug(" - send socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def handle_write_event(self): if _debug: TCPClient._debug("handle_write_event") @@ -204,7 +195,9 @@ class TCPClient(asyncore.dispatcher): # check for connection refused if (err == 61): if _debug: TCPClient._debug(" - connection to %r refused", self.peer) - deferred(self.response, socket.error(61, "connection refused")) + self.handle_error(socket.error(61, "connection refused")) + self.handle_close() + return # pass along asyncore.dispatcher.handle_write_event(self) @@ -218,6 +211,13 @@ class TCPClient(asyncore.dispatcher): # make sure other routines know the socket is closed self.socket = None + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClient._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + def indication(self, pdu): """Requests are queued for delivery.""" if _debug: TCPClient._debug("indication %r", pdu) @@ -255,6 +255,16 @@ class TCPClientActor(TCPClient): # tell the director this is a new actor self.director.add_actor(self) + def handle_error(self, error=None): + """Trap for TCPClient errors, otherwise continue.""" + if _debug: TCPClientActor._debug("handle_error %r", error) + + # pass along to the director + if error is not None: + self.director.actor_error(self, error) + else: + TCPClient.handle_error(self) + def handle_close(self): if _debug: TCPClientActor._debug("handle_close") @@ -386,6 +396,13 @@ class TCPClientDirector(Server, ServiceAccessPoint, DebugContents): connect_task = FunctionTask(self.connect, actor.peer) connect_task.install_task(_time() + self.reconnect[actor.peer]) + def actor_error(self, actor, error): + if _debug: TCPClientDirector._debug("actor_error %r %r", actor, error) + + # tell the ASE the actor had an error + if self.serviceElement: + self.sap_request(actor_error=actor, error=error) + def get_actor(self, address): """ Get the actor associated with an address or None. """ return self.clients.get(address, None) @@ -449,67 +466,75 @@ class TCPServer(asyncore.dispatcher): self.request = '' def handle_connect(self): - if _debug: deferred(TCPServer._debug, "handle_connect") + if _debug: TCPServer._debug("handle_connect") def readable(self): return 1 def handle_read(self): - if _debug: deferred(TCPServer._debug, "handle_read") + if _debug: TCPServer._debug("handle_read") try: msg = self.recv(65536) - if _debug: deferred(TCPServer._debug, " - received %d octets", len(msg)) + if _debug: TCPServer._debug(" - received %d octets", len(msg)) # no socket means it was closed if not self.socket: - if _debug: deferred(TCPServer._debug, " - socket was closed") + if _debug: TCPServer._debug(" - socket was closed") else: + # send the data upstream deferred(self.response, PDU(msg)) except socket.error as err: if (err.args[0] == 111): - deferred(TCPServer._debug, " - connection to %r refused", self.peer) + if _debug: TCPServer._debug(" - connection to %r refused", self.peer) else: - deferred(TCPServer._debug, " - recv socket error: %s", err) + if _debug: TCPServer._debug(" - recv socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def writable(self): return (len(self.request) != 0) def handle_write(self): - if _debug: deferred(TCPServer._debug, "handle_write") + if _debug: TCPServer._debug("handle_write") try: sent = self.send(self.request) - if _debug: deferred(TCPServer._debug, " - sent %d octets, %d remaining", sent, len(self.request) - sent) + if _debug: TCPServer._debug(" - sent %d octets, %d remaining", sent, len(self.request) - sent) self.request = self.request[sent:] except socket.error as err: if (err.args[0] == 111): - deferred(TCPServer._debug, " - connection to %r refused", self.peer) + if _debug: TCPServer._debug(" - connection to %r refused", self.peer) else: - deferred(TCPServer._debug, " - send socket error: %s", err) + if _debug: TCPServer._debug(" - send socket error: %s", err) - # sent the exception upstream - deferred(self.response, err) + # pass along to a handler + self.handle_error(err) def handle_close(self): - if _debug: deferred(TCPServer._debug, "handle_close") + if _debug: TCPServer._debug("handle_close") if not self: - deferred(TCPServer._warning, "handle_close: self is None") + if _debug: TCPServer._warning("handle_close: self is None") return if not self.socket: - deferred(TCPServer._warning, "handle_close: socket already closed") + if _debug: TCPServer._warning("handle_close: socket already closed") return self.close() self.socket = None + def handle_error(self, error=None): + """Trap for TCPServer errors, otherwise continue.""" + if _debug: TCPServer._debug("handle_error %r", error) + + # core does not take parameters + asyncore.dispatcher.handle_error(self) + def indication(self, pdu): """Requests are queued for delivery.""" if _debug: TCPServer._debug("indication %r", pdu) @@ -544,6 +569,16 @@ class TCPServerActor(TCPServer): # tell the director this is a new actor self.director.add_actor(self) + def handle_error(self, error=None): + """Trap for TCPServer errors, otherwise continue.""" + if _debug: TCPServerActor._debug("handle_error %r", error) + + # pass along to the director + if error is not None: + self.director.actor_error(self, error) + else: + TCPServer.handle_error(self) + def handle_close(self): if _debug: TCPServerActor._debug("handle_close") @@ -723,6 +758,13 @@ class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugCo if self.serviceElement: self.sap_request(delPeer=actor.peer) + def actor_error(self, actor, error): + if _debug: TCPServerDirector._debug("actor_error %r %r", actor, error) + + # tell the ASE the actor had an error + if self.serviceElement: + self.sap_request(actor_error=actor, error=error) + def get_actor(self, address): """ Get the actor associated with an address or None. """ return self.servers.get(address, None) @@ -808,11 +850,6 @@ class StreamToPacket(Client, Server): """Message going upstream.""" if _debug: StreamToPacket._debug("StreamToPacket.confirmation %r", pdu) - # short circuit errors - if isinstance(pdu, IOError): - self.response(pdu) - return - # hack it up into chunks for packet in self.packetize(pdu, self.upstreamBuffer): self.response(packet) @@ -832,7 +869,7 @@ class StreamToPacketSAP(ApplicationServiceElement, ServiceAccessPoint): # save a reference to the StreamToPacket object self.stp = stp - def indication(self, addPeer=None, delPeer=None): + def indication(self, addPeer=None, delPeer=None, actor_error=None, error=None): if _debug: StreamToPacketSAP._debug("indication addPeer=%r delPeer=%r", addPeer, delPeer) if addPeer: diff --git a/samples/TCPClient.py b/samples/TCPClient.py index 6b565ab..2991c86 100644 --- a/samples/TCPClient.py +++ b/samples/TCPClient.py @@ -73,7 +73,7 @@ class MiddleMan(Client, Server): @bacpypes_debugging class MiddleManASE(ApplicationServiceElement): - def indication(self, addPeer=None, delPeer=None): + def indication(self, addPeer=None, delPeer=None, actor_error=None, error=None): """ This function is called by the TCPDirector when the client connects to or disconnects from a server. It is called with addPeer or delPeer diff --git a/samples/TCPServer.py b/samples/TCPServer.py index 7e9d7b0..44e9924 100755 --- a/samples/TCPServer.py +++ b/samples/TCPServer.py @@ -50,7 +50,7 @@ class EchoMaster(Client): @bacpypes_debugging class MiddleManASE(ApplicationServiceElement): - def indication(self, addPeer=None, delPeer=None): + def indication(self, addPeer=None, delPeer=None, actor_error=None, error=None): """ This function is called by the TCPDirector when the client connects to or disconnects from a server. It is called with addPeer or delPeer