1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added RPC reply processing for GRPC API

This commit is contained in:
zbeacon
2021-12-29 13:07:33 +02:00
parent d25e746ba0
commit 9901d6c1b5
3 changed files with 33 additions and 13 deletions

View File

@@ -84,11 +84,13 @@ class GrpcUplinkConverter(Converter):
def __convert_disconnect_msg(msg: DisconnectMsg):
return {"deviceName": msg.deviceName}
def __convert_gateway_rpc_response_msg(self, msg: GatewayRpcResponseMsg):
pass
@staticmethod
def __convert_gateway_rpc_response_msg(msg: GatewayRpcResponseMsg):
return {"deviceName": msg.deviceName, "id": msg.id, "data": msg.data}
def __convert_gateway_attributes_request_msg(self, msg: GatewayAttributesRequestMsg):
pass
@staticmethod
def __convert_gateway_attributes_request_msg(msg: GatewayAttributesRequestMsg):
return {"id": msg.id, "deviceName": msg.deviceName, "client": msg.client, "keys": msg.keys}
@staticmethod
def get_value(msg: KeyValueProto):

View File

@@ -88,38 +88,39 @@ class TBGRPCServerManager(Thread):
if msg.response.ByteSize() == 0:
outgoing_message = True
if msg.HasField("gatewayTelemetryMsg"):
data = self.__uplink_converter.convert(None, msg.gatewayTelemetryMsg)
data = self.__convert_with_uplink_converter(msg.gatewayTelemetryMsg)
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
outgoing_message = True
self.__increase_incoming_statistic(session_id)
if msg.HasField("gatewayAttributesMsg"):
data = self.__uplink_converter.convert(None, msg.gatewayAttributesMsg)
data = self.__convert_with_uplink_converter(msg.gatewayAttributesMsg)
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
outgoing_message = True
self.__increase_incoming_statistic(session_id)
if msg.HasField("gatewayClaimMsg"):
message_for_conversion = msg.gatewayClaimMsg
data = self.__uplink_converter.convert(None, message_for_conversion)
data = self.__convert_with_uplink_converter(msg.gatewayClaimMsg)
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
self.__increase_incoming_statistic(session_id)
if msg.HasField("connectMsg"):
message_for_conversion = msg.connectMsg
data = self.__uplink_converter.convert(None, message_for_conversion)
data = self.__convert_with_uplink_converter(msg.connectMsg)
data['name'] = self.sessions[session_id]['name']
result_status = self.__gateway.add_device_async(data)
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
self.__increase_incoming_statistic(session_id)
if msg.HasField("disconnectMsg"):
message_for_conversion = msg.disconnectMsg
data = self.__uplink_converter.convert(None, message_for_conversion)
data = self.__convert_with_uplink_converter(msg.disconnectMsg)
data['name'] = self.sessions[session_id]['name']
result_status = self.__gateway.del_device_async(data)
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
self.__increase_incoming_statistic(session_id)
if msg.HasField("gatewayRpcResponseMsg"):
pass
data = self.__convert_with_uplink_converter(msg.gatewayRpcResponseMsg)
result_status = self.__gateway.send_rpc_reply(device=data['deviceName'], req_id=data['id'], content=data['data'])
outgoing_message = True
self.__increase_incoming_statistic(session_id)
if msg.HasField("gatewayAttributeRequestMsg"):
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, Status.NOT_FOUND)
pass
else:
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, Status.FAILURE)
@@ -183,6 +184,9 @@ class TBGRPCServerManager(Thread):
else:
return DEFAULT_STATISTICS_DICT
def __convert_with_uplink_converter(self, data):
return self.__uplink_converter.convert(None, data)
def __increase_incoming_statistic(self, session_id):
if session_id in self.sessions:
self.sessions[session_id]['statistics']["MessagesReceived"] += 1

View File

@@ -147,10 +147,13 @@ class TBGatewayService:
release=self.__updater.get_release())
self.__rpc_remote_shell_command_in_progress = None
self.__scheduled_rpc_calls = []
self.__rpc_processing_queue = SimpleQueue()
self.__rpc_scheduled_methods_functions = {
"restart": {"function": execv, "arguments": (executable, [executable.split(pathsep)[-1]] + argv)},
"reboot": {"function": system, "arguments": ("reboot 0",)},
}
self.__rpc_processing_thread = Thread(target=self.__send_rpc_reply_processing, daemon=True, name="RPC processing thread")
self.__rpc_processing_thread.start()
self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"])
self.connectors_configs = {}
self.__remote_configurator = None
@@ -832,6 +835,17 @@ class TBGatewayService:
def send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None,
quality_of_service=0):
self.__rpc_processing_queue.put((device, req_id, content, success_sent, wait_for_publish, quality_of_service))
def __send_rpc_reply_processing(self):
while not self.stopped:
if not self.__rpc_processing_queue.empty():
args = self.__rpc_processing_queue.get()
self.__send_rpc_reply(*args)
else:
sleep(.1)
def __send_rpc_reply(self, device=None, req_id=None, content=None, success_sent=None, wait_for_publish=None, quality_of_service=0):
try:
self.__rpc_reply_sent = True
rpc_response = {"success": False}