diff --git a/thingsboard_gateway/connectors/connector.py b/thingsboard_gateway/connectors/connector.py index b2016eb2..65148a99 100644 --- a/thingsboard_gateway/connectors/connector.py +++ b/thingsboard_gateway/connectors/connector.py @@ -49,6 +49,10 @@ class Connector(ABC): def server_side_rpc_handler(self, content): pass + @abstractmethod + def get_converters(self): + pass + def is_filtering_enable(self, device_name): return DEFAULT_SEND_ON_CHANGE_VALUE diff --git a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py index 19bcf988..212713b8 100644 --- a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py +++ b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py @@ -31,6 +31,10 @@ class JsonMqttUplinkConverter(MqttUplinkConverter): def config(self): return self.__config + @config.setter + def config(self, value): + self.__config = value + @StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices', end_stat_type='convertedBytesFromDevice') def convert(self, topic, data): diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 2cbcf6e2..14f683f5 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -844,6 +844,9 @@ class MqttConnector(Connector, Thread): log.info("RPC canceled or terminated. Unsubscribing from %s", topic) self._client.unsubscribe(topic) + def get_converters(self): + return self.__mapping_sub_topics + class ConverterWorker(Thread): def __init__(self, name, incoming_queue, send_result): super().__init__() diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index d322f6ff..c54c8cc9 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -471,6 +471,7 @@ class TBGatewayService: def __process_attribute_update(self, content): self.__process_remote_logging_update(content.get("RemoteLoggingLevel")) self.__process_remote_configuration(content.get("configuration")) + self.__process_remote_converter_configuration_update(content) def __process_attributes_response(self, shared_attributes, client_attributes): self.__process_remote_logging_update(shared_attributes.get('RemoteLoggingLevel')) @@ -487,6 +488,27 @@ class TBGatewayService: log.info('Remote logging has being updated. Current logging level is: %s ', remote_logging_level) + def __process_remote_converter_configuration_update(self, content: dict): + try: + key = list(content.keys())[0] + connector_name, converter_name = key.split(':') + log.info('Got remote converter configuration update') + if not self.available_connectors.get(connector_name): + raise ValueError + + converters = self.available_connectors[connector_name].get_converters() + for (_, converter_class_name_list) in converters.items(): + converter_class_name = converter_class_name_list[0].__class__.__name__ + converter_obj = converter_class_name_list[0] + if converter_class_name == converter_name: + converter_obj.config = content[key] + log.info('Updated converter configuration for: %s with configuration %s', + converter_name, converter_obj.config) + # TODO: log correct exception + except (ValueError, AttributeError, IndexError) as e: + log.exception(e) + return + def __process_deleted_gateway_devices(self, deleted_device_name: str): log.info("Received deleted gateway device notification: %s", deleted_device_name) if deleted_device_name in list(self.__renamed_devices.values()):