diff --git a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py index 9fa39ccc..1f757517 100644 --- a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py +++ b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py @@ -19,12 +19,15 @@ from simplejson import dumps from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter, log from thingsboard_gateway.tb_utility.tb_utility import TBUtility +from thingsboard_gateway.gateway.statistics_service import StatisticsService class JsonMqttUplinkConverter(MqttUplinkConverter): def __init__(self, config): self.__config = config.get('converter') + @StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices', + end_stat_type='convertedBytesFromDevice') def convert(self, config, data): datatypes = {"attributes": "attributes", "timeseries": "telemetry"} diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index e9532fd3..927b279b 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -24,8 +24,10 @@ import simplejson from paho.mqtt.client import Client from thingsboard_gateway.connectors.connector import Connector, log +from thingsboard_gateway.connectors.mqtt.mqtt_decorators import CustomCollectStatistics from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_utility import TBUtility +from thingsboard_gateway.gateway.statistics_service import StatisticsService class MqttConnector(Connector, Thread): @@ -601,6 +603,7 @@ class MqttConnector(Connector, Thread): self._client.publish(topic, data, retain=retain).wait_for_publish() + @StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') def on_attributes_update(self, content): if self.__attribute_updates: for attribute_update in self.__attribute_updates: @@ -622,8 +625,8 @@ class MqttConnector(Connector, Thread): except KeyError as e: log.exception("Cannot form topic, key %s - not found", e) raise e - self._client.publish(topic, data, - retain=attribute_update.get('retain', False)).wait_for_publish() + + self._publish(topic, data, attribute_update.get('retain', False)) self.__log.debug("Attribute Update data: %s for device %s to topic: %s", data, content["device"], topic) else: @@ -633,6 +636,7 @@ class MqttConnector(Connector, Thread): else: self.__log.error("Attribute updates config not found.") + @StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') def server_side_rpc_handler(self, content): self.__log.info("Incoming server-side RPC: %s", content) @@ -705,7 +709,7 @@ class MqttConnector(Connector, Thread): try: self.__log.info("Publishing to: %s with data %s", request_topic, data_to_send) - self._client.publish(request_topic, data_to_send, retain=rpc_config.get('retain', False)) + self._publish(request_topic, data_to_send, rpc_config.get('retain', False)) if not expects_response or not defines_timeout: self.__log.info("One-way RPC: sending ack to ThingsBoard immediately") @@ -720,6 +724,10 @@ class MqttConnector(Connector, Thread): self.__log.error("RPC not handled: %s", content) + @CustomCollectStatistics(start_stat_type='allBytesSentToDevices') + def _publish(self, request_topic, data_to_send, retain): + self._client.publish(request_topic, data_to_send, retain).wait_for_publish() + def rpc_cancel_processing(self, topic): log.info("RPC canceled or terminated. Unsubscribing from %s", topic) self._client.unsubscribe(topic) diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py b/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py new file mode 100644 index 00000000..6de726a9 --- /dev/null +++ b/thingsboard_gateway/connectors/mqtt/mqtt_decorators.py @@ -0,0 +1,15 @@ +from thingsboard_gateway.gateway.statistics_service import StatisticsService + + +class CustomCollectStatistics(StatisticsService.CollectStatistics): + def __call__(self, func): + def inner(*args, **kwargs): + try: + _, __, data, ___ = args + self.collect(self.start_stat_type, data) + except ValueError: + pass + + func(*args, **kwargs) + + return inner diff --git a/thingsboard_gateway/gateway/statistics_service.py b/thingsboard_gateway/gateway/statistics_service.py index d9dc3c5e..6df499e1 100644 --- a/thingsboard_gateway/gateway/statistics_service.py +++ b/thingsboard_gateway/gateway/statistics_service.py @@ -1,3 +1,4 @@ +import datetime import subprocess from threading import Thread from time import time, sleep @@ -6,6 +7,14 @@ import simplejson class StatisticsService(Thread): + DATA_STREAMS_STATISTICS = { + 'receivedBytesFromDevices': 0, + 'convertedBytesFromDevice': 0, + 'allReceivedBytesFromTB': 0, + 'allBytesSentToTB': 0, + 'allBytesSentToDevices': 0, + } + def __init__(self, config_path, stats_send_period_in_seconds, gateway, log): super().__init__() self.name = 'Statistics Thread' @@ -18,6 +27,7 @@ class StatisticsService(Thread): self._log = log self._config = self._load_config() self._last_poll = 0 + self._last_streams_statistics_clear_time = datetime.datetime.now() self.start() @@ -28,6 +38,20 @@ class StatisticsService(Thread): with open(self._config_path, 'r') as file: return simplejson.load(file) + @classmethod + def add_bytes(cls, stat_type, bytes_count): + cls.DATA_STREAMS_STATISTICS[stat_type] += bytes_count + + @classmethod + def clear_streams_statistics(cls): + cls.DATA_STREAMS_STATISTICS = { + 'receivedBytesFromDevices': 0, + 'convertedBytesFromDevice': 0, + 'allReceivedBytesFromTB': 0, + 'allBytesSentToTB': 0, + 'allBytesSentToDevices': 0, + } + def run(self) -> None: while not self._stopped: if time() - self._last_poll >= self._stats_send_period_in_seconds: @@ -46,6 +70,65 @@ class StatisticsService(Thread): self._gateway.tb_client.client.send_attributes(data_to_send) + if datetime.datetime.now() - self._last_streams_statistics_clear_time >= datetime.timedelta(days=1): + self.clear_streams_statistics() + + self._gateway.tb_client.client.send_attributes(StatisticsService.DATA_STREAMS_STATISTICS) + self._last_poll = time() sleep(.2) + + class CollectStatistics: + def __init__(self, start_stat_type, end_stat_type=None): + self.start_stat_type = start_stat_type + self.end_stat_type = end_stat_type + + def __call__(self, func): + def inner(*args, **kwargs): + try: + _, __, data = args + self.collect(self.start_stat_type, data) + except ValueError: + pass + + result = func(*args, **kwargs) + if result and self.end_stat_type: + self.collect(self.end_stat_type, result) + + return result + + return inner + + @staticmethod + def collect(stat_type, data): + bytes_count = str(data).__sizeof__() + StatisticsService.add_bytes(stat_type, bytes_count) + + class CollectAllReceivedBytesStatistics(CollectStatistics): + def __call__(self, func): + def inner(*args, **kwargs): + try: + _, data = args + self.collect(self.start_stat_type, data) + except ValueError: + pass + + func(*args, **kwargs) + + return inner + + class CollectAllSentTBBytesStatistics(CollectAllReceivedBytesStatistics): + def __call__(self, func): + return super().__call__(func) + + class CollectRPCReplyStatistics(CollectStatistics): + def __call__(self, func): + def inner(*args, **kwargs): + data = kwargs.get('content') + if data: + self.collect(self.start_stat_type, data) + + func(*args, **kwargs) + + return inner diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 21b43b22..eff45697 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -821,6 +821,7 @@ class TBGatewayService: log.exception(e) sleep(1) + @StatisticsService.CollectAllSentTBBytesStatistics(start_stat_type='allBytesSentToTB') def __send_data(self, devices_data_in_event_pack): try: for device in devices_data_in_event_pack: @@ -963,6 +964,7 @@ class TBGatewayService: log.info("Outgoing RPC. Device: %s, ID: %d", device, req_id) self.send_rpc_reply(device, req_id, content) + @StatisticsService.CollectRPCReplyStatistics(start_stat_type='all_bytes_sent') def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None, quality_of_service=0): self.__rpc_processing_queue.put((device, req_id, content, success_sent, wait_for_publish, quality_of_service))