mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Expanded collection of statistics
This commit is contained in:
@@ -19,12 +19,15 @@ from simplejson import dumps
|
||||
|
||||
from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter, log
|
||||
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
|
||||
from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
|
||||
|
||||
class JsonMqttUplinkConverter(MqttUplinkConverter):
|
||||
def __init__(self, config):
|
||||
self.__config = config.get('converter')
|
||||
|
||||
@StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices',
|
||||
end_stat_type='convertedBytesFromDevice')
|
||||
def convert(self, config, data):
|
||||
datatypes = {"attributes": "attributes",
|
||||
"timeseries": "telemetry"}
|
||||
|
||||
@@ -24,8 +24,10 @@ import simplejson
|
||||
from paho.mqtt.client import Client
|
||||
|
||||
from thingsboard_gateway.connectors.connector import Connector, log
|
||||
from thingsboard_gateway.connectors.mqtt.mqtt_decorators import CustomCollectStatistics
|
||||
from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader
|
||||
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
|
||||
from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
|
||||
|
||||
class MqttConnector(Connector, Thread):
|
||||
@@ -601,6 +603,7 @@ class MqttConnector(Connector, Thread):
|
||||
|
||||
self._client.publish(topic, data, retain=retain).wait_for_publish()
|
||||
|
||||
@StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB')
|
||||
def on_attributes_update(self, content):
|
||||
if self.__attribute_updates:
|
||||
for attribute_update in self.__attribute_updates:
|
||||
@@ -622,8 +625,8 @@ class MqttConnector(Connector, Thread):
|
||||
except KeyError as e:
|
||||
log.exception("Cannot form topic, key %s - not found", e)
|
||||
raise e
|
||||
self._client.publish(topic, data,
|
||||
retain=attribute_update.get('retain', False)).wait_for_publish()
|
||||
|
||||
self._publish(topic, data, attribute_update.get('retain', False))
|
||||
self.__log.debug("Attribute Update data: %s for device %s to topic: %s", data,
|
||||
content["device"], topic)
|
||||
else:
|
||||
@@ -633,6 +636,7 @@ class MqttConnector(Connector, Thread):
|
||||
else:
|
||||
self.__log.error("Attribute updates config not found.")
|
||||
|
||||
@StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB')
|
||||
def server_side_rpc_handler(self, content):
|
||||
self.__log.info("Incoming server-side RPC: %s", content)
|
||||
|
||||
@@ -705,7 +709,7 @@ class MqttConnector(Connector, Thread):
|
||||
|
||||
try:
|
||||
self.__log.info("Publishing to: %s with data %s", request_topic, data_to_send)
|
||||
self._client.publish(request_topic, data_to_send, retain=rpc_config.get('retain', False))
|
||||
self._publish(request_topic, data_to_send, rpc_config.get('retain', False))
|
||||
|
||||
if not expects_response or not defines_timeout:
|
||||
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
|
||||
@@ -720,6 +724,10 @@ class MqttConnector(Connector, Thread):
|
||||
|
||||
self.__log.error("RPC not handled: %s", content)
|
||||
|
||||
@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
|
||||
def _publish(self, request_topic, data_to_send, retain):
|
||||
self._client.publish(request_topic, data_to_send, retain).wait_for_publish()
|
||||
|
||||
def rpc_cancel_processing(self, topic):
|
||||
log.info("RPC canceled or terminated. Unsubscribing from %s", topic)
|
||||
self._client.unsubscribe(topic)
|
||||
|
||||
15
thingsboard_gateway/connectors/mqtt/mqtt_decorators.py
Normal file
15
thingsboard_gateway/connectors/mqtt/mqtt_decorators.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from thingsboard_gateway.gateway.statistics_service import StatisticsService
|
||||
|
||||
|
||||
class CustomCollectStatistics(StatisticsService.CollectStatistics):
|
||||
def __call__(self, func):
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
_, __, data, ___ = args
|
||||
self.collect(self.start_stat_type, data)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
func(*args, **kwargs)
|
||||
|
||||
return inner
|
||||
@@ -1,3 +1,4 @@
|
||||
import datetime
|
||||
import subprocess
|
||||
from threading import Thread
|
||||
from time import time, sleep
|
||||
@@ -6,6 +7,14 @@ import simplejson
|
||||
|
||||
|
||||
class StatisticsService(Thread):
|
||||
DATA_STREAMS_STATISTICS = {
|
||||
'receivedBytesFromDevices': 0,
|
||||
'convertedBytesFromDevice': 0,
|
||||
'allReceivedBytesFromTB': 0,
|
||||
'allBytesSentToTB': 0,
|
||||
'allBytesSentToDevices': 0,
|
||||
}
|
||||
|
||||
def __init__(self, config_path, stats_send_period_in_seconds, gateway, log):
|
||||
super().__init__()
|
||||
self.name = 'Statistics Thread'
|
||||
@@ -18,6 +27,7 @@ class StatisticsService(Thread):
|
||||
self._log = log
|
||||
self._config = self._load_config()
|
||||
self._last_poll = 0
|
||||
self._last_streams_statistics_clear_time = datetime.datetime.now()
|
||||
|
||||
self.start()
|
||||
|
||||
@@ -28,6 +38,20 @@ class StatisticsService(Thread):
|
||||
with open(self._config_path, 'r') as file:
|
||||
return simplejson.load(file)
|
||||
|
||||
@classmethod
|
||||
def add_bytes(cls, stat_type, bytes_count):
|
||||
cls.DATA_STREAMS_STATISTICS[stat_type] += bytes_count
|
||||
|
||||
@classmethod
|
||||
def clear_streams_statistics(cls):
|
||||
cls.DATA_STREAMS_STATISTICS = {
|
||||
'receivedBytesFromDevices': 0,
|
||||
'convertedBytesFromDevice': 0,
|
||||
'allReceivedBytesFromTB': 0,
|
||||
'allBytesSentToTB': 0,
|
||||
'allBytesSentToDevices': 0,
|
||||
}
|
||||
|
||||
def run(self) -> None:
|
||||
while not self._stopped:
|
||||
if time() - self._last_poll >= self._stats_send_period_in_seconds:
|
||||
@@ -46,6 +70,65 @@ class StatisticsService(Thread):
|
||||
|
||||
self._gateway.tb_client.client.send_attributes(data_to_send)
|
||||
|
||||
if datetime.datetime.now() - self._last_streams_statistics_clear_time >= datetime.timedelta(days=1):
|
||||
self.clear_streams_statistics()
|
||||
|
||||
self._gateway.tb_client.client.send_attributes(StatisticsService.DATA_STREAMS_STATISTICS)
|
||||
|
||||
self._last_poll = time()
|
||||
|
||||
sleep(.2)
|
||||
|
||||
class CollectStatistics:
|
||||
def __init__(self, start_stat_type, end_stat_type=None):
|
||||
self.start_stat_type = start_stat_type
|
||||
self.end_stat_type = end_stat_type
|
||||
|
||||
def __call__(self, func):
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
_, __, data = args
|
||||
self.collect(self.start_stat_type, data)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
result = func(*args, **kwargs)
|
||||
if result and self.end_stat_type:
|
||||
self.collect(self.end_stat_type, result)
|
||||
|
||||
return result
|
||||
|
||||
return inner
|
||||
|
||||
@staticmethod
|
||||
def collect(stat_type, data):
|
||||
bytes_count = str(data).__sizeof__()
|
||||
StatisticsService.add_bytes(stat_type, bytes_count)
|
||||
|
||||
class CollectAllReceivedBytesStatistics(CollectStatistics):
|
||||
def __call__(self, func):
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
_, data = args
|
||||
self.collect(self.start_stat_type, data)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
func(*args, **kwargs)
|
||||
|
||||
return inner
|
||||
|
||||
class CollectAllSentTBBytesStatistics(CollectAllReceivedBytesStatistics):
|
||||
def __call__(self, func):
|
||||
return super().__call__(func)
|
||||
|
||||
class CollectRPCReplyStatistics(CollectStatistics):
|
||||
def __call__(self, func):
|
||||
def inner(*args, **kwargs):
|
||||
data = kwargs.get('content')
|
||||
if data:
|
||||
self.collect(self.start_stat_type, data)
|
||||
|
||||
func(*args, **kwargs)
|
||||
|
||||
return inner
|
||||
|
||||
@@ -821,6 +821,7 @@ class TBGatewayService:
|
||||
log.exception(e)
|
||||
sleep(1)
|
||||
|
||||
@StatisticsService.CollectAllSentTBBytesStatistics(start_stat_type='allBytesSentToTB')
|
||||
def __send_data(self, devices_data_in_event_pack):
|
||||
try:
|
||||
for device in devices_data_in_event_pack:
|
||||
@@ -963,6 +964,7 @@ class TBGatewayService:
|
||||
log.info("Outgoing RPC. Device: %s, ID: %d", device, req_id)
|
||||
self.send_rpc_reply(device, req_id, content)
|
||||
|
||||
@StatisticsService.CollectRPCReplyStatistics(start_stat_type='all_bytes_sent')
|
||||
def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None,
|
||||
quality_of_service=0):
|
||||
self.__rpc_processing_queue.put((device, req_id, content, success_sent, wait_for_publish, quality_of_service))
|
||||
|
||||
Reference in New Issue
Block a user