diff --git a/samples/ReadPropertyForeign.py b/samples/ReadPropertyForeign.py new file mode 100755 index 0000000..7d81844 --- /dev/null +++ b/samples/ReadPropertyForeign.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding +ReadPropertyACK and prints the value. + +In addition to the usual INI parameters that are common to BACpypes applications, +this application references two additional parameters: + + foreignBBMD: the BACpypes IP Address of the BBMD to register + foreignTTL: the time-to-live to keep the registration alive + +The BBMDForeign class will send the BVLL registration request after the core +starts up and maintain it. If the device does not get an 'ack' then it will +not send requests, even to devices that it would be able to talk otherwise. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run, deferred, enable_sleeping +from bacpypes.iocb import IOCB + +from bacpypes.pdu import Address +from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK +from bacpypes.primitivedata import Unsigned, ObjectIdentifier +from bacpypes.constructeddata import Array + +from bacpypes.app import BIPForeignApplication +from bacpypes.object import get_datatype +from bacpypes.local.device import LocalDeviceObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_application = None + + +# +# ReadPropertyConsoleCmd +# + +@bacpypes_debugging +class ReadPropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_id, prop_id = args[:3] + obj_id = ObjectIdentifier(obj_id).value + + datatype = get_datatype(obj_id[0], prop_id) + if not datatype: + raise ValueError("invalid property for object type") + + # build a request + request = ReadPropertyRequest( + objectIdentifier=obj_id, + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 4: + request.propertyArrayIndex = int(args[3]) + if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request) + + # make an IOCB + iocb = IOCB(request) + if _debug: ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb) + + # give it to the application + deferred(this_application.request_io, iocb) + + # wait for it to complete + iocb.wait() + + # do something for error/reject/abort + if iocb.ioError: + sys.stdout.write(str(iocb.ioError) + '\n') + + # do something for success + elif iocb.ioResponse: + apdu = iocb.ioResponse + + # should be an ack + if not isinstance(apdu, ReadPropertyACK): + if _debug: ReadPropertyConsoleCmd._debug(" - not an ack") + return + + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError("unknown datatype") + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyConsoleCmd._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + if hasattr(value, 'debug_contents'): + value.debug_contents(file=sys.stdout) + sys.stdout.flush() + + # do something with nothing? + else: + if _debug: ReadPropertyConsoleCmd._debug(" - ioError or ioResponse expected") + + except Exception as error: + ReadPropertyConsoleCmd._exception("exception: %r", error) + + def do_rtn(self, args): + """rtn ... """ + args = args.split() + if _debug: ReadPropertyConsoleCmd._debug("do_rtn %r", args) + + # provide the address and a list of network numbers + router_address = Address(args[0]) + network_list = [int(arg) for arg in args[1:]] + + # pass along to the service access point + this_application.nsap.add_router_references(None, router_address, network_list) + + +# +# __main__ +# + +def main(): + global this_application + + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject(ini=args.ini) + if _debug: _log.debug(" - this_device: %r", this_device) + + # make a simple application + this_application = BIPForeignApplication( + this_device, args.ini.address, + Address(args.ini.foreignbbmd), + int(args.ini.foreignttl), + ) + + # make a console + this_console = ReadPropertyConsoleCmd() + if _debug: _log.debug(" - this_console: %r", this_console) + + # enable sleeping will help with threads + enable_sleeping() + + _log.debug("running") + + run() + + _log.debug("fini") + +if __name__ == "__main__": + main()