1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-13 00:42:44 +08:00

merging in the stage before continuing work

This commit is contained in:
Joel Bender 2015-12-18 21:11:01 -05:00
commit 99f43c8bf1
16 changed files with 763 additions and 49 deletions

View File

@ -5,3 +5,5 @@
Covered in dark, velvety chocolate, when you pop it into your Python path, stainless steel bolts spring out and plunge straight through both cheeks. Covered in dark, velvety chocolate, when you pop it into your Python path, stainless steel bolts spring out and plunge straight through both cheeks.
BACpypes provides a BACnet application layer and network layer written in Python for daemons, scripting, and graphical interfaces. BACpypes provides a BACnet application layer and network layer written in Python for daemons, scripting, and graphical interfaces.
[![Join the chat at https://gitter.im/JoelBender/bacpypes](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/JoelBender/bacpypes?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

View File

@ -4,6 +4,7 @@
BACnet Virtual Link Layer Service BACnet Virtual Link Layer Service
""" """
import sys
import struct import struct
from time import time as _time from time import time as _time
@ -14,7 +15,8 @@ from .task import OneShotTask, RecurringTask
from .comm import Client, Server, bind, \ from .comm import Client, Server, bind, \
ServiceAccessPoint, ApplicationServiceElement ServiceAccessPoint, ApplicationServiceElement
from .pdu import Address, LocalBroadcast, LocalStation, PDU from .pdu import Address, LocalBroadcast, LocalStation, PDU, \
unpack_ip_addr
from .bvll import BVLPDU, DeleteForeignDeviceTableEntry, \ from .bvll import BVLPDU, DeleteForeignDeviceTableEntry, \
DistributeBroadcastToNetwork, FDTEntry, ForwardedNPDU, \ DistributeBroadcastToNetwork, FDTEntry, ForwardedNPDU, \
OriginalBroadcastNPDU, OriginalUnicastNPDU, \ OriginalBroadcastNPDU, OriginalUnicastNPDU, \
@ -88,8 +90,8 @@ class UDPMultiplexer:
self.directPort = UDPDirector(self.addrTuple) self.directPort = UDPDirector(self.addrTuple)
bind(self.direct, self.directPort) bind(self.direct, self.directPort)
# create and bind the broadcast address # create and bind the broadcast address for non-Windows
if specialBroadcast and (not noBroadcast): if specialBroadcast and (not noBroadcast) and 'win' not in sys.platform:
self.broadcast = _MultiplexClient(self) self.broadcast = _MultiplexClient(self)
self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True) self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True)
bind(self.direct, self.broadcastPort) bind(self.direct, self.broadcastPort)
@ -108,7 +110,7 @@ class UDPMultiplexer:
dest = self.addrBroadcastTuple dest = self.addrBroadcastTuple
if _debug: UDPMultiplexer._debug(" - requesting local broadcast: %r", dest) if _debug: UDPMultiplexer._debug(" - requesting local broadcast: %r", dest)
elif pdu.pduDestination.addrType == Address.localStationAddr: elif pdu.pduDestination.addrType == Address.localStationAddr:
dest = pdu.pduDestination.addrTuple dest = unpack_ip_addr(pdu.pduDestination.addrAddr)
if _debug: UDPMultiplexer._debug(" - requesting local station: %r", dest) if _debug: UDPMultiplexer._debug(" - requesting local station: %r", dest)
else: else:
raise RuntimeError("invalid destination address type") raise RuntimeError("invalid destination address type")

View File

@ -7,7 +7,7 @@ Console Communications
import sys import sys
import asyncore import asyncore
from .debugging import Logging, ModuleLogger from .debugging import bacpypes_debugging, ModuleLogger
from .core import deferred from .core import deferred
from .comm import PDU, Client, Server from .comm import PDU, Client, Server
@ -16,17 +16,25 @@ from .comm import PDU, Client, Server
_debug = 0 _debug = 0
_log = ModuleLogger(globals()) _log = ModuleLogger(globals())
#
# asyncore.file_dispatcher is only available in Unix. This is a hack that
# allows the ConsoleClient and ConsoleServer to initialize on Windows.
#
try: try:
asyncore.file_dispatcher asyncore.file_dispatcher
except: except:
class _barf: pass class _barf:
def __init__(self, *args):
pass
asyncore.file_dispatcher = _barf asyncore.file_dispatcher = _barf
# #
# ConsoleClient # ConsoleClient
# #
class ConsoleClient(asyncore.file_dispatcher, Client, Logging): class ConsoleClient(asyncore.file_dispatcher, Client):
def __init__(self, cid=None): def __init__(self, cid=None):
ConsoleClient._debug("__init__ cid=%r", cid) ConsoleClient._debug("__init__ cid=%r", cid)
@ -52,11 +60,13 @@ class ConsoleClient(asyncore.file_dispatcher, Client, Logging):
except Exception, err: except Exception, err:
ConsoleClient._exception("Confirmation sys.stdout.write exception: %r", err) ConsoleClient._exception("Confirmation sys.stdout.write exception: %r", err)
bacpypes_debugging(ConsoleClient)
# #
# ConsoleServer # ConsoleServer
# #
class ConsoleServer(asyncore.file_dispatcher, Server, Logging): class ConsoleServer(asyncore.file_dispatcher, Server):
def __init__(self, sid=None): def __init__(self, sid=None):
ConsoleServer._debug("__init__ sid=%r", sid) ConsoleServer._debug("__init__ sid=%r", sid)
@ -81,3 +91,5 @@ class ConsoleServer(asyncore.file_dispatcher, Server, Logging):
sys.stdout.write(pdu.pduData) sys.stdout.write(pdu.pduData)
except Exception, err: except Exception, err:
ConsoleServer._exception("Indication sys.stdout.write exception: %r", err) ConsoleServer._exception("Indication sys.stdout.write exception: %r", err)
bacpypes_debugging(ConsoleServer)

