1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Changed structure for utilities

This commit is contained in:
zbeacon
2021-04-09 09:00:39 +03:00
parent 2739966d17
commit 64a1eb870a
16 changed files with 117 additions and 86 deletions

View File

@@ -1,285 +0,0 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from os import remove, linesep
from os.path import exists, dirname
from re import findall
from time import time, sleep
from logging import getLogger
from logging.config import fileConfig
from base64 import b64encode, b64decode
from simplejson import dumps, loads, dump
from yaml import safe_dump
from configparser import ConfigParser
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
# pylint: disable=protected-access
LOG = getLogger("service")
class RemoteConfigurator:
def __init__(self, gateway, config):
self.__gateway = gateway
self.__new_configuration = None
self.__old_configuration = None
self.__apply_timeout = 10
self.__old_tb_client = None
self.__old_logs_configuration = self.__get_current_logs_configuration()
self.__new_logs_configuration = None
self.__old_connectors_configs = {}
self.__new_connectors_configs = {}
self.__old_general_configuration_file = config
self.__new_general_configuration_file = {}
self.__old_event_storage = None
self.__new_event_storage = None
self.in_process = False
def process_configuration(self, configuration):
try:
if not self.in_process:
self.in_process = True
# while not self.__gateway._published_events.empty():
# LOG.debug("Waiting for end of the data processing...")
# sleep(1)
decoded_configuration = b64decode(configuration)
self.__new_configuration = loads(decoded_configuration)
self.__old_connectors_configs = self.__gateway.connectors_configs
self.__new_general_configuration_file = self.__new_configuration.get("thingsboard")
self.__new_logs_configuration = b64decode(self.__new_general_configuration_file.pop("logs")).decode('UTF-8').replace('}}', '\n')
if self.__old_configuration != decoded_configuration:
LOG.info("Remote configuration received: \n %s", decoded_configuration)
result = self.__process_connectors_configuration()
self.in_process = False
if result:
self.__old_configuration = self.__new_configuration
return True
else:
return False
else:
LOG.info("Remote configuration is the same.")
else:
LOG.error("Remote configuration is already in processing")
return False
except Exception as e:
self.in_process = False
LOG.exception(e)
def send_current_configuration(self):
try:
current_configuration = {}
for connector in self.__gateway.connectors_configs:
if current_configuration.get(connector) is None:
current_configuration[connector] = []
for config in self.__gateway.connectors_configs[connector]:
for config_file in config['config']:
current_configuration[connector].append({'name': config['name'], 'config': config['config'][config_file]})
current_configuration["thingsboard"] = self.__old_general_configuration_file
current_configuration["thingsboard"]["logs"] = b64encode(self.__old_logs_configuration.replace('\n', '}}').encode("UTF-8"))
json_current_configuration = dumps(current_configuration)
encoded_current_configuration = b64encode(json_current_configuration.encode())
self.__old_configuration = encoded_current_configuration
self.__gateway.tb_client.client.send_attributes(
{"current_configuration": encoded_current_configuration.decode("UTF-8")})
LOG.debug('Current configuration has been sent to ThingsBoard: %s', json_current_configuration)
except Exception as e:
LOG.exception(e)
def __process_connectors_configuration(self):
LOG.info("Processing remote connectors configuration...")
if self.__apply_new_connectors_configuration():
self.__write_new_configuration_files()
self.__apply_storage_configuration()
if self.__safe_apply_connection_configuration():
LOG.info("Remote configuration has been applied.")
with open(self.__gateway.get_config_path() + "tb_gateway.yaml", "w", encoding="UTF-8") as general_configuration_file:
safe_dump(self.__new_general_configuration_file, general_configuration_file)
self.__old_connectors_configs = {}
self.__new_connectors_configs = {}
self.__old_general_configuration_file = self.__new_general_configuration_file
self.__old_logs_configuration = self.__new_logs_configuration
self.__update_logs_configuration()
self.__new_logs_configuration = None
self.__new_general_configuration_file = {}
return True
else:
self.__update_logs_configuration()
self.__old_general_configuration_file.pop("logs")
with open(self.__gateway.get_config_path() + "tb_gateway.yaml", "w", encoding="UTF-8") as general_configuration_file:
safe_dump(self.__old_general_configuration_file, general_configuration_file)
LOG.error("A remote general configuration applying has been failed.")
self.__old_connectors_configs = {}
self.__new_connectors_configs = {}
self.__new_logs_configuration = None
self.__new_general_configuration_file = {}
return False
def __prepare_connectors_configuration(self, input_connector_config):
try:
self.__gateway.connectors_configs = {}
for connector in input_connector_config['thingsboard']['connectors']:
for input_connector in input_connector_config[connector['type']]:
if input_connector['name'] == connector['name']:
if not self.__gateway.connectors_configs.get(connector['type']):
self.__gateway.connectors_configs[connector['type']] = []
self.__gateway.connectors_configs[connector['type']].append(
{"name": connector["name"], "config": {connector['configuration']: input_connector["config"]}})
connector_class = TBUtility.check_and_import(connector["type"], self.__gateway._default_connectors.get(connector["type"], connector.get("class")))
self.__gateway._implemented_connectors[connector["type"]] = connector_class
except Exception as e:
LOG.exception(e)
def __apply_new_connectors_configuration(self):
try:
self.__prepare_connectors_configuration(self.__new_configuration)
for connector_name in self.__gateway.available_connectors:
try:
self.__gateway.available_connectors[connector_name].close()
except Exception as e:
LOG.exception(e)
self.__gateway._connect_with_connectors()
LOG.debug("New connectors configuration has been applied")
self.__old_connectors_configs = {}
return True
except Exception as e:
self.__gateway.connectors_configs = self.__old_connectors_configs
for connector_name in self.__gateway.available_connectors:
self.__gateway.available_connectors[connector_name].close()
self.__gateway._load_connectors(self.__old_general_configuration_file)
self.__gateway._connect_with_connectors()
LOG.exception(e)
return False
def __write_new_configuration_files(self):
try:
self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__gateway.connectors_configs
new_connectors_files = []
for connector_type in self.__new_connectors_configs:
for connector_config_section in self.__new_connectors_configs[connector_type]:
for connector_file in connector_config_section["config"]:
connector_config = connector_config_section["config"][connector_file]
with open(self.__gateway.get_config_path() + connector_file, "w", encoding="UTF-8") as config_file:
dump(connector_config, config_file, sort_keys=True, indent=2)
new_connectors_files.append(connector_file)
LOG.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type,
connector_file)
break
self.__old_general_configuration_file["connectors"] = self.__new_general_configuration_file["connectors"]
for old_connector_type in self.__old_connectors_configs:
for old_connector_config_section in self.__old_connectors_configs[old_connector_type]:
for old_connector_file in old_connector_config_section["config"]:
if old_connector_file not in new_connectors_files:
remove(self.__gateway.get_config_path() + old_connector_file)
LOG.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file,
old_connector_type)
except Exception as e:
LOG.exception(e)
def __safe_apply_connection_configuration(self):
apply_start = time() * 1000
self.__old_tb_client = self.__gateway.tb_client
try:
self.__old_tb_client.unsubscribe('*')
self.__old_tb_client.stop()
self.__old_tb_client.disconnect()
self.__gateway.tb_client = TBClient(self.__new_general_configuration_file["thingsboard"])
self.__gateway.tb_client.connect()
connection_state = False
while time() * 1000 - apply_start < self.__apply_timeout * 1000 and not connection_state:
connection_state = self.__gateway.tb_client.is_connected()
sleep(.1)
if not connection_state:
self.__revert_configuration()
LOG.info("The gateway cannot connect to the ThingsBoard server with a new configuration.")
return False
else:
self.__old_tb_client.stop()
self.__gateway.subscribe_to_required_topics()
return True
except Exception as e:
LOG.exception(e)
self.__revert_configuration()
return False
def __apply_storage_configuration(self):
if self.__old_general_configuration_file["storage"] != self.__new_general_configuration_file["storage"]:
self.__old_event_storage = self.__gateway._event_storage
try:
storage_class = self.__gateway._event_storage_types[self.__new_general_configuration_file["storage"]["type"]]
self.__gateway._event_storage = storage_class(self.__new_general_configuration_file["storage"])
self.__old_event_storage = None
except Exception as e:
LOG.exception(e)
self.__gateway._event_storage = self.__old_event_storage
def __revert_configuration(self):
try:
LOG.info("Remote general configuration will be restored.")
self.__new_general_configuration_file = self.__old_general_configuration_file
self.__gateway.tb_client.disconnect()
self.__gateway.tb_client.stop()
self.__gateway.tb_client = TBClient(self.__old_general_configuration_file["thingsboard"])
self.__gateway.tb_client.connect()
self.__gateway.subscribe_to_required_topics()
LOG.debug("%s connection has been restored", str(self.__gateway.tb_client.client._client))
except Exception as e:
LOG.exception("Exception on reverting configuration occurred:")
LOG.exception(e)
def __get_current_logs_configuration(self):
try:
with open(self.__gateway.get_config_path() + 'logs.conf', 'r', encoding="UTF-8") as logs:
current_logs_configuration = logs.read()
return current_logs_configuration
except Exception as e:
LOG.exception(e)
def __update_logs_configuration(self):
global LOG
try:
LOG = getLogger('service')
logs_conf_file_path = self.__gateway.get_config_path() + 'logs.conf'
new_logging_level = findall(r'level=(.*)', self.__new_logs_configuration.replace("NONE", "NOTSET"))[-1]
new_logging_config = self.__new_logs_configuration.replace("NONE", "NOTSET").replace("\r\n", linesep)
logs_config = ConfigParser(allow_no_value=True)
logs_config.read_string(new_logging_config)
for section in logs_config:
if "handler_" in section and section != "handler_consoleHandler":
args = tuple(logs_config[section]["args"]
.replace('(', '')
.replace(')', '')
.split(', '))
path = args[0][1:-1]
LOG.debug("Checking %s...", path)
if not exists(dirname(path)):
raise FileNotFoundError
with open(logs_conf_file_path, 'w', encoding="UTF-8") as logs:
logs.write(self.__new_logs_configuration.replace("NONE", "NOTSET")+"\r\n")
fileConfig(logs_config)
LOG = getLogger('service')
# self.__gateway.remote_handler.deactivate()
self.__gateway.remote_handler = TBLoggerHandler(self.__gateway)
self.__gateway.main_handler.setLevel(new_logging_level)
self.__gateway.main_handler.setTarget(self.__gateway.remote_handler)
if new_logging_level == "NOTSET":
self.__gateway.remote_handler.deactivate()
else:
self.__gateway.remote_handler.activate(new_logging_level)
LOG.debug("Logs configuration has been updated.")
except Exception as e:
LOG.error("Remote logging configuration is wrong!")
LOG.exception(e)

View File

@@ -26,14 +26,15 @@ from threading import Thread, RLock
from yaml import safe_load
from simplejson import load, dumps, loads
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.gateway.tb_updater import TBUpdater
from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
from thingsboard_gateway.tb_utility.tb_updater import TBUpdater
from thingsboard_gateway.tb_utility.tb_logger import TBLoggerHandler
from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage
from thingsboard_gateway.storage.file_event_storage import FileEventStorage
from thingsboard_gateway.gateway.tb_gateway_remote_configurator import RemoteConfigurator
from thingsboard_gateway.gateway.tb_remote_shell import RemoteShell
from thingsboard_gateway.tb_utility.tb_gateway_remote_configurator import RemoteConfigurator
from thingsboard_gateway.tb_utility.tb_remote_shell import RemoteShell
@@ -196,6 +197,7 @@ class TBGatewayService:
self.__stop_gateway()
except Exception as e:
log.exception(e)
self.__stop_gateway()
self.__close_connectors()
log.info("The gateway has been stopped.")
self.tb_client.stop()
@@ -278,7 +280,7 @@ class TBGatewayService:
if self.__config.get("connectors"):
for connector in self.__config['connectors']:
try:
connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors.get(connector["type"], connector.get("class")))
connector_class = TBModuleLoader.import_module(connector["type"], self._default_connectors.get(connector["type"], connector.get("class")))
self._implemented_connectors[connector["type"]] = connector_class
with open(self._config_dir + connector['configuration'], 'r', encoding="UTF-8") as conf_file:
connector_conf = load(conf_file)

View File

@@ -1,77 +0,0 @@
# Copyright 2020. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sys import stdout
import logging
import logging.handlers
class TBLoggerHandler(logging.Handler):
def __init__(self, gateway):
self.current_log_level = 'INFO'
super().__init__(logging.getLevelName(self.current_log_level))
self.setLevel(logging.getLevelName('DEBUG'))
self.__gateway = gateway
self.activated = False
self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - [%(filename)s] - %(module)s - %(lineno)d - %(message)s'))
self.loggers = ['service',
'extension',
'converter',
'connector',
'tb_connection'
]
for logger in self.loggers:
log = logging.getLogger(logger)
log.addHandler(self.__gateway.main_handler)
log.debug("Added remote handler to log %s", logger)
def activate(self, log_level=None):
try:
for logger in self.loggers:
if log_level is not None and logging.getLevelName(log_level) is not None:
if logger == 'tb_connection' and log_level == 'DEBUG':
log = logging.getLogger(logger)
log.setLevel(logging.getLevelName('INFO'))
else:
log = logging.getLogger(logger)
self.current_log_level = log_level
log.setLevel(logging.getLevelName(log_level))
except Exception as e:
log = logging.getLogger('service')
log.exception(e)
self.activated = True
def handle(self, record):
if self.activated:
record = self.formatter.format(record)
self.__gateway.send_to_storage(self.__gateway.name, {"deviceName": self.__gateway.name, "telemetry": [{'LOGS': record}]})
def deactivate(self):
self.activated = False
@staticmethod
def set_default_handler():
logger_names = [
'service',
'storage',
'extension',
'converter',
'connector',
'tb_connection'
]
for logger_name in logger_names:
logger = logging.getLogger(logger_name)
handler = logging.StreamHandler(stdout)
handler.setFormatter(logging.Formatter('[STREAM ONLY] %(asctime)s - %(levelname)s - [%(filename)s] - %(module)s - %(lineno)d - %(message)s'))
logger.addHandler(handler)

