1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

pulling in the samples from the master branch to stage prior to working #54

This commit is contained in:
Joel Bender 2015-09-14 20:44:50 -04:00
commit 61a6262ac4
24 changed files with 3953 additions and 0 deletions

11
BACpypes~.ini Normal file
View File

@ -0,0 +1,11 @@
[BACpypes]
objectName: Betelgeuse
address: 128.253.109.40/24
objectIdentifier: 599
maxApduLengthAccepted: 1024
segmentationSupported: segmentedBoth
vendorIdentifier: 15
foreignPort: 0
foreignBBMD: 128.253.109.254
foreignTTL: 30

200
samples/CommandableMixin.py Executable file
View File

@ -0,0 +1,200 @@
#!/usr/bin/python
"""
This sample application demonstrates a mix-in class for commandable properties
(not useful for Binary Out or Binary Value objects that have a minimum on and off
time, or for Channel objects).
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.errors import ExecutionError
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import AnalogValueObject, DateValueObject
from bacpypes.primitivedata import Null
from bacpypes.basetypes import PriorityValue, PriorityArray
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# CommandableMixin
#
@bacpypes_debugging
class CommandableMixin(object):
def __init__(self, init_value, **kwargs):
if _debug: CommandableMixin._debug("__init__ %r, %r", init_value, kwargs)
super(CommandableMixin, self).__init__(**kwargs)
# if no present value given, give it the default value
if ('presentValue' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize present value")
self.presentValue = init_value
# if no priority array given, give it an empty one
if ('priorityArray' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize priority array")
self.priorityArray = PriorityArray()
for i in range(16):
self.priorityArray.append(PriorityValue(null=Null()))
# if no relinquish default value given, give it the default value
if ('relinquishDefault' not in kwargs):
if _debug: CommandableMixin._debug(" - initialize relinquish default")
self.relinquishDefault = init_value
# capture the present value property
self._pv = self._properties['presentValue']
if _debug: CommandableMixin._debug(" - _pv: %r", self._pv)
# capture the datatype
self._pv_datatype = self._pv.datatype
if _debug: CommandableMixin._debug(" - _pv_datatype: %r", self._pv_datatype)
# look up a matching priority value choice
for element in PriorityValue.choiceElements:
if element.klass is self._pv_datatype:
self._pv_choice = element.name
break
else:
self._pv_choice = 'constructedValue'
if _debug: CommandableMixin._debug(" - _pv_choice: %r", self._pv_choice)
def WriteProperty(self, property, value, arrayIndex=None, priority=None, direct=False):
if _debug: CommandableMixin._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", property, value, arrayIndex, priority, direct)
# when writing to the presentValue with a priority
if (property == 'presentValue'):
# default (lowest) priority
if priority is None:
priority = 16
if _debug: CommandableMixin._debug(" - translate to array index %d", priority)
# translate to updating the priority array
property = 'priorityArray'
arrayIndex = priority
priority = None
# update the priority array entry
if (property == 'priorityArray') and (arrayIndex is not None):
# check the bounds
if arrayIndex == 0:
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
if (arrayIndex < 1) or (arrayIndex > 16):
raise ExecutionError(errorClass='property', errorCode='invalidArrayIndex')
# update the specific priorty value element
priority_value = self.priorityArray[arrayIndex]
if _debug: CommandableMixin._debug(" - priority_value: %r", priority_value)
# the null or the choice has to be set, the other clear
if value is ():
if _debug: CommandableMixin._debug(" - write a null")
priority_value.null = value
setattr(priority_value, self._pv_choice, None)
else:
if _debug: CommandableMixin._debug(" - write a value")
priority_value.null = None
setattr(priority_value, self._pv_choice, value)
# look for the highest priority value
for i in range(1, 17):
priority_value = self.priorityArray[i]
if priority_value.null is None:
if (i < arrayIndex):
if _debug: CommandableMixin._debug(" - existing higher priority value")
return
value = getattr(priority_value, self._pv_choice)
break
else:
value = self.relinquishDefault
if _debug: CommandableMixin._debug(" - new present value: %r", value)
property = 'presentValue'
arrayIndex = priority = None
# allow the request to pass through
if _debug: CommandableMixin._debug(" - super: %r %r arrayIndex=%r priority=%r", property, value, arrayIndex, priority)
super(CommandableMixin, self).WriteProperty(
property, value,
arrayIndex=arrayIndex, priority=priority, direct=direct,
)
#
# CommandableAnalogValueObject
#
@bacpypes_debugging
class CommandableAnalogValueObject(CommandableMixin, AnalogValueObject):
def __init__(self, **kwargs):
if _debug: CommandableAnalogValueObject._debug("__init__ %r", kwargs)
CommandableMixin.__init__(self, 0.0, **kwargs)
#
# CommandableDateValueObject
#
@bacpypes_debugging
class CommandableDateValueObject(CommandableMixin, DateValueObject):
def __init__(self, **kwargs):
if _debug: CommandableDateValueObject._debug("__init__ %r", kwargs)
CommandableMixin.__init__(self, False, **kwargs)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a commandable analog value object, add to the device
cavo1 = CommandableAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Commandable AV 1'
)
if _debug: _log.debug(" - cavo1: %r", cavo1)
this_application.add_object(cavo1)
# make a commandable binary value object, add to the device
cdvo2 = CommandableDateValueObject(
objectIdentifier=('dateValue', 1), objectName='Commandable2'
)
if _debug: _log.debug(" - cdvo2: %r", cdvo2)
this_application.add_object(cdvo2)
if _debug: _log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
if _debug: _log.debug("finally")

125
samples/IP2IPRouter.py Executable file
View File

@ -0,0 +1,125 @@
#!/usr/bin/python
"""
This sample application presents itself as a router between two
IP networks. This application can run on a single homed machine
by using the same IP address and two different port numbers, or
to be closer to what is typically considered a router, on a
multihomed machine using two different IP addresses and the same
port number.
$ python IP2IPRtouer.py addr1 net1 addr2 net2
addr1 - local address like 192.168.1.2/24:47808
net1 - network number
addr2 - local address like 192.168.3.4/24:47809
net2 - network number
As a router, this does not have an application layer.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ArgumentParser
from bacpypes.core import run
from bacpypes.comm import bind
from bacpypes.pdu import Address
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.bvllservice import BIPSimple, AnnexJCodec, UDPMultiplexer
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# IP2IPRouter
#
@bacpypes_debugging
class IP2IPRouter:
def __init__(self, addr1, net1, addr2, net2):
if _debug: IP2IPRouter._debug("__init__ %r %r %r %r", addr1, net1, addr2, net2)
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
#== First stack
# create a generic BIP stack, bound to the Annex J server
# on the UDP multiplexer
self.s1_bip = BIPSimple()
self.s1_annexj = AnnexJCodec()
self.s1_mux = UDPMultiplexer(addr1)
# bind the bottom layers
bind(self.s1_bip, self.s1_annexj, self.s1_mux.annexJ)
# bind the BIP stack to the local network
self.nsap.bind(self.s1_bip, net1, addr1)
#== Second stack
# create a generic BIP stack, bound to the Annex J server
# on the UDP multiplexer
self.s2_bip = BIPSimple()
self.s2_annexj = AnnexJCodec()
self.s2_mux = UDPMultiplexer(addr2)
# bind the bottom layers
bind(self.s2_bip, self.s2_annexj, self.s2_mux.annexJ)
# bind the BIP stack to the local network
self.nsap.bind(self.s2_bip, net2)
#
# __main__
#
try:
# parse the command line arguments
parser = ArgumentParser(description=__doc__)
# add an argument for interval
parser.add_argument('addr1', type=str,
help='address of first network',
)
# add an argument for interval
parser.add_argument('net1', type=int,
help='network number of first network',
)
# add an argument for interval
parser.add_argument('addr2', type=str,
help='address of second network',
)
# add an argument for interval
parser.add_argument('net2', type=int,
help='network number of second network',
)
# now parse the arguments
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# create the router
router = IP2IPRouter(Address(args.addr1), args.net1, Address(args.addr2), args.net2)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

View File

@ -0,0 +1,69 @@
#!/usr/bin/python
"""
This sample application provides a single MultiState Value Object to test
reading and writing its various properties.
"""
from bacpypes.debugging import ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import MultiStateValueObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a multistate value object
msvo = MultiStateValueObject(
objectIdentifier=('multiStateValue', 1),
objectName='My Special Object',
presentValue=1,
numberOfStates=3,
stateText=['red', 'green', 'blue'],
)
_log.debug(" - msvo: %r", msvo)
# add it to the device
this_application.add_object(msvo)
_log.debug(" - object list: %r", this_device.objectList)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

165
samples/MultipleReadProperty.py Executable file
View File

@ -0,0 +1,165 @@
#!/usr/bin/python
"""
Mutliple Read Property
This application has a static list of points that it would like to read. It reads the
values of each of them in turn and then quits.
"""
from collections import deque
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, stop, deferred
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_datatype
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
# point list
point_list = [
('1.2.3.4', 'analogValue', 1, 'presentValue'),
('1.2.3.4', 'analogValue', 2, 'presentValue'),
]
#
# ReadPointListApplication
#
@bacpypes_debugging
class ReadPointListApplication(BIPSimpleApplication):
def __init__(self, point_list, *args):
if _debug: ReadPointListApplication._debug("__init__ %r, %r", point_list, args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
# make a list of the response values
self.response_values = []
# turn the point list into a queue
self.point_queue = deque(point_list)
def next_request(self):
if _debug: ReadPointListApplication._debug("next_request")
# check to see if we're done
if not self.point_queue:
if _debug: ReadPointListApplication._debug(" - done")
stop()
return
# get the next request
addr, obj_type, obj_inst, prop_id = self.point_queue.popleft()
# build a request
self._request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
self._request.pduDestination = Address(addr)
if _debug: ReadPointListApplication._debug(" - request: %r", self._request)
# forward it along
BIPSimpleApplication.request(self, self._request)
def confirmation(self, apdu):
if _debug: ReadPointListApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
if _debug: ReadPointListApplication._debug(" - error: %r", apdu)
self.response_values.append(apdu)
elif isinstance(apdu, AbortPDU):
if _debug: ReadPointListApplication._debug(" - abort: %r", apdu)
self.response_values.append(apdu)
elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ReadPointListApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug: ReadPointListApplication._debug(" - value: %r", value)
# save the value
self.response_values.append(value)
# fire off another request
deferred(self.next_request)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadPointListApplication(point_list, this_device, args.ini.address)
# fire off a request when the core has a chance
deferred(this_application.next_request)
_log.debug("running")
run()
# dump out the results
for request, response in zip(point_list, this_application.response_values):
print request, response
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

View File

@ -0,0 +1,126 @@
#!/usr/bin/python
"""
This sample application shows how to extend one of the basic objects, an Analog
Value Object in this case, to provide a present value. This type of code is used
when the application is providing a BACnet interface to a collection of data.
It assumes that almost all of the default behaviour of a BACpypes application is
sufficient.
"""
import random
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Real
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import AnalogValueObject, Property, register_object_type
from bacpypes.errors import ExecutionError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# RandomValueProperty
#
@bacpypes_debugging
class RandomValueProperty(Property):
def __init__(self, identifier):
if _debug: RandomValueProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# return a random value
value = random.random() * 100.0
if _debug: RandomValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# Random Value Object Type
#
@bacpypes_debugging
class RandomAnalogValueObject(AnalogValueObject):
properties = [
RandomValueProperty('presentValue'),
]
def __init__(self, **kwargs):
if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
register_object_type(RandomAnalogValueObject)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=('device', int(args.ini.objectidentifier)),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a random input object
ravo1 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Random1'
)
_log.debug(" - ravo1: %r", ravo1)
ravo1d = ravo1._dict_contents()
print ravo1d
ravo2 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 2), objectName='Random2'
)
_log.debug(" - ravo2: %r", ravo2)
# add it to the device
this_application.add_object(ravo1)
this_application.add_object(ravo2)
_log.debug(" - object list: %r", this_device.objectList)
print this_device._dict_contents()
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

176
samples/ReadProperty.py Executable file
View File

@ -0,0 +1,176 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for read commands
which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK
and prints the value.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# ReadPropertyApplication
#
class ReadPropertyApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadPropertyApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadPropertyApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadPropertyApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug: ReadPropertyApplication._debug(" - value: %r", value)
sys.stdout.write(str(value) + '\n')
if hasattr(value, 'debug_contents'):
value.debug_contents(file=sys.stdout)
sys.stdout.flush()
bacpypes_debugging(ReadPropertyApplication)
#
# ReadPropertyConsoleCmd
#
class ReadPropertyConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read <addr> <type> <inst> <prop> [ <indx> ]"""
args = args.split()
if _debug: ReadPropertyConsoleCmd._debug("do_read %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type):
raise ValueError, "unknown object type"
obj_inst = int(obj_inst)
datatype = get_datatype(obj_type, prop_id)
if not datatype:
raise ValueError, "invalid property for object type"
# build a request
request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 5:
request.propertyArrayIndex = int(args[4])
if _debug: ReadPropertyConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadPropertyConsoleCmd._exception("exception: %r", e)
bacpypes_debugging(ReadPropertyConsoleCmd)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadPropertyApplication(this_device, args.ini.address)
this_console = ReadPropertyConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

177
samples/ReadPropertyAny.py Executable file
View File

@ -0,0 +1,177 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for read commands
which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK
and prints the value.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Tag
from bacpypes.constructeddata import Array
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# ReadPropertyAnyApplication
#
class ReadPropertyAnyApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadPropertyAnyApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadPropertyAnyApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadPropertyAnyApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
# peek at the value tag
value_tag = apdu.propertyValue.tagList.Peek()
if _debug: ReadPropertyAnyApplication._debug(" - value_tag: %r", value_tag)
# make sure that it is application tagged
if value_tag.tagClass != Tag.applicationTagClass:
sys.stdout.write("value is not application encoded\n")
else:
# find the datatype
datatype = Tag._app_tag_class[value_tag.tagNumber]
if _debug: ReadPropertyAnyApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# cast out the value
value = apdu.propertyValue.cast_out(datatype)
if _debug: ReadPropertyAnyApplication._debug(" - value: %r", value)
sys.stdout.write(str(value) + '\n')
sys.stdout.flush()
bacpypes_debugging(ReadPropertyAnyApplication)
#
# ReadPropertyAnyConsoleCmd
#
class ReadPropertyAnyConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read <addr> <type> <inst> <prop> [ <indx> ]"""
args = args.split()
if _debug: ReadPropertyAnyConsoleCmd._debug("do_read %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type):
raise ValueError, "unknown object type"
obj_inst = int(obj_inst)
if prop_id.isdigit():
prop_id = int(prop_id)
# build a request
request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 5:
request.propertyArrayIndex = int(args[4])
if _debug: ReadPropertyAnyConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadPropertyAnyConsoleCmd._exception("exception: %r", e)
bacpypes_debugging(ReadPropertyAnyConsoleCmd)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadPropertyAnyApplication(this_device, args.ini.address)
this_console = ReadPropertyAnyConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

245
samples/ReadPropertyMultiple.py Executable file
View File

@ -0,0 +1,245 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for read commands
which create ReadPropertyRequest PDUs, then lines up the coorresponding ReadPropertyACK
and prints the value.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import ReadPropertyMultipleRequest, PropertyReference, ReadAccessSpecification, Error, AbortPDU, ReadPropertyMultipleACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
from bacpypes.basetypes import PropertyIdentifier, ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# ReadPropertyMultipleApplication
#
@bacpypes_debugging
class ReadPropertyMultipleApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadPropertyMultipleApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadPropertyMultipleApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadPropertyMultipleApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
elif (isinstance(self._request, ReadPropertyMultipleRequest)) and (isinstance(apdu, ReadPropertyMultipleACK)):
# loop through the results
for result in apdu.listOfReadAccessResults:
# here is the object identifier
objectIdentifier = result.objectIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier)
# now come the property values per object
for element in result.listOfResults:
# get the property and array index
propertyIdentifier = element.propertyIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier)
propertyArrayIndex = element.propertyArrayIndex
if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex)
# here is the read result
readResult = element.readResult
sys.stdout.write(propertyIdentifier)
if propertyArrayIndex is not None:
sys.stdout.write("[" + str(propertyArrayIndex) + "]")
# check for an error
if readResult.propertyAccessError is not None:
sys.stdout.write(" ! " + str(readResult.propertyAccessError) + '\n')
else:
# here is the value
propertyValue = readResult.propertyValue
# find the datatype
datatype = get_datatype(objectIdentifier[0], propertyIdentifier)
if _debug: ReadPropertyMultipleApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (propertyArrayIndex is not None):
if propertyArrayIndex == 0:
value = propertyValue.cast_out(Unsigned)
else:
value = propertyValue.cast_out(datatype.subtype)
else:
value = propertyValue.cast_out(datatype)
if _debug: ReadPropertyMultipleApplication._debug(" - value: %r", value)
sys.stdout.write(" = " + str(value) + '\n')
sys.stdout.flush()
#
# ReadPropertyMultipleConsoleCmd
#
@bacpypes_debugging
class ReadPropertyMultipleConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read <addr> ( <type> <inst> ( <prop> [ <indx> ] )... )..."""
args = args.split()
if _debug: ReadPropertyMultipleConsoleCmd._debug("do_read %r", args)
try:
i = 0
addr = args[i]
i += 1
read_access_spec_list = []
while i < len(args):
obj_type = args[i]
i += 1
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type):
raise ValueError, "unknown object type"
obj_inst = int(args[i])
i += 1
prop_reference_list = []
while i < len(args):
prop_id = args[i]
if prop_id not in PropertyIdentifier.enumerations:
break
i += 1
if prop_id in ('all', 'required', 'optional'):
pass
else:
datatype = get_datatype(obj_type, prop_id)
if not datatype:
raise ValueError, "invalid property for object type"
# build a property reference
prop_reference = PropertyReference(
propertyIdentifier=prop_id,
)
# check for an array index
if (i < len(args)) and args[i].isdigit():
prop_reference.propertyArrayIndex = int(args[i])
i += 1
# add it to the list
prop_reference_list.append(prop_reference)
# check for at least one property
if not prop_reference_list:
raise ValueError, "provide at least one property"
# build a read access specification
read_access_spec = ReadAccessSpecification(
objectIdentifier=(obj_type, obj_inst),
listOfPropertyReferences=prop_reference_list,
)
# add it to the list
read_access_spec_list.append(read_access_spec)
# check for at least one
if not read_access_spec_list:
raise RuntimeError, "at least one read access specification required"
# build the request
request = ReadPropertyMultipleRequest(
listOfReadAccessSpecs=read_access_spec_list,
)
request.pduDestination = Address(addr)
if _debug: ReadPropertyMultipleConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadPropertyMultipleConsoleCmd._exception("exception: %r", e)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadPropertyMultipleApplication(this_device, args.ini.address)
this_console = ReadPropertyMultipleConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

View File

@ -0,0 +1,319 @@
#!/usr/bin/python
"""
This sample application shows how to extend the basic functionality of a device
to support the ReadPropertyMultiple service.
"""
import random
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Atomic, Real, Unsigned
from bacpypes.constructeddata import Array, Any
from bacpypes.basetypes import ServicesSupported, ErrorType
from bacpypes.apdu import ReadPropertyMultipleACK, ReadAccessResult, ReadAccessResultElement, ReadAccessResultElementChoice
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import AnalogValueObject, Property, PropertyError, register_object_type
from bacpypes.apdu import Error
from bacpypes.errors import ExecutionError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# RandomValueProperty
#
@bacpypes_debugging
class RandomValueProperty(Property):
def __init__(self, identifier):
if _debug: RandomValueProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# return a random value
value = random.random() * 100.0
if _debug: RandomValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# Random Value Object Type
#
@bacpypes_debugging
class RandomAnalogValueObject(AnalogValueObject):
properties = [
RandomValueProperty('presentValue'),
]
def __init__(self, **kwargs):
if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
register_object_type(RandomAnalogValueObject)
#
# ReadPropertyToAny
#
@bacpypes_debugging
def ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex=None):
"""Read the specified property of the object, with the optional array index,
and cast the result into an Any object."""
if _debug: ReadPropertyToAny._debug("ReadPropertyToAny %s %r %r", obj, propertyIdentifier, propertyArrayIndex)
# get the datatype
datatype = obj.get_datatype(propertyIdentifier)
if _debug: ReadPropertyToAny._debug(" - datatype: %r", datatype)
if datatype is None:
raise ExecutionError(errorClass='property', errorCode='datatypeNotSupported')
# get the value
value = obj.ReadProperty(propertyIdentifier, propertyArrayIndex)
if _debug: ReadPropertyToAny._debug(" - value: %r", value)
if value is None:
raise ExecutionError(errorClass='property', errorCode='unknownProperty')
# change atomic values into something encodeable
if issubclass(datatype, Atomic):
value = datatype(value)
elif issubclass(datatype, Array) and (propertyArrayIndex is not None):
if propertyArrayIndex == 0:
value = Unsigned(value)
elif issubclass(datatype.subtype, Atomic):
value = datatype.subtype(value)
elif not isinstance(value, datatype.subtype):
raise TypeError, "invalid result datatype, expecting %s and got %s" \
% (datatype.subtype.__name__, type(value).__name__)
elif not isinstance(value, datatype):
raise TypeError, "invalid result datatype, expecting %s and got %s" \
% (datatype.__name__, type(value).__name__)
if _debug: ReadPropertyToAny._debug(" - encodeable value: %r", value)
# encode the value
result = Any()
result.cast_in(value)
if _debug: ReadPropertyToAny._debug(" - result: %r", result)
# return the object
return result
#
# ReadPropertyToResultElement
#
@bacpypes_debugging
def ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex=None):
"""Read the specified property of the object, with the optional array index,
and cast the result into an Any object."""
if _debug: ReadPropertyToResultElement._debug("ReadPropertyToResultElement %s %r %r", obj, propertyIdentifier, propertyArrayIndex)
# save the result in the property value
read_result = ReadAccessResultElementChoice()
try:
read_result.propertyValue = ReadPropertyToAny(obj, propertyIdentifier, propertyArrayIndex)
if _debug: ReadPropertyToResultElement._debug(" - success")
except PropertyError, error:
if _debug: ReadPropertyToResultElement._debug(" - error: %r", error)
read_result.propertyAccessError = ErrorType(errorClass='property', errorCode='unknownProperty')
except ExecutionError, error:
if _debug: ReadPropertyToResultElement._debug(" - error: %r", error)
read_result.propertyAccessError = ErrorType(errorClass=error.errorClass, errorCode=error.errorCode)
# make an element for this value
read_access_result_element = ReadAccessResultElement(
propertyIdentifier=propertyIdentifier,
propertyArrayIndex=propertyArrayIndex,
readResult=read_result,
)
if _debug: ReadPropertyToResultElement._debug(" - read_access_result_element: %r", read_access_result_element)
# fini
return read_access_result_element
#
# ReadPropertyMultipleApplication
#
@bacpypes_debugging
class ReadPropertyMultipleApplication(BIPSimpleApplication):
def __init__(self, *args, **kwargs):
if _debug: ReadPropertyMultipleApplication._debug("__init__ %r %r", args, kwargs)
BIPSimpleApplication.__init__(self, *args, **kwargs)
def do_ReadPropertyMultipleRequest(self, apdu):
"""Respond to a ReadPropertyMultiple Request."""
if _debug: ReadPropertyMultipleApplication._debug("do_ReadPropertyMultipleRequest %r", apdu)
# response is a list of read access results (or an error)
resp = None
read_access_result_list = []
# loop through the request
for read_access_spec in apdu.listOfReadAccessSpecs:
# get the object identifier
objectIdentifier = read_access_spec.objectIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - objectIdentifier: %r", objectIdentifier)
# check for wildcard
if (objectIdentifier == ('device', 4194303)):
if _debug: ReadPropertyMultipleApplication._debug(" - wildcard device identifier")
objectIdentifier = self.localDevice.objectIdentifier
# get the object
obj = self.get_object_id(objectIdentifier)
if _debug: ReadPropertyMultipleApplication._debug(" - object: %r", obj)
# make sure it exists
if not obj:
resp = Error(errorClass='object', errorCode='unknownObject', context=apdu)
if _debug: ReadPropertyMultipleApplication._debug(" - unknown object error: %r", resp)
break
# build a list of result elements
read_access_result_element_list = []
# loop through the property references
for prop_reference in read_access_spec.listOfPropertyReferences:
# get the property identifier
propertyIdentifier = prop_reference.propertyIdentifier
if _debug: ReadPropertyMultipleApplication._debug(" - propertyIdentifier: %r", propertyIdentifier)
# get the array index (optional)
propertyArrayIndex = prop_reference.propertyArrayIndex
if _debug: ReadPropertyMultipleApplication._debug(" - propertyArrayIndex: %r", propertyArrayIndex)
# check for special property identifiers
if propertyIdentifier in ('all', 'required', 'optional'):
for propId, prop in obj._properties.items():
if _debug: ReadPropertyMultipleApplication._debug(" - checking: %r %r", propId, prop.optional)
if (propertyIdentifier == 'all'):
pass
elif (propertyIdentifier == 'required') and (prop.optional):
if _debug: ReadPropertyMultipleApplication._debug(" - not a required property")
continue
elif (propertyIdentifier == 'optional') and (not prop.optional):
if _debug: ReadPropertyMultipleApplication._debug(" - not an optional property")
continue
# read the specific property
read_access_result_element = ReadPropertyToResultElement(obj, propId, propertyArrayIndex)
# check for undefined property
if read_access_result_element.readResult.propertyAccessError \
and read_access_result_element.readResult.propertyAccessError.errorCode == 'unknownProperty':
continue
# add it to the list
read_access_result_element_list.append(read_access_result_element)
else:
# read the specific property
read_access_result_element = ReadPropertyToResultElement(obj, propertyIdentifier, propertyArrayIndex)
# add it to the list
read_access_result_element_list.append(read_access_result_element)
# build a read access result
read_access_result = ReadAccessResult(
objectIdentifier=objectIdentifier,
listOfResults=read_access_result_element_list
)
if _debug: ReadPropertyMultipleApplication._debug(" - read_access_result: %r", read_access_result)
# add it to the list
read_access_result_list.append(read_access_result)
# this is a ReadPropertyMultiple ack
if not resp:
resp = ReadPropertyMultipleACK(context=apdu)
resp.listOfReadAccessResults = read_access_result_list
if _debug: ReadPropertyMultipleApplication._debug(" - resp: %r", resp)
# return the result
self.response(resp)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['readPropertyMultiple'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a sample application
this_application = ReadPropertyMultipleApplication(this_device, args.ini.address)
# make a random input object
ravo1 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1), objectName='Random1'
)
_log.debug(" - ravo1: %r", ravo1)
ravo2 = RandomAnalogValueObject(
objectIdentifier=('analogValue', 2), objectName='Random2'
)
_log.debug(" - ravo2: %r", ravo2)
# add it to the device
this_application.add_object(ravo1)
this_application.add_object(ravo2)
_log.debug(" - object list: %r", this_device.objectList)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

166
samples/ReadRange.py Executable file
View File

@ -0,0 +1,166 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for readrange commands
which create ReadRangeRequest PDUs, then lines up the coorresponding ReadRangeACK
and prints the value.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import Error, AbortPDU, ReadRangeRequest, ReadRangeACK
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# ReadRangeApplication
#
@bacpypes_debugging
class ReadRangeApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadRangeApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadRangeApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadRangeApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
elif (isinstance(self._request, ReadRangeRequest)) and (isinstance(apdu, ReadRangeACK)):
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ReadRangeApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# cast out of the single Any element into the datatype
value = apdu.itemData[0].cast_out(datatype)
# dump it out
for i, item in enumerate(value):
sys.stdout.write("[%d]\n" % (i,))
item.debug_contents(file=sys.stdout, indent=2)
sys.stdout.flush()
#
# ReadRangeConsoleCmd
#
@bacpypes_debugging
class ReadRangeConsoleCmd(ConsoleCmd):
def do_readrange(self, args):
"""readrange <addr> <type> <inst> <prop> [ <indx> ]"""
args = args.split()
if _debug: ReadRangeConsoleCmd._debug("do_readrange %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type):
raise ValueError, "unknown object type"
obj_inst = int(obj_inst)
datatype = get_datatype(obj_type, prop_id)
if not datatype:
raise ValueError, "invalid property for object type"
# build a request
request = ReadRangeRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 5:
request.propertyArrayIndex = int(args[4])
if _debug: ReadRangeConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadRangeConsoleCmd._exception("exception: %r", e)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadRangeApplication(this_device, args.ini.address)
this_console = ReadRangeConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

269
samples/ReadWriteFile.py Executable file
View File

@ -0,0 +1,269 @@
#!/usr/bin/python
"""
ReadWriteFile.py
This application presents a 'console' prompt to the user asking for commands.
The 'readrecord' and 'writerecord' commands are used with record oriented files,
and the 'readstream' and 'writestream' commands are used with stream oriented
files.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.apdu import Error, AbortPDU, \
AtomicReadFileRequest, \
AtomicReadFileRequestAccessMethodChoice, \
AtomicReadFileRequestAccessMethodChoiceRecordAccess, \
AtomicReadFileRequestAccessMethodChoiceStreamAccess, \
AtomicReadFileACK, \
AtomicWriteFileRequest, \
AtomicWriteFileRequestAccessMethodChoice, \
AtomicWriteFileRequestAccessMethodChoiceRecordAccess, \
AtomicWriteFileRequestAccessMethodChoiceStreamAccess, \
AtomicWriteFileACK
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# reference a simple application
this_application = None
#
# TestApplication
#
@bacpypes_debugging
class TestApplication(BIPSimpleApplication):
def request(self, apdu):
if _debug: TestApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: TestApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
elif (isinstance(self._request, AtomicReadFileRequest)) and (isinstance(apdu, AtomicReadFileACK)):
# suck out the record data
if apdu.accessMethod.recordAccess:
value = apdu.accessMethod.recordAccess.fileRecordData
elif apdu.accessMethod.streamAccess:
value = apdu.accessMethod.streamAccess.fileData
TestApplication._debug(" - value: %r", value)
sys.stdout.write(repr(value) + '\n')
sys.stdout.flush()
elif (isinstance(self._request, AtomicWriteFileRequest)) and (isinstance(apdu, AtomicWriteFileACK)):
# suck out the record data
if apdu.fileStartPosition is not None:
value = apdu.fileStartPosition
elif apdu.fileStartRecord is not None:
value = apdu.fileStartRecord
TestApplication._debug(" - value: %r", value)
sys.stdout.write(repr(value) + '\n')
sys.stdout.flush()
#
# TestConsoleCmd
#
@bacpypes_debugging
class TestConsoleCmd(ConsoleCmd):
def do_readrecord(self, args):
"""readrecord <addr> <inst> <start> <count>"""
args = args.split()
if _debug: TestConsoleCmd._debug("do_readrecord %r", args)
try:
addr, obj_inst, start_record, record_count = args
obj_type = 'file'
obj_inst = int(obj_inst)
start_record = int(start_record)
record_count = int(record_count)
# build a request
request = AtomicReadFileRequest(
fileIdentifier=(obj_type, obj_inst),
accessMethod=AtomicReadFileRequestAccessMethodChoice(
recordAccess=AtomicReadFileRequestAccessMethodChoiceRecordAccess(
fileStartRecord=start_record,
requestedRecordCount=record_count,
),
),
)
request.pduDestination = Address(addr)
if _debug: TestConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
TestConsoleCmd._exception("exception: %r", e)
def do_readstream(self, args):
"""readstream <addr> <inst> <start> <count>"""
args = args.split()
if _debug: TestConsoleCmd._debug("do_readstream %r", args)
try:
addr, obj_inst, start_position, octet_count = args
obj_type = 'file'
obj_inst = int(obj_inst)
start_position = int(start_position)
octet_count = int(octet_count)
# build a request
request = AtomicReadFileRequest(
fileIdentifier=(obj_type, obj_inst),
accessMethod=AtomicReadFileRequestAccessMethodChoice(
streamAccess=AtomicReadFileRequestAccessMethodChoiceStreamAccess(
fileStartPosition=start_position,
requestedOctetCount=octet_count,
),
),
)
request.pduDestination = Address(addr)
if _debug: TestConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
TestConsoleCmd._exception("exception: %r", e)
def do_writerecord(self, args):
"""writerecord <addr> <inst> <start> <count> [ <data> ... ]"""
args = args.split()
if _debug: TestConsoleCmd._debug("do_writerecord %r", args)
try:
addr, obj_inst, start_record, record_count = args[0:4]
obj_type = 'file'
obj_inst = int(obj_inst)
start_record = int(start_record)
record_count = int(record_count)
record_data = list(args[4:])
# build a request
request = AtomicWriteFileRequest(
fileIdentifier=(obj_type, obj_inst),
accessMethod=AtomicWriteFileRequestAccessMethodChoice(
recordAccess=AtomicWriteFileRequestAccessMethodChoiceRecordAccess(
fileStartRecord=start_record,
recordCount=record_count,
fileRecordData=record_data,
),
),
)
request.pduDestination = Address(addr)
if _debug: TestConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
TestConsoleCmd._exception("exception: %r", e)
def do_writestream(self, args):
"""writestream <addr> <inst> <start> <data>"""
args = args.split()
if _debug: TestConsoleCmd._debug("do_writestream %r", args)
try:
addr, obj_inst, start_position, data = args
obj_type = 'file'
obj_inst = int(obj_inst)
start_position = int(start_position)
# build a request
request = AtomicWriteFileRequest(
fileIdentifier=(obj_type, obj_inst),
accessMethod=AtomicWriteFileRequestAccessMethodChoice(
streamAccess=AtomicWriteFileRequestAccessMethodChoiceStreamAccess(
fileStartPosition=start_position,
fileData=data,
),
),
)
request.pduDestination = Address(addr)
if _debug: TestConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
TestConsoleCmd._exception("exception: %r", e)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = TestApplication(this_device, args.ini.address)
TestConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

