From 970ed8f7f0cce4273d1cb2b3aa6f49d6dd1f3a4e Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Thu, 6 Oct 2016 09:42:23 -0400 Subject: [PATCH] new SieveClientController class for other protocol libs (like MODpypes) --- py25/bacpypes/iocb.py | 96 ++++++++++++++++++++++++++++++++++++++++++- py27/bacpypes/iocb.py | 92 ++++++++++++++++++++++++++++++++++++++++- py34/bacpypes/iocb.py | 92 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 276 insertions(+), 4 deletions(-) diff --git a/py25/bacpypes/iocb.py b/py25/bacpypes/iocb.py index 93ba3b5..9ab4c3f 100644 --- a/py25/bacpypes/iocb.py +++ b/py25/bacpypes/iocb.py @@ -15,6 +15,7 @@ from .debugging import bacpypes_debugging, ModuleLogger, DebugContents from .core import deferred from .task import FunctionTask +from .comm import Client # some debugging _debug = 0 @@ -715,7 +716,7 @@ class IOQController(IOController): # if there was an error, abort the request if err: - self.abortIO(iocb, err) + self.abort_io(iocb, err) def process_io(self, iocb): """Figure out how to respond to this request. This must be @@ -843,6 +844,97 @@ class IOQController(IOController): bacpypes_debugging(IOQController) +# +# SieveQueue +# + +class SieveQueue(IOQController): + + def __init__(self, request_fn, address=None): + if _debug: SieveQueue._debug("__init__ %r %r", request_fn, address) + IOQController.__init__(self, str(address)) + + # save a reference to the request function + self.request_fn = request_fn + self.address = address + + def process_io(self, iocb): + if _debug: SieveQueue._debug("process_io %r", iocb) + + # this is now an active request + self.active_io(iocb) + + # send the request + self.request_fn(iocb.args[0]) + +bacpypes_debugging(SieveQueue) + +# +# SieveClientController +# + +class SieveClientController(Client, IOController): + + def __init__(self): + if _debug: SieveClientController._debug("__init__") + Client.__init__(self) + IOController.__init__(self) + + # queues for each address + self.queues = {} + + def process_io(self, iocb): + if _debug: SieveClientController._debug("process_io %r", iocb) + + # get the destination address from the pdu + destination_address = iocb.args[0].pduDestination + if _debug: Controller._debug(" - destination_address: %r", destination_address) + + # look up the queue + queue = self.queues.get(destination_address, None) + if not queue: + queue = SieveQueue(self.request, destination_address) + self.queues[destination_address] = queue + if _debug: Controller._debug(" - queue: %r", queue) + + # ask the queue to process the request + queue.request_io(iocb) + + def request(self, pdu): + if _debug: SieveClientController._debug("request %r", pdu) + + # send it downstream + super(SieveClientController, self).request(pdu) + + def confirmation(self, pdu): + if _debug: SieveClientController._debug("confirmation %r", pdu) + + # get the source address + source_address = pdu.pduSource + if _debug: SieveClientController._debug(" - source_address: %r", source_address) + + # look up the queue + queue = self.queues.get(source_address, None) + if not queue: + SieveClientController._debug("no queue for %r" % (source_address,)) + return + if _debug: SieveClientController._debug(" - queue: %r", queue) + + # make sure it has an active iocb + if not queue.active_iocb: + SieveClientController._debug("no active request for %r" % (source_address,)) + return + + # complete the request + queue.complete_io(queue.active_iocb, pdu) + + # if the queue is empty and idle, forget about the controller + if not queue.ioQueue.queue and not queue.active_iocb: + if _debug: SieveClientController._debug(" - queue is empty") + del self.queues[source_address] + +bacpypes_debugging(SieveClientController) + # # register_controller # @@ -876,4 +968,4 @@ def abort(err): for controller in local_controllers.values(): controller.abort(err) -bacpypes_debugging(abort) \ No newline at end of file +bacpypes_debugging(abort) diff --git a/py27/bacpypes/iocb.py b/py27/bacpypes/iocb.py index 02e152e..7e87e48 100644 --- a/py27/bacpypes/iocb.py +++ b/py27/bacpypes/iocb.py @@ -15,6 +15,7 @@ from .debugging import bacpypes_debugging, ModuleLogger, DebugContents from .core import deferred from .task import FunctionTask +from .comm import Client # some debugging _debug = 0 @@ -710,7 +711,7 @@ class IOQController(IOController): # if there was an error, abort the request if err: - self.abortIO(iocb, err) + self.abort_io(iocb, err) def process_io(self, iocb): """Figure out how to respond to this request. This must be @@ -836,6 +837,95 @@ class IOQController(IOController): # look for more to do IOQController._trigger(self) +# +# SieveQueue +# + +@bacpypes_debugging +class SieveQueue(IOQController): + + def __init__(self, request_fn, address=None): + if _debug: SieveQueue._debug("__init__ %r %r", request_fn, address) + IOQController.__init__(self, str(address)) + + # save a reference to the request function + self.request_fn = request_fn + self.address = address + + def process_io(self, iocb): + if _debug: SieveQueue._debug("process_io %r", iocb) + + # this is now an active request + self.active_io(iocb) + + # send the request + self.request_fn(iocb.args[0]) + +# +# SieveClientController +# + +@bacpypes_debugging +class SieveClientController(Client, IOController): + + def __init__(self): + if _debug: SieveClientController._debug("__init__") + Client.__init__(self) + IOController.__init__(self) + + # queues for each address + self.queues = {} + + def process_io(self, iocb): + if _debug: SieveClientController._debug("process_io %r", iocb) + + # get the destination address from the pdu + destination_address = iocb.args[0].pduDestination + if _debug: Controller._debug(" - destination_address: %r", destination_address) + + # look up the queue + queue = self.queues.get(destination_address, None) + if not queue: + queue = SieveQueue(self.request, destination_address) + self.queues[destination_address] = queue + if _debug: Controller._debug(" - queue: %r", queue) + + # ask the queue to process the request + queue.request_io(iocb) + + def request(self, pdu): + if _debug: SieveClientController._debug("request %r", pdu) + + # send it downstream + super(SieveClientController, self).request(pdu) + + def confirmation(self, pdu): + if _debug: SieveClientController._debug("confirmation %r", pdu) + + # get the source address + source_address = pdu.pduSource + if _debug: SieveClientController._debug(" - source_address: %r", source_address) + + # look up the queue + queue = self.queues.get(source_address, None) + if not queue: + SieveClientController._debug("no queue for %r" % (source_address,)) + return + if _debug: SieveClientController._debug(" - queue: %r", queue) + + # make sure it has an active iocb + if not queue.active_iocb: + SieveClientController._debug("no active request for %r" % (source_address,)) + return + + # complete the request + queue.complete_io(queue.active_iocb, pdu) + + # if the queue is empty and idle, forget about the controller + if not queue.ioQueue.queue and not queue.active_iocb: + if _debug: SieveClientController._debug(" - queue is empty") + del self.queues[source_address] + # # register_controller # diff --git a/py34/bacpypes/iocb.py b/py34/bacpypes/iocb.py index 02e152e..7e87e48 100644 --- a/py34/bacpypes/iocb.py +++ b/py34/bacpypes/iocb.py @@ -15,6 +15,7 @@ from .debugging import bacpypes_debugging, ModuleLogger, DebugContents from .core import deferred from .task import FunctionTask +from .comm import Client # some debugging _debug = 0 @@ -710,7 +711,7 @@ class IOQController(IOController): # if there was an error, abort the request if err: - self.abortIO(iocb, err) + self.abort_io(iocb, err) def process_io(self, iocb): """Figure out how to respond to this request. This must be @@ -836,6 +837,95 @@ class IOQController(IOController): # look for more to do IOQController._trigger(self) +# +# SieveQueue +# + +@bacpypes_debugging +class SieveQueue(IOQController): + + def __init__(self, request_fn, address=None): + if _debug: SieveQueue._debug("__init__ %r %r", request_fn, address) + IOQController.__init__(self, str(address)) + + # save a reference to the request function + self.request_fn = request_fn + self.address = address + + def process_io(self, iocb): + if _debug: SieveQueue._debug("process_io %r", iocb) + + # this is now an active request + self.active_io(iocb) + + # send the request + self.request_fn(iocb.args[0]) + +# +# SieveClientController +# + +@bacpypes_debugging +class SieveClientController(Client, IOController): + + def __init__(self): + if _debug: SieveClientController._debug("__init__") + Client.__init__(self) + IOController.__init__(self) + + # queues for each address + self.queues = {} + + def process_io(self, iocb): + if _debug: SieveClientController._debug("process_io %r", iocb) + + # get the destination address from the pdu + destination_address = iocb.args[0].pduDestination + if _debug: Controller._debug(" - destination_address: %r", destination_address) + + # look up the queue + queue = self.queues.get(destination_address, None) + if not queue: + queue = SieveQueue(self.request, destination_address) + self.queues[destination_address] = queue + if _debug: Controller._debug(" - queue: %r", queue) + + # ask the queue to process the request + queue.request_io(iocb) + + def request(self, pdu): + if _debug: SieveClientController._debug("request %r", pdu) + + # send it downstream + super(SieveClientController, self).request(pdu) + + def confirmation(self, pdu): + if _debug: SieveClientController._debug("confirmation %r", pdu) + + # get the source address + source_address = pdu.pduSource + if _debug: SieveClientController._debug(" - source_address: %r", source_address) + + # look up the queue + queue = self.queues.get(source_address, None) + if not queue: + SieveClientController._debug("no queue for %r" % (source_address,)) + return + if _debug: SieveClientController._debug(" - queue: %r", queue) + + # make sure it has an active iocb + if not queue.active_iocb: + SieveClientController._debug("no active request for %r" % (source_address,)) + return + + # complete the request + queue.complete_io(queue.active_iocb, pdu) + + # if the queue is empty and idle, forget about the controller + if not queue.ioQueue.queue and not queue.active_iocb: + if _debug: SieveClientController._debug(" - queue is empty") + del self.queues[source_address] + # # register_controller #