1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-10-27 00:57:47 +08:00

merging #131 which ended up being lots of tests

This commit is contained in:
Joel Bender
2017-09-09 00:55:06 -04:00
29 changed files with 1881 additions and 190 deletions

View File

@@ -224,7 +224,7 @@ class WriteBroadcastDistributionTable(BVLPDU):
BVLCI.update(bvlpdu, self)
for bdte in self.bvlciBDT:
bvlpdu.put_data( bdte.addrAddr )
bvlpdu.put_data( bdte.addrMask )
bvlpdu.put_long( bdte.addrMask )
def decode(self, bvlpdu):
BVLCI.update(self, bvlpdu)
@@ -401,6 +401,11 @@ class FDTEntry(DebugContents):
self.fdTTL = None
self.fdRemain = None
def __eq__(self, other):
"""Return true iff entries are identical."""
return (self.fdAddress == other.fdAddress) and \
(self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain)
def bvlpdu_contents(self, use_dict=None, as_class=dict):
"""Return the contents of an object as a dict."""
# make/extend the dictionary of content

View File

@@ -72,7 +72,7 @@ _identLock = threading.Lock()
class IOCB(DebugContents):
_debugContents = \
_debug_contents = \
( 'args', 'kwargs'
, 'ioState', 'ioResponse-', 'ioError'
, 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr'
@@ -227,7 +227,7 @@ bacpypes_debugging(IOCB)
class IOChainMixIn(DebugContents):
_debugContents = ( 'ioChain++', )
_debug_contents = ( 'ioChain++', )
def __init__(self, iocb):
if _debug: IOChainMixIn._debug("__init__ %r", iocb)
@@ -366,7 +366,7 @@ bacpypes_debugging(IOChain)
class IOGroup(IOCB, DebugContents):
_debugContents = ('ioMembers',)
_debug_contents = ('ioMembers',)
def __init__(self):
"""Initialize a group."""
@@ -704,7 +704,7 @@ class IOQController(IOController):
# if we're busy, queue it
if (self.state != CTRL_IDLE):
if _debug: IOQController._debug(" - busy, request queued")
if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb)
iocb.ioState = PENDING
self.ioQueue.put(iocb)
@@ -719,6 +719,7 @@ class IOQController(IOController):
except:
# extract the error
err = sys.exc_info()[1]
if _debug: IOQController._debug(" - process_io() exception: %r", err)
# if there was an error, abort the request
if err:
@@ -871,7 +872,7 @@ class ClientController(Client, IOQController):
self.request(iocb.args[0])
def confirmation(self, pdu):
if _debug: ClientController._debug("confirmation %r %r", args, kwargs)
if _debug: ClientController._debug("confirmation %r", pdu)
# make sure it has an active iocb
if not self.active_iocb:
@@ -917,13 +918,18 @@ bacpypes_debugging(SieveQueue)
class SieveClientController(Client, IOController):
def __init__(self):
def __init__(self, queue_class=SieveQueue):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# make sure it's the correct class
if not issubclass(queue_class, SieveQueue):
raise TypeError("queue class must be a subclass of SieveQueue")
# queues for each address
self.queues = {}
self.queue_class = queue_class
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
@@ -936,7 +942,7 @@ class SieveClientController(Client, IOController):
queue = self.queues.get(destination_address, None)
if not queue:
if _debug: SieveClientController._debug(" - new queue")
queue = SieveQueue(self.request, destination_address)
queue = self.queue_class(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: SieveClientController._debug(" - queue: %r", queue)

View File

@@ -5,6 +5,8 @@ Virtual Local Area Network
"""
import random
import socket
import struct
from copy import deepcopy
from .errors import ConfigurationError
@@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging
from .core import deferred
from .pdu import Address
from .comm import Server
from .comm import Client, Server, bind
# some debugging
_debug = 0
@@ -24,11 +26,13 @@ _log = ModuleLogger(globals())
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, name='', broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent)
self.name = name
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@@ -37,6 +41,10 @@ class Network:
self.nodes.append(node)
node.lan = self
# update the node name
if not node.name:
node.name = '%s:%s' % (self.name, node.address)
def remove_node(self, node):
""" Remove a node from this network. """
if _debug: Network._debug("remove_node %r", node)
@@ -48,29 +56,25 @@ class Network:
""" Process a PDU by sending a copy to each node as dictated by the
addressing and if a node is promiscuous.
"""
if _debug: Network._debug("process_pdu %r", pdu)
if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@@ -84,21 +88,19 @@ bacpypes_debugging(Network)
class Node(Server):
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None):
if _debug:
Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r",
addr, lan, promiscuous, spoofing, sid
Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r",
addr, lan, name, promiscuous, spoofing, sid
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr
self.name = name
# bind to a lan if it was provided
if lan:
if lan is not None:
self.bind(lan)
# might receive all packets and might spoof
@@ -113,7 +115,7 @@ class Node(Server):
def indication(self, pdu):
"""Send a message."""
if _debug: Node._debug("indication %r", pdu)
if _debug: Node._debug("[%s]indication %r", self.name, pdu)
# make sure we're connected
if not self.lan:
@@ -130,3 +132,133 @@ class Node(Server):
deferred(self.lan.process_pdu, pdu)
bacpypes_debugging(Node)
#
# IPNetwork
#
class IPNetwork(Network):
"""
IPNetwork instances are Network objects where the addresses on the
network are tuples that would be used for sockets like ('1.2.3.4', 5).
The first node added to the network sets the broadcast address, like
('1.2.3.255', 5) and the other nodes must have the same tuple.
"""
def __init__(self):
if _debug: IPNetwork._debug("__init__")
Network.__init__(self)
def add_node(self, node):
if _debug: IPNetwork._debug("add_node %r", node)
# first node sets the broadcast tuple, other nodes much match
if not self.nodes:
self.broadcast_address = node.addrBroadcastTuple
elif node.addrBroadcastTuple != self.broadcast_address:
raise ValueError("nodes must all have the same broadcast tuple")
# continue along
Network.add_node(self, node)
bacpypes_debugging(IPNetwork)
#
# IPNode
#
class IPNode(Node):
"""
An IPNode is a Node where the address is an Address that has an address
tuple and a broadcast tuple that would be used for socket communications.
"""
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan)
# make sure it's an Address that has appropriate pieces
if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \
or (not hasattr(addr, 'addrBroadcastTuple')):
raise ValueError("malformed address")
# save the address information
self.addrTuple = addr.addrTuple
self.addrBroadcastTuple = addr.addrBroadcastTuple
# continue initializing
Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid)
bacpypes_debugging(IPNode)
#
# IPRouterNode
#
class IPRouterNode(Client):
def __init__(self, router, addr, lan=None):
if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan)
# save the reference to the router
self.router = router
# make ourselves an IPNode and bind to it
self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True)
bind(self, self.node)
# save our mask and subnet
self.addrMask = addr.addrMask
self.addrSubnet = addr.addrSubnet
def confirmation(self, pdu):
if _debug: IPRouterNode._debug("confirmation %r", pdu)
self.router.process_pdu(self, pdu)
def process_pdu(self, pdu):
if _debug: IPRouterNode._debug("process_pdu %r", pdu)
# pass it downstream
self.request(pdu)
bacpypes_debugging(IPRouterNode)
#
# IPRouter
#
class IPRouter:
def __init__(self):
if _debug: IPRouter._debug("__init__")
# connected network nodes
self.nodes = []
def add_network(self, addr, lan):
if _debug: IPRouter._debug("add_network %r %r", addr, lan)
node = IPRouterNode(self, addr, lan)
if _debug: IPRouter._debug(" - node: %r", node)
self.nodes.append(node)
def process_pdu(self, node, pdu):
if _debug: IPRouter._debug("process_pdu %r %r", node, pdu)
# unpack the address part of the destination
addrstr = socket.inet_aton(pdu.pduDestination[0])
ipaddr = struct.unpack('!L', addrstr)[0]
if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr)
# loop through the other nodes
for inode in self.nodes:
if inode is not node:
if (ipaddr & inode.addrMask) == inode.addrSubnet:
if _debug: IPRouter._debug(" - inode: %r", inode)
inode.process_pdu(pdu)
bacpypes_debugging(IPRouter)