diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 7bfcbcc6..db9fc9e8 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -173,6 +173,9 @@ class GatewayManager(multiprocessing.managers.BaseManager): class TBGatewayService: + + DEFAULT_TIMEOUT = 5 + EXPOSED_GETTERS = [ 'ping', 'get_status', @@ -217,10 +220,6 @@ class TBGatewayService: # change main config if Gateway running with docker env variables self.__modify_main_config() - self.__test_env = False - if environ.get('TB_BASE_URL') == 'http://127.0.0.1:9090': - self.__test_env = True - log.info("Gateway starting...") self.__updater = TBUpdater() self.version = self.__updater.get_version() @@ -252,6 +251,9 @@ class TBGatewayService: self.__rpc_processing_thread = Thread(target=self.__send_rpc_reply_processing, daemon=True, name="RPC processing thread") self.__rpc_processing_thread.start() + self.__rpc_to_devices_processing_thread = Thread(target=self.__rpc_to_devices_processing, daemon=True, + name="RPC to devices processing thread") + self.__rpc_to_devices_processing_thread.start() self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"]) self.init_grpc_service(self.__config.get('grpc')) @@ -340,6 +342,7 @@ class TBGatewayService: self._published_events = SimpleQueue() self.__rpc_processing_queue = SimpleQueue() + self.__rpc_to_devices_queue = SimpleQueue() self.__async_device_actions_queue = SimpleQueue() self.__rpc_register_queue = SimpleQueue() self.__converted_data_queue = SimpleQueue() @@ -1261,14 +1264,7 @@ class TBGatewayService: try: device = content.get("device") if device is not None: - connector = self.get_devices()[device].get(CONNECTOR_PARAMETER) - if connector is not None: - content['id'] = request_id - connector.server_side_rpc_handler(content) - else: - log.error("Received RPC request but connector for the device %s not found. Request data: \n %s", - content["device"], - dumps(content)) + self.__rpc_to_devices_queue.put((request_id, content, monotonic())) else: try: method_split = content["method"].split('_') @@ -1305,6 +1301,30 @@ class TBGatewayService: except Exception as e: log.exception(e) + def __rpc_to_devices_processing(self): + while not self.stopped: + if not self.__rpc_to_devices_queue.empty(): + request_id, content, received_time = self.__rpc_to_devices_queue.get() + timeout = content.get("params", {}).get("timeout", self.DEFAULT_TIMEOUT) + if monotonic() - received_time > timeout: + log.error("RPC request %s timeout", request_id) + self.send_rpc_reply(content["device"], request_id, "{\"error\":\"Request timeout\", \"code\": 408}") + device = content.get("device") + if device in self.get_devices(): + connector = self.get_devices()[content['device']].get(CONNECTOR_PARAMETER) + if connector is not None: + content['id'] = request_id + connector.server_side_rpc_handler(content) + else: + log.error("Received RPC request but connector for the device %s not found. Request data: \n %s", + content["device"], + dumps(content)) + else: + self.__rpc_to_devices_queue.put((request_id, content, received_time)) + sleep(.001) + else: + sleep(.01) + def __rpc_gateway_processing(self, request_id, content): log.info("Received RPC request to the gateway, id: %s, method: %s", str(request_id), content["method"]) arguments = content.get('params', {}) @@ -1454,10 +1474,12 @@ class TBGatewayService: return Status.FAILURE def add_device(self, device_name, content, device_type=None): - if device_name not in self.__added_devices or self.__test_env \ - or monotonic() - self.__added_devices[device_name]["last_send_ts"] > 60 \ + if (device_name not in self.__added_devices + or device_name not in self.__connected_devices + or device_name not in self.__saved_devices + or monotonic() - self.__added_devices[device_name]["last_send_ts"] > 60 or (self.__added_devices[device_name]["device_details"]["connectorName"] != content['connector'].get_name() # noqa E501 - or self.__added_devices[device_name]["device_details"]["connectorType"] != content['connector'].get_type()): # noqa E501 + or self.__added_devices[device_name]["device_details"]["connectorType"] != content['connector'].get_type())): # noqa E501 device_type = device_type if device_type is not None else 'default' self.__connected_devices[device_name] = {**content, DEVICE_TYPE_PARAMETER: device_type}