219
samples/ReadWriteFileServer.py Executable file
View File

@ -0,0 +1,219 @@
#!/usr/bin/python
"""
ReadWriteFileServer.py
This sample application is a BACnet device that has one record access file at
('file', 1) and one stream access file at ('file', 2).
"""
import random
import string
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import FileObject, register_object_type
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# Local Record Access File Object Type
#
@bacpypes_debugging
class LocalRecordAccessFileObject(FileObject):
def __init__(self, **kwargs):
""" Initialize a record accessed file object. """
if _debug:
LocalRecordAccessFileObject._debug("__init__ %r",
kwargs,
)
FileObject.__init__(self,
fileAccessMethod='recordAccess',
**kwargs
)
self._record_data = [
''.join(random.choice(string.ascii_letters)
for i in range(random.randint(10, 20)))
for j in range(random.randint(10, 20))
]
if _debug: LocalRecordAccessFileObject._debug(" - %d records",
len(self._record_data),
)
def __len__(self):
""" Return the number of records. """
if _debug: LocalRecordAccessFileObject._debug("__len__")
return len(self._record_data)
def ReadFile(self, start_record, record_count):
""" Read a number of records starting at a specific record. """
if _debug: LocalRecordAccessFileObject._debug("ReadFile %r %r",
start_record, record_count,
)
# end of file is true if last record is returned
end_of_file = (start_record+record_count) >= len(self._record_data)
return end_of_file, \
self._record_data[start_record:start_record + record_count]
def WriteFile(self, start_record, record_count, record_data):
""" Write a number of records, starting at a specific record. """
# check for append
if (start_record < 0):
start_record = len(self._record_data)
self._record_data.extend(record_data)
# check to extend the file out to start_record records
elif (start_record > len(self._record_data)):
self._record_data.extend(['' for i in range(start_record - len(self._record_data))])
start_record = len(self._record_data)
self._record_data.extend(record_data)
# slice operation works for other cases
else:
self._record_data[start_record:start_record + record_count] = record_data
# return where the 'writing' actually started
return start_record
register_object_type(LocalRecordAccessFileObject)
#
# Local Stream Access File Object Type
#
@bacpypes_debugging
class LocalStreamAccessFileObject(FileObject):
def __init__(self, **kwargs):
""" Initialize a stream accessed file object. """
if _debug:
LocalStreamAccessFileObject._debug("__init__ %r",
kwargs,
)
FileObject.__init__(self,
fileAccessMethod='streamAccess',
**kwargs
)
self._file_data = ''.join(random.choice(string.ascii_letters)
for i in range(random.randint(100, 200)))
if _debug: LocalRecordAccessFileObject._debug(" - %d octets",
len(self._file_data),
)
def __len__(self):
""" Return the number of octets in the file. """
if _debug: LocalStreamAccessFileObject._debug("__len__")
return len(self._file_data)
def ReadFile(self, start_position, octet_count):
""" Read a chunk of data out of the file. """
if _debug: LocalStreamAccessFileObject._debug("ReadFile %r %r",
start_position, octet_count,
)
# end of file is true if last record is returned
end_of_file = (start_position+octet_count) >= len(self._file_data)
return end_of_file, \
self._file_data[start_position:start_position + octet_count]
def WriteFile(self, start_position, data):
""" Write a number of octets, starting at a specific offset. """
# check for append
if (start_position < 0):
start_position = len(self._file_data)
self._file_data += data
# check to extend the file out to start_record records
elif (start_position > len(self._file_data)):
self._file_data += '\0' * (start_position - len(self._file_data))
start_position = len(self._file_data)
self._file_data += data
# no slice assignment, strings are immutable
else:
data_len = len(data)
prechunk = self._file_data[:start_position]
postchunk = self._file_data[start_position + data_len:]
self._file_data = prechunk + data + postchunk
# return where the 'writing' actually started
return start_position
register_object_type(LocalStreamAccessFileObject)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
pss['atomicReadFile'] = 1
pss['atomicWriteFile'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make a record access file, add to the device
f1 = LocalRecordAccessFileObject(
objectIdentifier=('file', 1),
objectName='RecordAccessFile1'
)
_log.debug(" - f1: %r", f1)
this_application.add_object(f1)
# make a stream access file, add to the device
f2 = LocalStreamAccessFileObject(
objectIdentifier=('file', 2),
objectName='StreamAccessFile2'
)
_log.debug(" - f2: %r", f2)
this_application.add_object(f2)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

258
samples/ReadWriteProperty.py Executable file
View File

@ -0,0 +1,258 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for commands.
For 'read' commands it will create ReadPropertyRequest PDUs, then lines up the
coorresponding ReadPropertyACK and prints the value. For 'write' commands it
will create WritePropertyRequst PDUs and prints out a simple acknowledgement.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import Error, AbortPDU, SimpleAckPDU, \
ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest
from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real
from bacpypes.constructeddata import Array, Any
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# ReadPropertyApplication
#
@bacpypes_debugging
class ReadPropertyApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadPropertyApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadPropertyApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadPropertyApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
if isinstance(apdu, SimpleAckPDU):
sys.stdout.write("ack\n")
sys.stdout.flush()
elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug: ReadPropertyApplication._debug(" - value: %r", value)
sys.stdout.write(str(value) + '\n')
sys.stdout.flush()
#
# ReadWritePropertyConsoleCmd
#
@bacpypes_debugging
class ReadWritePropertyConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read <addr> <type> <inst> <prop> [ <indx> ]"""
args = args.split()
if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type):
raise ValueError, "unknown object type"
obj_inst = int(obj_inst)
datatype = get_datatype(obj_type, prop_id)
if not datatype:
raise ValueError, "invalid property for object type"
# build a request
request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 5:
request.propertyArrayIndex = int(args[4])
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadWritePropertyConsoleCmd._exception("exception: %r", e)
def do_write(self, args):
"""write <addr> <type> <inst> <prop> <value> [ <indx> ] [ <priority> ]"""
args = args.split()
ReadWritePropertyConsoleCmd._debug("do_write %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
obj_inst = int(obj_inst)
value = args[4]
indx = None
if len(args) >= 6:
if args[5] != "-":
indx = int(args[5])
if _debug: ReadWritePropertyConsoleCmd._debug(" - indx: %r", indx)
priority = None
if len(args) >= 7:
priority = int(args[6])
if _debug: ReadWritePropertyConsoleCmd._debug(" - priority: %r", priority)
# get the datatype
datatype = get_datatype(obj_type, prop_id)
if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype)
# change atomic values into something encodeable, null is a special case
if (value == 'null'):
value = Null()
elif issubclass(datatype, Atomic):
if datatype is Integer:
value = int(value)
elif datatype is Real:
value = float(value)
elif datatype is Unsigned:
value = int(value)
value = datatype(value)
elif issubclass(datatype, Array) and (indx is not None):
if indx == 0:
value = Integer(value)
elif issubclass(datatype.subtype, Atomic):
value = datatype.subtype(value)
elif not isinstance(value, datatype.subtype):
raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,)
elif not isinstance(value, datatype):
raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,)
if _debug: ReadWritePropertyConsoleCmd._debug(" - encodeable value: %r %s", value, type(value))
# build a request
request = WritePropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id
)
request.pduDestination = Address(addr)
# save the value
request.propertyValue = Any()
try:
request.propertyValue.cast_in(value)
except Exception, e:
ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", e)
# optional array index
if indx is not None:
request.propertyArrayIndex = indx
# optional priority
if priority is not None:
request.priority = priority
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadWritePropertyConsoleCmd._exception("exception: %r", e)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadPropertyApplication(this_device, args.ini.address)
this_console = ReadWritePropertyConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

View File

@ -0,0 +1,198 @@
#!/usr/bin/python
"""
Mutliple Read Property
This application has a static list of points that it would like to read. It reads the
values of each of them in turn and then quits.
"""
from collections import deque
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run, deferred
from bacpypes.task import RecurringTask
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_datatype
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
from bacpypes.basetypes import ServicesSupported
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
# point list
point_list = [
('1.2.3.4', 'analogValue', 1, 'presentValue'),
('1.2.3.4', 'analogValue', 2, 'presentValue'),
]
#
# PrairieDog
#
@bacpypes_debugging
class PrairieDog(BIPSimpleApplication, RecurringTask):
def __init__(self, interval, *args):
if _debug: PrairieDog._debug("__init__ %r, %r", interval, args)
BIPSimpleApplication.__init__(self, *args)
RecurringTask.__init__(self, interval * 1000)
# keep track of requests to line up responses
self._request = None
# start out idle
self.is_busy = False
self.point_queue = deque()
self.response_values = []
# install it
self.install_task()
def process_task(self):
if _debug: PrairieDog._debug("process_task")
global point_list
# check to see if we're idle
if self.is_busy:
if _debug: PrairieDog._debug(" - busy")
return
# now we are busy
self.is_busy = True
# turn the point list into a queue
self.point_queue = deque(point_list)
# clean out the list of the response values
self.response_values = []
# fire off the next request
self.next_request()
def next_request(self):
if _debug: PrairieDog._debug("next_request")
# check to see if we're done
if not self.point_queue:
if _debug: PrairieDog._debug(" - done")
# dump out the results
for request, response in zip(point_list, self.response_values):
print request, response
# no longer busy
self.is_busy = False
return
# get the next request
addr, obj_type, obj_inst, prop_id = self.point_queue.popleft()
# build a request
self._request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
self._request.pduDestination = Address(addr)
if _debug: PrairieDog._debug(" - request: %r", self._request)
# forward it along
BIPSimpleApplication.request(self, self._request)
def confirmation(self, apdu):
if _debug: PrairieDog._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
if _debug: PrairieDog._debug(" - error: %r", apdu)
self.response_values.append(apdu)
elif isinstance(apdu, AbortPDU):
if _debug: PrairieDog._debug(" - abort: %r", apdu)
self.response_values.append(apdu)
elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if _debug: PrairieDog._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug: PrairieDog._debug(" - value: %r", value)
# save the value
self.response_values.append(value)
# fire off another request
deferred(self.next_request)
#
# __main__
#
try:
# parse the command line arguments
parser = ConfigArgumentParser(description=__doc__)
# add an argument for interval
parser.add_argument('interval', type=int,
help='repeat rate in seconds',
)
# now parse the arguments
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a dog
this_application = PrairieDog(args.interval, this_device, args.ini.address)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

74
samples/RecurringTask.py Executable file
View File

@ -0,0 +1,74 @@
#!/usr/bin/python
"""
This application demonstrates doing something at a regular interval.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.task import RecurringTask
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# PrairieDog
#
@bacpypes_debugging
class PrairieDog(RecurringTask):
def __init__(self, dog_number, interval):
if _debug: PrairieDog._debug("__init__ %r %r", dog_number, interval)
# save the identity
self.dog_number = dog_number
# this is a recurring task
RecurringTask.__init__(self, interval)
# install it
self.install_task()
def process_task(self):
if _debug: PrairieDog._debug("process_task")
sys.stdout.write("%d woof!\n" % (self.dog_number,))
#
# __main__
#
try:
# parse the command line arguments
parser = ConfigArgumentParser(description=__doc__)
# add an argument for seconds per dog
parser.add_argument('seconds', metavar='N', type=int, nargs='+',
help='number of seconds for each dog',
)
# now parse the arguments
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make some dogs
for i, sec in enumerate(args.seconds):
dog = PrairieDog(i, sec * 1000)
if _debug: _log.debug(" - dog: %r", dog)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

92
samples/SampleApplication.py Executable file
View File

@ -0,0 +1,92 @@
#!/usr/bin/python
"""
Sample Application
==================
This sample application is the simplest BACpypes application that is
a complete stack. Using an INI file it will configure a
LocalDeviceObject, create a SampleApplication instance, and run,
waiting for a keyboard interrupt or a TERM signal to quit.
There is no input or output for this application, but by adding --debug to
the command line when it is run you can check the behavior of the stack by
seeing what is sent and received.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# SampleApplication
#
class SampleApplication(BIPSimpleApplication):
def __init__(self, device, address):
if _debug: SampleApplication._debug("__init__ %r %r", device, address)
BIPSimpleApplication.__init__(self, device, address)
def request(self, apdu):
if _debug: SampleApplication._debug("request %r", apdu)
BIPSimpleApplication.request(self, apdu)
def indication(self, apdu):
if _debug: SampleApplication._debug("indication %r", apdu)
BIPSimpleApplication.indication(self, apdu)
def response(self, apdu):
if _debug: SampleApplication._debug("response %r", apdu)
BIPSimpleApplication.response(self, apdu)
def confirmation(self, apdu):
if _debug: SampleApplication._debug("confirmation %r", apdu)
BIPSimpleApplication.confirmation(self, apdu)
bacpypes_debugging(SampleApplication)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
vendorName="Hello",
)
# make a sample application
this_application = SampleApplication(this_device, args.ini.address)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

101
samples/SampleConsoleCmd.py Executable file
View File

@ -0,0 +1,101 @@
#!/usr/bin/python
"""
This sample application is a simple BACpypes application that
presents a console prompt. Almost identical to the SampleApplication,
the BACnet application is minimal, but with the console commands
that match the command line options like 'buggers' and 'debug' the
user can add debugging "on the fly".
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# SampleApplication
#
class SampleApplication(BIPSimpleApplication):
def __init__(self, device, address):
if _debug: SampleApplication._debug("__init__ %r %r", device, address)
BIPSimpleApplication.__init__(self, device, address)
def request(self, apdu):
if _debug: SampleApplication._debug("request %r", apdu)
BIPSimpleApplication.request(self, apdu)
def indication(self, apdu):
if _debug: SampleApplication._debug("indication %r", apdu)
BIPSimpleApplication.indication(self, apdu)
def response(self, apdu):
if _debug: SampleApplication._debug("response %r", apdu)
BIPSimpleApplication.response(self, apdu)
def confirmation(self, apdu):
if _debug: SampleApplication._debug("confirmation %r", apdu)
BIPSimpleApplication.confirmation(self, apdu)
bacpypes_debugging(SampleApplication)
#
# SampleConsoleCmd
#
class SampleConsoleCmd(ConsoleCmd):
def do_nothing(self, args):
"""nothing can be done"""
args = args.split()
if _debug: SampleConsoleCmd._debug("do_nothing %r", args)
bacpypes_debugging(SampleConsoleCmd)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = SampleApplication(this_device, args.ini.address)
this_console = SampleConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

135
samples/VendorAVObject.py Executable file
View File

@ -0,0 +1,135 @@
#!/usr/bin/python
"""
This sample application shows how to extend one of the basic objects, an Analog
Value Object in this case, to provide a present value. This type of code is used
when the application is providing a BACnet interface to a collection of data.
It assumes that almost all of the default behaviour of a BACpypes application is
sufficient.
"""
import random
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Real
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import AnalogValueObject, Property, register_object_type
from bacpypes.errors import ExecutionError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
vendor_id = 999
#
# RandomValueProperty
#
@bacpypes_debugging
class RandomValueProperty(Property):
def __init__(self, identifier):
if _debug: RandomValueProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False)
# writing to this property changes the multiplier
self.multiplier = 100.0
def ReadProperty(self, obj, arrayIndex=None):
if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# return a random value
value = random.random() * self.multiplier
if _debug: RandomValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
# change the multiplier
self.multiplier = value
#
# Vendor Analog Value Object Type
#
@bacpypes_debugging
class VendorAVObject(AnalogValueObject):
objectType = 513
properties = [
RandomValueProperty(5504),
]
def __init__(self, **kwargs):
if _debug: VendorAVObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
register_object_type(VendorAVObject, vendor_id=vendor_id)
#
# main
#
@bacpypes_debugging
def main():
if _debug: main._debug("initialization")
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: main._debug("initialization")
if _debug: main._debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=vendor_id,
)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# make some objects
ravo1 = VendorAVObject(
objectIdentifier=(513, 1), objectName='Random1'
)
if _debug: main._debug(" - ravo1: %r", ravo1)
ravo2 = VendorAVObject(
objectIdentifier=(513, 2), objectName='Random2'
)
if _debug: main._debug(" - ravo2: %r", ravo2)
# add it to the device
this_application.add_object(ravo1)
this_application.add_object(ravo2)
if _debug: main._debug(" - object list: %r", this_device.objectList)
if _debug: main._debug("running")
run()
except Exception, e:
main._exception("an error has occurred: %s", e)
finally:
if _debug: main._debug("finally")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,285 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for commands.
For 'read' commands it will create ReadPropertyRequest PDUs, then lines up the
coorresponding ReadPropertyACK and prints the value. For 'write' commands it
will create WritePropertyRequst PDUs and prints out a simple acknowledgement.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import get_object_class, get_datatype
from bacpypes.apdu import Error, AbortPDU, SimpleAckPDU, \
ReadPropertyRequest, ReadPropertyACK, WritePropertyRequest
from bacpypes.primitivedata import Null, Atomic, Integer, Unsigned, Real
from bacpypes.constructeddata import Array, Any
from bacpypes.basetypes import ServicesSupported
import VendorAVObject
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# ReadPropertyApplication
#
@bacpypes_debugging
class ReadPropertyApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: ReadPropertyApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: ReadPropertyApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: ReadPropertyApplication._debug("confirmation %r", apdu)
if isinstance(apdu, Error):
sys.stdout.write("error: %s\n" % (apdu.errorCode,))
sys.stdout.flush()
elif isinstance(apdu, AbortPDU):
apdu.debug_contents()
if isinstance(apdu, SimpleAckPDU):
sys.stdout.write("ack\n")
sys.stdout.flush()
elif (isinstance(self._request, ReadPropertyRequest)) and (isinstance(apdu, ReadPropertyACK)):
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier, VendorAVObject.vendor_id)
if _debug: ReadPropertyApplication._debug(" - datatype: %r", datatype)
if not datatype:
raise TypeError, "unknown datatype"
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
if _debug: ReadPropertyApplication._debug(" - value: %r", value)
sys.stdout.write(str(value) + '\n')
sys.stdout.flush()
#
# ReadWritePropertyConsoleCmd
#
@bacpypes_debugging
class ReadWritePropertyConsoleCmd(ConsoleCmd):
def do_read(self, args):
"""read <addr> <type> <inst> <prop> [ <indx> ]"""
global this_application
args = args.split()
if _debug: ReadWritePropertyConsoleCmd._debug("do_read %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type, VendorAVObject.vendor_id):
raise ValueError, "unknown object type"
if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_type: %r", obj_type)
obj_inst = int(obj_inst)
if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_inst: %r", obj_inst)
if prop_id.isdigit():
prop_id = int(prop_id)
if _debug: ReadWritePropertyConsoleCmd._debug(" - prop_id: %r", prop_id)
datatype = get_datatype(obj_type, prop_id, VendorAVObject.vendor_id)
if not datatype:
raise ValueError, "invalid property for object type"
# build a request
request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
)
request.pduDestination = Address(addr)
if len(args) == 5:
request.propertyArrayIndex = int(args[4])
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadWritePropertyConsoleCmd._exception("exception: %r", e)
def do_write(self, args):
"""write <addr> <type> <inst> <prop> <value> [ <indx> ] [ <priority> ]"""
global this_application
args = args.split()
ReadWritePropertyConsoleCmd._debug("do_write %r", args)
try:
addr, obj_type, obj_inst, prop_id = args[:4]
if obj_type.isdigit():
obj_type = int(obj_type)
elif not get_object_class(obj_type, VendorAVObject.vendor_id):
raise ValueError, "unknown object type"
if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_type: %r", obj_type)
obj_inst = int(obj_inst)
if _debug: ReadWritePropertyConsoleCmd._debug(" - obj_inst: %r", obj_inst)
if prop_id.isdigit():
prop_id = int(prop_id)
if _debug: ReadWritePropertyConsoleCmd._debug(" - prop_id: %r", prop_id)
value = args[4]
indx = None
if len(args) >= 6:
if args[5] != "-":
indx = int(args[5])
if _debug: ReadWritePropertyConsoleCmd._debug(" - indx: %r", indx)
priority = None
if len(args) >= 7:
priority = int(args[6])
if _debug: ReadWritePropertyConsoleCmd._debug(" - priority: %r", priority)
# get the datatype
datatype = get_datatype(obj_type, prop_id, VendorAVObject.vendor_id)
if _debug: ReadWritePropertyConsoleCmd._debug(" - datatype: %r", datatype)
# change atomic values into something encodeable, null is a special case
if (value == 'null'):
value = Null()
elif issubclass(datatype, Atomic):
if datatype is Integer:
value = int(value)
elif datatype is Real:
value = float(value)
elif datatype is Unsigned:
value = int(value)
value = datatype(value)
elif issubclass(datatype, Array) and (indx is not None):
if indx == 0:
value = Integer(value)
elif issubclass(datatype.subtype, Atomic):
value = datatype.subtype(value)
elif not isinstance(value, datatype.subtype):
raise TypeError, "invalid result datatype, expecting %s" % (datatype.subtype.__name__,)
elif not isinstance(value, datatype):
raise TypeError, "invalid result datatype, expecting %s" % (datatype.__name__,)
if _debug: ReadWritePropertyConsoleCmd._debug(" - encodeable value: %r %s", value, type(value))
# build a request
request = WritePropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id
)
request.pduDestination = Address(addr)
# save the value
request.propertyValue = Any()
try:
request.propertyValue.cast_in(value)
except Exception, e:
ReadWritePropertyConsoleCmd._exception("WriteProperty cast error: %r", e)
# optional array index
if indx is not None:
request.propertyArrayIndex = indx
# optional priority
if priority is not None:
request.priority = priority
if _debug: ReadWritePropertyConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
ReadWritePropertyConsoleCmd._exception("exception: %r", e)
#
# main
#
@bacpypes_debugging
def main():
if _debug: main._debug("initialization")
global this_application
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: main._debug("initialization")
if _debug: main._debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = ReadPropertyApplication(this_device, args.ini.address)
this_console = ReadWritePropertyConsoleCmd()
main._debug("running")
run()
except Exception, e:
main._exception("an error has occurred: %s", e)
finally:
main._debug("finally")
if __name__ == '__main__':
main()

