1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00
bacpypes/samples/ReadPropertyMultipleServer.py

312 lines
12 KiB
Python
Executable File

#!/usr/bin/env python
"""
This sample application shows how to extend the basic functionality of a device
to support the ReadPropertyMultiple service.
"""
import random
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Atomic, Real, Unsigned
from bacpypes.constructeddata import Array, Any
from bacpypes.basetypes import ErrorType
from bacpypes.apdu import ReadPropertyMultipleACK, ReadAccessResult, ReadAccessResultElement, ReadAccessResultElementChoice
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import AnalogValueObject, Property, PropertyError, register_object_type
from bacpypes.apdu import Error
from bacpypes.errors import ExecutionError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# RandomValueProperty
#
@bacpypes_debugging
class RandomValueProperty(Property):
def __init__(self, identifier):
if _debug: RandomValueProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# return a random value
value = random.random() * 100.0
if _debug: RandomValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# Random Value Object Type
#
@bacpypes_debugging
class RandomAnalogValueObject(AnalogValueObject):
properties = [
RandomValueProperty('presentValue'),
]
def __init__(self, **kwargs):
if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
register_object_type(RandomAnalogValueObject)
#
# ReadPropertyToAny
#
@bacpypes_debugging
def ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex=None):
"""Read the specified property of the object, with the optional array index,
and cast the result into an Any object."""
if _debug: ReadPropertyToAny._debug("ReadPropertyToAny %s %r %r", obj, propertyIdentifier, propertyArrayIndex)
# get the datatype
datatype = obj.get_datatype(propertyIdentifier)
if _debug: ReadPropertyToAny._debug(" - datatype: %r", datatype)
if datatype is None:
raise ExecutionError(errorClass='property', errorCode='datatypeNotSupported')
# get the value
value = obj.ReadProperty(propertyIdentifier, propertyArrayIndex)
if _debug: ReadPropertyToAny._debug(" - value: %r", value)
if value is None:
raise ExecutionError(errorClass='property', errorCode='unknownProperty')
# change atomic values into something encodeable
if issubclass(datatype, Atomic):
value = datatype(value)
elif issubclass(datatype, Array) and (propertyArrayIndex is not None):
if propertyArrayIndex == 0:
value = Unsigned(value)
elif issubclass(datatype.subtype, Atomic):
value = datatype.subtype(value)
elif not isinstance(value, datatype.subtype):
raise TypeError("invalid result datatype, expecting %s and got %s" \
% (datatype.subtype.__name__, type(value).__name__))
elif not isinstance(value, datatype):
raise TypeError("invalid result datatype, expecting %s and got %s" \
% (datatype.__name__, type(value).__name__))
if _debug: ReadPropertyToAny._debug(" - encodeable value: %r", value)
# encode the value
result = Any()
result.cast_in(value)
if _debug: ReadPropertyToAny._debug(" - result: %r", result)
# return the object
return result
#
# ReadPropertyToResultElement
#
@bacpypes_debugging
def ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex=None):
"""Read the specified property of the object, with the optional array index,
and cast the result into an Any object."""
if _debug: ReadPropertyToResultElement._debug("ReadPropertyToResultElement %s %r %r", obj, propertyIdentifier, propertyArrayIndex)
# save the result in the property value
read_result = ReadAccessResultElementChoice()
try:
read_result.propertyValue = ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex)
if _debug: ReadPropertyToResultElement._debug(" - success")
except PropertyError as error:
if _debug: ReadPropertyToResultElement._debug(" - error: %r", error)
read_result.propertyAccessError = ErrorType(errorClass='property', errorCode='unknownProperty')
except ExecutionError as error:
if _debug: ReadPropertyToResultElement._debug(" - error: %r", error)
read_result.propertyAccessError = ErrorType(errorClass=error.errorClass, errorCode=error.errorCode)
# make an element for this value
read_access_result_element = ReadAccessResultElement(
propertyIdentifier=propertyIdentifier,
propertyArrayIndex=propertyArrayIndex,
readResult=read_result,
)
if _debug: ReadPropertyToResultElement._debug(" - read_access_result_element: %r", read_access_result_element)
# fini
return read_access_result_element
#
# ReadPropertyMultipleApplication
#
@bacpypes_debugging
class ReadPropertyMultipleApplication(BIPSimpleApplication):
def __init__(self, *args, **kwargs):
if _debug: ReadPropertyMultipleApplication._debug("__init__ %r %r", args, kwargs)
BIPSimpleApplication.__init__(self, *args, **kwargs)
def do_ReadPropertyMultipleRequest(self, apdu):
"""Respond to a ReadPropertyMultiple Request."""
if _debug: ReadPropertyMultipleApplication._debug("do_ReadPropertyMultipleRequest %r", apdu)
# response is a list of read access results (or an error)
resp = None
read_access_result_list = []
# loop through the request
for read_access_spec in apdu.listOfReadAccessSpecs:
# get the object identifier
objectIdentifier = read_access_spec.objectIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier)
# check for wildcard
if (objectIdentifier == ('device', 4194303)):
if _debug: ReadPropertyMultipleApplication._debug(" - wildcard device identifier")
objectIdentifier = self.localDevice.objectIdentifier
# get the object
obj = self.get_object_id(objectIdentifier)
if _debug: ReadPropertyMultipleApplication._debug(" - object: %r", obj)
# make sure it exists
if not obj:
resp = Error(errorClass='object', errorCode='unknownObject', context=apdu)
if _debug: ReadPropertyMultipleApplication._debug(" - unknown object error: %r", resp)
break
# build a list of result elements
read_access_result_element_list = []
# loop through the property references
for prop_reference in read_access_spec.listOfPropertyReferences:
# get the property identifier
propertyIdentifier = prop_reference.propertyIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier)
# get the array index (optional)
propertyArrayIndex = prop_reference.propertyArrayIndex
if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex)
# check for special property identifiers
if propertyIdentifier in ('all', 'required', 'optional'):
for propId, prop in obj._properties.items():
if _debug: ReadPropertyMultipleApplication._debug(" - checking: %r %r", propId, prop.optional)
if (propertyIdentifier == 'all'):
pass
elif (propertyIdentifier == 'required') and (prop.optional):
if _debug: ReadPropertyMultipleApplication._debug(" - not a required property")
continue
elif (propertyIdentifier == 'optional') and (not prop.optional):
if _debug: ReadPropertyMultipleApplication._debug(" - not an optional property")
continue
# read the specific property
read_access_result_element = ReadPropertyToResultElement(obj, propId, propertyArrayIndex)
# check for undefined property
if read_access_result_element.readResult.propertyAccessError \
and read_access_result_element.readResult.propertyAccessError.errorCode == 'unknownProperty':
continue
# add it to the list
read_access_result_element_list.append(read_access_result_element)
else:
# read the specific property
read_access_result_element = ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex)
# add it to the list
read_access_result_element_list.append(read_access_result_element)
# build a read access result
read_access_result = ReadAccessResult(
objectIdentifier=objectIdentifier,
listOfResults=read_access_result_element_list
)
if _debug: ReadPropertyMultipleApplication._debug(" - read_access_result: %r", read_access_result)
# add it to the list
read_access_result_list.append(read_access_result)
# this is a ReadPropertyMultiple ack
if not resp:
resp = ReadPropertyMultipleACK(context=apdu)
resp.listOfReadAccessResults = read_access_result_list
if _debug: ReadPropertyMultipleApplication._debug(" - resp: %r", resp)
# return the result
self.response(resp)
#
# __main__
#
def main():
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = ReadPropertyMultipleApplication(this_device, args.ini.address)
# make a random input object
ravo1 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Random1'
)
_log.debug(" - ravo1: %r", ravo1)
ravo2 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 2), objectName='Random2'
)
_log.debug(" - ravo2: %r", ravo2)
# add it to the device
this_application.add_object(ravo1)
this_application.add_object(ravo2)
_log.debug(" - object list: %r", this_device.objectList)
# get the services supported
services_supported = this_application.get_services_supported()
if _debug: _log.debug(" - services_supported: %r", services_supported)
# let the device object know
this_device.protocolServicesSupported = services_supported.value
_log.debug("running")
run()
_log.debug("fini")
if __name__ == "__main__":
main()