View File

@ -23,6 +23,7 @@ _log = ModuleLogger(globals())
# #
class Tag(object): class Tag(object):
applicationTagClass = 0 applicationTagClass = 0
contextTagClass = 1 contextTagClass = 1
openingTagClass = 2 openingTagClass = 2
@ -1155,6 +1156,8 @@ _date_patterns = [
class Date(Atomic): class Date(Atomic):
_app_tag = Tag.dateAppTag
def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255): def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255):
self.value = (year, month, day, day_of_week) self.value = (year, month, day, day_of_week)

View File

@ -4,6 +4,7 @@
BACnet Virtual Link Layer Service BACnet Virtual Link Layer Service
""" """
import sys
import struct import struct
from time import time as _time from time import time as _time
@ -14,7 +15,8 @@ from .task import OneShotTask, RecurringTask
from .comm import Client, Server, bind, \ from .comm import Client, Server, bind, \
ServiceAccessPoint, ApplicationServiceElement ServiceAccessPoint, ApplicationServiceElement
from .pdu import Address, LocalBroadcast, LocalStation, PDU from .pdu import Address, LocalBroadcast, LocalStation, PDU, \
unpack_ip_addr
from .bvll import BVLPDU, DeleteForeignDeviceTableEntry, \ from .bvll import BVLPDU, DeleteForeignDeviceTableEntry, \
DistributeBroadcastToNetwork, FDTEntry, ForwardedNPDU, \ DistributeBroadcastToNetwork, FDTEntry, ForwardedNPDU, \
OriginalBroadcastNPDU, OriginalUnicastNPDU, \ OriginalBroadcastNPDU, OriginalUnicastNPDU, \
@ -89,8 +91,8 @@ class UDPMultiplexer:
self.directPort = UDPDirector(self.addrTuple) self.directPort = UDPDirector(self.addrTuple)
bind(self.direct, self.directPort) bind(self.direct, self.directPort)
# create and bind the broadcast address # create and bind the broadcast address for non-Windows
if specialBroadcast and (not noBroadcast): if specialBroadcast and (not noBroadcast) and 'win' not in sys.platform:
self.broadcast = _MultiplexClient(self) self.broadcast = _MultiplexClient(self)
self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True) self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True)
bind(self.direct, self.broadcastPort) bind(self.direct, self.broadcastPort)
@ -109,7 +111,7 @@ class UDPMultiplexer:
dest = self.addrBroadcastTuple dest = self.addrBroadcastTuple
if _debug: UDPMultiplexer._debug(" - requesting local broadcast: %r", dest) if _debug: UDPMultiplexer._debug(" - requesting local broadcast: %r", dest)
elif pdu.pduDestination.addrType == Address.localStationAddr: elif pdu.pduDestination.addrType == Address.localStationAddr:
dest = pdu.pduDestination.addrTuple dest = unpack_ip_addr(pdu.pduDestination.addrAddr)
if _debug: UDPMultiplexer._debug(" - requesting local station: %r", dest) if _debug: UDPMultiplexer._debug(" - requesting local station: %r", dest)
else: else:
raise RuntimeError("invalid destination address type") raise RuntimeError("invalid destination address type")

View File

