From e151ced29b6c707534b142dbfe342351e893b639 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Fri, 10 Jan 2020 14:51:43 +0200 Subject: [PATCH] Added remote configurator for connectors --- .../connectors/mqtt/mqtt_connector.py | 19 ++- thingsboard_gateway/gateway/tb_client.py | 3 +- .../gateway/tb_gateway_remote_configurator.py | 126 ++++++++++++++++++ .../gateway/tb_gateway_service.py | 53 +++++--- .../tb_client/tb_device_mqtt.py | 16 +-- .../tb_client/tb_gateway_mqtt.py | 3 +- thingsboard_gateway/tb_utility/tb_utility.py | 5 + 7 files changed, 183 insertions(+), 42 deletions(-) create mode 100644 thingsboard_gateway/gateway/tb_gateway_remote_configurator.py diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 25d71859..33bd6295 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -85,7 +85,7 @@ class MqttConnector(Connector, Thread): def run(self): try: - while not self._connected: + while not self._connected and not self.__stopped: try: self._client.connect(self.__broker['host'], self.__broker.get('port', 1883)) @@ -93,15 +93,15 @@ class MqttConnector(Connector, Thread): if not self._connected: time.sleep(1) except Exception as e: - self.__log.error(e) + self.__log.exception(e) time.sleep(10) except Exception as e: - self.__log.error(e) + self.__log.exception(e) try: self.close() except Exception as e: - self.__log.debug(e) + self.__log.exception(e) while True: if self.__stopped: break @@ -109,8 +109,11 @@ class MqttConnector(Connector, Thread): time.sleep(1) def close(self): + try: + self._client.disconnect() + except: + pass self._client.loop_stop() - self._client.disconnect() self.__stopped = True self.__log.info('%s has been stopped.', self.get_name()) @@ -209,7 +212,7 @@ class MqttConnector(Connector, Thread): def _on_message(self, client, userdata, message): self.statistics['MessagesReceived'] += 1 - content = self._decode(message) + content = TBUtility.decode(message) regex_topic = [regex for regex in self.__sub_topics if fullmatch(regex, message.topic)] if regex_topic: try: @@ -361,7 +364,3 @@ class MqttConnector(Connector, Thread): def rpc_cancel_processing(self, topic): self._client.unsubscribe(topic) - @staticmethod - def _decode(message): - content = loads(message.payload.decode("utf-8")) - return content diff --git a/thingsboard_gateway/gateway/tb_client.py b/thingsboard_gateway/gateway/tb_client.py index 395957a1..aa19d17a 100644 --- a/thingsboard_gateway/gateway/tb_client.py +++ b/thingsboard_gateway/gateway/tb_client.py @@ -52,7 +52,6 @@ class TBClient(threading.Thread): def _on_log(self, *args): log.debug(args) - pass def is_connected(self): return self.client.is_connected() @@ -74,7 +73,7 @@ class TBClient(threading.Thread): def run(self): keep_alive = self.__config.get("keep_alive", 60) try: - while not self.client.is_connected(): + while not self.client.is_connected() and not self.__stopped: log.debug("connecting to ThingsBoard") try: self.client.connect(tls=self.__tls, diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py new file mode 100644 index 00000000..49c5d4a9 --- /dev/null +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -0,0 +1,126 @@ +# Copyright 2019. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# from threading import Thread +from simplejson import dumps, loads, dump +from yaml import safe_load, safe_dump +from copy import deepcopy +from logging import getLogger +from os import remove + +log = getLogger("service") + +class RemoteConfigurator(): + def __init__(self, gateway, config): + self.__gateway = gateway + self.__new_configuration = None + self.__old_connectors_configs = {} + self.__new_connectors_configs = {} + self.__old_general_configuration_file = config + self.__new_general_configuration_file = {} + + def process_configuration(self, configuration): + log.info("Remote configuration received: \n %s", dumps(configuration)) + self.__new_configuration = loads(configuration) + self.__old_connectors_configs = self.__gateway._connectors_configs + self.__process_general_configuration() + self.__process_connectors_configuration() + + def __process_general_configuration(self): + # TODO Add remote configuration for the general configuration file + pass + + def __process_connectors_configuration(self): + log.debug("Processing remote connectors configuration...") + self.__prepare_connectors_configuration() + if self.__apply_new_connectors_configuration(): + self.__write_new_configuration_files() + + def __prepare_connectors_configuration(self): + self.__new_connectors_configs = {} + try: + for connector_type in {connector_type for connector_type in self.__new_configuration if "thingsboard" not in connector_type}: + connector_number = 0 + for connector in self.__new_configuration[connector_type]: + log.debug(connector) + log.debug("Processing remote configuration for connector with type \"%s\" and name \"%s\".", connector_type, connector) + if not self.__new_connectors_configs.get(connector_type): + self.__new_connectors_configs[connector_type] = [] + self.__new_connectors_configs[connector_type].append({connector_type.lower().replace(" ", "")+str(connector_number)+".json": connector}) + connector_number += 1 + log.debug("Saved configuration for connectors: %s", ', '.join(con for con in self.__new_connectors_configs)) + + except Exception as e: + log.exception(e) + + def __apply_new_connectors_configuration(self): + try: + self.__gateway._connectors_configs = self.__new_connectors_configs + for connector_name in self.__gateway.available_connectors: + self.__gateway.available_connectors[connector_name].close() + self.__gateway._connect_with_connectors() + log.debug("New connectors configuration has been applied") + return True + except Exception as e: + self.__gateway._connectors_configs = self.__old_connectors_configs + for connector_name in self.__gateway.available_connectors: + self.__gateway.available_connectors[connector_name].close() + self.__gateway._connect_with_connectors() + log.exception(e) + return False + + def __write_new_configuration_files(self): + try: + general_edited = False + if self.__new_general_configuration_file and self.__new_general_configuration_file != self.__old_general_configuration_file: + general_edited = True + self.__new_general_configuration_file = self.__new_general_configuration_file if general_edited else self.__old_general_configuration_file + self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__new_connectors_configs + self.__new_general_configuration_file["connectors"] = [] + new_connectors_files = [] + for connector_type in self.__new_connectors_configs: + for connector_config_section in self.__new_connectors_configs[connector_type]: + for connector_file in connector_config_section: + connector_config = connector_config_section[connector_file] + connector_name = connector_config["broker"]["name"] if connector_config.get("broker") else \ + connector_config["server"]["name"] if connector_config.get("server") else \ + connector_config["name"] if connector_config.get("name") else None + self.__new_general_configuration_file["connectors"].append( + { + "name": connector_name, + "type": connector_type, + "configuration": connector_file + } + ) + with open(self.__gateway._config_dir + connector_file, "w") as config_file: + dump(connector_config, config_file) + new_connectors_files.append(connector_file) + log.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type, connector_file) + for old_connector_type in self.__old_connectors_configs: + for old_connector_config_section in self.__old_connectors_configs[old_connector_type]: + for old_connector_file in old_connector_config_section: + if old_connector_file not in new_connectors_files: + remove(self.__gateway._config_dir+old_connector_file) + log.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file, old_connector_type) + if not general_edited: + with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file: + safe_dump(self.__new_general_configuration_file, general_configuration_file) + else: + self.safe_apply() + except Exception as e: + log.exception(e) + + def safe_apply(self): + # TODO Add check for connection to the ThingsBoard and revert configuration on fail in timeout + pass diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index d649063f..2eaeb1e9 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -27,6 +27,7 @@ from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler from thingsboard_gateway.tb_utility.tb_utility import TBUtility from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage from thingsboard_gateway.storage.file_event_storage import FileEventStorage +from thingsboard_gateway.gateway.tb_gateway_remote_configurator import RemoteConfigurator log = logging.getLogger('service') @@ -37,8 +38,8 @@ class TBGatewayService: config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml' with open(config_file) as config: config = safe_load(config) - self.__config_dir = path.dirname(path.abspath(config_file)) + '/' - logging.config.fileConfig(self.__config_dir + "logs.conf") + self._config_dir = path.dirname(path.abspath(config_file)) + '/' + logging.config.fileConfig(self._config_dir + "logs.conf") global log log = logging.getLogger('service') self.available_connectors = {} @@ -64,7 +65,15 @@ class TBGatewayService: "file": FileEventStorage, } self.__load_connectors(config) - self.__connect_with_connectors() + self._connect_with_connectors() + if config["thingsboard"].get("remoteConfiguration"): + try: + self.__remote_configurator = RemoteConfigurator(self, config) + self.__check_shared_attributes() + except Exception as e: + self.__load_connectors(config) + self._connect_with_connectors() + log.exception(e) self.__load_persistent_devices() self.__published_events = Queue(0) self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, @@ -97,9 +106,9 @@ class TBGatewayService: if self.available_connectors[connector].is_connected(): connector_camel_case = connector[0].lower() + connector[1:].replace(' ', '') telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \ - self.available_connectors[connector].statistics['MessagesReceived'] + self.available_connectors[connector].statistics['MessagesReceived'] telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \ - self.available_connectors[connector].statistics['MessagesSent'] + self.available_connectors[connector].statistics['MessagesSent'] self.tb_client.client.send_telemetry(telemetry) summary_messages['eventsProduced'] += telemetry[ str(connector_camel_case + ' EventsProduced').replace(' ', '')] @@ -119,8 +128,22 @@ class TBGatewayService: except Exception as e: log.error(e) + def __attributes_parse(self, content, *args): + shared_attributes = content.get("shared") + client_attributes = content.get("client") + if shared_attributes is None and client_attributes is None: + self.__remote_configurator.process_configuration(content.get("configuration")) + elif shared_attributes is not None: + if shared_attributes.get("configuration"): + self.__remote_configurator.process_configuration(shared_attributes.get("configuration")) + elif client_attributes is not None: + log.debug("Client attributes received") + def get_config_path(self): - return self.__config_dir + return self._config_dir + + def __check_shared_attributes(self): + self.tb_client.client.request_attributes(callback=self.__attributes_parse) def __load_connectors(self, config): self._connectors_configs = {} @@ -145,7 +168,7 @@ class TBGatewayService: else: log.error("Connector with config %s - not found", safe_dump(connector)) - with open(self.__config_dir + connector['configuration'], 'r') as conf_file: + with open(self._config_dir + connector['configuration'], 'r') as conf_file: connector_conf = load(conf_file) if not self._connectors_configs.get(connector['type']): self._connectors_configs[connector['type']] = [] @@ -153,7 +176,7 @@ class TBGatewayService: except Exception as e: log.error(e) - def __connect_with_connectors(self): + def _connect_with_connectors(self): for connector_type in self._connectors_configs: for connector_config in self._connectors_configs[connector_type]: for config_file in connector_config: @@ -336,9 +359,7 @@ class TBGatewayService: self.remote_handler.activate(content.get('RemoteLoggingLevel')) log.info('Remote logging has being activated.') else: - log.debug('Attributes on the gateway has being updated!') - log.debug(args) - log.debug(content) + self.__attributes_parse(content) def add_device(self, device_name, content, wait_for_publish=False): if device_name not in self.__saved_devices: @@ -365,15 +386,15 @@ class TBGatewayService: def __load_persistent_devices(self): devices = {} - if self.__connected_devices_file in listdir(self.__config_dir) and \ - path.getsize(self.__config_dir + self.__connected_devices_file) > 0: + if self.__connected_devices_file in listdir(self._config_dir) and \ + path.getsize(self._config_dir + self.__connected_devices_file) > 0: try: - with open(self.__config_dir + self.__connected_devices_file) as devices_file: + with open(self._config_dir + self.__connected_devices_file) as devices_file: devices = load(devices_file) except Exception as e: log.exception(e) else: - connected_devices_file = open(self.__config_dir + self.__connected_devices_file, 'w') + connected_devices_file = open(self._config_dir + self.__connected_devices_file, 'w') connected_devices_file.close() if devices is not None: @@ -393,7 +414,7 @@ class TBGatewayService: self.__connected_devices = {} if self.__connected_devices is None else self.__connected_devices def __save_persistent_devices(self): - with open(self.__config_dir + self.__connected_devices_file, 'w') as config_file: + with open(self._config_dir + self.__connected_devices_file, 'w') as config_file: try: data_to_save = {} for device in self.__connected_devices: diff --git a/thingsboard_gateway/tb_client/tb_device_mqtt.py b/thingsboard_gateway/tb_client/tb_device_mqtt.py index 59573260..fdc52e2f 100644 --- a/thingsboard_gateway/tb_client/tb_device_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_device_mqtt.py @@ -19,6 +19,7 @@ import time from simplejson import loads, dumps from threading import RLock from threading import Thread +from thingsboard_gateway.tb_utility.tb_utility import TBUtility import paho.mqtt.client as paho from jsonschema import Draft7Validator @@ -158,7 +159,7 @@ class TBDeviceMqttClient: self._client.on_disconnect = self._on_disconnect def _on_log(self, client, userdata, level, buf): - log.debug(buf) + log.exception(buf) def _on_publish(self, client, userdata, result): # log.debug("Data published to ThingsBoard!") @@ -213,17 +214,9 @@ class TBDeviceMqttClient: log.info("Disconnected from ThingsBoard!") def _on_message(self, client, userdata, message): - content = self._decode(message) + content = TBUtility.decode(message) self._on_decoded_message(content, message) - - @staticmethod - def _decode(message): - content = loads(message.payload.decode("utf-8")) - log.debug(content) - log.debug(message.topic) - return content - @staticmethod def validate(validator, data): try: @@ -350,9 +343,6 @@ class TBDeviceMqttClient: return self.__device_max_sub_id def request_attributes(self, client_keys=None, shared_keys=None, callback=None): - if client_keys is None and shared_keys is None: - log.error("There are no keys to request") - return False msg = {} if client_keys: tmp = "" diff --git a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py index 65a69525..3d06d6c1 100644 --- a/thingsboard_gateway/tb_client/tb_gateway_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_gateway_mqtt.py @@ -17,6 +17,7 @@ import time from simplejson import dumps from thingsboard_gateway.tb_client.tb_device_mqtt import TBDeviceMqttClient, DEVICE_TS_KV_VALIDATOR, KV_VALIDATOR +from thingsboard_gateway.tb_utility.tb_utility import TBUtility GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes" GATEWAY_ATTRIBUTES_REQUEST_TOPIC = "v1/gateway/attributes/request" @@ -50,7 +51,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient): self._client.subscribe(GATEWAY_RPC_TOPIC + "/+") def _on_message(self, client, userdata, message): - content = self._decode(message) + content = TBUtility.decode(message) super()._on_decoded_message(content, message) self._on_decoded_message(content, message) diff --git a/thingsboard_gateway/tb_utility/tb_utility.py b/thingsboard_gateway/tb_utility/tb_utility.py index 7b2c21ad..15d430ba 100644 --- a/thingsboard_gateway/tb_utility/tb_utility.py +++ b/thingsboard_gateway/tb_utility/tb_utility.py @@ -25,6 +25,11 @@ log = getLogger("service") class TBUtility: + @staticmethod + def decode(message): + content = loads(message.payload.decode("utf-8")) + return content + @staticmethod def validate_converted_data(data): json_data = dumps(data)