1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

closes #152, with future work needed to help Windows mutli-threaded applications

This commit is contained in:
Joel Bender 2017-12-22 10:15:26 -05:00
commit d826f1e8b0
4 changed files with 211 additions and 15 deletions

View File

@ -144,6 +144,9 @@ def run(spin=SPIN, sigterm=stop, sigusr1=print_stack):
time.sleep(sleeptime)
delta -= sleeptime
# delta should be no more than the spin value
delta = min(delta, spin)
# if there are deferred functions, use a small delta
if deferredFns:
delta = min(delta, 0.001)

View File

@ -142,6 +142,9 @@ def run(spin=SPIN, sigterm=stop, sigusr1=print_stack):
time.sleep(sleeptime)
delta -= sleeptime
# delta should be no more than the spin value
delta = min(delta, spin)
# if there are deferred functions, use a small delta
if deferredFns:
delta = min(delta, 0.001)

View File

@ -142,6 +142,9 @@ def run(spin=SPIN, sigterm=stop, sigusr1=print_stack):
time.sleep(sleeptime)
delta -= sleeptime
# delta should be no more than the spin value
delta = min(delta, spin)
# if there are deferred functions, use a small delta
if deferredFns:
delta = min(delta, 0.001)

View File

@ -6,11 +6,15 @@ The console accepts commands that change the properties of an object that
triggers the notifications.
"""
import time
from threading import Thread
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run, enable_sleeping
from bacpypes.core import run, deferred, enable_sleeping
from bacpypes.task import RecurringTask
from bacpypes.app import BIPSimpleApplication
from bacpypes.object import AnalogValueObject, BinaryValueObject
@ -22,6 +26,8 @@ _debug = 0
_log = ModuleLogger(globals())
# test globals
test_av = None
test_bv = None
test_application = None
#
@ -180,8 +186,147 @@ class COVConsoleCmd(ConsoleCmd):
print("exception: %s" % (err,))
@bacpypes_debugging
class TestAnalogValueTask(RecurringTask):
"""
An instance of this class is created when '--avtask <interval>' is
specified as a command line argument. Every <interval> seconds it
changes the value of the test_av present value.
"""
def __init__(self, interval):
if _debug: TestAnalogValueTask._debug("__init__ %r", interval)
RecurringTask.__init__(self, interval * 1000)
# make a list of test values
self.test_values = list(float(i * 10) for i in range(10))
def process_task(self):
if _debug: TestAnalogValueTask._debug("process_task")
global test_av
# pop the next value
next_value = self.test_values.pop(0)
self.test_values.append(next_value)
if _debug: TestAnalogValueTask._debug(" - next_value: %r", next_value)
# change the point
test_av.presentValue = next_value
@bacpypes_debugging
class TestAnalogValueThread(Thread):
"""
An instance of this class is created when '--avthread <interval>' is
specified as a command line argument. Every <interval> seconds it
changes the value of the test_av present value.
"""
def __init__(self, interval):
if _debug: TestAnalogValueThread._debug("__init__ %r", interval)
Thread.__init__(self)
# runs as a daemon
self.daemon = True
# save the interval
self.interval = interval
# make a list of test values
self.test_values = list(100.0 + float(i * 10) for i in range(10))
def run(self):
if _debug: TestAnalogValueThread._debug("run")
global test_av
while True:
# pop the next value
next_value = self.test_values.pop(0)
self.test_values.append(next_value)
if _debug: TestAnalogValueThread._debug(" - next_value: %r", next_value)
# change the point
test_av.presentValue = next_value
# sleep
time.sleep(self.interval)
@bacpypes_debugging
class TestBinaryValueTask(RecurringTask):
"""
An instance of this class is created when '--bvtask <interval>' is
specified as a command line argument. Every <interval> seconds it
changes the value of the test_bv present value.
"""
def __init__(self, interval):
if _debug: TestBinaryValueTask._debug("__init__ %r", interval)
RecurringTask.__init__(self, interval * 1000)
# save the interval
self.interval = interval
# make a list of test values
self.test_values = [True, False]
def process_task(self):
if _debug: TestBinaryValueTask._debug("process_task")
global test_bv
# pop the next value
next_value = self.test_values.pop(0)
self.test_values.append(next_value)
if _debug: TestBinaryValueTask._debug(" - next_value: %r", next_value)
# change the point
test_bv.presentValue = next_value
@bacpypes_debugging
class TestBinaryValueThread(RecurringTask, Thread):
"""
An instance of this class is created when '--bvthread <interval>' is
specified as a command line argument. Every <interval> seconds it
changes the value of the test_bv present value.
"""
def __init__(self, interval):
if _debug: TestBinaryValueThread._debug("__init__ %r", interval)
Thread.__init__(self)
# runs as a daemon
self.daemon = True
# save the interval
self.interval = interval
# make a list of test values
self.test_values = [True, False]
def run(self):
if _debug: TestBinaryValueThread._debug("run")
global test_bv
while True:
# pop the next value
next_value = self.test_values.pop(0)
self.test_values.append(next_value)
if _debug: TestBinaryValueThread._debug(" - next_value: %r", next_value)
# change the point
test_bv.presentValue = next_value
# sleep
time.sleep(self.interval)
def main():
global test_application
global test_av, test_bv, test_application
# make a parser
parser = ConfigArgumentParser(description=__doc__)
@ -191,6 +336,28 @@ def main():
help="create a console",
)
# analog value task and thread
parser.add_argument("--avtask", type=float,
help="analog value recurring task",
)
parser.add_argument("--avthread", type=float,
help="analog value thread",
)
# analog value task and thread
parser.add_argument("--bvtask", type=float,
help="binary value recurring task",
)
parser.add_argument("--bvthread", type=float,
help="binary value thread",
)
# provide a different spin value
parser.add_argument("--spin", type=float,
help="spin time",
default=1.0,
)
# parse the command line arguments
args = parser.parse_args()
@ -209,18 +376,6 @@ def main():
# make a sample application
test_application = SubscribeCOVApplication(test_device, args.ini.address)
# make a binary value object
test_bv = BinaryValueObject(
objectIdentifier=('binaryValue', 1),
objectName='bv',
presentValue='inactive',
statusFlags=[0, 0, 0, 0],
)
_log.debug(" - test_bv: %r", test_bv)
# add it to the device
test_application.add_object(test_bv)
# make an analog value object
test_av = AnalogValueObject(
objectIdentifier=('analogValue', 1),
@ -235,6 +390,18 @@ def main():
test_application.add_object(test_av)
_log.debug(" - object list: %r", test_device.objectList)
# make a binary value object
test_bv = BinaryValueObject(
objectIdentifier=('binaryValue', 1),
objectName='bv',
presentValue='inactive',
statusFlags=[0, 0, 0, 0],
)
_log.debug(" - test_bv: %r", test_bv)
# add it to the device
test_application.add_object(test_bv)
# get the services supported
services_supported = test_application.get_services_supported()
if _debug: _log.debug(" - services_supported: %r", services_supported)
@ -250,9 +417,29 @@ def main():
# enable sleeping will help with threads
enable_sleeping()
# analog value task
if args.avtask:
test_av_task = TestAnalogValueTask(args.avtask)
test_av_task.install_task()
# analog value thread
if args.avthread:
test_av_thread = TestAnalogValueThread(args.avthread)
deferred(test_av_thread.start)
# binary value task
if args.bvtask:
test_bv_task = TestBinaryValueTask(args.bvtask)
test_bv_task.install_task()
# binary value thread
if args.bvthread:
test_bv_thread = TestBinaryValueThread(args.bvthread)
deferred(test_bv_thread.start)
_log.debug("running")
run()
run(args.spin)
_log.debug("fini")