1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

update samples to use services

This commit is contained in:
Joel Bender 2016-08-31 10:27:00 -04:00
parent aebc8eb01b
commit 4eb58b6238
2 changed files with 80 additions and 278 deletions

View File

@ -18,7 +18,7 @@ from bacpypes.pdu import Address
from bacpypes.object import get_object_class, get_datatype from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import ReadPropertyMultipleRequest, PropertyReference, \ from bacpypes.apdu import ReadPropertyMultipleRequest, PropertyReference, \
ReadAccessSpecification, Error, AbortPDU, ReadPropertyMultipleACK ReadAccessSpecification, ReadPropertyMultipleACK
from bacpypes.primitivedata import Unsigned from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array from bacpypes.constructeddata import Array
from bacpypes.basetypes import PropertyIdentifier from bacpypes.basetypes import PropertyIdentifier
@ -31,91 +31,7 @@ _debug = 0
_log = ModuleLogger(globals()) _log = ModuleLogger(globals())
# globals # globals
this_device = None
this_application = None this_application = None
this_console = None
#
# ReadPropertyMultipleApplication
#
@bacpypes_debugging
class ReadPropertyMultipleApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadPropertyMultipleApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadPropertyMultipleApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadPropertyMultipleApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
elif (isinstance(self._request, ReadPropertyMultipleRequest)) and (isinstance(apdu, ReadPropertyMultipleACK)):
# loop through the results
for result in apdu.listOfReadAccessResults:
# here is the object identifier
objectIdentifier = result.objectIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier)
# now come the property values per object
for element in result.listOfResults:
# get the property and array index
propertyIdentifier = element.propertyIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier)
propertyArrayIndex = element.propertyArrayIndex
if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex)
# here is the read result
readResult = element.readResult
sys.stdout.write(propertyIdentifier)
if propertyArrayIndex is not None:
sys.stdout.write("[" + str(propertyArrayIndex) + "]")
# check for an error
if readResult.propertyAccessError is not None:
sys.stdout.write(" ! " + str(readResult.propertyAccessError) + '\n')
else:
# here is the value
propertyValue = readResult.propertyValue
# find the datatype
datatype = get_datatype(objectIdentifier[0], propertyIdentifier)
if _debug: ReadPropertyMultipleApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (propertyArrayIndex is not None):
if propertyArrayIndex == 0:
value = propertyValue.cast_out(Unsigned)
else:
value = propertyValue.cast_out(datatype.subtype)
else:
value = propertyValue.cast_out(datatype)
if _debug: ReadPropertyMultipleApplication._debug(" - value: %r", value)
sys.stdout.write(" = " + str(value) + '\n')
sys.stdout.flush()
# #
# ReadPropertyMultipleConsoleCmd # ReadPropertyMultipleConsoleCmd
@ -199,7 +115,72 @@ class ReadPropertyMultipleConsoleCmd(ConsoleCmd):
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - request: %r", request) if _debug: ReadPropertyMultipleConsoleCmd._debug(" - request: %r", request)
# give it to the application # give it to the application
this_application.request(request) iocb = this_application.request(request)
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - iocb: %r", iocb)
# wait for it to complete
iocb.wait()
# do something for success
if iocb.ioResponse:
apdu = iocb.ioResponse
# should be an ack
if not isinstance(apdu, ReadPropertyMultipleACK):
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - not an ack")
return
# loop through the results
for result in apdu.listOfReadAccessResults:
# here is the object identifier
objectIdentifier = result.objectIdentifier
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - objectIdentifier: %r", objectIdentifier)
# now come the property values per object
for element in result.listOfResults:
# get the property and array index
propertyIdentifier = element.propertyIdentifier
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - propertyIdentifier: %r", propertyIdentifier)
propertyArrayIndex = element.propertyArrayIndex
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - propertyArrayIndex: %r", propertyArrayIndex)
# here is the read result
readResult = element.readResult
sys.stdout.write(propertyIdentifier)
if propertyArrayIndex is not None:
sys.stdout.write("[" + str(propertyArrayIndex) + "]")
# check for an error
if readResult.propertyAccessError is not None:
sys.stdout.write(" ! " + str(readResult.propertyAccessError) + '\n')
else:
# here is the value
propertyValue = readResult.propertyValue
# find the datatype
datatype = get_datatype(objectIdentifier[0], propertyIdentifier)
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (propertyArrayIndex is not None):
if propertyArrayIndex == 0:
value = propertyValue.cast_out(Unsigned)
else:
value = propertyValue.cast_out(datatype.subtype)
else:
value = propertyValue.cast_out(datatype)
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - value: %r", value)
sys.stdout.write(" = " + str(value) + '\n')
sys.stdout.flush()
# do something for error/reject/abort
if iocb.ioError:
sys.stdout.write(str(iocb.ioError) + '\n')
except Exception as error: except Exception as error:
ReadPropertyMultipleConsoleCmd._exception("exception: %r", error) ReadPropertyMultipleConsoleCmd._exception("exception: %r", error)
@ -227,7 +208,7 @@ def main():
) )
# make a simple application # make a simple application
this_application = ReadPropertyMultipleApplication(this_device, args.ini.address) this_application = BIPSimpleApplication(this_device, args.ini.address)
# get the services supported # get the services supported
services_supported = this_application.get_services_supported() services_supported = this_application.get_services_supported()

View File

@ -12,21 +12,26 @@ from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run from bacpypes.core import run
from bacpypes.primitivedata import Atomic, Real, Unsigned from bacpypes.primitivedata import Real
from bacpypes.constructeddata import Array, Any from bacpypes.object import AnalogValueObject, Property, register_object_type
from bacpypes.basetypes import ErrorType
from bacpypes.apdu import ReadPropertyMultipleACK, ReadAccessResult, ReadAccessResultElement, ReadAccessResultElementChoice
from bacpypes.object import AnalogValueObject, Property, PropertyError, register_object_type
from bacpypes.apdu import Error
from bacpypes.errors import ExecutionError from bacpypes.errors import ExecutionError
from bacpypes.app import BIPSimpleApplication from bacpypes.app import BIPSimpleApplication
from bacpypes.service.device import LocalDeviceObject from bacpypes.service.device import ReadWritePropertyMultipleServices, \
LocalDeviceObject
# some debugging # some debugging
_debug = 0 _debug = 0
_log = ModuleLogger(globals()) _log = ModuleLogger(globals())
#
# ReadPropertyMultipleApplication
#
@bacpypes_debugging
class ReadPropertyMultipleApplication(BIPSimpleApplication, ReadWritePropertyMultipleServices):
pass
# #
# RandomValueProperty # RandomValueProperty
# #
@ -72,190 +77,6 @@ class RandomAnalogValueObject(AnalogValueObject):
register_object_type(RandomAnalogValueObject) register_object_type(RandomAnalogValueObject)
#
# ReadPropertyToAny
#
@bacpypes_debugging
def ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex=None):
"""Read the specified property of the object, with the optional array index,
and cast the result into an Any object."""
if _debug: ReadPropertyToAny._debug("ReadPropertyToAny %s %r %r", obj, propertyIdentifier, propertyArrayIndex)
# get the datatype
datatype = obj.get_datatype(propertyIdentifier)
if _debug: ReadPropertyToAny._debug(" - datatype: %r", datatype)
if datatype is None:
raise ExecutionError(errorClass='property', errorCode='datatypeNotSupported')
# get the value
value = obj.ReadProperty(propertyIdentifier, propertyArrayIndex)
if _debug: ReadPropertyToAny._debug(" - value: %r", value)
if value is None:
raise ExecutionError(errorClass='property', errorCode='unknownProperty')
# change atomic values into something encodeable
if issubclass(datatype, Atomic):
value = datatype(value)
elif issubclass(datatype, Array) and (propertyArrayIndex is not None):
if propertyArrayIndex == 0:
value = Unsigned(value)
elif issubclass(datatype.subtype, Atomic):
value = datatype.subtype(value)
elif not isinstance(value, datatype.subtype):
raise TypeError("invalid result datatype, expecting %s and got %s" \
% (datatype.subtype.__name__, type(value).__name__))
elif not isinstance(value, datatype):
raise TypeError("invalid result datatype, expecting %s and got %s" \
% (datatype.__name__, type(value).__name__))
if _debug: ReadPropertyToAny._debug(" - encodeable value: %r", value)
# encode the value
result = Any()
result.cast_in(value)
if _debug: ReadPropertyToAny._debug(" - result: %r", result)
# return the object
return result
#
# ReadPropertyToResultElement
#
@bacpypes_debugging
def ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex=None):
"""Read the specified property of the object, with the optional array index,
and cast the result into an Any object."""
if _debug: ReadPropertyToResultElement._debug("ReadPropertyToResultElement %s %r %r", obj, propertyIdentifier, propertyArrayIndex)
# save the result in the property value
read_result = ReadAccessResultElementChoice()
try:
read_result.propertyValue = ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex)
if _debug: ReadPropertyToResultElement._debug(" - success")
except PropertyError as error:
if _debug: ReadPropertyToResultElement._debug(" - error: %r", error)
read_result.propertyAccessError = ErrorType(errorClass='property', errorCode='unknownProperty')
except ExecutionError as error:
if _debug: ReadPropertyToResultElement._debug(" - error: %r", error)
read_result.propertyAccessError = ErrorType(errorClass=error.errorClass, errorCode=error.errorCode)
# make an element for this value
read_access_result_element = ReadAccessResultElement(
propertyIdentifier=propertyIdentifier,
propertyArrayIndex=propertyArrayIndex,
readResult=read_result,
)
if _debug: ReadPropertyToResultElement._debug(" - read_access_result_element: %r", read_access_result_element)
# fini
return read_access_result_element
#
# ReadPropertyMultipleApplication
#
@bacpypes_debugging
class ReadPropertyMultipleApplication(BIPSimpleApplication):
def __init__(self, *args, **kwargs):
if _debug: ReadPropertyMultipleApplication._debug("__init__ %r %r", args, kwargs)
BIPSimpleApplication.__init__(self, *args, **kwargs)
def do_ReadPropertyMultipleRequest(self, apdu):
"""Respond to a ReadPropertyMultiple Request."""
if _debug: ReadPropertyMultipleApplication._debug("do_ReadPropertyMultipleRequest %r", apdu)
# response is a list of read access results (or an error)
resp = None
read_access_result_list = []
# loop through the request
for read_access_spec in apdu.listOfReadAccessSpecs:
# get the object identifier
objectIdentifier = read_access_spec.objectIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier)
# check for wildcard
if (objectIdentifier == ('device', 4194303)):
if _debug: ReadPropertyMultipleApplication._debug(" - wildcard device identifier")
objectIdentifier = self.localDevice.objectIdentifier
# get the object
obj = self.get_object_id(objectIdentifier)
if _debug: ReadPropertyMultipleApplication._debug(" - object: %r", obj)
# make sure it exists
if not obj:
resp = Error(errorClass='object', errorCode='unknownObject', context=apdu)
if _debug: ReadPropertyMultipleApplication._debug(" - unknown object error: %r", resp)
break
# build a list of result elements
read_access_result_element_list = []
# loop through the property references
for prop_reference in read_access_spec.listOfPropertyReferences:
# get the property identifier
propertyIdentifier = prop_reference.propertyIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier)
# get the array index (optional)
propertyArrayIndex = prop_reference.propertyArrayIndex
if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex)
# check for special property identifiers
if propertyIdentifier in ('all', 'required', 'optional'):
for propId, prop in obj._properties.items():
if _debug: ReadPropertyMultipleApplication._debug(" - checking: %r %r", propId, prop.optional)
if (propertyIdentifier == 'all'):
pass
elif (propertyIdentifier == 'required') and (prop.optional):
if _debug: ReadPropertyMultipleApplication._debug(" - not a required property")
continue
elif (propertyIdentifier == 'optional') and (not prop.optional):
if _debug: ReadPropertyMultipleApplication._debug(" - not an optional property")
continue
# read the specific property
read_access_result_element = ReadPropertyToResultElement(obj, propId, propertyArrayIndex)
# check for undefined property
if read_access_result_element.readResult.propertyAccessError \
and read_access_result_element.readResult.propertyAccessError.errorCode == 'unknownProperty':
continue
# add it to the list
read_access_result_element_list.append(read_access_result_element)
else:
# read the specific property
read_access_result_element = ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex)
# add it to the list
read_access_result_element_list.append(read_access_result_element)
# build a read access result
read_access_result = ReadAccessResult(
objectIdentifier=objectIdentifier,
listOfResults=read_access_result_element_list
)
if _debug: ReadPropertyMultipleApplication._debug(" - read_access_result: %r", read_access_result)
# add it to the list
read_access_result_list.append(read_access_result)
# this is a ReadPropertyMultiple ack
if not resp:
resp = ReadPropertyMultipleACK(context=apdu)
resp.listOfReadAccessResults = read_access_result_list
if _debug: ReadPropertyMultipleApplication._debug(" - resp: %r", resp)
# return the result
self.response(resp)
# #
# __main__ # __main__
# #