1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

add a detect module

This commit is contained in:
Joel Bender 2016-09-16 01:34:02 -04:00
parent 8e9869c13c
commit 04e18c35af
2 changed files with 146 additions and 1 deletions

View File

@ -5,9 +5,9 @@ Service Subpackage
"""
from . import test
from . import detect
from . import device
from . import object
from . import cov
from . import file

145
py27/bacpypes/service/detect.py Executable file
View File

@ -0,0 +1,145 @@
#!/usr/bin/env python
"""
Detection
"""
from functools import partial
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.core import deferred
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# DetectionMonitor
#
@bacpypes_debugging
class DetectionMonitor:
def __init__(self, algorithm, parameter, obj, prop, filter=None):
if _debug: DetectionMonitor._debug("__init__ ...")
# keep track of the parameter values
self.algorithm = algorithm
self.parameter = parameter
self.obj = obj
self.prop = prop
self.filter = None
def property_change(self, old_value, new_value):
if _debug: DetectionMonitor._debug("property_change %r %r", old_value, new_value)
# set the parameter value
setattr(self.algorithm, self.parameter, new_value)
# if the algorithm is already triggered, don't bother checking for more
if self.algorithm._triggered:
if _debug: DetectionMonitor._debug(" - already triggered")
return
# if there is a special filter, use it, otherwise use !=
if self.filter:
trigger = self.filter(old_value, new_value)
else:
trigger = (old_value != new_value)
if _debug: DetectionMonitor._debug(" - trigger: %r", trigger)
# trigger it
if trigger:
deferred(self.algorithm._execute)
if _debug: DetectionMonitor._debug(" - deferred: %r", self.algorithm._execute)
self.algorithm._triggered = True
#
# monitor_filter
#
def monitor_filter(parameter):
def transfer_filter_decorator(fn):
fn._monitor_filter = parameter
return fn
return transfer_filter_decorator
#
# DetectionAlgorithm
#
@bacpypes_debugging
class DetectionAlgorithm:
def __init__(self):
if _debug: DetectionAlgorithm._debug("__init__")
# monitor objects
self._monitors = []
# triggered flag, set when a parameter changed and the monitor
# schedules the algorithm to execute
self._triggered = False
def bind(self, **kwargs):
if _debug: DetectionAlgorithm._debug("bind %r", kwargs)
# build a map of methods that are filters. These have been decorated
# with monitor_filter, but they are unbound methods (or simply
# functions in Python3) at the time they are decorated but by looking
# for them now they are bound to this instance.
monitor_filters = {}
for attr_name in dir(self):
attr = getattr(self, attr_name)
if hasattr(attr, "_monitor_filter"):
monitor_filters[attr._monitor_filter] = attr
if _debug: DetectionAlgorithm._debug(" - monitor_filters: %r", monitor_filters)
for parameter, (obj, prop) in kwargs.items():
if not hasattr(self, parameter):
if _debug: DetectionAlgorithm._debug(" - no matching parameter: %r", parameter)
# make a detection monitor
monitor = DetectionMonitor(self, parameter, obj, prop)
if _debug: DetectionAlgorithm._debug(" - monitor: %r", monitor)
# check to see if there is a custom filter for it
if parameter in monitor_filters:
monitor.filter = monitor_filters[parameter]
# keep track of all of these objects for if/when we unbind
self._monitors.append(monitor)
# add the property value monitor function
obj._property_monitors[prop].append(monitor.property_change)
# set the parameter value to the property value if it's not None
property_value = obj._values[prop]
if property_value is not None:
if _debug: DetectionAlgorithm._debug(" - %s: %r", parameter, property_value)
setattr(self, parameter, property_value)
def unbind(self):
if _debug: DetectionAlgorithm._debug("unbind")
# remove the property value monitor functions
for monitor in self._monitors:
if _debug: DetectionAlgorithm._debug(" - monitor: %r", monitor)
monitor.obj._property_monitors[monitor.prop].remove(monitor.property_change)
# abandon the array
self._monitors = []
def _execute(self):
if _debug: DetectionAlgorithm._debug("_execute")
# provided by the derived class
self.execute()
# turn the trigger off
self._triggered = False
def execute(self):
raise notImplementedError("execute not implemented")