1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

considerably better version of commandable objects, but still not for channel objects

This commit is contained in:
Joel Bender 2018-01-10 22:44:38 -05:00
parent d0b680760c
commit 47934ebe4f

445
samples/CommandableMixin.py Executable file → Normal file
View File

@ -1,154 +1,371 @@
#!/usr/bin/env python
"""
This sample application demonstrates a mix-in class for commandable properties
(not useful for Binary Out or Binary Value objects that have a minimum on and off
time, or for Channel objects).
Rebuilt Commandable
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.task import OneShotTask
from bacpypes.errors import ExecutionError
from bacpypes.object import AnalogValueObject, DateValueObject
from bacpypes.primitivedata import Null, Date
from bacpypes.basetypes import PriorityValue, PriorityArray
from bacpypes.primitivedata import BitString, CharacterString, Date, Integer, \
Double, Enumerated, OctetString, Real, Time, Unsigned
from bacpypes.basetypes import BinaryPV, ChannelValue, DateTime, DoorValue, PriorityValue, \
PriorityArray
from bacpypes.object import Property, ReadableProperty, WritableProperty, \
register_object_type, \
AccessDoorObject, AnalogOutputObject, AnalogValueObject, \
BinaryOutputObject, BinaryValueObject, BitStringValueObject, CharacterStringValueObject, \
DateValueObject, DatePatternValueObject, DateTimePatternValueObject, \
DateTimeValueObject, IntegerValueObject, \
LargeAnalogValueObject, LightingOutputObject, MultiStateOutputObject, \
MultiStateValueObject, OctetStringValueObject, PositiveIntegerValueObject, \
TimeValueObject, TimePatternValueObject, ChannelObject
from bacpypes.app import BIPSimpleApplication
from bacpypes.service.object import CurrentPropertyListMixIn
from bacpypes.service.device import LocalDeviceObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# CommandableMixin
# Commandable
#
@bacpypes_debugging
class CommandableMixin(object):
def Commandable(datatype, presentValue='presentValue', priorityArray='priorityArray', relinquishDefault='relinquishDefault'):
if _debug: Commandable._debug("Commandable %r ...", datatype)
def __init__(self, init_value, **kwargs):
if _debug: CommandableMixin._debug("__init__ %r, %r", init_value, kwargs)
super(CommandableMixin, self).__init__(**kwargs)
class _Commando(object):
# if no present value given, give it the default value
if ('presentValue' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize present value")
self.presentValue = init_value
properties = [
WritableProperty(presentValue, datatype),
ReadableProperty(priorityArray, PriorityArray),
ReadableProperty(relinquishDefault, datatype),
]
# if no priority array given, give it an empty one
if ('priorityArray' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize priority array")
self.priorityArray = PriorityArray()
for i in range(16):
self.priorityArray.append(PriorityValue(null=Null()))
_pv_choice = None
# if no relinquish default value given, give it the default value
if ('relinquishDefault' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize relinquish default")
self.relinquishDefault = init_value
def __init__(self, **kwargs):
super(_Commando, self).__init__(**kwargs)
# capture the present value property
self._pv = self._properties['presentValue']
if _debug: CommandableMixin._debug(" - _pv: %r", self._pv)
# build a default value in case one is needed
default_value = datatype().value
if issubclass(datatype, Enumerated):
default_value = datatype._xlate_table[default_value]
if _debug: Commandable._debug(" - default_value: %r", default_value)
# capture the datatype
self._pv_datatype = self._pv.datatype
if _debug: CommandableMixin._debug(" - _pv_datatype: %r", self._pv_datatype)
# see if a present value was provided
if (presentValue not in kwargs):
setattr(self, presentValue, default_value)
# look up a matching priority value choice
for element in PriorityValue.choiceElements:
if element.klass is self._pv_datatype:
self._pv_choice = element.name
break
else:
self._pv_choice = 'constructedValue'
if _debug: CommandableMixin._debug(" - _pv_choice: %r", self._pv_choice)
# see if a priority array was provided
if (priorityArray not in kwargs):
new_priority_array = PriorityArray()
for i in range(16):
new_priority_array.append(PriorityValue(null=()))
setattr(self, priorityArray, new_priority_array)
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
if _debug: CommandableMixin._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
# see if a present value was provided
if (relinquishDefault not in kwargs):
setattr(self, relinquishDefault, default_value)
# when writing to the presentValue with a priority
if (property == 'presentValue'):
# default (lowest) priority
if priority is None:
priority = 16
if _debug: CommandableMixin._debug(" - translate to array index %d", priority)
def _highest_priority_value(self):
if _debug: Commandable._debug("_highest_priority_value")
# translate to updating the priority array
property = 'priorityArray'
arrayIndex = priority
priority = None
# update the priority array entry
if (property == 'priorityArray') and (arrayIndex is not None):
# check the bounds
if arrayIndex == 0:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
if (arrayIndex < 1) or (arrayIndex > 16):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
# update the specific priorty value element
priority_value = self.priorityArray[arrayIndex]
if _debug: CommandableMixin._debug(" - priority_value: %r", priority_value)
# the null or the choice has to be set, the other clear
if value is ():
if _debug: CommandableMixin._debug(" - write a null")
priority_value.null = value
setattr(priority_value, self._pv_choice, None)
else:
if _debug: CommandableMixin._debug(" - write a value")
priority_value.null = None
setattr(priority_value, self._pv_choice, value)
# look for the highest priority value
priority_array = getattr(self, priorityArray)
for i in range(1, 17):
priority_value = self.priorityArray[i]
priority_value = priority_array[i]
if priority_value.null is None:
if (i < arrayIndex):
if _debug: CommandableMixin._debug(" - existing higher priority value")
return
value = getattr(priority_value, self._pv_choice)
if _debug: Commandable._debug(" - found at index: %r", i)
value = getattr(priority_value, _Commando._pv_choice)
value_source = "###"
if issubclass(datatype, Enumerated):
value = datatype._xlate_table[value]
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
break
else:
value = self.relinquishDefault
if _debug: CommandableMixin._debug(" - new present value: %r", value)
value = getattr(self, relinquishDefault)
value_source = None
property = 'presentValue'
arrayIndex = priority = None
if _debug: Commandable._debug(" - value, value_source: %r, %r", value, value_source)
# allow the request to pass through
if _debug: CommandableMixin._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
super(CommandableMixin, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
# return what you found
return value, value_source
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
if _debug: Commandable._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
# when writing to the presentValue with a priority
if (property == presentValue):
if _debug: Commandable._debug(" - writing to %s, priority %r", presentValue, priority)
# default (lowest) priority
if priority is None:
priority = 16
if _debug: Commandable._debug(" - translate to priority array, index %d", priority)
# translate to updating the priority array
property = priorityArray
arrayIndex = priority
priority = None
# update the priority array entry
if (property == priorityArray):
if (arrayIndex is None):
if _debug: Commandable._debug(" - writing entire %s", priorityArray)
# pass along the request
super(_Commando, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
else:
if _debug: Commandable._debug(" - writing to %s, array index %d", priorityArray, arrayIndex)
# check the bounds
if arrayIndex == 0:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
if (arrayIndex < 1) or (arrayIndex > 16):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
# update the specific priorty value element
priority_value = getattr(self, priorityArray)[arrayIndex]
if _debug: Commandable._debug(" - priority_value: %r", priority_value)
# the null or the choice has to be set, the other clear
if value is ():
if _debug: Commandable._debug(" - write a null")
priority_value.null = value
setattr(priority_value, _Commando._pv_choice, None)
else:
if _debug: Commandable._debug(" - write a value")
if issubclass(datatype, Enumerated):
value = datatype._xlate_table[value]
if _debug: Commandable._debug(" - remapped enumeration: %r", value)
priority_value.null = None
setattr(priority_value, _Commando._pv_choice, value)
# look for the highest priority value
value, value_source = self._highest_priority_value()
# compare with the current value
current_value = getattr(self, presentValue)
if value == current_value:
if _debug: Commandable._debug(" - no present value change")
return
# turn this into a present value change
property = presentValue
arrayIndex = priority = None
# allow the request to pass through
if _debug: Commandable._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
super(_Commando, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
# look up a matching priority value choice
for element in PriorityValue.choiceElements:
if issubclass(datatype, element.klass):
_Commando._pv_choice = element.name
break
else:
_Commando._pv_choice = 'constructedValue'
if _debug: Commandable._debug(" - _pv_choice: %r", _Commando._pv_choice)
# return the class
return _Commando
#
# CommandableAnalogValueObject
# MinOnOffTask
#
@bacpypes_debugging
class CommandableAnalogValueObject(CommandableMixin, AnalogValueObject):
class MinOnOffTask(OneShotTask):
def __init__(self, **kwargs):
if _debug: CommandableAnalogValueObject._debug("__init__ %r", kwargs)
CommandableMixin.__init__(self, 0.0, **kwargs)
def __init__(self, binary_obj):
if _debug: MinOnOffTask._debug("__init__ %s", repr(binary_obj))
OneShotTask.__init__(self)
# save a reference to the object
self.binary_obj = binary_obj
# listen for changes to the present value
self.binary_obj._property_monitors['presentValue'].append(self.present_value_change)
def present_value_change(self, old_value, new_value):
if _debug: MinOnOffTask._debug("present_value_change %r %r", old_value, new_value)
# if there's no value change, skip all this
if old_value == new_value:
if _debug: MinOnOffTask._debug(" - no state change")
return
# get the minimum on/off time
if new_value == 'inactive':
task_delay = getattr(self.binary_obj, 'minimumOnTime') or 0
if _debug: MinOnOffTask._debug(" - minimum on: %r", task_delay)
elif new_value == 'active':
task_delay = getattr(self.binary_obj, 'minimumOffTime') or 0
if _debug: MinOnOffTask._debug(" - minimum off: %r", task_delay)
else:
raise ValueError("unrecognized present value for %r: %r" % (self.binary_obj.objectIdentifier, new_value))
# if there's no delay, don't bother
if not task_delay:
if _debug: MinOnOffTask._debug(" - no delay")
return
# set the value at priority 6
self.binary_obj.WriteProperty('presentValue', new_value, priority=6)
# install this to run, if there is a delay
self.install_task(delta=task_delay)
def process_task(self):
if _debug: MinOnOffTask._debug("process_task(%s)", self.binary_obj.objectName)
# clear the value at priority 6
self.binary_obj.WriteProperty('presentValue', (), priority=6)
#
# CommandableDateValueObject
# MinOnOff
#
@bacpypes_debugging
class CommandableDateValueObject(CommandableMixin, DateValueObject):
class MinOnOff(object):
def __init__(self, **kwargs):
if _debug: CommandableDateValueObject._debug("__init__ %r", kwargs)
CommandableMixin.__init__(self, None, **kwargs)
if _debug: MinOnOff._debug("__init__ ...")
super(MinOnOff, self).__init__(**kwargs)
# create the timer task
self._min_on_off_task = MinOnOffTask(self)
#
# Commandable Standard Objects
#
class AccessDoorObjectCmd(Commandable(DoorValue), AccessDoorObject):
pass
class AnalogOutputObjectCmd(Commandable(Real), AnalogOutputObject):
pass
class AnalogValueObjectCmd(Commandable(Real), AnalogValueObject):
pass
### class BinaryLightingOutputObjectCmd(Commandable(Real), BinaryLightingOutputObject):
### pass
class BinaryOutputObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryOutputObject):
pass
class BinaryValueObjectCmd(Commandable(BinaryPV), MinOnOff, BinaryValueObject):
pass
class BitStringValueObjectCmd(Commandable(BitString), BitStringValueObject):
pass
class CharacterStringValueObjectCmd(Commandable(CharacterString), CharacterStringValueObject):
pass
class DateValueObjectCmd(Commandable(Date), DateValueObject):
pass
class DatePatternValueObjectCmd(Commandable(Date), DatePatternValueObject):
pass
class DateTimeValueObjectCmd(Commandable(DateTime), DateTimeValueObject):
pass
class DateTimePatternValueObjectCmd(Commandable(DateTime), DateTimePatternValueObject):
pass
class IntegerValueObjectCmd(Commandable(Integer), IntegerValueObject):
pass
class LargeAnalogValueObjectCmd(Commandable(Double), LargeAnalogValueObject):
pass
class LightingOutputObjectCmd(Commandable(Real), LightingOutputObject):
pass
class MultiStateOutputObjectCmd(Commandable(Unsigned), MultiStateOutputObject):
pass
class MultiStateValueObjectCmd(Commandable(Unsigned), MultiStateValueObject):
pass
class OctetStringValueObjectCmd(Commandable(OctetString), OctetStringValueObject):
pass
class PositiveIntegerValueObjectCmd(Commandable(Unsigned), PositiveIntegerValueObject):
pass
class TimeValueObjectCmd(Commandable(Time), TimeValueObject):
pass
class TimePatternValueObjectCmd(Commandable(Time), TimePatternValueObject):
pass
#
# ChannelValueProperty
#
class ChannelValueProperty(Property):
def __init__(self):
if _debug: ChannelValueProperty._debug("__init__")
Property.__init__(self, 'presentValue', ChannelValue, default=None, optional=False, mutable=True)
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: ChannelValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
### Clause 12.53.5, page 487
raise NotImplementedError()
#
# ChannelObjectCmd
#
class ChannelObjectCmd(ChannelObject):
properties = [
ChannelValueProperty(),
]
##
##
##
##
##
@register_object_type(vendor_id=999)
class LocalAnalogValueObjectCmd(CurrentPropertyListMixIn, AnalogValueObjectCmd):
pass
@register_object_type(vendor_id=999)
class LocalBinaryOutputObjectCmd(CurrentPropertyListMixIn, BinaryOutputObjectCmd):
pass
@register_object_type(vendor_id=999)
class LocalDateValueObjectCmd(CurrentPropertyListMixIn, DateValueObjectCmd):
pass
#
# __main__
@ -174,22 +391,36 @@ def main():
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a commandable analog value object, add to the device
cavo1 = CommandableAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Commandable1',
avo1 = LocalAnalogValueObjectCmd(
objectIdentifier=('analogValue', 1),
objectName='avo1',
)
if _debug: _log.debug(" - cavo1: %r", cavo1)
this_application.add_object(cavo1)
if _debug: _log.debug(" - avo1: %r", avo1)
this_application.add_object(avo1)
# make a commandable binary output object, add to the device
boo1 = LocalBinaryOutputObjectCmd(
objectIdentifier=('binaryOutput', 1),
objectName='boo1',
presentValue='inactive',
relinquishDefault='inactive',
minimumOnTime=5, # let it warm up
minimumOffTime=10, # let it cool off
)
if _debug: _log.debug(" - boo1: %r", boo1)
this_application.add_object(boo1)
# get the current date
today = Date().now()
# make a commandable date value object, add to the device
cdvo2 = CommandableDateValueObject(
objectIdentifier=('dateValue', 1), objectName='Commandable2',
dvo1 = LocalDateValueObjectCmd(
objectIdentifier=('dateValue', 1),
objectName='dvo1',
presentValue=today.value,
)
if _debug: _log.debug(" - cdvo2: %r", cdvo2)
this_application.add_object(cdvo2)
if _debug: _log.debug(" - dvo1: %r", dvo1)
this_application.add_object(dvo1)
if _debug: _log.debug("running")