1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

remove the restriction on VLAN nodes to have address of type Address

This commit is contained in:
Joel Bender 2017-08-15 00:21:39 -04:00
parent c58d255153
commit 75b3e1af6b
4 changed files with 42 additions and 94 deletions

View File

@ -24,11 +24,12 @@ _log = ModuleLogger(globals())
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ broadcast_address=%r drop_percent=%r", broadcast_address, drop_percent)
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@ -50,27 +51,21 @@ class Network:
"""
if _debug: Network._debug("process_pdu %r", pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@ -91,9 +86,6 @@ class Node(Server):
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr
@ -130,3 +122,4 @@ class Node(Server):
deferred(self.lan.process_pdu, pdu)
bacpypes_debugging(Node)

View File

@ -25,11 +25,12 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ broadcast_address=%r drop_percent=%r", broadcast_address, drop_percent)
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@ -51,27 +52,21 @@ class Network:
"""
if _debug: Network._debug("process_pdu %r", pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@ -91,9 +86,6 @@ class Node(Server):
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr

View File

@ -25,11 +25,12 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ broadcast_address=%r drop_percent=%r", broadcast_address, drop_percent)
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@ -51,27 +52,21 @@ class Network:
"""
if _debug: Network._debug("process_pdu %r", pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@ -91,9 +86,6 @@ class Node(Server):
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr

View File

@ -63,10 +63,10 @@ class TNetwork(StateMachineGroup):
if _debug: TNetwork._debug("__init__ %r", node_count)
StateMachineGroup.__init__(self)
self.vlan = Network()
self.vlan = Network(broadcast_address=0)
for i in range(node_count):
node = Node(Address(i + 1), self.vlan)
node = Node(i + 1, self.vlan)
# bind a client state machine to the node
csm = ClientStateMachine()
@ -132,17 +132,12 @@ class TestVLAN(unittest.TestCase):
tnet = TNetwork(2)
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=Address(1),
destination=Address(2),
)
pdu = PDU(b'data', source=1, destination=2)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, mode 2 gets it
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=Address(1),
)).success()
tnet[2].start_state.receive(ZPDU(pduSource=1)).success()
# run the group
tnet.run()
@ -157,20 +152,13 @@ class TestVLAN(unittest.TestCase):
tnet = TNetwork(3)
# make a broadcast PDU
pdu = PDU(b'data',
source=Address(1),
destination=LocalBroadcast(),
)
pdu = PDU(b'data', source=1, destination=0)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, node 2 and 3 each get it
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=Address(1),
)).success()
tnet[3].start_state.receive(ZPDU(
pduSource=Address(1),
)).success()
tnet[2].start_state.receive(ZPDU(pduSource=1)).success()
tnet[3].start_state.receive(ZPDU(pduSource=1)).success()
# run the group
tnet.run()
@ -210,15 +198,10 @@ class TestVLAN(unittest.TestCase):
tnet.vlan.nodes[0].spoofing = True
# make a unicast PDU from a fictitious node
pdu = PDU(b'data',
source=Address(3),
destination=Address(1),
)
pdu = PDU(b'data', source=3, destination=1)
# node 1 sends the pdu, but gets it back as if it was from node 3
tnet[1].start_state.send(pdu).receive(ZPDU(
pduSource=Address(3),
)).success()
tnet[1].start_state.send(pdu).receive(ZPDU(pduSource=3)).success()
# run the group
tnet.run()
@ -237,19 +220,12 @@ class TestVLAN(unittest.TestCase):
tnet.vlan.nodes[2].promiscuous = True
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=Address(1),
destination=Address(2),
)
pdu = PDU(b'data', source=1, destination=2)
# node 1 sends the pdu to node 2, node 3 also gets a copy
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=Address(1),
)).success()
tnet[3].start_state.receive(ZPDU(
pduDestination=Address(2),
)).success()
tnet[2].start_state.receive(ZPDU(pduSource=1)).success()
tnet[3].start_state.receive(ZPDU(pduDestination=2)).success()
# run the group
tnet.run()
@ -261,16 +237,11 @@ class TestVLAN(unittest.TestCase):
tnet = TNetwork(3)
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=Address(1),
destination=Address(2),
)
pdu = PDU(b'data', source=1, destination=2)
# node 1 sends the pdu to node 2, node 3 waits and gets nothing
tnet[1].start_state.send(pdu).success()
tnet[2].start_state.receive(ZPDU(
pduSource=Address(1),
)).success()
tnet[2].start_state.receive(ZPDU(pduSource=1)).success()
# if node 3 receives anything it will trigger unexpected receive and fail
tnet[3].start_state.timeout(0.5).success()