1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

synchronize task changes with other sources

This commit is contained in:
Joel Bender 2015-08-09 22:09:11 -04:00
parent 67245e2184
commit bbcf3c2bc0
3 changed files with 76 additions and 31 deletions

View File

@ -52,9 +52,16 @@ class _Task(DebugContents, Logging):
self.taskTime = None
self.isScheduled = False
def install_task(self, when=None):
def install_task(self, when=None, delta=None):
global _task_manager, _unscheduled_tasks
# check for delta from now
if (when is None) and (delta is not None):
if not _task_manager:
raise RuntimeError("no task manager")
when = _task_manager.get_time() + delta
# fallback to the inited value
if when is None:
when = self.taskTime
@ -110,12 +117,20 @@ class OneShotDeleteTask(_Task):
#
def OneShotFunction(fn, *args, **kwargs):
class OneShotFunctionTask(OneShotDeleteTask):
def process_task(self):
OneShotFunction._debug("process_task %r %s %s", fn, repr(args), repr(kwargs))
fn(*args, **kwargs)
task = OneShotFunctionTask(_time())
task.install_task()
task = OneShotFunctionTask()
# if there is no task manager, postpone the install
if not _task_manager:
_unscheduled_tasks.append(task)
else:
task.install_task(_task_manager.get_time())
return task
@ -129,9 +144,11 @@ def FunctionTask(fn, *args, **kwargs):
_log.debug("FunctionTask %r %r %r", fn, args, kwargs)
class _FunctionTask(OneShotDeleteTask):
def process_task(self):
_log.debug("process_task (%r %r %r)", fn, args, kwargs)
fn(*args, **kwargs)
task = _FunctionTask()
_log.debug(" - task: %r", task)
@ -148,10 +165,6 @@ class RecurringTask(_Task):
def __init__(self, interval=None):
_Task.__init__(self)
self.taskInterval = interval
if interval is None:
self.taskTime = None
else:
self.taskTime = _time() + (self.taskInterval / 1000)
def install_task(self, interval=None):
global _task_manager, _unscheduled_tasks
@ -159,18 +172,22 @@ class RecurringTask(_Task):
# set the interval if it hasn't already been set
if interval is not None:
self.taskInterval = interval
if not self.taskInterval:
if self.taskInterval is None:
raise RuntimeError("interval unset, use ctor or install_task parameter")
if self.taskInterval <= 0.0:
raise RuntimeError("interval must be greater than zero")
# get ready for the next interval (aligned)
now = _time()
interval = self.taskInterval / 1000.0
self.taskTime = now + interval - (now % interval)
# pass along to the task manager
# if there is no task manager, postpone the install
if not _task_manager:
_unscheduled_tasks.append(self)
else:
# get ready for the next interval (aligned)
now = _task_manager.get_time()
interval = self.taskInterval / 1000.0
self.taskTime = now + interval - (now % interval)
# install it
_task_manager.install_task(self)
#
@ -243,8 +260,14 @@ class TaskManager(SingletonLogging):
for task in _unscheduled_tasks:
self.install_task(task)
def get_time(self):
if _debug: TaskManager._debug("get_time")
# return the real time
return _time()
def install_task(self, task):
if _debug: TaskManager._debug("install_task %r %r", task, task.taskTime)
if _debug: TaskManager._debug("install_task %r @ %r", task, task.taskTime)
# if this is already installed, suspend it
if task.isScheduled:

View File

@ -265,7 +265,6 @@ class TaskManager(SingletonLogging):
def install_task(self, task):
if _debug: TaskManager._debug("install_task %r @ %r", task, task.taskTime)
if _debug: TaskManager._debug(" - self: %r", self)
# if this is already installed, suspend it
if task.isScheduled:

View File

@ -52,9 +52,16 @@ class _Task(DebugContents, Logging):
self.taskTime = None
self.isScheduled = False
def install_task(self, when=None):
def install_task(self, when=None, delta=None):
global _task_manager, _unscheduled_tasks
# check for delta from now
if (when is None) and (delta is not None):
if not _task_manager:
raise RuntimeError("no task manager")
when = _task_manager.get_time() + delta
# fallback to the inited value
if when is None:
when = self.taskTime
@ -111,12 +118,20 @@ class OneShotDeleteTask(_Task):
@bacpypes_debugging
def OneShotFunction(fn, *args, **kwargs):
class OneShotFunctionTask(OneShotDeleteTask):
def process_task(self):
OneShotFunction._debug("process_task %r %s %s", fn, repr(args), repr(kwargs))
fn(*args, **kwargs)
task = OneShotFunctionTask(_time())
task.install_task()
task = OneShotFunctionTask()
# if there is no task manager, postpone the install
if not _task_manager:
_unscheduled_tasks.append(task)
else:
task.install_task(_task_manager.get_time())
return task
@ -128,9 +143,11 @@ def FunctionTask(fn, *args, **kwargs):
_log.debug("FunctionTask %r %r %r", fn, args, kwargs)
class _FunctionTask(OneShotDeleteTask):
def process_task(self):
_log.debug("process_task (%r %r %r)", fn, args, kwargs)
fn(*args, **kwargs)
task = _FunctionTask()
_log.debug(" - task: %r", task)
@ -147,10 +164,6 @@ class RecurringTask(_Task):
def __init__(self, interval=None):
_Task.__init__(self)
self.taskInterval = interval
if interval is None:
self.taskTime = None
else:
self.taskTime = _time() + (self.taskInterval / 1000)
def install_task(self, interval=None):
global _task_manager, _unscheduled_tasks
@ -158,18 +171,22 @@ class RecurringTask(_Task):
# set the interval if it hasn't already been set
if interval is not None:
self.taskInterval = interval
if not self.taskInterval:
if self.taskInterval is None:
raise RuntimeError("interval unset, use ctor or install_task parameter")
if self.taskInterval <= 0.0:
raise RuntimeError("interval must be greater than zero")
# get ready for the next interval (aligned)
now = _time()
interval = self.taskInterval / 1000.0
self.taskTime = now + interval - (now % interval)
# pass along to the task manager
# if there is no task manager, postpone the install
if not _task_manager:
_unscheduled_tasks.append(self)
else:
# get ready for the next interval (aligned)
now = _task_manager.get_time()
interval = self.taskInterval / 1000.0
self.taskTime = now + interval - (now % interval)
# install it
_task_manager.install_task(self)
#
@ -240,8 +257,14 @@ class TaskManager(SingletonLogging):
for task in _unscheduled_tasks:
self.install_task(task)
def get_time(self):
if _debug: TaskManager._debug("get_time")
# return the real time
return _time()
def install_task(self, task):
if _debug: TaskManager._debug("install_task %r %r", task, task.taskTime)
if _debug: TaskManager._debug("install_task %r @ %r", task, task.taskTime)
# if this is already installed, suspend it
if task.isScheduled: