mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Merge remote-tracking branch 'origin/master'
This commit is contained in:
@@ -342,11 +342,12 @@ class TBGatewayService:
|
||||
|
||||
config = {}
|
||||
try:
|
||||
with open(''.join(config_file.split('.')[:-1]) + '.yaml') as general_config:
|
||||
filename = ''.join(config_file.split('.')[:-1])
|
||||
with open(filename + '.yaml') as general_config:
|
||||
config = safe_load(general_config)
|
||||
|
||||
with open(config_file, 'w') as file:
|
||||
file.writelines(dumps(config))
|
||||
with open(filename + '.json', 'w') as file:
|
||||
file.writelines(dumps(config, indent=' '))
|
||||
except Exception as e:
|
||||
log.exception('Failed to load configuration file:\n %s', e)
|
||||
|
||||
@@ -768,7 +769,7 @@ class TBGatewayService:
|
||||
if connectors_persistent_keys:
|
||||
self.__save_persistent_keys(connectors_persistent_keys)
|
||||
else:
|
||||
log.error("Connectors - not found! Check your configuration!")
|
||||
log.warning("Connectors - not found!")
|
||||
self.__init_remote_configuration(force=True)
|
||||
log.info("Remote configuration is enabled forcibly!")
|
||||
|
||||
@@ -973,6 +974,10 @@ class TBGatewayService:
|
||||
def __get_data_size(data: dict):
|
||||
return getsizeof(str(data))
|
||||
|
||||
@staticmethod
|
||||
def get_data_size(data):
|
||||
return TBGatewayService.__get_data_size(data)
|
||||
|
||||
@staticmethod
|
||||
def __convert_telemetry_to_ts(data):
|
||||
telemetry = {}
|
||||
@@ -1477,6 +1482,9 @@ class TBGatewayService:
|
||||
def ping(self):
|
||||
return self.name
|
||||
|
||||
def get_max_payload_size_bytes(self):
|
||||
return self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)
|
||||
|
||||
# ----------------------------
|
||||
# Storage --------------------
|
||||
def get_storage_name(self):
|
||||
|
||||
@@ -197,24 +197,22 @@ class RemoteConfigurator:
|
||||
"""
|
||||
|
||||
LOG.debug('Sending all configurations (init)')
|
||||
self._gateway.tb_client.client.send_attributes(
|
||||
{'general_configuration': self._get_general_config_in_remote_format()})
|
||||
self._gateway.tb_client.client.send_attributes({'storage_configuration': self.storage_configuration})
|
||||
self._gateway.tb_client.client.send_attributes({'grpc_configuration': self.grpc_configuration})
|
||||
self._gateway.tb_client.client.send_attributes(
|
||||
{'logs_configuration': {**self._logs_configuration, 'ts': int(time() * 1000)}})
|
||||
self._gateway.tb_client.client.send_attributes({'active_connectors': self._get_active_connectors()})
|
||||
self._send_default_connectors_config()
|
||||
self._gateway.tb_client.client.send_attributes({'Version': self._gateway.version.get('current_version', '0.0')})
|
||||
init_config_message = {
|
||||
'general_configuration': self._get_general_config_in_remote_format(),
|
||||
'storage_configuration': self.storage_configuration,
|
||||
'grpc_configuration': self.grpc_configuration,
|
||||
'logs_configuration': {**self._logs_configuration, 'ts': int(time() * 1000)},
|
||||
'active_connectors': self._get_active_connectors(),
|
||||
'Version': self._gateway.version.get('current_version', '0.0')
|
||||
}
|
||||
self._gateway.tb_client.client.send_attributes(init_config_message)
|
||||
|
||||
# sending remote created connectors
|
||||
already_sent_connectors = []
|
||||
for connector in self.connectors_configuration:
|
||||
self._gateway.tb_client.client.send_attributes(
|
||||
{connector['name']: {**connector,
|
||||
'logLevel': connector.get('configurationJson', {}).get('logLevel', 'INFO'),
|
||||
'ts': int(time() * 1000)}})
|
||||
already_sent_connectors.append(connector['configuration'])
|
||||
|
||||
def _load_connectors_configuration(self):
|
||||
for (_, connector_list) in self._gateway.connectors_configs.items():
|
||||
|
||||
@@ -13,10 +13,12 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import logging.handlers
|
||||
from sys import stdout
|
||||
from time import time
|
||||
from time import time, sleep
|
||||
from os import environ
|
||||
from queue import Queue, Empty
|
||||
|
||||
from thingsboard_gateway.tb_utility.tb_logger import TbLogger
|
||||
|
||||
@@ -35,6 +37,12 @@ class TBLoggerHandler(logging.Handler):
|
||||
self.setLevel(logging.getLevelName('DEBUG'))
|
||||
self.__gateway = gateway
|
||||
self.activated = False
|
||||
|
||||
self._max_message_count_batch = 20
|
||||
self._logs_queue = Queue(1000)
|
||||
# start() method calls in activate() method
|
||||
self._send_logs_thread = threading.Thread(target=self._send_logs, name='Logs Sending Thread', daemon=True)
|
||||
|
||||
self.setFormatter(logging.Formatter('%(asctime)s - |%(levelname)s| - [%(filename)s] - %(module)s - %(lineno)d - %(message)s'))
|
||||
self.loggers = ['service',
|
||||
'extension',
|
||||
@@ -51,6 +59,37 @@ class TBLoggerHandler(logging.Handler):
|
||||
log.addHandler(self.__gateway.main_handler)
|
||||
log.debug("Added remote handler to log %s", name)
|
||||
|
||||
def _send_logs(self):
|
||||
while self.activated:
|
||||
if not self._logs_queue.empty():
|
||||
logs_for_sending_list = []
|
||||
|
||||
count = 1
|
||||
while count <= self._max_message_count_batch:
|
||||
try:
|
||||
log_msg = self._logs_queue.get(block=False)
|
||||
|
||||
logs_msg_size = self.__gateway.get_data_size(log_msg)
|
||||
if logs_msg_size > self.__gateway.get_max_payload_size_bytes():
|
||||
print(f'Too big LOG message size to send ({logs_msg_size}). Skipping...')
|
||||
continue
|
||||
|
||||
if self.__gateway.get_data_size(
|
||||
logs_for_sending_list) + logs_msg_size > self.__gateway.get_max_payload_size_bytes():
|
||||
self.__gateway.tb_client.client.send_telemetry(logs_for_sending_list)
|
||||
logs_for_sending_list = [log_msg]
|
||||
else:
|
||||
logs_for_sending_list.append(log_msg)
|
||||
|
||||
count += 1
|
||||
except Empty:
|
||||
break
|
||||
|
||||
if logs_for_sending_list:
|
||||
self.__gateway.tb_client.client.send_telemetry(logs_for_sending_list)
|
||||
|
||||
sleep(1)
|
||||
|
||||
def activate(self, log_level=None):
|
||||
try:
|
||||
for logger in self.loggers:
|
||||
@@ -66,6 +105,7 @@ class TBLoggerHandler(logging.Handler):
|
||||
log = TbLogger('service')
|
||||
log.exception(e)
|
||||
self.activated = True
|
||||
self._send_logs_thread.start()
|
||||
|
||||
def handle(self, record):
|
||||
if self.activated and not self.__gateway.stopped:
|
||||
@@ -76,8 +116,7 @@ class TBLoggerHandler(logging.Handler):
|
||||
except KeyError:
|
||||
telemetry_key = name + '_LOGS'
|
||||
|
||||
self.__gateway.tb_client.client.send_telemetry(
|
||||
{'ts': int(time() * 1000), 'values': {telemetry_key: record, 'LOGS': record}})
|
||||
self._logs_queue.put({'ts': int(time() * 1000), 'values': {telemetry_key: record, 'LOGS': record}})
|
||||
|
||||
def deactivate(self):
|
||||
self.activated = False
|
||||
|
||||
Reference in New Issue
Block a user