1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-27 00:57:47 +08:00

0.17.1 released

This commit is contained in:
Joel Bender
2018-05-26 22:11:16 -04:00
parent 292ccf90cf
commit e2f5fb3023
81 changed files with 3606 additions and 1571 deletions

View File

@@ -6,10 +6,19 @@ B/IP VLAN Helper Classes
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.comm import Client, Server, bind
from bacpypes.comm import Client, Server, ApplicationServiceElement, bind
from bacpypes.pdu import Address, LocalBroadcast, PDU, unpack_ip_addr
from bacpypes.vlan import IPNode
from bacpypes.app import DeviceInfoCache, Application
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from bacpypes.object import register_object_type
from bacpypes.local.device import LocalDeviceObject
from bacpypes.service.device import WhoIsIAmServices
from bacpypes.service.object import ReadWritePropertyServices
from ..state_machine import ClientStateMachine
from bacpypes.bvllservice import BIPSimple, BIPForeign, BIPBBMD, AnnexJCodec
@@ -26,6 +35,12 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
class FauxMultiplexer(Client, Server):
"""This class is a placeholder for UDPMultiplexer without the code that
determines if the upstream packets are Annex-H or Annex-J packets, it
assumes they are all Annex-J. It creates and binds itself to an IPNode
which is added to an IPNetwork.
"""
def __init__(self, addr, network=None, cid=None, sid=None):
if _debug: FauxMultiplexer._debug("__init__")
@@ -81,37 +96,54 @@ class FauxMultiplexer(Client, Server):
self.response(PDU(pdu, source=src, destination=dest))
#
# SnifferNode
# SnifferStateMachine
#
@bacpypes_debugging
class SnifferNode(ClientStateMachine):
class SnifferStateMachine(ClientStateMachine):
"""This class acts as a sniffer for BVLL messages. The client state
machine sits above an Annex-J codec so the send and receive PDUs are
BVLL PDUs.
"""
def __init__(self, address, vlan):
if _debug: SnifferNode._debug("__init__ %r %r", address, vlan)
if _debug: SnifferStateMachine._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
self.name = address
self.address = Address(address)
# create a promiscuous node, added to the network
self.node = IPNode(self.address, vlan, promiscuous=True)
if _debug: SnifferNode._debug(" - node: %r", self.node)
# BACnet/IP interpreter
self.annexj = AnnexJCodec()
# bind this to the node
bind(self, self.node)
# fake multiplexer has a VLAN node in it
self.mux = FauxMultiplexer(self.address, vlan)
# might receive all packets and allow spoofing
self.mux.node.promiscuous = True
self.mux.node.spoofing = True
# bind the stack together
bind(self, self.annexj, self.mux)
#
# CodecNode
# BIPStateMachine
#
@bacpypes_debugging
class CodecNode(ClientStateMachine):
class BIPStateMachine(ClientStateMachine):
"""This class is an application layer for BVLL messages that has no BVLL
processing like the 'simple', 'foreign', or 'bbmd' versions. The client
state machine sits above and Annex-J codec so the send and receive PDUs are
BVLL PDUs.
"""
def __init__(self, address, vlan):
if _debug: CodecNode._debug("__init__ %r %r", address, vlan)
if _debug: BIPStateMachine._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
@@ -129,14 +161,18 @@ class CodecNode(ClientStateMachine):
#
# SimpleNode
# BIPSimpleStateMachine
#
@bacpypes_debugging
class SimpleNode(ClientStateMachine):
class BIPSimpleStateMachine(ClientStateMachine):
"""This class sits on a BIPSimple instance, the send() and receive()
parameters are NPDUs.
"""
def __init__(self, address, vlan):
if _debug: SimpleNode._debug("__init__ %r %r", address, vlan)
if _debug: BIPSimpleStateMachine._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
@@ -155,14 +191,18 @@ class SimpleNode(ClientStateMachine):
#
# ForeignNode
# BIPForeignStateMachine
#
@bacpypes_debugging
class ForeignNode(ClientStateMachine):
class BIPForeignStateMachine(ClientStateMachine):
"""This class sits on a BIPForeign instance, the send() and receive()
parameters are NPDUs.
"""
def __init__(self, address, vlan):
if _debug: ForeignNode._debug("__init__ %r %r", address, vlan)
if _debug: BIPForeignStateMachine._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
@@ -180,14 +220,18 @@ class ForeignNode(ClientStateMachine):
bind(self, self.bip, self.annexj, self.mux)
#
# BBMDNode
# BIPBBMDStateMachine
#
@bacpypes_debugging
class BBMDNode(ClientStateMachine):
class BIPBBMDStateMachine(ClientStateMachine):
"""This class sits on a BIPBBMD instance, the send() and receive()
parameters are NPDUs.
"""
def __init__(self, address, vlan):
if _debug: BBMDNode._debug("__init__ %r %r", address, vlan)
if _debug: BIPBBMDStateMachine._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
@@ -200,7 +244,7 @@ class BBMDNode(ClientStateMachine):
# build an address, full mask
bdt_address = "%s/32:%d" % self.address.addrTuple
if _debug: BBMDNode._debug(" - bdt_address: %r", bdt_address)
if _debug: BIPBBMDStateMachine._debug(" - bdt_address: %r", bdt_address)
# add itself as the first entry in the BDT
self.bip.add_peer(Address(bdt_address))
@@ -211,3 +255,209 @@ class BBMDNode(ClientStateMachine):
# bind the stack together
bind(self, self.bip, self.annexj, self.mux)
#
# BIPSimpleNode
#
@bacpypes_debugging
class BIPSimpleNode:
"""This class is a BIPSimple instance that is not bound to a state machine."""
def __init__(self, address, vlan):
if _debug: BIPSimpleNode._debug("__init__ %r %r", address, vlan)
# save the name and address
self.name = address
self.address = Address(address)
# BACnet/IP interpreter
self.bip = BIPSimple()
self.annexj = AnnexJCodec()
# fake multiplexer has a VLAN node in it
self.mux = FauxMultiplexer(self.address, vlan)
# bind the stack together
bind(self.bip, self.annexj, self.mux)
#
# BIPBBMDNode
#
@bacpypes_debugging
class BIPBBMDNode:
"""This class is a BIPBBMD instance that is not bound to a state machine."""
def __init__(self, address, vlan):
if _debug: BIPBBMDNode._debug("__init__ %r %r", address, vlan)
# save the name and address
self.name = address
self.address = Address(address)
if _debug: BIPBBMDNode._debug(" - address: %r", self.address)
# BACnet/IP interpreter
self.bip = BIPBBMD(self.address)
self.annexj = AnnexJCodec()
# build an address, full mask
bdt_address = "%s/32:%d" % self.address.addrTuple
if _debug: BIPBBMDNode._debug(" - bdt_address: %r", bdt_address)
# add itself as the first entry in the BDT
self.bip.add_peer(Address(bdt_address))
# fake multiplexer has a VLAN node in it
self.mux = FauxMultiplexer(self.address, vlan)
# bind the stack together
bind(self.bip, self.annexj, self.mux)
#
# TestDeviceObject
#
@register_object_type(vendor_id=999)
class TestDeviceObject(LocalDeviceObject):
pass
#
# BIPSimpleApplicationLayerStateMachine
#
@bacpypes_debugging
class BIPSimpleApplicationLayerStateMachine(ApplicationServiceElement, ClientStateMachine):
def __init__(self, address, vlan):
if _debug: BIPSimpleApplicationLayerStateMachine._debug("__init__ %r %r", address, vlan)
# build a name, save the address
self.name = "app @ %s" % (address,)
self.address = Address(address)
# build a local device object
local_device = TestDeviceObject(
objectName=self.name,
objectIdentifier=('device', 998),
vendorIdentifier=999,
)
# build an address and save it
self.address = Address(address)
if _debug: BIPSimpleApplicationLayerStateMachine._debug(" - address: %r", self.address)
# continue with initialization
ApplicationServiceElement.__init__(self)
ClientStateMachine.__init__(self, name=local_device.objectName)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(local_device)
# the segmentation state machines need access to some device
# information cache, usually shared with the application
self.smap.deviceInfoCache = DeviceInfoCache()
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# BACnet/IP interpreter
self.bip = BIPSimple()
self.annexj = AnnexJCodec()
# fake multiplexer has a VLAN node in it
self.mux = FauxMultiplexer(self.address, vlan)
# bind the stack together
bind(self.bip, self.annexj, self.mux)
# bind the stack to the local network
self.nsap.bind(self.bip)
def indication(self, apdu):
if _debug: BIPSimpleApplicationLayerStateMachine._debug("indication %r", apdu)
self.receive(apdu)
def confirmation(self, apdu):
if _debug: BIPSimpleApplicationLayerStateMachine._debug("confirmation %r %r", apdu)
self.receive(apdu)
#
# BIPBBMDApplication
#
class BIPBBMDApplication(Application, WhoIsIAmServices, ReadWritePropertyServices):
def __init__(self, address, vlan):
if _debug: BIPBBMDApplication._debug("__init__ %r %r", address, vlan)
# build a name, save the address
self.name = "app @ %s" % (address,)
self.address = Address(address)
if _debug: BIPBBMDApplication._debug(" - address: %r", self.address)
# build a local device object
local_device = TestDeviceObject(
objectName=self.name,
objectIdentifier=('device', 999),
vendorIdentifier=999,
)
# continue with initialization
Application.__init__(self, local_device, self.address)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(local_device)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# BACnet/IP interpreter
self.bip = BIPBBMD(self.address)
self.annexj = AnnexJCodec()
# build an address, full mask
bdt_address = "%s/32:%d" % self.address.addrTuple
if _debug: BIPBBMDNode._debug(" - bdt_address: %r", bdt_address)
# add itself as the first entry in the BDT
self.bip.add_peer(Address(bdt_address))
# fake multiplexer has a VLAN node in it
self.mux = FauxMultiplexer(self.address, vlan)
# bind the stack together
bind(self.bip, self.annexj, self.mux)
# bind the stack to the local network
self.nsap.bind(self.bip)

View File

@@ -1 +1,313 @@
# placeholder
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test BBMD
---------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.pdu import Address, PDU, LocalBroadcast
from bacpypes.vlan import IPNetwork, IPRouter
from bacpypes.bvll import (
Result,
WriteBroadcastDistributionTable,
ReadBroadcastDistributionTable, ReadBroadcastDistributionTableAck,
ForwardedNPDU,
RegisterForeignDevice,
ReadForeignDeviceTable, ReadForeignDeviceTableAck,
DeleteForeignDeviceTableEntry,
DistributeBroadcastToNetwork,
OriginalUnicastNPDU,
OriginalBroadcastNPDU,
)
from bacpypes.apdu import (
WhoIsRequest, IAmRequest,
ReadPropertyRequest, ReadPropertyACK,
AbortPDU,
)
from ..state_machine import StateMachineGroup, TrafficLog
from ..time_machine import reset_time_machine, run_time_machine
from .helpers import (
SnifferStateMachine, BIPStateMachine, BIPSimpleStateMachine,
BIPForeignStateMachine, BIPBBMDStateMachine,
BIPSimpleNode, BIPBBMDNode,
BIPSimpleApplicationLayerStateMachine,
BIPBBMDApplication,
)
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# TNetwork
#
@bacpypes_debugging
class TNetwork(StateMachineGroup):
def __init__(self, count):
if _debug: TNetwork._debug("__init__ %r", count)
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: TNetwork._debug(" - time machine reset")
# create a traffic log
self.traffic_log = TrafficLog()
# make a router
self.router = IPRouter()
# make the networks
self.vlan = []
for net in range(1, count + 1):
# make a network and set the traffic log
ip_network = IPNetwork("192.168.{}.0/24".format(net))
ip_network.traffic_log = self.traffic_log
# make a router
router_address = Address("192.168.{}.1/24".format(net))
self.router.add_network(router_address, ip_network)
self.vlan.append(ip_network)
def run(self, time_limit=60.0):
if _debug: TNetwork._debug("run %r", time_limit)
# run the group
super(TNetwork, self).run()
# run it for some time
run_time_machine(time_limit)
if _debug: TNetwork._debug(" - time machine finished")
# check for success
all_success, some_failed = super(TNetwork, self).check_for_success()
if _debug:
TNetwork._debug(" - all_success, some_failed: %r, %r", all_success, some_failed)
for state_machine in self.state_machines:
if state_machine.running:
TNetwork._debug(" %r (running)", state_machine)
elif not state_machine.current_state:
TNetwork._debug(" %r (not started)", state_machine)
else:
TNetwork._debug(" %r", state_machine)
for direction, pdu in state_machine.transaction_log:
TNetwork._debug(" %s %r", direction, pdu)
# traffic log has what was processed on each vlan
self.traffic_log.dump(TNetwork._debug)
assert all_success
@bacpypes_debugging
class TestNonBBMD(unittest.TestCase):
def setup_method(self, method):
"""This function is called before each test method is called and is
given a reference to the test method."""
if _debug: TestNonBBMD._debug("setup_method %r", method)
# create a network
self.tnet = TNetwork(1)
# test device
self.td = BIPStateMachine("192.168.1.2/24", self.tnet.vlan[0])
self.tnet.append(self.td)
# implementation under test
self.iut = BIPSimpleNode("192.168.1.3/24", self.tnet.vlan[0])
def test_write_bdt_fail(self):
"""Test writing a BDT."""
if _debug: TestNonBBMD._debug("test_write_bdt_fail")
# read the broadcast distribution table, get a nack
self.td.start_state.doc("1-1-0") \
.send(WriteBroadcastDistributionTable(destination=self.iut.address)).doc("1-1-1") \
.receive(Result, bvlciResultCode=0x0010).doc("1-1-2") \
.success()
# run the group
self.tnet.run()
def test_read_bdt_fail(self):
"""Test reading a BDT."""
if _debug: TestNonBBMD._debug("test_read_bdt_fail")
# read the broadcast distribution table, get a nack
self.td.start_state.doc("1-2-0") \
.send(ReadBroadcastDistributionTable(destination=self.iut.address)).doc("1-2-1") \
.receive(Result, bvlciResultCode=0x0020).doc("1-2-2") \
.success()
# run the group
self.tnet.run()
def test_register_fail(self):
"""Test registering as a foreign device to a non-BBMD."""
if _debug: TestNonBBMD._debug("test_read_fdt_success")
# read the broadcast distribution table, get a nack
self.td.start_state.doc("1-3-0") \
.send(RegisterForeignDevice(10, destination=self.iut.address)).doc("1-3-1") \
.receive(Result, bvlciResultCode=0x0030).doc("1-3-2") \
.success()
# run the group
self.tnet.run()
def test_read_fdt_fail(self):
"""Test reading an FDT from a non-BBMD."""
if _debug: TestNonBBMD._debug("test_read_fdt_success")
# read the broadcast distribution table, get a nack
self.td.start_state.doc("1-4-0") \
.send(ReadForeignDeviceTable(destination=self.iut.address)).doc("1-4-1") \
.receive(Result, bvlciResultCode=0x0040).doc("1-4-2") \
.success()
# run the group
self.tnet.run()
def test_delete_fail(self):
"""Test deleting an FDT entry from a non-BBMD."""
if _debug: TestNonBBMD._debug("test_delete_fail")
# read the broadcast distribution table, get a nack
self.td.start_state.doc("1-5-0") \
.send(DeleteForeignDeviceTableEntry(Address("1.2.3.4"), destination=self.iut.address)).doc("1-5-1") \
.receive(Result, bvlciResultCode=0x0050).doc("1-5-2") \
.success()
# run the group
self.tnet.run()
def test_distribute_fail(self):
"""Test asking a non-BBMD to distribute a broadcast."""
if _debug: TestNonBBMD._debug("test_delete_fail")
# read the broadcast distribution table, get a nack
self.td.start_state.doc("1-6-0") \
.send(DistributeBroadcastToNetwork(xtob('deadbeef'), destination=self.iut.address)).doc("1-6-1") \
.receive(Result, bvlciResultCode=0x0060).doc("1-6-2") \
.success()
# run the group
self.tnet.run()
@bacpypes_debugging
class TestBBMD(unittest.TestCase):
def test_14_2_1_1(self):
"""14.2.1.1 Execute Forwarded-NPDU (One-hop Distribution)."""
if _debug: TestBBMD._debug("test_14_2_1_1")
# create a network
tnet = TNetwork(2)
# implementation under test
iut = BIPBBMDApplication("192.168.1.2/24", tnet.vlan[0])
if _debug: TestBBMD._debug(" - iut.bip: %r", iut.bip)
# BBMD on net 2
bbmd1 = BIPBBMDNode("192.168.2.2/24", tnet.vlan[1])
# add the IUT as a one-hop peer
bbmd1.bip.add_peer(Address("192.168.1.2/24"))
if _debug: TestBBMD._debug(" - bbmd1.bip: %r", bbmd1.bip)
# test device
td = BIPSimpleApplicationLayerStateMachine("192.168.2.3/24", tnet.vlan[1])
tnet.append(td)
# listener looks for extra traffic
listener = BIPStateMachine("192.168.1.3/24", tnet.vlan[0])
listener.mux.node.promiscuous = True
tnet.append(listener)
# broadcast a forwarded NPDU
td.start_state.doc("2-1-0") \
.send(WhoIsRequest(destination=LocalBroadcast())).doc("2-1-1") \
.receive(IAmRequest).doc("2-1-2") \
.success()
# listen for the directed broadcast, then the original unicast,
# then fail if there's anything else
listener.start_state.doc("2-2-0") \
.receive(ForwardedNPDU).doc("2-2-1") \
.receive(OriginalUnicastNPDU).doc("2-2-2") \
.timeout(3).doc("2-2-3") \
.success()
# run the group
tnet.run()
def test_14_2_1_2(self):
"""14.2.1.1 Execute Forwarded-NPDU (Two-hop Distribution)."""
if _debug: TestBBMD._debug("test_14_2_1_2")
# create a network
tnet = TNetwork(2)
# implementation under test
iut = BIPBBMDApplication("192.168.1.2/24", tnet.vlan[0])
if _debug: TestBBMD._debug(" - iut.bip: %r", iut.bip)
# BBMD on net 2
bbmd1 = BIPBBMDNode("192.168.2.2/24", tnet.vlan[1])
# add the IUT as a two-hop peer
bbmd1.bip.add_peer(Address("192.168.1.2/32"))
if _debug: TestBBMD._debug(" - bbmd1.bip: %r", bbmd1.bip)
# test device
td = BIPSimpleApplicationLayerStateMachine("192.168.2.3/24", tnet.vlan[1])
tnet.append(td)
# listener looks for extra traffic
listener = BIPStateMachine("192.168.1.3/24", tnet.vlan[0])
listener.mux.node.promiscuous = True
tnet.append(listener)
# broadcast a forwarded NPDU
td.start_state.doc("2-3-0") \
.send(WhoIsRequest(destination=LocalBroadcast())).doc("2-3-1") \
.receive(IAmRequest).doc("2-3-2") \
.success()
# listen for the forwarded NPDU. The packet will be sent upstream which
# will generate the original unicast going back, then it will be
# re-broadcast on the local LAN. Fail if there's anything after that.
s241 = listener.start_state.doc("2-4-0") \
.receive(ForwardedNPDU).doc("2-4-1")
# look for the original unicast going back, followed by the rebroadcast
# of the forwarded NPDU on the local LAN
both = s241 \
.receive(OriginalUnicastNPDU).doc("2-4-1-a") \
.receive(ForwardedNPDU).doc("2-4-1-b")
# fail if anything is received after both packets
both.timeout(3).doc("2-4-4") \
.success()
# allow the two packets in either order
s241.receive(ForwardedNPDU).doc("2-4-2-a") \
.receive(OriginalUnicastNPDU, next_state=both).doc("2-4-2-b")
# run the group
tnet.run()

