1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Added ability to get devices from gateway to grpc connectors

This commit is contained in:
zbeacon
2022-06-15 11:39:42 +03:00
parent bc308ea28b
commit 0a6e79b759
6 changed files with 422 additions and 227 deletions

View File

@@ -27,7 +27,8 @@ class DownlinkMessageType(Enum):
GatewayAttributeUpdateNotificationMsg = 2,
GatewayAttributeResponseMsg = 3,
GatewayDeviceRpcRequestMsg = 4,
UnregisterConnectorMsg = 5
UnregisterConnectorMsg = 5,
ConnectorGetConnectedDevicesResponseMsg = 6
class Status(Enum):

View File

@@ -29,7 +29,8 @@ class GrpcDownlinkConverter(Converter):
DownlinkMessageType.GatewayAttributeUpdateNotificationMsg: self.__convert_gateway_attribute_update_notification_msg,
DownlinkMessageType.GatewayAttributeResponseMsg: self.__convert_gateway_attribute_response_msg,
DownlinkMessageType.GatewayDeviceRpcRequestMsg: self.__convert_gateway_device_rpc_request_msg,
DownlinkMessageType.UnregisterConnectorMsg: self.__convert_unregister_connector_msg
DownlinkMessageType.UnregisterConnectorMsg: self.__convert_unregister_connector_msg,
DownlinkMessageType.ConnectorGetConnectedDevicesResponseMsg: self.__convert_get_connected_devices_msg
}
def convert(self, config, msg):
@@ -52,7 +53,7 @@ class GrpcDownlinkConverter(Converter):
if additional_message.HasField('gatewayTelemetryMsg'):
additional_message.gatewayTelemetryMsg.MergeFrom(GatewayTelemetryMsg())
elif additional_message.HasField("gatewayAttributesMsg"):
additional_message.gatewayTelemetryMsg.MergeFrom(GatewayTelemetryMsg())
additional_message.gatewayAttributesMsg.MergeFrom(GatewayAttributesMsg())
else:
basic_msg.response.connectorMessage.MergeFrom(additional_message)
basic_msg.response.status = ResponseStatus.Value(msg.name)
@@ -106,6 +107,23 @@ class GrpcDownlinkConverter(Converter):
basic_msg.unregisterConnectorMsg.MergeFrom(unreg_msg)
return basic_msg
@staticmethod
def __convert_get_connected_devices_msg(basic_msg, msg, additional_data=None):
status = ResponseStatus.Value("SUCCESS")
if additional_data is None:
status = ResponseStatus.Value("FAILURE")
additional_data = {}
connector_devices_response_msg = ConnectorGetConnectedDevicesResponseMsg()
for device_name, device_type in additional_data.items():
device_info_msg = ConnectorDeviceInfo()
device_info_msg.deviceName = device_name
device_info_msg.deviceType = device_type
connector_devices_response_msg.connectorDevices.extend([device_info_msg])
basic_msg.connectorGetConnectedDevicesResponseMsg.MergeFrom(connector_devices_response_msg)
basic_msg.response.connectorMessage.connectorGetConnectedDevicesMsg.MergeFrom(ConnectorGetConnectedDevicesMsg())
basic_msg.response.status = status
return basic_msg
@staticmethod
def __get_key_value_proto_value(key: str, value: Union[str, bool, int, float, dict]) -> KeyValueProto:
key_value_proto = KeyValueProto()

View File

@@ -80,6 +80,11 @@ class TBGRPCServerManager(Thread):
if msg.HasField("response"):
if msg.response.ByteSize() == 0:
outgoing_message = True
if msg.HasField("connectorGetConnectedDevicesMsg"):
connector_name = list(self.__connectors_sessions.keys())[list(self.__connectors_sessions.values()).index(session_id)]
connected_devices = self.__get_connector_devices(connector_name)
downlink_converter_config = {"message_type": [DownlinkMessageType.ConnectorGetConnectedDevicesResponseMsg], "additional_message": connected_devices}
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, None)
if msg.HasField("gatewayTelemetryMsg"):
data = self.__convert_with_uplink_converter(msg.gatewayTelemetryMsg)
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
@@ -209,6 +214,9 @@ class TBGRPCServerManager(Thread):
loop = asyncio.get_event_loop()
loop.create_task(self.__aio_server.stop(True))
def __get_connector_devices(self, connector_name: str):
return self.__gateway.get_devices(connector_name)
def set_gateway_read_callbacks(self, registration_cb, unregistration_cb):
self.__register_connector = registration_cb
self.__unregister_connector = unregistration_cb

View File

