1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Fixed duplicate send on_attributes_update callback

This commit is contained in:
samson0v
2021-08-12 11:20:36 +03:00
parent 3030dfd4f6
commit 0c68e77880
3 changed files with 14 additions and 10 deletions

View File

@@ -287,6 +287,7 @@ class TBGatewayService:
return self._config_dir
def subscribe_to_required_topics(self):
self.tb_client.client.clean_device_sub_dict()
self.tb_client.client.gw_set_server_side_rpc_request_handler(self._rpc_request_handler)
self.tb_client.client.set_server_side_rpc_request_handler(self._rpc_request_handler)
self.tb_client.client.subscribe_to_all_attributes(self._attribute_update_callback)

View File

@@ -288,6 +288,9 @@ class TBDeviceMqttClient:
self.__device_sub_dict = {}
self.__device_sub_dict = dict((k, v) for k, v in self.__device_sub_dict.items() if v)
def clean_device_sub_dict(self):
self.__device_sub_dict = {}
def subscribe_to_all_attributes(self, callback):
return self.subscribe_to_attribute("*", callback)

View File

@@ -90,19 +90,19 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
with self._lock:
# callbacks for everything
if self.__sub_dict.get("*|*"):
for callback in self.__sub_dict["*|*"]:
self.__sub_dict["*|*"][callback](content)
for device in self.__sub_dict["*|*"]:
self.__sub_dict["*|*"][device](content)
# callbacks for device. in this case callback executes for all attributes in message
target = content["device"] + "|*"
if self.__sub_dict.get(target):
for callback in self.__sub_dict[target]:
self.__sub_dict[target][callback](content)
for device in self.__sub_dict[target]:
self.__sub_dict[target][device](content)
# callback for atr. in this case callback executes for all attributes in message
targets = [content["device"] + "|" + attribute for attribute in content["data"]]
for target in targets:
if self.__sub_dict.get(target):
for sub_id in self.__sub_dict[target]:
self.__sub_dict[target][sub_id](content)
for device in self.__sub_dict[target]:
self.__sub_dict[target][device](content)
elif message.topic == GATEWAY_RPC_TOPIC:
if self.devices_server_side_rpc_request_handler:
self.devices_server_side_rpc_request_handler(self, content)
@@ -170,10 +170,10 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
self.__max_sub_id += 1
key = device + "|" + attribute
if key not in self.__sub_dict:
self.__sub_dict.update({key: {self.__max_sub_id: callback}})
self.__sub_dict.update({key: {device: callback}})
else:
self.__sub_dict[key].update({self.__max_sub_id: callback})
log.info("Subscribed to %s with id %i", key, self.__max_sub_id)
self.__sub_dict[key].update({device: callback})
log.info("Subscribed to %s with id %i for device %s", key, self.__max_sub_id, device)
return self.__max_sub_id
def gw_unsubscribe(self, subscription_id):
@@ -181,7 +181,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
for attribute in self.__sub_dict:
if self.__sub_dict[attribute].get(subscription_id):
del self.__sub_dict[attribute][subscription_id]
log.info("Unsubscribed from %s, subscription id %i", attribute, subscription_id)
log.info("Unsubscribed from %s, subscription id %r", attribute, subscription_id)
if subscription_id == '*':
self.__sub_dict = {}