116
samples/WhoHasIHaveApplication.py Executable file
View File

@ -0,0 +1,116 @@
#!/usr/bin/python
"""
This sample application builds on the first sample by overriding the default
processing for Who-Has and I-Have requests, counting them, then continuing on
with the regular processing. After the run() function has completed it will
dump a formatted summary of the requests it has received. Note that these
services are relatively rare even in large networks.
"""
from collections import defaultdict
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
# counters
who_has_counter = defaultdict(int)
i_have_counter = defaultdict(int)
#
# WhoHasIHaveApplication
#
@bacpypes_debugging
class WhoHasIHaveApplication(BIPSimpleApplication):
def __init__(self, device, address):
if _debug: WhoHasIHaveApplication._debug("__init__ %r %r", device, address)
BIPSimpleApplication.__init__(self, device, address)
def do_WhoHasRequest(self, apdu):
"""Respond to a Who-Has request."""
if _debug: WhoHasIHaveApplication._debug("do_WhoHasRequest, %r", apdu)
key = (str(apdu.pduSource),)
if apdu.object.objectIdentifier is not None:
key += (str(apdu.object.objectIdentifier),)
elif apdu.object.objectName is not None:
key += (apdu.object.objectName,)
else:
print "(rejected APDU:"
apdu.debug_contents()
print ")"
return
# count the times this has been received
who_has_counter[key] += 1
def do_IHaveRequest(self, apdu):
"""Respond to a I-Have request."""
if _debug: WhoHasIHaveApplication._debug("do_IHaveRequest %r", apdu)
key = (
str(apdu.pduSource),
str(apdu.deviceIdentifier),
str(apdu.objectIdentifier),
apdu.objectName
)
# count the times this has been received
i_have_counter[key] += 1
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = WhoHasIHaveApplication(this_device, args.ini.address)
_log.debug("running")
# run until stopped, ^C works
run()
print "----- Who Has -----"
for (src, objname), count in sorted(who_has_counter.items()):
print "%-20s %-30s %4d" % (src, objname, count)
print
print "----- I Have -----"
for (src, devid, objid, objname), count in sorted(i_have_counter.items()):
print "%-20s %-20s %-20s %-20s %4d" % (src, devid, objid, objname, count)
print
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

