From e15e11049c0a58f23aceeb66373cae500622b863 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Thu, 25 Nov 2021 16:03:16 +0200 Subject: [PATCH] Added connector registration processing and condiguration send logic --- setup.py | 5 +- thingsboard_gateway/config/tb_gateway.yaml | 2 +- thingsboard_gateway/gateway/constants.py | 3 + .../gateway/grpc_service/grpc_connector.py | 45 +++++ .../gateway/grpc_service/tb_grpc_manager.py | 78 +++++++-- .../gateway/grpc_service/tb_grpc_server.py | 3 +- .../gateway/proto/messages.proto | 5 +- .../gateway/proto/messages_pb2.py | 83 +++++----- .../gateway/tb_gateway_service.py | 156 ++++++++++++------ 9 files changed, 264 insertions(+), 116 deletions(-) create mode 100644 thingsboard_gateway/gateway/grpc_service/grpc_connector.py diff --git a/setup.py b/setup.py index c63249d1..138e2163 100644 --- a/setup.py +++ b/setup.py @@ -35,8 +35,9 @@ setup( long_description_content_type="text/markdown", include_package_data=True, python_requires=">=3.7", - packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.storage','thingsboard_gateway.storage.memory', - 'thingsboard_gateway.storage.file','thingsboard_gateway.storage.sqlite','thingsboard_gateway.tb_client', + packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.gateway.proto', 'thingsboard_gateway.gateway.grpc_service', + 'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', + 'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.tb_client', 'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet', diff --git a/thingsboard_gateway/config/tb_gateway.yaml b/thingsboard_gateway/config/tb_gateway.yaml index 2fcc235d..782df0b8 100644 --- a/thingsboard_gateway/config/tb_gateway.yaml +++ b/thingsboard_gateway/config/tb_gateway.yaml @@ -23,7 +23,7 @@ storage: # messages_ttl_check_in_hours: 1 # messages_ttl_in_days: 7 grpc: - enable: true + enabled: false serverPort: 9595 connectors: - name: MQTT Broker Connector diff --git a/thingsboard_gateway/gateway/constants.py b/thingsboard_gateway/gateway/constants.py index 6a1658f1..dccaf84d 100644 --- a/thingsboard_gateway/gateway/constants.py +++ b/thingsboard_gateway/gateway/constants.py @@ -27,6 +27,9 @@ CONFIG_SECTION_PARAMETER = "config" CONFIG_SERVER_SECTION_PARAMETER = "server" CONFIG_DEVICES_SECTION_PARAMETER = "devices" +CONNECTED_DEVICES_FILENAME = "connected_devices.json" +PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME = "persistent_keys.json" + # Data parameter constants DEVICE_SECTION_PARAMETER = "device" diff --git a/thingsboard_gateway/gateway/grpc_service/grpc_connector.py b/thingsboard_gateway/gateway/grpc_service/grpc_connector.py new file mode 100644 index 00000000..5ee8c6fe --- /dev/null +++ b/thingsboard_gateway/gateway/grpc_service/grpc_connector.py @@ -0,0 +1,45 @@ +# Copyright 2021. ThingsBoard +# # +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# # +# http://www.apache.org/licenses/LICENSE-2.0 +# # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from thingsboard_gateway.connectors.connector import Connector + + +class GrpcConnector(Connector): + def __init__(self, gateway, config, context): + self.name = None + + def setName(self, name): + self.name = name + + def open(self): + pass + + def close(self): + # send unregister + pass + + def get_name(self): + return self.name + + def is_connected(self): + pass + + def on_attributes_update(self, content): + # send updated + pass + + def server_side_rpc_handler(self, content): + # send command + pass diff --git a/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py b/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py index e34af362..37f1c8b4 100644 --- a/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py +++ b/thingsboard_gateway/gateway/grpc_service/tb_grpc_manager.py @@ -15,42 +15,93 @@ import asyncio import grpc import logging +from threading import Thread from time import sleep +from enum import Enum +from simplejson import dumps from thingsboard_gateway.gateway.proto.messages_pb2_grpc import add_TBGatewayProtoServiceServicer_to_server -from thingsboard_gateway.gateway.proto.messages_pb2 import FromConnectorMessage +from thingsboard_gateway.gateway.proto.messages_pb2 import * from thingsboard_gateway.gateway.grpc_service.tb_grpc_server import TBGRPCServer log = logging.getLogger('service') -class TBGRPCServerManager: +class RegistrationStatus(Enum): + FAILURE = 1, + NOT_FOUND = 2, + SUCCESS = 3 + + +class TBGRPCServerManager(Thread): def __init__(self, config): - self.__aio_server = None + super().__init__() + self.daemon = True + self.setName("TB GRPC manager thread") + self.__aio_server: grpc.aio.Server = None self.__register_connector = None self.__send_data_to_storage = None self._stopped = False self.__config = config self.__grpc_port = config['serverPort'] self.__connectors_sessions = {} - self.__grpc_server = TBGRPCServer(self.read_cb, self.write_cb) + self.__grpc_server = TBGRPCServer(self.read_cb) + self.start() + + def run(self): + log.info("GRPC server started.") asyncio.run(self.serve(), debug=True) while not self._stopped: sleep(.1) - def write_cb(self): - pass - - def read_cb(self, context, msg:FromConnectorMessage): - #TODO parse incoming message - self.__send_data_to_storage() - self.write("", "") + def read_cb(self, context, msg: FromConnectorMessage): + log.debug("[GRPC] incoming message: %s", msg) + if msg.HasField("response"): + pass + if msg.HasField("gatewayTelemetryMsg"): + pass + if msg.HasField("gatewayAttributesMsg"): + pass + if msg.HasField("gatewayClaimMsg"): + pass + if msg.HasField("registerConnectorMsg"): + self.__register_connector(context, msg.registerConnectorMsg.connectorKey) + if msg.HasField("unregisterConnectorMsg"): + pass + if msg.HasField("connectMsg"): + pass + if msg.HasField("disconnectMsg"): + pass + if msg.HasField("gatewayRpcResponseMsg"): + pass + if msg.HasField("gatewayAttributeRequestMsg"): + pass + # self.__send_data_to_storage() + # self.write("", "") def write(self, connector_name, data): - # if self.__connectors_sessions.get(connector_name) is not None: + log.debug("[GRPC] outgoing message: %s", data) + if self.__connectors_sessions.get(connector_name) is not None: self.__grpc_server.write(self.__grpc_server.get_response('SUCCESS')) + def registration_finished(self, registration_result: RegistrationStatus, context, connector_configuration): + if registration_result == RegistrationStatus.SUCCESS: + connector_name = connector_configuration['name'] + self.__connectors_sessions[connector_name] = {"context": context, "config": connector_configuration} + msg = self.__grpc_server.get_response("SUCCESS") + configuration_msg = ConnectorConfigurationMsg() + configuration_msg.connectorName = connector_name + configuration_msg.configuration = dumps(connector_configuration['config']) + msg.connectorConfigurationMsg.MergeFrom(configuration_msg) + self.__grpc_server.write(msg) + elif registration_result == RegistrationStatus.NOT_FOUND: + msg = self.__grpc_server.get_response("NOT_FOUND") + self.__grpc_server.write(msg) + elif registration_result == RegistrationStatus.FAILURE: + msg = self.__grpc_server.get_response("FAILURE") + self.__grpc_server.write(msg) + async def serve(self): self.__aio_server = grpc.aio.server() add_TBGatewayProtoServiceServicer_to_server(self.__grpc_server, self.__aio_server) @@ -61,7 +112,8 @@ class TBGRPCServerManager: def stop(self): self._stopped = True if self.__aio_server is not None: - self.__aio_server.stop() + loop = asyncio.get_event_loop() + loop.create_task(self.__aio_server.stop(True)) def set_gateway_read_callbacks(self, register, send_data_to_storage): self.__register_connector = register diff --git a/thingsboard_gateway/gateway/grpc_service/tb_grpc_server.py b/thingsboard_gateway/gateway/grpc_service/tb_grpc_server.py index aece906d..98a194e8 100644 --- a/thingsboard_gateway/gateway/grpc_service/tb_grpc_server.py +++ b/thingsboard_gateway/gateway/grpc_service/tb_grpc_server.py @@ -8,9 +8,8 @@ import thingsboard_gateway.gateway.proto.messages_pb2_grpc as messages_pb2_grpc class TBGRPCServer(messages_pb2_grpc.TBGatewayProtoServiceServicer): - def __init__(self, read_callback, write_callback): + def __init__(self, read_callback): self._read_callback = read_callback - self._write_callback = write_callback self.__write_queue = Queue() def write(self, msg: FromServiceMessage): diff --git a/thingsboard_gateway/gateway/proto/messages.proto b/thingsboard_gateway/gateway/proto/messages.proto index 07003761..29e20558 100644 --- a/thingsboard_gateway/gateway/proto/messages.proto +++ b/thingsboard_gateway/gateway/proto/messages.proto @@ -141,8 +141,7 @@ message DisconnectMsg { } message RegisterConnectorMsg { - string connectorName = 1; - string connectorKey = 2; + string connectorKey = 1; } message UnregisterConnectorMsg { @@ -150,7 +149,7 @@ message UnregisterConnectorMsg { } message ConnectorConfigurationMsg { - string name = 1; + string connectorName = 1; string configuration = 2; } diff --git a/thingsboard_gateway/gateway/proto/messages_pb2.py b/thingsboard_gateway/gateway/proto/messages_pb2.py index 71d49b4c..343c3989 100644 --- a/thingsboard_gateway/gateway/proto/messages_pb2.py +++ b/thingsboard_gateway/gateway/proto/messages_pb2.py @@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( syntax='proto3', serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x0emessages.proto\x12\x08messages\"4\n\x08Response\x12(\n\x06status\x18\x01 \x01(\x0e\x32\x18.messages.ResponseStatus\"\xcf\x04\n\x14\x46romConnectorMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12:\n\x13gatewayTelemetryMsg\x18\x02 \x01(\x0b\x32\x1d.messages.GatewayTelemetryMsg\x12<\n\x14gatewayAttributesMsg\x18\x03 \x01(\x0b\x32\x1e.messages.GatewayAttributesMsg\x12\x32\n\x0fgatewayClaimMsg\x18\x04 \x01(\x0b\x32\x19.messages.GatewayClaimMsg\x12<\n\x14registerConnectorMsg\x18\x05 \x01(\x0b\x32\x1e.messages.RegisterConnectorMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\x12(\n\nconnectMsg\x18\x07 \x01(\x0b\x32\x14.messages.ConnectMsg\x12.\n\rdisconnectMsg\x18\x08 \x01(\x0b\x32\x17.messages.DisconnectMsg\x12>\n\x15gatewayRpcResponseMsg\x18\t \x01(\x0b\x32\x1f.messages.GatewayRpcResponseMsg\x12I\n\x1agatewayAttributeRequestMsg\x18\n \x01(\x0b\x32%.messages.GatewayAttributesRequestMsg\"\xba\x03\n\x12\x46romServiceMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12\x46\n\x19\x63onnectorConfigurationMsg\x18\x02 \x01(\x0b\x32#.messages.ConnectorConfigurationMsg\x12^\n%gatewayAttributeUpdateNotificationMsg\x18\x03 \x01(\x0b\x32/.messages.GatewayAttributeUpdateNotificationMsg\x12J\n\x1bgatewayAttributeResponseMsg\x18\x04 \x01(\x0b\x32%.messages.GatewayAttributeResponseMsg\x12H\n\x1agatewayDeviceRpcRequestMsg\x18\x05 \x01(\x0b\x32$.messages.GatewayDeviceRpcRequestMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\"\x96\x01\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x04type\x18\x02 \x01(\x0e\x32\x16.messages.KeyValueType\x12\x0e\n\x06\x62ool_v\x18\x03 \x01(\x08\x12\x0e\n\x06long_v\x18\x04 \x01(\x03\x12\x10\n\x08\x64ouble_v\x18\x05 \x01(\x01\x12\x10\n\x08string_v\x18\x06 \x01(\t\x12\x0e\n\x06json_v\x18\x07 \x01(\t\"<\n\tTsKvProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x01(\x0b\x32\x17.messages.KeyValueProto\"@\n\rTsKvListProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x03(\x0b\x32\x17.messages.KeyValueProto\"=\n\x10PostTelemetryMsg\x12)\n\x08tsKvList\x18\x01 \x03(\x0b\x32\x17.messages.TsKvListProto\"7\n\x10PostAttributeMsg\x12#\n\x02kv\x18\x01 \x03(\x0b\x32\x17.messages.KeyValueProto\"{\n\x16GetAttributeRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x1c\n\x14\x63lientAttributeNames\x18\x02 \x03(\t\x12\x1c\n\x14sharedAttributeNames\x18\x03 \x03(\t\x12\x12\n\nonlyShared\x18\x04 \x01(\x08\"\xb7\x01\n\x17GetAttributeResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x30\n\x13\x63lientAttributeList\x18\x02 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x30\n\x13sharedAttributeList\x18\x03 \x03(\x0b\x32\x13.messages.TsKvProto\x12\r\n\x05\x65rror\x18\x05 \x01(\t\x12\x16\n\x0esharedStateMsg\x18\x06 \x01(\x08\"c\n\x1e\x41ttributeUpdateNotificationMsg\x12*\n\rsharedUpdated\x18\x01 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x15\n\rsharedDeleted\x18\x02 \x03(\t\"N\n\x15ToDeviceRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToDeviceRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"N\n\x15ToServerRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToServerRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"4\n\x0b\x43laimDevice\x12\x11\n\tsecretKey\x18\x01 \x01(\t\x12\x12\n\ndurationMs\x18\x02 \x01(\x03\";\n\x11\x41ttributesRequest\x12\x12\n\nclientKeys\x18\x01 \x01(\t\x12\x12\n\nsharedKeys\x18\x02 \x01(\t\",\n\nRpcRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0e\n\x06params\x18\x02 \x01(\t\"#\n\rDisconnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\"C\n\x14RegisterConnectorMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\x12\x14\n\x0c\x63onnectorKey\x18\x02 \x01(\t\"/\n\x16UnregisterConnectorMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\"@\n\x19\x43onnectorConfigurationMsg\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rconfiguration\x18\x02 \x01(\t\"4\n\nConnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x12\n\ndeviceType\x18\x02 \x01(\t\"K\n\x0cTelemetryMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x03 \x01(\x0b\x32\x1a.messages.PostTelemetryMsg\"L\n\rAttributesMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x02 \x01(\x0b\x32\x1a.messages.PostAttributeMsg\"Q\n\x0e\x43laimDeviceMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12+\n\x0c\x63laimRequest\x18\x02 \x01(\x0b\x32\x15.messages.ClaimDevice\":\n\x13GatewayTelemetryMsg\x12#\n\x03msg\x18\x01 \x03(\x0b\x32\x16.messages.TelemetryMsg\"8\n\x0fGatewayClaimMsg\x12%\n\x03msg\x18\x01 \x03(\x0b\x32\x18.messages.ClaimDeviceMsg\"<\n\x14GatewayAttributesMsg\x12$\n\x03msg\x18\x01 \x03(\x0b\x32\x17.messages.AttributesMsg\"E\n\x15GatewayRpcResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\t\"i\n\x1bGatewayAttributeResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\x0bresponseMsg\x18\x02 \x01(\x0b\x32!.messages.GetAttributeResponseMsg\"~\n%GatewayAttributeUpdateNotificationMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x41\n\x0fnotificationMsg\x18\x02 \x01(\x0b\x32(.messages.AttributeUpdateNotificationMsg\"h\n\x1aGatewayDeviceRpcRequestMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\rrpcRequestMsg\x18\x02 \x01(\x0b\x32\x1f.messages.ToDeviceRpcRequestMsg\"[\n\x1bGatewayAttributesRequestMsg\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x12\n\ndeviceName\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\x08\x12\x0c\n\x04keys\x18\x04 \x03(\t*F\n\x0eResponseStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\r\n\tNOT_FOUND\x10\x02\x12\x0b\n\x07\x46\x41ILURE\x10\x03*Q\n\x0cKeyValueType\x12\r\n\tBOOLEAN_V\x10\x00\x12\n\n\x06LONG_V\x10\x01\x12\x0c\n\x08\x44OUBLE_V\x10\x02\x12\x0c\n\x08STRING_V\x10\x03\x12\n\n\x06JSON_V\x10\x04\x32\x63\n\x15TBGatewayProtoService\x12J\n\x06stream\x12\x1e.messages.FromConnectorMessage\x1a\x1c.messages.FromServiceMessage(\x01\x30\x01\x62\x06proto3' + serialized_pb=b'\n\x0emessages.proto\x12\x08messages\"4\n\x08Response\x12(\n\x06status\x18\x01 \x01(\x0e\x32\x18.messages.ResponseStatus\"\xcf\x04\n\x14\x46romConnectorMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12:\n\x13gatewayTelemetryMsg\x18\x02 \x01(\x0b\x32\x1d.messages.GatewayTelemetryMsg\x12<\n\x14gatewayAttributesMsg\x18\x03 \x01(\x0b\x32\x1e.messages.GatewayAttributesMsg\x12\x32\n\x0fgatewayClaimMsg\x18\x04 \x01(\x0b\x32\x19.messages.GatewayClaimMsg\x12<\n\x14registerConnectorMsg\x18\x05 \x01(\x0b\x32\x1e.messages.RegisterConnectorMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\x12(\n\nconnectMsg\x18\x07 \x01(\x0b\x32\x14.messages.ConnectMsg\x12.\n\rdisconnectMsg\x18\x08 \x01(\x0b\x32\x17.messages.DisconnectMsg\x12>\n\x15gatewayRpcResponseMsg\x18\t \x01(\x0b\x32\x1f.messages.GatewayRpcResponseMsg\x12I\n\x1agatewayAttributeRequestMsg\x18\n \x01(\x0b\x32%.messages.GatewayAttributesRequestMsg\"\xba\x03\n\x12\x46romServiceMessage\x12$\n\x08response\x18\x01 \x01(\x0b\x32\x12.messages.Response\x12\x46\n\x19\x63onnectorConfigurationMsg\x18\x02 \x01(\x0b\x32#.messages.ConnectorConfigurationMsg\x12^\n%gatewayAttributeUpdateNotificationMsg\x18\x03 \x01(\x0b\x32/.messages.GatewayAttributeUpdateNotificationMsg\x12J\n\x1bgatewayAttributeResponseMsg\x18\x04 \x01(\x0b\x32%.messages.GatewayAttributeResponseMsg\x12H\n\x1agatewayDeviceRpcRequestMsg\x18\x05 \x01(\x0b\x32$.messages.GatewayDeviceRpcRequestMsg\x12@\n\x16unregisterConnectorMsg\x18\x06 \x01(\x0b\x32 .messages.UnregisterConnectorMsg\"\x96\x01\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\t\x12$\n\x04type\x18\x02 \x01(\x0e\x32\x16.messages.KeyValueType\x12\x0e\n\x06\x62ool_v\x18\x03 \x01(\x08\x12\x0e\n\x06long_v\x18\x04 \x01(\x03\x12\x10\n\x08\x64ouble_v\x18\x05 \x01(\x01\x12\x10\n\x08string_v\x18\x06 \x01(\t\x12\x0e\n\x06json_v\x18\x07 \x01(\t\"<\n\tTsKvProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x01(\x0b\x32\x17.messages.KeyValueProto\"@\n\rTsKvListProto\x12\n\n\x02ts\x18\x01 \x01(\x03\x12#\n\x02kv\x18\x02 \x03(\x0b\x32\x17.messages.KeyValueProto\"=\n\x10PostTelemetryMsg\x12)\n\x08tsKvList\x18\x01 \x03(\x0b\x32\x17.messages.TsKvListProto\"7\n\x10PostAttributeMsg\x12#\n\x02kv\x18\x01 \x03(\x0b\x32\x17.messages.KeyValueProto\"{\n\x16GetAttributeRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x1c\n\x14\x63lientAttributeNames\x18\x02 \x03(\t\x12\x1c\n\x14sharedAttributeNames\x18\x03 \x03(\t\x12\x12\n\nonlyShared\x18\x04 \x01(\x08\"\xb7\x01\n\x17GetAttributeResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x30\n\x13\x63lientAttributeList\x18\x02 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x30\n\x13sharedAttributeList\x18\x03 \x03(\x0b\x32\x13.messages.TsKvProto\x12\r\n\x05\x65rror\x18\x05 \x01(\t\x12\x16\n\x0esharedStateMsg\x18\x06 \x01(\x08\"c\n\x1e\x41ttributeUpdateNotificationMsg\x12*\n\rsharedUpdated\x18\x01 \x03(\x0b\x32\x13.messages.TsKvProto\x12\x15\n\rsharedDeleted\x18\x02 \x03(\t\"N\n\x15ToDeviceRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToDeviceRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"N\n\x15ToServerRpcRequestMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x12\n\nmethodName\x18\x02 \x01(\t\x12\x0e\n\x06params\x18\x03 \x01(\t\"K\n\x16ToServerRpcResponseMsg\x12\x11\n\trequestId\x18\x01 \x01(\x05\x12\x0f\n\x07payload\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"4\n\x0b\x43laimDevice\x12\x11\n\tsecretKey\x18\x01 \x01(\t\x12\x12\n\ndurationMs\x18\x02 \x01(\x03\";\n\x11\x41ttributesRequest\x12\x12\n\nclientKeys\x18\x01 \x01(\t\x12\x12\n\nsharedKeys\x18\x02 \x01(\t\",\n\nRpcRequest\x12\x0e\n\x06method\x18\x01 \x01(\t\x12\x0e\n\x06params\x18\x02 \x01(\t\"#\n\rDisconnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\",\n\x14RegisterConnectorMsg\x12\x14\n\x0c\x63onnectorKey\x18\x01 \x01(\t\"/\n\x16UnregisterConnectorMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\"I\n\x19\x43onnectorConfigurationMsg\x12\x15\n\rconnectorName\x18\x01 \x01(\t\x12\x15\n\rconfiguration\x18\x02 \x01(\t\"4\n\nConnectMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x12\n\ndeviceType\x18\x02 \x01(\t\"K\n\x0cTelemetryMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x03 \x01(\x0b\x32\x1a.messages.PostTelemetryMsg\"L\n\rAttributesMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\'\n\x03msg\x18\x02 \x01(\x0b\x32\x1a.messages.PostAttributeMsg\"Q\n\x0e\x43laimDeviceMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12+\n\x0c\x63laimRequest\x18\x02 \x01(\x0b\x32\x15.messages.ClaimDevice\":\n\x13GatewayTelemetryMsg\x12#\n\x03msg\x18\x01 \x03(\x0b\x32\x16.messages.TelemetryMsg\"8\n\x0fGatewayClaimMsg\x12%\n\x03msg\x18\x01 \x03(\x0b\x32\x18.messages.ClaimDeviceMsg\"<\n\x14GatewayAttributesMsg\x12$\n\x03msg\x18\x01 \x03(\x0b\x32\x17.messages.AttributesMsg\"E\n\x15GatewayRpcResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\t\"i\n\x1bGatewayAttributeResponseMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\x0bresponseMsg\x18\x02 \x01(\x0b\x32!.messages.GetAttributeResponseMsg\"~\n%GatewayAttributeUpdateNotificationMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x41\n\x0fnotificationMsg\x18\x02 \x01(\x0b\x32(.messages.AttributeUpdateNotificationMsg\"h\n\x1aGatewayDeviceRpcRequestMsg\x12\x12\n\ndeviceName\x18\x01 \x01(\t\x12\x36\n\rrpcRequestMsg\x18\x02 \x01(\x0b\x32\x1f.messages.ToDeviceRpcRequestMsg\"[\n\x1bGatewayAttributesRequestMsg\x12\n\n\x02id\x18\x01 \x01(\x05\x12\x12\n\ndeviceName\x18\x02 \x01(\t\x12\x0e\n\x06\x63lient\x18\x03 \x01(\x08\x12\x0c\n\x04keys\x18\x04 \x03(\t*F\n\x0eResponseStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\r\n\tNOT_FOUND\x10\x02\x12\x0b\n\x07\x46\x41ILURE\x10\x03*Q\n\x0cKeyValueType\x12\r\n\tBOOLEAN_V\x10\x00\x12\n\n\x06LONG_V\x10\x01\x12\x0c\n\x08\x44OUBLE_V\x10\x02\x12\x0c\n\x08STRING_V\x10\x03\x12\n\n\x06JSON_V\x10\x04\x32\x63\n\x15TBGatewayProtoService\x12J\n\x06stream\x12\x1e.messages.FromConnectorMessage\x1a\x1c.messages.FromServiceMessage(\x01\x30\x01\x62\x06proto3' ) _RESPONSESTATUS = _descriptor.EnumDescriptor( @@ -53,8 +53,8 @@ _RESPONSESTATUS = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=3607, - serialized_end=3677, + serialized_start=3593, + serialized_end=3663, ) _sym_db.RegisterEnumDescriptor(_RESPONSESTATUS) @@ -94,8 +94,8 @@ _KEYVALUETYPE = _descriptor.EnumDescriptor( ], containing_type=None, serialized_options=None, - serialized_start=3679, - serialized_end=3760, + serialized_start=3665, + serialized_end=3746, ) _sym_db.RegisterEnumDescriptor(_KEYVALUETYPE) @@ -1016,19 +1016,12 @@ _REGISTERCONNECTORMSG = _descriptor.Descriptor( create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( - name='connectorName', full_name='messages.RegisterConnectorMsg.connectorName', index=0, + name='connectorKey', full_name='messages.RegisterConnectorMsg.connectorKey', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), - _descriptor.FieldDescriptor( - name='connectorKey', full_name='messages.RegisterConnectorMsg.connectorKey', index=1, - number=2, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=b"".decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), ], extensions=[ ], @@ -1042,7 +1035,7 @@ _REGISTERCONNECTORMSG = _descriptor.Descriptor( oneofs=[ ], serialized_start=2446, - serialized_end=2513, + serialized_end=2490, ) @@ -1073,8 +1066,8 @@ _UNREGISTERCONNECTORMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2515, - serialized_end=2562, + serialized_start=2492, + serialized_end=2539, ) @@ -1087,7 +1080,7 @@ _CONNECTORCONFIGURATIONMSG = _descriptor.Descriptor( create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( - name='name', full_name='messages.ConnectorConfigurationMsg.name', index=0, + name='connectorName', full_name='messages.ConnectorConfigurationMsg.connectorName', index=0, number=1, type=9, cpp_type=9, label=1, has_default_value=False, default_value=b"".decode('utf-8'), message_type=None, enum_type=None, containing_type=None, @@ -1112,8 +1105,8 @@ _CONNECTORCONFIGURATIONMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2564, - serialized_end=2628, + serialized_start=2541, + serialized_end=2614, ) @@ -1151,8 +1144,8 @@ _CONNECTMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2630, - serialized_end=2682, + serialized_start=2616, + serialized_end=2668, ) @@ -1190,8 +1183,8 @@ _TELEMETRYMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2684, - serialized_end=2759, + serialized_start=2670, + serialized_end=2745, ) @@ -1229,8 +1222,8 @@ _ATTRIBUTESMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2761, - serialized_end=2837, + serialized_start=2747, + serialized_end=2823, ) @@ -1268,8 +1261,8 @@ _CLAIMDEVICEMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2839, - serialized_end=2920, + serialized_start=2825, + serialized_end=2906, ) @@ -1300,8 +1293,8 @@ _GATEWAYTELEMETRYMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2922, - serialized_end=2980, + serialized_start=2908, + serialized_end=2966, ) @@ -1332,8 +1325,8 @@ _GATEWAYCLAIMMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=2982, - serialized_end=3038, + serialized_start=2968, + serialized_end=3024, ) @@ -1364,8 +1357,8 @@ _GATEWAYATTRIBUTESMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3040, - serialized_end=3100, + serialized_start=3026, + serialized_end=3086, ) @@ -1410,8 +1403,8 @@ _GATEWAYRPCRESPONSEMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3102, - serialized_end=3171, + serialized_start=3088, + serialized_end=3157, ) @@ -1449,8 +1442,8 @@ _GATEWAYATTRIBUTERESPONSEMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3173, - serialized_end=3278, + serialized_start=3159, + serialized_end=3264, ) @@ -1488,8 +1481,8 @@ _GATEWAYATTRIBUTEUPDATENOTIFICATIONMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3280, - serialized_end=3406, + serialized_start=3266, + serialized_end=3392, ) @@ -1527,8 +1520,8 @@ _GATEWAYDEVICERPCREQUESTMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3408, - serialized_end=3512, + serialized_start=3394, + serialized_end=3498, ) @@ -1580,8 +1573,8 @@ _GATEWAYATTRIBUTESREQUESTMSG = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=3514, - serialized_end=3605, + serialized_start=3500, + serialized_end=3591, ) _RESPONSE.fields_by_name['status'].enum_type = _RESPONSESTATUS @@ -1903,8 +1896,8 @@ _TBGATEWAYPROTOSERVICE = _descriptor.ServiceDescriptor( index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=3762, - serialized_end=3861, + serialized_start=3748, + serialized_end=3847, methods=[ _descriptor.MethodDescriptor( name='stream', diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 13e09e6e..033ad877 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -15,10 +15,11 @@ import logging import logging.config import logging.handlers +from hashlib import md5 from os import execv, listdir, path, pathsep, stat, system from queue import Queue from random import choice -from string import ascii_lowercase +from string import ascii_lowercase, hexdigits from sys import argv, executable, getsizeof from threading import RLock, Thread from time import sleep, time @@ -26,8 +27,10 @@ from time import sleep, time from simplejson import dumps, load, loads from yaml import safe_load -from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager +from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager, RegistrationStatus +from thingsboard_gateway.gateway.grpc_service.grpc_connector import GrpcConnector from thingsboard_gateway.gateway.tb_client import TBClient +from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage from thingsboard_gateway.storage.sqlite.sqlite_event_storage import SQLiteEventStorage @@ -57,6 +60,13 @@ DEFAULT_CONNECTORS = { } +def load_file(path_to_file): + content = None + with open(path_to_file, 'r') as target_file: + content = load(target_file) + return content + + class TBGatewayService: def __init__(self, config_file=None): self.stopped = False @@ -88,7 +98,6 @@ class TBGatewayService: self.name = ''.join(choice(ascii_lowercase) for _ in range(64)) self.__rpc_register_queue = Queue(-1) self.__rpc_requests_in_progress = {} - self.__connected_devices_file = "connected_devices.json" self.tb_client = TBClient(self.__config["thingsboard"], self._config_dir) try: self.tb_client.disconnect() @@ -142,6 +151,12 @@ class TBGatewayService: self.__connected_devices = {} self.__load_persistent_devices() self.__init_remote_configuration() + self.__grpc_config = self.__config.get('grpc') + self.__grpc_manager = None + self.__grpc_connectors = {} + if self.__grpc_config is not None and self.__grpc_config.get("enabled"): + self.__grpc_manager = TBGRPCServerManager(self.__grpc_config) + self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.send_to_storage) self._load_connectors() self._connect_with_connectors() self.__load_persistent_devices() @@ -150,11 +165,6 @@ class TBGatewayService: name="Send data to Thingsboard Thread") self._send_thread.start() self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 500) / 1000.0 - self.__grpc_config = self.__config.get('grpc') - self.__grpc_manager = None - if self.__grpc_config is not None and self.__grpc_config.get("enabled"): - self.__grpc_manager = TBGRPCServerManager(self.__grpc_config) - self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.send_to_storage) log.info("Gateway started.") try: @@ -317,33 +327,61 @@ class TBGatewayService: def __check_shared_attributes(self): self.tb_client.client.request_attributes(callback=self._attributes_parse) - def __register_connector(self, connector_name, connector_key): - # TODO IMPLEMENT - pass + def __register_connector(self, context, connector_key): + if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] not in self.available_connectors: + target_connector = self.__grpc_connectors.get(connector_key) + connector = GrpcConnector(self, target_connector['config'], context) + connector.setName(target_connector['name']) + self.available_connectors[connector.get_name()] = connector + self.__grpc_manager.registration_finished(RegistrationStatus.SUCCESS, context, target_connector) + elif self.__grpc_connectors.get(connector_key) is not None: + self.__grpc_manager.registration_finished(RegistrationStatus.FAILURE, context, None) + log.error("GRPC connector with key: %s - already registered!", connector_key) + else: + self.__grpc_manager.registration_finished(RegistrationStatus.NOT_FOUND, context, None) + log.error("GRPC configuration for connector with key: %s - not found", connector_key) def _load_connectors(self): self.connectors_configs = {} + connectors_persistent_keys = self.__load_persistent_connector_keys() if self.__config.get("connectors"): for connector in self.__config['connectors']: try: - connector_class = TBModuleLoader.import_module(connector["type"], - self._default_connectors.get(connector["type"], - connector.get("class"))) - self._implemented_connectors[connector["type"]] = connector_class + connector_persistent_key = None + if connector['type'] == "grpc" and self.__grpc_manager is None: + log.error("Cannot load connector with name: %s and type grpc. GRPC server is disabled!", connector['name']) + continue + if connector['type'] != "grpc": + connector_class = TBModuleLoader.import_module(connector['type'], + self._default_connectors.get(connector['type'], + connector.get('class'))) + self._implemented_connectors[connector['type']] = connector_class + elif connector['type'] == "grpc": + if connector.get('key') == "auto": + if connectors_persistent_keys and connectors_persistent_keys.get(connector['name']) is not None: + connector_persistent_key = connectors_persistent_keys[connector['name']] + else: + connector_persistent_key = "".join(choice(hexdigits) for _ in range(10)) + connectors_persistent_keys[connector['name']] = connector_persistent_key + else: + connector_persistent_key = connector['key'] + log.info("Connector key for GRPC connector with name [%s] is: [%s]", connector['name'], connector_persistent_key) config_file_path = self._config_dir + connector['configuration'] with open(config_file_path, 'r', encoding="UTF-8") as conf_file: connector_conf = load(conf_file) if not self.connectors_configs.get(connector['type']): self.connectors_configs[connector['type']] = [] - connector_conf["name"] = connector["name"] - self.connectors_configs[connector['type']].append({"name": connector["name"], - "config": {connector[ - 'configuration']: connector_conf}, + if connector['type'] != 'grpc': + connector_conf["name"] = connector['name'] + self.connectors_configs[connector['type']].append({"name": connector['name'], + "config": {connector['configuration']: connector_conf} if connector['type'] != 'grpc' else connector_conf, "config_updated": stat(config_file_path), - "config_file_path": config_file_path}) + "config_file_path": config_file_path, + "grpc_key": connector_persistent_key}) except Exception as e: - log.error("Error on loading connector:") - log.exception(e) + log.exception("Error on loading connector: %r", e) + if connectors_persistent_keys: + self.__save_persistent_keys(connectors_persistent_keys) else: log.error("Connectors - not found! Check your configuration!") self.__init_remote_configuration(force=True) @@ -352,26 +390,26 @@ class TBGatewayService: def _connect_with_connectors(self): for connector_type in self.connectors_configs: for connector_config in self.connectors_configs[connector_type]: - for config in connector_config["config"]: - connector = None - try: - if connector_config["config"][config] is not None: - if self._implemented_connectors[connector_type]: - connector = self._implemented_connectors[connector_type](self, - connector_config["config"][ - config], - connector_type) - connector.setName(connector_config["name"]) - self.available_connectors[connector.get_name()] = connector - connector.open() + if connector_type.lower() != 'grpc': + for config in connector_config["config"]: + connector = None + try: + if connector_config["config"][config] is not None: + if self._implemented_connectors[connector_type]: + connector = self._implemented_connectors[connector_type](self, connector_config["config"][config], connector_type) + connector.setName(connector_config["name"]) + self.available_connectors[connector.get_name()] = connector + connector.open() + else: + log.warning("Connector implementation not found for %s", connector_config["name"]) else: - log.warning("Connector implementation not found for %s", connector_config["name"]) - else: - log.info("Config not found for %s", connector_type) - except Exception as e: - log.exception(e) - if connector is not None: - connector.close() + log.info("Config not found for %s", connector_type) + except Exception as e: + log.exception(e) + if connector is not None: + connector.close() + else: + self.__grpc_connectors.update({connector_config['grpc_key']: connector_config}) def check_connector_configuration_updates(self): configuration_changed = False @@ -757,18 +795,36 @@ class TBGatewayService: def get_devices(self): return self.__connected_devices - def __load_persistent_devices(self): - devices = {} - if self.__connected_devices_file in listdir(self._config_dir) and \ - path.getsize(self._config_dir + self.__connected_devices_file) > 0: + def __load_persistent_connector_keys(self): + persistent_keys = {} + if PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME in listdir(self._config_dir) and \ + path.getsize(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME) > 0: try: - with open(self._config_dir + self.__connected_devices_file) as devices_file: - devices = load(devices_file) + persistent_keys = load_file(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME) + except Exception as e: + log.exception(e) + log.debug("Loaded keys: %s", persistent_keys) + else: + log.debug("Persistent keys file not found") + return persistent_keys + + def __save_persistent_keys(self, persistent_keys): + try: + with open(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME, 'w') as persistent_keys_file: + persistent_keys_file.write(dumps(persistent_keys, indent=2, sort_keys=True)) + except Exception as e: + log.exception(e) + + def __load_persistent_devices(self): + devices = None + if CONNECTED_DEVICES_FILENAME in listdir(self._config_dir) and \ + path.getsize(self._config_dir + CONNECTED_DEVICES_FILENAME) > 0: + try: + devices = load_file(self._config_dir + CONNECTED_DEVICES_FILENAME) except Exception as e: log.exception(e) else: - connected_devices_file = open(self._config_dir + self.__connected_devices_file, 'w') - connected_devices_file.close() + open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w').close() if devices is not None: log.debug("Loaded devices:\n %s", devices) @@ -785,7 +841,7 @@ class TBGatewayService: self.__connected_devices = {} if self.__connected_devices is None else self.__connected_devices def __save_persistent_devices(self): - with open(self._config_dir + self.__connected_devices_file, 'w') as config_file: + with open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w') as config_file: try: data_to_save = {} for device in self.__connected_devices: