mirror of
https://github.com/JoelBender/bacpypes
synced 2025-09-28 22:15:23 +08:00
339 lines
9.0 KiB
Python
Executable File
339 lines
9.0 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
"""
|
|
"""
|
|
|
|
import sys
|
|
|
|
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
|
from bacpypes.consolelogging import ArgumentParser
|
|
|
|
from bacpypes.core import run, deferred
|
|
from bacpypes.comm import bind
|
|
|
|
from bacpypes.pdu import Address
|
|
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
|
|
from bacpypes.bvllservice import BIPBBMD, AnnexJCodec, UDPMultiplexer
|
|
|
|
from bacpypes.app import Application
|
|
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
|
|
from bacpypes.service.device import LocalDeviceObject, WhoIsIAmServices
|
|
from bacpypes.service.object import ReadWritePropertyServices
|
|
|
|
from bacpypes.apdu import ReadPropertyRequest
|
|
|
|
from bacpypes.vlan import Network, Node
|
|
from bacpypes.errors import ExecutionError
|
|
|
|
# some debugging
|
|
_debug = 0
|
|
_log = ModuleLogger(globals())
|
|
|
|
#
|
|
# VLANApplication
|
|
#
|
|
|
|
@bacpypes_debugging
|
|
class VLANApplication(Application, WhoIsIAmServices, ReadWritePropertyServices):
|
|
|
|
def __init__(self, objectName, deviceInstance, address, aseID=None):
|
|
if _debug: VLANApplication._debug("__init__ %r %r %r aseID=%r", objectName, deviceInstance, address, aseID)
|
|
|
|
# make an address
|
|
vlan_address = Address(address)
|
|
_log.debug(" - vlan_address: %r", vlan_address)
|
|
|
|
# make a device object
|
|
vlan_device = LocalDeviceObject(
|
|
objectName=objectName,
|
|
objectIdentifier=('device', deviceInstance),
|
|
maxApduLengthAccepted=1024,
|
|
segmentationSupported='noSegmentation',
|
|
vendorIdentifier=15,
|
|
)
|
|
_log.debug(" - vlan_device: %r", vlan_device)
|
|
|
|
# continue with the initialization
|
|
Application.__init__(self, vlan_device, vlan_address, aseID)
|
|
|
|
# include a application decoder
|
|
self.asap = ApplicationServiceAccessPoint()
|
|
|
|
# pass the device object to the state machine access point so it
|
|
# can know if it should support segmentation
|
|
self.smap = StateMachineAccessPoint(vlan_device)
|
|
|
|
# the segmentation state machines need access to the same device
|
|
# information cache as the application
|
|
self.smap.deviceInfoCache = self.deviceInfoCache
|
|
|
|
# a network service access point will be needed
|
|
self.nsap = NetworkServiceAccessPoint()
|
|
|
|
# give the NSAP a generic network layer service element
|
|
self.nse = NetworkServiceElement()
|
|
bind(self.nse, self.nsap)
|
|
|
|
# bind the top layers
|
|
bind(self, self.asap, self.smap, self.nsap)
|
|
|
|
# create a vlan node at the assigned address
|
|
self.vlan_node = Node(vlan_address)
|
|
if _debug: VLANApplication._debug(" - vlan_node: %r", self.vlan_node)
|
|
|
|
# bind the stack to the node, no network number
|
|
self.nsap.bind(self.vlan_node)
|
|
if _debug: VLANApplication._debug(" - node bound")
|
|
|
|
def request(self, apdu):
|
|
if _debug: VLANApplication._debug("[%s]request %r", self.localDevice.objectName, apdu)
|
|
Application.request(self, apdu)
|
|
|
|
def indication(self, apdu):
|
|
if _debug: VLANApplication._debug("[%s]indication %r", self.localDevice.objectName, apdu)
|
|
Application.indication(self, apdu)
|
|
|
|
def response(self, apdu):
|
|
if _debug: VLANApplication._debug("[%s]response %r", self.localDevice.objectName, apdu)
|
|
Application.response(self, apdu)
|
|
|
|
def confirmation(self, apdu):
|
|
if _debug: VLANApplication._debug("[%s]confirmation %r", self.localDevice.objectName, apdu)
|
|
|
|
#
|
|
# VLANRouter
|
|
#
|
|
|
|
@bacpypes_debugging
|
|
class VLANRouter:
|
|
|
|
def __init__(self):
|
|
if _debug: VLANRouter._debug("__init__")
|
|
|
|
# a network service access point will be needed
|
|
self.nsap = NetworkServiceAccessPoint()
|
|
|
|
# give the NSAP a generic network layer service element
|
|
self.nse = NetworkServiceElement()
|
|
bind(self.nse, self.nsap)
|
|
|
|
def bind(self, vlan, address, net):
|
|
if _debug: VLANRouter._debug("bind %r %r %r", vlan, address, net)
|
|
|
|
# create a VLAN node for the router with the given address
|
|
vlan_node = Node(Address(address))
|
|
|
|
# add it to the VLAN
|
|
vlan.add_node(vlan_node)
|
|
|
|
# bind the router stack to the vlan network through this node
|
|
self.nsap.bind(vlan_node, net)
|
|
if _debug: _log.debug(" - bound to vlan")
|
|
|
|
#
|
|
# __main__
|
|
#
|
|
|
|
def main():
|
|
# parse the command line arguments
|
|
parser = ArgumentParser(description=__doc__)
|
|
|
|
# add an argument for which test to run
|
|
parser.add_argument('test_id', type=int,
|
|
help='test number',
|
|
)
|
|
|
|
# now parse the arguments
|
|
args = parser.parse_args()
|
|
|
|
if _debug: _log.debug("initialization")
|
|
if _debug: _log.debug(" - args: %r", args)
|
|
|
|
#
|
|
# Router1
|
|
#
|
|
|
|
# create the router
|
|
router1 = VLANRouter()
|
|
if _debug: _log.debug(" - router1: %r", router1)
|
|
|
|
#
|
|
# VLAN-1
|
|
#
|
|
|
|
# create VLAN-1
|
|
vlan1 = Network(name='1')
|
|
if _debug: _log.debug(" - vlan1: %r", vlan1)
|
|
|
|
# bind the router to the vlan
|
|
router1.bind(vlan1, 1, 1)
|
|
if _debug: _log.debug(" - router1 bound to VLAN-1")
|
|
|
|
# make the application, add it to the network
|
|
vlan1_app = VLANApplication(
|
|
objectName="VLAN Node 102",
|
|
deviceInstance=102,
|
|
address=2,
|
|
)
|
|
vlan1.add_node(vlan1_app.vlan_node)
|
|
_log.debug(" - vlan1_app: %r", vlan1_app)
|
|
|
|
#
|
|
# VLAN-2
|
|
#
|
|
|
|
# create VLAN-2
|
|
vlan2 = Network(name='2')
|
|
if _debug: _log.debug(" - vlan2: %r", vlan2)
|
|
|
|
# bind the router stack to the vlan network through this node
|
|
router1.bind(vlan2, 1, 2)
|
|
if _debug: _log.debug(" - router1 bound to VLAN-2")
|
|
|
|
# make the application, add it to the network
|
|
vlan2_app = VLANApplication(
|
|
objectName="VLAN Node 202",
|
|
deviceInstance=202,
|
|
address=2,
|
|
)
|
|
vlan2.add_node(vlan2_app.vlan_node)
|
|
_log.debug(" - vlan2_app: %r", vlan2_app)
|
|
|
|
#
|
|
# VLAN-3
|
|
#
|
|
|
|
# create VLAN-3
|
|
vlan3 = Network(name='3')
|
|
if _debug: _log.debug(" - vlan3: %r", vlan3)
|
|
|
|
# bind the router stack to the vlan network through this node
|
|
router1.bind(vlan3, 1, 3)
|
|
if _debug: _log.debug(" - router1 bound to VLAN-3")
|
|
|
|
# make a vlan device object
|
|
vlan3_device = \
|
|
LocalDeviceObject(
|
|
objectName="VLAN Node 302",
|
|
objectIdentifier=('device', 302),
|
|
maxApduLengthAccepted=1024,
|
|
segmentationSupported='noSegmentation',
|
|
vendorIdentifier=15,
|
|
)
|
|
_log.debug(" - vlan3_device: %r", vlan3_device)
|
|
|
|
# make the application, add it to the network
|
|
vlan3_app = VLANApplication(
|
|
objectName="VLAN Node 302",
|
|
deviceInstance=302,
|
|
address=2,
|
|
)
|
|
vlan3.add_node(vlan3_app.vlan_node)
|
|
_log.debug(" - vlan3_app: %r", vlan3_app)
|
|
|
|
|
|
#
|
|
# Router2
|
|
#
|
|
|
|
# create the router
|
|
router2 = VLANRouter()
|
|
if _debug: _log.debug(" - router2: %r", router2)
|
|
|
|
# bind the router stack to the vlan network through this node
|
|
router2.bind(vlan3, 255, 3)
|
|
if _debug: _log.debug(" - router2 bound to VLAN-3")
|
|
|
|
#
|
|
# VLAN-4
|
|
#
|
|
|
|
# create VLAN-4
|
|
vlan4 = Network(name='4')
|
|
if _debug: _log.debug(" - vlan4: %r", vlan4)
|
|
|
|
# bind the router stack to the vlan network through this node
|
|
router2.bind(vlan4, 1, 4)
|
|
if _debug: _log.debug(" - router2 bound to VLAN-4")
|
|
|
|
# make the application, add it to the network
|
|
vlan4_app = VLANApplication(
|
|
objectName="VLAN Node 402",
|
|
deviceInstance=402,
|
|
address=2,
|
|
)
|
|
vlan4.add_node(vlan4_app.vlan_node)
|
|
_log.debug(" - vlan4_app: %r", vlan4_app)
|
|
|
|
|
|
#
|
|
# Test 1
|
|
#
|
|
|
|
if args.test_id == 1:
|
|
# ask the first device to Who-Is everybody
|
|
deferred(vlan1_app.who_is)
|
|
|
|
|
|
#
|
|
# Test 2
|
|
#
|
|
|
|
if args.test_id == 2:
|
|
# make a read request
|
|
read_property_request = ReadPropertyRequest(
|
|
destination=Address("2:2"),
|
|
objectIdentifier=('device', 202),
|
|
propertyIdentifier='objectName',
|
|
)
|
|
|
|
# ask the first device to send it
|
|
deferred(vlan1_app.request, read_property_request)
|
|
|
|
|
|
#
|
|
# Test 3
|
|
#
|
|
|
|
if args.test_id == 3:
|
|
# make a read request
|
|
read_property_request = ReadPropertyRequest(
|
|
destination=Address("3:2"),
|
|
objectIdentifier=('device', 302),
|
|
propertyIdentifier='objectName',
|
|
)
|
|
|
|
# ask the first device to send it
|
|
deferred(vlan1_app.request, read_property_request)
|
|
|
|
|
|
#
|
|
# Test 4
|
|
#
|
|
|
|
if args.test_id == 4:
|
|
# make a read request
|
|
read_property_request = ReadPropertyRequest(
|
|
destination=Address("4:2"),
|
|
objectIdentifier=('device', 402),
|
|
propertyIdentifier='objectName',
|
|
)
|
|
|
|
# ask the first device to send it
|
|
deferred(vlan1_app.request, read_property_request)
|
|
|
|
|
|
#
|
|
# Let the test run
|
|
#
|
|
|
|
_log.debug("running")
|
|
|
|
run()
|
|
|
|
_log.debug("fini")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|