mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added saving for processed subscriptions
This commit is contained in:
@@ -376,7 +376,6 @@ class TBDeviceMqttClient:
|
||||
self.__attr_request_number += 1
|
||||
self._attr_request_dict.update({self.__attr_request_number: callback})
|
||||
attr_request_number = self.__attr_request_number
|
||||
log.debug(attr_request_number)
|
||||
return attr_request_number
|
||||
|
||||
def __timeout_check(self):
|
||||
@@ -384,7 +383,7 @@ class TBDeviceMqttClient:
|
||||
try:
|
||||
item = self.__timeout_queue.get()
|
||||
if item is not None:
|
||||
while not True:
|
||||
while True:
|
||||
current_ts_in_millis = int(round(time.time() * 1000))
|
||||
if current_ts_in_millis > item["ts"]:
|
||||
break
|
||||
|
||||
@@ -42,14 +42,29 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
|
||||
self.devices_server_side_rpc_request_handler = None
|
||||
self._client.on_connect = self._on_connect
|
||||
self._client.on_message = self._on_message
|
||||
self._client.on_subscribe = self._on_subscribe
|
||||
self._gw_subscriptions = {}
|
||||
self.gateway = gateway
|
||||
|
||||
def _on_connect(self, client, userdata, flags, rc, *extra_params):
|
||||
super()._on_connect(client, userdata, flags, rc, *extra_params)
|
||||
if rc == 0:
|
||||
self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)
|
||||
self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + "/+")
|
||||
self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])] = GATEWAY_ATTRIBUTES_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + "/+")[1])] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
|
||||
self._gw_subscriptions[int(self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")[1])] = GATEWAY_RPC_TOPIC
|
||||
|
||||
def _on_subscribe(self, client, userdata, mid, granted_qos):
|
||||
subscription = self._gw_subscriptions.get(mid)
|
||||
if subscription is not None:
|
||||
if mid == 128:
|
||||
log.error("Service subscription to topic %s - failed.", subscription)
|
||||
del(self._gw_subscriptions[mid])
|
||||
else:
|
||||
log.debug("Service subscription to topic %s - successfully completed.", subscription)
|
||||
del(self._gw_subscriptions[mid])
|
||||
|
||||
def get_subscriptions_in_progress(self):
|
||||
return True if self._gw_subscriptions else False
|
||||
|
||||
def _on_message(self, client, userdata, message):
|
||||
content = TBUtility.decode(message)
|
||||
|
||||
Reference in New Issue
Block a user