diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index 56af989b..59be5822 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -49,14 +49,9 @@ class RemoteConfigurator: self.in_process = True decoded_configuration = b64decode(configuration) self.__new_configuration = loads(decoded_configuration) - self.__old_connectors_configs = self.__gateway._connectors_configs + self.__old_connectors_configs = self.__gateway.connectors_configs self.__new_general_configuration_file = self.__new_configuration.get("thingsboard") self.__new_logs_configuration = b64decode(self.__new_general_configuration_file.pop("logs")).decode('UTF-8').replace('}}', '\n') - if self.__old_logs_configuration == self.__new_logs_configuration: - log.debug("Received logs configuration is the same.") - else: - log.debug("Received logs configuration is not the same. Updating loggers...") - self.__update_logs_configuration() if self.__old_configuration != decoded_configuration: log.info("Remote configuration received: \n %s", decoded_configuration) result = self.__process_connectors_configuration() @@ -66,6 +61,8 @@ class RemoteConfigurator: return True else: return False + else: + log.info("Remote configuration is the same.") else: log.error("Remote configuration is already in processing") return False @@ -76,19 +73,20 @@ class RemoteConfigurator: def send_current_configuration(self): try: current_configuration = {} - for connector in self.__gateway._connectors_configs: - log.debug(connector) + for connector in self.__gateway.connectors_configs: if current_configuration.get(connector) is None: current_configuration[connector] = [] - for config in self.__gateway._connectors_configs[connector]: + for config in self.__gateway.connectors_configs[connector]: for config_file in config['config']: current_configuration[connector].append({'name': config['name'], 'config': config['config'][config_file]}) current_configuration["thingsboard"] = self.__old_general_configuration_file current_configuration["thingsboard"]["logs"] = b64encode(self.__old_logs_configuration.replace('\n', '}}').encode("UTF-8")) - encoded_current_configuration = b64encode(dumps(current_configuration).encode()) + json_current_configuration = dumps(current_configuration) + encoded_current_configuration = b64encode(json_current_configuration.encode()) self.__old_configuration = encoded_current_configuration self.__gateway.tb_client.client.send_attributes( - {"current_configuration": encoded_current_configuration.decode("UTF-8")}).get() + {"current_configuration": encoded_current_configuration.decode("UTF-8")}) + log.debug('Current configuration has been sent to ThingsBoard: %s', json_current_configuration) except Exception as e: log.exception(e) @@ -96,8 +94,13 @@ class RemoteConfigurator: log.info("Processing remote connectors configuration...") if self.__apply_new_connectors_configuration(): self.__write_new_configuration_files() + self.__apply_storage_configuration() if self.__safe_apply_connection_configuration(): - self.__apply_storage_configuration() + if self.__old_logs_configuration == self.__new_logs_configuration: + log.debug("Received logs configuration is the same.") + else: + log.debug("Received logs configuration is not the same. Updating loggers...") + self.__update_logs_configuration() log.info("Remote configuration has been applied.") with open(self.__gateway._config_dir + "tb_gateway.yaml", "w") as general_configuration_file: safe_dump(self.__new_general_configuration_file, general_configuration_file) @@ -107,23 +110,24 @@ class RemoteConfigurator: self.__old_logs_configuration = self.__new_logs_configuration self.__new_logs_configuration = None self.__new_general_configuration_file = {} + return True else: log.error("A remote general configuration applying has been failed.") self.__old_connectors_configs = {} self.__new_connectors_configs = {} self.__new_logs_configuration = None self.__new_general_configuration_file = {} - return + return False def __prepare_connectors_configuration(self, input_connector_config): try: - self.__gateway._connectors_configs = {} + self.__gateway.connectors_configs = {} for connector in input_connector_config['thingsboard']['connectors']: for input_connector in input_connector_config[connector['type']]: if input_connector['name'] == connector['name']: - if not self.__gateway._connectors_configs.get(connector['type']): - self.__gateway._connectors_configs[connector['type']] = [] - self.__gateway._connectors_configs[connector['type']].append( + if not self.__gateway.connectors_configs.get(connector['type']): + self.__gateway.connectors_configs[connector['type']] = [] + self.__gateway.connectors_configs[connector['type']].append( {"name": connector["name"], "config": {connector['configuration']: input_connector["config"]}}) except Exception as e: log.exception(e) @@ -141,7 +145,7 @@ class RemoteConfigurator: self.__old_connectors_configs = {} return True except Exception as e: - self.__gateway._connectors_configs = self.__old_connectors_configs + self.__gateway.connectors_configs = self.__old_connectors_configs for connector_name in self.__gateway.available_connectors: self.__gateway.available_connectors[connector_name].close() self.__gateway._connect_with_connectors() @@ -150,7 +154,7 @@ class RemoteConfigurator: def __write_new_configuration_files(self): try: - self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__gateway._connectors_configs + self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__gateway.connectors_configs new_connectors_files = [] for connector_type in self.__new_connectors_configs: for connector_config_section in self.__new_connectors_configs[connector_type]: @@ -179,18 +183,8 @@ class RemoteConfigurator: self.__old_tb_client.unsubscribe('*') self.__old_tb_client.stop() self.__old_tb_client.disconnect() - except Exception as e: - log.exception(e) - self.__revert_configuration() - return False - try: self.__gateway.tb_client = TBClient(self.__new_general_configuration_file["thingsboard"]) self.__gateway.tb_client.connect() - except Exception as e: - log.exception(e) - self.__revert_configuration() - return False - try: connection_state = False while time() * 1000 - apply_start < self.__apply_timeout * 1000 and not connection_state: connection_state = self.__gateway.tb_client.is_connected() @@ -201,12 +195,7 @@ class RemoteConfigurator: return False else: self.__old_tb_client.stop() - self.__gateway.tb_client.client.gw_set_server_side_rpc_request_handler( - self.__gateway._rpc_request_handler) - self.__gateway.tb_client.client.set_server_side_rpc_request_handler(self.__gateway._rpc_request_handler) - self.__gateway.tb_client.client.subscribe_to_all_attributes(self.__gateway._attribute_update_callback) - self.__gateway.tb_client.client.gw_subscribe_to_all_attributes( - self.__gateway._attribute_update_callback) + self.__gateway.subscribe_to_required_topics() return True except Exception as e: log.exception(e) @@ -233,10 +222,7 @@ class RemoteConfigurator: self.__gateway.tb_client.stop() self.__gateway.tb_client = TBClient(self.__old_general_configuration_file["thingsboard"]) self.__gateway.tb_client.connect() - self.__gateway.tb_client.client.gw_set_server_side_rpc_request_handler(self.__gateway._rpc_request_handler) - self.__gateway.tb_client.client.set_server_side_rpc_request_handler(self.__gateway._rpc_request_handler) - self.__gateway.tb_client.client.subscribe_to_all_attributes(self.__gateway._attribute_update_callback) - self.__gateway.tb_client.client.gw_subscribe_to_all_attributes(self.__gateway._attribute_update_callback) + self.__gateway.subscribe_to_required_topics() log.debug("%s connection has been restored", str(self.__gateway.tb_client.client._client)) except Exception as e: log.exception("Exception on reverting configuration occurred:") @@ -252,6 +238,8 @@ class RemoteConfigurator: def __update_logs_configuration(self): try: + remote_handler_current_state = self.__gateway.remote_handler.activated + remote_handler_current_level = self.__gateway.remote_handler.current_log_level logs_conf_file_path = self.__gateway._config_dir + 'logs.conf' with open(logs_conf_file_path, 'w') as logs: logs.write(self.__new_logs_configuration+"\r\n") @@ -259,6 +247,12 @@ class RemoteConfigurator: self.__gateway.main_handler = MemoryHandler(-1) self.__gateway.remote_handler = TBLoggerHandler(self.__gateway) self.__gateway.main_handler.setTarget(self.__gateway.remote_handler) + if remote_handler_current_level != 'NOTSET': + self.__gateway.remote_handler.activate(remote_handler_current_level) + if not remote_handler_current_state: + self.__gateway.remote_handler.deactivate() + global log + log = getLogger('service') log.debug("Logs configuration has been updated.") except Exception as e: log.exception(e) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 15d3be3f..e4ccf189 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -55,10 +55,7 @@ class TBGatewayService: self.__connected_devices_file = "connected_devices.json" self.tb_client = TBClient(config["thingsboard"]) self.tb_client.connect() - self.tb_client.client.gw_set_server_side_rpc_request_handler(self._rpc_request_handler) - self.tb_client.client.set_server_side_rpc_request_handler(self._rpc_request_handler) - self.tb_client.client.subscribe_to_all_attributes(self._attribute_update_callback) - self.tb_client.client.gw_subscribe_to_all_attributes(self._attribute_update_callback) + self.subscribe_to_required_topics() global main_handler self.main_handler = main_handler self.remote_handler = TBLoggerHandler(self) @@ -115,20 +112,7 @@ class TBGatewayService: self.__check_shared_attributes() if cur_time - gateway_statistic_send > 60.0 and self.tb_client.is_connected(): - summary_messages = {"eventsProduced": 0, "eventsSent": 0} - telemetry = {} - for connector in self.available_connectors: - if self.available_connectors[connector].is_connected(): - connector_camel_case = connector[0].lower() + connector[1:].replace(' ', '') - telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \ - self.available_connectors[connector].statistics['MessagesReceived'] - telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \ - self.available_connectors[connector].statistics['MessagesSent'] - self.tb_client.client.send_telemetry(telemetry) - summary_messages['eventsProduced'] += telemetry[ - str(connector_camel_case + ' EventsProduced').replace(' ', '')] - summary_messages['eventsSent'] += telemetry[ - str(connector_camel_case + ' EventsSent').replace(' ', '')] + summary_messages = self.__form_statistics() self.tb_client.client.send_telemetry(summary_messages) gateway_statistic_send = time.time() # self.__check_shared_attributes() @@ -151,6 +135,9 @@ class TBGatewayService: except Exception as e: log.exception(e) + def __stop_gateway(self): + pass + def _attributes_parse(self, content, *args): try: log.debug("Received data: %s", content) @@ -187,6 +174,12 @@ class TBGatewayService: def get_config_path(self): return self._config_dir + def subscribe_to_required_topics(self): + self.tb_client.client.gw_set_server_side_rpc_request_handler(self._rpc_request_handler) + self.tb_client.client.set_server_side_rpc_request_handler(self._rpc_request_handler) + self.tb_client.client.subscribe_to_all_attributes(self._attribute_update_callback) + self.tb_client.client.gw_subscribe_to_all_attributes(self._attribute_update_callback) + def __check_shared_attributes(self): self.tb_client.client.request_attributes(callback=self._attributes_parse) @@ -395,6 +388,23 @@ class TBGatewayService: else: self._attributes_parse(content) + def __form_statistics(self): + summary_messages = {"eventsProduced": 0, "eventsSent": 0} + telemetry = {} + for connector in self.available_connectors: + if self.available_connectors[connector].is_connected(): + connector_camel_case = connector[0].lower() + connector[1:].replace(' ', '') + telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \ + self.available_connectors[connector].statistics['MessagesReceived'] + telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \ + self.available_connectors[connector].statistics['MessagesSent'] + self.tb_client.client.send_telemetry(telemetry) + summary_messages['eventsProduced'] += telemetry[ + str(connector_camel_case + ' EventsProduced').replace(' ', '')] + summary_messages['eventsSent'] += telemetry[ + str(connector_camel_case + ' EventsSent').replace(' ', '')] + return summary_messages + def add_device(self, device_name, content, wait_for_publish=False): if device_name not in self.__saved_devices: self.__connected_devices[device_name] = content diff --git a/thingsboard_gateway/gateway/tb_logger.py b/thingsboard_gateway/gateway/tb_logger.py index 4a494fe0..0f98d284 100644 --- a/thingsboard_gateway/gateway/tb_logger.py +++ b/thingsboard_gateway/gateway/tb_logger.py @@ -20,11 +20,14 @@ class TBLoggerHandler(logging.Handler): def __init__(self, gateway): self.current_log_level = 'NOTSET' super().__init__(logging.getLevelName(self.current_log_level)) + self.setLevel(logging.getLevelName('DEBUG')) self.__gateway = gateway self.activated = False + self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - [%(filename)s] - %(module)s - %(lineno)d - %(message)s')) self.loggers = ['service', 'storage', 'extension', + 'converter', 'connector', 'tb_connection' ] @@ -51,9 +54,8 @@ class TBLoggerHandler(logging.Handler): def handle(self, record): if self.activated: - record = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s').format(record) + record = self.formatter.format(record) self.__gateway.send_to_storage(self.__gateway.name, {"deviceName": self.__gateway.name, "telemetry": [{'LOGS': record}]}) def deactivate(self): - self.current_log_level = 'NOTSET' self.activated = False