1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00
bacpypes/samples/RandomAnalogValueSleep.py
2017-09-30 03:20:41 -04:00

149 lines
4.2 KiB
Python

#!/usr/bin/env python
"""
Random Value Property with Sleep
This application is a server of analog value objects that return a random
number when the present value is read. This version has an additional
'sleep' time that slows down its performance.
"""
import os
import random
import time
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Real
from bacpypes.object import AnalogValueObject, Property, register_object_type
from bacpypes.errors import ExecutionError
from bacpypes.app import BIPSimpleApplication
from bacpypes.service.device import LocalDeviceObject
# some debugging
# _debug gates every debug statement in this module; a falsy value (0)
# means the guarded debug calls are skipped.
# NOTE(review): presumably bacpypes adjusts this flag through the module
# logger when debugging is enabled from the command line -- confirm in
# bacpypes.debugging.
_debug = 0
# module-level logger, constructed from this module's globals
_log = ModuleLogger(globals())
# settings, each one overridable through an environment variable of the
# same name
SLEEP_TIME = float(os.environ.get('SLEEP_TIME', 0.1))
RANDOM_OBJECT_COUNT = int(os.environ.get('RANDOM_OBJECT_COUNT', 10))

# globals
# parsed command line arguments; remains None until main() assigns it
args = None
#
# RandomValueProperty
#
class RandomValueProperty(Property):
    """Read-only Real property that produces a new random value on each read.

    Every read is delayed by the configured sleep time before the value
    is returned, which deliberately slows the server down.
    """

    def __init__(self, identifier):
        """Create the property for *identifier* as an optional, immutable Real."""
        if _debug: RandomValueProperty._debug("__init__ %r", identifier)
        Property.__init__(self, identifier, Real, default=0.0, optional=True, mutable=False)

    def ReadProperty(self, obj, arrayIndex=None):
        """Sleep, then return a random float in the range [0.0, 100.0).

        Raises ExecutionError (propertyIsNotAnArray) when *arrayIndex* is
        given, because this property is a scalar.
        """
        if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)

        # access an array
        if arrayIndex is not None:
            raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')

        # sleep a little; ``args`` is still None when this module is imported
        # instead of being run through main(), so fall back to the module
        # default rather than failing with an AttributeError
        time.sleep(getattr(args, 'sleep', SLEEP_TIME))

        # return a random value
        value = random.random() * 100.0
        if _debug: RandomValueProperty._debug(" - value: %r", value)

        return value

    def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
        """Reject every write: always raises ExecutionError (writeAccessDenied)."""
        if _debug:
            RandomValueProperty._debug(
                "WriteProperty %r %r arrayIndex=%r priority=%r direct=%r",
                obj, value, arrayIndex, priority, direct,
                )
        raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')


# NOTE(review): presumably this is what supplies the class-level _debug()
# used in the methods above -- confirm in bacpypes.debugging
bacpypes_debugging(RandomValueProperty)
#
# Random Value Object Type
#
class RandomAnalogValueObject(AnalogValueObject):
    """Analog value object that declares presentValue as a RandomValueProperty."""

    # NOTE(review): presumably this entry takes the place of the presentValue
    # that AnalogValueObject would otherwise provide -- confirm in
    # bacpypes.object
    properties = [RandomValueProperty('presentValue')]

    def __init__(self, **kwargs):
        """Pass all keyword arguments straight through to AnalogValueObject."""
        if _debug:
            RandomAnalogValueObject._debug("__init__ %r", kwargs)

        AnalogValueObject.__init__(self, **kwargs)


# set up debugging for the class first, then register the object type
bacpypes_debugging(RandomAnalogValueObject)
register_object_type(RandomAnalogValueObject)
#
# __main__
#
def main():
    """Parse the command line, build the device and serve the random objects.

    Stores the parsed arguments in the module-level ``args`` (read by
    RandomValueProperty.ReadProperty), creates the local device and the
    application, adds RANDOM_OBJECT_COUNT RandomAnalogValueObject instances
    numbered from 1, and then calls run().
    """
    global args

    # parse the command line arguments
    parser = ConfigArgumentParser(description=__doc__)

    # add an option to override the sleep time
    parser.add_argument('--sleep', type=float,
        help="sleep before returning the value",
        default=SLEEP_TIME,
        )

    # parse the command line arguments
    args = parser.parse_args()

    # time.sleep() raises ValueError for a negative duration, which would
    # make every read of a random value fail; reject it at startup instead
    if args.sleep < 0:
        parser.error("--sleep must not be negative")

    if _debug: _log.debug("initialization")
    if _debug: _log.debug(" - args: %r", args)

    # make a device object
    this_device = LocalDeviceObject(
        objectName=args.ini.objectname,
        objectIdentifier=('device', int(args.ini.objectidentifier)),
        maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
        segmentationSupported=args.ini.segmentationsupported,
        vendorIdentifier=int(args.ini.vendoridentifier),
        )

    # make a sample application
    this_application = BIPSimpleApplication(this_device, args.ini.address)

    # get the services supported
    services_supported = this_application.get_services_supported()
    if _debug: _log.debug(" - services_supported: %r", services_supported)

    # let the device object know
    this_device.protocolServicesSupported = services_supported.value

    # make some random input objects
    for i in range(1, RANDOM_OBJECT_COUNT+1):
        ravo = RandomAnalogValueObject(
            objectIdentifier=('analogValue', i),
            objectName='Random-%d' % (i,),
            )
        if _debug: _log.debug(" - ravo: %r", ravo)
        this_application.add_object(ravo)

    # make sure they are all there
    if _debug: _log.debug(" - object list: %r", this_device.objectList)

    if _debug: _log.debug("running")

    run()

    if _debug: _log.debug("fini")
# start the server only when this file is executed as a script; importing
# the module defines and registers the classes without calling main()
if __name__ == "__main__":
    main()