View File

@@ -1,113 +0,0 @@
# Copyright 2020. ThingsBoard
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from os import getcwd, chdir
from subprocess import Popen, PIPE, STDOUT, TimeoutExpired
from logging import getLogger
log = getLogger("service")
class RemoteShell:
def __init__(self, platform, release):
self.__session_active = False
self.__platform = platform
self.__release = release
self.shell_commands = {
"getTermInfo": self.get_term_info,
"sendCommand": self.send_command,
"getCommandStatus": self.get_command_status,
"terminateCommand": self.terminate_command,
}
self.command_in_progress = None
self.__previous_stdout = b""
self.__previous_stderr = b""
def get_term_info(self, *args):
return {"platform": self.__platform, "release": self.__release, "cwd": str(getcwd())}
def send_command(self, *args):
result = {"ok": False, "qos": 0}
log.debug("Received command to shell with args: %r", args)
command = args[0]['command']
cwd = args[0].get('cwd')
if cwd is not None and str(getcwd()) != cwd:
chdir(cwd)
if command.split():
if self.command_in_progress is not None:
log.debug("Received a new command: \"%s\", during old command is running, terminating old command...", command)
old_command = self.command_in_progress.args
self.terminate_command()
log.debug("Old command: \"%s\" terminated.", old_command)
if command.split()[0] in ["quit", "exit"]:
self.command_in_progress = None
elif command.split()[0] == "cd":
chdir(command.split()[1])
self.command_in_progress = "cd"
else:
log.debug("Run command in remote shell: %s", command)
self.command_in_progress = Popen(command, shell=True, stdout=PIPE, stdin=PIPE, stderr=STDOUT, universal_newlines=True)
result.update({"ok": True})
return result
def get_command_status(self, *args):
result = {"data": [{"stdout": "",
"stderr": ""}],
"cwd": str(getcwd()),
"done": True,
"qos": 0,
}
done = False
if self.command_in_progress == "cd":
done = True
elif self.command_in_progress is not None:
stdout_value = b""
stderr_value = b""
done = True if self.command_in_progress.poll() is not None else False
try:
stdout_value, stderr_value = self.command_in_progress.communicate(timeout=.1)
except TimeoutExpired as e:
log.debug("Process is run")
stdout_value = b"" if e.stdout is None else e.stdout.replace(self.__previous_stdout, b"")
stderr_value = b"" if e.stderr is None else e.stderr.replace(self.__previous_stderr, b"")
stdout_value = stdout_value[:-1] if len(stdout_value) and stdout_value[-1] == b"\n" else stdout_value
stderr_value = stderr_value[:-1] if len(stderr_value) and stderr_value[-1] == b"\n" else stderr_value
self.__previous_stderr = self.__previous_stderr + b"" if stderr_value is None else stderr_value
self.__previous_stdout = self.__previous_stdout + b"" if stdout_value is None else stdout_value
str_stdout = str(stdout_value, "UTF-8") if isinstance(stdout_value, bytes) else stdout_value
str_stderr = str(stderr_value, "UTF-8") if isinstance(stderr_value, bytes) else stderr_value
result.update({"data": [{"stdout": str_stdout,
"stderr": str_stderr}]
})
result.update({"done": done})
if done:
self.command_in_progress = None
return result
def terminate_command(self, *args):
result = {"ok": False}
if self.command_in_progress is not None:
try:
self.command_in_progress.terminate()
self.__previous_stderr = b""
self.__previous_stdout = b""
result.update({"ok": True})
except Exception as e:
log.exception(e)
result["error"] = str(e)
else:
result["error"] = "Process for termination not found."
return result