@ -7,7 +7,7 @@ Console Communications
import sys import sys
import asyncore import asyncore
from .debugging import Logging, ModuleLogger from .debugging import bacpypes_debugging, ModuleLogger
from .core import deferred from .core import deferred
from .comm import PDU, Client, Server from .comm import PDU, Client, Server
@ -16,20 +16,29 @@ from .comm import PDU, Client, Server
_debug = 0 _debug = 0
_log = ModuleLogger(globals()) _log = ModuleLogger(globals())
#
# asyncore.file_dispatcher is only available in Unix. This is a hack that
# allows the ConsoleClient and ConsoleServer to initialize on Windows.
#
try: try:
asyncore.file_dispatcher asyncore.file_dispatcher
except: except:
class _barf: pass class _barf:
def __init__(self, *args):
pass
asyncore.file_dispatcher = _barf asyncore.file_dispatcher = _barf
# #
# ConsoleClient # ConsoleClient
# #
class ConsoleClient(asyncore.file_dispatcher, Client, Logging): @bacpypes_debugging
class ConsoleClient(asyncore.file_dispatcher, Client):
def __init__(self, cid=None): def __init__(self, cid=None):
ConsoleClient._debug("__init__ cid=%r", cid) if _debug: ConsoleClient._debug("__init__ cid=%r", cid)
asyncore.file_dispatcher.__init__(self, sys.stdin) asyncore.file_dispatcher.__init__(self, sys.stdin)
Client.__init__(self, cid) Client.__init__(self, cid)
@ -40,13 +49,13 @@ class ConsoleClient(asyncore.file_dispatcher, Client, Logging):
return False # we don't have anything to write return False # we don't have anything to write
def handle_read(self): def handle_read(self):
deferred(ConsoleClient._debug, "handle_read") if _debug: deferred(ConsoleClient._debug, "handle_read")
data = sys.stdin.read() data = sys.stdin.read()
deferred(ConsoleClient._debug, " - data: %r", data) if _debug: deferred(ConsoleClient._debug, " - data: %r", data)
deferred(self.request, PDU(data)) deferred(self.request, PDU(data))
def confirmation(self, pdu): def confirmation(self, pdu):
deferred(ConsoleClient._debug, "confirmation %r", pdu) if _debug: deferred(ConsoleClient._debug, "confirmation %r", pdu)
try: try:
sys.stdout.write(pdu.pduData) sys.stdout.write(pdu.pduData)
except Exception as err: except Exception as err:
@ -56,10 +65,11 @@ class ConsoleClient(asyncore.file_dispatcher, Client, Logging):
# ConsoleServer # ConsoleServer
# #
class ConsoleServer(asyncore.file_dispatcher, Server, Logging): @bacpypes_debugging
class ConsoleServer(asyncore.file_dispatcher, Server):
def __init__(self, sid=None): def __init__(self, sid=None):
ConsoleServer._debug("__init__ sid=%r", sid) if _debug: ConsoleServer._debug("__init__ sid=%r", sid)
asyncore.file_dispatcher.__init__(self, sys.stdin) asyncore.file_dispatcher.__init__(self, sys.stdin)
Server.__init__(self, sid) Server.__init__(self, sid)
@ -70,13 +80,13 @@ class ConsoleServer(asyncore.file_dispatcher, Server, Logging):
return False # we don't have anything to write return False # we don't have anything to write
def handle_read(self): def handle_read(self):
deferred(ConsoleServer._debug, "handle_read") if _debug: deferred(ConsoleServer._debug, "handle_read")
data = sys.stdin.read() data = sys.stdin.read()
deferred(ConsoleServer._debug, " - data: %r", data) if _debug: deferred(ConsoleServer._debug, " - data: %r", data)
deferred(self.response, PDU(data)) deferred(self.response, PDU(data))
def indication(self, pdu): def indication(self, pdu):
deferred(ConsoleServer._debug, "Indication %r", pdu) if _debug: deferred(ConsoleServer._debug, "Indication %r", pdu)
try: try:
sys.stdout.write(pdu.pduData) sys.stdout.write(pdu.pduData)
except Exception as err: except Exception as err:

View File

@ -23,6 +23,7 @@ _log = ModuleLogger(globals())
# #
class Tag(object): class Tag(object):
applicationTagClass = 0 applicationTagClass = 0
contextTagClass = 1 contextTagClass = 1
openingTagClass = 2 openingTagClass = 2
@ -1161,6 +1162,8 @@ _date_patterns = [
class Date(Atomic): class Date(Atomic):
_app_tag = Tag.dateAppTag
def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255): def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255):
self.value = (year, month, day, day_of_week) self.value = (year, month, day, day_of_week)

View File

