1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Adopted to new library response on error during reading/writing to device and add lock for RPC processing

This commit is contained in:
imbeacon
2024-06-10 15:32:03 +03:00
parent 09edec0582
commit b199d2cf2a
2 changed files with 20 additions and 19 deletions

View File

@@ -744,12 +744,13 @@ class ModbusConnector(Connector, Thread):
if content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None
and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)) is not None:
if isinstance(response, Exception):
if isinstance(response, Exception) or isinstance(response, ExceptionResponse):
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content={
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
})
},
success_sent=False)
else:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),

View File

@@ -1304,24 +1304,24 @@ class TBGatewayService:
def __rpc_to_devices_processing(self):
while not self.stopped:
if not self.__rpc_to_devices_queue.empty():
request_id, content, received_time = self.__rpc_to_devices_queue.get()
timeout = content.get("params", {}).get("timeout", self.DEFAULT_TIMEOUT)
if monotonic() - received_time > timeout:
log.error("RPC request %s timeout", request_id)
self.send_rpc_reply(content["device"], request_id, "{\"error\":\"Request timeout\", \"code\": 408}")
device = content.get("device")
if device in self.get_devices():
connector = self.get_devices()[content['device']].get(CONNECTOR_PARAMETER)
if connector is not None:
content['id'] = request_id
connector.server_side_rpc_handler(content)
with self.__lock:
request_id, content, received_time = self.__rpc_to_devices_queue.get()
timeout = content.get("params", {}).get("timeout", self.DEFAULT_TIMEOUT)
if monotonic() - received_time > timeout:
log.error("RPC request %s timeout", request_id)
self.send_rpc_reply(content["device"], request_id, "{\"error\":\"Request timeout\", \"code\": 408}")
device = content.get("device")
if device in self.get_devices():
connector = self.get_devices()[content['device']].get(CONNECTOR_PARAMETER)
if connector is not None:
content['id'] = request_id
connector.server_side_rpc_handler(content)
else:
log.error("Received RPC request but connector for the device %s not found. Request data: \n %s",
content["device"],
dumps(content))
else:
log.error("Received RPC request but connector for the device %s not found. Request data: \n %s",
content["device"],
dumps(content))
else:
self.__rpc_to_devices_queue.put((request_id, content, received_time))
sleep(.001)
self.__rpc_to_devices_queue.put((request_id, content, received_time))
else:
sleep(.01)