From 5800c8b40f2909fc3d856bdd7e39812e106b1ae7 Mon Sep 17 00:00:00 2001 From: devaskim Date: Wed, 7 Dec 2022 19:46:19 +0500 Subject: [PATCH] Support of TTL in duplicate filtering. --- tests/gateway/test_duplicate_detector.py | 134 +++++++++++------- thingsboard_gateway/connectors/connector.py | 6 +- .../connectors/mqtt/mqtt_connector.py | 10 +- thingsboard_gateway/gateway/constants.py | 5 +- .../gateway/duplicate_detector.py | 31 ++-- 5 files changed, 121 insertions(+), 65 deletions(-) diff --git a/tests/gateway/test_duplicate_detector.py b/tests/gateway/test_duplicate_detector.py index ea057609..cb5d6b45 100644 --- a/tests/gateway/test_duplicate_detector.py +++ b/tests/gateway/test_duplicate_detector.py @@ -20,15 +20,19 @@ from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector class Connector: - def __init__(self, enable_filtering): + def __init__(self, enable_filtering, ttl=DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE): self._enable_filtering = enable_filtering + self._ttl = ttl def is_filtering_enable(self, device_name): return self._enable_filtering + def get_ttl_for_duplicates(self, device_name): + return self._ttl + class TestDuplicateDetector(unittest.TestCase): - + CONNECTOR_NAME = "ConnectorName" TEST_DEVICE_NAME = "Test device" TEST_DEVICE_TYPE = "TimeMachine" @@ -65,8 +69,8 @@ class TestDuplicateDetector(unittest.TestCase): return { DEVICE_NAME_PARAMETER: TestDuplicateDetector.TEST_DEVICE_NAME, DEVICE_TYPE_PARAMETER: TestDuplicateDetector.TEST_DEVICE_TYPE, - ATTRIBUTES_PARAMETER: attributes if attributes else [TestDuplicateDetector._create_attributes()], - TELEMETRY_PARAMETER: telemetry if telemetry else [TestDuplicateDetector._create_telemetry()] + ATTRIBUTES_PARAMETER: attributes if attributes is not None else [TestDuplicateDetector._create_attributes()], + TELEMETRY_PARAMETER: telemetry if telemetry is not None else [TestDuplicateDetector._create_telemetry()] } @staticmethod @@ -74,15 +78,9 @@ class TestDuplicateDetector(unittest.TestCase): return {'testAttribute': 100500} @staticmethod - def _create_telemetry(): - return {'testTelemetry': 12345} - - @staticmethod - def _create_ts_telemetry(): - return { - TELEMETRY_TIMESTAMP_PARAMETER: 1668624816000, - TELEMETRY_VALUES_PARAMETER: TestDuplicateDetector._create_telemetry() - } + def _create_telemetry(ts=None): + value = {'testTelemetry': 12345} + return {TELEMETRY_TIMESTAMP_PARAMETER: ts, TELEMETRY_VALUES_PARAMETER: value} if ts else value def setUp(self): self.connectors = {} @@ -91,17 +89,17 @@ class TestDuplicateDetector(unittest.TestCase): def test_in_data_filter_disable(self): expected_data = self._create_data_packet() expected_data[SEND_ON_CHANGE_PARAMETER] = False - actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data1, expected_data) - actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data2, expected_data) def test_in_data_filter_enable(self): expected_data = self._create_data_packet() expected_data[SEND_ON_CHANGE_PARAMETER] = True - actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data1, expected_data) - actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self.assertIsNone(actual_data2) def test_dont_filter_data_from_unknown_connector(self): @@ -112,23 +110,21 @@ class TestDuplicateDetector(unittest.TestCase): self._is_data_packets_equal(actual_data2, expected_data) def test_connector_with_disable_filtering(self): - connector_name = "some_connector" - self.connectors[connector_name] = Connector(False) + self.connectors[self.CONNECTOR_NAME] = Connector(False) expected_data = self._create_data_packet() - actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data1, expected_data) - actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data2, expected_data) def test_connector_with_enable_filtering(self): - connector_name = "some_connector" - self.connectors[connector_name] = Connector(True) + self.connectors[self.CONNECTOR_NAME] = Connector(True) expected_data = self._create_data_packet() - actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data1, expected_data) - actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self.assertIsNone(actual_data2) def test_one_unchanged_and_one_changed_attributes(self): @@ -144,11 +140,11 @@ class TestDuplicateDetector(unittest.TestCase): expected_data2 = self._create_data_packet(expected_attributes2, []) expected_data2[SEND_ON_CHANGE_PARAMETER] = True - actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data1) expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None) self.assertEqual(actual_data1, expected_data1) - actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data2) actual_attributes = actual_data2.get(ATTRIBUTES_PARAMETER) self.assertNotEqual(len(actual_attributes), len(expected_attributes2)) self.assertTrue(len(actual_attributes) == 1) @@ -167,11 +163,11 @@ class TestDuplicateDetector(unittest.TestCase): expected_data2 = self._create_data_packet([], expected_telemetry2) expected_data2[SEND_ON_CHANGE_PARAMETER] = True - actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data1) expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None) self.assertEqual(actual_data1, expected_data1) - actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data2) actual_telemetry = actual_data2.get(TELEMETRY_PARAMETER) self.assertNotEqual(len(actual_telemetry), len(expected_telemetry2)) self.assertTrue(len(actual_telemetry) == 1) @@ -199,45 +195,87 @@ class TestDuplicateDetector(unittest.TestCase): expected_data2 = self._create_data_packet([], expected_telemetry2) expected_data2[SEND_ON_CHANGE_PARAMETER] = True - actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data1) expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None) self.assertEqual(actual_data1, expected_data1) - actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data2) actual_telemetry = actual_data2.get(TELEMETRY_PARAMETER) self.assertNotEqual(len(actual_telemetry), len(expected_telemetry2)) self.assertTrue(len(actual_telemetry) == 1) - self.assertEqual(actual_telemetry[0].get(TELEMETRY_TIMESTAMP_PARAMETER), changed_telemetry_value2.get(TELEMETRY_TIMESTAMP_PARAMETER)) - self._is_dicts_equal(actual_telemetry[0].get(TELEMETRY_VALUES_PARAMETER), changed_telemetry_value2.get(TELEMETRY_VALUES_PARAMETER)) + self.assertEqual(actual_telemetry[0].get(TELEMETRY_TIMESTAMP_PARAMETER), + changed_telemetry_value2.get(TELEMETRY_TIMESTAMP_PARAMETER)) + self._is_dicts_equal(actual_telemetry[0].get(TELEMETRY_VALUES_PARAMETER), + changed_telemetry_value2.get(TELEMETRY_VALUES_PARAMETER)) - def test_device_deletion(self): - connector_name = "some_connector" - self.connectors[connector_name] = Connector(True) + def test_in_connector_ttl(self): + ttl = 60 * 1000 + now = int(time() * 1000) + self.connectors[self.CONNECTOR_NAME] = Connector(True, ttl) - expected_data = self._create_data_packet() - actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data) - self._is_data_packets_equal(actual_data1, expected_data) - actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data) + expected_data1 = self._create_data_packet(telemetry=[self._create_telemetry(now)]) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data1) + self._is_data_packets_equal(actual_data1, expected_data1) + + expected_data2 = self._create_data_packet(telemetry=[self._create_telemetry(now + int(ttl / 2))]) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data2) self.assertIsNone(actual_data2) - self._duplicate_detector.delete_device(TestDuplicateDetector.TEST_DEVICE_NAME) - actual_data3 = self._duplicate_detector.filter_data(connector_name, expected_data) + def test_out_connector_ttl(self): + ttl = 60 * 1000 + now = int(time() * 1000) + self.connectors[self.CONNECTOR_NAME] = Connector(True, ttl) + + expected_data1 = self._create_data_packet([], telemetry=[self._create_telemetry(now)]) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data1) + self._is_data_packets_equal(actual_data1, expected_data1) + + expected_data2 = self._create_data_packet([], telemetry=[self._create_telemetry(now + int(ttl * 2))]) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data2) + self._is_data_packets_equal(actual_data2, expected_data2) + + def test_out_connector_ttl_but_in_converter_ttl(self): + connector_ttl = 60 * 1000 + converter_ttl = 3 * connector_ttl + data_ts_shift = 2 * connector_ttl + now = int(time() * 1000) + self.connectors[self.CONNECTOR_NAME] = Connector(True, connector_ttl) + + expected_data1 = self._create_data_packet([], [self._create_telemetry(now)]) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data1) + self._is_data_packets_equal(actual_data1, expected_data1) + + expected_data2 = self._create_data_packet([], [self._create_telemetry(now + data_ts_shift)]) + expected_data2[SEND_ON_CHANGE_TTL_PARAMETER] = converter_ttl + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data2) + self.assertIsNone(actual_data2) + + def test_device_deletion(self): + self.connectors[self.CONNECTOR_NAME] = Connector(True) + + expected_data = self._create_data_packet() + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) + self._is_data_packets_equal(actual_data1, expected_data) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) + self.assertIsNone(actual_data2) + + self._duplicate_detector.delete_device(self.TEST_DEVICE_NAME) + actual_data3 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data3, expected_data) def test_device_renaming(self): - connector_name = "some_connector" - self.connectors[connector_name] = Connector(True) + self.connectors[self.CONNECTOR_NAME] = Connector(True) expected_data = self._create_data_packet() - actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data1 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self._is_data_packets_equal(actual_data1, expected_data) - actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data2 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self.assertIsNone(actual_data2) new_device_name = "New device name" - self._duplicate_detector.rename_device(TestDuplicateDetector.TEST_DEVICE_NAME, new_device_name) + self._duplicate_detector.rename_device(self.TEST_DEVICE_NAME, new_device_name) expected_data[DEVICE_NAME_PARAMETER] = new_device_name - actual_data3 = self._duplicate_detector.filter_data(connector_name, expected_data) + actual_data3 = self._duplicate_detector.filter_data(self.CONNECTOR_NAME, expected_data) self.assertIsNone(actual_data3) diff --git a/thingsboard_gateway/connectors/connector.py b/thingsboard_gateway/connectors/connector.py index 9610ce39..1ab1f138 100644 --- a/thingsboard_gateway/connectors/connector.py +++ b/thingsboard_gateway/connectors/connector.py @@ -14,6 +14,7 @@ import logging from abc import ABC, abstractmethod +from thingsboard_gateway.gateway.constants import DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE, DEFAULT_SEND_ON_CHANGE_VALUE log = logging.getLogger("connector") @@ -45,4 +46,7 @@ class Connector(ABC): pass def is_filtering_enable(self, device_name): - return False + return DEFAULT_SEND_ON_CHANGE_VALUE + + def get_ttl_for_duplicates(self, device_name): + return DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index d6429321..bb5f1bc1 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -22,8 +22,8 @@ from time import sleep, time import simplejson -from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_DATA_ON_CHANGE_VALUE, \ - ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER +from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_ON_CHANGE_VALUE, \ + ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER, SEND_ON_CHANGE_TTL_PARAMETER, DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE from thingsboard_gateway.gateway.constant_enums import Status from thingsboard_gateway.connectors.connector import Connector, log from thingsboard_gateway.connectors.mqtt.mqtt_decorators import CustomCollectStatistics @@ -111,7 +111,8 @@ class MqttConnector(Connector, Thread): # Extract main sections from configuration --------------------------------------------------------------------- self.__broker = config.get('broker') - self.__send_data_only_on_change = self.__broker.get(SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_DATA_ON_CHANGE_VALUE) + self.__send_data_only_on_change = self.__broker.get(SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_ON_CHANGE_VALUE) + self.__send_data_only_on_change_ttl = self.__broker.get(SEND_ON_CHANGE_TTL_PARAMETER, DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE) # for sendDataOnlyOnChange param self.__topic_content = {} @@ -225,6 +226,9 @@ class MqttConnector(Connector, Thread): def is_filtering_enable(self, device_name): return self.__send_data_only_on_change + def get_ttl_for_duplicates(self, device_name): + return self.__send_data_only_on_change_ttl + def load_handlers(self, handler_flavor, mandatory_keys, accepted_handlers_list): if handler_flavor not in self.config: self.__log.error("'%s' section missing from configuration", handler_flavor) diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py index 2f9dade6..3d3938ef 100644 --- a/thingsboard_gateway/gateway/constants.py +++ b/thingsboard_gateway/gateway/constants.py @@ -43,7 +43,10 @@ TELEMETRY_TIMESTAMP_PARAMETER = "ts" TELEMETRY_VALUES_PARAMETER = "values" SEND_ON_CHANGE_PARAMETER = "sendDataOnlyOnChange" -DEFAULT_SEND_DATA_ON_CHANGE_VALUE = False +SEND_ON_CHANGE_TTL_PARAMETER = "sendDataOnlyOnChangeTts" + +DEFAULT_SEND_ON_CHANGE_VALUE = False +DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE = 0 # RPC parameter constants diff --git a/thingsboard_gateway/gateway/duplicate_detector.py b/thingsboard_gateway/gateway/duplicate_detector.py index 5ad1c9fe..5d87c69a 100644 --- a/thingsboard_gateway/gateway/duplicate_detector.py +++ b/thingsboard_gateway/gateway/duplicate_detector.py @@ -12,16 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from time import time from logging import getLogger - from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER, DEVICE_NAME_PARAMETER, \ ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER, TELEMETRY_TIMESTAMP_PARAMETER, TELEMETRY_VALUES_PARAMETER, \ - DEVICE_TYPE_PARAMETER + DEVICE_TYPE_PARAMETER, SEND_ON_CHANGE_TTL_PARAMETER, DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE log = getLogger("service") class DuplicateDetector: + ABSENT_DATA_PAIR_VALUES = [None, 0] + def __init__(self, connectors): self._latest_data = {} self._connectors = connectors @@ -41,19 +43,23 @@ class DuplicateDetector: if isinstance(in_data_filter_enabled, bool) and not in_data_filter_enabled: return new_data + ttl = new_data.get(SEND_ON_CHANGE_TTL_PARAMETER) device_name = new_data[DEVICE_NAME_PARAMETER] if not in_data_filter_enabled: connector = self._connectors.get(connector_name) if not connector or not connector.is_filtering_enable(device_name): return new_data + elif ttl is None: + ttl = connector.get_ttl_for_duplicates(device_name) + now = int(time() * 1000) to_send = {ATTRIBUTES_PARAMETER: [], TELEMETRY_PARAMETER: []} remaining_attributes_count = 0 filtered_attributes_count = 0 for attribute in new_data[ATTRIBUTES_PARAMETER]: for key, new_value in attribute.items(): - if self._update_latest_attribute_value(device_name, key, new_value): + if self._update_latest_attribute_value(device_name, key, new_value, now, ttl): to_send[ATTRIBUTES_PARAMETER].append(attribute) remaining_attributes_count += 1 else: @@ -66,7 +72,7 @@ class DuplicateDetector: ts = ts_kv_list.get(TELEMETRY_TIMESTAMP_PARAMETER) ts_values = {} for key, new_value in ts_kv_list.get(TELEMETRY_VALUES_PARAMETER, ts_kv_list).items(): - if self._update_latest_telemetry_value(device_name, key, new_value): + if self._update_latest_telemetry_value(device_name, key, new_value, ts if ts else now, ttl): ts_values[key] = new_value ts_added = True remaining_telemetry_count += 1 @@ -95,19 +101,20 @@ class DuplicateDetector: TELEMETRY_PARAMETER: {} } - def _update_latest_attribute_value(self, device_name, key, value): - return self._update_latest_value(device_name, ATTRIBUTES_PARAMETER, key, value) + def _update_latest_attribute_value(self, device_name, key, value, ts, ttl): + return self._update_latest_value(device_name, ATTRIBUTES_PARAMETER, key, value, ts, ttl) - def _update_latest_telemetry_value(self, device_name, key, value): - return self._update_latest_value(device_name, TELEMETRY_PARAMETER, key, value) + def _update_latest_telemetry_value(self, device_name, key, value, ts, ttl): + return self._update_latest_value(device_name, TELEMETRY_PARAMETER, key, value, ts, ttl) - def _update_latest_value(self, device_name, data_type, key, value): + def _update_latest_value(self, device_name, data_type, key, value, ts, ttl): if device_name not in self._latest_data: self._latest_data[device_name] = DuplicateDetector._create_device_latest_data() - self._latest_data[device_name][data_type][key] = value + self._latest_data[device_name][data_type][key] = [value, ts] return True - if self._latest_data[device_name][data_type].get(key) != value: - self._latest_data[device_name][data_type][key] = value + data_pair = self._latest_data[device_name][data_type].get(key, self.ABSENT_DATA_PAIR_VALUES) + if data_pair[0] != value or (ttl and (ts - data_pair[1]) > ttl): + self._latest_data[device_name][data_type][key] = [value, ts] return True return False