1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

additional tests

This commit is contained in:
Joel Bender 2017-09-22 00:27:04 -04:00
parent 3459dad17f
commit b10250f8fa
4 changed files with 423 additions and 39 deletions

View File

@ -5,4 +5,5 @@ Test Network Layer Functionality
"""
from . import test_codec
from . import test_simple

212
tests/test_npdu/helpers.py Normal file
View File

@ -0,0 +1,212 @@
#!/usr/bin/env python
"""
Network VLAN Helper Classes
"""
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.comm import Client, Server, bind
from bacpypes.pdu import Address, LocalBroadcast, PDU
from bacpypes.npdu import npdu_types, NPDU
from bacpypes.vlan import Node
from bacpypes.app import Application
from bacpypes.appservice import StateMachineAccessPoint, ApplicationServiceAccessPoint
from bacpypes.netservice import NetworkServiceAccessPoint, NetworkServiceElement
from ..state_machine import ClientStateMachine
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class NPDUCodec(Client, Server):
def __init__(self):
if _debug: NPDUCodec._debug("__init__")
Client.__init__(self)
Server.__init__(self)
def indication(self, npdu):
if _debug: NPDUCodec._debug("indication %r", npdu)
# first as a generic NPDU
xpdu = NPDU()
npdu.encode(xpdu)
# now as a vanilla PDU
ypdu = PDU()
xpdu.encode(ypdu)
if _debug: NPDUCodec._debug(" - encoded: %r", ypdu)
# send it downstream
self.request(ypdu)
def confirmation(self, pdu):
if _debug: NPDUCodec._debug("confirmation %r", pdu)
# decode as a generic NPDU
xpdu = NPDU()
xpdu.decode(pdu)
# drop application layer messages
if xpdu.npduNetMessage is None:
return
# do a deeper decode of the NPDU
ypdu = npdu_types[xpdu.npduNetMessage]()
ypdu.decode(xpdu)
# send it upstream
self.response(ypdu)
#
# _repr
#
class _repr:
def __repr__(self):
if not self.running:
state_text = "idle "
else:
state_text = "in "
state_text += repr(self.current_state)
return "<%s(%s) %s at %s>" % (
self.__class__.__name__,
getattr(self, 'address', '?'),
state_text,
hex(id(self)),
)
#
# SnifferNode
#
@bacpypes_debugging
class SnifferNode(_repr, ClientStateMachine):
def __init__(self, address, vlan):
if _debug: SnifferNode._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
self.name = address
self.address = Address(address)
# create a promiscuous node, added to the network
self.node = Node(self.address, vlan, promiscuous=True)
if _debug: SnifferNode._debug(" - node: %r", self.node)
# bind this to the node
bind(self, self.node)
#
# NetworkLayerNode
#
@bacpypes_debugging
class NetworkLayerNode(_repr, ClientStateMachine):
def __init__(self, address, vlan):
if _debug: NetworkLayerNode._debug("__init__ %r %r", address, vlan)
ClientStateMachine.__init__(self)
# save the name and address
self.name = address
self.address = Address(address)
# create a network layer encoder/decoder
self.codec = NPDUCodec()
if _debug: SnifferNode._debug(" - codec: %r", self.codec)
# create a node, added to the network
self.node = Node(self.address, vlan)
if _debug: SnifferNode._debug(" - node: %r", self.node)
# bind this to the node
bind(self, self.codec, self.node)
#
# RouterNode
#
@bacpypes_debugging
class RouterNode:
def __init__(self):
if _debug: RouterNode._debug("__init__")
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
def add_network(self, address, vlan, net):
if _debug: RouterNode._debug("add_network %r %r %r", address, vlan, net)
# convert the address to an Address
address = Address(address)
# create a node, added to the network
node = Node(address, vlan)
if _debug: RouterNode._debug(" - node: %r", self.node)
# bind the BIP stack to the local network
self.nsap.bind(node, net)
#
# ApplicationNode
#
@bacpypes_debugging
class ApplicationNode(_repr, Application, ClientStateMachine):
def __init__(self, localDevice, vlan):
if _debug: ApplicationNode._debug("__init__ %r %r", address, vlan)
# build an address and save it
self.address = Address(localDevice.objectIdentifier[1])
if _debug: ApplicationNode._debug(" - address: %r", self.address)
# continue with initialization
Application.__init__(self, localDevice, self.address)
StateMachine.__init__(self, name=localDevice.objectName)
# include a application decoder
self.asap = ApplicationServiceAccessPoint()
# pass the device object to the state machine access point so it
# can know if it should support segmentation
self.smap = StateMachineAccessPoint(localDevice)
# the segmentation state machines need access to the same device
# information cache as the application
self.smap.deviceInfoCache = self.deviceInfoCache
# a network service access point will be needed
self.nsap = NetworkServiceAccessPoint()
# give the NSAP a generic network layer service element
self.nse = NetworkServiceElement()
bind(self.nse, self.nsap)
# bind the top layers
bind(self, self.asap, self.smap, self.nsap)
# create a node, added to the network
node = Node(self.address, vlan)
if _debug: RouterNode._debug(" - node: %r", self.node)
# bind the BIP stack to the local network
self.nsap.bind(node, net)

View File

@ -26,50 +26,13 @@ from bacpypes.npdu import (
from ..trapped_classes import TrappedClient, TrappedServer
from ..state_machine import match_pdu
from .helpers import NPDUCodec
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class NPDUCodec(Client, Server):
def __init__(self):
if _debug: NPDUCodec._debug("__init__")
Client.__init__(self)
Server.__init__(self)
def indication(self, npdu):
if _debug: NPDUCodec._debug("indication %r", npdu)
# first as a generic NPDU
xpdu = NPDU()
npdu.encode(xpdu)
# now as a vanilla PDU
ypdu = PDU()
xpdu.encode(ypdu)
if _debug: NPDUCodec._debug(" - encoded: %r", ypdu)
# send it downstream
self.request(ypdu)
def confirmation(self, pdu):
if _debug: NPDUCodec._debug("confirmation %r", pdu)
# decode as a generic NPDU
xpdu = NPDU()
xpdu.decode(pdu)
# do a deeper decode of the NPDU
ypdu = npdu_types[xpdu.npduNetMessage]()
ypdu.decode(xpdu)
# send it upstream
self.response(ypdu)
@bacpypes_debugging
class TestNPDUCodec(unittest.TestCase):

View File

@ -0,0 +1,208 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test NPDU Encoding and Decoding
-------------------------------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, btox, xtob
from bacpypes.comm import Client, Server, bind
from bacpypes.pdu import PDU, Address, LocalBroadcast
from bacpypes.vlan import Network
from bacpypes.npdu import (
npdu_types, NPDU,
WhoIsRouterToNetwork, IAmRouterToNetwork, ICouldBeRouterToNetwork,
RejectMessageToNetwork, RouterBusyToNetwork, RouterAvailableToNetwork,
RoutingTableEntry, InitializeRoutingTable, InitializeRoutingTableAck,
EstablishConnectionToNetwork, DisconnectConnectionToNetwork,
WhatIsNetworkNumber, NetworkNumberIs,
)
from ..state_machine import match_pdu, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
from .helpers import SnifferNode, NetworkLayerNode, RouterNode
# some debugging
_debug = 0
_log = ModuleLogger(globals())
#
# TNetwork
#
@bacpypes_debugging
class TNetwork(StateMachineGroup):
def __init__(self):
if _debug: TNetwork._debug("__init__")
StateMachineGroup.__init__(self)
# reset the time machine
reset_time_machine()
if _debug: TNetwork._debug(" - time machine reset")
# make a little LAN
self.vlan1 = Network(name="vlan1", broadcast_address=LocalBroadcast())
# test device
self.td = NetworkLayerNode("1", self.vlan1)
self.append(self.td)
# sniffer node
self.sniffer1 = SnifferNode("2", self.vlan1)
self.append(self.sniffer1)
# make another little LAN
self.vlan2 = Network(name="vlan2", broadcast_address=LocalBroadcast())
# sniffer node
self.sniffer2 = SnifferNode("3", self.vlan2)
self.append(self.sniffer2)
# implementation under test
self.iut = RouterNode()
# add the networks
self.iut.add_network("4", self.vlan1, 1)
self.iut.add_network("5", self.vlan2, 2)
def run(self, time_limit=60.0):
if _debug: TNetwork._debug("run %r", time_limit)
# run the group
super(TNetwork, self).run()
# run it for some time
run_time_machine(time_limit)
if _debug:
TNetwork._debug(" - time machine finished")
for state_machine in self.state_machines:
TNetwork._debug(" - machine: %r", state_machine)
for direction, pdu in state_machine.transaction_log:
TNetwork._debug(" %s %s", direction, str(pdu))
# check for success
all_success, some_failed = super(TNetwork, self).check_for_success()
assert all_success
@bacpypes_debugging
class TestSimple(unittest.TestCase):
def test_idle(self):
"""Test an idle network, nothing happens is success."""
if _debug: TestSimple._debug("test_idle")
# create a network
tnet = TNetwork()
# all start states are successful
tnet.td.start_state.success()
tnet.sniffer1.start_state.success()
tnet.sniffer2.start_state.success()
# run the group
tnet.run()
@bacpypes_debugging
class TestWhoIsRouterToNetwork(unittest.TestCase):
def test_01(self):
"""Test broadcast for any router."""
if _debug: TestWhoIsRouterToNetwork._debug("test_01")
# create a network
tnet = TNetwork()
# all start states are successful
tnet.td.start_state.doc("1-1-0") \
.send(WhoIsRouterToNetwork(
destination=LocalBroadcast(),
)).doc("1-1-1") \
.receive(IAmRouterToNetwork,
iartnNetworkList=[2],
).doc("1-1-2") \
.success()
tnet.sniffer1.start_state.success()
# nothing received on network 2
tnet.sniffer2.start_state.doc("1-2-0") \
.timeout(3).doc("1-2-1") \
.success()
# run the group
tnet.run()
def test_02(self):
"""Test broadcast for existing router."""
if _debug: TestWhoIsRouterToNetwork._debug("test_02")
# create a network
tnet = TNetwork()
# all start states are successful
tnet.td.start_state.doc("2-1-0") \
.send(WhoIsRouterToNetwork(2,
destination=LocalBroadcast(),
)).doc("2-1-1") \
.receive(IAmRouterToNetwork,
iartnNetworkList=[2],
).doc("2-1-2") \
.success()
tnet.sniffer1.start_state.success()
# nothing received on network 2
tnet.sniffer2.start_state.doc("2-2-0") \
.timeout(3).doc("2-2-1") \
.success()
# run the group
tnet.run()
def test_03(self):
"""Test broadcast for a non-existent router."""
if _debug: TestWhoIsRouterToNetwork._debug("test_03")
# create a network
tnet = TNetwork()
# send request, receive nothing back
tnet.td.start_state.doc("3-1-0") \
.send(WhoIsRouterToNetwork(3,
destination=LocalBroadcast(),
)).doc("3-1-1") \
.timeout(3).doc("3-1-2") \
.success()
# sniffer on network 1 sees the request
tnet.sniffer1.start_state.doc("3-2-0") \
.receive(PDU,
pduData=xtob('01.80' # version, network layer
'00 0003' # message type and network
)
).doc("3-2-1") \
.success()
# sniffer on network 2 sees request forwarded by router
tnet.sniffer2.start_state.doc("3-3-0") \
.receive(PDU,
pduData=xtob('01.88' # version, network layer, routed
'0001 01 01' # snet/slen/sadr
'00 0003' # message type and network
),
).doc("3-3-1") \
.success()
# run the group
tnet.run()