1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Refactoring for main service, added check is outgoing mqtt client queue is almost full and waiting for processing new events pack

This commit is contained in:
imbeacon
2023-05-17 12:49:42 +03:00
parent eaabfb8114
commit 5e4d830c1a

View File

@@ -16,17 +16,16 @@ import logging
import logging.config
import logging.handlers
import multiprocessing.managers
from signal import signal, SIGINT
import subprocess
from copy import deepcopy
from os import execv, listdir, path, pathsep, stat, system, environ
from platform import system as platform_system
from queue import SimpleQueue
from random import choice
from signal import signal, SIGINT
from string import ascii_lowercase, hexdigits
from sys import argv, executable, getsizeof
from threading import RLock, Thread
from time import sleep, time
from platform import system as platform_system
import simplejson
from simplejson import JSONDecodeError, dumps, load, loads
@@ -381,8 +380,7 @@ class TBGatewayService:
self.__request_config_after_connect = True
self.__check_shared_attributes()
if cur_time - gateway_statistic_send > self.__statistics[
'statsSendPeriodInSeconds'] * 1000 and self.tb_client.is_connected():
if cur_time - gateway_statistic_send > self.__statistics['statsSendPeriodInSeconds'] * 1000 and self.tb_client.is_connected():
summary_messages = self.__form_statistics()
# with self.__lock:
self.tb_client.client.send_telemetry(summary_messages)
@@ -570,8 +568,7 @@ class TBGatewayService:
self.tb_client.client.request_attributes(callback=self._attributes_parse)
def __register_connector(self, session_id, connector_key):
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key][
'name'] not in self.available_connectors:
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] not in self.available_connectors:
target_connector = self.__grpc_connectors.get(connector_key)
connector = GrpcConnector(self, target_connector['config'], self.__grpc_manager, session_id)
connector.setName(target_connector['name'])
@@ -586,8 +583,7 @@ class TBGatewayService:
log.error("GRPC configuration for connector with key: %s - not found", connector_key)
def __unregister_connector(self, session_id, connector_key):
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key][
'name'] in self.available_connectors:
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] in self.available_connectors:
connector_name = self.__grpc_connectors[connector_key]['name']
target_connector: GrpcConnector = self.available_connectors.pop(connector_name)
self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector)
@@ -718,7 +714,8 @@ class TBGatewayService:
}
})
thread = Thread(target=self._run_connector, args=(connector_abs_path, connector_config_json,),
thread = Thread(target=self._run_connector,
args=(connector_abs_path, connector_config_json,),
daemon=True, name='Separate DRPC Connector')
thread.start()
@@ -905,7 +902,8 @@ class TBGatewayService:
if self.__remote_configurator is None or not self.__remote_configurator.in_process:
events = self._event_storage.get_event_pack()
if events:
if events and len(self.tb_client.client._client._out_messages) <= (
self.tb_client.client._client._max_queued_messages - 10000):
for event in events:
try:
current_event = loads(event)
@@ -1183,20 +1181,19 @@ class TBGatewayService:
def __form_statistics(self):
summary_messages = {"eventsProduced": 0, "eventsSent": 0}
telemetry = {}
for connector in self.available_connectors:
connector_camel_case = connector.lower().replace(' ', '')
telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \
self.available_connectors[connector].statistics['MessagesReceived']
self.available_connectors[connector].statistics['MessagesReceived'] = 0
telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \
self.available_connectors[connector].statistics['MessagesSent']
self.available_connectors[connector].statistics['MessagesSent'] = 0
telemetry = {
(connector_camel_case + ' EventsProduced').replace(' ', ''): self.available_connectors[
connector].statistics.pop('MessagesReceived', 0),
(connector_camel_case + ' EventsSent').replace(' ', ''): self.available_connectors[
connector].statistics.pop('MessagesSent', 0)
}
summary_messages['eventsProduced'] += telemetry[
str(connector_camel_case + ' EventsProduced').replace(' ', '')]
summary_messages['eventsSent'] += telemetry[
str(connector_camel_case + ' EventsSent').replace(' ', '')]
summary_messages.update(**telemetry)
summary_messages.update(telemetry)
return summary_messages
def add_device_async(self, data):
@@ -1236,7 +1233,7 @@ class TBGatewayService:
return self.__connected_devices if connector_name is None else {
device_name: self.__connected_devices[device_name]["device_type"] for device_name in
self.__connected_devices.keys() if self.__connected_devices[device_name].get("connector") is not None and
self.__connected_devices[device_name]["connector"].get_name() == connector_name}
self.__connected_devices[device_name]["connector"].get_name() == connector_name}
def __process_async_device_actions(self):
while not self.stopped: