1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Merge pull request #1017 from samson0v/feature/shell

Added Gateway Shell
This commit is contained in:
Illia Barkov
2023-01-10 10:47:32 +02:00
committed by GitHub
24 changed files with 428 additions and 0 deletions

View File

@@ -0,0 +1,79 @@
import multiprocessing
from multiprocessing import connection, process
from multiprocessing.managers import dispatch, convert_to_error
class GatewayProxy(multiprocessing.managers.BaseProxy):
def _callmethod(self, methodname, args=(), kwds={}):
"""
Try to call a method of the referent and return a copy of the result
"""
self._id = list(self._idset)[0]
self._connect()
conn = self._tls.connection
conn.send((self._id, methodname, args, kwds))
kind, result = conn.recv()
if kind == '#RETURN':
return result
elif kind == '#PROXY':
exposed, token = result
proxytype = self._manager._registry[token.typeid][-1]
token.address = self._token.address
proxy = proxytype(
token, self._serializer, manager=self._manager,
authkey=self._authkey, exposed=exposed
)
conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn, None, 'decref', (token.id,))
return proxy
raise convert_to_error(kind, result)
def AutoProxy(token, serializer, manager=None, authkey=None,
exposed=None, incref=True, manager_owned=False):
"""
Return an auto-proxy for `token`
"""
_Client = connection.Client
if exposed is None:
conn = _Client(token.address, authkey=authkey)
try:
exposed = dispatch(conn, None, 'get_methods', (token,))
finally:
conn.close()
if authkey is None and manager is not None:
authkey = manager._authkey
if authkey is None:
authkey = process.current_process().authkey
ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
incref=incref, manager_owned=manager_owned)
proxy._isauto = True
return proxy
def MakeProxyType(name, exposed, _cache={}):
"""
Return a proxy type whose methods are given by `exposed`
"""
exposed = tuple(exposed)
try:
return _cache[(name, exposed)]
except KeyError:
pass
dic = {}
for meth in exposed:
exec('''def %s(self, /, *args, **kwds):
return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
ProxyType = type(name, (GatewayProxy,), dic)
ProxyType._exposed_ = exposed
_cache[(name, exposed)] = ProxyType
return ProxyType

View File

@@ -0,0 +1,215 @@
import cmd
import sys
import signal
import multiprocessing.managers
from re import findall
from threading import Thread
from time import sleep
from thingsboard_gateway.gateway.shell.proxy import AutoProxy
from thingsboard_gateway.gateway.tb_gateway_service import GatewayManager
class ShellSyntaxError(Exception):
def __str__(self):
return 'Invalid syntax'
class Shell(cmd.Cmd, Thread):
prompt = '(gateway) |> '
def __init__(self, stdin=None, stdout=None):
cmd.Cmd.__init__(self, stdin=stdin, stdout=stdout)
Thread.__init__(self, name='Gateway Shell', daemon=False)
self.stdout.write('Gateway Shell\n')
self.stdout.write('=============\n')
self.gateway_manager, self.gateway = self.create_manager()
if self.gateway_manager is None or self.gateway is None:
self.stdout.write('Gateway is not running!\n')
return
self.command_config = {
'storage': [
{
'arg': ('-n', '--name'),
'func': self.gateway.get_storage_name
},
{
'arg': ('-c', '--count'),
'func': self.gateway.get_storage_events_count
}
],
'connector': [
{
'arg': ('-l', '--list'),
'func': self.gateway.get_available_connectors
},
{
'arg': ('-s', '--status'),
'val_required': True,
'func': self.gateway.get_connector_status
},
{
'arg': ('-c', '--config'),
'val_required': True,
'func': self.gateway.get_connector_config
}
],
'gateway': [
{
'arg': ('-s', '--status'),
'func': self.gateway.get_status
}
]
}
self.interactive = sys.__stdin__.isatty()
if hasattr(signal, 'SIGINT'):
signal.signal(signal.SIGINT, self.interrupt)
self.start()
def create_manager(self, timeout=5):
elapsed_time = 0
gateway_manager = None
gateway = None
self.stdout.write('Establish connection...\n')
while elapsed_time <= timeout:
try:
gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
gateway_manager.connect()
GatewayManager.register('get_gateway', proxytype=AutoProxy)
GatewayManager.register('gateway')
gateway = self.connect(gateway_manager)
except (FileNotFoundError, AssertionError):
sleep(1)
continue
else:
break
finally:
elapsed_time += 1
return gateway_manager, gateway
def connect(self, manager, timeout=5):
elapsed_time = 0
gateway = None
self.connected = False
self.stdout.write('Connecting to Gateway')
while not self.connected and elapsed_time <= timeout:
try:
gateway = manager.get_gateway()
gateway.ping()
self.connected = True
except (multiprocessing.managers.RemoteError, KeyError, AttributeError, BrokenPipeError):
self.stdout.write('.')
gateway = manager.get_gateway()
self.connected = True
sleep(1)
except FileNotFoundError:
self.stdout.write('.')
sleep(1)
continue
finally:
elapsed_time += 1
if not self.connected:
raise RuntimeError('Timeout error')
self.stdout.write('\n')
return gateway
def run(self):
self.cmdloop()
@staticmethod
def parser(arg, args):
input_args = findall(r'(-{1,2}[A-Za-z]+)[?^\s]?([A-Za-z\d\s]*)', arg.strip())
for input_arg in input_args:
for arg in args:
opt, val = None, None
try:
(opt, val) = input_arg
except ValueError:
opt = input_arg[0]
if opt in arg['arg']:
if arg.get('val_required') and not val:
continue
return arg['func'](val.strip()) if val else arg['func']()
raise ShellSyntaxError()
def _print(self, *args):
for arg in args:
if isinstance(arg, list):
self.stdout.write(','.join(arg) + '\n')
elif isinstance(arg, dict):
for key, value in arg.items():
self.stdout.write(str(key).capitalize() + ': ' + str(value) + '\n')
else:
self.stdout.write(str(arg) + '\n')
def wrapper(self, arg, label, config):
try:
self._print(self.parser(arg, config))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + ' (see `help ' + label + '`)\n')
self.do_help(label)
except (FileNotFoundError, AttributeError, BrokenPipeError, multiprocessing.managers.RemoteError):
self.stdout.write('Connection is lost...\n')
self.gateway_manager, self.gateway = self.create_manager()
# ---
def do_gateway(self, arg):
"""Gateway
-s/--status: get gateway status
"""
self.wrapper(arg, 'gateway', self.command_config['gateway'])
def do_storage(self, arg):
"""Storage
-n/--name: name of storage
-c/--count: events in storage
"""
self.wrapper(arg, 'storage', self.command_config['storage'])
def do_connector(self, arg):
"""Connector
-l/--list: list of available connectors
-s/--status <name>: get connector status
-c/--config <name>: get connector config
"""
self.wrapper(arg, 'connector', self.command_config['connector'])
# ---
@staticmethod
def interrupt(_, __):
sys.stderr.write("\nKeyboard interrupt trapped - use Ctrl+d or Cmd+d to end\n")
def precmd(self, line: str) -> str:
return line.strip()
def postcmd(self, stop: bool, line: str) -> bool:
return stop
def emptyline(self) -> bool:
pass
@staticmethod
def do_exit(_):
"""Exit"""
return -1
def do_EOF(self, args):
"""EOF"""
return self.do_exit(args)
if __name__ == '__main__':
shell = Shell()

View File

@@ -15,6 +15,7 @@
import logging
import logging.config
import logging.handlers
import multiprocessing.managers
from signal import signal, SIGINT
import subprocess
from copy import deepcopy
@@ -35,6 +36,7 @@ from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CO
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME
from thingsboard_gateway.gateway.device_filter import DeviceFilter
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
from thingsboard_gateway.gateway.shell.proxy import AutoProxy
from thingsboard_gateway.gateway.statistics_service import StatisticsService
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
@@ -121,7 +123,29 @@ def get_env_variables():
return converted_env_variables
class GatewayManager(multiprocessing.managers.BaseManager):
def __init__(self, address=None, authkey=b''):
super().__init__(address=address, authkey=authkey)
self.gateway = None
def has_gateway(self):
return self.gateway is not None
def add_gateway(self, gateway):
self.gateway = gateway
class TBGatewayService:
EXPOSED_GETTERS = [
'ping',
'get_status',
'get_storage_name',
'get_storage_events_count',
'get_available_connectors',
'get_connector_status',
'get_connector_config'
]
def __init__(self, config_file=None):
signal(SIGINT, lambda _, __: self.__stop_gateway())
@@ -276,6 +300,19 @@ class TBGatewayService:
self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
self._watchers_thread.start()
self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
GatewayManager.register('get_gateway', self.get_gateway, proxytype=AutoProxy, exposed=self.EXPOSED_GETTERS,
create_method=False)
self.server = self.manager.get_server()
self.server.serve_forever()
def get_gateway(self):
if self.manager.has_gateway():
return self.manager.gateway
else:
self.manager.add_gateway(self)
self.manager.register('gateway', lambda: self, proxytype=AutoProxy)
def _watchers(self):
try:
gateway_statistic_send = 0
@@ -386,6 +423,7 @@ class TBGatewayService:
log.info("The gateway has been stopped.")
self.tb_client.disconnect()
self.tb_client.stop()
self.manager.shutdown()
def __init_remote_configuration(self, force=False):
if (self.__config["thingsboard"].get("remoteConfiguration") or force) and self.__remote_configurator is None:
@@ -1283,6 +1321,40 @@ class TBGatewayService:
sleep(check_devices_idle_every_sec)
# GETTERS --------------------
def ping(self):
return self.name
# ----------------------------
# Storage --------------------
def get_storage_name(self):
return self._event_storage.__class__.__name__
def get_storage_events_count(self):
return self._event_storage.len()
# Connectors -----------------
def get_available_connectors(self):
return {num + 1: name for (num, name) in enumerate(self.available_connectors)}
def get_connector_status(self, name):
try:
connector = self.available_connectors[name]
return {'connected': connector.is_connected()}
except KeyError:
return f'Connector {name} not found!'
def get_connector_config(self, name):
try:
connector = self.available_connectors[name]
return connector.get_config()
except KeyError:
return f'Connector {name} not found!'
# Gateway ----------------------
def get_status(self):
return {'connected': self.tb_client.is_connected()}
if __name__ == '__main__':
TBGatewayService(