1
0
mirror of https://github.com/JoelBender/bacpypes synced 2025-09-28 22:15:23 +08:00

additional 'switch' for swapping components in/out of a stack without rebinding (#172)

This commit is contained in:
Joel Bender 2018-07-04 17:27:44 -04:00
parent e191623c2b
commit c368b0ed50
4 changed files with 311 additions and 104 deletions

View File

@ -383,6 +383,110 @@ class Echo(Client, Server):
bacpypes_debugging(Echo)
#
# Switch
#
class Switch(Client, Server):
"""
A Switch is a client and server that wraps around clients and/or servers
and provides a way to switch between them without unbinding and rebinding
the stack.
"""
class TerminalWrapper(Client, Server):
def __init__(self, switch, terminal):
self.switch = switch
self.terminal = terminal
if isinstance(terminal, Server):
bind(self, terminal)
if isinstance(terminal, Client):
bind(terminal, self)
def indication(self, *args, **kwargs):
self.switch.request(*args, **kwargs)
def confirmation(self, *args, **kwargs):
self.switch.response(*args, **kwargs)
def __init__(self, **terminals):
if _debug: Switch._debug("__init__ %r", terminals)
Client.__init__(self)
Server.__init__(self)
# wrap the terminals
self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()}
self.current_terminal = None
def __getitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# return the terminal, not the wrapper
return self.terminals[key].terminal
def __setitem__(self, key, term):
if key in self.terminals:
raise KeyError("%r already a terminal" % (key,))
# build a wrapper and map it
self.terminals[key] = Switch.TerminalWrapper(self, term)
def __delitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# if deleting current terminal, deactivate it
term = self.terminals[key].terminal
if term is self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
self.current_terminal = None
del self.terminals[key]
def switch_terminal(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
if self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
if key is None:
self.current_terminal = None
else:
self.current_terminal = self.terminals[key].terminal
terminal_activate = getattr(self.current_terminal, 'activate', None)
if terminal_activate:
terminal_activate()
def indication(self, *args, **kwargs):
"""Downstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Server):
raise RuntimeError("current terminal not a server")
self.current_terminal.indication(*args, **kwargs)
def confirmation(self, *args, **kwargs):
"""Upstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Client):
raise RuntimeError("current terminal not a client")
self.current_terminal.confirmation(*args, **kwargs)
bacpypes_debugging(Switch)
#
# ServiceAccessPoint
#

View File

@ -376,6 +376,109 @@ class Echo(Client, Server):
self.response(*args, **kwargs)
#
# Switch
#
@bacpypes_debugging
class Switch(Client, Server):
"""
A Switch is a client and server that wraps around clients and/or servers
and provides a way to switch between them without unbinding and rebinding
the stack.
"""
class TerminalWrapper(Client, Server):
def __init__(self, switch, terminal):
self.switch = switch
self.terminal = terminal
if isinstance(terminal, Server):
bind(self, terminal)
if isinstance(terminal, Client):
bind(terminal, self)
def indication(self, *args, **kwargs):
self.switch.request(*args, **kwargs)
def confirmation(self, *args, **kwargs):
self.switch.response(*args, **kwargs)
def __init__(self, **terminals):
if _debug: Switch._debug("__init__ %r", terminals)
Client.__init__(self)
Server.__init__(self)
# wrap the terminals
self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()}
self.current_terminal = None
def __getitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# return the terminal, not the wrapper
return self.terminals[key].terminal
def __setitem__(self, key, term):
if key in self.terminals:
raise KeyError("%r already a terminal" % (key,))
# build a wrapper and map it
self.terminals[key] = Switch.TerminalWrapper(self, term)
def __delitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# if deleting current terminal, deactivate it
term = self.terminals[key].terminal
if term is self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
self.current_terminal = None
del self.terminals[key]
def switch_terminal(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
if self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
if key is None:
self.current_terminal = None
else:
self.current_terminal = self.terminals[key].terminal
terminal_activate = getattr(self.current_terminal, 'activate', None)
if terminal_activate:
terminal_activate()
def indication(self, *args, **kwargs):
"""Downstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Server):
raise RuntimeError("current terminal not a server")
self.current_terminal.indication(*args, **kwargs)
def confirmation(self, *args, **kwargs):
"""Upstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Client):
raise RuntimeError("current terminal not a client")
self.current_terminal.confirmation(*args, **kwargs)
#
# ServiceAccessPoint
#

View File

