mirror of
https://github.com/JoelBender/bacpypes
synced 2025-10-05 22:18:16 +08:00
additional sample
This commit is contained in:
parent
1fd59dc29a
commit
ddb076f7f8
153
samples/RandomAccumulatorObject.py
Executable file
153
samples/RandomAccumulatorObject.py
Executable file
|
@ -0,0 +1,153 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
"""
|
||||
This sample application mocks up an accumulator object.
|
||||
"""
|
||||
|
||||
import random
|
||||
|
||||
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
||||
from bacpypes.consolelogging import ConfigArgumentParser
|
||||
|
||||
from bacpypes.core import run
|
||||
|
||||
from bacpypes.primitivedata import Unsigned, Date, Time
|
||||
from bacpypes.basetypes import DateTime
|
||||
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
|
||||
from bacpypes.object import AccumulatorObject, Property, register_object_type
|
||||
from bacpypes.errors import ExecutionError
|
||||
|
||||
# some debugging
|
||||
_debug = 0
|
||||
_log = ModuleLogger(globals())
|
||||
|
||||
# globals
|
||||
this_device = None
|
||||
this_application = None
|
||||
|
||||
#
|
||||
# RandomUnsignedValueProperty
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class RandomUnsignedValueProperty(Property):
|
||||
|
||||
def __init__(self, identifier):
|
||||
if _debug: RandomUnsignedValueProperty._debug("__init__ %r", identifier)
|
||||
Property.__init__(self, identifier, Unsigned, default=None, optional=True, mutable=False)
|
||||
|
||||
def ReadProperty(self, obj, arrayIndex=None):
|
||||
if _debug: RandomUnsignedValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
|
||||
|
||||
# access an array
|
||||
if arrayIndex is not None:
|
||||
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
|
||||
|
||||
# return a random value
|
||||
value = int(random.random() * 100.0)
|
||||
if _debug: RandomUnsignedValueProperty._debug(" - value: %r", value)
|
||||
|
||||
return value
|
||||
|
||||
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
|
||||
if _debug: RandomUnsignedValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
|
||||
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
|
||||
|
||||
#
|
||||
# CurrentDateTimeProperty
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class CurrentDateTimeProperty(Property):
|
||||
|
||||
def __init__(self, identifier):
|
||||
if _debug: CurrentDateTimeProperty._debug("__init__ %r", identifier)
|
||||
Property.__init__(self, identifier, DateTime, default=None, optional=True, mutable=False)
|
||||
|
||||
def ReadProperty(self, obj, arrayIndex=None):
|
||||
if _debug: CurrentDateTimeProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
|
||||
|
||||
# access an array
|
||||
if arrayIndex is not None:
|
||||
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
|
||||
|
||||
# get the value
|
||||
current_date = Date().now().value
|
||||
current_time = Time().now().value
|
||||
|
||||
value = DateTime(date=current_date, time=current_time)
|
||||
if _debug: CurrentDateTimeProperty._debug(" - value: %r", value)
|
||||
|
||||
return value
|
||||
|
||||
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
|
||||
if _debug: CurrentDateTimeProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
|
||||
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
|
||||
|
||||
#
|
||||
# Random Accumulator Object
|
||||
#
|
||||
|
||||
@bacpypes_debugging
|
||||
class RandomAccumulatorObject(AccumulatorObject):
|
||||
|
||||
properties = [
|
||||
RandomUnsignedValueProperty('presentValue'),
|
||||
CurrentDateTimeProperty('valueChangeTime'),
|
||||
]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
if _debug: RandomAccumulatorObject._debug("__init__ %r", kwargs)
|
||||
AccumulatorObject.__init__(self, **kwargs)
|
||||
|
||||
register_object_type(RandomAccumulatorObject)
|
||||
|
||||
#
|
||||
# __main__
|
||||
#
|
||||
|
||||
try:
|
||||
# parse the command line arguments
|
||||
args = ConfigArgumentParser(description=__doc__).parse_args()
|
||||
|
||||
if _debug: _log.debug("initialization")
|
||||
if _debug: _log.debug(" - args: %r", args)
|
||||
|
||||
# make a device object
|
||||
this_device = LocalDeviceObject(
|
||||
objectName=args.ini.objectname,
|
||||
objectIdentifier=('device', int(args.ini.objectidentifier)),
|
||||
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
|
||||
segmentationSupported=args.ini.segmentationsupported,
|
||||
vendorIdentifier=int(args.ini.vendoridentifier),
|
||||
)
|
||||
|
||||
# make a sample application
|
||||
this_application = BIPSimpleApplication(this_device, args.ini.address)
|
||||
|
||||
# get the services supported
|
||||
services_supported = this_application.get_services_supported()
|
||||
if _debug: _log.debug(" - services_supported: %r", services_supported)
|
||||
|
||||
# let the device object know
|
||||
this_device.protocolServicesSupported = services_supported.value
|
||||
|
||||
# make a random input object
|
||||
rao1 = RandomAccumulatorObject(
|
||||
objectIdentifier=('accumulator', 1),
|
||||
objectName='Random1',
|
||||
statusFlags = [0, 0, 0, 0],
|
||||
)
|
||||
_log.debug(" - rao1: %r", rao1)
|
||||
|
||||
# add it to the device
|
||||
this_application.add_object(rao1)
|
||||
_log.debug(" - object list: %r", this_device.objectList)
|
||||
|
||||
run()
|
||||
|
||||
except Exception, e:
|
||||
_log.exception("an error has occurred: %s", e)
|
||||
finally:
|
||||
_log.debug("finally")
|
||||
|
Loading…
Reference in New Issue
Block a user