mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added logging as telemetry and configuration for this logging from shared attribute RemoteLoggingLevel of gateway in ThingsBoard platform instance.
This commit is contained in:
@@ -48,35 +48,35 @@ formatter=LogFormatter
|
||||
args=(sys.stdout,)
|
||||
|
||||
[handler_connectorHandler]
|
||||
level=DEBUG
|
||||
level=ERROR
|
||||
class=logging.handlers.TimedRotatingFileHandler
|
||||
formatter=LogFormatter
|
||||
args=("./logs/connector.log", 'd', 1, 7,)
|
||||
|
||||
[handler_storageHandler]
|
||||
level=DEBUG
|
||||
level=ERROR
|
||||
class=logging.handlers.TimedRotatingFileHandler
|
||||
formatter=LogFormatter
|
||||
args=("./logs/storage.log", 'd', 1, 7,)
|
||||
|
||||
[handler_serviceHandler]
|
||||
level=DEBUG
|
||||
level=ERROR
|
||||
class=logging.handlers.TimedRotatingFileHandler
|
||||
formatter=LogFormatter
|
||||
args=("./logs/service.log", 'd', 1, 7,)
|
||||
|
||||
[handler_extensionHandler]
|
||||
level=DEBUG
|
||||
level=ERROR
|
||||
class=logging.handlers.TimedRotatingFileHandler
|
||||
formatter=LogFormatter
|
||||
args=("./logs/extension.log", 'd', 1, 3,)
|
||||
|
||||
[handler_tb_connectionHandler]
|
||||
level=DEBUG
|
||||
level=ERROR
|
||||
class=logging.handlers.TimedRotatingFileHandler
|
||||
formatter=LogFormatter
|
||||
args=("./logs/tb_connection.log", 'd', 1, 3,)
|
||||
|
||||
[formatter_LogFormatter]
|
||||
format='%(asctime)s - %(levelname)s - %(module)s - %(lineno)d - %(message)s'
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
|
||||
@@ -41,6 +41,10 @@ class TBClient:
|
||||
# Adding callbacks
|
||||
self.client._client._on_connect = self._on_connect
|
||||
self.client._client._on_disconnect = self._on_disconnect
|
||||
self.client._client._on_log = self._on_log
|
||||
|
||||
def _on_log(self, *args):
|
||||
log.info(args)
|
||||
|
||||
def is_connected(self):
|
||||
return self.client.is_connected
|
||||
|
||||
@@ -20,6 +20,7 @@ from json import load, loads, dumps
|
||||
from os import listdir, path
|
||||
from threading import Thread
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.gateway.tb_logger import TBLoggerHandler
|
||||
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
|
||||
from thingsboard_gateway.connectors.mqtt.mqtt_connector import MqttConnector
|
||||
from thingsboard_gateway.connectors.opcua.opcua_connector import OpcUaConnector
|
||||
@@ -49,7 +50,9 @@ class TBGatewayService:
|
||||
self.__rpc_requests_in_progress = {}
|
||||
self.__connected_devices_file = "connected_devices.json"
|
||||
self.tb_client = TBClient(config["thingsboard"])
|
||||
self.tb_client.client.gw_set_server_side_rpc_request_handler(self.__rpc_request_handler)
|
||||
self.main_handler = logging.handlers.MemoryHandler(1000)
|
||||
self.remote_handler = TBLoggerHandler(self)
|
||||
self.main_handler.setTarget(self.remote_handler)
|
||||
self.__implemented_connectors = {
|
||||
"mqtt": MqttConnector,
|
||||
"modbus": ModbusConnector,
|
||||
@@ -65,6 +68,9 @@ class TBGatewayService:
|
||||
self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True)
|
||||
self.__event_storage = self.__event_storage_types[config["storage"]["type"]](config["storage"])
|
||||
self.tb_client.connect()
|
||||
self.tb_client.client.gw_set_server_side_rpc_request_handler(self.__rpc_request_handler)
|
||||
self.tb_client.client.set_server_side_rpc_request_handler(self.__rpc_request_handler)
|
||||
self.tb_client.client.subscribe_to_all_attributes(self.__attribute_update_callback)
|
||||
self.tb_client.client.gw_subscribe_to_all_attributes(self.__attribute_update_callback)
|
||||
self.__send_thread.start()
|
||||
|
||||
@@ -76,18 +82,18 @@ class TBGatewayService:
|
||||
self.__rpc_requests_in_progress[rpc_in_progress][2](rpc_in_progress)
|
||||
self.cancel_rpc_request(rpc_in_progress)
|
||||
|
||||
if time.time() - gateway_statistic_send >= 60000:
|
||||
if int(time.time()*1000) - gateway_statistic_send >= 60000:
|
||||
summary_messages = {"SummaryReceived": 0, "SummarySent": 0}
|
||||
telemetry = {}
|
||||
for connector in self.available_connectors:
|
||||
# if self.available_connectors[connector].is_connected():
|
||||
telemetry[(connector+' MessagesReceived').replace(' ', '')] = self.available_connectors[connector].statistics['MessagesReceived']
|
||||
telemetry[(connector+' MessagesSent').replace(' ', '')] = self.available_connectors[connector].statistics['MessagesSent']
|
||||
self.tb_client.client.send_telemetry(telemetry)
|
||||
summary_messages['SummaryReceived'] += telemetry[(connector+' MessagesReceived').replace(' ', '')]
|
||||
summary_messages['SummarySent'] += telemetry[(connector+' MessagesSent').replace(' ', '')]
|
||||
if self.available_connectors[connector].is_connected():
|
||||
telemetry[(connector+' MessagesReceived').replace(' ', '')] = self.available_connectors[connector].statistics['MessagesReceived']
|
||||
telemetry[(connector+' MessagesSent').replace(' ', '')] = self.available_connectors[connector].statistics['MessagesSent']
|
||||
self.tb_client.client.send_telemetry(telemetry)
|
||||
summary_messages['SummaryReceived'] += telemetry[(connector+' MessagesReceived').replace(' ', '')]
|
||||
summary_messages['SummarySent'] += telemetry[(connector+' MessagesSent').replace(' ', '')]
|
||||
self.tb_client.client.send_telemetry(summary_messages)
|
||||
gateway_statistic_send = time.time()
|
||||
gateway_statistic_send = int(time.time()*1000)
|
||||
|
||||
time.sleep(.1)
|
||||
except Exception as e:
|
||||
@@ -205,6 +211,7 @@ class TBGatewayService:
|
||||
dumps(content))
|
||||
else:
|
||||
log.debug("RPC request with no device param.")
|
||||
log.debug(content)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
@@ -223,8 +230,23 @@ class TBGatewayService:
|
||||
def cancel_rpc_request(self, rpc_request):
|
||||
del self.__rpc_requests_in_progress[rpc_request]
|
||||
|
||||
def __attribute_update_callback(self, content):
|
||||
self.__connected_devices[content["device"]]["connector"].on_attributes_update(content)
|
||||
def __attribute_update_callback(self, content, *args):
|
||||
if content.get('device') is not None:
|
||||
try:
|
||||
self.__connected_devices[content["device"]]["connector"].on_attributes_update(content)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
else:
|
||||
if content.get('RemoteLoggingLevel') == 'NONE':
|
||||
self.remote_handler.deactivate()
|
||||
log.info('Remote logging has being deactivated.')
|
||||
elif content.get('RemoteLoggingLevel') is not None:
|
||||
self.remote_handler.activate(content.get('RemoteLoggingLevel'))
|
||||
log.info('Remote logging has being activated.')
|
||||
else:
|
||||
log.debug('Attributes on the gateway has being updated!')
|
||||
log.debug(args)
|
||||
log.debug(content)
|
||||
|
||||
def add_device(self, device_name, content, wait_for_publish=False):
|
||||
self.__connected_devices[device_name] = content
|
||||
|
||||
76
thingsboard_gateway/gateway/tb_logger.py
Normal file
76
thingsboard_gateway/gateway/tb_logger.py
Normal file
@@ -0,0 +1,76 @@
|
||||
# Copyright 2019. ThingsBoard
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
from time import time
|
||||
# from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService
|
||||
|
||||
|
||||
class TBLoggerHandler(logging.Handler):
|
||||
def __init__(self, gateway):
|
||||
super().__init__(logging.ERROR)
|
||||
self.__gateway = gateway
|
||||
self.activated = False
|
||||
self.log_levels = {
|
||||
# 'NONE': 0,
|
||||
'DEBUG': 10,
|
||||
'INFO': 20,
|
||||
'WARNING': 30,
|
||||
'ERROR': 40,
|
||||
'FATAL': 50,
|
||||
'CRITICAL': 50,
|
||||
'EXCEPTION': 50
|
||||
|
||||
}
|
||||
self.loggers=['service',
|
||||
'tb_connection',
|
||||
'storage',
|
||||
'extension',
|
||||
'connector'
|
||||
]
|
||||
for logger in self.loggers:
|
||||
log = logging.getLogger(logger)
|
||||
log.addHandler(self.__gateway.main_handler)
|
||||
self.__current_log_level = 'ERROR'
|
||||
|
||||
def emit(self, record):
|
||||
pass
|
||||
|
||||
def activate(self, log_level=None):
|
||||
try:
|
||||
for logger in self.loggers:
|
||||
if log_level is not None and self.log_levels.get(log_level) is not None:
|
||||
log = logging.getLogger(logger)
|
||||
self.__current_log_level = log_level
|
||||
log.setLevel(self.log_levels[log_level])
|
||||
except Exception as e:
|
||||
log = logging.getLogger('service')
|
||||
log.error(e)
|
||||
self.activated = True
|
||||
|
||||
def handle(self, record):
|
||||
if self.activated:
|
||||
self.__form_message(record)
|
||||
self.__gateway.tb_client.client.send_telemetry(self.message, quality_of_service=1)
|
||||
|
||||
def __form_message(self, record):
|
||||
self.message = {'ts': int(time()*1000),
|
||||
'values': {
|
||||
'LOGS': str(record.getMessage())
|
||||
}
|
||||
}
|
||||
|
||||
def deactivate(self):
|
||||
self.activated = False
|
||||
Reference in New Issue
Block a user