1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added remote configurator for connectors

This commit is contained in:
zbeacon
2020-01-10 14:51:43 +02:00
parent 24e6fedd8e
commit e151ced29b
7 changed files with 183 additions and 42 deletions

View File

@@ -85,7 +85,7 @@ class MqttConnector(Connector, Thread):
def run(self):
try:
while not self._connected:
while not self._connected and not self.__stopped:
try:
self._client.connect(self.__broker['host'],
self.__broker.get('port', 1883))
@@ -93,15 +93,15 @@ class MqttConnector(Connector, Thread):
if not self._connected:
time.sleep(1)
except Exception as e:
self.__log.error(e)
self.__log.exception(e)
time.sleep(10)
except Exception as e:
self.__log.error(e)
self.__log.exception(e)
try:
self.close()
except Exception as e:
self.__log.debug(e)
self.__log.exception(e)
while True:
if self.__stopped:
break
@@ -109,8 +109,11 @@ class MqttConnector(Connector, Thread):
time.sleep(1)
def close(self):
try:
self._client.disconnect()
except:
pass
self._client.loop_stop()
self._client.disconnect()
self.__stopped = True
self.__log.info('%s has been stopped.', self.get_name())
@@ -209,7 +212,7 @@ class MqttConnector(Connector, Thread):
def _on_message(self, client, userdata, message):
self.statistics['MessagesReceived'] += 1
content = self._decode(message)
content = TBUtility.decode(message)
regex_topic = [regex for regex in self.__sub_topics if fullmatch(regex, message.topic)]
if regex_topic:
try:
@@ -361,7 +364,3 @@ class MqttConnector(Connector, Thread):
def rpc_cancel_processing(self, topic):
self._client.unsubscribe(topic)
@staticmethod
def _decode(message):
content = loads(message.payload.decode("utf-8"))
return content

View File

