diff --git a/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py b/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py index 37f1c8b4..974bbde8 100644 --- a/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py +++ b/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py @@ -13,18 +13,20 @@ # limitations under the License. import asyncio +from typing import Optional + import grpc import logging from threading import Thread from time import sleep from enum import Enum from simplejson import dumps +from thingsboard_gateway.gateway.grpc_service.grpc_connector import GrpcConnector from thingsboard_gateway.gateway.proto.messages_pb2_grpc import add_TBGatewayProtoServiceServicer_to_server from thingsboard_gateway.gateway.proto.messages_pb2 import * from thingsboard_gateway.gateway.grpc_service.tb_grpc_server import TBGRPCServer - log = logging.getLogger('service') @@ -41,6 +43,7 @@ class TBGRPCServerManager(Thread): self.setName("TB GRPC manager thread") self.__aio_server: grpc.aio.Server = None self.__register_connector = None + self.__unregister_connector = None self.__send_data_to_storage = None self._stopped = False self.__config = config @@ -68,7 +71,7 @@ class TBGRPCServerManager(Thread): if msg.HasField("registerConnectorMsg"): self.__register_connector(context, msg.registerConnectorMsg.connectorKey) if msg.HasField("unregisterConnectorMsg"): - pass + self.__unregister_connector(context, msg.unregisterConnectorMsg.connectorKey) if msg.HasField("connectMsg"): pass if msg.HasField("disconnectMsg"): @@ -102,8 +105,29 @@ class TBGRPCServerManager(Thread): msg = self.__grpc_server.get_response("FAILURE") self.__grpc_server.write(msg) + def unregister(self, unregistration_result: RegistrationStatus, context, connector: Optional[GrpcConnector]): + if unregistration_result == RegistrationStatus.SUCCESS: + connector_name = connector.get_name() + connector_session = self.__connectors_sessions.pop(connector_name) + msg = self.__grpc_server.get_response("SUCCESS") + self.__grpc_server.write(msg) + elif unregistration_result == RegistrationStatus.NOT_FOUND: + msg = self.__grpc_server.get_response("NOT_FOUND") + self.__grpc_server.write(msg) + elif unregistration_result == RegistrationStatus.FAILURE: + msg = self.__grpc_server.get_response("FAILURE") + self.__grpc_server.write(msg) + async def serve(self): - self.__aio_server = grpc.aio.server() + self.__aio_server = grpc.aio.server( + options=( + ('grpc.keepalive_time_ms', 10000), + ('grpc.keepalive_timeout_ms', 5000), + ('grpc.keepalive_permit_without_calls', True), + ('grpc.http2.max_pings_without_data', 0), + ('grpc.http2.min_time_between_pings_ms', 10000), + ('grpc.http2.min_ping_interval_without_data_ms', 5000), + )) add_TBGatewayProtoServiceServicer_to_server(self.__grpc_server, self.__aio_server) self.__aio_server.add_insecure_port("[::]:%s" % (self.__grpc_port,)) await self.__aio_server.start() @@ -115,9 +139,10 @@ class TBGRPCServerManager(Thread): loop = asyncio.get_event_loop() loop.create_task(self.__aio_server.stop(True)) - def set_gateway_read_callbacks(self, register, send_data_to_storage): - self.__register_connector = register - self.__send_data_to_storage = send_data_to_storage + def set_gateway_read_callbacks(self, registeration_cb, unregistration_cb, send_data_cb): + self.__register_connector = registeration_cb + self.__unregister_connector = unregistration_cb + self.__send_data_to_storage = send_data_cb if __name__ == '__main__': diff --git a/thingsboard_gateway/gateway/proto/messages.proto b/thingsboard_gateway/gateway/proto/messages.proto index 29e20558..164c52ec 100644 --- a/thingsboard_gateway/gateway/proto/messages.proto +++ b/thingsboard_gateway/gateway/proto/messages.proto @@ -145,7 +145,7 @@ message RegisterConnectorMsg { } message UnregisterConnectorMsg { - string connectorName = 1; + string connectorKey = 1; } message ConnectorConfigurationMsg { diff --git a/thingsboard_gateway/gateway/proto/messages_pb2.py b/thingsboard_gateway/gateway/proto/messages_pb2.py index 343c3989..9fe8eb13 100644 --- a/thingsboard_gateway/gateway/proto/messages_pb2.py +++ b/thingsboard_gateway/gateway/proto/messages_pb2.py @@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( syntax='proto3', serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x0emessages.proto\x12\x08messages\"4\n\x08Response\x12(\n\x06status\x18\x01 \x01(\x0e\x32\x18.messages.ResponseStatus\"\xcf\x04\n\x14\x46romConnectorMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12:\n\x13gatewayTelemetryMsg\x18\x02 \x01(\x0b\x32\x1d.messages.GatewayTelemetryMsg\x12<\n\x14gatewayAttributesMsg\x18\x03 \x01(\x0b\x32\x1e.messages.GatewayAttributesMsg\x12\x32\n\x0fgatewayClaimMsg\x18\x04 \x01(\x0b\x32\x19.messages.GatewayClaimMsg\x12<\n\x14registerConnectorMsg\x18\x05 \x01(\x0b\x32\x1e.messages.RegisterConnectorMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\x12(\n\nconnectMsg\x18\x07 \x01(\x0b\x32\x14.messages.ConnectMsg\x12.\n\rdisconnectMsg\x18\x08 \x01(\x0b\x32\x17.messages.DisconnectMsg\x12>\n\x15gatewayRpcResponseMsg\x18\t \x01(\x0b\x32\x1f.messages.GatewayRpcResponseMsg\x12I\n\x1agatewayAttributeRequestMsg\x18\n \x01(\x0b\x32%.messages.GatewayAttributesRequestMsg\"\xba\x03\n\x12\x46romServiceMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12\x46\n\x19\x63onnectorConfigurationMsg\x18\x02 \x01(\x0b\x32#.messages.ConnectorConfigurationMsg\x12^\n%gatewayAttributeUpdateNotificationMsg\x18\x03 \x01(\x0b\x32/.messages.GatewayAttributeUpdateNotificationMsg\x12J\n\x1bgatewayAttributeResponseMsg\x18\x04 \x01(\x0b\x32%.messages.GatewayAttributeResponseMsg\x12H\n\x1agatewayDeviceRpcRequestMsg\x18\x05 \x01(\x0b\x32$.messages.GatewayDeviceRpcRequestMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\"\x96\x01\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x04type\x18\x02 \x01(\x0e\x32\x16.messages.KeyValueType\x12\x0e\n\x06\x62ool_v\x18\x03 \x01(\x08\x12\x0e\n\x06long_v\x18\x04 \x01(\x03\x12\x10\n\x08\x64ouble_v\x18\x05 \x01(\x01\x12\x10\n\x08string_v\x18\x06 \x01(\t\x12\x0e\n\x06json_v\x18\x07 \x01(\t\"<\n\tTsKvProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x01(\x0b\x32\x17.messages.KeyValueProto\"@\n\rTsKvListProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x03(\x0b\x32\x17.messages.KeyValueProto\"=\n\x10PostTelemetryMsg\x12)\n\x08tsKvList\x18\x01 \x03(\x0b\x32\x17.messages.TsKvListProto\"7\n\x10PostAttributeMsg\x12#\n\x02kv\x18\x01 \x03(\x0b\x32\x17.messages.KeyValueProto\"{\n\x16GetAttributeRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x1c\n\x14\x63lientAttributeNames\x18\x02 \x03(\t\x12\x1c\n\x14sharedAttributeNames\x18\x03 \x03(\t\x12\x12\n\nonlyShared\x18\x04 \x01(\x08\"\xb7\x01\n\x17GetAttributeResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x30\n\x13\x63lientAttributeList\x18\x02 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x30\n\x13sharedAttributeList\x18\x03 \x03(\x0b\x32\x13.messages.TsKvProto\x12\r\n\x05\x65rror\x18\x05 \x01(\t\x12\x16\n\x0esharedStateMsg\x18\x06 \x01(\x08\"c\n\x1e\x41ttributeUpdateNotificationMsg\x12*\n\rsharedUpdated\x18\x01 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x15\n\rsharedDeleted\x18\x02 \x03(\t\"N\n\x15ToDeviceRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToDeviceRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"N\n\x15ToServerRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToServerRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"4\n\x0b\x43laimDevice\x12\x11\n\tsecretKey\x18\x01 \x01(\t\x12\x12\n\ndurationMs\x18\x02 \x01(\x03\";\n\x11\x41ttributesRequest\x12\x12\n\nclientKeys\x18\x01 \x01(\t\x12\x12\n\nsharedKeys\x18\x02 \x01(\t\",\n\nRpcRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0e\n\x06params\x18\x02 \x01(\t\"#\n\rDisconnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\",\n\x14RegisterConnectorMsg\x12\x14\n\x0c\x63onnectorKey\x18\x01 \x01(\t\"/\n\x16UnregisterConnectorMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\"I\n\x19\x43onnectorConfigurationMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\x12\x15\n\rconfiguration\x18\x02 \x01(\t\"4\n\nConnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x12\n\ndeviceType\x18\x02 \x01(\t\"K\n\x0cTelemetryMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x03 \x01(\x0b\x32\x1a.messages.PostTelemetryMsg\"L\n\rAttributesMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x02 \x01(\x0b\x32\x1a.messages.PostAttributeMsg\"Q\n\x0e\x43laimDeviceMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12+\n\x0c\x63laimRequest\x18\x02 \x01(\x0b\x32\x15.messages.ClaimDevice\":\n\x13GatewayTelemetryMsg\x12#\n\x03msg\x18\x01 \x03(\x0b\x32\x16.messages.TelemetryMsg\"8\n\x0fGatewayClaimMsg\x12%\n\x03msg\x18\x01 \x03(\x0b\x32\x18.messages.ClaimDeviceMsg\"<\n\x14GatewayAttributesMsg\x12$\n\x03msg\x18\x01 \x03(\x0b\x32\x17.messages.AttributesMsg\"E\n\x15GatewayRpcResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\t\"i\n\x1bGatewayAttributeResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\x0bresponseMsg\x18\x02 \x01(\x0b\x32!.messages.GetAttributeResponseMsg\"~\n%GatewayAttributeUpdateNotificationMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x41\n\x0fnotificationMsg\x18\x02 \x01(\x0b\x32(.messages.AttributeUpdateNotificationMsg\"h\n\x1aGatewayDeviceRpcRequestMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\rrpcRequestMsg\x18\x02 \x01(\x0b\x32\x1f.messages.ToDeviceRpcRequestMsg\"[\n\x1bGatewayAttributesRequestMsg\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x12\n\ndeviceName\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\x08\x12\x0c\n\x04keys\x18\x04 \x03(\t*F\n\x0eResponseStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\r\n\tNOT_FOUND\x10\x02\x12\x0b\n\x07\x46\x41ILURE\x10\x03*Q\n\x0cKeyValueType\x12\r\n\tBOOLEAN_V\x10\x00\x12\n\n\x06LONG_V\x10\x01\x12\x0c\n\x08\x44OUBLE_V\x10\x02\x12\x0c\n\x08STRING_V\x10\x03\x12\n\n\x06JSON_V\x10\x04\x32\x63\n\x15TBGatewayProtoService\x12J\n\x06stream\x12\x1e.messages.FromConnectorMessage\x1a\x1c.messages.FromServiceMessage(\x01\x30\x01\x62\x06proto3' + serialized_pb=b'\n\x0emessages.proto\x12\x08messages\"4\n\x08Response\x12(\n\x06status\x18\x01 \x01(\x0e\x32\x18.messages.ResponseStatus\"\xcf\x04\n\x14\x46romConnectorMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12:\n\x13gatewayTelemetryMsg\x18\x02 \x01(\x0b\x32\x1d.messages.GatewayTelemetryMsg\x12<\n\x14gatewayAttributesMsg\x18\x03 \x01(\x0b\x32\x1e.messages.GatewayAttributesMsg\x12\x32\n\x0fgatewayClaimMsg\x18\x04 \x01(\x0b\x32\x19.messages.GatewayClaimMsg\x12<\n\x14registerConnectorMsg\x18\x05 \x01(\x0b\x32\x1e.messages.RegisterConnectorMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\x12(\n\nconnectMsg\x18\x07 \x01(\x0b\x32\x14.messages.ConnectMsg\x12.\n\rdisconnectMsg\x18\x08 \x01(\x0b\x32\x17.messages.DisconnectMsg\x12>\n\x15gatewayRpcResponseMsg\x18\t \x01(\x0b\x32\x1f.messages.GatewayRpcResponseMsg\x12I\n\x1agatewayAttributeRequestMsg\x18\n \x01(\x0b\x32%.messages.GatewayAttributesRequestMsg\"\xba\x03\n\x12\x46romServiceMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12\x46\n\x19\x63onnectorConfigurationMsg\x18\x02 \x01(\x0b\x32#.messages.ConnectorConfigurationMsg\x12^\n%gatewayAttributeUpdateNotificationMsg\x18\x03 \x01(\x0b\x32/.messages.GatewayAttributeUpdateNotificationMsg\x12J\n\x1bgatewayAttributeResponseMsg\x18\x04 \x01(\x0b\x32%.messages.GatewayAttributeResponseMsg\x12H\n\x1agatewayDeviceRpcRequestMsg\x18\x05 \x01(\x0b\x32$.messages.GatewayDeviceRpcRequestMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\"\x96\x01\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x04type\x18\x02 \x01(\x0e\x32\x16.messages.KeyValueType\x12\x0e\n\x06\x62ool_v\x18\x03 \x01(\x08\x12\x0e\n\x06long_v\x18\x04 \x01(\x03\x12\x10\n\x08\x64ouble_v\x18\x05 \x01(\x01\x12\x10\n\x08string_v\x18\x06 \x01(\t\x12\x0e\n\x06json_v\x18\x07 \x01(\t\"<\n\tTsKvProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x01(\x0b\x32\x17.messages.KeyValueProto\"@\n\rTsKvListProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x03(\x0b\x32\x17.messages.KeyValueProto\"=\n\x10PostTelemetryMsg\x12)\n\x08tsKvList\x18\x01 \x03(\x0b\x32\x17.messages.TsKvListProto\"7\n\x10PostAttributeMsg\x12#\n\x02kv\x18\x01 \x03(\x0b\x32\x17.messages.KeyValueProto\"{\n\x16GetAttributeRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x1c\n\x14\x63lientAttributeNames\x18\x02 \x03(\t\x12\x1c\n\x14sharedAttributeNames\x18\x03 \x03(\t\x12\x12\n\nonlyShared\x18\x04 \x01(\x08\"\xb7\x01\n\x17GetAttributeResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x30\n\x13\x63lientAttributeList\x18\x02 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x30\n\x13sharedAttributeList\x18\x03 \x03(\x0b\x32\x13.messages.TsKvProto\x12\r\n\x05\x65rror\x18\x05 \x01(\t\x12\x16\n\x0esharedStateMsg\x18\x06 \x01(\x08\"c\n\x1e\x41ttributeUpdateNotificationMsg\x12*\n\rsharedUpdated\x18\x01 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x15\n\rsharedDeleted\x18\x02 \x03(\t\"N\n\x15ToDeviceRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToDeviceRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"N\n\x15ToServerRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToServerRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"4\n\x0b\x43laimDevice\x12\x11\n\tsecretKey\x18\x01 \x01(\t\x12\x12\n\ndurationMs\x18\x02 \x01(\x03\";\n\x11\x41ttributesRequest\x12\x12\n\nclientKeys\x18\x01 \x01(\t\x12\x12\n\nsharedKeys\x18\x02 \x01(\t\",\n\nRpcRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0e\n\x06params\x18\x02 \x01(\t\"#\n\rDisconnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\",\n\x14RegisterConnectorMsg\x12\x14\n\x0c\x63onnectorKey\x18\x01 \x01(\t\".\n\x16UnregisterConnectorMsg\x12\x14\n\x0c\x63onnectorKey\x18\x01 \x01(\t\"I\n\x19\x43onnectorConfigurationMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\x12\x15\n\rconfiguration\x18\x02 \x01(\t\"4\n\nConnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x12\n\ndeviceType\x18\x02 \x01(\t\"K\n\x0cTelemetryMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x03 \x01(\x0b\x32\x1a.messages.PostTelemetryMsg\"L\n\rAttributesMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x02 \x01(\x0b\x32\x1a.messages.PostAttributeMsg\"Q\n\x0e\x43laimDeviceMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12+\n\x0c\x63laimRequest\x18\x02 \x01(\x0b\x32\x15.messages.ClaimDevice\":\n\x13GatewayTelemetryMsg\x12#\n\x03msg\x18\x01 \x03(\x0b\x32\x16.messages.TelemetryMsg\"8\n\x0fGatewayClaimMsg\x12%\n\x03msg\x18\x01 \x03(\x0b\x32\x18.messages.ClaimDeviceMsg\"<\n\x14GatewayAttributesMsg\x12$\n\x03msg\x18\x01 \x03(\x0b\x32\x17.messages.AttributesMsg\"E\n\x15GatewayRpcResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\t\"i\n\x1bGatewayAttributeResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\x0bresponseMsg\x18\x02 \x01(\x0b\x32!.messages.GetAttributeResponseMsg\"~\n%GatewayAttributeUpdateNotificationMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x41\n\x0fnotificationMsg\x18\x02 \x01(\x0b\x32(.messages.AttributeUpdateNotificationMsg\"h\n\x1aGatewayDeviceRpcRequestMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\rrpcRequestMsg\x18\x02 \x01(\x0b\x32\x1f.messages.ToDeviceRpcRequestMsg\"[\n\x1bGatewayAttributesRequestMsg\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x12\n\ndeviceName\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\x08\x12\x0c\n\x04keys\x18\x04 \x03(\t*F\n\x0eResponseStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\r\n\tNOT_FOUND\x10\x02\x12\x0b\n\x07\x46\x41ILURE\x10\x03*Q\n\x0cKeyValueType\x12\r\n\tBOOLEAN_V\x10\x00\x12\n\n\x06LONG_V\x10\x01\x12\x0c\n\x08\x44OUBLE_V\x10\x02\x12\x0c\n\x08STRING_V\x10\x03\x12\n\n\x06JSON_V\x10\x04\x32\x63\n\x15TBGatewayProtoService\x12J\n\x06stream\x12\x1e.messages.FromConnectorMessage\x1a\x1c.messages.FromServiceMessage(\x01\x30\x01\x62\x06proto3' ) _RESPONSESTATUS = _descriptor.EnumDescriptor( @@ -53,8 +53,8 @@ _RESPONSESTATUS = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=3593, - serialized_end=3663, + serialized_start=3592, + serialized_end=3662, ) _sym_db.RegisterEnumDescriptor(_RESPONSESTATUS) @@ -94,8 +94,8 @@ _KEYVALUETYPE = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=3665, - serialized_end=3746, + serialized_start=3664, + serialized_end=3745, ) _sym_db.RegisterEnumDescriptor(_KEYVALUETYPE) @@ -1048,7 +1048,7 @@ _UNREGISTERCONNECTORMSG = _descriptor.Descriptor( create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( - name='connectorName', full_name='messages.UnregisterConnectorMsg.connectorName', index=0, + name='connectorKey', full_name='messages.UnregisterConnectorMsg.connectorKey', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -1067,7 +1067,7 @@ _UNREGISTERCONNECTORMSG = _descriptor.Descriptor( oneofs=[ ], serialized_start=2492, - serialized_end=2539, + serialized_end=2538, ) @@ -1105,8 +1105,8 @@ _CONNECTORCONFIGURATIONMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2541, - serialized_end=2614, + serialized_start=2540, + serialized_end=2613, ) @@ -1144,8 +1144,8 @@ _CONNECTMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2616, - serialized_end=2668, + serialized_start=2615, + serialized_end=2667, ) @@ -1183,8 +1183,8 @@ _TELEMETRYMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2670, - serialized_end=2745, + serialized_start=2669, + serialized_end=2744, ) @@ -1222,8 +1222,8 @@ _ATTRIBUTESMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2747, - serialized_end=2823, + serialized_start=2746, + serialized_end=2822, ) @@ -1261,8 +1261,8 @@ _CLAIMDEVICEMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2825, - serialized_end=2906, + serialized_start=2824, + serialized_end=2905, ) @@ -1293,8 +1293,8 @@ _GATEWAYTELEMETRYMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2908, - serialized_end=2966, + serialized_start=2907, + serialized_end=2965, ) @@ -1325,8 +1325,8 @@ _GATEWAYCLAIMMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2968, - serialized_end=3024, + serialized_start=2967, + serialized_end=3023, ) @@ -1357,8 +1357,8 @@ _GATEWAYATTRIBUTESMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3026, - serialized_end=3086, + serialized_start=3025, + serialized_end=3085, ) @@ -1403,8 +1403,8 @@ _GATEWAYRPCRESPONSEMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3088, - serialized_end=3157, + serialized_start=3087, + serialized_end=3156, ) @@ -1442,8 +1442,8 @@ _GATEWAYATTRIBUTERESPONSEMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3159, - serialized_end=3264, + serialized_start=3158, + serialized_end=3263, ) @@ -1481,8 +1481,8 @@ _GATEWAYATTRIBUTEUPDATENOTIFICATIONMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3266, - serialized_end=3392, + serialized_start=3265, + serialized_end=3391, ) @@ -1520,8 +1520,8 @@ _GATEWAYDEVICERPCREQUESTMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3394, - serialized_end=3498, + serialized_start=3393, + serialized_end=3497, ) @@ -1573,8 +1573,8 @@ _GATEWAYATTRIBUTESREQUESTMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3500, - serialized_end=3591, + serialized_start=3499, + serialized_end=3590, ) _RESPONSE.fields_by_name['status'].enum_type = _RESPONSESTATUS @@ -1896,8 +1896,8 @@ _TBGATEWAYPROTOSERVICE = _descriptor.ServiceDescriptor( index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=3748, - serialized_end=3847, + serialized_start=3747, + serialized_end=3846, methods=[ _descriptor.MethodDescriptor( name='stream', diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 033ad877..3205115f 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -24,7 +24,7 @@ from sys import argv, executable, getsizeof from threading import RLock, Thread from time import sleep, time -from simplejson import dumps, load, loads +from simplejson import dumps, load, loads, JSONDecodeError from yaml import safe_load from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager, RegistrationStatus @@ -156,7 +156,7 @@ class TBGatewayService: self.__grpc_connectors = {} if self.__grpc_config is not None and self.__grpc_config.get("enabled"): self.__grpc_manager = TBGRPCServerManager(self.__grpc_config) - self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.send_to_storage) + self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.__unregister_connector, self.send_to_storage) self._load_connectors() self._connect_with_connectors() self.__load_persistent_devices() @@ -334,6 +334,7 @@ class TBGatewayService: connector.setName(target_connector['name']) self.available_connectors[connector.get_name()] = connector self.__grpc_manager.registration_finished(RegistrationStatus.SUCCESS, context, target_connector) + log.info("GRPC connector with key %s registered with name %s", connector_key, connector.get_name()) elif self.__grpc_connectors.get(connector_key) is not None: self.__grpc_manager.registration_finished(RegistrationStatus.FAILURE, context, None) log.error("GRPC connector with key: %s - already registered!", connector_key) @@ -341,6 +342,20 @@ class TBGatewayService: self.__grpc_manager.registration_finished(RegistrationStatus.NOT_FOUND, context, None) log.error("GRPC configuration for connector with key: %s - not found", connector_key) + def __unregister_connector(self, context, connector_key): + if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] in self.available_connectors: + connector_name = self.__grpc_connectors[connector_key]['name'] + target_connector: GrpcConnector = self.available_connectors.pop(connector_name) + target_connector.close() + self.__grpc_manager.unregister(RegistrationStatus.SUCCESS, context, target_connector) + log.info("GRPC connector with key %s and name %s - unregistered", connector_key, target_connector.get_name()) + elif self.__grpc_connectors.get(connector_key) is not None: + self.__grpc_manager.unregister(RegistrationStatus.NOT_FOUND, context, None) + log.error("GRPC connector with key: %s - is not registered!", connector_key) + else: + self.__grpc_manager.unregister(RegistrationStatus.FAILURE, context, None) + log.error("GRPC configuration for connector with key: %s - not found in configuration and not registered", connector_key) + def _load_connectors(self): self.connectors_configs = {} connectors_persistent_keys = self.__load_persistent_connector_keys() @@ -367,17 +382,26 @@ class TBGatewayService: connector_persistent_key = connector['key'] log.info("Connector key for GRPC connector with name [%s] is: [%s]", connector['name'], connector_persistent_key) config_file_path = self._config_dir + connector['configuration'] + connector_conf_file_data = '' with open(config_file_path, 'r', encoding="UTF-8") as conf_file: - connector_conf = load(conf_file) - if not self.connectors_configs.get(connector['type']): - self.connectors_configs[connector['type']] = [] - if connector['type'] != 'grpc': - connector_conf["name"] = connector['name'] - self.connectors_configs[connector['type']].append({"name": connector['name'], - "config": {connector['configuration']: connector_conf} if connector['type'] != 'grpc' else connector_conf, - "config_updated": stat(config_file_path), - "config_file_path": config_file_path, - "grpc_key": connector_persistent_key}) + connector_conf_file_data = conf_file.read() + + connector_conf = connector_conf_file_data + try: + connector_conf = loads(connector_conf_file_data) + except JSONDecodeError as e: + log.debug(e) + log.warning("Cannot parse connector configuration as a JSON, it will be passed as a string.") + + if not self.connectors_configs.get(connector['type']): + self.connectors_configs[connector['type']] = [] + if connector['type'] != 'grpc' and isinstance(connector_conf, dict): + connector_conf["name"] = connector['name'] + self.connectors_configs[connector['type']].append({"name": connector['name'], + "config": {connector['configuration']: connector_conf} if connector['type'] != 'grpc' else connector_conf, + "config_updated": stat(config_file_path), + "config_file_path": config_file_path, + "grpc_key": connector_persistent_key}) except Exception as e: log.exception("Error on loading connector: %r", e) if connectors_persistent_keys: