1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

many changes

This commit is contained in:
Joel Bender 2016-08-30 02:25:32 -04:00
parent 1fa3fd1c76
commit 1a9f06fd51
7 changed files with 1158 additions and 186 deletions

View File

@ -30,6 +30,7 @@ from . import comm
from . import task
from . import singleton
from . import capability
from . import iocb
#
# Link Layer Modules

View File

@ -8,6 +8,7 @@ import warnings
from .debugging import bacpypes_debugging, DebugContents, ModuleLogger
from .comm import ApplicationServiceElement, bind
from .iocb import IOQController, IOCB
from .pdu import Address
@ -183,112 +184,25 @@ class DeviceInfoCache:
del self.cache[cache_address]
#
# CurrentDateProperty
#
class CurrentDateProperty(Property):
def __init__(self, identifier):
Property.__init__(self, identifier, Date, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
# access an array
if arrayIndex is not None:
raise TypeError("{0} is unsubscriptable".format(self.identifier))
# get the value
now = Date()
now.now()
return now.value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# CurrentTimeProperty
#
class CurrentTimeProperty(Property):
def __init__(self, identifier):
Property.__init__(self, identifier, Time, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
# access an array
if arrayIndex is not None:
raise TypeError("{0} is unsubscriptable".format(self.identifier))
# get the value
now = Time()
now.now()
return now.value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# LocalDeviceObject
# ApplicationController
#
@bacpypes_debugging
class LocalDeviceObject(DeviceObject):
class ApplicationController(IOQController):
properties = \
[ CurrentTimeProperty('localTime')
, CurrentDateProperty('localDate')
]
def __init__(self, request_fn, address):
if _debug: ApplicationController._debug("__init__ %r %r", request_fn, address)
IOQController.__init__(self, name=str(address))
defaultProperties = \
{ 'maxApduLengthAccepted': 1024
, 'segmentationSupported': 'segmentedBoth'
, 'maxSegmentsAccepted': 16
, 'apduSegmentTimeout': 5000
, 'apduTimeout': 3000
, 'numberOfApduRetries': 3
}
# save a reference to the request function
self.request_fn = request_fn
def __init__(self, **kwargs):
if _debug: LocalDeviceObject._debug("__init__ %r", kwargs)
def process_io(self, iocb):
if _debug: ApplicationController._debug("process_io %r", iocb)
"""Called by a client to start processing a request."""
# fill in default property values not in kwargs
for attr, value in LocalDeviceObject.defaultProperties.items():
if attr not in kwargs:
kwargs[attr] = value
# check for registration
if self.__class__ not in registered_object_types.values():
if 'vendorIdentifier' not in kwargs:
raise RuntimeError("vendorIdentifier required to auto-register the LocalDeviceObject class")
register_object_type(self.__class__, vendor_id=kwargs['vendorIdentifier'])
# check for local time
if 'localDate' in kwargs:
raise RuntimeError("localDate is provided by LocalDeviceObject and cannot be overridden")
if 'localTime' in kwargs:
raise RuntimeError("localTime is provided by LocalDeviceObject and cannot be overridden")
# check for a minimum value
if kwargs['maxApduLengthAccepted'] < 50:
raise ValueError("invalid max APDU length accepted")
# dump the updated attributes
if _debug: LocalDeviceObject._debug(" - updated kwargs: %r", kwargs)
# proceed as usual
DeviceObject.__init__(self, **kwargs)
# create a default implementation of an object list for local devices.
# If it is specified in the kwargs, that overrides this default.
if ('objectList' not in kwargs):
self.objectList = ArrayOf(ObjectIdentifier)([self.objectIdentifier])
# if the object has a property list and one wasn't provided
# in the kwargs, then it was created by default and the objectList
# property should be included
if ('propertyList' not in kwargs) and self.propertyList:
# make sure it's not already there
if 'objectList' not in self.propertyList:
self.propertyList.append('objectList')
# send the request
self.request_fn(iocb.args[0])
#
# Application
@ -333,6 +247,9 @@ class Application(ApplicationServiceElement, Collector):
# use the provided cache or make a default one
self.deviceInfoCache = deviceInfoCache or DeviceInfoCache()
# controllers for managing confirmed requests as a client
self.controllers = {}
def add_object(self, obj):
"""Add an object to the local collection."""
if _debug: Application._debug("add_object %r", obj)
@ -426,6 +343,51 @@ class Application(ApplicationServiceElement, Collector):
#-----
def request(self, apdu):
if _debug: Application._debug("request %r", apdu)
if isinstance(apdu, UnconfirmedRequestPDU):
iocb = None
elif isinstance(apdu, ConfirmedRequestPDU):
iocb = IOCB(apdu)
if _debug: Application._debug(" - iocb: %r", iocb)
# get the controller for this destination
controller = self.controllers.get(apdu.pduDestination, None)
if not controller:
if _debug: Application._debug(" - new controller")
controller = ApplicationController(
super(Application, self).request,
apdu.pduDestination,
)
# keep track of the controller
self.controllers[apdu.pduDestination] = controller
# request this apdu
controller.request_io(iocb)
# return the iocb if one was created
return iocb
def confirmation(self, apdu):
if _debug: Application._debug("confirmation %r", apdu)
# get the queue for this destination
controller = self.controllers.get(apdu.pduSource, None)
if not controller:
if _debug: Application._debug(" - no queue for this source")
return
# this request is complete
controller.complete_io(controller.active_iocb, apdu)
# if the queue is empty, forget about the controller
if not controller.ioQueue.queue:
if _debug: Application._debug(" - controller queue is empty")
del self.controllers[apdu.pduSource]
def indication(self, apdu):
if _debug: Application._debug("indication %r", apdu)

860
py27/bacpypes/iocb.py Normal file
View File

@ -0,0 +1,860 @@
#!/usr/bin/python
"""
IOCB Module
"""
import sys
import logging
from time import time as _time
import threading
from bisect import bisect_left
from .debugging import bacpypes_debugging, ModuleLogger, DebugContents
from .core import deferred
from .task import FunctionTask
# some debugging
_debug = 0
_log = ModuleLogger(globals())
_statelog = logging.getLogger(__name__ + "._statelog")
# globals
_localControllers = {}
#
# IOCB States
#
IDLE = 0 # has not been submitted
PENDING = 1 # queued, waiting for processing
ACTIVE = 2 # being processed
COMPLETED = 3 # finished
ABORTED = 4 # finished in a bad way
_stateNames = {
0: 'IDLE',
1: 'PENDING',
2: 'ACTIVE',
3: 'COMPLETED',
4: 'ABORTED',
}
#
# IOQController States
#
CTRL_IDLE = 0 # nothing happening
CTRL_ACTIVE = 1 # working on an iocb
CTRL_WAITING = 1 # waiting between iocb requests (throttled)
_ctrlStateNames = {
0: 'IDLE',
1: 'ACTIVE',
2: 'WAITING',
}
# special abort error
TimeoutError = RuntimeError("timeout")
# current time formatting (short version)
_strftime = lambda: "%011.6f" % (_time() % 3600,)
#
# IOCB - Input Output Control Block
#
_identNext = 1
_identLock = threading.Lock()
@bacpypes_debugging
class IOCB(DebugContents):
_debugContents = \
( 'args', 'kwargs'
, 'ioState', 'ioResponse-', 'ioError'
, 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr'
, 'ioComplete', 'ioCallback+', 'ioQueue', 'ioPriority', 'ioTimeout'
)
def __init__(self, *args, **kwargs):
global _identNext
# lock the identity sequence number
_identLock.acquire()
# generate a unique identity for this block
ioID = _identNext
_identNext += 1
# release the lock
_identLock.release()
# debugging postponed until ID acquired
if _debug: IOCB._debug("__init__(%d) %r %r", ioID, args, kwargs)
# save the ID
self.ioID = ioID
# save the request parameters
self.args = args
self.kwargs = kwargs
# start with an idle request
self.ioState = IDLE
self.ioResponse = None
self.ioError = None
# blocks are bound to a controller
self.ioController = None
# each block gets a completion event
self.ioComplete = threading.Event()
self.ioComplete.clear()
# applications can set a callback functions
self.ioCallback = []
# request is not currently queued
self.ioQueue = None
# extract the priority if it was given
self.ioPriority = kwargs.get('_priority', 0)
if '_priority' in kwargs:
if _debug: IOCB._debug(" - ioPriority: %r", self.ioPriority)
del kwargs['_priority']
# request has no timeout
self.ioTimeout = None
def add_callback(self, fn, *args, **kwargs):
"""Pass a function to be called when IO is complete."""
if _debug: IOCB._debug("add_callback(%d) %r %r %r", self.ioID, fn, args, kwargs)
# store it
self.ioCallback.append((fn, args, kwargs))
# already complete?
if self.ioComplete.isSet():
self.Trigger()
def wait(self, *args):
"""Wait for the completion event to be set."""
if _debug: IOCB._debug("wait(%d) %r", self.ioID, args)
# waiting from a non-daemon thread could be trouble
self.ioComplete.wait(*args)
def trigger(self):
"""Set the completion event and make the callback(s)."""
if _debug: IOCB._debug("trigger(%d)", self.ioID)
# if it's queued, remove it from its queue
if self.ioQueue:
if _debug: IOCB._debug(" - dequeue")
self.ioQueue.Remove(self)
# if there's a timer, cancel it
if self.ioTimeout:
if _debug: IOCB._debug(" - cancel timeout")
self.ioTimeout.SuspendTask()
# set the completion event
self.ioComplete.set()
# make the callback(s)
for fn, args, kwargs in self.ioCallback:
if _debug: IOCB._debug(" - callback fn: %r %r %r", fn, args, kwargs)
fn(self, *args, **kwargs)
def complete(self, msg):
"""Called to complete a transaction, usually when ProcessIO has
shipped the IOCB off to some other thread or function."""
if _debug: IOCB._debug("complete(%d) %r", self.ioID, msg)
if self.ioController:
# pass to controller
self.ioController.complete_io(self, msg)
else:
# just fill in the data
self.ioState = COMPLETED
self.ioResponse = msg
self.Trigger()
def abort(self, err):
"""Called by a client to abort a transaction."""
if _debug: IOCB._debug("abort(%d) %r", self.ioID, err)
if self.ioController:
# pass to controller
self.ioController.abort_io(self, err)
elif self.ioState < COMPLETED:
# just fill in the data
self.ioState = ABORTED
self.ioError = err
self.Trigger()
def set_timeout(self, delay, err=TimeoutError):
"""Called to set a transaction timer."""
if _debug: IOCB._debug("set_timeout(%d) %r err=%r", self.ioID, delay, err)
# if one has already been created, cancel it
if self.ioTimeout:
self.ioTimeout.suspend_task()
else:
self.ioTimeout = FunctionTask(self.Abort, err)
# (re)schedule it
self.ioTimeout.install_task(_time() + delay)
def __repr__(self):
xid = id(self)
if (xid < 0): xid += (1 << 32)
sname = self.__module__ + '.' + self.__class__.__name__
desc = "(%d)" % (self.ioID)
return '<' + sname + desc + ' instance at 0x%08x' % (xid,) + '>'
#
# IOChainMixIn
#
@bacpypes_debugging
class IOChainMixIn(DebugContents):
_debugContents = ( 'ioChain++', )
def __init__(self, iocb):
if _debug: IOChainMixIn._debug("__init__ %r", iocb)
# save a refence back to the iocb
self.ioChain = iocb
# set the callback to follow the chain
self.add_callback(self.chain_callback)
# if we're not chained, there's no notification to do
if not self.ioChain:
return
# this object becomes its controller
iocb.ioController = self
# consider the parent active
iocb.ioState = ACTIVE
try:
if _debug: IOChainMixIn._debug(" - encoding")
# let the derived class set the args and kwargs
self.encode()
if _debug: IOChainMixIn._debug(" - encode complete")
except:
# extract the error and abort the request
err = sys.exc_info()[1]
if _debug: IOChainMixIn._exception(" - encoding exception: %r", err)
iocb.abort(err)
def chain_callback(self, iocb):
"""Callback when this iocb completes."""
if _debug: IOChainMixIn._debug("chain_callback %r", iocb)
# if we're not chained, there's no notification to do
if not self.ioChain:
return
# refer to the chained iocb
iocb = self.ioChain
try:
if _debug: IOChainMixIn._debug(" - decoding")
# let the derived class transform the data
self.decode()
if _debug: IOChainMixIn._debug(" - decode complete")
except:
# extract the error and abort
err = sys.exc_info()[1]
if _debug: IOChainMixIn._exception(" - decoding exception: %r", err)
iocb.ioState = ABORTED
iocb.ioError = err
# break the references
self.ioChain = None
iocb.ioController = None
# notify the client
iocb.trigger()
def abort_io(self, iocb, err):
"""Forward the abort downstream."""
if _debug: IOChainMixIn._debug("abort_io %r %r", iocb, err)
# make sure we're being notified of an abort request from
# the iocb we are chained from
if iocb is not self.ioChain:
raise RuntimeError("broken chain")
# call my own Abort(), which may forward it to a controller or
# be overridden by IOGroup
self.abort(err)
def encode(self):
"""Hook to transform the request, called when this IOCB is
chained."""
if _debug: IOChainMixIn._debug("encode")
# by default do nothing, the arguments have already been supplied
def decode(self):
"""Hook to transform the response, called when this IOCB is
completed."""
if _debug: IOChainMixIn._debug("decode")
# refer to the chained iocb
iocb = self.ioChain
# if this has completed successfully, pass it up
if self.ioState == COMPLETED:
if _debug: IOChainMixIn._debug(" - completed: %r", self.ioResponse)
# change the state and transform the content
iocb.ioState = COMPLETED
iocb.ioResponse = self.ioResponse
# if this aborted, pass that up too
elif self.ioState == ABORTED:
if _debug: IOChainMixIn._debug(" - aborted: %r", self.ioError)
# change the state
iocb.ioState = ABORTED
iocb.ioError = self.ioError
else:
raise RuntimeError("invalid state: %d" % (self.ioState,))
#
# IOChain
#
@bacpypes_debugging
class IOChain(IOCB, IOChainMixIn):
def __init__(self, chain, *args, **kwargs):
"""Initialize a chained control block."""
if _debug: IOChain._debug("__init__ %r %r %r", chain, args, kwargs)
# initialize IOCB part to pick up the ioID
IOCB.__init__(self, *args, **kwargs)
IOChainMixIn.__init__(self, chain)
#
# IOGroup
#
@bacpypes_debugging
class IOGroup(IOCB, DebugContents):
_debugContents = ('ioMembers',)
def __init__(self):
"""Initialize a group."""
if _debug: IOGroup._debug("__init__")
IOCB.__init__(self)
# start with an empty list of members
self.ioMembers = []
# start out being done. When an IOCB is added to the
# group that is not already completed, this state will
# change to PENDING.
self.ioState = COMPLETED
self.ioComplete.set()
def add(self, iocb):
"""Add an IOCB to the group, you can also add other groups."""
if _debug: IOGroup._debug("add %r", iocb)
# add this to our members
self.ioMembers.append(iocb)
# assume all of our members have not completed yet
self.ioState = PENDING
self.ioComplete.clear()
# when this completes, call back to the group. If this
# has already completed, it will trigger
iocb.add_callback(self.group_callback)
def group_callback(self, iocb):
"""Callback when a child iocb completes."""
if _debug: IOGroup._debug("group_callback %r", iocb)
# check all the members
for iocb in self.ioMembers:
if not iocb.ioComplete.isSet():
if _debug: IOGroup._debug(" - waiting for child: %r", iocb)
break
else:
if _debug: IOGroup._debug(" - all children complete")
# everything complete
self.ioState = COMPLETED
self.trigger()
def abort(self, err):
"""Called by a client to abort all of the member transactions.
When the last pending member is aborted the group callback
function will be called."""
if _debug: IOGroup._debug("abort %r", err)
# change the state to reflect that it was killed
self.ioState = ABORTED
self.ioError = err
# abort all the members
for iocb in self.ioMembers:
iocb.abort(err)
# notify the client
self.trigger()
#
# IOQueue
#
@bacpypes_debugging
class IOQueue:
def __init__(self, name=None):
if _debug: IOQueue._debug("__init__ %r", name)
self.notempty = threading.Event()
self.notempty.clear()
self.queue = []
def put(self, iocb):
"""Add an IOCB to a queue. This is usually called by the function
that filters requests and passes them out to the correct processing
thread."""
if _debug: IOQueue._debug("put %r", iocb)
# requests should be pending before being queued
if iocb.ioState != PENDING:
raise RuntimeError("invalid state transition")
# save that it might have been empty
wasempty = not self.notempty.isSet()
# add the request to the end of the list of iocb's at same priority
priority = iocb.ioPriority
item = (priority, iocb)
self.queue.insert(bisect_left(self.queue, (priority+1,)), item)
# point the iocb back to this queue
iocb.ioQueue = self
# set the event, queue is no longer empty
self.notempty.set()
return wasempty
def get(self, block=1, delay=None):
"""Get a request from a queue, optionally block until a request
is available."""
if _debug: IOQueue._debug("get block=%r delay=%r", block, delay)
# if the queue is empty and we do not block return None
if not block and not self.notempty.isSet():
return None
# wait for something to be in the queue
if delay:
self.notempty.wait(delay)
if not self.notempty.isSet():
return None
else:
self.notempty.wait()
# extract the first element
priority, iocb = self.queue[0]
del self.queue[0]
iocb.ioQueue = None
# if the queue is empty, clear the event
qlen = len(self.queue)
if not qlen:
self.notempty.clear()
# return the request
return iocb
def remove(self, iocb):
"""Remove a control block from the queue, called if the request
is canceled/aborted."""
if _debug: IOQueue._debug("remove %r", iocb)
# remove the request from the queue
for i, item in enumerate(self.queue):
if iocb is item[1]:
if _debug: IOQueue._debug(" - found at %d", i)
del self.queue[i]
# if the queue is empty, clear the event
qlen = len(self.queue)
if not qlen:
self.notempty.clear()
# record the new length
# self.queuesize.Record( qlen, _time() )
break
else:
if _debug: IOQueue._debug(" - not found")
def abort(self, err):
"""Abort all of the control blocks in the queue."""
if _debug: IOQueue._debug("abort %r", err)
# send aborts to all of the members
try:
for iocb in self.queue:
iocb.ioQueue = None
iocb.abort(err)
# flush the queue
self.queue = []
# the queue is now empty, clear the event
self.notempty.clear()
except ValueError:
pass
#
# IOController
#
@bacpypes_debugging
class IOController:
def __init__(self, name=None):
"""Initialize a controller."""
if _debug: IOController._debug("__init__ name=%r", name)
# save the name
self.name = name
# register the name
if name is not None:
if name in _localControllers:
raise RuntimeError("already a local controller called '%s': %r" % (name, _localControllers[name]))
_localControllers[name] = self
def abort(self, err):
"""Abort all requests, no default implementation."""
pass
def request_io(self, iocb):
"""Called by a client to start processing a request."""
if _debug: IOController._debug("request_io %r", iocb)
# bind the iocb to this controller
iocb.ioController = self
try:
# hopefully there won't be an error
err = None
# change the state
iocb.ioState = PENDING
# let derived class figure out how to process this
self.process_io(iocb)
except:
# extract the error
err = sys.exc_info()[1]
# if there was an error, abort the request
if err:
self.abort_io(iocb, err)
def process_io(self, iocb):
"""Figure out how to respond to this request. This must be
provided by the derived class."""
raise NotImplementedError("IOController must implement process_io()")
def active_io(self, iocb):
"""Called by a handler to notify the controller that a request is
being processed."""
if _debug: IOController._debug("active_io %r", iocb)
# requests should be idle or pending before coming active
if (iocb.ioState != IDLE) and (iocb.ioState != PENDING):
raise RuntimeError("invalid state transition (currently %d)" % (iocb.ioState,))
# change the state
iocb.ioState = ACTIVE
def complete_io(self, iocb, msg):
"""Called by a handler to return data to the client."""
if _debug: IOController._debug("complete_io %r %r", iocb, msg)
# if it completed, leave it alone
if iocb.ioState == COMPLETED:
pass
# if it already aborted, leave it alone
elif iocb.ioState == ABORTED:
pass
else:
# change the state
iocb.ioState = COMPLETED
iocb.ioResponse = msg
# notify the client
iocb.Trigger()
def abort_io(self, iocb, err):
"""Called by a handler or a client to abort a transaction."""
if _debug: IOController._debug("abort_io %r %r", iocb, err)
# if it completed, leave it alone
if iocb.ioState == COMPLETED:
pass
# if it already aborted, leave it alone
elif iocb.ioState == ABORTED:
pass
else:
# change the state
iocb.ioState = ABORTED
iocb.ioError = err
# notify the client
iocb.Trigger()
#
# IOQController
#
@bacpypes_debugging
class IOQController(IOController):
wait_time = 0.0
def __init__(self, name=None):
"""Initialize a queue controller."""
if _debug: IOQController._debug("__init__ name=%r", name)
# give ourselves a nice name
if not name:
name = self.__class__.__name__
IOController.__init__(self, name)
# start idle
self.state = CTRL_IDLE
_statelog.debug("%s %s %s" % (_strftime(), self.name, "idle"))
# no active iocb
self.active_iocb = None
# create an IOQueue for iocb's requested when not idle
self.ioQueue = IOQueue(str(name) + " queue")
def abort(self, err):
"""Abort all pending requests."""
if _debug: IOQController._debug("abort %r", err)
if (self.state == CTRL_IDLE):
if _debug: IOQController._debug(" - idle")
return
while True:
iocb = self.ioQueue.get()
if not iocb:
break
if _debug: IOQController._debug(" - iocb: %r", iocb)
# change the state
iocb.ioState = ABORTED
iocb.ioError = err
# notify the client
iocb.trigger()
if (self.state != CTRL_IDLE):
if _debug: IOQController._debug(" - busy after aborts")
def request_io(self, iocb):
"""Called by a client to start processing a request."""
if _debug: IOQController._debug("request_io %r", iocb)
# bind the iocb to this controller
iocb.ioController = self
# if we're busy, queue it
if (self.state != CTRL_IDLE):
if _debug: IOQController._debug(" - busy, request queued")
iocb.ioState = PENDING
self.ioQueue.put(iocb)
return
try:
# hopefully there won't be an error
err = None
# let derived class figure out how to process this
self.process_io(iocb)
except:
# extract the error
err = sys.exc_info()[1]
# if there was an error, abort the request
if err:
self.abortIO(iocb, err)
def process_io(self, iocb):
"""Figure out how to respond to this request. This must be
provided by the derived class."""
raise NotImplementedError("IOController must implement process_io()")
def active_io(self, iocb):
"""Called by a handler to notify the controller that a request is
being processed."""
if _debug: IOQController._debug("active_io %r", iocb)
# base class work first, setting iocb state and timer data
IOController.active_io(self, iocb)
# change our state
self.state = CTRL_ACTIVE
_statelog.debug("%s %s %s" % (_strftime(), self.name, "active"))
# keep track of the iocb
self.active_iocb = iocb
def complete_io(self, iocb, msg):
"""Called by a handler to return data to the client."""
if _debug: IOQController._debug("complete_io %r %r", iocb, msg)
# check to see if it is completing the active one
if iocb is not self.active_iocb:
raise RuntimeError("not the current iocb")
# normal completion
IOController.complete_io(self, iocb, msg)
# no longer an active iocb
self.active_iocb = None
# check to see if we should wait a bit
if self.wait_time:
# change our state
self.state = CTRL_WAITING
_statelog.debug("%s %s %s" % (_strftime(), self.name, "waiting"))
# schedule a call in the future
task = FunctionTask(IOQController._wait_trigger, self)
task.install_task(_time() + self.wait_time)
else:
# change our state
self.state = CTRL_IDLE
_statelog.debug("%s %s %s" % (_strftime(), self.name, "idle"))
# look for more to do
deferred(IOQController._trigger, self)
def abort_io(self, iocb, err):
"""Called by a handler or a client to abort a transaction."""
if _debug: IOQController._debug("abort_io %r %r", iocb, err)
# normal abort
IOController.abort_io(self, iocb, err)
# check to see if it is completing the active one
if iocb is not self.active_iocb:
if _debug: IOQController._debug(" - not current iocb")
return
# no longer an active iocb
self.active_iocb = None
# change our state
self.state = CTRL_IDLE
_statelog.debug("%s %s %s" % (_strftime(), self.name, "idle"))
# look for more to do
deferred(IOQController._trigger, self)
def _trigger(self):
"""Called to launch the next request in the queue."""
if _debug: IOQController._debug("_trigger")
# if we are busy, do nothing
if self.state != CTRL_IDLE:
if _debug: IOQController._debug(" - not idle")
return
# if there is nothing to do, return
if not self.ioQueue.queue:
if _debug: IOQController._debug(" - empty queue")
return
# get the next iocb
iocb = self.ioQueue.get()
try:
# hopefully there won't be an error
err = None
# let derived class figure out how to process this
self.process_io(iocb)
except:
# extract the error
err = sys.exc_info()[1]
# if there was an error, abort the request
if err:
self.abort_io(iocb, err)
# if we're idle, call again
if self.state == CTRL_IDLE:
deferred(IOQController._trigger, self)
def _wait_trigger(self):
"""Called to launch the next request in the queue."""
if _debug: IOQController._debug("_wait_trigger")
# make sure we are waiting
if (self.state != CTRL_WAITING):
raise RuntimeError("not waiting")
# change our state
self.state = CTRL_IDLE
_statelog.debug("%s %s %s" % (_strftime(), self.name, "idle"))
# look for more to do
IOQController._trigger(self)
#
# abort
#
@bacpypes_debugging
def abort(err):
"""Abort everything, everywhere."""
if _debug: abort._debug("abort %r", err)
# tell all the local controllers to abort
for controller in _localControllers.values():
controller.abort(err)

View File

@ -440,6 +440,55 @@ class Object(object):
return prop.WriteProperty(self, value, direct=True)
def add_property(self, prop):
"""Adding a property disconnects it from the collection of properties
common to all of the objects of its class."""
if _debug: Object._debug("add_property %r", prop)
# get the property
prop = self._properties.get(prop.identifier)
if prop:
raise PropertyError(prop.identifier)
# make a copy of the properties dictionary
self._properties = copy(self._properties)
# save the property reference and default value (usually None)
self._properties[prop.identifier] = prop
self._values[prop.identifier] = prop.default
# tell the object it has a new property
if 'propertyList' in self._values:
property_list = self.propertyList
if prop.identifier not in property_list:
if _debug: Object._debug(" - adding to property list")
property_list.append(prop.identifier)
def delete_property(self, prop):
"""Deleting a property disconnects it from the collection of properties
common to all of the objects of its class."""
if _debug: Object._debug("delete_property %r", value)
# get the property
prop = self._properties.get(prop.identifier)
if not prop:
raise PropertyError(prop.identifier)
# make a copy of the properties dictionary
self._properties = copy(self._properties)
# delete the property from the dictionary and values
del self._properties[prop.identifier]
if prop.identifier in self._values:
del self._values[prop.identifier]
# remove the property identifier from its list of know properties
if 'propertyList' in self._values:
property_list = self.propertyList
if prop.identifier in property_list:
if _debug: Object._debug(" - removing from property list")
property_list.remove(prop.identifier)
def ReadProperty(self, propid, arrayIndex=None):
if _debug: Object._debug("ReadProperty %r arrayIndex=%r", propid, arrayIndex)

View File

@ -15,7 +15,6 @@ from ..apdu import ConfirmedCOVNotificationRequest, \
SimpleAckPDU, Error, RejectPDU, AbortPDU
from ..errors import ExecutionError
from ..app import LocalDeviceObject
from ..object import Object, Property, PropertyError, \
AccessDoorObject, AccessPointObject, \
AnalogInputObject, AnalogOutputObject, AnalogValueObject, \
@ -567,6 +566,72 @@ class PulseConverterObjectCOV(COVObjectMixin, PulseConverterCriteria, PulseConve
pass
#
# ActiveCOVSubscriptions
#
@bacpypes_debugging
class ActiveCOVSubscriptions(Property):
def __init__(self):
Property.__init__(
self, 'activeCovSubscriptions', SequenceOf(COVSubscription),
default=None, optional=True, mutable=False,
)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: ActiveCOVSubscriptions._debug("ReadProperty %s arrayIndex=%r", obj, arrayIndex)
# get the current time from the task manager
current_time = TaskManager().get_time()
if _debug: ActiveCOVSubscriptions._debug(" - current_time: %r", current_time)
# start with an empty sequence
cov_subscriptions = SequenceOf(COVSubscription)()
# the obj is a DeviceObject with a reference to the application
for cov in obj._app.active_cov_subscriptions:
# calculate time remaining
if not cov.lifetime:
time_remaining = 0
else:
time_remaining = int(cov.taskTime - current_time)
# make sure it is at least one second
if not time_remaining:
time_remaining = 1
recipient_process = RecipientProcess(
recipient=Recipient(
address=DeviceAddress(
networkNumber=cov.client_addr.addrNet or 0,
macAddress=cov.client_addr.addrAddr,
),
),
processIdentifier=cov.proc_id,
)
cov_subscription = COVSubscription(
recipient=recipient_process,
monitoredPropertyReference=ObjectPropertyReference(
objectIdentifier=cov.obj_id,
propertyIdentifier=cov.obj_ref._monitored_property_reference,
),
issueConfirmedNotifications=cov.confirmed,
timeRemaining=time_remaining,
# covIncrement=???,
)
if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription)
# add the list
cov_subscriptions.append(cov_subscription)
return cov_subscriptions
def WriteProperty(self, obj, value, arrayIndex=None, priority=None):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# ChangeOfValueServices
#
@ -584,6 +649,11 @@ class ChangeOfValueServices(Capability):
# a queue of confirmed notifications by client address
self.confirmed_notifications_queue = defaultdict(list)
# if there is a local device object, make sure it has an active COV
# subscriptions property
if self.localDevice and self.localDevice.activeCovSubscriptions is None:
self.localDevice.add_propert(ActiveCOVSubscriptions)
def cov_notification(self, cov, request):
if _debug: ChangeOfValueServices._debug("cov_notification %s %s", str(cov), str(request))
@ -719,83 +789,3 @@ class ChangeOfValueServices(Capability):
# return the result
self.response(response)
#
# ActiveCOVSubscriptions
#
@bacpypes_debugging
class ActiveCOVSubscriptions(Property):
def __init__(self, identifier):
Property.__init__(
self, identifier, SequenceOf(COVSubscription),
default=None, optional=True, mutable=False,
)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: ActiveCOVSubscriptions._debug("ReadProperty %s arrayIndex=%r", obj, arrayIndex)
# get the current time from the task manager
current_time = TaskManager().get_time()
if _debug: ActiveCOVSubscriptions._debug(" - current_time: %r", current_time)
# start with an empty sequence
cov_subscriptions = SequenceOf(COVSubscription)()
# the obj is a DeviceObject with a reference to the application
for cov in obj._app.active_cov_subscriptions:
# calculate time remaining
if not cov.lifetime:
time_remaining = 0
else:
time_remaining = int(cov.taskTime - current_time)
# make sure it is at least one second
if not time_remaining:
time_remaining = 1
recipient_process = RecipientProcess(
recipient=Recipient(
address=DeviceAddress(
networkNumber=cov.client_addr.addrNet or 0,
macAddress=cov.client_addr.addrAddr,
),
),
processIdentifier=cov.proc_id,
)
cov_subscription = COVSubscription(
recipient=recipient_process,
monitoredPropertyReference=ObjectPropertyReference(
objectIdentifier=cov.obj_id,
propertyIdentifier=cov.obj_ref._monitored_property_reference,
),
issueConfirmedNotifications=cov.confirmed,
timeRemaining=time_remaining,
# covIncrement=???,
)
if _debug: ActiveCOVSubscriptions._debug(" - cov_subscription: %r", cov_subscription)
# add the list
cov_subscriptions.append(cov_subscription)
return cov_subscriptions
def WriteProperty(self, obj, value, arrayIndex=None, priority=None):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# COVDeviceObject
#
@bacpypes_debugging
class COVDeviceMixin(object):
properties = [
ActiveCOVSubscriptions('activeCovSubscriptions'),
]
class LocalDeviceObjectCOV(COVDeviceMixin, LocalDeviceObject):
pass

View File

@ -5,8 +5,8 @@ from ..capability import Capability
from ..pdu import GlobalBroadcast
from ..basetypes import ErrorType
from ..primitivedata import Atomic, Null, Unsigned
from ..constructeddata import Any, Array
from ..primitivedata import Atomic, Null, Unsigned, Date, Time, ObjectIdentifier
from ..constructeddata import Any, Array, ArrayOf
from ..apdu import Error, WhoIsRequest, IAmRequest, \
SimpleAckPDU, ReadPropertyACK, ReadPropertyMultipleACK, \
@ -14,12 +14,121 @@ from ..apdu import Error, WhoIsRequest, IAmRequest, \
from ..errors import ExecutionError, InconsistentParameters, \
MissingRequiredParameter, ParameterOutOfRange
from ..object import PropertyError
from ..object import register_object_type, registered_object_types, \
Property, PropertyError, DeviceObject, registered_object_types
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# CurrentDateProperty
#
class CurrentDateProperty(Property):
def __init__(self, identifier):
Property.__init__(self, identifier, Date, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
# access an array
if arrayIndex is not None:
raise TypeError("{0} is unsubscriptable".format(self.identifier))
# get the value
now = Date()
now.now()
return now.value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# CurrentTimeProperty
#
class CurrentTimeProperty(Property):
def __init__(self, identifier):
Property.__init__(self, identifier, Time, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
# access an array
if arrayIndex is not None:
raise TypeError("{0} is unsubscriptable".format(self.identifier))
# get the value
now = Time()
now.now()
return now.value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# LocalDeviceObject
#
@bacpypes_debugging
class LocalDeviceObject(DeviceObject):
properties = \
[ CurrentTimeProperty('localTime')
, CurrentDateProperty('localDate')
]
defaultProperties = \
{ 'maxApduLengthAccepted': 1024
, 'segmentationSupported': 'segmentedBoth'
, 'maxSegmentsAccepted': 16
, 'apduSegmentTimeout': 5000
, 'apduTimeout': 3000
, 'numberOfApduRetries': 3
}
def __init__(self, **kwargs):
if _debug: LocalDeviceObject._debug("__init__ %r", kwargs)
# fill in default property values not in kwargs
for attr, value in LocalDeviceObject.defaultProperties.items():
if attr not in kwargs:
kwargs[attr] = value
# check for registration
if self.__class__ not in registered_object_types.values():
if 'vendorIdentifier' not in kwargs:
raise RuntimeError("vendorIdentifier required to auto-register the LocalDeviceObject class")
register_object_type(self.__class__, vendor_id=kwargs['vendorIdentifier'])
# check for local time
if 'localDate' in kwargs:
raise RuntimeError("localDate is provided by LocalDeviceObject and cannot be overridden")
if 'localTime' in kwargs:
raise RuntimeError("localTime is provided by LocalDeviceObject and cannot be overridden")
# check for a minimum value
if kwargs['maxApduLengthAccepted'] < 50:
raise ValueError("invalid max APDU length accepted")
# dump the updated attributes
if _debug: LocalDeviceObject._debug(" - updated kwargs: %r", kwargs)
# proceed as usual
DeviceObject.__init__(self, **kwargs)
# create a default implementation of an object list for local devices.
# If it is specified in the kwargs, that overrides this default.
if ('objectList' not in kwargs):
self.objectList = ArrayOf(ObjectIdentifier)([self.objectIdentifier])
# if the object has a property list and one wasn't provided
# in the kwargs, then it was created by default and the objectList
# property should be included
if ('propertyList' not in kwargs) and self.propertyList:
# make sure it's not already there
if 'objectList' not in self.propertyList:
self.propertyList.append('objectList')
#
# Who-Is I-Am Services
#

View File

@ -15,13 +15,14 @@ from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, enable_sleeping
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
from bacpypes.app import BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.service.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())