@ -4,6 +4,7 @@
BACnet Virtual Link Layer Service BACnet Virtual Link Layer Service
""" """
import sys
import struct import struct
from time import time as _time from time import time as _time
@ -14,7 +15,8 @@ from .task import OneShotTask, RecurringTask
from .comm import Client, Server, bind, \ from .comm import Client, Server, bind, \
ServiceAccessPoint, ApplicationServiceElement ServiceAccessPoint, ApplicationServiceElement
from .pdu import Address, LocalBroadcast, LocalStation, PDU from .pdu import Address, LocalBroadcast, LocalStation, PDU, \
unpack_ip_addr
from .bvll import BVLPDU, DeleteForeignDeviceTableEntry, \ from .bvll import BVLPDU, DeleteForeignDeviceTableEntry, \
DistributeBroadcastToNetwork, FDTEntry, ForwardedNPDU, \ DistributeBroadcastToNetwork, FDTEntry, ForwardedNPDU, \
OriginalBroadcastNPDU, OriginalUnicastNPDU, \ OriginalBroadcastNPDU, OriginalUnicastNPDU, \
@ -89,14 +91,13 @@ class UDPMultiplexer:
self.directPort = UDPDirector(self.addrTuple) self.directPort = UDPDirector(self.addrTuple)
bind(self.direct, self.directPort) bind(self.direct, self.directPort)
# create and bind the broadcast address # create and bind the broadcast address for non-Windows
if specialBroadcast and (not noBroadcast): if specialBroadcast and (not noBroadcast) and 'win' not in sys.platform:
self.broadcast = _MultiplexClient(self) self.broadcast = _MultiplexClient(self)
self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True) self.broadcastPort = UDPDirector(self.addrBroadcastTuple, reuse=True)
bind(self.direct, self.broadcastPort) bind(self.direct, self.broadcastPort)
else: else:
self.broadcast = None self.broadcast = None
# create and bind the Annex H and J servers # create and bind the Annex H and J servers
self.annexH = _MultiplexServer(self) self.annexH = _MultiplexServer(self)
self.annexJ = _MultiplexServer(self) self.annexJ = _MultiplexServer(self)

View File

@ -7,7 +7,7 @@ Console Communications
import sys import sys
import asyncore import asyncore
from .debugging import Logging, ModuleLogger from .debugging import bacpypes_debugging, ModuleLogger
from .core import deferred from .core import deferred
from .comm import PDU, Client, Server from .comm import PDU, Client, Server
@ -16,20 +16,29 @@ from .comm import PDU, Client, Server
_debug = 0 _debug = 0
_log = ModuleLogger(globals()) _log = ModuleLogger(globals())
#
# asyncore.file_dispatcher is only available in Unix. This is a hack that
# allows the ConsoleClient and ConsoleServer to initialize on Windows.
#
try: try:
asyncore.file_dispatcher asyncore.file_dispatcher
except: except:
class _barf: pass class _barf:
def __init__(self, *args):
pass
asyncore.file_dispatcher = _barf asyncore.file_dispatcher = _barf
# #
# ConsoleClient # ConsoleClient
# #
class ConsoleClient(asyncore.file_dispatcher, Client, Logging): @bacpypes_debugging
class ConsoleClient(asyncore.file_dispatcher, Client):
def __init__(self, cid=None): def __init__(self, cid=None):
ConsoleClient._debug("__init__ cid=%r", cid) if _debug: ConsoleClient._debug("__init__ cid=%r", cid)
asyncore.file_dispatcher.__init__(self, sys.stdin) asyncore.file_dispatcher.__init__(self, sys.stdin)
Client.__init__(self, cid) Client.__init__(self, cid)
@ -40,15 +49,15 @@ class ConsoleClient(asyncore.file_dispatcher, Client, Logging):
return False # we don't have anything to write return False # we don't have anything to write
def handle_read(self): def handle_read(self):
deferred(ConsoleClient._debug, "handle_read") if _debug: deferred(ConsoleClient._debug, "handle_read")
data = sys.stdin.read() data = sys.stdin.read()
deferred(ConsoleClient._debug, " - data: %r", data) if _debug: deferred(ConsoleClient._debug, " - data: %r", data)
deferred(self.request, PDU(data)) deferred(self.request, PDU(data.encode('utf_8')))
def confirmation(self, pdu): def confirmation(self, pdu):
deferred(ConsoleClient._debug, "confirmation %r", pdu) if _debug: deferred(ConsoleClient._debug, "confirmation %r", pdu)
try: try:
sys.stdout.write(pdu.pduData) sys.stdout.write(pdu.pduData.decode('utf_8'))
except Exception as err: except Exception as err:
ConsoleClient._exception("Confirmation sys.stdout.write exception: %r", err) ConsoleClient._exception("Confirmation sys.stdout.write exception: %r", err)
@ -56,10 +65,11 @@ class ConsoleClient(asyncore.file_dispatcher, Client, Logging):
# ConsoleServer # ConsoleServer
# #
class ConsoleServer(asyncore.file_dispatcher, Server, Logging): @bacpypes_debugging
class ConsoleServer(asyncore.file_dispatcher, Server):
def __init__(self, sid=None): def __init__(self, sid=None):
ConsoleServer._debug("__init__ sid=%r", sid) if _debug: ConsoleServer._debug("__init__ sid=%r", sid)
asyncore.file_dispatcher.__init__(self, sys.stdin) asyncore.file_dispatcher.__init__(self, sys.stdin)
Server.__init__(self, sid) Server.__init__(self, sid)
@ -70,14 +80,14 @@ class ConsoleServer(asyncore.file_dispatcher, Server, Logging):
return False # we don't have anything to write return False # we don't have anything to write
def handle_read(self): def handle_read(self):
deferred(ConsoleServer._debug, "handle_read") if _debug: deferred(ConsoleServer._debug, "handle_read")
data = sys.stdin.read() data = sys.stdin.read()
deferred(ConsoleServer._debug, " - data: %r", data) if _debug: deferred(ConsoleServer._debug, " - data: %r", data)
deferred(self.response, PDU(data)) deferred(self.response, PDU(data.encode('utf_8')))
def indication(self, pdu): def indication(self, pdu):
deferred(ConsoleServer._debug, "Indication %r", pdu) if _debug: deferred(ConsoleServer._debug, "Indication %r", pdu)
try: try:
sys.stdout.write(pdu.pduData) sys.stdout.write(pdu.pduData.decode('utf_8'))
except Exception as err: except Exception as err:
ConsoleServer._exception("Indication sys.stdout.write exception: %r", err) ConsoleServer._exception("Indication sys.stdout.write exception: %r", err)

