From 425dabaab4237e3e5a203fc830464111e4205726 Mon Sep 17 00:00:00 2001 From: samson0v Date: Wed, 14 Dec 2022 11:37:32 +0200 Subject: [PATCH] Updated Shell --- thingsboard_gateway/gateway/shell.py | 209 +++++++++++++----- .../gateway/tb_gateway_service.py | 10 +- 2 files changed, 161 insertions(+), 58 deletions(-) diff --git a/thingsboard_gateway/gateway/shell.py b/thingsboard_gateway/gateway/shell.py index aaacc945..4cefa946 100644 --- a/thingsboard_gateway/gateway/shell.py +++ b/thingsboard_gateway/gateway/shell.py @@ -1,16 +1,18 @@ import cmd -import multiprocessing.managers -import signal import sys +import signal +import multiprocessing.managers +from re import findall from threading import Thread from time import sleep -from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService +# from thingsboard_gateway.gateway.shell_manager import ShellManager from thingsboard_gateway.gateway.tb_gateway_service import GatewayManager class ShellSyntaxError(Exception): - pass + def __str__(self): + return 'Invalid syntax' class Shell(cmd.Cmd, Thread): @@ -20,28 +22,61 @@ class Shell(cmd.Cmd, Thread): cmd.Cmd.__init__(self, stdin=stdin, stdout=stdout) Thread.__init__(self, name='Gateway Shell', daemon=False) - self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway') - try: - self.manager.connect() - GatewayManager.register('get_gateway') - self.gateway: TBGatewayService = self.manager.get_gateway() - except (FileNotFoundError, AssertionError): + self.stdout.write('Gateway Shell\n') + self.stdout.write('=============\n') + + # self.gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway') + # try: + # self.gateway_manager.connect() + # GatewayManager.register('get_gateway') + # except (FileNotFoundError, AssertionError): + # self.stdout.write('Gateway is not running!\n') + # return + + # self.connected = False + # self.gateway = None + # try: + # self.connect() + # except Exception as e: + # self.stdout.write(e.__str__()) + + self.gateway_manager, self.gateway = self.create_manager() + if self.gateway_manager is None or self.gateway is None: self.stdout.write('Gateway is not running!\n') return - self.connected = False - self.stdout.write('Connecting to Gateway') - while not self.connected: - try: - self.gateway.ping() - self.stdout.write('\n') - self.connected = True - self.stdout.write('Connected to Gateway\n') - except (multiprocessing.managers.RemoteError, KeyError, AttributeError): - self.stdout.write('.') - self.gateway = self.manager.get_gateway() - self.manager.connect() - sleep(1) + self.command_config = { + 'storage': [ + { + 'arg': ('-n', '--name'), + 'func': self.gateway.get_storage_name + }, + { + 'arg': ('-c', '--count'), + 'func': self.gateway.get_storage_events_count + } + ], + 'connector': [ + { + 'arg': ('-l', '--list'), + 'func': self.gateway.get_available_connectors + }, + { + 'arg': ('-s', '--status'), + 'func': self.gateway.get_connector_status + }, + { + 'arg': ('-c', '--config'), + 'func': self.gateway.get_connector_config + } + ], + 'gateway': [ + { + 'arg': ('-s', '--status'), + 'func': self.gateway.get_status + } + ] + } self.interactive = sys.__stdin__.isatty() @@ -50,72 +85,130 @@ class Shell(cmd.Cmd, Thread): self.start() + def create_manager(self, timeout=5): + elapsed_time = 0 + gateway_manager = None + gateway = None + self.stdout.write('Establish connection...\n') + while elapsed_time <= timeout: + try: + gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway') + gateway_manager.connect() + GatewayManager.register('get_gateway') + GatewayManager.register('gateway') + gateway = self.connect(gateway_manager) + except (FileNotFoundError, AssertionError): + sleep(1) + continue + else: + break + finally: + elapsed_time += 1 + + return gateway_manager, gateway + + def connect(self, manager, timeout=5): + elapsed_time = 0 + gateway = None + self.connected = False + self.stdout.write('Connecting to Gateway') + while not self.connected and elapsed_time <= timeout: + try: + gateway = manager.get_gateway() + gateway.ping() + self.connected = True + except (multiprocessing.managers.RemoteError, KeyError, AttributeError, BrokenPipeError): + self.stdout.write('.') + gateway = manager.get_gateway() + self.connected = True + sleep(1) + except FileNotFoundError: + self.stdout.write('.') + sleep(1) + continue + finally: + elapsed_time += 1 + + if not self.connected: + raise RuntimeError('Timeout error') + + self.stdout.write('\n') + return gateway + def run(self): self.cmdloop() @staticmethod - def parser(arg: str, args): - input_args = arg.strip().split(' ') - opt, val = None, None - try: - (opt, val) = input_args - except ValueError: - opt = input_args[0] + def parser(arg, args): + input_args = findall(r'(-{1,2}[A-Za-z]+)[?^\s]?([A-Za-z\d\s]*)', arg.strip()) - try: - for keys, func in args.items(): - if opt in keys: - return func(val) if val else func() - except Exception as _: - raise ShellSyntaxError('Invalid syntax (see `help storage`)') + for input_arg in input_args: + for arg in args: + opt, val = None, None + try: + (opt, val) = input_arg + except ValueError: + opt = input_arg[0] + + if opt in arg['arg']: + return arg['func'](val.strip()) if val else arg['func']() + + raise ShellSyntaxError() def _print(self, *args): for arg in args: if isinstance(arg, list): - self.stdout.write(','.join(arg)) + self.stdout.write(','.join(arg) + '\n') elif isinstance(arg, dict): for key, value in arg.items(): self.stdout.write(str(key).capitalize() + ': ' + str(value) + '\n') else: - self.stdout.write(arg + '\n') + self.stdout.write(str(arg) + '\n') + + # --- + def do_gateway(self, arg): + """Gateway + -s/--status: get gateway status + """ + try: + self._print(self.parser(arg, self.command_config['gateway'])) + except ShellSyntaxError as e: + self.stdout.write(e.__str__() + ' (see `help gateway`)\n') + self.do_help('gateway') + except (FileNotFoundError, AttributeError, BrokenPipeError): + self.stdout.write('Connection is lost...\n') + self.gateway_manager, self.gateway = self.create_manager() def do_storage(self, arg): """Storage -n/--name: name of storage -c/--count: events in storage """ - args = { - ('-n', '--name'): self.gateway.get_storage_name, - ('-c', '--count'): self.gateway.get_storage_events_count - } try: - self._print(self.parser(arg, args)) + self._print(self.parser(arg, self.command_config['storage'])) except ShellSyntaxError as e: - self.stdout.write(e.__str__() + '\n') + self.stdout.write(e.__str__() + ' (see `help storage`)\n') + self.do_help('storage') def do_connector(self, arg): """Connector - -a/--available: list of available connectors + -l/--list: list of available connectors -s/--status : get connector status - -n/--name : get connector name - -c/--config : + -c/--config : get connector config """ - args = { - ('-a', '--available'): self.gateway.get_available_connectors, - ('-s', '--status'): self.gateway.get_connector_status, - ('-n', '--name'): self.gateway.get_connector_name, - ('-c', '--config'): self.gateway.get_connector_config - } - try: - self._print(self.parser(arg, args)) + self._print(self.parser(arg, self.command_config['connector'])) except ShellSyntaxError as e: - self.stdout.write(e.__str__() + '\n') + self.stdout.write(e.__str__() + ' (see `help connector`)\n') + self.do_help('connector') + except (FileNotFoundError, AttributeError): + self.stdout.write('Connection is lost...\n') + self.connect() # --- @staticmethod def interrupt(_, __): - sys.stderr.write("\nKeyboard interrupt trapped - use EOF to end\n") + sys.stderr.write("\nKeyboard interrupt trapped - use Ctrl+d or Cmd+d to end\n") def precmd(self, line: str) -> str: return line.strip() @@ -128,9 +221,11 @@ class Shell(cmd.Cmd, Thread): @staticmethod def do_exit(_): + """Exit""" return -1 def do_EOF(self, args): + """EOF""" return self.do_exit(args) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 48ab4f6e..7cdc19cb 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -35,6 +35,7 @@ from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \ PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector +# from thingsboard_gateway.gateway.shell_manager import ShellManager from thingsboard_gateway.gateway.statistics_service import StatisticsService from thingsboard_gateway.gateway.tb_client import TBClient from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage @@ -132,6 +133,7 @@ class GatewayManager(multiprocessing.managers.BaseManager): class TBGatewayService: EXPOSED_GETTERS = [ 'ping', + 'get_status', 'get_storage_name', 'get_storage_events_count', 'get_available_connectors', @@ -414,6 +416,8 @@ class TBGatewayService: log.info("The gateway has been stopped.") self.tb_client.disconnect() self.tb_client.stop() + # self.shell_manager.connect() + # self.shell_manager.gateway_stopped() self.manager.shutdown() def __init_remote_configuration(self, force=False): @@ -1318,7 +1322,7 @@ class TBGatewayService: # Connectors ----------------- def get_available_connectors(self): - return [con for con in self.available_connectors] + return {num + 1: name for (num, name) in enumerate(self.available_connectors)} def get_connector_status(self, name): return {'connected': self.available_connectors[name].is_connected()} @@ -1329,6 +1333,10 @@ class TBGatewayService: def get_connector_config(self, name): return self.available_connectors[name].get_config() + # Gateway ---------------------- + def get_status(self): + return {'connected': self.tb_client.is_connected()} + if __name__ == '__main__': TBGatewayService(