1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

bring up-to-date with the 0.16.7 release

This commit is contained in:
Joel Bender 2018-02-16 23:14:21 -05:00
parent 1c1ebce4b8
commit f23dd4fef4
15 changed files with 1023 additions and 211 deletions

View File

@ -473,7 +473,7 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None):
if _debug: BIPSimpleApplication._debug("__init__ %r %r deviceInfoCache=%r aseID=%r", localDevice, localAddress, deviceInfoCache, aseID)
ApplicationIOController.__init__(self, localDevice, deviceInfoCache, aseID=aseID)
ApplicationIOController.__init__(self, localDevice, localAddress, deviceInfoCache, aseID=aseID)
# local address might be useful for subclasses
if isinstance(localAddress, Address):
@ -528,9 +528,14 @@ bacpypes_debugging(BIPSimpleApplication)
class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWritePropertyServices):
def __init__(self, localDevice, localAddress, bbmdAddress, bbmdTTL, aseID=None):
if _debug: BIPForeignApplication._debug("__init__ %r %r %r %r aseID=%r", localDevice, localAddress, bbmdAddress, bbmdTTL, aseID)
ApplicationIOController.__init__(self, localDevice, aseID=aseID)
def __init__(self, localDevice, localAddress, bbmdAddress, bbmdTTL, deviceInfoCache=None, aseID=None):
if _debug:
BIPForeignApplication._debug(
"__init__ %r %r %r %r deviceInfoCache=%r aseID=%r",
localDevice, localAddress, bbmdAddress, bbmdTTL,
deviceInfoCache, aseID,
)
ApplicationIOController.__init__(self, localDevice, localAddress, deviceInfoCache, aseID=aseID)
# local address might be useful for subclasses
if isinstance(localAddress, Address):

View File

@ -1604,6 +1604,16 @@ class ErrorType(Sequence):
, Element('errorCode', ErrorCode)
]
class LightingCommand(Sequence):
sequenceElements = \
[ Element('operation', LightingOperation, 0)
, Element('targetLevel', Real, 1, True)
, Element('rampRate', Real, 2, True)
, Element('stepIncrement', Real, 3, True)
, Element('fadeTime', Unsigned, 4, True)
, Element('priority', Unsigned, 5, True)
]
class ObjectPropertyReference(Sequence):
sequenceElements = \
[ Element('objectIdentifier', ObjectIdentifier, 0)
@ -1792,8 +1802,21 @@ class CalendarEntry(Choice):
]
class ChannelValue(Choice):
choiceElements = [
### needs help
choiceElements = \
[ Element('null', Null)
, Element('real', Real)
, Element('enumerated', Enumerated)
, Element('unsigned', Unsigned)
, Element('boolean', Boolean)
, Element('integer', Integer)
, Element('double', Double)
, Element('time', Time)
, Element('characterString', CharacterString)
, Element('octetString', OctetString)
, Element('bitString', BitString)
, Element('date', Date)
, Element('objectidentifier', ObjectIdentifier)
, Element('lightingCommand', LightingCommand, 0)
]
class ClientCOV(Choice):
@ -2068,16 +2091,6 @@ class KeyIdentifier(Sequence):
, Element('keyId', Unsigned, 1)
]
class LightingCommand(Sequence):
sequenceElements = \
[ Element('operation', LightingOperation, 0)
, Element('targetLevel', Real, 1) ### optional
, Element('rampRate', Real, 2) ### optional
, Element('stepIncrement', Real, 3) ### optional
, Element('fadeTime', Unsigned, 4) ### optional
, Element('priority', Unsigned, 5) ### optional
]
class LogDataLogData(Choice):
choiceElements = \
[ Element('booleanValue', Boolean, 0)
@ -2336,14 +2349,14 @@ class PriorityValue(Choice):
, Element('enumerated', Enumerated)
, Element('unsigned', Unsigned)
, Element('boolean', Boolean)
, Element('signed', Integer)
, Element('integer', Integer)
, Element('double', Double)
, Element('time', Time)
, Element('characterString', CharacterString)
, Element('octetString', OctetString)
, Element('bitString', BitString)
, Element('date', Date)
, Element('objectid', ObjectIdentifier)
, Element('objectidentifier', ObjectIdentifier)
, Element('constructedValue', Any, 0)
, Element('datetime', DateTime, 1)
]

View File