187
samples/WhoIsIAm.py Executable file
View File

@ -0,0 +1,187 @@
#!/usr/bin/python
"""
This application presents a 'console' prompt to the user asking for Who-Is and I-Am
commands which create the related APDUs, then lines up the coorresponding I-Am
for incoming traffic and prints out the contents.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address, GlobalBroadcast
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.apdu import WhoIsRequest, IAmRequest
from bacpypes.basetypes import ServicesSupported
from bacpypes.errors import DecodingError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
this_console = None
#
# WhoIsIAmApplication
#
class WhoIsIAmApplication(BIPSimpleApplication):
def __init__(self, *args):
if _debug: WhoIsIAmApplication._debug("__init__ %r", args)
BIPSimpleApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, apdu):
if _debug: WhoIsIAmApplication._debug("request %r", apdu)
# save a copy of the request
self._request = apdu
# forward it along
BIPSimpleApplication.request(self, apdu)
def confirmation(self, apdu):
if _debug: WhoIsIAmApplication._debug("confirmation %r", apdu)
# forward it along
BIPSimpleApplication.confirmation(self, apdu)
def indication(self, apdu):
if _debug: WhoIsIAmApplication._debug("indication %r", apdu)
if (isinstance(self._request, WhoIsRequest)) and (isinstance(apdu, IAmRequest)):
device_type, device_instance = apdu.iAmDeviceIdentifier
if device_type != 'device':
raise DecodingError, "invalid object type"
if (self._request.deviceInstanceRangeLowLimit is not None) and \
(device_instance < self._request.deviceInstanceRangeLowLimit):
pass
elif (self._request.deviceInstanceRangeHighLimit is not None) and \
(device_instance > self._request.deviceInstanceRangeHighLimit):
pass
else:
# print out the contents
sys.stdout.write('pduSource = ' + repr(apdu.pduSource) + '\n')
sys.stdout.write('iAmDeviceIdentifier = ' + str(apdu.iAmDeviceIdentifier) + '\n')
sys.stdout.write('maxAPDULengthAccepted = ' + str(apdu.maxAPDULengthAccepted) + '\n')
sys.stdout.write('segmentationSupported = ' + str(apdu.segmentationSupported) + '\n')
sys.stdout.write('vendorID = ' + str(apdu.vendorID) + '\n')
sys.stdout.flush()
# forward it along
BIPSimpleApplication.indication(self, apdu)
bacpypes_debugging(WhoIsIAmApplication)
#
# WhoIsIAmConsoleCmd
#
class WhoIsIAmConsoleCmd(ConsoleCmd):
def do_whois(self, args):
"""whois [ <addr>] [ <lolimit> <hilimit> ]"""
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_whois %r", args)
try:
# build a request
request = WhoIsRequest()
if (len(args) == 1) or (len(args) == 3):
request.pduDestination = Address(args[0])
del args[0]
else:
request.pduDestination = GlobalBroadcast()
if len(args) == 2:
request.deviceInstanceRangeLowLimit = int(args[0])
request.deviceInstanceRangeHighLimit = int(args[1])
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
WhoIsIAmConsoleCmd._exception("exception: %r", e)
def do_iam(self, args):
"""iam"""
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_iam %r", args)
try:
# build a request
request = IAmRequest()
request.pduDestination = GlobalBroadcast()
# set the parameters from the device object
request.iAmDeviceIdentifier = this_device.objectIdentifier
request.maxAPDULengthAccepted = this_device.maxApduLengthAccepted
request.segmentationSupported = this_device.segmentationSupported
request.vendorID = this_device.vendorIdentifier
if _debug: WhoIsIAmConsoleCmd._debug(" - request: %r", request)
# give it to the application
this_application.request(request)
except Exception, e:
WhoIsIAmConsoleCmd._exception("exception: %r", e)
bacpypes_debugging(WhoIsIAmConsoleCmd)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# build a bit string that knows about the bit names
pss = ServicesSupported()
pss['whoIs'] = 1
pss['iAm'] = 1
pss['readProperty'] = 1
pss['writeProperty'] = 1
# set the property value to be just the bits
this_device.protocolServicesSupported = pss.value
# make a simple application
this_application = WhoIsIAmApplication(this_device, args.ini.address)
this_console = WhoIsIAmConsoleCmd()
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

112
samples/WhoIsIAmApplication.py Executable file
View File

@ -0,0 +1,112 @@
#!/usr/bin/python
"""
This sample application builds on the first sample by overriding the default
processing for Who-Is and I-Am requests, counting them, then continuing on
with the regular processing. After the run() function has completed it will
dump a formatted summary of the requests it has received.
"""
from collections import defaultdict
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
# counters
who_is_counter = defaultdict(int)
i_am_counter = defaultdict(int)
#
# WhoIsIAmApplication
#
@bacpypes_debugging
class WhoIsIAmApplication(BIPSimpleApplication):
def __init__(self, device, address):
if _debug: WhoIsIAmApplication._debug("__init__ %r %r", device, address)
BIPSimpleApplication.__init__(self, device, address)
def do_WhoIsRequest(self, apdu):
"""Respond to a Who-Is request."""
if _debug: WhoIsIAmApplication._debug("do_WhoIsRequest %r", apdu)
# build a key from the source and parameters
key = (str(apdu.pduSource),
apdu.deviceInstanceRangeLowLimit,
apdu.deviceInstanceRangeHighLimit,
)
# count the times this has been received
who_is_counter[key] += 1
# pass back to the default implementation
BIPSimpleApplication.do_WhoIsRequest(self, apdu)
def do_IAmRequest(self, apdu):
"""Given an I-Am request, cache it."""
if _debug: WhoIsIAmApplication._debug("do_IAmRequest %r", apdu)
# build a key from the source, just use the instance number
key = (str(apdu.pduSource),
apdu.iAmDeviceIdentifier[1],
)
# count the times this has been received
i_am_counter[key] += 1
# no default implementation
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=int(args.ini.objectidentifier),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = WhoIsIAmApplication(this_device, args.ini.address)
_log.debug("running")
run()
print "----- Who Is -----"
for (src, lowlim, hilim), count in sorted(who_is_counter.items()):
print "%-20s %8s %8s %4d" % (src, lowlim, hilim, count)
print
print "----- I Am -----"
for (src, devid), count in sorted(i_am_counter.items()):
print "%-20s %8d %4d" % (src, devid, count)
print
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

128
samples/WhoIsRouter.py Executable file
View File

@ -0,0 +1,128 @@
#!/usr/bin/python
"""
This sample application has just a network stack, not a full application,
and is a way to create InitializeRoutingTable and WhoIsRouterToNetwork requests.
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.core import run
from bacpypes.pdu import Address
from bacpypes.npdu import InitializeRoutingTable, WhoIsRouterToNetwork
from bacpypes.app import BIPNetworkApplication
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_application = None
this_console = None
#
# WhoIsRouterApplication
#
@bacpypes_debugging
class WhoIsRouterApplication(BIPNetworkApplication):
def __init__(self, *args):
if _debug: WhoIsRouterApplication._debug("__init__ %r", args)
BIPNetworkApplication.__init__(self, *args)
# keep track of requests to line up responses
self._request = None
def request(self, adapter, npdu):
if _debug: WhoIsRouterApplication._debug("request %r %r", adapter, npdu)
# save a copy of the request
self._request = npdu
# forward it along
BIPNetworkApplication.request(self, adapter, npdu)
def indication(self, adapter, npdu):
if _debug: WhoIsRouterApplication._debug("indication %r %r", adapter, npdu)
BIPNetworkApplication.indication(self, adapter, npdu)
def response(self, adapter, npdu):
if _debug: WhoIsRouterApplication._debug("response %r %r", adapter, npdu)
BIPNetworkApplication.response(self, adapter, npdu)
def confirmation(self, adapter, npdu):
if _debug: WhoIsRouterApplication._debug("confirmation %r %r", adapter, npdu)
BIPNetworkApplication.confirmation(self, adapter, npdu)
#
# WhoIsRouterConsoleCmd
#
@bacpypes_debugging
class WhoIsRouterConsoleCmd(ConsoleCmd):
def do_irt(self, args):
"""irt <addr>"""
args = args.split()
if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args)
# build a request
try:
request = InitializeRoutingTable()
request.pduDestination = Address(args[0])
except:
print "invalid arguments"
return
# give it to the application
this_application.request(this_application.nsap.adapters[0], request)
def do_wirtn(self, args):
"""wirtn <addr> [ <net> ]"""
args = args.split()
if _debug: WhoIsRouterConsoleCmd._debug("do_irt %r", args)
# build a request
try:
request = WhoIsRouterToNetwork()
request.pduDestination = Address(args[0])
if (len(args) > 1):
request.wirtnNetwork = int(args[1])
except:
print "invalid arguments"
return
# give it to the application
this_application.request(this_application.nsap.adapters[0], request)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a simple application
this_application = WhoIsRouterApplication(args.ini.address)
if _debug: _log.debug(" - this_application: %r", this_application)
# make a console
this_console = WhoIsRouterConsoleCmd()
if _debug: _log.debug(" - this_console: %r", this_console)
_log.debug("running")
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")