From 04d2eaf898cfbe4cca886c662950206771ee3da9 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 19 Dec 2022 11:34:18 +0200 Subject: [PATCH] Fixed AutoProxy and Manager --- thingsboard_gateway/gateway/shell/__init__.py | 0 thingsboard_gateway/gateway/shell/proxy.py | 79 +++++++++++++++++++ .../gateway/{ => shell}/shell.py | 60 +++++--------- .../gateway/tb_gateway_service.py | 25 +++--- 4 files changed, 114 insertions(+), 50 deletions(-) create mode 100644 thingsboard_gateway/gateway/shell/__init__.py create mode 100644 thingsboard_gateway/gateway/shell/proxy.py rename thingsboard_gateway/gateway/{ => shell}/shell.py (79%) diff --git a/thingsboard_gateway/gateway/shell/__init__.py b/thingsboard_gateway/gateway/shell/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/thingsboard_gateway/gateway/shell/proxy.py b/thingsboard_gateway/gateway/shell/proxy.py new file mode 100644 index 00000000..20d50a65 --- /dev/null +++ b/thingsboard_gateway/gateway/shell/proxy.py @@ -0,0 +1,79 @@ +import multiprocessing +from multiprocessing import connection, process +from multiprocessing.managers import dispatch, convert_to_error + + +class GatewayProxy(multiprocessing.managers.BaseProxy): + def _callmethod(self, methodname, args=(), kwds={}): + """ + Try to call a method of the referent and return a copy of the result + """ + self._id = list(self._idset)[0] + self._connect() + conn = self._tls.connection + + conn.send((self._id, methodname, args, kwds)) + kind, result = conn.recv() + + if kind == '#RETURN': + return result + elif kind == '#PROXY': + exposed, token = result + proxytype = self._manager._registry[token.typeid][-1] + token.address = self._token.address + proxy = proxytype( + token, self._serializer, manager=self._manager, + authkey=self._authkey, exposed=exposed + ) + conn = self._Client(token.address, authkey=self._authkey) + dispatch(conn, None, 'decref', (token.id,)) + return proxy + raise convert_to_error(kind, result) + + +def AutoProxy(token, serializer, manager=None, authkey=None, + exposed=None, incref=True, manager_owned=False): + """ + Return an auto-proxy for `token` + """ + _Client = connection.Client + + if exposed is None: + conn = _Client(token.address, authkey=authkey) + try: + exposed = dispatch(conn, None, 'get_methods', (token,)) + finally: + conn.close() + + if authkey is None and manager is not None: + authkey = manager._authkey + if authkey is None: + authkey = process.current_process().authkey + + ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) + proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, + incref=incref, manager_owned=manager_owned) + proxy._isauto = True + return proxy + + +def MakeProxyType(name, exposed, _cache={}): + """ + Return a proxy type whose methods are given by `exposed` + """ + exposed = tuple(exposed) + try: + return _cache[(name, exposed)] + except KeyError: + pass + + dic = {} + + for meth in exposed: + exec('''def %s(self, /, *args, **kwds): + return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) + + ProxyType = type(name, (GatewayProxy,), dic) + ProxyType._exposed_ = exposed + _cache[(name, exposed)] = ProxyType + return ProxyType \ No newline at end of file diff --git a/thingsboard_gateway/gateway/shell.py b/thingsboard_gateway/gateway/shell/shell.py similarity index 79% rename from thingsboard_gateway/gateway/shell.py rename to thingsboard_gateway/gateway/shell/shell.py index 4cefa946..a70cfd78 100644 --- a/thingsboard_gateway/gateway/shell.py +++ b/thingsboard_gateway/gateway/shell/shell.py @@ -6,7 +6,7 @@ from re import findall from threading import Thread from time import sleep -# from thingsboard_gateway.gateway.shell_manager import ShellManager +from thingsboard_gateway.gateway.shell.proxy import AutoProxy from thingsboard_gateway.gateway.tb_gateway_service import GatewayManager @@ -25,21 +25,6 @@ class Shell(cmd.Cmd, Thread): self.stdout.write('Gateway Shell\n') self.stdout.write('=============\n') - # self.gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway') - # try: - # self.gateway_manager.connect() - # GatewayManager.register('get_gateway') - # except (FileNotFoundError, AssertionError): - # self.stdout.write('Gateway is not running!\n') - # return - - # self.connected = False - # self.gateway = None - # try: - # self.connect() - # except Exception as e: - # self.stdout.write(e.__str__()) - self.gateway_manager, self.gateway = self.create_manager() if self.gateway_manager is None or self.gateway is None: self.stdout.write('Gateway is not running!\n') @@ -63,10 +48,12 @@ class Shell(cmd.Cmd, Thread): }, { 'arg': ('-s', '--status'), + 'val_required': True, 'func': self.gateway.get_connector_status }, { 'arg': ('-c', '--config'), + 'val_required': True, 'func': self.gateway.get_connector_config } ], @@ -94,7 +81,7 @@ class Shell(cmd.Cmd, Thread): try: gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway') gateway_manager.connect() - GatewayManager.register('get_gateway') + GatewayManager.register('get_gateway', proxytype=AutoProxy) GatewayManager.register('gateway') gateway = self.connect(gateway_manager) except (FileNotFoundError, AssertionError): @@ -151,9 +138,12 @@ class Shell(cmd.Cmd, Thread): opt = input_arg[0] if opt in arg['arg']: + if arg.get('val_required') and not val: + continue + return arg['func'](val.strip()) if val else arg['func']() - raise ShellSyntaxError() + raise ShellSyntaxError() def _print(self, *args): for arg in args: @@ -165,30 +155,29 @@ class Shell(cmd.Cmd, Thread): else: self.stdout.write(str(arg) + '\n') + def wrapper(self, arg, label, config): + try: + self._print(self.parser(arg, config)) + except ShellSyntaxError as e: + self.stdout.write(e.__str__() + ' (see `help ' + label + '`)\n') + self.do_help(label) + except (FileNotFoundError, AttributeError, BrokenPipeError, multiprocessing.managers.RemoteError): + self.stdout.write('Connection is lost...\n') + self.gateway_manager, self.gateway = self.create_manager() + # --- def do_gateway(self, arg): """Gateway -s/--status: get gateway status """ - try: - self._print(self.parser(arg, self.command_config['gateway'])) - except ShellSyntaxError as e: - self.stdout.write(e.__str__() + ' (see `help gateway`)\n') - self.do_help('gateway') - except (FileNotFoundError, AttributeError, BrokenPipeError): - self.stdout.write('Connection is lost...\n') - self.gateway_manager, self.gateway = self.create_manager() + self.wrapper(arg, 'gateway', self.command_config['gateway']) def do_storage(self, arg): """Storage -n/--name: name of storage -c/--count: events in storage """ - try: - self._print(self.parser(arg, self.command_config['storage'])) - except ShellSyntaxError as e: - self.stdout.write(e.__str__() + ' (see `help storage`)\n') - self.do_help('storage') + self.wrapper(arg, 'storage', self.command_config['storage']) def do_connector(self, arg): """Connector @@ -196,14 +185,7 @@ class Shell(cmd.Cmd, Thread): -s/--status : get connector status -c/--config : get connector config """ - try: - self._print(self.parser(arg, self.command_config['connector'])) - except ShellSyntaxError as e: - self.stdout.write(e.__str__() + ' (see `help connector`)\n') - self.do_help('connector') - except (FileNotFoundError, AttributeError): - self.stdout.write('Connection is lost...\n') - self.connect() + self.wrapper(arg, 'connector', self.command_config['connector']) # --- @staticmethod diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index fbf8678f..ef3838f6 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -35,7 +35,7 @@ from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \ PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector -# from thingsboard_gateway.gateway.shell_manager import ShellManager +from thingsboard_gateway.gateway.shell.proxy import AutoProxy from thingsboard_gateway.gateway.statistics_service import StatisticsService from thingsboard_gateway.gateway.tb_client import TBClient from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage @@ -138,7 +138,6 @@ class TBGatewayService: 'get_storage_events_count', 'get_available_connectors', 'get_connector_status', - 'get_connector_name', 'get_connector_config' ] @@ -291,7 +290,8 @@ class TBGatewayService: self._watchers_thread.start() self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway') - GatewayManager.register('get_gateway', self.get_gateway, exposed=self.EXPOSED_GETTERS) + GatewayManager.register('get_gateway', self.get_gateway, proxytype=AutoProxy, exposed=self.EXPOSED_GETTERS, + create_method=False) self.server = self.manager.get_server() self.server.serve_forever() @@ -300,7 +300,7 @@ class TBGatewayService: return self.manager.gateway else: self.manager.add_gateway(self) - self.manager.register('gateway', lambda: self) + self.manager.register('gateway', lambda: self, proxytype=AutoProxy) def _watchers(self): try: @@ -412,8 +412,6 @@ class TBGatewayService: log.info("The gateway has been stopped.") self.tb_client.disconnect() self.tb_client.stop() - # self.shell_manager.connect() - # self.shell_manager.gateway_stopped() self.manager.shutdown() def __init_remote_configuration(self, force=False): @@ -1321,13 +1319,18 @@ class TBGatewayService: return {num + 1: name for (num, name) in enumerate(self.available_connectors)} def get_connector_status(self, name): - return {'connected': self.available_connectors[name].is_connected()} - - def get_connector_name(self, name): - return self.available_connectors[name].get_name() + try: + connector = self.available_connectors[name] + return {'connected': connector.is_connected()} + except KeyError: + return f'Connector {name} not found!' def get_connector_config(self, name): - return self.available_connectors[name].get_config() + try: + connector = self.available_connectors[name] + return connector.get_config() + except KeyError: + return f'Connector {name} not found!' # Gateway ---------------------- def get_status(self):