mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Merge pull request #1000 from engix-ltd/duplicate_detector
[Core] Data duplicate detector feature.
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
import unittest
|
||||
from random import randint
|
||||
|
||||
from thingsboard_gateway.gateway.constants import *
|
||||
from thingsboard_gateway.connectors.mqtt.json_mqtt_uplink_converter import JsonMqttUplinkConverter
|
||||
|
||||
|
||||
@@ -56,6 +57,26 @@ class JsonMqttUplinkConverterTests(unittest.TestCase):
|
||||
self.assertDictEqual(single_data, self._convert_to_dict(item.get('telemetry')))
|
||||
self.assertDictEqual(single_data, self._convert_to_dict(item.get('attributes')))
|
||||
|
||||
def test_without_send_on_change_option(self):
|
||||
topic, config, data = self._get_device_1_test_data()
|
||||
converter = JsonMqttUplinkConverter(config)
|
||||
converted_array_data = converter.convert(topic, data)
|
||||
self.assertIsNone(converted_array_data.get(SEND_ON_CHANGE_PARAMETER))
|
||||
|
||||
def test_with_send_on_change_option_disabled(self):
|
||||
topic, config, data = self._get_device_1_test_data()
|
||||
config["converter"][SEND_ON_CHANGE_PARAMETER] = False
|
||||
converter = JsonMqttUplinkConverter(config)
|
||||
converted_array_data = converter.convert(topic, data)
|
||||
self.assertFalse(converted_array_data.get(SEND_ON_CHANGE_PARAMETER))
|
||||
|
||||
def test_with_send_on_change_option_enabled(self):
|
||||
topic, config, data = self._get_device_1_test_data()
|
||||
config["converter"][SEND_ON_CHANGE_PARAMETER] = True
|
||||
converter = JsonMqttUplinkConverter(config)
|
||||
converted_array_data = converter.convert(topic, data)
|
||||
self.assertTrue(converted_array_data.get(SEND_ON_CHANGE_PARAMETER))
|
||||
|
||||
@staticmethod
|
||||
def _convert_to_dict(data_array):
|
||||
data_dict = {}
|
||||
|
||||
245
tests/gateway/test_duplicate_detector.py
Normal file
245
tests/gateway/test_duplicate_detector.py
Normal file
@@ -0,0 +1,245 @@
|
||||
# Copyright 2022. ThingsBoard
|
||||
# #
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# #
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# #
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import unittest
|
||||
from time import time
|
||||
|
||||
from thingsboard_gateway.gateway.constants import *
|
||||
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
|
||||
|
||||
|
||||
class Connector:
|
||||
def __init__(self, enable_filtering):
|
||||
self._enable_filtering = enable_filtering
|
||||
|
||||
def is_filtering_enable(self, device_name):
|
||||
return self._enable_filtering
|
||||
|
||||
|
||||
class TestDuplicateDetector(unittest.TestCase):
|
||||
|
||||
TEST_DEVICE_NAME = "Test device"
|
||||
TEST_DEVICE_TYPE = "TimeMachine"
|
||||
|
||||
@staticmethod
|
||||
def _to_dict(dicts_in_array):
|
||||
result_dict = {}
|
||||
for attribute in dicts_in_array:
|
||||
for key, value in attribute.items():
|
||||
result_dict[key] = value
|
||||
return result_dict
|
||||
|
||||
def _is_dicts_equal(self, actual, expected):
|
||||
self.assertEqual(len(actual), len(expected))
|
||||
diff = set(actual.items()) ^ set(expected.items())
|
||||
self.assertTrue(len(diff) == 0)
|
||||
|
||||
def _is_data_packets_equal(self, actual, expected):
|
||||
self.assertIsNotNone(actual)
|
||||
self.assertEqual(actual.get(DEVICE_NAME_PARAMETER), expected.get(DEVICE_NAME_PARAMETER))
|
||||
self.assertEqual(actual.get(DEVICE_TYPE_PARAMETER), expected.get(DEVICE_TYPE_PARAMETER))
|
||||
|
||||
actual_attributes = TestDuplicateDetector._to_dict(actual.get(ATTRIBUTES_PARAMETER))
|
||||
expected_attributes = TestDuplicateDetector._to_dict(expected.get(ATTRIBUTES_PARAMETER))
|
||||
|
||||
self._is_dicts_equal(actual_attributes, expected_attributes)
|
||||
|
||||
actual_telemetry = actual.get(TELEMETRY_PARAMETER)
|
||||
expected_telemetry = expected.get(TELEMETRY_PARAMETER)
|
||||
|
||||
self.assertEqual(len(actual_telemetry), len(expected_telemetry))
|
||||
|
||||
@staticmethod
|
||||
def _create_data_packet(attributes=None, telemetry=None):
|
||||
return {
|
||||
DEVICE_NAME_PARAMETER: TestDuplicateDetector.TEST_DEVICE_NAME,
|
||||
DEVICE_TYPE_PARAMETER: TestDuplicateDetector.TEST_DEVICE_TYPE,
|
||||
ATTRIBUTES_PARAMETER: attributes if attributes else [TestDuplicateDetector._create_attributes()],
|
||||
TELEMETRY_PARAMETER: telemetry if telemetry else [TestDuplicateDetector._create_telemetry()]
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _create_attributes():
|
||||
return {'testAttribute': 100500}
|
||||
|
||||
@staticmethod
|
||||
def _create_telemetry():
|
||||
return {'testTelemetry': 12345}
|
||||
|
||||
@staticmethod
|
||||
def _create_ts_telemetry():
|
||||
return {
|
||||
TELEMETRY_TIMESTAMP_PARAMETER: 1668624816000,
|
||||
TELEMETRY_VALUES_PARAMETER: TestDuplicateDetector._create_telemetry()
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self.connectors = {}
|
||||
self._duplicate_detector = DuplicateDetector(self.connectors)
|
||||
|
||||
def test_in_data_filter_disable(self):
|
||||
expected_data = self._create_data_packet()
|
||||
expected_data[SEND_ON_CHANGE_PARAMETER] = False
|
||||
actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data)
|
||||
self._is_data_packets_equal(actual_data2, expected_data)
|
||||
|
||||
def test_in_data_filter_enable(self):
|
||||
expected_data = self._create_data_packet()
|
||||
expected_data[SEND_ON_CHANGE_PARAMETER] = True
|
||||
actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data)
|
||||
self.assertIsNone(actual_data2)
|
||||
|
||||
def test_dont_filter_data_from_unknown_connector(self):
|
||||
expected_data = self._create_data_packet()
|
||||
actual_data1 = self._duplicate_detector.filter_data("unknown_connector", expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data("unknown_connector", expected_data)
|
||||
self._is_data_packets_equal(actual_data2, expected_data)
|
||||
|
||||
def test_connector_with_disable_filtering(self):
|
||||
connector_name = "some_connector"
|
||||
self.connectors[connector_name] = Connector(False)
|
||||
|
||||
expected_data = self._create_data_packet()
|
||||
actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self._is_data_packets_equal(actual_data2, expected_data)
|
||||
|
||||
def test_connector_with_enable_filtering(self):
|
||||
connector_name = "some_connector"
|
||||
self.connectors[connector_name] = Connector(True)
|
||||
|
||||
expected_data = self._create_data_packet()
|
||||
actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self.assertIsNone(actual_data2)
|
||||
|
||||
def test_one_unchanged_and_one_changed_attributes(self):
|
||||
unchanged_attribute = {"UnchangedAttr": 1}
|
||||
changed_attr_name = "ChangedAttr"
|
||||
changed_attribute_value1 = {changed_attr_name: 10}
|
||||
changed_attribute_value2 = {changed_attr_name: 20}
|
||||
expected_attributes1 = [unchanged_attribute, changed_attribute_value1]
|
||||
expected_attributes2 = [unchanged_attribute, changed_attribute_value2]
|
||||
|
||||
expected_data1 = self._create_data_packet(expected_attributes1, [])
|
||||
expected_data1[SEND_ON_CHANGE_PARAMETER] = True
|
||||
expected_data2 = self._create_data_packet(expected_attributes2, [])
|
||||
expected_data2[SEND_ON_CHANGE_PARAMETER] = True
|
||||
|
||||
actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1)
|
||||
expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None)
|
||||
self.assertEqual(actual_data1, expected_data1)
|
||||
|
||||
actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2)
|
||||
actual_attributes = actual_data2.get(ATTRIBUTES_PARAMETER)
|
||||
self.assertNotEqual(len(actual_attributes), len(expected_attributes2))
|
||||
self.assertTrue(len(actual_attributes) == 1)
|
||||
self._is_dicts_equal(actual_attributes[0], changed_attribute_value2)
|
||||
|
||||
def test_one_unchanged_and_one_changed_telemetry_without_timestamp(self):
|
||||
unchanged_telemetry = {"UnchangedTelemetry": 1}
|
||||
changed_telemetry_name = "ChangedTelemetry"
|
||||
changed_telemetry_value1 = {changed_telemetry_name: 10}
|
||||
changed_telemetry_value2 = {changed_telemetry_name: 20}
|
||||
expected_telemetry1 = [unchanged_telemetry, changed_telemetry_value1]
|
||||
expected_telemetry2 = [unchanged_telemetry, changed_telemetry_value2]
|
||||
|
||||
expected_data1 = self._create_data_packet([], expected_telemetry1)
|
||||
expected_data1[SEND_ON_CHANGE_PARAMETER] = True
|
||||
expected_data2 = self._create_data_packet([], expected_telemetry2)
|
||||
expected_data2[SEND_ON_CHANGE_PARAMETER] = True
|
||||
|
||||
actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1)
|
||||
expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None)
|
||||
self.assertEqual(actual_data1, expected_data1)
|
||||
|
||||
actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2)
|
||||
actual_telemetry = actual_data2.get(TELEMETRY_PARAMETER)
|
||||
self.assertNotEqual(len(actual_telemetry), len(expected_telemetry2))
|
||||
self.assertTrue(len(actual_telemetry) == 1)
|
||||
self._is_dicts_equal(actual_telemetry[0], changed_telemetry_value2)
|
||||
|
||||
def test_one_unchanged_and_one_changed_telemetry_with_timestamp(self):
|
||||
unchanged_telemetry = {
|
||||
TELEMETRY_TIMESTAMP_PARAMETER: time(),
|
||||
TELEMETRY_VALUES_PARAMETER: {"UnchangedTelemetry": 1}
|
||||
}
|
||||
changed_telemetry_name = "ChangedTelemetry"
|
||||
changed_telemetry_value1 = {
|
||||
TELEMETRY_TIMESTAMP_PARAMETER: time(),
|
||||
TELEMETRY_VALUES_PARAMETER: {changed_telemetry_name: 10}
|
||||
}
|
||||
changed_telemetry_value2 = {
|
||||
TELEMETRY_TIMESTAMP_PARAMETER: time(),
|
||||
TELEMETRY_VALUES_PARAMETER: {changed_telemetry_name: 20}
|
||||
}
|
||||
expected_telemetry1 = [unchanged_telemetry, changed_telemetry_value1]
|
||||
expected_telemetry2 = [unchanged_telemetry, changed_telemetry_value2]
|
||||
|
||||
expected_data1 = self._create_data_packet([], expected_telemetry1)
|
||||
expected_data1[SEND_ON_CHANGE_PARAMETER] = True
|
||||
expected_data2 = self._create_data_packet([], expected_telemetry2)
|
||||
expected_data2[SEND_ON_CHANGE_PARAMETER] = True
|
||||
|
||||
actual_data1 = self._duplicate_detector.filter_data("some_connector", expected_data1)
|
||||
expected_data1.pop(SEND_ON_CHANGE_PARAMETER, None)
|
||||
self.assertEqual(actual_data1, expected_data1)
|
||||
|
||||
actual_data2 = self._duplicate_detector.filter_data("some_connector", expected_data2)
|
||||
actual_telemetry = actual_data2.get(TELEMETRY_PARAMETER)
|
||||
self.assertNotEqual(len(actual_telemetry), len(expected_telemetry2))
|
||||
self.assertTrue(len(actual_telemetry) == 1)
|
||||
self.assertEqual(actual_telemetry[0].get(TELEMETRY_TIMESTAMP_PARAMETER), changed_telemetry_value2.get(TELEMETRY_TIMESTAMP_PARAMETER))
|
||||
self._is_dicts_equal(actual_telemetry[0].get(TELEMETRY_VALUES_PARAMETER), changed_telemetry_value2.get(TELEMETRY_VALUES_PARAMETER))
|
||||
|
||||
def test_device_deletion(self):
|
||||
connector_name = "some_connector"
|
||||
self.connectors[connector_name] = Connector(True)
|
||||
|
||||
expected_data = self._create_data_packet()
|
||||
actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self.assertIsNone(actual_data2)
|
||||
|
||||
self._duplicate_detector.delete_device(TestDuplicateDetector.TEST_DEVICE_NAME)
|
||||
actual_data3 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self._is_data_packets_equal(actual_data3, expected_data)
|
||||
|
||||
def test_device_renaming(self):
|
||||
connector_name = "some_connector"
|
||||
self.connectors[connector_name] = Connector(True)
|
||||
|
||||
expected_data = self._create_data_packet()
|
||||
actual_data1 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self._is_data_packets_equal(actual_data1, expected_data)
|
||||
actual_data2 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self.assertIsNone(actual_data2)
|
||||
|
||||
new_device_name = "New device name"
|
||||
self._duplicate_detector.rename_device(TestDuplicateDetector.TEST_DEVICE_NAME, new_device_name)
|
||||
expected_data[DEVICE_NAME_PARAMETER] = new_device_name
|
||||
actual_data3 = self._duplicate_detector.filter_data(connector_name, expected_data)
|
||||
self.assertIsNone(actual_data3)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -7,6 +7,7 @@
|
||||
"version": 5,
|
||||
"maxMessageNumberPerWorker": 10,
|
||||
"maxNumberOfWorkers": 100,
|
||||
"sendDataOnlyOnChange": false,
|
||||
"security": {
|
||||
"type": "basic",
|
||||
"username": "user",
|
||||
|
||||
@@ -43,3 +43,6 @@ class Connector(ABC):
|
||||
@abstractmethod
|
||||
def server_side_rpc_handler(self, content):
|
||||
pass
|
||||
|
||||
def is_filtering_enable(self, device_name):
|
||||
return False
|
||||
|
||||
@@ -13,10 +13,10 @@
|
||||
# limitations under the License.
|
||||
|
||||
from re import search
|
||||
from time import time
|
||||
|
||||
from simplejson import dumps
|
||||
|
||||
from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER
|
||||
from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter, log
|
||||
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
|
||||
from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
@@ -25,6 +25,7 @@ from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
class JsonMqttUplinkConverter(MqttUplinkConverter):
|
||||
def __init__(self, config):
|
||||
self.__config = config.get('converter')
|
||||
self.__send_data_on_change = self.__config.get(SEND_ON_CHANGE_PARAMETER)
|
||||
|
||||
@property
|
||||
def config(self):
|
||||
@@ -51,6 +52,9 @@ class JsonMqttUplinkConverter(MqttUplinkConverter):
|
||||
"telemetry": []
|
||||
}
|
||||
|
||||
if isinstance(self.__send_data_on_change, bool):
|
||||
dict_result[SEND_ON_CHANGE_PARAMETER] = self.__send_data_on_change
|
||||
|
||||
try:
|
||||
for datatype in datatypes:
|
||||
timestamp = data.get("ts", data.get("timestamp")) if datatype == 'timeseries' else None
|
||||
|
||||
@@ -22,6 +22,9 @@ from time import sleep, time
|
||||
|
||||
import simplejson
|
||||
|
||||
from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_DATA_ON_CHANGE_VALUE, \
|
||||
ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER
|
||||
from thingsboard_gateway.gateway.constant_enums import Status
|
||||
from thingsboard_gateway.connectors.connector import Connector, log
|
||||
from thingsboard_gateway.connectors.mqtt.mqtt_decorators import CustomCollectStatistics
|
||||
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
|
||||
@@ -108,6 +111,7 @@ class MqttConnector(Connector, Thread):
|
||||
|
||||
# Extract main sections from configuration ---------------------------------------------------------------------
|
||||
self.__broker = config.get('broker')
|
||||
self.__send_data_only_on_change = self.__broker.get(SEND_ON_CHANGE_PARAMETER, DEFAULT_SEND_DATA_ON_CHANGE_VALUE)
|
||||
|
||||
# for sendDataOnlyOnChange param
|
||||
self.__topic_content = {}
|
||||
@@ -218,6 +222,9 @@ class MqttConnector(Connector, Thread):
|
||||
self._on_message_thread = Thread(name='On Message', target=self._process_on_message, daemon=True)
|
||||
self._on_message_thread.start()
|
||||
|
||||
def is_filtering_enable(self, device_name):
|
||||
return self.__send_data_only_on_change
|
||||
|
||||
def load_handlers(self, handler_flavor, mandatory_keys, accepted_handlers_list):
|
||||
if handler_flavor not in self.config:
|
||||
self.__log.error("'%s' section missing from configuration", handler_flavor)
|
||||
@@ -436,9 +443,9 @@ class MqttConnector(Connector, Thread):
|
||||
return False
|
||||
|
||||
def _save_converted_msg(self, topic, data):
|
||||
self.__gateway.send_to_storage(self.name, data)
|
||||
self.statistics['MessagesSent'] += 1
|
||||
self.__log.debug("Successfully converted message from topic %s", topic)
|
||||
if self.__gateway.send_to_storage(self.name, data) == Status.SUCCESS:
|
||||
self.statistics['MessagesSent'] += 1
|
||||
self.__log.debug("Successfully converted message from topic %s", topic)
|
||||
|
||||
def __threads_manager(self):
|
||||
if len(self.__workers_thread_pool) == 0:
|
||||
@@ -847,7 +854,8 @@ class MqttConnector(Connector, Thread):
|
||||
convert_function, config, incoming_data = self.__msg_queue.get(True, 100)
|
||||
converted_data = convert_function(config, incoming_data)
|
||||
log.debug(converted_data)
|
||||
if converted_data and (converted_data.get('attributes') or converted_data.get('telemetry')):
|
||||
if converted_data and (converted_data.get(ATTRIBUTES_PARAMETER) or
|
||||
converted_data.get(TELEMETRY_PARAMETER)):
|
||||
self.__send_result(config, converted_data)
|
||||
self.in_progress = False
|
||||
else:
|
||||
|
||||
@@ -34,4 +34,5 @@ class DownlinkMessageType(Enum):
|
||||
class Status(Enum):
|
||||
FAILURE = 1,
|
||||
NOT_FOUND = 2,
|
||||
SUCCESS = 3
|
||||
SUCCESS = 3,
|
||||
NO_NEW_DATA = 4
|
||||
|
||||
@@ -39,6 +39,11 @@ DEVICE_TYPE_PARAMETER = "deviceType"
|
||||
|
||||
ATTRIBUTES_PARAMETER = "attributes"
|
||||
TELEMETRY_PARAMETER = "telemetry"
|
||||
TELEMETRY_TIMESTAMP_PARAMETER = "ts"
|
||||
TELEMETRY_VALUES_PARAMETER = "values"
|
||||
|
||||
SEND_ON_CHANGE_PARAMETER = "sendDataOnlyOnChange"
|
||||
DEFAULT_SEND_DATA_ON_CHANGE_VALUE = False
|
||||
|
||||
# RPC parameter constants
|
||||
|
||||
|
||||
113
thingsboard_gateway/gateway/duplicate_detector.py
Normal file
113
thingsboard_gateway/gateway/duplicate_detector.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# Copyright 2022. ThingsBoard
|
||||
# #
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# #
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# #
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from logging import getLogger
|
||||
|
||||
from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER, DEVICE_NAME_PARAMETER, \
|
||||
ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER, TELEMETRY_TIMESTAMP_PARAMETER, TELEMETRY_VALUES_PARAMETER, \
|
||||
DEVICE_TYPE_PARAMETER
|
||||
|
||||
log = getLogger("service")
|
||||
|
||||
|
||||
class DuplicateDetector:
|
||||
def __init__(self, connectors):
|
||||
self._latest_data = {}
|
||||
self._connectors = connectors
|
||||
|
||||
def rename_device(self, old_device_name, new_device_name):
|
||||
self._latest_data[new_device_name] = self._latest_data.pop(old_device_name,
|
||||
DuplicateDetector._create_device_latest_data())
|
||||
|
||||
def delete_device(self, device_name):
|
||||
del self._latest_data[device_name]
|
||||
|
||||
def persist_latest_values(self):
|
||||
raise NotImplementedError("Persisting feature for latest attributes/telemetry values is not implemented!")
|
||||
|
||||
def filter_data(self, connector_name, new_data):
|
||||
in_data_filter_enabled = new_data.get(SEND_ON_CHANGE_PARAMETER)
|
||||
if isinstance(in_data_filter_enabled, bool) and not in_data_filter_enabled:
|
||||
return new_data
|
||||
|
||||
device_name = new_data[DEVICE_NAME_PARAMETER]
|
||||
if not in_data_filter_enabled:
|
||||
connector = self._connectors.get(connector_name)
|
||||
if not connector or not connector.is_filtering_enable(device_name):
|
||||
return new_data
|
||||
|
||||
to_send = {ATTRIBUTES_PARAMETER: [], TELEMETRY_PARAMETER: []}
|
||||
|
||||
remaining_attributes_count = 0
|
||||
filtered_attributes_count = 0
|
||||
for attribute in new_data[ATTRIBUTES_PARAMETER]:
|
||||
for key, new_value in attribute.items():
|
||||
if self._update_latest_attribute_value(device_name, key, new_value):
|
||||
to_send[ATTRIBUTES_PARAMETER].append(attribute)
|
||||
remaining_attributes_count += 1
|
||||
else:
|
||||
filtered_attributes_count += 1
|
||||
|
||||
remaining_telemetry_count = 0
|
||||
filtered_telemetry_count = 0
|
||||
for ts_kv_list in new_data[TELEMETRY_PARAMETER]:
|
||||
ts_added = False
|
||||
ts = ts_kv_list.get(TELEMETRY_TIMESTAMP_PARAMETER)
|
||||
ts_values = {}
|
||||
for key, new_value in ts_kv_list.get(TELEMETRY_VALUES_PARAMETER, ts_kv_list).items():
|
||||
if self._update_latest_telemetry_value(device_name, key, new_value):
|
||||
ts_values[key] = new_value
|
||||
ts_added = True
|
||||
remaining_telemetry_count += 1
|
||||
else:
|
||||
filtered_telemetry_count += 1
|
||||
if ts_added:
|
||||
to_send[TELEMETRY_PARAMETER].append(
|
||||
{TELEMETRY_TIMESTAMP_PARAMETER: ts, TELEMETRY_VALUES_PARAMETER: ts_values} if ts else ts_values)
|
||||
|
||||
if remaining_attributes_count > 0 or remaining_telemetry_count > 0:
|
||||
log.debug("[%s] '%s' changed attributes %d from %d, changed telemetry %d from %d",
|
||||
connector_name, device_name,
|
||||
remaining_attributes_count, remaining_attributes_count + filtered_attributes_count,
|
||||
remaining_telemetry_count, remaining_telemetry_count + filtered_telemetry_count)
|
||||
to_send[DEVICE_NAME_PARAMETER] = device_name
|
||||
to_send[DEVICE_TYPE_PARAMETER] = new_data[DEVICE_TYPE_PARAMETER]
|
||||
return to_send
|
||||
|
||||
log.info("[%s] '%s' device data has not been changed", connector_name, device_name)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _create_device_latest_data():
|
||||
return {
|
||||
ATTRIBUTES_PARAMETER: {},
|
||||
TELEMETRY_PARAMETER: {}
|
||||
}
|
||||
|
||||
def _update_latest_attribute_value(self, device_name, key, value):
|
||||
return self._update_latest_value(device_name, ATTRIBUTES_PARAMETER, key, value)
|
||||
|
||||
def _update_latest_telemetry_value(self, device_name, key, value):
|
||||
return self._update_latest_value(device_name, TELEMETRY_PARAMETER, key, value)
|
||||
|
||||
def _update_latest_value(self, device_name, data_type, key, value):
|
||||
if device_name not in self._latest_data:
|
||||
self._latest_data[device_name] = DuplicateDetector._create_device_latest_data()
|
||||
self._latest_data[device_name][data_type][key] = value
|
||||
return True
|
||||
|
||||
if self._latest_data[device_name][data_type].get(key) != value:
|
||||
self._latest_data[device_name][data_type][key] = value
|
||||
return True
|
||||
return False
|
||||
@@ -33,6 +33,7 @@ from yaml import safe_load
|
||||
from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
|
||||
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
|
||||
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME
|
||||
from thingsboard_gateway.gateway.duplicate_detector import DuplicateDetector
|
||||
from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
|
||||
@@ -261,6 +262,8 @@ class TBGatewayService:
|
||||
name="Send data to Thingsboard Thread")
|
||||
self._send_thread.start()
|
||||
|
||||
self.__duplicate_detector = DuplicateDetector(self.available_connectors)
|
||||
|
||||
log.info("Gateway started.")
|
||||
|
||||
self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
|
||||
@@ -440,6 +443,7 @@ class TBGatewayService:
|
||||
if deleted_device_name in self.__saved_devices:
|
||||
del self.__saved_devices[deleted_device_name]
|
||||
log.debug("Device %s - was removed from __saved_devices", deleted_device_name)
|
||||
self.__duplicate_detector.delete_device(deleted_device_name)
|
||||
self.__save_persistent_devices()
|
||||
self.__load_persistent_devices()
|
||||
|
||||
@@ -452,6 +456,7 @@ class TBGatewayService:
|
||||
else:
|
||||
device_name_key = new_device_name
|
||||
self.__renamed_devices[device_name_key] = new_device_name
|
||||
self.__duplicate_detector.rename_device(old_device_name, new_device_name)
|
||||
|
||||
self.__save_persistent_devices()
|
||||
self.__load_persistent_devices()
|
||||
@@ -660,8 +665,12 @@ class TBGatewayService:
|
||||
|
||||
def send_to_storage(self, connector_name, data):
|
||||
try:
|
||||
self.__converted_data_queue.put((connector_name, data), True, 100)
|
||||
return Status.SUCCESS
|
||||
filtered_data = self.__duplicate_detector.filter_data(connector_name, data)
|
||||
if filtered_data:
|
||||
self.__converted_data_queue.put((connector_name, filtered_data), True, 100)
|
||||
return Status.SUCCESS
|
||||
else:
|
||||
return Status.NO_NEW_DATA
|
||||
except Exception as e:
|
||||
log.exception("Cannot put converted data!", e)
|
||||
return Status.FAILURE
|
||||
|
||||
Reference in New Issue
Block a user