1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

add an IPRouter and tests

This commit is contained in:
Joel Bender 2017-08-17 02:53:15 -04:00
parent f1d4626527
commit 2fa62dfe5f
3 changed files with 227 additions and 33 deletions

View File

@ -5,6 +5,8 @@ Virtual Local Area Network
"""
import random
import socket
import struct
from copy import deepcopy
from .errors import ConfigurationError
@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging
from .core import deferred
from .pdu import Address
from .comm import Server
from .comm import Client, Server, bind
# some debugging
_debug = 0
@ -61,10 +63,12 @@ class Network:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
def __len__(self):
@ -165,7 +169,7 @@ class IPNode(Node):
tuple and a broadcast tuple that would be used for socket communications.
"""
def __init__(self, addr, lan=None):
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan)
# make sure it's an Address that has appropriate pieces
@ -178,5 +182,74 @@ class IPNode(Node):
self.addrBroadcastTuple = addr.addrBroadcastTuple
# continue initializing
Node.__init__(self, addr.addrTuple, lan=lan)
Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid)
#
# IPRouterNode
#
@bacpypes_debugging
class IPRouterNode(Client):
def __init__(self, router, addr, lan=None):
if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan)
# save the reference to the router
self.router = router
# make ourselves an IPNode and bind to it
self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True)
bind(self, self.node)
# save our mask and subnet
self.addrMask = addr.addrMask
self.addrSubnet = addr.addrSubnet
def confirmation(self, pdu):
if _debug: IPRouterNode._debug("confirmation %r", pdu)
self.router.process_pdu(self, pdu)
def process_pdu(self, pdu):
if _debug: IPRouterNode._debug("process_pdu %r", pdu)
# pass it downstream
self.request(pdu)
#
# IPRouter
#
@bacpypes_debugging
class IPRouter:
def __init__(self):
if _debug: IPRouter._debug("__init__")
# connected network nodes
self.nodes = []
def add_network(self, addr, lan):
if _debug: IPRouter._debug("add_network %r %r", addr, lan)
node = IPRouterNode(self, addr, lan)
if _debug: IPRouter._debug(" - node: %r", node)
self.nodes.append(node)
def process_pdu(self, node, pdu):
if _debug: IPRouter._debug("process_pdu %r %r", node, pdu)
# unpack the address part of the destination
addrstr = socket.inet_aton(pdu.pduDestination[0])
ipaddr = struct.unpack('!L', addrstr)[0]
if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr)
# loop through the other nodes
for inode in self.nodes:
if inode is not node:
if (ipaddr & inode.addrMask) == inode.addrSubnet:
if _debug: IPRouter._debug(" - inode: %r", inode)
inode.process_pdu(pdu)

View File

