1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Merge branch 'thingsboard:master' into dev/opcua-asyncio

This commit is contained in:
Vitalii Bidochka
2022-05-18 10:30:21 +00:00
committed by GitHub
3 changed files with 98 additions and 2 deletions

View File

@@ -0,0 +1,51 @@
import subprocess
from threading import Thread
from time import time, sleep
import simplejson
class StatisticsService(Thread):
def __init__(self, config_path, stats_send_period_in_seconds, gateway, log):
super().__init__()
self.name = 'Statistics Thread'
self.daemon = True
self._stopped = False
self._config_path = config_path
self._stats_send_period_in_seconds = stats_send_period_in_seconds / 1000
self._gateway = gateway
self._log = log
self._config = self._load_config()
self._last_poll = 0
self.start()
def stop(self):
self._stopped = True
def _load_config(self):
with open(self._config_path, 'r') as file:
return simplejson.load(file)
def run(self) -> None:
while not self._stopped:
if time() - self._last_poll >= self._stats_send_period_in_seconds:
data_to_send = {}
for attribute in self._config:
process = subprocess.run(attribute['command'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
encoding='utf-8', timeout=attribute['timeout'])
if process.returncode != 0:
self._log.error("Statistic parameter raise the exception: %s", process.stderr)
continue
value = process.stdout
data_to_send[attribute['attributeOnGateway']] = value
self._gateway.tb_client.client.send_attributes(data_to_send)
self._last_poll = time()
sleep(.2)

View File

@@ -32,6 +32,7 @@ from yaml import safe_load
from thingsboard_gateway.gateway.constant_enums import DeviceActions, Status
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, CONNECTOR_PARAMETER, \
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME
from thingsboard_gateway.gateway.statistics_service import StatisticsService
from thingsboard_gateway.gateway.tb_client import TBClient
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage
@@ -71,6 +72,11 @@ DEFAULT_CONNECTORS = {
"socket": "SocketConnector"
}
DEFAULT_STATISTIC = {
'enable': True,
'statsSendPeriodInSeconds': 3600
}
def load_file(path_to_file):
content = None
@@ -194,6 +200,12 @@ class TBGatewayService:
thread.start()
log.info('Start checking devices idle time')
self.__statistics = self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC)
if self.__statistics['enable'] and self.__statistics.get('configuration'):
statistics_config_path = self._config_dir + self.__statistics['configuration']
self.__statistics_service = StatisticsService(statistics_config_path,
self.__statistics['statsSendPeriodInSeconds'], self, log)
self._published_events = SimpleQueue()
self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True,
name="Send data to Thingsboard Thread")
@@ -253,8 +265,8 @@ class TBGatewayService:
self.__request_config_after_connect = True
self.__check_shared_attributes()
if cur_time - gateway_statistic_send > self.__config["thingsboard"].get("statsSendPeriodInSeconds",
3600) * 1000 and self.tb_client.is_connected():
if cur_time - gateway_statistic_send > self.__statistics[
'statsSendPeriodInSeconds'] * 1000 and self.tb_client.is_connected():
summary_messages = self.__form_statistics()
# with self.__lock:
self.tb_client.client.send_telemetry(summary_messages)
@@ -291,6 +303,7 @@ class TBGatewayService:
self.stopped = True
self.__updater.stop()
log.info("Stopping...")
self.__statistics_service.stop()
if self.__grpc_manager is not None:
self.__grpc_manager.stop()
self.__close_connectors()