@@ -52,7 +52,6 @@ class TBClient(threading.Thread):
def _on_log(self, *args):
log.debug(args)
pass
def is_connected(self):
return self.client.is_connected()
@@ -74,7 +73,7 @@ class TBClient(threading.Thread):
def run(self):
keep_alive = self.__config.get("keep_alive", 60)
try:
while not self.client.is_connected():
while not self.client.is_connected() and not self.__stopped:
log.debug("connecting to ThingsBoard")
try:
self.client.connect(tls=self.__tls,

View File

@@ -0,0 +1,126 @@
# Copyright 2019. ThingsBoard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# from threading import Thread
from simplejson import dumps, loads, dump
from yaml import safe_load, safe_dump
from copy import deepcopy
from logging import getLogger
from os import remove
log = getLogger("service")
class RemoteConfigurator():
def __init__(self, gateway, config):
self.__gateway = gateway
self.__new_configuration = None
self.__old_connectors_configs = {}
self.__new_connectors_configs = {}
self.__old_general_configuration_file = config
self.__new_general_configuration_file = {}
def process_configuration(self, configuration):
log.info("Remote configuration received: \n %s", dumps(configuration))
self.__new_configuration = loads(configuration)
self.__old_connectors_configs = self.__gateway._connectors_configs
self.__process_general_configuration()
self.__process_connectors_configuration()
def __process_general_configuration(self):
# TODO Add remote configuration for the general configuration file
pass
def __process_connectors_configuration(self):
log.debug("Processing remote connectors configuration...")
self.__prepare_connectors_configuration()
if self.__apply_new_connectors_configuration():
self.__write_new_configuration_files()
def __prepare_connectors_configuration(self):
self.__new_connectors_configs = {}
try:
for connector_type in {connector_type for connector_type in self.__new_configuration if "thingsboard" not in connector_type}:
connector_number = 0
for connector in self.__new_configuration[connector_type]:
log.debug(connector)
log.debug("Processing remote configuration for connector with type \"%s\" and name \"%s\".", connector_type, connector)
if not self.__new_connectors_configs.get(connector_type):
self.__new_connectors_configs[connector_type] = []
self.__new_connectors_configs[connector_type].append({connector_type.lower().replace(" ", "")+str(connector_number)+".json": connector})
connector_number += 1
log.debug("Saved configuration for connectors: %s", ', '.join(con for con in self.__new_connectors_configs))
except Exception as e:
log.exception(e)
def __apply_new_connectors_configuration(self):
try:
self.__gateway._connectors_configs = self.__new_connectors_configs
for connector_name in self.__gateway.available_connectors:
self.__gateway.available_connectors[connector_name].close()
self.__gateway._connect_with_connectors()
log.debug("New connectors configuration has been applied")
return True
except Exception as e:
self.__gateway._connectors_configs = self.__old_connectors_configs
for connector_name in self.__gateway.available_connectors:
self.__gateway.available_connectors[connector_name].close()
self.__gateway._connect_with_connectors()
log.exception(e)
return False
def __write_new_configuration_files(self):
try:
general_edited = False
if self.__new_general_configuration_file and self.__new_general_configuration_file != self.__old_general_configuration_file:
general_edited = True
self.__new_general_configuration_file = self.__new_general_configuration_file if general_edited else self.__old_general_configuration_file
self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__new_connectors_configs
self.__new_general_configuration_file["connectors"] = []
new_connectors_files = []
for connector_type in self.__new_connectors_configs:
for connector_config_section in self.__new_connectors_configs[connector_type]:
for connector_file in connector_config_section:
connector_config = connector_config_section[connector_file]
connector_name = connector_config["broker"]["name"] if connector_config.get("broker") else \
connector_config["server"]["name"] if connector_config.get("server") else \
connector_config["name"] if connector_config.get("name") else None
self.__new_general_configuration_file["connectors"].append(
{
"name": connector_name,
"type": connector_type,
"configuration": connector_file
}
)
with open(self.__gateway._config_dir + connector_file, "w") as config_file:
dump(connector_config, config_file)
new_connectors_files.append(connector_file)
log.debug("Saving new configuration for \"%s\" connector to file \"%s\"", connector_type, connector_file)
for old_connector_type in self.__old_connectors_configs:
for old_connector_config_section in self.__old_connectors_configs[old_connector_type]:
for old_connector_file in old_connector_config_section:
if old_connector_file not in new_connectors_files:
remove(self.__gateway._config_dir+old_connector_file)
log.debug("Remove old configuration file \"%s\" for \"%s\" connector ", old_connector_file, old_connector_type)
if not general_edited:
with open(self.__gateway._config_dir+"tb_gateway.yaml", "w") as general_configuration_file:
safe_dump(self.__new_general_configuration_file, general_configuration_file)
else:
self.safe_apply()
except Exception as e:
log.exception(e)
def safe_apply(self):
# TODO Add check for connection to the ThingsBoard and revert configuration on fail in timeout
pass

View File

@@ -27,6 +27,7 @@ from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage
from thingsboard_gateway.storage.file_event_storage import FileEventStorage
from thingsboard_gateway.gateway.tb_gateway_remote_configurator import RemoteConfigurator
log = logging.getLogger('service')
@@ -37,8 +38,8 @@ class TBGatewayService:
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'
with open(config_file) as config:
config = safe_load(config)
self.__config_dir = path.dirname(path.abspath(config_file)) + '/'
logging.config.fileConfig(self.__config_dir + "logs.conf")
self._config_dir = path.dirname(path.abspath(config_file)) + '/'
logging.config.fileConfig(self._config_dir + "logs.conf")
global log
log = logging.getLogger('service')
self.available_connectors = {}
@@ -64,7 +65,15 @@ class TBGatewayService:
"file": FileEventStorage,
}
self.__load_connectors(config)
self.__connect_with_connectors()
self._connect_with_connectors()
if config["thingsboard"].get("remoteConfiguration"):
try:
self.__remote_configurator = RemoteConfigurator(self, config)
self.__check_shared_attributes()
except Exception as e:
self.__load_connectors(config)
self._connect_with_connectors()
log.exception(e)
self.__load_persistent_devices()
self.__published_events = Queue(0)
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
@@ -97,9 +106,9 @@ class TBGatewayService:
if self.available_connectors[connector].is_connected():
connector_camel_case = connector[0].lower() + connector[1:].replace(' ', '')
telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \
self.available_connectors[connector].statistics['MessagesReceived']
self.available_connectors[connector].statistics['MessagesReceived']
telemetry[(connector_camel_case + ' EventsSent').replace(' ', '')] = \
self.available_connectors[connector].statistics['MessagesSent']
self.available_connectors[connector].statistics['MessagesSent']
self.tb_client.client.send_telemetry(telemetry)
summary_messages['eventsProduced'] += telemetry[
str(connector_camel_case + ' EventsProduced').replace(' ', '')]
@@ -119,8 +128,22 @@ class TBGatewayService:
except Exception as e:
log.error(e)
def __attributes_parse(self, content, *args):
shared_attributes = content.get("shared")
client_attributes = content.get("client")
if shared_attributes is None and client_attributes is None:
self.__remote_configurator.process_configuration(content.get("configuration"))
elif shared_attributes is not None:
if shared_attributes.get("configuration"):
self.__remote_configurator.process_configuration(shared_attributes.get("configuration"))
elif client_attributes is not None:
log.debug("Client attributes received")
def get_config_path(self):
return self.__config_dir
return self._config_dir
def __check_shared_attributes(self):
self.tb_client.client.request_attributes(callback=self.__attributes_parse)
def __load_connectors(self, config):
self._connectors_configs = {}
@@ -145,7 +168,7 @@ class TBGatewayService:
else:
log.error("Connector with config %s - not found", safe_dump(connector))
with open(self.__config_dir + connector['configuration'], 'r') as conf_file:
with open(self._config_dir + connector['configuration'], 'r') as conf_file:
connector_conf = load(conf_file)
if not self._connectors_configs.get(connector['type']):
self._connectors_configs[connector['type']] = []
@@ -153,7 +176,7 @@ class TBGatewayService:
except Exception as e:
log.error(e)
def __connect_with_connectors(self):
def _connect_with_connectors(self):
for connector_type in self._connectors_configs:
for connector_config in self._connectors_configs[connector_type]:
for config_file in connector_config:
@@ -336,9 +359,7 @@ class TBGatewayService:
self.remote_handler.activate(content.get('RemoteLoggingLevel'))
log.info('Remote logging has being activated.')
else:
log.debug('Attributes on the gateway has being updated!')
log.debug(args)
log.debug(content)
self.__attributes_parse(content)
def add_device(self, device_name, content, wait_for_publish=False):
if device_name not in self.__saved_devices:
@@ -365,15 +386,15 @@ class TBGatewayService:
def __load_persistent_devices(self):
devices = {}
if self.__connected_devices_file in listdir(self.__config_dir) and \
path.getsize(self.__config_dir + self.__connected_devices_file) > 0:
if self.__connected_devices_file in listdir(self._config_dir) and \
path.getsize(self._config_dir + self.__connected_devices_file) > 0:
try:
with open(self.__config_dir + self.__connected_devices_file) as devices_file:
with open(self._config_dir + self.__connected_devices_file) as devices_file:
devices = load(devices_file)
except Exception as e:
log.exception(e)
else:
connected_devices_file = open(self.__config_dir + self.__connected_devices_file, 'w')
connected_devices_file = open(self._config_dir + self.__connected_devices_file, 'w')
connected_devices_file.close()
if devices is not None:
@@ -393,7 +414,7 @@ class TBGatewayService:
self.__connected_devices = {} if self.__connected_devices is None else self.__connected_devices
def __save_persistent_devices(self):
with open(self.__config_dir + self.__connected_devices_file, 'w') as config_file:
with open(self._config_dir + self.__connected_devices_file, 'w') as config_file:
try:
data_to_save = {}
for device in self.__connected_devices:

View File

@@ -19,6 +19,7 @@ import time
from simplejson import loads, dumps
from threading import RLock
from threading import Thread
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
import paho.mqtt.client as paho
from jsonschema import Draft7Validator
@@ -158,7 +159,7 @@ class TBDeviceMqttClient:
self._client.on_disconnect = self._on_disconnect
def _on_log(self, client, userdata, level, buf):
log.debug(buf)
log.exception(buf)
def _on_publish(self, client, userdata, result):
# log.debug("Data published to ThingsBoard!")
@@ -213,17 +214,9 @@ class TBDeviceMqttClient:
log.info("Disconnected from ThingsBoard!")
def _on_message(self, client, userdata, message):
content = self._decode(message)
content = TBUtility.decode(message)
self._on_decoded_message(content, message)
@staticmethod
def _decode(message):
content = loads(message.payload.decode("utf-8"))
log.debug(content)
log.debug(message.topic)
return content
@staticmethod
def validate(validator, data):
try:
@@ -350,9 +343,6 @@ class TBDeviceMqttClient:
return self.__device_max_sub_id
def request_attributes(self, client_keys=None, shared_keys=None, callback=None):
if client_keys is None and shared_keys is None:
log.error("There are no keys to request")
return False
msg = {}
if client_keys:
tmp = ""

View File

@@ -17,6 +17,7 @@ import time
from simplejson import dumps
from thingsboard_gateway.tb_client.tb_device_mqtt import TBDeviceMqttClient, DEVICE_TS_KV_VALIDATOR, KV_VALIDATOR
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes"
GATEWAY_ATTRIBUTES_REQUEST_TOPIC = "v1/gateway/attributes/request"
@@ -50,7 +51,7 @@ class TBGatewayMqttClient(TBDeviceMqttClient):
self._client.subscribe(GATEWAY_RPC_TOPIC + "/+")
def _on_message(self, client, userdata, message):
content = self._decode(message)
content = TBUtility.decode(message)
super()._on_decoded_message(content, message)
self._on_decoded_message(content, message)

View File

@@ -25,6 +25,11 @@ log = getLogger("service")
class TBUtility:
@staticmethod
def decode(message):
content = loads(message.payload.decode("utf-8"))
return content
@staticmethod
def validate_converted_data(data):
json_data = dumps(data)