mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added checking for remote configuration in shared attributes of the gateway and template for uploading current configuration
This commit is contained in:
@@ -37,6 +37,7 @@ class TBClient(threading.Thread):
|
||||
self.__token = None
|
||||
self.__is_connected = False
|
||||
self.__stopped = False
|
||||
self.__paused = False
|
||||
if credentials.get("accessToken") is not None:
|
||||
self.__token = str(credentials["accessToken"])
|
||||
if self.__tls:
|
||||
@@ -53,21 +54,34 @@ class TBClient(threading.Thread):
|
||||
def _on_log(self, *args):
|
||||
log.debug(args)
|
||||
|
||||
def pause(self):
|
||||
self.__paused = True
|
||||
|
||||
def unpause(self):
|
||||
self.__paused = False
|
||||
|
||||
def is_connected(self):
|
||||
return self.client.is_connected()
|
||||
|
||||
def _on_connect(self, client, userdata, flags, rc, *extra_params):
|
||||
log.debug('Gateway connected to ThingsBoard')
|
||||
log.debug('TB client %s connected to ThingsBoard', str(client))
|
||||
self.client._on_connect(client, userdata, flags, rc, *extra_params)
|
||||
|
||||
def _on_disconnect(self, client, userdata, rc):
|
||||
log.info('Gateway disconnected.')
|
||||
log.info("TB client %s has been disconnected.", str(client))
|
||||
self.client._on_disconnect(client, userdata, rc)
|
||||
|
||||
def stop(self):
|
||||
self.disconnect()
|
||||
self.__stopped = True
|
||||
|
||||
def disconnect(self):
|
||||
self.__paused = True
|
||||
self.client.disconnect()
|
||||
|
||||
def connect(self, min_reconnect_delay=10):
|
||||
self.__paused = False
|
||||
self.__stopped = False
|
||||
self.__min_reconnect_delay = min_reconnect_delay
|
||||
|
||||
def run(self):
|
||||
@@ -75,6 +89,7 @@ class TBClient(threading.Thread):
|
||||
try:
|
||||
while not self.client.is_connected() and not self.__stopped:
|
||||
log.debug("connecting to ThingsBoard")
|
||||
if not self.__paused:
|
||||
try:
|
||||
self.client.connect(tls=self.__tls,
|
||||
ca_certs=self.__ca_cert,
|
||||
@@ -93,6 +108,8 @@ class TBClient(threading.Thread):
|
||||
try:
|
||||
if not self.__stopped:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
self.__stopped = True
|
||||
except Exception as e:
|
||||
|
||||
@@ -14,17 +14,21 @@
|
||||
|
||||
# from threading import Thread
|
||||
from simplejson import dumps, loads, dump
|
||||
from yaml import safe_load, safe_dump
|
||||
from copy import deepcopy
|
||||
from yaml import safe_dump
|
||||
from time import time, sleep
|
||||
from logging import getLogger
|
||||
from os import remove
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
|
||||
log = getLogger("service")
|
||||
|
||||
class RemoteConfigurator():
|
||||
|
||||
class RemoteConfigurator:
|
||||
def __init__(self, gateway, config):
|
||||
self.__gateway = gateway
|
||||
self.__new_configuration = None
|
||||
self.__apply_timeout = 10
|
||||
self.__old_tb_client = None
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__old_general_configuration_file = config
|
||||
@@ -34,17 +38,35 @@ class RemoteConfigurator():
|
||||
log.info("Remote configuration received: \n %s", dumps(configuration))
|
||||
self.__new_configuration = loads(configuration)
|
||||
self.__old_connectors_configs = self.__gateway._connectors_configs
|
||||
self.__new_general_configuration_file = self.__new_configuration.get("thingsboard")
|
||||
self.__process_general_configuration()
|
||||
self.__process_connectors_configuration()
|
||||
|
||||
def send_current_configuration(self):
|
||||
current_configuration = {}
|
||||
for connector in self.__gateway._connectors_configs:
|
||||
log.debug(connector)
|
||||
if current_configuration.get(connector) is None:
|
||||
current_configuration[connector] = []
|
||||
for config_file in self.__gateway._connectors_configs[connector]:
|
||||
for config in config_file:
|
||||
current_configuration[connector].append(config_file[config])
|
||||
current_configuration["thingsboard"] = self.__old_general_configuration_file
|
||||
self.__gateway.tb_client.client.send_attributes(dumps({"configuration": dumps(current_configuration)}))
|
||||
log.debug(current_configuration)
|
||||
|
||||
|
||||
def __process_general_configuration(self):
|
||||
# TODO Add remote configuration for the general configuration file
|
||||
pass
|
||||
if self.__new_general_configuration_file is not None and self.__old_general_configuration_file != self.__new_general_configuration_file:
|
||||
log.debug("New general configuration found.")
|
||||
else:
|
||||
log.debug("General configuration from server is the same like current gateway general configuration.")
|
||||
|
||||
def __process_connectors_configuration(self):
|
||||
log.debug("Processing remote connectors configuration...")
|
||||
self.__prepare_connectors_configuration()
|
||||
if self.__apply_new_connectors_configuration():
|
||||
if self.__apply_new_configuration():
|
||||
self.__write_new_configuration_files()
|
||||
|
||||
def __prepare_connectors_configuration(self):
|
||||
@@ -64,13 +86,14 @@ class RemoteConfigurator():
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def __apply_new_connectors_configuration(self):
|
||||
def __apply_new_configuration(self):
|
||||
try:
|
||||
self.__gateway._connectors_configs = self.__new_connectors_configs
|
||||
for connector_name in self.__gateway.available_connectors:
|
||||
self.__gateway.available_connectors[connector_name].close()
|
||||
self.__gateway._connect_with_connectors()
|
||||
log.debug("New connectors configuration has been applied")
|
||||
self.__old_connectors_configs = {}
|
||||
return True
|
||||
except Exception as e:
|
||||
self.__gateway._connectors_configs = self.__old_connectors_configs
|
||||
@@ -84,6 +107,10 @@ class RemoteConfigurator():
|
||||
try:
|
||||
general_edited = False
|
||||
if self.__new_general_configuration_file and self.__new_general_configuration_file != self.__old_general_configuration_file:
|
||||
general_edited = False
|
||||
if self.__new_general_configuration_file["thingsboard"] != self.__old_general_configuration_file["thingsboard"]:
|
||||
general_edited = True
|
||||
if self.__new_general_configuration_file["storage"] != self.__old_general_configuration_file["storage"]:
|
||||
general_edited = True
|
||||
self.__new_general_configuration_file = self.__new_general_configuration_file if general_edited else self.__old_general_configuration_file
|
||||
self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__new_connectors_configs
|
||||
@@ -104,7 +131,7 @@ class RemoteConfigurator():
|
||||
}
|
||||
)
|
||||
with open(self.__gateway._config_dir + connector_file, "w") as config_file:
|
||||
dump(connector_config, config_file)
|
||||
dump(connector_config, config_file, sort_keys=True)
|
||||
new_connectors_files.append(connector_file)
|
||||
log.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type, connector_file)
|
||||
for old_connector_type in self.__old_connectors_configs:
|
||||
@@ -116,11 +143,68 @@ class RemoteConfigurator():
|
||||
if not general_edited:
|
||||
with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file:
|
||||
safe_dump(self.__new_general_configuration_file, general_configuration_file)
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
else:
|
||||
self.safe_apply()
|
||||
if self.safe_apply():
|
||||
log.info("A new configuration has been applied.")
|
||||
with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file:
|
||||
safe_dump(self.__new_general_configuration_file, general_configuration_file)
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__old_general_configuration_file = self.__new_general_configuration_file
|
||||
self.__new_general_configuration_file = {}
|
||||
else:
|
||||
log.error("A new configuration applying has been failed.")
|
||||
self.__old_connectors_configs = {}
|
||||
self.__new_connectors_configs = {}
|
||||
self.__new_general_configuration_file = {}
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def safe_apply(self):
|
||||
# TODO Add check for connection to the ThingsBoard and revert configuration on fail in timeout
|
||||
pass
|
||||
apply_start = time()*1000
|
||||
self.__old_tb_client = self.__gateway.tb_client
|
||||
try:
|
||||
self.__old_tb_client.pause()
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
self.__revert_configuration()
|
||||
return False
|
||||
connection_state = False
|
||||
try:
|
||||
tb_client = TBClient(self.__new_general_configuration_file["thingsboard"])
|
||||
tb_client.connect()
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
self.__revert_configuration()
|
||||
return False
|
||||
self.__gateway.tb_client = tb_client
|
||||
try:
|
||||
|
||||
while time()*1000-apply_start < self.__apply_timeout*1000:
|
||||
connection_state = self.__gateway.tb_client.is_connected()
|
||||
sleep(.1)
|
||||
if not connection_state:
|
||||
self.__revert_configuration()
|
||||
log.info("The gateway cannot connect to the ThingsBoard server with a new configuration.")
|
||||
return False
|
||||
else:
|
||||
self.__old_tb_client.stop()
|
||||
return True
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
self.__revert_configuration()
|
||||
return False
|
||||
|
||||
def __revert_configuration(self):
|
||||
log.info("Configuration will be restored.")
|
||||
self.__new_general_configuration_file = self.__old_general_configuration_file
|
||||
self.__gateway.tb_client.disconnect()
|
||||
self.__gateway.tb_client.stop()
|
||||
self.__gateway.tb_client = self.__old_tb_client
|
||||
self.__gateway.tb_client.connect()
|
||||
self.__gateway.tb_client.unpause()
|
||||
|
||||
|
||||
|
||||
@@ -53,19 +53,20 @@ class TBGatewayService:
|
||||
self.main_handler = logging.handlers.MemoryHandler(1000)
|
||||
self.remote_handler = TBLoggerHandler(self)
|
||||
self.main_handler.setTarget(self.remote_handler)
|
||||
self.__default_connectors = {
|
||||
self._default_connectors = {
|
||||
"mqtt": "MqttConnector",
|
||||
"modbus": "ModbusConnector",
|
||||
"opcua": "OpcUaConnector",
|
||||
"ble": "BLEConnector",
|
||||
}
|
||||
self.__implemented_connectors = {}
|
||||
self._implemented_connectors = {}
|
||||
self.__event_storage_types = {
|
||||
"memory": MemoryEventStorage,
|
||||
"file": FileEventStorage,
|
||||
}
|
||||
self.__load_connectors(config)
|
||||
self._connect_with_connectors()
|
||||
self.__remote_configurator = None
|
||||
if config["thingsboard"].get("remoteConfiguration"):
|
||||
try:
|
||||
self.__remote_configurator = RemoteConfigurator(self, config)
|
||||
@@ -129,15 +130,18 @@ class TBGatewayService:
|
||||
log.error(e)
|
||||
|
||||
def __attributes_parse(self, content, *args):
|
||||
try:
|
||||
shared_attributes = content.get("shared")
|
||||
client_attributes = content.get("client")
|
||||
if shared_attributes is None and client_attributes is None:
|
||||
self.__remote_configurator.process_configuration(content.get("configuration"))
|
||||
elif shared_attributes is not None:
|
||||
if shared_attributes.get("configuration"):
|
||||
if self.__remote_configurator is not None:
|
||||
self.__remote_configurator.send_current_configuration()
|
||||
if shared_attributes is not None:
|
||||
if self.__remote_configurator is not None and shared_attributes.get("configuration"):
|
||||
self.__remote_configurator.process_configuration(shared_attributes.get("configuration"))
|
||||
elif client_attributes is not None:
|
||||
log.debug("Client attributes received")
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def get_config_path(self):
|
||||
return self._config_dir
|
||||
@@ -154,14 +158,14 @@ class TBGatewayService:
|
||||
if connector.get('class') is not None:
|
||||
try:
|
||||
connector_class = TBUtility.check_and_import(connector['type'], connector['class'])
|
||||
self.__implemented_connectors[connector['type']] = connector_class
|
||||
self._implemented_connectors[connector['type']] = connector_class
|
||||
except Exception as e:
|
||||
log.error("Exception when loading the custom connector:")
|
||||
log.exception(e)
|
||||
elif connector.get("type") is not None and connector["type"] in self.__default_connectors:
|
||||
elif connector.get("type") is not None and connector["type"] in self._default_connectors:
|
||||
try:
|
||||
connector_class = TBUtility.check_and_import(connector["type"], self.__default_connectors[connector["type"]], default=True)
|
||||
self.__implemented_connectors[connector["type"]] = connector_class
|
||||
connector_class = TBUtility.check_and_import(connector["type"], self._default_connectors[connector["type"]], default=True)
|
||||
self._implemented_connectors[connector["type"]] = connector_class
|
||||
except Exception as e:
|
||||
log.error("Error on loading default connector:")
|
||||
log.exception(e)
|
||||
@@ -182,7 +186,7 @@ class TBGatewayService:
|
||||
for config_file in connector_config:
|
||||
connector = None
|
||||
try:
|
||||
connector = self.__implemented_connectors[connector_type](self, connector_config[config_file],
|
||||
connector = self._implemented_connectors[connector_type](self, connector_config[config_file],
|
||||
connector_type)
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
connector.open()
|
||||
|
||||
Reference in New Issue
Block a user