diff --git a/thingsboard_gateway/gateway/tb_client.py b/thingsboard_gateway/gateway/tb_client.py index aa19d17a..96b55c48 100644 --- a/thingsboard_gateway/gateway/tb_client.py +++ b/thingsboard_gateway/gateway/tb_client.py @@ -37,6 +37,7 @@ class TBClient(threading.Thread): self.__token = None self.__is_connected = False self.__stopped = False + self.__paused = False if credentials.get("accessToken") is not None: self.__token = str(credentials["accessToken"]) if self.__tls: @@ -53,21 +54,34 @@ class TBClient(threading.Thread): def _on_log(self, *args): log.debug(args) + def pause(self): + self.__paused = True + + def unpause(self): + self.__paused = False + def is_connected(self): return self.client.is_connected() def _on_connect(self, client, userdata, flags, rc, *extra_params): - log.debug('Gateway connected to ThingsBoard') + log.debug('TB client %s connected to ThingsBoard', str(client)) self.client._on_connect(client, userdata, flags, rc, *extra_params) def _on_disconnect(self, client, userdata, rc): - log.info('Gateway disconnected.') + log.info("TB client %s has been disconnected.", str(client)) self.client._on_disconnect(client, userdata, rc) + def stop(self): + self.disconnect() + self.__stopped = True + def disconnect(self): + self.__paused = True self.client.disconnect() def connect(self, min_reconnect_delay=10): + self.__paused = False + self.__stopped = False self.__min_reconnect_delay = min_reconnect_delay def run(self): @@ -75,15 +89,16 @@ class TBClient(threading.Thread): try: while not self.client.is_connected() and not self.__stopped: log.debug("connecting to ThingsBoard") - try: - self.client.connect(tls=self.__tls, - ca_certs=self.__ca_cert, - cert_file=self.__cert, - key_file=self.__private_key, - keepalive=keep_alive, - min_reconnect_delay=self.__min_reconnect_delay) - except ConnectionRefusedError: - pass + if not self.__paused: + try: + self.client.connect(tls=self.__tls, + ca_certs=self.__ca_cert, + cert_file=self.__cert, + key_file=self.__private_key, + keepalive=keep_alive, + min_reconnect_delay=self.__min_reconnect_delay) + except ConnectionRefusedError: + pass time.sleep(1) except Exception as e: log.exception(e) @@ -93,6 +108,8 @@ class TBClient(threading.Thread): try: if not self.__stopped: time.sleep(1) + else: + break except KeyboardInterrupt: self.__stopped = True except Exception as e: diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index 49c5d4a9..f088f749 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -14,17 +14,21 @@ # from threading import Thread from simplejson import dumps, loads, dump -from yaml import safe_load, safe_dump -from copy import deepcopy +from yaml import safe_dump +from time import time, sleep from logging import getLogger from os import remove +from thingsboard_gateway.gateway.tb_client import TBClient log = getLogger("service") -class RemoteConfigurator(): + +class RemoteConfigurator: def __init__(self, gateway, config): self.__gateway = gateway self.__new_configuration = None + self.__apply_timeout = 10 + self.__old_tb_client = None self.__old_connectors_configs = {} self.__new_connectors_configs = {} self.__old_general_configuration_file = config @@ -34,17 +38,35 @@ class RemoteConfigurator(): log.info("Remote configuration received: \n %s", dumps(configuration)) self.__new_configuration = loads(configuration) self.__old_connectors_configs = self.__gateway._connectors_configs + self.__new_general_configuration_file = self.__new_configuration.get("thingsboard") self.__process_general_configuration() self.__process_connectors_configuration() + def send_current_configuration(self): + current_configuration = {} + for connector in self.__gateway._connectors_configs: + log.debug(connector) + if current_configuration.get(connector) is None: + current_configuration[connector] = [] + for config_file in self.__gateway._connectors_configs[connector]: + for config in config_file: + current_configuration[connector].append(config_file[config]) + current_configuration["thingsboard"] = self.__old_general_configuration_file + self.__gateway.tb_client.client.send_attributes(dumps({"configuration": dumps(current_configuration)})) + log.debug(current_configuration) + + def __process_general_configuration(self): # TODO Add remote configuration for the general configuration file - pass + if self.__new_general_configuration_file is not None and self.__old_general_configuration_file != self.__new_general_configuration_file: + log.debug("New general configuration found.") + else: + log.debug("General configuration from server is the same like current gateway general configuration.") def __process_connectors_configuration(self): log.debug("Processing remote connectors configuration...") self.__prepare_connectors_configuration() - if self.__apply_new_connectors_configuration(): + if self.__apply_new_configuration(): self.__write_new_configuration_files() def __prepare_connectors_configuration(self): @@ -64,13 +86,14 @@ class RemoteConfigurator(): except Exception as e: log.exception(e) - def __apply_new_connectors_configuration(self): + def __apply_new_configuration(self): try: self.__gateway._connectors_configs = self.__new_connectors_configs for connector_name in self.__gateway.available_connectors: self.__gateway.available_connectors[connector_name].close() self.__gateway._connect_with_connectors() log.debug("New connectors configuration has been applied") + self.__old_connectors_configs = {} return True except Exception as e: self.__gateway._connectors_configs = self.__old_connectors_configs @@ -84,7 +107,11 @@ class RemoteConfigurator(): try: general_edited = False if self.__new_general_configuration_file and self.__new_general_configuration_file != self.__old_general_configuration_file: - general_edited = True + general_edited = False + if self.__new_general_configuration_file["thingsboard"] != self.__old_general_configuration_file["thingsboard"]: + general_edited = True + if self.__new_general_configuration_file["storage"] != self.__old_general_configuration_file["storage"]: + general_edited = True self.__new_general_configuration_file = self.__new_general_configuration_file if general_edited else self.__old_general_configuration_file self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__new_connectors_configs self.__new_general_configuration_file["connectors"] = [] @@ -104,23 +131,80 @@ class RemoteConfigurator(): } ) with open(self.__gateway._config_dir + connector_file, "w") as config_file: - dump(connector_config, config_file) + dump(connector_config, config_file, sort_keys=True) new_connectors_files.append(connector_file) log.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type, connector_file) for old_connector_type in self.__old_connectors_configs: for old_connector_config_section in self.__old_connectors_configs[old_connector_type]: for old_connector_file in old_connector_config_section: if old_connector_file not in new_connectors_files: - remove(self.__gateway._config_dir+old_connector_file) + remove(self.__gateway._config_dir + old_connector_file) log.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file, old_connector_type) if not general_edited: with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file: safe_dump(self.__new_general_configuration_file, general_configuration_file) + self.__old_connectors_configs = {} + self.__new_connectors_configs = {} else: - self.safe_apply() + if self.safe_apply(): + log.info("A new configuration has been applied.") + with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file: + safe_dump(self.__new_general_configuration_file, general_configuration_file) + self.__old_connectors_configs = {} + self.__new_connectors_configs = {} + self.__old_general_configuration_file = self.__new_general_configuration_file + self.__new_general_configuration_file = {} + else: + log.error("A new configuration applying has been failed.") + self.__old_connectors_configs = {} + self.__new_connectors_configs = {} + self.__new_general_configuration_file = {} except Exception as e: log.exception(e) def safe_apply(self): # TODO Add check for connection to the ThingsBoard and revert configuration on fail in timeout - pass + apply_start = time()*1000 + self.__old_tb_client = self.__gateway.tb_client + try: + self.__old_tb_client.pause() + except Exception as e: + log.exception(e) + self.__revert_configuration() + return False + connection_state = False + try: + tb_client = TBClient(self.__new_general_configuration_file["thingsboard"]) + tb_client.connect() + except Exception as e: + log.exception(e) + self.__revert_configuration() + return False + self.__gateway.tb_client = tb_client + try: + + while time()*1000-apply_start < self.__apply_timeout*1000: + connection_state = self.__gateway.tb_client.is_connected() + sleep(.1) + if not connection_state: + self.__revert_configuration() + log.info("The gateway cannot connect to the ThingsBoard server with a new configuration.") + return False + else: + self.__old_tb_client.stop() + return True + except Exception as e: + log.exception(e) + self.__revert_configuration() + return False + + def __revert_configuration(self): + log.info("Configuration will be restored.") + self.__new_general_configuration_file = self.__old_general_configuration_file + self.__gateway.tb_client.disconnect() + self.__gateway.tb_client.stop() + self.__gateway.tb_client = self.__old_tb_client + self.__gateway.tb_client.connect() + self.__gateway.tb_client.unpause() + + diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 2eaeb1e9..c314141d 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -53,19 +53,20 @@ class TBGatewayService: self.main_handler = logging.handlers.MemoryHandler(1000) self.remote_handler = TBLoggerHandler(self) self.main_handler.setTarget(self.remote_handler) - self.__default_connectors = { + self._default_connectors = { "mqtt": "MqttConnector", "modbus": "ModbusConnector", "opcua": "OpcUaConnector", "ble": "BLEConnector", } - self.__implemented_connectors = {} + self._implemented_connectors = {} self.__event_storage_types = { "memory": MemoryEventStorage, "file": FileEventStorage, } self.__load_connectors(config) self._connect_with_connectors() + self.__remote_configurator = None if config["thingsboard"].get("remoteConfiguration"): try: self.__remote_configurator = RemoteConfigurator(self, config) @@ -129,15 +130,18 @@ class TBGatewayService: log.error(e) def __attributes_parse(self, content, *args): - shared_attributes = content.get("shared") - client_attributes = content.get("client") - if shared_attributes is None and client_attributes is None: - self.__remote_configurator.process_configuration(content.get("configuration")) - elif shared_attributes is not None: - if shared_attributes.get("configuration"): - self.__remote_configurator.process_configuration(shared_attributes.get("configuration")) - elif client_attributes is not None: - log.debug("Client attributes received") + try: + shared_attributes = content.get("shared") + client_attributes = content.get("client") + if self.__remote_configurator is not None: + self.__remote_configurator.send_current_configuration() + if shared_attributes is not None: + if self.__remote_configurator is not None and shared_attributes.get("configuration"): + self.__remote_configurator.process_configuration(shared_attributes.get("configuration")) + elif client_attributes is not None: + log.debug("Client attributes received") + except Exception as e: + log.exception(e) def get_config_path(self): return self._config_dir @@ -154,14 +158,14 @@ class TBGatewayService: if connector.get('class') is not None: try: connector_class = TBUtility.check_and_import(connector['type'], connector['class']) - self.__implemented_connectors[connector['type']] = connector_class + self._implemented_connectors[connector['type']] = connector_class except Exception as e: log.error("Exception when loading the custom connector:") log.exception(e) - elif connector.get("type") is not None and connector["type"] in self.__default_connectors: + elif connector.get("type") is not None and connector["type"] in self._default_connectors: try: - connector_class = TBUtility.check_and_import(connector["type"], self.__default_connectors[connector["type"]], default=True) - self.__implemented_connectors[connector["type"]] = connector_class + connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors[connector["type"]], default=True) + self._implemented_connectors[connector["type"]] = connector_class except Exception as e: log.error("Error on loading default connector:") log.exception(e) @@ -182,8 +186,8 @@ class TBGatewayService: for config_file in connector_config: connector = None try: - connector = self.__implemented_connectors[connector_type](self, connector_config[config_file], - connector_type) + connector = self._implemented_connectors[connector_type](self, connector_config[config_file], + connector_type) self.available_connectors[connector.get_name()] = connector connector.open() except Exception as e: