mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
add some AV and BV task and thread options
This commit is contained in:
parent
d316111a06
commit
3fd5419790
|
@ -6,11 +6,15 @@ The console accepts commands that change the properties of an object that
|
||||||
triggers the notifications.
|
triggers the notifications.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
from threading import Thread
|
||||||
|
|
||||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||||
from bacpypes.consolelogging import ConfigArgumentParser
|
from bacpypes.consolelogging import ConfigArgumentParser
|
||||||
from bacpypes.consolecmd import ConsoleCmd
|
from bacpypes.consolecmd import ConsoleCmd
|
||||||
|
|
||||||
from bacpypes.core import run, enable_sleeping
|
from bacpypes.core import run, deferred, enable_sleeping
|
||||||
|
from bacpypes.task import RecurringTask
|
||||||
|
|
||||||
from bacpypes.app import BIPSimpleApplication
|
from bacpypes.app import BIPSimpleApplication
|
||||||
from bacpypes.object import AnalogValueObject, BinaryValueObject
|
from bacpypes.object import AnalogValueObject, BinaryValueObject
|
||||||
|
@ -22,6 +26,8 @@ _debug = 0
|
||||||
_log = ModuleLogger(globals())
|
_log = ModuleLogger(globals())
|
||||||
|
|
||||||
# test globals
|
# test globals
|
||||||
|
test_av = None
|
||||||
|
test_bv = None
|
||||||
test_application = None
|
test_application = None
|
||||||
|
|
||||||
#
|
#
|
||||||
|
@ -180,8 +186,147 @@ class COVConsoleCmd(ConsoleCmd):
|
||||||
print("exception: %s" % (err,))
|
print("exception: %s" % (err,))
|
||||||
|
|
||||||
|
|
||||||
|
@bacpypes_debugging
|
||||||
|
class TestAnalogValueTask(RecurringTask):
|
||||||
|
|
||||||
|
"""
|
||||||
|
An instance of this class is created when '--avtask <interval>' is
|
||||||
|
specified as a command line argument. Every <interval> seconds it
|
||||||
|
changes the value of the test_av present value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, interval):
|
||||||
|
if _debug: TestAnalogValueTask._debug("__init__ %r", interval)
|
||||||
|
RecurringTask.__init__(self, interval * 1000)
|
||||||
|
|
||||||
|
# make a list of test values
|
||||||
|
self.test_values = list(float(i * 10) for i in range(10))
|
||||||
|
|
||||||
|
def process_task(self):
|
||||||
|
if _debug: TestAnalogValueTask._debug("process_task")
|
||||||
|
global test_av
|
||||||
|
|
||||||
|
# pop the next value
|
||||||
|
next_value = self.test_values.pop(0)
|
||||||
|
self.test_values.append(next_value)
|
||||||
|
if _debug: TestAnalogValueTask._debug(" - next_value: %r", next_value)
|
||||||
|
|
||||||
|
# change the point
|
||||||
|
test_av.presentValue = next_value
|
||||||
|
|
||||||
|
|
||||||
|
@bacpypes_debugging
|
||||||
|
class TestAnalogValueThread(Thread):
|
||||||
|
|
||||||
|
"""
|
||||||
|
An instance of this class is created when '--avthread <interval>' is
|
||||||
|
specified as a command line argument. Every <interval> seconds it
|
||||||
|
changes the value of the test_av present value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, interval):
|
||||||
|
if _debug: TestAnalogValueThread._debug("__init__ %r", interval)
|
||||||
|
Thread.__init__(self)
|
||||||
|
|
||||||
|
# runs as a daemon
|
||||||
|
self.daemon = True
|
||||||
|
|
||||||
|
# save the interval
|
||||||
|
self.interval = interval
|
||||||
|
|
||||||
|
# make a list of test values
|
||||||
|
self.test_values = list(100.0 + float(i * 10) for i in range(10))
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if _debug: TestAnalogValueThread._debug("run")
|
||||||
|
global test_av
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# pop the next value
|
||||||
|
next_value = self.test_values.pop(0)
|
||||||
|
self.test_values.append(next_value)
|
||||||
|
if _debug: TestAnalogValueThread._debug(" - next_value: %r", next_value)
|
||||||
|
|
||||||
|
# change the point
|
||||||
|
test_av.presentValue = next_value
|
||||||
|
|
||||||
|
# sleep
|
||||||
|
time.sleep(self.interval)
|
||||||
|
|
||||||
|
|
||||||
|
@bacpypes_debugging
|
||||||
|
class TestBinaryValueTask(RecurringTask):
|
||||||
|
|
||||||
|
"""
|
||||||
|
An instance of this class is created when '--bvtask <interval>' is
|
||||||
|
specified as a command line argument. Every <interval> seconds it
|
||||||
|
changes the value of the test_bv present value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, interval):
|
||||||
|
if _debug: TestBinaryValueTask._debug("__init__ %r", interval)
|
||||||
|
RecurringTask.__init__(self, interval * 1000)
|
||||||
|
|
||||||
|
# save the interval
|
||||||
|
self.interval = interval
|
||||||
|
|
||||||
|
# make a list of test values
|
||||||
|
self.test_values = [True, False]
|
||||||
|
|
||||||
|
def process_task(self):
|
||||||
|
if _debug: TestBinaryValueTask._debug("process_task")
|
||||||
|
global test_bv
|
||||||
|
|
||||||
|
# pop the next value
|
||||||
|
next_value = self.test_values.pop(0)
|
||||||
|
self.test_values.append(next_value)
|
||||||
|
if _debug: TestBinaryValueTask._debug(" - next_value: %r", next_value)
|
||||||
|
|
||||||
|
# change the point
|
||||||
|
test_bv.presentValue = next_value
|
||||||
|
|
||||||
|
|
||||||
|
@bacpypes_debugging
|
||||||
|
class TestBinaryValueThread(RecurringTask, Thread):
|
||||||
|
|
||||||
|
"""
|
||||||
|
An instance of this class is created when '--bvthread <interval>' is
|
||||||
|
specified as a command line argument. Every <interval> seconds it
|
||||||
|
changes the value of the test_bv present value.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, interval):
|
||||||
|
if _debug: TestBinaryValueThread._debug("__init__ %r", interval)
|
||||||
|
Thread.__init__(self)
|
||||||
|
|
||||||
|
# runs as a daemon
|
||||||
|
self.daemon = True
|
||||||
|
|
||||||
|
# save the interval
|
||||||
|
self.interval = interval
|
||||||
|
|
||||||
|
# make a list of test values
|
||||||
|
self.test_values = [True, False]
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
if _debug: TestBinaryValueThread._debug("run")
|
||||||
|
global test_bv
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# pop the next value
|
||||||
|
next_value = self.test_values.pop(0)
|
||||||
|
self.test_values.append(next_value)
|
||||||
|
if _debug: TestBinaryValueThread._debug(" - next_value: %r", next_value)
|
||||||
|
|
||||||
|
# change the point
|
||||||
|
test_bv.presentValue = next_value
|
||||||
|
|
||||||
|
# sleep
|
||||||
|
time.sleep(self.interval)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
global test_application
|
global test_av, test_bv, test_application
|
||||||
|
|
||||||
# make a parser
|
# make a parser
|
||||||
parser = ConfigArgumentParser(description=__doc__)
|
parser = ConfigArgumentParser(description=__doc__)
|
||||||
|
@ -191,6 +336,28 @@ def main():
|
||||||
help="create a console",
|
help="create a console",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# analog value task and thread
|
||||||
|
parser.add_argument("--avtask", type=float,
|
||||||
|
help="analog value recurring task",
|
||||||
|
)
|
||||||
|
parser.add_argument("--avthread", type=float,
|
||||||
|
help="analog value thread",
|
||||||
|
)
|
||||||
|
|
||||||
|
# analog value task and thread
|
||||||
|
parser.add_argument("--bvtask", type=float,
|
||||||
|
help="binary value recurring task",
|
||||||
|
)
|
||||||
|
parser.add_argument("--bvthread", type=float,
|
||||||
|
help="binary value thread",
|
||||||
|
)
|
||||||
|
|
||||||
|
# provide a different spin value
|
||||||
|
parser.add_argument("--spin", type=float,
|
||||||
|
help="spin time",
|
||||||
|
default=1.0,
|
||||||
|
)
|
||||||
|
|
||||||
# parse the command line arguments
|
# parse the command line arguments
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
@ -209,18 +376,6 @@ def main():
|
||||||
# make a sample application
|
# make a sample application
|
||||||
test_application = SubscribeCOVApplication(test_device, args.ini.address)
|
test_application = SubscribeCOVApplication(test_device, args.ini.address)
|
||||||
|
|
||||||
# make a binary value object
|
|
||||||
test_bv = BinaryValueObject(
|
|
||||||
objectIdentifier=('binaryValue', 1),
|
|
||||||
objectName='bv',
|
|
||||||
presentValue='inactive',
|
|
||||||
statusFlags=[0, 0, 0, 0],
|
|
||||||
)
|
|
||||||
_log.debug(" - test_bv: %r", test_bv)
|
|
||||||
|
|
||||||
# add it to the device
|
|
||||||
test_application.add_object(test_bv)
|
|
||||||
|
|
||||||
# make an analog value object
|
# make an analog value object
|
||||||
test_av = AnalogValueObject(
|
test_av = AnalogValueObject(
|
||||||
objectIdentifier=('analogValue', 1),
|
objectIdentifier=('analogValue', 1),
|
||||||
|
@ -235,6 +390,18 @@ def main():
|
||||||
test_application.add_object(test_av)
|
test_application.add_object(test_av)
|
||||||
_log.debug(" - object list: %r", test_device.objectList)
|
_log.debug(" - object list: %r", test_device.objectList)
|
||||||
|
|
||||||
|
# make a binary value object
|
||||||
|
test_bv = BinaryValueObject(
|
||||||
|
objectIdentifier=('binaryValue', 1),
|
||||||
|
objectName='bv',
|
||||||
|
presentValue='inactive',
|
||||||
|
statusFlags=[0, 0, 0, 0],
|
||||||
|
)
|
||||||
|
_log.debug(" - test_bv: %r", test_bv)
|
||||||
|
|
||||||
|
# add it to the device
|
||||||
|
test_application.add_object(test_bv)
|
||||||
|
|
||||||
# get the services supported
|
# get the services supported
|
||||||
services_supported = test_application.get_services_supported()
|
services_supported = test_application.get_services_supported()
|
||||||
if _debug: _log.debug(" - services_supported: %r", services_supported)
|
if _debug: _log.debug(" - services_supported: %r", services_supported)
|
||||||
|
@ -250,9 +417,29 @@ def main():
|
||||||
# enable sleeping will help with threads
|
# enable sleeping will help with threads
|
||||||
enable_sleeping()
|
enable_sleeping()
|
||||||
|
|
||||||
|
# analog value task
|
||||||
|
if args.avtask:
|
||||||
|
test_av_task = TestAnalogValueTask(args.avtask)
|
||||||
|
test_av_task.install_task()
|
||||||
|
|
||||||
|
# analog value thread
|
||||||
|
if args.avthread:
|
||||||
|
test_av_thread = TestAnalogValueThread(args.avthread)
|
||||||
|
deferred(test_av_thread.start)
|
||||||
|
|
||||||
|
# binary value task
|
||||||
|
if args.bvtask:
|
||||||
|
test_bv_task = TestBinaryValueTask(args.bvtask)
|
||||||
|
test_bv_task.install_task()
|
||||||
|
|
||||||
|
# binary value thread
|
||||||
|
if args.bvthread:
|
||||||
|
test_bv_thread = TestBinaryValueThread(args.bvthread)
|
||||||
|
deferred(test_bv_thread.start)
|
||||||
|
|
||||||
_log.debug("running")
|
_log.debug("running")
|
||||||
|
|
||||||
run()
|
run(args.spin)
|
||||||
|
|
||||||
_log.debug("fini")
|
_log.debug("fini")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user