View File

@ -23,6 +23,7 @@ _log = ModuleLogger(globals())
# #
class Tag(object): class Tag(object):
applicationTagClass = 0 applicationTagClass = 0
contextTagClass = 1 contextTagClass = 1
openingTagClass = 2 openingTagClass = 2
@ -1169,6 +1170,8 @@ _date_patterns = [
class Date(Atomic): class Date(Atomic):
_app_tag = Tag.dateAppTag
def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255): def __init__(self, arg=None, year=255, month=255, day=255, day_of_week=255):
self.value = (year, month, day, day_of_week) self.value = (year, month, day, day_of_week)

View File

@ -7,8 +7,8 @@
rm -Rfv build/ rm -Rfv build/
# Python 2.5 doesn't support wheels # Python 2.5 doesn't support wheels
python2.5 setup.py bdist_egg # python2.5 setup.py bdist_egg
rm -Rfv build/ # rm -Rfv build/
for ver in 2.7 3.4; do for ver in 2.7 3.4; do
python$ver setup.py bdist_egg python$ver setup.py bdist_egg

252
samples/BBMD2VLANRouter.py Executable file
View File

@ -0,0 +1,252 @@
#!/usr/bin/python
"""
This sample application presents itself as a BBMD sitting on an IP network
that is also a router to a VLAN. The VLAN has a device on it with an analog
value object that returns a random value for the present value.
$ python BBMD2VLANRouter.py addr1 net1 addr2 net2
addr1 - local address like 192.168.1.2/24:47808
net1 - network number
addr2 - local address like 12
net2 - network number
Note that the device instance number of the virtual device will be 100 times
the network number plus the address (net2 * 100 + addr2).
"""
import random
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ArgumentParser
from bacpypes.core import run
from bacpypes.comm import bind
from bacpypes.pdu import Address
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.bvllservice import BIPBBMD, AnnexJCodec, UDPMultiplexer
from bacpypes.app import LocalDeviceObject, Application
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.primitivedata import Real
from bacpypes.object import AnalogValueObject, Property
from bacpypes.vlan import Network, Node
from bacpypes.errors import ExecutionError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# RandomValueProperty
#
@bacpypes_debugging
class RandomValueProperty(Property):
def __init__(self, identifier):
if _debug: RandomValueProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, Real, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: RandomValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# return a random value
value = random.random() * 100.0
if _debug: RandomValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: RandomValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# Random Value Object Type
#
@bacpypes_debugging
class RandomAnalogValueObject(AnalogValueObject):
properties = [
RandomValueProperty('presentValue'),
]
def __init__(self, **kwargs):
if _debug: RandomAnalogValueObject._debug("__init__ %r", kwargs)
AnalogValueObject.__init__(self, **kwargs)
#
# VLANApplication
#
@bacpypes_debugging
class VLANApplication(Application):
def __init__(self, vlan_device, vlan_address, aseID=None):
if _debug: VLANApplication._debug("__init__ %r %r aseID=%r", vlan_device, vlan_address, aseID)
Application.__init__(self, vlan_device, local_address, aseID)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(vlan_device)
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# create a vlan node at the assigned address
self.vlan_node = Node(vlan_address)
# bind the stack to the node, no network number
self.nsap.bind(self.vlan_node)
def request(self, apdu):
if _debug: VLANApplication._debug("[%s]request %r", self.vlan_node.address, apdu)
Application.request(self, apdu)
def indication(self, apdu):
if _debug: VLANApplication._debug("[%s]indication %r", self.vlan_node.address, apdu)
Application.indication(self, apdu)
def response(self, apdu):
if _debug: VLANApplication._debug("[%s]response %r", self.vlan_node.address, apdu)
Application.response(self, apdu)
def confirmation(self, apdu):
if _debug: VLANApplication._debug("[%s]confirmation %r", self.vlan_node.address, apdu)
Application.confirmation(self, apdu)
#
# VLANRouter
#
@bacpypes_debugging
class VLANRouter:
def __init__(self, local_address, local_network):
if _debug: VLANRouter._debug("__init__ %r %r", local_address, local_network)
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# create a BBMD, bound to the Annex J server
# on the UDP multiplexer
self.bip = BIPBBMD(local_address)
self.annexj = AnnexJCodec()
self.mux = UDPMultiplexer(local_address)
# bind the bottom layers
bind(self.bip, self.annexj, self.mux.annexJ)
# bind the BIP stack to the local network
self.nsap.bind(self.bip, local_network, local_address)
#
# __main__
#
# parse the command line arguments
parser = ArgumentParser(description=__doc__)
# add an argument for interval
parser.add_argument('addr1', type=str,
help='address of first network',
)
# add an argument for interval
parser.add_argument('net1', type=int,
help='network number of first network',
)
# add an argument for interval
parser.add_argument('addr2', type=str,
help='address of second network',
)
# add an argument for interval
parser.add_argument('net2', type=int,
help='network number of second network',
)
# now parse the arguments
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
local_address = Address(args.addr1)
local_network = args.net1
vlan_address = Address(args.addr2)
vlan_network = args.net2
# create the VLAN router, bind it to the local network
router = VLANRouter(local_address, local_network)
# create a VLAN
vlan = Network()
# create a node for the router, address 1 on the VLAN
router_node = Node(Address(1))
vlan.add_node(router_node)
# bind the router stack to the vlan network through this node
router.nsap.bind(router_node, vlan_network)
# device identifier is assigned from the address
device_instance = vlan_network * 100 + int(args.addr2)
_log.debug(" - device_instance: %r", device_instance)
# make a vlan device object
vlan_device = \
LocalDeviceObject(
objectName="VLAN Node %d" % (device_instance,),
objectIdentifier=('device', device_instance),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=15,
)
_log.debug(" - vlan_device: %r", vlan_device)
# make the application, add it to the network
vlan_app = VLANApplication(vlan_device, vlan_address)
vlan.add_node(vlan_app.vlan_node)
_log.debug(" - vlan_app: %r", vlan_app)
# make a random value object
ravo = RandomAnalogValueObject(
objectIdentifier=('analogValue', 1),
objectName='Device%d/Random1' % (device_instance,),
)
_log.debug(" - ravo1: %r", ravo)
# add it to the device
vlan_app.add_object(ravo)
_log.debug("running")
run()
_log.debug("fini")

