mirror of
https://github.com/JoelBender/bacpypes
synced 2025-10-05 22:18:16 +08:00
243 lines
7.0 KiB
Python
243 lines
7.0 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Test Utilities Time Machine
|
|
----------------------------
|
|
"""
|
|
|
|
import unittest
|
|
|
|
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
|
|
|
|
from bacpypes.task import OneShotTask, FunctionTask, RecurringTask
|
|
from ..time_machine import TimeMachine, reset_time_machine, run_time_machine
|
|
|
|
# some debugging
|
|
_debug = 0
|
|
_log = ModuleLogger(globals())
|
|
|
|
# reference to time machine
|
|
time_machine = None
|
|
|
|
|
|
@bacpypes_debugging
|
|
def almost_equal(x, y):
|
|
"""Compare two arrays of floats."""
|
|
# must be the same length
|
|
if len(x) != len(y):
|
|
return False
|
|
|
|
# absolute value of the difference is tollerable
|
|
for xx, yy in zip(x, y):
|
|
if abs(xx - yy) > 0.000001:
|
|
return False
|
|
|
|
# good to go
|
|
return True
|
|
|
|
|
|
@bacpypes_debugging
|
|
def setup_module(module):
|
|
if _debug: setup_module._debug("setup_module %r", module)
|
|
global time_machine
|
|
|
|
# this is a singleton
|
|
time_machine = TimeMachine()
|
|
|
|
# make sure this is the same one referenced by the functions
|
|
assert time_machine is reset_time_machine.__globals__['time_machine']
|
|
assert time_machine is run_time_machine.__globals__['time_machine']
|
|
|
|
|
|
@bacpypes_debugging
|
|
def teardown_module():
|
|
if _debug: teardown_module._debug("teardown_module")
|
|
global time_machine
|
|
|
|
# all done
|
|
time_machine = None
|
|
|
|
|
|
@bacpypes_debugging
|
|
class SampleOneShotTask(OneShotTask):
|
|
|
|
def __init__(self):
|
|
if _debug: SampleOneShotTask._debug("__init__")
|
|
OneShotTask.__init__(self)
|
|
|
|
self.process_task_called = []
|
|
|
|
def process_task(self):
|
|
global time_machine
|
|
if _debug: SampleOneShotTask._debug("process_task @ %r", time_machine.current_time)
|
|
|
|
# add the current time
|
|
self.process_task_called.append(time_machine.current_time)
|
|
|
|
|
|
# flag to make sure the function was called
|
|
sample_task_function_called = []
|
|
|
|
@bacpypes_debugging
|
|
def sample_task_function(*args, **kwargs):
|
|
global sample_task_function_called, time_machine
|
|
if _debug: sample_task_function._debug("sample_task_function %r %r @ %r", args, kwargs, time_machine.current_time)
|
|
|
|
# bump the counter
|
|
sample_task_function_called.append(time_machine.current_time)
|
|
|
|
|
|
@bacpypes_debugging
|
|
class SampleRecurringTask(RecurringTask):
|
|
|
|
def __init__(self):
|
|
if _debug: SampleRecurringTask._debug("__init__")
|
|
RecurringTask.__init__(self)
|
|
|
|
self.process_task_called = []
|
|
|
|
def process_task(self):
|
|
global time_machine
|
|
if _debug: SampleRecurringTask._debug("process_task @ %r", time_machine.current_time)
|
|
|
|
# add the current time
|
|
self.process_task_called.append(time_machine.current_time)
|
|
|
|
|
|
@bacpypes_debugging
|
|
class TestTimeMachine(unittest.TestCase):
|
|
|
|
def test_time_machine_exists(self):
|
|
if _debug: TestTimeMachine._debug("test_time_machine_exists")
|
|
|
|
# time machine created by setUpPackage
|
|
assert time_machine is not None
|
|
|
|
def test_empty_run(self):
|
|
if _debug: TestTimeMachine._debug("test_empty_run")
|
|
|
|
# reset the time machine
|
|
reset_time_machine()
|
|
|
|
# let it run
|
|
run_time_machine(60.0)
|
|
|
|
# no time has passed
|
|
assert time_machine.current_time == 0.0
|
|
|
|
def test_one_shot_immediate(self):
|
|
if _debug: TestTimeMachine._debug("test_one_shot_immediate")
|
|
|
|
# create a function task
|
|
ft = SampleOneShotTask()
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft.install_task(0.0)
|
|
run_time_machine(60.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(ft.process_task_called, [0.0])
|
|
assert time_machine.current_time == 0.0
|
|
|
|
def test_function_task_immediate(self):
|
|
if _debug: TestTimeMachine._debug("test_function_task_immediate")
|
|
global sample_task_function_called
|
|
|
|
# create a function task
|
|
ft = FunctionTask(sample_task_function)
|
|
sample_task_function_called = []
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft.install_task(0.0)
|
|
run_time_machine(60.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(sample_task_function_called, [0.0])
|
|
assert time_machine.current_time == 0.0
|
|
|
|
def test_function_task_delay(self):
|
|
if _debug: TestTimeMachine._debug("test_function_task_delay")
|
|
global sample_task_function_called
|
|
|
|
sample_delay = 10.0
|
|
|
|
# create a function task
|
|
ft = FunctionTask(sample_task_function)
|
|
sample_task_function_called = []
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft.install_task(sample_delay)
|
|
run_time_machine(60.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(sample_task_function_called, [sample_delay])
|
|
assert time_machine.current_time == sample_delay
|
|
|
|
def test_recurring_task_1(self):
|
|
if _debug: TestTimeMachine._debug("test_recurring_task_1")
|
|
|
|
# create a function task
|
|
ft = SampleRecurringTask()
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft.install_task(1000.0)
|
|
run_time_machine(5.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(ft.process_task_called, [1.0, 2.0, 3.0, 4.0])
|
|
assert time_machine.current_time == 5.0
|
|
|
|
def test_recurring_task_2(self):
|
|
if _debug: TestTimeMachine._debug("test_recurring_task_2")
|
|
|
|
# create a function task
|
|
ft1 = SampleRecurringTask()
|
|
ft2 = SampleRecurringTask()
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft1.install_task(1000.0)
|
|
ft2.install_task(1500.0)
|
|
run_time_machine(5.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(ft1.process_task_called, [1.0, 2.0, 3.0, 4.0])
|
|
assert almost_equal(ft2.process_task_called, [1.5, 3.0, 4.5])
|
|
assert time_machine.current_time == 5.0
|
|
|
|
def test_recurring_task_3(self):
|
|
if _debug: TestTimeMachine._debug("test_recurring_task_3")
|
|
|
|
# create a function task
|
|
ft = SampleRecurringTask()
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft.install_task(1000.0, offset=100.0)
|
|
run_time_machine(5.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(ft.process_task_called, [0.1, 1.1, 2.1, 3.1, 4.1])
|
|
assert time_machine.current_time == 5.0
|
|
|
|
def test_recurring_task_4(self):
|
|
if _debug: TestTimeMachine._debug("test_recurring_task_4")
|
|
|
|
# create a function task
|
|
ft = SampleRecurringTask()
|
|
|
|
# reset the time machine, install the task, let it run
|
|
reset_time_machine()
|
|
ft.install_task(1000.0, offset=-100.0)
|
|
run_time_machine(5.0)
|
|
|
|
# function called, no time has passed
|
|
assert almost_equal(ft.process_task_called, [0.9, 1.9, 2.9, 3.9, 4.9])
|
|
assert time_machine.current_time == 5.0
|
|
|