1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Fixed AutoProxy and Manager

This commit is contained in:
samson0v
2022-12-19 11:34:18 +02:00
parent c4c8e6eef8
commit 04d2eaf898
4 changed files with 114 additions and 50 deletions

View File

@@ -0,0 +1,79 @@
import multiprocessing
from multiprocessing import connection, process
from multiprocessing.managers import dispatch, convert_to_error
class GatewayProxy(multiprocessing.managers.BaseProxy):
def _callmethod(self, methodname, args=(), kwds={}):
"""
Try to call a method of the referent and return a copy of the result
"""
self._id = list(self._idset)[0]
self._connect()
conn = self._tls.connection
conn.send((self._id, methodname, args, kwds))
kind, result = conn.recv()
if kind == '#RETURN':
return result
elif kind == '#PROXY':
exposed, token = result
proxytype = self._manager._registry[token.typeid][-1]
token.address = self._token.address
proxy = proxytype(
token, self._serializer, manager=self._manager,
authkey=self._authkey, exposed=exposed
)
conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn, None, 'decref', (token.id,))
return proxy
raise convert_to_error(kind, result)
def AutoProxy(token, serializer, manager=None, authkey=None,
exposed=None, incref=True, manager_owned=False):
"""
Return an auto-proxy for `token`
"""
_Client = connection.Client
if exposed is None:
conn = _Client(token.address, authkey=authkey)
try:
exposed = dispatch(conn, None, 'get_methods', (token,))
finally:
conn.close()
if authkey is None and manager is not None:
authkey = manager._authkey
if authkey is None:
authkey = process.current_process().authkey
ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
incref=incref, manager_owned=manager_owned)
proxy._isauto = True
return proxy
def MakeProxyType(name, exposed, _cache={}):
"""
Return a proxy type whose methods are given by `exposed`
"""
exposed = tuple(exposed)
try:
return _cache[(name, exposed)]
except KeyError:
pass
dic = {}
for meth in exposed:
exec('''def %s(self, /, *args, **kwds):
return self._callmethod(%r, args, kwds)''' % (meth, meth), dic)
ProxyType = type(name, (GatewayProxy,), dic)
ProxyType._exposed_ = exposed
_cache[(name, exposed)] = ProxyType
return ProxyType

View File

@@ -6,7 +6,7 @@ from re import findall
from threading import Thread
from time import sleep
# from thingsboard_gateway.gateway.shell_manager import ShellManager
from thingsboard_gateway.gateway.shell.proxy import AutoProxy
from thingsboard_gateway.gateway.tb_gateway_service import GatewayManager
@@ -25,21 +25,6 @@ class Shell(cmd.Cmd, Thread):
self.stdout.write('Gateway Shell\n')
self.stdout.write('=============\n')
# self.gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
# try:
# self.gateway_manager.connect()
# GatewayManager.register('get_gateway')
# except (FileNotFoundError, AssertionError):
# self.stdout.write('Gateway is not running!\n')
# return
# self.connected = False
# self.gateway = None
# try:
# self.connect()
# except Exception as e:
# self.stdout.write(e.__str__())
self.gateway_manager, self.gateway = self.create_manager()
if self.gateway_manager is None or self.gateway is None:
self.stdout.write('Gateway is not running!\n')
@@ -63,10 +48,12 @@ class Shell(cmd.Cmd, Thread):
},
{
'arg': ('-s', '--status'),
'val_required': True,
'func': self.gateway.get_connector_status
},
{
'arg': ('-c', '--config'),
'val_required': True,
'func': self.gateway.get_connector_config
}
],
@@ -94,7 +81,7 @@ class Shell(cmd.Cmd, Thread):
try:
gateway_manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
gateway_manager.connect()
GatewayManager.register('get_gateway')
GatewayManager.register('get_gateway', proxytype=AutoProxy)
GatewayManager.register('gateway')
gateway = self.connect(gateway_manager)
except (FileNotFoundError, AssertionError):
@@ -151,9 +138,12 @@ class Shell(cmd.Cmd, Thread):
opt = input_arg[0]
if opt in arg['arg']:
if arg.get('val_required') and not val:
continue
return arg['func'](val.strip()) if val else arg['func']()
raise ShellSyntaxError()
raise ShellSyntaxError()
def _print(self, *args):
for arg in args:
@@ -165,30 +155,29 @@ class Shell(cmd.Cmd, Thread):
else:
self.stdout.write(str(arg) + '\n')
def wrapper(self, arg, label, config):
try:
self._print(self.parser(arg, config))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + ' (see `help ' + label + '`)\n')
self.do_help(label)
except (FileNotFoundError, AttributeError, BrokenPipeError, multiprocessing.managers.RemoteError):
self.stdout.write('Connection is lost...\n')
self.gateway_manager, self.gateway = self.create_manager()
# ---
def do_gateway(self, arg):
"""Gateway
-s/--status: get gateway status
"""
try:
self._print(self.parser(arg, self.command_config['gateway']))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + ' (see `help gateway`)\n')
self.do_help('gateway')
except (FileNotFoundError, AttributeError, BrokenPipeError):
self.stdout.write('Connection is lost...\n')
self.gateway_manager, self.gateway = self.create_manager()
self.wrapper(arg, 'gateway', self.command_config['gateway'])
def do_storage(self, arg):
"""Storage
-n/--name: name of storage
-c/--count: events in storage
"""
try:
self._print(self.parser(arg, self.command_config['storage']))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + ' (see `help storage`)\n')
self.do_help('storage')
self.wrapper(arg, 'storage', self.command_config['storage'])
def do_connector(self, arg):
"""Connector
@@ -196,14 +185,7 @@ class Shell(cmd.Cmd, Thread):
-s/--status <name>: get connector status
-c/--config <name>: get connector config
"""
try:
self._print(self.parser(arg, self.command_config['connector']))
except ShellSyntaxError as e:
self.stdout.write(e.__str__() + ' (see `help connector`)\n')
self.do_help('connector')
except (FileNotFoundError, AttributeError):
self.stdout.write('Connection is lost...\n')
self.connect()
self.wrapper(arg, 'connector', self.command_config['connector'])
# ---
@staticmethod

View File

@@ -35,7 +35,7 @@ from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
# from thingsboard_gateway.gateway.shell_manager import ShellManager
from thingsboard_gateway.gateway.shell.proxy import AutoProxy
from thingsboard_gateway.gateway.statistics_service import StatisticsService
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
@@ -138,7 +138,6 @@ class TBGatewayService:
'get_storage_events_count',
'get_available_connectors',
'get_connector_status',
'get_connector_name',
'get_connector_config'
]
@@ -291,7 +290,8 @@ class TBGatewayService:
self._watchers_thread.start()
self.manager = GatewayManager(address='/tmp/gateway', authkey=b'gateway')
GatewayManager.register('get_gateway', self.get_gateway, exposed=self.EXPOSED_GETTERS)
GatewayManager.register('get_gateway', self.get_gateway, proxytype=AutoProxy, exposed=self.EXPOSED_GETTERS,
create_method=False)
self.server = self.manager.get_server()
self.server.serve_forever()
@@ -300,7 +300,7 @@ class TBGatewayService:
return self.manager.gateway
else:
self.manager.add_gateway(self)
self.manager.register('gateway', lambda: self)
self.manager.register('gateway', lambda: self, proxytype=AutoProxy)
def _watchers(self):
try:
@@ -412,8 +412,6 @@ class TBGatewayService:
log.info("The gateway has been stopped.")
self.tb_client.disconnect()
self.tb_client.stop()
# self.shell_manager.connect()
# self.shell_manager.gateway_stopped()
self.manager.shutdown()
def __init_remote_configuration(self, force=False):
@@ -1321,13 +1319,18 @@ class TBGatewayService:
return {num + 1: name for (num, name) in enumerate(self.available_connectors)}
def get_connector_status(self, name):
return {'connected': self.available_connectors[name].is_connected()}
def get_connector_name(self, name):
return self.available_connectors[name].get_name()
try:
connector = self.available_connectors[name]
return {'connected': connector.is_connected()}
except KeyError:
return f'Connector {name} not found!'
def get_connector_config(self, name):
return self.available_connectors[name].get_config()
try:
connector = self.available_connectors[name]
return connector.get_config()
except KeyError:
return f'Connector {name} not found!'
# Gateway ----------------------
def get_status(self):