@ -407,6 +407,54 @@ class BIPSimple(BIPSAP, Client, Server):
# send it upstream
self.response(xpdu)
elif isinstance(pdu, WriteBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0010, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0020, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, RegisterForeignDevice):
# build a response
xpdu = Result(code=0x0030, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadForeignDeviceTable):
# build a response
xpdu = Result(code=0x0040, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DeleteForeignDeviceTableEntry):
# build a response
xpdu = Result(code=0x0050, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DistributeBroadcastToNetwork):
# build a response
xpdu = Result(code=0x0060, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
else:
BIPSimple._warning("invalid pdu type: %s", type(pdu))
@ -446,11 +494,6 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
def indication(self, pdu):
if _debug: BIPForeign._debug("indication %r", pdu)
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
return
# check for local stations
if pdu.pduDestination.addrType == Address.localStationAddr:
# make an original unicast PDU
@ -462,6 +505,11 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# check for broadcasts
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
return
# make an original broadcast PDU
xpdu = DistributeBroadcastToNetwork(pdu, user_data=pdu.pduUserData)
xpdu.pduDestination = self.bbmdAddress
@ -498,6 +546,13 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
return
elif isinstance(pdu, OriginalUnicastNPDU):
# build a vanilla PDU
xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData)
# send it upstream
self.response(xpdu)
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
@ -511,13 +566,6 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# send this to the service access point
self.sap_response(pdu)
elif isinstance(pdu, OriginalUnicastNPDU):
# build a vanilla PDU
xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData)
# send it upstream
self.response(xpdu)
elif isinstance(pdu, ForwardedNPDU):
# build a PDU with the source from the real source
xpdu = PDU(pdu.pduData, source=pdu.bvlciAddress, destination=LocalBroadcast(), user_data=pdu.pduUserData)
@ -525,6 +573,54 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# send it upstream
self.response(xpdu)
elif isinstance(pdu, WriteBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0010, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0020, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, RegisterForeignDevice):
# build a response
xpdu = Result(code=0x0030, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadForeignDeviceTable):
# build a response
xpdu = Result(code=0x0040, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DeleteForeignDeviceTableEntry):
# build a response
xpdu = Result(code=0x0050, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DistributeBroadcastToNetwork):
# build a response
xpdu = Result(code=0x0060, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
else:
BIPForeign._warning("invalid pdu type: %s", type(pdu))

View File

@ -113,13 +113,15 @@ class LocalDeviceObject(CurrentPropertyListMixIn, DeviceObject):
if 'objectIdentifier' not in kwargs:
raise RuntimeError("objectIdentifier is required")
# coerce the object identifier
object_identifier = kwargs['objectIdentifier']
if isinstance(object_identifier, (int, long)):
object_identifier = ('device', object_identifier)
# the object list is provided
if 'objectList' in kwargs:
raise RuntimeError("objectList is provided by LocalDeviceObject and cannot be overridden")
else:
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([
kwargs['objectIdentifier'],
])
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([object_identifier])
# check for a minimum value
if kwargs['maxApduLengthAccepted'] < 50:

View File

@ -24,7 +24,6 @@ ArrayOfPropertyIdentifier = ArrayOf(PropertyIdentifier)
# CurrentPropertyList
#
@bacpypes_debugging
class CurrentPropertyList(Property):
def __init__(self):
@ -60,17 +59,20 @@ class CurrentPropertyList(Property):
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
bacpypes_debugging(CurrentPropertyList)
#
# CurrentPropertyListMixIn
#
@bacpypes_debugging
class CurrentPropertyListMixIn(Object):
properties = [
CurrentPropertyList(),
]
bacpypes_debugging(CurrentPropertyListMixIn)
#
# ReadProperty and WriteProperty Services
#

View File

@ -470,7 +470,7 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None):
if _debug: BIPSimpleApplication._debug("__init__ %r %r deviceInfoCache=%r aseID=%r", localDevice, localAddress, deviceInfoCache, aseID)
ApplicationIOController.__init__(self, localDevice, deviceInfoCache, aseID=aseID)
ApplicationIOController.__init__(self, localDevice, localAddress, deviceInfoCache, aseID=aseID)
# local address might be useful for subclasses
if isinstance(localAddress, Address):
@ -524,9 +524,14 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
@bacpypes_debugging
class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWritePropertyServices):
def __init__(self, localDevice, localAddress, bbmdAddress, bbmdTTL, aseID=None):
if _debug: BIPForeignApplication._debug("__init__ %r %r %r %r aseID=%r", localDevice, localAddress, bbmdAddress, bbmdTTL, aseID)
ApplicationIOController.__init__(self, localDevice, aseID=aseID)
def __init__(self, localDevice, localAddress, bbmdAddress, bbmdTTL, deviceInfoCache=None, aseID=None):
if _debug:
BIPForeignApplication._debug(
"__init__ %r %r %r %r deviceInfoCache=%r aseID=%r",
localDevice, localAddress, bbmdAddress, bbmdTTL,
deviceInfoCache, aseID,
)
ApplicationIOController.__init__(self, localDevice, localAddress, deviceInfoCache, aseID=aseID)
# local address might be useful for subclasses
if isinstance(localAddress, Address):

View File

@ -1604,6 +1604,16 @@ class ErrorType(Sequence):
, Element('errorCode', ErrorCode)
]
class LightingCommand(Sequence):
sequenceElements = \
[ Element('operation', LightingOperation, 0)
, Element('targetLevel', Real, 1, True)
, Element('rampRate', Real, 2, True)
, Element('stepIncrement', Real, 3, True)
, Element('fadeTime', Unsigned, 4, True)
, Element('priority', Unsigned, 5, True)
]
class ObjectPropertyReference(Sequence):
sequenceElements = \
[ Element('objectIdentifier', ObjectIdentifier, 0)
@ -1792,8 +1802,21 @@ class CalendarEntry(Choice):
]
class ChannelValue(Choice):
choiceElements = [
### needs help
choiceElements = \
[ Element('null', Null)
, Element('real', Real)
, Element('enumerated', Enumerated)
, Element('unsigned', Unsigned)
, Element('boolean', Boolean)
, Element('integer', Integer)
, Element('double', Double)
, Element('time', Time)
, Element('characterString', CharacterString)
, Element('octetString', OctetString)
, Element('bitString', BitString)
, Element('date', Date)
, Element('objectidentifier', ObjectIdentifier)
, Element('lightingCommand', LightingCommand, 0)
]
class ClientCOV(Choice):
@ -2068,16 +2091,6 @@ class KeyIdentifier(Sequence):
, Element('keyId', Unsigned, 1)
]
class LightingCommand(Sequence):
sequenceElements = \
[ Element('operation', LightingOperation, 0)
, Element('targetLevel', Real, 1) ### optional
, Element('rampRate', Real, 2) ### optional
, Element('stepIncrement', Real, 3) ### optional
, Element('fadeTime', Unsigned, 4) ### optional
, Element('priority', Unsigned, 5) ### optional
]
class LogDataLogData(Choice):
choiceElements = \
[ Element('booleanValue', Boolean, 0)
@ -2336,14 +2349,14 @@ class PriorityValue(Choice):
, Element('enumerated', Enumerated)
, Element('unsigned', Unsigned)
, Element('boolean', Boolean)
, Element('signed', Integer)
, Element('integer', Integer)
, Element('double', Double)
, Element('time', Time)
, Element('characterString', CharacterString)
, Element('octetString', OctetString)
, Element('bitString', BitString)
, Element('date', Date)
, Element('objectid', ObjectIdentifier)
, Element('objectidentifier', ObjectIdentifier)
, Element('constructedValue', Any, 0)
, Element('datetime', DateTime, 1)
]

View File

