diff --git a/BACpypes~.ini b/BACpypes~.ini new file mode 100644 index 0000000..4b16488 --- /dev/null +++ b/BACpypes~.ini @@ -0,0 +1,11 @@ +[BACpypes] +objectName: Betelgeuse +address: 128.253.109.40/24 +objectIdentifier: 599 +maxApduLengthAccepted: 1024 +segmentationSupported: segmentedBoth +vendorIdentifier: 15 +foreignPort: 0 +foreignBBMD: 128.253.109.254 +foreignTTL: 30 + diff --git a/samples/CommandableMixin.py b/samples/CommandableMixin.py new file mode 100755 index 0000000..ffad52e --- /dev/null +++ b/samples/CommandableMixin.py @@ -0,0 +1,200 @@ +#!/usr/bin/python + +""" +This sample application demonstrates a mix-in class for commandable properties +(not useful for Binary Out or Binary Value objects that have a minimum on and off +time, or for Channel objects). +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run +from bacpypes.errors import ExecutionError + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, DateValueObject +from bacpypes.primitivedata import Null +from bacpypes.basetypes import PriorityValue, PriorityArray + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# CommandableMixin +# + +@bacpypes_debugging +class CommandableMixin(object): + + def __init__(self, init_value, **kwargs): + if _debug: CommandableMixin._debug("__init__ %r, %r", init_value, kwargs) + super(CommandableMixin, self).__init__(**kwargs) + + # if no present value given, give it the default value + if ('presentValue' not in kwargs): + if _debug: CommandableMixin._debug(" - initialize present value") + self.presentValue = init_value + + # if no priority array given, give it an empty one + if ('priorityArray' not in kwargs): + if _debug: CommandableMixin._debug(" - initialize priority array") + self.priorityArray = PriorityArray() + for i in range(16): + self.priorityArray.append(PriorityValue(null=Null())) + + # if no relinquish default value given, give it the default value + if ('relinquishDefault' not in kwargs): + if _debug: CommandableMixin._debug(" - initialize relinquish default") + self.relinquishDefault = init_value + + # capture the present value property + self._pv = self._properties['presentValue'] + if _debug: CommandableMixin._debug(" - _pv: %r", self._pv) + + # capture the datatype + self._pv_datatype = self._pv.datatype + if _debug: CommandableMixin._debug(" - _pv_datatype: %r", self._pv_datatype) + + # look up a matching priority value choice + for element in PriorityValue.choiceElements: + if element.klass is self._pv_datatype: + self._pv_choice = element.name + break + else: + self._pv_choice = 'constructedValue' + if _debug: CommandableMixin._debug(" - _pv_choice: %r", self._pv_choice) + + def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False): + if _debug: CommandableMixin._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct) + + # when writing to the presentValue with a priority + if (property == 'presentValue'): + # default (lowest) priority + if priority is None: + priority = 16 + if _debug: CommandableMixin._debug(" - translate to array index %d", priority) + + # translate to updating the priority array + property = 'priorityArray' + arrayIndex = priority + priority = None + + # update the priority array entry + if (property == 'priorityArray') and (arrayIndex is not None): + # check the bounds + if arrayIndex == 0: + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + if (arrayIndex < 1) or (arrayIndex > 16): + raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex') + + # update the specific priorty value element + priority_value = self.priorityArray[arrayIndex] + if _debug: CommandableMixin._debug(" - priority_value: %r", priority_value) + + # the null or the choice has to be set, the other clear + if value is (): + if _debug: CommandableMixin._debug(" - write a null") + priority_value.null = value + setattr(priority_value, self._pv_choice, None) + else: + if _debug: CommandableMixin._debug(" - write a value") + priority_value.null = None + setattr(priority_value, self._pv_choice, value) + + # look for the highest priority value + for i in range(1, 17): + priority_value = self.priorityArray[i] + if priority_value.null is None: + if (i < arrayIndex): + if _debug: CommandableMixin._debug(" - existing higher priority value") + return + value = getattr(priority_value, self._pv_choice) + break + else: + value = self.relinquishDefault + if _debug: CommandableMixin._debug(" - new present value: %r", value) + + property = 'presentValue' + arrayIndex = priority = None + + # allow the request to pass through + if _debug: CommandableMixin._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority) + super(CommandableMixin, self).WriteProperty( + property, value, + arrayIndex=arrayIndex, priority=priority, direct=direct, + ) + +# +# CommandableAnalogValueObject +# + +@bacpypes_debugging +class CommandableAnalogValueObject(CommandableMixin, AnalogValueObject): + + def __init__(self, **kwargs): + if _debug: CommandableAnalogValueObject._debug("__init__ %r", kwargs) + CommandableMixin.__init__(self, 0.0, **kwargs) + +# +# CommandableDateValueObject +# + +@bacpypes_debugging +class CommandableDateValueObject(CommandableMixin, DateValueObject): + + def __init__(self, **kwargs): + if _debug: CommandableDateValueObject._debug("__init__ %r", kwargs) + CommandableMixin.__init__(self, False, **kwargs) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a commandable analog value object, add to the device + cavo1 = CommandableAnalogValueObject( + objectIdentifier=('analogValue', 1), objectName='Commandable AV 1' + ) + if _debug: _log.debug(" - cavo1: %r", cavo1) + this_application.add_object(cavo1) + + # make a commandable binary value object, add to the device + cdvo2 = CommandableDateValueObject( + objectIdentifier=('dateValue', 1), objectName='Commandable2' + ) + if _debug: _log.debug(" - cdvo2: %r", cdvo2) + this_application.add_object(cdvo2) + + if _debug: _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + if _debug: _log.debug("finally") + diff --git a/samples/IP2IPRouter.py b/samples/IP2IPRouter.py new file mode 100755 index 0000000..2939999 --- /dev/null +++ b/samples/IP2IPRouter.py @@ -0,0 +1,125 @@ +#!/usr/bin/python + +""" +This sample application presents itself as a router between two +IP networks. This application can run on a single homed machine +by using the same IP address and two different port numbers, or +to be closer to what is typically considered a router, on a +multihomed machine using two different IP addresses and the same +port number. + +$ python IP2IPRtouer.py addr1 net1 addr2 net2 + + addr1 - local address like 192.168.1.2/24:47808 + net1 - network number + addr2 - local address like 192.168.3.4/24:47809 + net2 - network number + +As a router, this does not have an application layer. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.comm import bind + +from bacpypes.pdu import Address +from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement +from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# IP2IPRouter +# + +@bacpypes_debugging +class IP2IPRouter: + + def __init__(self, addr1, net1, addr2, net2): + if _debug: IP2IPRouter._debug("__init__ %r %r %r %r", addr1, net1, addr2, net2) + + # a network service access point will be needed + self.nsap = NetworkServiceAccessPoint() + + # give the NSAP a generic network layer service element + self.nse = NetworkServiceElement() + bind(self.nse, self.nsap) + + #== First stack + + # create a generic BIP stack, bound to the Annex J server + # on the UDP multiplexer + self.s1_bip = BIPSimple() + self.s1_annexj = AnnexJCodec() + self.s1_mux = UDPMultiplexer(addr1) + + # bind the bottom layers + bind(self.s1_bip, self.s1_annexj, self.s1_mux.annexJ) + + # bind the BIP stack to the local network + self.nsap.bind(self.s1_bip, net1, addr1) + + #== Second stack + + # create a generic BIP stack, bound to the Annex J server + # on the UDP multiplexer + self.s2_bip = BIPSimple() + self.s2_annexj = AnnexJCodec() + self.s2_mux = UDPMultiplexer(addr2) + + # bind the bottom layers + bind(self.s2_bip, self.s2_annexj, self.s2_mux.annexJ) + + # bind the BIP stack to the local network + self.nsap.bind(self.s2_bip, net2) + +# +# __main__ +# + +try: + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # add an argument for interval + parser.add_argument('addr1', type=str, + help='address of first network', + ) + + # add an argument for interval + parser.add_argument('net1', type=int, + help='network number of first network', + ) + + # add an argument for interval + parser.add_argument('addr2', type=str, + help='address of second network', + ) + + # add an argument for interval + parser.add_argument('net2', type=int, + help='network number of second network', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # create the router + router = IP2IPRouter(Address(args.addr1), args.net1, Address(args.addr2), args.net2) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/MultiStateValueObject.py b/samples/MultiStateValueObject.py new file mode 100755 index 0000000..c7b9079 --- /dev/null +++ b/samples/MultiStateValueObject.py @@ -0,0 +1,69 @@ +#!/usr/bin/python + +""" +This sample application provides a single MultiState Value Object to test +reading and writing its various properties. +""" + +from bacpypes.debugging import ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import MultiStateValueObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a multistate value object + msvo = MultiStateValueObject( + objectIdentifier=('multiStateValue', 1), + objectName='My Special Object', + presentValue=1, + numberOfStates=3, + stateText=['red', 'green', 'blue'], + ) + _log.debug(" - msvo: %r", msvo) + + # add it to the device + this_application.add_object(msvo) + _log.debug(" - object list: %r", this_device.objectList) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/MultipleReadProperty.py b/samples/MultipleReadProperty.py new file mode 100755 index 0000000..c4ce92c --- /dev/null +++ b/samples/MultipleReadProperty.py @@ -0,0 +1,165 @@ +#!/usr/bin/python + +""" +Mutliple Read Property + +This application has a static list of points that it would like to read. It reads the +values of each of them in turn and then quits. +""" + +from collections import deque + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run, stop, deferred + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# point list +point_list = [ + ('1.2.3.4', 'analogValue', 1, 'presentValue'), + ('1.2.3.4', 'analogValue', 2, 'presentValue'), + ] + +# +# ReadPointListApplication +# + +@bacpypes_debugging +class ReadPointListApplication(BIPSimpleApplication): + + def __init__(self, point_list, *args): + if _debug: ReadPointListApplication._debug("__init__ %r, %r", point_list, args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + # make a list of the response values + self.response_values = [] + + # turn the point list into a queue + self.point_queue = deque(point_list) + + def next_request(self): + if _debug: ReadPointListApplication._debug("next_request") + + # check to see if we're done + if not self.point_queue: + if _debug: ReadPointListApplication._debug(" - done") + stop() + return + + # get the next request + addr, obj_type, obj_inst, prop_id = self.point_queue.popleft() + + # build a request + self._request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + self._request.pduDestination = Address(addr) + if _debug: ReadPointListApplication._debug(" - request: %r", self._request) + + # forward it along + BIPSimpleApplication.request(self, self._request) + + def confirmation(self, apdu): + if _debug: ReadPointListApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + if _debug: ReadPointListApplication._debug(" - error: %r", apdu) + self.response_values.append(apdu) + + elif isinstance(apdu, AbortPDU): + if _debug: ReadPointListApplication._debug(" - abort: %r", apdu) + self.response_values.append(apdu) + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPointListApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPointListApplication._debug(" - value: %r", value) + + # save the value + self.response_values.append(value) + + # fire off another request + deferred(self.next_request) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPointListApplication(point_list, this_device, args.ini.address) + + # fire off a request when the core has a chance + deferred(this_application.next_request) + + _log.debug("running") + + run() + + # dump out the results + for request, response in zip(point_list, this_application.response_values): + print request, response + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/RandomAnalogValueObject.py b/samples/RandomAnalogValueObject.py new file mode 100755 index 0000000..fec1181 --- /dev/null +++ b/samples/RandomAnalogValueObject.py @@ -0,0 +1,126 @@ +#!/usr/bin/python + +""" +This sample application shows how to extend one of the basic objects, an Analog +Value Object in this case, to provide a present value. This type of code is used +when the application is providing a BACnet interface to a collection of data. +It assumes that almost all of the default behaviour of a BACpypes application is +sufficient. +""" + +import random + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Real +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, Property, register_object_type +from bacpypes.errors import ExecutionError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# RandomValueProperty +# + +@bacpypes_debugging +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False) + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # return a random value + value = random.random() * 100.0 + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + +# +# Random Value Object Type +# + +@bacpypes_debugging +class RandomAnalogValueObject(AnalogValueObject): + + properties = [ + RandomValueProperty('presentValue'), + ] + + def __init__(self, **kwargs): + if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +register_object_type(RandomAnalogValueObject) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=('device', int(args.ini.objectidentifier)), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a random input object + ravo1 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 1), objectName='Random1' + ) + _log.debug(" - ravo1: %r", ravo1) + + ravo1d = ravo1._dict_contents() + print ravo1d + + ravo2 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 2), objectName='Random2' + ) + _log.debug(" - ravo2: %r", ravo2) + + # add it to the device + this_application.add_object(ravo1) + this_application.add_object(ravo2) + _log.debug(" - object list: %r", this_device.objectList) + + print this_device._dict_contents() + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/ReadProperty.py b/samples/ReadProperty.py new file mode 100755 index 0000000..b2972a4 --- /dev/null +++ b/samples/ReadProperty.py @@ -0,0 +1,176 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyApplication +# + +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + if hasattr(value, 'debug_contents'): + value.debug_contents(file=sys.stdout) + sys.stdout.flush() + +bacpypes_debugging(ReadPropertyApplication) + +# +# ReadPropertyConsoleCmd +# + +class ReadPropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadPropertyConsoleCmd._exception("exception: %r", e) + +bacpypes_debugging(ReadPropertyConsoleCmd) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + this_console = ReadPropertyConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadPropertyAny.py b/samples/ReadPropertyAny.py new file mode 100755 index 0000000..7a6193c --- /dev/null +++ b/samples/ReadPropertyAny.py @@ -0,0 +1,177 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Tag +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyAnyApplication +# + +class ReadPropertyAnyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyAnyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyAnyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyAnyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # peek at the value tag + value_tag = apdu.propertyValue.tagList.Peek() + if _debug: ReadPropertyAnyApplication._debug(" - value_tag: %r", value_tag) + + # make sure that it is application tagged + if value_tag.tagClass != Tag.applicationTagClass: + sys.stdout.write("value is not application encoded\n") + + else: + # find the datatype + datatype = Tag._app_tag_class[value_tag.tagNumber] + if _debug: ReadPropertyAnyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # cast out the value + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyAnyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + + sys.stdout.flush() + +bacpypes_debugging(ReadPropertyAnyApplication) + +# +# ReadPropertyAnyConsoleCmd +# + +class ReadPropertyAnyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadPropertyAnyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + if prop_id.isdigit(): + prop_id = int(prop_id) + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadPropertyAnyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadPropertyAnyConsoleCmd._exception("exception: %r", e) + +bacpypes_debugging(ReadPropertyAnyConsoleCmd) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyAnyApplication(this_device, args.ini.address) + this_console = ReadPropertyAnyConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadPropertyMultiple.py b/samples/ReadPropertyMultiple.py new file mode 100755 index 0000000..ab7c1ae --- /dev/null +++ b/samples/ReadPropertyMultiple.py @@ -0,0 +1,245 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for read commands +which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import ReadPropertyMultipleRequest, PropertyReference, ReadAccessSpecification, Error, AbortPDU, ReadPropertyMultipleACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import PropertyIdentifier, ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyMultipleApplication +# + +@bacpypes_debugging +class ReadPropertyMultipleApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyMultipleApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyMultipleApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyMultipleApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadPropertyMultipleRequest)) and (isinstance(apdu, ReadPropertyMultipleACK)): + # loop through the results + for result in apdu.listOfReadAccessResults: + # here is the object identifier + objectIdentifier = result.objectIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier) + + # now come the property values per object + for element in result.listOfResults: + # get the property and array index + propertyIdentifier = element.propertyIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier) + propertyArrayIndex = element.propertyArrayIndex + if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex) + + # here is the read result + readResult = element.readResult + + sys.stdout.write(propertyIdentifier) + if propertyArrayIndex is not None: + sys.stdout.write("[" + str(propertyArrayIndex) + "]") + + # check for an error + if readResult.propertyAccessError is not None: + sys.stdout.write(" ! " + str(readResult.propertyAccessError) + '\n') + + else: + # here is the value + propertyValue = readResult.propertyValue + + # find the datatype + datatype = get_datatype(objectIdentifier[0], propertyIdentifier) + if _debug: ReadPropertyMultipleApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (propertyArrayIndex is not None): + if propertyArrayIndex == 0: + value = propertyValue.cast_out(Unsigned) + else: + value = propertyValue.cast_out(datatype.subtype) + else: + value = propertyValue.cast_out(datatype) + if _debug: ReadPropertyMultipleApplication._debug(" - value: %r", value) + + sys.stdout.write(" = " + str(value) + '\n') + sys.stdout.flush() + +# +# ReadPropertyMultipleConsoleCmd +# + +@bacpypes_debugging +class ReadPropertyMultipleConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read ( ( [ ] )... )...""" + args = args.split() + if _debug: ReadPropertyMultipleConsoleCmd._debug("do_read %r", args) + + try: + i = 0 + addr = args[i] + i += 1 + + read_access_spec_list = [] + while i < len(args): + obj_type = args[i] + i += 1 + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(args[i]) + i += 1 + + prop_reference_list = [] + while i < len(args): + prop_id = args[i] + if prop_id not in PropertyIdentifier.enumerations: + break + + i += 1 + if prop_id in ('all', 'required', 'optional'): + pass + else: + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a property reference + prop_reference = PropertyReference( + propertyIdentifier=prop_id, + ) + + # check for an array index + if (i < len(args)) and args[i].isdigit(): + prop_reference.propertyArrayIndex = int(args[i]) + i += 1 + + # add it to the list + prop_reference_list.append(prop_reference) + + # check for at least one property + if not prop_reference_list: + raise ValueError, "provide at least one property" + + # build a read access specification + read_access_spec = ReadAccessSpecification( + objectIdentifier=(obj_type, obj_inst), + listOfPropertyReferences=prop_reference_list, + ) + + # add it to the list + read_access_spec_list.append(read_access_spec) + + # check for at least one + if not read_access_spec_list: + raise RuntimeError, "at least one read access specification required" + + # build the request + request = ReadPropertyMultipleRequest( + listOfReadAccessSpecs=read_access_spec_list, + ) + request.pduDestination = Address(addr) + if _debug: ReadPropertyMultipleConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadPropertyMultipleConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyMultipleApplication(this_device, args.ini.address) + this_console = ReadPropertyMultipleConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadPropertyMultipleServer.py b/samples/ReadPropertyMultipleServer.py new file mode 100755 index 0000000..3454f59 --- /dev/null +++ b/samples/ReadPropertyMultipleServer.py @@ -0,0 +1,319 @@ +#!/usr/bin/python + +""" +This sample application shows how to extend the basic functionality of a device +to support the ReadPropertyMultiple service. +""" + +import random + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Atomic, Real, Unsigned +from bacpypes.constructeddata import Array, Any +from bacpypes.basetypes import ServicesSupported, ErrorType +from bacpypes.apdu import ReadPropertyMultipleACK, ReadAccessResult, ReadAccessResultElement, ReadAccessResultElementChoice +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, Property, PropertyError, register_object_type +from bacpypes.apdu import Error +from bacpypes.errors import ExecutionError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# RandomValueProperty +# + +@bacpypes_debugging +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False) + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # return a random value + value = random.random() * 100.0 + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + raise ExecutionError(errorClass='property', errorCode='writeAccessDenied') + +# +# Random Value Object Type +# + +@bacpypes_debugging +class RandomAnalogValueObject(AnalogValueObject): + + properties = [ + RandomValueProperty('presentValue'), + ] + + def __init__(self, **kwargs): + if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +register_object_type(RandomAnalogValueObject) + +# +# ReadPropertyToAny +# + +@bacpypes_debugging +def ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex=None): + """Read the specified property of the object, with the optional array index, + and cast the result into an Any object.""" + if _debug: ReadPropertyToAny._debug("ReadPropertyToAny %s %r %r", obj, propertyIdentifier, propertyArrayIndex) + + # get the datatype + datatype = obj.get_datatype(propertyIdentifier) + if _debug: ReadPropertyToAny._debug(" - datatype: %r", datatype) + if datatype is None: + raise ExecutionError(errorClass='property', errorCode='datatypeNotSupported') + + # get the value + value = obj.ReadProperty(propertyIdentifier, propertyArrayIndex) + if _debug: ReadPropertyToAny._debug(" - value: %r", value) + if value is None: + raise ExecutionError(errorClass='property', errorCode='unknownProperty') + + # change atomic values into something encodeable + if issubclass(datatype, Atomic): + value = datatype(value) + elif issubclass(datatype, Array) and (propertyArrayIndex is not None): + if propertyArrayIndex == 0: + value = Unsigned(value) + elif issubclass(datatype.subtype, Atomic): + value = datatype.subtype(value) + elif not isinstance(value, datatype.subtype): + raise TypeError, "invalid result datatype, expecting %s and got %s" \ + % (datatype.subtype.__name__, type(value).__name__) + elif not isinstance(value, datatype): + raise TypeError, "invalid result datatype, expecting %s and got %s" \ + % (datatype.__name__, type(value).__name__) + if _debug: ReadPropertyToAny._debug(" - encodeable value: %r", value) + + # encode the value + result = Any() + result.cast_in(value) + if _debug: ReadPropertyToAny._debug(" - result: %r", result) + + # return the object + return result + +# +# ReadPropertyToResultElement +# + +@bacpypes_debugging +def ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex=None): + """Read the specified property of the object, with the optional array index, + and cast the result into an Any object.""" + if _debug: ReadPropertyToResultElement._debug("ReadPropertyToResultElement %s %r %r", obj, propertyIdentifier, propertyArrayIndex) + + # save the result in the property value + read_result = ReadAccessResultElementChoice() + + try: + read_result.propertyValue = ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex) + if _debug: ReadPropertyToResultElement._debug(" - success") + except PropertyError, error: + if _debug: ReadPropertyToResultElement._debug(" - error: %r", error) + read_result.propertyAccessError = ErrorType(errorClass='property', errorCode='unknownProperty') + except ExecutionError, error: + if _debug: ReadPropertyToResultElement._debug(" - error: %r", error) + read_result.propertyAccessError = ErrorType(errorClass=error.errorClass, errorCode=error.errorCode) + + # make an element for this value + read_access_result_element = ReadAccessResultElement( + propertyIdentifier=propertyIdentifier, + propertyArrayIndex=propertyArrayIndex, + readResult=read_result, + ) + if _debug: ReadPropertyToResultElement._debug(" - read_access_result_element: %r", read_access_result_element) + + # fini + return read_access_result_element + +# +# ReadPropertyMultipleApplication +# + +@bacpypes_debugging +class ReadPropertyMultipleApplication(BIPSimpleApplication): + + def __init__(self, *args, **kwargs): + if _debug: ReadPropertyMultipleApplication._debug("__init__ %r %r", args, kwargs) + BIPSimpleApplication.__init__(self, *args, **kwargs) + + def do_ReadPropertyMultipleRequest(self, apdu): + """Respond to a ReadPropertyMultiple Request.""" + if _debug: ReadPropertyMultipleApplication._debug("do_ReadPropertyMultipleRequest %r", apdu) + + # response is a list of read access results (or an error) + resp = None + read_access_result_list = [] + + # loop through the request + for read_access_spec in apdu.listOfReadAccessSpecs: + # get the object identifier + objectIdentifier = read_access_spec.objectIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier) + + # check for wildcard + if (objectIdentifier == ('device', 4194303)): + if _debug: ReadPropertyMultipleApplication._debug(" - wildcard device identifier") + objectIdentifier = self.localDevice.objectIdentifier + + # get the object + obj = self.get_object_id(objectIdentifier) + if _debug: ReadPropertyMultipleApplication._debug(" - object: %r", obj) + + # make sure it exists + if not obj: + resp = Error(errorClass='object', errorCode='unknownObject', context=apdu) + if _debug: ReadPropertyMultipleApplication._debug(" - unknown object error: %r", resp) + break + + # build a list of result elements + read_access_result_element_list = [] + + # loop through the property references + for prop_reference in read_access_spec.listOfPropertyReferences: + # get the property identifier + propertyIdentifier = prop_reference.propertyIdentifier + if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier) + + # get the array index (optional) + propertyArrayIndex = prop_reference.propertyArrayIndex + if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex) + + # check for special property identifiers + if propertyIdentifier in ('all', 'required', 'optional'): + for propId, prop in obj._properties.items(): + if _debug: ReadPropertyMultipleApplication._debug(" - checking: %r %r", propId, prop.optional) + + if (propertyIdentifier == 'all'): + pass + elif (propertyIdentifier == 'required') and (prop.optional): + if _debug: ReadPropertyMultipleApplication._debug(" - not a required property") + continue + elif (propertyIdentifier == 'optional') and (not prop.optional): + if _debug: ReadPropertyMultipleApplication._debug(" - not an optional property") + continue + + # read the specific property + read_access_result_element = ReadPropertyToResultElement(obj, propId, propertyArrayIndex) + + # check for undefined property + if read_access_result_element.readResult.propertyAccessError \ + and read_access_result_element.readResult.propertyAccessError.errorCode == 'unknownProperty': + continue + + # add it to the list + read_access_result_element_list.append(read_access_result_element) + + else: + # read the specific property + read_access_result_element = ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex) + + # add it to the list + read_access_result_element_list.append(read_access_result_element) + + # build a read access result + read_access_result = ReadAccessResult( + objectIdentifier=objectIdentifier, + listOfResults=read_access_result_element_list + ) + if _debug: ReadPropertyMultipleApplication._debug(" - read_access_result: %r", read_access_result) + + # add it to the list + read_access_result_list.append(read_access_result) + + # this is a ReadPropertyMultiple ack + if not resp: + resp = ReadPropertyMultipleACK(context=apdu) + resp.listOfReadAccessResults = read_access_result_list + if _debug: ReadPropertyMultipleApplication._debug(" - resp: %r", resp) + + # return the result + self.response(resp) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['readPropertyMultiple'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a sample application + this_application = ReadPropertyMultipleApplication(this_device, args.ini.address) + + # make a random input object + ravo1 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 1), objectName='Random1' + ) + _log.debug(" - ravo1: %r", ravo1) + + ravo2 = RandomAnalogValueObject( + objectIdentifier=('analogValue', 2), objectName='Random2' + ) + _log.debug(" - ravo2: %r", ravo2) + + # add it to the device + this_application.add_object(ravo1) + this_application.add_object(ravo2) + _log.debug(" - object list: %r", this_device.objectList) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/ReadRange.py b/samples/ReadRange.py new file mode 100755 index 0000000..adb5149 --- /dev/null +++ b/samples/ReadRange.py @@ -0,0 +1,166 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for readrange commands +which create ReadRangeRequest PDUs, then lines up the coorresponding ReadRangeACK +and prints the value. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import Error, AbortPDU, ReadRangeRequest, ReadRangeACK +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadRangeApplication +# + +@bacpypes_debugging +class ReadRangeApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadRangeApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadRangeApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadRangeApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, ReadRangeRequest)) and (isinstance(apdu, ReadRangeACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadRangeApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # cast out of the single Any element into the datatype + value = apdu.itemData[0].cast_out(datatype) + + # dump it out + for i, item in enumerate(value): + sys.stdout.write("[%d]\n" % (i,)) + item.debug_contents(file=sys.stdout, indent=2) + sys.stdout.flush() + +# +# ReadRangeConsoleCmd +# + +@bacpypes_debugging +class ReadRangeConsoleCmd(ConsoleCmd): + + def do_readrange(self, args): + """readrange [ ]""" + args = args.split() + if _debug: ReadRangeConsoleCmd._debug("do_readrange %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadRangeRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadRangeConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadRangeConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadRangeApplication(this_device, args.ini.address) + this_console = ReadRangeConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadWriteFile.py b/samples/ReadWriteFile.py new file mode 100755 index 0000000..d6d5bc5 --- /dev/null +++ b/samples/ReadWriteFile.py @@ -0,0 +1,269 @@ +#!/usr/bin/python + +""" +ReadWriteFile.py + +This application presents a 'console' prompt to the user asking for commands. + +The 'readrecord' and 'writerecord' commands are used with record oriented files, +and the 'readstream' and 'writestream' commands are used with stream oriented +files. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +from bacpypes.apdu import Error, AbortPDU, \ + AtomicReadFileRequest, \ + AtomicReadFileRequestAccessMethodChoice, \ + AtomicReadFileRequestAccessMethodChoiceRecordAccess, \ + AtomicReadFileRequestAccessMethodChoiceStreamAccess, \ + AtomicReadFileACK, \ + AtomicWriteFileRequest, \ + AtomicWriteFileRequestAccessMethodChoice, \ + AtomicWriteFileRequestAccessMethodChoiceRecordAccess, \ + AtomicWriteFileRequestAccessMethodChoiceStreamAccess, \ + AtomicWriteFileACK +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# reference a simple application +this_application = None + +# +# TestApplication +# + +@bacpypes_debugging +class TestApplication(BIPSimpleApplication): + + def request(self, apdu): + if _debug: TestApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: TestApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + elif (isinstance(self._request, AtomicReadFileRequest)) and (isinstance(apdu, AtomicReadFileACK)): + # suck out the record data + if apdu.accessMethod.recordAccess: + value = apdu.accessMethod.recordAccess.fileRecordData + elif apdu.accessMethod.streamAccess: + value = apdu.accessMethod.streamAccess.fileData + TestApplication._debug(" - value: %r", value) + + sys.stdout.write(repr(value) + '\n') + sys.stdout.flush() + + elif (isinstance(self._request, AtomicWriteFileRequest)) and (isinstance(apdu, AtomicWriteFileACK)): + # suck out the record data + if apdu.fileStartPosition is not None: + value = apdu.fileStartPosition + elif apdu.fileStartRecord is not None: + value = apdu.fileStartRecord + TestApplication._debug(" - value: %r", value) + + sys.stdout.write(repr(value) + '\n') + sys.stdout.flush() + +# +# TestConsoleCmd +# + +@bacpypes_debugging +class TestConsoleCmd(ConsoleCmd): + + def do_readrecord(self, args): + """readrecord """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_readrecord %r", args) + + try: + addr, obj_inst, start_record, record_count = args + + obj_type = 'file' + obj_inst = int(obj_inst) + start_record = int(start_record) + record_count = int(record_count) + + # build a request + request = AtomicReadFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicReadFileRequestAccessMethodChoice( + recordAccess=AtomicReadFileRequestAccessMethodChoiceRecordAccess( + fileStartRecord=start_record, + requestedRecordCount=record_count, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + + def do_readstream(self, args): + """readstream """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_readstream %r", args) + + try: + addr, obj_inst, start_position, octet_count = args + + obj_type = 'file' + obj_inst = int(obj_inst) + start_position = int(start_position) + octet_count = int(octet_count) + + # build a request + request = AtomicReadFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicReadFileRequestAccessMethodChoice( + streamAccess=AtomicReadFileRequestAccessMethodChoiceStreamAccess( + fileStartPosition=start_position, + requestedOctetCount=octet_count, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + + def do_writerecord(self, args): + """writerecord [ ... ]""" + args = args.split() + if _debug: TestConsoleCmd._debug("do_writerecord %r", args) + + try: + addr, obj_inst, start_record, record_count = args[0:4] + + obj_type = 'file' + obj_inst = int(obj_inst) + start_record = int(start_record) + record_count = int(record_count) + record_data = list(args[4:]) + + # build a request + request = AtomicWriteFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicWriteFileRequestAccessMethodChoice( + recordAccess=AtomicWriteFileRequestAccessMethodChoiceRecordAccess( + fileStartRecord=start_record, + recordCount=record_count, + fileRecordData=record_data, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + + def do_writestream(self, args): + """writestream """ + args = args.split() + if _debug: TestConsoleCmd._debug("do_writestream %r", args) + + try: + addr, obj_inst, start_position, data = args + + obj_type = 'file' + obj_inst = int(obj_inst) + start_position = int(start_position) + + # build a request + request = AtomicWriteFileRequest( + fileIdentifier=(obj_type, obj_inst), + accessMethod=AtomicWriteFileRequestAccessMethodChoice( + streamAccess=AtomicWriteFileRequestAccessMethodChoiceStreamAccess( + fileStartPosition=start_position, + fileData=data, + ), + ), + ) + request.pduDestination = Address(addr) + if _debug: TestConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + TestConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = TestApplication(this_device, args.ini.address) + TestConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadWriteFileServer.py b/samples/ReadWriteFileServer.py new file mode 100755 index 0000000..8ed6e6e --- /dev/null +++ b/samples/ReadWriteFileServer.py @@ -0,0 +1,219 @@ +#!/usr/bin/python + +""" +ReadWriteFileServer.py + +This sample application is a BACnet device that has one record access file at +('file', 1) and one stream access file at ('file', 2). +""" + +import random +import string + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import FileObject, register_object_type + +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# Local Record Access File Object Type +# + +@bacpypes_debugging +class LocalRecordAccessFileObject(FileObject): + + def __init__(self, **kwargs): + """ Initialize a record accessed file object. """ + if _debug: + LocalRecordAccessFileObject._debug("__init__ %r", + kwargs, + ) + FileObject.__init__(self, + fileAccessMethod='recordAccess', + **kwargs + ) + + self._record_data = [ + ''.join(random.choice(string.ascii_letters) + for i in range(random.randint(10, 20))) + for j in range(random.randint(10, 20)) + ] + if _debug: LocalRecordAccessFileObject._debug(" - %d records", + len(self._record_data), + ) + + def __len__(self): + """ Return the number of records. """ + if _debug: LocalRecordAccessFileObject._debug("__len__") + + return len(self._record_data) + + def ReadFile(self, start_record, record_count): + """ Read a number of records starting at a specific record. """ + if _debug: LocalRecordAccessFileObject._debug("ReadFile %r %r", + start_record, record_count, + ) + + # end of file is true if last record is returned + end_of_file = (start_record+record_count) >= len(self._record_data) + + return end_of_file, \ + self._record_data[start_record:start_record + record_count] + + def WriteFile(self, start_record, record_count, record_data): + """ Write a number of records, starting at a specific record. """ + # check for append + if (start_record < 0): + start_record = len(self._record_data) + self._record_data.extend(record_data) + + # check to extend the file out to start_record records + elif (start_record > len(self._record_data)): + self._record_data.extend(['' for i in range(start_record - len(self._record_data))]) + start_record = len(self._record_data) + self._record_data.extend(record_data) + + # slice operation works for other cases + else: + self._record_data[start_record:start_record + record_count] = record_data + + # return where the 'writing' actually started + return start_record + +register_object_type(LocalRecordAccessFileObject) + +# +# Local Stream Access File Object Type +# + +@bacpypes_debugging +class LocalStreamAccessFileObject(FileObject): + + def __init__(self, **kwargs): + """ Initialize a stream accessed file object. """ + if _debug: + LocalStreamAccessFileObject._debug("__init__ %r", + kwargs, + ) + FileObject.__init__(self, + fileAccessMethod='streamAccess', + **kwargs + ) + + self._file_data = ''.join(random.choice(string.ascii_letters) + for i in range(random.randint(100, 200))) + if _debug: LocalRecordAccessFileObject._debug(" - %d octets", + len(self._file_data), + ) + + def __len__(self): + """ Return the number of octets in the file. """ + if _debug: LocalStreamAccessFileObject._debug("__len__") + + return len(self._file_data) + + def ReadFile(self, start_position, octet_count): + """ Read a chunk of data out of the file. """ + if _debug: LocalStreamAccessFileObject._debug("ReadFile %r %r", + start_position, octet_count, + ) + + # end of file is true if last record is returned + end_of_file = (start_position+octet_count) >= len(self._file_data) + + return end_of_file, \ + self._file_data[start_position:start_position + octet_count] + + def WriteFile(self, start_position, data): + """ Write a number of octets, starting at a specific offset. """ + # check for append + if (start_position < 0): + start_position = len(self._file_data) + self._file_data += data + + # check to extend the file out to start_record records + elif (start_position > len(self._file_data)): + self._file_data += '\0' * (start_position - len(self._file_data)) + start_position = len(self._file_data) + self._file_data += data + + # no slice assignment, strings are immutable + else: + data_len = len(data) + prechunk = self._file_data[:start_position] + postchunk = self._file_data[start_position + data_len:] + self._file_data = prechunk + data + postchunk + + # return where the 'writing' actually started + return start_position + +register_object_type(LocalStreamAccessFileObject) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + pss['atomicReadFile'] = 1 + pss['atomicWriteFile'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make a record access file, add to the device + f1 = LocalRecordAccessFileObject( + objectIdentifier=('file', 1), + objectName='RecordAccessFile1' + ) + _log.debug(" - f1: %r", f1) + this_application.add_object(f1) + + # make a stream access file, add to the device + f2 = LocalStreamAccessFileObject( + objectIdentifier=('file', 2), + objectName='StreamAccessFile2' + ) + _log.debug(" - f2: %r", f2) + this_application.add_object(f2) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/ReadWriteProperty.py b/samples/ReadWriteProperty.py new file mode 100755 index 0000000..87578b5 --- /dev/null +++ b/samples/ReadWriteProperty.py @@ -0,0 +1,258 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for commands. + +For 'read' commands it will create ReadPropertyRequest PDUs, then lines up the +coorresponding ReadPropertyACK and prints the value. For 'write' commands it +will create WritePropertyRequst PDUs and prints out a simple acknowledgement. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import Error, AbortPDU, SimpleAckPDU, \ + ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest +from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real +from bacpypes.constructeddata import Array, Any +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# ReadPropertyApplication +# + +@bacpypes_debugging +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + if isinstance(apdu, SimpleAckPDU): + sys.stdout.write("ack\n") + sys.stdout.flush() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + sys.stdout.flush() + +# +# ReadWritePropertyConsoleCmd +# + +@bacpypes_debugging +class ReadWritePropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + args = args.split() + if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type): + raise ValueError, "unknown object type" + + obj_inst = int(obj_inst) + + datatype = get_datatype(obj_type, prop_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + + def do_write(self, args): + """write [ ] [ ]""" + args = args.split() + ReadWritePropertyConsoleCmd._debug("do_write %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + if obj_type.isdigit(): + obj_type = int(obj_type) + obj_inst = int(obj_inst) + value = args[4] + + indx = None + if len(args) >= 6: + if args[5] != "-": + indx = int(args[5]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - indx: %r", indx) + + priority = None + if len(args) >= 7: + priority = int(args[6]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - priority: %r", priority) + + # get the datatype + datatype = get_datatype(obj_type, prop_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype) + + # change atomic values into something encodeable, null is a special case + if (value == 'null'): + value = Null() + elif issubclass(datatype, Atomic): + if datatype is Integer: + value = int(value) + elif datatype is Real: + value = float(value) + elif datatype is Unsigned: + value = int(value) + value = datatype(value) + elif issubclass(datatype, Array) and (indx is not None): + if indx == 0: + value = Integer(value) + elif issubclass(datatype.subtype, Atomic): + value = datatype.subtype(value) + elif not isinstance(value, datatype.subtype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,) + elif not isinstance(value, datatype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,) + if _debug: ReadWritePropertyConsoleCmd._debug(" - encodeable value: %r %s", value, type(value)) + + # build a request + request = WritePropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id + ) + request.pduDestination = Address(addr) + + # save the value + request.propertyValue = Any() + try: + request.propertyValue.cast_in(value) + except Exception, e: + ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", e) + + # optional array index + if indx is not None: + request.propertyArrayIndex = indx + + # optional priority + if priority is not None: + request.priority = priority + + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + this_console = ReadWritePropertyConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/RecurringMultipleReadProperty.py b/samples/RecurringMultipleReadProperty.py new file mode 100644 index 0000000..e1c24ae --- /dev/null +++ b/samples/RecurringMultipleReadProperty.py @@ -0,0 +1,198 @@ +#!/usr/bin/python + +""" +Mutliple Read Property + +This application has a static list of points that it would like to read. It reads the +values of each of them in turn and then quits. +""" + +from collections import deque + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run, deferred +from bacpypes.task import RecurringTask + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array +from bacpypes.basetypes import ServicesSupported + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# point list +point_list = [ + ('1.2.3.4', 'analogValue', 1, 'presentValue'), + ('1.2.3.4', 'analogValue', 2, 'presentValue'), + ] + +# +# PrairieDog +# + +@bacpypes_debugging +class PrairieDog(BIPSimpleApplication, RecurringTask): + + def __init__(self, interval, *args): + if _debug: PrairieDog._debug("__init__ %r, %r", interval, args) + BIPSimpleApplication.__init__(self, *args) + RecurringTask.__init__(self, interval * 1000) + + # keep track of requests to line up responses + self._request = None + + # start out idle + self.is_busy = False + self.point_queue = deque() + self.response_values = [] + + # install it + self.install_task() + + def process_task(self): + if _debug: PrairieDog._debug("process_task") + global point_list + + # check to see if we're idle + if self.is_busy: + if _debug: PrairieDog._debug(" - busy") + return + + # now we are busy + self.is_busy = True + + # turn the point list into a queue + self.point_queue = deque(point_list) + + # clean out the list of the response values + self.response_values = [] + + # fire off the next request + self.next_request() + + def next_request(self): + if _debug: PrairieDog._debug("next_request") + + # check to see if we're done + if not self.point_queue: + if _debug: PrairieDog._debug(" - done") + + # dump out the results + for request, response in zip(point_list, self.response_values): + print request, response + + # no longer busy + self.is_busy = False + + return + + # get the next request + addr, obj_type, obj_inst, prop_id = self.point_queue.popleft() + + # build a request + self._request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + self._request.pduDestination = Address(addr) + if _debug: PrairieDog._debug(" - request: %r", self._request) + + # forward it along + BIPSimpleApplication.request(self, self._request) + + def confirmation(self, apdu): + if _debug: PrairieDog._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + if _debug: PrairieDog._debug(" - error: %r", apdu) + self.response_values.append(apdu) + + elif isinstance(apdu, AbortPDU): + if _debug: PrairieDog._debug(" - abort: %r", apdu) + self.response_values.append(apdu) + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if _debug: PrairieDog._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: PrairieDog._debug(" - value: %r", value) + + # save the value + self.response_values.append(value) + + # fire off another request + deferred(self.next_request) + +# +# __main__ +# + +try: + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + + # add an argument for interval + parser.add_argument('interval', type=int, + help='repeat rate in seconds', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a dog + this_application = PrairieDog(args.interval, this_device, args.ini.address) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/RecurringTask.py b/samples/RecurringTask.py new file mode 100755 index 0000000..fafb56c --- /dev/null +++ b/samples/RecurringTask.py @@ -0,0 +1,74 @@ +#!/usr/bin/python + +""" +This application demonstrates doing something at a regular interval. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run +from bacpypes.task import RecurringTask + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# PrairieDog +# + +@bacpypes_debugging +class PrairieDog(RecurringTask): + + def __init__(self, dog_number, interval): + if _debug: PrairieDog._debug("__init__ %r %r", dog_number, interval) + + # save the identity + self.dog_number = dog_number + + # this is a recurring task + RecurringTask.__init__(self, interval) + + # install it + self.install_task() + + def process_task(self): + if _debug: PrairieDog._debug("process_task") + + sys.stdout.write("%d woof!\n" % (self.dog_number,)) + +# +# __main__ +# + +try: + # parse the command line arguments + parser = ConfigArgumentParser(description=__doc__) + + # add an argument for seconds per dog + parser.add_argument('seconds', metavar='N', type=int, nargs='+', + help='number of seconds for each dog', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make some dogs + for i, sec in enumerate(args.seconds): + dog = PrairieDog(i, sec * 1000) + if _debug: _log.debug(" - dog: %r", dog) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/SampleApplication.py b/samples/SampleApplication.py new file mode 100755 index 0000000..f4ca085 --- /dev/null +++ b/samples/SampleApplication.py @@ -0,0 +1,92 @@ +#!/usr/bin/python + +""" +Sample Application +================== + +This sample application is the simplest BACpypes application that is +a complete stack. Using an INI file it will configure a +LocalDeviceObject, create a SampleApplication instance, and run, +waiting for a keyboard interrupt or a TERM signal to quit. + +There is no input or output for this application, but by adding --debug to +the command line when it is run you can check the behavior of the stack by +seeing what is sent and received. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# +# SampleApplication +# + +class SampleApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: SampleApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def request(self, apdu): + if _debug: SampleApplication._debug("request %r", apdu) + BIPSimpleApplication.request(self, apdu) + + def indication(self, apdu): + if _debug: SampleApplication._debug("indication %r", apdu) + BIPSimpleApplication.indication(self, apdu) + + def response(self, apdu): + if _debug: SampleApplication._debug("response %r", apdu) + BIPSimpleApplication.response(self, apdu) + + def confirmation(self, apdu): + if _debug: SampleApplication._debug("confirmation %r", apdu) + BIPSimpleApplication.confirmation(self, apdu) + +bacpypes_debugging(SampleApplication) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + vendorName="Hello", + ) + + # make a sample application + this_application = SampleApplication(this_device, args.ini.address) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/SampleConsoleCmd.py b/samples/SampleConsoleCmd.py new file mode 100755 index 0000000..48d10ad --- /dev/null +++ b/samples/SampleConsoleCmd.py @@ -0,0 +1,101 @@ +#!/usr/bin/python + +""" +This sample application is a simple BACpypes application that +presents a console prompt. Almost identical to the SampleApplication, +the BACnet application is minimal, but with the console commands +that match the command line options like 'buggers' and 'debug' the +user can add debugging "on the fly". +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# SampleApplication +# + +class SampleApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: SampleApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def request(self, apdu): + if _debug: SampleApplication._debug("request %r", apdu) + BIPSimpleApplication.request(self, apdu) + + def indication(self, apdu): + if _debug: SampleApplication._debug("indication %r", apdu) + BIPSimpleApplication.indication(self, apdu) + + def response(self, apdu): + if _debug: SampleApplication._debug("response %r", apdu) + BIPSimpleApplication.response(self, apdu) + + def confirmation(self, apdu): + if _debug: SampleApplication._debug("confirmation %r", apdu) + BIPSimpleApplication.confirmation(self, apdu) + +bacpypes_debugging(SampleApplication) + +# +# SampleConsoleCmd +# + +class SampleConsoleCmd(ConsoleCmd): + + def do_nothing(self, args): + """nothing can be done""" + args = args.split() + if _debug: SampleConsoleCmd._debug("do_nothing %r", args) + +bacpypes_debugging(SampleConsoleCmd) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = SampleApplication(this_device, args.ini.address) + this_console = SampleConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/VendorAVObject.py b/samples/VendorAVObject.py new file mode 100755 index 0000000..974c704 --- /dev/null +++ b/samples/VendorAVObject.py @@ -0,0 +1,135 @@ +#!/usr/bin/python + +""" +This sample application shows how to extend one of the basic objects, an Analog +Value Object in this case, to provide a present value. This type of code is used +when the application is providing a BACnet interface to a collection of data. +It assumes that almost all of the default behaviour of a BACpypes application is +sufficient. +""" + +import random + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.primitivedata import Real +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import AnalogValueObject, Property, register_object_type +from bacpypes.errors import ExecutionError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +vendor_id = 999 + +# +# RandomValueProperty +# + +@bacpypes_debugging +class RandomValueProperty(Property): + + def __init__(self, identifier): + if _debug: RandomValueProperty._debug("__init__ %r", identifier) + Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False) + + # writing to this property changes the multiplier + self.multiplier = 100.0 + + def ReadProperty(self, obj, arrayIndex=None): + if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex) + + # access an array + if arrayIndex is not None: + raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray') + + # return a random value + value = random.random() * self.multiplier + if _debug: RandomValueProperty._debug(" - value: %r", value) + + return value + + def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False): + if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct) + + # change the multiplier + self.multiplier = value + +# +# Vendor Analog Value Object Type +# + +@bacpypes_debugging +class VendorAVObject(AnalogValueObject): + objectType = 513 + + properties = [ + RandomValueProperty(5504), + ] + + def __init__(self, **kwargs): + if _debug: VendorAVObject._debug("__init__ %r", kwargs) + AnalogValueObject.__init__(self, **kwargs) + +register_object_type(VendorAVObject, vendor_id=vendor_id) + +# +# main +# + +@bacpypes_debugging +def main(): + if _debug: main._debug("initialization") + + try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: main._debug("initialization") + if _debug: main._debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=vendor_id, + ) + + # make a sample application + this_application = BIPSimpleApplication(this_device, args.ini.address) + + # make some objects + ravo1 = VendorAVObject( + objectIdentifier=(513, 1), objectName='Random1' + ) + if _debug: main._debug(" - ravo1: %r", ravo1) + + ravo2 = VendorAVObject( + objectIdentifier=(513, 2), objectName='Random2' + ) + if _debug: main._debug(" - ravo2: %r", ravo2) + + # add it to the device + this_application.add_object(ravo1) + this_application.add_object(ravo2) + if _debug: main._debug(" - object list: %r", this_device.objectList) + + if _debug: main._debug("running") + + run() + + except Exception, e: + main._exception("an error has occurred: %s", e) + finally: + if _debug: main._debug("finally") + +if __name__ == '__main__': + main() + diff --git a/samples/VendorReadWriteProperty.py b/samples/VendorReadWriteProperty.py new file mode 100755 index 0000000..3489526 --- /dev/null +++ b/samples/VendorReadWriteProperty.py @@ -0,0 +1,285 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for commands. + +For 'read' commands it will create ReadPropertyRequest PDUs, then lines up the +coorresponding ReadPropertyACK and prints the value. For 'write' commands it +will create WritePropertyRequst PDUs and prints out a simple acknowledgement. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication +from bacpypes.object import get_object_class, get_datatype + +from bacpypes.apdu import Error, AbortPDU, SimpleAckPDU, \ + ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest +from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real +from bacpypes.constructeddata import Array, Any +from bacpypes.basetypes import ServicesSupported + +import VendorAVObject + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# ReadPropertyApplication +# + +@bacpypes_debugging +class ReadPropertyApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: ReadPropertyApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: ReadPropertyApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: ReadPropertyApplication._debug("confirmation %r", apdu) + + if isinstance(apdu, Error): + sys.stdout.write("error: %s\n" % (apdu.errorCode,)) + sys.stdout.flush() + + elif isinstance(apdu, AbortPDU): + apdu.debug_contents() + + if isinstance(apdu, SimpleAckPDU): + sys.stdout.write("ack\n") + sys.stdout.flush() + + elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)): + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier, VendorAVObject.vendor_id) + if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype) + if not datatype: + raise TypeError, "unknown datatype" + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + if _debug: ReadPropertyApplication._debug(" - value: %r", value) + + sys.stdout.write(str(value) + '\n') + sys.stdout.flush() + +# +# ReadWritePropertyConsoleCmd +# + +@bacpypes_debugging +class ReadWritePropertyConsoleCmd(ConsoleCmd): + + def do_read(self, args): + """read [ ]""" + global this_application + + args = args.split() + if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type, VendorAVObject.vendor_id): + raise ValueError, "unknown object type" + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_type: %r", obj_type) + + obj_inst = int(obj_inst) + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_inst: %r", obj_inst) + + if prop_id.isdigit(): + prop_id = int(prop_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - prop_id: %r", prop_id) + + datatype = get_datatype(obj_type, prop_id, VendorAVObject.vendor_id) + if not datatype: + raise ValueError, "invalid property for object type" + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + ) + request.pduDestination = Address(addr) + + if len(args) == 5: + request.propertyArrayIndex = int(args[4]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + + def do_write(self, args): + """write [ ] [ ]""" + global this_application + + args = args.split() + ReadWritePropertyConsoleCmd._debug("do_write %r", args) + + try: + addr, obj_type, obj_inst, prop_id = args[:4] + + if obj_type.isdigit(): + obj_type = int(obj_type) + elif not get_object_class(obj_type, VendorAVObject.vendor_id): + raise ValueError, "unknown object type" + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_type: %r", obj_type) + + obj_inst = int(obj_inst) + if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_inst: %r", obj_inst) + + if prop_id.isdigit(): + prop_id = int(prop_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - prop_id: %r", prop_id) + + value = args[4] + + indx = None + if len(args) >= 6: + if args[5] != "-": + indx = int(args[5]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - indx: %r", indx) + + priority = None + if len(args) >= 7: + priority = int(args[6]) + if _debug: ReadWritePropertyConsoleCmd._debug(" - priority: %r", priority) + + # get the datatype + datatype = get_datatype(obj_type, prop_id, VendorAVObject.vendor_id) + if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype) + + # change atomic values into something encodeable, null is a special case + if (value == 'null'): + value = Null() + elif issubclass(datatype, Atomic): + if datatype is Integer: + value = int(value) + elif datatype is Real: + value = float(value) + elif datatype is Unsigned: + value = int(value) + value = datatype(value) + elif issubclass(datatype, Array) and (indx is not None): + if indx == 0: + value = Integer(value) + elif issubclass(datatype.subtype, Atomic): + value = datatype.subtype(value) + elif not isinstance(value, datatype.subtype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,) + elif not isinstance(value, datatype): + raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,) + if _debug: ReadWritePropertyConsoleCmd._debug(" - encodeable value: %r %s", value, type(value)) + + # build a request + request = WritePropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id + ) + request.pduDestination = Address(addr) + + # save the value + request.propertyValue = Any() + try: + request.propertyValue.cast_in(value) + except Exception, e: + ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", e) + + # optional array index + if indx is not None: + request.propertyArrayIndex = indx + + # optional priority + if priority is not None: + request.priority = priority + + if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + ReadWritePropertyConsoleCmd._exception("exception: %r", e) + +# +# main +# + +@bacpypes_debugging +def main(): + if _debug: main._debug("initialization") + global this_application + + try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: main._debug("initialization") + if _debug: main._debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = ReadPropertyApplication(this_device, args.ini.address) + this_console = ReadWritePropertyConsoleCmd() + + main._debug("running") + + run() + + except Exception, e: + main._exception("an error has occurred: %s", e) + finally: + main._debug("finally") + +if __name__ == '__main__': + main() + diff --git a/samples/WhoHasIHaveApplication.py b/samples/WhoHasIHaveApplication.py new file mode 100755 index 0000000..a02e2b4 --- /dev/null +++ b/samples/WhoHasIHaveApplication.py @@ -0,0 +1,116 @@ +#!/usr/bin/python + +""" +This sample application builds on the first sample by overriding the default +processing for Who-Has and I-Have requests, counting them, then continuing on +with the regular processing. After the run() function has completed it will +dump a formatted summary of the requests it has received. Note that these +services are relatively rare even in large networks. +""" + +from collections import defaultdict + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# counters +who_has_counter = defaultdict(int) +i_have_counter = defaultdict(int) + +# +# WhoHasIHaveApplication +# + +@bacpypes_debugging +class WhoHasIHaveApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: WhoHasIHaveApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def do_WhoHasRequest(self, apdu): + """Respond to a Who-Has request.""" + if _debug: WhoHasIHaveApplication._debug("do_WhoHasRequest, %r", apdu) + + key = (str(apdu.pduSource),) + if apdu.object.objectIdentifier is not None: + key += (str(apdu.object.objectIdentifier),) + elif apdu.object.objectName is not None: + key += (apdu.object.objectName,) + else: + print "(rejected APDU:" + apdu.debug_contents() + print ")" + return + + # count the times this has been received + who_has_counter[key] += 1 + + def do_IHaveRequest(self, apdu): + """Respond to a I-Have request.""" + if _debug: WhoHasIHaveApplication._debug("do_IHaveRequest %r", apdu) + + key = ( + str(apdu.pduSource), + str(apdu.deviceIdentifier), + str(apdu.objectIdentifier), + apdu.objectName + ) + + # count the times this has been received + i_have_counter[key] += 1 + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = WhoHasIHaveApplication(this_device, args.ini.address) + + _log.debug("running") + + # run until stopped, ^C works + run() + + print "----- Who Has -----" + for (src, objname), count in sorted(who_has_counter.items()): + print "%-20s %-30s %4d" % (src, objname, count) + print + + print "----- I Have -----" + for (src, devid, objid, objname), count in sorted(i_have_counter.items()): + print "%-20s %-20s %-20s %-20s %4d" % (src, devid, objid, objname, count) + print + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/WhoIsIAm.py b/samples/WhoIsIAm.py new file mode 100755 index 0000000..ad5dd4f --- /dev/null +++ b/samples/WhoIsIAm.py @@ -0,0 +1,187 @@ +#!/usr/bin/python + +""" +This application presents a 'console' prompt to the user asking for Who-Is and I-Am +commands which create the related APDUs, then lines up the coorresponding I-Am +for incoming traffic and prints out the contents. +""" + +import sys + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address, GlobalBroadcast +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +from bacpypes.apdu import WhoIsRequest, IAmRequest +from bacpypes.basetypes import ServicesSupported +from bacpypes.errors import DecodingError + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None +this_console = None + +# +# WhoIsIAmApplication +# + +class WhoIsIAmApplication(BIPSimpleApplication): + + def __init__(self, *args): + if _debug: WhoIsIAmApplication._debug("__init__ %r", args) + BIPSimpleApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, apdu): + if _debug: WhoIsIAmApplication._debug("request %r", apdu) + + # save a copy of the request + self._request = apdu + + # forward it along + BIPSimpleApplication.request(self, apdu) + + def confirmation(self, apdu): + if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu) + + # forward it along + BIPSimpleApplication.confirmation(self, apdu) + + def indication(self, apdu): + if _debug: WhoIsIAmApplication._debug("indication %r", apdu) + + if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)): + device_type, device_instance = apdu.iAmDeviceIdentifier + if device_type != 'device': + raise DecodingError, "invalid object type" + + if (self._request.deviceInstanceRangeLowLimit is not None) and \ + (device_instance < self._request.deviceInstanceRangeLowLimit): + pass + elif (self._request.deviceInstanceRangeHighLimit is not None) and \ + (device_instance > self._request.deviceInstanceRangeHighLimit): + pass + else: + # print out the contents + sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n') + sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n') + sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n') + sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n') + sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n') + sys.stdout.flush() + + # forward it along + BIPSimpleApplication.indication(self, apdu) + +bacpypes_debugging(WhoIsIAmApplication) + +# +# WhoIsIAmConsoleCmd +# + +class WhoIsIAmConsoleCmd(ConsoleCmd): + + def do_whois(self, args): + """whois [ ] [ ]""" + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args) + + try: + # build a request + request = WhoIsRequest() + if (len(args) == 1) or (len(args) == 3): + request.pduDestination = Address(args[0]) + del args[0] + else: + request.pduDestination = GlobalBroadcast() + + if len(args) == 2: + request.deviceInstanceRangeLowLimit = int(args[0]) + request.deviceInstanceRangeHighLimit = int(args[1]) + if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + WhoIsIAmConsoleCmd._exception("exception: %r", e) + + def do_iam(self, args): + """iam""" + args = args.split() + if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args) + + try: + # build a request + request = IAmRequest() + request.pduDestination = GlobalBroadcast() + + # set the parameters from the device object + request.iAmDeviceIdentifier = this_device.objectIdentifier + request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted + request.segmentationSupported = this_device.segmentationSupported + request.vendorID = this_device.vendorIdentifier + if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request) + + # give it to the application + this_application.request(request) + + except Exception, e: + WhoIsIAmConsoleCmd._exception("exception: %r", e) + +bacpypes_debugging(WhoIsIAmConsoleCmd) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # build a bit string that knows about the bit names + pss = ServicesSupported() + pss['whoIs'] = 1 + pss['iAm'] = 1 + pss['readProperty'] = 1 + pss['writeProperty'] = 1 + + # set the property value to be just the bits + this_device.protocolServicesSupported = pss.value + + # make a simple application + this_application = WhoIsIAmApplication(this_device, args.ini.address) + this_console = WhoIsIAmConsoleCmd() + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") + diff --git a/samples/WhoIsIAmApplication.py b/samples/WhoIsIAmApplication.py new file mode 100755 index 0000000..c795f8c --- /dev/null +++ b/samples/WhoIsIAmApplication.py @@ -0,0 +1,112 @@ +#!/usr/bin/python + +""" +This sample application builds on the first sample by overriding the default +processing for Who-Is and I-Am requests, counting them, then continuing on +with the regular processing. After the run() function has completed it will +dump a formatted summary of the requests it has received. +""" + +from collections import defaultdict + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser + +from bacpypes.core import run + +from bacpypes.app import LocalDeviceObject, BIPSimpleApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_device = None +this_application = None + +# counters +who_is_counter = defaultdict(int) +i_am_counter = defaultdict(int) + +# +# WhoIsIAmApplication +# + +@bacpypes_debugging +class WhoIsIAmApplication(BIPSimpleApplication): + + def __init__(self, device, address): + if _debug: WhoIsIAmApplication._debug("__init__ %r %r", device, address) + BIPSimpleApplication.__init__(self, device, address) + + def do_WhoIsRequest(self, apdu): + """Respond to a Who-Is request.""" + if _debug: WhoIsIAmApplication._debug("do_WhoIsRequest %r", apdu) + + # build a key from the source and parameters + key = (str(apdu.pduSource), + apdu.deviceInstanceRangeLowLimit, + apdu.deviceInstanceRangeHighLimit, + ) + + # count the times this has been received + who_is_counter[key] += 1 + + # pass back to the default implementation + BIPSimpleApplication.do_WhoIsRequest(self, apdu) + + def do_IAmRequest(self, apdu): + """Given an I-Am request, cache it.""" + if _debug: WhoIsIAmApplication._debug("do_IAmRequest %r", apdu) + + # build a key from the source, just use the instance number + key = (str(apdu.pduSource), + apdu.iAmDeviceIdentifier[1], + ) + + # count the times this has been received + i_am_counter[key] += 1 + + # no default implementation + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a device object + this_device = LocalDeviceObject( + objectName=args.ini.objectname, + objectIdentifier=int(args.ini.objectidentifier), + maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted), + segmentationSupported=args.ini.segmentationsupported, + vendorIdentifier=int(args.ini.vendoridentifier), + ) + + # make a sample application + this_application = WhoIsIAmApplication(this_device, args.ini.address) + + _log.debug("running") + + run() + + print "----- Who Is -----" + for (src, lowlim, hilim), count in sorted(who_is_counter.items()): + print "%-20s %8s %8s %4d" % (src, lowlim, hilim, count) + print + + print "----- I Am -----" + for (src, devid), count in sorted(i_am_counter.items()): + print "%-20s %8d %4d" % (src, devid, count) + print + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally") diff --git a/samples/WhoIsRouter.py b/samples/WhoIsRouter.py new file mode 100755 index 0000000..9759431 --- /dev/null +++ b/samples/WhoIsRouter.py @@ -0,0 +1,128 @@ +#!/usr/bin/python + +""" +This sample application has just a network stack, not a full application, +and is a way to create InitializeRoutingTable and WhoIsRouterToNetwork requests. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ConfigArgumentParser +from bacpypes.consolecmd import ConsoleCmd + +from bacpypes.core import run + +from bacpypes.pdu import Address +from bacpypes.npdu import InitializeRoutingTable, WhoIsRouterToNetwork +from bacpypes.app import BIPNetworkApplication + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_application = None +this_console = None + +# +# WhoIsRouterApplication +# + +@bacpypes_debugging +class WhoIsRouterApplication(BIPNetworkApplication): + + def __init__(self, *args): + if _debug: WhoIsRouterApplication._debug("__init__ %r", args) + BIPNetworkApplication.__init__(self, *args) + + # keep track of requests to line up responses + self._request = None + + def request(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("request %r %r", adapter, npdu) + + # save a copy of the request + self._request = npdu + + # forward it along + BIPNetworkApplication.request(self, adapter, npdu) + + def indication(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("indication %r %r", adapter, npdu) + BIPNetworkApplication.indication(self, adapter, npdu) + + def response(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("response %r %r", adapter, npdu) + BIPNetworkApplication.response(self, adapter, npdu) + + def confirmation(self, adapter, npdu): + if _debug: WhoIsRouterApplication._debug("confirmation %r %r", adapter, npdu) + BIPNetworkApplication.confirmation(self, adapter, npdu) + +# +# WhoIsRouterConsoleCmd +# + +@bacpypes_debugging +class WhoIsRouterConsoleCmd(ConsoleCmd): + + def do_irt(self, args): + """irt """ + args = args.split() + if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args) + + # build a request + try: + request = InitializeRoutingTable() + request.pduDestination = Address(args[0]) + except: + print "invalid arguments" + return + + # give it to the application + this_application.request(this_application.nsap.adapters[0], request) + + def do_wirtn(self, args): + """wirtn [ ]""" + args = args.split() + if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args) + + # build a request + try: + request = WhoIsRouterToNetwork() + request.pduDestination = Address(args[0]) + if (len(args) > 1): + request.wirtnNetwork = int(args[1]) + except: + print "invalid arguments" + return + + # give it to the application + this_application.request(this_application.nsap.adapters[0], request) + +# +# __main__ +# + +try: + # parse the command line arguments + args = ConfigArgumentParser(description=__doc__).parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + # make a simple application + this_application = WhoIsRouterApplication(args.ini.address) + if _debug: _log.debug(" - this_application: %r", this_application) + + # make a console + this_console = WhoIsRouterConsoleCmd() + if _debug: _log.debug(" - this_console: %r", this_console) + + _log.debug("running") + + run() + +except Exception, e: + _log.exception("an error has occurred: %s", e) +finally: + _log.debug("finally")