From 209d6199945c517563db6ea34d8ec0e99bb9c08e Mon Sep 17 00:00:00 2001 From: Joel Bender Date: Mon, 11 Sep 2017 02:08:01 -0400 Subject: [PATCH] state and time machine improvements and starting to add BVLL tests --- tests/state_machine.py | 8 + tests/test_bvll/__init__.py | 3 + tests/test_bvll/helpers.py | 199 +++++++++++++++++++++ tests/test_bvll/test_bbmd.py | 1 + tests/test_bvll/test_codec.py | 80 +++++---- tests/test_bvll/test_foreign.py | 115 ++++++++++++ tests/test_bvll/test_simple.py | 154 ++++++++++++++++ tests/test_utilities/test_state_machine.py | 4 + tests/time_machine.py | 43 ++++- 9 files changed, 571 insertions(+), 36 deletions(-) create mode 100644 tests/test_bvll/helpers.py create mode 100644 tests/test_bvll/test_bbmd.py create mode 100644 tests/test_bvll/test_foreign.py create mode 100644 tests/test_bvll/test_simple.py diff --git a/tests/state_machine.py b/tests/state_machine.py index 118a709..fc8071b 100755 --- a/tests/state_machine.py +++ b/tests/state_machine.py @@ -885,6 +885,9 @@ class StateMachineGroup(object): # flag for starting up self._startup_flag = False + # flag for at least one machine running + self.is_running = False + # flags for remembering success or fail self.is_success_state = None self.is_fail_state = None @@ -969,6 +972,7 @@ class StateMachineGroup(object): # turn on the startup flag self._startup_flag = True + self.is_running = True # pass along to each machine for state_machine in self.state_machines: @@ -985,6 +989,8 @@ class StateMachineGroup(object): self.success() elif some_failed: self.fail() + else: + if _debug: StateMachineGroup._debug(" - some still running") def running(self, state_machine): """Called by a state machine in the group when it has completed its @@ -1050,6 +1056,7 @@ class StateMachineGroup(object): are all in a 'success' final state.""" if _debug: StateMachineGroup._debug("success") + self.is_running = False self.is_success_state = True def fail(self): @@ -1057,6 +1064,7 @@ class StateMachineGroup(object): at least one of them is in a 'fail' final state.""" if _debug: StateMachineGroup._debug("fail") + self.is_running = False self.is_fail_state = True diff --git a/tests/test_bvll/__init__.py b/tests/test_bvll/__init__.py index 95546aa..2119d0a 100644 --- a/tests/test_bvll/__init__.py +++ b/tests/test_bvll/__init__.py @@ -5,4 +5,7 @@ Test BVLL Module """ from . import test_codec +from . import test_simple +from . import test_foreign +from . import test_bbmd diff --git a/tests/test_bvll/helpers.py b/tests/test_bvll/helpers.py new file mode 100644 index 0000000..beec9b7 --- /dev/null +++ b/tests/test_bvll/helpers.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python + +""" +B/IP VLAN Helper Classes +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger + +from bacpypes.comm import Client, Server, bind +from bacpypes.pdu import Address, LocalBroadcast, PDU, unpack_ip_addr +from bacpypes.vlan import IPNode + +from ..state_machine import ClientStateMachine + +from bacpypes.bvllservice import BIPSimple, BIPForeign, BIPBBMD, AnnexJCodec + + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# +# FauxMultiplexer +# + +@bacpypes_debugging +class FauxMultiplexer(Client, Server): + + def __init__(self, addr, network=None, cid=None, sid=None): + if _debug: FauxMultiplexer._debug("__init__") + + Client.__init__(self, cid) + Server.__init__(self, sid) + + # allow the address to be cast + if isinstance(addr, Address): + self.address = addr + else: + self.address = Address(addr) + + # get the unicast and broadcast tuples + self.unicast_tuple = addr.addrTuple + self.broadcast_tuple = addr.addrBroadcastTuple + + # make an internal node and bind to it, this takes the place of + # both the direct port and broadcast port of the real UDPMultiplexer + self.node = IPNode(addr, network) + bind(self, self.node) + + def indication(self, pdu): + if _debug: FauxMultiplexer._debug("indication %r", pdu) + + # check for a broadcast message + if pdu.pduDestination.addrType == Address.localBroadcastAddr: + dest = self.broadcast_tuple + if _debug: FauxMultiplexer._debug(" - requesting local broadcast: %r", dest) + + elif pdu.pduDestination.addrType == Address.localStationAddr: + dest = unpack_ip_addr(pdu.pduDestination.addrAddr) + if _debug: FauxMultiplexer._debug(" - requesting local station: %r", dest) + + else: + raise RuntimeError("invalid destination address type") + + # continue downstream + self.request(PDU(pdu, source=self.unicast_tuple, destination=dest)) + + def confirmation(self, pdu): + if _debug: FauxMultiplexer._debug("confirmation %r", pdu) + + # the PDU source and destination are tuples, convert them to Address instances + src = Address(pdu.pduSource) + + # see if the destination was our broadcast address + if pdu.pduDestination == self.broadcast_tuple: + dest = LocalBroadcast() + else: + dest = Address(pdu.pduDestination) + + # continue upstream + self.response(PDU(pdu, source=src, destination=dest)) + +# +# _repr +# + +class _repr: + + def __repr__(self): + if not self.running: + state_text = "idle " + else: + state_text = "in " + state_text += repr(self.current_state) + + return "<%s(%s) %s at %s>" % ( + self.__class__.__name__, + getattr(self, 'address', '?'), + state_text, + hex(id(self)), + ) + + +# +# SnifferNode +# + +@bacpypes_debugging +class SnifferNode(_repr, ClientStateMachine): + + def __init__(self, address, vlan): + if _debug: SnifferNode._debug("__init__ %r %r", address, vlan) + ClientStateMachine.__init__(self) + + # save the address + self.address = Address(address) + + # create a promiscuous node, added to the network + self.node = IPNode(self.address, vlan, promiscuous=True) + if _debug: SnifferNode._debug(" - node: %r", self.node) + + # bind this to the node + bind(self, self.node) + + +# +# SimpleNode +# + +@bacpypes_debugging +class SimpleNode(_repr, ClientStateMachine): + + def __init__(self, address, vlan): + if _debug: SimpleNode._debug("__init__ %r %r", address, vlan) + ClientStateMachine.__init__(self) + + # save the address + self.address = Address(address) + + # BACnet/IP interpreter + self.bip = BIPSimple() + self.annexj = AnnexJCodec() + + # fake multiplexer has a VLAN node in it + self.mux = FauxMultiplexer(self.address, vlan) + + # bind the stack together + bind(self, self.bip, self.annexj, self.mux) + + +# +# ForeignNode +# + +@bacpypes_debugging +class ForeignNode(_repr, ClientStateMachine): + + def __init__(self, address, vlan): + if _debug: ForeignNode._debug("__init__ %r %r", address, vlan) + ClientStateMachine.__init__(self) + + # save the address + self.address = Address(address) + + # BACnet/IP interpreter + self.bip = BIPForeign() + self.annexj = AnnexJCodec() + + # fake multiplexer has a VLAN node in it + self.mux = FauxMultiplexer(self.address, vlan) + + # bind the stack together + bind(self, self.bip, self.annexj, self.mux) + +# +# BBMDNode +# + +@bacpypes_debugging +class BBMDNode(_repr, ClientStateMachine): + + def __init__(self, address, vlan): + if _debug: BBMDNode._debug("__init__ %r %r", address, vlan) + ClientStateMachine.__init__(self) + + # save the address + self.address = Address(address) + + # BACnet/IP interpreter + self.bip = BIPBBMD(self.address) + self.annexj = AnnexJCodec() + + # fake multiplexer has a VLAN node in it + self.mux = FauxMultiplexer(self.address, vlan) + + # bind the stack together + bind(self, self.bip, self.annexj, self.mux) + + diff --git a/tests/test_bvll/test_bbmd.py b/tests/test_bvll/test_bbmd.py new file mode 100644 index 0000000..fdffa2a --- /dev/null +++ b/tests/test_bvll/test_bbmd.py @@ -0,0 +1 @@ +# placeholder diff --git a/tests/test_bvll/test_codec.py b/tests/test_bvll/test_codec.py index 22105a8..07de30f 100644 --- a/tests/test_bvll/test_codec.py +++ b/tests/test_bvll/test_codec.py @@ -69,17 +69,21 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_codec_01") # Request successful - self.request(Result(0)) - self.indication(pduData=xtob('810000060000')) + pdu_bytes = xxtob('81.00.0006.0000') - self.response(PDU(xtob('810000060000'))) + self.request(Result(0)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(Result, bvlciResultCode=0) # Request error condition - self.request(Result(1)) - self.indication(pduData=xtob('810000060001')) + pdu_bytes = xxtob('81.00.0006.0001') - self.response(PDU(xtob('810000060001'))) + self.request(Result(1)) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(Result, bvlciResultCode=1) def test_write_broadcast_distribution_table(self): @@ -87,16 +91,18 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_write_broadcast_distribution_table") # write an empty table - self.request(WriteBroadcastDistributionTable([])) - self.indication(pduData=xxtob('81.01.0004')) + pdu_bytes = xxtob('81.01.0004') - self.response(PDU(xxtob('81.01.0004'))) + self.request(WriteBroadcastDistributionTable([])) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(WriteBroadcastDistributionTable, bvlciBDT=[]) # write a table with an element addr = Address('192.168.0.254/24') pdu_bytes = xxtob('81.01.000e' - 'c0.a8.00.fe.ba.c0.ff.ff.ff.00' + 'c0.a8.00.fe.ba.c0 ff.ff.ff.00' # address and mask ) self.request(WriteBroadcastDistributionTable([addr])) @@ -110,10 +116,12 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table") # read the table - self.request(ReadBroadcastDistributionTable()) - self.indication(pduData=xxtob('81.02.0004')) + pdu_bytes = xxtob('81.02.0004') - self.response(PDU(xxtob('81.02.0004'))) + self.request(ReadBroadcastDistributionTable()) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(ReadBroadcastDistributionTable) def test_read_broadcast_distribution_table_ack(self): @@ -121,16 +129,18 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_read_broadcast_distribution_table_ack") # read returns an empty table - self.request(ReadBroadcastDistributionTableAck([])) - self.indication(pduData=xxtob('81.03.0004')) + pdu_bytes = xxtob('81.03.0004') - self.response(PDU(xxtob('81.03.0004'))) + self.request(ReadBroadcastDistributionTableAck([])) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(ReadBroadcastDistributionTableAck, bvlciBDT=[]) # read returns a table with an element addr = Address('192.168.0.254/24') - pdu_bytes = xxtob('81.03.000e' - 'c0.a8.00.fe.ba.c0.ff.ff.ff.00' + pdu_bytes = xxtob('81.03.000e' # bvlci + 'c0.a8.00.fe.ba.c0 ff.ff.ff.00' # address and mask ) self.request(ReadBroadcastDistributionTableAck([addr])) @@ -146,7 +156,7 @@ class TestAnnexJCodec(unittest.TestCase): # read returns a table with an element addr = Address('192.168.0.1') xpdu = xxtob('deadbeef') - pdu_bytes = xxtob('81.04.000e' + pdu_bytes = xxtob('81.04.000e' # bvlci 'c0.a8.00.01.ba.c0' # original source address 'deadbeef' # forwarded PDU ) @@ -162,7 +172,7 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_register_foreign_device") # register as a foreign device with a 30 second time-to-live - pdu_bytes = xxtob('81.05.0006' + pdu_bytes = xxtob('81.05.0006' # bvlci '001e' # time-to-live ) @@ -177,10 +187,12 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table") # read returns an empty table - self.request(ReadForeignDeviceTable()) - self.indication(pduData=xxtob('81.06.0004')) + pdu_bytes = xxtob('81.06.0004') - self.response(PDU(xxtob('81.06.0004'))) + self.request(ReadForeignDeviceTable()) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(ReadForeignDeviceTable) def test_read_foreign_device_table_ack(self): @@ -188,10 +200,12 @@ class TestAnnexJCodec(unittest.TestCase): if _debug: TestAnnexJCodec._debug("test_read_foreign_device_table_ack") # read returns an empty table - self.request(ReadForeignDeviceTableAck([])) - self.indication(pduData=xxtob('81.07.0004')) + pdu_bytes = xxtob('81.07.0004') - self.response(PDU(xxtob('81.07.0004'))) + self.request(ReadForeignDeviceTableAck([])) + self.indication(pduData=pdu_bytes) + + self.response(PDU(pdu_bytes)) self.confirmation(ReadForeignDeviceTableAck, bvlciFDT=[]) # read returns a table with one entry @@ -199,9 +213,9 @@ class TestAnnexJCodec(unittest.TestCase): fdte.fdAddress = Address("192.168.0.10") fdte.fdTTL = 30 fdte.fdRemain = 15 - pdu_bytes = xxtob('81.07.000e' - 'c0.a8.00.0a.ba.c0' - '001e.000f' + pdu_bytes = xxtob('81.07.000e' # bvlci + 'c0.a8.00.0a.ba.c0' # address + '001e.000f' # ttl and remaining ) self.request(ReadForeignDeviceTableAck([fdte])) @@ -216,7 +230,7 @@ class TestAnnexJCodec(unittest.TestCase): # delete an element addr = Address('192.168.0.11/24') - pdu_bytes = xxtob('81.08.000a' + pdu_bytes = xxtob('81.08.000a' # bvlci 'c0.a8.00.0b.ba.c0' # address of entry to be deleted ) @@ -232,7 +246,7 @@ class TestAnnexJCodec(unittest.TestCase): # read returns a table with an element xpdu = xxtob('deadbeef') - pdu_bytes = xxtob('81.09.0008' + pdu_bytes = xxtob('81.09.0008' # bvlci 'deadbeef' # PDU to broadcast ) @@ -248,7 +262,7 @@ class TestAnnexJCodec(unittest.TestCase): # read returns a table with an element xpdu = xxtob('deadbeef') - pdu_bytes = xxtob('81.0a.0008' + pdu_bytes = xxtob('81.0a.0008' # bvlci 'deadbeef' # PDU being unicast ) @@ -264,7 +278,7 @@ class TestAnnexJCodec(unittest.TestCase): # read returns a table with an element xpdu = xxtob('deadbeef') - pdu_bytes = xxtob('81.0b.0008' + pdu_bytes = xxtob('81.0b.0008' # bvlci 'deadbeef' # PDU being broadcast ) diff --git a/tests/test_bvll/test_foreign.py b/tests/test_bvll/test_foreign.py new file mode 100644 index 0000000..8e296a6 --- /dev/null +++ b/tests/test_bvll/test_foreign.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Test BVLL Foreign Devices +------------------------- +""" + +import unittest + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob + +from bacpypes.pdu import Address, PDU, LocalBroadcast +from bacpypes.vlan import IPNetwork, IPRouter + +from ..state_machine import match_pdu, StateMachineGroup +from ..time_machine import reset_time_machine, run_time_machine + +from .helpers import SnifferNode, SimpleNode, ForeignNode, BBMDNode + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +# extended form of xtob that first removes whitespace and period seperators +xxtob = lambda s: xtob(''.join(s.split()).replace('.', '')) + + +# +# TNetwork +# + +@bacpypes_debugging +class TNetwork(StateMachineGroup): + + def __init__(self): + if _debug: TNetwork._debug("__init__") + StateMachineGroup.__init__(self) + + # reset the time machine + reset_time_machine() + if _debug: TNetwork._debug(" - time machine reset") + + # make a router + self.router = IPRouter() + + # make a home LAN + self.home_vlan = IPNetwork() + self.router.add_network(Address("192.168.5.1/24"), self.home_vlan) + + # home sniffer node + self.home_sniffer = SnifferNode("192.168.5.254/24", self.home_vlan) + self.append(self.home_sniffer) + + # make a remote LAN + self.remote_vlan = IPNetwork() + self.router.add_network(Address("192.168.6.1/24"), self.remote_vlan) + + # remote sniffer node + self.remote_sniffer = SnifferNode("192.168.6.254/24", self.remote_vlan) + self.append(self.remote_sniffer) + + # the foreign device + self.fd = ForeignNode("192.168.6.2/24", self.remote_vlan) + self.append(self.fd) + + # intermediate test node + self.tnode = SimpleNode("192.168.5.2/24", self.home_vlan) + self.append(self.tnode) + + # bbmd + self.bbmd = BBMDNode("192.168.5.3/24", self.home_vlan) + self.append(self.bbmd) + + def run(self, time_limit=60.0): + if _debug: TNetwork._debug("run %r", time_limit) + + # run the group + super(TNetwork, self).run() + + # run it for some time + run_time_machine(time_limit) + if _debug: + TNetwork._debug(" - time machine finished") + for state_machine in self.state_machines: + TNetwork._debug(" - machine: %r", state_machine) + for direction, pdu in state_machine.transaction_log: + TNetwork._debug(" %s %s", direction, str(pdu)) + + # check for success + all_success, some_failed = super(TNetwork, self).check_for_success() + assert all_success + + +@bacpypes_debugging +class TestSimple(unittest.TestCase): + + def test_idle(self): + """Test an idle network, nothing happens is success.""" + if _debug: TestSimple._debug("test_idle") + + # create a network + tnet = TNetwork() + + # all start states are successful + tnet.home_sniffer.start_state.success() + tnet.remote_sniffer.start_state.success() + tnet.fd.start_state.success() + tnet.tnode.start_state.success() + tnet.bbmd.start_state.success() + + # run the group + tnet.run() + diff --git a/tests/test_bvll/test_simple.py b/tests/test_bvll/test_simple.py new file mode 100644 index 0000000..637128a --- /dev/null +++ b/tests/test_bvll/test_simple.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Test BVLL Simple Devices +------------------------ +""" + +import unittest + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger, xtob + +from bacpypes.pdu import PDU, LocalBroadcast +from bacpypes.vlan import IPNetwork + +from ..state_machine import match_pdu, StateMachineGroup +from ..time_machine import reset_time_machine, run_time_machine + +from .helpers import SnifferNode, SimpleNode + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +# extended form of xtob that first removes whitespace and period seperators +xxtob = lambda s: xtob(''.join(s.split()).replace('.', '')) + + +# +# TNetwork +# + +@bacpypes_debugging +class TNetwork(StateMachineGroup): + + def __init__(self): + if _debug: TNetwork._debug("__init__") + StateMachineGroup.__init__(self) + + # reset the time machine + reset_time_machine() + if _debug: TNetwork._debug(" - time machine reset") + + # make a little LAN + self.vlan = IPNetwork() + + # test device + self.td = SimpleNode("192.168.4.1/24", self.vlan) + self.append(self.td) + + # implementation under test + self.iut = SimpleNode("192.168.4.2/24", self.vlan) + self.append(self.iut) + + # sniffer node + self.sniffer = SnifferNode("192.168.4.254/24", self.vlan) + self.append(self.sniffer) + + + def run(self, time_limit=60.0): + if _debug: TNetwork._debug("run %r", time_limit) + + # run the group + super(TNetwork, self).run() + + # run it for some time + run_time_machine(time_limit) + if _debug: + TNetwork._debug(" - time machine finished") + for state_machine in self.state_machines: + TNetwork._debug(" - machine: %r", state_machine) + for direction, pdu in state_machine.transaction_log: + TNetwork._debug(" %s %s", direction, str(pdu)) + + # check for success + all_success, some_failed = super(TNetwork, self).check_for_success() + assert all_success + + +@bacpypes_debugging +class TestSimple(unittest.TestCase): + + def test_idle(self): + """Test an idle network, nothing happens is success.""" + if _debug: TestSimple._debug("test_idle") + + # create a network + tnet = TNetwork() + + # all start states are successful + tnet.td.start_state.success() + tnet.iut.start_state.success() + tnet.sniffer.start_state.success() + + # run the group + tnet.run() + + def test_unicast(self): + """Test a unicast message from TD to IUT.""" + if _debug: TestSimple._debug("test_unicast") + + # create a network + tnet = TNetwork() + + # make a PDU from node 1 to node 2 + pdu_data = xxtob('dead.beef') + pdu = PDU(pdu_data, source=tnet.td.address, destination=tnet.iut.address) + if _debug: TestSimple._debug(" - pdu: %r", pdu) + + # test device sends it, iut gets it + tnet.td.start_state.send(pdu).success() + tnet.iut.start_state.receive(PDU, pduSource=tnet.td.address).success() + + # sniffer sees message on the wire + tnet.sniffer.start_state.receive(PDU, + pduSource=tnet.td.address.addrTuple, + pduDestination=tnet.iut.address.addrTuple, + pduData=xxtob('81.0a.0008' # original unicast bvlci + 'deadbeef' # PDU being unicast + ), + ).timeout(1.0).success() + + # run the group + tnet.run() + + def test_broadcast(self): + """Test a broadcast message from TD to IUT.""" + if _debug: TestSimple._debug("test_broadcast") + + # create a network + tnet = TNetwork() + + # make a PDU from node 1 to node 2 + pdu_data = xxtob('dead.beef') + pdu = PDU(pdu_data, source=tnet.td.address, destination=LocalBroadcast()) + if _debug: TestSimple._debug(" - pdu: %r", pdu) + + # test device sends it, iut gets it + tnet.td.start_state.send(pdu).success() + tnet.iut.start_state.receive(PDU, pduSource=tnet.td.address).success() + + # sniffer sees message on the wire + tnet.sniffer.start_state.receive(PDU, + pduSource=tnet.td.address.addrTuple, + pduDestination=('192.168.4.255', 47808), + pduData=xxtob('81.0b.0008' # original broadcast bvlci + 'deadbeef' # PDU being unicast + ), + ).timeout(1.0).success() + + # run the group + tnet.run() + diff --git a/tests/test_utilities/test_state_machine.py b/tests/test_utilities/test_state_machine.py index 8ca7f1f..74e8c9d 100644 --- a/tests/test_utilities/test_state_machine.py +++ b/tests/test_utilities/test_state_machine.py @@ -447,6 +447,7 @@ class TestStateMachineGroup(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_success_state + assert not smg.is_running assert smg.is_success_state if _debug: TestStateMachine._debug(" - passed") @@ -475,6 +476,7 @@ class TestStateMachineGroup(unittest.TestCase): # check for success assert not tsm.running assert tsm.current_state.is_fail_state + assert not smg.is_running assert smg.is_fail_state if _debug: TestStateMachine._debug(" - passed") @@ -510,6 +512,7 @@ class TestStateMachineEvents(unittest.TestCase): # check for success assert tsm1.current_state.is_success_state assert tsm2.current_state.is_success_state + assert not smg.is_running assert smg.is_success_state if _debug: TestStateMachineEvents._debug(" - passed") @@ -541,6 +544,7 @@ class TestStateMachineEvents(unittest.TestCase): # check for success assert tsm1.current_state.is_success_state assert tsm2.current_state.is_success_state + assert not smg.is_running assert smg.is_success_state if _debug: TestStateMachineEvents._debug(" - passed") diff --git a/tests/time_machine.py b/tests/time_machine.py index fcde01f..ed01e04 100755 --- a/tests/time_machine.py +++ b/tests/time_machine.py @@ -62,6 +62,33 @@ class TimeMachine(_TaskManager): _TaskManager.resume_task(self, task) + def peek_next_task(self): + """Get the next task if there's one that should be processed.""" + if _debug: TimeMachine._debug("peek_next_task @ %r", self.current_time) + if _debug: TimeMachine._debug(" - time_limit: %r", self.time_limit) + if _debug: TimeMachine._debug(" - tasks: %r", self.tasks) + + task = None + + if (self.time_limit is not None) and (self.current_time >= self.time_limit): + if _debug: TimeMachine._debug(" - time limit reached") + + elif not self.tasks: + if _debug: TimeMachine._debug(" - no more tasks") + + else: + # peek at the next task and see when it is supposed to run + when, task = self.tasks[0] + if when >= self.time_limit: + if _debug: TimeMachine._debug(" - time limit reached") + + # clear out the task + task = None + else: + if _debug: TimeMachine._debug(" - task: %r", task) + + return task + def get_next_task(self): """get the next task if there's one that should be processed, and return how long it will be until the next one should be @@ -132,7 +159,9 @@ def reset_time_machine(): @bacpypes_debugging def run_time_machine(time_limit): """This function is called after a set of tasks have been installed - and they should all run. + and they should run. The machine will stop when the limit has been + reached (maybe the middle of some tests) and can be called again to + continue running. """ if _debug: run_time_machine._debug("run_time_machine %r", time_limit) global time_machine @@ -142,10 +171,18 @@ def run_time_machine(time_limit): raise RuntimeError("no time machine") if time_limit <= 0.0: raise ValueError("time limit required") + if time_machine.current_time is None: + raise RuntimeError("reset the time machine before running") # pass the limit to the time machine - time_machine.time_limit = time_limit + time_machine.time_limit = time_machine.current_time + time_limit # run until there is nothing left to do - run_once() + while True: + run_once() + if _debug: run_time_machine._debug(" - ran once") + + if not time_machine.peek_next_task(): + if _debug: run_time_machine._debug(" - no more to do") + break