From 1a7032057727e55bb5d4dd45519cf3c100feadc9 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Thu, 11 Jan 2024 15:48:17 +0200 Subject: [PATCH] Fix for RPC after remote configuration update --- .../connectors/rest/rest_connector.py | 61 ++++++++++--------- .../gateway/tb_gateway_service.py | 5 +- .../tb_gateway_remote_configurator.py | 30 +++++++-- 3 files changed, 62 insertions(+), 34 deletions(-) diff --git a/thingsboard_gateway/connectors/rest/rest_connector.py b/thingsboard_gateway/connectors/rest/rest_connector.py index 5a545346..53e87299 100644 --- a/thingsboard_gateway/connectors/rest/rest_connector.py +++ b/thingsboard_gateway/connectors/rest/rest_connector.py @@ -278,8 +278,7 @@ class RESTConnector(Connector, Thread): self.__log.debug('Response from RPC request: %s', response) self.__gateway.send_rpc_reply(device=device, req_id=content["data"].get('id'), - content=response[2] if response and len( - response) >= 3 else response) + content=response[2] if response and len(response) >= 3 else response) else: for rpc_request in self.__rpc_requests: if fullmatch(rpc_request["deviceNameFilter"], content["device"]) and \ @@ -325,21 +324,21 @@ class RESTConnector(Connector, Thread): } for request_section in requests_from_tb: for request_config_object in self.__config.get(request_section, []): - uplink_converter = TBModuleLoader.import_module(self._connector_type, - request_config_object.get("extension", - self._default_converters[ - "uplink"]))( - request_config_object, self.__log) - downlink_converter = TBModuleLoader.import_module(self._connector_type, - request_config_object.get("extension", - self._default_converters[ - "downlink"]))( - request_config_object, self.__log) + + uplink_imported_class = TBModuleLoader.import_module(self._connector_type, request_config_object.get("extension", self._default_converters["uplink"])) + uplink_converter = uplink_imported_class(request_config_object, self.__log) + + downlink_imported_class = TBModuleLoader.import_module(self._connector_type, request_config_object.get("extension", self._default_converters["downlink"])) + downlink_converter = downlink_imported_class(request_config_object, self.__log) + request_dict = {**request_config_object, "uplink_converter": uplink_converter, "downlink_converter": downlink_converter, } requests_from_tb[request_section].append(request_dict) + self.__log.debug("Requests from TB: %s", requests_from_tb) + self.__rpc_requests = requests_from_tb["serverSideRpc"] + self.__attribute_updates = requests_from_tb["attributeUpdates"] def __send_request(self, request_dict, converter_queue, logger, with_queue=True): url = "" @@ -376,8 +375,21 @@ class RESTConnector(Connector, Thread): params["headers"] = request_dict["config"]["httpHeaders"] logger.debug("Request to %s will be sent", url) - response = request_dict["request"](**params) - data_to_storage = [url, request_dict["config"]["uplink_converter"]] + response = None + data_to_storage = [] + try: + response = request_dict["request"](**params) + + except Timeout: + logger.error("Timeout error on request %s.", url) + data_to_storage.append({"error": "Timeout", "code": 408}) + except RequestException as e: + logger.error("Cannot connect to %s. Request exception.", url) + data_to_storage.append({"error": str(e)}) + logger.debug(e) + except ConnectionError: + logger.error("Cannot connect to %s. Connection error.", url) + data_to_storage.append({"error": f"Cannot connect to target url: {url}"}) if response and response.ok: try: @@ -391,12 +403,13 @@ class RESTConnector(Connector, Thread): converter_queue.put(data_to_storage) self.statistics["MessagesReceived"] = self.statistics["MessagesReceived"] + 1 else: - logger.error("Request to URL: %s finished with code: %i. Cat information: http://http.cat/%i", - url, - response.status_code, - response.status_code) - logger.debug("Response: %r", response.text) - data_to_storage.append({"error": response.reason, "code": response.status_code}) + if response is not None: + logger.error("Request to URL: %s finished with code: %i. Cat information: http://http.cat/%i", + url, + response.status_code, + response.status_code) + logger.debug("Response: %r", response.text) + data_to_storage.append({"error": response.reason, "code": response.status_code}) if with_queue: converter_queue.put(data_to_storage) @@ -405,14 +418,6 @@ class RESTConnector(Connector, Thread): if not with_queue: return data_to_storage - - except Timeout: - logger.error("Timeout error on request %s.", url) - except RequestException as e: - logger.error("Cannot connect to %s. Connection error.", url) - logger.debug(e) - except ConnectionError: - logger.error("Cannot connect to %s. Connection error.", url) except Exception as e: logger.exception(e) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 7878092c..73956283 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1377,9 +1377,12 @@ class TBGatewayService: self.tb_client.client.gw_send_attributes(device_name, device_details) def update_device(self, device_name, event, content): + should_save = False if event == 'connector' and self.__connected_devices[device_name].get(event) != content: - self.__save_persistent_devices() + should_save = True self.__connected_devices[device_name][event] = content + if should_save: + self.__save_persistent_devices() def del_device_async(self, data): if data['deviceName'] in self.__saved_devices: diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index a61caa55..56c3e27a 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -459,11 +459,23 @@ class RemoteConfigurator: try: config_file_name = config['configuration'] - found_connectors = list(filter(lambda item: item['id'] == config['id'], self.connectors_configuration)) - connector_id = TBUtility.get_or_create_connector_id(config.get("configurationJson")) + identifier_parameter = 'id' if config.get('id') else 'name' + found_connectors = list(filter(lambda item: item[identifier_parameter] == config[identifier_parameter], + self.connectors_configuration)) + + if (config.get('configurationJson') + and config.get('configurationJson').get('id') is None + and len(found_connectors) > 0 + and found_connectors[0].get('configurationJson') is not None + and found_connectors[0].get('configurationJson').get('id') is not None): + connector_id = TBUtility.get_or_create_connector_id(found_connectors[0].get("configurationJson")) + else: + connector_id = TBUtility.get_or_create_connector_id(config.get('configurationJson')) if not found_connectors: - connector_configuration = {'name': config['name'], 'type': config['type'], - 'id': connector_id,'configuration': config_file_name} + connector_configuration = {'name': config['name'], + 'type': config['type'], + 'id': connector_id, + 'configuration': config_file_name} if config.get('key'): connector_configuration['key'] = config['key'] @@ -498,7 +510,7 @@ class RemoteConfigurator: changed = True connector_configuration = None - if (found_connector.get('id') != config['id'] + if (found_connector.get('id') != connector_id or found_connector.get('name') != config['name'] or found_connector.get('type') != config['type'] or found_connector.get('class') != config.get('class') @@ -528,10 +540,18 @@ class RemoteConfigurator: self._gateway.available_connectors_by_id[connector_configuration['id']].close() self._gateway.available_connectors_by_id.pop(connector_configuration['id']) + if self._gateway.available_connectors_by_name.get(connector_configuration['name']): + self._gateway.available_connectors_by_name.pop(connector_configuration['name']) self._gateway.load_connectors(self._get_general_config_in_local_format()) self._gateway.connect_with_connectors() + for device_name in self._gateway.get_devices().keys(): + for connector_id in self._gateway.available_connectors_by_id.keys(): + if (self._gateway.available_connectors_by_id.get(connector_id) + and self._gateway.available_connectors_by_id[connector_id].get_id() == connector_id): + self._gateway.update_device(device_name, "connector", self._gateway.available_connectors_by_id[connector_id]) + self._gateway.tb_client.client.send_attributes({config['name']: config}) except Exception as e: LOG.exception(e)