From 41199871a2ecd19b51e5faf75cfc83094048f766 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 25 Aug 2015 21:26:08 -0400 Subject: [PATCH 1/7] starting with the SourceForge samples --- BACpypes~.ini | 11 + samples/CommandableMixin.py | 200 ++++++++++++++ samples/IP2IPRouter.py | 125 +++++++++ samples/MultiStateValueObject.py | 69 +++++ samples/MultipleReadProperty.py | 165 ++++++++++++ samples/RandomAnalogValueObject.py | 126 +++++++++ samples/ReadProperty.py | 174 +++++++++++++ samples/ReadPropertyMultiple.py | 245 +++++++++++++++++ samples/ReadPropertyMultipleServer.py | 319 +++++++++++++++++++++++ samples/ReadRange.py | 166 ++++++++++++ samples/ReadWriteFile.py | 269 +++++++++++++++++++ samples/ReadWriteFileServer.py | 219 ++++++++++++++++ samples/ReadWriteProperty.py | 258 ++++++++++++++++++ samples/RecurringMultipleReadProperty.py | 198 ++++++++++++++ samples/RecurringTask.py | 74 ++++++ samples/SampleApplication.py | 87 +++++++ samples/SampleConsoleCmd.py | 99 +++++++ samples/VendorAVObject.py | 135 ++++++++++ samples/VendorReadWriteProperty.py | 285 ++++++++++++++++++++ samples/WhoHasIHaveApplication.py | 116 +++++++++ samples/WhoIsIAm.py | 185 +++++++++++++ samples/WhoIsIAmApplication.py | 112 ++++++++ samples/WhoIsRouter.py | 128 +++++++++ 23 files changed, 3765 insertions(+) create mode 100644 BACpypes~.ini create mode 100755 samples/CommandableMixin.py create mode 100755 samples/IP2IPRouter.py create mode 100755 samples/MultiStateValueObject.py create mode 100755 samples/MultipleReadProperty.py create mode 100755 samples/RandomAnalogValueObject.py create mode 100755 samples/ReadProperty.py create mode 100755 samples/ReadPropertyMultiple.py create mode 100755 samples/ReadPropertyMultipleServer.py create mode 100755 samples/ReadRange.py create mode 100755 samples/ReadWriteFile.py create mode 100755 samples/ReadWriteFileServer.py create mode 100755 samples/ReadWriteProperty.py create mode 100644 samples/RecurringMultipleReadProperty.py create mode 100755 samples/RecurringTask.py create mode 100755 samples/SampleApplication.py create mode 100755 samples/SampleConsoleCmd.py create mode 100755 samples/VendorAVObject.py create mode 100755 samples/VendorReadWriteProperty.py create mode 100755 samples/WhoHasIHaveApplication.py create mode 100755 samples/WhoIsIAm.py create mode 100755 samples/WhoIsIAmApplication.py create mode 100755 samples/WhoIsRouter.py diff --git a/BACpypes~.ini b/BACpypes~.ini new file mode 100644 index 0000000..4b16488 --- /dev/null +++ b/BACpypes~.ini @@ -0,0 +1,11 @@ +[BACpypes] +objectName: Betelgeuse +address: 128.253.109.40/24 +objectIdentifier: 599 +maxApduLengthAccepted: 1024 +segmentationSupported: segmentedBoth +vendorIdentifier: 15 +foreignPort: 0 +foreignBBMD: 128.253.109.254 +foreignTTL: 30 + diff --git a/samples/CommandableMixin.py b/samples/CommandableMixin.py new file mode 100755 index 0000000..ffad52e --- /dev/null +++ b/samples/CommandableMixin.py @@ -0,0 +1,200 @@ +#!/usr/bin/python + +""" +This sample application demonstrates a mix-in class for commandable properties +(not useful for Binary Out or Binary Value objects that have a minimum on and off +time, or for Channel objects). +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run +from bacpypes.errors import ExecutionError + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, DateValueObject +from bacpypes.primitivedata import Null +from bacpypes.basetypes import PriorityValue, PriorityArray + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# CommandableMixin +# + +@bacpypes_debugging +class CommandableMixin(object): + + def __init__(self, init_value, **kwargs): + if _debug: CommandableMixin._debug("__init__ %r, %r", init_value, kwargs) + super(CommandableMixin, self).__init__(**kwargs) + + # if no present value given, give it the default value + if ('presentValue' not in kwargs): + if _debug: CommandableMixin._debug(" - initialize present value") + self.presentValue = init_value + + # if no priority array given, give it an empty one + if ('priorityArray' not in kwargs): + if _debug: CommandableMixin._debug(" - initialize priority array") + self.priorityArray = PriorityArray() + for i in range(16): + self.priorityArray.append(PriorityValue(null=Null())) + + # if no relinquish default value given, give it the default value + if ('relinquishDefault' not in kwargs): + if _debug: CommandableMixin._debug(" - initialize relinquish default") + self.relinquishDefault = init_value + + # capture the present value property + self._pv = self._properties['presentValue'] + if _debug: CommandableMixin._debug(" - _pv: %r", self._pv) + + # capture the datatype + self._pv_datatype = self._pv.datatype + if _debug: CommandableMixin._debug(" - _pv_datatype: %r", self._pv_datatype) + + # look up a matching priority value choice + for element in PriorityValue.choiceElements: + if element.klass is self._pv_datatype: + self._pv_choice = element.name + break + else: + self._pv_choice = 'constructedValue' + if _debug: CommandableMixin._debug(" - _pv_choice: %r", self._pv_choice) + + def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False): + if _debug: CommandableMixin._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct) + + # when writing to the presentValue with a priority + if (property == 'presentValue'): + # default (lowest) priority + if priority is None: + priority = 16 + if _debug: CommandableMixin._debug(" - translate to array index %d", priority) + + # translate to updating the priority array + property = 'priorityArray' + arrayIndex = priority + priority = None + + # update the priority array entry + if (property == 'priorityArray') and (arrayIndex is not None): + # check the bounds + if arrayIndex == 0: + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + if (arrayIndex < 1) or (arrayIndex > 16): + raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex') + + # update the specific priorty value element + priority_value = self.priorityArray[arrayIndex] + if _debug: CommandableMixin._debug(" - priority_value: %r", priority_value) + + # the null or the choice has to be set, the other clear + if value is (): + if _debug: CommandableMixin._debug(" - write a null") + priority_value.null = value + setattr(priority_value, self._pv_choice, None) + else: + if _debug: CommandableMixin._debug(" - write a value") + priority_value.null = None + setattr(priority_value, self._pv_choice, value) + + # look for the highest priority value + for i in range(1, 17): + priority_value = self.priorityArray[i] + if priority_value.null is None: + if (i < arrayIndex): + if _debug: CommandableMixin._debug(" - existing higher priority value") + return + value = getattr(priority_value, self._pv_choice) + break + else: + value = self.relinquishDefault + if _debug: CommandableMixin._debug(" - new present value: %r", value) + + property = 'presentValue' + arrayIndex = priority = None + + # allow the request to pass through + if _debug: CommandableMixin._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority) + super(CommandableMixin, self).WriteProperty( + property, value, + arrayIndex=arrayIndex, priority=priority, direct=direct, + ) + +# +# CommandableAnalogValueObject +# + +@bacpypes_debugging +class CommandableAnalogValueObject(CommandableMixin, AnalogValueObject): + + def __init__(self, **kwargs): + if _debug: CommandableAnalogValueObject._debug("__init__ %r", kwargs) + CommandableMixin.__init__(self, 0.0, **kwargs) + +# +# CommandableDateValueObject +# + +@bacpypes_debugging +class CommandableDateValueObject(CommandableMixin, DateValueObject): + + def __init__(self, **kwargs): + if _debug: CommandableDateValueObject._debug("__init__ %r", kwargs) + CommandableMixin.__init__(self, False, **kwargs) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a commandable analog value object, add to the device + cavo1 = CommandableAnalogValueObject( + objectIdentifier=('analogValue', 1), objectName='Commandable AV 1' + ) + if _debug: _log.debug(" - cavo1: %r", cavo1) + this_application.add_object(cavo1) + + # make a commandable binary value object, add to the device + cdvo2 = CommandableDateValueObject( + objectIdentifier=('dateValue', 1), objectName='Commandable2' + ) + if _debug: _log.debug(" - cdvo2: %r", cdvo2) + this_application.add_object(cdvo2) + + if _debug: _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + if _debug: _log.debug("finally") + diff --git a/samples/IP2IPRouter.py b/samples/IP2IPRouter.py new file mode 100755 index 0000000..2939999 --- /dev/null +++ b/samples/IP2IPRouter.py @@ -0,0 +1,125 @@ +#!/usr/bin/python + +""" +This sample application presents itself as a router between two +IP networks. This application can run on a single homed machine +by using the same IP address and two different port numbers, or +to be closer to what is typically considered a router, on a +multihomed machine using two different IP addresses and the same +port number. + +$ python IP2IPRtouer.py addr1 net1 addr2 net2 + + addr1 - local address like 192.168.1.2/24:47808 + net1 - network number + addr2 - local address like 192.168.3.4/24:47809 + net2 - network number + +As a router, this does not have an application layer. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.comm import bind + +from bacpypes.pdu import Address +from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement +from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# IP2IPRouter +# + +@bacpypes_debugging +class IP2IPRouter: + + def __init__(self, addr1, net1, addr2, net2): + if _debug: IP2IPRouter._debug("__init__ %r %r %r %r", addr1, net1, addr2, net2) + + # a network service access point will be needed + self.nsap = NetworkServiceAccessPoint() + + # give the NSAP a generic network layer service element + self.nse = NetworkServiceElement() + bind(self.nse, self.nsap) + + #== First stack + + # create a generic BIP stack, bound to the Annex J server + # on the UDP multiplexer + self.s1_bip = BIPSimple() + self.s1_annexj = AnnexJCodec() + self.s1_mux = UDPMultiplexer(addr1) + + # bind the bottom layers + bind(self.s1_bip, self.s1_annexj, self.s1_mux.annexJ) + + # bind the BIP stack to the local network + self.nsap.bind(self.s1_bip, net1, addr1) + + #== Second stack + + # create a generic BIP stack, bound to the Annex J server + # on the UDP multiplexer + self.s2_bip = BIPSimple() + self.s2_annexj = AnnexJCodec() + self.s2_mux = UDPMultiplexer(addr2) + + # bind the bottom layers + bind(self.s2_bip, self.s2_annexj, self.s2_mux.annexJ) + + # bind the BIP stack to the local network + self.nsap.bind(self.s2_bip, net2) + +# +# __main__ +# + +try: + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # add an argument for interval + parser.add_argument('addr1', type=str, + help='address of first network', + ) + + # add an argument for interval + parser.add_argument('net1', type=int, + help='network number of first network', + ) + + # add an argument for interval + parser.add_argument('addr2', type=str, + help='address of second network', + ) + + # add an argument for interval + parser.add_argument('net2', type=int, + help='network number of second network', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # create the router + router = IP2IPRouter(Address(args.addr1), args.net1, Address(args.addr2), args.net2) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/MultiStateValueObject.py b/samples/MultiStateValueObject.py new file mode 100755 index 0000000..c7b9079 --- /dev/null +++ b/samples/MultiStateValueObject.py @@ -0,0 +1,69 @@ +#!/usr/bin/python + +""" +This sample application provides a single MultiState Value Object to test +reading and writing its various properties. +""" + +from bacpypes.debugging import ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import MultiStateValueObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a multistate value object + msvo = MultiStateValueObject( + objectIdentifier=('multiStateValue', 1), + objectName='My Special Object', + presentValue=1, + numberOfStates=3, + stateText=['red', 'green', 'blue'], + ) + _log.debug(" - msvo: %r", msvo) + + # add it to the device + this_application.add_object(msvo) + _log.debug(" - object list: %r", this_device.objectList) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/MultipleReadProperty.py b/samples/MultipleReadProperty.py new file mode 100755 index 0000000..c4ce92c --- /dev/null +++ b/samples/MultipleReadProperty.py @@ -0,0 +1,165 @@ +#!/usr/bin/python + +""" +Mutliple Read Property + +This application has a static list of points that it would like to read. It reads the +values of each of them in turn and then quits. +""" + +from collections import deque + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run, stop, deferred + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# point list +point_list = [ + ('1.2.3.4', 'analogValue', 1, 'presentValue'), + ('1.2.3.4', 'analogValue', 2, 'presentValue'), + ] + +# +# ReadPointListApplication +# + +@bacpypes_debugging +class ReadPointListApplication(BIPSimpleApplication): + + def __init__(self, point_list, *args): + if _debug: ReadPointListApplication._debug("__init__ %r, %r", point_list, args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + # make a list of the response values + self.response_values = [] + + # turn the point list into a queue + self.point_queue = deque(point_list) + + def next_request(self): + if _debug: ReadPointListApplication._debug("next_request") + + # check to see if we're done + if not self.point_queue: + if _debug: ReadPointListApplication._debug(" - done") + stop() + return + + # get the next request + addr, obj_type, obj_inst, prop_id = self.point_queue.popleft() + + # build a request + self._request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + self._request.pduDestination = Address(addr) + if _debug: ReadPointListApplication._debug(" - request: %r", self._request) + + # forward it along + BIPSimpleApplication.request(self, self._request) + + def confirmation(self, apdu): + if _debug: ReadPointListApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + if _debug: ReadPointListApplication._debug(" - error: %r", apdu) + self.response_values.append(apdu) + + elif isinstance(apdu, AbortPDU): + if _debug: ReadPointListApplication._debug(" - abort: %r", apdu) + self.response_values.append(apdu) + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPointListApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPointListApplication._debug(" - value: %r", value) + + # save the value + self.response_values.append(value) + + # fire off another request + deferred(self.next_request) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPointListApplication(point_list, this_device, args.ini.address) + + # fire off a request when the core has a chance + deferred(this_application.next_request) + + _log.debug("running") + + run() + + # dump out the results + for request, response in zip(point_list, this_application.response_values): + print request, response + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/RandomAnalogValueObject.py b/samples/RandomAnalogValueObject.py new file mode 100755 index 0000000..fec1181 --- /dev/null +++ b/samples/RandomAnalogValueObject.py @@ -0,0 +1,126 @@ +#!/usr/bin/python + +""" +This sample application shows how to extend one of the basic objects, an Analog +Value Object in this case, to provide a present value. This type of code is used +when the application is providing a BACnet interface to a collection of data. +It assumes that almost all of the default behaviour of a BACpypes application is +sufficient. +""" + +import random + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Real +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, Property, register_object_type +from bacpypes.errors import ExecutionError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# RandomValueProperty +# + +@bacpypes_debugging +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False) + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # return a random value + value = random.random() * 100.0 + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + +# +# Random Value Object Type +# + +@bacpypes_debugging +class RandomAnalogValueObject(AnalogValueObject): + + properties = [ + RandomValueProperty('presentValue'), + ] + + def __init__(self, **kwargs): + if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +register_object_type(RandomAnalogValueObject) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=('device', int(args.ini.objectidentifier)), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a random input object + ravo1 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 1), objectName='Random1' + ) + _log.debug(" - ravo1: %r", ravo1) + + ravo1d = ravo1._dict_contents() + print ravo1d + + ravo2 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 2), objectName='Random2' + ) + _log.debug(" - ravo2: %r", ravo2) + + # add it to the device + this_application.add_object(ravo1) + this_application.add_object(ravo2) + _log.debug(" - object list: %r", this_device.objectList) + + print this_device._dict_contents() + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/ReadProperty.py b/samples/ReadProperty.py new file mode 100755 index 0000000..04c6e42 --- /dev/null +++ b/samples/ReadProperty.py @@ -0,0 +1,174 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyApplication +# + +@bacpypes_debugging +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + if hasattr(value, 'debug_contents'): + value.debug_contents(file=sys.stdout) + sys.stdout.flush() + +# +# ReadPropertyConsoleCmd +# + +@bacpypes_debugging +class ReadPropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadPropertyConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + this_console = ReadPropertyConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadPropertyMultiple.py b/samples/ReadPropertyMultiple.py new file mode 100755 index 0000000..ab7c1ae --- /dev/null +++ b/samples/ReadPropertyMultiple.py @@ -0,0 +1,245 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import ReadPropertyMultipleRequest, PropertyReference, ReadAccessSpecification, Error, AbortPDU, ReadPropertyMultipleACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import PropertyIdentifier, ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyMultipleApplication +# + +@bacpypes_debugging +class ReadPropertyMultipleApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyMultipleApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyMultipleApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyMultipleApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadPropertyMultipleRequest)) and (isinstance(apdu, ReadPropertyMultipleACK)): + # loop through the results + for result in apdu.listOfReadAccessResults: + # here is the object identifier + objectIdentifier = result.objectIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier) + + # now come the property values per object + for element in result.listOfResults: + # get the property and array index + propertyIdentifier = element.propertyIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier) + propertyArrayIndex = element.propertyArrayIndex + if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex) + + # here is the read result + readResult = element.readResult + + sys.stdout.write(propertyIdentifier) + if propertyArrayIndex is not None: + sys.stdout.write("[" + str(propertyArrayIndex) + "]") + + # check for an error + if readResult.propertyAccessError is not None: + sys.stdout.write(" ! " + str(readResult.propertyAccessError) + '\n') + + else: + # here is the value + propertyValue = readResult.propertyValue + + # find the datatype + datatype = get_datatype(objectIdentifier[0], propertyIdentifier) + if _debug: ReadPropertyMultipleApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (propertyArrayIndex is not None): + if propertyArrayIndex == 0: + value = propertyValue.cast_out(Unsigned) + else: + value = propertyValue.cast_out(datatype.subtype) + else: + value = propertyValue.cast_out(datatype) + if _debug: ReadPropertyMultipleApplication._debug(" - value: %r", value) + + sys.stdout.write(" = " + str(value) + '\n') + sys.stdout.flush() + +# +# ReadPropertyMultipleConsoleCmd +# + +@bacpypes_debugging +class ReadPropertyMultipleConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read ( ( [ ] )... )...""" + args = args.split() + if _debug: ReadPropertyMultipleConsoleCmd._debug("do_read %r", args) + + try: + i = 0 + addr = args[i] + i += 1 + + read_access_spec_list = [] + while i < len(args): + obj_type = args[i] + i += 1 + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(args[i]) + i += 1 + + prop_reference_list = [] + while i < len(args): + prop_id = args[i] + if prop_id not in PropertyIdentifier.enumerations: + break + + i += 1 + if prop_id in ('all', 'required', 'optional'): + pass + else: + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a property reference + prop_reference = PropertyReference( + propertyIdentifier=prop_id, + ) + + # check for an array index + if (i < len(args)) and args[i].isdigit(): + prop_reference.propertyArrayIndex = int(args[i]) + i += 1 + + # add it to the list + prop_reference_list.append(prop_reference) + + # check for at least one property + if not prop_reference_list: + raise ValueError, "provide at least one property" + + # build a read access specification + read_access_spec = ReadAccessSpecification( + objectIdentifier=(obj_type, obj_inst), + listOfPropertyReferences=prop_reference_list, + ) + + # add it to the list + read_access_spec_list.append(read_access_spec) + + # check for at least one + if not read_access_spec_list: + raise RuntimeError, "at least one read access specification required" + + # build the request + request = ReadPropertyMultipleRequest( + listOfReadAccessSpecs=read_access_spec_list, + ) + request.pduDestination = Address(addr) + if _debug: ReadPropertyMultipleConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadPropertyMultipleConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyMultipleApplication(this_device, args.ini.address) + this_console = ReadPropertyMultipleConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadPropertyMultipleServer.py b/samples/ReadPropertyMultipleServer.py new file mode 100755 index 0000000..3454f59 --- /dev/null +++ b/samples/ReadPropertyMultipleServer.py @@ -0,0 +1,319 @@ +#!/usr/bin/python + +""" +This sample application shows how to extend the basic functionality of a device +to support the ReadPropertyMultiple service. +""" + +import random + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Atomic, Real, Unsigned +from bacpypes.constructeddata import Array, Any +from bacpypes.basetypes import ServicesSupported, ErrorType +from bacpypes.apdu import ReadPropertyMultipleACK, ReadAccessResult, ReadAccessResultElement, ReadAccessResultElementChoice +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, Property, PropertyError, register_object_type +from bacpypes.apdu import Error +from bacpypes.errors import ExecutionError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# RandomValueProperty +# + +@bacpypes_debugging +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False) + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # return a random value + value = random.random() * 100.0 + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + +# +# Random Value Object Type +# + +@bacpypes_debugging +class RandomAnalogValueObject(AnalogValueObject): + + properties = [ + RandomValueProperty('presentValue'), + ] + + def __init__(self, **kwargs): + if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +register_object_type(RandomAnalogValueObject) + +# +# ReadPropertyToAny +# + +@bacpypes_debugging +def ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex=None): + """Read the specified property of the object, with the optional array index, + and cast the result into an Any object.""" + if _debug: ReadPropertyToAny._debug("ReadPropertyToAny %s %r %r", obj, propertyIdentifier, propertyArrayIndex) + + # get the datatype + datatype = obj.get_datatype(propertyIdentifier) + if _debug: ReadPropertyToAny._debug(" - datatype: %r", datatype) + if datatype is None: + raise ExecutionError(errorClass='property', errorCode='datatypeNotSupported') + + # get the value + value = obj.ReadProperty(propertyIdentifier, propertyArrayIndex) + if _debug: ReadPropertyToAny._debug(" - value: %r", value) + if value is None: + raise ExecutionError(errorClass='property', errorCode='unknownProperty') + + # change atomic values into something encodeable + if issubclass(datatype, Atomic): + value = datatype(value) + elif issubclass(datatype, Array) and (propertyArrayIndex is not None): + if propertyArrayIndex == 0: + value = Unsigned(value) + elif issubclass(datatype.subtype, Atomic): + value = datatype.subtype(value) + elif not isinstance(value, datatype.subtype): + raise TypeError, "invalid result datatype, expecting %s and got %s" \ + % (datatype.subtype.__name__, type(value).__name__) + elif not isinstance(value, datatype): + raise TypeError, "invalid result datatype, expecting %s and got %s" \ + % (datatype.__name__, type(value).__name__) + if _debug: ReadPropertyToAny._debug(" - encodeable value: %r", value) + + # encode the value + result = Any() + result.cast_in(value) + if _debug: ReadPropertyToAny._debug(" - result: %r", result) + + # return the object + return result + +# +# ReadPropertyToResultElement +# + +@bacpypes_debugging +def ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex=None): + """Read the specified property of the object, with the optional array index, + and cast the result into an Any object.""" + if _debug: ReadPropertyToResultElement._debug("ReadPropertyToResultElement %s %r %r", obj, propertyIdentifier, propertyArrayIndex) + + # save the result in the property value + read_result = ReadAccessResultElementChoice() + + try: + read_result.propertyValue = ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex) + if _debug: ReadPropertyToResultElement._debug(" - success") + except PropertyError, error: + if _debug: ReadPropertyToResultElement._debug(" - error: %r", error) + read_result.propertyAccessError = ErrorType(errorClass='property', errorCode='unknownProperty') + except ExecutionError, error: + if _debug: ReadPropertyToResultElement._debug(" - error: %r", error) + read_result.propertyAccessError = ErrorType(errorClass=error.errorClass, errorCode=error.errorCode) + + # make an element for this value + read_access_result_element = ReadAccessResultElement( + propertyIdentifier=propertyIdentifier, + propertyArrayIndex=propertyArrayIndex, + readResult=read_result, + ) + if _debug: ReadPropertyToResultElement._debug(" - read_access_result_element: %r", read_access_result_element) + + # fini + return read_access_result_element + +# +# ReadPropertyMultipleApplication +# + +@bacpypes_debugging +class ReadPropertyMultipleApplication(BIPSimpleApplication): + + def __init__(self, *args, **kwargs): + if _debug: ReadPropertyMultipleApplication._debug("__init__ %r %r", args, kwargs) + BIPSimpleApplication.__init__(self, *args, **kwargs) + + def do_ReadPropertyMultipleRequest(self, apdu): + """Respond to a ReadPropertyMultiple Request.""" + if _debug: ReadPropertyMultipleApplication._debug("do_ReadPropertyMultipleRequest %r", apdu) + + # response is a list of read access results (or an error) + resp = None + read_access_result_list = [] + + # loop through the request + for read_access_spec in apdu.listOfReadAccessSpecs: + # get the object identifier + objectIdentifier = read_access_spec.objectIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier) + + # check for wildcard + if (objectIdentifier == ('device', 4194303)): + if _debug: ReadPropertyMultipleApplication._debug(" - wildcard device identifier") + objectIdentifier = self.localDevice.objectIdentifier + + # get the object + obj = self.get_object_id(objectIdentifier) + if _debug: ReadPropertyMultipleApplication._debug(" - object: %r", obj) + + # make sure it exists + if not obj: + resp = Error(errorClass='object', errorCode='unknownObject', context=apdu) + if _debug: ReadPropertyMultipleApplication._debug(" - unknown object error: %r", resp) + break + + # build a list of result elements + read_access_result_element_list = [] + + # loop through the property references + for prop_reference in read_access_spec.listOfPropertyReferences: + # get the property identifier + propertyIdentifier = prop_reference.propertyIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier) + + # get the array index (optional) + propertyArrayIndex = prop_reference.propertyArrayIndex + if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex) + + # check for special property identifiers + if propertyIdentifier in ('all', 'required', 'optional'): + for propId, prop in obj._properties.items(): + if _debug: ReadPropertyMultipleApplication._debug(" - checking: %r %r", propId, prop.optional) + + if (propertyIdentifier == 'all'): + pass + elif (propertyIdentifier == 'required') and (prop.optional): + if _debug: ReadPropertyMultipleApplication._debug(" - not a required property") + continue + elif (propertyIdentifier == 'optional') and (not prop.optional): + if _debug: ReadPropertyMultipleApplication._debug(" - not an optional property") + continue + + # read the specific property + read_access_result_element = ReadPropertyToResultElement(obj, propId, propertyArrayIndex) + + # check for undefined property + if read_access_result_element.readResult.propertyAccessError \ + and read_access_result_element.readResult.propertyAccessError.errorCode == 'unknownProperty': + continue + + # add it to the list + read_access_result_element_list.append(read_access_result_element) + + else: + # read the specific property + read_access_result_element = ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex) + + # add it to the list + read_access_result_element_list.append(read_access_result_element) + + # build a read access result + read_access_result = ReadAccessResult( + objectIdentifier=objectIdentifier, + listOfResults=read_access_result_element_list + ) + if _debug: ReadPropertyMultipleApplication._debug(" - read_access_result: %r", read_access_result) + + # add it to the list + read_access_result_list.append(read_access_result) + + # this is a ReadPropertyMultiple ack + if not resp: + resp = ReadPropertyMultipleACK(context=apdu) + resp.listOfReadAccessResults = read_access_result_list + if _debug: ReadPropertyMultipleApplication._debug(" - resp: %r", resp) + + # return the result + self.response(resp) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['readPropertyMultiple'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a sample application + this_application = ReadPropertyMultipleApplication(this_device, args.ini.address) + + # make a random input object + ravo1 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 1), objectName='Random1' + ) + _log.debug(" - ravo1: %r", ravo1) + + ravo2 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 2), objectName='Random2' + ) + _log.debug(" - ravo2: %r", ravo2) + + # add it to the device + this_application.add_object(ravo1) + this_application.add_object(ravo2) + _log.debug(" - object list: %r", this_device.objectList) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/ReadRange.py b/samples/ReadRange.py new file mode 100755 index 0000000..adb5149 --- /dev/null +++ b/samples/ReadRange.py @@ -0,0 +1,166 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for readrange commands +which create ReadRangeRequest PDUs, then lines up the coorresponding ReadRangeACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import Error, AbortPDU, ReadRangeRequest, ReadRangeACK +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadRangeApplication +# + +@bacpypes_debugging +class ReadRangeApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadRangeApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadRangeApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadRangeApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadRangeRequest)) and (isinstance(apdu, ReadRangeACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadRangeApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # cast out of the single Any element into the datatype + value = apdu.itemData[0].cast_out(datatype) + + # dump it out + for i, item in enumerate(value): + sys.stdout.write("[%d]\n" % (i,)) + item.debug_contents(file=sys.stdout, indent=2) + sys.stdout.flush() + +# +# ReadRangeConsoleCmd +# + +@bacpypes_debugging +class ReadRangeConsoleCmd(ConsoleCmd): + + def do_readrange(self, args): + """readrange [ ]""" + args = args.split() + if _debug: ReadRangeConsoleCmd._debug("do_readrange %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadRangeRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadRangeConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadRangeConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadRangeApplication(this_device, args.ini.address) + this_console = ReadRangeConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadWriteFile.py b/samples/ReadWriteFile.py new file mode 100755 index 0000000..d6d5bc5 --- /dev/null +++ b/samples/ReadWriteFile.py @@ -0,0 +1,269 @@ +#!/usr/bin/python + +""" +ReadWriteFile.py + +This application presents a 'console' prompt to the user asking for commands. + +The 'readrecord' and 'writerecord' commands are used with record oriented files, +and the 'readstream' and 'writestream' commands are used with stream oriented +files. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +from bacpypes.apdu import Error, AbortPDU, \ + AtomicReadFileRequest, \ + AtomicReadFileRequestAccessMethodChoice, \ + AtomicReadFileRequestAccessMethodChoiceRecordAccess, \ + AtomicReadFileRequestAccessMethodChoiceStreamAccess, \ + AtomicReadFileACK, \ + AtomicWriteFileRequest, \ + AtomicWriteFileRequestAccessMethodChoice, \ + AtomicWriteFileRequestAccessMethodChoiceRecordAccess, \ + AtomicWriteFileRequestAccessMethodChoiceStreamAccess, \ + AtomicWriteFileACK +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# reference a simple application +this_application = None + +# +# TestApplication +# + +@bacpypes_debugging +class TestApplication(BIPSimpleApplication): + + def request(self, apdu): + if _debug: TestApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: TestApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, AtomicReadFileRequest)) and (isinstance(apdu, AtomicReadFileACK)): + # suck out the record data + if apdu.accessMethod.recordAccess: + value = apdu.accessMethod.recordAccess.fileRecordData + elif apdu.accessMethod.streamAccess: + value = apdu.accessMethod.streamAccess.fileData + TestApplication._debug(" - value: %r", value) + + sys.stdout.write(repr(value) + '\n') + sys.stdout.flush() + + elif (isinstance(self._request, AtomicWriteFileRequest)) and (isinstance(apdu, AtomicWriteFileACK)): + # suck out the record data + if apdu.fileStartPosition is not None: + value = apdu.fileStartPosition + elif apdu.fileStartRecord is not None: + value = apdu.fileStartRecord + TestApplication._debug(" - value: %r", value) + + sys.stdout.write(repr(value) + '\n') + sys.stdout.flush() + +# +# TestConsoleCmd +# + +@bacpypes_debugging +class TestConsoleCmd(ConsoleCmd): + + def do_readrecord(self, args): + """readrecord """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_readrecord %r", args) + + try: + addr, obj_inst, start_record, record_count = args + + obj_type = 'file' + obj_inst = int(obj_inst) + start_record = int(start_record) + record_count = int(record_count) + + # build a request + request = AtomicReadFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicReadFileRequestAccessMethodChoice( + recordAccess=AtomicReadFileRequestAccessMethodChoiceRecordAccess( + fileStartRecord=start_record, + requestedRecordCount=record_count, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + + def do_readstream(self, args): + """readstream """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_readstream %r", args) + + try: + addr, obj_inst, start_position, octet_count = args + + obj_type = 'file' + obj_inst = int(obj_inst) + start_position = int(start_position) + octet_count = int(octet_count) + + # build a request + request = AtomicReadFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicReadFileRequestAccessMethodChoice( + streamAccess=AtomicReadFileRequestAccessMethodChoiceStreamAccess( + fileStartPosition=start_position, + requestedOctetCount=octet_count, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + + def do_writerecord(self, args): + """writerecord [ ... ]""" + args = args.split() + if _debug: TestConsoleCmd._debug("do_writerecord %r", args) + + try: + addr, obj_inst, start_record, record_count = args[0:4] + + obj_type = 'file' + obj_inst = int(obj_inst) + start_record = int(start_record) + record_count = int(record_count) + record_data = list(args[4:]) + + # build a request + request = AtomicWriteFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicWriteFileRequestAccessMethodChoice( + recordAccess=AtomicWriteFileRequestAccessMethodChoiceRecordAccess( + fileStartRecord=start_record, + recordCount=record_count, + fileRecordData=record_data, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + + def do_writestream(self, args): + """writestream """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_writestream %r", args) + + try: + addr, obj_inst, start_position, data = args + + obj_type = 'file' + obj_inst = int(obj_inst) + start_position = int(start_position) + + # build a request + request = AtomicWriteFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicWriteFileRequestAccessMethodChoice( + streamAccess=AtomicWriteFileRequestAccessMethodChoiceStreamAccess( + fileStartPosition=start_position, + fileData=data, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = TestApplication(this_device, args.ini.address) + TestConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadWriteFileServer.py b/samples/ReadWriteFileServer.py new file mode 100755 index 0000000..8ed6e6e --- /dev/null +++ b/samples/ReadWriteFileServer.py @@ -0,0 +1,219 @@ +#!/usr/bin/python + +""" +ReadWriteFileServer.py + +This sample application is a BACnet device that has one record access file at +('file', 1) and one stream access file at ('file', 2). +""" + +import random +import string + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import FileObject, register_object_type + +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# Local Record Access File Object Type +# + +@bacpypes_debugging +class LocalRecordAccessFileObject(FileObject): + + def __init__(self, **kwargs): + """ Initialize a record accessed file object. """ + if _debug: + LocalRecordAccessFileObject._debug("__init__ %r", + kwargs, + ) + FileObject.__init__(self, + fileAccessMethod='recordAccess', + **kwargs + ) + + self._record_data = [ + ''.join(random.choice(string.ascii_letters) + for i in range(random.randint(10, 20))) + for j in range(random.randint(10, 20)) + ] + if _debug: LocalRecordAccessFileObject._debug(" - %d records", + len(self._record_data), + ) + + def __len__(self): + """ Return the number of records. """ + if _debug: LocalRecordAccessFileObject._debug("__len__") + + return len(self._record_data) + + def ReadFile(self, start_record, record_count): + """ Read a number of records starting at a specific record. """ + if _debug: LocalRecordAccessFileObject._debug("ReadFile %r %r", + start_record, record_count, + ) + + # end of file is true if last record is returned + end_of_file = (start_record+record_count) >= len(self._record_data) + + return end_of_file, \ + self._record_data[start_record:start_record + record_count] + + def WriteFile(self, start_record, record_count, record_data): + """ Write a number of records, starting at a specific record. """ + # check for append + if (start_record < 0): + start_record = len(self._record_data) + self._record_data.extend(record_data) + + # check to extend the file out to start_record records + elif (start_record > len(self._record_data)): + self._record_data.extend(['' for i in range(start_record - len(self._record_data))]) + start_record = len(self._record_data) + self._record_data.extend(record_data) + + # slice operation works for other cases + else: + self._record_data[start_record:start_record + record_count] = record_data + + # return where the 'writing' actually started + return start_record + +register_object_type(LocalRecordAccessFileObject) + +# +# Local Stream Access File Object Type +# + +@bacpypes_debugging +class LocalStreamAccessFileObject(FileObject): + + def __init__(self, **kwargs): + """ Initialize a stream accessed file object. """ + if _debug: + LocalStreamAccessFileObject._debug("__init__ %r", + kwargs, + ) + FileObject.__init__(self, + fileAccessMethod='streamAccess', + **kwargs + ) + + self._file_data = ''.join(random.choice(string.ascii_letters) + for i in range(random.randint(100, 200))) + if _debug: LocalRecordAccessFileObject._debug(" - %d octets", + len(self._file_data), + ) + + def __len__(self): + """ Return the number of octets in the file. """ + if _debug: LocalStreamAccessFileObject._debug("__len__") + + return len(self._file_data) + + def ReadFile(self, start_position, octet_count): + """ Read a chunk of data out of the file. """ + if _debug: LocalStreamAccessFileObject._debug("ReadFile %r %r", + start_position, octet_count, + ) + + # end of file is true if last record is returned + end_of_file = (start_position+octet_count) >= len(self._file_data) + + return end_of_file, \ + self._file_data[start_position:start_position + octet_count] + + def WriteFile(self, start_position, data): + """ Write a number of octets, starting at a specific offset. """ + # check for append + if (start_position < 0): + start_position = len(self._file_data) + self._file_data += data + + # check to extend the file out to start_record records + elif (start_position > len(self._file_data)): + self._file_data += '\0' * (start_position - len(self._file_data)) + start_position = len(self._file_data) + self._file_data += data + + # no slice assignment, strings are immutable + else: + data_len = len(data) + prechunk = self._file_data[:start_position] + postchunk = self._file_data[start_position + data_len:] + self._file_data = prechunk + data + postchunk + + # return where the 'writing' actually started + return start_position + +register_object_type(LocalStreamAccessFileObject) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + pss['atomicReadFile'] = 1 + pss['atomicWriteFile'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a record access file, add to the device + f1 = LocalRecordAccessFileObject( + objectIdentifier=('file', 1), + objectName='RecordAccessFile1' + ) + _log.debug(" - f1: %r", f1) + this_application.add_object(f1) + + # make a stream access file, add to the device + f2 = LocalStreamAccessFileObject( + objectIdentifier=('file', 2), + objectName='StreamAccessFile2' + ) + _log.debug(" - f2: %r", f2) + this_application.add_object(f2) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadWriteProperty.py b/samples/ReadWriteProperty.py new file mode 100755 index 0000000..87578b5 --- /dev/null +++ b/samples/ReadWriteProperty.py @@ -0,0 +1,258 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for commands. + +For 'read' commands it will create ReadPropertyRequest PDUs, then lines up the +coorresponding ReadPropertyACK and prints the value. For 'write' commands it +will create WritePropertyRequst PDUs and prints out a simple acknowledgement. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import Error, AbortPDU, SimpleAckPDU, \ + ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest +from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real +from bacpypes.constructeddata import Array, Any +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyApplication +# + +@bacpypes_debugging +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + if isinstance(apdu, SimpleAckPDU): + sys.stdout.write("ack\n") + sys.stdout.flush() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + sys.stdout.flush() + +# +# ReadWritePropertyConsoleCmd +# + +@bacpypes_debugging +class ReadWritePropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + + def do_write(self, args): + """write [ ] [ ]""" + args = args.split() + ReadWritePropertyConsoleCmd._debug("do_write %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + if obj_type.isdigit(): + obj_type = int(obj_type) + obj_inst = int(obj_inst) + value = args[4] + + indx = None + if len(args) >= 6: + if args[5] != "-": + indx = int(args[5]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - indx: %r", indx) + + priority = None + if len(args) >= 7: + priority = int(args[6]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - priority: %r", priority) + + # get the datatype + datatype = get_datatype(obj_type, prop_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype) + + # change atomic values into something encodeable, null is a special case + if (value == 'null'): + value = Null() + elif issubclass(datatype, Atomic): + if datatype is Integer: + value = int(value) + elif datatype is Real: + value = float(value) + elif datatype is Unsigned: + value = int(value) + value = datatype(value) + elif issubclass(datatype, Array) and (indx is not None): + if indx == 0: + value = Integer(value) + elif issubclass(datatype.subtype, Atomic): + value = datatype.subtype(value) + elif not isinstance(value, datatype.subtype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,) + elif not isinstance(value, datatype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,) + if _debug: ReadWritePropertyConsoleCmd._debug(" - encodeable value: %r %s", value, type(value)) + + # build a request + request = WritePropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id + ) + request.pduDestination = Address(addr) + + # save the value + request.propertyValue = Any() + try: + request.propertyValue.cast_in(value) + except Exception, e: + ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", e) + + # optional array index + if indx is not None: + request.propertyArrayIndex = indx + + # optional priority + if priority is not None: + request.priority = priority + + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + this_console = ReadWritePropertyConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/RecurringMultipleReadProperty.py b/samples/RecurringMultipleReadProperty.py new file mode 100644 index 0000000..e1c24ae --- /dev/null +++ b/samples/RecurringMultipleReadProperty.py @@ -0,0 +1,198 @@ +#!/usr/bin/python + +""" +Mutliple Read Property + +This application has a static list of points that it would like to read. It reads the +values of each of them in turn and then quits. +""" + +from collections import deque + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run, deferred +from bacpypes.task import RecurringTask + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# point list +point_list = [ + ('1.2.3.4', 'analogValue', 1, 'presentValue'), + ('1.2.3.4', 'analogValue', 2, 'presentValue'), + ] + +# +# PrairieDog +# + +@bacpypes_debugging +class PrairieDog(BIPSimpleApplication, RecurringTask): + + def __init__(self, interval, *args): + if _debug: PrairieDog._debug("__init__ %r, %r", interval, args) + BIPSimpleApplication.__init__(self, *args) + RecurringTask.__init__(self, interval * 1000) + + # keep track of requests to line up responses + self._request = None + + # start out idle + self.is_busy = False + self.point_queue = deque() + self.response_values = [] + + # install it + self.install_task() + + def process_task(self): + if _debug: PrairieDog._debug("process_task") + global point_list + + # check to see if we're idle + if self.is_busy: + if _debug: PrairieDog._debug(" - busy") + return + + # now we are busy + self.is_busy = True + + # turn the point list into a queue + self.point_queue = deque(point_list) + + # clean out the list of the response values + self.response_values = [] + + # fire off the next request + self.next_request() + + def next_request(self): + if _debug: PrairieDog._debug("next_request") + + # check to see if we're done + if not self.point_queue: + if _debug: PrairieDog._debug(" - done") + + # dump out the results + for request, response in zip(point_list, self.response_values): + print request, response + + # no longer busy + self.is_busy = False + + return + + # get the next request + addr, obj_type, obj_inst, prop_id = self.point_queue.popleft() + + # build a request + self._request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + self._request.pduDestination = Address(addr) + if _debug: PrairieDog._debug(" - request: %r", self._request) + + # forward it along + BIPSimpleApplication.request(self, self._request) + + def confirmation(self, apdu): + if _debug: PrairieDog._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + if _debug: PrairieDog._debug(" - error: %r", apdu) + self.response_values.append(apdu) + + elif isinstance(apdu, AbortPDU): + if _debug: PrairieDog._debug(" - abort: %r", apdu) + self.response_values.append(apdu) + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: PrairieDog._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: PrairieDog._debug(" - value: %r", value) + + # save the value + self.response_values.append(value) + + # fire off another request + deferred(self.next_request) + +# +# __main__ +# + +try: + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + + # add an argument for interval + parser.add_argument('interval', type=int, + help='repeat rate in seconds', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a dog + this_application = PrairieDog(args.interval, this_device, args.ini.address) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/RecurringTask.py b/samples/RecurringTask.py new file mode 100755 index 0000000..fafb56c --- /dev/null +++ b/samples/RecurringTask.py @@ -0,0 +1,74 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run +from bacpypes.task import RecurringTask + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# PrairieDog +# + +@bacpypes_debugging +class PrairieDog(RecurringTask): + + def __init__(self, dog_number, interval): + if _debug: PrairieDog._debug("__init__ %r %r", dog_number, interval) + + # save the identity + self.dog_number = dog_number + + # this is a recurring task + RecurringTask.__init__(self, interval) + + # install it + self.install_task() + + def process_task(self): + if _debug: PrairieDog._debug("process_task") + + sys.stdout.write("%d woof!\n" % (self.dog_number,)) + +# +# __main__ +# + +try: + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + + # add an argument for seconds per dog + parser.add_argument('seconds', metavar='N', type=int, nargs='+', + help='number of seconds for each dog', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make some dogs + for i, sec in enumerate(args.seconds): + dog = PrairieDog(i, sec * 1000) + if _debug: _log.debug(" - dog: %r", dog) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/SampleApplication.py b/samples/SampleApplication.py new file mode 100755 index 0000000..07cc403 --- /dev/null +++ b/samples/SampleApplication.py @@ -0,0 +1,87 @@ +#!/usr/bin/python + +""" +This sample application is the simplest BACpypes application that is +a complete stack. Using an INI file it will configure a +LocalDeviceObject, create a SampleApplication instance, and run, +waiting for a keyboard interrupt or a TERM signal to quit. There is no +input or output for this application, but by adding --debug to the +command line when it is run you can check the behavior of the stack by +seeing what is sent and received. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# SampleApplication +# + +@bacpypes_debugging +class SampleApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: SampleApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def request(self, apdu): + if _debug: SampleApplication._debug("request %r", apdu) + BIPSimpleApplication.request(self, apdu) + + def indication(self, apdu): + if _debug: SampleApplication._debug("indication %r", apdu) + BIPSimpleApplication.indication(self, apdu) + + def response(self, apdu): + if _debug: SampleApplication._debug("response %r", apdu) + BIPSimpleApplication.response(self, apdu) + + def confirmation(self, apdu): + if _debug: SampleApplication._debug("confirmation %r", apdu) + BIPSimpleApplication.confirmation(self, apdu) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + vendorName="Hello", + ) + + # make a sample application + this_application = SampleApplication(this_device, args.ini.address) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/SampleConsoleCmd.py b/samples/SampleConsoleCmd.py new file mode 100755 index 0000000..4345207 --- /dev/null +++ b/samples/SampleConsoleCmd.py @@ -0,0 +1,99 @@ +#!/usr/bin/python + +""" +This sample application is a simple BACpypes application that +presents a console prompt. Almost identical to the SampleApplication, +the BACnet application is minimal, but with the console commands +that match the command line options like 'buggers' and 'debug' the +user can add debugging "on the fly". +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# SampleApplication +# + +@bacpypes_debugging +class SampleApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: SampleApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def request(self, apdu): + if _debug: SampleApplication._debug("request %r", apdu) + BIPSimpleApplication.request(self, apdu) + + def indication(self, apdu): + if _debug: SampleApplication._debug("indication %r", apdu) + BIPSimpleApplication.indication(self, apdu) + + def response(self, apdu): + if _debug: SampleApplication._debug("response %r", apdu) + BIPSimpleApplication.response(self, apdu) + + def confirmation(self, apdu): + if _debug: SampleApplication._debug("confirmation %r", apdu) + BIPSimpleApplication.confirmation(self, apdu) + +# +# SampleConsoleCmd +# + +@bacpypes_debugging +class SampleConsoleCmd(ConsoleCmd): + + def do_nothing(self, args): + """nothing can be done""" + args = args.split() + if _debug: SampleConsoleCmd._debug("do_nothing %r", args) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = SampleApplication(this_device, args.ini.address) + this_console = SampleConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/VendorAVObject.py b/samples/VendorAVObject.py new file mode 100755 index 0000000..974c704 --- /dev/null +++ b/samples/VendorAVObject.py @@ -0,0 +1,135 @@ +#!/usr/bin/python + +""" +This sample application shows how to extend one of the basic objects, an Analog +Value Object in this case, to provide a present value. This type of code is used +when the application is providing a BACnet interface to a collection of data. +It assumes that almost all of the default behaviour of a BACpypes application is +sufficient. +""" + +import random + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Real +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, Property, register_object_type +from bacpypes.errors import ExecutionError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +vendor_id = 999 + +# +# RandomValueProperty +# + +@bacpypes_debugging +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False) + + # writing to this property changes the multiplier + self.multiplier = 100.0 + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # return a random value + value = random.random() * self.multiplier + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + + # change the multiplier + self.multiplier = value + +# +# Vendor Analog Value Object Type +# + +@bacpypes_debugging +class VendorAVObject(AnalogValueObject): + objectType = 513 + + properties = [ + RandomValueProperty(5504), + ] + + def __init__(self, **kwargs): + if _debug: VendorAVObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +register_object_type(VendorAVObject, vendor_id=vendor_id) + +# +# main +# + +@bacpypes_debugging +def main(): + if _debug: main._debug("initialization") + + try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: main._debug("initialization") + if _debug: main._debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=vendor_id, + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make some objects + ravo1 = VendorAVObject( + objectIdentifier=(513, 1), objectName='Random1' + ) + if _debug: main._debug(" - ravo1: %r", ravo1) + + ravo2 = VendorAVObject( + objectIdentifier=(513, 2), objectName='Random2' + ) + if _debug: main._debug(" - ravo2: %r", ravo2) + + # add it to the device + this_application.add_object(ravo1) + this_application.add_object(ravo2) + if _debug: main._debug(" - object list: %r", this_device.objectList) + + if _debug: main._debug("running") + + run() + + except Exception, e: + main._exception("an error has occurred: %s", e) + finally: + if _debug: main._debug("finally") + +if __name__ == '__main__': + main() + diff --git a/samples/VendorReadWriteProperty.py b/samples/VendorReadWriteProperty.py new file mode 100755 index 0000000..3489526 --- /dev/null +++ b/samples/VendorReadWriteProperty.py @@ -0,0 +1,285 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for commands. + +For 'read' commands it will create ReadPropertyRequest PDUs, then lines up the +coorresponding ReadPropertyACK and prints the value. For 'write' commands it +will create WritePropertyRequst PDUs and prints out a simple acknowledgement. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import Error, AbortPDU, SimpleAckPDU, \ + ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest +from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real +from bacpypes.constructeddata import Array, Any +from bacpypes.basetypes import ServicesSupported + +import VendorAVObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# ReadPropertyApplication +# + +@bacpypes_debugging +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + if isinstance(apdu, SimpleAckPDU): + sys.stdout.write("ack\n") + sys.stdout.flush() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier, VendorAVObject.vendor_id) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + sys.stdout.flush() + +# +# ReadWritePropertyConsoleCmd +# + +@bacpypes_debugging +class ReadWritePropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + global this_application + + args = args.split() + if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type, VendorAVObject.vendor_id): + raise ValueError, "unknown object type" + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_type: %r", obj_type) + + obj_inst = int(obj_inst) + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_inst: %r", obj_inst) + + if prop_id.isdigit(): + prop_id = int(prop_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - prop_id: %r", prop_id) + + datatype = get_datatype(obj_type, prop_id, VendorAVObject.vendor_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + + def do_write(self, args): + """write [ ] [ ]""" + global this_application + + args = args.split() + ReadWritePropertyConsoleCmd._debug("do_write %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type, VendorAVObject.vendor_id): + raise ValueError, "unknown object type" + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_type: %r", obj_type) + + obj_inst = int(obj_inst) + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_inst: %r", obj_inst) + + if prop_id.isdigit(): + prop_id = int(prop_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - prop_id: %r", prop_id) + + value = args[4] + + indx = None + if len(args) >= 6: + if args[5] != "-": + indx = int(args[5]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - indx: %r", indx) + + priority = None + if len(args) >= 7: + priority = int(args[6]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - priority: %r", priority) + + # get the datatype + datatype = get_datatype(obj_type, prop_id, VendorAVObject.vendor_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype) + + # change atomic values into something encodeable, null is a special case + if (value == 'null'): + value = Null() + elif issubclass(datatype, Atomic): + if datatype is Integer: + value = int(value) + elif datatype is Real: + value = float(value) + elif datatype is Unsigned: + value = int(value) + value = datatype(value) + elif issubclass(datatype, Array) and (indx is not None): + if indx == 0: + value = Integer(value) + elif issubclass(datatype.subtype, Atomic): + value = datatype.subtype(value) + elif not isinstance(value, datatype.subtype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,) + elif not isinstance(value, datatype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,) + if _debug: ReadWritePropertyConsoleCmd._debug(" - encodeable value: %r %s", value, type(value)) + + # build a request + request = WritePropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id + ) + request.pduDestination = Address(addr) + + # save the value + request.propertyValue = Any() + try: + request.propertyValue.cast_in(value) + except Exception, e: + ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", e) + + # optional array index + if indx is not None: + request.propertyArrayIndex = indx + + # optional priority + if priority is not None: + request.priority = priority + + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + +# +# main +# + +@bacpypes_debugging +def main(): + if _debug: main._debug("initialization") + global this_application + + try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: main._debug("initialization") + if _debug: main._debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + this_console = ReadWritePropertyConsoleCmd() + + main._debug("running") + + run() + + except Exception, e: + main._exception("an error has occurred: %s", e) + finally: + main._debug("finally") + +if __name__ == '__main__': + main() + diff --git a/samples/WhoHasIHaveApplication.py b/samples/WhoHasIHaveApplication.py new file mode 100755 index 0000000..a02e2b4 --- /dev/null +++ b/samples/WhoHasIHaveApplication.py @@ -0,0 +1,116 @@ +#!/usr/bin/python + +""" +This sample application builds on the first sample by overriding the default +processing for Who-Has and I-Have requests, counting them, then continuing on +with the regular processing. After the run() function has completed it will +dump a formatted summary of the requests it has received. Note that these +services are relatively rare even in large networks. +""" + +from collections import defaultdict + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# counters +who_has_counter = defaultdict(int) +i_have_counter = defaultdict(int) + +# +# WhoHasIHaveApplication +# + +@bacpypes_debugging +class WhoHasIHaveApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: WhoHasIHaveApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def do_WhoHasRequest(self, apdu): + """Respond to a Who-Has request.""" + if _debug: WhoHasIHaveApplication._debug("do_WhoHasRequest, %r", apdu) + + key = (str(apdu.pduSource),) + if apdu.object.objectIdentifier is not None: + key += (str(apdu.object.objectIdentifier),) + elif apdu.object.objectName is not None: + key += (apdu.object.objectName,) + else: + print "(rejected APDU:" + apdu.debug_contents() + print ")" + return + + # count the times this has been received + who_has_counter[key] += 1 + + def do_IHaveRequest(self, apdu): + """Respond to a I-Have request.""" + if _debug: WhoHasIHaveApplication._debug("do_IHaveRequest %r", apdu) + + key = ( + str(apdu.pduSource), + str(apdu.deviceIdentifier), + str(apdu.objectIdentifier), + apdu.objectName + ) + + # count the times this has been received + i_have_counter[key] += 1 + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = WhoHasIHaveApplication(this_device, args.ini.address) + + _log.debug("running") + + # run until stopped, ^C works + run() + + print "----- Who Has -----" + for (src, objname), count in sorted(who_has_counter.items()): + print "%-20s %-30s %4d" % (src, objname, count) + print + + print "----- I Have -----" + for (src, devid, objid, objname), count in sorted(i_have_counter.items()): + print "%-20s %-20s %-20s %-20s %4d" % (src, devid, objid, objname, count) + print + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/WhoIsIAm.py b/samples/WhoIsIAm.py new file mode 100755 index 0000000..e51af18 --- /dev/null +++ b/samples/WhoIsIAm.py @@ -0,0 +1,185 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for Who-Is and I-Am +commands which create the related APDUs, then lines up the coorresponding I-Am +for incoming traffic and prints out the contents. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address, GlobalBroadcast +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +from bacpypes.apdu import WhoIsRequest, IAmRequest +from bacpypes.basetypes import ServicesSupported +from bacpypes.errors import DecodingError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# WhoIsIAmApplication +# + +@bacpypes_debugging +class WhoIsIAmApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: WhoIsIAmApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: WhoIsIAmApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu) + + # forward it along + BIPSimpleApplication.confirmation(self, apdu) + + def indication(self, apdu): + if _debug: WhoIsIAmApplication._debug("indication %r", apdu) + + if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)): + device_type, device_instance = apdu.iAmDeviceIdentifier + if device_type != 'device': + raise DecodingError, "invalid object type" + + if (self._request.deviceInstanceRangeLowLimit is not None) and \ + (device_instance < self._request.deviceInstanceRangeLowLimit): + pass + elif (self._request.deviceInstanceRangeHighLimit is not None) and \ + (device_instance > self._request.deviceInstanceRangeHighLimit): + pass + else: + # print out the contents + sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n') + sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n') + sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n') + sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n') + sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n') + sys.stdout.flush() + + # forward it along + BIPSimpleApplication.indication(self, apdu) + +# +# WhoIsIAmConsoleCmd +# + +@bacpypes_debugging +class WhoIsIAmConsoleCmd(ConsoleCmd): + + def do_whois(self, args): + """whois [ ] [ ]""" + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args) + + try: + # build a request + request = WhoIsRequest() + if (len(args) == 1) or (len(args) == 3): + request.pduDestination = Address(args[0]) + del args[0] + else: + request.pduDestination = GlobalBroadcast() + + if len(args) == 2: + request.deviceInstanceRangeLowLimit = int(args[0]) + request.deviceInstanceRangeHighLimit = int(args[1]) + if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + WhoIsIAmConsoleCmd._exception("exception: %r", e) + + def do_iam(self, args): + """iam""" + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args) + + try: + # build a request + request = IAmRequest() + request.pduDestination = GlobalBroadcast() + + # set the parameters from the device object + request.iAmDeviceIdentifier = this_device.objectIdentifier + request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted + request.segmentationSupported = this_device.segmentationSupported + request.vendorID = this_device.vendorIdentifier + if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + WhoIsIAmConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = WhoIsIAmApplication(this_device, args.ini.address) + this_console = WhoIsIAmConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/WhoIsIAmApplication.py b/samples/WhoIsIAmApplication.py new file mode 100755 index 0000000..c795f8c --- /dev/null +++ b/samples/WhoIsIAmApplication.py @@ -0,0 +1,112 @@ +#!/usr/bin/python + +""" +This sample application builds on the first sample by overriding the default +processing for Who-Is and I-Am requests, counting them, then continuing on +with the regular processing. After the run() function has completed it will +dump a formatted summary of the requests it has received. +""" + +from collections import defaultdict + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# counters +who_is_counter = defaultdict(int) +i_am_counter = defaultdict(int) + +# +# WhoIsIAmApplication +# + +@bacpypes_debugging +class WhoIsIAmApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: WhoIsIAmApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def do_WhoIsRequest(self, apdu): + """Respond to a Who-Is request.""" + if _debug: WhoIsIAmApplication._debug("do_WhoIsRequest %r", apdu) + + # build a key from the source and parameters + key = (str(apdu.pduSource), + apdu.deviceInstanceRangeLowLimit, + apdu.deviceInstanceRangeHighLimit, + ) + + # count the times this has been received + who_is_counter[key] += 1 + + # pass back to the default implementation + BIPSimpleApplication.do_WhoIsRequest(self, apdu) + + def do_IAmRequest(self, apdu): + """Given an I-Am request, cache it.""" + if _debug: WhoIsIAmApplication._debug("do_IAmRequest %r", apdu) + + # build a key from the source, just use the instance number + key = (str(apdu.pduSource), + apdu.iAmDeviceIdentifier[1], + ) + + # count the times this has been received + i_am_counter[key] += 1 + + # no default implementation + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = WhoIsIAmApplication(this_device, args.ini.address) + + _log.debug("running") + + run() + + print "----- Who Is -----" + for (src, lowlim, hilim), count in sorted(who_is_counter.items()): + print "%-20s %8s %8s %4d" % (src, lowlim, hilim, count) + print + + print "----- I Am -----" + for (src, devid), count in sorted(i_am_counter.items()): + print "%-20s %8d %4d" % (src, devid, count) + print + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/WhoIsRouter.py b/samples/WhoIsRouter.py new file mode 100755 index 0000000..9759431 --- /dev/null +++ b/samples/WhoIsRouter.py @@ -0,0 +1,128 @@ +#!/usr/bin/python + +""" +This sample application has just a network stack, not a full application, +and is a way to create InitializeRoutingTable and WhoIsRouterToNetwork requests. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.npdu import InitializeRoutingTable, WhoIsRouterToNetwork +from bacpypes.app import BIPNetworkApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_application = None +this_console = None + +# +# WhoIsRouterApplication +# + +@bacpypes_debugging +class WhoIsRouterApplication(BIPNetworkApplication): + + def __init__(self, *args): + if _debug: WhoIsRouterApplication._debug("__init__ %r", args) + BIPNetworkApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("request %r %r", adapter, npdu) + + # save a copy of the request + self._request = npdu + + # forward it along + BIPNetworkApplication.request(self, adapter, npdu) + + def indication(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("indication %r %r", adapter, npdu) + BIPNetworkApplication.indication(self, adapter, npdu) + + def response(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("response %r %r", adapter, npdu) + BIPNetworkApplication.response(self, adapter, npdu) + + def confirmation(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("confirmation %r %r", adapter, npdu) + BIPNetworkApplication.confirmation(self, adapter, npdu) + +# +# WhoIsRouterConsoleCmd +# + +@bacpypes_debugging +class WhoIsRouterConsoleCmd(ConsoleCmd): + + def do_irt(self, args): + """irt """ + args = args.split() + if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args) + + # build a request + try: + request = InitializeRoutingTable() + request.pduDestination = Address(args[0]) + except: + print "invalid arguments" + return + + # give it to the application + this_application.request(this_application.nsap.adapters[0], request) + + def do_wirtn(self, args): + """wirtn [ ]""" + args = args.split() + if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args) + + # build a request + try: + request = WhoIsRouterToNetwork() + request.pduDestination = Address(args[0]) + if (len(args) > 1): + request.wirtnNetwork = int(args[1]) + except: + print "invalid arguments" + return + + # give it to the application + this_application.request(this_application.nsap.adapters[0], request) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a simple application + this_application = WhoIsRouterApplication(args.ini.address) + if _debug: _log.debug(" - this_application: %r", this_application) + + # make a console + this_console = WhoIsRouterConsoleCmd() + if _debug: _log.debug(" - this_console: %r", this_console) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") From 985c69dd7e76b1c9c7b25100deecfe4350e33435 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 25 Aug 2015 22:10:03 -0400 Subject: [PATCH 2/7] Python 2.5 has neither bytes nor bytearray --- py25/bacpypes/primitivedata.py | 62 ++++++++++++++++------------------ 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/py25/bacpypes/primitivedata.py b/py25/bacpypes/primitivedata.py index d00d9dd..aa5cd1d 100755 --- a/py25/bacpypes/primitivedata.py +++ b/py25/bacpypes/primitivedata.py @@ -69,10 +69,8 @@ class Tag(object): def set(self, tclass, tnum, tlvt=0, tdata=''): """set the values of the tag.""" - if isinstance(tdata, bytearray): - tdata = bytes(tdata) - elif not isinstance(tdata, bytes): - raise TypeError("tag data must be bytes or bytearray") + if not isinstance(tdata, str): + raise TypeError("tag data must be string") self.tagClass = tclass self.tagNumber = tnum @@ -81,10 +79,8 @@ class Tag(object): def set_app_data(self, tnum, tdata): """set the values of the tag.""" - if isinstance(tdata, bytearray): - tdata = bytes(tdata) - elif not isinstance(tdata, bytes): - raise TypeError("tag data must be bytes or bytearray") + if not isinstance(tdata, str): + raise TypeError("tag data must be string") self.tagClass = Tag.applicationTagClass self.tagNumber = tnum @@ -174,7 +170,7 @@ class Tag(object): # application tagged boolean now has data if (self.tagNumber == Tag.booleanAppTag): - return ContextTag(context, bytearray([self.tagLVT])) + return ContextTag(context, chr(self.tagLVT)) else: return ContextTag(context, self.tagData) @@ -563,11 +559,11 @@ class Unsigned(Atomic): def encode(self, tag): # rip apart the number - data = bytearray(struct.pack('>L', self.value)) + data = struct.pack('>L', self.value) # reduce the value to the smallest number of octets - while (len(data) > 1) and (data[0] == 0): - del data[0] + while (len(data) > 1) and (data[0] == '\x00'): + data = data[1:] # encode the tag tag.set_app_data(Tag.unsignedAppTag, data) @@ -613,24 +609,24 @@ class Integer(Atomic): def encode(self, tag): # rip apart the number - data = bytearray(struct.pack('>I', self.value & 0xFFFFFFFF)) + data = struct.pack('>I', self.value & 0xFFFFFFFF) - # reduce the value to the smallest number of bytes, be + # reduce the value to the smallest number of octets, be # careful about sign extension if self.value < 0: while (len(data) > 1): - if (data[0] != 255): + if (data[0] != '\xFF'): break - if (data[1] < 128): + if (data[1] < '\x80'): break - del data[0] + data = data[1:] else: while (len(data) > 1): - if (data[0] != 0): + if (data[0] != '\x00'): break - if (data[1] >= 128): + if (data[1] >= '\x80'): break - del data[0] + data = data[1:] # encode the tag tag.set_app_data(Tag.integerAppTag, data) @@ -640,7 +636,7 @@ class Integer(Atomic): raise ValueError("integer application tag required") # byte array easier to deal with - tag_data = bytearray(tag.tagData) + tag_data = [ord(c) for c in tag.tagData] # get the data rslt = tag_data[0] @@ -746,8 +742,8 @@ class OctetString(Atomic): pass elif isinstance(arg, Tag): self.decode(arg) - elif isinstance(arg, (bytes, bytearray)): - self.value = bytes(arg) + elif isinstance(arg, str): + self.value = arg elif isinstance(arg, OctetString): self.value = arg.value else: @@ -801,10 +797,10 @@ class CharacterString(Atomic): raise ValueError("character string application tag required") # byte array easier to deal with - tag_data = bytearray(tag.tagData) + tag_data = tag.tagData # extract the data - self.strEncoding = tag_data[0] + self.strEncoding = ord(tag_data[0]) self.strValue = tag_data[1:] # normalize the value @@ -870,7 +866,7 @@ class BitString(Atomic): unused = used and (8 - used) or 0 # start with the number of unused bits - data = bytearray([unused]) + data = [unused] # build and append each packed octet bits = self.value + [0] * unused @@ -881,13 +877,13 @@ class BitString(Atomic): data.append(x) # encode the tag - tag.set_app_data(Tag.bitStringAppTag, data) + tag.set_app_data(Tag.bitStringAppTag, ''.join(chr(c) for c in data)) def decode(self, tag): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.bitStringAppTag): raise ValueError("bit string application tag required") - tag_data = bytearray(tag.tagData) + tag_data = [ord(c) for c in tag.tagData] # extract the number of unused bits unused = tag_data[0] @@ -1055,11 +1051,11 @@ class Enumerated(Atomic): raise TypeError("%s is an invalid enumeration value datatype" % (type(self.value),)) # rip apart the number - data = bytearray(struct.pack('>L', value)) + data = struct.pack('>L', value) # reduce the value to the smallest number of octets - while (len(data) > 1) and (data[0] == 0): - del data[0] + while (len(data) > 1) and (data[0] == '\x00'): + data = data[1:] # encode the tag tag.set_app_data(Tag.enumeratedAppTag, data) @@ -1188,7 +1184,7 @@ class Date(Atomic): def encode(self, tag): # encode the tag - tag.set_app_data(Tag.dateAppTag, bytearray(self.value)) + tag.set_app_data(Tag.dateAppTag, ''.join(chr(c) for c in self.value)) def decode(self, tag): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.dateAppTag): @@ -1280,7 +1276,7 @@ class Time(Atomic): def encode(self, tag): # encode the tag - tag.set_app_data(Tag.timeAppTag, bytearray(self.value)) + tag.set_app_data(Tag.timeAppTag, ''.join(chr(c) for c in self.value)) def decode(self, tag): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.timeAppTag): From 37c665000a24143e375b23a60a67874ff86c5afb Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 25 Aug 2015 22:12:33 -0400 Subject: [PATCH 3/7] minor changes to allow these to continue to work in py25 --- samples/ReadProperty.py | 6 ++++-- samples/SampleApplication.py | 3 ++- samples/SampleConsoleCmd.py | 6 ++++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/samples/ReadProperty.py b/samples/ReadProperty.py index 04c6e42..b2972a4 100755 --- a/samples/ReadProperty.py +++ b/samples/ReadProperty.py @@ -36,7 +36,6 @@ this_console = None # ReadPropertyApplication # -@bacpypes_debugging class ReadPropertyApplication(BIPSimpleApplication): def __init__(self, *args): @@ -87,11 +86,12 @@ class ReadPropertyApplication(BIPSimpleApplication): value.debug_contents(file=sys.stdout) sys.stdout.flush() +bacpypes_debugging(ReadPropertyApplication) + # # ReadPropertyConsoleCmd # -@bacpypes_debugging class ReadPropertyConsoleCmd(ConsoleCmd): def do_read(self, args): @@ -130,6 +130,8 @@ class ReadPropertyConsoleCmd(ConsoleCmd): except Exception, e: ReadPropertyConsoleCmd._exception("exception: %r", e) +bacpypes_debugging(ReadPropertyConsoleCmd) + # # __main__ # diff --git a/samples/SampleApplication.py b/samples/SampleApplication.py index 07cc403..7f39f4a 100755 --- a/samples/SampleApplication.py +++ b/samples/SampleApplication.py @@ -29,7 +29,6 @@ this_application = None # SampleApplication # -@bacpypes_debugging class SampleApplication(BIPSimpleApplication): def __init__(self, device, address): @@ -52,6 +51,8 @@ class SampleApplication(BIPSimpleApplication): if _debug: SampleApplication._debug("confirmation %r", apdu) BIPSimpleApplication.confirmation(self, apdu) +bacpypes_debugging(SampleApplication) + # # __main__ # diff --git a/samples/SampleConsoleCmd.py b/samples/SampleConsoleCmd.py index 4345207..48d10ad 100755 --- a/samples/SampleConsoleCmd.py +++ b/samples/SampleConsoleCmd.py @@ -29,7 +29,6 @@ this_console = None # SampleApplication # -@bacpypes_debugging class SampleApplication(BIPSimpleApplication): def __init__(self, device, address): @@ -52,11 +51,12 @@ class SampleApplication(BIPSimpleApplication): if _debug: SampleApplication._debug("confirmation %r", apdu) BIPSimpleApplication.confirmation(self, apdu) +bacpypes_debugging(SampleApplication) + # # SampleConsoleCmd # -@bacpypes_debugging class SampleConsoleCmd(ConsoleCmd): def do_nothing(self, args): @@ -64,6 +64,8 @@ class SampleConsoleCmd(ConsoleCmd): args = args.split() if _debug: SampleConsoleCmd._debug("do_nothing %r", args) +bacpypes_debugging(SampleConsoleCmd) + # # __main__ # From db3a06fe12f18b1fdaa3fc7f996b90e72304a784 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 25 Aug 2015 22:26:21 -0400 Subject: [PATCH 4/7] minor changes to allow these to continue to work in py25 --- samples/WhoIsIAm.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/samples/WhoIsIAm.py b/samples/WhoIsIAm.py index e51af18..ad5dd4f 100755 --- a/samples/WhoIsIAm.py +++ b/samples/WhoIsIAm.py @@ -34,7 +34,6 @@ this_console = None # WhoIsIAmApplication # -@bacpypes_debugging class WhoIsIAmApplication(BIPSimpleApplication): def __init__(self, *args): @@ -85,11 +84,12 @@ class WhoIsIAmApplication(BIPSimpleApplication): # forward it along BIPSimpleApplication.indication(self, apdu) +bacpypes_debugging(WhoIsIAmApplication) + # # WhoIsIAmConsoleCmd # -@bacpypes_debugging class WhoIsIAmConsoleCmd(ConsoleCmd): def do_whois(self, args): @@ -140,6 +140,8 @@ class WhoIsIAmConsoleCmd(ConsoleCmd): except Exception, e: WhoIsIAmConsoleCmd._exception("exception: %r", e) +bacpypes_debugging(WhoIsIAmConsoleCmd) + # # __main__ # From 703f1b06daa1b556c0d836c82d1ba830d3e75e89 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 25 Aug 2015 23:06:52 -0400 Subject: [PATCH 5/7] new sample code, seems to work --- samples/ReadPropertyAny.py | 177 +++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100755 samples/ReadPropertyAny.py diff --git a/samples/ReadPropertyAny.py b/samples/ReadPropertyAny.py new file mode 100755 index 0000000..7a6193c --- /dev/null +++ b/samples/ReadPropertyAny.py @@ -0,0 +1,177 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Tag +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyAnyApplication +# + +class ReadPropertyAnyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyAnyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyAnyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyAnyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # peek at the value tag + value_tag = apdu.propertyValue.tagList.Peek() + if _debug: ReadPropertyAnyApplication._debug(" - value_tag: %r", value_tag) + + # make sure that it is application tagged + if value_tag.tagClass != Tag.applicationTagClass: + sys.stdout.write("value is not application encoded\n") + + else: + # find the datatype + datatype = Tag._app_tag_class[value_tag.tagNumber] + if _debug: ReadPropertyAnyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # cast out the value + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyAnyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + + sys.stdout.flush() + +bacpypes_debugging(ReadPropertyAnyApplication) + +# +# ReadPropertyAnyConsoleCmd +# + +class ReadPropertyAnyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadPropertyAnyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + if prop_id.isdigit(): + prop_id = int(prop_id) + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadPropertyAnyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadPropertyAnyConsoleCmd._exception("exception: %r", e) + +bacpypes_debugging(ReadPropertyAnyConsoleCmd) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyAnyApplication(this_device, args.ini.address) + this_console = ReadPropertyAnyConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") From 67cbeb887f895c93e3571a33c70c52cd8ca792d1 Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 8 Sep 2015 16:25:52 -0400 Subject: [PATCH 6/7] minor doc string update --- samples/SampleApplication.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/samples/SampleApplication.py b/samples/SampleApplication.py index 7f39f4a..f4ca085 100755 --- a/samples/SampleApplication.py +++ b/samples/SampleApplication.py @@ -1,12 +1,16 @@ #!/usr/bin/python """ +Sample Application +================== + This sample application is the simplest BACpypes application that is a complete stack. Using an INI file it will configure a LocalDeviceObject, create a SampleApplication instance, and run, -waiting for a keyboard interrupt or a TERM signal to quit. There is no -input or output for this application, but by adding --debug to the -command line when it is run you can check the behavior of the stack by +waiting for a keyboard interrupt or a TERM signal to quit. + +There is no input or output for this application, but by adding --debug to +the command line when it is run you can check the behavior of the stack by seeing what is sent and received. """ From e51f97f9f4548fdfeedd4b735df8660b97e92d9f Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Tue, 8 Sep 2015 21:21:14 -0400 Subject: [PATCH 7/7] manually update primitivedata module, this should not have been changed in the samples branch --- py25/bacpypes/primitivedata.py | 234 ++++++++++++++++++++++----------- 1 file changed, 158 insertions(+), 76 deletions(-) diff --git a/py25/bacpypes/primitivedata.py b/py25/bacpypes/primitivedata.py index aa5cd1d..d1c458a 100755 --- a/py25/bacpypes/primitivedata.py +++ b/py25/bacpypes/primitivedata.py @@ -70,7 +70,7 @@ class Tag(object): def set(self, tclass, tnum, tlvt=0, tdata=''): """set the values of the tag.""" if not isinstance(tdata, str): - raise TypeError("tag data must be string") + raise TypeError("tag data must be str") self.tagClass = tclass self.tagNumber = tnum @@ -80,7 +80,7 @@ class Tag(object): def set_app_data(self, tnum, tdata): """set the values of the tag.""" if not isinstance(tdata, str): - raise TypeError("tag data must be string") + raise TypeError("tag data must be str") self.tagClass = Tag.applicationTagClass self.tagNumber = tnum @@ -611,7 +611,7 @@ class Integer(Atomic): # rip apart the number data = struct.pack('>I', self.value & 0xFFFFFFFF) - # reduce the value to the smallest number of octets, be + # reduce the value to the smallest number of bytes, be # careful about sign extension if self.value < 0: while (len(data) > 1): @@ -796,7 +796,6 @@ class CharacterString(Atomic): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.characterStringAppTag): raise ValueError("character string application tag required") - # byte array easier to deal with tag_data = tag.tagData # extract the data @@ -877,7 +876,7 @@ class BitString(Atomic): data.append(x) # encode the tag - tag.set_app_data(Tag.bitStringAppTag, ''.join(chr(c) for c in data)) + tag.set_app_data(Tag.bitStringAppTag, ''.join(chr(i) for i in data)) def decode(self, tag): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.bitStringAppTag): @@ -1104,87 +1103,177 @@ def expand_enumerations(klass): # Date # +_mm = r'(?P0?[1-9]|1[0-4]|odd|even|255|[*])' +_dd = r'(?P[0-3]?\d|last|odd|even|255|[*])' +_yy = r'(?P\d{2}|255|[*])' +_yyyy = r'(?P\d{4}|255|[*])' +_dow = r'(?P[1-7]|mon|tue|wed|thu|fri|sat|sun|255|[*])' + +_special_mon = {'*': 255, 'odd': 13, 'even': 14, None: 255} +_special_mon_inv = {255: '*', 13: 'odd', 14: 'even'} + +_special_day = {'*': 255, 'last': 32, 'odd': 33, 'even': 34, None: 255} +_special_day_inv = {255: '*', 32: 'last', 33: 'odd', 34: 'even'} + +_special_dow = {'*': 255, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6, 'sun': 7} +_special_dow_inv = {255: '*', 1: 'mon', 2: 'tue', 3: 'wed', 4: 'thu', 5: 'fri', 6: 'sat', 7: 'sun'} + + +def _merge(*args): + """Create a composite pattern and compile it.""" + return re.compile(r'^' + r'[/-]'.join(args) + r'(?:\s+' + _dow + ')?$') + + +# make a list of compiled patterns +_date_patterns = [ + _merge(_yyyy, _mm, _dd), + _merge(_mm, _dd, _yyyy), + _merge(_dd, _mm, _yyyy), + _merge(_yy, _mm, _dd), + _merge(_mm, _dd, _yy), + _merge(_dd, _mm, _yy), + ] + + class Date(Atomic): - _app_tag = Tag.dateAppTag - _date_regex = re.compile(r"^([*]|\d+)[/]([*]|\d+)[/]([*]|\d+)(?:\s([*]|\w+))?$") - _day_names = ['', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] - - DONT_CARE = 255 - - def __init__(self, arg=None, year=255, month=255, day=255, dayOfWeek=255): - self.value = (year, month, day, dayOfWeek) - + def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255): + self.value = (year, month, day, day_of_week) + if arg is None: pass - elif isinstance(arg,Tag): + elif isinstance(arg, Tag): self.decode(arg) elif isinstance(arg, tuple): self.value = arg elif isinstance(arg, str): - date_match = Date._date_regex.match(arg) - if not date_match: - raise ValueError("invalid date pattern") - date_groups = date_match.groups() + # lower case everything + arg = arg.lower() - # day/month/year - tup_list = [] - for s in date_groups[:3]: - if s == '*': - tup_list.append(255) - elif s in None: - tup_list.append(0) - else: - tup_list.append(int(s)) + # make a list of the contents from matching patterns + matches = [] + for p in _date_patterns: + m = p.match(arg) + if m: + matches.append(m.groupdict()) - # clean up the year - if (tup_list[2] < 100): - tup_list[2] += 2000 - tup_list[2] -= 1900 + # try to find a good one + match = None + if not matches: + raise ValueError("unmatched") - # day-of-week madness - dow = date_groups[3] - if dow is None: - tup_list.append(0) - elif (dow == '*'): - tup_list.append(255) - elif dow.isdigit(): - tup_list.append(int(dow)) + # if there is only one, success + if len(matches) == 1: + match = matches[0] else: - dow = dow.title() - if dow not in Date._day_names: - raise ValueError("invalid day name") - tup_list.append(Date._day_names.index(dow)) + # check to see if they really are the same + for a, b in zip(matches[:-1],matches[1:]): + if a != b: + raise ValueError("ambiguous") + break + else: + match = matches[0] + + # extract the year and normalize + year = match['year'] + if (year == '*') or (not year): + year = 255 + else: + year = int(year) + if (year == 255): + pass + elif year < 35: + year += 2000 + elif year < 100: + year += 1900 + elif year < 1900: + raise ValueError("invalid year") + + # extract the month and normalize + month = match['month'] + if month in _special_mon: + month = _special_mon[month] + else: + month = int(month) + if (month == 255): + pass + elif (month == 0) or (month > 14): + raise ValueError("invalid month") + + # extract the day and normalize + day = match['day'] + if day in _special_day: + day = _special_day[day] + else: + day = int(day) + if (day == 255): + pass + elif (day == 0) or (day > 34): + raise ValueError("invalid day") + + # extract the day-of-week and normalize + day_of_week = match['dow'] + if day_of_week in _special_dow: + day_of_week = _special_dow[day_of_week] + elif not day_of_week: + pass + else: + day_of_week = int(day_of_week) + if (day_of_week == 255): + pass + elif (day_of_week == 0) or (day_of_week > 7): + raise ValueError("invalid day of week") + + # year becomes the correct octet + if year != 255: + year -= 1900 + + # save the value + self.value = (year, month, day, day_of_week) + + # calculate the day of the week + if not day_of_week: + self.CalcDayOfWeek() - self.value = tuple(tup_list) elif isinstance(arg, Date): self.value = arg.value + else: raise TypeError("invalid constructor datatype") - def now(self): - tup = time.localtime() - - self.value = (tup[0]-1900, tup[1], tup[2], tup[6] + 1) - - return self - def CalcDayOfWeek(self): """Calculate the correct day of the week.""" # rip apart the value - year, month, day, dayOfWeek = self.value + year, month, day, day_of_week = self.value - # make sure all the components are defined - if (year != 255) and (month != 255) and (day != 255): - today = time.mktime( (year + 1900, month, day, 0, 0, 0, 0, 0, -1) ) - dayOfWeek = time.gmtime(today)[6] + 1 + # assume the worst + day_of_week = 255 + + # check for special values + if year == 255: + pass + elif month in _special_mon_inv: + pass + elif day in _special_day_inv: + pass + else: + try: + today = time.mktime( (year + 1900, month, day, 0, 0, 0, 0, 0, -1) ) + day_of_week = time.gmtime(today)[6] + 1 + except OverflowError: + pass # put it back together - self.value = (year, month, day, dayOfWeek) + self.value = (year, month, day, day_of_week) + + def now(self): + tup = time.localtime() + self.value = (tup[0]-1900, tup[1], tup[2], tup[6] + 1) + return self def encode(self, tag): # encode the tag - tag.set_app_data(Tag.dateAppTag, ''.join(chr(c) for c in self.value)) + tag.set_app_data(Tag.dateAppTag, ''.join(chr(i) for i in self.value)) def decode(self, tag): if (tag.tagClass != Tag.applicationTagClass) or (tag.tagNumber != Tag.dateAppTag): @@ -1194,28 +1283,21 @@ class Date(Atomic): self.value = tuple(ord(c) for c in tag.tagData) def __str__(self): + """String representation of the date.""" # rip it apart - year, month, day, dayOfWeek = self.value + year, month, day, day_of_week = self.value - rslt = "Date(" - if month == 255: - rslt += "*/" - else: - rslt += "%d/" % (month,) - if day == 255: - rslt += "*/" - else: - rslt += "%d/" % (day,) if year == 255: - rslt += "* " + year = "*" else: - rslt += "%d " % (year + 1900,) - if dayOfWeek == 255: - rslt += "*)" - else: - rslt += Date._day_names[dayOfWeek] + ")" + year = str(year + 1900) + + month = _special_mon_inv.get(month, str(month)) + day = _special_day_inv.get(day, str(day)) + day_of_week = _special_dow_inv.get(day_of_week, str(day_of_week)) + + return "%s(%s-%s-%s %s)" % (self.__class__.__name__, year, month, day, day_of_week) - return rslt # # Time