diff --git a/samples/ReadAllProperties.py b/samples/ReadAllProperties.py new file mode 100755 index 0000000..b52d872 --- /dev/null +++ b/samples/ReadAllProperties.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys +from collections import deque + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run, stop, deferred +from bacpypes.iocb import IOCB + +from bacpypes.pdu import Address +from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array + +from bacpypes.app import BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype +from bacpypes.service.device import LocalDeviceObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_application = None +device_address = None +object_identifier = None +property_list = None + +# +# ReadPropertyApplication +# + +@bacpypes_debugging +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # current property being read + self.property_identifier = None + + def next_request(self): + if _debug: ReadPropertyApplication._debug("next_request") + global device_address, object_identifier, property_list + + # check to see if we're done + if not property_list: + if _debug: ReadPropertyApplication._debug(" - done") + stop() + return + + # get the next request + self.property_identifier = property_list.popleft() + if _debug: ReadPropertyApplication._debug(" - property_identifier: %r", self.property_identifier) + + # build a request + request = ReadPropertyRequest( + destination=device_address, + objectIdentifier=object_identifier, + propertyIdentifier=self.property_identifier, + ) + if _debug: ReadPropertyApplication._debug(" - request: %r", request) + + # make an IOCB + iocb = IOCB(request) + + # set a callback for the response + iocb.add_callback(self.complete_request) + if _debug: ReadPropertyApplication._debug(" - iocb: %r", iocb) + + # send the request + this_application.request_io(iocb) + + def complete_request(self, iocb): + if _debug: ReadPropertyApplication._debug("complete_request %r", iocb) + + if iocb.ioResponse: + apdu = iocb.ioResponse + + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], self.property_identifier) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError("unknown datatype") + + # special case for array parts, others are managed by cast_out + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(self.property_identifier + " = " + str(value) + '\n') + if hasattr(value, 'debug_contents'): + value.debug_contents(file=sys.stdout) + sys.stdout.flush() + + if iocb.ioError: + if _debug: ReadPropertyApplication._debug(" - error: %r", iocb.ioError) + + # if it is an unknown property, just skip to the next one + if getattr(iocb.ioError, 'errorCode', '') != 'unknownProperty': + sys.stdout.write(self.property_identifier + "! " + str(iocb.ioError) + '\n') + sys.stdout.flush() + + # fire off another request + deferred(self.next_request) + + +# +# __main__ +# + +def main(): + global this_application, device_address, object_identifier, property_list + + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + parser.add_argument( + "address", + help="device address", + ) + parser.add_argument( + "objtype", + help="object types, e.g., analogInput", + ) + parser.add_argument( + "objinstance", type=int, + help="object instance", + ) + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # interpret the address + device_address = Address(args.address) + if _debug: _log.debug(" - device_address: %r", device_address) + + # build an identifier + object_identifier = (args.objtype, args.objinstance) + if _debug: _log.debug(" - object_identifier: %r", object_identifier) + + # get the object class + object_class = get_object_class(args.objtype) + if _debug: _log.debug(" - object_class: %r", object_class) + + # make a queue of the properties + property_list = deque(prop.identifier for prop in object_class.properties) + if _debug: _log.debug(" - property_list: %r", property_list) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + + # get the services supported + services_supported = this_application.get_services_supported() + if _debug: _log.debug(" - services_supported: %r", services_supported) + + # let the device object know + this_device.protocolServicesSupported = services_supported.value + + # fire off a request when the core has a chance + deferred(this_application.next_request) + + _log.debug("running") + + run() + + _log.debug("fini") + +if __name__ == "__main__": + main()