#!/usr/bin/env python """ This application presents a 'console' prompt to the user asking for readrange commands which create ReadRangeRequest PDUs, then lines up the coorresponding ReadRangeACK and prints the value. """ import sys from bacpypes.debugging import bacpypes_debugging, ModuleLogger from bacpypes.consolelogging import ConfigArgumentParser from bacpypes.consolecmd import ConsoleCmd from bacpypes.core import run, enable_sleeping from bacpypes.pdu import Address from bacpypes.object import get_object_class, get_datatype from bacpypes.apdu import Error, AbortPDU, ReadRangeRequest, ReadRangeACK from bacpypes.app import BIPSimpleApplication from bacpypes.service.device import LocalDeviceObject # some debugging _debug = 0 _log = ModuleLogger(globals()) # globals this_application = None # # ReadRangeApplication # @bacpypes_debugging class ReadRangeApplication(BIPSimpleApplication): def __init__(self, *args): if _debug: ReadRangeApplication._debug("__init__ %r", args) BIPSimpleApplication.__init__(self, *args) # keep track of requests to line up responses self._request = None def request(self, apdu): if _debug: ReadRangeApplication._debug("request %r", apdu) # save a copy of the request self._request = apdu # forward it along BIPSimpleApplication.request(self, apdu) def confirmation(self, apdu): if _debug: ReadRangeApplication._debug("confirmation %r", apdu) if isinstance(apdu, Error): sys.stdout.write("error: %s\n" % (apdu.errorCode,)) sys.stdout.flush() elif isinstance(apdu, AbortPDU): apdu.debug_contents() elif (isinstance(self._request, ReadRangeRequest)) and (isinstance(apdu, ReadRangeACK)): # find the datatype datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) if _debug: ReadRangeApplication._debug(" - datatype: %r", datatype) if not datatype: raise TypeError("unknown datatype") # cast out of the single Any element into the datatype value = apdu.itemData[0].cast_out(datatype) # dump it out for i, item in enumerate(value): sys.stdout.write("[%d]\n" % (i,)) item.debug_contents(file=sys.stdout, indent=2) sys.stdout.flush() # # ReadRangeConsoleCmd # @bacpypes_debugging class ReadRangeConsoleCmd(ConsoleCmd): def do_readrange(self, args): """readrange [ ]""" args = args.split() if _debug: ReadRangeConsoleCmd._debug("do_readrange %r", args) try: addr, obj_type, obj_inst, prop_id = args[:4] if obj_type.isdigit(): obj_type = int(obj_type) elif not get_object_class(obj_type): raise ValueError("unknown object type") obj_inst = int(obj_inst) datatype = get_datatype(obj_type, prop_id) if not datatype: raise ValueError("invalid property for object type") # build a request request = ReadRangeRequest( objectIdentifier=(obj_type, obj_inst), propertyIdentifier=prop_id, ) request.pduDestination = Address(addr) if len(args) == 5: request.propertyArrayIndex = int(args[4]) if _debug: ReadRangeConsoleCmd._debug(" - request: %r", request) # give it to the application this_application.request(request) except Exception as error: ReadRangeConsoleCmd._exception("exception: %r", error) # # __main__ # def main(): global this_application # parse the command line arguments args = ConfigArgumentParser(description=__doc__).parse_args() if _debug: _log.debug("initialization") if _debug: _log.debug(" - args: %r", args) # make a device object this_device = LocalDeviceObject( objectName=args.ini.objectname, objectIdentifier=int(args.ini.objectidentifier), maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), segmentationSupported=args.ini.segmentationsupported, vendorIdentifier=int(args.ini.vendoridentifier), ) # make a simple application this_application = ReadRangeApplication(this_device, args.ini.address) # get the services supported services_supported = this_application.get_services_supported() if _debug: _log.debug(" - services_supported: %r", services_supported) # let the device object know this_device.protocolServicesSupported = services_supported.value # make a console this_console = ReadRangeConsoleCmd() if _debug: _log.debug(" - this_console: %r", this_console) # enable sleeping will help with threads enable_sleeping() _log.debug("running") run() _log.debug("fini") if __name__ == "__main__": main()