1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-27 00:57:47 +08:00

LocalDeviceObject shuffling, automatic protocolServicesSupported

This commit is contained in:
Joel Bender
2018-02-24 00:37:04 -05:00
parent b700b09856
commit c25b086211
50 changed files with 61 additions and 537 deletions

View File

@@ -69,6 +69,8 @@ from . import apdu
from . import app
from . import appservice
from . import local
from . import service
#

View File

@@ -4,136 +4,16 @@ from ..debugging import bacpypes_debugging, ModuleLogger
from ..capability import Capability
from ..pdu import GlobalBroadcast
from ..primitivedata import Date, Time, ObjectIdentifier
from ..constructeddata import ArrayOf
from ..apdu import WhoIsRequest, IAmRequest, IHaveRequest, SimpleAckPDU, Error
from ..apdu import WhoIsRequest, IAmRequest, IHaveRequest, SimpleAckPDU
from ..errors import ExecutionError, InconsistentParameters, \
MissingRequiredParameter, ParameterOutOfRange
from ..object import register_object_type, registered_object_types, \
Property, DeviceObject
from ..task import FunctionTask
from .object import CurrentPropertyListMixIn
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# CurrentDateProperty
#
class CurrentDateProperty(Property):
def __init__(self, identifier):
Property.__init__(self, identifier, Date, default=(), optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
# access an array
if arrayIndex is not None:
raise TypeError("{0} is unsubscriptable".format(self.identifier))
# get the value
now = Date()
now.now()
return now.value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# CurrentTimeProperty
#
class CurrentTimeProperty(Property):
def __init__(self, identifier):
Property.__init__(self, identifier, Time, default=(), optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
# access an array
if arrayIndex is not None:
raise TypeError("{0} is unsubscriptable".format(self.identifier))
# get the value
now = Time()
now.now()
return now.value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# LocalDeviceObject
#
@bacpypes_debugging
class LocalDeviceObject(CurrentPropertyListMixIn, DeviceObject):
properties = \
[ CurrentTimeProperty('localTime')
, CurrentDateProperty('localDate')
]
defaultProperties = \
{ 'maxApduLengthAccepted': 1024
, 'segmentationSupported': 'segmentedBoth'
, 'maxSegmentsAccepted': 16
, 'apduSegmentTimeout': 5000
, 'apduTimeout': 3000
, 'numberOfApduRetries': 3
}
def __init__(self, **kwargs):
if _debug: LocalDeviceObject._debug("__init__ %r", kwargs)
# fill in default property values not in kwargs
for attr, value in LocalDeviceObject.defaultProperties.items():
if attr not in kwargs:
kwargs[attr] = value
for key, value in kwargs.items():
if key.startswith("_"):
setattr(self, key, value)
del kwargs[key]
# check for registration
if self.__class__ not in registered_object_types.values():
if 'vendorIdentifier' not in kwargs:
raise RuntimeError("vendorIdentifier required to auto-register the LocalDeviceObject class")
register_object_type(self.__class__, vendor_id=kwargs['vendorIdentifier'])
# check for properties this class implements
if 'localDate' in kwargs:
raise RuntimeError("localDate is provided by LocalDeviceObject and cannot be overridden")
if 'localTime' in kwargs:
raise RuntimeError("localTime is provided by LocalDeviceObject and cannot be overridden")
# the object identifier is required for the object list
if 'objectIdentifier' not in kwargs:
raise RuntimeError("objectIdentifier is required")
# coerce the object identifier
object_identifier = kwargs['objectIdentifier']
if isinstance(object_identifier, (int, long)):
object_identifier = ('device', object_identifier)
# the object list is provided
if 'objectList' in kwargs:
raise RuntimeError("objectList is provided by LocalDeviceObject and cannot be overridden")
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([object_identifier])
# check for a minimum value
if kwargs['maxApduLengthAccepted'] < 50:
raise ValueError("invalid max APDU length accepted")
# dump the updated attributes
if _debug: LocalDeviceObject._debug(" - updated kwargs: %r", kwargs)
# proceed as usual
super(LocalDeviceObject, self).__init__(**kwargs)
#
# Who-Is I-Am Services
#

View File

@@ -7,11 +7,11 @@ from ..basetypes import ErrorType, PropertyIdentifier
from ..primitivedata import Atomic, Null, Unsigned
from ..constructeddata import Any, Array, ArrayOf
from ..apdu import Error, \
from ..apdu import \
SimpleAckPDU, ReadPropertyACK, ReadPropertyMultipleACK, \
ReadAccessResult, ReadAccessResultElement, ReadAccessResultElementChoice
from ..errors import ExecutionError
from ..object import Property, Object, PropertyError
from ..object import PropertyError
# some debugging
_debug = 0
@@ -20,57 +20,6 @@ _log = ModuleLogger(globals())
# handy reference
ArrayOfPropertyIdentifier = ArrayOf(PropertyIdentifier)
#
# CurrentPropertyList
#
@bacpypes_debugging
class CurrentPropertyList(Property):
def __init__(self):
if _debug: CurrentPropertyList._debug("__init__")
Property.__init__(self, 'propertyList', ArrayOfPropertyIdentifier, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: CurrentPropertyList._debug("ReadProperty %r %r", obj, arrayIndex)
# make a list of the properties that have values
property_list = [k for k, v in obj._values.items()
if v is not None
and k not in ('objectName', 'objectType', 'objectIdentifier', 'propertyList')
]
if _debug: CurrentPropertyList._debug(" - property_list: %r", property_list)
# sort the list so it's stable
property_list.sort()
# asking for the whole thing
if arrayIndex is None:
return ArrayOfPropertyIdentifier(property_list)
# asking for the length
if arrayIndex == 0:
return len(property_list)
# asking for an index
if arrayIndex > len(property_list):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
return property_list[arrayIndex - 1]
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# CurrentPropertyListMixIn
#
@bacpypes_debugging
class CurrentPropertyListMixIn(Object):
properties = [
CurrentPropertyList(),
]
#
# ReadProperty and WriteProperty Services
#