mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added converter update from shared attr for MQTT Connector
This commit is contained in:
@@ -49,6 +49,10 @@ class Connector(ABC):
|
|||||||
def server_side_rpc_handler(self, content):
|
def server_side_rpc_handler(self, content):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_converters(self):
|
||||||
|
pass
|
||||||
|
|
||||||
def is_filtering_enable(self, device_name):
|
def is_filtering_enable(self, device_name):
|
||||||
return DEFAULT_SEND_ON_CHANGE_VALUE
|
return DEFAULT_SEND_ON_CHANGE_VALUE
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,10 @@ class JsonMqttUplinkConverter(MqttUplinkConverter):
|
|||||||
def config(self):
|
def config(self):
|
||||||
return self.__config
|
return self.__config
|
||||||
|
|
||||||
|
@config.setter
|
||||||
|
def config(self, value):
|
||||||
|
self.__config = value
|
||||||
|
|
||||||
@StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices',
|
@StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices',
|
||||||
end_stat_type='convertedBytesFromDevice')
|
end_stat_type='convertedBytesFromDevice')
|
||||||
def convert(self, topic, data):
|
def convert(self, topic, data):
|
||||||
|
|||||||
@@ -844,6 +844,9 @@ class MqttConnector(Connector, Thread):
|
|||||||
log.info("RPC canceled or terminated. Unsubscribing from %s", topic)
|
log.info("RPC canceled or terminated. Unsubscribing from %s", topic)
|
||||||
self._client.unsubscribe(topic)
|
self._client.unsubscribe(topic)
|
||||||
|
|
||||||
|
def get_converters(self):
|
||||||
|
return self.__mapping_sub_topics
|
||||||
|
|
||||||
class ConverterWorker(Thread):
|
class ConverterWorker(Thread):
|
||||||
def __init__(self, name, incoming_queue, send_result):
|
def __init__(self, name, incoming_queue, send_result):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|||||||
@@ -471,6 +471,7 @@ class TBGatewayService:
|
|||||||
def __process_attribute_update(self, content):
|
def __process_attribute_update(self, content):
|
||||||
self.__process_remote_logging_update(content.get("RemoteLoggingLevel"))
|
self.__process_remote_logging_update(content.get("RemoteLoggingLevel"))
|
||||||
self.__process_remote_configuration(content.get("configuration"))
|
self.__process_remote_configuration(content.get("configuration"))
|
||||||
|
self.__process_remote_converter_configuration_update(content)
|
||||||
|
|
||||||
def __process_attributes_response(self, shared_attributes, client_attributes):
|
def __process_attributes_response(self, shared_attributes, client_attributes):
|
||||||
self.__process_remote_logging_update(shared_attributes.get('RemoteLoggingLevel'))
|
self.__process_remote_logging_update(shared_attributes.get('RemoteLoggingLevel'))
|
||||||
@@ -487,6 +488,27 @@ class TBGatewayService:
|
|||||||
log.info('Remote logging has being updated. Current logging level is: %s ',
|
log.info('Remote logging has being updated. Current logging level is: %s ',
|
||||||
remote_logging_level)
|
remote_logging_level)
|
||||||
|
|
||||||
|
def __process_remote_converter_configuration_update(self, content: dict):
|
||||||
|
try:
|
||||||
|
key = list(content.keys())[0]
|
||||||
|
connector_name, converter_name = key.split(':')
|
||||||
|
log.info('Got remote converter configuration update')
|
||||||
|
if not self.available_connectors.get(connector_name):
|
||||||
|
raise ValueError
|
||||||
|
|
||||||
|
converters = self.available_connectors[connector_name].get_converters()
|
||||||
|
for (_, converter_class_name_list) in converters.items():
|
||||||
|
converter_class_name = converter_class_name_list[0].__class__.__name__
|
||||||
|
converter_obj = converter_class_name_list[0]
|
||||||
|
if converter_class_name == converter_name:
|
||||||
|
converter_obj.config = content[key]
|
||||||
|
log.info('Updated converter configuration for: %s with configuration %s',
|
||||||
|
converter_name, converter_obj.config)
|
||||||
|
# TODO: log correct exception
|
||||||
|
except (ValueError, AttributeError, IndexError) as e:
|
||||||
|
log.exception(e)
|
||||||
|
return
|
||||||
|
|
||||||
def __process_deleted_gateway_devices(self, deleted_device_name: str):
|
def __process_deleted_gateway_devices(self, deleted_device_name: str):
|
||||||
log.info("Received deleted gateway device notification: %s", deleted_device_name)
|
log.info("Received deleted gateway device notification: %s", deleted_device_name)
|
||||||
if deleted_device_name in list(self.__renamed_devices.values()):
|
if deleted_device_name in list(self.__renamed_devices.values()):
|
||||||
|
|||||||
Reference in New Issue
Block a user