From 64c3cfbfea07fe93436bcee480a3dfe5586535a2 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Mon, 6 Sep 2021 16:26:11 +0300 Subject: [PATCH] Moved saving data to storage to separated thread --- .../gateway/tb_gateway_service.py | 74 +++++++++++-------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 828fe54b..66e0bdb7 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -101,6 +101,9 @@ class TBGatewayService: self.remote_handler = TBLoggerHandler(self) self.main_handler.setTarget(self.remote_handler) self._default_connectors = DEFAULT_CONNECTORS + self.__converted_data_queue = Queue() + self.__save_converted_data_thread = Thread(name="Save converted data", daemon=True, target=self.__send_to_storage) + self.__save_converted_data_thread.start() self._implemented_connectors = {} self._event_storage_types = { "memory": MemoryEventStorage, @@ -363,39 +366,48 @@ class TBGatewayService: self._connect_with_connectors() def send_to_storage(self, connector_name, data): - if not connector_name == self.name: - if not TBUtility.validate_converted_data(data): - log.error("Data from %s connector is invalid.", connector_name) - return None - if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected(): - self.add_device(data["deviceName"], - {"connector": self.available_connectors[connector_name]}, - device_type=data["deviceType"]) - if not self.__connector_incoming_messages.get(connector_name): - self.__connector_incoming_messages[connector_name] = 0 - else: - self.__connector_incoming_messages[connector_name] += 1 - else: - data["deviceName"] = "currentThingsBoardGateway" + self.__converted_data_queue.put((connector_name, data), False) - telemetry = {} - telemetry_with_ts = [] - for item in data["telemetry"]: - if item.get("ts") is None: - telemetry = {**telemetry, **item} - else: - telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}}) - if telemetry_with_ts: - data["telemetry"] = telemetry_with_ts - else: - data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry} + def __send_to_storage(self): + while True: + try: + if not self.__converted_data_queue.empty(): + connector_name, data = self.__converted_data_queue.get(False) + if not connector_name == self.name: + if not TBUtility.validate_converted_data(data): + log.error("Data from %s connector is invalid.", connector_name) + return None + if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected(): + self.add_device(data["deviceName"], + {"connector": self.available_connectors[connector_name]}, + device_type=data["deviceType"]) + if not self.__connector_incoming_messages.get(connector_name): + self.__connector_incoming_messages[connector_name] = 0 + else: + self.__connector_incoming_messages[connector_name] += 1 + else: + data["deviceName"] = "currentThingsBoardGateway" - json_data = dumps(data) - save_result = self._event_storage.put(json_data) - if not save_result: - log.error('Data from the device "%s" cannot be saved, connector name is %s.', - data["deviceName"], - connector_name) + telemetry = {} + telemetry_with_ts = [] + for item in data["telemetry"]: + if item.get("ts") is None: + telemetry = {**telemetry, **item} + else: + telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}}) + if telemetry_with_ts: + data["telemetry"] = telemetry_with_ts + else: + data["telemetry"] = {"ts": int(time() * 1000), "values": telemetry} + + json_data = dumps(data) + save_result = self._event_storage.put(json_data) + if not save_result: + log.error('Data from the device "%s" cannot be saved, connector name is %s.', + data["deviceName"], + connector_name) + except Exception as e: + log.error(e) def check_size(self, size, devices_data_in_event_pack): if size >= 48000: