From 3b63ea8f0f9d110a2dc84676c1bbe0c3784d27bd Mon Sep 17 00:00:00 2001 From: samson0v Date: Wed, 29 Nov 2023 11:43:29 +0200 Subject: [PATCH] Added max size batch check --- .../gateway/tb_gateway_service.py | 7 +++++++ thingsboard_gateway/tb_utility/tb_handler.py | 18 ++++++++++++++---- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index bc28c89a..579d2ea3 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -973,6 +973,10 @@ class TBGatewayService: def __get_data_size(data: dict): return getsizeof(str(data)) + @staticmethod + def get_data_size(data): + return getsizeof(str(data)) + @staticmethod def __convert_telemetry_to_ts(data): telemetry = {} @@ -1477,6 +1481,9 @@ class TBGatewayService: def ping(self): return self.name + def get_max_payload_size_bytes(self): + return self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) + # ---------------------------- # Storage -------------------- def get_storage_name(self): diff --git a/thingsboard_gateway/tb_utility/tb_handler.py b/thingsboard_gateway/tb_utility/tb_handler.py index 10f49489..3208c829 100644 --- a/thingsboard_gateway/tb_utility/tb_handler.py +++ b/thingsboard_gateway/tb_utility/tb_handler.py @@ -38,8 +38,8 @@ class TBLoggerHandler(logging.Handler): self.__gateway = gateway self.activated = False - self._max_message_count_patch = 10 - self._logs_queue = Queue() + self._max_message_count_batch = 20 + self._logs_queue = Queue(1000) # start() method calls in activate() method self._send_logs_thread = threading.Thread(target=self._send_logs, name='Logs Sending Thread', daemon=True) @@ -65,14 +65,24 @@ class TBLoggerHandler(logging.Handler): logs_for_sending_list = [] count = 1 - while count <= self._max_message_count_patch: + while count <= self._max_message_count_batch: try: logs_for_sending_list.append(self._logs_queue.get(block=False)) count += 1 except Empty: break - self.__gateway.tb_client.client.send_telemetry(logs_for_sending_list) + if self.__gateway.get_data_size(logs_for_sending_list) <= self.__gateway.get_max_payload_size_bytes(): + self.__gateway.tb_client.client.send_telemetry(logs_for_sending_list) + else: + adopted_logs_batch = [] + for item in logs_for_sending_list: + if self.__gateway.get_data_size( + adopted_logs_batch) <= self.__gateway.get_max_payload_size_bytes(): + adopted_logs_batch.append(item) + else: + self.__gateway.tb_client.client.send_telemetry(adopted_logs_batch) + adopted_logs_batch = [] sleep(1)