1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Refactored Gateway initialization

This commit is contained in:
samson0v
2022-09-05 16:26:43 +03:00
parent eaad65d0c6
commit 935655e0b1

View File

@@ -15,6 +15,7 @@
import logging
import logging.config
import logging.handlers
from signal import signal, SIGINT
import subprocess
from copy import deepcopy
from os import execv, listdir, path, pathsep, stat, system
@@ -88,6 +89,8 @@ def load_file(path_to_file):
class TBGatewayService:
def __init__(self, config_file=None):
signal(SIGINT, lambda _, __: self.__stop_gateway())
self.stopped = False
self.__lock = RLock()
self.async_device_actions = {
@@ -216,19 +219,27 @@ class TBGatewayService:
self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 500) / 1000.0
log.info("Gateway started.")
self._watchers_thread = Thread(target=self._watchers, name='Watchers', daemon=True)
self._watchers_thread.start()
def _watchers(self):
try:
gateway_statistic_send = 0
connectors_configuration_check_time = 0
while not self.stopped:
cur_time = time() * 1000
if not self.tb_client.is_connected() and self.__subscribed_to_rpc_topics:
self.__subscribed_to_rpc_topics = False
if self.tb_client.is_connected() and not self.__subscribed_to_rpc_topics:
for device in self.__saved_devices:
self.add_device(device, {"connector": self.__saved_devices[device]["connector"]},
device_type=self.__saved_devices[device]["device_type"])
self.subscribe_to_required_topics()
self.__subscribed_to_rpc_topics = True
if self.__scheduled_rpc_calls:
for rpc_call_index in range(len(self.__scheduled_rpc_calls)):
rpc_call = self.__scheduled_rpc_calls[rpc_call_index]
@@ -239,8 +250,10 @@ class TBGatewayService:
result = rpc_call[1]["function"](*rpc_call[1]["arguments"])
except Exception as e:
log.exception(e)
if result == 256:
log.warning("Error on RPC command: 256. Permission denied.")
if (
self.__rpc_requests_in_progress or not self.__rpc_register_queue.empty()) and self.tb_client.is_connected():
new_rpc_request_in_progress = {}
@@ -264,6 +277,7 @@ class TBGatewayService:
except Exception as e:
log.exception(e)
break
if not self.__request_config_after_connect and self.tb_client.is_connected() and not self.tb_client.client.get_subscriptions_in_progress():
self.__request_config_after_connect = True
self.__check_shared_attributes()
@@ -284,9 +298,6 @@ class TBGatewayService:
if cur_time - self.__updates_check_time >= self.__updates_check_period_ms:
self.__updates_check_time = time() * 1000
self.version = self.__updater.get_version()
except KeyboardInterrupt:
self.__stop_gateway()
except Exception as e:
log.exception(e)
self.__stop_gateway()
@@ -607,7 +618,7 @@ class TBGatewayService:
return Status.FAILURE
def __send_to_storage(self):
while True:
while not self.stopped:
try:
if not self.__converted_data_queue.empty():
connector_name, event = self.__converted_data_queue.get(True, 100)
@@ -1169,7 +1180,7 @@ class TBGatewayService:
check_devices_idle_every_sec = self.__devices_idle_checker.get('inactivityCheckPeriodSeconds', 1)
disconnect_device_after_idle = self.__devices_idle_checker.get('inactivityTimeoutSeconds', 50)
while True:
while not self.stopped:
for_deleting = []
for (device_name, device) in self.__connected_devices.items():
ts = time()