mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Update Remote Configurator
This commit is contained in:
@@ -61,7 +61,8 @@ class StatisticsService(Thread):
|
||||
data_to_send = {}
|
||||
for attribute in self._config:
|
||||
try:
|
||||
process = subprocess.run(attribute['command'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
process = subprocess.run(['/bin/sh', '-c', attribute['command']], stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
encoding='utf-8', timeout=attribute['timeout'])
|
||||
except Exception as e:
|
||||
self._log.warning("Statistic parameter %s raise the exception: %s",
|
||||
@@ -72,12 +73,12 @@ class StatisticsService(Thread):
|
||||
|
||||
data_to_send[attribute['attributeOnGateway']] = value
|
||||
|
||||
self._gateway.tb_client.client.send_attributes(data_to_send)
|
||||
self._gateway.tb_client.client.send_telemetry(data_to_send)
|
||||
|
||||
if datetime.datetime.now() - self._last_streams_statistics_clear_time >= datetime.timedelta(days=1):
|
||||
self.clear_streams_statistics()
|
||||
|
||||
self._gateway.tb_client.client.send_attributes(StatisticsService.DATA_STREAMS_STATISTICS)
|
||||
self._gateway.tb_client.client.send_telemetry(StatisticsService.DATA_STREAMS_STATISTICS)
|
||||
|
||||
self._last_poll = time()
|
||||
|
||||
|
||||
@@ -28,9 +28,7 @@ from threading import RLock, Thread
|
||||
from time import sleep, time
|
||||
from platform import system as platform_system
|
||||
|
||||
import simplejson
|
||||
from simplejson import JSONDecodeError, dumps, load, loads
|
||||
from yaml import safe_load
|
||||
|
||||
from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
|
||||
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
|
||||
@@ -159,23 +157,33 @@ class TBGatewayService:
|
||||
self.__async_device_actions_queue = SimpleQueue()
|
||||
self.__process_async_actions_thread = Thread(target=self.__process_async_device_actions,
|
||||
name="Async device actions processing thread", daemon=True)
|
||||
|
||||
self._config_dir = path.dirname(path.abspath(config_file)) + path.sep
|
||||
if config_file is None:
|
||||
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.yaml'.replace('/',
|
||||
config_file = path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.json'.replace('/',
|
||||
path.sep)
|
||||
|
||||
logging_error = None
|
||||
try:
|
||||
with open(self._config_dir + 'logs.json', 'r') as file:
|
||||
log_config = load(file)
|
||||
logging.config.dictConfig(log_config)
|
||||
except Exception as e:
|
||||
logging_error = e
|
||||
|
||||
global log
|
||||
log = logging.getLogger('service')
|
||||
|
||||
with open(config_file) as general_config:
|
||||
self.__config = safe_load(general_config)
|
||||
try:
|
||||
self.__config = load(general_config)
|
||||
except JSONDecodeError:
|
||||
log.error('YAML configuration is deprecated now. Please, use JSON configuration instead.')
|
||||
log.error('See default configuration file on https://github.com/thingsboard/thingsboard-gateway/blob/master/thingsboard_gateway/config/tb_gateway.json')
|
||||
|
||||
# change main config if Gateway running with docker env variables
|
||||
self.__modify_main_config()
|
||||
|
||||
self._config_dir = path.dirname(path.abspath(config_file)) + path.sep
|
||||
logging_error = None
|
||||
try:
|
||||
logging.config.fileConfig(self._config_dir + "logs.conf", disable_existing_loggers=False)
|
||||
except Exception as e:
|
||||
logging_error = e
|
||||
global log
|
||||
log = logging.getLogger('service')
|
||||
log.info("Gateway starting...")
|
||||
self.__updater = TBUpdater()
|
||||
self.__updates_check_period_ms = 300000
|
||||
@@ -208,7 +216,9 @@ class TBGatewayService:
|
||||
global main_handler
|
||||
self.main_handler = main_handler
|
||||
self.remote_handler = TBLoggerHandler(self)
|
||||
self.main_handler.setTarget(self.remote_handler)
|
||||
log.addHandler(self.remote_handler)
|
||||
log.addHandler(self.main_handler)
|
||||
# self.main_handler.setTarget(self.remote_handler)
|
||||
self._default_connectors = DEFAULT_CONNECTORS
|
||||
self.__converted_data_queue = SimpleQueue()
|
||||
self.__save_converted_data_thread = Thread(name="Save converted data", daemon=True,
|
||||
@@ -229,11 +239,10 @@ class TBGatewayService:
|
||||
"device_renamed": self.__process_renamed_gateway_devices,
|
||||
"device_deleted": self.__process_deleted_gateway_devices,
|
||||
}
|
||||
|
||||
self.__remote_shell = None
|
||||
if self.__config["thingsboard"].get("remoteShell"):
|
||||
log.warning("Remote shell is enabled. Please be carefully with this feature.")
|
||||
self.__remote_shell = RemoteShell(platform=self.__updater.get_platform(),
|
||||
release=self.__updater.get_release())
|
||||
self.init_remote_shell(self.__config["thingsboard"].get("remoteShell"))
|
||||
|
||||
self.__rpc_remote_shell_command_in_progress = None
|
||||
self.__scheduled_rpc_calls = []
|
||||
self.__rpc_processing_queue = SimpleQueue()
|
||||
@@ -248,14 +257,12 @@ class TBGatewayService:
|
||||
self.connectors_configs = {}
|
||||
self.__remote_configurator = None
|
||||
self.__request_config_after_connect = False
|
||||
self.__init_remote_configuration()
|
||||
self.__grpc_config = self.__config.get('grpc')
|
||||
|
||||
self.__grpc_config = None
|
||||
self.__grpc_connectors = None
|
||||
self.__grpc_manager = None
|
||||
self.__grpc_connectors = {}
|
||||
if GRPC_LOADED and self.__grpc_config is not None and self.__grpc_config.get("enabled"):
|
||||
self.__process_async_actions_thread.start()
|
||||
self.__grpc_manager = TBGRPCServerManager(self, self.__grpc_config)
|
||||
self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.__unregister_connector)
|
||||
self.init_grpc_service(self.__config.get('grpc'))
|
||||
|
||||
self._load_connectors()
|
||||
self._connect_with_connectors()
|
||||
self.__load_persistent_devices()
|
||||
@@ -267,13 +274,9 @@ class TBGatewayService:
|
||||
thread.start()
|
||||
log.info('Start checking devices idle time')
|
||||
|
||||
self.__statistics = self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC)
|
||||
self.__statistics = None
|
||||
self.__statistics_service = None
|
||||
if self.__statistics['enable']:
|
||||
self.__statistics_service = StatisticsService(self.__statistics['statsSendPeriodInSeconds'], self, log,
|
||||
config_path=self._config_dir + self.__statistics[
|
||||
'configuration'] if self.__statistics.get(
|
||||
'configuration') else None)
|
||||
self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC))
|
||||
|
||||
self._published_events = SimpleQueue()
|
||||
|
||||
@@ -288,14 +291,15 @@ class TBGatewayService:
|
||||
name="Send data to Thingsboard Thread")
|
||||
self._send_thread.start()
|
||||
|
||||
self.__device_filter_config = self.__config['thingsboard'].get('deviceFiltering', DEFAULT_DEVICE_FILTER)
|
||||
self.__device_filter_config = None
|
||||
self.__device_filter = None
|
||||
if self.__device_filter_config['enable']:
|
||||
self.__device_filter = DeviceFilter(config_path=self._config_dir + self.__device_filter_config[
|
||||
'filterFile'] if self.__device_filter_config.get('filterFile') else None)
|
||||
self.__grpc_manager = None
|
||||
self.init_device_filtering(self.__config['thingsboard'].get('deviceFiltering', DEFAULT_DEVICE_FILTER))
|
||||
|
||||
self.__duplicate_detector = DuplicateDetector(self.available_connectors)
|
||||
|
||||
self.__init_remote_configuration()
|
||||
|
||||
log.info("Gateway started.")
|
||||
|
||||
self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
|
||||
@@ -318,6 +322,56 @@ class TBGatewayService:
|
||||
self.server = self.manager.get_server()
|
||||
self.server.serve_forever()
|
||||
|
||||
def init_grpc_service(self, config):
|
||||
self.__grpc_config = config
|
||||
self.__grpc_connectors = {}
|
||||
if GRPC_LOADED and self.__grpc_config is not None and self.__grpc_config.get("enabled"):
|
||||
self.__process_async_actions_thread.start()
|
||||
self.__grpc_manager = TBGRPCServerManager(self, self.__grpc_config)
|
||||
self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.__unregister_connector)
|
||||
|
||||
def init_statistics_service(self, config):
|
||||
self.__statistics = config
|
||||
if self.__statistics['enable']:
|
||||
if isinstance(self.__statistics_service, StatisticsService):
|
||||
self.__statistics_service.stop()
|
||||
self.__statistics_service = StatisticsService(self.__statistics['statsSendPeriodInSeconds'], self, log,
|
||||
config_path=self._config_dir + self.__statistics[
|
||||
'configuration'] if self.__statistics.get(
|
||||
'configuration') else None)
|
||||
else:
|
||||
self.__statistics_service = None
|
||||
|
||||
def init_device_filtering(self, config):
|
||||
self.__device_filter_config = config
|
||||
self.__device_filter = None
|
||||
if self.__device_filter_config['enable']:
|
||||
self.__device_filter = DeviceFilter(config_path=self._config_dir + self.__device_filter_config[
|
||||
'filterFile'] if self.__device_filter_config.get('filterFile') else None)
|
||||
else:
|
||||
self.__device_filter = None
|
||||
|
||||
def init_remote_shell(self, enable):
|
||||
self.__remote_shell = None
|
||||
if enable:
|
||||
log.warning("Remote shell is enabled. Please be carefully with this feature.")
|
||||
self.__remote_shell = RemoteShell(platform=self.__updater.get_platform(),
|
||||
release=self.__updater.get_release())
|
||||
else:
|
||||
self.__remote_shell = None
|
||||
|
||||
@property
|
||||
def event_storage_types(self):
|
||||
return self._event_storage_types
|
||||
|
||||
@property
|
||||
def config(self):
|
||||
return self.__config
|
||||
|
||||
@config.setter
|
||||
def config(self, config):
|
||||
self.__config.update(config)
|
||||
|
||||
def get_gateway(self):
|
||||
if self.manager.has_gateway():
|
||||
return self.manager.gateway
|
||||
@@ -546,8 +600,8 @@ class TBGatewayService:
|
||||
def __process_remote_configuration(self, new_configuration):
|
||||
if new_configuration is not None and self.__remote_configurator is not None:
|
||||
try:
|
||||
self.__remote_configurator.process_configuration(new_configuration)
|
||||
self.__remote_configurator.send_current_configuration()
|
||||
self.__remote_configurator.process_config_request(new_configuration)
|
||||
# self.__remote_configurator.send_current_configuration()
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
@@ -612,11 +666,20 @@ class TBGatewayService:
|
||||
|
||||
return connector_persistent_key
|
||||
|
||||
def _load_connectors(self):
|
||||
def load_connectors(self, config=None):
|
||||
self._load_connectors(config=config)
|
||||
|
||||
def _load_connectors(self, config=None):
|
||||
self.connectors_configs = {}
|
||||
connectors_persistent_keys = self.__load_persistent_connector_keys()
|
||||
if self.__config.get("connectors"):
|
||||
for connector in self.__config['connectors']:
|
||||
|
||||
if config:
|
||||
configuration = config.get('connectors')
|
||||
else:
|
||||
configuration = self.__config.get('connectors')
|
||||
|
||||
if configuration:
|
||||
for connector in configuration:
|
||||
try:
|
||||
connector_persistent_key = None
|
||||
if connector['type'] == "grpc" and self.__grpc_manager is None:
|
||||
@@ -626,7 +689,7 @@ class TBGatewayService:
|
||||
|
||||
if connector['type'] != "grpc":
|
||||
connector_class = None
|
||||
if connector.get('useGRPC', True):
|
||||
if connector.get('useGRPC', False):
|
||||
module_name = f'Grpc{self._default_connectors.get(connector["type"], connector.get("class"))}'
|
||||
connector_class = TBModuleLoader.import_module(connector['type'], module_name)
|
||||
|
||||
@@ -683,6 +746,9 @@ class TBGatewayService:
|
||||
self.__init_remote_configuration(force=True)
|
||||
log.info("Remote configuration is enabled forcibly!")
|
||||
|
||||
def connect_with_connectors(self):
|
||||
self._connect_with_connectors()
|
||||
|
||||
def _connect_with_connectors(self):
|
||||
for connector_type in self.connectors_configs:
|
||||
for connector_config in self.connectors_configs[connector_type]:
|
||||
@@ -711,7 +777,7 @@ class TBGatewayService:
|
||||
connector_dir_abs = "/".join(self._config_dir.split("/")[:-2])
|
||||
connector_file_name = f'{connector_type}_connector.py'
|
||||
connector_abs_path = f'{connector_dir_abs}/grpc_connectors/{connector_type}/{connector_file_name}'
|
||||
connector_config_json = simplejson.dumps({
|
||||
connector_config_json = dumps({
|
||||
**connector_config,
|
||||
'gateway': {
|
||||
'host': 'localhost',
|
||||
@@ -1190,7 +1256,7 @@ class TBGatewayService:
|
||||
summary_messages = {"eventsProduced": 0, "eventsSent": 0}
|
||||
telemetry = {}
|
||||
for connector in self.available_connectors:
|
||||
connector_camel_case = connector.lower().replace(' ', '')
|
||||
connector_camel_case = connector.replace(' ', '')
|
||||
telemetry[(connector_camel_case + ' EventsProduced').replace(' ', '')] = \
|
||||
self.available_connectors[connector].statistics['MessagesReceived']
|
||||
self.available_connectors[connector].statistics['MessagesReceived'] = 0
|
||||
@@ -1217,7 +1283,12 @@ class TBGatewayService:
|
||||
self.__connected_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__saved_devices[device_name] = {**content, "device_type": device_type}
|
||||
self.__save_persistent_devices()
|
||||
device_details = {
|
||||
'connectorType': content['connector'].get_type(),
|
||||
'connectorName': content['connector'].get_name()
|
||||
}
|
||||
self.tb_client.client.gw_connect_device(device_name, device_type)
|
||||
self.tb_client.client.gw_send_attributes(device_name, device_details)
|
||||
|
||||
def update_device(self, device_name, event, content):
|
||||
if event == 'connector' and self.__connected_devices[device_name].get(event) != content:
|
||||
|
||||
Reference in New Issue
Block a user