1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Support of TTL in duplicate filtering.

This commit is contained in:
devaskim
2022-12-07 19:46:19 +05:00
parent bce1bdf5eb
commit 5800c8b40f
5 changed files with 121 additions and 65 deletions

View File

@@ -43,7 +43,10 @@ TELEMETRY_TIMESTAMP_PARAMETER = "ts"
TELEMETRY_VALUES_PARAMETER = "values"
SEND_ON_CHANGE_PARAMETER = "sendDataOnlyOnChange"
DEFAULT_SEND_DATA_ON_CHANGE_VALUE = False
SEND_ON_CHANGE_TTL_PARAMETER = "sendDataOnlyOnChangeTts"
DEFAULT_SEND_ON_CHANGE_VALUE = False
DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE = 0
# RPC parameter constants

View File

@@ -12,16 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from time import time
from logging import getLogger
from thingsboard_gateway.gateway.constants import SEND_ON_CHANGE_PARAMETER, DEVICE_NAME_PARAMETER, \
ATTRIBUTES_PARAMETER, TELEMETRY_PARAMETER, TELEMETRY_TIMESTAMP_PARAMETER, TELEMETRY_VALUES_PARAMETER, \
DEVICE_TYPE_PARAMETER
DEVICE_TYPE_PARAMETER, SEND_ON_CHANGE_TTL_PARAMETER, DEFAULT_SEND_ON_CHANGE_INFINITE_TTL_VALUE
log = getLogger("service")
class DuplicateDetector:
ABSENT_DATA_PAIR_VALUES = [None, 0]
def __init__(self, connectors):
self._latest_data = {}
self._connectors = connectors
@@ -41,19 +43,23 @@ class DuplicateDetector:
if isinstance(in_data_filter_enabled, bool) and not in_data_filter_enabled:
return new_data
ttl = new_data.get(SEND_ON_CHANGE_TTL_PARAMETER)
device_name = new_data[DEVICE_NAME_PARAMETER]
if not in_data_filter_enabled:
connector = self._connectors.get(connector_name)
if not connector or not connector.is_filtering_enable(device_name):
return new_data
elif ttl is None:
ttl = connector.get_ttl_for_duplicates(device_name)
now = int(time() * 1000)
to_send = {ATTRIBUTES_PARAMETER: [], TELEMETRY_PARAMETER: []}
remaining_attributes_count = 0
filtered_attributes_count = 0
for attribute in new_data[ATTRIBUTES_PARAMETER]:
for key, new_value in attribute.items():
if self._update_latest_attribute_value(device_name, key, new_value):
if self._update_latest_attribute_value(device_name, key, new_value, now, ttl):
to_send[ATTRIBUTES_PARAMETER].append(attribute)
remaining_attributes_count += 1
else:
@@ -66,7 +72,7 @@ class DuplicateDetector:
ts = ts_kv_list.get(TELEMETRY_TIMESTAMP_PARAMETER)
ts_values = {}
for key, new_value in ts_kv_list.get(TELEMETRY_VALUES_PARAMETER, ts_kv_list).items():
if self._update_latest_telemetry_value(device_name, key, new_value):
if self._update_latest_telemetry_value(device_name, key, new_value, ts if ts else now, ttl):
ts_values[key] = new_value
ts_added = True
remaining_telemetry_count += 1
@@ -95,19 +101,20 @@ class DuplicateDetector:
TELEMETRY_PARAMETER: {}
}
def _update_latest_attribute_value(self, device_name, key, value):
return self._update_latest_value(device_name, ATTRIBUTES_PARAMETER, key, value)
def _update_latest_attribute_value(self, device_name, key, value, ts, ttl):
return self._update_latest_value(device_name, ATTRIBUTES_PARAMETER, key, value, ts, ttl)
def _update_latest_telemetry_value(self, device_name, key, value):
return self._update_latest_value(device_name, TELEMETRY_PARAMETER, key, value)
def _update_latest_telemetry_value(self, device_name, key, value, ts, ttl):
return self._update_latest_value(device_name, TELEMETRY_PARAMETER, key, value, ts, ttl)
def _update_latest_value(self, device_name, data_type, key, value):
def _update_latest_value(self, device_name, data_type, key, value, ts, ttl):
if device_name not in self._latest_data:
self._latest_data[device_name] = DuplicateDetector._create_device_latest_data()
self._latest_data[device_name][data_type][key] = value
self._latest_data[device_name][data_type][key] = [value, ts]
return True
if self._latest_data[device_name][data_type].get(key) != value:
self._latest_data[device_name][data_type][key] = value
data_pair = self._latest_data[device_name][data_type].get(key, self.ABSENT_DATA_PAIR_VALUES)
if data_pair[0] != value or (ttl and (ts - data_pair[1]) > ttl):
self._latest_data[device_name][data_type][key] = [value, ts]
return True
return False