mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Moved saving data to storage to separated thread
This commit is contained in:
@@ -101,6 +101,9 @@ class TBGatewayService:
|
||||
self.remote_handler = TBLoggerHandler(self)
|
||||
self.main_handler.setTarget(self.remote_handler)
|
||||
self._default_connectors = DEFAULT_CONNECTORS
|
||||
self.__converted_data_queue = Queue()
|
||||
self.__save_converted_data_thread = Thread(name="Save converted data", daemon=True, target=self.__send_to_storage)
|
||||
self.__save_converted_data_thread.start()
|
||||
self._implemented_connectors = {}
|
||||
self._event_storage_types = {
|
||||
"memory": MemoryEventStorage,
|
||||
@@ -363,39 +366,48 @@ class TBGatewayService:
|
||||
self._connect_with_connectors()
|
||||
|
||||
def send_to_storage(self, connector_name, data):
|
||||
if not connector_name == self.name:
|
||||
if not TBUtility.validate_converted_data(data):
|
||||
log.error("Data from %s connector is invalid.", connector_name)
|
||||
return None
|
||||
if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected():
|
||||
self.add_device(data["deviceName"],
|
||||
{"connector": self.available_connectors[connector_name]},
|
||||
device_type=data["deviceType"])
|
||||
if not self.__connector_incoming_messages.get(connector_name):
|
||||
self.__connector_incoming_messages[connector_name] = 0
|
||||
else:
|
||||
self.__connector_incoming_messages[connector_name] += 1
|
||||
else:
|
||||
data["deviceName"] = "currentThingsBoardGateway"
|
||||
self.__converted_data_queue.put((connector_name, data), False)
|
||||
|
||||
telemetry = {}
|
||||
telemetry_with_ts = []
|
||||
for item in data["telemetry"]:
|
||||
if item.get("ts") is None:
|
||||
telemetry = {**telemetry, **item}
|
||||
else:
|
||||
telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}})
|
||||
if telemetry_with_ts:
|
||||
data["telemetry"] = telemetry_with_ts
|
||||
else:
|
||||
data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry}
|
||||
def __send_to_storage(self):
|
||||
while True:
|
||||
try:
|
||||
if not self.__converted_data_queue.empty():
|
||||
connector_name, data = self.__converted_data_queue.get(False)
|
||||
if not connector_name == self.name:
|
||||
if not TBUtility.validate_converted_data(data):
|
||||
log.error("Data from %s connector is invalid.", connector_name)
|
||||
return None
|
||||
if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected():
|
||||
self.add_device(data["deviceName"],
|
||||
{"connector": self.available_connectors[connector_name]},
|
||||
device_type=data["deviceType"])
|
||||
if not self.__connector_incoming_messages.get(connector_name):
|
||||
self.__connector_incoming_messages[connector_name] = 0
|
||||
else:
|
||||
self.__connector_incoming_messages[connector_name] += 1
|
||||
else:
|
||||
data["deviceName"] = "currentThingsBoardGateway"
|
||||
|
||||
json_data = dumps(data)
|
||||
save_result = self._event_storage.put(json_data)
|
||||
if not save_result:
|
||||
log.error('Data from the device "%s" cannot be saved, connector name is %s.',
|
||||
data["deviceName"],
|
||||
connector_name)
|
||||
telemetry = {}
|
||||
telemetry_with_ts = []
|
||||
for item in data["telemetry"]:
|
||||
if item.get("ts") is None:
|
||||
telemetry = {**telemetry, **item}
|
||||
else:
|
||||
telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}})
|
||||
if telemetry_with_ts:
|
||||
data["telemetry"] = telemetry_with_ts
|
||||
else:
|
||||
data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry}
|
||||
|
||||
json_data = dumps(data)
|
||||
save_result = self._event_storage.put(json_data)
|
||||
if not save_result:
|
||||
log.error('Data from the device "%s" cannot be saved, connector name is %s.',
|
||||
data["deviceName"],
|
||||
connector_name)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
def check_size(self, size, devices_data_in_event_pack):
|
||||
if size >= 48000:
|
||||
|
||||
Reference in New Issue
Block a user