1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00
bacpypes/tests/test_utilities/test_time_machine.py

243 lines
7.0 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test Utilities Time Machine
----------------------------
"""
import unittest
from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.task import OneShotTask, FunctionTask, RecurringTask
from ..time_machine import TimeMachine, reset_time_machine, run_time_machine
# debugging switch and module logger; the "if _debug:" trace calls used
# throughout this module only fire when _debug is set to a true value
_debug = 0
_log = ModuleLogger(globals())

# module-level reference to the time machine; assigned in setup_module()
# and reset to None in teardown_module()
time_machine = None
@bacpypes_debugging
def almost_equal(x, y, tolerance=0.000001):
    """Compare two sequences of floats element by element.

    :param x: first sequence of numbers
    :param y: second sequence of numbers
    :param tolerance: largest absolute difference between paired elements
        that is still considered equal
    :returns: True if both sequences have the same length and no pair of
        elements differs by more than ``tolerance``, otherwise False
    """
    # must be the same length
    if len(x) != len(y):
        return False

    # the absolute value of every difference must be tolerable
    return not any(abs(xx - yy) > tolerance for xx, yy in zip(x, y))
@bacpypes_debugging
def setup_module(module):
    """Create the time machine and check that the helper functions share it."""
    global time_machine

    if _debug: setup_module._debug("setup_module %r", module)

    # create the time machine (intended to be a singleton)
    time_machine = TimeMachine()

    # the helper functions must see the very same instance through the
    # globals of the module they were defined in
    for helper in (reset_time_machine, run_time_machine):
        assert time_machine is helper.__globals__['time_machine']
@bacpypes_debugging
def teardown_module():
    """Drop this module's reference to the time machine."""
    global time_machine

    if _debug: teardown_module._debug("teardown_module")

    # finished with it
    time_machine = None
@bacpypes_debugging
class SampleOneShotTask(OneShotTask):
    """One-shot task that records the time machine's current time whenever
    it is processed."""

    def __init__(self):
        if _debug: SampleOneShotTask._debug("__init__")
        OneShotTask.__init__(self)

        # times at which process_task() was called
        self.process_task_called = []

    def process_task(self):
        # time_machine is only read here, never rebound, so no 'global'
        # declaration is required
        if _debug: SampleOneShotTask._debug("process_task @ %r", time_machine.current_time)

        # add the current time
        self.process_task_called.append(time_machine.current_time)
# times at which sample_task_function() was called; the function task tests
# rebind this to a fresh list before scheduling the function
sample_task_function_called = []
@bacpypes_debugging
def sample_task_function(*args, **kwargs):
    """Record the time machine's current time each time the function runs.

    Any positional and keyword arguments are accepted; they are only used
    in the debug output.
    """
    # both module globals are looked up at call time and never rebound
    # here, so no 'global' declaration is required
    if _debug: sample_task_function._debug("sample_task_function %r %r @ %r", args, kwargs, time_machine.current_time)

    # record when the call happened
    sample_task_function_called.append(time_machine.current_time)
@bacpypes_debugging
class SampleRecurringTask(RecurringTask):
    """Recurring task that records the time machine's current time whenever
    it is processed."""

    def __init__(self):
        if _debug: SampleRecurringTask._debug("__init__")
        RecurringTask.__init__(self)

        # times at which process_task() was called
        self.process_task_called = []

    def process_task(self):
        # time_machine is only read here, never rebound, so no 'global'
        # declaration is required
        if _debug: SampleRecurringTask._debug("process_task @ %r", time_machine.current_time)

        # add the current time
        self.process_task_called.append(time_machine.current_time)
@bacpypes_debugging
class TestTimeMachine(unittest.TestCase):
    """Exercise the time machine with one-shot, function and recurring tasks."""

    def test_time_machine_exists(self):
        if _debug: TestTimeMachine._debug("test_time_machine_exists")

        # the module-level reference is assigned by setup_module()
        assert time_machine is not None

    def test_empty_run(self):
        if _debug: TestTimeMachine._debug("test_empty_run")

        # start clean and run with nothing scheduled
        reset_time_machine()
        run_time_machine(60.0)

        # with no tasks installed the clock is expected to stay at zero
        assert time_machine.current_time == 0.0

    def test_one_shot_immediate(self):
        if _debug: TestTimeMachine._debug("test_one_shot_immediate")

        # create a one-shot task
        task = SampleOneShotTask()

        # start clean, schedule the task for time zero, let it run
        reset_time_machine()
        task.install_task(0.0)
        run_time_machine(60.0)

        # task processed once, no time has passed
        expected_calls = [0.0]
        assert almost_equal(task.process_task_called, expected_calls)
        assert time_machine.current_time == 0.0

    def test_function_task_immediate(self):
        global sample_task_function_called

        if _debug: TestTimeMachine._debug("test_function_task_immediate")

        # create a function task and forget any earlier calls
        task = FunctionTask(sample_task_function)
        sample_task_function_called = []

        # start clean, schedule the task for time zero, let it run
        reset_time_machine()
        task.install_task(0.0)
        run_time_machine(60.0)

        # function called once, no time has passed
        expected_calls = [0.0]
        assert almost_equal(sample_task_function_called, expected_calls)
        assert time_machine.current_time == 0.0

    def test_function_task_delay(self):
        global sample_task_function_called

        if _debug: TestTimeMachine._debug("test_function_task_delay")

        delay = 10.0

        # create a function task and forget any earlier calls
        task = FunctionTask(sample_task_function)
        sample_task_function_called = []

        # start clean, schedule the task at the delay, let it run
        reset_time_machine()
        task.install_task(delay)
        run_time_machine(60.0)

        # function called once at the delay, and the clock stopped there
        expected_calls = [delay]
        assert almost_equal(sample_task_function_called, expected_calls)
        assert time_machine.current_time == delay

    def test_recurring_task_1(self):
        if _debug: TestTimeMachine._debug("test_recurring_task_1")

        # create a recurring task
        task = SampleRecurringTask()

        # start clean, install with an interval of 1000.0, let it run
        reset_time_machine()
        task.install_task(1000.0)
        run_time_machine(5.0)

        # processed at each whole unit of time before the run limit
        expected_calls = [1.0, 2.0, 3.0, 4.0]
        assert almost_equal(task.process_task_called, expected_calls)
        assert time_machine.current_time == 5.0

    def test_recurring_task_2(self):
        if _debug: TestTimeMachine._debug("test_recurring_task_2")

        # create two recurring tasks
        first_task = SampleRecurringTask()
        second_task = SampleRecurringTask()

        # start clean, install both with different intervals, let them run
        reset_time_machine()
        first_task.install_task(1000.0)
        second_task.install_task(1500.0)
        run_time_machine(5.0)

        # each task was processed on its own schedule
        first_expected = [1.0, 2.0, 3.0, 4.0]
        second_expected = [1.5, 3.0, 4.5]
        assert almost_equal(first_task.process_task_called, first_expected)
        assert almost_equal(second_task.process_task_called, second_expected)
        assert time_machine.current_time == 5.0

    def test_recurring_task_3(self):
        if _debug: TestTimeMachine._debug("test_recurring_task_3")

        # create a recurring task
        task = SampleRecurringTask()

        # start clean, install with a positive offset, let it run
        reset_time_machine()
        task.install_task(1000.0, offset=100.0)
        run_time_machine(5.0)

        # processed slightly after each whole unit of time
        expected_calls = [0.1, 1.1, 2.1, 3.1, 4.1]
        assert almost_equal(task.process_task_called, expected_calls)
        assert time_machine.current_time == 5.0

    def test_recurring_task_4(self):
        if _debug: TestTimeMachine._debug("test_recurring_task_4")

        # create a recurring task
        task = SampleRecurringTask()

        # start clean, install with a negative offset, let it run
        reset_time_machine()
        task.install_task(1000.0, offset=-100.0)
        run_time_machine(5.0)

        # processed slightly before each whole unit of time
        expected_calls = [0.9, 1.9, 2.9, 3.9, 4.9]
        assert almost_equal(task.process_task_called, expected_calls)
        assert time_machine.current_time == 5.0