View File

@ -0,0 +1,153 @@
#!/usr/bin/python
"""
This sample application mocks up an accumulator object.
"""
import random
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ConfigArgumentParser
from bacpypes.core import run
from bacpypes.primitivedata import Unsigned, Date, Time
from bacpypes.basetypes import DateTime
from bacpypes.app import LocalDeviceObject, BIPSimpleApplication
from bacpypes.object import AccumulatorObject, Property, register_object_type
from bacpypes.errors import ExecutionError
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
this_device = None
this_application = None
#
# RandomUnsignedValueProperty
#
@bacpypes_debugging
class RandomUnsignedValueProperty(Property):
def __init__(self, identifier):
if _debug: RandomUnsignedValueProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, Unsigned, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: RandomUnsignedValueProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# return a random value
value = int(random.random() * 100.0)
if _debug: RandomUnsignedValueProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: RandomUnsignedValueProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# CurrentDateTimeProperty
#
@bacpypes_debugging
class CurrentDateTimeProperty(Property):
def __init__(self, identifier):
if _debug: CurrentDateTimeProperty._debug("__init__ %r", identifier)
Property.__init__(self, identifier, DateTime, default=None, optional=True, mutable=False)
def ReadProperty(self, obj, arrayIndex=None):
if _debug: CurrentDateTimeProperty._debug("ReadProperty %r arrayIndex=%r", obj, arrayIndex)
# access an array
if arrayIndex is not None:
raise ExecutionError(errorClass='property', errorCode='propertyIsNotAnArray')
# get the value
current_date = Date().now().value
current_time = Time().now().value
value = DateTime(date=current_date, time=current_time)
if _debug: CurrentDateTimeProperty._debug(" - value: %r", value)
return value
def WriteProperty(self, obj, value, arrayIndex=None, priority=None, direct=False):
if _debug: CurrentDateTimeProperty._debug("WriteProperty %r %r arrayIndex=%r priority=%r direct=%r", obj, value, arrayIndex, priority, direct)
raise ExecutionError(errorClass='property', errorCode='writeAccessDenied')
#
# Random Accumulator Object
#
@bacpypes_debugging
class RandomAccumulatorObject(AccumulatorObject):
properties = [
RandomUnsignedValueProperty('presentValue'),
CurrentDateTimeProperty('valueChangeTime'),
]
def __init__(self, **kwargs):
if _debug: RandomAccumulatorObject._debug("__init__ %r", kwargs)
AccumulatorObject.__init__(self, **kwargs)
register_object_type(RandomAccumulatorObject)
#
# __main__
#
try:
# parse the command line arguments
args = ConfigArgumentParser(description=__doc__).parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
# make a device object
this_device = LocalDeviceObject(
objectName=args.ini.objectname,
objectIdentifier=('device', int(args.ini.objectidentifier)),
maxApduLengthAccepted=int(args.ini.maxapdulengthaccepted),
segmentationSupported=args.ini.segmentationsupported,
vendorIdentifier=int(args.ini.vendoridentifier),
)
# make a sample application
this_application = BIPSimpleApplication(this_device, args.ini.address)
# get the services supported
services_supported = this_application.get_services_supported()
if _debug: _log.debug(" - services_supported: %r", services_supported)
# let the device object know
this_device.protocolServicesSupported = services_supported.value
# make a random input object
rao1 = RandomAccumulatorObject(
objectIdentifier=('accumulator', 1),
objectName='Random1',
statusFlags = [0, 0, 0, 0],
)
_log.debug(" - rao1: %r", rao1)
# add it to the device
this_application.add_object(rao1)
_log.debug(" - object list: %r", this_device.objectList)
run()
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