@ -14,7 +14,7 @@ from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.pdu import Address, LocalBroadcast, PDU
from bacpypes.comm import bind
from bacpypes.vlan import IPNetwork, IPNode
from bacpypes.vlan import IPNetwork, IPNode, IPRouter
from ..state_machine import ClientStateMachine, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
@ -58,14 +58,14 @@ class ZPDU():
@bacpypes_debugging
class TNetwork(StateMachineGroup):
def __init__(self, node_count):
def __init__(self, node_count, address_pattern):
if _debug: TNetwork._debug("__init__ %r", node_count)
StateMachineGroup.__init__(self)
self.vlan = IPNetwork()
for i in range(node_count):
node_address = Address("192.168.0.{}/24".format(i + 1))
node_address = Address(address_pattern.format(i + 1))
node = IPNode(node_address, self.vlan)
if _debug: TNetwork._debug(" - node: %r", node)
@ -115,7 +115,7 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_idle")
# two element network
tnet = TNetwork(2)
tnet = TNetwork(2, "192.168.1.{}/24")
# set the start states of both machines to success
tnet[1].start_state.success()
@ -130,19 +130,19 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_send_receive")
# two element network
tnet = TNetwork(2)
tnet = TNetwork(2, "192.168.2.{}/24")
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=('192.168.0.1', 47808),
destination=('192.168.0.2', 47808),
source=('192.168.2.1', 47808),
destination=('192.168.2.2', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, mode 2 gets it
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=('192.168.0.1', 47808),
pduSource=('192.168.2.1', 47808),
)).success()
# run the group
@ -155,22 +155,22 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_broadcast")
# three element network
tnet = TNetwork(3)
tnet = TNetwork(3, "192.168.3.{}/24")
# make a broadcast PDU
pdu = PDU(b'data',
source=('192.168.0.1', 47808),
destination=('192.168.0.255', 47808),
source=('192.168.3.1', 47808),
destination=('192.168.3.255', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, node 2 and 3 each get it
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=('192.168.0.1', 47808),
pduSource=('192.168.3.1', 47808),
)).success()
tnet[3].start_state.receive(ZPDU(
pduSource=('192.168.0.1', 47808)
pduSource=('192.168.3.1', 47808)
)).success()
# run the group
@ -183,12 +183,12 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_spoof_fail")
# two element network
tnet = TNetwork(1)
tnet = TNetwork(1, "192.168.4.{}/24")
# make a unicast PDU with the wrong source
pdu = PDU(b'data',
source=('192.168.0.2', 47808),
destination=('192.168.0.3', 47808),
source=('192.168.4.2', 47808),
destination=('192.168.4.3', 47808),
)
# the node sends the pdu and would be a success but...
@ -205,20 +205,20 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_spoof_pass")
# one node network
tnet = TNetwork(1)
tnet = TNetwork(1, "192.168.5.{}/24")
# reach into the network and enable spoofing for the node
tnet.vlan.nodes[0].spoofing = True
# make a unicast PDU from a fictitious node
pdu = PDU(b'data',
source=('192.168.0.3', 47808),
destination=('192.168.0.1', 47808),
source=('192.168.5.3', 47808),
destination=('192.168.5.1', 47808),
)
# node 1 sends the pdu, but gets it back as if it was from node 3
tnet[1].start_state.send(pdu).receive(ZPDU(
pduSource=('192.168.0.3', 47808),
pduSource=('192.168.5.3', 47808),
)).success()
# run the group
@ -232,24 +232,24 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_promiscuous_pass")
# three element network
tnet = TNetwork(3)
tnet = TNetwork(3, "192.168.6.{}/24")
# reach into the network and enable promiscuous mode
tnet.vlan.nodes[2].promiscuous = True
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=('192.168.0.1', 47808),
destination=('192.168.0.2', 47808),
source=('192.168.6.1', 47808),
destination=('192.168.6.2', 47808),
)
# node 1 sends the pdu to node 2, node 3 also gets a copy
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=('192.168.0.1', 47808),
pduSource=('192.168.6.1', 47808),
)).success()
tnet[3].start_state.receive(ZPDU(
pduDestination=('192.168.0.2', 47808),
pduDestination=('192.168.6.2', 47808),
)).success()
# run the group
@ -259,18 +259,18 @@ class TestVLAN(unittest.TestCase):
if _debug: TestVLAN._debug("test_promiscuous_fail")
# three element network
tnet = TNetwork(3)
tnet = TNetwork(3, "192.168.7.{}/24")
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=('192.168.0.1', 47808),
destination=('192.168.0.2', 47808),
source=('192.168.7.1', 47808),
destination=('192.168.7.2', 47808),
)
# node 1 sends the pdu to node 2, node 3 waits and gets nothing
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=('192.168.0.1', 47808),
pduSource=('192.168.7.1', 47808),
)).success()
# if node 3 receives anything it will trigger unexpected receive and fail
@ -279,3 +279,124 @@ class TestVLAN(unittest.TestCase):
# run the group
tnet.run()
@bacpypes_debugging
class TestRouter(unittest.TestCase):
def __init__(self, *args, **kwargs):
if _debug: TestRouter._debug("__init__ %r %r", args, kwargs)
super(TestRouter, self).__init__(*args, **kwargs)
def setup_method(self, method):
"""This function is called before each test method is called as is
given a reference to the test method."""
if _debug: TestRouter._debug("setup_method %r", method)
# create a state machine group that has all nodes on all networks
self.smg = StateMachineGroup()
# make some networks
vlan10 = IPNetwork()
vlan20 = IPNetwork()
# make a router and add the networks
trouter = IPRouter()
trouter.add_network(Address("192.168.10.1/24"), vlan10)
trouter.add_network(Address("192.168.20.1/24"), vlan20)
# add nodes to the networks
for pattern, lan in (
("192.168.10.{}/24", vlan10),
("192.168.20.{}/24", vlan20),
):
for i in range(2):
node_address = Address(pattern.format(i + 2))
node = IPNode(node_address, lan)
if _debug: TestRouter._debug(" - node: %r", node)
# bind a client state machine to the node
csm = ClientStateMachine()
bind(csm, node)
# add it to the group
self.smg.append(csm)
def teardown_method(self, method):
"""This function is called after each test method has been called and
is given a reference to the test method."""
if _debug: TestRouter._debug("teardown_method %r", method)
# reset the time machine
reset_time_machine()
if _debug: TestRouter._debug(" - time machine reset")
# run the group
self.smg.run()
# run it for some time
run_time_machine(60.0)
if _debug: TestRouter._debug(" - time machine finished")
# check for success
all_success, some_failed = self.smg.check_for_success()
assert all_success
def test_idle(self):
if _debug: TestRouter._debug("test_idle")
# all successful
for csm in self.smg.state_machines:
csm.start_state.success()
def test_send_receive(self):
"""Test that a node can send a message to another node on
a different network.
"""
if _debug: TestRouter._debug("test_send_receive")
# unpack the state machines
csm_10_2, csm_10_3, csm_20_2, csm_20_3 = self.smg.state_machines
# make a PDU from network 10 node 1 to network 20 node 2
pdu = PDU(b'data',
source=('192.168.10.2', 47808),
destination=('192.168.20.3', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, mode 2 gets it
csm_10_2.start_state.send(pdu).success()
csm_20_3.start_state.receive(ZPDU(
pduSource=('192.168.10.2', 47808),
)).success()
# other nodes get nothing
csm_10_3.start_state.success()
csm_20_2.start_state.success()
def test_remote_broadcast(self):
"""Test that a node can send a message to all of the other nodes on
a different network.
"""
if _debug: TestRouter._debug("test_remote_broadcast")
# unpack the state machines
csm_10_2, csm_10_3, csm_20_2, csm_20_3 = self.smg.state_machines
# make a PDU from network 10 node 1 to network 20 node 2
pdu = PDU(b'data',
source=('192.168.10.2', 47808),
destination=('192.168.20.255', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 10-2 sends the pdu, nodes 20-2 and 20-3 get it
csm_10_2.start_state.send(pdu).success()
csm_10_3.start_state.success()
csm_20_2.start_state.receive(ZPDU(
pduSource=('192.168.10.2', 47808),
)).success()
csm_20_3.start_state.receive(ZPDU(
pduSource=('192.168.10.2', 47808),
)).success()

View File

@ -29,7 +29,7 @@ _log = ModuleLogger(globals())
class ZPDU():
def __init__(self, cls=None, **kwargs):
if _debug: ZPDU._debug("__init__ %r", kwargs)
if _debug: ZPDU._debug("__init__ %r %r", cls, kwargs)
self.cls = cls
self.kwargs = kwargs