View File

@@ -12,12 +12,20 @@ from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.pdu import Address, PDU, LocalBroadcast
from bacpypes.vlan import IPNetwork, IPRouter
from bacpypes.bvll import ReadForeignDeviceTable, ReadForeignDeviceTableAck
from bacpypes.bvll import (
Result, RegisterForeignDevice,
ReadForeignDeviceTable, ReadForeignDeviceTableAck,
DistributeBroadcastToNetwork, ForwardedNPDU,
OriginalUnicastNPDU, OriginalBroadcastNPDU,
)
from ..state_machine import StateMachineGroup
from ..state_machine import StateMachineGroup, TrafficLog
from ..time_machine import reset_time_machine, run_time_machine
from .helpers import SnifferNode, CodecNode, SimpleNode, ForeignNode, BBMDNode
from .helpers import (
SnifferStateMachine, BIPStateMachine,
BIPSimpleStateMachine, BIPForeignStateMachine, BIPBBMDStateMachine,
)
# some debugging
_debug = 0
@@ -39,23 +47,28 @@ class TNetwork(StateMachineGroup):
reset_time_machine()
if _debug: TNetwork._debug(" - time machine reset")
# create a traffic log
self.traffic_log = TrafficLog()
# make a router
self.router = IPRouter()
# make a home LAN
self.home_vlan = IPNetwork()
self.router.add_network(Address("192.168.5.1/24"), self.home_vlan)
self.vlan_5 = IPNetwork("192.168.5.0/24")
self.vlan_5.traffic_log = self.traffic_log
self.router.add_network(Address("192.168.5.1/24"), self.vlan_5)
# make a remote LAN
self.remote_vlan = IPNetwork()
self.router.add_network(Address("192.168.6.1/24"), self.remote_vlan)
self.vlan_6 = IPNetwork("192.168.6.0/24")
self.vlan_6.traffic_log = self.traffic_log
self.router.add_network(Address("192.168.6.1/24"), self.vlan_6)
# the foreign device
self.fd = ForeignNode("192.168.6.2/24", self.remote_vlan)
self.fd = BIPForeignStateMachine("192.168.6.2/24", self.vlan_6)
self.append(self.fd)
# bbmd
self.bbmd = BBMDNode("192.168.5.3/24", self.home_vlan)
self.bbmd = BIPBBMDStateMachine("192.168.5.3/24", self.vlan_5)
self.append(self.bbmd)
def run(self, time_limit=60.0):
@@ -70,6 +83,22 @@ class TNetwork(StateMachineGroup):
# check for success
all_success, some_failed = super(TNetwork, self).check_for_success()
if _debug:
TNetwork._debug(" - all_success, some_failed: %r, %r", all_success, some_failed)
for state_machine in self.state_machines:
if state_machine.running:
TNetwork._debug(" %r (running)", state_machine)
elif not state_machine.current_state:
TNetwork._debug(" %r (not started)", state_machine)
else:
TNetwork._debug(" %r", state_machine)
for direction, pdu in state_machine.transaction_log:
TNetwork._debug(" %s %s", direction, str(pdu))
# traffic log has what was processed on each vlan
self.traffic_log.dump(TNetwork._debug)
assert all_success
@@ -97,58 +126,46 @@ class TestForeign(unittest.TestCase):
# create a network
tnet = TNetwork()
# add an addition codec node to the home vlan
cnode = CodecNode("192.168.5.2/24", tnet.home_vlan)
tnet.append(cnode)
# home sniffer node
home_sniffer = SnifferNode("192.168.5.254/24", tnet.home_vlan)
tnet.append(home_sniffer)
# remote sniffer node
remote_sniffer = SnifferNode("192.168.6.254/24", tnet.remote_vlan)
tnet.append(remote_sniffer)
# tell the B/IP layer of the foreign device to register
tnet.fd.start_state \
.call(tnet.fd.bip.register, tnet.bbmd.address, 30) \
.success()
# sniffer pieces
registration_request = xtob('81.05.0006' # bvlci
'001e' # time-to-live
)
registration_ack = xtob('81.00.0006.0000') # simple ack
# remote sniffer node
remote_sniffer = SnifferStateMachine("192.168.6.254/24", tnet.vlan_6)
tnet.append(remote_sniffer)
# remote sniffer sees registration
# sniffer traffic
remote_sniffer.start_state.doc("1-1-0") \
.receive(PDU, pduData=registration_request).doc("1-1-1") \
.receive(PDU, pduData=registration_ack).doc("1-1-2") \
.receive(RegisterForeignDevice).doc("1-1-1") \
.receive(Result).doc("1-1-2") \
.set_event('fd-registered').doc("1-1-3") \
.success()
# the bbmd is idle
tnet.bbmd.start_state.success()
# read the FDT
cnode.start_state.doc("1-2-0") \
# home snooper node
home_snooper = BIPStateMachine("192.168.5.2/24", tnet.vlan_5)
tnet.append(home_snooper)
# snooper will read the foreign device table
home_snooper.start_state.doc("1-2-0") \
.wait_event('fd-registered').doc("1-2-1") \
.send(ReadForeignDeviceTable(destination=tnet.bbmd.address)).doc("1-2-2") \
.receive(ReadForeignDeviceTableAck).doc("1-2-3") \
.success()
# the tnode reads the registration table
read_fdt_request = xtob('81.06.0004') # bvlci
read_fdt_ack = xtob('81.07.000e' # read-ack
'c0.a8.06.02.ba.c0 001e 0023' # address, ttl, remaining
)
# home sniffer node
home_sniffer = SnifferStateMachine("192.168.5.254/24", tnet.vlan_5)
tnet.append(home_sniffer)
# home sniffer sees registration
# sniffer traffic
home_sniffer.start_state.doc("1-3-0") \
.receive(PDU, pduData=registration_request).doc("1-3-1") \
.receive(PDU, pduData=registration_ack).doc("1-3-2") \
.receive(PDU, pduData=read_fdt_request).doc("1-3-3") \
.receive(PDU, pduData=read_fdt_ack).doc("1-3-4") \
.receive(RegisterForeignDevice).doc("1-3-1") \
.receive(Result).doc("1-3-2") \
.receive(ReadForeignDeviceTable).doc("1-3-3") \
.receive(ReadForeignDeviceTableAck).doc("1-3-4") \
.success()
# run the group
@@ -170,21 +187,15 @@ class TestForeign(unittest.TestCase):
tnet.bbmd.start_state.success()
# remote sniffer node
remote_sniffer = SnifferNode("192.168.6.254/24", tnet.remote_vlan)
remote_sniffer = SnifferStateMachine("192.168.6.254/24", tnet.vlan_6)
tnet.append(remote_sniffer)
# sniffer pieces
registration_request = xtob('81.05.0006' # bvlci
'000a' # time-to-live
)
registration_ack = xtob('81.00.0006.0000') # simple ack
# remote sniffer sees registration
# sniffer traffic
remote_sniffer.start_state.doc("2-1-0") \
.receive(PDU, pduData=registration_request).doc("2-1-1") \
.receive(PDU, pduData=registration_ack).doc("2-1-2") \
.receive(PDU, pduData=registration_request).doc("2-1-3") \
.receive(PDU, pduData=registration_ack).doc("2-1-4") \
.receive(RegisterForeignDevice).doc("2-1-1") \
.receive(Result).doc("2-1-2") \
.receive(RegisterForeignDevice).doc("2-1-3") \
.receive(Result).doc("2-1-4") \
.success()
# run the group
@@ -205,7 +216,7 @@ class TestForeign(unittest.TestCase):
# register, wait for ack, send some beef
tnet.fd.start_state.doc("3-1-0") \
.call(tnet.fd.bip.register, tnet.bbmd.address, 60).doc("3-1-1") \
.wait_event('fd-registered').doc("3-1-2") \
.wait_event('3-registered').doc("3-1-2") \
.send(pdu).doc("3-1-3") \
.success()
@@ -215,24 +226,15 @@ class TestForeign(unittest.TestCase):
.success()
# remote sniffer node
remote_sniffer = SnifferNode("192.168.6.254/24", tnet.remote_vlan)
remote_sniffer = SnifferStateMachine("192.168.6.254/24", tnet.vlan_6)
tnet.append(remote_sniffer)
# sniffer pieces
registration_request = xtob('81.05.0006' # bvlci
'003c' # time-to-live (60)
)
registration_ack = xtob('81.00.0006.0000') # simple ack
unicast_pdu = xtob('81.0a.0008' # original unicast bvlci
'dead.beef' # PDU being unicast
)
# remote sniffer sees registration
# sniffer traffic
remote_sniffer.start_state.doc("3-2-0") \
.receive(PDU, pduData=registration_request).doc("3-2-1") \
.receive(PDU, pduData=registration_ack).doc("3-2-2") \
.set_event('fd-registered').doc("3-2-3") \
.receive(PDU, pduData=unicast_pdu).doc("3-2-4") \
.receive(RegisterForeignDevice).doc("3-2-1") \
.receive(Result).doc("3-2-2") \
.set_event('3-registered').doc("3-2-3") \
.receive(OriginalUnicastNPDU).doc("3-2-4") \
.success()
# run the group
@@ -258,38 +260,29 @@ class TestForeign(unittest.TestCase):
.success()
# the bbmd is happy when it gets the pdu
tnet.bbmd.start_state \
.receive(PDU, pduSource=tnet.fd.address, pduData=pdu_data) \
.success()
# home sniffer node
home_node = SimpleNode("192.168.5.254/24", tnet.home_vlan)
tnet.append(home_node)
# home node happy when getting the pdu, broadcast by the bbmd
home_node.start_state.doc("4-2-0") \
tnet.bbmd.start_state.doc("4-2-0") \
.receive(PDU, pduSource=tnet.fd.address, pduData=pdu_data).doc("4-2-1") \
.success()
# home simple node
home_node = BIPSimpleStateMachine("192.168.5.254/24", tnet.vlan_5)
tnet.append(home_node)
# home node happy when getting the pdu, broadcast by the bbmd
home_node.start_state.doc("4-3-0") \
.receive(PDU, pduSource=tnet.fd.address, pduData=pdu_data).doc("4-3-1") \
.success()
# remote sniffer node
remote_sniffer = SnifferNode("192.168.6.254/24", tnet.remote_vlan)
remote_sniffer = SnifferStateMachine("192.168.6.254/24", tnet.vlan_6)
tnet.append(remote_sniffer)
# sniffer pieces
registration_request = xtob('81.05.0006' # bvlci
'003c' # time-to-live (60)
)
registration_ack = xtob('81.00.0006.0000') # simple ack
distribute_pdu = xtob('81.09.0008' # bvlci
'deadbeef' # PDU to broadcast
)
# remote sniffer sees registration
remote_sniffer.start_state.doc("4-3-0") \
.receive(PDU, pduData=registration_request).doc("4-3-1") \
.receive(PDU, pduData=registration_ack).doc("4-3-2") \
# remote traffic
remote_sniffer.start_state.doc("4-4-0") \
.receive(RegisterForeignDevice).doc("4-4-1") \
.receive(Result).doc("4-4-2") \
.set_event('4-registered') \
.receive(PDU, pduData=distribute_pdu).doc("4-3-3") \
.receive(DistributeBroadcastToNetwork).doc("4-4-3") \
.success()
# run the group

