From 5819ba258d0ac2bab8deaba1e54e7b3d285e8aea Mon Sep 17 00:00:00 2001 From: devaskim Date: Thu, 17 Nov 2022 18:28:30 +0500 Subject: [PATCH] Data duplicate detector feature. --- .../test_mqtt_json_uplink_converter.py | 21 ++ tests/gateway/test_duplicate_detector.py | 226 ++++++++++++++++++ thingsboard_gateway/config/mqtt.json | 1 + thingsboard_gateway/connectors/connector.py | 3 + .../mqtt/json_mqtt_uplink_converter.py | 6 +- .../connectors/mqtt/mqtt_connector.py | 15 +- thingsboard_gateway/gateway/constant_enums.py | 3 +- thingsboard_gateway/gateway/constants.py | 5 + .../gateway/duplicate_detector.py | 104 ++++++++ .../gateway/tb_gateway_service.py | 13 +- 10 files changed, 389 insertions(+), 8 deletions(-) create mode 100644 tests/gateway/test_duplicate_detector.py create mode 100644 thingsboard_gateway/gateway/duplicate_detector.py diff --git a/tests/converters/test_mqtt_json_uplink_converter.py b/tests/converters/test_mqtt_json_uplink_converter.py index f782ca66..7eda6241 100644 --- a/tests/converters/test_mqtt_json_uplink_converter.py +++ b/tests/converters/test_mqtt_json_uplink_converter.py @@ -15,6 +15,7 @@ import unittest from random import randint +from thingsboard_gateway.gateway.constants import * from thingsboard_gateway.connectors.mqtt.json_mqtt_uplink_converter import JsonMqttUplinkConverter @@ -56,6 +57,26 @@ class JsonMqttUplinkConverterTests(unittest.TestCase): self.assertDictEqual(single_data, self._convert_to_dict(item.get('telemetry'))) self.assertDictEqual(single_data, self._convert_to_dict(item.get('attributes'))) + def test_without_send_on_change_option(self): + topic, config, data = self._get_device_1_test_data() + converter = JsonMqttUplinkConverter(config) + converted_array_data = converter.convert(topic, data) + self.assertIsNone(converted_array_data.get(SEND_ON_CHANGE_PARAMETER)) + + def test_with_send_on_change_option_disabled(self): + topic, config, data = self._get_device_1_test_data() + config["converter"][SEND_ON_CHANGE_PARAMETER] = False + converter = JsonMqttUplinkConverter(config) + converted_array_data = converter.convert(topic, data) + self.assertFalse(converted_array_data.get(SEND_ON_CHANGE_PARAMETER)) + + def test_with_send_on_change_option_enabled(self): + topic, config, data = self._get_device_1_test_data() + config["converter"][SEND_ON_CHANGE_PARAMETER] = True + converter = JsonMqttUplinkConverter(config) + converted_array_data = converter.convert(topic, data) + self.assertTrue(converted_array_data.get(SEND_ON_CHANGE_PARAMETER)) + @staticmethod def _convert_to_dict(data_array): data_dict = {} diff --git a/tests/gateway/test_duplicate_detector.py b/tests/gateway/test_duplicate_detector.py new file mode 100644 index 00000000..b388587d --- /dev/null +++ b/tests/gateway/test_duplicate_detector.py @@ -0,0 +1,226 @@ +# Copyright 2022. ThingsBoard +# # +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# # +# http://www.apache.org/licenses/LICENSE-2.0 +# # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from time import time + +from thingsboard_gateway.gateway.constants import * +from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector + + +class ConnectorStorage: + def __init__(self): + self._connectors = {} + + def add_connector(self, name, connector): + self._connectors[name] = connector + + def get_connector(self, name): + return self._connectors.get(name) + + +class Connector: + def __init__(self, enable_filtering): + self._enable_filtering = enable_filtering + + def is_filtering_enable(self, device_name): + return self._enable_filtering + + +class TestDuplicateDetector(unittest.TestCase): + + TEST_DEVICE_NAME = "Test device" + TEST_DEVICE_TYPE = "TimeMachine" + + @staticmethod + def _to_dict(dicts_in_array): + result_dict = {} + for attribute in dicts_in_array: + for key, value in attribute.items(): + result_dict[key] = value + return result_dict + + def _is_dicts_equal(self, actual, expected): + self.assertEqual(len(actual), len(expected)) + diff = set(actual.items()) ^ set(expected.items()) + self.assertTrue(len(diff) == 0) + + def _is_data_packets_equal(self, actual, expected): + self.assertIsNotNone(actual) + self.assertEqual(actual.get(DEVICE_NAME_PARAMETER), expected.get(DEVICE_NAME_PARAMETER)) + self.assertEqual(actual.get(DEVICE_TYPE_PARAMETER), expected.get(DEVICE_TYPE_PARAMETER)) + + actual_attributes = TestDuplicateDetector._to_dict(actual.get(ATTRIBUTES_PARAMETER)) + expected_attributes = TestDuplicateDetector._to_dict(expected.get(ATTRIBUTES_PARAMETER)) + + self._is_dicts_equal(actual_attributes, expected_attributes) + + actual_telemetry = actual.get(TELEMETRY_PARAMETER) + expected_telemetry = expected.get(TELEMETRY_PARAMETER) + + self.assertEqual(len(actual_telemetry), len(expected_telemetry)) + + @staticmethod + def _create_data_packet(attributes=None, telemetry=None): + return { + DEVICE_NAME_PARAMETER: TestDuplicateDetector.TEST_DEVICE_NAME, + DEVICE_TYPE_PARAMETER: TestDuplicateDetector.TEST_DEVICE_TYPE, + ATTRIBUTES_PARAMETER: attributes if attributes else [TestDuplicateDetector._create_attributes()], + TELEMETRY_PARAMETER: telemetry if telemetry else [TestDuplicateDetector._create_telemetry()] + } + + @staticmethod + def _create_attributes(): + return {'testAttribute': 100500} + + @staticmethod + def _create_telemetry(): + return {'testTelemetry': 12345} + + @staticmethod + def _create_ts_telemetry(): + return { + TELEMETRY_TIMESTAMP_PARAMETER: 1668624816000, + TELEMETRY_VALUES_PARAMETER: TestDuplicateDetector._create_telemetry() + } + + def setUp(self): + self._connector_storage = ConnectorStorage() + self._duplicate_detector = DuplicateDetector(self._connector_storage) + + def test_in_data_filter_disable(self): + expected_data = self._create_data_packet() + expected_data[SEND_ON_CHANGE_PARAMETER] = False + actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data) + self._is_data_packets_equal(actual_data1, expected_data) + actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data) + self._is_data_packets_equal(actual_data2, expected_data) + + def test_in_data_filter_enable(self): + expected_data = self._create_data_packet() + expected_data[SEND_ON_CHANGE_PARAMETER] = True + actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data) + self._is_data_packets_equal(actual_data1, expected_data) + actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data) + self.assertIsNone(actual_data2) + + def test_dont_filter_data_from_unknown_connector(self): + expected_data = self._create_data_packet() + actual_data1 = self._duplicate_detector.filter_data("unknown_connector", expected_data) + self._is_data_packets_equal(actual_data1, expected_data) + actual_data2 = self._duplicate_detector.filter_data("unknown_connector", expected_data) + self._is_data_packets_equal(actual_data2, expected_data) + + def test_connector_with_disable_filtering(self): + connector_name = "some_connector" + self._connector_storage.add_connector(connector_name, Connector(False)) + + expected_data = self._create_data_packet() + actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data) + self._is_data_packets_equal(actual_data1, expected_data) + actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data) + self._is_data_packets_equal(actual_data2, expected_data) + + def test_connector_with_enable_filtering(self): + connector_name = "some_connector" + self._connector_storage.add_connector(connector_name, Connector(True)) + + expected_data = self._create_data_packet() + actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data) + self._is_data_packets_equal(actual_data1, expected_data) + actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data) + self.assertIsNone(actual_data2) + + def test_one_unchanged_and_one_changed_attributes(self): + unchanged_attribute = {"UnchangedAttr": 1} + changed_attr_name = "ChangedAttr" + changed_attribute_value1 = {changed_attr_name: 10} + changed_attribute_value2 = {changed_attr_name: 20} + expected_attributes1 = [unchanged_attribute, changed_attribute_value1] + expected_attributes2 = [unchanged_attribute, changed_attribute_value2] + + expected_data1 = self._create_data_packet(expected_attributes1, []) + expected_data1[SEND_ON_CHANGE_PARAMETER] = True + expected_data2 = self._create_data_packet(expected_attributes2, []) + expected_data2[SEND_ON_CHANGE_PARAMETER] = True + + actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1) + expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None) + self.assertEqual(actual_data1, expected_data1) + + actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2) + actual_attributes = actual_data2.get(ATTRIBUTES_PARAMETER) + self.assertNotEqual(len(actual_attributes), len(expected_attributes2)) + self.assertTrue(len(actual_attributes) == 1) + self._is_dicts_equal(actual_attributes[0], changed_attribute_value2) + + def test_one_unchanged_and_one_changed_telemetry_without_timestamp(self): + unchanged_telemetry = {"UnchangedTelemetry": 1} + changed_telemetry_name = "ChangedTelemetry" + changed_telemetry_value1 = {changed_telemetry_name: 10} + changed_telemetry_value2 = {changed_telemetry_name: 20} + expected_telemetry1 = [unchanged_telemetry, changed_telemetry_value1] + expected_telemetry2 = [unchanged_telemetry, changed_telemetry_value2] + + expected_data1 = self._create_data_packet([], expected_telemetry1) + expected_data1[SEND_ON_CHANGE_PARAMETER] = True + expected_data2 = self._create_data_packet([], expected_telemetry2) + expected_data2[SEND_ON_CHANGE_PARAMETER] = True + + actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1) + expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None) + self.assertEqual(actual_data1, expected_data1) + + actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2) + actual_telemetry = actual_data2.get(TELEMETRY_PARAMETER) + self.assertNotEqual(len(actual_telemetry), len(expected_telemetry2)) + self.assertTrue(len(actual_telemetry) == 1) + self._is_dicts_equal(actual_telemetry[0], changed_telemetry_value2) + + def test_one_unchanged_and_one_changed_telemetry_with_timestamp(self): + unchanged_telemetry = { + TELEMETRY_TIMESTAMP_PARAMETER: time(), + TELEMETRY_VALUES_PARAMETER: {"UnchangedTelemetry": 1} + } + changed_telemetry_name = "ChangedTelemetry" + changed_telemetry_value1 = { + TELEMETRY_TIMESTAMP_PARAMETER: time(), + TELEMETRY_VALUES_PARAMETER: {changed_telemetry_name: 10} + } + changed_telemetry_value2 = { + TELEMETRY_TIMESTAMP_PARAMETER: time(), + TELEMETRY_VALUES_PARAMETER: {changed_telemetry_name: 20} + } + expected_telemetry1 = [unchanged_telemetry, changed_telemetry_value1] + expected_telemetry2 = [unchanged_telemetry, changed_telemetry_value2] + + expected_data1 = self._create_data_packet([], expected_telemetry1) + expected_data1[SEND_ON_CHANGE_PARAMETER] = True + expected_data2 = self._create_data_packet([], expected_telemetry2) + expected_data2[SEND_ON_CHANGE_PARAMETER] = True + + actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1) + expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None) + self.assertEqual(actual_data1, expected_data1) + + actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2) + actual_telemetry = actual_data2.get(TELEMETRY_PARAMETER) + self.assertNotEqual(len(actual_telemetry), len(expected_telemetry2)) + self.assertTrue(len(actual_telemetry) == 1) + self.assertEqual(actual_telemetry[0].get(TELEMETRY_TIMESTAMP_PARAMETER), changed_telemetry_value2.get(TELEMETRY_TIMESTAMP_PARAMETER)) + self._is_dicts_equal(actual_telemetry[0].get(TELEMETRY_VALUES_PARAMETER), changed_telemetry_value2.get(TELEMETRY_VALUES_PARAMETER)) + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/thingsboard_gateway/config/mqtt.json b/thingsboard_gateway/config/mqtt.json index 500da64d..4acb3d4e 100644 --- a/thingsboard_gateway/config/mqtt.json +++ b/thingsboard_gateway/config/mqtt.json @@ -7,6 +7,7 @@ "version": 5, "maxMessageNumberPerWorker": 10, "maxNumberOfWorkers": 100, + "sendDataOnlyOnChange": false, "security": { "type": "basic", "username": "user", diff --git a/thingsboard_gateway/connectors/connector.py b/thingsboard_gateway/connectors/connector.py index 951cd3e8..9610ce39 100644 --- a/thingsboard_gateway/connectors/connector.py +++ b/thingsboard_gateway/connectors/connector.py @@ -43,3 +43,6 @@ class Connector(ABC): @abstractmethod def server_side_rpc_handler(self, content): pass + + def is_filtering_enable(self, device_name): + return False diff --git a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py index 6ccaaf94..3768b841 100644 --- a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py +++ b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py @@ -13,10 +13,10 @@ # limitations under the License. from re import search -from time import time from simplejson import dumps +from thingsboard_gateway.gateway.constants import * from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter, log from thingsboard_gateway.tb_utility.tb_utility import TBUtility from thingsboard_gateway.gateway.statistics_service import StatisticsService @@ -25,6 +25,7 @@ from thingsboard_gateway.gateway.statistics_service import StatisticsService class JsonMqttUplinkConverter(MqttUplinkConverter): def __init__(self, config): self.__config = config.get('converter') + self.__send_data_on_change = self.__config.get(SEND_ON_CHANGE_PARAMETER) @property def config(self): @@ -51,6 +52,9 @@ class JsonMqttUplinkConverter(MqttUplinkConverter): "telemetry": [] } + if isinstance(self.__send_data_on_change, bool): + dict_result[SEND_ON_CHANGE_PARAMETER] = self.__send_data_on_change + try: for datatype in datatypes: timestamp = data.get("ts", data.get("timestamp")) if datatype == 'timeseries' else None diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 3c50b405..464d5077 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -22,6 +22,8 @@ from time import sleep, time import simplejson +from thingsboard_gateway.gateway.constants import * +from thingsboard_gateway.gateway.constant_enums import Status from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.mqtt.mqtt_decorators import CustomCollectStatistics from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader @@ -108,6 +110,7 @@ class MqttConnector(Connector, Thread): # Extract main sections from configuration --------------------------------------------------------------------- self.__broker = config.get('broker') + self.__send_data_only_on_change = self.__broker.get(SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_DATA_ON_CHANGE_VALUE) # for sendDataOnlyOnChange param self.__topic_content = {} @@ -218,6 +221,9 @@ class MqttConnector(Connector, Thread): self._on_message_thread = Thread(name='On Message', target=self._process_on_message, daemon=True) self._on_message_thread.start() + def is_filtering_enable(self, device_name): + return self.__send_data_only_on_change + def load_handlers(self, handler_flavor, mandatory_keys, accepted_handlers_list): if handler_flavor not in self.config: self.__log.error("'%s' section missing from configuration", handler_flavor) @@ -436,9 +442,9 @@ class MqttConnector(Connector, Thread): return False def _save_converted_msg(self, topic, data): - self.__gateway.send_to_storage(self.name, data) - self.statistics['MessagesSent'] += 1 - self.__log.debug("Successfully converted message from topic %s", topic) + if self.__gateway.send_to_storage(self.name, data) == Status.SUCCESS: + self.statistics['MessagesSent'] += 1 + self.__log.debug("Successfully converted message from topic %s", topic) def __threads_manager(self): if len(self.__workers_thread_pool) == 0: @@ -847,7 +853,8 @@ class MqttConnector(Connector, Thread): convert_function, config, incoming_data = self.__msg_queue.get(True, 100) converted_data = convert_function(config, incoming_data) log.debug(converted_data) - if converted_data and (converted_data.get('attributes') or converted_data.get('telemetry')): + if converted_data and (converted_data.get(ATTRIBUTES_PARAMETER) or + converted_data.get(TELEMETRY_PARAMETER)): self.__send_result(config, converted_data) self.in_progress = False else: diff --git a/thingsboard_gateway/gateway/constant_enums.py b/thingsboard_gateway/gateway/constant_enums.py index 6ce8f426..99f15537 100644 --- a/thingsboard_gateway/gateway/constant_enums.py +++ b/thingsboard_gateway/gateway/constant_enums.py @@ -34,4 +34,5 @@ class DownlinkMessageType(Enum): class Status(Enum): FAILURE = 1, NOT_FOUND = 2, - SUCCESS = 3 + SUCCESS = 3, + NO_NEW_DATA = 4 diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py index 5f421da3..2f9dade6 100644 --- a/thingsboard_gateway/gateway/constants.py +++ b/thingsboard_gateway/gateway/constants.py @@ -39,6 +39,11 @@ DEVICE_TYPE_PARAMETER = "deviceType" ATTRIBUTES_PARAMETER = "attributes" TELEMETRY_PARAMETER = "telemetry" +TELEMETRY_TIMESTAMP_PARAMETER = "ts" +TELEMETRY_VALUES_PARAMETER = "values" + +SEND_ON_CHANGE_PARAMETER = "sendDataOnlyOnChange" +DEFAULT_SEND_DATA_ON_CHANGE_VALUE = False # RPC parameter constants diff --git a/thingsboard_gateway/gateway/duplicate_detector.py b/thingsboard_gateway/gateway/duplicate_detector.py new file mode 100644 index 00000000..f2f1fc74 --- /dev/null +++ b/thingsboard_gateway/gateway/duplicate_detector.py @@ -0,0 +1,104 @@ +# Copyright 2022. ThingsBoard +# # +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# # +# http://www.apache.org/licenses/LICENSE-2.0 +# # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from logging import getLogger + +from thingsboard_gateway.gateway.constants import * + +log = getLogger("service") + + +class DuplicateDetector: + def __init__(self, connectors_storage): + self._connectors_data = {} + self._connectors_storage = connectors_storage + + def persist_latest_values(self): + # TODO: periodically save latest values if persistent storage is enabled + pass + + def filter_data(self, connector_name, new_data): + in_data_filter_enabled = new_data.get(SEND_ON_CHANGE_PARAMETER) + if isinstance(in_data_filter_enabled, bool) and not in_data_filter_enabled: + return new_data + + device_name = new_data[DEVICE_NAME_PARAMETER] + if not in_data_filter_enabled: + connector = self._connectors_storage.get_connector(connector_name) + if not connector or not connector.is_filtering_enable(device_name): + return new_data + + to_send = {ATTRIBUTES_PARAMETER: [], TELEMETRY_PARAMETER: []} + + remaining_attributes_count = 0 + filtered_attributes_count = 0 + for attribute in new_data[ATTRIBUTES_PARAMETER]: + for key, new_value in attribute.items(): + if self._update_latest_attribute_value(connector_name, device_name, key, new_value): + to_send[ATTRIBUTES_PARAMETER].append(attribute) + remaining_attributes_count += 1 + else: + filtered_attributes_count += 1 + + remaining_telemetry_count = 0 + filtered_telemetry_count = 0 + for ts_kv_list in new_data[TELEMETRY_PARAMETER]: + ts_added = False + ts = ts_kv_list.get(TELEMETRY_TIMESTAMP_PARAMETER) + ts_values = {} + for key, new_value in ts_kv_list.get(TELEMETRY_VALUES_PARAMETER, ts_kv_list).items(): + if self._update_latest_telemetry_value(connector_name, device_name, key, new_value): + ts_values[key] = new_value + ts_added = True + remaining_telemetry_count += 1 + else: + filtered_telemetry_count += 1 + if ts_added: + to_send[TELEMETRY_PARAMETER].append( + {TELEMETRY_TIMESTAMP_PARAMETER: ts, TELEMETRY_VALUES_PARAMETER: ts_values} if ts else ts_values) + + if remaining_attributes_count > 0 or remaining_telemetry_count > 0: + log.warning("[%s] '%s' changed attributes %d from %d, changed telemetry %d from %d", + connector_name, device_name, + remaining_attributes_count, remaining_attributes_count + filtered_attributes_count, + remaining_telemetry_count, remaining_telemetry_count + filtered_telemetry_count) + to_send[DEVICE_NAME_PARAMETER] = device_name + to_send[DEVICE_TYPE_PARAMETER] = new_data[DEVICE_TYPE_PARAMETER] + return to_send + + log.warning("[%s] '%s' device data has not been changed", connector_name, device_name) + return None + + def _update_latest_attribute_value(self, connector_name, device_name, key, value): + return self._update_latest_value(connector_name, device_name, ATTRIBUTES_PARAMETER, key, value) + + def _update_latest_telemetry_value(self, connector_name, device_name, key, value): + return self._update_latest_value(connector_name, device_name, TELEMETRY_PARAMETER, key, value) + + def _update_latest_value(self, connector_name, device_name, data_type, key, value): + if connector_name not in self._connectors_data: + self._connectors_data[connector_name] = {} + + if device_name not in self._connectors_data[connector_name]: + self._connectors_data[connector_name][device_name] = { + ATTRIBUTES_PARAMETER: {}, + TELEMETRY_PARAMETER: {} + } + self._connectors_data[connector_name][device_name][data_type][key] = value + return True + + if self._connectors_data[connector_name][device_name][data_type].get(key) != value: + self._connectors_data[connector_name][device_name][data_type][key] = value + return True + return False diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index d2dc036d..a04d2831 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -33,6 +33,7 @@ from yaml import safe_load from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \ PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME +from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector from thingsboard_gateway.gateway.statistics_service import StatisticsService from thingsboard_gateway.gateway.tb_client import TBClient from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage @@ -146,6 +147,7 @@ class TBGatewayService: global log log = logging.getLogger('service') log.info("Gateway starting...") + self.__duplicate_detector = DuplicateDetector(self) self.__updater = TBUpdater() self.__updates_check_period_ms = 300000 self.__updates_check_time = 0 @@ -260,6 +262,9 @@ class TBGatewayService: self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True) self._watchers_thread.start() + def get_connector(self, name): + return self.available_connectors.get(name) + def _watchers(self): try: gateway_statistic_send = 0 @@ -654,8 +659,12 @@ class TBGatewayService: def send_to_storage(self, connector_name, data): try: - self.__converted_data_queue.put((connector_name, data), True, 100) - return Status.SUCCESS + filtered_data = self.__duplicate_detector.filter_data(connector_name, data) + if filtered_data: + self.__converted_data_queue.put((connector_name, filtered_data), True, 100) + return Status.SUCCESS + else: + return Status.NO_NEW_DATA except Exception as e: log.exception("Cannot put converted data!", e) return Status.FAILURE