View File

@@ -1,113 +0,0 @@
# Copyright 2020. ThingsBoard
# #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# #
# http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from requests import post, ConnectionError
from uuid import uuid1
from platform import platform, system, release
from logging import getLogger
from pkg_resources import get_distribution
from threading import Thread
from time import time, sleep
from simplejson import loads
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
log = getLogger("service")
UPDATE_SERVICE_BASE_URL = "https://updates.thingsboard.io"
# UPDATE_SERVICE_BASE_URL = "http://127.0.0.1:8090"
class TBUpdater(Thread):
def __init__(self):
super().__init__()
self.__version = {"current_version": get_distribution('thingsboard_gateway').version,
"latest_version": get_distribution('thingsboard_gateway').version}
self.__instance_id = str(uuid1())
self.__platform = system()
self.__release = release()
self.__os_version = platform()
self.__previous_check = 0
self.__check_period = 3600.0
self.__request_timeout = 5
self.__stopped = True
self.start()
def run(self):
self.__stopped = False
while not self.__stopped:
if time() >= self.__previous_check + self.__check_period:
self.check_for_new_version()
self.__previous_check = time()
else:
sleep(1)
def stop(self):
self.__stopped = True
def get_version(self):
return self.__version
def get_platform(self):
return self.__platform
def get_release(self):
return self.__release
def check_for_new_version(self):
log.debug("Checking for new version")
request_args = self.form_request_params()
try:
response = post(**request_args)
content = None
content = loads(response.content)
if content is not None and content.get("updateAvailable", False):
new_version = content["message"].replace("New version ", "").replace(" is available!", "")
if new_version > self.__version["current_version"]:
log.info(content["message"])
self.__version["latest_version"] = new_version
log.info("\n\n[===UPDATE===]\n\n New version %s is available! \n\n[===UPDATE===]\n",
self.__version["latest_version"])
except ConnectionRefusedError:
log.warning("Cannot connect to the update service. PLease check your internet connection.")
except ConnectionError:
log.warning("Cannot connect to the update service. PLease check your internet connection.")
except Exception as e:
log.exception(e)
def form_request_params(self):
json_data = {
"version": self.__version["current_version"],
"platform": self.__platform,
"instanceId": self.__instance_id,
"osVersion": self.__os_version,
}
url = UPDATE_SERVICE_BASE_URL + "/api/tb-gateway/updates"
request_args = {
"url": url,
"json": json_data,
"timeout": self.__request_timeout
}
return request_args
def update(self):
if self.__version["latest_version"] != self.__version["current_version"]:
result = TBUtility.install_package("thingsboard-gateway", self.__version["latest_version"])
else:
result = "Congratulations! You have the latest version."
return result
if __name__ == '__main__':
updater = TBUpdater()