mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added check remote configuration after connection/reconnection to ThingsBoard
This commit is contained in:
@@ -53,7 +53,7 @@ class TBClient(threading.Thread):
|
||||
|
||||
def _on_log(self, *args):
|
||||
if "exception" in args[-1]:
|
||||
log.exception(args[-1])
|
||||
log.exception(args)
|
||||
else:
|
||||
log.debug(args)
|
||||
|
||||
@@ -70,10 +70,6 @@ class TBClient(threading.Thread):
|
||||
log.debug('TB client %s connected to ThingsBoard', str(client))
|
||||
self.client._on_connect(client, userdata, flags, rc, *extra_params)
|
||||
|
||||
def _on_subscribe(self, client, userdata, flags, rc, *extra_params):
|
||||
log.debug('TB client %s connected to ThingsBoard', str(client))
|
||||
self.client(client, userdata, flags, rc, *extra_params)
|
||||
|
||||
def _on_disconnect(self, client, userdata, rc):
|
||||
log.info("TB client %s has been disconnected.", str(client))
|
||||
self.client._on_disconnect(client, userdata, rc)
|
||||
|
||||
@@ -63,8 +63,22 @@ class RemoteConfigurator:
|
||||
log.debug("Processing remote connectors configuration...")
|
||||
self.__prepare_connectors_configuration()
|
||||
self.__apply_storage_configuration()
|
||||
if self.__apply_new_configuration():
|
||||
if self.__apply_new_connectors_configuration():
|
||||
self.__write_new_configuration_files()
|
||||
if self.__safe_apply_connection_configuration():
|
||||
log.info("Remote configuration has been applied.")
|
||||
with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file:
|
||||
safe_dump(self.__new_general_configuration_file, general_configuration_file)
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__old_general_configuration_file = self.__new_general_configuration_file
|
||||
self.__new_general_configuration_file = {}
|
||||
else:
|
||||
log.error("A remote general configuration applying has been failed.")
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__new_general_configuration_file = {}
|
||||
return
|
||||
|
||||
def __prepare_connectors_configuration(self):
|
||||
self.__new_connectors_configs = {}
|
||||
@@ -82,7 +96,7 @@ class RemoteConfigurator:
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def __apply_new_configuration(self):
|
||||
def __apply_new_connectors_configuration(self):
|
||||
try:
|
||||
self.__gateway._connectors_configs = self.__new_connectors_configs
|
||||
for connector_name in self.__gateway.available_connectors:
|
||||
@@ -130,20 +144,6 @@ class RemoteConfigurator:
|
||||
if old_connector_file not in new_connectors_files:
|
||||
remove(self.__gateway._config_dir + old_connector_file)
|
||||
log.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file, old_connector_type)
|
||||
if self.__safe_apply_connection_configuration():
|
||||
log.info("Remote configuration has been applied.")
|
||||
with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file:
|
||||
safe_dump(self.__new_general_configuration_file, general_configuration_file)
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__old_general_configuration_file = self.__new_general_configuration_file
|
||||
self.__new_general_configuration_file = {}
|
||||
else:
|
||||
log.error("A remote general configuration applying has been failed.")
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__new_general_configuration_file = {}
|
||||
return
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
|
||||
@@ -76,19 +76,8 @@ class TBGatewayService:
|
||||
self.__remote_configurator = None
|
||||
if config["thingsboard"].get("remoteConfiguration"):
|
||||
try:
|
||||
self.__request_config_after_connect = False
|
||||
self.__remote_configurator = RemoteConfigurator(self, config)
|
||||
|
||||
def check_attribute_after_connection(gateway:TBGatewayService):
|
||||
while not gateway.tb_client.is_connected():
|
||||
time.sleep(1)
|
||||
log.debug("Request for shared attribute has been sent.")
|
||||
info = gateway.tb_client.client.request_attributes(callback=gateway._attributes_parse)
|
||||
|
||||
self.__checking_thread = Thread(target=check_attribute_after_connection,
|
||||
args=(self,),
|
||||
name="Check shared attributes on connect",
|
||||
daemon=True).start()
|
||||
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
if self.__remote_configurator is not None:
|
||||
@@ -115,6 +104,11 @@ class TBGatewayService:
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
break
|
||||
if self.__remote_configurator is not None and not self.__request_config_after_connect and \
|
||||
self.tb_client.is_connected() and not self.tb_client.client.get_subscriptions_in_progress():
|
||||
self.__check_shared_attributes()
|
||||
self.__request_config_after_connect = True
|
||||
|
||||
|
||||
if cur_time - gateway_statistic_send > 60.0 and self.tb_client.is_connected():
|
||||
summary_messages = {"eventsProduced": 0, "eventsSent": 0}
|
||||
@@ -149,7 +143,6 @@ class TBGatewayService:
|
||||
try:
|
||||
self.available_connectors[current_connector].close()
|
||||
log.debug("Connector %s closed connection.", current_connector)
|
||||
log.debug(current_connector)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
@@ -185,7 +178,7 @@ class TBGatewayService:
|
||||
return self._config_dir
|
||||
|
||||
def __check_shared_attributes(self):
|
||||
self.tb_client.client.request_attributes(callback=self._attributes_parse).wait_for_publish()
|
||||
self.tb_client.client.request_attributes(callback=self._attributes_parse)
|
||||
|
||||
def _load_connectors(self, config):
|
||||
self._connectors_configs = {}
|
||||
@@ -337,6 +330,7 @@ class TBGatewayService:
|
||||
else:
|
||||
time.sleep(.01)
|
||||
else:
|
||||
self.__request_config_after_connect = False
|
||||
time.sleep(.1)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
Reference in New Issue
Block a user