mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
mixin classes for objects that can be renamed and given new identities
This commit is contained in:
parent
5ce45a45b7
commit
df91a98fd1
|
@ -5,21 +5,62 @@ import re
|
|||
from ..debugging import bacpypes_debugging, ModuleLogger
|
||||
|
||||
from ..task import OneShotTask
|
||||
from ..primitivedata import Atomic, Null, BitString, CharacterString, \
|
||||
Date, Integer, Double, Enumerated, OctetString, Real, Time, Unsigned
|
||||
from ..basetypes import PropertyIdentifier, DateTime, NameValue, BinaryPV, \
|
||||
ChannelValue, DoorValue, PriorityValue, PriorityArray
|
||||
from ..primitivedata import (
|
||||
Atomic,
|
||||
Null,
|
||||
BitString,
|
||||
CharacterString,
|
||||
Date,
|
||||
Integer,
|
||||
Double,
|
||||
Enumerated,
|
||||
OctetString,
|
||||
Real,
|
||||
Time,
|
||||
Unsigned,
|
||||
ObjectIdentifier,
|
||||
)
|
||||
from ..basetypes import (
|
||||
PropertyIdentifier,
|
||||
DateTime,
|
||||
NameValue,
|
||||
BinaryPV,
|
||||
ChannelValue,
|
||||
DoorValue,
|
||||
PriorityValue,
|
||||
PriorityArray,
|
||||
)
|
||||
from ..constructeddata import Array, ArrayOf, SequenceOf
|
||||
|
||||
from ..errors import ExecutionError
|
||||
from ..object import Property, ReadableProperty, WritableProperty, OptionalProperty, Object, \
|
||||
AccessDoorObject, AnalogOutputObject, AnalogValueObject, \
|
||||
BinaryOutputObject, BinaryValueObject, BitStringValueObject, CharacterStringValueObject, \
|
||||
DateValueObject, DatePatternValueObject, DateTimePatternValueObject, \
|
||||
DateTimeValueObject, IntegerValueObject, \
|
||||
LargeAnalogValueObject, LightingOutputObject, MultiStateOutputObject, \
|
||||
MultiStateValueObject, OctetStringValueObject, PositiveIntegerValueObject, \
|
||||
TimeValueObject, TimePatternValueObject, ChannelObject
|
||||
from ..object import (
|
||||
Property,
|
||||
ReadableProperty,
|
||||
WritableProperty,
|
||||
OptionalProperty,
|
||||
Object,
|
||||
AccessDoorObject,
|
||||
AnalogOutputObject,
|
||||
AnalogValueObject,
|
||||
BinaryOutputObject,
|
||||
BinaryValueObject,
|
||||
BitStringValueObject,
|
||||
CharacterStringValueObject,
|
||||
DateValueObject,
|
||||
DatePatternValueObject,
|
||||
DateTimePatternValueObject,
|
||||
DateTimeValueObject,
|
||||
IntegerValueObject,
|
||||
LargeAnalogValueObject,
|
||||
LightingOutputObject,
|
||||
MultiStateOutputObject,
|
||||
MultiStateValueObject,
|
||||
OctetStringValueObject,
|
||||
PositiveIntegerValueObject,
|
||||
TimeValueObject,
|
||||
TimePatternValueObject,
|
||||
ChannelObject,
|
||||
)
|
||||
|
||||
|
||||
# some debugging
|
||||
|
@ -33,22 +74,35 @@ ArrayOfPropertyIdentifier = ArrayOf(PropertyIdentifier)
|
|||
# CurrentPropertyList
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class CurrentPropertyList(Property):
|
||||
|
||||
def __init__(self):
|
||||
if _debug: CurrentPropertyList._debug("__init__")
|
||||
Property.__init__(self, 'propertyList', ArrayOfPropertyIdentifier, default=None, optional=True, mutable=False)
|
||||
if _debug:
|
||||
CurrentPropertyList._debug("__init__")
|
||||
Property.__init__(
|
||||
self,
|
||||
"propertyList",
|
||||
ArrayOfPropertyIdentifier,
|
||||
default=None,
|
||||
optional=True,
|
||||
mutable=False,
|
||||
)
|
||||
|
||||
def ReadProperty(self, obj, arrayIndex=None):
|
||||
if _debug: CurrentPropertyList._debug("ReadProperty %r %r", obj, arrayIndex)
|
||||
if _debug:
|
||||
CurrentPropertyList._debug("ReadProperty %r %r", obj, arrayIndex)
|
||||
|
||||
# make a list of the properties that have values
|
||||
property_list = [k for k, v in obj._values.items()
|
||||
property_list = [
|
||||
k
|
||||
for k, v in obj._values.items()
|
||||
if v is not None
|
||||
and k not in ('objectName', 'objectType', 'objectIdentifier', 'propertyList')
|
||||
]
|
||||
if _debug: CurrentPropertyList._debug(" - property_list: %r", property_list)
|
||||
and k
|
||||
not in ("objectName", "objectType", "objectIdentifier", "propertyList")
|
||||
]
|
||||
if _debug:
|
||||
CurrentPropertyList._debug(" - property_list: %r", property_list)
|
||||
|
||||
# sort the list so it's stable
|
||||
property_list.sort()
|
||||
|
@ -63,52 +117,160 @@ class CurrentPropertyList(Property):
|
|||
|
||||
# asking for an index
|
||||
if arrayIndex > len(property_list):
|
||||
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
|
||||
raise ExecutionError(errorClass="property", errorCode="invalidArrayIndex")
|
||||
return property_list[arrayIndex - 1]
|
||||
|
||||
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
|
||||
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
|
||||
raise ExecutionError(errorClass="property", errorCode="writeAccessDenied")
|
||||
|
||||
|
||||
#
|
||||
# CurrentPropertyListMixIn
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class CurrentPropertyListMixIn(Object):
|
||||
|
||||
properties = [
|
||||
CurrentPropertyList(),
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
#
|
||||
# WriteableObjectName
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class WriteableObjectName(WritableProperty):
|
||||
|
||||
class WriteableObjectName(Property):
|
||||
def __init__(self):
|
||||
if _debug: WriteableObjectName._debug("__init__")
|
||||
WritableProperty.__init__(self, 'objectName', CharacterString, default=None, optional=False)
|
||||
|
||||
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
|
||||
if _debug:
|
||||
WritableProperty._debug(
|
||||
WriteableObjectName._debug("__init__")
|
||||
Property.__init__(
|
||||
self, "objectName", CharacterString, optional=False, mutable=True
|
||||
)
|
||||
|
||||
def WriteProperty(self, obj, new_name, arrayIndex=None, priority=None, direct=False):
|
||||
if _debug:
|
||||
WriteableObjectName._debug(
|
||||
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
|
||||
obj, value, arrayIndex, priority, direct,
|
||||
obj,
|
||||
new_name,
|
||||
arrayIndex,
|
||||
priority,
|
||||
direct,
|
||||
)
|
||||
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
|
||||
|
||||
# check for being bound to an application
|
||||
old_name = None
|
||||
if getattr(obj, "_app", None):
|
||||
if _debug:
|
||||
WriteableObjectName._debug(" - more error checking")
|
||||
|
||||
old_name = obj._values["objectName"]
|
||||
if new_name == old_name:
|
||||
return
|
||||
if new_name in obj._app.objectName:
|
||||
raise ExecutionError(errorClass="property", errorCode="duplicateName")
|
||||
|
||||
# pass it along
|
||||
Property.WriteProperty(self, obj, new_name, arrayIndex, priority, direct)
|
||||
|
||||
if old_name is not None:
|
||||
if _debug:
|
||||
WriteableObjectName._debug(" - update the application")
|
||||
del obj._app.objectName[old_name]
|
||||
obj._app.objectName[new_name] = obj
|
||||
|
||||
|
||||
#
|
||||
# WriteableObjectNameMixIn
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class WriteableObjectNameMixIn(Object):
|
||||
|
||||
properties = [
|
||||
WriteableObjectName(),
|
||||
]
|
||||
]
|
||||
|
||||
|
||||
#
|
||||
# WriteableObjectIdentifier
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class WriteableObjectIdentifier(Property):
|
||||
def __init__(self):
|
||||
if _debug:
|
||||
WriteableObjectIdentifier._debug("__init__")
|
||||
Property.__init__(
|
||||
self, "objectIdentifier", ObjectIdentifier, optional=False, mutable=True
|
||||
)
|
||||
|
||||
def WriteProperty(
|
||||
self, obj, new_identifier, arrayIndex=None, priority=None, direct=False
|
||||
):
|
||||
if _debug:
|
||||
WriteableObjectIdentifier._debug(
|
||||
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
|
||||
obj,
|
||||
new_identifier,
|
||||
arrayIndex,
|
||||
priority,
|
||||
direct,
|
||||
)
|
||||
|
||||
# check for being bound to an application
|
||||
old_identifier = None
|
||||
if getattr(obj, "_app", None):
|
||||
if _debug:
|
||||
WriteableObjectIdentifier._debug(" - more error checking")
|
||||
|
||||
old_identifier = obj._values["objectIdentifier"]
|
||||
if new_identifier == old_identifier:
|
||||
return
|
||||
if new_identifier in obj._app.objectIdentifier:
|
||||
raise ExecutionError(
|
||||
errorClass="property", errorCode="duplicateObjectId"
|
||||
)
|
||||
if new_identifier[0] != old_identifier[0]:
|
||||
raise ExecutionError(
|
||||
errorClass="property", errorCode="valueOutOfRange"
|
||||
)
|
||||
|
||||
# pass it along
|
||||
Property.WriteProperty(self, obj, new_identifier, arrayIndex, priority, direct)
|
||||
|
||||
if old_identifier is not None:
|
||||
if _debug:
|
||||
WriteableObjectIdentifier._debug(" - update the application")
|
||||
|
||||
del obj._app.objectIdentifier[old_identifier]
|
||||
obj._app.objectIdentifier[new_identifier] = obj
|
||||
|
||||
# update the object list of the local device object
|
||||
app_object_list = obj._app.localDevice.objectList
|
||||
indx = app_object_list.index(old_identifier)
|
||||
del app_object_list[indx]
|
||||
app_object_list.append(new_identifier)
|
||||
|
||||
|
||||
#
|
||||
# WriteableObjectIdentifierMixIn
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class WriteableObjectIdentifierMixIn(Object):
|
||||
|
||||
properties = [
|
||||
WriteableObjectIdentifier(),
|
||||
]
|
||||
|
||||
|
||||
#
|
||||
# Turtle Reference Patterns
|
||||
|
@ -175,6 +337,7 @@ blank_node_re = re.compile(u"^" + BLANK_NODE_LABEL + u"$", re.UNICODE)
|
|||
# see https://tools.ietf.org/html/bcp47#section-2.1 for better syntax
|
||||
language_tag_re = re.compile(u"^[A-Za-z0-9-]+$", re.UNICODE)
|
||||
|
||||
|
||||
class IRI:
|
||||
# regex from RFC 3986
|
||||
_e = r"^(?:([^:/?#]+):)?(?://([^/?#]*))?([^?#]*)(?:\?([^#]*))?(?:#(.*))?"
|
||||
|
@ -346,7 +509,8 @@ class TagSet:
|
|||
def index(self, name, value=None):
|
||||
"""Find the first name with dictionary semantics or (name, value) with
|
||||
list semantics."""
|
||||
if _debug: TagSet._debug("index %r %r", name, value)
|
||||
if _debug:
|
||||
TagSet._debug("index %r %r", name, value)
|
||||
|
||||
# if this is a NameValue rip it apart first
|
||||
if isinstance(name, NameValue):
|
||||
|
@ -378,7 +542,8 @@ class TagSet:
|
|||
|
||||
def add(self, name, value=None):
|
||||
"""Add a (name, value) with mutable set semantics."""
|
||||
if _debug: TagSet._debug("add %r %r", name, value)
|
||||
if _debug:
|
||||
TagSet._debug("add %r %r", name, value)
|
||||
|
||||
# provide a Null if you are adding a is-a relationship, wrap strings
|
||||
# to be friendly
|
||||
|
@ -397,31 +562,31 @@ class TagSet:
|
|||
if not isinstance(value, CharacterString):
|
||||
raise TypeError("value must be an string")
|
||||
|
||||
v = self.get('@base')
|
||||
v = self.get("@base")
|
||||
if v and v.value == value.value:
|
||||
pass
|
||||
else:
|
||||
raise ValueError("@base exists")
|
||||
|
||||
# if not iriref_re.match(value.value):
|
||||
# raise ValueError("value must be an IRI")
|
||||
# if not iriref_re.match(value.value):
|
||||
# raise ValueError("value must be an IRI")
|
||||
|
||||
elif name == "@id":
|
||||
if not isinstance(value, CharacterString):
|
||||
raise TypeError("value must be an string")
|
||||
|
||||
v = self.get('@id')
|
||||
v = self.get("@id")
|
||||
if v and v.value == value.value:
|
||||
pass
|
||||
else:
|
||||
raise ValueError("@id exists")
|
||||
|
||||
# # check the patterns
|
||||
# for pattern in (blank_node_re, prefixed_name_re, local_name_re, iriref_re):
|
||||
# if pattern.match(value.value):
|
||||
# break
|
||||
# else:
|
||||
# raise ValueError("invalid value for @id")
|
||||
# # check the patterns
|
||||
# for pattern in (blank_node_re, prefixed_name_re, local_name_re, iriref_re):
|
||||
# if pattern.match(value.value):
|
||||
# break
|
||||
# else:
|
||||
# raise ValueError("invalid value for @id")
|
||||
|
||||
elif name == "@language":
|
||||
if not isinstance(value, CharacterString):
|
||||
|
@ -440,7 +605,7 @@ class TagSet:
|
|||
if not isinstance(value, CharacterString):
|
||||
raise TypeError("value must be an string")
|
||||
|
||||
v = self.get('@vocab')
|
||||
v = self.get("@vocab")
|
||||
if v and v.value == value.value:
|
||||
pass
|
||||
else:
|
||||
|
@ -459,16 +624,16 @@ class TagSet:
|
|||
else:
|
||||
raise ValueError("prefix exists: %r" % (name,))
|
||||
|
||||
# if not iriref_re.match(value.value):
|
||||
# raise ValueError("value must be an IRI")
|
||||
# if not iriref_re.match(value.value):
|
||||
# raise ValueError("value must be an IRI")
|
||||
|
||||
else:
|
||||
# # check the patterns
|
||||
# for pattern in (prefixed_name_re, local_name_re, iriref_re):
|
||||
# if pattern.match(name):
|
||||
# break
|
||||
# else:
|
||||
# raise ValueError("invalid name")
|
||||
# # check the patterns
|
||||
# for pattern in (prefixed_name_re, local_name_re, iriref_re):
|
||||
# if pattern.match(name):
|
||||
# break
|
||||
# else:
|
||||
# raise ValueError("invalid name")
|
||||
pass
|
||||
|
||||
# check the value
|
||||
|
@ -483,7 +648,8 @@ class TagSet:
|
|||
|
||||
def discard(self, name, value=None):
|
||||
"""Discard a (name, value) with mutable set semantics."""
|
||||
if _debug: TagSet._debug("discard %r %r", name, value)
|
||||
if _debug:
|
||||
TagSet._debug("discard %r %r", name, value)
|
||||
|
||||
# provide a Null if you are adding a is-a relationship, wrap strings
|
||||
# to be friendly
|
||||
|
@ -497,7 +663,8 @@ class TagSet:
|
|||
|
||||
def append(self, name_value):
|
||||
"""Override the append operation for mutable set semantics."""
|
||||
if _debug: TagSet._debug("append %r", name_value)
|
||||
if _debug:
|
||||
TagSet._debug("append %r", name_value)
|
||||
|
||||
if not isinstance(name_value, NameValue):
|
||||
raise TypeError
|
||||
|
@ -508,7 +675,8 @@ class TagSet:
|
|||
def get(self, key, default=None):
|
||||
"""Get the value of a key or default value if the key was not found,
|
||||
dictionary semantics."""
|
||||
if _debug: TagSet._debug("get %r %r", key, default)
|
||||
if _debug:
|
||||
TagSet._debug("get %r %r", key, default)
|
||||
|
||||
try:
|
||||
if not isinstance(key, str):
|
||||
|
@ -521,7 +689,8 @@ class TagSet:
|
|||
"""If item is an integer, return the value of the NameValue element
|
||||
with array/sequence semantics. If the item is a string, return the
|
||||
value with dictionary semantics."""
|
||||
if _debug: TagSet._debug("__getitem__ %r", item)
|
||||
if _debug:
|
||||
TagSet._debug("__getitem__ %r", item)
|
||||
|
||||
# integers imply index
|
||||
if isinstance(item, int):
|
||||
|
@ -533,7 +702,8 @@ class TagSet:
|
|||
"""If item is an integer, change the value of the NameValue element
|
||||
with array/sequence semantics. If the item is a string, change the
|
||||
current value or add a new value with dictionary semantics."""
|
||||
if _debug: TagSet._debug("__setitem__ %r %r", item, value)
|
||||
if _debug:
|
||||
TagSet._debug("__setitem__ %r %r", item, value)
|
||||
|
||||
# integers imply index
|
||||
if isinstance(item, int):
|
||||
|
@ -567,7 +737,8 @@ class TagSet:
|
|||
"""If the item is a integer, delete the element with array semantics, or
|
||||
if the item is a string, delete the element with dictionary semantics,
|
||||
or (name, value) with mutable set semantics."""
|
||||
if _debug: TagSet._debug("__delitem__ %r", item)
|
||||
if _debug:
|
||||
TagSet._debug("__delitem__ %r", item)
|
||||
|
||||
# integers imply index
|
||||
if isinstance(item, int):
|
||||
|
@ -582,7 +753,8 @@ class TagSet:
|
|||
return super(TagSet, self).__delitem__(indx)
|
||||
|
||||
def __contains__(self, key):
|
||||
if _debug: TagSet._debug("__contains__ %r", key)
|
||||
if _debug:
|
||||
TagSet._debug("__contains__ %r", key)
|
||||
|
||||
try:
|
||||
if isinstance(key, tuple):
|
||||
|
@ -606,14 +778,18 @@ class SequenceOfNameValue(TagSet, SequenceOf(NameValue)):
|
|||
|
||||
|
||||
class TagsMixIn(Object):
|
||||
properties = \
|
||||
[ OptionalProperty('tags', ArrayOfNameValue)
|
||||
]
|
||||
properties = [OptionalProperty("tags", ArrayOfNameValue)]
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
def Commandable(datatype, presentValue='presentValue', priorityArray='priorityArray', relinquishDefault='relinquishDefault'):
|
||||
if _debug: Commandable._debug("Commandable %r ...", datatype)
|
||||
def Commandable(
|
||||
datatype,
|
||||
presentValue="presentValue",
|
||||
priorityArray="priorityArray",
|
||||
relinquishDefault="relinquishDefault",
|
||||
):
|
||||
if _debug:
|
||||
Commandable._debug("Commandable %r ...", datatype)
|
||||
|
||||
class _Commando(object):
|
||||
|
||||
|
@ -621,69 +797,100 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
|
|||
WritableProperty(presentValue, datatype),
|
||||
ReadableProperty(priorityArray, PriorityArray),
|
||||
ReadableProperty(relinquishDefault, datatype),
|
||||
]
|
||||
]
|
||||
|
||||
_pv_choice = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if _debug: Commandable._debug("_Commando.__init__(%r, %r, %r, %r) %r", datatype, presentValue, priorityArray, relinquishDefault, kwargs)
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
"_Commando.__init__(%r, %r, %r, %r) %r",
|
||||
datatype,
|
||||
presentValue,
|
||||
priorityArray,
|
||||
relinquishDefault,
|
||||
kwargs,
|
||||
)
|
||||
super(_Commando, self).__init__(**kwargs)
|
||||
|
||||
# build a default value in case one is needed
|
||||
default_value = datatype().value
|
||||
if issubclass(datatype, Enumerated):
|
||||
default_value = datatype._xlate_table[default_value]
|
||||
if _debug: Commandable._debug(" - default_value: %r", default_value)
|
||||
if _debug:
|
||||
Commandable._debug(" - default_value: %r", default_value)
|
||||
|
||||
# see if a present value was provided
|
||||
if (presentValue not in kwargs):
|
||||
if presentValue not in kwargs:
|
||||
setattr(self, presentValue, default_value)
|
||||
|
||||
# see if a priority array was provided
|
||||
if (priorityArray not in kwargs):
|
||||
if priorityArray not in kwargs:
|
||||
setattr(self, priorityArray, PriorityArray())
|
||||
|
||||
# see if a present value was provided
|
||||
if (relinquishDefault not in kwargs):
|
||||
if relinquishDefault not in kwargs:
|
||||
setattr(self, relinquishDefault, default_value)
|
||||
|
||||
def _highest_priority_value(self):
|
||||
if _debug: Commandable._debug("_highest_priority_value")
|
||||
if _debug:
|
||||
Commandable._debug("_highest_priority_value")
|
||||
|
||||
priority_array = getattr(self, priorityArray)
|
||||
for i in range(1, 17):
|
||||
priority_value = priority_array[i]
|
||||
if priority_value.null is None:
|
||||
if _debug: Commandable._debug(" - found at index: %r", i)
|
||||
if _debug:
|
||||
Commandable._debug(" - found at index: %r", i)
|
||||
|
||||
value = getattr(priority_value, _Commando._pv_choice)
|
||||
value_source = "###"
|
||||
|
||||
if issubclass(datatype, Enumerated):
|
||||
value = datatype._xlate_table[value]
|
||||
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
|
||||
if _debug:
|
||||
Commandable._debug(" - remapped enumeration: %r", value)
|
||||
|
||||
break
|
||||
else:
|
||||
value = getattr(self, relinquishDefault)
|
||||
value_source = None
|
||||
|
||||
if _debug: Commandable._debug(" - value, value_source: %r, %r", value, value_source)
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
" - value, value_source: %r, %r", value, value_source
|
||||
)
|
||||
|
||||
# return what you found
|
||||
return value, value_source
|
||||
|
||||
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
|
||||
if _debug: Commandable._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
|
||||
def WriteProperty(
|
||||
self, property, value, arrayIndex=None, priority=None, direct=False
|
||||
):
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
|
||||
property,
|
||||
value,
|
||||
arrayIndex,
|
||||
priority,
|
||||
direct,
|
||||
)
|
||||
|
||||
# when writing to the presentValue with a priority
|
||||
if (property == presentValue):
|
||||
if _debug: Commandable._debug(" - writing to %s, priority %r", presentValue, priority)
|
||||
if property == presentValue:
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
" - writing to %s, priority %r", presentValue, priority
|
||||
)
|
||||
|
||||
# default (lowest) priority
|
||||
if priority is None:
|
||||
priority = 16
|
||||
if _debug: Commandable._debug(" - translate to priority array, index %d", priority)
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
" - translate to priority array, index %d", priority
|
||||
)
|
||||
|
||||
# translate to updating the priority array
|
||||
property = priorityArray
|
||||
|
@ -691,39 +898,58 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
|
|||
priority = None
|
||||
|
||||
# update the priority array entry
|
||||
if (property == priorityArray):
|
||||
if (arrayIndex is None):
|
||||
if _debug: Commandable._debug(" - writing entire %s", priorityArray)
|
||||
if property == priorityArray:
|
||||
if arrayIndex is None:
|
||||
if _debug:
|
||||
Commandable._debug(" - writing entire %s", priorityArray)
|
||||
|
||||
# pass along the request
|
||||
super(_Commando, self).WriteProperty(
|
||||
property, value,
|
||||
arrayIndex=arrayIndex, priority=priority, direct=direct,
|
||||
)
|
||||
property,
|
||||
value,
|
||||
arrayIndex=arrayIndex,
|
||||
priority=priority,
|
||||
direct=direct,
|
||||
)
|
||||
else:
|
||||
if _debug: Commandable._debug(" - writing to %s, array index %d", priorityArray, arrayIndex)
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
" - writing to %s, array index %d",
|
||||
priorityArray,
|
||||
arrayIndex,
|
||||
)
|
||||
|
||||
# check the bounds
|
||||
if arrayIndex == 0:
|
||||
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
|
||||
raise ExecutionError(
|
||||
errorClass="property", errorCode="writeAccessDenied"
|
||||
)
|
||||
if (arrayIndex < 1) or (arrayIndex > 16):
|
||||
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
|
||||
raise ExecutionError(
|
||||
errorClass="property", errorCode="invalidArrayIndex"
|
||||
)
|
||||
|
||||
# update the specific priorty value element
|
||||
priority_value = getattr(self, priorityArray)[arrayIndex]
|
||||
if _debug: Commandable._debug(" - priority_value: %r", priority_value)
|
||||
if _debug:
|
||||
Commandable._debug(" - priority_value: %r", priority_value)
|
||||
|
||||
# the null or the choice has to be set, the other clear
|
||||
if value == ():
|
||||
if _debug: Commandable._debug(" - write a null")
|
||||
if _debug:
|
||||
Commandable._debug(" - write a null")
|
||||
priority_value.null = value
|
||||
setattr(priority_value, _Commando._pv_choice, None)
|
||||
else:
|
||||
if _debug: Commandable._debug(" - write a value")
|
||||
if _debug:
|
||||
Commandable._debug(" - write a value")
|
||||
|
||||
if issubclass(datatype, Enumerated):
|
||||
value = datatype._xlate_table[value]
|
||||
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
" - remapped enumeration: %r", value
|
||||
)
|
||||
|
||||
priority_value.null = None
|
||||
setattr(priority_value, _Commando._pv_choice, value)
|
||||
|
@ -734,7 +960,8 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
|
|||
# compare with the current value
|
||||
current_value = getattr(self, presentValue)
|
||||
if value == current_value:
|
||||
if _debug: Commandable._debug(" - no present value change")
|
||||
if _debug:
|
||||
Commandable._debug(" - no present value change")
|
||||
return
|
||||
|
||||
# turn this into a present value change
|
||||
|
@ -742,12 +969,22 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
|
|||
arrayIndex = priority = None
|
||||
|
||||
# allow the request to pass through
|
||||
if _debug: Commandable._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
|
||||
if _debug:
|
||||
Commandable._debug(
|
||||
" - super: %r %r arrayIndex=%r priority=%r",
|
||||
property,
|
||||
value,
|
||||
arrayIndex,
|
||||
priority,
|
||||
)
|
||||
|
||||
super(_Commando, self).WriteProperty(
|
||||
property, value,
|
||||
arrayIndex=arrayIndex, priority=priority, direct=direct,
|
||||
)
|
||||
property,
|
||||
value,
|
||||
arrayIndex=arrayIndex,
|
||||
priority=priority,
|
||||
direct=direct,
|
||||
)
|
||||
|
||||
# look up a matching priority value choice
|
||||
for element in PriorityValue.choiceElements:
|
||||
|
@ -755,160 +992,217 @@ def Commandable(datatype, presentValue='presentValue', priorityArray='priorityAr
|
|||
_Commando._pv_choice = element.name
|
||||
break
|
||||
else:
|
||||
_Commando._pv_choice = 'constructedValue'
|
||||
if _debug: Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)
|
||||
_Commando._pv_choice = "constructedValue"
|
||||
if _debug:
|
||||
Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)
|
||||
|
||||
# return the class
|
||||
return _Commando
|
||||
|
||||
|
||||
#
|
||||
# MinOnOffTask
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class MinOnOffTask(OneShotTask):
|
||||
|
||||
def __init__(self, binary_obj):
|
||||
if _debug: MinOnOffTask._debug("__init__ %s", repr(binary_obj))
|
||||
if _debug:
|
||||
MinOnOffTask._debug("__init__ %s", repr(binary_obj))
|
||||
OneShotTask.__init__(self)
|
||||
|
||||
# save a reference to the object
|
||||
self.binary_obj = binary_obj
|
||||
|
||||
# listen for changes to the present value
|
||||
self.binary_obj._property_monitors['presentValue'].append(self.present_value_change)
|
||||
self.binary_obj._property_monitors["presentValue"].append(
|
||||
self.present_value_change
|
||||
)
|
||||
|
||||
def present_value_change(self, old_value, new_value):
|
||||
if _debug: MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)
|
||||
if _debug:
|
||||
MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)
|
||||
|
||||
# if there's no value change, skip all this
|
||||
if old_value == new_value:
|
||||
if _debug: MinOnOffTask._debug(" - no state change")
|
||||
if _debug:
|
||||
MinOnOffTask._debug(" - no state change")
|
||||
return
|
||||
|
||||
# get the minimum on/off time
|
||||
if new_value == 'inactive':
|
||||
task_delay = getattr(self.binary_obj, 'minimumOnTime') or 0
|
||||
if _debug: MinOnOffTask._debug(" - minimum on: %r", task_delay)
|
||||
elif new_value == 'active':
|
||||
task_delay = getattr(self.binary_obj, 'minimumOffTime') or 0
|
||||
if _debug: MinOnOffTask._debug(" - minimum off: %r", task_delay)
|
||||
if new_value == "inactive":
|
||||
task_delay = getattr(self.binary_obj, "minimumOnTime") or 0
|
||||
if _debug:
|
||||
MinOnOffTask._debug(" - minimum on: %r", task_delay)
|
||||
elif new_value == "active":
|
||||
task_delay = getattr(self.binary_obj, "minimumOffTime") or 0
|
||||
if _debug:
|
||||
MinOnOffTask._debug(" - minimum off: %r", task_delay)
|
||||
else:
|
||||
raise ValueError("unrecognized present value for %r: %r" % (self.binary_obj.objectIdentifier, new_value))
|
||||
raise ValueError(
|
||||
"unrecognized present value for %r: %r"
|
||||
% (self.binary_obj.objectIdentifier, new_value)
|
||||
)
|
||||
|
||||
# if there's no delay, don't bother
|
||||
if not task_delay:
|
||||
if _debug: MinOnOffTask._debug(" - no delay")
|
||||
if _debug:
|
||||
MinOnOffTask._debug(" - no delay")
|
||||
return
|
||||
|
||||
# set the value at priority 6
|
||||
self.binary_obj.WriteProperty('presentValue', new_value, priority=6)
|
||||
self.binary_obj.WriteProperty("presentValue", new_value, priority=6)
|
||||
|
||||
# install this to run, if there is a delay
|
||||
self.install_task(delta=task_delay)
|
||||
|
||||
def process_task(self):
|
||||
if _debug: MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)
|
||||
if _debug:
|
||||
MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)
|
||||
|
||||
# clear the value at priority 6
|
||||
self.binary_obj.WriteProperty('presentValue', (), priority=6)
|
||||
self.binary_obj.WriteProperty("presentValue", (), priority=6)
|
||||
|
||||
|
||||
#
|
||||
# MinOnOff
|
||||
#
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class MinOnOff(object):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if _debug: MinOnOff._debug("__init__ ...")
|
||||
if _debug:
|
||||
MinOnOff._debug("__init__ ...")
|
||||
super(MinOnOff, self).__init__(**kwargs)
|
||||
|
||||
# create the timer task
|
||||
self._min_on_off_task = MinOnOffTask(self)
|
||||
|
||||
|
||||
#
|
||||
# Commandable Standard Objects
|
||||
#
|
||||
|
||||
|
||||
class AccessDoorCmdObject(Commandable(DoorValue), AccessDoorObject):
|
||||
pass
|
||||
|
||||
|
||||
class AnalogOutputCmdObject(Commandable(Real), AnalogOutputObject):
|
||||
pass
|
||||
|
||||
|
||||
class AnalogValueCmdObject(Commandable(Real), AnalogValueObject):
|
||||
pass
|
||||
|
||||
|
||||
### class BinaryLightingOutputCmdObject(Commandable(Real), BinaryLightingOutputObject):
|
||||
### pass
|
||||
|
||||
|
||||
class BinaryOutputCmdObject(Commandable(BinaryPV), MinOnOff, BinaryOutputObject):
|
||||
pass
|
||||
|
||||
|
||||
class BinaryValueCmdObject(Commandable(BinaryPV), MinOnOff, BinaryValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class BitStringValueCmdObject(Commandable(BitString), BitStringValueObject):
|
||||
pass
|
||||
|
||||
class CharacterStringValueCmdObject(Commandable(CharacterString), CharacterStringValueObject):
|
||||
|
||||
class CharacterStringValueCmdObject(
|
||||
Commandable(CharacterString), CharacterStringValueObject
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class DateValueCmdObject(Commandable(Date), DateValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class DatePatternValueCmdObject(Commandable(Date), DatePatternValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class DateTimeValueCmdObject(Commandable(DateTime), DateTimeValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class DateTimePatternValueCmdObject(Commandable(DateTime), DateTimePatternValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class IntegerValueCmdObject(Commandable(Integer), IntegerValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class LargeAnalogValueCmdObject(Commandable(Double), LargeAnalogValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class LightingOutputCmdObject(Commandable(Real), LightingOutputObject):
|
||||
pass
|
||||
|
||||
|
||||
class MultiStateOutputCmdObject(Commandable(Unsigned), MultiStateOutputObject):
|
||||
pass
|
||||
|
||||
|
||||
class MultiStateValueCmdObject(Commandable(Unsigned), MultiStateValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class OctetStringValueCmdObject(Commandable(OctetString), OctetStringValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class PositiveIntegerValueCmdObject(Commandable(Unsigned), PositiveIntegerValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class TimeValueCmdObject(Commandable(Time), TimeValueObject):
|
||||
pass
|
||||
|
||||
|
||||
class TimePatternValueCmdObject(Commandable(Time), TimePatternValueObject):
|
||||
pass
|
||||
|
||||
|
||||
@bacpypes_debugging
|
||||
class ChannelValueProperty(Property):
|
||||
|
||||
def __init__(self):
|
||||
if _debug: ChannelValueProperty._debug("__init__")
|
||||
Property.__init__(self, 'presentValue', ChannelValue, default=None, optional=False, mutable=True)
|
||||
if _debug:
|
||||
ChannelValueProperty._debug("__init__")
|
||||
Property.__init__(
|
||||
self,
|
||||
"presentValue",
|
||||
ChannelValue,
|
||||
default=None,
|
||||
optional=False,
|
||||
mutable=True,
|
||||
)
|
||||
|
||||
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
|
||||
if _debug: ChannelValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
|
||||
if _debug:
|
||||
ChannelValueProperty._debug(
|
||||
"WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
|
||||
obj,
|
||||
value,
|
||||
arrayIndex,
|
||||
priority,
|
||||
direct,
|
||||
)
|
||||
|
||||
### Clause 12.53.5, page 487
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class ChannelCmdObject(ChannelObject):
|
||||
|
||||
properties = [
|
||||
ChannelValueProperty(),
|
||||
]
|
||||
]
|
||||
|
|
|
@ -216,6 +216,7 @@ class Property(object):
|
|||
|
||||
# see if it can be changed
|
||||
if not self.mutable:
|
||||
if _debug: Property._debug(" - property is immutable")
|
||||
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
|
||||
|
||||
# if changing the length of the array, the value is unsigned
|
||||
|
|
125
samples/RenameAnalogValueObject.py
Normal file
125
samples/RenameAnalogValueObject.py
Normal file
|
@ -0,0 +1,125 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
This sample application shows how to extend one of the basic objects, an Analog
|
||||
Value Object in this case, to allow the object to be renamed.
|
||||
"""
|
||||
|
||||
from bacpypes.debugging import ModuleLogger
|
||||
from bacpypes.consolelogging import ConfigArgumentParser
|
||||
|
||||
from bacpypes.core import run
|
||||
|
||||
from bacpypes.object import AnalogValueObject, BinaryValueObject, register_object_type
|
||||
|
||||
from bacpypes.app import BIPSimpleApplication
|
||||
from bacpypes.local.device import LocalDeviceObject
|
||||
from bacpypes.local.object import WriteableObjectNameMixIn, WriteableObjectIdentifierMixIn
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
#
|
||||
# Analog Value Object that can be renamed
|
||||
#
|
||||
|
||||
|
||||
@register_object_type
|
||||
class SampleAnalogValueObject(WriteableObjectNameMixIn, AnalogValueObject):
|
||||
def __init__(self, **kwargs):
|
||||
if _debug:
|
||||
SampleAnalogValueObject._debug("__init__ %r", kwargs)
|
||||
AnalogValueObject.__init__(self, **kwargs)
|
||||
|
||||
# add a callback when the object name has changed
|
||||
self._property_monitors["objectName"].append(self.object_name_changed)
|
||||
|
||||
def object_name_changed(self, old_value, new_value):
|
||||
if _debug:
|
||||
SampleAnalogValueObject._debug(
|
||||
"object_name_changed %r %r", old_value, new_value
|
||||
)
|
||||
print("object name changed from %r to %r" % (old_value, new_value))
|
||||
|
||||
|
||||
#
|
||||
# Binary Value Object that can be given a new object identifier
|
||||
#
|
||||
|
||||
|
||||
@register_object_type
|
||||
class SampleBinaryValueObject(WriteableObjectIdentifierMixIn, BinaryValueObject):
|
||||
def __init__(self, **kwargs):
|
||||
if _debug:
|
||||
SampleBinaryValueObject._debug("__init__ %r", kwargs)
|
||||
BinaryValueObject.__init__(self, **kwargs)
|
||||
|
||||
# add a callback when the object name has changed
|
||||
self._property_monitors["objectIdentifier"].append(self.object_identifier_changed)
|
||||
|
||||
def object_identifier_changed(self, old_value, new_value):
|
||||
if _debug:
|
||||
SampleBinaryValueObject._debug(
|
||||
"object_identifier_changed %r %r", old_value, new_value
|
||||
)
|
||||
print("object identifier changed from %r to %r" % (old_value, new_value))
|
||||
|
||||
|
||||
#
|
||||
# __main__
|
||||
#
|
||||
|
||||
|
||||
def main():
|
||||
# parse the command line arguments
|
||||
args = ConfigArgumentParser(description=__doc__).parse_args()
|
||||
|
||||
if _debug:
|
||||
_log.debug("initialization")
|
||||
_log.debug(" - args: %r", args)
|
||||
|
||||
# make a device object
|
||||
this_device = LocalDeviceObject(
|
||||
objectName=args.ini.objectname,
|
||||
objectIdentifier=("device", int(args.ini.objectidentifier)),
|
||||
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
|
||||
segmentationSupported=args.ini.segmentationsupported,
|
||||
vendorIdentifier=int(args.ini.vendoridentifier),
|
||||
)
|
||||
|
||||
# make a sample application
|
||||
this_application = BIPSimpleApplication(this_device, args.ini.address)
|
||||
|
||||
# make some objects
|
||||
savo = SampleAnalogValueObject(
|
||||
objectIdentifier=("analogValue", 1),
|
||||
objectName="SampleAnalogValueObject",
|
||||
presentValue=123.4,
|
||||
)
|
||||
_log.debug(" - savo: %r", savo)
|
||||
|
||||
this_application.add_object(savo)
|
||||
|
||||
# make some objects
|
||||
sbvo = SampleBinaryValueObject(
|
||||
objectIdentifier=("binaryValue", 1),
|
||||
objectName="SampleBinaryValueObject",
|
||||
presentValue=True,
|
||||
)
|
||||
_log.debug(" - sbvo: %r", sbvo)
|
||||
|
||||
this_application.add_object(sbvo)
|
||||
|
||||
# make sure they are all there
|
||||
_log.debug(" - object list: %r", this_device.objectList)
|
||||
|
||||
_log.debug("running")
|
||||
|
||||
run()
|
||||
|
||||
_log.debug("fini")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue
Block a user