diff --git a/samples/ReadProperty.py b/samples/ReadProperty.py index 3624333..15bb318 100755 --- a/samples/ReadProperty.py +++ b/samples/ReadProperty.py @@ -15,7 +15,7 @@ from bacpypes.consolecmd import ConsoleCmd from bacpypes.core import run, enable_sleeping from bacpypes.pdu import Address -from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK from bacpypes.primitivedata import Unsigned from bacpypes.constructeddata import Array @@ -30,61 +30,6 @@ _log = ModuleLogger(globals()) # globals this_application = None -# -# ReadPropertyApplication -# - -@bacpypes_debugging -class ReadPropertyApplication(BIPSimpleApplication): - - def __init__(self, *args): - if _debug: ReadPropertyApplication._debug("__init__ %r", args) - BIPSimpleApplication.__init__(self, *args) - - # keep track of requests to line up responses - self._request = None - - def request(self, apdu): - if _debug: ReadPropertyApplication._debug("request %r", apdu) - - # save a copy of the request - self._request = apdu - - # forward it along - BIPSimpleApplication.request(self, apdu) - - def confirmation(self, apdu): - if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) - - if isinstance(apdu, Error): - sys.stdout.write("error: %s\n" % (apdu.errorCode,)) - sys.stdout.flush() - - elif isinstance(apdu, AbortPDU): - apdu.debug_contents() - - elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): - # find the datatype - datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) - if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) - if not datatype: - raise TypeError("unknown datatype") - - # special case for array parts, others are managed by cast_out - if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): - if apdu.propertyArrayIndex == 0: - value = apdu.propertyValue.cast_out(Unsigned) - else: - value = apdu.propertyValue.cast_out(datatype.subtype) - else: - value = apdu.propertyValue.cast_out(datatype) - if _debug: ReadPropertyApplication._debug(" - value: %r", value) - - sys.stdout.write(str(value) + '\n') - if hasattr(value, 'debug_contents'): - value.debug_contents(file=sys.stdout) - sys.stdout.flush() - # # ReadPropertyConsoleCmd @@ -124,7 +69,45 @@ class ReadPropertyConsoleCmd(ConsoleCmd): if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request) # give it to the application - this_application.request(request) + iocb = this_application.request(request) + if _debug: ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb) + + # wait for it to complete + iocb.wait() + + # do something for success + if iocb.ioResponse: + apdu = iocb.ioResponse + + # should be an ack + if not isinstance(apdu, ReadPropertyACK): + if _debug: ReadPropertyConsoleCmd._debug(" - not an ack") + return + + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError("unknown datatype") + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyConsoleCmd._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + if hasattr(value, 'debug_contents'): + value.debug_contents(file=sys.stdout) + sys.stdout.flush() + + # do something for error/reject/abort + if iocb.ioError: + sys.stdout.write(str(iocb.ioError) + '\n') except Exception as error: ReadPropertyConsoleCmd._exception("exception: %r", error) @@ -169,7 +152,7 @@ def main(): ) # make a simple application - this_application = ReadPropertyApplication(this_device, args.ini.address) + this_application = BIPSimpleApplication(this_device, args.ini.address) # get the services supported services_supported = this_application.get_services_supported() diff --git a/samples/ReadProperty2.py b/samples/ReadProperty2.py deleted file mode 100644 index 15bb318..0000000 --- a/samples/ReadProperty2.py +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/env python - -""" -This application presents a 'console' prompt to the user asking for read commands -which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK -and prints the value. -""" - -import sys - -from bacpypes.debugging import bacpypes_debugging, ModuleLogger -from bacpypes.consolelogging import ConfigArgumentParser -from bacpypes.consolecmd import ConsoleCmd - -from bacpypes.core import run, enable_sleeping - -from bacpypes.pdu import Address -from bacpypes.apdu import ReadPropertyRequest, ReadPropertyACK -from bacpypes.primitivedata import Unsigned -from bacpypes.constructeddata import Array - -from bacpypes.app import BIPSimpleApplication -from bacpypes.object import get_object_class, get_datatype -from bacpypes.service.device import LocalDeviceObject - -# some debugging -_debug = 0 -_log = ModuleLogger(globals()) - -# globals -this_application = None - - -# -# ReadPropertyConsoleCmd -# - -@bacpypes_debugging -class ReadPropertyConsoleCmd(ConsoleCmd): - - def do_read(self, args): - """read [ ]""" - args = args.split() - if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args) - - try: - addr, obj_type, obj_inst, prop_id = args[:4] - - if obj_type.isdigit(): - obj_type = int(obj_type) - elif not get_object_class(obj_type): - raise ValueError("unknown object type") - - obj_inst = int(obj_inst) - - datatype = get_datatype(obj_type, prop_id) - if not datatype: - raise ValueError("invalid property for object type") - - # build a request - request = ReadPropertyRequest( - objectIdentifier=(obj_type, obj_inst), - propertyIdentifier=prop_id, - ) - request.pduDestination = Address(addr) - - if len(args) == 5: - request.propertyArrayIndex = int(args[4]) - if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request) - - # give it to the application - iocb = this_application.request(request) - if _debug: ReadPropertyConsoleCmd._debug(" - iocb: %r", iocb) - - # wait for it to complete - iocb.wait() - - # do something for success - if iocb.ioResponse: - apdu = iocb.ioResponse - - # should be an ack - if not isinstance(apdu, ReadPropertyACK): - if _debug: ReadPropertyConsoleCmd._debug(" - not an ack") - return - - # find the datatype - datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) - if _debug: ReadPropertyConsoleCmd._debug(" - datatype: %r", datatype) - if not datatype: - raise TypeError("unknown datatype") - - # special case for array parts, others are managed by cast_out - if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): - if apdu.propertyArrayIndex == 0: - value = apdu.propertyValue.cast_out(Unsigned) - else: - value = apdu.propertyValue.cast_out(datatype.subtype) - else: - value = apdu.propertyValue.cast_out(datatype) - if _debug: ReadPropertyConsoleCmd._debug(" - value: %r", value) - - sys.stdout.write(str(value) + '\n') - if hasattr(value, 'debug_contents'): - value.debug_contents(file=sys.stdout) - sys.stdout.flush() - - # do something for error/reject/abort - if iocb.ioError: - sys.stdout.write(str(iocb.ioError) + '\n') - - except Exception as error: - ReadPropertyConsoleCmd._exception("exception: %r", error) - - def do_rtn(self, args): - """rtn ... """ - args = args.split() - if _debug: ReadPropertyConsoleCmd._debug("do_rtn %r", args) - - # safe to assume only one adapter - adapter = this_application.nsap.adapters[0] - if _debug: ReadPropertyConsoleCmd._debug(" - adapter: %r", adapter) - - # provide the address and a list of network numbers - router_address = Address(args[0]) - network_list = [int(arg) for arg in args[1:]] - - # pass along to the service access point - this_application.nsap.add_router_references(adapter, router_address, network_list) - - -# -# __main__ -# - -def main(): - global this_application - - # parse the command line arguments - args = ConfigArgumentParser(description=__doc__).parse_args() - - if _debug: _log.debug("initialization") - if _debug: _log.debug(" - args: %r", args) - - # make a device object - this_device = LocalDeviceObject( - objectName=args.ini.objectname, - objectIdentifier=int(args.ini.objectidentifier), - maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), - segmentationSupported=args.ini.segmentationsupported, - vendorIdentifier=int(args.ini.vendoridentifier), - ) - - # make a simple application - this_application = BIPSimpleApplication(this_device, args.ini.address) - - # get the services supported - services_supported = this_application.get_services_supported() - if _debug: _log.debug(" - services_supported: %r", services_supported) - - # let the device object know - this_device.protocolServicesSupported = services_supported.value - - # make a console - this_console = ReadPropertyConsoleCmd() - if _debug: _log.debug(" - this_console: %r", this_console) - - # enable sleeping will help with threads - enable_sleeping() - - _log.debug("running") - - run() - - _log.debug("fini") - -if __name__ == "__main__": - main()