mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added max size batch check
This commit is contained in:
@@ -973,6 +973,10 @@ class TBGatewayService:
|
||||
def __get_data_size(data: dict):
|
||||
return getsizeof(str(data))
|
||||
|
||||
@staticmethod
|
||||
def get_data_size(data):
|
||||
return getsizeof(str(data))
|
||||
|
||||
@staticmethod
|
||||
def __convert_telemetry_to_ts(data):
|
||||
telemetry = {}
|
||||
@@ -1477,6 +1481,9 @@ class TBGatewayService:
|
||||
def ping(self):
|
||||
return self.name
|
||||
|
||||
def get_max_payload_size_bytes(self):
|
||||
return self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)
|
||||
|
||||
# ----------------------------
|
||||
# Storage --------------------
|
||||
def get_storage_name(self):
|
||||
|
||||
@@ -38,8 +38,8 @@ class TBLoggerHandler(logging.Handler):
|
||||
self.__gateway = gateway
|
||||
self.activated = False
|
||||
|
||||
self._max_message_count_patch = 10
|
||||
self._logs_queue = Queue()
|
||||
self._max_message_count_batch = 20
|
||||
self._logs_queue = Queue(1000)
|
||||
# start() method calls in activate() method
|
||||
self._send_logs_thread = threading.Thread(target=self._send_logs, name='Logs Sending Thread', daemon=True)
|
||||
|
||||
@@ -65,14 +65,24 @@ class TBLoggerHandler(logging.Handler):
|
||||
logs_for_sending_list = []
|
||||
|
||||
count = 1
|
||||
while count <= self._max_message_count_patch:
|
||||
while count <= self._max_message_count_batch:
|
||||
try:
|
||||
logs_for_sending_list.append(self._logs_queue.get(block=False))
|
||||
count += 1
|
||||
except Empty:
|
||||
break
|
||||
|
||||
self.__gateway.tb_client.client.send_telemetry(logs_for_sending_list)
|
||||
if self.__gateway.get_data_size(logs_for_sending_list) <= self.__gateway.get_max_payload_size_bytes():
|
||||
self.__gateway.tb_client.client.send_telemetry(logs_for_sending_list)
|
||||
else:
|
||||
adopted_logs_batch = []
|
||||
for item in logs_for_sending_list:
|
||||
if self.__gateway.get_data_size(
|
||||
adopted_logs_batch) <= self.__gateway.get_max_payload_size_bytes():
|
||||
adopted_logs_batch.append(item)
|
||||
else:
|
||||
self.__gateway.tb_client.client.send_telemetry(adopted_logs_batch)
|
||||
adopted_logs_batch = []
|
||||
|
||||
sleep(1)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user