mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Changed logging and improved remote configuration
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -20,3 +20,4 @@ thingsboard_gateway/storage/data/
|
||||
/data/
|
||||
/logs/
|
||||
__pycache__
|
||||
/thingsboard_gateway/config/
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
log = logging.getLogger("connector")
|
||||
log = logging.getLogger("tb_gateway.connector")
|
||||
|
||||
|
||||
class Connector(ABC):
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
log = logging.getLogger("converter")
|
||||
log = logging.getLogger("tb_gateway.converter")
|
||||
|
||||
|
||||
class Converter(ABC):
|
||||
|
||||
@@ -109,12 +109,12 @@ class MqttConnector(Connector, Thread):
|
||||
time.sleep(1)
|
||||
|
||||
def close(self):
|
||||
self.__stopped = True
|
||||
try:
|
||||
self._client.disconnect()
|
||||
except:
|
||||
pass
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
self._client.loop_stop()
|
||||
self.__stopped = True
|
||||
self.__log.info('%s has been stopped.', self.get_name())
|
||||
|
||||
def get_name(self):
|
||||
|
||||
@@ -17,7 +17,7 @@ import time
|
||||
from thingsboard_gateway.tb_client.tb_gateway_mqtt import TBGatewayMqttClient
|
||||
import threading
|
||||
|
||||
log = logging.getLogger("tb_connection")
|
||||
log = logging.getLogger("tb_gateway.tb_connection")
|
||||
|
||||
|
||||
class TBClient(threading.Thread):
|
||||
@@ -52,7 +52,10 @@ class TBClient(threading.Thread):
|
||||
self.start()
|
||||
|
||||
def _on_log(self, *args):
|
||||
log.debug(args)
|
||||
if "exception" in args[-1]:
|
||||
log.exception(args[-1])
|
||||
else:
|
||||
log.debug(args)
|
||||
|
||||
def pause(self):
|
||||
self.__paused = True
|
||||
@@ -67,6 +70,10 @@ class TBClient(threading.Thread):
|
||||
log.debug('TB client %s connected to ThingsBoard', str(client))
|
||||
self.client._on_connect(client, userdata, flags, rc, *extra_params)
|
||||
|
||||
def _on_subscribe(self, client, userdata, flags, rc, *extra_params):
|
||||
log.debug('TB client %s connected to ThingsBoard', str(client))
|
||||
self.client(client, userdata, flags, rc, *extra_params)
|
||||
|
||||
def _on_disconnect(self, client, userdata, rc):
|
||||
log.info("TB client %s has been disconnected.", str(client))
|
||||
self.client._on_disconnect(client, userdata, rc)
|
||||
@@ -104,7 +111,7 @@ class TBClient(threading.Thread):
|
||||
log.exception(e)
|
||||
time.sleep(10)
|
||||
|
||||
while True:
|
||||
while not self.__stopped:
|
||||
try:
|
||||
if not self.__stopped:
|
||||
time.sleep(1)
|
||||
|
||||
@@ -19,8 +19,9 @@ from time import time, sleep
|
||||
from logging import getLogger
|
||||
from os import remove
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
|
||||
|
||||
log = getLogger("service")
|
||||
log = getLogger("tb_gateway.service")
|
||||
|
||||
|
||||
class RemoteConfigurator:
|
||||
@@ -56,7 +57,7 @@ class RemoteConfigurator:
|
||||
current_configuration[connector].append(config_file[config])
|
||||
current_configuration["thingsboard"] = self.__old_general_configuration_file
|
||||
encoded_current_configuration = b64encode(dumps(current_configuration).encode())
|
||||
self.__gateway.tb_client.client.send_attributes({"current_configuration": encoded_current_configuration.decode("UTF-8")}).get()
|
||||
self.__gateway.tb_client.client.send_attributes({"current_configuration": encoded_current_configuration.decode("UTF-8")})
|
||||
|
||||
def __process_connectors_configuration(self):
|
||||
log.debug("Processing remote connectors configuration...")
|
||||
@@ -101,13 +102,6 @@ class RemoteConfigurator:
|
||||
def __write_new_configuration_files(self):
|
||||
try:
|
||||
general_edited = True
|
||||
# general_edited = False
|
||||
# if self.__new_general_configuration_file and self.__new_general_configuration_file != self.__old_general_configuration_file:
|
||||
# general_edited = False
|
||||
# if self.__new_general_configuration_file["thingsboard"] != self.__old_general_configuration_file["thingsboard"]:
|
||||
# general_edited = True
|
||||
# if self.__new_general_configuration_file["storage"] != self.__old_general_configuration_file["storage"]:
|
||||
# general_edited = True
|
||||
self.__new_general_configuration_file = self.__new_general_configuration_file if general_edited else self.__old_general_configuration_file
|
||||
self.__new_connectors_configs = self.__new_connectors_configs if self.__new_connectors_configs else self.__new_connectors_configs
|
||||
self.__new_general_configuration_file["connectors"] = []
|
||||
@@ -162,7 +156,7 @@ class RemoteConfigurator:
|
||||
log.exception(e)
|
||||
self.__revert_configuration()
|
||||
return False
|
||||
connection_state = False
|
||||
# connection_state = False
|
||||
try:
|
||||
tb_client = TBClient(self.__new_general_configuration_file["thingsboard"])
|
||||
tb_client.connect()
|
||||
|
||||
@@ -29,7 +29,7 @@ from thingsboard_gateway.storage.memory_event_storage import MemoryEventStorage
|
||||
from thingsboard_gateway.storage.file_event_storage import FileEventStorage
|
||||
from thingsboard_gateway.gateway.tb_gateway_remote_configurator import RemoteConfigurator
|
||||
|
||||
log = logging.getLogger('service')
|
||||
log = logging.getLogger('tb_gateway.service')
|
||||
|
||||
|
||||
class TBGatewayService:
|
||||
@@ -41,7 +41,7 @@ class TBGatewayService:
|
||||
self._config_dir = path.dirname(path.abspath(config_file)) + '/'
|
||||
logging.config.fileConfig(self._config_dir + "logs.conf")
|
||||
global log
|
||||
log = logging.getLogger('service')
|
||||
log = logging.getLogger('tb_gateway.service')
|
||||
self.available_connectors = {}
|
||||
self.__connector_incoming_messages = {}
|
||||
self.__connected_devices = {}
|
||||
@@ -70,16 +70,26 @@ class TBGatewayService:
|
||||
"file": FileEventStorage,
|
||||
}
|
||||
self._event_storage = self._event_storage_types[config["storage"]["type"]](config["storage"])
|
||||
self.__load_connectors(config)
|
||||
self._connectors_configs = {}
|
||||
self._load_connectors(config)
|
||||
self._connect_with_connectors()
|
||||
self.__remote_configurator = None
|
||||
if config["thingsboard"].get("remoteConfiguration"):
|
||||
try:
|
||||
self.__remote_configurator = RemoteConfigurator(self, config)
|
||||
self.__check_shared_attributes()
|
||||
|
||||
def check_attribute_after_connection(gateway:TBGatewayService):
|
||||
while not gateway.tb_client.is_connected():
|
||||
time.sleep(1)
|
||||
log.debug("Request for shared attribute has been sent.")
|
||||
info = gateway.tb_client.client.request_attributes(callback=gateway._attributes_parse)
|
||||
|
||||
self.__checking_thread = Thread(target=check_attribute_after_connection,
|
||||
args=(self,),
|
||||
name="Check shared attributes on connect",
|
||||
daemon=True).start()
|
||||
|
||||
except Exception as e:
|
||||
self.__load_connectors(config)
|
||||
self._connect_with_connectors()
|
||||
log.exception(e)
|
||||
if self.__remote_configurator is not None:
|
||||
self.__remote_configurator.send_current_configuration()
|
||||
@@ -100,7 +110,11 @@ class TBGatewayService:
|
||||
self.cancel_rpc_request(rpc_in_progress)
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
time.sleep(1)
|
||||
try:
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
break
|
||||
|
||||
if cur_time - gateway_statistic_send > 60.0 and self.tb_client.is_connected():
|
||||
summary_messages = {"eventsProduced": 0, "eventsSent": 0}
|
||||
@@ -119,35 +133,51 @@ class TBGatewayService:
|
||||
str(connector_camel_case + ' EventsSent').replace(' ', '')]
|
||||
self.tb_client.client.send_telemetry(summary_messages)
|
||||
gateway_statistic_send = time.time()
|
||||
except KeyboardInterrupt as e:
|
||||
log.info("Stopping...")
|
||||
self.__close_connectors()
|
||||
log.info("The gateway has been stopped.")
|
||||
self.tb_client.stop()
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
for device in self.__connected_devices:
|
||||
log.debug("Close connection for device %s", device)
|
||||
try:
|
||||
current_connector = self.__connected_devices[device].get("connector")
|
||||
if current_connector is not None:
|
||||
current_connector.close()
|
||||
log.debug("Connector %s closed connection.", current_connector.get_name())
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
self.__close_connectors()
|
||||
log.info("The gateway has been stopped.")
|
||||
self.tb_client.stop()
|
||||
|
||||
def __attributes_parse(self, content, *args):
|
||||
def __close_connectors(self):
|
||||
for current_connector in self.available_connectors:
|
||||
try:
|
||||
self.available_connectors[current_connector].close()
|
||||
log.debug("Connector %s closed connection.", current_connector)
|
||||
log.debug(current_connector)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
def _attributes_parse(self, content, *args):
|
||||
try:
|
||||
shared_attributes = content.get("shared")
|
||||
client_attributes = content.get("client")
|
||||
if shared_attributes is not None:
|
||||
if self.__remote_configurator is not None and shared_attributes.get("configuration"):
|
||||
log.debug("Received data: %s", content)
|
||||
if content is not None:
|
||||
shared_attributes = content.get("shared")
|
||||
client_attributes = content.get("client")
|
||||
if shared_attributes is not None:
|
||||
if self.__remote_configurator is not None and shared_attributes.get("configuration"):
|
||||
try:
|
||||
self.__remote_configurator.process_configuration(shared_attributes.get("configuration"))
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
if client_attributes is not None:
|
||||
log.debug("Client attributes received (%s).", ", ".join([attr for attr in client_attributes.keys()]))
|
||||
if self.__remote_configurator is not None and content.get("configuration"):
|
||||
try:
|
||||
self.__remote_configurator.process_configuration(shared_attributes.get("configuration"))
|
||||
self.__remote_configurator.process_configuration(content.get("configuration"))
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
elif client_attributes is not None:
|
||||
log.debug("Client attributes received")
|
||||
if self.__remote_configurator is not None and content.get("configuration"):
|
||||
try:
|
||||
self.__remote_configurator.process_configuration(content.get("configuration"))
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
if (shared_attributes is not None and shared_attributes.get('RemoteLoggingLevel') == 'NONE') or content.get("RemoteLoggingLevel") == 'NONE':
|
||||
self.remote_handler.deactivate()
|
||||
log.info('Remote logging has being deactivated.')
|
||||
elif (shared_attributes is not None and shared_attributes.get('RemoteLoggingLevel') is not None) or content.get("RemoteLoggingLevel") is not None:
|
||||
self.remote_handler.activate(content.get('RemoteLoggingLevel'))
|
||||
log.info('Remote logging has being activated.')
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
@@ -155,9 +185,9 @@ class TBGatewayService:
|
||||
return self._config_dir
|
||||
|
||||
def __check_shared_attributes(self):
|
||||
self.tb_client.client.request_attributes(callback=self.__attributes_parse)
|
||||
self.tb_client.client.request_attributes(callback=self._attributes_parse).wait_for_publish()
|
||||
|
||||
def __load_connectors(self, config):
|
||||
def _load_connectors(self, config):
|
||||
self._connectors_configs = {}
|
||||
if not config.get("connectors"):
|
||||
raise Exception("Configuration for connectors not found, check your config file.")
|
||||
@@ -235,7 +265,7 @@ class TBGatewayService:
|
||||
|
||||
def __read_data_from_storage(self):
|
||||
devices_data_in_event_pack = {}
|
||||
while True:
|
||||
while not True:
|
||||
try:
|
||||
if self.tb_client.is_connected():
|
||||
size = getsizeof(devices_data_in_event_pack)
|
||||
@@ -303,7 +333,7 @@ class TBGatewayService:
|
||||
del devices_data_in_event_pack
|
||||
devices_data_in_event_pack = {}
|
||||
else:
|
||||
break
|
||||
continue
|
||||
else:
|
||||
time.sleep(.01)
|
||||
else:
|
||||
@@ -364,14 +394,7 @@ class TBGatewayService:
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
else:
|
||||
if content.get('RemoteLoggingLevel') == 'NONE':
|
||||
self.remote_handler.deactivate()
|
||||
log.info('Remote logging has being deactivated.')
|
||||
elif content.get('RemoteLoggingLevel') is not None:
|
||||
self.remote_handler.activate(content.get('RemoteLoggingLevel'))
|
||||
log.info('Remote logging has being activated.')
|
||||
else:
|
||||
self.__attributes_parse(content)
|
||||
self._attributes_parse(content)
|
||||
|
||||
def add_device(self, device_name, content, wait_for_publish=False):
|
||||
if device_name not in self.__saved_devices:
|
||||
|
||||
@@ -20,7 +20,7 @@ from time import time
|
||||
|
||||
class TBLoggerHandler(logging.Handler):
|
||||
def __init__(self, gateway):
|
||||
super().__init__(logging.ERROR)
|
||||
super().__init__(logging.DEBUG)
|
||||
self.__gateway = gateway
|
||||
self.activated = False
|
||||
self.log_levels = {
|
||||
@@ -43,7 +43,7 @@ class TBLoggerHandler(logging.Handler):
|
||||
for logger in self.loggers:
|
||||
log = logging.getLogger(logger)
|
||||
log.addHandler(self.__gateway.main_handler)
|
||||
self.__current_log_level = 'ERROR'
|
||||
self.__current_log_level = 'DEBUG'
|
||||
|
||||
def emit(self, record):
|
||||
pass
|
||||
@@ -53,11 +53,12 @@ class TBLoggerHandler(logging.Handler):
|
||||
for logger in self.loggers:
|
||||
if log_level is not None and self.log_levels.get(log_level) is not None:
|
||||
log = logging.getLogger(logger)
|
||||
# log.addHandler(self)
|
||||
self.__current_log_level = log_level
|
||||
log.setLevel(self.log_levels[log_level])
|
||||
except Exception as e:
|
||||
log = logging.getLogger('service')
|
||||
log.error(e)
|
||||
log = logging.getLogger('tb_gateway.service')
|
||||
log.exception(e)
|
||||
self.activated = True
|
||||
|
||||
def handle(self, record):
|
||||
|
||||
@@ -12,8 +12,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from logging import getLogger
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
log = getLogger("tb_gateway.storage")
|
||||
|
||||
class EventStorage(ABC):
|
||||
|
||||
|
||||
@@ -17,8 +17,9 @@ from os import remove
|
||||
from os.path import exists
|
||||
from base64 import b64decode
|
||||
from simplejson import load, dumps, JSONDecodeError
|
||||
from thingsboard_gateway.storage.file_event_storage import log
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from thingsboard_gateway.storage.event_storage_reader_pointer import EventStorageReaderPointer
|
||||
|
||||
|
||||
|
||||
@@ -12,8 +12,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from thingsboard_gateway.storage.file_event_storage import log
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from time import time
|
||||
from io import BufferedWriter, FileIO
|
||||
from os import linesep, open as os_open, O_CREAT, O_EXCL
|
||||
|
||||
@@ -12,11 +12,11 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from thingsboard_gateway.storage.event_storage import EventStorage
|
||||
from thingsboard_gateway.storage.event_storage import EventStorage, log
|
||||
from thingsboard_gateway.storage.event_storage_files import EventStorageFiles
|
||||
from thingsboard_gateway.storage.event_storage_writer import EventStorageWriter
|
||||
from thingsboard_gateway.storage.event_storage_reader import EventStorageReader
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings, log
|
||||
from thingsboard_gateway.storage.file_event_storage_settings import FileEventStorageSettings
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
import os
|
||||
|
||||
@@ -12,10 +12,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from logging import getLogger
|
||||
|
||||
log = getLogger('storage')
|
||||
|
||||
|
||||
class FileEventStorageSettings:
|
||||
def __init__(self, config):
|
||||
|
||||
@@ -12,12 +12,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from thingsboard_gateway.storage.event_storage import EventStorage
|
||||
from thingsboard_gateway.storage.event_storage import EventStorage, log
|
||||
import queue
|
||||
from logging import getLogger
|
||||
|
||||
log = getLogger("storage")
|
||||
|
||||
|
||||
class MemoryEventStorage(EventStorage):
|
||||
def __init__(self, config):
|
||||
|
||||
@@ -83,7 +83,8 @@ ATTRIBUTES_TOPIC = 'v1/devices/me/attributes'
|
||||
ATTRIBUTES_TOPIC_REQUEST = 'v1/devices/me/attributes/request/'
|
||||
ATTRIBUTES_TOPIC_RESPONSE = 'v1/devices/me/attributes/response/'
|
||||
TELEMETRY_TOPIC = 'v1/devices/me/telemetry'
|
||||
log = logging.getLogger("tb_connection")
|
||||
log = logging.getLogger("tb_gateway.tb_connection")
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class TBTimeoutException(Exception):
|
||||
@@ -375,6 +376,7 @@ class TBDeviceMqttClient:
|
||||
self.__attr_request_number += 1
|
||||
self._attr_request_dict.update({self.__attr_request_number: callback})
|
||||
attr_request_number = self.__attr_request_number
|
||||
log.debug(attr_request_number)
|
||||
return attr_request_number
|
||||
|
||||
def __timeout_check(self):
|
||||
@@ -382,7 +384,7 @@ class TBDeviceMqttClient:
|
||||
try:
|
||||
item = self.__timeout_queue.get()
|
||||
if item is not None:
|
||||
while True:
|
||||
while not True:
|
||||
current_ts_in_millis = int(round(time.time() * 1000))
|
||||
if current_ts_in_millis > item["ts"]:
|
||||
break
|
||||
|
||||
@@ -25,7 +25,8 @@ GATEWAY_ATTRIBUTES_RESPONSE_TOPIC = "v1/gateway/attributes/response"
|
||||
GATEWAY_MAIN_TOPIC = "v1/gateway/"
|
||||
GATEWAY_RPC_TOPIC = "v1/gateway/rpc"
|
||||
|
||||
log = logging.getLogger("tb_connection")
|
||||
log = logging.getLogger("tb_gateway.tb_connection")
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class TBGatewayAPI:
|
||||
|
||||
Reference in New Issue
Block a user