1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added wait for stop for connectors

This commit is contained in:
imbeacon
2024-04-29 12:09:10 +03:00
parent 8e0c15576a
commit 90d10a718c
2 changed files with 28 additions and 15 deletions

View File

@@ -26,7 +26,7 @@ from signal import signal, SIGINT
from string import ascii_lowercase, hexdigits from string import ascii_lowercase, hexdigits
from sys import argv, executable, getsizeof from sys import argv, executable, getsizeof
from threading import RLock, Thread, main_thread, current_thread from threading import RLock, Thread, main_thread, current_thread
from time import sleep, time from time import sleep, time, monotonic
from simplejson import JSONDecodeError, dumps, load, loads from simplejson import JSONDecodeError, dumps, load, loads
from yaml import safe_load from yaml import safe_load
@@ -533,10 +533,15 @@ class TBGatewayService:
def __close_connectors(self): def __close_connectors(self):
for current_connector in self.available_connectors_by_id: for current_connector in self.available_connectors_by_id:
try: try:
self.available_connectors_by_id[current_connector].close() close_start = monotonic()
while not self.available_connectors_by_id[current_connector].is_stopped():
self.available_connectors_by_id[current_connector].close()
if monotonic() - close_start > 5:
log.error("Connector %s close timeout", current_connector)
break
log.debug("Connector %s closed connection.", current_connector) log.debug("Connector %s closed connection.", current_connector)
except Exception as e: except Exception as e:
log.exception(e) log.exception("Error while closing connector %s", current_connector, exc_info=e)
def __stop_gateway(self): def __stop_gateway(self):
self.stopped = True self.stopped = True
@@ -883,14 +888,6 @@ class TBGatewayService:
self.available_connectors_by_id[connector_id] = connector self.available_connectors_by_id[connector_id] = connector
self.available_connectors_by_name[connector_name] = connector self.available_connectors_by_name[connector_name] = connector
connector.open() connector.open()
elif available_connector is not None \
and not available_connector.is_stopped() \
and connector_name != available_connector.name:
available_connector.name = connector_name
del self.available_connectors_by_name[available_connector.name]
self.available_connectors_by_name[connector_name] = available_connector
log.info("[%r] Connector %s was renamed to %s", connector_id,
available_connector.name, connector_name)
else: else:
log.warning("[%r] Connector with name %s already exists and not stopped!", log.warning("[%r] Connector with name %s already exists and not stopped!",
connector_id, connector_name) connector_id, connector_name)
@@ -901,7 +898,7 @@ class TBGatewayService:
except Exception as e: except Exception as e:
log.error("[%r] Error on loading connector %r: %r", connector_id, connector_name, e) log.error("[%r] Error on loading connector %r: %r", connector_id, connector_name, e)
log.exception(e, attr_name=connector_name) log.exception(e, attr_name=connector_name)
if connector is not None: if connector is not None and not connector.is_stopped():
connector.close() connector.close()
else: else:
self.__grpc_connectors.update({connector_config['grpc_key']: connector_config}) self.__grpc_connectors.update({connector_config['grpc_key']: connector_config})

View File

@@ -15,7 +15,7 @@
import os.path import os.path
from logging import getLogger from logging import getLogger
from logging.config import dictConfig from logging.config import dictConfig
from time import sleep, time from time import sleep, time, monotonic
from regex import fullmatch from regex import fullmatch
from simplejson import dumps, load from simplejson import dumps, load
@@ -503,9 +503,25 @@ class RemoteConfigurator:
if connector_configuration is None: if connector_configuration is None:
connector_configuration = found_connector connector_configuration = found_connector
if connector_configuration.get('id') in self._gateway.available_connectors_by_id: if connector_configuration.get('id') in self._gateway.available_connectors_by_id:
self._gateway.available_connectors_by_id[connector_configuration['id']].close() try:
close_start = monotonic()
while not self._gateway.available_connectors_by_id[connector_configuration['id']].stopped:
self._gateway.available_connectors_by_id[connector_configuration['id']].close()
if monotonic() - close_start > 5:
LOG.error('Connector %s not stopped in 5 seconds', connector_configuration['id'])
break
except Exception as e:
LOG.exception("Exception on closing connector occurred:", exc_info=e)
elif connector_configuration.get('name') in self._gateway.available_connectors_by_name: elif connector_configuration.get('name') in self._gateway.available_connectors_by_name:
self._gateway.available_connectors_by_name[connector_configuration['name']].close() try:
close_start = monotonic()
while not self._gateway.available_connectors_by_name[connector_configuration['name']].stopped:
self._gateway.available_connectors_by_name[connector_configuration['name']].close()
if monotonic() - close_start > 5:
LOG.error('Connector %s not stopped in 5 seconds', connector_configuration['name'])
break
except Exception as e:
LOG.exception("Exception on closing connector occurred:", exc_info=e)
else: else:
LOG.warning('Connector with id %s not found in available connectors', connector_configuration.get('id')) LOG.warning('Connector with id %s not found in available connectors', connector_configuration.get('id'))
if connector_configuration.get('id') in self._gateway.available_connectors_by_id: if connector_configuration.get('id') in self._gateway.available_connectors_by_id: