1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

bring branch up to date

This commit is contained in:
Joel Bender 2017-09-09 02:05:43 -04:00
commit e31d7e7d37
34 changed files with 1912 additions and 198 deletions

View File

@ -224,7 +224,7 @@ class WriteBroadcastDistributionTable(BVLPDU):
BVLCI.update(bvlpdu, self)
for bdte in self.bvlciBDT:
bvlpdu.put_data( bdte.addrAddr )
bvlpdu.put_data( bdte.addrMask )
bvlpdu.put_long( bdte.addrMask )
def decode(self, bvlpdu):
BVLCI.update(self, bvlpdu)
@ -401,6 +401,11 @@ class FDTEntry(DebugContents):
self.fdTTL = None
self.fdRemain = None
def __eq__(self, other):
"""Return true iff entries are identical."""
return (self.fdAddress == other.fdAddress) and \
(self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain)
def bvlpdu_contents(self, use_dict=None, as_class=dict):
"""Return the contents of an object as a dict."""
# make/extend the dictionary of content

View File

@ -72,7 +72,7 @@ _identLock = threading.Lock()
class IOCB(DebugContents):
_debugContents = \
_debug_contents = \
( 'args', 'kwargs'
, 'ioState', 'ioResponse-', 'ioError'
, 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr'
@ -227,7 +227,7 @@ bacpypes_debugging(IOCB)
class IOChainMixIn(DebugContents):
_debugContents = ( 'ioChain++', )
_debug_contents = ( 'ioChain++', )
def __init__(self, iocb):
if _debug: IOChainMixIn._debug("__init__ %r", iocb)
@ -366,7 +366,7 @@ bacpypes_debugging(IOChain)
class IOGroup(IOCB, DebugContents):
_debugContents = ('ioMembers',)
_debug_contents = ('ioMembers',)
def __init__(self):
"""Initialize a group."""
@ -704,7 +704,7 @@ class IOQController(IOController):
# if we're busy, queue it
if (self.state != CTRL_IDLE):
if _debug: IOQController._debug(" - busy, request queued")
if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb)
iocb.ioState = PENDING
self.ioQueue.put(iocb)
@ -719,6 +719,7 @@ class IOQController(IOController):
except:
# extract the error
err = sys.exc_info()[1]
if _debug: IOQController._debug(" - process_io() exception: %r", err)
# if there was an error, abort the request
if err:
@ -871,7 +872,7 @@ class ClientController(Client, IOQController):
self.request(iocb.args[0])
def confirmation(self, pdu):
if _debug: ClientController._debug("confirmation %r %r", args, kwargs)
if _debug: ClientController._debug("confirmation %r", pdu)
# make sure it has an active iocb
if not self.active_iocb:
@ -917,13 +918,18 @@ bacpypes_debugging(SieveQueue)
class SieveClientController(Client, IOController):
def __init__(self):
def __init__(self, queue_class=SieveQueue):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# make sure it's the correct class
if not issubclass(queue_class, SieveQueue):
raise TypeError("queue class must be a subclass of SieveQueue")
# queues for each address
self.queues = {}
self.queue_class = queue_class
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
@ -936,7 +942,7 @@ class SieveClientController(Client, IOController):
queue = self.queues.get(destination_address, None)
if not queue:
if _debug: SieveClientController._debug(" - new queue")
queue = SieveQueue(self.request, destination_address)
queue = self.queue_class(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: SieveClientController._debug(" - queue: %r", queue)

View File

@ -618,7 +618,7 @@ class Unsigned(Atomic):
@classmethod
def is_valid(cls, arg):
"""Return True if arg is valid value for the class."""
return isinstance(arg, (int, long)) and (arg >= 0)
return isinstance(arg, (int, long)) and (not isinstance(arg, bool)) and (arg >= 0)
def __str__(self):
return "Unsigned(%s)" % (self.value, )
@ -693,7 +693,7 @@ class Integer(Atomic):
@classmethod
def is_valid(cls, arg):
"""Return True if arg is valid value for the class."""
return isinstance(arg, (int, long))
return isinstance(arg, (int, long)) and (not isinstance(arg, bool))
def __str__(self):
return "Integer(%s)" % (self.value, )

View File

@ -5,6 +5,8 @@ Virtual Local Area Network
"""
import random
import socket
import struct
from copy import deepcopy
from .errors import ConfigurationError
@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging
from .core import deferred
from .pdu import Address
from .comm import Server
from .comm import Client, Server, bind
# some debugging
_debug = 0
@ -24,11 +26,13 @@ _log = ModuleLogger(globals())
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, name='', broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent)
self.name = name
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@ -37,6 +41,10 @@ class Network:
self.nodes.append(node)
node.lan = self
# update the node name
if not node.name:
node.name = '%s:%s' % (self.name, node.address)
def remove_node(self, node):
""" Remove a node from this network. """
if _debug: Network._debug("remove_node %r", node)
@ -48,29 +56,25 @@ class Network:
""" Process a PDU by sending a copy to each node as dictated by the
addressing and if a node is promiscuous.
"""
if _debug: Network._debug("process_pdu %r", pdu)
if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@ -84,21 +88,19 @@ bacpypes_debugging(Network)
class Node(Server):
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None):
if _debug:
Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r",
addr, lan, promiscuous, spoofing, sid
Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r",
addr, lan, name, promiscuous, spoofing, sid
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr
self.name = name
# bind to a lan if it was provided
if lan:
if lan is not None:
self.bind(lan)
# might receive all packets and might spoof
@ -113,7 +115,7 @@ class Node(Server):
def indication(self, pdu):
"""Send a message."""
if _debug: Node._debug("indication %r", pdu)
if _debug: Node._debug("[%s]indication %r", self.name, pdu)
# make sure we're connected
if not self.lan:
@ -130,3 +132,133 @@ class Node(Server):
deferred(self.lan.process_pdu, pdu)
bacpypes_debugging(Node)
#
# IPNetwork
#
class IPNetwork(Network):
"""
IPNetwork instances are Network objects where the addresses on the
network are tuples that would be used for sockets like ('1.2.3.4', 5).
The first node added to the network sets the broadcast address, like
('1.2.3.255', 5) and the other nodes must have the same tuple.
"""
def __init__(self):
if _debug: IPNetwork._debug("__init__")
Network.__init__(self)
def add_node(self, node):
if _debug: IPNetwork._debug("add_node %r", node)
# first node sets the broadcast tuple, other nodes much match
if not self.nodes:
self.broadcast_address = node.addrBroadcastTuple
elif node.addrBroadcastTuple != self.broadcast_address:
raise ValueError("nodes must all have the same broadcast tuple")
# continue along
Network.add_node(self, node)
bacpypes_debugging(IPNetwork)
#
# IPNode
#
class IPNode(Node):
"""
An IPNode is a Node where the address is an Address that has an address
tuple and a broadcast tuple that would be used for socket communications.
"""
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan)
# make sure it's an Address that has appropriate pieces
if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \
or (not hasattr(addr, 'addrBroadcastTuple')):
raise ValueError("malformed address")
# save the address information
self.addrTuple = addr.addrTuple
self.addrBroadcastTuple = addr.addrBroadcastTuple
# continue initializing
Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid)
bacpypes_debugging(IPNode)
#
# IPRouterNode
#
class IPRouterNode(Client):
def __init__(self, router, addr, lan=None):
if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan)
# save the reference to the router
self.router = router
# make ourselves an IPNode and bind to it
self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True)
bind(self, self.node)
# save our mask and subnet
self.addrMask = addr.addrMask
self.addrSubnet = addr.addrSubnet
def confirmation(self, pdu):
if _debug: IPRouterNode._debug("confirmation %r", pdu)
self.router.process_pdu(self, pdu)
def process_pdu(self, pdu):
if _debug: IPRouterNode._debug("process_pdu %r", pdu)
# pass it downstream
self.request(pdu)
bacpypes_debugging(IPRouterNode)
#
# IPRouter
#
class IPRouter:
def __init__(self):
if _debug: IPRouter._debug("__init__")
# connected network nodes
self.nodes = []
def add_network(self, addr, lan):
if _debug: IPRouter._debug("add_network %r %r", addr, lan)
node = IPRouterNode(self, addr, lan)
if _debug: IPRouter._debug(" - node: %r", node)
self.nodes.append(node)
def process_pdu(self, node, pdu):
if _debug: IPRouter._debug("process_pdu %r %r", node, pdu)
# unpack the address part of the destination
addrstr = socket.inet_aton(pdu.pduDestination[0])
ipaddr = struct.unpack('!L', addrstr)[0]
if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr)
# loop through the other nodes
for inode in self.nodes:
if inode is not node:
if (ipaddr & inode.addrMask) == inode.addrSubnet:
if _debug: IPRouter._debug(" - inode: %r", inode)
inode.process_pdu(pdu)
bacpypes_debugging(IPRouter)

View File

@ -221,7 +221,7 @@ class WriteBroadcastDistributionTable(BVLPDU):
BVLCI.update(bvlpdu, self)
for bdte in self.bvlciBDT:
bvlpdu.put_data( bdte.addrAddr )
bvlpdu.put_data( bdte.addrMask )
bvlpdu.put_long( bdte.addrMask )
def decode(self, bvlpdu):
BVLCI.update(self, bvlpdu)
@ -398,6 +398,11 @@ class FDTEntry(DebugContents):
self.fdTTL = None
self.fdRemain = None
def __eq__(self, other):
"""Return true iff entries are identical."""
return (self.fdAddress == other.fdAddress) and \
(self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain)
def bvlpdu_contents(self, use_dict=None, as_class=dict):
"""Return the contents of an object as a dict."""
# make/extend the dictionary of content

View File

@ -73,7 +73,7 @@ _identLock = threading.Lock()
@bacpypes_debugging
class IOCB(DebugContents):
_debugContents = \
_debug_contents = \
( 'args', 'kwargs'
, 'ioState', 'ioResponse-', 'ioError'
, 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr'
@ -227,7 +227,7 @@ class IOCB(DebugContents):
@bacpypes_debugging
class IOChainMixIn(DebugContents):
_debugContents = ( 'ioChain++', )
_debug_contents = ( 'ioChain++', )
def __init__(self, iocb):
if _debug: IOChainMixIn._debug("__init__ %r", iocb)
@ -364,7 +364,7 @@ class IOChain(IOCB, IOChainMixIn):
@bacpypes_debugging
class IOGroup(IOCB, DebugContents):
_debugContents = ('ioMembers',)
_debug_contents = ('ioMembers',)
def __init__(self):
"""Initialize a group."""
@ -699,7 +699,7 @@ class IOQController(IOController):
# if we're busy, queue it
if (self.state != CTRL_IDLE):
if _debug: IOQController._debug(" - busy, request queued")
if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb)
iocb.ioState = PENDING
self.ioQueue.put(iocb)
@ -714,6 +714,7 @@ class IOQController(IOController):
except:
# extract the error
err = sys.exc_info()[1]
if _debug: IOQController._debug(" - process_io() exception: %r", err)
# if there was an error, abort the request
if err:
@ -865,7 +866,7 @@ class ClientController(Client, IOQController):
self.request(iocb.args[0])
def confirmation(self, pdu):
if _debug: ClientController._debug("confirmation %r %r", args, kwargs)
if _debug: ClientController._debug("confirmation %r", pdu)
# make sure it has an active iocb
if not self.active_iocb:
@ -909,13 +910,18 @@ class SieveQueue(IOQController):
@bacpypes_debugging
class SieveClientController(Client, IOController):
def __init__(self):
def __init__(self, queue_class=SieveQueue):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# make sure it's the correct class
if not issubclass(queue_class, SieveQueue):
raise TypeError("queue class must be a subclass of SieveQueue")
# queues for each address
self.queues = {}
self.queue_class = queue_class
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
@ -928,7 +934,7 @@ class SieveClientController(Client, IOController):
queue = self.queues.get(destination_address, None)
if not queue:
if _debug: SieveClientController._debug(" - new queue")
queue = SieveQueue(self.request, destination_address)
queue = self.queue_class(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: SieveClientController._debug(" - queue: %r", queue)

View File

@ -622,7 +622,7 @@ class Unsigned(Atomic):
@classmethod
def is_valid(cls, arg):
"""Return True if arg is valid value for the class."""
return isinstance(arg, (int, long)) and (arg >= 0)
return isinstance(arg, (int, long)) and (not isinstance(arg, bool)) and (arg >= 0)
def __str__(self):
return "Unsigned(%s)" % (self.value, )
@ -697,7 +697,7 @@ class Integer(Atomic):
@classmethod
def is_valid(cls, arg):
"""Return True if arg is valid value for the class."""
return isinstance(arg, (int, long))
return isinstance(arg, (int, long)) and (not isinstance(arg, bool))
def __str__(self):
return "Integer(%s)" % (self.value, )

View File

@ -5,6 +5,8 @@ Virtual Local Area Network
"""
import random
import socket
import struct
from copy import deepcopy
from .errors import ConfigurationError
@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging
from .core import deferred
from .pdu import Address
from .comm import Server
from .comm import Client, Server, bind
# some debugging
_debug = 0
@ -25,11 +27,13 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, name='', broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent)
self.name = name
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@ -38,6 +42,10 @@ class Network:
self.nodes.append(node)
node.lan = self
# update the node name
if not node.name:
node.name = '%s:%s' % (self.name, node.address)
def remove_node(self, node):
""" Remove a node from this network. """
if _debug: Network._debug("remove_node %r", node)
@ -49,29 +57,25 @@ class Network:
""" Process a PDU by sending a copy to each node as dictated by the
addressing and if a node is promiscuous.
"""
if _debug: Network._debug("process_pdu %r", pdu)
if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@ -84,21 +88,19 @@ class Network:
@bacpypes_debugging
class Node(Server):
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None):
if _debug:
Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r",
addr, lan, promiscuous, spoofing, sid
Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r",
addr, lan, name, promiscuous, spoofing, sid
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr
self.name = name
# bind to a lan if it was provided
if lan:
if lan is not None:
self.bind(lan)
# might receive all packets and might spoof
@ -113,7 +115,7 @@ class Node(Server):
def indication(self, pdu):
"""Send a message."""
if _debug: Node._debug("indication %r", pdu)
if _debug: Node._debug("[%s]indication %r", self.name, pdu)
# make sure we're connected
if not self.lan:
@ -128,3 +130,132 @@ class Node(Server):
# actual network delivery is deferred
deferred(self.lan.process_pdu, pdu)
#
# IPNetwork
#
@bacpypes_debugging
class IPNetwork(Network):
"""
IPNetwork instances are Network objects where the addresses on the
network are tuples that would be used for sockets like ('1.2.3.4', 5).
The first node added to the network sets the broadcast address, like
('1.2.3.255', 5) and the other nodes must have the same tuple.
"""
def __init__(self):
if _debug: IPNetwork._debug("__init__")
Network.__init__(self)
def add_node(self, node):
if _debug: IPNetwork._debug("add_node %r", node)
# first node sets the broadcast tuple, other nodes much match
if not self.nodes:
self.broadcast_address = node.addrBroadcastTuple
elif node.addrBroadcastTuple != self.broadcast_address:
raise ValueError("nodes must all have the same broadcast tuple")
# continue along
Network.add_node(self, node)
#
# IPNode
#
@bacpypes_debugging
class IPNode(Node):
"""
An IPNode is a Node where the address is an Address that has an address
tuple and a broadcast tuple that would be used for socket communications.
"""
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan)
# make sure it's an Address that has appropriate pieces
if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \
or (not hasattr(addr, 'addrBroadcastTuple')):
raise ValueError("malformed address")
# save the address information
self.addrTuple = addr.addrTuple
self.addrBroadcastTuple = addr.addrBroadcastTuple
# continue initializing
Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid)
#
# IPRouterNode
#
@bacpypes_debugging
class IPRouterNode(Client):
def __init__(self, router, addr, lan=None):
if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan)
# save the reference to the router
self.router = router
# make ourselves an IPNode and bind to it
self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True)
bind(self, self.node)
# save our mask and subnet
self.addrMask = addr.addrMask
self.addrSubnet = addr.addrSubnet
def confirmation(self, pdu):
if _debug: IPRouterNode._debug("confirmation %r", pdu)
self.router.process_pdu(self, pdu)
def process_pdu(self, pdu):
if _debug: IPRouterNode._debug("process_pdu %r", pdu)
# pass it downstream
self.request(pdu)
#
# IPRouter
#
@bacpypes_debugging
class IPRouter:
def __init__(self):
if _debug: IPRouter._debug("__init__")
# connected network nodes
self.nodes = []
def add_network(self, addr, lan):
if _debug: IPRouter._debug("add_network %r %r", addr, lan)
node = IPRouterNode(self, addr, lan)
if _debug: IPRouter._debug(" - node: %r", node)
self.nodes.append(node)
def process_pdu(self, node, pdu):
if _debug: IPRouter._debug("process_pdu %r %r", node, pdu)
# unpack the address part of the destination
addrstr = socket.inet_aton(pdu.pduDestination[0])
ipaddr = struct.unpack('!L', addrstr)[0]
if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr)
# loop through the other nodes
for inode in self.nodes:
if inode is not node:
if (ipaddr & inode.addrMask) == inode.addrSubnet:
if _debug: IPRouter._debug(" - inode: %r", inode)
inode.process_pdu(pdu)

View File

@ -221,7 +221,7 @@ class WriteBroadcastDistributionTable(BVLPDU):
BVLCI.update(bvlpdu, self)
for bdte in self.bvlciBDT:
bvlpdu.put_data( bdte.addrAddr )
bvlpdu.put_data( bdte.addrMask )
bvlpdu.put_long( bdte.addrMask )
def decode(self, bvlpdu):
BVLCI.update(self, bvlpdu)
@ -398,6 +398,11 @@ class FDTEntry(DebugContents):
self.fdTTL = None
self.fdRemain = None
def __eq__(self, other):
"""Return true iff entries are identical."""
return (self.fdAddress == other.fdAddress) and \
(self.fdTTL == other.fdTTL) and (self.fdRemain == other.fdRemain)
def bvlpdu_contents(self, use_dict=None, as_class=dict):
"""Return the contents of an object as a dict."""
# make/extend the dictionary of content

View File

@ -73,7 +73,7 @@ _identLock = threading.Lock()
@bacpypes_debugging
class IOCB(DebugContents):
_debugContents = \
_debug_contents = \
( 'args', 'kwargs'
, 'ioState', 'ioResponse-', 'ioError'
, 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr'
@ -227,7 +227,7 @@ class IOCB(DebugContents):
@bacpypes_debugging
class IOChainMixIn(DebugContents):
_debugContents = ( 'ioChain++', )
_debug_contents = ( 'ioChain++', )
def __init__(self, iocb):
if _debug: IOChainMixIn._debug("__init__ %r", iocb)
@ -364,7 +364,7 @@ class IOChain(IOCB, IOChainMixIn):
@bacpypes_debugging
class IOGroup(IOCB, DebugContents):
_debugContents = ('ioMembers',)
_debug_contents = ('ioMembers',)
def __init__(self):
"""Initialize a group."""
@ -699,7 +699,7 @@ class IOQController(IOController):
# if we're busy, queue it
if (self.state != CTRL_IDLE):
if _debug: IOQController._debug(" - busy, request queued")
if _debug: IOQController._debug(" - busy, request queued, active_iocb: %r", self.active_iocb)
iocb.ioState = PENDING
self.ioQueue.put(iocb)
@ -714,9 +714,11 @@ class IOQController(IOController):
except:
# extract the error
err = sys.exc_info()[1]
if _debug: IOQController._debug(" - process_io() exception: %r", err)
# if there was an error, abort the request
if err:
if _debug: IOQController._debug(" - aborting")
self.abort_io(iocb, err)
def process_io(self, iocb):
@ -865,7 +867,7 @@ class ClientController(Client, IOQController):
self.request(iocb.args[0])
def confirmation(self, pdu):
if _debug: ClientController._debug("confirmation %r %r", args, kwargs)
if _debug: ClientController._debug("confirmation %r", pdu)
# make sure it has an active iocb
if not self.active_iocb:
@ -909,13 +911,18 @@ class SieveQueue(IOQController):
@bacpypes_debugging
class SieveClientController(Client, IOController):
def __init__(self):
def __init__(self, queue_class=SieveQueue):
if _debug: SieveClientController._debug("__init__")
Client.__init__(self)
IOController.__init__(self)
# make sure it's the correct class
if not issubclass(queue_class, SieveQueue):
raise TypeError("queue class must be a subclass of SieveQueue")
# queues for each address
self.queues = {}
self.queue_class = queue_class
def process_io(self, iocb):
if _debug: SieveClientController._debug("process_io %r", iocb)
@ -928,7 +935,7 @@ class SieveClientController(Client, IOController):
queue = self.queues.get(destination_address, None)
if not queue:
if _debug: SieveClientController._debug(" - new queue")
queue = SieveQueue(self.request, destination_address)
queue = self.queue_class(self.request, destination_address)
self.queues[destination_address] = queue
if _debug: SieveClientController._debug(" - queue: %r", queue)

View File

@ -640,7 +640,7 @@ class Unsigned(Atomic):
@classmethod
def is_valid(cls, arg):
"""Return True if arg is valid value for the class."""
return isinstance(arg, (int, long)) and (arg >= 0)
return isinstance(arg, int) and (not isinstance(arg, bool)) and (arg >= 0)
def __str__(self):
return "Unsigned(%s)" % (self.value, )
@ -713,7 +713,7 @@ class Integer(Atomic):
@classmethod
def is_valid(cls, arg):
"""Return True if arg is valid value for the class."""
return isinstance(arg, int)
return isinstance(arg, int) and (not isinstance(arg, bool))
def __str__(self):
return "Integer(%s)" % (self.value, )

View File

@ -5,6 +5,8 @@ Virtual Local Area Network
"""
import random
import socket
import struct
from copy import deepcopy
from .errors import ConfigurationError
@ -12,7 +14,7 @@ from .debugging import ModuleLogger, bacpypes_debugging
from .core import deferred
from .pdu import Address
from .comm import Server
from .comm import Client, Server, bind
# some debugging
_debug = 0
@ -25,11 +27,13 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
class Network:
def __init__(self, dropPercent=0.0):
if _debug: Network._debug("__init__ dropPercent=%r", dropPercent)
def __init__(self, name='', broadcast_address=None, drop_percent=0.0):
if _debug: Network._debug("__init__ name=%r broadcast_address=%r drop_percent=%r", name, broadcast_address, drop_percent)
self.name = name
self.nodes = []
self.dropPercent = dropPercent
self.broadcast_address = broadcast_address
self.drop_percent = drop_percent
def add_node(self, node):
""" Add a node to this network, let the node know which network it's on. """
@ -38,6 +42,10 @@ class Network:
self.nodes.append(node)
node.lan = self
# update the node name
if not node.name:
node.name = '%s:%s' % (self.name, node.address)
def remove_node(self, node):
""" Remove a node from this network. """
if _debug: Network._debug("remove_node %r", node)
@ -49,29 +57,25 @@ class Network:
""" Process a PDU by sending a copy to each node as dictated by the
addressing and if a node is promiscuous.
"""
if _debug: Network._debug("process_pdu %r", pdu)
if _debug: Network._debug("[%s]process_pdu %r", self.name, pdu)
if self.dropPercent != 0.0:
if (random.random() * 100.0) < self.dropPercent:
# randomly drop a packet
if self.drop_percent != 0.0:
if (random.random() * 100.0) < self.drop_percent:
if _debug: Network._debug(" - packet dropped")
return
if not pdu.pduDestination or not isinstance(pdu.pduDestination, Address):
raise RuntimeError("invalid destination address")
elif pdu.pduDestination.addrType == Address.localBroadcastAddr:
if pdu.pduDestination == self.broadcast_address:
for n in self.nodes:
if (pdu.pduSource != n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
elif pdu.pduDestination.addrType == Address.localStationAddr:
else:
for n in self.nodes:
if n.promiscuous or (pdu.pduDestination == n.address):
if _debug: Network._debug(" - match: %r", n)
n.response(deepcopy(pdu))
else:
raise RuntimeError("invalid destination address type")
def __len__(self):
""" Simple way to determine the number of nodes in the network. """
if _debug: Network._debug("__len__")
@ -84,21 +88,19 @@ class Network:
@bacpypes_debugging
class Node(Server):
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
def __init__(self, addr, lan=None, name='', promiscuous=False, spoofing=False, sid=None):
if _debug:
Node._debug("__init__ %r lan=%r promiscuous=%r spoofing=%r sid=%r",
addr, lan, promiscuous, spoofing, sid
Node._debug("__init__ %r lan=%r name=%r, promiscuous=%r spoofing=%r sid=%r",
addr, lan, name, promiscuous, spoofing, sid
)
Server.__init__(self, sid)
if not isinstance(addr, Address):
raise TypeError("addr must be an address")
self.lan = None
self.address = addr
self.name = name
# bind to a lan if it was provided
if lan:
if lan is not None:
self.bind(lan)
# might receive all packets and might spoof
@ -113,7 +115,7 @@ class Node(Server):
def indication(self, pdu):
"""Send a message."""
if _debug: Node._debug("indication %r", pdu)
if _debug: Node._debug("[%s]indication %r", self.name, pdu)
# make sure we're connected
if not self.lan:
@ -128,3 +130,132 @@ class Node(Server):
# actual network delivery is deferred
deferred(self.lan.process_pdu, pdu)
#
# IPNetwork
#
@bacpypes_debugging
class IPNetwork(Network):
"""
IPNetwork instances are Network objects where the addresses on the
network are tuples that would be used for sockets like ('1.2.3.4', 5).
The first node added to the network sets the broadcast address, like
('1.2.3.255', 5) and the other nodes must have the same tuple.
"""
def __init__(self):
if _debug: IPNetwork._debug("__init__")
Network.__init__(self)
def add_node(self, node):
if _debug: IPNetwork._debug("add_node %r", node)
# first node sets the broadcast tuple, other nodes much match
if not self.nodes:
self.broadcast_address = node.addrBroadcastTuple
elif node.addrBroadcastTuple != self.broadcast_address:
raise ValueError("nodes must all have the same broadcast tuple")
# continue along
Network.add_node(self, node)
#
# IPNode
#
@bacpypes_debugging
class IPNode(Node):
"""
An IPNode is a Node where the address is an Address that has an address
tuple and a broadcast tuple that would be used for socket communications.
"""
def __init__(self, addr, lan=None, promiscuous=False, spoofing=False, sid=None):
if _debug: IPNode._debug("__init__ %r lan=%r", addr, lan)
# make sure it's an Address that has appropriate pieces
if not isinstance(addr, Address) or (not hasattr(addr, 'addrTuple')) \
or (not hasattr(addr, 'addrBroadcastTuple')):
raise ValueError("malformed address")
# save the address information
self.addrTuple = addr.addrTuple
self.addrBroadcastTuple = addr.addrBroadcastTuple
# continue initializing
Node.__init__(self, addr.addrTuple, lan=lan, promiscuous=promiscuous, spoofing=spoofing, sid=sid)
#
# IPRouterNode
#
@bacpypes_debugging
class IPRouterNode(Client):
def __init__(self, router, addr, lan=None):
if _debug: IPRouterNode._debug("__init__ %r %r lan=%r", router, addr, lan)
# save the reference to the router
self.router = router
# make ourselves an IPNode and bind to it
self.node = IPNode(addr, lan=lan, promiscuous=True, spoofing=True)
bind(self, self.node)
# save our mask and subnet
self.addrMask = addr.addrMask
self.addrSubnet = addr.addrSubnet
def confirmation(self, pdu):
if _debug: IPRouterNode._debug("confirmation %r", pdu)
self.router.process_pdu(self, pdu)
def process_pdu(self, pdu):
if _debug: IPRouterNode._debug("process_pdu %r", pdu)
# pass it downstream
self.request(pdu)
#
# IPRouter
#
@bacpypes_debugging
class IPRouter:
def __init__(self):
if _debug: IPRouter._debug("__init__")
# connected network nodes
self.nodes = []
def add_network(self, addr, lan):
if _debug: IPRouter._debug("add_network %r %r", addr, lan)
node = IPRouterNode(self, addr, lan)
if _debug: IPRouter._debug(" - node: %r", node)
self.nodes.append(node)
def process_pdu(self, node, pdu):
if _debug: IPRouter._debug("process_pdu %r %r", node, pdu)
# unpack the address part of the destination
addrstr = socket.inet_aton(pdu.pduDestination[0])
ipaddr = struct.unpack('!L', addrstr)[0]
if _debug: IPRouter._debug(" - ipaddr: %r", ipaddr)
# loop through the other nodes
for inode in self.nodes:
if inode is not node:
if (ipaddr & inode.addrMask) == inode.addrSubnet:
if _debug: IPRouter._debug(" - inode: %r", inode)
inode.process_pdu(pdu)

View File

@ -83,7 +83,7 @@ _identLock = threading.Lock()
@bacpypes_debugging
class IOCB(DebugContents):
_debugContents = \
_debug_contents = \
( 'args', 'kwargs'
, 'ioState', 'ioResponse-', 'ioError'
, 'ioController', 'ioServerRef', 'ioControllerRef', 'ioClientID', 'ioClientAddr'

View File

@ -20,3 +20,6 @@ from . import test_pdu
from . import test_primitive_data
from . import test_utilities
from . import test_vlan
from . import test_bvll

View File

@ -45,10 +45,18 @@ class SendTransition(Transition):
class ReceiveTransition(Transition):
def __init__(self, pdu, next_state):
def __init__(self, criteria, next_state):
Transition.__init__(self, next_state)
self.pdu = pdu
self.criteria = criteria
class EventTransition(Transition):
def __init__(self, event_id, next_state):
Transition.__init__(self, next_state)
self.event_id = event_id
class TimeoutTransition(Transition):
@ -59,6 +67,36 @@ class TimeoutTransition(Transition):
self.timeout = timeout
#
# match_pdu
#
@bacpypes_debugging
def match_pdu(pdu, pdu_type=None, **pdu_attrs):
if _debug: match_pdu._debug("match_pdu %r %r %r", pdu, pdu_type, pdu_attrs)
# check the type
if pdu_type and not isinstance(pdu, pdu_type):
if _debug: match_pdu._debug(" - wrong type")
return False
# check for matching attribute values
for attr_name, attr_value in pdu_attrs.items():
if not hasattr(pdu, attr_name):
if _debug: match_pdu._debug(" - missing attr: %r", attr_name)
return False
if getattr(pdu, attr_name) != attr_value:
if _debug: StateMachine._debug(" - attr value: %r, %r", attr_name, attr_value)
return False
if _debug: match_pdu._debug(" - successful_match")
return True
#
# State
#
@bacpypes_debugging
class State(object):
@ -88,6 +126,11 @@ class State(object):
self.send_transitions = []
self.receive_transitions = []
# empty lists of event transitions
self.set_event_transitions = []
self.clear_event_transitions = []
self.wait_event_transitions = []
# timeout transition
self.timeout_transition = None
@ -215,14 +258,26 @@ class State(object):
"""Called after each PDU sent."""
self.state_machine.after_send(pdu)
def receive(self, pdu, next_state=None):
def receive(self, pdu_type, **pdu_attrs):
"""Create a ReceiveTransition from this state to another, possibly new,
state. The next state is returned for method chaining.
:param pdu: PDU to match
:param criteria: PDU to match
:param next_state: destination state after a successful match
Simulate the function as if it was defined in Python3.5+ like this:
def receive(self, *pdu_type, next_state=None, **pdu_attrs)
"""
if _debug: State._debug("receive(%s) %r next_state=%r", self.doc_string, pdu, next_state)
if _debug: State._debug("receive(%s) %r %r", self.doc_string, pdu_type, pdu_attrs)
# extract the next_state keyword argument
if 'next_state' in pdu_attrs:
next_state = pdu_attrs['next_state']
if _debug: State._debug(" - next_state: %r", next_state)
del pdu_attrs['next_state']
else:
next_state = None
# maybe build a new state
if not next_state:
@ -231,8 +286,12 @@ class State(object):
elif next_state not in self.state_machine.states:
raise ValueError("off the rails")
# create a bundle of the match criteria
criteria = (pdu_type, pdu_attrs)
if _debug: State._debug(" - criteria: %r", criteria)
# add this to the list of transitions
self.receive_transitions.append(ReceiveTransition(pdu, next_state))
self.receive_transitions.append(ReceiveTransition(criteria, next_state))
# return the next state
return next_state
@ -253,6 +312,60 @@ class State(object):
# pass along to the state machine
self.state_machine.unexpected_receive(pdu)
def set_event(self, event_id):
"""Create an EventTransition for this state that sets an event. The
current state is returned for method chaining.
:param event_id: event identifier
"""
if _debug: State._debug("set_event(%s) %r", self.doc_string, event_id)
# add this to the list of transitions
self.set_event_transitions.append(EventTransition(event_id, None))
# return the next state
return self
def event_set(self, event_id):
"""Called with the event that was set."""
pass
def clear_event(self, event_id):
"""Create an EventTransition for this state that clears an event. The
current state is returned for method chaining.
:param event_id: event identifier
"""
if _debug: State._debug("clear_event(%s) %r", self.doc_string, event_id)
# add this to the list of transitions
self.clear_event_transitions.append(EventTransition(event_id, None))
# return the next state
return next_state
def wait_event(self, event_id, next_state=None):
"""Create an EventTransition from this state to another, possibly new,
state. The next state is returned for method chaining.
:param pdu: PDU to send
:param next_state: state to transition to after sending
"""
if _debug: State._debug("wait_event(%s) %r next_state=%r", self.doc_string, event_id, next_state)
# maybe build a new state
if not next_state:
next_state = self.state_machine.new_state()
if _debug: State._debug(" - new next_state: %r", next_state)
elif next_state not in self.state_machine.states:
raise ValueError("off the rails")
# add this to the list of transitions
self.wait_event_transitions.append(EventTransition(event_id, next_state))
# return the next state
return next_state
def timeout(self, delay, next_state=None):
"""Create a TimeoutTransition from this state to another, possibly new,
state. There can only be one timeout transition per state. The next
@ -473,6 +586,18 @@ class StateMachine(object):
# here we are
current_state = self.current_state = state
# events are managed by a state machine group
if self.machine_group:
# setting events
for transition in current_state.set_event_transitions:
if _debug: StateMachine._debug(" - setting event: %r", transition.event_id)
self.machine_group.set_event(transition.event_id)
# clearing events
for transition in current_state.clear_event_transitions:
if _debug: StateMachine._debug(" - clearing event: %r", transition.event_id)
self.machine_group.clear_event(transition.event_id)
# check for success state
if current_state.is_success_state:
if _debug: StateMachine._debug(" - success state")
@ -508,20 +633,39 @@ class StateMachine(object):
# assume we can stay
next_state = None
# events are managed by a state machine group
if self.machine_group:
# check to see if there are events that are already set
for transition in current_state.wait_event_transitions:
if _debug: StateMachine._debug(" - waiting event: %r", transition.event_id)
# check for a transition
if transition.event_id in self.machine_group.events:
next_state = transition.next_state
if _debug: StateMachine._debug(" - next_state: %r", next_state)
if next_state is not current_state:
break
else:
if _debug: StateMachine._debug(" - no events already set")
else:
if _debug: StateMachine._debug(" - not part of a group")
# send everything that needs to be sent
for transition in current_state.send_transitions:
if _debug: StateMachine._debug(" - sending: %r", transition)
if not next_state:
for transition in current_state.send_transitions:
if _debug: StateMachine._debug(" - sending: %r", transition)
current_state.before_send(transition.pdu)
self.send(transition.pdu)
current_state.after_send(transition.pdu)
current_state.before_send(transition.pdu)
self.send(transition.pdu)
current_state.after_send(transition.pdu)
# check for a transition
next_state = transition.next_state
if _debug: StateMachine._debug(" - next_state: %r", next_state)
# check for a transition
next_state = transition.next_state
if _debug: StateMachine._debug(" - next_state: %r", next_state)
if next_state is not current_state:
break
if next_state is not current_state:
break
if not next_state:
if _debug: StateMachine._debug(" - nowhere to go")
@ -552,10 +696,19 @@ class StateMachine(object):
# add a reference to the pdu in the transaction log
self.transaction_log.append(("<<<", pdu),)
def send(self, pdu):
raise NotImplementedError("send not implemented")
def after_send(self, pdu):
"""Called after each PDU sent."""
pass
def before_receive(self, pdu):
"""Called with each PDU received before matching."""
# add a reference to the pdu in the transaction log
self.transaction_log.append((">>>", pdu),)
def receive(self, pdu):
if _debug: StateMachine._debug("receive %r", pdu)
@ -581,7 +734,7 @@ class StateMachine(object):
# look for a matching receive transition
for transition in current_state.receive_transitions:
if self.match_pdu(pdu, transition.pdu):
if self.match_pdu(pdu, transition.criteria):
if _debug: StateMachine._debug(" - match found")
match_found = True
@ -606,12 +759,6 @@ class StateMachine(object):
self.goto_state(next_state)
def before_receive(self, pdu):
"""Called with each PDU received before matching."""
# add a reference to the pdu in the transaction log
self.transaction_log.append((">>>", pdu),)
def after_receive(self, pdu):
"""Called with PDU received after match."""
pass
@ -624,6 +771,49 @@ class StateMachine(object):
# go to the unexpected receive state (failing)
self.goto_state(self.unexpected_receive_state)
def event_set(self, event_id):
"""Called by the state machine group when an event is set, the state
machine checks to see if it's waiting for the event and makes the
state transition if there is a match."""
if _debug: StateMachine._debug("event_set %r", event_id)
if not self.running:
if _debug: StateMachine._debug(" - not running")
return
# check to see if we are transitioning
if self.state_transitioning:
if _debug: StateMachine._debug(" - transitioning")
return
if not self.current_state:
raise RuntimeError("no current state")
current_state = self.current_state
match_found = False
# look for a matching event transition
for transition in current_state.wait_event_transitions:
if transition.event_id == event_id:
if _debug: StateMachine._debug(" - match found")
match_found = True
# let the state know this event was set
current_state.event_set(event_id)
# check for a transition
next_state = transition.next_state
if _debug: StateMachine._debug(" - next_state: %r", next_state)
if next_state is not current_state:
break
else:
if _debug: StateMachine._debug(" - going nowhere")
if match_found and next_state is not current_state:
if _debug: StateMachine._debug(" - going")
self.goto_state(next_state)
def state_timeout(self):
if _debug: StateMachine._debug("state_timeout")
@ -644,13 +834,14 @@ class StateMachine(object):
# go to the state specified
self.goto_state(self.timeout_state)
def send(self, pdu):
raise NotImplementedError("send not implemented")
def match_pdu(self, pdu, criteria):
if _debug: StateMachine._debug("match_pdu %r %r", pdu, criteria)
def match_pdu(self, pdu, transition_pdu):
if _debug: StateMachine._debug("match_pdu %r %r", pdu, transition_pdu)
# separate the pdu_type and attributes to match
pdu_type, pdu_attrs = criteria
return pdu == transition_pdu
# pass along to the global function
return match_pdu(pdu, pdu_type, **pdu_attrs)
def __repr__(self):
if not self.running:
@ -698,6 +889,9 @@ class StateMachineGroup(object):
self.is_success_state = None
self.is_fail_state = None
# set of events that are set
self.events = set()
def append(self, state_machine):
"""Add a state machine to the end of the list of state machines."""
if _debug: StateMachineGroup._debug("append %r", state_machine)
@ -743,6 +937,32 @@ class StateMachineGroup(object):
self.is_success_state = False
self.is_fail_state = False
# events that are set
self.events = set()
def set_event(self, event_id):
"""Save an event as 'set' and pass it to the state machines to see
if they are in a state that is waiting for the event."""
if _debug: StateMachineGroup._debug("set_event %r", event_id)
self.events.add(event_id)
if _debug: StateMachineGroup._debug(" - event set")
# pass along to each machine
for state_machine in self.state_machines:
if _debug: StateMachineGroup._debug(" - state_machine: %r", state_machine)
state_machine.event_set(event_id)
def clear_event(self, event_id):
"""Remove an event from the set of elements that are 'set'."""
if _debug: StateMachineGroup._debug("clear_event %r", event_id)
if event_id in self.events:
self.events.remove(event_id)
if _debug: StateMachineGroup._debug(" - event cleared")
else:
if _debug: StateMachineGroup._debug(" - noop")
def run(self):
"""Runs all the machines in the group."""
if _debug: StateMachineGroup._debug("run")
@ -810,8 +1030,8 @@ class StateMachineGroup(object):
some_failed = some_failed or state_machine.current_state.is_fail_state
if _debug:
StateMachineGroup._debug(" all_success: %r", all_success)
StateMachineGroup._debug(" some_failed: %r", some_failed)
StateMachineGroup._debug(" - all_success: %r", all_success)
StateMachineGroup._debug(" - some_failed: %r", some_failed)
# return the results of the check
return (all_success, some_failed)

View File

@ -17,43 +17,65 @@ _log = ModuleLogger(globals())
@bacpypes_debugging
def setup_module():
"""This function is called once at the beginning of all of the tests
in this module."""
if _debug: setup_module._debug("setup_module")
@bacpypes_debugging
def teardown_module():
"""This function is called once at the end of the tests in this module."""
if _debug: teardown_module._debug("teardown_module")
@bacpypes_debugging
def setup_function(function):
"""This function is called before each module level test function."""
if _debug: setup_function._debug("setup_function %r", function)
@bacpypes_debugging
def teardown_function(function):
"""This function is called after each module level test function."""
if _debug: teardown_function._debug("teardown_function %r", function)
@bacpypes_debugging
def test_some_function(*args, **kwargs):
"""This is a module level test function."""
if _debug: setup_function._debug("test_some_function %r %r", args, kwargs)
@bacpypes_debugging
class TestCaseTemplate(unittest.TestCase):
@classmethod
def setup_class(cls):
"""This function is called once before the test case is instantiated
for each of the tests."""
if _debug: TestCaseTemplate._debug("setup_class")
@classmethod
def teardown_class(cls):
"""This function is called once at the end after the last instance
of the test case has been abandon."""
if _debug: TestCaseTemplate._debug("teardown_class")
def setup_method(self, method):
"""This function is called before each test method is called as is
given a reference to the test method."""
if _debug: TestCaseTemplate._debug("setup_method %r", method)
def teardown_method(self, method):
"""This function is called after each test method has been called and
is given a reference to the test method."""
if _debug: TestCaseTemplate._debug("teardown_method %r", method)
def test_something(self):
"""This is a method level test function."""
if _debug: TestCaseTemplate._debug("test_something")
def test_something_else(self):
"""This is another method level test function."""
if _debug: TestCaseTemplate._debug("test_something_else")

View File

@ -0,0 +1,8 @@
#!/usr/bin/python
"""
Test BVLL Module
"""
from . import test_codec

View File

@ -0,0 +1,276 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test BVLL Encoding and Decoding
-------------------------------
"""
import string
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, btox, xtob
from bacpypes.comm import bind
from bacpypes.pdu import PDU, Address, LocalBroadcast
from bacpypes.bvll import Result, WriteBroadcastDistributionTable, \
ReadBroadcastDistributionTable, ReadBroadcastDistributionTableAck, \
ForwardedNPDU, RegisterForeignDevice, ReadForeignDeviceTable, \
ReadForeignDeviceTableAck, FDTEntry, DeleteForeignDeviceTableEntry, \
DistributeBroadcastToNetwork, OriginalUnicastNPDU, \
OriginalBroadcastNPDU
from bacpypes.bvllservice import AnnexJCodec
from ..trapped_classes import TrappedClient, TrappedServer
from ..state_machine import match_pdu
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# extended form of xtob that first removes whitespace and period seperators
xxtob = lambda s: xtob(''.join(s.split()).replace('.', ''))
@bacpypes_debugging
class TestAnnexJCodec(unittest.TestCase):
def setup_method(self, method):
"""This function is called before each test method is called as is
given a reference to the test method."""
if _debug: TestAnnexJCodec._debug("setup_method %r", method)
# minature trapped stack
self.client = TrappedClient()
self.codec = AnnexJCodec()
self.server = TrappedServer()
bind(self.client, self.codec, self.server)
def request(self, pdu):
"""Pass the PDU to the client to send down the stack."""
self.client.request(pdu)
def indication(self, pdu_type=None, **pdu_attrs):
"""Check what the server received."""
assert match_pdu(self.server.indication_received, pdu_type, **pdu_attrs)
def response(self, pdu):
"""Pass the PDU to the server to send up the stack."""
self.server.response(pdu)
def confirmation(self, pdu_type=None, **pdu_attrs):
"""Check what the client received."""
assert match_pdu(self.client.confirmation_received, pdu_type, **pdu_attrs)
def test_result(self):
"""Test the Result encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_codec_01")
# Request successful
self.request(Result(0))
self.indication(pduData=xtob('810000060000'))
self.response(PDU(xtob('810000060000')))
self.confirmation(Result, bvlciResultCode=0)
# Request error condition
self.request(Result(1))
self.indication(pduData=xtob('810000060001'))
self.response(PDU(xtob('810000060001')))
self.confirmation(Result, bvlciResultCode=1)
def test_write_broadcast_distribution_table(self):
"""Test the WriteBroadcastDistributionTable encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_write_broadcast_distribution_table")
# write an empty table
self.request(WriteBroadcastDistributionTable([]))
self.indication(pduData=xxtob('81.01.0004'))
self.response(PDU(xxtob('81.01.0004')))
self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[])
# write a table with an element
addr = Address('192.168.0.254/24')
pdu_bytes = xxtob('81.01.000e'
'c0.a8.00.fe.ba.c0.ff.ff.ff.00'
)
self.request(WriteBroadcastDistributionTable([addr]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[addr])
def test_read_broadcast_distribution_table(self):
"""Test the ReadBroadcastDistributionTable encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table")
# read the table
self.request(ReadBroadcastDistributionTable())
self.indication(pduData=xxtob('81.02.0004'))
self.response(PDU(xxtob('81.02.0004')))
self.confirmation(ReadBroadcastDistributionTable)
def test_read_broadcast_distribution_table_ack(self):
"""Test the ReadBroadcastDistributionTableAck encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table_ack")
# read returns an empty table
self.request(ReadBroadcastDistributionTableAck([]))
self.indication(pduData=xxtob('81.03.0004'))
self.response(PDU(xxtob('81.03.0004')))
self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[])
# read returns a table with an element
addr = Address('192.168.0.254/24')
pdu_bytes = xxtob('81.03.000e'
'c0.a8.00.fe.ba.c0.ff.ff.ff.00'
)
self.request(ReadBroadcastDistributionTableAck([addr]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[addr])
def test_forwarded_npdu(self):
"""Test the ForwardedNPDU encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_forwarded_npdu")
# read returns a table with an element
addr = Address('192.168.0.1')
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.04.000e'
'c0.a8.00.01.ba.c0' # original source address
'deadbeef' # forwarded PDU
)
self.request(ForwardedNPDU(addr, xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ForwardedNPDU, bvlciAddress=addr, pduData=xpdu)
def test_register_foreign_device(self):
"""Test the RegisterForeignDevice encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_register_foreign_device")
# register as a foreign device with a 30 second time-to-live
pdu_bytes = xxtob('81.05.0006'
'001e' # time-to-live
)
self.request(RegisterForeignDevice(30))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(RegisterForeignDevice, bvlciTimeToLive=30)
def test_read_foreign_device_table(self):
"""Test the ReadForeignDeviceTable encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table")
# read returns an empty table
self.request(ReadForeignDeviceTable())
self.indication(pduData=xxtob('81.06.0004'))
self.response(PDU(xxtob('81.06.0004')))
self.confirmation(ReadForeignDeviceTable)
def test_read_foreign_device_table_ack(self):
"""Test the ReadForeignDeviceTableAck encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table_ack")
# read returns an empty table
self.request(ReadForeignDeviceTableAck([]))
self.indication(pduData=xxtob('81.07.0004'))
self.response(PDU(xxtob('81.07.0004')))
self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[])
# read returns a table with one entry
fdte = FDTEntry()
fdte.fdAddress = Address("192.168.0.10")
fdte.fdTTL = 30
fdte.fdRemain = 15
pdu_bytes = xxtob('81.07.000e'
'c0.a8.00.0a.ba.c0'
'001e.000f'
)
self.request(ReadForeignDeviceTableAck([fdte]))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[fdte])
def test_delete_foreign_device_table_entry(self):
"""Test the DeleteForeignDeviceTableEntry encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_delete_foreign_device_table_entry")
# delete an element
addr = Address('192.168.0.11/24')
pdu_bytes = xxtob('81.08.000a'
'c0.a8.00.0b.ba.c0' # address of entry to be deleted
)
self.request(DeleteForeignDeviceTableEntry(addr))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(DeleteForeignDeviceTableEntry, bvlciAddress=addr)
def test_distribute_broadcast_to_network(self):
"""Test the DistributeBroadcastToNetwork encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_distribute_broadcast_to_network")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.09.0008'
'deadbeef' # PDU to broadcast
)
self.request(DistributeBroadcastToNetwork(xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(DistributeBroadcastToNetwork, pduData=xpdu)
def test_original_unicast_npdu(self):
"""Test the OriginalUnicastNPDU encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_original_unicast_npdu")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.0a.0008'
'deadbeef' # PDU being unicast
)
self.request(OriginalUnicastNPDU(xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(OriginalUnicastNPDU, pduData=xpdu)
def test_original_broadcast_npdu(self):
"""Test the OriginalBroadcastNPDU encoding and decoding."""
if _debug: TestAnnexJCodec._debug("test_original_broadcast_npdu")
# read returns a table with an element
xpdu = xxtob('deadbeef')
pdu_bytes = xxtob('81.0b.0008'
'deadbeef' # PDU being broadcast
)
self.request(OriginalBroadcastNPDU(xpdu))
self.indication(pduData=pdu_bytes)
self.response(PDU(pdu_bytes))
self.confirmation(OriginalBroadcastNPDU, pduData=xpdu)

View File

@ -0,0 +1,14 @@
#!/usr/bin/python
"""
Test BACpypes IOCB Module
"""
from . import test_iocb
from . import test_iochain
from . import test_iogroup
from . import test_ioqueue
from . import test_iocontroller
from . import test_ioqcontroller
from . import test_clientcontroller
from . import test_sieveclientcontroller

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -0,0 +1 @@
# placeholder

View File

@ -6,6 +6,7 @@ Test Primitive Data Integer
---------------------------
"""
import sys
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
@ -78,6 +79,15 @@ class TestInteger(unittest.TestCase):
obj = Integer()
assert obj.value == 0
assert Integer.is_valid(1)
assert Integer.is_valid(-1)
if sys.version[0] == 2:
assert Integer.is_valid(long(1))
assert Integer.is_valid(long(-1))
assert not Integer.is_valid(True)
assert not Integer.is_valid(1.0)
with self.assertRaises(TypeError):
Integer("some string")
with self.assertRaises(TypeError):
@ -139,4 +149,5 @@ class TestInteger(unittest.TestCase):
integer_endec(-8388608, '800000')
integer_endec(2147483647, '7fffffff')
integer_endec(-2147483648, '80000000')
integer_endec(-2147483648, '80000000')

View File

@ -6,6 +6,7 @@ Test Primitive Data Unsigned
----------------------------
"""
import sys
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob
@ -78,6 +79,16 @@ class TestUnsigned(unittest.TestCase):
obj = Unsigned()
assert obj.value == 0
assert Unsigned.is_valid(1)
assert not Unsigned.is_valid(-1)
if sys.version[0] == 2:
assert Unsigned.is_valid(long(1))
assert not Unsigned.is_valid(long(-1))
assert not Unsigned.is_valid(True)
assert not Unsigned.is_valid(-1)
assert not Unsigned.is_valid(1.0)
with self.assertRaises(TypeError):
Unsigned("some string")
with self.assertRaises(TypeError):
@ -138,4 +149,5 @@ class TestUnsigned(unittest.TestCase):
unsigned_endec(8388608, '800000')
unsigned_endec(2147483647, '7fffffff')
unsigned_endec(2147483648, '80000000')
unsigned_endec(2147483648, '80000000')

View File

@ -9,7 +9,7 @@ Test Utilities State Machine
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from ..state_machine import State, StateMachine, StateMachineGroup
from ..state_machine import State, StateMachine, StateMachineGroup, match_pdu
from ..time_machine import reset_time_machine, run_time_machine
from ..trapped_classes import TrappedState, TrappedStateMachine
@ -18,6 +18,51 @@ _debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class TPDU:
def __init__(self, **kwargs):
if _debug: TPDU._debug("__init__ %r", kwargs)
self.__dict__.update(kwargs)
def __repr__(self):
return '<TPDU {}>'.format(', '.join(
'{}={}'.format(k, v) for k,v in self.__dict__.items(),
))
@bacpypes_debugging
class TestMatchPDU(unittest.TestCase):
def test_match_pdu(self):
if _debug: TestMatchPDU._debug("test_match_pdu")
tpdu = TPDU(x=1)
Anon = type('Anon', (), {})
anon = Anon()
# no criteria passes
assert match_pdu(tpdu)
assert match_pdu(anon)
# matching/not matching types
assert match_pdu(tpdu, TPDU)
assert not match_pdu(tpdu, Anon)
assert match_pdu(tpdu, (TPDU, Anon))
# matching/not matching attributes
assert match_pdu(tpdu, x=1)
assert not match_pdu(tpdu, x=2)
assert not match_pdu(tpdu, y=1)
assert not match_pdu(anon, x=1)
# matching/not matching types and attributes
assert match_pdu(tpdu, TPDU, x=1)
assert not match_pdu(tpdu, TPDU, x=2)
assert not match_pdu(tpdu, TPDU, y=1)
@bacpypes_debugging
class TestState(unittest.TestCase):
@ -29,6 +74,7 @@ class TestState(unittest.TestCase):
ns = ts.doc("test state")
assert ts.doc_string == "test state"
assert ns is ts
if _debug: TestState._debug(" - passed")
def test_state_success(self):
if _debug: TestState._debug("test_state_success")
@ -43,6 +89,7 @@ class TestState(unittest.TestCase):
ts.success()
with self.assertRaises(RuntimeError):
ts.fail()
if _debug: TestState._debug(" - passed")
def test_state_fail(self):
if _debug: TestState._debug("test_state_fail")
@ -57,9 +104,11 @@ class TestState(unittest.TestCase):
ts.success()
with self.assertRaises(RuntimeError):
ts.fail()
if _debug: TestState._debug(" - passed")
def test_something_else(self):
if _debug: TestState._debug("test_something_else")
if _debug: TestState._debug(" - passed")
@bacpypes_debugging
@ -77,12 +126,13 @@ class TestStateMachine(unittest.TestCase):
# check for still running in the start state
assert tsm.running
assert tsm.current_state is tsm.start_state
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_success(self):
if _debug: TestStateMachine._debug("test_state_machine_success")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
assert isinstance(tsm.start_state, TrappedState)
# make the start state a success
@ -94,12 +144,13 @@ class TestStateMachine(unittest.TestCase):
# check for success
assert not tsm.running
assert tsm.current_state.is_success_state
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_fail(self):
if _debug: TestStateMachine._debug("test_state_machine_fail")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
assert isinstance(tsm.start_state, TrappedState)
# make the start state a fail
@ -111,15 +162,16 @@ class TestStateMachine(unittest.TestCase):
# check for success
assert not tsm.running
assert tsm.current_state.is_fail_state
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_send(self):
if _debug: TestStateMachine._debug("test_state_machine_send")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
# make pdu object
pdu = object()
pdu = TPDU()
# make a send transition from start to success, run the machine
tsm.start_state.send(pdu).success()
@ -141,18 +193,19 @@ class TestStateMachine(unittest.TestCase):
# check the transaction log
assert len(tsm.transaction_log) == 1
assert tsm.transaction_log[0][1] is pdu
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_receive(self):
if _debug: TestStateMachine._debug("test_state_machine_receive")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
# make pdu object
pdu = object()
pdu = TPDU()
# make a receive transition from start to success, run the machine
tsm.start_state.receive(pdu).success()
tsm.start_state.receive(TPDU).success()
tsm.run()
# check for still running
@ -174,19 +227,20 @@ class TestStateMachine(unittest.TestCase):
# check the transaction log
assert len(tsm.transaction_log) == 1
assert tsm.transaction_log[0][1] is pdu
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_unexpected(self):
if _debug: TestStateMachine._debug("test_state_machine_unexpected")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
# make pdu object
good_pdu = object()
bad_pdu = object()
good_pdu = TPDU(a=1)
bad_pdu = TPDU(b=2)
# make a receive transition from start to success, run the machine
tsm.start_state.receive(good_pdu).success()
tsm.start_state.receive(TPDU, a=1).success()
tsm.run()
# check for still running
@ -206,21 +260,24 @@ class TestStateMachine(unittest.TestCase):
# check the transaction log
assert len(tsm.transaction_log) == 1
assert tsm.transaction_log[0][1] is bad_pdu
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_loop_01(self):
if _debug: TestStateMachine._debug("test_state_machine_loop_01")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
# make pdu object
first_pdu = object()
second_pdu = object()
first_pdu = TPDU(a=1)
if _debug: TestStateMachine._debug(" - first_pdu: %r", first_pdu)
second_pdu = TPDU(a=2)
if _debug: TestStateMachine._debug(" - second_pdu: %r", second_pdu)
# after sending the first pdu, wait for the second
s0 = tsm.start_state
s1 = s0.send(first_pdu)
s2 = s1.receive(second_pdu)
s2 = s1.receive(TPDU, a=2)
s2.success()
# run the machine
@ -229,6 +286,7 @@ class TestStateMachine(unittest.TestCase):
# check for still running and waiting
assert tsm.running
assert tsm.current_state is s1
if _debug: TestStateMachine._debug(" - still running and waiting")
# give the machine the second pdu
tsm.receive(second_pdu)
@ -236,31 +294,34 @@ class TestStateMachine(unittest.TestCase):
# check for success
assert not tsm.running
assert tsm.current_state.is_success_state
if _debug: TestStateMachine._debug(" - success")
# check the callbacks
assert s0.before_send_pdu is first_pdu
assert s0.after_send_pdu is first_pdu
assert s1.before_receive_pdu is second_pdu
assert s1.after_receive_pdu is second_pdu
if _debug: TestStateMachine._debug(" - callbacks passed")
# check the transaction log
assert len(tsm.transaction_log) == 2
assert tsm.transaction_log[0][1] is first_pdu
assert tsm.transaction_log[1][1] is second_pdu
if _debug: TestStateMachine._debug(" - transaction log passed")
def test_state_machine_loop_02(self):
if _debug: TestStateMachine._debug("test_state_machine_loop_02")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
# make pdu object
first_pdu = object()
second_pdu = object()
first_pdu = TPDU(a=1)
second_pdu = TPDU(a=2)
# when the first pdu is received, send the second
s0 = tsm.start_state
s1 = s0.receive(first_pdu)
s1 = s0.receive(TPDU, a=1)
s2 = s1.send(second_pdu)
s2.success()
@ -269,6 +330,7 @@ class TestStateMachine(unittest.TestCase):
# check for still running
assert tsm.running
if _debug: TestStateMachine._debug(" - still running")
# give the machine the first pdu
tsm.receive(first_pdu)
@ -276,17 +338,20 @@ class TestStateMachine(unittest.TestCase):
# check for success
assert not tsm.running
assert tsm.current_state.is_success_state
if _debug: TestStateMachine._debug(" - success")
# check the callbacks
assert s0.before_receive_pdu is first_pdu
assert s0.after_receive_pdu is first_pdu
assert s1.before_send_pdu is second_pdu
assert s1.after_send_pdu is second_pdu
if _debug: TestStateMachine._debug(" - callbacks passed")
# check the transaction log
assert len(tsm.transaction_log) == 2
assert tsm.transaction_log[0][1] is first_pdu
assert tsm.transaction_log[1][1] is second_pdu
if _debug: TestStateMachine._debug(" - transaction log passed")
@bacpypes_debugging
@ -296,7 +361,7 @@ class TestStateMachineTimeout1(unittest.TestCase):
if _debug: TestStateMachineTimeout1._debug("test_state_machine_timeout_1")
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
# make a timeout transition from start to success
tsm.start_state.timeout(1.0).success()
@ -312,6 +377,7 @@ class TestStateMachineTimeout1(unittest.TestCase):
# check for success
assert not tsm.running
assert tsm.current_state.is_success_state
if _debug: TestStateMachine._debug(" - passed")
@bacpypes_debugging
@ -321,11 +387,11 @@ class TestStateMachineTimeout2(unittest.TestCase):
if _debug: TestStateMachineTimeout2._debug("test_state_machine_timeout_2")
# make some pdu's
first_pdu = object()
second_pdu = object()
first_pdu = TPDU(a=1)
second_pdu = TPDU(a=2)
# create a trapped state machine
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
s0 = tsm.start_state
# send something, wait, send something, wait, success
@ -350,6 +416,7 @@ class TestStateMachineTimeout2(unittest.TestCase):
assert len(tsm.transaction_log) == 2
assert tsm.transaction_log[0][1] is first_pdu
assert tsm.transaction_log[1][1] is second_pdu
if _debug: TestStateMachine._debug(" - passed")
@bacpypes_debugging
@ -362,7 +429,7 @@ class TestStateMachineGroup(unittest.TestCase):
smg = StateMachineGroup()
# create a trapped state machine, start state is success
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
tsm.start_state.success()
# add it to the group
@ -381,6 +448,7 @@ class TestStateMachineGroup(unittest.TestCase):
assert not tsm.running
assert tsm.current_state.is_success_state
assert smg.is_success_state
if _debug: TestStateMachine._debug(" - passed")
def test_state_machine_group_fail(self):
if _debug: TestStateMachineGroup._debug("test_state_machine_group_fail")
@ -389,7 +457,7 @@ class TestStateMachineGroup(unittest.TestCase):
smg = StateMachineGroup()
# create a trapped state machine, start state is fail
tsm = TrappedStateMachine(state_subclass=TrappedState)
tsm = TrappedStateMachine()
tsm.start_state.fail()
# add it to the group
@ -408,3 +476,71 @@ class TestStateMachineGroup(unittest.TestCase):
assert not tsm.running
assert tsm.current_state.is_fail_state
assert smg.is_fail_state
if _debug: TestStateMachine._debug(" - passed")
@bacpypes_debugging
class TestStateMachineEvents(unittest.TestCase):
def test_state_machine_event_01(self):
if _debug: TestStateMachineEvents._debug("test_state_machine_event_01")
# create a state machine group
smg = StateMachineGroup()
# create a trapped state machine, start state is success
tsm1 = TrappedStateMachine()
tsm1.start_state.set_event('e').success()
smg.append(tsm1)
# create another trapped state machine, waiting for the event
tsm2 = TrappedStateMachine()
tsm2.start_state.wait_event('e').success()
smg.append(tsm2)
reset_time_machine()
if _debug: TestStateMachineEvents._debug(" - time machine reset")
# tell the group to run
smg.run()
run_time_machine(60.0)
if _debug: TestStateMachineEvents._debug(" - time machine finished")
# check for success
assert tsm1.current_state.is_success_state
assert tsm2.current_state.is_success_state
assert smg.is_success_state
if _debug: TestStateMachineEvents._debug(" - passed")
def test_state_machine_event_02(self):
if _debug: TestStateMachineEvents._debug("test_state_machine_event_02")
# create a state machine group
smg = StateMachineGroup()
# create a trapped state machine, waiting for an event
tsm1 = TrappedStateMachine()
tsm1.start_state.wait_event('e').success()
smg.append(tsm1)
# create another trapped state machine, start state is success
tsm2 = TrappedStateMachine()
tsm2.start_state.set_event('e').success()
smg.append(tsm2)
reset_time_machine()
if _debug: TestStateMachineEvents._debug(" - time machine reset")
# tell the group to run
smg.run()
run_time_machine(60.0)
if _debug: TestStateMachineEvents._debug(" - time machine finished")
# check for success
assert tsm1.current_state.is_success_state
assert tsm2.current_state.is_success_state
assert smg.is_success_state
if _debug: TestStateMachineEvents._debug(" - passed")

View File

@ -8,3 +8,5 @@ This module tests the VLAN networking.
"""
from . import test_network
from . import test_ipnetwork

View File

@ -0,0 +1,374 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test IPNetwork
------------
This module tests the basic functionality of a crudely simulated IPv4 network,
source and destination addresses are tuples like those used for sockets and
the UDPDirector.
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.pdu import Address, LocalBroadcast, PDU
from bacpypes.comm import bind
from bacpypes.vlan import IPNetwork, IPNode, IPRouter
from ..state_machine import ClientStateMachine, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
class TNetwork(StateMachineGroup):
def __init__(self, node_count, address_pattern):
if _debug: TNetwork._debug("__init__ %r", node_count)
StateMachineGroup.__init__(self)
self.vlan = IPNetwork()
for i in range(node_count):
node_address = Address(address_pattern.format(i + 1))
node = IPNode(node_address, self.vlan)
if _debug: TNetwork._debug(" - node: %r", node)
# bind a client state machine to the node
csm = ClientStateMachine()
bind(csm, node)
# add it to this group
self.append(csm)
def run(self, time_limit=60.0):
if _debug: TNetwork._debug("run %r", time_limit)
# reset the time machine
reset_time_machine()
if _debug: TNetwork._debug(" - time machine reset")
# run the group
super(TNetwork, self).run()
# run it for some time
run_time_machine(time_limit)
if _debug: TNetwork._debug(" - time machine finished")
# check for success
all_success, some_failed = super(TNetwork, self).check_for_success()
assert all_success
@bacpypes_debugging
class TestVLAN(unittest.TestCase):
def __init__(self, *args, **kwargs):
if _debug: TestVLAN._debug("__init__ %r %r", args, kwargs)
super(TestVLAN, self).__init__(*args, **kwargs)
def test_idle(self):
"""Test that a very quiet network can exist. This is not a network
test so much as a state machine group test.
"""
if _debug: TestVLAN._debug("test_idle")
# two element network
tnet = TNetwork(2, "192.168.1.{}/24")
tnode1, tnode2 = tnet.state_machines
# set the start states of both machines to success
tnode1.start_state.success()
tnode2.start_state.success()
# run the group
tnet.run()
def test_send_receive(self):
"""Test that a node can send a message to another node.
"""
if _debug: TestVLAN._debug("test_send_receive")
# two element network
tnet = TNetwork(2, "192.168.2.{}/24")
tnode1, tnode2 = tnet.state_machines
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=('192.168.2.1', 47808),
destination=('192.168.2.2', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, mode 2 gets it
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU,
pduSource=('192.168.2.1', 47808),
).success()
# run the group
tnet.run()
def test_broadcast(self):
"""Test that a node can send out a 'local broadcast' message which will
be received by every other node.
"""
if _debug: TestVLAN._debug("test_broadcast")
# three element network
tnet = TNetwork(3, "192.168.3.{}/24")
tnode1, tnode2, tnode3 = tnet.state_machines
# make a broadcast PDU
pdu = PDU(b'data',
source=('192.168.3.1', 47808),
destination=('192.168.3.255', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, node 2 and 3 each get it
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU,
pduSource=('192.168.3.1', 47808),
).success()
tnode3.start_state.receive(PDU,
pduSource=('192.168.3.1', 47808)
).success()
# run the group
tnet.run()
def test_spoof_fail(self):
"""Test verifying that a node cannot send out packets with a source
address other than its own, see also test_spoof_pass().
"""
if _debug: TestVLAN._debug("test_spoof_fail")
# one element network
tnet = TNetwork(1, "192.168.4.{}/24")
tnode1, = tnet.state_machines
# make a unicast PDU with the wrong source
pdu = PDU(b'data',
source=('192.168.4.2', 47808),
destination=('192.168.4.3', 47808),
)
# the node sends the pdu and would be a success but...
tnode1.start_state.send(pdu).success()
# when the node attempts to send it raises an error
with self.assertRaises(RuntimeError):
tnet.run()
def test_spoof_pass(self):
"""Test allowing a node to send out packets with a source address
other than its own, see also test_spoof_fail().
"""
if _debug: TestVLAN._debug("test_spoof_pass")
# one node network
tnet = TNetwork(1, "192.168.5.{}/24")
tnode1, = tnet.state_machines
# reach into the network and enable spoofing for the node
tnet.vlan.nodes[0].spoofing = True
# make a unicast PDU from a fictitious node
pdu = PDU(b'data',
source=('192.168.5.3', 47808),
destination=('192.168.5.1', 47808),
)
# node 1 sends the pdu, but gets it back as if it was from node 3
tnode1.start_state.send(pdu).receive(PDU,
pduSource=('192.168.5.3', 47808),
).success()
# run the group
tnet.run()
def test_promiscuous_pass(self):
"""Test 'promiscuous mode' of a node which allows it to receive every
packet sent on the network. This is like the network is a hub, or
the node is connected to a 'monitor' port on a managed switch.
"""
if _debug: TestVLAN._debug("test_promiscuous_pass")
# three element network
tnet = TNetwork(3, "192.168.6.{}/24")
tnode1, tnode2, tnode3 = tnet.state_machines
# reach into the network and enable promiscuous mode
tnet.vlan.nodes[2].promiscuous = True
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=('192.168.6.1', 47808),
destination=('192.168.6.2', 47808),
)
# node 1 sends the pdu to node 2, node 3 also gets a copy
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU,
pduSource=('192.168.6.1', 47808),
).success()
tnode3.start_state.receive(PDU,
pduDestination=('192.168.6.2', 47808),
).success()
# run the group
tnet.run()
def test_promiscuous_fail(self):
if _debug: TestVLAN._debug("test_promiscuous_fail")
# three element network
tnet = TNetwork(3, "192.168.7.{}/24")
tnode1, tnode2, tnode3 = tnet.state_machines
# make a PDU from node 1 to node 2
pdu = PDU(b'data',
source=('192.168.7.1', 47808),
destination=('192.168.7.2', 47808),
)
# node 1 sends the pdu to node 2, node 3 waits and gets nothing
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU,
pduSource=('192.168.7.1', 47808),
).success()
# if node 3 receives anything it will trigger unexpected receive and fail
tnode3.start_state.timeout(1).success()
# run the group
tnet.run()
@bacpypes_debugging
class TestRouter(unittest.TestCase):
def __init__(self, *args, **kwargs):
if _debug: TestRouter._debug("__init__ %r %r", args, kwargs)
super(TestRouter, self).__init__(*args, **kwargs)
def setup_method(self, method):
"""This function is called before each test method is called as is
given a reference to the test method."""
if _debug: TestRouter._debug("setup_method %r", method)
# create a state machine group that has all nodes on all networks
self.smg = StateMachineGroup()
# make some networks
vlan10 = IPNetwork()
vlan20 = IPNetwork()
# make a router and add the networks
trouter = IPRouter()
trouter.add_network(Address("192.168.10.1/24"), vlan10)
trouter.add_network(Address("192.168.20.1/24"), vlan20)
# add nodes to the networks
for pattern, lan in (
("192.168.10.{}/24", vlan10),
("192.168.20.{}/24", vlan20),
):
for i in range(2):
node_address = Address(pattern.format(i + 2))
node = IPNode(node_address, lan)
if _debug: TestRouter._debug(" - node: %r", node)
# bind a client state machine to the node
csm = ClientStateMachine()
bind(csm, node)
# add it to the group
self.smg.append(csm)
def teardown_method(self, method):
"""This function is called after each test method has been called and
is given a reference to the test method."""
if _debug: TestRouter._debug("teardown_method %r", method)
# reset the time machine
reset_time_machine()
if _debug: TestRouter._debug(" - time machine reset")
# run the group
self.smg.run()
# run it for some time
run_time_machine(60.0)
if _debug: TestRouter._debug(" - time machine finished")
# check for success
all_success, some_failed = self.smg.check_for_success()
assert all_success
def test_idle(self):
if _debug: TestRouter._debug("test_idle")
# all successful
for csm in self.smg.state_machines:
csm.start_state.success()
def test_send_receive(self):
"""Test that a node can send a message to another node on
a different network.
"""
if _debug: TestRouter._debug("test_send_receive")
# unpack the state machines
csm_10_2, csm_10_3, csm_20_2, csm_20_3 = self.smg.state_machines
# make a PDU from network 10 node 1 to network 20 node 2
pdu = PDU(b'data',
source=('192.168.10.2', 47808),
destination=('192.168.20.3', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, mode 2 gets it
csm_10_2.start_state.send(pdu).success()
csm_20_3.start_state.receive(PDU,
pduSource=('192.168.10.2', 47808),
).success()
# other nodes get nothing
csm_10_3.start_state.timeout(1).success()
csm_20_2.start_state.timeout(1).success()
def test_remote_broadcast(self):
"""Test that a node can send a message to all of the other nodes on
a different network.
"""
if _debug: TestRouter._debug("test_remote_broadcast")
# unpack the state machines
csm_10_2, csm_10_3, csm_20_2, csm_20_3 = self.smg.state_machines
# make a PDU from network 10 node 1 to network 20 node 2
pdu = PDU(b'data',
source=('192.168.10.2', 47808),
destination=('192.168.20.255', 47808),
)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 10-2 sends the pdu, node 10-3 gets nothing, nodes 20-2 and 20-3 get it
csm_10_2.start_state.send(pdu).success()
csm_10_3.start_state.timeout(1).success()
csm_20_2.start_state.receive(PDU,
pduSource=('192.168.10.2', 47808),
).success()
csm_20_3.start_state.receive(PDU,
pduSource=('192.168.10.2', 47808),
).success()

View File

@ -2,58 +2,255 @@
# -*- coding: utf-8 -*-
"""
Test Module Template
--------------------
Test Network
------------
This module tests the basic functionality of a VLAN network. Each test "runs"
on a VLAN with two nodes, node_1 and node_2, and each has a state machine.
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.pdu import Address, LocalBroadcast, PDU
from bacpypes.comm import bind
from bacpypes.vlan import Network, Node
from ..state_machine import ClientStateMachine, StateMachineGroup
from ..time_machine import reset_time_machine, run_time_machine
# some debugging
_debug = 0
_log = ModuleLogger(globals())
@bacpypes_debugging
def setup_module():
if _debug: setup_module._debug("setup_module")
class TNetwork(StateMachineGroup):
def __init__(self, node_count):
if _debug: TNetwork._debug("__init__ %r", node_count)
StateMachineGroup.__init__(self)
self.vlan = Network(broadcast_address=0)
for i in range(node_count):
node = Node(i + 1, self.vlan)
# bind a client state machine to the node
csm = ClientStateMachine()
bind(csm, node)
# add it to this group
self.append(csm)
def run(self, time_limit=60.0):
if _debug: TNetwork._debug("run %r", time_limit)
# reset the time machine
reset_time_machine()
if _debug: TNetwork._debug(" - time machine reset")
# run the group
super(TNetwork, self).run()
# run it for some time
run_time_machine(time_limit)
if _debug: TNetwork._debug(" - time machine finished")
# check for success
all_success, some_failed = super(TNetwork, self).check_for_success()
assert all_success
@bacpypes_debugging
def teardown_module():
if _debug: teardown_module._debug("teardown_module")
class TestVLAN(unittest.TestCase):
def __init__(self, *args, **kwargs):
if _debug: TestVLAN._debug("__init__ %r %r", args, kwargs)
super(TestVLAN, self).__init__(*args, **kwargs)
def test_idle(self):
"""Test that a very quiet network can exist. This is not a network
test so much as a state machine group test.
"""
if _debug: TestVLAN._debug("test_idle")
# two element network
tnet = TNetwork(2)
tnode1, tnode2 = tnet.state_machines
# set the start states of both machines to success
tnode1.start_state.success()
tnode2.start_state.success()
# run the group
tnet.run()
def test_send_receive(self):
"""Test that a node can send a message to another node.
"""
if _debug: TestVLAN._debug("test_send_receive")
# two element network
tnet = TNetwork(2)
tnode1, tnode2 = tnet.state_machines
# make a PDU from node 1 to node 2
pdu = PDU(b'data', source=1, destination=2)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, mode 2 gets it
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU, pduSource=1).success()
# run the group
tnet.run()
def test_broadcast(self):
"""Test that a node can send out a 'local broadcast' message which will
be received by every other node.
"""
if _debug: TestVLAN._debug("test_broadcast")
# three element network
tnet = TNetwork(3)
tnode1, tnode2, tnode3 = tnet.state_machines
# make a broadcast PDU
pdu = PDU(b'data', source=1, destination=0)
if _debug: TestVLAN._debug(" - pdu: %r", pdu)
# node 1 sends the pdu, node 2 and 3 each get it
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU, pduSource=1).success()
tnode3.start_state.receive(PDU, pduSource=1).success()
# run the group
tnet.run()
def test_spoof_fail(self):
"""Test verifying that a node cannot send out packets with a source
address other than its own, see also test_spoof_pass().
"""
if _debug: TestVLAN._debug("test_spoof_fail")
# two element network
tnet = TNetwork(1)
tnode1, = tnet.state_machines
# make a unicast PDU with the wrong source
pdu = PDU(b'data', source=2, destination=3)
# the node sends the pdu and would be a success but...
tnode1.start_state.send(pdu).success()
# when the node attempts to send it raises an error
with self.assertRaises(RuntimeError):
tnet.run()
def test_spoof_pass(self):
"""Test allowing a node to send out packets with a source address
other than its own, see also test_spoof_fail().
"""
if _debug: TestVLAN._debug("test_spoof_pass")
# one node network
tnet = TNetwork(1)
tnode1, = tnet.state_machines
# reach into the network and enable spoofing for the node
tnet.vlan.nodes[0].spoofing = True
# make a unicast PDU from a fictitious node
pdu = PDU(b'data', source=3, destination=1)
# node 1 sends the pdu, but gets it back as if it was from node 3
tnode1.start_state.send(pdu).receive(PDU, pduSource=3).success()
# run the group
tnet.run()
def test_promiscuous_pass(self):
"""Test 'promiscuous mode' of a node which allows it to receive every
packet sent on the network. This is like the network is a hub, or
the node is connected to a 'monitor' port on a managed switch.
"""
if _debug: TestVLAN._debug("test_promiscuous_pass")
# three element network
tnet = TNetwork(3)
tnode1, tnode2, tnode3 = tnet.state_machines
# reach into the network and enable promiscuous mode
tnet.vlan.nodes[2].promiscuous = True
# make a PDU from node 1 to node 2
pdu = PDU(b'data', source=1, destination=2)
# node 1 sends the pdu to node 2, node 3 also gets a copy
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU, pduSource=1).success()
tnode3.start_state.receive(PDU, pduDestination=2).success()
# run the group
tnet.run()
def test_promiscuous_fail(self):
if _debug: TestVLAN._debug("test_promiscuous_fail")
# three element network
tnet = TNetwork(3)
tnode1, tnode2, tnode3 = tnet.state_machines
# make a PDU from node 1 to node 2
pdu = PDU(b'data', source=1, destination=2)
# node 1 sends the pdu to node 2, node 3 waits and gets nothing
tnode1.start_state.send(pdu).success()
tnode2.start_state.receive(PDU, pduSource=1).success()
# if node 3 receives anything it will trigger unexpected receive and fail
tnode3.start_state.timeout(0.5).success()
# run the group
tnet.run()
@bacpypes_debugging
def setup_function(function):
if _debug: setup_function._debug("setup_function %r", function)
class TestVLANEvents(unittest.TestCase):
def __init__(self, *args, **kwargs):
if _debug: TestVLANEvents._debug("__init__ %r %r", args, kwargs)
super(TestVLANEvents, self).__init__(*args, **kwargs)
@bacpypes_debugging
def teardown_function(function):
if _debug: teardown_function._debug("teardown_function %r", function)
def test_send_receive(self):
"""Test that a node can send a message to another node and use
events to continue with the messages.
"""
if _debug: TestVLAN._debug("test_send_receive")
# two element network
tnet = TNetwork(2)
tnode1, tnode2 = tnet.state_machines
@bacpypes_debugging
class TestCaseTemplate(unittest.TestCase):
# make a PDU from node 1 to node 2
dead_pdu = PDU(b'dead', source=1, destination=2)
if _debug: TestVLAN._debug(" - dead_pdu: %r", dead_pdu)
@classmethod
def setup_class(cls):
if _debug: TestCaseTemplate._debug("setup_class")
# make a PDU from node 1 to node 2
beef_pdu = PDU(b'beef', source=1, destination=2)
if _debug: TestVLAN._debug(" - beef_pdu: %r", beef_pdu)
@classmethod
def teardown_class(cls):
if _debug: TestCaseTemplate._debug("teardown_class")
# node 1 sends dead_pdu, waits for event, sends beef_pdu
tnode1.start_state \
.send(dead_pdu).wait_event('e') \
.send(beef_pdu).success()
def setup_method(self, method):
if _debug: TestCaseTemplate._debug("setup_module %r", method)
# node 2 receives dead_pdu, sets event, waits for beef_pdu
tnode2.start_state \
.receive(PDU, pduData=b'dead').set_event('e') \
.receive(PDU, pduData=b'beef').success()
def teardown_method(self, method):
if _debug: TestCaseTemplate._debug("teardown_method %r", method)
# run the group
tnet.run()
def test_something(self):
if _debug: TestCaseTemplate._debug("test_something")
def test_something_else(self):
if _debug: TestCaseTemplate._debug("test_something_else")

View File

@ -122,21 +122,24 @@ class TrappedStateMachine(Trapper, StateMachine):
throw an exception.
"""
def __init__(self, **kwargs):
"""Initialize a trapped state machine."""
if _debug: TrappedStateMachine._debug("__init__ %r", kwargs)
# provide a default state subclass
if 'state_subclass' not in kwargs:
kwargs['state_subclass'] = TrappedState
# pass them all along
super(TrappedStateMachine, self).__init__(**kwargs)
def send(self, pdu):
"""Called to send a PDU.
"""
"""Called to send a PDU."""
if _debug: TrappedStateMachine._debug("send %r", pdu)
# keep a copy
self.sent = pdu
def match_pdu(self, pdu, transition_pdu):
"""Very strong match condition."""
if _debug: TrappedStateMachine._debug("match_pdu %r %r", pdu, transition_pdu)
# must be identical objects
return pdu is transition_pdu
@bacpypes_debugging
class TrappedClient(Client):