245
samples/UDPConsole.py Executable file
View File

@ -0,0 +1,245 @@
#!/usr/bin/python
"""
UDPConsole
==========
This is a sample application that is similar to the UDPMultiplexer. It opens
a socket for unicast messages and (optionally) another for broadcast messages.
Run this application with a BACpypes IP address parameter.
$ python UDPConsole.py <addr> [ --nobroadcast ]
The address can be one of the following forms:
192.168.1.10 - unicast socket, no broadcast socket, port 47808
192.168.1.10/24 - unicast socket, 192.168.1.255 broadcast socket, port 47808
192.168.1.10:12345 - unicast socket, no broadcast socket, port 12345
192.168.1.10/24:12345 - unicast socket, 192.168.1.255 broadcast socket, port 12345
any - special tuple ('', 47808)
any:12345 - special tuple ('', 12345)
Use the --nobroadcast option to prevent the application from opening the
broadcast socket when one would otherwise be opened.
To send a packet, enter in a string in the form <addr> <message> where <addr>
is a BACpyes IP address (which may include the socket) or '*' for a local
broadcast.
Linux/MacOS Test Cases
----------------------
Here are some test cases for Linux and MacOS.
Using Any
~~~~~~~~~
$ python samples/UDPConsole.py any
* hi
received u'hi' from ('10.0.1.5', 47808)
In this case the application received its own broadcast, but did not recognize
it as a broadcast message and did not recognize that it came from itself.
Broadcast messages from other devices sent to 255.255.255.255 or 10.0.1.255
are received, but also not recognized as broadcast messages.
Using the Local Address
~~~~~~~~~~~~~~~~~~~~~~~
$ python samples/UDPConsole.py 10.0.1.5
* hi
received u'hi' from self
In this case it received its own broadcast and it recognized that it came from
itself, but it did not recognize it as a broadcast message. Broadcast messages
from other devices sent to 255.255.255.255 or 10.0.1.255 are not received.
Using the CIDR Address
~~~~~~~~~~~~~~~~~~~~~~
$ python samples/UDPConsole.py 10.0.1.5/24
* hi
received broadcast u'hi' from self
In this case it received its own broadcast, recognized that it came from itself,
and also recognized it was sent as a broadcast message. Broadcast messages
from other devices sent to 255.255.255.255 are not received, but those sent to
10.0.1.255 are received and recognized as broadcast messages.
"""
import sys
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.udp import UDPDirector
from bacpypes.comm import Client, Server, bind
from bacpypes.pdu import Address, PDU
from bacpypes.core import run, stop
from bacpypes.consolelogging import ArgumentParser
from bacpypes.console import ConsoleClient
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
local_unicast_tuple = None
local_broadcast_tuple = None
#
# MiddleMan
#
@bacpypes_debugging
class MiddleMan(Client, Server):
def indication(self, pdu):
if _debug: MiddleMan._debug('indication %r', pdu)
if not pdu.pduData:
stop()
return
# decode the line and trim off the eol
line = pdu.pduData.decode('utf_8')[:-1]
if _debug: MiddleMan._debug(' - line: %r', line)
line_parts = line.split(' ', 1)
if _debug: MiddleMan._debug(' - line_parts: %r', line_parts)
if len(line_parts) != 2:
sys.stderr.write("err: invalid line: %r\n" % (line,))
return
addr, msg = line_parts
try:
address = Address(str(addr))
if _debug: MiddleMan._debug(' - address: %r', address)
except Exception as err:
sys.stderr.write("err: invalid address %r: %r\n" % (addr, err))
return
# check for a broadcast message
if address.addrType == Address.localBroadcastAddr:
dest = local_broadcast_tuple
if _debug: MiddleMan._debug(" - requesting local broadcast: %r", dest)
elif address.addrType == Address.localStationAddr:
dest = address.addrTuple
if _debug: MiddleMan._debug(" - requesting local station: %r", dest)
else:
sys.stderr.write("err: invalid destination address type\n")
return
# send it along
try:
self.request(PDU(msg.encode('utf_8'), destination=dest))
except Exception as err:
sys.stderr.write("err: %r\n" % (err,))
return
def confirmation(self, pdu):
if _debug: MiddleMan._debug('confirmation %r', pdu)
# decode the line
line = pdu.pduData.decode('utf_8')
if _debug: MiddleMan._debug(' - line: %r', line)
if pdu.pduSource == local_unicast_tuple:
sys.stdout.write("received %r from self\n" % (line,))
else:
sys.stdout.write("received %r from %s\n" % (
line, pdu.pduSource,
))
#
# BroadcastReceiver
#
@bacpypes_debugging
class BroadcastReceiver(Client):
def confirmation(self, pdu):
if _debug: BroadcastReceiver._debug('confirmation %r', pdu)
# decode the line
line = pdu.pduData.decode('utf_8')
if _debug: MiddleMan._debug(' - line: %r', line)
if pdu.pduSource == local_unicast_tuple:
sys.stdout.write("received broadcast %r from self\n" % (line,))
else:
sys.stdout.write("received broadcast %r from %s\n" % (
line, pdu.pduSource,
))
#
# __main__
#
try:
# parse the command line arguments
parser = ArgumentParser(description=__doc__)
parser.add_argument("address",
help="address of socket",
)
parser.add_argument("--nobroadcast",
action="store_true",
dest="noBroadcast",
default=False,
help="do not create a broadcast socket",
)
args = parser.parse_args()
if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)
if args.address == "any":
local_unicast_tuple = ('', 47808)
local_broadcast_tuple = ('255.255.255.255', 47808)
elif args.address.startswith("any:"):
port = int(args.address[4:])
local_unicast_tuple = ('', port)
local_broadcast_tuple = ('255.255.255.255', port)
else:
address = Address(args.address)
if _debug: _log.debug(" - local_address: %r", address)
local_unicast_tuple = address.addrTuple
local_broadcast_tuple = address.addrBroadcastTuple
if _debug: _log.debug(" - local_unicast_tuple: %r", local_unicast_tuple)
if _debug: _log.debug(" - local_broadcast_tuple: %r", local_broadcast_tuple)
console = ConsoleClient()
middle_man = MiddleMan()
unicast_director = UDPDirector(local_unicast_tuple)
bind(console, middle_man, unicast_director)
if args.noBroadcast:
_log.debug(" - skipping broadcast")
elif local_unicast_tuple == local_broadcast_tuple:
_log.debug(" - identical unicast and broadcast tuples")
elif local_broadcast_tuple[0] == '255.255.255.255':
_log.debug(" - special broadcast address only for sending")
else:
broadcast_receiver = BroadcastReceiver()
broadcast_director = UDPDirector(local_broadcast_tuple, reuse=True)
bind(broadcast_receiver, broadcast_director)
_log.debug("running")
run()
except Exception as e:
_log.exception("an error has occurred: %s", e)
finally:
_log.debug("finally")