@@ -24,6 +24,7 @@ message FromConnectorMessage {
DisconnectMsg disconnectMsg = 8;
GatewayRpcResponseMsg gatewayRpcResponseMsg = 9;
GatewayAttributesRequestMsg gatewayAttributeRequestMsg = 10;
ConnectorGetConnectedDevicesMsg connectorGetConnectedDevicesMsg = 11;
}
message FromServiceMessage {
@@ -33,8 +34,10 @@ message FromServiceMessage {
GatewayAttributeResponseMsg gatewayAttributeResponseMsg = 4;
GatewayDeviceRpcRequestMsg gatewayDeviceRpcRequestMsg = 5;
UnregisterConnectorMsg unregisterConnectorMsg = 6;
ConnectorGetConnectedDevicesResponseMsg connectorGetConnectedDevicesResponseMsg = 7;
}
// Enums
enum ResponseStatus {
UNKNOWN = 0;
@@ -51,6 +54,36 @@ enum KeyValueType {
JSON_V = 4;
}
// Service connector messages
message RegisterConnectorMsg {
string connectorKey = 1;
}
message UnregisterConnectorMsg {
string connectorKey = 1;
}
message ConnectorConfigurationMsg {
string connectorName = 1;
string configuration = 2;
}
message ConnectorGetConnectedDevicesMsg {
string connectorKey = 1;
}
message ConnectorGetConnectedDevicesResponseMsg {
repeated ConnectorDeviceInfo connectorDevices = 1;
}
message ConnectorDeviceInfo {
string deviceName = 1;
string deviceType = 2;
}
// Device messages
message KeyValueProto {
string key = 1;
KeyValueType type = 2;
@@ -142,19 +175,6 @@ message DisconnectMsg {
string deviceName = 1;
}
message RegisterConnectorMsg {
string connectorKey = 1;
}
message UnregisterConnectorMsg {
string connectorKey = 1;
}
message ConnectorConfigurationMsg {
string connectorName = 1;
string configuration = 2;
}
message ConnectMsg {
string deviceName = 1;
string deviceType = 2;
@@ -175,6 +195,8 @@ message ClaimDeviceMsg {
ClaimDevice claimRequest = 2;
}
// Gateway messages
message GatewayTelemetryMsg {
repeated TelemetryMsg msg = 1;
}

File diff suppressed because one or more lines are too long

View File

@@ -530,8 +530,8 @@ class TBGatewayService:
def _connect_with_connectors(self):
for connector_type in self.connectors_configs:
for connector_config in self.connectors_configs[connector_type]:
if connector_type.lower() != 'grpc' and 'Grpc' not in self._implemented_connectors[
connector_type.lower()].__name__:
if connector_type.lower() != 'grpc' and (self._implemented_connectors.get(connector_type.lower()) is not None and
'Grpc' not in self._implemented_connectors[connector_type.lower()].__name__):
for config in connector_config["config"]:
connector = None
try:
@@ -554,21 +554,21 @@ class TBGatewayService:
connector.close()
else:
self.__grpc_connectors.update({connector_config['grpc_key']: connector_config})
if connector_type.lower() != 'grpc':
connector_dir_abs = "/".join(self._config_dir.split("/")[:-2])
connector_file_name = f'{connector_type}_connector.py'
connector_abs_path = f'{connector_dir_abs}/grpc_connectors/{connector_type}/{connector_file_name}'
connector_config_json = simplejson.dumps({
**connector_config,
'gateway': {
'host': 'localhost',
'port': self.__config['grpc']['serverPort']
}
})
connector_dir_abs = "/".join(self._config_dir.split("/")[:-2])
connector_file_name = f'{connector_type}_connector.py'
connector_abs_path = f'{connector_dir_abs}/grpc_connectors/{connector_type}/{connector_file_name}'
connector_config_json = simplejson.dumps({
**connector_config,
'gateway': {
'host': 'localhost',
'port': self.__config['grpc']['serverPort']
}
})
thread = Thread(target=self._run_connector, args=(connector_abs_path, connector_config_json,),
daemon=True, name='Separate DRPC Connector')
thread.start()
thread = Thread(target=self._run_connector, args=(connector_abs_path, connector_config_json,),
daemon=True, name='Separate DRPC Connector')
thread.start()
def _run_connector(self, connector_abs_path, connector_config_json):
subprocess.run(['python3', connector_abs_path, connector_config_json, self._config_dir],
@@ -1063,8 +1063,10 @@ class TBGatewayService:
self.__saved_devices.pop(device_name)
self.__save_persistent_devices()
def get_devices(self):
return self.__connected_devices
def get_devices(self, connector_name: str = None):
return self.__connected_devices if connector_name is None else {device_name: self.__connected_devices[device_name]["device_type"] for device_name in self.__connected_devices.keys() if self.__connected_devices[device_name].get("connector") is not None and
self.__connected_devices[device_name]["connector"].get_name() == connector_name}
def __process_async_device_actions(self):
while not self.stopped: