mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Major fix for device reconnection to the ThingsBoard after connection lost and RPC processing
This commit is contained in:
@@ -69,11 +69,12 @@ class TBClient(threading.Thread):
|
||||
self.__paused = False
|
||||
|
||||
def is_connected(self):
|
||||
return self.client.is_connected()
|
||||
return self.__is_connected
|
||||
|
||||
def _on_connect(self, client, userdata, flags, result_code, *extra_params):
|
||||
log.debug('TB client %s connected to ThingsBoard', str(client))
|
||||
self.__is_connected = True
|
||||
if result_code == 0:
|
||||
self.__is_connected = True
|
||||
# pylint: disable=protected-access
|
||||
self.client._on_connect(client, userdata, flags, result_code, *extra_params)
|
||||
|
||||
@@ -82,9 +83,10 @@ class TBClient(threading.Thread):
|
||||
if self.client._client != client:
|
||||
log.info("TB client %s has been disconnected. Current client for connection is: %s", str(client), str(self.client._client))
|
||||
client.disconnect()
|
||||
self.__is_connected = False
|
||||
client.loop_stop()
|
||||
self.client._on_disconnect(client, userdata, result_code)
|
||||
else:
|
||||
self.__is_connected = False
|
||||
self.client._on_disconnect(client, userdata, result_code)
|
||||
|
||||
def stop(self):
|
||||
# self.disconnect()
|
||||
|
||||
@@ -91,6 +91,7 @@ class TBGatewayService:
|
||||
self.tb_client = TBClient(config["thingsboard"])
|
||||
self.tb_client.connect()
|
||||
self.subscribe_to_required_topics()
|
||||
self.__subscribed_to_rpc_topics = True
|
||||
if logging_error is not None:
|
||||
self.tb_client.client.send_telemetry({"ts": time()*1000, "values": {"LOGS": "Logging loading exception, logs.conf is wrong: %s" % (str(logging_error), )}})
|
||||
TBLoggerHandler.set_default_handler()
|
||||
@@ -141,6 +142,13 @@ class TBGatewayService:
|
||||
gateway_statistic_send = 0
|
||||
while not self.stopped:
|
||||
cur_time = time()*1000
|
||||
if not self.tb_client.is_connected() and self.__subscribed_to_rpc_topics:
|
||||
self.__subscribed_to_rpc_topics = False
|
||||
if self.tb_client.is_connected() and not self.__subscribed_to_rpc_topics:
|
||||
for device in self.__saved_devices:
|
||||
self.add_device(device, {"connector": self.__saved_devices[device]["connector"]}, True, device_type=self.__saved_devices[device]["device_type"])
|
||||
self.subscribe_to_required_topics()
|
||||
self.__subscribed_to_rpc_topics = True
|
||||
if self.__sheduled_rpc_calls:
|
||||
for rpc_call_index in range(len(self.__sheduled_rpc_calls)):
|
||||
rpc_call = self.__sheduled_rpc_calls[rpc_call_index]
|
||||
@@ -579,14 +587,14 @@ class TBGatewayService:
|
||||
|
||||
def add_device(self, device_name, content, wait_for_publish=False, device_type=None):
|
||||
if device_name not in self.__saved_devices:
|
||||
self.__connected_devices[device_name] = content
|
||||
self.__saved_devices[device_name] = content
|
||||
device_type = device_type if device_type is not None else 'default'
|
||||
if wait_for_publish:
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type).wait_for_publish()
|
||||
else:
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type)
|
||||
self.__connected_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__saved_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__save_persistent_devices()
|
||||
if wait_for_publish:
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type).wait_for_publish()
|
||||
else:
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type)
|
||||
|
||||
def update_device(self, device_name, event, content):
|
||||
if event == 'connector' and self.__connected_devices[device_name].get(event) != content:
|
||||
|
||||
@@ -194,11 +194,6 @@ class TBDeviceMqttClient:
|
||||
self.__is_connected = True
|
||||
log.info("connection SUCCESS")
|
||||
log.debug(client)
|
||||
time.sleep(.05)
|
||||
self._client.subscribe(ATTRIBUTES_TOPIC, qos=1)
|
||||
self._client.subscribe(ATTRIBUTES_TOPIC + "/response/+", 1)
|
||||
self._client.subscribe(RPC_REQUEST_TOPIC + '+')
|
||||
self._client.subscribe(RPC_RESPONSE_TOPIC + '+', qos=1)
|
||||
else:
|
||||
if result_code in result_codes:
|
||||
log.error("connection FAIL with error %s %s", result_code, result_codes[result_code])
|
||||
|
||||
Reference in New Issue
Block a user