mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added registration/unregistration processing
This commit is contained in:
@@ -13,18 +13,20 @@
|
||||
# limitations under the License.
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
import grpc
|
||||
import logging
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from enum import Enum
|
||||
from simplejson import dumps
|
||||
from thingsboard_gateway.gateway.grpc_service.grpc_connector import GrpcConnector
|
||||
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2_grpc import add_TBGatewayProtoServiceServicer_to_server
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import *
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_server import TBGRPCServer
|
||||
|
||||
|
||||
log = logging.getLogger('service')
|
||||
|
||||
|
||||
@@ -41,6 +43,7 @@ class TBGRPCServerManager(Thread):
|
||||
self.setName("TB GRPC manager thread")
|
||||
self.__aio_server: grpc.aio.Server = None
|
||||
self.__register_connector = None
|
||||
self.__unregister_connector = None
|
||||
self.__send_data_to_storage = None
|
||||
self._stopped = False
|
||||
self.__config = config
|
||||
@@ -68,7 +71,7 @@ class TBGRPCServerManager(Thread):
|
||||
if msg.HasField("registerConnectorMsg"):
|
||||
self.__register_connector(context, msg.registerConnectorMsg.connectorKey)
|
||||
if msg.HasField("unregisterConnectorMsg"):
|
||||
pass
|
||||
self.__unregister_connector(context, msg.unregisterConnectorMsg.connectorKey)
|
||||
if msg.HasField("connectMsg"):
|
||||
pass
|
||||
if msg.HasField("disconnectMsg"):
|
||||
@@ -102,8 +105,29 @@ class TBGRPCServerManager(Thread):
|
||||
msg = self.__grpc_server.get_response("FAILURE")
|
||||
self.__grpc_server.write(msg)
|
||||
|
||||
def unregister(self, unregistration_result: RegistrationStatus, context, connector: Optional[GrpcConnector]):
|
||||
if unregistration_result == RegistrationStatus.SUCCESS:
|
||||
connector_name = connector.get_name()
|
||||
connector_session = self.__connectors_sessions.pop(connector_name)
|
||||
msg = self.__grpc_server.get_response("SUCCESS")
|
||||
self.__grpc_server.write(msg)
|
||||
elif unregistration_result == RegistrationStatus.NOT_FOUND:
|
||||
msg = self.__grpc_server.get_response("NOT_FOUND")
|
||||
self.__grpc_server.write(msg)
|
||||
elif unregistration_result == RegistrationStatus.FAILURE:
|
||||
msg = self.__grpc_server.get_response("FAILURE")
|
||||
self.__grpc_server.write(msg)
|
||||
|
||||
async def serve(self):
|
||||
self.__aio_server = grpc.aio.server()
|
||||
self.__aio_server = grpc.aio.server(
|
||||
options=(
|
||||
('grpc.keepalive_time_ms', 10000),
|
||||
('grpc.keepalive_timeout_ms', 5000),
|
||||
('grpc.keepalive_permit_without_calls', True),
|
||||
('grpc.http2.max_pings_without_data', 0),
|
||||
('grpc.http2.min_time_between_pings_ms', 10000),
|
||||
('grpc.http2.min_ping_interval_without_data_ms', 5000),
|
||||
))
|
||||
add_TBGatewayProtoServiceServicer_to_server(self.__grpc_server, self.__aio_server)
|
||||
self.__aio_server.add_insecure_port("[::]:%s" % (self.__grpc_port,))
|
||||
await self.__aio_server.start()
|
||||
@@ -115,9 +139,10 @@ class TBGRPCServerManager(Thread):
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(self.__aio_server.stop(True))
|
||||
|
||||
def set_gateway_read_callbacks(self, register, send_data_to_storage):
|
||||
self.__register_connector = register
|
||||
self.__send_data_to_storage = send_data_to_storage
|
||||
def set_gateway_read_callbacks(self, registeration_cb, unregistration_cb, send_data_cb):
|
||||
self.__register_connector = registeration_cb
|
||||
self.__unregister_connector = unregistration_cb
|
||||
self.__send_data_to_storage = send_data_cb
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -145,7 +145,7 @@ message RegisterConnectorMsg {
|
||||
}
|
||||
|
||||
message UnregisterConnectorMsg {
|
||||
string connectorName = 1;
|
||||
string connectorKey = 1;
|
||||
}
|
||||
|
||||
message ConnectorConfigurationMsg {
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -24,7 +24,7 @@ from sys import argv, executable, getsizeof
|
||||
from threading import RLock, Thread
|
||||
from time import sleep, time
|
||||
|
||||
from simplejson import dumps, load, loads
|
||||
from simplejson import dumps, load, loads, JSONDecodeError
|
||||
from yaml import safe_load
|
||||
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager, RegistrationStatus
|
||||
@@ -156,7 +156,7 @@ class TBGatewayService:
|
||||
self.__grpc_connectors = {}
|
||||
if self.__grpc_config is not None and self.__grpc_config.get("enabled"):
|
||||
self.__grpc_manager = TBGRPCServerManager(self.__grpc_config)
|
||||
self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.send_to_storage)
|
||||
self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.__unregister_connector, self.send_to_storage)
|
||||
self._load_connectors()
|
||||
self._connect_with_connectors()
|
||||
self.__load_persistent_devices()
|
||||
@@ -334,6 +334,7 @@ class TBGatewayService:
|
||||
connector.setName(target_connector['name'])
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
self.__grpc_manager.registration_finished(RegistrationStatus.SUCCESS, context, target_connector)
|
||||
log.info("GRPC connector with key %s registered with name %s", connector_key, connector.get_name())
|
||||
elif self.__grpc_connectors.get(connector_key) is not None:
|
||||
self.__grpc_manager.registration_finished(RegistrationStatus.FAILURE, context, None)
|
||||
log.error("GRPC connector with key: %s - already registered!", connector_key)
|
||||
@@ -341,6 +342,20 @@ class TBGatewayService:
|
||||
self.__grpc_manager.registration_finished(RegistrationStatus.NOT_FOUND, context, None)
|
||||
log.error("GRPC configuration for connector with key: %s - not found", connector_key)
|
||||
|
||||
def __unregister_connector(self, context, connector_key):
|
||||
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] in self.available_connectors:
|
||||
connector_name = self.__grpc_connectors[connector_key]['name']
|
||||
target_connector: GrpcConnector = self.available_connectors.pop(connector_name)
|
||||
target_connector.close()
|
||||
self.__grpc_manager.unregister(RegistrationStatus.SUCCESS, context, target_connector)
|
||||
log.info("GRPC connector with key %s and name %s - unregistered", connector_key, target_connector.get_name())
|
||||
elif self.__grpc_connectors.get(connector_key) is not None:
|
||||
self.__grpc_manager.unregister(RegistrationStatus.NOT_FOUND, context, None)
|
||||
log.error("GRPC connector with key: %s - is not registered!", connector_key)
|
||||
else:
|
||||
self.__grpc_manager.unregister(RegistrationStatus.FAILURE, context, None)
|
||||
log.error("GRPC configuration for connector with key: %s - not found in configuration and not registered", connector_key)
|
||||
|
||||
def _load_connectors(self):
|
||||
self.connectors_configs = {}
|
||||
connectors_persistent_keys = self.__load_persistent_connector_keys()
|
||||
@@ -367,17 +382,26 @@ class TBGatewayService:
|
||||
connector_persistent_key = connector['key']
|
||||
log.info("Connector key for GRPC connector with name [%s] is: [%s]", connector['name'], connector_persistent_key)
|
||||
config_file_path = self._config_dir + connector['configuration']
|
||||
connector_conf_file_data = ''
|
||||
with open(config_file_path, 'r', encoding="UTF-8") as conf_file:
|
||||
connector_conf = load(conf_file)
|
||||
if not self.connectors_configs.get(connector['type']):
|
||||
self.connectors_configs[connector['type']] = []
|
||||
if connector['type'] != 'grpc':
|
||||
connector_conf["name"] = connector['name']
|
||||
self.connectors_configs[connector['type']].append({"name": connector['name'],
|
||||
"config": {connector['configuration']: connector_conf} if connector['type'] != 'grpc' else connector_conf,
|
||||
"config_updated": stat(config_file_path),
|
||||
"config_file_path": config_file_path,
|
||||
"grpc_key": connector_persistent_key})
|
||||
connector_conf_file_data = conf_file.read()
|
||||
|
||||
connector_conf = connector_conf_file_data
|
||||
try:
|
||||
connector_conf = loads(connector_conf_file_data)
|
||||
except JSONDecodeError as e:
|
||||
log.debug(e)
|
||||
log.warning("Cannot parse connector configuration as a JSON, it will be passed as a string.")
|
||||
|
||||
if not self.connectors_configs.get(connector['type']):
|
||||
self.connectors_configs[connector['type']] = []
|
||||
if connector['type'] != 'grpc' and isinstance(connector_conf, dict):
|
||||
connector_conf["name"] = connector['name']
|
||||
self.connectors_configs[connector['type']].append({"name": connector['name'],
|
||||
"config": {connector['configuration']: connector_conf} if connector['type'] != 'grpc' else connector_conf,
|
||||
"config_updated": stat(config_file_path),
|
||||
"config_file_path": config_file_path,
|
||||
"grpc_key": connector_persistent_key})
|
||||
except Exception as e:
|
||||
log.exception("Error on loading connector: %r", e)
|
||||
if connectors_persistent_keys:
|
||||
|
||||
Reference in New Issue
Block a user