#!/usr/bin/env python

"""
This application presents a 'console' prompt to the user asking for read
commands which create ReadPropertyRequest PDUs, then lines up the
corresponding ReadPropertyACK and prints the value.
"""

import sys

from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd

from bacpypes.core import run, enable_sleeping
from bacpypes.iocb import IOCB

from bacpypes.pdu import Address
from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array

from bacpypes.app import BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.local.device import LocalDeviceObject

# some debugging
_debug = 0
_log = ModuleLogger(globals())

# globals
# the application instance; assigned in main() and used by the console commands
this_application = None


#
#   ReadPropertyConsoleCmd
#

@bacpypes_debugging
class ReadPropertyConsoleCmd(ConsoleCmd):
    # Console command set: 'read' issues a ReadPropertyRequest and prints the
    # decoded reply, 'rtn' hands router/network references to the application.

    def do_read(self, args):
        """read <addr> <type> <inst> <prop> [ <indx> ]"""
        # args is the raw text typed after the command name; it is split on
        # whitespace into: destination address, object type (name or number),
        # object instance, property identifier and an optional array index.
        args = args.split()
        if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args)

        # any failure below (too few arguments, bad numbers, unknown types,
        # ...) is reported through the class exception logger
        try:
            addr, obj_type, obj_inst, prop_id = args[:4]

            # the object type is either numeric or a name known to
            # get_object_class()
            if obj_type.isdigit():
                obj_type = int(obj_type)
            elif not get_object_class(obj_type):
                raise ValueError("unknown object type")

            obj_inst = int(obj_inst)

            # make sure the property is known for this object type before
            # sending anything
            datatype = get_datatype(obj_type, prop_id)
            if not datatype:
                raise ValueError("invalid property for object type")

            # build a request
            request = ReadPropertyRequest(
                objectIdentifier=(obj_type, obj_inst),
                propertyIdentifier=prop_id,
            )
            request.pduDestination = Address(addr)

            # the optional fifth argument is the array index
            if len(args) == 5:
                request.propertyArrayIndex = int(args[4])
            if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request)

            # make an IOCB
            iocb = IOCB(request)
            if _debug: ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb)

            # give it to the application
            this_application.request_io(iocb)

            # wait for it to complete
            iocb.wait()

            # do something for error/reject/abort
            if iocb.ioError:
                sys.stdout.write(str(iocb.ioError) + '\n')

            # do something for success
            elif iocb.ioResponse:
                apdu = iocb.ioResponse

                # should be an ack
                if not isinstance(apdu, ReadPropertyACK):
                    if _debug: ReadPropertyConsoleCmd._debug(" - not an ack")
                    return

                # find the datatype
                datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
                if _debug: ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype)
                if not datatype:
                    raise TypeError("unknown datatype")

                # special case for array parts, others are managed by cast_out
                if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
                    # index 0 is decoded as Unsigned, any other index as the
                    # array's element type
                    if apdu.propertyArrayIndex == 0:
                        value = apdu.propertyValue.cast_out(Unsigned)
                    else:
                        value = apdu.propertyValue.cast_out(datatype.subtype)
                else:
                    value = apdu.propertyValue.cast_out(datatype)
                if _debug: ReadPropertyConsoleCmd._debug(" - value: %r", value)

                sys.stdout.write(str(value) + '\n')
                # values that can dump their own contents get to do so
                if hasattr(value, 'debug_contents'):
                    value.debug_contents(file=sys.stdout)
                sys.stdout.flush()

            # do something with nothing?
            else:
                if _debug: ReadPropertyConsoleCmd._debug(" - ioError or ioResponse expected")

        except Exception as error:
            ReadPropertyConsoleCmd._exception("exception: %r", error)

    def do_rtn(self, args):
        """rtn <addr> <net> ... """
        # args is the raw text typed after the command name: a router address
        # followed by zero or more network numbers.
        args = args.split()
        if _debug: ReadPropertyConsoleCmd._debug("do_rtn %r", args)

        # report bad input the same way do_read does rather than letting an
        # IndexError/ValueError escape from the command handler
        try:
            if not args:
                raise ValueError("router address required")

            # provide the address and a list of network numbers
            router_address = Address(args[0])
            network_list = [int(arg) for arg in args[1:]]

            # pass along to the service access point
            this_application.nsap.add_router_references(None, router_address, network_list)

        except Exception as error:
            ReadPropertyConsoleCmd._exception("exception: %r", error)


#
#   __main__
#

def main():
    # Parse the command line, build the device and application objects,
    # create the console and run until stopped.
    global this_application

    # parse the command line arguments
    args = ConfigArgumentParser(description=__doc__).parse_args()

    if _debug: _log.debug("initialization")
    if _debug: _log.debug(" - args: %r", args)

    # make a device object
    this_device = LocalDeviceObject(ini=args.ini)
    if _debug: _log.debug(" - this_device: %r", this_device)

    # make a simple application
    this_application = BIPSimpleApplication(this_device, args.ini.address)

    # make a console
    this_console = ReadPropertyConsoleCmd()
    if _debug: _log.debug(" - this_console: %r", this_console)

    # enable sleeping will help with threads
    enable_sleeping()

    _log.debug("running")

    run()

    _log.debug("fini")


if __name__ == "__main__":
    main()