1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00
bacpypes/tests/test_service/helpers.py
2017-09-13 23:31:09 -04:00

194 lines
5.8 KiB
Python

#!/usr/bin/env python
"""
Service Helper Classes
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.comm import Client, Server, bind
from bacpypes.pdu import Address, LocalBroadcast, PDU
from bacpypes.vlan import Network, Node
from bacpypes.app import Application
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.service.device import LocalDeviceObject
from ..state_machine import StateMachine, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# ApplicationNetwork
#
@bacpypes_debugging
class ApplicationNetwork(StateMachineGroup):
def __init__(self):
if _debug: ApplicationNetwork._debug("__init__")
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: ApplicationNetwork._debug(" - time machine reset")
# make a little LAN
self.vlan = Network(broadcast_address=LocalBroadcast())
# test device object
td_device_object = LocalDeviceObject(
objectName="td",
objectIdentifier=("device", 10),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=999,
)
# test device
self.td = ApplicationNode(td_device_object, self.vlan)
self.append(self.td)
# implementation under test device object
iut_device_object = LocalDeviceObject(
objectName="iut",
objectIdentifier=("device", 20),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=999,
)
# implementation under test
self.iut = ApplicationNode(iut_device_object, self.vlan)
self.append(self.iut)
def run(self, time_limit=60.0):
if _debug: ApplicationNetwork._debug("run %r", time_limit)
# run the group
super(ApplicationNetwork, self).run()
if _debug: ApplicationNetwork._debug(" - group running")
# run it for some time
run_time_machine(time_limit)
if _debug:
ApplicationNetwork._debug(" - time machine finished")
for state_machine in self.state_machines:
ApplicationNetwork._debug(" - machine: %r", state_machine)
for direction, pdu in state_machine.transaction_log:
ApplicationNetwork._debug(" %s %s", direction, str(pdu))
# check for success
all_success, some_failed = super(ApplicationNetwork, self).check_for_success()
assert all_success
#
# SnifferNode
#
@bacpypes_debugging
class SnifferNode(Client, StateMachine):
def __init__(self, vlan):
if _debug: SnifferNode._debug("__init__ %r", vlan)
# save the name and give it a blank address
self.name = "sniffer"
self.address = Address()
# continue with initialization
Client.__init__(self)
StateMachine.__init__(self)
# create a promiscuous node, added to the network
self.node = Node(self.address, vlan, promiscuous=True)
if _debug: SnifferNode._debug(" - node: %r", self.node)
# bind this to the node
bind(self, self.node)
def send(self, pdu):
if _debug: SnifferNode._debug("send(%s) %r", self.name, pdu)
raise RuntimeError("sniffers don't send")
def confirmation(self, pdu):
if _debug: SnifferNode._debug("confirmation(%s) %r", self.name, pdu)
# pass to the state machine
self.receive(pdu)
#
# ApplicationNode
#
@bacpypes_debugging
class ApplicationNode(Application, StateMachine):
def __init__(self, localDevice, vlan):
if _debug: ApplicationNode._debug("__init__ %r %r", localDevice, vlan)
# save the name and address
self.name = localDevice.objectName
self.address = Address(localDevice.objectIdentifier[1])
# continue with initialization
Application.__init__(self, localDevice, self.address)
StateMachine.__init__(self)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# create a node, added to the network
self.node = Node(self.address, vlan)
# bind the network service to the node, no network number
self.nsap.bind(self.node)
def send(self, apdu):
if _debug: ApplicationNode._debug("send(%s) %r", self.name, apdu)
# send the apdu down the stack
self.request(apdu)
def indication(self, apdu):
if _debug: ApplicationNode._debug("indication(%s) %r", self.name, apdu)
# let the state machine know the request was received
self.receive(apdu)
# allow the application to process it
super(ApplicationNode, self).indication(apdu)
def confirmation(self, apdu):
if _debug: ApplicationNode._debug("confirmation(%s) %r", self.name, apdu)
# forward the confirmation to the state machine
self.receive(apdu)