@ -404,6 +404,54 @@ class BIPSimple(BIPSAP, Client, Server):
# send it upstream
self.response(xpdu)
elif isinstance(pdu, WriteBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0010, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0020, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, RegisterForeignDevice):
# build a response
xpdu = Result(code=0x0030, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadForeignDeviceTable):
# build a response
xpdu = Result(code=0x0040, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DeleteForeignDeviceTableEntry):
# build a response
xpdu = Result(code=0x0050, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DistributeBroadcastToNetwork):
# build a response
xpdu = Result(code=0x0060, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
else:
BIPSimple._warning("invalid pdu type: %s", type(pdu))
@ -442,11 +490,6 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
def indication(self, pdu):
if _debug: BIPForeign._debug("indication %r", pdu)
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
return
# check for local stations
if pdu.pduDestination.addrType == Address.localStationAddr:
# make an original unicast PDU
@ -458,6 +501,11 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# check for broadcasts
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
return
# make an original broadcast PDU
xpdu = DistributeBroadcastToNetwork(pdu, user_data=pdu.pduUserData)
xpdu.pduDestination = self.bbmdAddress
@ -494,6 +542,14 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
return
elif isinstance(pdu, OriginalUnicastNPDU):
# build a vanilla PDU
xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData)
# send it upstream
self.response(xpdu)
return
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
@ -507,13 +563,6 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# send this to the service access point
self.sap_response(pdu)
elif isinstance(pdu, OriginalUnicastNPDU):
# build a vanilla PDU
xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData)
# send it upstream
self.response(xpdu)
elif isinstance(pdu, ForwardedNPDU):
# build a PDU with the source from the real source
xpdu = PDU(pdu.pduData, source=pdu.bvlciAddress, destination=LocalBroadcast(), user_data=pdu.pduUserData)
@ -521,6 +570,54 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# send it upstream
self.response(xpdu)
elif isinstance(pdu, WriteBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0010, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0020, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, RegisterForeignDevice):
# build a response
xpdu = Result(code=0x0030, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadForeignDeviceTable):
# build a response
xpdu = Result(code=0x0040, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DeleteForeignDeviceTableEntry):
# build a response
xpdu = Result(code=0x0050, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DistributeBroadcastToNetwork):
# build a response
xpdu = Result(code=0x0060, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
else:
BIPForeign._warning("invalid pdu type: %s", type(pdu))

View File

@ -114,13 +114,15 @@ class LocalDeviceObject(CurrentPropertyListMixIn, DeviceObject):
if 'objectIdentifier' not in kwargs:
raise RuntimeError("objectIdentifier is required")
# coerce the object identifier
object_identifier = kwargs['objectIdentifier']
if isinstance(object_identifier, (int, long)):
object_identifier = ('device', object_identifier)
# the object list is provided
if 'objectList' in kwargs:
raise RuntimeError("objectList is provided by LocalDeviceObject and cannot be overridden")
else:
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([
kwargs['objectIdentifier'],
])
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([object_identifier])
# check for a minimum value
if kwargs['maxApduLengthAccepted'] < 50:

View File

@ -470,7 +470,7 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
def __init__(self, localDevice, localAddress, deviceInfoCache=None, aseID=None):
if _debug: BIPSimpleApplication._debug("__init__ %r %r deviceInfoCache=%r aseID=%r", localDevice, localAddress, deviceInfoCache, aseID)
ApplicationIOController.__init__(self, localDevice, deviceInfoCache, aseID=aseID)
ApplicationIOController.__init__(self, localDevice, localAddress, deviceInfoCache, aseID=aseID)
# local address might be useful for subclasses
if isinstance(localAddress, Address):
@ -524,9 +524,14 @@ class BIPSimpleApplication(ApplicationIOController, WhoIsIAmServices, ReadWriteP
@bacpypes_debugging
class BIPForeignApplication(ApplicationIOController, WhoIsIAmServices, ReadWritePropertyServices):
def __init__(self, localDevice, localAddress, bbmdAddress, bbmdTTL, aseID=None):
if _debug: BIPForeignApplication._debug("__init__ %r %r %r %r aseID=%r", localDevice, localAddress, bbmdAddress, bbmdTTL, aseID)
ApplicationIOController.__init__(self, localDevice, aseID=aseID)
def __init__(self, localDevice, localAddress, bbmdAddress, bbmdTTL, deviceInfoCache=None, aseID=None):
if _debug:
BIPForeignApplication._debug(
"__init__ %r %r %r %r deviceInfoCache=%r aseID=%r",
localDevice, localAddress, bbmdAddress, bbmdTTL,
deviceInfoCache, aseID,
)
ApplicationIOController.__init__(self, localDevice, localAddress, deviceInfoCache, aseID=aseID)
# local address might be useful for subclasses
if isinstance(localAddress, Address):

View File

@ -1604,6 +1604,16 @@ class ErrorType(Sequence):
, Element('errorCode', ErrorCode)
]
class LightingCommand(Sequence):
sequenceElements = \
[ Element('operation', LightingOperation, 0)
, Element('targetLevel', Real, 1, True)
, Element('rampRate', Real, 2, True)
, Element('stepIncrement', Real, 3, True)
, Element('fadeTime', Unsigned, 4, True)
, Element('priority', Unsigned, 5, True)
]
class ObjectPropertyReference(Sequence):
sequenceElements = \
[ Element('objectIdentifier', ObjectIdentifier, 0)
@ -1792,8 +1802,21 @@ class CalendarEntry(Choice):
]
class ChannelValue(Choice):
choiceElements = [
### needs help
choiceElements = \
[ Element('null', Null)
, Element('real', Real)
, Element('enumerated', Enumerated)
, Element('unsigned', Unsigned)
, Element('boolean', Boolean)
, Element('integer', Integer)
, Element('double', Double)
, Element('time', Time)
, Element('characterString', CharacterString)
, Element('octetString', OctetString)
, Element('bitString', BitString)
, Element('date', Date)
, Element('objectidentifier', ObjectIdentifier)
, Element('lightingCommand', LightingCommand, 0)
]
class ClientCOV(Choice):
@ -2068,16 +2091,6 @@ class KeyIdentifier(Sequence):
, Element('keyId', Unsigned, 1)
]
class LightingCommand(Sequence):
sequenceElements = \
[ Element('operation', LightingOperation, 0)
, Element('targetLevel', Real, 1) ### optional
, Element('rampRate', Real, 2) ### optional
, Element('stepIncrement', Real, 3) ### optional
, Element('fadeTime', Unsigned, 4) ### optional
, Element('priority', Unsigned, 5) ### optional
]
class LogDataLogData(Choice):
choiceElements = \
[ Element('booleanValue', Boolean, 0)
@ -2336,14 +2349,14 @@ class PriorityValue(Choice):
, Element('enumerated', Enumerated)
, Element('unsigned', Unsigned)
, Element('boolean', Boolean)
, Element('signed', Integer)
, Element('integer', Integer)
, Element('double', Double)
, Element('time', Time)
, Element('characterString', CharacterString)
, Element('octetString', OctetString)
, Element('bitString', BitString)
, Element('date', Date)
, Element('objectid', ObjectIdentifier)
, Element('objectidentifier', ObjectIdentifier)
, Element('constructedValue', Any, 0)
, Element('datetime', DateTime, 1)
]

View File

@ -403,6 +403,54 @@ class BIPSimple(BIPSAP, Client, Server):
# send it upstream
self.response(xpdu)
elif isinstance(pdu, WriteBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0010, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0020, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, RegisterForeignDevice):
# build a response
xpdu = Result(code=0x0030, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadForeignDeviceTable):
# build a response
xpdu = Result(code=0x0040, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DeleteForeignDeviceTableEntry):
# build a response
xpdu = Result(code=0x0050, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DistributeBroadcastToNetwork):
# build a response
xpdu = Result(code=0x0060, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
else:
BIPSimple._warning("invalid pdu type: %s", type(pdu))
@ -441,11 +489,6 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
def indication(self, pdu):
if _debug: BIPForeign._debug("indication %r", pdu)
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
return
# check for local stations
if pdu.pduDestination.addrType == Address.localStationAddr:
# make an original unicast PDU
@ -457,6 +500,11 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# check for broadcasts
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
return
# make an original broadcast PDU
xpdu = DistributeBroadcastToNetwork(pdu, user_data=pdu.pduUserData)
xpdu.pduDestination = self.bbmdAddress
@ -493,6 +541,13 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
return
elif isinstance(pdu, OriginalUnicastNPDU):
# build a vanilla PDU
xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData)
# send it upstream
self.response(xpdu)
# check the BBMD registration status, we may not be registered
if self.registrationStatus != 0:
if _debug: BIPForeign._debug(" - packet dropped, unregistered")
@ -506,13 +561,6 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# send this to the service access point
self.sap_response(pdu)
elif isinstance(pdu, OriginalUnicastNPDU):
# build a vanilla PDU
xpdu = PDU(pdu.pduData, source=pdu.pduSource, destination=pdu.pduDestination, user_data=pdu.pduUserData)
# send it upstream
self.response(xpdu)
elif isinstance(pdu, ForwardedNPDU):
# build a PDU with the source from the real source
xpdu = PDU(pdu.pduData, source=pdu.bvlciAddress, destination=LocalBroadcast(), user_data=pdu.pduUserData)
@ -520,6 +568,54 @@ class BIPForeign(BIPSAP, Client, Server, OneShotTask, DebugContents):
# send it upstream
self.response(xpdu)
elif isinstance(pdu, WriteBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0010, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadBroadcastDistributionTable):
# build a response
xpdu = Result(code=0x0020, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, RegisterForeignDevice):
# build a response
xpdu = Result(code=0x0030, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, ReadForeignDeviceTable):
# build a response
xpdu = Result(code=0x0040, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DeleteForeignDeviceTableEntry):
# build a response
xpdu = Result(code=0x0050, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
elif isinstance(pdu, DistributeBroadcastToNetwork):
# build a response
xpdu = Result(code=0x0060, user_data=pdu.pduUserData)
xpdu.pduDestination = pdu.pduSource
# send it downstream
self.request(xpdu)
else:
BIPForeign._warning("invalid pdu type: %s", type(pdu))

View File

@ -114,13 +114,15 @@ class LocalDeviceObject(CurrentPropertyListMixIn, DeviceObject):
if 'objectIdentifier' not in kwargs:
raise RuntimeError("objectIdentifier is required")
# coerce the object identifier
object_identifier = kwargs['objectIdentifier']
if isinstance(object_identifier, int):
object_identifier = ('device', object_identifier)
# the object list is provided
if 'objectList' in kwargs:
raise RuntimeError("objectList is provided by LocalDeviceObject and cannot be overridden")
else:
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([
kwargs['objectIdentifier'],
])
kwargs['objectList'] = ArrayOf(ObjectIdentifier)([object_identifier])
# check for a minimum value
if kwargs['maxApduLengthAccepted'] < 50:

445
samples/CommandableMixin.py Executable file → Normal file
View File

@ -1,154 +1,371 @@
#!/usr/bin/env python
"""
This sample application demonstrates a mix-in class for commandable properties
(not useful for Binary Out or Binary Value objects that have a minimum on and off
time, or for Channel objects).
Rebuilt Commandable
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.task import OneShotTask
from bacpypes.errors import ExecutionError
from bacpypes.object import AnalogValueObject, DateValueObject
from bacpypes.primitivedata import Null, Date
from bacpypes.basetypes import PriorityValue, PriorityArray
from bacpypes.primitivedata import BitString, CharacterString, Date, Integer, \
Double, Enumerated, OctetString, Real, Time, Unsigned
from bacpypes.basetypes import BinaryPV, ChannelValue, DateTime, DoorValue, PriorityValue, \
PriorityArray
from bacpypes.object import Property, ReadableProperty, WritableProperty, \
register_object_type, \
AccessDoorObject, AnalogOutputObject, AnalogValueObject, \
BinaryOutputObject, BinaryValueObject, BitStringValueObject, CharacterStringValueObject, \
DateValueObject, DatePatternValueObject, DateTimePatternValueObject, \
DateTimeValueObject, IntegerValueObject, \
LargeAnalogValueObject, LightingOutputObject, MultiStateOutputObject, \
MultiStateValueObject, OctetStringValueObject, PositiveIntegerValueObject, \
TimeValueObject, TimePatternValueObject, ChannelObject
from bacpypes.app import BIPSimpleApplication
from bacpypes.service.object import CurrentPropertyListMixIn
from bacpypes.service.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# CommandableMixin
# Commandable
#
@bacpypes_debugging
class CommandableMixin(object):
def Commandable(datatype, presentValue='presentValue', priorityArray='priorityArray', relinquishDefault='relinquishDefault'):
if _debug: Commandable._debug("Commandable %r ...", datatype)
def __init__(self, init_value, **kwargs):
if _debug: CommandableMixin._debug("__init__ %r, %r", init_value, kwargs)
super(CommandableMixin, self).__init__(**kwargs)
class _Commando(object):
# if no present value given, give it the default value
if ('presentValue' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize present value")
self.presentValue = init_value
properties = [
WritableProperty(presentValue, datatype),
ReadableProperty(priorityArray, PriorityArray),
ReadableProperty(relinquishDefault, datatype),
]
# if no priority array given, give it an empty one
if ('priorityArray' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize priority array")
self.priorityArray = PriorityArray()
for i in range(16):
self.priorityArray.append(PriorityValue(null=Null()))
_pv_choice = None
# if no relinquish default value given, give it the default value
if ('relinquishDefault' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize relinquish default")
self.relinquishDefault = init_value
def __init__(self, **kwargs):
super(_Commando, self).__init__(**kwargs)
# capture the present value property
self._pv = self._properties['presentValue']
if _debug: CommandableMixin._debug(" - _pv: %r", self._pv)
# build a default value in case one is needed
default_value = datatype().value
if issubclass(datatype, Enumerated):
default_value = datatype._xlate_table[default_value]
if _debug: Commandable._debug(" - default_value: %r", default_value)
# capture the datatype
self._pv_datatype = self._pv.datatype
if _debug: CommandableMixin._debug(" - _pv_datatype: %r", self._pv_datatype)
# see if a present value was provided
if (presentValue not in kwargs):
setattr(self, presentValue, default_value)
# look up a matching priority value choice
for element in PriorityValue.choiceElements:
if element.klass is self._pv_datatype:
self._pv_choice = element.name
break
else:
self._pv_choice = 'constructedValue'
if _debug: CommandableMixin._debug(" - _pv_choice: %r", self._pv_choice)
# see if a priority array was provided
if (priorityArray not in kwargs):
new_priority_array = PriorityArray()
for i in range(16):
new_priority_array.append(PriorityValue(null=()))
setattr(self, priorityArray, new_priority_array)
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
if _debug: CommandableMixin._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
# see if a present value was provided
if (relinquishDefault not in kwargs):
setattr(self, relinquishDefault, default_value)
# when writing to the presentValue with a priority
if (property == 'presentValue'):
# default (lowest) priority
if priority is None:
priority = 16
if _debug: CommandableMixin._debug(" - translate to array index %d", priority)
def _highest_priority_value(self):
if _debug: Commandable._debug("_highest_priority_value")
# translate to updating the priority array
property = 'priorityArray'
arrayIndex = priority
priority = None
# update the priority array entry
if (property == 'priorityArray') and (arrayIndex is not None):
# check the bounds
if arrayIndex == 0:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
if (arrayIndex < 1) or (arrayIndex > 16):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
# update the specific priorty value element
priority_value = self.priorityArray[arrayIndex]
if _debug: CommandableMixin._debug(" - priority_value: %r", priority_value)
# the null or the choice has to be set, the other clear
if value is ():
if _debug: CommandableMixin._debug(" - write a null")
priority_value.null = value
setattr(priority_value, self._pv_choice, None)
else:
if _debug: CommandableMixin._debug(" - write a value")
priority_value.null = None
setattr(priority_value, self._pv_choice, value)
# look for the highest priority value
priority_array = getattr(self, priorityArray)
for i in range(1, 17):
priority_value = self.priorityArray[i]
priority_value = priority_array[i]
if priority_value.null is None:
if (i < arrayIndex):
if _debug: CommandableMixin._debug(" - existing higher priority value")
return
value = getattr(priority_value, self._pv_choice)
if _debug: Commandable._debug(" - found at index: %r", i)
value = getattr(priority_value, _Commando._pv_choice)
value_source = "###"
if issubclass(datatype, Enumerated):
value = datatype._xlate_table[value]
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
break
else:
value = self.relinquishDefault
if _debug: CommandableMixin._debug(" - new present value: %r", value)
value = getattr(self, relinquishDefault)
value_source = None
property = 'presentValue'
arrayIndex = priority = None
if _debug: Commandable._debug(" - value, value_source: %r, %r", value, value_source)
# allow the request to pass through
if _debug: CommandableMixin._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
super(CommandableMixin, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
# return what you found
return value, value_source
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
if _debug: Commandable._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
# when writing to the presentValue with a priority
if (property == presentValue):
if _debug: Commandable._debug(" - writing to %s, priority %r", presentValue, priority)
# default (lowest) priority
if priority is None:
priority = 16
if _debug: Commandable._debug(" - translate to priority array, index %d", priority)
# translate to updating the priority array
property = priorityArray
arrayIndex = priority
priority = None
# update the priority array entry
if (property == priorityArray):
if (arrayIndex is None):
if _debug: Commandable._debug(" - writing entire %s", priorityArray)
# pass along the request
super(_Commando, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
else:
if _debug: Commandable._debug(" - writing to %s, array index %d", priorityArray, arrayIndex)
# check the bounds
if arrayIndex == 0:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
if (arrayIndex < 1) or (arrayIndex > 16):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
# update the specific priorty value element
priority_value = getattr(self, priorityArray)[arrayIndex]
if _debug: Commandable._debug(" - priority_value: %r", priority_value)
# the null or the choice has to be set, the other clear
if value is ():
if _debug: Commandable._debug(" - write a null")
priority_value.null = value
setattr(priority_value, _Commando._pv_choice, None)
else:
if _debug: Commandable._debug(" - write a value")
if issubclass(datatype, Enumerated):
value = datatype._xlate_table[value]
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
priority_value.null = None
setattr(priority_value, _Commando._pv_choice, value)
# look for the highest priority value
value, value_source = self._highest_priority_value()
# compare with the current value
current_value = getattr(self, presentValue)
if value == current_value:
if _debug: Commandable._debug(" - no present value change")
return
# turn this into a present value change
property = presentValue
arrayIndex = priority = None
# allow the request to pass through
if _debug: Commandable._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
super(_Commando, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
# look up a matching priority value choice
for element in PriorityValue.choiceElements:
if issubclass(datatype, element.klass):
_Commando._pv_choice = element.name
break
else:
_Commando._pv_choice = 'constructedValue'
if _debug: Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)
# return the class
return _Commando
#
# CommandableAnalogValueObject
# MinOnOffTask
#
@bacpypes_debugging
class CommandableAnalogValueObject(CommandableMixin, AnalogValueObject):
class MinOnOffTask(OneShotTask):
def __init__(self, **kwargs):
if _debug: CommandableAnalogValueObject._debug("__init__ %r", kwargs)
CommandableMixin.__init__(self, 0.0, **kwargs)
def __init__(self, binary_obj):
if _debug: MinOnOffTask._debug("__init__ %s", repr(binary_obj))
OneShotTask.__init__(self)
# save a reference to the object
self.binary_obj = binary_obj
# listen for changes to the present value
self.binary_obj._property_monitors['presentValue'].append(self.present_value_change)
def present_value_change(self, old_value, new_value):
if _debug: MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)
# if there's no value change, skip all this
if old_value == new_value:
if _debug: MinOnOffTask._debug(" - no state change")
return
# get the minimum on/off time
if new_value == 'inactive':
task_delay = getattr(self.binary_obj, 'minimumOnTime') or 0
if _debug: MinOnOffTask._debug(" - minimum on: %r", task_delay)
elif new_value == 'active':
task_delay = getattr(self.binary_obj, 'minimumOffTime') or 0
if _debug: MinOnOffTask._debug(" - minimum off: %r", task_delay)
else:
raise ValueError("unrecognized present value for %r: %r" % (self.binary_obj.objectIdentifier, new_value))
# if there's no delay, don't bother
if not task_delay:
if _debug: MinOnOffTask._debug(" - no delay")
return
# set the value at priority 6
self.binary_obj.WriteProperty('presentValue', new_value, priority=6)
# install this to run, if there is a delay
self.install_task(delta=task_delay)
def process_task(self):
if _debug: MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)
# clear the value at priority 6
self.binary_obj.WriteProperty('presentValue', (), priority=6)
#
# CommandableDateValueObject
# MinOnOff
#
@bacpypes_debugging
class CommandableDateValueObject(CommandableMixin, DateValueObject):
class MinOnOff(object):
def __init__(self, **kwargs):
if _debug: CommandableDateValueObject._debug("__init__ %r", kwargs)
CommandableMixin.__init__(self, None, **kwargs)
if _debug: MinOnOff._debug("__init__ ...")
super(MinOnOff, self).__init__(**kwargs)
# create the timer task
self._min_on_off_task = MinOnOffTask(self)
#
# Commandable Standard Objects
#
class AccessDoorObjectCmd(Commandable(DoorValue), AccessDoorObject):
pass
class AnalogOutputObjectCmd(Commandable(Real), AnalogOutputObject):
pass
class AnalogValueObjectCmd(Commandable(Real), AnalogValueObject):
pass
### class BinaryLightingOutputObjectCmd(Commandable(Real), BinaryLightingOutputObject):
### pass
class BinaryOutputObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryOutputObject):
pass
class BinaryValueObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryValueObject):
pass
class BitStringValueObjectCmd(Commandable(BitString), BitStringValueObject):
pass
class CharacterStringValueObjectCmd(Commandable(CharacterString), CharacterStringValueObject):
pass
class DateValueObjectCmd(Commandable(Date), DateValueObject):
pass
class DatePatternValueObjectCmd(Commandable(Date), DatePatternValueObject):
pass
class DateTimeValueObjectCmd(Commandable(DateTime), DateTimeValueObject):
pass
class DateTimePatternValueObjectCmd(Commandable(DateTime), DateTimePatternValueObject):
pass
class IntegerValueObjectCmd(Commandable(Integer), IntegerValueObject):
pass
class LargeAnalogValueObjectCmd(Commandable(Double), LargeAnalogValueObject):
pass
class LightingOutputObjectCmd(Commandable(Real), LightingOutputObject):
pass
class MultiStateOutputObjectCmd(Commandable(Unsigned), MultiStateOutputObject):
pass
class MultiStateValueObjectCmd(Commandable(Unsigned), MultiStateValueObject):
pass
class OctetStringValueObjectCmd(Commandable(OctetString), OctetStringValueObject):
pass
class PositiveIntegerValueObjectCmd(Commandable(Unsigned), PositiveIntegerValueObject):
pass
class TimeValueObjectCmd(Commandable(Time), TimeValueObject):
pass
class TimePatternValueObjectCmd(Commandable(Time), TimePatternValueObject):
pass
#
# ChannelValueProperty
#
class ChannelValueProperty(Property):
def __init__(self):
if _debug: ChannelValueProperty._debug("__init__")
Property.__init__(self, 'presentValue', ChannelValue, default=None, optional=False, mutable=True)
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: ChannelValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
### Clause 12.53.5, page 487
raise NotImplementedError()
#
# ChannelObjectCmd
#
class ChannelObjectCmd(ChannelObject):
properties = [
ChannelValueProperty(),
]
##
##
##
##
##
@register_object_type(vendor_id=999)
class LocalAnalogValueObjectCmd(CurrentPropertyListMixIn, AnalogValueObjectCmd):
pass
@register_object_type(vendor_id=999)
class LocalBinaryOutputObjectCmd(CurrentPropertyListMixIn, BinaryOutputObjectCmd):
pass
@register_object_type(vendor_id=999)
class LocalDateValueObjectCmd(CurrentPropertyListMixIn, DateValueObjectCmd):
pass
#
# __main__
@ -174,22 +391,36 @@ def main():
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a commandable analog value object, add to the device
cavo1 = CommandableAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Commandable1',
avo1 = LocalAnalogValueObjectCmd(
objectIdentifier=('analogValue', 1),
objectName='avo1',
)
if _debug: _log.debug(" - cavo1: %r", cavo1)
this_application.add_object(cavo1)
if _debug: _log.debug(" - avo1: %r", avo1)
this_application.add_object(avo1)
# make a commandable binary output object, add to the device
boo1 = LocalBinaryOutputObjectCmd(
objectIdentifier=('binaryOutput', 1),
objectName='boo1',
presentValue='inactive',
relinquishDefault='inactive',
minimumOnTime=5, # let it warm up
minimumOffTime=10, # let it cool off
)
if _debug: _log.debug(" - boo1: %r", boo1)
this_application.add_object(boo1)
# get the current date
today = Date().now()
# make a commandable date value object, add to the device
cdvo2 = CommandableDateValueObject(
objectIdentifier=('dateValue', 1), objectName='Commandable2',
dvo1 = LocalDateValueObjectCmd(
objectIdentifier=('dateValue', 1),
objectName='dvo1',
presentValue=today.value,
)
if _debug: _log.debug(" - cdvo2: %r", cdvo2)
this_application.add_object(cdvo2)
if _debug: _log.debug(" - dvo1: %r", dvo1)
this_application.add_object(dvo1)
if _debug: _log.debug("running")

230
samples/WhoIsIAm.py Executable file
View File

@ -0,0 +1,230 @@
#!/usr/bin/env python
"""
This application presents a 'console' prompt to the user asking for Who-Is and I-Am
commands which create the related APDUs, then lines up the coorresponding I-Am
for incoming traffic and prints out the contents.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, enable_sleeping
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.apdu import WhoIsRequest, IAmRequest
from bacpypes.basetypes import ServicesSupported
from bacpypes.errors import DecodingError
from bacpypes.app import BIPSimpleApplication
from bacpypes.service.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# WhoIsIAmApplication
#
@bacpypes_debugging
class WhoIsIAmApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: WhoIsIAmApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: WhoIsIAmApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu)
# forward it along
BIPSimpleApplication.confirmation(self, apdu)
def indication(self, apdu):
if _debug: WhoIsIAmApplication._debug("indication %r", apdu)
if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)):
device_type, device_instance = apdu.iAmDeviceIdentifier
if device_type != 'device':
raise DecodingError("invalid object type")
if (self._request.deviceInstanceRangeLowLimit is not None) and \
(device_instance < self._request.deviceInstanceRangeLowLimit):
pass
elif (self._request.deviceInstanceRangeHighLimit is not None) and \
(device_instance > self._request.deviceInstanceRangeHighLimit):
pass
else:
# print out the contents
sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n')
sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n')
sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n')
sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n')
sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n')
sys.stdout.flush()
# forward it along
BIPSimpleApplication.indication(self, apdu)
#
# WhoIsIAmConsoleCmd
#
@bacpypes_debugging
class WhoIsIAmConsoleCmd(ConsoleCmd):
def do_whois(self, args):
"""whois [ <addr>] [ <lolimit> <hilimit> ]"""
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args)
try:
# build a request
request = WhoIsRequest()
if (len(args) == 1) or (len(args) == 3):
request.pduDestination = Address(args[0])
del args[0]
else:
request.pduDestination = GlobalBroadcast()
if len(args) == 2:
request.deviceInstanceRangeLowLimit = int(args[0])
request.deviceInstanceRangeHighLimit = int(args[1])
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug: WhoIsIAmConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
this_application.request_io(iocb)
except Exception as err:
WhoIsIAmConsoleCmd._exception("exception: %r", err)
def do_iam(self, args):
"""iam"""
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args)
try:
# build a request
request = IAmRequest()
request.pduDestination = GlobalBroadcast()
# set the parameters from the device object
request.iAmDeviceIdentifier = this_device.objectIdentifier
request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted
request.segmentationSupported = this_device.segmentationSupported
request.vendorID = this_device.vendorIdentifier
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)
# make an IOCB
iocb = IOCB(request)
if _debug: WhoIsIAmConsoleCmd._debug(" - iocb: %r", iocb)
# give it to the application
this_application.request_io(iocb)
except Exception as err:
WhoIsIAmConsoleCmd._exception("exception: %r", err)
def do_rtn(self, args):
"""rtn <addr> <net> ... """
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args)
# safe to assume only one adapter
adapter = this_application.nsap.adapters[0]
if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter)
# provide the address and a list of network numbers
router_address = Address(args[0])
network_list = [int(arg) for arg in args[1:]]
# pass along to the service access point
this_application.nsap.add_router_references(adapter, router_address, network_list)
#
# main
#
def main():
global this_device, this_application
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
if _debug: _log.debug(" - this_device: %r", this_device)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = WhoIsIAmApplication(
this_device, args.ini.address,
)
if _debug: _log.debug(" - this_application: %r", this_application)
# get the services supported
services_supported = this_application.get_services_supported()
if _debug: _log.debug(" - services_supported: %r", services_supported)
# let the device object know
this_device.protocolServicesSupported = services_supported.value
# make a console
this_console = WhoIsIAmConsoleCmd()
if _debug: _log.debug(" - this_console: %r", this_console)
# enable sleeping will help with threads
enable_sleeping()
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()