1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Fix data repack algorithm.

This commit is contained in:
devaskim
2022-10-08 19:12:02 +05:00
parent a996724bc6
commit 2521e93507

View File

@@ -700,44 +700,48 @@ class TBGatewayService:
"deviceType": data['deviceType'],
"attributes": {},
"telemetry": []}
empty_adopted_data_size = self.__get_data_size(adopted_data)
adopted_data_size = empty_adopted_data_size
# First, loop through the attributes
for attribute in data['attributes']:
adopted_data['attributes'].update(attribute)
adopted_data_size = self.__get_data_size(adopted_data)
adopted_data_size += self.__get_data_size(attribute)
if adopted_data_size >= max_data_size:
# We have surpassed the max_data_size, so send what we have and clear attributes
self.__send_data_pack_to_storage(adopted_data, connector_name)
adopted_data['attributes'] = {}
adopted_data_size = empty_adopted_data_size
# Now, loop through telemetry. Possibly have some unsent attributes that have been adopted.
telemetry = data['telemetry'] if isinstance(data['telemetry'], list) else [data['telemetry']]
ts_to_index = {}
for ts_kv_list in telemetry:
ts = ts_kv_list['ts']
for kv in ts_kv_list['values']:
if len(adopted_data['telemetry']) == 0:
adopted_data['telemetry'] = [
{'ts': ts, 'values': {kv: ts_kv_list['values'][kv]}}]
if ts in ts_to_index:
kv_data = {kv: ts_kv_list['values'][kv]}
adopted_data['telemetry'][ts_to_index]['values'].update(kv_data)
else:
for adopted_kv in adopted_data['telemetry']:
if adopted_kv['ts'] == ts:
adopted_kv['values'].update({kv: ts_kv_list['values'][kv]})
kv_data = {'ts': ts, 'values': {kv: ts_kv_list['values'][kv]}}
adopted_data['telemetry'].append(kv_data)
ts_to_index[ts] = len(adopted_data['telemetry']) - 1
adopted_data_size = self.__get_data_size(adopted_data)
adopted_data_size += self.__get_data_size(kv_data)
if adopted_data_size >= max_data_size:
# we have surpassed the max_data_size, so send what we have and clear attributes and telemetry
self.__send_data_pack_to_storage(adopted_data, connector_name)
adopted_data['telemetry'] = []
adopted_data['attributes'] = {}
adopted_data_size = empty_adopted_data_size
ts_to_index = {}
# It is possible that we get here and have some telemetry or attributes not yet sent, so check for that.
if len(adopted_data['telemetry']) > 0 or len(adopted_data['attributes']) > 0:
self.__send_data_pack_to_storage(adopted_data, connector_name)
# technically unnecessary to clear here, but leaving for consistency.
adopted_data['telemetry'] = []
adopted_data['attributes'] = {}
else:
self.__send_data_pack_to_storage(data, connector_name)