1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added ability to request shared attributes for GRPC based connectors

This commit is contained in:
zbeacon
2022-06-15 18:20:47 +03:00
parent 88c66a4c6b
commit deebadb970
7 changed files with 206 additions and 211 deletions

View File

@@ -69,6 +69,9 @@ class GrpcDownlinkConverter(Converter):
gw_attr_upd_notify_msg.deviceName = msg['device']
attr_notify_msg = AttributeUpdateNotificationMsg()
for shared_attribute in msg['data']:
if len(msg["data"]) == 1 and msg["data"].get("deleted") is not None and isinstance(msg["data"]["deleted"], list):
attr_notify_msg.sharedDeleted.extend(msg["data"]["deleted"])
break
ts_kv_proto = TsKvProto()
ts_kv_proto.ts = ts
kv = GrpcDownlinkConverter.__get_key_value_proto_value(shared_attribute, msg['data'][shared_attribute])
@@ -80,7 +83,27 @@ class GrpcDownlinkConverter(Converter):
@staticmethod
def __convert_gateway_attribute_response_msg(basic_msg, msg, additional_data=None):
pass
attrs_resp_msg = GatewayAttributesResponseMsg()
attrs_resp_msg.requestId = additional_data["request_id"]
if additional_data.get("error") is not None:
attrs_resp_msg.error = additional_data["error"]
if additional_data.get("key") is not None: # Single key requested
if additional_data["client"]:
attrs_resp_msg.clientAttributeList.extend([GrpcDownlinkConverter.__get_key_value_proto_value(additional_data["key"], msg.get("value"))])
else:
attrs_resp_msg.sharedAttributeList.extend([GrpcDownlinkConverter.__get_key_value_proto_value(additional_data["key"], msg.get("value"))])
else: # Several keys requested
for key, value in msg["values"].items():
if additional_data["client"]:
attrs_resp_msg.clientAttributeList.extend([GrpcDownlinkConverter.__get_key_value_proto_value(key, value)])
else:
attrs_resp_msg.sharedAttributeList.extend([GrpcDownlinkConverter.__get_key_value_proto_value(key, value)])
gw_attr_resp_msg = GatewayAttributeResponseMsg()
gw_attr_resp_msg.deviceName = msg["device"]
gw_attr_resp_msg.responseMsg.MergeFrom(attrs_resp_msg)
basic_msg.gatewayAttributeResponseMsg.MergeFrom(gw_attr_resp_msg)
return basic_msg
@staticmethod
def __convert_gateway_device_rpc_request_msg(basic_msg, msg, additional_data=None):

View File

@@ -29,7 +29,6 @@ from thingsboard_gateway.gateway.proto.messages_pb2_grpc import add_TBGatewayPro
log = logging.getLogger('grpc')
DEFAULT_STATISTICS_DICT = {"MessagesReceived": 0, "MessagesSent": 0}
@@ -81,9 +80,12 @@ class TBGRPCServerManager(Thread):
if msg.response.ByteSize() == 0:
outgoing_message = True
if msg.HasField("connectorGetConnectedDevicesMsg"):
connector_name = list(self.__connectors_sessions.keys())[list(self.__connectors_sessions.values()).index(session_id)]
connector_name = list(self.__connectors_sessions.keys())[
list(self.__connectors_sessions.values()).index(session_id)]
connected_devices = self.__get_connector_devices(connector_name)
downlink_converter_config = {"message_type": [DownlinkMessageType.ConnectorGetConnectedDevicesResponseMsg], "additional_message": connected_devices}
downlink_converter_config = {
"message_type": [DownlinkMessageType.ConnectorGetConnectedDevicesResponseMsg],
"additional_message": connected_devices}
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, None)
if msg.HasField("gatewayTelemetryMsg"):
data = self.__convert_with_uplink_converter(msg.gatewayTelemetryMsg)
@@ -114,12 +116,33 @@ class TBGRPCServerManager(Thread):
self.__increase_incoming_statistic(session_id)
if msg.HasField("gatewayRpcResponseMsg"):
data = self.__convert_with_uplink_converter(msg.gatewayRpcResponseMsg)
result_status = self.__gateway.send_rpc_reply(device=data['deviceName'], req_id=data['id'], content=data['data'])
result_status = self.__gateway.send_rpc_reply(device=data['deviceName'], req_id=data['id'],
content=data['data'])
outgoing_message = True
self.__increase_incoming_statistic(session_id)
if msg.HasField("gatewayAttributeRequestMsg"):
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, Status.NOT_FOUND)
pass
shared_keys = None
client_keys = None
device_name = msg.gatewayAttributeRequestMsg.deviceName
request_id = msg.gatewayAttributeRequestMsg.id
is_client = msg.gatewayAttributeRequestMsg.client
keys = list(msg.gatewayAttributeRequestMsg.keys)
if is_client:
client_keys = keys
else:
shared_keys = keys
callback_with_extra_params = (self.__process_requested_attributes,
{"request_id": request_id, "session_id": session_id,
"device_name": device_name, "client": is_client})
if len(keys) == 1:
callback_with_extra_params[1]["key"] = keys[0]
self.__gateway.request_device_attributes(device_name,
shared_keys,
client_keys,
callback_with_extra_params
)
outgoing_message = True
self.__increase_incoming_statistic(session_id)
else:
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, Status.FAILURE)
if outgoing_message is None:
@@ -137,14 +160,16 @@ class TBGRPCServerManager(Thread):
self.__grpc_server.write(session_id, msg)
self.__increase_outgoing_statistic(session_id)
else:
log.warning("Cannot write to connector with name %s, session is not found. Client is not registered!", connector_name)
log.warning("Cannot write to connector with name %s, session is not found. Client is not registered!",
connector_name)
def registration_finished(self, registration_result: Status, session_id, connector_configuration):
additional_message = FromConnectorMessage()
additional_message.registerConnectorMsg.MergeFrom(RegisterConnectorMsg())
if registration_result == Status.SUCCESS:
connector_name = connector_configuration['name']
self.sessions[session_id] = {"config": connector_configuration, "name": connector_name, "statistics": DEFAULT_STATISTICS_DICT}
self.sessions[session_id] = {"config": connector_configuration, "name": connector_name,
"statistics": DEFAULT_STATISTICS_DICT}
self.__connectors_sessions[connector_name] = session_id
msg = self.__grpc_server.get_response("SUCCESS", additional_message)
configuration_msg = ConnectorConfigurationMsg()
@@ -182,6 +207,14 @@ class TBGRPCServerManager(Thread):
else:
return DEFAULT_STATISTICS_DICT
def __process_requested_attributes(self, content, error, extra_params):
log.debug("Received requested attributes")
if error:
log.error(error)
downlink_converter_config = {"message_type": [DownlinkMessageType.GatewayAttributeResponseMsg], "additional_message": {**extra_params, "error": str(error)}}
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, content)
self.__grpc_server.write(extra_params['session_id'], outgoing_message)
def __convert_with_uplink_converter(self, data):
return self.__uplink_converter.convert(None, data)
@@ -202,7 +235,7 @@ class TBGRPCServerManager(Thread):
('grpc.http2.max_pings_without_data', config.get('maxPingsWithoutData', 0)),
('grpc.http2.min_time_between_pings_ms', config.get('minTimeBetweenPingsMs', 10000)),
('grpc.http2.min_ping_interval_without_data_ms', config.get('minPingIntervalWithoutDataMs', 5000)),
))
))
add_TBGatewayProtoServiceServicer_to_server(self.__grpc_server, self.__aio_server)
self.__aio_server.add_insecure_port("[::]:%s" % (self.__grpc_port,))
await self.__aio_server.start()

View File

@@ -112,21 +112,6 @@ message PostAttributeMsg {
repeated KeyValueProto kv = 1;
}
message GetAttributeRequestMsg {
int32 requestId = 1;
repeated string clientAttributeNames = 2;
repeated string sharedAttributeNames = 3;
bool onlyShared = 4;
}
message GetAttributeResponseMsg {
int32 requestId = 1;
repeated TsKvProto clientAttributeList = 2;
repeated TsKvProto sharedAttributeList = 3;
string error = 5;
bool sharedStateMsg = 6;
}
message AttributeUpdateNotificationMsg {
repeated TsKvProto sharedUpdated = 1;
repeated string sharedDeleted = 2;
@@ -217,7 +202,7 @@ message GatewayRpcResponseMsg {
message GatewayAttributeResponseMsg {
string deviceName = 1;
GetAttributeResponseMsg responseMsg = 2;
GatewayAttributesResponseMsg responseMsg = 2;
}
message GatewayAttributeUpdateNotificationMsg {
@@ -235,4 +220,11 @@ message GatewayAttributesRequestMsg {
string deviceName = 2;
bool client = 3;
repeated string keys = 4;
}
message GatewayAttributesResponseMsg {
int32 requestId = 1;
repeated KeyValueProto clientAttributeList = 2;
repeated KeyValueProto sharedAttributeList = 3;
string error = 5;
}

File diff suppressed because one or more lines are too long

View File

@@ -414,9 +414,14 @@ class TBGatewayService:
self.tb_client.client.subscribe_to_all_attributes(self._attribute_update_callback)
self.tb_client.client.gw_subscribe_to_all_attributes(self._attribute_update_callback)
def request_device_attributes(self, device_name, shared_keys, client_keys, callback):
if client_keys is not None:
self.tb_client.client.gw_request_client_attributes(device_name, client_keys, callback)
if shared_keys is not None:
self.tb_client.client.gw_request_shared_attributes(device_name, shared_keys, callback)
def __check_shared_attributes(self):
self.tb_client.client.request_attributes(callback=self._attributes_parse)
def __register_connector(self, session_id, connector_key):
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key][
'name'] not in self.available_connectors: