1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-05 22:18:16 +08:00
bacpypes/samples/mini_device.py
2019-11-17 14:57:47 -05:00

140 lines
3.5 KiB
Python
Executable File

#!/usr/bin/python
"""
This sample application is a server that supports many core services that
applications need to present data on a BACnet network. It supports Who-Is
and I-Am for device binding, Read and Write Property, Read and Write
Property Multiple, and COV subscriptions.
Change the process_task() function to do something on a regular INTERVAL
number of seconds.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.task import RecurringTask
from bacpypes.app import BIPSimpleApplication
from bacpypes.object import AnalogValueObject, BinaryValueObject
from bacpypes.local.device import LocalDeviceObject
from bacpypes.service.cov import ChangeOfValueServices
from bacpypes.service.object import ReadWritePropertyMultipleServices
# some debugging
# _debug gates the debug-logging calls throughout this module; _log is the
# module-level logger built from this module's globals
_debug = 0
_log = ModuleLogger(globals())
# settings
# period of the DoSomething task, in seconds (see the module docstring)
INTERVAL = 1.0
# globals
# populated by main(); test_av and test_bv are updated by
# DoSomething.process_task()
test_application = None
test_av = None
test_bv = None
@bacpypes_debugging
class SampleApplication(
    BIPSimpleApplication,
    ReadWritePropertyMultipleServices,
    ChangeOfValueServices
):
    """Application class used by this sample.

    Composes ``BIPSimpleApplication`` with the
    ``ReadWritePropertyMultipleServices`` and ``ChangeOfValueServices``
    mixins.  Nothing is overridden or added here.
    """
@bacpypes_debugging
class DoSomething(RecurringTask):
    """Recurring task that steps the sample objects through test values.

    Every run takes the next ``(binary, analog)`` pair from a fixed rotation
    and writes it to the module-level ``test_bv`` and ``test_av`` objects.
    """

    def __init__(self, interval):
        """Set up the task and its rotation of test values.

        :param interval: period between runs, in seconds.  It is multiplied
            by 1000 before being passed to ``RecurringTask`` (presumably
            that class expects milliseconds -- confirm against bacpypes).
        """
        if _debug:
            DoSomething._debug("__init__ %r", interval)

        # the base class receives the scaled interval
        RecurringTask.__init__(self, interval * 1000)

        # keep the interval exactly as the caller supplied it
        self.interval = interval

        # rotation of (binary presentValue, analog presentValue) pairs
        self.test_values = [
            ("active", 1.0),
            ("inactive", 2.0),
            ("active", 3.0),
            ("inactive", 4.0),
        ]

    def process_task(self):
        """Advance the rotation one step and apply the pair to the objects."""
        if _debug:
            DoSomething._debug("process_task")

        # move the head of the rotation to the tail; the tail is the pair
        # to apply on this run
        rotation = self.test_values
        rotation.append(rotation.pop(0))
        current = rotation[-1]
        if _debug:
            DoSomething._debug(" - next_value: %r", current)

        # test_av / test_bv are module globals that are only read here (their
        # attributes are set, the names are not rebound), so no ``global``
        # statement is required
        test_av.presentValue = current[1]
        test_bv.presentValue = current[0]
def main():
    """Build the sample device, its objects and the recurring task, then run.

    Steps, in order:

    1. Parse the command line with ``ConfigArgumentParser``.
    2. Create the local device object and the ``SampleApplication`` from the
       parsed ``ini`` settings.
    3. Create one analog value and one binary value object and add them to
       the application.
    4. Install a ``DoSomething`` task with period ``INTERVAL``.
    5. Call ``run()``.

    The application and the two objects are stored in the module-level
    names ``test_application``, ``test_av`` and ``test_bv`` so that
    ``DoSomething.process_task`` can reach them.

    All debug logging is gated on the module ``_debug`` flag, matching the
    convention used by the rest of this file.
    """
    global test_av, test_bv, test_application

    # make a parser
    parser = ConfigArgumentParser(description=__doc__)

    # parse the command line arguments
    args = parser.parse_args()

    if _debug:
        _log.debug("initialization")
        _log.debug(" - args: %r", args)

    # make a device object
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug:
        _log.debug(" - this_device: %r", this_device)

    # make a sample application
    test_application = SampleApplication(this_device, args.ini.address)

    # make an analog value object
    test_av = AnalogValueObject(
        objectIdentifier=("analogValue", 1),
        objectName="av",
        presentValue=0.0,
        statusFlags=[0, 0, 0, 0],
        covIncrement=1.0,
    )
    if _debug:
        _log.debug(" - test_av: %r", test_av)

    # add it to the device
    test_application.add_object(test_av)
    if _debug:
        _log.debug(" - object list: %r", this_device.objectList)

    # make a binary value object
    test_bv = BinaryValueObject(
        objectIdentifier=("binaryValue", 1),
        objectName="bv",
        presentValue="inactive",
        statusFlags=[0, 0, 0, 0],
    )
    if _debug:
        _log.debug(" - test_bv: %r", test_bv)

    # add it to the device
    test_application.add_object(test_bv)

    # task that periodically changes both objects' present values
    do_something_task = DoSomething(INTERVAL)
    do_something_task.install_task()

    if _debug:
        _log.debug("running")

    run()

    if _debug:
        _log.debug("fini")
# run the sample only when executed as a script, not when imported
if __name__ == "__main__":
    main()