diff --git a/py25/bacpypes/bvll.py b/py25/bacpypes/bvll.py index a848c65..2346e7e 100755 --- a/py25/bacpypes/bvll.py +++ b/py25/bacpypes/bvll.py @@ -224,7 +224,7 @@ class WriteBroadcastDistributionTable(BVLPDU): BVLCI.update(bvlpdu, self) for bdte in self.bvlciBDT: bvlpdu.put_data( bdte.addrAddr ) - bvlpdu.put_data( bdte.addrMask ) + bvlpdu.put_long( bdte.addrMask ) def decode(self, bvlpdu): BVLCI.update(self, bvlpdu) @@ -401,6 +401,11 @@ class FDTEntry(DebugContents): self.fdTTL = None self.fdRemain = None + def __eq__(self, other): + """Return true iff entries are identical.""" + return (self.fdAddress == other.fdAddress) and \ + (self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain) + def bvlpdu_contents(self, use_dict=None, as_class=dict): """Return the contents of an object as a dict.""" # make/extend the dictionary of content diff --git a/py25/bacpypes/iocb.py b/py25/bacpypes/iocb.py index 1494476..a57bce2 100644 --- a/py25/bacpypes/iocb.py +++ b/py25/bacpypes/iocb.py @@ -72,7 +72,7 @@ _identLock = threading.Lock() class IOCB(DebugContents): - _debugContents = \ + _debug_contents = \ ( 'args', 'kwargs' , 'ioState', 'ioResponse-', 'ioError' , 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr' @@ -227,7 +227,7 @@ bacpypes_debugging(IOCB) class IOChainMixIn(DebugContents): - _debugContents = ( 'ioChain++', ) + _debug_contents = ( 'ioChain++', ) def __init__(self, iocb): if _debug: IOChainMixIn._debug("__init__ %r", iocb) @@ -366,7 +366,7 @@ bacpypes_debugging(IOChain) class IOGroup(IOCB, DebugContents): - _debugContents = ('ioMembers',) + _debug_contents = ('ioMembers',) def __init__(self): """Initialize a group.""" @@ -704,7 +704,7 @@ class IOQController(IOController): # if we're busy, queue it if (self.state != CTRL_IDLE): - if _debug: IOQController._debug(" - busy, request queued") + if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb) iocb.ioState = PENDING self.ioQueue.put(iocb) @@ -719,6 +719,7 @@ class IOQController(IOController): except: # extract the error err = sys.exc_info()[1] + if _debug: IOQController._debug(" - process_io() exception: %r", err) # if there was an error, abort the request if err: @@ -871,7 +872,7 @@ class ClientController(Client, IOQController): self.request(iocb.args[0]) def confirmation(self, pdu): - if _debug: ClientController._debug("confirmation %r %r", args, kwargs) + if _debug: ClientController._debug("confirmation %r", pdu) # make sure it has an active iocb if not self.active_iocb: @@ -917,13 +918,18 @@ bacpypes_debugging(SieveQueue) class SieveClientController(Client, IOController): - def __init__(self): + def __init__(self, queue_class=SieveQueue): if _debug: SieveClientController._debug("__init__") Client.__init__(self) IOController.__init__(self) + # make sure it's the correct class + if not issubclass(queue_class, SieveQueue): + raise TypeError("queue class must be a subclass of SieveQueue") + # queues for each address self.queues = {} + self.queue_class = queue_class def process_io(self, iocb): if _debug: SieveClientController._debug("process_io %r", iocb) @@ -936,7 +942,7 @@ class SieveClientController(Client, IOController): queue = self.queues.get(destination_address, None) if not queue: if _debug: SieveClientController._debug(" - new queue") - queue = SieveQueue(self.request, destination_address) + queue = self.queue_class(self.request, destination_address) self.queues[destination_address] = queue if _debug: SieveClientController._debug(" - queue: %r", queue) diff --git a/py25/bacpypes/primitivedata.py b/py25/bacpypes/primitivedata.py index 363672f..1335041 100755 --- a/py25/bacpypes/primitivedata.py +++ b/py25/bacpypes/primitivedata.py @@ -618,7 +618,7 @@ class Unsigned(Atomic): @classmethod def is_valid(cls, arg): """Return True if arg is valid value for the class.""" - return isinstance(arg, (int, long)) and (arg >= 0) + return isinstance(arg, (int, long)) and (not isinstance(arg, bool)) and (arg >= 0) def __str__(self): return "Unsigned(%s)" % (self.value, ) @@ -693,7 +693,7 @@ class Integer(Atomic): @classmethod def is_valid(cls, arg): """Return True if arg is valid value for the class.""" - return isinstance(arg, (int, long)) + return isinstance(arg, (int, long)) and (not isinstance(arg, bool)) def __str__(self): return "Integer(%s)" % (self.value, ) diff --git a/py25/bacpypes/vlan.py b/py25/bacpypes/vlan.py index cc9d8ab..731d72e 100755 --- a/py25/bacpypes/vlan.py +++ b/py25/bacpypes/vlan.py @@ -5,6 +5,8 @@ Virtual Local Area Network """ import random +import socket +import struct from copy import deepcopy from .errors import ConfigurationError @@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging from .core import deferred from .pdu import Address -from .comm import Server +from .comm import Client, Server, bind # some debugging _debug = 0 @@ -24,11 +26,13 @@ _log = ModuleLogger(globals()) class Network: - def __init__(self, dropPercent=0.0): - if _debug: Network._debug("__init__ dropPercent=%r", dropPercent) + def __init__(self, name='', broadcast_address=None, drop_percent=0.0): + if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent) + self.name = name self.nodes = [] - self.dropPercent = dropPercent + self.broadcast_address = broadcast_address + self.drop_percent = drop_percent def add_node(self, node): """ Add a node to this network, let the node know which network it's on. """ @@ -37,6 +41,10 @@ class Network: self.nodes.append(node) node.lan = self + # update the node name + if not node.name: + node.name = '%s:%s' % (self.name, node.address) + def remove_node(self, node): """ Remove a node from this network. """ if _debug: Network._debug("remove_node %r", node) @@ -48,29 +56,25 @@ class Network: """ Process a PDU by sending a copy to each node as dictated by the addressing and if a node is promiscuous. """ - if _debug: Network._debug("process_pdu %r", pdu) + if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu) - if self.dropPercent != 0.0: - if (random.random() * 100.0) < self.dropPercent: + # randomly drop a packet + if self.drop_percent != 0.0: + if (random.random() * 100.0) < self.drop_percent: if _debug: Network._debug(" - packet dropped") return - if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address): - raise RuntimeError("invalid destination address") - - elif pdu.pduDestination.addrType == Address.localBroadcastAddr: + if pdu.pduDestination == self.broadcast_address: for n in self.nodes: if (pdu.pduSource != n.address): + if _debug: Network._debug(" - match: %r", n) n.response(deepcopy(pdu)) - - elif pdu.pduDestination.addrType == Address.localStationAddr: + else: for n in self.nodes: if n.promiscuous or (pdu.pduDestination == n.address): + if _debug: Network._debug(" - match: %r", n) n.response(deepcopy(pdu)) - else: - raise RuntimeError("invalid destination address type") - def __len__(self): """ Simple way to determine the number of nodes in the network. """ if _debug: Network._debug("__len__") @@ -84,21 +88,19 @@ bacpypes_debugging(Network) class Node(Server): - def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None): + def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None): if _debug: - Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r", - addr, lan, promiscuous, spoofing, sid + Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r", + addr, lan, name, promiscuous, spoofing, sid ) Server.__init__(self, sid) - if not isinstance(addr, Address): - raise TypeError("addr must be an address") - self.lan = None self.address = addr + self.name = name # bind to a lan if it was provided - if lan: + if lan is not None: self.bind(lan) # might receive all packets and might spoof @@ -113,7 +115,7 @@ class Node(Server): def indication(self, pdu): """Send a message.""" - if _debug: Node._debug("indication %r", pdu) + if _debug: Node._debug("[%s]indication %r", self.name, pdu) # make sure we're connected if not self.lan: @@ -130,3 +132,133 @@ class Node(Server): deferred(self.lan.process_pdu, pdu) bacpypes_debugging(Node) + +# +# IPNetwork +# + +class IPNetwork(Network): + + """ + IPNetwork instances are Network objects where the addresses on the + network are tuples that would be used for sockets like ('1.2.3.4', 5). + The first node added to the network sets the broadcast address, like + ('1.2.3.255', 5) and the other nodes must have the same tuple. + """ + + def __init__(self): + if _debug: IPNetwork._debug("__init__") + Network.__init__(self) + + def add_node(self, node): + if _debug: IPNetwork._debug("add_node %r", node) + + # first node sets the broadcast tuple, other nodes much match + if not self.nodes: + self.broadcast_address = node.addrBroadcastTuple + elif node.addrBroadcastTuple != self.broadcast_address: + raise ValueError("nodes must all have the same broadcast tuple") + + # continue along + Network.add_node(self, node) + +bacpypes_debugging(IPNetwork) + +# +# IPNode +# + +class IPNode(Node): + + """ + An IPNode is a Node where the address is an Address that has an address + tuple and a broadcast tuple that would be used for socket communications. + """ + + def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None): + if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan) + + # make sure it's an Address that has appropriate pieces + if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \ + or (not hasattr(addr, 'addrBroadcastTuple')): + raise ValueError("malformed address") + + # save the address information + self.addrTuple = addr.addrTuple + self.addrBroadcastTuple = addr.addrBroadcastTuple + + # continue initializing + Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid) + +bacpypes_debugging(IPNode) + +# +# IPRouterNode +# + +class IPRouterNode(Client): + + def __init__(self, router, addr, lan=None): + if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan) + + # save the reference to the router + self.router = router + + # make ourselves an IPNode and bind to it + self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True) + bind(self, self.node) + + # save our mask and subnet + self.addrMask = addr.addrMask + self.addrSubnet = addr.addrSubnet + + def confirmation(self, pdu): + if _debug: IPRouterNode._debug("confirmation %r", pdu) + + self.router.process_pdu(self, pdu) + + def process_pdu(self, pdu): + if _debug: IPRouterNode._debug("process_pdu %r", pdu) + + # pass it downstream + self.request(pdu) + +bacpypes_debugging(IPRouterNode) + +# +# IPRouter +# + +class IPRouter: + + def __init__(self): + if _debug: IPRouter._debug("__init__") + + # connected network nodes + self.nodes = [] + + def add_network(self, addr, lan): + if _debug: IPRouter._debug("add_network %r %r", addr, lan) + + node = IPRouterNode(self, addr, lan) + if _debug: IPRouter._debug(" - node: %r", node) + + self.nodes.append(node) + + def process_pdu(self, node, pdu): + if _debug: IPRouter._debug("process_pdu %r %r", node, pdu) + + # unpack the address part of the destination + addrstr = socket.inet_aton(pdu.pduDestination[0]) + ipaddr = struct.unpack('!L', addrstr)[0] + if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr) + + # loop through the other nodes + for inode in self.nodes: + if inode is not node: + if (ipaddr & inode.addrMask) == inode.addrSubnet: + if _debug: IPRouter._debug(" - inode: %r", inode) + inode.process_pdu(pdu) + +bacpypes_debugging(IPRouter) + diff --git a/py27/bacpypes/bvll.py b/py27/bacpypes/bvll.py index f4a68b4..30573b7 100755 --- a/py27/bacpypes/bvll.py +++ b/py27/bacpypes/bvll.py @@ -221,7 +221,7 @@ class WriteBroadcastDistributionTable(BVLPDU): BVLCI.update(bvlpdu, self) for bdte in self.bvlciBDT: bvlpdu.put_data( bdte.addrAddr ) - bvlpdu.put_data( bdte.addrMask ) + bvlpdu.put_long( bdte.addrMask ) def decode(self, bvlpdu): BVLCI.update(self, bvlpdu) @@ -398,6 +398,11 @@ class FDTEntry(DebugContents): self.fdTTL = None self.fdRemain = None + def __eq__(self, other): + """Return true iff entries are identical.""" + return (self.fdAddress == other.fdAddress) and \ + (self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain) + def bvlpdu_contents(self, use_dict=None, as_class=dict): """Return the contents of an object as a dict.""" # make/extend the dictionary of content diff --git a/py27/bacpypes/iocb.py b/py27/bacpypes/iocb.py index 8d86a00..15908cc 100644 --- a/py27/bacpypes/iocb.py +++ b/py27/bacpypes/iocb.py @@ -73,7 +73,7 @@ _identLock = threading.Lock() @bacpypes_debugging class IOCB(DebugContents): - _debugContents = \ + _debug_contents = \ ( 'args', 'kwargs' , 'ioState', 'ioResponse-', 'ioError' , 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr' @@ -227,7 +227,7 @@ class IOCB(DebugContents): @bacpypes_debugging class IOChainMixIn(DebugContents): - _debugContents = ( 'ioChain++', ) + _debug_contents = ( 'ioChain++', ) def __init__(self, iocb): if _debug: IOChainMixIn._debug("__init__ %r", iocb) @@ -364,7 +364,7 @@ class IOChain(IOCB, IOChainMixIn): @bacpypes_debugging class IOGroup(IOCB, DebugContents): - _debugContents = ('ioMembers',) + _debug_contents = ('ioMembers',) def __init__(self): """Initialize a group.""" @@ -699,7 +699,7 @@ class IOQController(IOController): # if we're busy, queue it if (self.state != CTRL_IDLE): - if _debug: IOQController._debug(" - busy, request queued") + if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb) iocb.ioState = PENDING self.ioQueue.put(iocb) @@ -714,6 +714,7 @@ class IOQController(IOController): except: # extract the error err = sys.exc_info()[1] + if _debug: IOQController._debug(" - process_io() exception: %r", err) # if there was an error, abort the request if err: @@ -865,7 +866,7 @@ class ClientController(Client, IOQController): self.request(iocb.args[0]) def confirmation(self, pdu): - if _debug: ClientController._debug("confirmation %r %r", args, kwargs) + if _debug: ClientController._debug("confirmation %r", pdu) # make sure it has an active iocb if not self.active_iocb: @@ -909,13 +910,18 @@ class SieveQueue(IOQController): @bacpypes_debugging class SieveClientController(Client, IOController): - def __init__(self): + def __init__(self, queue_class=SieveQueue): if _debug: SieveClientController._debug("__init__") Client.__init__(self) IOController.__init__(self) + # make sure it's the correct class + if not issubclass(queue_class, SieveQueue): + raise TypeError("queue class must be a subclass of SieveQueue") + # queues for each address self.queues = {} + self.queue_class = queue_class def process_io(self, iocb): if _debug: SieveClientController._debug("process_io %r", iocb) @@ -928,7 +934,7 @@ class SieveClientController(Client, IOController): queue = self.queues.get(destination_address, None) if not queue: if _debug: SieveClientController._debug(" - new queue") - queue = SieveQueue(self.request, destination_address) + queue = self.queue_class(self.request, destination_address) self.queues[destination_address] = queue if _debug: SieveClientController._debug(" - queue: %r", queue) diff --git a/py27/bacpypes/primitivedata.py b/py27/bacpypes/primitivedata.py index 474bd39..1047f43 100755 --- a/py27/bacpypes/primitivedata.py +++ b/py27/bacpypes/primitivedata.py @@ -622,7 +622,7 @@ class Unsigned(Atomic): @classmethod def is_valid(cls, arg): """Return True if arg is valid value for the class.""" - return isinstance(arg, (int, long)) and (arg >= 0) + return isinstance(arg, (int, long)) and (not isinstance(arg, bool)) and (arg >= 0) def __str__(self): return "Unsigned(%s)" % (self.value, ) @@ -697,7 +697,7 @@ class Integer(Atomic): @classmethod def is_valid(cls, arg): """Return True if arg is valid value for the class.""" - return isinstance(arg, (int, long)) + return isinstance(arg, (int, long)) and (not isinstance(arg, bool)) def __str__(self): return "Integer(%s)" % (self.value, ) diff --git a/py27/bacpypes/vlan.py b/py27/bacpypes/vlan.py index 855e6ff..5fc50b5 100755 --- a/py27/bacpypes/vlan.py +++ b/py27/bacpypes/vlan.py @@ -5,6 +5,8 @@ Virtual Local Area Network """ import random +import socket +import struct from copy import deepcopy from .errors import ConfigurationError @@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging from .core import deferred from .pdu import Address -from .comm import Server +from .comm import Client, Server, bind # some debugging _debug = 0 @@ -25,11 +27,13 @@ _log = ModuleLogger(globals()) @bacpypes_debugging class Network: - def __init__(self, dropPercent=0.0): - if _debug: Network._debug("__init__ dropPercent=%r", dropPercent) + def __init__(self, name='', broadcast_address=None, drop_percent=0.0): + if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent) + self.name = name self.nodes = [] - self.dropPercent = dropPercent + self.broadcast_address = broadcast_address + self.drop_percent = drop_percent def add_node(self, node): """ Add a node to this network, let the node know which network it's on. """ @@ -38,6 +42,10 @@ class Network: self.nodes.append(node) node.lan = self + # update the node name + if not node.name: + node.name = '%s:%s' % (self.name, node.address) + def remove_node(self, node): """ Remove a node from this network. """ if _debug: Network._debug("remove_node %r", node) @@ -49,29 +57,25 @@ class Network: """ Process a PDU by sending a copy to each node as dictated by the addressing and if a node is promiscuous. """ - if _debug: Network._debug("process_pdu %r", pdu) + if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu) - if self.dropPercent != 0.0: - if (random.random() * 100.0) < self.dropPercent: + # randomly drop a packet + if self.drop_percent != 0.0: + if (random.random() * 100.0) < self.drop_percent: if _debug: Network._debug(" - packet dropped") return - if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address): - raise RuntimeError("invalid destination address") - - elif pdu.pduDestination.addrType == Address.localBroadcastAddr: + if pdu.pduDestination == self.broadcast_address: for n in self.nodes: if (pdu.pduSource != n.address): + if _debug: Network._debug(" - match: %r", n) n.response(deepcopy(pdu)) - - elif pdu.pduDestination.addrType == Address.localStationAddr: + else: for n in self.nodes: if n.promiscuous or (pdu.pduDestination == n.address): + if _debug: Network._debug(" - match: %r", n) n.response(deepcopy(pdu)) - else: - raise RuntimeError("invalid destination address type") - def __len__(self): """ Simple way to determine the number of nodes in the network. """ if _debug: Network._debug("__len__") @@ -84,21 +88,19 @@ class Network: @bacpypes_debugging class Node(Server): - def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None): + def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None): if _debug: - Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r", - addr, lan, promiscuous, spoofing, sid + Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r", + addr, lan, name, promiscuous, spoofing, sid ) Server.__init__(self, sid) - if not isinstance(addr, Address): - raise TypeError("addr must be an address") - self.lan = None self.address = addr + self.name = name # bind to a lan if it was provided - if lan: + if lan is not None: self.bind(lan) # might receive all packets and might spoof @@ -113,7 +115,7 @@ class Node(Server): def indication(self, pdu): """Send a message.""" - if _debug: Node._debug("indication %r", pdu) + if _debug: Node._debug("[%s]indication %r", self.name, pdu) # make sure we're connected if not self.lan: @@ -128,3 +130,132 @@ class Node(Server): # actual network delivery is deferred deferred(self.lan.process_pdu, pdu) + + +# +# IPNetwork +# + +@bacpypes_debugging +class IPNetwork(Network): + + """ + IPNetwork instances are Network objects where the addresses on the + network are tuples that would be used for sockets like ('1.2.3.4', 5). + The first node added to the network sets the broadcast address, like + ('1.2.3.255', 5) and the other nodes must have the same tuple. + """ + + def __init__(self): + if _debug: IPNetwork._debug("__init__") + Network.__init__(self) + + def add_node(self, node): + if _debug: IPNetwork._debug("add_node %r", node) + + # first node sets the broadcast tuple, other nodes much match + if not self.nodes: + self.broadcast_address = node.addrBroadcastTuple + elif node.addrBroadcastTuple != self.broadcast_address: + raise ValueError("nodes must all have the same broadcast tuple") + + # continue along + Network.add_node(self, node) + + +# +# IPNode +# + +@bacpypes_debugging +class IPNode(Node): + + """ + An IPNode is a Node where the address is an Address that has an address + tuple and a broadcast tuple that would be used for socket communications. + """ + + def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None): + if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan) + + # make sure it's an Address that has appropriate pieces + if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \ + or (not hasattr(addr, 'addrBroadcastTuple')): + raise ValueError("malformed address") + + # save the address information + self.addrTuple = addr.addrTuple + self.addrBroadcastTuple = addr.addrBroadcastTuple + + # continue initializing + Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid) + + +# +# IPRouterNode +# + +@bacpypes_debugging +class IPRouterNode(Client): + + def __init__(self, router, addr, lan=None): + if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan) + + # save the reference to the router + self.router = router + + # make ourselves an IPNode and bind to it + self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True) + bind(self, self.node) + + # save our mask and subnet + self.addrMask = addr.addrMask + self.addrSubnet = addr.addrSubnet + + def confirmation(self, pdu): + if _debug: IPRouterNode._debug("confirmation %r", pdu) + + self.router.process_pdu(self, pdu) + + def process_pdu(self, pdu): + if _debug: IPRouterNode._debug("process_pdu %r", pdu) + + # pass it downstream + self.request(pdu) + +# +# IPRouter +# + +@bacpypes_debugging +class IPRouter: + + def __init__(self): + if _debug: IPRouter._debug("__init__") + + # connected network nodes + self.nodes = [] + + def add_network(self, addr, lan): + if _debug: IPRouter._debug("add_network %r %r", addr, lan) + + node = IPRouterNode(self, addr, lan) + if _debug: IPRouter._debug(" - node: %r", node) + + self.nodes.append(node) + + def process_pdu(self, node, pdu): + if _debug: IPRouter._debug("process_pdu %r %r", node, pdu) + + # unpack the address part of the destination + addrstr = socket.inet_aton(pdu.pduDestination[0]) + ipaddr = struct.unpack('!L', addrstr)[0] + if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr) + + # loop through the other nodes + for inode in self.nodes: + if inode is not node: + if (ipaddr & inode.addrMask) == inode.addrSubnet: + if _debug: IPRouter._debug(" - inode: %r", inode) + inode.process_pdu(pdu) + diff --git a/py34/bacpypes/bvll.py b/py34/bacpypes/bvll.py index f4a68b4..30573b7 100755 --- a/py34/bacpypes/bvll.py +++ b/py34/bacpypes/bvll.py @@ -221,7 +221,7 @@ class WriteBroadcastDistributionTable(BVLPDU): BVLCI.update(bvlpdu, self) for bdte in self.bvlciBDT: bvlpdu.put_data( bdte.addrAddr ) - bvlpdu.put_data( bdte.addrMask ) + bvlpdu.put_long( bdte.addrMask ) def decode(self, bvlpdu): BVLCI.update(self, bvlpdu) @@ -398,6 +398,11 @@ class FDTEntry(DebugContents): self.fdTTL = None self.fdRemain = None + def __eq__(self, other): + """Return true iff entries are identical.""" + return (self.fdAddress == other.fdAddress) and \ + (self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain) + def bvlpdu_contents(self, use_dict=None, as_class=dict): """Return the contents of an object as a dict.""" # make/extend the dictionary of content diff --git a/py34/bacpypes/iocb.py b/py34/bacpypes/iocb.py index 8d86a00..c2a2c38 100644 --- a/py34/bacpypes/iocb.py +++ b/py34/bacpypes/iocb.py @@ -73,7 +73,7 @@ _identLock = threading.Lock() @bacpypes_debugging class IOCB(DebugContents): - _debugContents = \ + _debug_contents = \ ( 'args', 'kwargs' , 'ioState', 'ioResponse-', 'ioError' , 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr' @@ -227,7 +227,7 @@ class IOCB(DebugContents): @bacpypes_debugging class IOChainMixIn(DebugContents): - _debugContents = ( 'ioChain++', ) + _debug_contents = ( 'ioChain++', ) def __init__(self, iocb): if _debug: IOChainMixIn._debug("__init__ %r", iocb) @@ -364,7 +364,7 @@ class IOChain(IOCB, IOChainMixIn): @bacpypes_debugging class IOGroup(IOCB, DebugContents): - _debugContents = ('ioMembers',) + _debug_contents = ('ioMembers',) def __init__(self): """Initialize a group.""" @@ -699,7 +699,7 @@ class IOQController(IOController): # if we're busy, queue it if (self.state != CTRL_IDLE): - if _debug: IOQController._debug(" - busy, request queued") + if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb) iocb.ioState = PENDING self.ioQueue.put(iocb) @@ -714,9 +714,11 @@ class IOQController(IOController): except: # extract the error err = sys.exc_info()[1] + if _debug: IOQController._debug(" - process_io() exception: %r", err) # if there was an error, abort the request if err: + if _debug: IOQController._debug(" - aborting") self.abort_io(iocb, err) def process_io(self, iocb): @@ -865,7 +867,7 @@ class ClientController(Client, IOQController): self.request(iocb.args[0]) def confirmation(self, pdu): - if _debug: ClientController._debug("confirmation %r %r", args, kwargs) + if _debug: ClientController._debug("confirmation %r", pdu) # make sure it has an active iocb if not self.active_iocb: @@ -909,13 +911,18 @@ class SieveQueue(IOQController): @bacpypes_debugging class SieveClientController(Client, IOController): - def __init__(self): + def __init__(self, queue_class=SieveQueue): if _debug: SieveClientController._debug("__init__") Client.__init__(self) IOController.__init__(self) + # make sure it's the correct class + if not issubclass(queue_class, SieveQueue): + raise TypeError("queue class must be a subclass of SieveQueue") + # queues for each address self.queues = {} + self.queue_class = queue_class def process_io(self, iocb): if _debug: SieveClientController._debug("process_io %r", iocb) @@ -928,7 +935,7 @@ class SieveClientController(Client, IOController): queue = self.queues.get(destination_address, None) if not queue: if _debug: SieveClientController._debug(" - new queue") - queue = SieveQueue(self.request, destination_address) + queue = self.queue_class(self.request, destination_address) self.queues[destination_address] = queue if _debug: SieveClientController._debug(" - queue: %r", queue) diff --git a/py34/bacpypes/primitivedata.py b/py34/bacpypes/primitivedata.py index 66aefba..d7d1306 100755 --- a/py34/bacpypes/primitivedata.py +++ b/py34/bacpypes/primitivedata.py @@ -640,7 +640,7 @@ class Unsigned(Atomic): @classmethod def is_valid(cls, arg): """Return True if arg is valid value for the class.""" - return isinstance(arg, (int, long)) and (arg >= 0) + return isinstance(arg, int) and (not isinstance(arg, bool)) and (arg >= 0) def __str__(self): return "Unsigned(%s)" % (self.value, ) @@ -713,7 +713,7 @@ class Integer(Atomic): @classmethod def is_valid(cls, arg): """Return True if arg is valid value for the class.""" - return isinstance(arg, int) + return isinstance(arg, int) and (not isinstance(arg, bool)) def __str__(self): return "Integer(%s)" % (self.value, ) diff --git a/py34/bacpypes/vlan.py b/py34/bacpypes/vlan.py index 855e6ff..5fc50b5 100755 --- a/py34/bacpypes/vlan.py +++ b/py34/bacpypes/vlan.py @@ -5,6 +5,8 @@ Virtual Local Area Network """ import random +import socket +import struct from copy import deepcopy from .errors import ConfigurationError @@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging from .core import deferred from .pdu import Address -from .comm import Server +from .comm import Client, Server, bind # some debugging _debug = 0 @@ -25,11 +27,13 @@ _log = ModuleLogger(globals()) @bacpypes_debugging class Network: - def __init__(self, dropPercent=0.0): - if _debug: Network._debug("__init__ dropPercent=%r", dropPercent) + def __init__(self, name='', broadcast_address=None, drop_percent=0.0): + if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent) + self.name = name self.nodes = [] - self.dropPercent = dropPercent + self.broadcast_address = broadcast_address + self.drop_percent = drop_percent def add_node(self, node): """ Add a node to this network, let the node know which network it's on. """ @@ -38,6 +42,10 @@ class Network: self.nodes.append(node) node.lan = self + # update the node name + if not node.name: + node.name = '%s:%s' % (self.name, node.address) + def remove_node(self, node): """ Remove a node from this network. """ if _debug: Network._debug("remove_node %r", node) @@ -49,29 +57,25 @@ class Network: """ Process a PDU by sending a copy to each node as dictated by the addressing and if a node is promiscuous. """ - if _debug: Network._debug("process_pdu %r", pdu) + if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu) - if self.dropPercent != 0.0: - if (random.random() * 100.0) < self.dropPercent: + # randomly drop a packet + if self.drop_percent != 0.0: + if (random.random() * 100.0) < self.drop_percent: if _debug: Network._debug(" - packet dropped") return - if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address): - raise RuntimeError("invalid destination address") - - elif pdu.pduDestination.addrType == Address.localBroadcastAddr: + if pdu.pduDestination == self.broadcast_address: for n in self.nodes: if (pdu.pduSource != n.address): + if _debug: Network._debug(" - match: %r", n) n.response(deepcopy(pdu)) - - elif pdu.pduDestination.addrType == Address.localStationAddr: + else: for n in self.nodes: if n.promiscuous or (pdu.pduDestination == n.address): + if _debug: Network._debug(" - match: %r", n) n.response(deepcopy(pdu)) - else: - raise RuntimeError("invalid destination address type") - def __len__(self): """ Simple way to determine the number of nodes in the network. """ if _debug: Network._debug("__len__") @@ -84,21 +88,19 @@ class Network: @bacpypes_debugging class Node(Server): - def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None): + def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None): if _debug: - Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r", - addr, lan, promiscuous, spoofing, sid + Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r", + addr, lan, name, promiscuous, spoofing, sid ) Server.__init__(self, sid) - if not isinstance(addr, Address): - raise TypeError("addr must be an address") - self.lan = None self.address = addr + self.name = name # bind to a lan if it was provided - if lan: + if lan is not None: self.bind(lan) # might receive all packets and might spoof @@ -113,7 +115,7 @@ class Node(Server): def indication(self, pdu): """Send a message.""" - if _debug: Node._debug("indication %r", pdu) + if _debug: Node._debug("[%s]indication %r", self.name, pdu) # make sure we're connected if not self.lan: @@ -128,3 +130,132 @@ class Node(Server): # actual network delivery is deferred deferred(self.lan.process_pdu, pdu) + + +# +# IPNetwork +# + +@bacpypes_debugging +class IPNetwork(Network): + + """ + IPNetwork instances are Network objects where the addresses on the + network are tuples that would be used for sockets like ('1.2.3.4', 5). + The first node added to the network sets the broadcast address, like + ('1.2.3.255', 5) and the other nodes must have the same tuple. + """ + + def __init__(self): + if _debug: IPNetwork._debug("__init__") + Network.__init__(self) + + def add_node(self, node): + if _debug: IPNetwork._debug("add_node %r", node) + + # first node sets the broadcast tuple, other nodes much match + if not self.nodes: + self.broadcast_address = node.addrBroadcastTuple + elif node.addrBroadcastTuple != self.broadcast_address: + raise ValueError("nodes must all have the same broadcast tuple") + + # continue along + Network.add_node(self, node) + + +# +# IPNode +# + +@bacpypes_debugging +class IPNode(Node): + + """ + An IPNode is a Node where the address is an Address that has an address + tuple and a broadcast tuple that would be used for socket communications. + """ + + def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None): + if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan) + + # make sure it's an Address that has appropriate pieces + if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \ + or (not hasattr(addr, 'addrBroadcastTuple')): + raise ValueError("malformed address") + + # save the address information + self.addrTuple = addr.addrTuple + self.addrBroadcastTuple = addr.addrBroadcastTuple + + # continue initializing + Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid) + + +# +# IPRouterNode +# + +@bacpypes_debugging +class IPRouterNode(Client): + + def __init__(self, router, addr, lan=None): + if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan) + + # save the reference to the router + self.router = router + + # make ourselves an IPNode and bind to it + self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True) + bind(self, self.node) + + # save our mask and subnet + self.addrMask = addr.addrMask + self.addrSubnet = addr.addrSubnet + + def confirmation(self, pdu): + if _debug: IPRouterNode._debug("confirmation %r", pdu) + + self.router.process_pdu(self, pdu) + + def process_pdu(self, pdu): + if _debug: IPRouterNode._debug("process_pdu %r", pdu) + + # pass it downstream + self.request(pdu) + +# +# IPRouter +# + +@bacpypes_debugging +class IPRouter: + + def __init__(self): + if _debug: IPRouter._debug("__init__") + + # connected network nodes + self.nodes = [] + + def add_network(self, addr, lan): + if _debug: IPRouter._debug("add_network %r %r", addr, lan) + + node = IPRouterNode(self, addr, lan) + if _debug: IPRouter._debug(" - node: %r", node) + + self.nodes.append(node) + + def process_pdu(self, node, pdu): + if _debug: IPRouter._debug("process_pdu %r %r", node, pdu) + + # unpack the address part of the destination + addrstr = socket.inet_aton(pdu.pduDestination[0]) + ipaddr = struct.unpack('!L', addrstr)[0] + if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr) + + # loop through the other nodes + for inode in self.nodes: + if inode is not node: + if (ipaddr & inode.addrMask) == inode.addrSubnet: + if _debug: IPRouter._debug(" - inode: %r", inode) + inode.process_pdu(pdu) + diff --git a/sandbox/io.py b/sandbox/io.py index e2fdbe6..a6d98f6 100644 --- a/sandbox/io.py +++ b/sandbox/io.py @@ -83,7 +83,7 @@ _identLock = threading.Lock() @bacpypes_debugging class IOCB(DebugContents): - _debugContents = \ + _debug_contents = \ ( 'args', 'kwargs' , 'ioState', 'ioResponse-', 'ioError' , 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr' diff --git a/tests/__init__.py b/tests/__init__.py index 58de024..4a65594 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -20,3 +20,6 @@ from . import test_pdu from . import test_primitive_data from . import test_utilities from . import test_vlan + +from . import test_bvll + diff --git a/tests/state_machine.py b/tests/state_machine.py index f9fe04c..118a709 100755 --- a/tests/state_machine.py +++ b/tests/state_machine.py @@ -45,10 +45,18 @@ class SendTransition(Transition): class ReceiveTransition(Transition): - def __init__(self, pdu, next_state): + def __init__(self, criteria, next_state): Transition.__init__(self, next_state) - self.pdu = pdu + self.criteria = criteria + + +class EventTransition(Transition): + + def __init__(self, event_id, next_state): + Transition.__init__(self, next_state) + + self.event_id = event_id class TimeoutTransition(Transition): @@ -59,6 +67,36 @@ class TimeoutTransition(Transition): self.timeout = timeout +# +# match_pdu +# + +@bacpypes_debugging +def match_pdu(pdu, pdu_type=None, **pdu_attrs): + if _debug: match_pdu._debug("match_pdu %r %r %r", pdu, pdu_type, pdu_attrs) + + # check the type + if pdu_type and not isinstance(pdu, pdu_type): + if _debug: match_pdu._debug(" - wrong type") + return False + + # check for matching attribute values + for attr_name, attr_value in pdu_attrs.items(): + if not hasattr(pdu, attr_name): + if _debug: match_pdu._debug(" - missing attr: %r", attr_name) + return False + if getattr(pdu, attr_name) != attr_value: + if _debug: StateMachine._debug(" - attr value: %r, %r", attr_name, attr_value) + return False + if _debug: match_pdu._debug(" - successful_match") + + return True + + +# +# State +# + @bacpypes_debugging class State(object): @@ -88,6 +126,11 @@ class State(object): self.send_transitions = [] self.receive_transitions = [] + # empty lists of event transitions + self.set_event_transitions = [] + self.clear_event_transitions = [] + self.wait_event_transitions = [] + # timeout transition self.timeout_transition = None @@ -215,14 +258,26 @@ class State(object): """Called after each PDU sent.""" self.state_machine.after_send(pdu) - def receive(self, pdu, next_state=None): + def receive(self, pdu_type, **pdu_attrs): """Create a ReceiveTransition from this state to another, possibly new, state. The next state is returned for method chaining. - :param pdu: PDU to match + :param criteria: PDU to match :param next_state: destination state after a successful match + + Simulate the function as if it was defined in Python3.5+ like this: + def receive(self, *pdu_type, next_state=None, **pdu_attrs) """ - if _debug: State._debug("receive(%s) %r next_state=%r", self.doc_string, pdu, next_state) + if _debug: State._debug("receive(%s) %r %r", self.doc_string, pdu_type, pdu_attrs) + + # extract the next_state keyword argument + if 'next_state' in pdu_attrs: + next_state = pdu_attrs['next_state'] + if _debug: State._debug(" - next_state: %r", next_state) + + del pdu_attrs['next_state'] + else: + next_state = None # maybe build a new state if not next_state: @@ -231,8 +286,12 @@ class State(object): elif next_state not in self.state_machine.states: raise ValueError("off the rails") + # create a bundle of the match criteria + criteria = (pdu_type, pdu_attrs) + if _debug: State._debug(" - criteria: %r", criteria) + # add this to the list of transitions - self.receive_transitions.append(ReceiveTransition(pdu, next_state)) + self.receive_transitions.append(ReceiveTransition(criteria, next_state)) # return the next state return next_state @@ -253,6 +312,60 @@ class State(object): # pass along to the state machine self.state_machine.unexpected_receive(pdu) + def set_event(self, event_id): + """Create an EventTransition for this state that sets an event. The + current state is returned for method chaining. + + :param event_id: event identifier + """ + if _debug: State._debug("set_event(%s) %r", self.doc_string, event_id) + + # add this to the list of transitions + self.set_event_transitions.append(EventTransition(event_id, None)) + + # return the next state + return self + + def event_set(self, event_id): + """Called with the event that was set.""" + pass + + def clear_event(self, event_id): + """Create an EventTransition for this state that clears an event. The + current state is returned for method chaining. + + :param event_id: event identifier + """ + if _debug: State._debug("clear_event(%s) %r", self.doc_string, event_id) + + # add this to the list of transitions + self.clear_event_transitions.append(EventTransition(event_id, None)) + + # return the next state + return next_state + + def wait_event(self, event_id, next_state=None): + """Create an EventTransition from this state to another, possibly new, + state. The next state is returned for method chaining. + + :param pdu: PDU to send + :param next_state: state to transition to after sending + """ + if _debug: State._debug("wait_event(%s) %r next_state=%r", self.doc_string, event_id, next_state) + + # maybe build a new state + if not next_state: + next_state = self.state_machine.new_state() + if _debug: State._debug(" - new next_state: %r", next_state) + elif next_state not in self.state_machine.states: + raise ValueError("off the rails") + + # add this to the list of transitions + self.wait_event_transitions.append(EventTransition(event_id, next_state)) + + # return the next state + return next_state + def timeout(self, delay, next_state=None): """Create a TimeoutTransition from this state to another, possibly new, state. There can only be one timeout transition per state. The next @@ -473,6 +586,18 @@ class StateMachine(object): # here we are current_state = self.current_state = state + # events are managed by a state machine group + if self.machine_group: + # setting events + for transition in current_state.set_event_transitions: + if _debug: StateMachine._debug(" - setting event: %r", transition.event_id) + self.machine_group.set_event(transition.event_id) + + # clearing events + for transition in current_state.clear_event_transitions: + if _debug: StateMachine._debug(" - clearing event: %r", transition.event_id) + self.machine_group.clear_event(transition.event_id) + # check for success state if current_state.is_success_state: if _debug: StateMachine._debug(" - success state") @@ -508,20 +633,39 @@ class StateMachine(object): # assume we can stay next_state = None + # events are managed by a state machine group + if self.machine_group: + # check to see if there are events that are already set + for transition in current_state.wait_event_transitions: + if _debug: StateMachine._debug(" - waiting event: %r", transition.event_id) + + # check for a transition + if transition.event_id in self.machine_group.events: + next_state = transition.next_state + if _debug: StateMachine._debug(" - next_state: %r", next_state) + + if next_state is not current_state: + break + else: + if _debug: StateMachine._debug(" - no events already set") + else: + if _debug: StateMachine._debug(" - not part of a group") + # send everything that needs to be sent - for transition in current_state.send_transitions: - if _debug: StateMachine._debug(" - sending: %r", transition) + if not next_state: + for transition in current_state.send_transitions: + if _debug: StateMachine._debug(" - sending: %r", transition) - current_state.before_send(transition.pdu) - self.send(transition.pdu) - current_state.after_send(transition.pdu) + current_state.before_send(transition.pdu) + self.send(transition.pdu) + current_state.after_send(transition.pdu) - # check for a transition - next_state = transition.next_state - if _debug: StateMachine._debug(" - next_state: %r", next_state) + # check for a transition + next_state = transition.next_state + if _debug: StateMachine._debug(" - next_state: %r", next_state) - if next_state is not current_state: - break + if next_state is not current_state: + break if not next_state: if _debug: StateMachine._debug(" - nowhere to go") @@ -552,10 +696,19 @@ class StateMachine(object): # add a reference to the pdu in the transaction log self.transaction_log.append(("<<<", pdu),) + def send(self, pdu): + raise NotImplementedError("send not implemented") + def after_send(self, pdu): """Called after each PDU sent.""" pass + def before_receive(self, pdu): + """Called with each PDU received before matching.""" + + # add a reference to the pdu in the transaction log + self.transaction_log.append((">>>", pdu),) + def receive(self, pdu): if _debug: StateMachine._debug("receive %r", pdu) @@ -581,7 +734,7 @@ class StateMachine(object): # look for a matching receive transition for transition in current_state.receive_transitions: - if self.match_pdu(pdu, transition.pdu): + if self.match_pdu(pdu, transition.criteria): if _debug: StateMachine._debug(" - match found") match_found = True @@ -606,12 +759,6 @@ class StateMachine(object): self.goto_state(next_state) - def before_receive(self, pdu): - """Called with each PDU received before matching.""" - - # add a reference to the pdu in the transaction log - self.transaction_log.append((">>>", pdu),) - def after_receive(self, pdu): """Called with PDU received after match.""" pass @@ -624,6 +771,49 @@ class StateMachine(object): # go to the unexpected receive state (failing) self.goto_state(self.unexpected_receive_state) + def event_set(self, event_id): + """Called by the state machine group when an event is set, the state + machine checks to see if it's waiting for the event and makes the + state transition if there is a match.""" + if _debug: StateMachine._debug("event_set %r", event_id) + + if not self.running: + if _debug: StateMachine._debug(" - not running") + return + + # check to see if we are transitioning + if self.state_transitioning: + if _debug: StateMachine._debug(" - transitioning") + return + if not self.current_state: + raise RuntimeError("no current state") + current_state = self.current_state + + match_found = False + + # look for a matching event transition + for transition in current_state.wait_event_transitions: + if transition.event_id == event_id: + if _debug: StateMachine._debug(" - match found") + match_found = True + + # let the state know this event was set + current_state.event_set(event_id) + + # check for a transition + next_state = transition.next_state + if _debug: StateMachine._debug(" - next_state: %r", next_state) + + if next_state is not current_state: + break + else: + if _debug: StateMachine._debug(" - going nowhere") + + if match_found and next_state is not current_state: + if _debug: StateMachine._debug(" - going") + + self.goto_state(next_state) + def state_timeout(self): if _debug: StateMachine._debug("state_timeout") @@ -644,13 +834,14 @@ class StateMachine(object): # go to the state specified self.goto_state(self.timeout_state) - def send(self, pdu): - raise NotImplementedError("send not implemented") + def match_pdu(self, pdu, criteria): + if _debug: StateMachine._debug("match_pdu %r %r", pdu, criteria) - def match_pdu(self, pdu, transition_pdu): - if _debug: StateMachine._debug("match_pdu %r %r", pdu, transition_pdu) + # separate the pdu_type and attributes to match + pdu_type, pdu_attrs = criteria - return pdu == transition_pdu + # pass along to the global function + return match_pdu(pdu, pdu_type, **pdu_attrs) def __repr__(self): if not self.running: @@ -698,6 +889,9 @@ class StateMachineGroup(object): self.is_success_state = None self.is_fail_state = None + # set of events that are set + self.events = set() + def append(self, state_machine): """Add a state machine to the end of the list of state machines.""" if _debug: StateMachineGroup._debug("append %r", state_machine) @@ -743,6 +937,32 @@ class StateMachineGroup(object): self.is_success_state = False self.is_fail_state = False + # events that are set + self.events = set() + + def set_event(self, event_id): + """Save an event as 'set' and pass it to the state machines to see + if they are in a state that is waiting for the event.""" + if _debug: StateMachineGroup._debug("set_event %r", event_id) + + self.events.add(event_id) + if _debug: StateMachineGroup._debug(" - event set") + + # pass along to each machine + for state_machine in self.state_machines: + if _debug: StateMachineGroup._debug(" - state_machine: %r", state_machine) + state_machine.event_set(event_id) + + def clear_event(self, event_id): + """Remove an event from the set of elements that are 'set'.""" + if _debug: StateMachineGroup._debug("clear_event %r", event_id) + + if event_id in self.events: + self.events.remove(event_id) + if _debug: StateMachineGroup._debug(" - event cleared") + else: + if _debug: StateMachineGroup._debug(" - noop") + def run(self): """Runs all the machines in the group.""" if _debug: StateMachineGroup._debug("run") @@ -810,8 +1030,8 @@ class StateMachineGroup(object): some_failed = some_failed or state_machine.current_state.is_fail_state if _debug: - StateMachineGroup._debug(" all_success: %r", all_success) - StateMachineGroup._debug(" some_failed: %r", some_failed) + StateMachineGroup._debug(" - all_success: %r", all_success) + StateMachineGroup._debug(" - some_failed: %r", some_failed) # return the results of the check return (all_success, some_failed) diff --git a/tests/test__template.py b/tests/test__template.py index 29c73c9..d170d09 100644 --- a/tests/test__template.py +++ b/tests/test__template.py @@ -17,43 +17,65 @@ _log = ModuleLogger(globals()) @bacpypes_debugging def setup_module(): + """This function is called once at the beginning of all of the tests + in this module.""" if _debug: setup_module._debug("setup_module") @bacpypes_debugging def teardown_module(): + """This function is called once at the end of the tests in this module.""" if _debug: teardown_module._debug("teardown_module") @bacpypes_debugging def setup_function(function): + """This function is called before each module level test function.""" if _debug: setup_function._debug("setup_function %r", function) @bacpypes_debugging def teardown_function(function): + """This function is called after each module level test function.""" if _debug: teardown_function._debug("teardown_function %r", function) +@bacpypes_debugging +def test_some_function(*args, **kwargs): + """This is a module level test function.""" + if _debug: setup_function._debug("test_some_function %r %r", args, kwargs) + + @bacpypes_debugging class TestCaseTemplate(unittest.TestCase): @classmethod def setup_class(cls): + """This function is called once before the test case is instantiated + for each of the tests.""" if _debug: TestCaseTemplate._debug("setup_class") @classmethod def teardown_class(cls): + """This function is called once at the end after the last instance + of the test case has been abandon.""" if _debug: TestCaseTemplate._debug("teardown_class") def setup_method(self, method): + """This function is called before each test method is called as is + given a reference to the test method.""" if _debug: TestCaseTemplate._debug("setup_method %r", method) def teardown_method(self, method): + """This function is called after each test method has been called and + is given a reference to the test method.""" if _debug: TestCaseTemplate._debug("teardown_method %r", method) def test_something(self): + """This is a method level test function.""" if _debug: TestCaseTemplate._debug("test_something") def test_something_else(self): + """This is another method level test function.""" if _debug: TestCaseTemplate._debug("test_something_else") + diff --git a/tests/test_bvll/__init__.py b/tests/test_bvll/__init__.py new file mode 100644 index 0000000..95546aa --- /dev/null +++ b/tests/test_bvll/__init__.py @@ -0,0 +1,8 @@ +#!/usr/bin/python + +""" +Test BVLL Module +""" + +from . import test_codec + diff --git a/tests/test_bvll/test_codec.py b/tests/test_bvll/test_codec.py new file mode 100644 index 0000000..22105a8 --- /dev/null +++ b/tests/test_bvll/test_codec.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Test BVLL Encoding and Decoding +------------------------------- +""" + +import string +import unittest + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger, btox, xtob + +from bacpypes.comm import bind +from bacpypes.pdu import PDU, Address, LocalBroadcast + +from bacpypes.bvll import Result, WriteBroadcastDistributionTable, \ + ReadBroadcastDistributionTable, ReadBroadcastDistributionTableAck, \ + ForwardedNPDU, RegisterForeignDevice, ReadForeignDeviceTable, \ + ReadForeignDeviceTableAck, FDTEntry, DeleteForeignDeviceTableEntry, \ + DistributeBroadcastToNetwork, OriginalUnicastNPDU, \ + OriginalBroadcastNPDU +from bacpypes.bvllservice import AnnexJCodec + +from ..trapped_classes import TrappedClient, TrappedServer +from ..state_machine import match_pdu + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +# extended form of xtob that first removes whitespace and period seperators +xxtob = lambda s: xtob(''.join(s.split()).replace('.', '')) + + +@bacpypes_debugging +class TestAnnexJCodec(unittest.TestCase): + + def setup_method(self, method): + """This function is called before each test method is called as is + given a reference to the test method.""" + if _debug: TestAnnexJCodec._debug("setup_method %r", method) + + # minature trapped stack + self.client = TrappedClient() + self.codec = AnnexJCodec() + self.server = TrappedServer() + bind(self.client, self.codec, self.server) + + def request(self, pdu): + """Pass the PDU to the client to send down the stack.""" + self.client.request(pdu) + + def indication(self, pdu_type=None, **pdu_attrs): + """Check what the server received.""" + assert match_pdu(self.server.indication_received, pdu_type, **pdu_attrs) + + def response(self, pdu): + """Pass the PDU to the server to send up the stack.""" + self.server.response(pdu) + + def confirmation(self, pdu_type=None, **pdu_attrs): + """Check what the client received.""" + assert match_pdu(self.client.confirmation_received, pdu_type, **pdu_attrs) + + def test_result(self): + """Test the Result encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_codec_01") + + # Request successful + self.request(Result(0)) + self.indication(pduData=xtob('810000060000')) + + self.response(PDU(xtob('810000060000'))) + self.confirmation(Result, bvlciResultCode=0) + + # Request error condition + self.request(Result(1)) + self.indication(pduData=xtob('810000060001')) + + self.response(PDU(xtob('810000060001'))) + self.confirmation(Result, bvlciResultCode=1) + + def test_write_broadcast_distribution_table(self): + """Test the WriteBroadcastDistributionTable encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_write_broadcast_distribution_table") + + # write an empty table + self.request(WriteBroadcastDistributionTable([])) + self.indication(pduData=xxtob('81.01.0004')) + + self.response(PDU(xxtob('81.01.0004'))) + self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[]) + + # write a table with an element + addr = Address('192.168.0.254/24') + pdu_bytes = xxtob('81.01.000e' + 'c0.a8.00.fe.ba.c0.ff.ff.ff.00' + ) + + self.request(WriteBroadcastDistributionTable([addr])) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[addr]) + + def test_read_broadcast_distribution_table(self): + """Test the ReadBroadcastDistributionTable encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table") + + # read the table + self.request(ReadBroadcastDistributionTable()) + self.indication(pduData=xxtob('81.02.0004')) + + self.response(PDU(xxtob('81.02.0004'))) + self.confirmation(ReadBroadcastDistributionTable) + + def test_read_broadcast_distribution_table_ack(self): + """Test the ReadBroadcastDistributionTableAck encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table_ack") + + # read returns an empty table + self.request(ReadBroadcastDistributionTableAck([])) + self.indication(pduData=xxtob('81.03.0004')) + + self.response(PDU(xxtob('81.03.0004'))) + self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[]) + + # read returns a table with an element + addr = Address('192.168.0.254/24') + pdu_bytes = xxtob('81.03.000e' + 'c0.a8.00.fe.ba.c0.ff.ff.ff.00' + ) + + self.request(ReadBroadcastDistributionTableAck([addr])) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[addr]) + + def test_forwarded_npdu(self): + """Test the ForwardedNPDU encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_forwarded_npdu") + + # read returns a table with an element + addr = Address('192.168.0.1') + xpdu = xxtob('deadbeef') + pdu_bytes = xxtob('81.04.000e' + 'c0.a8.00.01.ba.c0' # original source address + 'deadbeef' # forwarded PDU + ) + + self.request(ForwardedNPDU(addr, xpdu)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(ForwardedNPDU, bvlciAddress=addr, pduData=xpdu) + + def test_register_foreign_device(self): + """Test the RegisterForeignDevice encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_register_foreign_device") + + # register as a foreign device with a 30 second time-to-live + pdu_bytes = xxtob('81.05.0006' + '001e' # time-to-live + ) + + self.request(RegisterForeignDevice(30)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(RegisterForeignDevice, bvlciTimeToLive=30) + + def test_read_foreign_device_table(self): + """Test the ReadForeignDeviceTable encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table") + + # read returns an empty table + self.request(ReadForeignDeviceTable()) + self.indication(pduData=xxtob('81.06.0004')) + + self.response(PDU(xxtob('81.06.0004'))) + self.confirmation(ReadForeignDeviceTable) + + def test_read_foreign_device_table_ack(self): + """Test the ReadForeignDeviceTableAck encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table_ack") + + # read returns an empty table + self.request(ReadForeignDeviceTableAck([])) + self.indication(pduData=xxtob('81.07.0004')) + + self.response(PDU(xxtob('81.07.0004'))) + self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[]) + + # read returns a table with one entry + fdte = FDTEntry() + fdte.fdAddress = Address("192.168.0.10") + fdte.fdTTL = 30 + fdte.fdRemain = 15 + pdu_bytes = xxtob('81.07.000e' + 'c0.a8.00.0a.ba.c0' + '001e.000f' + ) + + self.request(ReadForeignDeviceTableAck([fdte])) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[fdte]) + + def test_delete_foreign_device_table_entry(self): + """Test the DeleteForeignDeviceTableEntry encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_delete_foreign_device_table_entry") + + # delete an element + addr = Address('192.168.0.11/24') + pdu_bytes = xxtob('81.08.000a' + 'c0.a8.00.0b.ba.c0' # address of entry to be deleted + ) + + self.request(DeleteForeignDeviceTableEntry(addr)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(DeleteForeignDeviceTableEntry, bvlciAddress=addr) + + def test_distribute_broadcast_to_network(self): + """Test the DistributeBroadcastToNetwork encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_distribute_broadcast_to_network") + + # read returns a table with an element + xpdu = xxtob('deadbeef') + pdu_bytes = xxtob('81.09.0008' + 'deadbeef' # PDU to broadcast + ) + + self.request(DistributeBroadcastToNetwork(xpdu)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(DistributeBroadcastToNetwork, pduData=xpdu) + + def test_original_unicast_npdu(self): + """Test the OriginalUnicastNPDU encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_original_unicast_npdu") + + # read returns a table with an element + xpdu = xxtob('deadbeef') + pdu_bytes = xxtob('81.0a.0008' + 'deadbeef' # PDU being unicast + ) + + self.request(OriginalUnicastNPDU(xpdu)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(OriginalUnicastNPDU, pduData=xpdu) + + def test_original_broadcast_npdu(self): + """Test the OriginalBroadcastNPDU encoding and decoding.""" + if _debug: TestAnnexJCodec._debug("test_original_broadcast_npdu") + + # read returns a table with an element + xpdu = xxtob('deadbeef') + pdu_bytes = xxtob('81.0b.0008' + 'deadbeef' # PDU being broadcast + ) + + self.request(OriginalBroadcastNPDU(xpdu)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) + self.confirmation(OriginalBroadcastNPDU, pduData=xpdu) + diff --git a/tests/test_iocb/__init__.py b/tests/test_iocb/__init__.py new file mode 100644 index 0000000..2e7964a --- /dev/null +++ b/tests/test_iocb/__init__.py @@ -0,0 +1,14 @@ +#!/usr/bin/python + +""" +Test BACpypes IOCB Module +""" + +from . import test_iocb +from . import test_iochain +from . import test_iogroup +from . import test_ioqueue +from . import test_iocontroller +from . import test_ioqcontroller +from . import test_clientcontroller +from . import test_sieveclientcontroller diff --git a/tests/test_iocb/test_clientcontroller.py b/tests/test_iocb/test_clientcontroller.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_clientcontroller.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_iocb.py b/tests/test_iocb/test_iocb.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_iocb.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_iochain.py b/tests/test_iocb/test_iochain.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_iochain.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_iocontroller.py b/tests/test_iocb/test_iocontroller.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_iocontroller.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_iogroup.py b/tests/test_iocb/test_iogroup.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_iogroup.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_ioqcontroller.py b/tests/test_iocb/test_ioqcontroller.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_ioqcontroller.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_ioqueue.py b/tests/test_iocb/test_ioqueue.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_ioqueue.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_iocb/test_sieveclientcontroller.py b/tests/test_iocb/test_sieveclientcontroller.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_iocb/test_sieveclientcontroller.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_primitive_data/test_integer.py b/tests/test_primitive_data/test_integer.py index 5ff144a..464a85f 100644 --- a/tests/test_primitive_data/test_integer.py +++ b/tests/test_primitive_data/test_integer.py @@ -6,6 +6,7 @@ Test Primitive Data Integer --------------------------- """ +import sys import unittest from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob @@ -78,6 +79,15 @@ class TestInteger(unittest.TestCase): obj = Integer() assert obj.value == 0 + assert Integer.is_valid(1) + assert Integer.is_valid(-1) + if sys.version[0] == 2: + assert Integer.is_valid(long(1)) + assert Integer.is_valid(long(-1)) + + assert not Integer.is_valid(True) + assert not Integer.is_valid(1.0) + with self.assertRaises(TypeError): Integer("some string") with self.assertRaises(TypeError): @@ -139,4 +149,5 @@ class TestInteger(unittest.TestCase): integer_endec(-8388608, '800000') integer_endec(2147483647, '7fffffff') - integer_endec(-2147483648, '80000000') \ No newline at end of file + integer_endec(-2147483648, '80000000') + diff --git a/tests/test_primitive_data/test_unsigned.py b/tests/test_primitive_data/test_unsigned.py index 7cc7e5f..65d6935 100644 --- a/tests/test_primitive_data/test_unsigned.py +++ b/tests/test_primitive_data/test_unsigned.py @@ -6,6 +6,7 @@ Test Primitive Data Unsigned ---------------------------- """ +import sys import unittest from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob @@ -78,6 +79,16 @@ class TestUnsigned(unittest.TestCase): obj = Unsigned() assert obj.value == 0 + assert Unsigned.is_valid(1) + assert not Unsigned.is_valid(-1) + if sys.version[0] == 2: + assert Unsigned.is_valid(long(1)) + assert not Unsigned.is_valid(long(-1)) + + assert not Unsigned.is_valid(True) + assert not Unsigned.is_valid(-1) + assert not Unsigned.is_valid(1.0) + with self.assertRaises(TypeError): Unsigned("some string") with self.assertRaises(TypeError): @@ -138,4 +149,5 @@ class TestUnsigned(unittest.TestCase): unsigned_endec(8388608, '800000') unsigned_endec(2147483647, '7fffffff') - unsigned_endec(2147483648, '80000000') \ No newline at end of file + unsigned_endec(2147483648, '80000000') + diff --git a/tests/test_utilities/test_state_machine.py b/tests/test_utilities/test_state_machine.py index f6256c4..8ca7f1f 100644 --- a/tests/test_utilities/test_state_machine.py +++ b/tests/test_utilities/test_state_machine.py @@ -9,7 +9,7 @@ Test Utilities State Machine import unittest from bacpypes.debugging import bacpypes_debugging, ModuleLogger -from ..state_machine import State, StateMachine, StateMachineGroup +from ..state_machine import State, StateMachine, StateMachineGroup, match_pdu from ..time_machine import reset_time_machine, run_time_machine from ..trapped_classes import TrappedState, TrappedStateMachine @@ -18,6 +18,51 @@ _debug = 0 _log = ModuleLogger(globals()) +@bacpypes_debugging +class TPDU: + + def __init__(self, **kwargs): + if _debug: TPDU._debug("__init__ %r", kwargs) + + self.__dict__.update(kwargs) + + def __repr__(self): + return ''.format(', '.join( + '{}={}'.format(k, v) for k,v in self.__dict__.items(), + )) + + +@bacpypes_debugging +class TestMatchPDU(unittest.TestCase): + + def test_match_pdu(self): + if _debug: TestMatchPDU._debug("test_match_pdu") + + tpdu = TPDU(x=1) + Anon = type('Anon', (), {}) + anon = Anon() + + # no criteria passes + assert match_pdu(tpdu) + assert match_pdu(anon) + + # matching/not matching types + assert match_pdu(tpdu, TPDU) + assert not match_pdu(tpdu, Anon) + assert match_pdu(tpdu, (TPDU, Anon)) + + # matching/not matching attributes + assert match_pdu(tpdu, x=1) + assert not match_pdu(tpdu, x=2) + assert not match_pdu(tpdu, y=1) + assert not match_pdu(anon, x=1) + + # matching/not matching types and attributes + assert match_pdu(tpdu, TPDU, x=1) + assert not match_pdu(tpdu, TPDU, x=2) + assert not match_pdu(tpdu, TPDU, y=1) + + @bacpypes_debugging class TestState(unittest.TestCase): @@ -29,6 +74,7 @@ class TestState(unittest.TestCase): ns = ts.doc("test state") assert ts.doc_string == "test state" assert ns is ts + if _debug: TestState._debug(" - passed") def test_state_success(self): if _debug: TestState._debug("test_state_success") @@ -43,6 +89,7 @@ class TestState(unittest.TestCase): ts.success() with self.assertRaises(RuntimeError): ts.fail() + if _debug: TestState._debug(" - passed") def test_state_fail(self): if _debug: TestState._debug("test_state_fail") @@ -57,9 +104,11 @@ class TestState(unittest.TestCase): ts.success() with self.assertRaises(RuntimeError): ts.fail() + if _debug: TestState._debug(" - passed") def test_something_else(self): if _debug: TestState._debug("test_something_else") + if _debug: TestState._debug(" - passed") @bacpypes_debugging @@ -77,12 +126,13 @@ class TestStateMachine(unittest.TestCase): # check for still running in the start state assert tsm.running assert tsm.current_state is tsm.start_state + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_success(self): if _debug: TestStateMachine._debug("test_state_machine_success") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() assert isinstance(tsm.start_state, TrappedState) # make the start state a success @@ -94,12 +144,13 @@ class TestStateMachine(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_success_state + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_fail(self): if _debug: TestStateMachine._debug("test_state_machine_fail") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() assert isinstance(tsm.start_state, TrappedState) # make the start state a fail @@ -111,15 +162,16 @@ class TestStateMachine(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_fail_state + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_send(self): if _debug: TestStateMachine._debug("test_state_machine_send") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() # make pdu object - pdu = object() + pdu = TPDU() # make a send transition from start to success, run the machine tsm.start_state.send(pdu).success() @@ -141,18 +193,19 @@ class TestStateMachine(unittest.TestCase): # check the transaction log assert len(tsm.transaction_log) == 1 assert tsm.transaction_log[0][1] is pdu + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_receive(self): if _debug: TestStateMachine._debug("test_state_machine_receive") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() # make pdu object - pdu = object() + pdu = TPDU() # make a receive transition from start to success, run the machine - tsm.start_state.receive(pdu).success() + tsm.start_state.receive(TPDU).success() tsm.run() # check for still running @@ -174,19 +227,20 @@ class TestStateMachine(unittest.TestCase): # check the transaction log assert len(tsm.transaction_log) == 1 assert tsm.transaction_log[0][1] is pdu + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_unexpected(self): if _debug: TestStateMachine._debug("test_state_machine_unexpected") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() # make pdu object - good_pdu = object() - bad_pdu = object() + good_pdu = TPDU(a=1) + bad_pdu = TPDU(b=2) # make a receive transition from start to success, run the machine - tsm.start_state.receive(good_pdu).success() + tsm.start_state.receive(TPDU, a=1).success() tsm.run() # check for still running @@ -206,21 +260,24 @@ class TestStateMachine(unittest.TestCase): # check the transaction log assert len(tsm.transaction_log) == 1 assert tsm.transaction_log[0][1] is bad_pdu + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_loop_01(self): if _debug: TestStateMachine._debug("test_state_machine_loop_01") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() # make pdu object - first_pdu = object() - second_pdu = object() + first_pdu = TPDU(a=1) + if _debug: TestStateMachine._debug(" - first_pdu: %r", first_pdu) + second_pdu = TPDU(a=2) + if _debug: TestStateMachine._debug(" - second_pdu: %r", second_pdu) # after sending the first pdu, wait for the second s0 = tsm.start_state s1 = s0.send(first_pdu) - s2 = s1.receive(second_pdu) + s2 = s1.receive(TPDU, a=2) s2.success() # run the machine @@ -229,6 +286,7 @@ class TestStateMachine(unittest.TestCase): # check for still running and waiting assert tsm.running assert tsm.current_state is s1 + if _debug: TestStateMachine._debug(" - still running and waiting") # give the machine the second pdu tsm.receive(second_pdu) @@ -236,31 +294,34 @@ class TestStateMachine(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_success_state + if _debug: TestStateMachine._debug(" - success") # check the callbacks assert s0.before_send_pdu is first_pdu assert s0.after_send_pdu is first_pdu assert s1.before_receive_pdu is second_pdu assert s1.after_receive_pdu is second_pdu + if _debug: TestStateMachine._debug(" - callbacks passed") # check the transaction log assert len(tsm.transaction_log) == 2 assert tsm.transaction_log[0][1] is first_pdu assert tsm.transaction_log[1][1] is second_pdu + if _debug: TestStateMachine._debug(" - transaction log passed") def test_state_machine_loop_02(self): if _debug: TestStateMachine._debug("test_state_machine_loop_02") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() # make pdu object - first_pdu = object() - second_pdu = object() + first_pdu = TPDU(a=1) + second_pdu = TPDU(a=2) # when the first pdu is received, send the second s0 = tsm.start_state - s1 = s0.receive(first_pdu) + s1 = s0.receive(TPDU, a=1) s2 = s1.send(second_pdu) s2.success() @@ -269,6 +330,7 @@ class TestStateMachine(unittest.TestCase): # check for still running assert tsm.running + if _debug: TestStateMachine._debug(" - still running") # give the machine the first pdu tsm.receive(first_pdu) @@ -276,17 +338,20 @@ class TestStateMachine(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_success_state + if _debug: TestStateMachine._debug(" - success") # check the callbacks assert s0.before_receive_pdu is first_pdu assert s0.after_receive_pdu is first_pdu assert s1.before_send_pdu is second_pdu assert s1.after_send_pdu is second_pdu + if _debug: TestStateMachine._debug(" - callbacks passed") # check the transaction log assert len(tsm.transaction_log) == 2 assert tsm.transaction_log[0][1] is first_pdu assert tsm.transaction_log[1][1] is second_pdu + if _debug: TestStateMachine._debug(" - transaction log passed") @bacpypes_debugging @@ -296,7 +361,7 @@ class TestStateMachineTimeout1(unittest.TestCase): if _debug: TestStateMachineTimeout1._debug("test_state_machine_timeout_1") # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() # make a timeout transition from start to success tsm.start_state.timeout(1.0).success() @@ -312,6 +377,7 @@ class TestStateMachineTimeout1(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_success_state + if _debug: TestStateMachine._debug(" - passed") @bacpypes_debugging @@ -321,11 +387,11 @@ class TestStateMachineTimeout2(unittest.TestCase): if _debug: TestStateMachineTimeout2._debug("test_state_machine_timeout_2") # make some pdu's - first_pdu = object() - second_pdu = object() + first_pdu = TPDU(a=1) + second_pdu = TPDU(a=2) # create a trapped state machine - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() s0 = tsm.start_state # send something, wait, send something, wait, success @@ -350,6 +416,7 @@ class TestStateMachineTimeout2(unittest.TestCase): assert len(tsm.transaction_log) == 2 assert tsm.transaction_log[0][1] is first_pdu assert tsm.transaction_log[1][1] is second_pdu + if _debug: TestStateMachine._debug(" - passed") @bacpypes_debugging @@ -362,7 +429,7 @@ class TestStateMachineGroup(unittest.TestCase): smg = StateMachineGroup() # create a trapped state machine, start state is success - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() tsm.start_state.success() # add it to the group @@ -381,6 +448,7 @@ class TestStateMachineGroup(unittest.TestCase): assert not tsm.running assert tsm.current_state.is_success_state assert smg.is_success_state + if _debug: TestStateMachine._debug(" - passed") def test_state_machine_group_fail(self): if _debug: TestStateMachineGroup._debug("test_state_machine_group_fail") @@ -389,7 +457,7 @@ class TestStateMachineGroup(unittest.TestCase): smg = StateMachineGroup() # create a trapped state machine, start state is fail - tsm = TrappedStateMachine(state_subclass=TrappedState) + tsm = TrappedStateMachine() tsm.start_state.fail() # add it to the group @@ -408,3 +476,71 @@ class TestStateMachineGroup(unittest.TestCase): assert not tsm.running assert tsm.current_state.is_fail_state assert smg.is_fail_state + if _debug: TestStateMachine._debug(" - passed") + + +@bacpypes_debugging +class TestStateMachineEvents(unittest.TestCase): + + def test_state_machine_event_01(self): + if _debug: TestStateMachineEvents._debug("test_state_machine_event_01") + + # create a state machine group + smg = StateMachineGroup() + + # create a trapped state machine, start state is success + tsm1 = TrappedStateMachine() + tsm1.start_state.set_event('e').success() + smg.append(tsm1) + + # create another trapped state machine, waiting for the event + tsm2 = TrappedStateMachine() + tsm2.start_state.wait_event('e').success() + smg.append(tsm2) + + reset_time_machine() + if _debug: TestStateMachineEvents._debug(" - time machine reset") + + # tell the group to run + smg.run() + + run_time_machine(60.0) + if _debug: TestStateMachineEvents._debug(" - time machine finished") + + # check for success + assert tsm1.current_state.is_success_state + assert tsm2.current_state.is_success_state + assert smg.is_success_state + if _debug: TestStateMachineEvents._debug(" - passed") + + def test_state_machine_event_02(self): + if _debug: TestStateMachineEvents._debug("test_state_machine_event_02") + + # create a state machine group + smg = StateMachineGroup() + + # create a trapped state machine, waiting for an event + tsm1 = TrappedStateMachine() + tsm1.start_state.wait_event('e').success() + smg.append(tsm1) + + # create another trapped state machine, start state is success + tsm2 = TrappedStateMachine() + tsm2.start_state.set_event('e').success() + smg.append(tsm2) + + reset_time_machine() + if _debug: TestStateMachineEvents._debug(" - time machine reset") + + # tell the group to run + smg.run() + + run_time_machine(60.0) + if _debug: TestStateMachineEvents._debug(" - time machine finished") + + # check for success + assert tsm1.current_state.is_success_state + assert tsm2.current_state.is_success_state + assert smg.is_success_state + if _debug: TestStateMachineEvents._debug(" - passed") + diff --git a/tests/test_vlan/__init__.py b/tests/test_vlan/__init__.py index ad90205..93998fd 100644 --- a/tests/test_vlan/__init__.py +++ b/tests/test_vlan/__init__.py @@ -8,3 +8,5 @@ This module tests the VLAN networking. """ from . import test_network +from . import test_ipnetwork + diff --git a/tests/test_vlan/test_ipnetwork.py b/tests/test_vlan/test_ipnetwork.py new file mode 100644 index 0000000..a825f98 --- /dev/null +++ b/tests/test_vlan/test_ipnetwork.py @@ -0,0 +1,374 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Test IPNetwork +------------ + +This module tests the basic functionality of a crudely simulated IPv4 network, +source and destination addresses are tuples like those used for sockets and +the UDPDirector. +""" + +import unittest + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger + +from bacpypes.pdu import Address, LocalBroadcast, PDU +from bacpypes.comm import bind +from bacpypes.vlan import IPNetwork, IPNode, IPRouter + +from ..state_machine import ClientStateMachine, StateMachineGroup +from ..time_machine import reset_time_machine, run_time_machine + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +@bacpypes_debugging +class TNetwork(StateMachineGroup): + + def __init__(self, node_count, address_pattern): + if _debug: TNetwork._debug("__init__ %r", node_count) + StateMachineGroup.__init__(self) + + self.vlan = IPNetwork() + + for i in range(node_count): + node_address = Address(address_pattern.format(i + 1)) + node = IPNode(node_address, self.vlan) + if _debug: TNetwork._debug(" - node: %r", node) + + # bind a client state machine to the node + csm = ClientStateMachine() + bind(csm, node) + + # add it to this group + self.append(csm) + + def run(self, time_limit=60.0): + if _debug: TNetwork._debug("run %r", time_limit) + + # reset the time machine + reset_time_machine() + if _debug: TNetwork._debug(" - time machine reset") + + # run the group + super(TNetwork, self).run() + + # run it for some time + run_time_machine(time_limit) + if _debug: TNetwork._debug(" - time machine finished") + + # check for success + all_success, some_failed = super(TNetwork, self).check_for_success() + assert all_success + + +@bacpypes_debugging +class TestVLAN(unittest.TestCase): + + def __init__(self, *args, **kwargs): + if _debug: TestVLAN._debug("__init__ %r %r", args, kwargs) + super(TestVLAN, self).__init__(*args, **kwargs) + + def test_idle(self): + """Test that a very quiet network can exist. This is not a network + test so much as a state machine group test. + """ + if _debug: TestVLAN._debug("test_idle") + + # two element network + tnet = TNetwork(2, "192.168.1.{}/24") + tnode1, tnode2 = tnet.state_machines + + # set the start states of both machines to success + tnode1.start_state.success() + tnode2.start_state.success() + + # run the group + tnet.run() + + def test_send_receive(self): + """Test that a node can send a message to another node. + """ + if _debug: TestVLAN._debug("test_send_receive") + + # two element network + tnet = TNetwork(2, "192.168.2.{}/24") + tnode1, tnode2 = tnet.state_machines + + # make a PDU from node 1 to node 2 + pdu = PDU(b'data', + source=('192.168.2.1', 47808), + destination=('192.168.2.2', 47808), + ) + if _debug: TestVLAN._debug(" - pdu: %r", pdu) + + # node 1 sends the pdu, mode 2 gets it + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, + pduSource=('192.168.2.1', 47808), + ).success() + + # run the group + tnet.run() + + def test_broadcast(self): + """Test that a node can send out a 'local broadcast' message which will + be received by every other node. + """ + if _debug: TestVLAN._debug("test_broadcast") + + # three element network + tnet = TNetwork(3, "192.168.3.{}/24") + tnode1, tnode2, tnode3 = tnet.state_machines + + # make a broadcast PDU + pdu = PDU(b'data', + source=('192.168.3.1', 47808), + destination=('192.168.3.255', 47808), + ) + if _debug: TestVLAN._debug(" - pdu: %r", pdu) + + # node 1 sends the pdu, node 2 and 3 each get it + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, + pduSource=('192.168.3.1', 47808), + ).success() + tnode3.start_state.receive(PDU, + pduSource=('192.168.3.1', 47808) + ).success() + + # run the group + tnet.run() + + def test_spoof_fail(self): + """Test verifying that a node cannot send out packets with a source + address other than its own, see also test_spoof_pass(). + """ + if _debug: TestVLAN._debug("test_spoof_fail") + + # one element network + tnet = TNetwork(1, "192.168.4.{}/24") + tnode1, = tnet.state_machines + + # make a unicast PDU with the wrong source + pdu = PDU(b'data', + source=('192.168.4.2', 47808), + destination=('192.168.4.3', 47808), + ) + + # the node sends the pdu and would be a success but... + tnode1.start_state.send(pdu).success() + + # when the node attempts to send it raises an error + with self.assertRaises(RuntimeError): + tnet.run() + + def test_spoof_pass(self): + """Test allowing a node to send out packets with a source address + other than its own, see also test_spoof_fail(). + """ + if _debug: TestVLAN._debug("test_spoof_pass") + + # one node network + tnet = TNetwork(1, "192.168.5.{}/24") + tnode1, = tnet.state_machines + + # reach into the network and enable spoofing for the node + tnet.vlan.nodes[0].spoofing = True + + # make a unicast PDU from a fictitious node + pdu = PDU(b'data', + source=('192.168.5.3', 47808), + destination=('192.168.5.1', 47808), + ) + + # node 1 sends the pdu, but gets it back as if it was from node 3 + tnode1.start_state.send(pdu).receive(PDU, + pduSource=('192.168.5.3', 47808), + ).success() + + # run the group + tnet.run() + + def test_promiscuous_pass(self): + """Test 'promiscuous mode' of a node which allows it to receive every + packet sent on the network. This is like the network is a hub, or + the node is connected to a 'monitor' port on a managed switch. + """ + if _debug: TestVLAN._debug("test_promiscuous_pass") + + # three element network + tnet = TNetwork(3, "192.168.6.{}/24") + tnode1, tnode2, tnode3 = tnet.state_machines + + # reach into the network and enable promiscuous mode + tnet.vlan.nodes[2].promiscuous = True + + # make a PDU from node 1 to node 2 + pdu = PDU(b'data', + source=('192.168.6.1', 47808), + destination=('192.168.6.2', 47808), + ) + + # node 1 sends the pdu to node 2, node 3 also gets a copy + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, + pduSource=('192.168.6.1', 47808), + ).success() + tnode3.start_state.receive(PDU, + pduDestination=('192.168.6.2', 47808), + ).success() + + # run the group + tnet.run() + + def test_promiscuous_fail(self): + if _debug: TestVLAN._debug("test_promiscuous_fail") + + # three element network + tnet = TNetwork(3, "192.168.7.{}/24") + tnode1, tnode2, tnode3 = tnet.state_machines + + # make a PDU from node 1 to node 2 + pdu = PDU(b'data', + source=('192.168.7.1', 47808), + destination=('192.168.7.2', 47808), + ) + + # node 1 sends the pdu to node 2, node 3 waits and gets nothing + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, + pduSource=('192.168.7.1', 47808), + ).success() + + # if node 3 receives anything it will trigger unexpected receive and fail + tnode3.start_state.timeout(1).success() + + # run the group + tnet.run() + + +@bacpypes_debugging +class TestRouter(unittest.TestCase): + + def __init__(self, *args, **kwargs): + if _debug: TestRouter._debug("__init__ %r %r", args, kwargs) + super(TestRouter, self).__init__(*args, **kwargs) + + def setup_method(self, method): + """This function is called before each test method is called as is + given a reference to the test method.""" + if _debug: TestRouter._debug("setup_method %r", method) + + # create a state machine group that has all nodes on all networks + self.smg = StateMachineGroup() + + # make some networks + vlan10 = IPNetwork() + vlan20 = IPNetwork() + + # make a router and add the networks + trouter = IPRouter() + trouter.add_network(Address("192.168.10.1/24"), vlan10) + trouter.add_network(Address("192.168.20.1/24"), vlan20) + + # add nodes to the networks + for pattern, lan in ( + ("192.168.10.{}/24", vlan10), + ("192.168.20.{}/24", vlan20), + ): + for i in range(2): + node_address = Address(pattern.format(i + 2)) + node = IPNode(node_address, lan) + if _debug: TestRouter._debug(" - node: %r", node) + + # bind a client state machine to the node + csm = ClientStateMachine() + bind(csm, node) + + # add it to the group + self.smg.append(csm) + + def teardown_method(self, method): + """This function is called after each test method has been called and + is given a reference to the test method.""" + if _debug: TestRouter._debug("teardown_method %r", method) + + # reset the time machine + reset_time_machine() + if _debug: TestRouter._debug(" - time machine reset") + + # run the group + self.smg.run() + + # run it for some time + run_time_machine(60.0) + if _debug: TestRouter._debug(" - time machine finished") + + # check for success + all_success, some_failed = self.smg.check_for_success() + assert all_success + + def test_idle(self): + if _debug: TestRouter._debug("test_idle") + + # all successful + for csm in self.smg.state_machines: + csm.start_state.success() + + def test_send_receive(self): + """Test that a node can send a message to another node on + a different network. + """ + if _debug: TestRouter._debug("test_send_receive") + + # unpack the state machines + csm_10_2, csm_10_3, csm_20_2, csm_20_3 = self.smg.state_machines + + # make a PDU from network 10 node 1 to network 20 node 2 + pdu = PDU(b'data', + source=('192.168.10.2', 47808), + destination=('192.168.20.3', 47808), + ) + if _debug: TestVLAN._debug(" - pdu: %r", pdu) + + # node 1 sends the pdu, mode 2 gets it + csm_10_2.start_state.send(pdu).success() + csm_20_3.start_state.receive(PDU, + pduSource=('192.168.10.2', 47808), + ).success() + + # other nodes get nothing + csm_10_3.start_state.timeout(1).success() + csm_20_2.start_state.timeout(1).success() + + def test_remote_broadcast(self): + """Test that a node can send a message to all of the other nodes on + a different network. + """ + if _debug: TestRouter._debug("test_remote_broadcast") + + # unpack the state machines + csm_10_2, csm_10_3, csm_20_2, csm_20_3 = self.smg.state_machines + + # make a PDU from network 10 node 1 to network 20 node 2 + pdu = PDU(b'data', + source=('192.168.10.2', 47808), + destination=('192.168.20.255', 47808), + ) + if _debug: TestVLAN._debug(" - pdu: %r", pdu) + + # node 10-2 sends the pdu, node 10-3 gets nothing, nodes 20-2 and 20-3 get it + csm_10_2.start_state.send(pdu).success() + csm_10_3.start_state.timeout(1).success() + csm_20_2.start_state.receive(PDU, + pduSource=('192.168.10.2', 47808), + ).success() + csm_20_3.start_state.receive(PDU, + pduSource=('192.168.10.2', 47808), + ).success() + diff --git a/tests/test_vlan/test_network.py b/tests/test_vlan/test_network.py index 4a14615..27b3658 100644 --- a/tests/test_vlan/test_network.py +++ b/tests/test_vlan/test_network.py @@ -2,58 +2,255 @@ # -*- coding: utf-8 -*- """ -Test Module Template --------------------- +Test Network +------------ + +This module tests the basic functionality of a VLAN network. Each test "runs" +on a VLAN with two nodes, node_1 and node_2, and each has a state machine. """ import unittest from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.pdu import Address, LocalBroadcast, PDU +from bacpypes.comm import bind +from bacpypes.vlan import Network, Node + +from ..state_machine import ClientStateMachine, StateMachineGroup +from ..time_machine import reset_time_machine, run_time_machine + # some debugging _debug = 0 _log = ModuleLogger(globals()) @bacpypes_debugging -def setup_module(): - if _debug: setup_module._debug("setup_module") +class TNetwork(StateMachineGroup): + + def __init__(self, node_count): + if _debug: TNetwork._debug("__init__ %r", node_count) + StateMachineGroup.__init__(self) + + self.vlan = Network(broadcast_address=0) + + for i in range(node_count): + node = Node(i + 1, self.vlan) + + # bind a client state machine to the node + csm = ClientStateMachine() + bind(csm, node) + + # add it to this group + self.append(csm) + + def run(self, time_limit=60.0): + if _debug: TNetwork._debug("run %r", time_limit) + + # reset the time machine + reset_time_machine() + if _debug: TNetwork._debug(" - time machine reset") + + # run the group + super(TNetwork, self).run() + + # run it for some time + run_time_machine(time_limit) + if _debug: TNetwork._debug(" - time machine finished") + + # check for success + all_success, some_failed = super(TNetwork, self).check_for_success() + assert all_success @bacpypes_debugging -def teardown_module(): - if _debug: teardown_module._debug("teardown_module") +class TestVLAN(unittest.TestCase): + def __init__(self, *args, **kwargs): + if _debug: TestVLAN._debug("__init__ %r %r", args, kwargs) + super(TestVLAN, self).__init__(*args, **kwargs) + + def test_idle(self): + """Test that a very quiet network can exist. This is not a network + test so much as a state machine group test. + """ + if _debug: TestVLAN._debug("test_idle") + + # two element network + tnet = TNetwork(2) + tnode1, tnode2 = tnet.state_machines + + # set the start states of both machines to success + tnode1.start_state.success() + tnode2.start_state.success() + + # run the group + tnet.run() + + def test_send_receive(self): + """Test that a node can send a message to another node. + """ + if _debug: TestVLAN._debug("test_send_receive") + + # two element network + tnet = TNetwork(2) + tnode1, tnode2 = tnet.state_machines + + # make a PDU from node 1 to node 2 + pdu = PDU(b'data', source=1, destination=2) + if _debug: TestVLAN._debug(" - pdu: %r", pdu) + + # node 1 sends the pdu, mode 2 gets it + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, pduSource=1).success() + + # run the group + tnet.run() + + def test_broadcast(self): + """Test that a node can send out a 'local broadcast' message which will + be received by every other node. + """ + if _debug: TestVLAN._debug("test_broadcast") + + # three element network + tnet = TNetwork(3) + tnode1, tnode2, tnode3 = tnet.state_machines + + # make a broadcast PDU + pdu = PDU(b'data', source=1, destination=0) + if _debug: TestVLAN._debug(" - pdu: %r", pdu) + + # node 1 sends the pdu, node 2 and 3 each get it + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, pduSource=1).success() + tnode3.start_state.receive(PDU, pduSource=1).success() + + # run the group + tnet.run() + + def test_spoof_fail(self): + """Test verifying that a node cannot send out packets with a source + address other than its own, see also test_spoof_pass(). + """ + if _debug: TestVLAN._debug("test_spoof_fail") + + # two element network + tnet = TNetwork(1) + tnode1, = tnet.state_machines + + # make a unicast PDU with the wrong source + pdu = PDU(b'data', source=2, destination=3) + + # the node sends the pdu and would be a success but... + tnode1.start_state.send(pdu).success() + + # when the node attempts to send it raises an error + with self.assertRaises(RuntimeError): + tnet.run() + + def test_spoof_pass(self): + """Test allowing a node to send out packets with a source address + other than its own, see also test_spoof_fail(). + """ + if _debug: TestVLAN._debug("test_spoof_pass") + + # one node network + tnet = TNetwork(1) + tnode1, = tnet.state_machines + + # reach into the network and enable spoofing for the node + tnet.vlan.nodes[0].spoofing = True + + # make a unicast PDU from a fictitious node + pdu = PDU(b'data', source=3, destination=1) + + # node 1 sends the pdu, but gets it back as if it was from node 3 + tnode1.start_state.send(pdu).receive(PDU, pduSource=3).success() + + # run the group + tnet.run() + + def test_promiscuous_pass(self): + """Test 'promiscuous mode' of a node which allows it to receive every + packet sent on the network. This is like the network is a hub, or + the node is connected to a 'monitor' port on a managed switch. + """ + if _debug: TestVLAN._debug("test_promiscuous_pass") + + # three element network + tnet = TNetwork(3) + tnode1, tnode2, tnode3 = tnet.state_machines + + # reach into the network and enable promiscuous mode + tnet.vlan.nodes[2].promiscuous = True + + # make a PDU from node 1 to node 2 + pdu = PDU(b'data', source=1, destination=2) + + # node 1 sends the pdu to node 2, node 3 also gets a copy + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, pduSource=1).success() + tnode3.start_state.receive(PDU, pduDestination=2).success() + + # run the group + tnet.run() + + def test_promiscuous_fail(self): + if _debug: TestVLAN._debug("test_promiscuous_fail") + + # three element network + tnet = TNetwork(3) + tnode1, tnode2, tnode3 = tnet.state_machines + + # make a PDU from node 1 to node 2 + pdu = PDU(b'data', source=1, destination=2) + + # node 1 sends the pdu to node 2, node 3 waits and gets nothing + tnode1.start_state.send(pdu).success() + tnode2.start_state.receive(PDU, pduSource=1).success() + + # if node 3 receives anything it will trigger unexpected receive and fail + tnode3.start_state.timeout(0.5).success() + + # run the group + tnet.run() @bacpypes_debugging -def setup_function(function): - if _debug: setup_function._debug("setup_function %r", function) +class TestVLANEvents(unittest.TestCase): + def __init__(self, *args, **kwargs): + if _debug: TestVLANEvents._debug("__init__ %r %r", args, kwargs) + super(TestVLANEvents, self).__init__(*args, **kwargs) -@bacpypes_debugging -def teardown_function(function): - if _debug: teardown_function._debug("teardown_function %r", function) + def test_send_receive(self): + """Test that a node can send a message to another node and use + events to continue with the messages. + """ + if _debug: TestVLAN._debug("test_send_receive") + # two element network + tnet = TNetwork(2) + tnode1, tnode2 = tnet.state_machines -@bacpypes_debugging -class TestCaseTemplate(unittest.TestCase): + # make a PDU from node 1 to node 2 + dead_pdu = PDU(b'dead', source=1, destination=2) + if _debug: TestVLAN._debug(" - dead_pdu: %r", dead_pdu) - @classmethod - def setup_class(cls): - if _debug: TestCaseTemplate._debug("setup_class") + # make a PDU from node 1 to node 2 + beef_pdu = PDU(b'beef', source=1, destination=2) + if _debug: TestVLAN._debug(" - beef_pdu: %r", beef_pdu) - @classmethod - def teardown_class(cls): - if _debug: TestCaseTemplate._debug("teardown_class") + # node 1 sends dead_pdu, waits for event, sends beef_pdu + tnode1.start_state \ + .send(dead_pdu).wait_event('e') \ + .send(beef_pdu).success() - def setup_method(self, method): - if _debug: TestCaseTemplate._debug("setup_module %r", method) + # node 2 receives dead_pdu, sets event, waits for beef_pdu + tnode2.start_state \ + .receive(PDU, pduData=b'dead').set_event('e') \ + .receive(PDU, pduData=b'beef').success() - def teardown_method(self, method): - if _debug: TestCaseTemplate._debug("teardown_method %r", method) + # run the group + tnet.run() - def test_something(self): - if _debug: TestCaseTemplate._debug("test_something") - - def test_something_else(self): - if _debug: TestCaseTemplate._debug("test_something_else") diff --git a/tests/trapped_classes.py b/tests/trapped_classes.py index e4bec91..25b2a42 100755 --- a/tests/trapped_classes.py +++ b/tests/trapped_classes.py @@ -122,21 +122,24 @@ class TrappedStateMachine(Trapper, StateMachine): throw an exception. """ + def __init__(self, **kwargs): + """Initialize a trapped state machine.""" + if _debug: TrappedStateMachine._debug("__init__ %r", kwargs) + + # provide a default state subclass + if 'state_subclass' not in kwargs: + kwargs['state_subclass'] = TrappedState + + # pass them all along + super(TrappedStateMachine, self).__init__(**kwargs) + def send(self, pdu): - """Called to send a PDU. - """ + """Called to send a PDU.""" if _debug: TrappedStateMachine._debug("send %r", pdu) # keep a copy self.sent = pdu - def match_pdu(self, pdu, transition_pdu): - """Very strong match condition.""" - if _debug: TrappedStateMachine._debug("match_pdu %r %r", pdu, transition_pdu) - - # must be identical objects - return pdu is transition_pdu - @bacpypes_debugging class TrappedClient(Client):