1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Fix for RPC after remote configuration update

This commit is contained in:
imbeacon
2024-01-11 15:48:17 +02:00
parent 5576a4115e
commit 1a70320577
3 changed files with 62 additions and 34 deletions

View File

@@ -278,8 +278,7 @@ class RESTConnector(Connector, Thread):
self.__log.debug('Response from RPC request: %s', response)
self.__gateway.send_rpc_reply(device=device,
req_id=content["data"].get('id'),
content=response[2] if response and len(
response) >= 3 else response)
content=response[2] if response and len(response) >= 3 else response)
else:
for rpc_request in self.__rpc_requests:
if fullmatch(rpc_request["deviceNameFilter"], content["device"]) and \
@@ -325,21 +324,21 @@ class RESTConnector(Connector, Thread):
}
for request_section in requests_from_tb:
for request_config_object in self.__config.get(request_section, []):
uplink_converter = TBModuleLoader.import_module(self._connector_type,
request_config_object.get("extension",
self._default_converters[
"uplink"]))(
request_config_object, self.__log)
downlink_converter = TBModuleLoader.import_module(self._connector_type,
request_config_object.get("extension",
self._default_converters[
"downlink"]))(
request_config_object, self.__log)
uplink_imported_class = TBModuleLoader.import_module(self._connector_type, request_config_object.get("extension", self._default_converters["uplink"]))
uplink_converter = uplink_imported_class(request_config_object, self.__log)
downlink_imported_class = TBModuleLoader.import_module(self._connector_type, request_config_object.get("extension", self._default_converters["downlink"]))
downlink_converter = downlink_imported_class(request_config_object, self.__log)
request_dict = {**request_config_object,
"uplink_converter": uplink_converter,
"downlink_converter": downlink_converter,
}
requests_from_tb[request_section].append(request_dict)
self.__log.debug("Requests from TB: %s", requests_from_tb)
self.__rpc_requests = requests_from_tb["serverSideRpc"]
self.__attribute_updates = requests_from_tb["attributeUpdates"]
def __send_request(self, request_dict, converter_queue, logger, with_queue=True):
url = ""
@@ -376,8 +375,21 @@ class RESTConnector(Connector, Thread):
params["headers"] = request_dict["config"]["httpHeaders"]
logger.debug("Request to %s will be sent", url)
response = request_dict["request"](**params)
data_to_storage = [url, request_dict["config"]["uplink_converter"]]
response = None
data_to_storage = []
try:
response = request_dict["request"](**params)
except Timeout:
logger.error("Timeout error on request %s.", url)
data_to_storage.append({"error": "Timeout", "code": 408})
except RequestException as e:
logger.error("Cannot connect to %s. Request exception.", url)
data_to_storage.append({"error": str(e)})
logger.debug(e)
except ConnectionError:
logger.error("Cannot connect to %s. Connection error.", url)
data_to_storage.append({"error": f"Cannot connect to target url: {url}"})
if response and response.ok:
try:
@@ -391,12 +403,13 @@ class RESTConnector(Connector, Thread):
converter_queue.put(data_to_storage)
self.statistics["MessagesReceived"] = self.statistics["MessagesReceived"] + 1
else:
logger.error("Request to URL: %s finished with code: %i. Cat information: http://http.cat/%i",
url,
response.status_code,
response.status_code)
logger.debug("Response: %r", response.text)
data_to_storage.append({"error": response.reason, "code": response.status_code})
if response is not None:
logger.error("Request to URL: %s finished with code: %i. Cat information: http://http.cat/%i",
url,
response.status_code,
response.status_code)
logger.debug("Response: %r", response.text)
data_to_storage.append({"error": response.reason, "code": response.status_code})
if with_queue:
converter_queue.put(data_to_storage)
@@ -405,14 +418,6 @@ class RESTConnector(Connector, Thread):
if not with_queue:
return data_to_storage
except Timeout:
logger.error("Timeout error on request %s.", url)
except RequestException as e:
logger.error("Cannot connect to %s. Connection error.", url)
logger.debug(e)
except ConnectionError:
logger.error("Cannot connect to %s. Connection error.", url)
except Exception as e:
logger.exception(e)

View File

@@ -1377,9 +1377,12 @@ class TBGatewayService:
self.tb_client.client.gw_send_attributes(device_name, device_details)
def update_device(self, device_name, event, content):
should_save = False
if event == 'connector' and self.__connected_devices[device_name].get(event) != content:
self.__save_persistent_devices()
should_save = True
self.__connected_devices[device_name][event] = content
if should_save:
self.__save_persistent_devices()
def del_device_async(self, data):
if data['deviceName'] in self.__saved_devices:

View File

@@ -459,11 +459,23 @@ class RemoteConfigurator:
try:
config_file_name = config['configuration']
found_connectors = list(filter(lambda item: item['id'] == config['id'], self.connectors_configuration))
connector_id = TBUtility.get_or_create_connector_id(config.get("configurationJson"))
identifier_parameter = 'id' if config.get('id') else 'name'
found_connectors = list(filter(lambda item: item[identifier_parameter] == config[identifier_parameter],
self.connectors_configuration))
if (config.get('configurationJson')
and config.get('configurationJson').get('id') is None
and len(found_connectors) > 0
and found_connectors[0].get('configurationJson') is not None
and found_connectors[0].get('configurationJson').get('id') is not None):
connector_id = TBUtility.get_or_create_connector_id(found_connectors[0].get("configurationJson"))
else:
connector_id = TBUtility.get_or_create_connector_id(config.get('configurationJson'))
if not found_connectors:
connector_configuration = {'name': config['name'], 'type': config['type'],
'id': connector_id,'configuration': config_file_name}
connector_configuration = {'name': config['name'],
'type': config['type'],
'id': connector_id,
'configuration': config_file_name}
if config.get('key'):
connector_configuration['key'] = config['key']
@@ -498,7 +510,7 @@ class RemoteConfigurator:
changed = True
connector_configuration = None
if (found_connector.get('id') != config['id']
if (found_connector.get('id') != connector_id
or found_connector.get('name') != config['name']
or found_connector.get('type') != config['type']
or found_connector.get('class') != config.get('class')
@@ -528,10 +540,18 @@ class RemoteConfigurator:
self._gateway.available_connectors_by_id[connector_configuration['id']].close()
self._gateway.available_connectors_by_id.pop(connector_configuration['id'])
if self._gateway.available_connectors_by_name.get(connector_configuration['name']):
self._gateway.available_connectors_by_name.pop(connector_configuration['name'])
self._gateway.load_connectors(self._get_general_config_in_local_format())
self._gateway.connect_with_connectors()
for device_name in self._gateway.get_devices().keys():
for connector_id in self._gateway.available_connectors_by_id.keys():
if (self._gateway.available_connectors_by_id.get(connector_id)
and self._gateway.available_connectors_by_id[connector_id].get_id() == connector_id):
self._gateway.update_device(device_name, "connector", self._gateway.available_connectors_by_id[connector_id])
self._gateway.tb_client.client.send_attributes({config['name']: config})
except Exception as e:
LOG.exception(e)