View File

@@ -11,12 +11,15 @@ import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
from bacpypes.pdu import PDU, LocalBroadcast
from bacpypes.bvll import OriginalUnicastNPDU, OriginalBroadcastNPDU
from bacpypes.vlan import IPNetwork
from ..state_machine import match_pdu, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
from .helpers import SnifferNode, SimpleNode
from .helpers import (
SnifferStateMachine, BIPSimpleStateMachine,
)
# some debugging
_debug = 0
@@ -42,15 +45,15 @@ class TNetwork(StateMachineGroup):
self.vlan = IPNetwork()
# test device
self.td = SimpleNode("192.168.4.1/24", self.vlan)
self.td = BIPSimpleStateMachine("192.168.4.1/24", self.vlan)
self.append(self.td)
# implementation under test
self.iut = SimpleNode("192.168.4.2/24", self.vlan)
self.iut = BIPSimpleStateMachine("192.168.4.2/24", self.vlan)
self.append(self.iut)
# sniffer node
self.sniffer = SnifferNode("192.168.4.254/24", self.vlan)
self.sniffer = SnifferStateMachine("192.168.4.254/24", self.vlan)
self.append(self.sniffer)
@@ -109,12 +112,10 @@ class TestSimple(unittest.TestCase):
tnet.iut.start_state.receive(PDU, pduSource=tnet.td.address).success()
# sniffer sees message on the wire
tnet.sniffer.start_state.receive(PDU,
tnet.sniffer.start_state.receive(OriginalUnicastNPDU,
pduSource=tnet.td.address.addrTuple,
pduDestination=tnet.iut.address.addrTuple,
pduData=xtob('81.0a.0008' # original unicast bvlci
'deadbeef' # PDU being unicast
),
pduData=pdu_data,
).timeout(1.0).success()
# run the group
@@ -137,12 +138,10 @@ class TestSimple(unittest.TestCase):
tnet.iut.start_state.receive(PDU, pduSource=tnet.td.address).success()
# sniffer sees message on the wire
tnet.sniffer.start_state.receive(PDU,
pduSource=tnet.td.address.addrTuple,
pduDestination=('192.168.4.255', 47808),
pduData=xtob('81.0b.0008' # original broadcast bvlci
'deadbeef' # PDU being unicast
),
tnet.sniffer.start_state.receive(OriginalBroadcastNPDU,
pduSource=tnet.td.address.addrTuple,
# pduDestination=('192.168.4.255', 47808),
pduData=pdu_data,
).timeout(1.0).success()
# run the group