@ -386,6 +386,109 @@ class Echo(Client, Server):
self.response(*args, **kwargs)
#
# Switch
#
@bacpypes_debugging
class Switch(Client, Server):
"""
A Switch is a client and server that wraps around clients and/or servers
and provides a way to switch between them without unbinding and rebinding
the stack.
"""
class TerminalWrapper(Client, Server):
def __init__(self, switch, terminal):
self.switch = switch
self.terminal = terminal
if isinstance(terminal, Server):
bind(self, terminal)
if isinstance(terminal, Client):
bind(terminal, self)
def indication(self, *args, **kwargs):
self.switch.request(*args, **kwargs)
def confirmation(self, *args, **kwargs):
self.switch.response(*args, **kwargs)
def __init__(self, **terminals):
if _debug: Switch._debug("__init__ %r", terminals)
Client.__init__(self)
Server.__init__(self)
# wrap the terminals
self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()}
self.current_terminal = None
def __getitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# return the terminal, not the wrapper
return self.terminals[key].terminal
def __setitem__(self, key, term):
if key in self.terminals:
raise KeyError("%r already a terminal" % (key,))
# build a wrapper and map it
self.terminals[key] = Switch.TerminalWrapper(self, term)
def __delitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# if deleting current terminal, deactivate it
term = self.terminals[key].terminal
if term is self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
self.current_terminal = None
del self.terminals[key]
def switch_terminal(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
if self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
if key is None:
self.current_terminal = None
else:
self.current_terminal = self.terminals[key].terminal
terminal_activate = getattr(self.current_terminal, 'activate', None)
if terminal_activate:
terminal_activate()
def indication(self, *args, **kwargs):
"""Downstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Server):
raise RuntimeError("current terminal not a server")
self.current_terminal.indication(*args, **kwargs)
def confirmation(self, *args, **kwargs):
"""Upstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Client):
raise RuntimeError("current terminal not a client")
self.current_terminal.confirmation(*args, **kwargs)
#
# ServiceAccessPoint
#

View File

@ -7,7 +7,7 @@ from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ArgumentParser
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.comm import Client, Server, Debug, bind
from bacpypes.comm import Client, Server, Switch, Debug, bind
from bacpypes.core import run, enable_sleeping
# some debugging
@ -35,109 +35,6 @@ class DebugTerm(Debug):
def deactivate(self):
print(self.label + " deactivated")
#
# Switch
#
@bacpypes_debugging
class Switch(Client, Server):
"""
A Switch is a client and server that wraps around clients and/or servers
and provides a way to switch between them without unbinding and rebinding
the stack.
"""
class TerminalWrapper(Client, Server):
def __init__(self, switch, terminal):
self.switch = switch
self.terminal = terminal
if isinstance(terminal, Server):
bind(self, terminal)
if isinstance(terminal, Client):
bind(terminal, self)
def indication(self, *args, **kwargs):
self.switch.request(*args, **kwargs)
def confirmation(self, *args, **kwargs):
self.switch.response(*args, **kwargs)
def __init__(self, **terminals):
if _debug: Switch._debug("__init__ %r", terminals)
Client.__init__(self)
Server.__init__(self)
# wrap the terminals
self.terminals = {k:Switch.TerminalWrapper(self, v) for k, v in terminals.items()}
self.current_terminal = None
def __getitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# return the terminal, not the wrapper
return self.terminals[key].terminal
def __setitem__(self, key, term):
if key in self.terminals:
raise KeyError("%r already a terminal" % (key,))
# build a wrapper and map it
self.terminals[key] = Switch.TerminalWrapper(self, term)
def __delitem__(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
# if deleting current terminal, deactivate it
term = self.terminals[key].terminal
if term is self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
self.current_terminal = None
del self.terminals[key]
def switch_terminal(self, key):
if key not in self.terminals:
raise KeyError("%r not a terminal" % (key,))
if self.current_terminal:
terminal_deactivate = getattr(self.current_terminal, 'deactivate', None)
if terminal_deactivate:
terminal_deactivate()
if key is None:
self.current_terminal = None
else:
self.current_terminal = self.terminals[key].terminal
terminal_activate = getattr(self.current_terminal, 'activate', None)
if terminal_activate:
terminal_activate()
def indication(self, *args, **kwargs):
"""Downstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Server):
raise RuntimeError("current terminal not a server")
self.current_terminal.indication(*args, **kwargs)
def confirmation(self, *args, **kwargs):
"""Upstream packet, send to current terminal."""
if not self.current_terminal:
raise RuntimeError("no active terminal")
if not isinstance(self.current_terminal, Client):
raise RuntimeError("current terminal not a client")
self.current_terminal.confirmation(*args, **kwargs)
#
# TestConsoleCmd
#