diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 50e51aaf..e1f7fd73 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -16,17 +16,16 @@ import logging import logging.config import logging.handlers import multiprocessing.managers -from signal import signal, SIGINT import subprocess -from copy import deepcopy from os import execv, listdir, path, pathsep, stat, system, environ +from platform import system as platform_system from queue import SimpleQueue from random import choice +from signal import signal, SIGINT from string import ascii_lowercase, hexdigits from sys import argv, executable, getsizeof from threading import RLock, Thread from time import sleep, time -from platform import system as platform_system import simplejson from simplejson import JSONDecodeError, dumps, load, loads @@ -381,8 +380,7 @@ class TBGatewayService: self.__request_config_after_connect = True self.__check_shared_attributes() - if cur_time - gateway_statistic_send > self.__statistics[ - 'statsSendPeriodInSeconds'] * 1000 and self.tb_client.is_connected(): + if cur_time - gateway_statistic_send > self.__statistics['statsSendPeriodInSeconds'] * 1000 and self.tb_client.is_connected(): summary_messages = self.__form_statistics() # with self.__lock: self.tb_client.client.send_telemetry(summary_messages) @@ -570,8 +568,7 @@ class TBGatewayService: self.tb_client.client.request_attributes(callback=self._attributes_parse) def __register_connector(self, session_id, connector_key): - if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key][ - 'name'] not in self.available_connectors: + if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] not in self.available_connectors: target_connector = self.__grpc_connectors.get(connector_key) connector = GrpcConnector(self, target_connector['config'], self.__grpc_manager, session_id) connector.setName(target_connector['name']) @@ -586,8 +583,7 @@ class TBGatewayService: log.error("GRPC configuration for connector with key: %s - not found", connector_key) def __unregister_connector(self, session_id, connector_key): - if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key][ - 'name'] in self.available_connectors: + if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] in self.available_connectors: connector_name = self.__grpc_connectors[connector_key]['name'] target_connector: GrpcConnector = self.available_connectors.pop(connector_name) self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector) @@ -718,7 +714,8 @@ class TBGatewayService: } }) - thread = Thread(target=self._run_connector, args=(connector_abs_path, connector_config_json,), + thread = Thread(target=self._run_connector, + args=(connector_abs_path, connector_config_json,), daemon=True, name='Separate DRPC Connector') thread.start() @@ -905,7 +902,8 @@ class TBGatewayService: if self.__remote_configurator is None or not self.__remote_configurator.in_process: events = self._event_storage.get_event_pack() - if events: + if events and len(self.tb_client.client._client._out_messages) <= ( + self.tb_client.client._client._max_queued_messages - 10000): for event in events: try: current_event = loads(event) @@ -1183,20 +1181,19 @@ class TBGatewayService: def __form_statistics(self): summary_messages = {"eventsProduced": 0, "eventsSent": 0} - telemetry = {} for connector in self.available_connectors: connector_camel_case = connector.lower().replace(' ', '') - telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \ - self.available_connectors[connector].statistics['MessagesReceived'] - self.available_connectors[connector].statistics['MessagesReceived'] = 0 - telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \ - self.available_connectors[connector].statistics['MessagesSent'] - self.available_connectors[connector].statistics['MessagesSent'] = 0 + telemetry = { + (connector_camel_case + ' EventsProduced').replace(' ', ''): self.available_connectors[ + connector].statistics.pop('MessagesReceived', 0), + (connector_camel_case + ' EventsSent').replace(' ', ''): self.available_connectors[ + connector].statistics.pop('MessagesSent', 0) + } summary_messages['eventsProduced'] += telemetry[ str(connector_camel_case + ' EventsProduced').replace(' ', '')] summary_messages['eventsSent'] += telemetry[ str(connector_camel_case + ' EventsSent').replace(' ', '')] - summary_messages.update(**telemetry) + summary_messages.update(telemetry) return summary_messages def add_device_async(self, data): @@ -1236,7 +1233,7 @@ class TBGatewayService: return self.__connected_devices if connector_name is None else { device_name: self.__connected_devices[device_name]["device_type"] for device_name in self.__connected_devices.keys() if self.__connected_devices[device_name].get("connector") is not None and - self.__connected_devices[device_name]["connector"].get_name() == connector_name} + self.__connected_devices[device_name]["connector"].get_name() == connector_name} def __process_async_device_actions(self): while not self.stopped: