1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Improved sending data in packs to TB

This commit is contained in:
zbeacon
2021-12-13 16:04:24 +02:00
parent c55927c0e7
commit 66dbd12f68
7 changed files with 93 additions and 64 deletions

View File

@@ -1,7 +1,7 @@
[loggers]
keys=root, service, grpc, connector, converter, tb_connection, storage, extension
keys=root, service, connector, converter, tb_connection, storage, extension
[handlers]
keys=consoleHandler, serviceHandler, grpcHandler, connectorHandler, converterHandler, tb_connectionHandler, storageHandler, extensionHandler
keys=consoleHandler, serviceHandler, connectorHandler, converterHandler, tb_connectionHandler, storageHandler, extensionHandler
[formatters]
keys=LogFormatter
[logger_root]
@@ -32,11 +32,6 @@ level=INFO
handlers=serviceHandler
formatter=LogFormatter
qualname=service
[logger_grpc]
level=INFO
handlers=grpcHandler
formatter=LogFormatter
qualname=grpc
[logger_converter]
level=INFO
handlers=converterHandler
@@ -72,11 +67,6 @@ level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/service.log", "d", 1, 7,)
[handler_grpcHandler]
level=INFO
class=logging.handlers.TimedRotatingFileHandler
formatter=LogFormatter
args=("./logs/grpc.log", "d", 1, 7,)
[handler_converterHandler]
level=INFO
class=logging.handlers.TimedRotatingFileHandler

View File

@@ -97,9 +97,4 @@ connectors:
# type: serial
# configuration: custom_serial.json
# class: CustomSerialConnector
#
# -
# name: GRPC test
# key: auto
# type: grpc
# configuration: test_grpc.json
#

View File

@@ -104,6 +104,7 @@ class GrpcDownlinkConverter(Converter):
unreg_msg = UnregisterConnectorMsg()
unreg_msg.connectorKey = msg
basic_msg.unregisterConnectorMsg.MergeFrom(unreg_msg)
return basic_msg
@staticmethod
def __get_key_value_proto_value(key: str, value: Union[str, bool, int, float, dict]) -> KeyValueProto:

View File

@@ -14,7 +14,7 @@
import logging
import threading
import time
from time import time, sleep
from ssl import CERT_REQUIRED, PROTOCOL_TLSv1_2
from thingsboard_gateway.tb_client.tb_gateway_mqtt import TBGatewayMqttClient
@@ -129,15 +129,15 @@ class TBClient(threading.Thread):
pass
except Exception as e:
log.exception(e)
time.sleep(1)
sleep(1)
except Exception as e:
log.exception(e)
time.sleep(10)
sleep(10)
while not self.__stopped:
try:
if not self.__stopped:
time.sleep(.2)
sleep(.2)
else:
break
except KeyboardInterrupt:

View File

@@ -15,6 +15,7 @@
import logging
import logging.config
import logging.handlers
from copy import deepcopy
from os import execv, listdir, path, pathsep, stat, system
from queue import SimpleQueue
from random import choice
@@ -350,7 +351,6 @@ class TBGatewayService:
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] in self.available_connectors:
connector_name = self.__grpc_connectors[connector_key]['name']
target_connector: GrpcConnector = self.available_connectors.pop(connector_name)
target_connector.close()
self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector)
log.info("GRPC connector with key %s and name %s - unregistered", connector_key, target_connector.get_name())
elif self.__grpc_connectors.get(connector_key) is not None:
@@ -477,6 +477,14 @@ class TBGatewayService:
if not TBUtility.validate_converted_data(data):
log.error("Data from %s connector is invalid.", connector_name)
continue
if data.get('deviceType') is None:
device_name = data['deviceName']
if self.__connected_devices.get(device_name) is not None:
data["deviceType"] = self.__connected_devices[device_name]['device_type']
elif self.__saved_devices.get(device_name) is not None:
data["deviceType"] = self.__saved_devices[device_name]['device_type']
else:
data["deviceType"] = "default"
if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected():
self.add_device(data["deviceName"],
{"connector": self.available_connectors[connector_name]},
@@ -488,34 +496,75 @@ class TBGatewayService:
else:
data["deviceName"] = "currentThingsBoardGateway"
telemetry = {}
telemetry_with_ts = []
for item in data["telemetry"]:
if item.get("ts") is None:
telemetry = {**telemetry, **item}
else:
telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}})
if telemetry_with_ts:
data["telemetry"] = telemetry_with_ts
elif len(data['telemetry']) > 0:
data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry}
data = self.__convert_telemetry_to_ts(data)
max_data_size = self.__config["thingsboard"].get("maxPayloadSizeBytes", 1024)
if self.__get_data_size(data) >= max_data_size:
adopted_data = {"deviceName": data['deviceName'],
"deviceType": data['deviceType'],
"attributes": {},
"telemetry": []}
for attribute in data['attributes']:
adopted_data_size = self.__get_data_size(adopted_data)
if adopted_data_size >= max_data_size:
self.__send_data_pack_to_storage(adopted_data, connector_name)
adopted_data['attributes'] = {}
adopted_data['attributes'].update({attribute: data['attributes'][attribute]})
for ts_kv_list in data['telemetry']:
ts = ts_kv_list['ts']
for kv in ts_kv_list['values']:
adopted_data_size = self.__get_data_size(adopted_data)
if adopted_data_size >= max_data_size:
self.__send_data_pack_to_storage(adopted_data, connector_name)
adopted_data['telemetry'] = []
if len(adopted_data['telemetry']) == 0:
adopted_data['telemetry'] = [{'ts': ts, 'values': {kv: ts_kv_list['values'][kv]}}]
else:
for adopted_kv in adopted_data['telemetry']:
if adopted_kv['ts'] == ts:
adopted_kv['values'].update({kv: ts_kv_list['values'][kv]})
else:
self.__send_data_pack_to_storage(data, connector_name)
json_data = dumps(data)
save_result = self._event_storage.put(json_data)
if not save_result:
log.error('Data from the device "%s" cannot be saved, connector name is %s.',
data["deviceName"],
connector_name)
else:
sleep(0.2)
except Exception as e:
log.error(e)
def check_size(self, size, devices_data_in_event_pack):
if size >= self.__config["thingsboard"].get("maxPayloadSizeBytes", 4096):
@staticmethod
def __get_data_size(data: dict):
return getsizeof(str(data))
@staticmethod
def __convert_telemetry_to_ts(data):
telemetry = {}
telemetry_with_ts = []
for item in data["telemetry"]:
if item.get("ts") is None:
telemetry = {**telemetry, **item}
else:
telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}})
if telemetry_with_ts:
data["telemetry"] = telemetry_with_ts
elif len(data['telemetry']) > 0:
data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry}
return data
def __send_data_pack_to_storage(self, data, connector_name):
json_data = dumps(data)
save_result = self._event_storage.put(json_data)
if not save_result:
log.error('Data from the device "%s" cannot be saved, connector name is %s.',
data["deviceName"],
connector_name)
def check_size(self, devices_data_in_event_pack):
if self.__get_data_size(devices_data_in_event_pack) >= self.__config["thingsboard"].get("maxPayloadSizeBytes", 1024):
self.__send_data(devices_data_in_event_pack)
size = 0
return size
for device in devices_data_in_event_pack:
devices_data_in_event_pack[device]["telemetry"] = []
devices_data_in_event_pack[device]["attributes"] = {}
def __read_data_from_storage(self):
devices_data_in_event_pack = {}
@@ -524,7 +573,7 @@ class TBGatewayService:
while not self.stopped:
try:
if self.tb_client.is_connected():
size = getsizeof(str(devices_data_in_event_pack)) - 2
size = self.__get_data_size(devices_data_in_event_pack) - 2
events = []
if self.__remote_configurator is None or not self.__remote_configurator.in_process:
@@ -545,34 +594,28 @@ class TBGatewayService:
if current_event.get("telemetry"):
if isinstance(current_event["telemetry"], list):
for item in current_event["telemetry"]:
size += getsizeof(str(item)) - 2
size = self.check_size(size, devices_data_in_event_pack)
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(
item)
else:
size += getsizeof(str(current_event["telemetry"])) - 2
size = self.check_size(size, devices_data_in_event_pack)
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(
current_event["telemetry"])
if current_event.get("attributes"):
if isinstance(current_event["attributes"], list):
for item in current_event["attributes"]:
size += getsizeof(str(item)) - 2
size = self.check_size(size, devices_data_in_event_pack)
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
item.items())
else:
size += getsizeof(str(current_event["attributes"].items())) - 2
size = self.check_size(size, devices_data_in_event_pack)
self.check_size(devices_data_in_event_pack)
devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(
current_event["attributes"].items())
if devices_data_in_event_pack:
if not self.tb_client.is_connected():
continue
while self.__rpc_reply_sent:
sleep(.2)
self.__send_data(devices_data_in_event_pack)
sleep(self.__min_pack_send_delay_ms)
@@ -600,7 +643,7 @@ class TBGatewayService:
log.exception(e)
success = False
sleep(0.2)
if success:
if success and self.tb_client.is_connected():
self._event_storage.event_pack_processing_done()
del devices_data_in_event_pack
devices_data_in_event_pack = {}

View File

@@ -15,7 +15,7 @@
import logging
import queue
import ssl
import time
from time import sleep, time
from threading import RLock, Thread
import paho.mqtt.client as paho
@@ -77,7 +77,7 @@ class TBPublishInfo:
class TBDeviceMqttClient:
def __init__(self, host, port=1883, token=None, quality_of_service=None):
self._client = paho.Client()
self._client = paho.Client(protocol=4)
self.quality_of_service = quality_of_service if quality_of_service is not None else 1
self.__host = host
self.__port = port
@@ -132,7 +132,7 @@ class TBDeviceMqttClient:
5: "not authorised",
}
if self.__connect_callback:
time.sleep(.2)
sleep(.2)
self.__connect_callback(client, userdata, flags, result_code, *extra_params)
if result_code == 0:
self.__is_connected = True
@@ -317,7 +317,7 @@ class TBDeviceMqttClient:
tmp = tmp[:len(tmp) - 1]
msg.update({"sharedKeys": tmp})
ts_in_millis = int(round(time.time() * 1000))
ts_in_millis = int(time() * 1000)
attr_request_number = self._add_attr_request_callback(callback)
@@ -343,10 +343,10 @@ class TBDeviceMqttClient:
item = self.__timeout_queue.get_nowait()
if item is not None:
while not self.stopped:
current_ts_in_millis = int(round(time.time() * 1000))
current_ts_in_millis = int(time() * 1000)
if current_ts_in_millis > item["ts"]:
break
time.sleep(0.2)
sleep(0.2)
with self._lock:
callback = None
if item.get("attribute_request_id"):
@@ -358,7 +358,7 @@ class TBDeviceMqttClient:
if callback is not None:
callback(None, TBTimeoutException("Timeout while waiting for a reply from ThingsBoard!"))
else:
time.sleep(0.2)
sleep(0.2)
def claim(self, secret_key, duration=30000):
claiming_request = {

View File

@@ -54,7 +54,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC, qos=1)[1])] = GATEWAY_RPC_TOPIC
# self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC)[1])] = GATEWAY_RPC_RESPONSE_TOPIC
def _on_subscribe(self, client, userdata, mid, granted_qos):
def _on_subscribe(self, client, userdata, mid, reasoncodes, properties=None):
subscription = self._gw_subscriptions.get(mid)
if subscription is not None:
if mid == 128: