1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

release 0.18.5

This commit is contained in:
Joel Bender 2021-10-13 18:17:33 -04:00
commit 41104c2b56
28 changed files with 1063 additions and 245 deletions

View File

@ -18,7 +18,7 @@ if _sys.platform not in _supported_platforms:
# Project Metadata
#
__version__ = '0.18.4'
__version__ = '0.18.5'
__author__ = 'Joel Bender'
__email__ = 'joel@carrickbender.com'

View File

@ -2095,7 +2095,7 @@ class OctetStringValueObject(Object):
_object_supports_cov = True
properties = \
[ ReadableProperty('presentValue', CharacterString)
[ ReadableProperty('presentValue', OctetString)
, ReadableProperty('statusFlags', StatusFlags)
, OptionalProperty('eventState', EventState)
, OptionalProperty('reliability', Reliability)

View File

@ -458,7 +458,7 @@ class PulseConverterCriteria(COVIncrementCriteria):
# check for a new period
if new_value != 0:
self.cov_period_task = RecurringFunctionTask(new_value * 1000, self.cov_period_x)
self.cov_period_task = RecurringFunctionTask(new_value * 1000, self.send_cov_notifications)
self.cov_period_task.install_task()
if _debug: PulseConverterCriteria._debug(" - new task created and installed")

View File

@ -18,7 +18,7 @@ if _sys.platform not in _supported_platforms:
# Project Metadata
#
__version__ = '0.18.4'
__version__ = '0.18.5'
__author__ = 'Joel Bender'
__email__ = 'joel@carrickbender.com'

View File

@ -1241,7 +1241,7 @@ class GetEnrollmentSummaryRequestEventStateFilterType(Enumerated):
, 'active':4
}
class GetEnrollmentSummaryRequestPriorityFilterType:
class GetEnrollmentSummaryRequestPriorityFilterType(Sequence):
sequenceElements = \
[ Element('minPriority', Unsigned, 0)
, Element('maxPriority', Unsigned, 1)

View File

@ -482,8 +482,8 @@ class BinaryLightingPV(Enumerated):
{ 'off':0
, 'on':1
, 'warn':2
, 'warn-off':3
, 'warn-relinquish':4
, 'warnOff':3
, 'warnRelinquish':4
, 'stop':5
}
@ -1016,11 +1016,11 @@ class EventType(Enumerated):
class FaultType(Enumerated):
enumerations = \
{ 'none':0
, 'fault-characterstring':1
, 'fault-extended':2
, 'fault-life-safety':3
, 'fault-state':4
, 'fault-status-flags':5
, 'faultCharacterstring':1
, 'faultExtended':2
, 'faultLifeSafety':3
, 'faultState':4
, 'faultStatusFlags':5
}
class FileAccessMethod(Enumerated):
@ -1831,7 +1831,7 @@ class Reliability(Enumerated):
, 'lampFailure': 16
, 'activationFailure': 17
, 'renewDHCPFailure': 18
, 'renewFDRegistration-failure': 19
, 'renewFDRegistrationFailure': 19
, 'restartAutoNegotiationFailure': 20
, 'restartFailure': 21
, 'proprietaryCommandFailure': 22
@ -1918,12 +1918,12 @@ class VTClass(Enumerated):
vendor_range = (64, 65535)
enumerations = \
{ 'defaultTerminal':0
, 'ansiX3-64':1
, 'ansiX364':1
, 'decVt52':2
, 'decVt100':3
, 'decVt220':4
, 'hp-700-94':5
, 'ibm-3130':6
, 'hp70094':5
, 'ibm3130':6
}
class WriteStatus(Enumerated):
@ -3187,7 +3187,7 @@ class ValueSource(Choice):
choiceElements = \
[ Element('none', Null, 0)
, Element('object', DeviceObjectReference, 1)
, Element('Address', Address)
, Element('address', Address, 2)
]

View File

@ -1109,7 +1109,7 @@ class AnalogValueObject(Object):
@register_object_type
class AuditLogObject(Object):
objectType = 'analogLog'
objectType = 'auditLog'
properties = \
[ ReadableProperty('statusFlags', StatusFlags)
@ -1217,7 +1217,7 @@ class BinaryInputObject(Object):
@register_object_type
class BinaryLightingOutputObject(Object):
objectType = 'binaryLightingOutputObject'
objectType = 'binaryLightingOutput'
properties = \
[ WritableProperty('presentValue', BinaryLightingPV)
@ -2458,7 +2458,7 @@ class OctetStringValueObject(Object):
_object_supports_cov = True
properties = \
[ ReadableProperty('presentValue', CharacterString)
[ ReadableProperty('presentValue', OctetString)
, ReadableProperty('statusFlags', StatusFlags)
, OptionalProperty('eventState', EventState)
, OptionalProperty('reliability', Reliability)

View File

@ -1714,6 +1714,7 @@ class ObjectType(Enumerated):
, 'program':16
, 'pulseConverter':24
, 'schedule':17
, 'staging':60
, 'structuredView':29
, 'timePatternValue':49
, 'timeValue':50

View File

@ -456,7 +456,7 @@ class PulseConverterCriteria(COVIncrementCriteria):
# check for a new period
if new_value != 0:
self.cov_period_task = RecurringFunctionTask(new_value * 1000, self.cov_period_x)
self.cov_period_task = RecurringFunctionTask(new_value * 1000, self.send_cov_notifications)
self.cov_period_task.install_task()
if _debug: PulseConverterCriteria._debug(" - new task created and installed")

View File

@ -18,7 +18,7 @@ if _sys.platform not in _supported_platforms:
# Project Metadata
#
__version__ = '0.18.4'
__version__ = '0.18.5'
__author__ = 'Joel Bender'
__email__ = 'joel@carrickbender.com'

View File

@ -1241,7 +1241,7 @@ class GetEnrollmentSummaryRequestEventStateFilterType(Enumerated):
, 'active':4
}
class GetEnrollmentSummaryRequestPriorityFilterType:
class GetEnrollmentSummaryRequestPriorityFilterType(Sequence):
sequenceElements = \
[ Element('minPriority', Unsigned, 0)
, Element('maxPriority', Unsigned, 1)

View File

@ -1530,7 +1530,7 @@ class ApplicationServiceAccessPoint(ApplicationServiceElement, ServiceAccessPoin
xpdu.decode(apdu)
except Exception as err:
ApplicationServiceAccessPoint._exception("complex ack decoding error: %r", err)
return
xpdu = Error(errorClass=7, errorCode=57) # communication, invalidTag
elif isinstance(apdu, ErrorPDU):
atype = error_types.get(apdu.apduService)

View File

@ -482,8 +482,8 @@ class BinaryLightingPV(Enumerated):
{ 'off':0
, 'on':1
, 'warn':2
, 'warn-off':3
, 'warn-relinquish':4
, 'warnOff':3
, 'warnRelinquish':4
, 'stop':5
}
@ -1016,11 +1016,11 @@ class EventType(Enumerated):
class FaultType(Enumerated):
enumerations = \
{ 'none':0
, 'fault-characterstring':1
, 'fault-extended':2
, 'fault-life-safety':3
, 'fault-state':4
, 'fault-status-flags':5
, 'faultCharacterstring':1
, 'faultExtended':2
, 'faultLifeSafety':3
, 'faultState':4
, 'faultStatusFlags':5
}
class FileAccessMethod(Enumerated):
@ -1831,7 +1831,7 @@ class Reliability(Enumerated):
, 'lampFailure': 16
, 'activationFailure': 17
, 'renewDHCPFailure': 18
, 'renewFDRegistration-failure': 19
, 'renewFDRegistrationFailure': 19
, 'restartAutoNegotiationFailure': 20
, 'restartFailure': 21
, 'proprietaryCommandFailure': 22
@ -1918,12 +1918,12 @@ class VTClass(Enumerated):
vendor_range = (64, 65535)
enumerations = \
{ 'defaultTerminal':0
, 'ansiX3-64':1
, 'ansiX364':1
, 'decVt52':2
, 'decVt100':3
, 'decVt220':4
, 'hp-700-94':5
, 'ibm-3130':6
, 'hp70094':5
, 'ibm3130':6
}
class WriteStatus(Enumerated):
@ -3187,7 +3187,7 @@ class ValueSource(Choice):
choiceElements = \
[ Element('none', Null, 0)
, Element('object', DeviceObjectReference, 1)
, Element('Address', Address)
, Element('address', Address, 2)
]

View File

@ -5,21 +5,62 @@ import re
from ..debugging import bacpypes_debugging, ModuleLogger
from ..task import OneShotTask
from ..primitivedata import Atomic, Null, BitString, CharacterString, \
Date, Integer, Double, Enumerated, OctetString, Real, Time, Unsigned
from ..basetypes import PropertyIdentifier, DateTime, NameValue, BinaryPV, \
ChannelValue, DoorValue, PriorityValue, PriorityArray
from ..primitivedata import (
Atomic,
Null,
BitString,
CharacterString,
Date,
Integer,
Double,
Enumerated,
OctetString,
Real,
Time,
Unsigned,
ObjectIdentifier,
)
from ..basetypes import (
PropertyIdentifier,
DateTime,
NameValue,
BinaryPV,
ChannelValue,
DoorValue,
PriorityValue,
PriorityArray,
)
from ..constructeddata import Array, ArrayOf, SequenceOf
from ..errors import ExecutionError
from ..object import Property, ReadableProperty, WritableProperty, OptionalProperty, Object, \
AccessDoorObject, AnalogOutputObject, AnalogValueObject, \
BinaryOutputObject, BinaryValueObject, BitStringValueObject, CharacterStringValueObject, \
DateValueObject, DatePatternValueObject, DateTimePatternValueObject, \
DateTimeValueObject, IntegerValueObject, \
LargeAnalogValueObject, LightingOutputObject, MultiStateOutputObject, \
MultiStateValueObject, OctetStringValueObject, PositiveIntegerValueObject, \
TimeValueObject, TimePatternValueObject, ChannelObject
from ..object import (
Property,
ReadableProperty,
WritableProperty,
OptionalProperty,
Object,
AccessDoorObject,
AnalogOutputObject,
AnalogValueObject,
BinaryOutputObject,
BinaryValueObject,
BitStringValueObject,
CharacterStringValueObject,
DateValueObject,
DatePatternValueObject,
DateTimePatternValueObject,
DateTimeValueObject,
IntegerValueObject,
LargeAnalogValueObject,
LightingOutputObject,
MultiStateOutputObject,
MultiStateValueObject,
OctetStringValueObject,
PositiveIntegerValueObject,
TimeValueObject,
TimePatternValueObject,
ChannelObject,
)
# some debugging
@ -33,22 +74,35 @@ ArrayOfPropertyIdentifier = ArrayOf(PropertyIdentifier)
# CurrentPropertyList
#
@bacpypes_debugging
class CurrentPropertyList(Property):
def __init__(self):
if _debug: CurrentPropertyList._debug("__init__")
Property.__init__(self, 'propertyList', ArrayOfPropertyIdentifier, default=None, optional=True, mutable=False)
if _debug:
CurrentPropertyList._debug("__init__")
Property.__init__(
self,
"propertyList",
ArrayOfPropertyIdentifier,
default=None,
optional=True,
mutable=False,
)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: CurrentPropertyList._debug("ReadProperty %r %r", obj, arrayIndex)
if _debug:
CurrentPropertyList._debug("ReadProperty %r %r", obj, arrayIndex)
# make a list of the properties that have values
property_list = [k for k, v in obj._values.items()
property_list = [
k
for k, v in obj._values.items()
if v is not None
and k not in ('objectName', 'objectType', 'objectIdentifier', 'propertyList')
]
if _debug: CurrentPropertyList._debug(" - property_list: %r", property_list)
and k
not in ("objectName", "objectType", "objectIdentifier", "propertyList")
]
if _debug:
CurrentPropertyList._debug(" - property_list: %r", property_list)
# sort the list so it's stable
property_list.sort()
@ -63,22 +117,160 @@ class CurrentPropertyList(Property):
# asking for an index
if arrayIndex > len(property_list):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
raise ExecutionError(errorClass="property", errorCode="invalidArrayIndex")
return property_list[arrayIndex - 1]
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
raise ExecutionError(errorClass="property", errorCode="writeAccessDenied")
#
# CurrentPropertyListMixIn
#
@bacpypes_debugging
class CurrentPropertyListMixIn(Object):
properties = [
CurrentPropertyList(),
]
]
#
# WriteableObjectName
#
@bacpypes_debugging
class WriteableObjectName(Property):
def __init__(self):
if _debug:
WriteableObjectName._debug("__init__")
Property.__init__(
self, "objectName", CharacterString, optional=False, mutable=True
)
def WriteProperty(self, obj, new_name, arrayIndex=None, priority=None, direct=False):
if _debug:
WriteableObjectName._debug(
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
obj,
new_name,
arrayIndex,
priority,
direct,
)
# check for being bound to an application
old_name = None
if getattr(obj, "_app", None):
if _debug:
WriteableObjectName._debug(" - more error checking")
old_name = obj._values["objectName"]
if new_name == old_name:
return
if new_name in obj._app.objectName:
raise ExecutionError(errorClass="property", errorCode="duplicateName")
# pass it along
Property.WriteProperty(self, obj, new_name, arrayIndex, priority, direct)
if old_name is not None:
if _debug:
WriteableObjectName._debug(" - update the application")
del obj._app.objectName[old_name]
obj._app.objectName[new_name] = obj
#
# WriteableObjectNameMixIn
#
@bacpypes_debugging
class WriteableObjectNameMixIn(Object):
properties = [
WriteableObjectName(),
]
#
# WriteableObjectIdentifier
#
@bacpypes_debugging
class WriteableObjectIdentifier(Property):
def __init__(self):
if _debug:
WriteableObjectIdentifier._debug("__init__")
Property.__init__(
self, "objectIdentifier", ObjectIdentifier, optional=False, mutable=True
)
def WriteProperty(
self, obj, new_identifier, arrayIndex=None, priority=None, direct=False
):
if _debug:
WriteableObjectIdentifier._debug(
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
obj,
new_identifier,
arrayIndex,
priority,
direct,
)
# check for being bound to an application
old_identifier = None
if getattr(obj, "_app", None):
if _debug:
WriteableObjectIdentifier._debug(" - more error checking")
old_identifier = obj._values["objectIdentifier"]
if new_identifier == old_identifier:
return
if new_identifier in obj._app.objectIdentifier:
raise ExecutionError(
errorClass="property", errorCode="duplicateObjectId"
)
if new_identifier[0] != old_identifier[0]:
raise ExecutionError(
errorClass="property", errorCode="valueOutOfRange"
)
# pass it along
Property.WriteProperty(self, obj, new_identifier, arrayIndex, priority, direct)
if old_identifier is not None:
if _debug:
WriteableObjectIdentifier._debug(" - update the application")
del obj._app.objectIdentifier[old_identifier]
obj._app.objectIdentifier[new_identifier] = obj
# update the object list of the local device object
app_object_list = obj._app.localDevice.objectList
indx = app_object_list.index(old_identifier)
del app_object_list[indx]
app_object_list.append(new_identifier)
#
# WriteableObjectIdentifierMixIn
#
@bacpypes_debugging
class WriteableObjectIdentifierMixIn(Object):
properties = [
WriteableObjectIdentifier(),
]
#
# Turtle Reference Patterns
@ -145,6 +337,7 @@ blank_node_re = re.compile(u"^" + BLANK_NODE_LABEL + u"$", re.UNICODE)
# see https://tools.ietf.org/html/bcp47#section-2.1 for better syntax
language_tag_re = re.compile(u"^[A-Za-z0-9-]+$", re.UNICODE)
class IRI:
# regex from RFC 3986
_e = r"^(?:([^:/?#]+):)?(?://([^/?#]*))?([^?#]*)(?:\?([^#]*))?(?:#(.*))?"
@ -316,7 +509,8 @@ class TagSet:
def index(self, name, value=None):
"""Find the first name with dictionary semantics or (name, value) with
list semantics."""
if _debug: TagSet._debug("index %r %r", name, value)
if _debug:
TagSet._debug("index %r %r", name, value)
# if this is a NameValue rip it apart first
if isinstance(name, NameValue):
@ -348,7 +542,8 @@ class TagSet:
def add(self, name, value=None):
"""Add a (name, value) with mutable set semantics."""
if _debug: TagSet._debug("add %r %r", name, value)
if _debug:
TagSet._debug("add %r %r", name, value)
# provide a Null if you are adding a is-a relationship, wrap strings
# to be friendly
@ -367,31 +562,31 @@ class TagSet:
if not isinstance(value, CharacterString):
raise TypeError("value must be an string")
v = self.get('@base')
v = self.get("@base")
if v and v.value == value.value:
pass
else:
raise ValueError("@base exists")
# if not iriref_re.match(value.value):
# raise ValueError("value must be an IRI")
# if not iriref_re.match(value.value):
# raise ValueError("value must be an IRI")
elif name == "@id":
if not isinstance(value, CharacterString):
raise TypeError("value must be an string")
v = self.get('@id')
v = self.get("@id")
if v and v.value == value.value:
pass
else:
raise ValueError("@id exists")
# # check the patterns
# for pattern in (blank_node_re, prefixed_name_re, local_name_re, iriref_re):
# if pattern.match(value.value):
# break
# else:
# raise ValueError("invalid value for @id")
# # check the patterns
# for pattern in (blank_node_re, prefixed_name_re, local_name_re, iriref_re):
# if pattern.match(value.value):
# break
# else:
# raise ValueError("invalid value for @id")
elif name == "@language":
if not isinstance(value, CharacterString):
@ -410,7 +605,7 @@ class TagSet:
if not isinstance(value, CharacterString):
raise TypeError("value must be an string")
v = self.get('@vocab')
v = self.get("@vocab")
if v and v.value == value.value:
pass
else:
@ -429,16 +624,16 @@ class TagSet:
else:
raise ValueError("prefix exists: %r" % (name,))
# if not iriref_re.match(value.value):
# raise ValueError("value must be an IRI")
# if not iriref_re.match(value.value):
# raise ValueError("value must be an IRI")
else:
# # check the patterns
# for pattern in (prefixed_name_re, local_name_re, iriref_re):
# if pattern.match(name):
# break
# else:
# raise ValueError("invalid name")
# # check the patterns
# for pattern in (prefixed_name_re, local_name_re, iriref_re):
# if pattern.match(name):
# break
# else:
# raise ValueError("invalid name")
pass
# check the value
@ -453,7 +648,8 @@ class TagSet:
def discard(self, name, value=None):
"""Discard a (name, value) with mutable set semantics."""
if _debug: TagSet._debug("discard %r %r", name, value)
if _debug:
TagSet._debug("discard %r %r", name, value)
# provide a Null if you are adding a is-a relationship, wrap strings
# to be friendly
@ -467,7 +663,8 @@ class TagSet:
def append(self, name_value):
"""Override the append operation for mutable set semantics."""
if _debug: TagSet._debug("append %r", name_value)
if _debug:
TagSet._debug("append %r", name_value)
if not isinstance(name_value, NameValue):
raise TypeError
@ -478,7 +675,8 @@ class TagSet:
def get(self, key, default=None):
"""Get the value of a key or default value if the key was not found,
dictionary semantics."""
if _debug: TagSet._debug("get %r %r", key, default)
if _debug:
TagSet._debug("get %r %r", key, default)
try:
if not isinstance(key, str):
@ -491,7 +689,8 @@ class TagSet:
"""If item is an integer, return the value of the NameValue element
with array/sequence semantics. If the item is a string, return the
value with dictionary semantics."""
if _debug: TagSet._debug("__getitem__ %r", item)
if _debug:
TagSet._debug("__getitem__ %r", item)
# integers imply index
if isinstance(item, int):
@ -503,7 +702,8 @@ class TagSet:
"""If item is an integer, change the value of the NameValue element
with array/sequence semantics. If the item is a string, change the
current value or add a new value with dictionary semantics."""
if _debug: TagSet._debug("__setitem__ %r %r", item, value)
if _debug:
TagSet._debug("__setitem__ %r %r", item, value)
# integers imply index
if isinstance(item, int):
@ -537,7 +737,8 @@ class TagSet:
"""If the item is a integer, delete the element with array semantics, or
if the item is a string, delete the element with dictionary semantics,
or (name, value) with mutable set semantics."""
if _debug: TagSet._debug("__delitem__ %r", item)
if _debug:
TagSet._debug("__delitem__ %r", item)
# integers imply index
if isinstance(item, int):
@ -552,7 +753,8 @@ class TagSet:
return super(TagSet, self).__delitem__(indx)
def __contains__(self, key):
if _debug: TagSet._debug("__contains__ %r", key)
if _debug:
TagSet._debug("__contains__ %r", key)
try:
if isinstance(key, tuple):
@ -576,14 +778,18 @@ class SequenceOfNameValue(TagSet, SequenceOf(NameValue)):
class TagsMixIn(Object):
properties = \
[ OptionalProperty('tags', ArrayOfNameValue)
]
properties = [OptionalProperty("tags", ArrayOfNameValue)]
@bacpypes_debugging
def Commandable(datatype, presentValue='presentValue', priorityArray='priorityArray', relinquishDefault='relinquishDefault'):
if _debug: Commandable._debug("Commandable %r ...", datatype)
def Commandable(
datatype,
presentValue="presentValue",
priorityArray="priorityArray",
relinquishDefault="relinquishDefault",
):
if _debug:
Commandable._debug("Commandable %r ...", datatype)
class _Commando(object):
@ -591,69 +797,100 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
WritableProperty(presentValue, datatype),
ReadableProperty(priorityArray, PriorityArray),
ReadableProperty(relinquishDefault, datatype),
]
]
_pv_choice = None
def __init__(self, **kwargs):
if _debug: Commandable._debug("_Commando.__init__(%r, %r, %r, %r) %r", datatype, presentValue, priorityArray, relinquishDefault, kwargs)
if _debug:
Commandable._debug(
"_Commando.__init__(%r, %r, %r, %r) %r",
datatype,
presentValue,
priorityArray,
relinquishDefault,
kwargs,
)
super(_Commando, self).__init__(**kwargs)
# build a default value in case one is needed
default_value = datatype().value
if issubclass(datatype, Enumerated):
default_value = datatype._xlate_table[default_value]
if _debug: Commandable._debug(" - default_value: %r", default_value)
if _debug:
Commandable._debug(" - default_value: %r", default_value)
# see if a present value was provided
if (presentValue not in kwargs):
if presentValue not in kwargs:
setattr(self, presentValue, default_value)
# see if a priority array was provided
if (priorityArray not in kwargs):
if priorityArray not in kwargs:
setattr(self, priorityArray, PriorityArray())
# see if a present value was provided
if (relinquishDefault not in kwargs):
if relinquishDefault not in kwargs:
setattr(self, relinquishDefault, default_value)
def _highest_priority_value(self):
if _debug: Commandable._debug("_highest_priority_value")
if _debug:
Commandable._debug("_highest_priority_value")
priority_array = getattr(self, priorityArray)
for i in range(1, 17):
priority_value = priority_array[i]
if priority_value.null is None:
if _debug: Commandable._debug(" - found at index: %r", i)
if _debug:
Commandable._debug(" - found at index: %r", i)
value = getattr(priority_value, _Commando._pv_choice)
value_source = "###"
if issubclass(datatype, Enumerated):
value = datatype._xlate_table[value]
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
if _debug:
Commandable._debug(" - remapped enumeration: %r", value)
break
else:
value = getattr(self, relinquishDefault)
value_source = None
if _debug: Commandable._debug(" - value, value_source: %r, %r", value, value_source)
if _debug:
Commandable._debug(
" - value, value_source: %r, %r", value, value_source
)
# return what you found
return value, value_source
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
if _debug: Commandable._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
def WriteProperty(
self, property, value, arrayIndex=None, priority=None, direct=False
):
if _debug:
Commandable._debug(
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
property,
value,
arrayIndex,
priority,
direct,
)
# when writing to the presentValue with a priority
if (property == presentValue):
if _debug: Commandable._debug(" - writing to %s, priority %r", presentValue, priority)
if property == presentValue:
if _debug:
Commandable._debug(
" - writing to %s, priority %r", presentValue, priority
)
# default (lowest) priority
if priority is None:
priority = 16
if _debug: Commandable._debug(" - translate to priority array, index %d", priority)
if _debug:
Commandable._debug(
" - translate to priority array, index %d", priority
)
# translate to updating the priority array
property = priorityArray
@ -661,39 +898,58 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
priority = None
# update the priority array entry
if (property == priorityArray):
if (arrayIndex is None):
if _debug: Commandable._debug(" - writing entire %s", priorityArray)
if property == priorityArray:
if arrayIndex is None:
if _debug:
Commandable._debug(" - writing entire %s", priorityArray)
# pass along the request
super(_Commando, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
property,
value,
arrayIndex=arrayIndex,
priority=priority,
direct=direct,
)
else:
if _debug: Commandable._debug(" - writing to %s, array index %d", priorityArray, arrayIndex)
if _debug:
Commandable._debug(
" - writing to %s, array index %d",
priorityArray,
arrayIndex,
)
# check the bounds
if arrayIndex == 0:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
raise ExecutionError(
errorClass="property", errorCode="writeAccessDenied"
)
if (arrayIndex < 1) or (arrayIndex > 16):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
raise ExecutionError(
errorClass="property", errorCode="invalidArrayIndex"
)
# update the specific priorty value element
priority_value = getattr(self, priorityArray)[arrayIndex]
if _debug: Commandable._debug(" - priority_value: %r", priority_value)
if _debug:
Commandable._debug(" - priority_value: %r", priority_value)
# the null or the choice has to be set, the other clear
if value == ():
if _debug: Commandable._debug(" - write a null")
if _debug:
Commandable._debug(" - write a null")
priority_value.null = value
setattr(priority_value, _Commando._pv_choice, None)
else:
if _debug: Commandable._debug(" - write a value")
if _debug:
Commandable._debug(" - write a value")
if issubclass(datatype, Enumerated):
value = datatype._xlate_table[value]
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
if _debug:
Commandable._debug(
" - remapped enumeration: %r", value
)
priority_value.null = None
setattr(priority_value, _Commando._pv_choice, value)
@ -704,7 +960,8 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
# compare with the current value
current_value = getattr(self, presentValue)
if value == current_value:
if _debug: Commandable._debug(" - no present value change")
if _debug:
Commandable._debug(" - no present value change")
return
# turn this into a present value change
@ -712,12 +969,22 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
arrayIndex = priority = None
# allow the request to pass through
if _debug: Commandable._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
if _debug:
Commandable._debug(
" - super: %r %r arrayIndex=%r priority=%r",
property,
value,
arrayIndex,
priority,
)
super(_Commando, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
property,
value,
arrayIndex=arrayIndex,
priority=priority,
direct=direct,
)
# look up a matching priority value choice
for element in PriorityValue.choiceElements:
@ -725,160 +992,217 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
_Commando._pv_choice = element.name
break
else:
_Commando._pv_choice = 'constructedValue'
if _debug: Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)
_Commando._pv_choice = "constructedValue"
if _debug:
Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)
# return the class
return _Commando
#
# MinOnOffTask
#
@bacpypes_debugging
class MinOnOffTask(OneShotTask):
def __init__(self, binary_obj):
if _debug: MinOnOffTask._debug("__init__ %s", repr(binary_obj))
if _debug:
MinOnOffTask._debug("__init__ %s", repr(binary_obj))
OneShotTask.__init__(self)
# save a reference to the object
self.binary_obj = binary_obj
# listen for changes to the present value
self.binary_obj._property_monitors['presentValue'].append(self.present_value_change)
self.binary_obj._property_monitors["presentValue"].append(
self.present_value_change
)
def present_value_change(self, old_value, new_value):
if _debug: MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)
if _debug:
MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)
# if there's no value change, skip all this
if old_value == new_value:
if _debug: MinOnOffTask._debug(" - no state change")
if _debug:
MinOnOffTask._debug(" - no state change")
return
# get the minimum on/off time
if new_value == 'inactive':
task_delay = getattr(self.binary_obj, 'minimumOnTime') or 0
if _debug: MinOnOffTask._debug(" - minimum on: %r", task_delay)
elif new_value == 'active':
task_delay = getattr(self.binary_obj, 'minimumOffTime') or 0
if _debug: MinOnOffTask._debug(" - minimum off: %r", task_delay)
if new_value == "inactive":
task_delay = getattr(self.binary_obj, "minimumOnTime") or 0
if _debug:
MinOnOffTask._debug(" - minimum on: %r", task_delay)
elif new_value == "active":
task_delay = getattr(self.binary_obj, "minimumOffTime") or 0
if _debug:
MinOnOffTask._debug(" - minimum off: %r", task_delay)
else:
raise ValueError("unrecognized present value for %r: %r" % (self.binary_obj.objectIdentifier, new_value))
raise ValueError(
"unrecognized present value for %r: %r"
% (self.binary_obj.objectIdentifier, new_value)
)
# if there's no delay, don't bother
if not task_delay:
if _debug: MinOnOffTask._debug(" - no delay")
if _debug:
MinOnOffTask._debug(" - no delay")
return
# set the value at priority 6
self.binary_obj.WriteProperty('presentValue', new_value, priority=6)
self.binary_obj.WriteProperty("presentValue", new_value, priority=6)
# install this to run, if there is a delay
self.install_task(delta=task_delay)
def process_task(self):
if _debug: MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)
if _debug:
MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)
# clear the value at priority 6
self.binary_obj.WriteProperty('presentValue', (), priority=6)
self.binary_obj.WriteProperty("presentValue", (), priority=6)
#
# MinOnOff
#
@bacpypes_debugging
class MinOnOff(object):
def __init__(self, **kwargs):
if _debug: MinOnOff._debug("__init__ ...")
if _debug:
MinOnOff._debug("__init__ ...")
super(MinOnOff, self).__init__(**kwargs)
# create the timer task
self._min_on_off_task = MinOnOffTask(self)
#
# Commandable Standard Objects
#
class AccessDoorCmdObject(Commandable(DoorValue), AccessDoorObject):
pass
class AnalogOutputCmdObject(Commandable(Real), AnalogOutputObject):
pass
class AnalogValueCmdObject(Commandable(Real), AnalogValueObject):
pass
### class BinaryLightingOutputCmdObject(Commandable(Real), BinaryLightingOutputObject):
### pass
class BinaryOutputCmdObject(Commandable(BinaryPV), MinOnOff, BinaryOutputObject):
pass
class BinaryValueCmdObject(Commandable(BinaryPV), MinOnOff, BinaryValueObject):
pass
class BitStringValueCmdObject(Commandable(BitString), BitStringValueObject):
pass
class CharacterStringValueCmdObject(Commandable(CharacterString), CharacterStringValueObject):
class CharacterStringValueCmdObject(
Commandable(CharacterString), CharacterStringValueObject
):
pass
class DateValueCmdObject(Commandable(Date), DateValueObject):
pass
class DatePatternValueCmdObject(Commandable(Date), DatePatternValueObject):
pass
class DateTimeValueCmdObject(Commandable(DateTime), DateTimeValueObject):
pass
class DateTimePatternValueCmdObject(Commandable(DateTime), DateTimePatternValueObject):
pass
class IntegerValueCmdObject(Commandable(Integer), IntegerValueObject):
pass
class LargeAnalogValueCmdObject(Commandable(Double), LargeAnalogValueObject):
pass
class LightingOutputCmdObject(Commandable(Real), LightingOutputObject):
pass
class MultiStateOutputCmdObject(Commandable(Unsigned), MultiStateOutputObject):
pass
class MultiStateValueCmdObject(Commandable(Unsigned), MultiStateValueObject):
pass
class OctetStringValueCmdObject(Commandable(OctetString), OctetStringValueObject):
pass
class PositiveIntegerValueCmdObject(Commandable(Unsigned), PositiveIntegerValueObject):
pass
class TimeValueCmdObject(Commandable(Time), TimeValueObject):
pass
class TimePatternValueCmdObject(Commandable(Time), TimePatternValueObject):
pass
@bacpypes_debugging
class ChannelValueProperty(Property):
def __init__(self):
if _debug: ChannelValueProperty._debug("__init__")
Property.__init__(self, 'presentValue', ChannelValue, default=None, optional=False, mutable=True)
if _debug:
ChannelValueProperty._debug("__init__")
Property.__init__(
self,
"presentValue",
ChannelValue,
default=None,
optional=False,
mutable=True,
)
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: ChannelValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
if _debug:
ChannelValueProperty._debug(
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
obj,
value,
arrayIndex,
priority,
direct,
)
### Clause 12.53.5, page 487
raise NotImplementedError()
class ChannelCmdObject(ChannelObject):
properties = [
ChannelValueProperty(),
]
]

View File

@ -216,6 +216,7 @@ class Property(object):
# see if it can be changed
if not self.mutable:
if _debug: Property._debug(" - property is immutable")
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
# if changing the length of the array, the value is unsigned
@ -1109,7 +1110,7 @@ class AnalogValueObject(Object):
@register_object_type
class AuditLogObject(Object):
objectType = 'analogLog'
objectType = 'auditLog'
properties = \
[ ReadableProperty('statusFlags', StatusFlags)
@ -1217,7 +1218,7 @@ class BinaryInputObject(Object):
@register_object_type
class BinaryLightingOutputObject(Object):
objectType = 'binaryLightingOutputObject'
objectType = 'binaryLightingOutput'
properties = \
[ WritableProperty('presentValue', BinaryLightingPV)
@ -2458,7 +2459,7 @@ class OctetStringValueObject(Object):
_object_supports_cov = True
properties = \
[ ReadableProperty('presentValue', CharacterString)
[ ReadableProperty('presentValue', OctetString)
, ReadableProperty('statusFlags', StatusFlags)
, OptionalProperty('eventState', EventState)
, OptionalProperty('reliability', Reliability)

View File

@ -1696,6 +1696,7 @@ class ObjectType(Enumerated):
, 'program':16
, 'pulseConverter':24
, 'schedule':17
, 'staging':60
, 'structuredView':29
, 'timePatternValue':49
, 'timeValue':50

View File

@ -90,8 +90,8 @@ class Subscription(OneShotTask, DebugContents):
'lifetime',
)
def __init__(self, obj_ref, client_addr, proc_id, obj_id, confirmed, lifetime=0):
if _debug: Subscription._debug("__init__ %r %r %r %r %r %r", obj_ref, client_addr, proc_id, obj_id, confirmed, lifetime)
def __init__(self, obj_ref, client_addr, proc_id, obj_id, confirmed, lifetime, cov_inc):
if _debug: Subscription._debug("__init__ %r %r %r %r %r %r %r", obj_ref, client_addr, proc_id, obj_id, confirmed, lifetime, cov_inc)
OneShotTask.__init__(self)
# save the reference to the related object
@ -102,10 +102,12 @@ class Subscription(OneShotTask, DebugContents):
self.proc_id = proc_id
self.obj_id = obj_id
self.confirmed = confirmed
self.lifetime = lifetime
self.covIncrement = cov_inc
# if lifetime is none, consider permanent subscription (0)
self.lifetime = 0 if lifetime is None else lifetime
self.install_task(delta=self.lifetime)
# if lifetime is zero this is a permanent subscription
if lifetime > 0:
self.install_task(delta=self.lifetime)
def cancel_subscription(self):
if _debug: Subscription._debug("cancel_subscription")
@ -455,7 +457,7 @@ class PulseConverterCriteria(COVIncrementCriteria):
# check for a new period
if new_value != 0:
self.cov_period_task = RecurringFunctionTask(new_value * 1000, self.cov_period_x)
self.cov_period_task = RecurringFunctionTask(new_value * 1000, self.send_cov_notifications)
self.cov_period_task.install_task()
if _debug: PulseConverterCriteria._debug(" - new task created and installed")
@ -737,7 +739,7 @@ class ChangeOfValueServices(Capability):
if _debug: ChangeOfValueServices._debug(" - create a subscription")
# make a subscription
cov = Subscription(obj, client_addr, proc_id, obj_id, confirmed, lifetime)
cov = Subscription(obj, client_addr, proc_id, obj_id, confirmed, lifetime, None)
if _debug: ChangeOfValueServices._debug(" - cov: %r", cov)
# add it to our subscriptions lists
@ -755,3 +757,82 @@ class ChangeOfValueServices(Capability):
if _debug: ChangeOfValueServices._debug(" - send a notification")
deferred(cov_detection.send_cov_notifications, cov)
def do_SubscribeCOVPropertyRequest(self, apdu):
if _debug: ChangeOfValueServices._debug("do_SubscribeCOVPropertyRequest %r", apdu)
# extract the pieces
client_addr = apdu.pduSource
proc_id = apdu.subscriberProcessIdentifier
obj_id = apdu.monitoredObjectIdentifier
confirmed = apdu.issueConfirmedNotifications
lifetime = apdu.lifetime
prop_id = apdu.monitoredPropertyIdentifier
cov_inc = apdu.covIncrement
# request is to cancel the subscription
cancel_subscription = (confirmed is None) and (lifetime is None)
# find the object
obj = self.get_object_id(obj_id)
if _debug: ChangeOfValueServices._debug(" - object: %r", obj)
if not obj:
raise ExecutionError(errorClass='object', errorCode='unknownObject')
# check to see if the object supports COV
if not obj._object_supports_cov:
raise ExecutionError(errorClass='services', errorCode='covSubscriptionFailed')
# look for an algorithm already associated with this object
cov_detection = self.cov_detections.get(obj, None)
# if there isn't one, make one and associate it with the object
if not cov_detection:
# look for an associated class and if it's not there it's not supported
criteria_class = criteria_type_map.get(obj_id[0], None)
if not criteria_class:
raise ExecutionError(errorClass='services', errorCode='covSubscriptionFailed')
# make one of these and bind it to the object
cov_detection = criteria_class(obj)
# keep track of it for other subscriptions
self.cov_detections[obj] = cov_detection
if _debug: ChangeOfValueServices._debug(" - cov_detection: %r", cov_detection)
# can a match be found?
cov = cov_detection.cov_subscriptions.find(client_addr, proc_id, obj_id)
if _debug: ChangeOfValueServices._debug(" - cov: %r", cov)
# if a match was found, update the subscription
if cov:
if cancel_subscription:
if _debug: ChangeOfValueServices._debug(" - cancel the subscription")
self.cancel_subscription(cov)
else:
if _debug: ChangeOfValueServices._debug(" - renew the subscription")
cov.renew_subscription(lifetime)
else:
if cancel_subscription:
if _debug: ChangeOfValueServices._debug(" - cancel a subscription that doesn't exist")
else:
if _debug: ChangeOfValueServices._debug(" - create a subscription")
# make a subscription
cov = Subscription(obj, client_addr, proc_id, obj_id,
confirmed, lifetime, cov_inc)
if _debug: ChangeOfValueServices._debug(" - cov: %r", cov)
# add it to our subscriptions lists
self.add_subscription(cov)
# success
response = SimpleAckPDU(context=apdu)
# return the result
self.response(response)
# if the subscription is not being canceled, it is new or renewed,
# so send it a notification when you get a chance.
if not cancel_subscription:
if _debug: ChangeOfValueServices._debug(" - send a notification")
deferred(cov_detection.send_cov_notifications, cov)

View File

@ -15,11 +15,12 @@ from bacpypes.core import run, deferred, enable_sleeping
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address
from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU, RejectPDU, AbortPDU
from bacpypes.apdu import SubscribeCOVRequest, SimpleAckPDU, RejectPDU, AbortPDU, SubscribeCOVPropertyRequest
from bacpypes.primitivedata import ObjectIdentifier
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
from bacpypes.basetypes import PropertyReference
# some debugging
_debug = 0
@ -145,8 +146,14 @@ class SubscribeCOVConsoleCmd(ConsoleCmd):
lifetime = None
# build a request
request = SubscribeCOVRequest(
subscriberProcessIdentifier=proc_id, monitoredObjectIdentifier=obj_id
# request = SubscribeCOVRequest(
# subscriberProcessIdentifier=proc_id, monitoredObjectIdentifier=obj_id
# )
request = SubscribeCOVPropertyRequest(
subscriberProcessIdentifier=proc_id,
monitoredObjectIdentifier=obj_id,
monitoredPropertyIdentifier=PropertyReference(propertyIdentifier=85),
covIncrement=2
)
request.pduDestination = Address(addr)

View File

@ -222,7 +222,7 @@ def main():
)
# add an argument for interval
parser.add_argument('net2', type=int,
parser.add_argument('net2', type=int, nargs='+',
help='network number of second network',
)
@ -252,25 +252,10 @@ def main():
local_address = Address(args.addr1)
local_network = args.net1
vlan_network = args.net2
# create the VLAN router, bind it to the local network
router = VLANRouter(local_address, local_network)
# create a VLAN
vlan = Network(broadcast_address=LocalBroadcast())
# create a node for the router, address 1 on the VLAN
router_addr = Address(1)
router_node = Node(router_addr)
vlan.add_node(router_node)
# bind the router stack to the vlan network through this node
router.nsap.bind(router_node, vlan_network, router_addr)
# send network topology
deferred(router.nse.i_am_router_to_network)
# add the dynamic property list
if args.plist:
RandomAnalogValueObject.properties.append(CurrentPropertyList())
@ -278,40 +263,55 @@ def main():
# register it now that all its properties are defined
register_object_type(RandomAnalogValueObject, vendor_id=999)
# make some devices
for device_number in range(2, 2 + args.count):
# device identifier is assigned from the address
device_instance = vlan_network * 100 + device_number
_log.debug(" - device_instance: %r", device_instance)
for vlan_network in args.net2:
# create a VLAN
vlan = Network(broadcast_address=LocalBroadcast())
# make a vlan device object
vlan_device = \
LocalDeviceObject(
objectName="VLAN Node %d" % (device_instance,),
objectIdentifier=('device', device_instance),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=15,
# create a node for the router, address 1 on the VLAN
router_addr = Address(1)
router_node = Node(router_addr)
vlan.add_node(router_node)
# bind the router stack to the vlan network through this node
router.nsap.bind(router_node, vlan_network, router_addr)
# send network topology
deferred(router.nse.i_am_router_to_network)
# make some devices
for device_number in range(2, 2 + args.count):
# device identifier is assigned from the address
device_instance = vlan_network * 100 + device_number
_log.debug(" - device_instance: %r", device_instance)
# make a vlan device object
vlan_device = \
LocalDeviceObject(
objectName="VLAN Node %d" % (device_instance,),
objectIdentifier=('device', device_instance),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=15,
)
_log.debug(" - vlan_device: %r", vlan_device)
vlan_address = Address(device_number)
_log.debug(" - vlan_address: %r", vlan_address)
# make the application, add it to the network
vlan_app = VLANApplication(vlan_device, vlan_address)
vlan.add_node(vlan_app.vlan_node)
_log.debug(" - vlan_app: %r", vlan_app)
# make a random value object
ravo = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1),
objectName='Random-1-%d' % (device_instance,),
)
_log.debug(" - vlan_device: %r", vlan_device)
_log.debug(" - ravo: %r", ravo)
vlan_address = Address(device_number)
_log.debug(" - vlan_address: %r", vlan_address)
# make the application, add it to the network
vlan_app = VLANApplication(vlan_device, vlan_address)
vlan.add_node(vlan_app.vlan_node)
_log.debug(" - vlan_app: %r", vlan_app)
# make a random value object
ravo = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1),
objectName='Random-1-%d' % (device_instance,),
)
_log.debug(" - ravo: %r", ravo)
# add it to the device
vlan_app.add_object(ravo)
# add it to the device
vlan_app.add_object(ravo)
_log.debug("running")

View File

@ -66,6 +66,16 @@ class TestConsoleCmd(ConsoleCmd):
y = _localtime()
print("y: {}".format(y))
def do_dc(self, args):
"""dc"""
args = args.split()
if _debug:
TestConsoleCmd._debug("do_dc %r", args)
for so in schedule_objects:
print(so.objectName)
so.debug_contents()
#
# __main__
@ -103,7 +113,7 @@ def main():
objectName="Schedule 1",
presentValue=Integer(8),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2),),
weeklySchedule=[
weeklySchedule=ArrayOf(DailySchedule, 7)([
DailySchedule(
daySchedule=[
TimeValue(time=(8, 0, 0, 0), value=Integer(8)),
@ -113,7 +123,7 @@ def main():
]
),
]
* 7,
* 7),
scheduleDefault=Integer(0),
)
_log.debug(" - so: %r", so)
@ -129,7 +139,7 @@ def main():
objectName="Schedule 2",
presentValue=CharacterString(""),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2),),
exceptionSchedule=[
exceptionSchedule=ArrayOf(SpecialEvent)([
SpecialEvent(
period=SpecialEventPeriod(
calendarEntry=CalendarEntry(date=Date("2000-01-01").value,),
@ -140,7 +150,7 @@ def main():
],
eventPriority=1,
),
],
]),
scheduleDefault=CharacterString("Don't panic."),
)
_log.debug(" - so: %r", so)
@ -155,7 +165,7 @@ def main():
objectName="Schedule 3",
presentValue=CharacterString(""),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2),),
exceptionSchedule=[
exceptionSchedule=ArrayOf(SpecialEvent)([
SpecialEvent(
period=SpecialEventPeriod(
calendarEntry=CalendarEntry(weekNDay=xtob("FF.FF.05"),),
@ -165,7 +175,7 @@ def main():
],
eventPriority=1,
),
],
]),
scheduleDefault=CharacterString("Keep working."),
)
_log.debug(" - so: %r", so)
@ -181,7 +191,7 @@ def main():
objectName="Schedule 4",
presentValue=Real(73.5),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2),),
weeklySchedule=[
weeklySchedule=ArrayOf(DailySchedule, 7)([
DailySchedule(
daySchedule=[
TimeValue(time=(9, 0, 0, 0), value=Real(78.0)),
@ -189,7 +199,7 @@ def main():
]
),
]
* 7,
* 7),
scheduleDefault=Real(72.0),
listOfObjectPropertyReferences=SequenceOf(DeviceObjectPropertyReference)(
[
@ -212,7 +222,7 @@ def main():
objectName="Schedule 5",
presentValue=Integer(0),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2),),
exceptionSchedule=[
exceptionSchedule=ArrayOf(SpecialEvent)([
SpecialEvent(
period=SpecialEventPeriod(
calendarEntry=CalendarEntry(weekNDay=xtob("FF.FF.FF"),),
@ -260,7 +270,7 @@ def main():
listOfTimeValues=[TimeValue(time=(1, 0, 0, 0), value=Integer(1)),],
eventPriority=5,
),
],
]),
scheduleDefault=Integer(0),
)
_log.debug(" - so: %r", so)
@ -278,7 +288,7 @@ def main():
objectName="Schedule 6",
presentValue=Integer(0),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2),),
exceptionSchedule=[
exceptionSchedule=ArrayOf(SpecialEvent)([
SpecialEvent(
period=SpecialEventPeriod(
calendarEntry=CalendarEntry(weekNDay=xtob("FF.FF.FF"),),
@ -286,7 +296,7 @@ def main():
listOfTimeValues=ltv,
eventPriority=1,
),
],
]),
scheduleDefault=Integer(0),
)
_log.debug(" - so: %r", so)

