1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

shuffle some pieces around, add some more tests

This commit is contained in:
Joel Bender 2017-09-13 23:31:09 -04:00
parent ef0f012ead
commit dd0a519021
3 changed files with 237 additions and 112 deletions

View File

@ -13,8 +13,10 @@ from bacpypes.vlan import Network, Node
from bacpypes.app import Application
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.service.device import LocalDeviceObject
from ..state_machine import StateMachine
from ..state_machine import StateMachine, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
# some debugging
@ -22,6 +24,71 @@ _debug = 0
_log = ModuleLogger(globals())
#
# ApplicationNetwork
#
@bacpypes_debugging
class ApplicationNetwork(StateMachineGroup):
def __init__(self):
if _debug: ApplicationNetwork._debug("__init__")
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: ApplicationNetwork._debug(" - time machine reset")
# make a little LAN
self.vlan = Network(broadcast_address=LocalBroadcast())
# test device object
td_device_object = LocalDeviceObject(
objectName="td",
objectIdentifier=("device", 10),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=999,
)
# test device
self.td = ApplicationNode(td_device_object, self.vlan)
self.append(self.td)
# implementation under test device object
iut_device_object = LocalDeviceObject(
objectName="iut",
objectIdentifier=("device", 20),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=999,
)
# implementation under test
self.iut = ApplicationNode(iut_device_object, self.vlan)
self.append(self.iut)
def run(self, time_limit=60.0):
if _debug: ApplicationNetwork._debug("run %r", time_limit)
# run the group
super(ApplicationNetwork, self).run()
if _debug: ApplicationNetwork._debug(" - group running")
# run it for some time
run_time_machine(time_limit)
if _debug:
ApplicationNetwork._debug(" - time machine finished")
for state_machine in self.state_machines:
ApplicationNetwork._debug(" - machine: %r", state_machine)
for direction, pdu in state_machine.transaction_log:
ApplicationNetwork._debug(" %s %s", direction, str(pdu))
# check for success
all_success, some_failed = super(ApplicationNetwork, self).check_for_success()
assert all_success
#
# SnifferNode
#
@ -29,15 +96,17 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
class SnifferNode(Client, StateMachine):
def __init__(self, localAddress, vlan):
if _debug: SnifferNode._debug("__init__ %r %r", localAddress, vlan)
def __init__(self, vlan):
if _debug: SnifferNode._debug("__init__ %r", vlan)
# save the name and give it a blank address
self.name = "sniffer"
self.address = Address()
# continue with initialization
Client.__init__(self)
StateMachine.__init__(self)
# save the name and address
self.name = "sniffer"
self.address = localAddress
# create a promiscuous node, added to the network
self.node = Node(self.address, vlan, promiscuous=True)
if _debug: SnifferNode._debug(" - node: %r", self.node)
@ -63,14 +132,16 @@ class SnifferNode(Client, StateMachine):
@bacpypes_debugging
class ApplicationNode(Application, StateMachine):
def __init__(self, localDevice, localAddress, vlan):
if _debug: ApplicationNode._debug("__init__ %r %r %r", localDevice, localAddress, vlan)
Application.__init__(self, localDevice, localAddress)
StateMachine.__init__(self)
def __init__(self, localDevice, vlan):
if _debug: ApplicationNode._debug("__init__ %r %r", localDevice, vlan)
# save the name and address
self.name = localDevice.objectName
self.address = localAddress
self.address = Address(localDevice.objectIdentifier[1])
# continue with initialization
Application.__init__(self, localDevice, self.address)
StateMachine.__init__(self)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()

View File

@ -11,91 +11,18 @@ import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.pdu import Address, LocalBroadcast, PDU
from bacpypes.apdu import WhoIsRequest, IAmRequest
from bacpypes.apdu import WhoIsRequest, IAmRequest, \
WhoHasRequest, WhoHasLimits, WhoHasObject, IHaveRequest
from bacpypes.vlan import Network
from bacpypes.service.device import LocalDeviceObject, WhoIsIAmServices
from bacpypes.service.device import WhoIsIAmServices, WhoHasIHaveServices
from ..state_machine import match_pdu, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
from .helpers import SnifferNode, ApplicationNode
from .helpers import ApplicationNetwork, ApplicationNode
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# TNetwork
#
@bacpypes_debugging
class TNetwork(StateMachineGroup):
def __init__(self):
if _debug: TNetwork._debug("__init__")
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: TNetwork._debug(" - time machine reset")
# make a little LAN
self.vlan = Network(broadcast_address=LocalBroadcast())
# test device object
td_device_object = LocalDeviceObject(
objectName="td",
objectIdentifier=("device", 1),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=999,
)
# test device
self.td = ApplicationNode(td_device_object, Address(1), self.vlan)
self.append(self.td)
# implementation under test device object
iut_device_object = LocalDeviceObject(
objectName="iut",
objectIdentifier=("device", 2),
maxApduLengthAccepted=1024,
segmentationSupported='noSegmentation',
vendorIdentifier=999,
)
# implementation under test
self.iut = ApplicationNode(iut_device_object, Address(2), self.vlan)
self.append(self.iut)
# sniffer node
self.sniffer = SnifferNode(Address(3), self.vlan)
self.append(self.sniffer)
def run(self, time_limit=60.0):
if _debug: TNetwork._debug("run %r", time_limit)
# run the group
super(TNetwork, self).run()
if _debug: TNetwork._debug(" - group running")
# run it for some time
run_time_machine(time_limit)
if _debug:
TNetwork._debug(" - time machine finished")
for state_machine in self.state_machines:
TNetwork._debug(" - machine: %r", state_machine)
for direction, pdu in state_machine.transaction_log:
TNetwork._debug(" %s %s", direction, str(pdu))
# check for success
all_success, some_failed = super(TNetwork, self).check_for_success()
assert all_success
@bacpypes_debugging
class TestBasic(unittest.TestCase):
@ -104,41 +31,168 @@ class TestBasic(unittest.TestCase):
if _debug: TestBasic._debug("test_basic")
# create a network
tnet = TNetwork()
anet = ApplicationNetwork()
# all start states are successful
tnet.td.start_state.success()
tnet.iut.start_state.success()
tnet.sniffer.start_state.success()
anet.td.start_state.success()
anet.iut.start_state.success()
# run the group
tnet.run()
anet.run()
@bacpypes_debugging
class TestWhoIsIAm(unittest.TestCase):
def test_whois(self):
"""Test an idle network, nothing happens is success."""
if _debug: TestWhoIsIAm._debug("test_whois")
def test_whois_unconstrained(self):
"""Test an unconstrained WhoIs, all devices respond."""
if _debug: TestWhoIsIAm._debug("test_whois_unconstrained")
# create a network
tnet = TNetwork()
anet = ApplicationNetwork()
# add the service capability to the iut
tnet.iut.add_capability(WhoIsIAmServices)
# add the service capability to the IUT
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
tnet.td.start_state.doc("1-1-0") \
.send(WhoIsRequest(destination=tnet.vlan.broadcast_address)).doc("1-1-1") \
.receive(IAmRequest, pduSource=tnet.iut.address).doc("1-1-2") \
anet.td.start_state.doc("1-1-0") \
.send(WhoIsRequest(destination=anet.vlan.broadcast_address)).doc("1-1-1") \
.receive(IAmRequest, pduSource=anet.iut.address).doc("1-1-2") \
.success()
# application layer above the iut is idle
tnet.iut.start_state.success()
# no sniffing yet
tnet.sniffer.start_state.success()
# no IUT application layer matching
anet.iut.start_state.success()
# run the group
tnet.run()
anet.run()
def test_whois_range_below(self):
"""Test range below."""
if _debug: TestWhoIsIAm._debug("test_whois_range_below")
# create a network
anet = ApplicationNetwork()
# add the service capability to the iut
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
anet.td.start_state.doc("2-1-0") \
.send(WhoIsRequest(
destination=anet.vlan.broadcast_address,
deviceInstanceRangeLowLimit=0,
deviceInstanceRangeHighLimit=9,
)).doc("2-1-1") \
.success()
# no IUT application layer matching
anet.iut.start_state.success()
# run the group
anet.run()
def test_whois_range_above(self):
"""Test range above."""
if _debug: TestWhoIsIAm._debug("test_whois_range_above")
# create a network
anet = ApplicationNetwork()
# add the service capability to the iut
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
anet.td.start_state.doc("3-1-0") \
.send(WhoIsRequest(
destination=anet.vlan.broadcast_address,
deviceInstanceRangeLowLimit=21,
deviceInstanceRangeHighLimit=29,
)).doc("3-1-1") \
.success()
# no IUT application layer matching
anet.iut.start_state.success()
# run the group
anet.run()
def test_whois_range(self):
"""Test a WhoIs, included range."""
if _debug: TestWhoIsIAm._debug("test_whois_range")
# create a network
anet = ApplicationNetwork()
# add the service capability to the IUT
anet.iut.add_capability(WhoIsIAmServices)
# all start states are successful
anet.td.start_state.doc("4-1-0") \
.send(WhoIsRequest(
destination=anet.vlan.broadcast_address,
deviceInstanceRangeLowLimit=19,
deviceInstanceRangeHighLimit=21,
)).doc("4-1-1") \
.receive(IAmRequest, pduSource=anet.iut.address).doc("4-1-2") \
.success()
# no IUT application layer matching
anet.iut.start_state.success()
# run the group
anet.run()
@bacpypes_debugging
class TestWhoHasIHave(unittest.TestCase):
def test_who_has_object_by_name(self):
"""Test an unconstrained WhoIs, all devices respond."""
if _debug: TestWhoIsIAm._debug("test_who_has_object_by_name")
# create a network
anet = ApplicationNetwork()
# add the service capability to the IUT
anet.iut.add_capability(WhoHasIHaveServices)
# all start states are successful
anet.td.start_state.doc("5-1-0") \
.send(WhoHasRequest(
destination=anet.vlan.broadcast_address,
object=WhoHasObject(objectName="iut"),
)).doc("5-1-1") \
.receive(IHaveRequest, pduSource=anet.iut.address).doc("5-1-2") \
.success()
# no IUT application layer matching
anet.iut.start_state.success()
# run the group
anet.run()
def test_who_has_object_by_id(self):
"""Test an unconstrained WhoIs, all devices respond."""
if _debug: TestWhoIsIAm._debug("test_who_has_object_by_id")
# create a network
anet = ApplicationNetwork()
# add the service capability to the IUT
anet.iut.add_capability(WhoHasIHaveServices)
# all start states are successful
anet.td.start_state.doc("6-1-0") \
.send(WhoHasRequest(
destination=anet.vlan.broadcast_address,
object=WhoHasObject(objectIdentifier=('device', 20)),
)).doc("6-1-1") \
.receive(IHaveRequest, pduSource=anet.iut.address).doc("6-1-2") \
.success()
# no IUT application layer matching
anet.iut.start_state.success()
# run the group
anet.run()

View File

@ -69,8 +69,8 @@ class SampleOneShotTask(OneShotTask):
self.process_task_called = []
def process_task(self):
if _debug: SampleOneShotTask._debug("process_task @ %r", time_machine.current_time)
global time_machine
if _debug: SampleOneShotTask._debug("process_task @ %r", time_machine.current_time)
# add the current time
self.process_task_called.append(time_machine.current_time)
@ -81,8 +81,8 @@ sample_task_function_called = []
@bacpypes_debugging
def sample_task_function(*args, **kwargs):
if _debug: sample_task_function._debug("sample_task_function %r %r @ %r", args, kwargs, time_machine.current_time)
global sample_task_function_called, time_machine
if _debug: sample_task_function._debug("sample_task_function %r %r @ %r", args, kwargs, time_machine.current_time)
# bump the counter
sample_task_function_called.append(time_machine.current_time)
@ -98,8 +98,8 @@ class SampleRecurringTask(RecurringTask):
self.process_task_called = []
def process_task(self):
if _debug: SampleRecurringTask._debug("process_task @ %r", time_machine.current_time)
global time_machine
if _debug: SampleRecurringTask._debug("process_task @ %r", time_machine.current_time)
# add the current time
self.process_task_called.append(time_machine.current_time)