From 0c68e77880be3550cf6455f4faae832becb50566 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 12 Aug 2021 11:20:36 +0300 Subject: [PATCH] Fixed duplicate send on_attributes_update callback --- .../gateway/tb_gateway_service.py | 1 + .../tb_client/tb_device_mqtt.py | 3 +++ .../tb_client/tb_gateway_mqtt.py | 20 +++++++++---------- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 08a446ac..828fe54b 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -287,6 +287,7 @@ class TBGatewayService: return self._config_dir def subscribe_to_required_topics(self): + self.tb_client.client.clean_device_sub_dict() self.tb_client.client.gw_set_server_side_rpc_request_handler(self._rpc_request_handler) self.tb_client.client.set_server_side_rpc_request_handler(self._rpc_request_handler) self.tb_client.client.subscribe_to_all_attributes(self._attribute_update_callback) diff --git a/thingsboard_gateway/tb_client/tb_device_mqtt.py b/thingsboard_gateway/tb_client/tb_device_mqtt.py index a5e68e9f..5d227254 100644 --- a/thingsboard_gateway/tb_client/tb_device_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_device_mqtt.py @@ -288,6 +288,9 @@ class TBDeviceMqttClient: self.__device_sub_dict = {} self.__device_sub_dict = dict((k, v) for k, v in self.__device_sub_dict.items() if v) + def clean_device_sub_dict(self): + self.__device_sub_dict = {} + def subscribe_to_all_attributes(self, callback): return self.subscribe_to_attribute("*", callback) diff --git a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py index bb879ff0..d7178036 100644 --- a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py @@ -90,19 +90,19 @@ class TBGatewayMqttClient(TBDeviceMqttClient): with self._lock: # callbacks for everything if self.__sub_dict.get("*|*"): - for callback in self.__sub_dict["*|*"]: - self.__sub_dict["*|*"][callback](content) + for device in self.__sub_dict["*|*"]: + self.__sub_dict["*|*"][device](content) # callbacks for device. in this case callback executes for all attributes in message target = content["device"] + "|*" if self.__sub_dict.get(target): - for callback in self.__sub_dict[target]: - self.__sub_dict[target][callback](content) + for device in self.__sub_dict[target]: + self.__sub_dict[target][device](content) # callback for atr. in this case callback executes for all attributes in message targets = [content["device"] + "|" + attribute for attribute in content["data"]] for target in targets: if self.__sub_dict.get(target): - for sub_id in self.__sub_dict[target]: - self.__sub_dict[target][sub_id](content) + for device in self.__sub_dict[target]: + self.__sub_dict[target][device](content) elif message.topic == GATEWAY_RPC_TOPIC: if self.devices_server_side_rpc_request_handler: self.devices_server_side_rpc_request_handler(self, content) @@ -170,10 +170,10 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self.__max_sub_id += 1 key = device + "|" + attribute if key not in self.__sub_dict: - self.__sub_dict.update({key: {self.__max_sub_id: callback}}) + self.__sub_dict.update({key: {device: callback}}) else: - self.__sub_dict[key].update({self.__max_sub_id: callback}) - log.info("Subscribed to %s with id %i", key, self.__max_sub_id) + self.__sub_dict[key].update({device: callback}) + log.info("Subscribed to %s with id %i for device %s", key, self.__max_sub_id, device) return self.__max_sub_id def gw_unsubscribe(self, subscription_id): @@ -181,7 +181,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient): for attribute in self.__sub_dict: if self.__sub_dict[attribute].get(subscription_id): del self.__sub_dict[attribute][subscription_id] - log.info("Unsubscribed from %s, subscription id %i", attribute, subscription_id) + log.info("Unsubscribed from %s, subscription id %r", attribute, subscription_id) if subscription_id == '*': self.__sub_dict = {}