1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

update timeouts and connection logic

This commit is contained in:
Joel Bender 2017-04-27 01:55:43 -04:00
parent 90a3e62692
commit 91925d0b54

View File

@ -6,6 +6,8 @@ TCP Communications Module
import asyncore
import socket
import errno
import pickle
from time import time as _time, sleep as _sleep
from io import StringIO
@ -23,6 +25,7 @@ _log = ModuleLogger(globals())
# globals
REBIND_SLEEP_INTERVAL = 2.0
CONNECT_TIMEOUT = 30.0
#
# PickleActorMixIn
@ -91,6 +94,8 @@ class PickleActorMixIn:
@bacpypes_debugging
class TCPClient(asyncore.dispatcher):
_connect_timeout = CONNECT_TIMEOUT
def __init__(self, peer):
if _debug: TCPClient._debug("__init__ %r", peer)
asyncore.dispatcher.__init__(self)
@ -98,22 +103,39 @@ class TCPClient(asyncore.dispatcher):
# ask the dispatcher for a socket
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# set the timeout
self.socket.settimeout(self._connect_timeout)
if _debug: TCPClient._debug(" - timeout: %r", self._connect_timeout)
# save the peer
self.peer = peer
self.connected = False
# create a request buffer
self.request = b''
# try to connect
try:
if _debug: TCPClient._debug(" - initiate connection")
self.connect(peer)
rslt = self.socket.connect_ex(peer)
if (rslt == 0):
if _debug: TCPClient._debug(" - connected")
self.connected = True
elif (rslt == errno.EINPROGRESS):
if _debug: TCPClient._debug(" - in progress")
elif (rslt in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection refused")
self.handle_error(rslt)
else:
if _debug: TCPClient._debug(" - connect_ex: %r", rslt)
except socket.error as err:
if _debug: TCPClient._debug(" - connect socket error: %r", err)
# pass along to a handler
self.handle_error(err)
def handle_accept(self):
if _debug: TCPClient._debug("handle_accept")
def handle_connect(self):
if _debug: TCPClient._debug("handle_connect")
@ -127,9 +149,10 @@ class TCPClient(asyncore.dispatcher):
# check for connection refused
if (err == 0):
if _debug: TCPClient._debug(" - no error")
elif (err == 111):
self.connected = True
elif (err in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
self.handle_error(socket.error(111, "connection refused"))
self.handle_error(socket.error(errno.ECONNREFUSED, "connection refused"))
return
# pass along
@ -149,11 +172,11 @@ class TCPClient(asyncore.dispatcher):
if not self.socket:
if _debug: TCPClient._debug(" - socket was closed")
else:
# sent the data upstream
# send the data upstream
deferred(self.response, PDU(msg))
except socket.error as err:
if (err.args[0] in (61, 111)):
if (err.args[0] in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
else:
if _debug: TCPClient._debug(" - recv socket error: %r", err)
@ -177,7 +200,7 @@ class TCPClient(asyncore.dispatcher):
if (err.args[0] == 32):
if _debug: TCPClient._debug(" - broken pipe to %r", self.peer)
return
elif (err.args[0] in (61, 111)):
elif (err.args[0] in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
else:
if _debug: TCPClient._debug(" - send socket error: %s", err)
@ -193,7 +216,11 @@ class TCPClient(asyncore.dispatcher):
if _debug: TCPClient._debug(" - err: %r", err)
# check for connection refused
if (err in (61, 111)):
if err == 0:
if not self.connected:
if _debug: TCPClient._debug(" - connected")
self.connected = True
elif (err in (errno.ECONNREFUSED, 111)):
if _debug: TCPClient._debug(" - connection to %r refused", self.peer)
self.handle_error(socket.error(err, "connection refused"))
self.handle_close()
@ -206,7 +233,7 @@ class TCPClient(asyncore.dispatcher):
if _debug: TCPClient._debug("handle_close")
# close the socket
self.close()
self.socket.close()
# make sure other routines know the socket is closed
self.socket = None
@ -215,6 +242,11 @@ class TCPClient(asyncore.dispatcher):
"""Trap for TCPClient errors, otherwise continue."""
if _debug: TCPClient._debug("handle_error %r", error)
# if there is no socket, it was closed
if not self.socket:
if _debug: TCPClient._debug(" - error already handled")
return
# core does not take parameters
asyncore.dispatcher.handle_error(self)
@ -236,32 +268,51 @@ class TCPClientActor(TCPClient):
def __init__(self, director, peer):
if _debug: TCPClientActor._debug("__init__ %r %r", director, peer)
# no director yet, no connection error
self.director = None
self._connection_error = None
# pass along the connect timeout from the director
if director.connect_timeout is not None:
self._connect_timeout = director.connect_timeout
# continue with initialization
TCPClient.__init__(self, peer)
# keep track of the director
self.director = director
# add a timer
self.timeout = director.timeout
if self.timeout > 0:
self.timer = FunctionTask(self.idle_timeout)
self.timer.install_task(_time() + self.timeout)
self._idle_timeout = director.idle_timeout
if self._idle_timeout:
self.idle_timeout_task = FunctionTask(self.idle_timeout)
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
else:
self.timer = None
self.idle_timeout_task = None
# this may have a flush state
self.flushTask = None
self.flush_task = None
# tell the director this is a new actor
self.director.add_actor(self)
# if there was a connection error, pass it to the director
if self._connection_error:
if _debug: TCPClientActor._debug(" - had connection error")
self.director.actor_error(self, self._connection_error)
def handle_error(self, error=None):
"""Trap for TCPClient errors, otherwise continue."""
if _debug: TCPClientActor._debug("handle_error %r", error)
# pass along to the director
if error is not None:
self.director.actor_error(self, error)
# this error may be during startup
if not self.director:
self._connection_error = error
else:
self.director.actor_error(self, error)
else:
TCPClient.handle_error(self)
@ -269,12 +320,12 @@ class TCPClientActor(TCPClient):
if _debug: TCPClientActor._debug("handle_close")
# if there's a flush task, cancel it
if self.flushTask:
self.flushTask.suspend_task()
if self.flush_task:
self.flush_task.suspend_task()
# cancel the timer
if self.timer:
self.timer.suspend_task()
if self.idle_timeout_task:
self.idle_timeout_task.suspend_task()
# tell the director this is gone
self.director.del_actor(self)
@ -292,13 +343,13 @@ class TCPClientActor(TCPClient):
if _debug: TCPClientActor._debug("indication %r", pdu)
# additional downstream data is tossed while flushing
if self.flushTask:
if self.flush_task:
if _debug: TCPServerActor._debug(" - flushing")
return
# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
# continue as usual
TCPClient.indication(self, pdu)
@ -310,8 +361,8 @@ class TCPClientActor(TCPClient):
pdu.pduSource = self.peer
# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
# process this as a response from the director
self.director.response(pdu)
@ -320,11 +371,11 @@ class TCPClientActor(TCPClient):
if _debug: TCPClientActor._debug("flush")
# clear out the old task
self.flushTask = None
self.flush_task = None
# if the outgoing buffer has data, re-schedule another attempt
if self.request:
self.flushTask = OneShotFunction(self.flush)
self.flush_task = OneShotFunction(self.flush)
return
# close up shop, all done
@ -350,10 +401,13 @@ class TCPPickleClientActor(PickleActorMixIn, TCPClientActor):
@bacpypes_debugging
class TCPClientDirector(Server, ServiceAccessPoint, DebugContents):
_debug_contents = ('timeout', 'actorClass', 'clients', 'reconnect')
_debug_contents = ('connect_timeout', 'idle_timeout', 'actorClass', 'clients', 'reconnect')
def __init__(self, timeout=0, actorClass=TCPClientActor, sid=None, sapID=None):
if _debug: TCPClientDirector._debug("__init__ timeout=%r actorClass=%r sid=%r sapID=%r", timeout, actorClass, sid, sapID)
def __init__(self, connect_timeout=None, idle_timeout=None, actorClass=TCPClientActor, sid=None, sapID=None):
if _debug:
TCPClientDirector._debug("__init__ connect_timeout=%r idle_timeout=%r actorClass=%r sid=%r sapID=%r",
connect_timeout, idle_timeout, actorClass, sid, sapID,
)
Server.__init__(self, sid)
ServiceAccessPoint.__init__(self, sapID)
@ -363,7 +417,8 @@ class TCPClientDirector(Server, ServiceAccessPoint, DebugContents):
self.actorClass = actorClass
# save the timeout for actors
self.timeout = timeout
self.connect_timeout = connect_timeout
self.idle_timeout = idle_timeout
# start with an empty client pool
self.clients = {}
@ -469,7 +524,7 @@ class TCPServer(asyncore.dispatcher):
if _debug: TCPServer._debug("handle_connect")
def readable(self):
return 1
return self.connected
def handle_read(self):
if _debug: TCPServer._debug("handle_read")
@ -512,17 +567,17 @@ class TCPServer(asyncore.dispatcher):
else:
if _debug: TCPServer._debug(" - send socket error: %s", err)
# pass along to a handler
# sent the exception upstream
self.handle_error(err)
def handle_close(self):
if _debug: TCPServer._debug("handle_close")
if not self:
if _debug: TCPServer._warning("handle_close: self is None")
if _debug: TCPServer._debug(" - self is None")
return
if not self.socket:
if _debug: TCPServer._warning("handle_close: socket already closed")
if _debug: TCPServer._debug(" - socket already closed")
return
self.close()
@ -556,15 +611,15 @@ class TCPServerActor(TCPServer):
self.director = director
# add a timer
self.timeout = director.timeout
if self.timeout > 0:
self.timer = FunctionTask(self.idle_timeout)
self.timer.install_task(_time() + self.timeout)
self._idle_timeout = director.idle_timeout
if self._idle_timeout:
self.idle_timeout_task = FunctionTask(self.idle_timeout)
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
else:
self.timer = None
self.idle_timeout_task = None
# this may have a flush state
self.flushTask = None
self.flush_task = None
# tell the director this is a new actor
self.director.add_actor(self)
@ -583,8 +638,8 @@ class TCPServerActor(TCPServer):
if _debug: TCPServerActor._debug("handle_close")
# if there's a flush task, cancel it
if self.flushTask:
self.flushTask.suspend_task()
if self.flush_task:
self.flush_task.suspend_task()
# tell the director this is gone
self.director.del_actor(self)
@ -602,13 +657,13 @@ class TCPServerActor(TCPServer):
if _debug: TCPServerActor._debug("indication %r", pdu)
# additional downstream data is tossed while flushing
if self.flushTask:
if self.flush_task:
if _debug: TCPServerActor._debug(" - flushing")
return
# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
# continue as usual
TCPServer.indication(self, pdu)
@ -617,7 +672,7 @@ class TCPServerActor(TCPServer):
if _debug: TCPServerActor._debug("response %r", pdu)
# upstream data is tossed while flushing
if self.flushTask:
if self.flush_task:
if _debug: TCPServerActor._debug(" - flushing")
return
@ -625,8 +680,8 @@ class TCPServerActor(TCPServer):
pdu.pduSource = self.peer
# reschedule the timer
if self.timer:
self.timer.install_task(_time() + self.timeout)
if self.idle_timeout_task:
self.idle_timeout_task.install_task(_time() + self._idle_timeout)
# process this as a response from the director
self.director.response(pdu)
@ -635,11 +690,11 @@ class TCPServerActor(TCPServer):
if _debug: TCPServerActor._debug("flush")
# clear out the old task
self.flushTask = None
self.flush_task = None
# if the outgoing buffer has data, re-schedule another attempt
if self.request:
self.flushTask = OneShotFunction(self.flush)
self.flush_task = OneShotFunction(self.flush)
return
# close up shop, all done
@ -659,19 +714,19 @@ class TCPPickleServerActor(PickleActorMixIn, TCPServerActor):
@bacpypes_debugging
class TCPServerDirector(asyncore.dispatcher, Server, ServiceAccessPoint, DebugContents):
_debug_contents = ('port', 'timeout', 'actorClass', 'servers')
_debug_contents = ('port', 'idle_timeout', 'actorClass', 'servers')
def __init__(self, address, listeners=5, timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None):
def __init__(self, address, listeners=5, idle_timeout=0, reuse=False, actorClass=TCPServerActor, cid=None, sapID=None):
if _debug:
TCPServerDirector._debug("__init__ %r listeners=%r timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r"
, address, listeners, timeout, reuse, actorClass, cid, sapID
TCPServerDirector._debug("__init__ %r listeners=%r idle_timeout=%r reuse=%r actorClass=%r cid=%r sapID=%r"
, address, listeners, idle_timeout, reuse, actorClass, cid, sapID
)
Server.__init__(self, cid)
ServiceAccessPoint.__init__(self, sapID)
# save the address and timeout
self.port = address
self.timeout = timeout
self.idle_timeout = idle_timeout
# check the actor class
if not issubclass(actorClass, TCPServerActor):