1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Improvements for remote configurator

This commit is contained in:
zbeacon
2020-02-24 09:31:05 +02:00
parent fb91fa7041
commit 8750b2b7b3
2 changed files with 55 additions and 44 deletions

View File

@@ -17,6 +17,7 @@ from simplejson import dumps, loads, dump
from yaml import safe_dump from yaml import safe_dump
from time import time, sleep from time import time, sleep
from logging import getLogger from logging import getLogger
from re import findall
from logging.config import fileConfig from logging.config import fileConfig
from logging.handlers import MemoryHandler from logging.handlers import MemoryHandler
from os import remove from os import remove
@@ -48,6 +49,9 @@ class RemoteConfigurator:
try: try:
if not self.in_process: if not self.in_process:
self.in_process = True self.in_process = True
# while not self.__gateway._published_events.empty():
# log.debug("Waiting for end of the data processing...")
# sleep(1)
decoded_configuration = b64decode(configuration) decoded_configuration = b64decode(configuration)
self.__new_configuration = loads(decoded_configuration) self.__new_configuration = loads(decoded_configuration)
self.__old_connectors_configs = self.__gateway.connectors_configs self.__old_connectors_configs = self.__gateway.connectors_configs
@@ -243,23 +247,24 @@ class RemoteConfigurator:
def __update_logs_configuration(self): def __update_logs_configuration(self):
try: try:
if self.__old_logs_configuration == self.__new_logs_configuration: # if self.__old_logs_configuration != self.__new_logs_configuration:
remote_handler_current_state = self.__gateway.remote_handler.activated global log
remote_handler_current_level = self.__gateway.remote_handler.current_log_level log = getLogger('service')
logs_conf_file_path = self.__gateway._config_dir + 'logs.conf' remote_handler_current_state = self.__gateway.remote_handler.activated
with open(logs_conf_file_path, 'w') as logs: remote_handler_current_level = self.__gateway.remote_handler.current_log_level
logs.write(self.__new_logs_configuration+"\r\n") logs_conf_file_path = self.__gateway._config_dir + 'logs.conf'
fileConfig(logs_conf_file_path) new_logging_level = findall(r'level=(.*)', self.__new_logs_configuration.replace("NONE", "NOTSET"))[-1]
self.__gateway.main_handler = MemoryHandler(-1) with open(logs_conf_file_path, 'w') as logs:
self.__gateway.remote_handler = TBLoggerHandler(self.__gateway) logs.write(self.__new_logs_configuration.replace("NONE", "NOTSET")+"\r\n")
self.__gateway.main_handler.setTarget(self.__gateway.remote_handler) fileConfig(logs_conf_file_path)
if remote_handler_current_level != 'NOTSET': self.__gateway.main_handler = MemoryHandler(-1)
self.__gateway.remote_handler.activate(remote_handler_current_level) self.__gateway.remote_handler = TBLoggerHandler(self.__gateway)
if not remote_handler_current_state: self.__gateway.main_handler.setTarget(self.__gateway.remote_handler)
self.__gateway.remote_handler.deactivate() if new_logging_level == "NOTSET":
global log self.__gateway.remote_handler.deactivate()
log = getLogger('service') else:
log.debug("Logs configuration has been updated.") self.__gateway.remote_handler.activate(new_logging_level)
log.debug("Logs configuration has been updated.")
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)

View File

@@ -87,7 +87,7 @@ class TBGatewayService:
if self.__remote_configurator is not None: if self.__remote_configurator is not None:
self.__remote_configurator.send_current_configuration() self.__remote_configurator.send_current_configuration()
self.__load_persistent_devices() self.__load_persistent_devices()
self.__published_events = Queue(0) self._published_events = Queue(0)
self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
name="Send data to Thingsboard Thread") name="Send data to Thingsboard Thread")
self._send_thread.start() self._send_thread.start()
@@ -119,8 +119,8 @@ class TBGatewayService:
if cur_time - gateway_statistic_send > 5000.0 and self.tb_client.is_connected(): if cur_time - gateway_statistic_send > 5000.0 and self.tb_client.is_connected():
summary_messages = self.__form_statistics() summary_messages = self.__form_statistics()
with self.__lock: # with self.__lock:
self.tb_client.client.send_telemetry(summary_messages) self.tb_client.client.send_telemetry(summary_messages)
gateway_statistic_send = time.time()*1000 gateway_statistic_send = time.time()*1000
# self.__check_shared_attributes() # self.__check_shared_attributes()
except KeyboardInterrupt: except KeyboardInterrupt:
@@ -269,7 +269,9 @@ class TBGatewayService:
try: try:
if self.tb_client.is_connected(): if self.tb_client.is_connected():
size = getsizeof(devices_data_in_event_pack) size = getsizeof(devices_data_in_event_pack)
with self.__lock: # with self.__lock:
events = []
if self.__remote_configurator is None or not self.__remote_configurator.in_process:
events = self._event_storage.get_event_pack() events = self._event_storage.get_event_pack()
if events: if events:
for event in events: for event in events:
@@ -308,12 +310,17 @@ class TBGatewayService:
if devices_data_in_event_pack: if devices_data_in_event_pack:
if not self.tb_client.is_connected(): break if not self.tb_client.is_connected(): break
self.__send_data(devices_data_in_event_pack) self.__send_data(devices_data_in_event_pack)
if self.tb_client.is_connected(): if self.tb_client.is_connected() and (self.__remote_configurator is None or not self.__remote_configurator.in_process):
success = True success = True
while not self.__published_events.empty(): while not self._published_events.empty():
event = self.__published_events.get() if self.__remote_configurator.in_process or not self.tb_client.is_connected() or self._published_events.empty():
break
event = self._published_events.get(True, 10)
try: try:
success = event.get() == event.TB_ERR_SUCCESS if self.tb_client.is_connected() and (self.__remote_configurator is None or not self.__remote_configurator.in_process):
success = event.get() == event.TB_ERR_SUCCESS
else:
break
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)
success = False success = False
@@ -333,25 +340,24 @@ class TBGatewayService:
def __send_data(self, devices_data_in_event_pack): def __send_data(self, devices_data_in_event_pack):
try: try:
with self.__lock: for device in devices_data_in_event_pack:
for device in devices_data_in_event_pack: if devices_data_in_event_pack[device].get("attributes"):
if devices_data_in_event_pack[device].get("attributes"): if device == self.name:
if device == self.name: self._published_events.put(self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"]))
self.__published_events.put(self.tb_client.client.send_attributes(devices_data_in_event_pack[device]["attributes"])) else:
else: self._published_events.put(self.tb_client.client.gw_send_attributes(device,
self.__published_events.put(self.tb_client.client.gw_send_attributes(device, devices_data_in_event_pack[
devices_data_in_event_pack[ device][
device][ "attributes"]))
"attributes"])) if devices_data_in_event_pack[device].get("telemetry"):
if devices_data_in_event_pack[device].get("telemetry"): if device == self.name:
if device == self.name: self._published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"]))
self.__published_events.put(self.tb_client.client.send_telemetry(devices_data_in_event_pack[device]["telemetry"])) else:
else: self._published_events.put(self.tb_client.client.gw_send_telemetry(device,
self.__published_events.put(self.tb_client.client.gw_send_telemetry(device, devices_data_in_event_pack[
devices_data_in_event_pack[ device][
device][ "telemetry"]))
"telemetry"])) devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}}
devices_data_in_event_pack[device] = {"telemetry": [], "attributes": {}}
except Exception as e: except Exception as e:
log.exception(e) log.exception(e)