mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Moved variables initialization to the beggining of gateway init, refactoring
This commit is contained in:
@@ -62,7 +62,12 @@ try:
|
||||
except ImportError:
|
||||
print("Cannot load GRPC connector!")
|
||||
|
||||
log: TbLogger = None
|
||||
class GrpcConnector:
|
||||
pass
|
||||
|
||||
class TBGRPCServerManager:
|
||||
pass
|
||||
log: TbLogger = None # type: ignore
|
||||
main_handler = logging.handlers.MemoryHandler(-1)
|
||||
|
||||
DEFAULT_CONNECTORS = {
|
||||
@@ -96,7 +101,6 @@ SECURITY_VAR = ('accessToken', 'caCert', 'privateKey', 'cert', 'clientId', 'user
|
||||
|
||||
|
||||
def load_file(path_to_file):
|
||||
content = None
|
||||
with open(path_to_file, 'r') as target_file:
|
||||
content = load(target_file)
|
||||
return content
|
||||
@@ -190,7 +194,6 @@ class TBGatewayService:
|
||||
DeviceActions.CONNECT: self.add_device,
|
||||
DeviceActions.DISCONNECT: self.del_device
|
||||
}
|
||||
self.__async_device_actions_queue = SimpleQueue()
|
||||
self.__process_async_actions_thread = Thread(target=self.__process_async_device_actions,
|
||||
name="Async device actions processing thread", daemon=True)
|
||||
|
||||
@@ -225,16 +228,7 @@ class TBGatewayService:
|
||||
self.__updates_check_time = 0
|
||||
self.version = self.__updater.get_version()
|
||||
log.info("ThingsBoard IoT gateway version: %s", self.version["current_version"])
|
||||
self.available_connectors_by_name: dict[str, Connector] = {}
|
||||
self.available_connectors_by_id: dict[str, Connector] = {}
|
||||
self.__connector_incoming_messages = {}
|
||||
self.__connected_devices = {}
|
||||
self.__renamed_devices = {}
|
||||
self.__saved_devices = {}
|
||||
self.__events = []
|
||||
self.name = ''.join(choice(ascii_lowercase) for _ in range(64))
|
||||
self.__rpc_register_queue = SimpleQueue()
|
||||
self.__rpc_requests_in_progress = {}
|
||||
connection_logger = logging.getLogger('tb_connection')
|
||||
self.tb_client = TBClient(self.__config["thingsboard"], self._config_dir, connection_logger)
|
||||
try:
|
||||
@@ -253,7 +247,6 @@ class TBGatewayService:
|
||||
log.addHandler(self.remote_handler)
|
||||
# self.main_handler.setTarget(self.remote_handler)
|
||||
self._default_connectors = DEFAULT_CONNECTORS
|
||||
self.__converted_data_queue = SimpleQueue()
|
||||
self.__save_converted_data_thread = Thread(name="Storage fill thread", daemon=True,
|
||||
target=self.__send_to_storage)
|
||||
self.__save_converted_data_thread.start()
|
||||
@@ -275,8 +268,6 @@ class TBGatewayService:
|
||||
|
||||
self.init_remote_shell(self.__config["thingsboard"].get("remoteShell"))
|
||||
|
||||
self.__scheduled_rpc_calls = []
|
||||
self.__rpc_processing_queue = SimpleQueue()
|
||||
self.__rpc_scheduled_methods_functions = {
|
||||
"restart": {"function": execv, "arguments": (executable, [executable.split(pathsep)[-1]] + argv)},
|
||||
"reboot": {"function": subprocess.call, "arguments": (["shutdown", "-r", "-t", "0"],)},
|
||||
@@ -285,7 +276,6 @@ class TBGatewayService:
|
||||
name="RPC processing thread")
|
||||
self.__rpc_processing_thread.start()
|
||||
self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"])
|
||||
self.connectors_configs = {}
|
||||
|
||||
self.init_grpc_service(self.__config.get('grpc'))
|
||||
|
||||
@@ -298,8 +288,6 @@ class TBGatewayService:
|
||||
|
||||
self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC))
|
||||
|
||||
self._published_events = SimpleQueue()
|
||||
|
||||
self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 200)
|
||||
self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0
|
||||
self.__min_pack_size_to_send = self.__config['thingsboard'].get('minPackSizeToSend', 50)
|
||||
@@ -360,6 +348,23 @@ class TBGatewayService:
|
||||
self.__rpc_reply_sent = False
|
||||
self.__subscribed_to_rpc_topics = False
|
||||
self.__rpc_remote_shell_command_in_progress = None
|
||||
self.connectors_configs = {}
|
||||
self.__scheduled_rpc_calls = []
|
||||
self.__rpc_requests_in_progress = {}
|
||||
self.available_connectors_by_name: dict[str, Connector] = {}
|
||||
self.available_connectors_by_id: dict[str, Connector] = {}
|
||||
self.__connector_incoming_messages = {}
|
||||
self.__connected_devices = {}
|
||||
self.__renamed_devices = {}
|
||||
self.__saved_devices = {}
|
||||
self.__events = []
|
||||
self.__grpc_connectors = {}
|
||||
|
||||
self._published_events = SimpleQueue()
|
||||
self.__rpc_processing_queue = SimpleQueue()
|
||||
self.__async_device_actions_queue = SimpleQueue()
|
||||
self.__rpc_register_queue = SimpleQueue()
|
||||
self.__converted_data_queue = SimpleQueue()
|
||||
|
||||
@staticmethod
|
||||
def __load_general_config(config_file):
|
||||
@@ -392,7 +397,6 @@ class TBGatewayService:
|
||||
|
||||
def init_grpc_service(self, config):
|
||||
self.__grpc_config = config
|
||||
self.__grpc_connectors = {}
|
||||
if GRPC_LOADED and self.__grpc_config is not None and self.__grpc_config.get("enabled"):
|
||||
self.__process_async_actions_thread.start()
|
||||
self.__grpc_manager = TBGRPCServerManager(self, self.__grpc_config)
|
||||
@@ -632,7 +636,7 @@ class TBGatewayService:
|
||||
if not self.available_connectors_by_name.get(connector_name):
|
||||
raise ValueError
|
||||
|
||||
self.available_connectors_by_name[connector_name].update_converter_config(converter_name, content[key])
|
||||
self.available_connectors_by_name[connector_name].update_converter_config(converter_name, content[key]) # type: ignore
|
||||
except (ValueError, AttributeError, IndexError) as e:
|
||||
log.debug('Failed to process remote converter update: %s', e)
|
||||
|
||||
@@ -733,7 +737,7 @@ class TBGatewayService:
|
||||
if (self.__grpc_connectors.get(connector_key) is not None
|
||||
and self.__grpc_connectors[connector_key]['id'] in self.available_connectors_by_id):
|
||||
connector_id = self.__grpc_connectors[connector_key]['id']
|
||||
target_connector: GrpcConnector = self.available_connectors_by_id.pop(connector_id)
|
||||
target_connector = self.available_connectors_by_id.pop(connector_id)
|
||||
self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector)
|
||||
log.info("[%r] GRPC connector with key %s and name %s - unregistered", session_id, connector_key,
|
||||
target_connector.get_name())
|
||||
@@ -743,7 +747,7 @@ class TBGatewayService:
|
||||
else:
|
||||
self.__grpc_manager.unregister(Status.FAILURE, session_id, None)
|
||||
log.error("[%r] GRPC configuration for connector with key: %s - not found and not registered",
|
||||
session_id, connector_key)
|
||||
session_id, connector_key)
|
||||
|
||||
@staticmethod
|
||||
def _generate_persistent_key(connector, connectors_persistent_keys):
|
||||
@@ -1184,7 +1188,7 @@ class TBGatewayService:
|
||||
try:
|
||||
if (self.tb_client.is_connected()
|
||||
and (self.__remote_configurator is None
|
||||
or not self.__remote_configurator.in_process)):
|
||||
or not self.__remote_configurator.in_process)):
|
||||
if self.tb_client.client.quality_of_service == 1:
|
||||
success = event.get() == event.TB_ERR_SUCCESS
|
||||
else:
|
||||
@@ -1259,7 +1263,8 @@ class TBGatewayService:
|
||||
log.debug("Connector \"%s\" for RPC request \"%s\" found", module, content["method"])
|
||||
for connector_name in self.available_connectors_by_name:
|
||||
if self.available_connectors_by_name[connector_name]._connector_type == module:
|
||||
log.debug("Sending command RPC %s to connector %s", content["method"], connector_name)
|
||||
log.debug("Sending command RPC %s to connector %s", content["method"],
|
||||
connector_name)
|
||||
content['id'] = request_id
|
||||
result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content)
|
||||
elif module == 'gateway' or module in self.__remote_shell.shell_commands:
|
||||
@@ -1288,13 +1293,15 @@ class TBGatewayService:
|
||||
method_to_call = content["method"].replace("gateway_", "")
|
||||
result = None
|
||||
if self.__remote_shell is not None:
|
||||
method_function = self.__remote_shell.shell_commands.get(method_to_call, self.__gateway_rpc_methods.get(method_to_call))
|
||||
method_function = self.__remote_shell.shell_commands.get(method_to_call,
|
||||
self.__gateway_rpc_methods.get(method_to_call))
|
||||
else:
|
||||
log.info("Remote shell is disabled.")
|
||||
method_function = self.__gateway_rpc_methods.get(method_to_call)
|
||||
if method_function is None and method_to_call in self.__rpc_scheduled_methods_functions:
|
||||
seconds_to_restart = arguments * 1000 if arguments and arguments != '{}' else 0
|
||||
self.__scheduled_rpc_calls.append([time() * 1000 + seconds_to_restart, self.__rpc_scheduled_methods_functions[method_to_call]])
|
||||
self.__scheduled_rpc_calls.append([time() * 1000 + seconds_to_restart,
|
||||
self.__rpc_scheduled_methods_functions[method_to_call]])
|
||||
log.info("Gateway %s scheduled in %i seconds", method_to_call, seconds_to_restart / 1000)
|
||||
result = {"success": True}
|
||||
elif method_function is None:
|
||||
@@ -1368,7 +1375,8 @@ class TBGatewayService:
|
||||
if success_sent:
|
||||
rpc_response["success"] = True
|
||||
if device is not None and success_sent is not None:
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response), quality_of_service=quality_of_service)
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, dumps(rpc_response),
|
||||
quality_of_service=quality_of_service)
|
||||
elif device is not None and req_id is not None and content is not None:
|
||||
self.tb_client.client.gw_send_rpc_reply(device, req_id, content, quality_of_service=quality_of_service)
|
||||
elif device is None and success_sent is not None:
|
||||
|
||||
Reference in New Issue
Block a user