View File

@ -81,7 +81,7 @@ class TestConsoleCmd(ConsoleCmd):
start_time = Time(start_string).value
stop_time = Time(stop_string).value
exception_schedule = [
exception_schedule = ArrayOf(SpecialEvent)([
SpecialEvent(
period=SpecialEventPeriod(
calendarEntry=CalendarEntry(date=except_date)
@ -92,7 +92,7 @@ class TestConsoleCmd(ConsoleCmd):
],
eventPriority=1,
)
]
])
if _debug:
TestConsoleCmd._debug(" - exception_schedule: %r", exception_schedule)
@ -152,7 +152,7 @@ def main():
objectName="Test Schedule",
presentValue=Real(8.0),
effectivePeriod=DateRange(startDate=(0, 1, 1, 1), endDate=(254, 12, 31, 2)),
weeklySchedule=[
weeklySchedule=ArrayOf(DailySchedule, 7)([
DailySchedule(
daySchedule=[
TimeValue(time=(8, 0, 0, 0), value=Real(8.0)),
@ -161,7 +161,7 @@ def main():
]
)
]
* 7,
* 7),
listOfObjectPropertyReferences=ListOf(DeviceObjectPropertyReference)(
[
DeviceObjectPropertyReference(

View File

@ -7,11 +7,11 @@ that sits behind a NAT and a "global" network of other NAT router peers.
$ python NATRouter.py addr1 port1 net1 addr2 port2 net2
addr1 - local address like 192.168.1.10/24
port1 - local port
net1 - local network number
addr2 - global address like 201.1.1.1:47809
port2 - local mapped port
net2 - global network number
port1 - local port like 47808
net1 - local network number like 1
addr2 - global address like 201.1.1.1
port2 - local mapped port like 47809
net2 - global network number like 2
The sample addresses are like running BR1 from Figure J-8, Clause J.7.5.
"""
@ -67,7 +67,7 @@ class NATRouter:
# global address
global_addr = Address(addr2)
nat_addr = Address("{}:{}".format(addr1, port2))
nat_addr = Address("{}:{}".format(addr2, port2))
# create a NAT stack
self.s2_bip = BIPNAT(global_addr)

View File

@ -197,12 +197,12 @@ def main():
# parse the command line arguments
parser = ConfigArgumentParser(description=__doc__)
# add an argument for interval
# add an argument for the device identifier
parser.add_argument('device_id', type=int,
help='device identifier',
)
# add an argument for interval
# add an argument for the address of the device
parser.add_argument('device_addr', type=str,
help='device address',
)

View File

@ -0,0 +1,125 @@
#!/usr/bin/env python
"""
This sample application shows how to extend one of the basic objects, an Analog
Value Object in this case, to allow the object to be renamed.
"""
from bacpypes.debugging import ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.object import AnalogValueObject, BinaryValueObject, register_object_type
from bacpypes.app import BIPSimpleApplication
from bacpypes.local.device import LocalDeviceObject
from bacpypes.local.object import WriteableObjectNameMixIn, WriteableObjectIdentifierMixIn
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# Analog Value Object that can be renamed
#
@register_object_type
class SampleAnalogValueObject(WriteableObjectNameMixIn, AnalogValueObject):
def __init__(self, **kwargs):
if _debug:
SampleAnalogValueObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
# add a callback when the object name has changed
self._property_monitors["objectName"].append(self.object_name_changed)
def object_name_changed(self, old_value, new_value):
if _debug:
SampleAnalogValueObject._debug(
"object_name_changed %r %r", old_value, new_value
)
print("object name changed from %r to %r" % (old_value, new_value))
#
# Binary Value Object that can be given a new object identifier
#
@register_object_type
class SampleBinaryValueObject(WriteableObjectIdentifierMixIn, BinaryValueObject):
def __init__(self, **kwargs):
if _debug:
SampleBinaryValueObject._debug("__init__ %r", kwargs)
BinaryValueObject.__init__(self, **kwargs)
# add a callback when the object name has changed
self._property_monitors["objectIdentifier"].append(self.object_identifier_changed)
def object_identifier_changed(self, old_value, new_value):
if _debug:
SampleBinaryValueObject._debug(
"object_identifier_changed %r %r", old_value, new_value
)
print("object identifier changed from %r to %r" % (old_value, new_value))
#
# __main__
#
def main():
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug:
_log.debug("initialization")
_log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=("device", int(args.ini.objectidentifier)),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make some objects
savo = SampleAnalogValueObject(
objectIdentifier=("analogValue", 1),
objectName="SampleAnalogValueObject",
presentValue=123.4,
)
_log.debug(" - savo: %r", savo)
this_application.add_object(savo)
# make some objects
sbvo = SampleBinaryValueObject(
objectIdentifier=("binaryValue", 1),
objectName="SampleBinaryValueObject",
presentValue=True,
)
_log.debug(" - sbvo: %r", sbvo)
this_application.add_object(sbvo)
# make sure they are all there
_log.debug(" - object list: %r", this_device.objectList)
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()

View File

@ -120,8 +120,8 @@ def main():
# make a simple application
this_application = WhoIsRouterApplication(
args.ini.address,
bbmdAddress=Address(args.ini.foreignbbmd),
bbmdTTL=int(args.ini.foreignttl),
bbmdAddress=Address(args.ini.foreignbbmd),
bbmdTTL=int(args.ini.foreignttl)
)
if _debug: _log.debug(" - this_application: %r", this_application)

View File

@ -10,6 +10,7 @@ from . import test_cov_bv
from . import test_cov_pc
from . import test_device
from . import test_device_2
from . import test_file
from . import test_object

View File

@ -63,7 +63,7 @@ class TestWhoIsIAm(unittest.TestCase):
# add the service capability to the IUT
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
# send a WhoIs, get back an IAm
anet.td.start_state.doc("1-1-0") \
.send(WhoIsRequest(destination=anet.vlan.broadcast_address)).doc("1-1-1") \
.receive(IAmRequest, pduSource=anet.iut.address).doc("1-1-2") \
@ -85,7 +85,7 @@ class TestWhoIsIAm(unittest.TestCase):
# add the service capability to the iut
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
# send a Who-Is, no response
anet.td.start_state.doc("2-1-0") \
.send(WhoIsRequest(
destination=anet.vlan.broadcast_address,
@ -110,7 +110,7 @@ class TestWhoIsIAm(unittest.TestCase):
# add the service capability to the iut
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
# send a Who-Is, no response
anet.td.start_state.doc("3-1-0") \
.send(WhoIsRequest(
destination=anet.vlan.broadcast_address,
@ -135,7 +135,7 @@ class TestWhoIsIAm(unittest.TestCase):
# add the service capability to the IUT
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
# send a Who-Is, get back an I-Am
anet.td.start_state.doc("4-1-0") \
.send(WhoIsRequest(
destination=anet.vlan.broadcast_address,
@ -156,7 +156,7 @@ class TestWhoIsIAm(unittest.TestCase):
class TestWhoHasIHave(unittest.TestCase):
def test_who_has_object_by_name(self):
"""Test an unconstrained WhoIs, all devices respond."""
"""Test a Who-Has for an object by name."""
if _debug: TestWhoIsIAm._debug("test_who_has_object_by_name")
# create a network
@ -165,7 +165,7 @@ class TestWhoHasIHave(unittest.TestCase):
# add the service capability to the IUT
anet.iut.add_capability(WhoHasIHaveServices)
# all start states are successful
# send the Who-Has, get back a response
anet.td.start_state.doc("5-1-0") \
.send(WhoHasRequest(
destination=anet.vlan.broadcast_address,
@ -181,7 +181,7 @@ class TestWhoHasIHave(unittest.TestCase):
anet.run()
def test_who_has_object_by_id(self):
"""Test an unconstrained WhoIs, all devices respond."""
"""Test a Who-Has for an object by identifier."""
if _debug: TestWhoIsIAm._debug("test_who_has_object_by_id")
# create a network
@ -190,7 +190,7 @@ class TestWhoHasIHave(unittest.TestCase):
# add the service capability to the IUT
anet.iut.add_capability(WhoHasIHaveServices)
# all start states are successful
# send the Who-Has, get back a response
anet.td.start_state.doc("6-1-0") \
.send(WhoHasRequest(
destination=anet.vlan.broadcast_address,

View File

@ -0,0 +1,267 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test APDU Decoding
------------------
"""
import sys
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.comm import bind
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address, LocalBroadcast
from bacpypes.primitivedata import OpeningTag
from bacpypes.constructeddata import Any
from bacpypes.apdu import (
APDU,
ReadPropertyRequest,
ReadPropertyACK,
Error,
)
from bacpypes.vlan import Network, Node
from bacpypes.app import ApplicationIOController
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.local.device import LocalDeviceObject
from ..state_machine import (
StateMachine,
StateMachineGroup,
ClientStateMachine,
TrafficLog,
)
from ..time_machine import reset_time_machine, run_time_machine
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class ApplicationNetwork(StateMachineGroup):
def __init__(self, test_name):
if _debug:
ApplicationNetwork._debug("__init__ %r", test_name)
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug:
ApplicationNetwork._debug(" - time machine reset")
# create a traffic log
self.traffic_log = TrafficLog()
# make a little LAN
self.vlan = Network(broadcast_address=LocalBroadcast())
self.vlan.traffic_log = self.traffic_log
# test device object
self.td_device_object = LocalDeviceObject(
objectName="td",
objectIdentifier=("device", 10),
maxApduLengthAccepted=1024,
segmentationSupported="noSegmentation",
vendorIdentifier=999,
)
# test device
self.td = ApplicationStateMachine(self.td_device_object, self.vlan)
self.append(self.td)
# error device generates bad packets
self.ed = ApplicationLayerStateMachine(20, self.vlan)
self.append(self.ed)
def run(self, time_limit=60.0):
if _debug:
ApplicationNetwork._debug("run %r", time_limit)
# run the group
super(ApplicationNetwork, self).run()
if _debug:
ApplicationNetwork._debug(" - group running")
# run it for some time
run_time_machine(time_limit)
if _debug:
ApplicationNetwork._debug(" - time machine finished")
for state_machine in self.state_machines:
ApplicationNetwork._debug(" - machine: %r", state_machine)
for direction, pdu in state_machine.transaction_log:
ApplicationNetwork._debug(" %s %s", direction, str(pdu))
# traffic log has what was processed on each vlan
self.traffic_log.dump(ApplicationNetwork._debug)
# check for success
all_success, some_failed = super(ApplicationNetwork, self).check_for_success()
ApplicationNetwork._debug(
" - all_success, some_failed: %r, %r", all_success, some_failed
)
assert all_success
class _NetworkServiceElement(NetworkServiceElement):
"""
This class turns off the deferred startup function call that broadcasts
I-Am-Router-To-Network and Network-Number-Is messages.
"""
_startup_disabled = True
@bacpypes_debugging
class ApplicationStateMachine(ApplicationIOController, StateMachine):
def __init__(self, localDevice, vlan):
if _debug:
ApplicationStateMachine._debug("__init__ %r %r", localDevice, vlan)
# build an address and save it
self.address = Address(localDevice.objectIdentifier[1])
if _debug:
ApplicationStateMachine._debug(" - address: %r", self.address)
# continue with initialization
ApplicationIOController.__init__(self, localDevice)
StateMachine.__init__(self, name=localDevice.objectName)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = _NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# create a node, added to the network
self.node = Node(self.address, vlan)
# bind the network service to the node, no network number
self.nsap.bind(self.node)
def send(self, apdu):
if _debug:
ApplicationStateMachine._debug("send(%s) %r", self.name, apdu)
# build an IOCB to wrap the request
iocb = IOCB(apdu)
self.request_io(iocb)
def indication(self, apdu):
if _debug:
ApplicationStateMachine._debug("indication(%s) %r", self.name, apdu)
# let the state machine know the request was received
self.receive(apdu)
# allow the application to process it
super(ApplicationStateMachine, self).indication(apdu)
def confirmation(self, apdu):
if _debug:
ApplicationStateMachine._debug("confirmation(%s) %r", self.name, apdu)
# forward the confirmation to the state machine
self.receive(apdu)
# allow the application to process it
super(ApplicationStateMachine, self).confirmation(apdu)
@bacpypes_debugging
class ApplicationLayerStateMachine(ClientStateMachine):
def __init__(self, address, vlan):
if _debug:
ApplicationLayerStateMachine._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# build a name, save the address
self.name = "app @ %s" % (address,)
self.address = Address(address)
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
if _debug:
ApplicationLayerStateMachine._debug(" - nsap: %r", self.nsap)
# bind this as a client of the network service access point
bind(self, self.nsap)
# create a node, added to the network
self.node = Node(self.address, vlan)
if _debug:
ApplicationLayerStateMachine._debug(" - node: %r", self.node)
# bind the stack to the local network
self.nsap.bind(self.node)
@bacpypes_debugging
class TestAPDUDecodingError(unittest.TestCase):
def test_apdu_bad_reply(self):
"""Confirmed Request - Bad Reply"""
if _debug:
TestAPDUDecodingError._debug("test_apdu_bad_reply")
# create a network
anet = ApplicationNetwork("test_apdu_bad_reply")
# make a bad value
a = Any()
a.tagList.append(OpeningTag(1))
# create a bad APDU to send back
bad_apdu = ReadPropertyACK(
objectIdentifier=("analogValue", 1),
propertyIdentifier="presentValue",
propertyValue=a,
)
bad_apdu.pduDestination = anet.td.address
bad_apdu.apduInvokeID = 1
# send a request to a non-existent device, get it rejected
anet.td.start_state.doc("8-1-0") \
.send(
ReadPropertyRequest(
objectIdentifier=("analogValue", 1),
propertyIdentifier="presentValue",
destination=anet.ed.address,
)).doc("8-1-1") \
.receive(
Error,
errorClass=7, # communication
errorCode=57, # invalidTag
).doc("8-1-2") \
.success()
# error device sends back a badly encoded response
anet.ed.start_state.doc("8-2-0") \
.receive(APDU).doc("8-2-1") \
.send(bad_apdu).doc("8-2-2") \
.success()
# run the group
anet.run()