View File

@ -140,6 +140,22 @@ class WhoIsIAmConsoleCmd(ConsoleCmd):
except Exception, e: except Exception, e:
WhoIsIAmConsoleCmd._exception("exception: %r", e) WhoIsIAmConsoleCmd._exception("exception: %r", e)
def do_rtn(self, args):
"""rtn <addr> <net> ... """
args = args.split()
if _debug: WhoIsIAmConsoleCmd._debug("do_rtn %r", args)
# safe to assume only one adapter
adapter = this_application.nsap.adapters[0]
if _debug: WhoIsIAmConsoleCmd._debug(" - adapter: %r", adapter)
# provide the address and a list of network numbers
router_address = Address(args[0])
network_list = [int(arg) for arg in args[1:]]
# pass along to the service access point
this_application.nsap.add_router_references(adapter, router_address, network_list)
bacpypes_debugging(WhoIsIAmConsoleCmd) bacpypes_debugging(WhoIsIAmConsoleCmd)
# #

6
setup.py Executable file → Normal file
View File

@ -24,13 +24,13 @@ test_requirements = [
] ]
setup( setup(
name='bacpypes', name="bacpypes",
version="0.13.3", version="0.13.7",
description="Testing multiple versions of python", description="Testing multiple versions of python",
long_description="This is a long line of text", long_description="This is a long line of text",
author="Joel Bender", author="Joel Bender",
author_email="joel@carrickbender.com", author_email="joel@carrickbender.com",
url="http://something.com", url="https://github.com/JoelBender/bacpypes",
packages=[ packages=[
'bacpypes', 'bacpypes',
], ],