From cc89caed09c3e487bc033cc6fb7d92a4bf6599ee Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 2 May 2023 13:54:21 +0300 Subject: [PATCH] Fixed FTP Converter --- .../connectors/ftp/ftp_connector.py | 20 ++-- .../connectors/ftp/ftp_uplink_converter.py | 19 ++-- .../gateway/duplicate_detector.py | 97 ++++++++++--------- 3 files changed, 69 insertions(+), 67 deletions(-) diff --git a/thingsboard_gateway/connectors/ftp/ftp_connector.py b/thingsboard_gateway/connectors/ftp/ftp_connector.py index 107f2182..d979c974 100644 --- a/thingsboard_gateway/connectors/ftp/ftp_connector.py +++ b/thingsboard_gateway/connectors/ftp/ftp_connector.py @@ -143,7 +143,7 @@ class FTPConnector(Connector, Thread): for file in path.files: current_hash = file.get_current_hash(ftp) if ((file.has_hash() and current_hash != file.hash) - or not file.has_hash()) and file.check_size_limit(ftp): + or not file.has_hash()) and file.check_size_limit(ftp): file.set_new_hash(current_hash) handle_stream = io.BytesIO() @@ -160,14 +160,10 @@ class FTPConnector(Connector, Thread): if isinstance(json_data, list): for obj in json_data: converted_data = converter.convert(convert_conf, obj) - self.__gateway.send_to_storage(self.getName(), converted_data) - self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - log.debug("Data to ThingsBoard: %s", converted_data) + self.__send_data(converted_data) else: converted_data = converter.convert(convert_conf, json_data) - self.__gateway.send_to_storage(self.getName(), converted_data) - self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - log.debug("Data to ThingsBoard: %s", converted_data) + self.__send_data(converted_data) else: cursor = file.cursor or 0 @@ -182,12 +178,16 @@ class FTPConnector(Connector, Thread): else: converted_data = converter.convert(convert_conf, line) - self.__gateway.send_to_storage(self.getName(), converted_data) - self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - log.debug("Data to ThingsBoard: %s", converted_data) + self.__send_data(converted_data) handle_stream.close() + def __send_data(self, converted_data): + if converted_data: + self.__gateway.send_to_storage(self.getName(), converted_data) + self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 + log.debug("Data to ThingsBoard: %s", converted_data) + def close(self): self.__stopped = True diff --git a/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py b/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py index 1013af6c..32b4c80e 100644 --- a/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py +++ b/thingsboard_gateway/connectors/ftp/ftp_uplink_converter.py @@ -196,12 +196,13 @@ class FTPUplinkConverter(FTPConverter): @StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices', end_stat_type='convertedBytesFromDevice') def convert(self, config, data): - if config['file_ext'] == 'csv' or ( - config['file_ext'] == 'txt' and self.__config['txt_file_data_view'] == 'TABLE'): - return self._convert_table_view_data(config, data) - elif config['file_ext'] == 'txt' and self.__config['txt_file_data_view'] == 'SLICED': - return self._convert_slices_view_data(data) - elif config['file_ext'] == 'json': - return self._convert_json_file(data) - else: - raise Exception('Incorrect file extension or file data view mode') + if data: + if config['file_ext'] == 'csv' or ( + config['file_ext'] == 'txt' and self.__config['txt_file_data_view'] == 'TABLE'): + return self._convert_table_view_data(config, data) + elif config['file_ext'] == 'txt' and self.__config['txt_file_data_view'] == 'SLICED': + return self._convert_slices_view_data(data) + elif config['file_ext'] == 'json': + return self._convert_json_file(data) + else: + raise Exception('Incorrect file extension or file data view mode') diff --git a/thingsboard_gateway/gateway/duplicate_detector.py b/thingsboard_gateway/gateway/duplicate_detector.py index 404d8291..114c45d3 100644 --- a/thingsboard_gateway/gateway/duplicate_detector.py +++ b/thingsboard_gateway/gateway/duplicate_detector.py @@ -39,60 +39,61 @@ class DuplicateDetector: raise NotImplementedError("Persisting feature for latest attributes/telemetry values is not implemented!") def filter_data(self, connector_name, new_data): - in_data_filter_enabled = new_data.get(SEND_ON_CHANGE_PARAMETER) - if not in_data_filter_enabled or not isinstance(in_data_filter_enabled, bool): - return new_data - - ttl = new_data.get(SEND_ON_CHANGE_TTL_PARAMETER) - device_name = new_data[DEVICE_NAME_PARAMETER] - if not in_data_filter_enabled: - connector = self._connectors.get(connector_name) - if not connector or not connector.is_filtering_enable(device_name): + if new_data: + in_data_filter_enabled = new_data.get(SEND_ON_CHANGE_PARAMETER) + if not in_data_filter_enabled or not isinstance(in_data_filter_enabled, bool): return new_data - elif ttl is None: - ttl = connector.get_ttl_for_duplicates(device_name) - now = int(time() * 1000) - to_send = {ATTRIBUTES_PARAMETER: [], TELEMETRY_PARAMETER: []} + ttl = new_data.get(SEND_ON_CHANGE_TTL_PARAMETER) + device_name = new_data[DEVICE_NAME_PARAMETER] + if not in_data_filter_enabled: + connector = self._connectors.get(connector_name) + if not connector or not connector.is_filtering_enable(device_name): + return new_data + elif ttl is None: + ttl = connector.get_ttl_for_duplicates(device_name) - remaining_attributes_count = 0 - filtered_attributes_count = 0 - for attribute in new_data[ATTRIBUTES_PARAMETER]: - for key, new_value in attribute.items(): - if self._update_latest_attribute_value(device_name, key, new_value, now, ttl): - to_send[ATTRIBUTES_PARAMETER].append(attribute) - remaining_attributes_count += 1 - else: - filtered_attributes_count += 1 + now = int(time() * 1000) + to_send = {ATTRIBUTES_PARAMETER: [], TELEMETRY_PARAMETER: []} - remaining_telemetry_count = 0 - filtered_telemetry_count = 0 - for ts_kv_list in new_data[TELEMETRY_PARAMETER]: - ts_added = False - ts = ts_kv_list.get(TELEMETRY_TIMESTAMP_PARAMETER) - ts_values = {} - for key, new_value in ts_kv_list.get(TELEMETRY_VALUES_PARAMETER, ts_kv_list).items(): - if self._update_latest_telemetry_value(device_name, key, new_value, ts if ts else now, ttl): - ts_values[key] = new_value - ts_added = True - remaining_telemetry_count += 1 - else: - filtered_telemetry_count += 1 - if ts_added: - to_send[TELEMETRY_PARAMETER].append( - {TELEMETRY_TIMESTAMP_PARAMETER: ts, TELEMETRY_VALUES_PARAMETER: ts_values} if ts else ts_values) + remaining_attributes_count = 0 + filtered_attributes_count = 0 + for attribute in new_data[ATTRIBUTES_PARAMETER]: + for key, new_value in attribute.items(): + if self._update_latest_attribute_value(device_name, key, new_value, now, ttl): + to_send[ATTRIBUTES_PARAMETER].append(attribute) + remaining_attributes_count += 1 + else: + filtered_attributes_count += 1 - if remaining_attributes_count > 0 or remaining_telemetry_count > 0: - log.debug("[%s] '%s' changed attributes %d from %d, changed telemetry %d from %d", - connector_name, device_name, - remaining_attributes_count, remaining_attributes_count + filtered_attributes_count, - remaining_telemetry_count, remaining_telemetry_count + filtered_telemetry_count) - to_send[DEVICE_NAME_PARAMETER] = device_name - to_send[DEVICE_TYPE_PARAMETER] = new_data[DEVICE_TYPE_PARAMETER] - return to_send + remaining_telemetry_count = 0 + filtered_telemetry_count = 0 + for ts_kv_list in new_data[TELEMETRY_PARAMETER]: + ts_added = False + ts = ts_kv_list.get(TELEMETRY_TIMESTAMP_PARAMETER) + ts_values = {} + for key, new_value in ts_kv_list.get(TELEMETRY_VALUES_PARAMETER, ts_kv_list).items(): + if self._update_latest_telemetry_value(device_name, key, new_value, ts if ts else now, ttl): + ts_values[key] = new_value + ts_added = True + remaining_telemetry_count += 1 + else: + filtered_telemetry_count += 1 + if ts_added: + to_send[TELEMETRY_PARAMETER].append( + {TELEMETRY_TIMESTAMP_PARAMETER: ts, TELEMETRY_VALUES_PARAMETER: ts_values} if ts else ts_values) - log.info("[%s] '%s' device data has not been changed", connector_name, device_name) - return None + if remaining_attributes_count > 0 or remaining_telemetry_count > 0: + log.debug("[%s] '%s' changed attributes %d from %d, changed telemetry %d from %d", + connector_name, device_name, + remaining_attributes_count, remaining_attributes_count + filtered_attributes_count, + remaining_telemetry_count, remaining_telemetry_count + filtered_telemetry_count) + to_send[DEVICE_NAME_PARAMETER] = device_name + to_send[DEVICE_TYPE_PARAMETER] = new_data[DEVICE_TYPE_PARAMETER] + return to_send + + log.info("[%s] '%s' device data has not been changed", connector_name, device_name) + return None @staticmethod def _create_device_latest_data():