1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

new SieveClientController class for other protocol libs (like MODpypes)

This commit is contained in:
Joel Bender 2016-10-06 09:42:23 -04:00
parent 2d75e6640e
commit 970ed8f7f0
3 changed files with 276 additions and 4 deletions

View File

@ -15,6 +15,7 @@ from .debugging import bacpypes_debugging, ModuleLogger, DebugContents
from .core import deferred
from .task import FunctionTask
from .comm import Client
# some debugging
_debug = 0
@ -715,7 +716,7 @@ class IOQController(IOController):
# if there was an error, abort the request
if err:
self.abortIO(iocb, err)
self.abort_io(iocb, err)
def process_io(self, iocb):
"""Figure out how to respond to this request. This must be
@ -843,6 +844,97 @@ class IOQController(IOController):
bacpypes_debugging(IOQController)
#
# SieveQueue
#
class SieveQueue(IOQController):
def __init__(self, request_fn, address=None):
if _debug: SieveQueue._debug("__init__ %r %r", request_fn, address)
IOQController.__init__(self, str(address))
# save a reference to the request function
self.request_fn = request_fn
self.address = address
def process_io(self, iocb):
if _debug: SieveQueue._debug("process_io %r", iocb)
# this is now an active request
self.active_io(iocb)
# send the request
self.request_fn(iocb.args[0])
bacpypes_debugging(SieveQueue)
#
# SieveClientController
#
class SieveClientController(Client, IOController):
def __init__(self):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# queues for each address
self.queues = {}
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
# get the destination address from the pdu
destination_address = iocb.args[0].pduDestination
if _debug: Controller._debug(" - destination_address: %r", destination_address)
# look up the queue
queue = self.queues.get(destination_address, None)
if not queue:
queue = SieveQueue(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: Controller._debug(" - queue: %r", queue)
# ask the queue to process the request
queue.request_io(iocb)
def request(self, pdu):
if _debug: SieveClientController._debug("request %r", pdu)
# send it downstream
super(SieveClientController, self).request(pdu)
def confirmation(self, pdu):
if _debug: SieveClientController._debug("confirmation %r", pdu)
# get the source address
source_address = pdu.pduSource
if _debug: SieveClientController._debug(" - source_address: %r", source_address)
# look up the queue
queue = self.queues.get(source_address, None)
if not queue:
SieveClientController._debug("no queue for %r" % (source_address,))
return
if _debug: SieveClientController._debug(" - queue: %r", queue)
# make sure it has an active iocb
if not queue.active_iocb:
SieveClientController._debug("no active request for %r" % (source_address,))
return
# complete the request
queue.complete_io(queue.active_iocb, pdu)
# if the queue is empty and idle, forget about the controller
if not queue.ioQueue.queue and not queue.active_iocb:
if _debug: SieveClientController._debug(" - queue is empty")
del self.queues[source_address]
bacpypes_debugging(SieveClientController)
#
# register_controller
#
@ -876,4 +968,4 @@ def abort(err):
for controller in local_controllers.values():
controller.abort(err)
bacpypes_debugging(abort)
bacpypes_debugging(abort)

View File

@ -15,6 +15,7 @@ from .debugging import bacpypes_debugging, ModuleLogger, DebugContents
from .core import deferred
from .task import FunctionTask
from .comm import Client
# some debugging
_debug = 0
@ -710,7 +711,7 @@ class IOQController(IOController):
# if there was an error, abort the request
if err:
self.abortIO(iocb, err)
self.abort_io(iocb, err)
def process_io(self, iocb):
"""Figure out how to respond to this request. This must be
@ -836,6 +837,95 @@ class IOQController(IOController):
# look for more to do
IOQController._trigger(self)
#
# SieveQueue
#
@bacpypes_debugging
class SieveQueue(IOQController):
def __init__(self, request_fn, address=None):
if _debug: SieveQueue._debug("__init__ %r %r", request_fn, address)
IOQController.__init__(self, str(address))
# save a reference to the request function
self.request_fn = request_fn
self.address = address
def process_io(self, iocb):
if _debug: SieveQueue._debug("process_io %r", iocb)
# this is now an active request
self.active_io(iocb)
# send the request
self.request_fn(iocb.args[0])
#
# SieveClientController
#
@bacpypes_debugging
class SieveClientController(Client, IOController):
def __init__(self):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# queues for each address
self.queues = {}
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
# get the destination address from the pdu
destination_address = iocb.args[0].pduDestination
if _debug: Controller._debug(" - destination_address: %r", destination_address)
# look up the queue
queue = self.queues.get(destination_address, None)
if not queue:
queue = SieveQueue(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: Controller._debug(" - queue: %r", queue)
# ask the queue to process the request
queue.request_io(iocb)
def request(self, pdu):
if _debug: SieveClientController._debug("request %r", pdu)
# send it downstream
super(SieveClientController, self).request(pdu)
def confirmation(self, pdu):
if _debug: SieveClientController._debug("confirmation %r", pdu)
# get the source address
source_address = pdu.pduSource
if _debug: SieveClientController._debug(" - source_address: %r", source_address)
# look up the queue
queue = self.queues.get(source_address, None)
if not queue:
SieveClientController._debug("no queue for %r" % (source_address,))
return
if _debug: SieveClientController._debug(" - queue: %r", queue)
# make sure it has an active iocb
if not queue.active_iocb:
SieveClientController._debug("no active request for %r" % (source_address,))
return
# complete the request
queue.complete_io(queue.active_iocb, pdu)
# if the queue is empty and idle, forget about the controller
if not queue.ioQueue.queue and not queue.active_iocb:
if _debug: SieveClientController._debug(" - queue is empty")
del self.queues[source_address]
#
# register_controller
#

View File

@ -15,6 +15,7 @@ from .debugging import bacpypes_debugging, ModuleLogger, DebugContents
from .core import deferred
from .task import FunctionTask
from .comm import Client
# some debugging
_debug = 0
@ -710,7 +711,7 @@ class IOQController(IOController):
# if there was an error, abort the request
if err:
self.abortIO(iocb, err)
self.abort_io(iocb, err)
def process_io(self, iocb):
"""Figure out how to respond to this request. This must be
@ -836,6 +837,95 @@ class IOQController(IOController):
# look for more to do
IOQController._trigger(self)
#
# SieveQueue
#
@bacpypes_debugging
class SieveQueue(IOQController):
def __init__(self, request_fn, address=None):
if _debug: SieveQueue._debug("__init__ %r %r", request_fn, address)
IOQController.__init__(self, str(address))
# save a reference to the request function
self.request_fn = request_fn
self.address = address
def process_io(self, iocb):
if _debug: SieveQueue._debug("process_io %r", iocb)
# this is now an active request
self.active_io(iocb)
# send the request
self.request_fn(iocb.args[0])
#
# SieveClientController
#
@bacpypes_debugging
class SieveClientController(Client, IOController):
def __init__(self):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# queues for each address
self.queues = {}
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
# get the destination address from the pdu
destination_address = iocb.args[0].pduDestination
if _debug: Controller._debug(" - destination_address: %r", destination_address)
# look up the queue
queue = self.queues.get(destination_address, None)
if not queue:
queue = SieveQueue(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: Controller._debug(" - queue: %r", queue)
# ask the queue to process the request
queue.request_io(iocb)
def request(self, pdu):
if _debug: SieveClientController._debug("request %r", pdu)
# send it downstream
super(SieveClientController, self).request(pdu)
def confirmation(self, pdu):
if _debug: SieveClientController._debug("confirmation %r", pdu)
# get the source address
source_address = pdu.pduSource
if _debug: SieveClientController._debug(" - source_address: %r", source_address)
# look up the queue
queue = self.queues.get(source_address, None)
if not queue:
SieveClientController._debug("no queue for %r" % (source_address,))
return
if _debug: SieveClientController._debug(" - queue: %r", queue)
# make sure it has an active iocb
if not queue.active_iocb:
SieveClientController._debug("no active request for %r" % (source_address,))
return
# complete the request
queue.complete_io(queue.active_iocb, pdu)
# if the queue is empty and idle, forget about the controller
if not queue.ioQueue.queue and not queue.active_iocb:
if _debug: SieveClientController._debug(" - queue is empty")
del self.queues[source_address]
#
# register_controller
#