From 90d10a718c0b58c1a9f776a34e9ba9024d6238cb Mon Sep 17 00:00:00 2001 From: imbeacon Date: Mon, 29 Apr 2024 12:09:10 +0300 Subject: [PATCH] Added wait for stop for connectors --- .../gateway/tb_gateway_service.py | 21 ++++++++---------- .../tb_gateway_remote_configurator.py | 22 ++++++++++++++++--- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index ebf73ff5..a8cf3aed 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -26,7 +26,7 @@ from signal import signal, SIGINT from string import ascii_lowercase, hexdigits from sys import argv, executable, getsizeof from threading import RLock, Thread, main_thread, current_thread -from time import sleep, time +from time import sleep, time, monotonic from simplejson import JSONDecodeError, dumps, load, loads from yaml import safe_load @@ -533,10 +533,15 @@ class TBGatewayService: def __close_connectors(self): for current_connector in self.available_connectors_by_id: try: - self.available_connectors_by_id[current_connector].close() + close_start = monotonic() + while not self.available_connectors_by_id[current_connector].is_stopped(): + self.available_connectors_by_id[current_connector].close() + if monotonic() - close_start > 5: + log.error("Connector %s close timeout", current_connector) + break log.debug("Connector %s closed connection.", current_connector) except Exception as e: - log.exception(e) + log.exception("Error while closing connector %s", current_connector, exc_info=e) def __stop_gateway(self): self.stopped = True @@ -883,14 +888,6 @@ class TBGatewayService: self.available_connectors_by_id[connector_id] = connector self.available_connectors_by_name[connector_name] = connector connector.open() - elif available_connector is not None \ - and not available_connector.is_stopped() \ - and connector_name != available_connector.name: - available_connector.name = connector_name - del self.available_connectors_by_name[available_connector.name] - self.available_connectors_by_name[connector_name] = available_connector - log.info("[%r] Connector %s was renamed to %s", connector_id, - available_connector.name, connector_name) else: log.warning("[%r] Connector with name %s already exists and not stopped!", connector_id, connector_name) @@ -901,7 +898,7 @@ class TBGatewayService: except Exception as e: log.error("[%r] Error on loading connector %r: %r", connector_id, connector_name, e) log.exception(e, attr_name=connector_name) - if connector is not None: + if connector is not None and not connector.is_stopped(): connector.close() else: self.__grpc_connectors.update({connector_config['grpc_key']: connector_config}) diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index 0d59ccf4..48450d0f 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -15,7 +15,7 @@ import os.path from logging import getLogger from logging.config import dictConfig -from time import sleep, time +from time import sleep, time, monotonic from regex import fullmatch from simplejson import dumps, load @@ -503,9 +503,25 @@ class RemoteConfigurator: if connector_configuration is None: connector_configuration = found_connector if connector_configuration.get('id') in self._gateway.available_connectors_by_id: - self._gateway.available_connectors_by_id[connector_configuration['id']].close() + try: + close_start = monotonic() + while not self._gateway.available_connectors_by_id[connector_configuration['id']].stopped: + self._gateway.available_connectors_by_id[connector_configuration['id']].close() + if monotonic() - close_start > 5: + LOG.error('Connector %s not stopped in 5 seconds', connector_configuration['id']) + break + except Exception as e: + LOG.exception("Exception on closing connector occurred:", exc_info=e) elif connector_configuration.get('name') in self._gateway.available_connectors_by_name: - self._gateway.available_connectors_by_name[connector_configuration['name']].close() + try: + close_start = monotonic() + while not self._gateway.available_connectors_by_name[connector_configuration['name']].stopped: + self._gateway.available_connectors_by_name[connector_configuration['name']].close() + if monotonic() - close_start > 5: + LOG.error('Connector %s not stopped in 5 seconds', connector_configuration['name']) + break + except Exception as e: + LOG.exception("Exception on closing connector occurred:", exc_info=e) else: LOG.warning('Connector with id %s not found in available connectors', connector_configuration.get('id')) if connector_configuration.get('id') in self._gateway.available_connectors_by_id: