mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added connector registration processing and condiguration send logic
This commit is contained in:
@@ -27,6 +27,9 @@ CONFIG_SECTION_PARAMETER = "config"
|
||||
CONFIG_SERVER_SECTION_PARAMETER = "server"
|
||||
CONFIG_DEVICES_SECTION_PARAMETER = "devices"
|
||||
|
||||
CONNECTED_DEVICES_FILENAME = "connected_devices.json"
|
||||
PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME = "persistent_keys.json"
|
||||
|
||||
# Data parameter constants
|
||||
|
||||
DEVICE_SECTION_PARAMETER = "device"
|
||||
|
||||
45
thingsboard_gateway/gateway/grpc_service/grpc_connector.py
Normal file
45
thingsboard_gateway/gateway/grpc_service/grpc_connector.py
Normal file
@@ -0,0 +1,45 @@
|
||||
# Copyright 2021. ThingsBoard
|
||||
# #
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# #
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# #
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from thingsboard_gateway.connectors.connector import Connector
|
||||
|
||||
|
||||
class GrpcConnector(Connector):
|
||||
def __init__(self, gateway, config, context):
|
||||
self.name = None
|
||||
|
||||
def setName(self, name):
|
||||
self.name = name
|
||||
|
||||
def open(self):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
# send unregister
|
||||
pass
|
||||
|
||||
def get_name(self):
|
||||
return self.name
|
||||
|
||||
def is_connected(self):
|
||||
pass
|
||||
|
||||
def on_attributes_update(self, content):
|
||||
# send updated
|
||||
pass
|
||||
|
||||
def server_side_rpc_handler(self, content):
|
||||
# send command
|
||||
pass
|
||||
@@ -15,42 +15,93 @@
|
||||
import asyncio
|
||||
import grpc
|
||||
import logging
|
||||
from threading import Thread
|
||||
from time import sleep
|
||||
from enum import Enum
|
||||
from simplejson import dumps
|
||||
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2_grpc import add_TBGatewayProtoServiceServicer_to_server
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import FromConnectorMessage
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import *
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_server import TBGRPCServer
|
||||
|
||||
|
||||
log = logging.getLogger('service')
|
||||
|
||||
|
||||
class TBGRPCServerManager:
|
||||
class RegistrationStatus(Enum):
|
||||
FAILURE = 1,
|
||||
NOT_FOUND = 2,
|
||||
SUCCESS = 3
|
||||
|
||||
|
||||
class TBGRPCServerManager(Thread):
|
||||
def __init__(self, config):
|
||||
self.__aio_server = None
|
||||
super().__init__()
|
||||
self.daemon = True
|
||||
self.setName("TB GRPC manager thread")
|
||||
self.__aio_server: grpc.aio.Server = None
|
||||
self.__register_connector = None
|
||||
self.__send_data_to_storage = None
|
||||
self._stopped = False
|
||||
self.__config = config
|
||||
self.__grpc_port = config['serverPort']
|
||||
self.__connectors_sessions = {}
|
||||
self.__grpc_server = TBGRPCServer(self.read_cb, self.write_cb)
|
||||
self.__grpc_server = TBGRPCServer(self.read_cb)
|
||||
self.start()
|
||||
|
||||
def run(self):
|
||||
log.info("GRPC server started.")
|
||||
asyncio.run(self.serve(), debug=True)
|
||||
while not self._stopped:
|
||||
sleep(.1)
|
||||
|
||||
def write_cb(self):
|
||||
pass
|
||||
|
||||
def read_cb(self, context, msg:FromConnectorMessage):
|
||||
#TODO parse incoming message
|
||||
self.__send_data_to_storage()
|
||||
self.write("", "")
|
||||
def read_cb(self, context, msg: FromConnectorMessage):
|
||||
log.debug("[GRPC] incoming message: %s", msg)
|
||||
if msg.HasField("response"):
|
||||
pass
|
||||
if msg.HasField("gatewayTelemetryMsg"):
|
||||
pass
|
||||
if msg.HasField("gatewayAttributesMsg"):
|
||||
pass
|
||||
if msg.HasField("gatewayClaimMsg"):
|
||||
pass
|
||||
if msg.HasField("registerConnectorMsg"):
|
||||
self.__register_connector(context, msg.registerConnectorMsg.connectorKey)
|
||||
if msg.HasField("unregisterConnectorMsg"):
|
||||
pass
|
||||
if msg.HasField("connectMsg"):
|
||||
pass
|
||||
if msg.HasField("disconnectMsg"):
|
||||
pass
|
||||
if msg.HasField("gatewayRpcResponseMsg"):
|
||||
pass
|
||||
if msg.HasField("gatewayAttributeRequestMsg"):
|
||||
pass
|
||||
# self.__send_data_to_storage()
|
||||
# self.write("", "")
|
||||
|
||||
def write(self, connector_name, data):
|
||||
# if self.__connectors_sessions.get(connector_name) is not None:
|
||||
log.debug("[GRPC] outgoing message: %s", data)
|
||||
if self.__connectors_sessions.get(connector_name) is not None:
|
||||
self.__grpc_server.write(self.__grpc_server.get_response('SUCCESS'))
|
||||
|
||||
def registration_finished(self, registration_result: RegistrationStatus, context, connector_configuration):
|
||||
if registration_result == RegistrationStatus.SUCCESS:
|
||||
connector_name = connector_configuration['name']
|
||||
self.__connectors_sessions[connector_name] = {"context": context, "config": connector_configuration}
|
||||
msg = self.__grpc_server.get_response("SUCCESS")
|
||||
configuration_msg = ConnectorConfigurationMsg()
|
||||
configuration_msg.connectorName = connector_name
|
||||
configuration_msg.configuration = dumps(connector_configuration['config'])
|
||||
msg.connectorConfigurationMsg.MergeFrom(configuration_msg)
|
||||
self.__grpc_server.write(msg)
|
||||
elif registration_result == RegistrationStatus.NOT_FOUND:
|
||||
msg = self.__grpc_server.get_response("NOT_FOUND")
|
||||
self.__grpc_server.write(msg)
|
||||
elif registration_result == RegistrationStatus.FAILURE:
|
||||
msg = self.__grpc_server.get_response("FAILURE")
|
||||
self.__grpc_server.write(msg)
|
||||
|
||||
async def serve(self):
|
||||
self.__aio_server = grpc.aio.server()
|
||||
add_TBGatewayProtoServiceServicer_to_server(self.__grpc_server, self.__aio_server)
|
||||
@@ -61,7 +112,8 @@ class TBGRPCServerManager:
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
if self.__aio_server is not None:
|
||||
self.__aio_server.stop()
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(self.__aio_server.stop(True))
|
||||
|
||||
def set_gateway_read_callbacks(self, register, send_data_to_storage):
|
||||
self.__register_connector = register
|
||||
|
||||
@@ -8,9 +8,8 @@ import thingsboard_gateway.gateway.proto.messages_pb2_grpc as messages_pb2_grpc
|
||||
|
||||
|
||||
class TBGRPCServer(messages_pb2_grpc.TBGatewayProtoServiceServicer):
|
||||
def __init__(self, read_callback, write_callback):
|
||||
def __init__(self, read_callback):
|
||||
self._read_callback = read_callback
|
||||
self._write_callback = write_callback
|
||||
self.__write_queue = Queue()
|
||||
|
||||
def write(self, msg: FromServiceMessage):
|
||||
|
||||
@@ -141,8 +141,7 @@ message DisconnectMsg {
|
||||
}
|
||||
|
||||
message RegisterConnectorMsg {
|
||||
string connectorName = 1;
|
||||
string connectorKey = 2;
|
||||
string connectorKey = 1;
|
||||
}
|
||||
|
||||
message UnregisterConnectorMsg {
|
||||
@@ -150,7 +149,7 @@ message UnregisterConnectorMsg {
|
||||
}
|
||||
|
||||
message ConnectorConfigurationMsg {
|
||||
string name = 1;
|
||||
string connectorName = 1;
|
||||
string configuration = 2;
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -15,10 +15,11 @@
|
||||
import logging
|
||||
import logging.config
|
||||
import logging.handlers
|
||||
from hashlib import md5
|
||||
from os import execv, listdir, path, pathsep, stat, system
|
||||
from queue import Queue
|
||||
from random import choice
|
||||
from string import ascii_lowercase
|
||||
from string import ascii_lowercase, hexdigits
|
||||
from sys import argv, executable, getsizeof
|
||||
from threading import RLock, Thread
|
||||
from time import sleep, time
|
||||
@@ -26,8 +27,10 @@ from time import sleep, time
|
||||
from simplejson import dumps, load, loads
|
||||
from yaml import safe_load
|
||||
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager, RegistrationStatus
|
||||
from thingsboard_gateway.gateway.grpc_service.grpc_connector import GrpcConnector
|
||||
from thingsboard_gateway.gateway.tb_client import TBClient
|
||||
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME, PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME
|
||||
from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
|
||||
from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage
|
||||
from thingsboard_gateway.storage.sqlite.sqlite_event_storage import SQLiteEventStorage
|
||||
@@ -57,6 +60,13 @@ DEFAULT_CONNECTORS = {
|
||||
}
|
||||
|
||||
|
||||
def load_file(path_to_file):
|
||||
content = None
|
||||
with open(path_to_file, 'r') as target_file:
|
||||
content = load(target_file)
|
||||
return content
|
||||
|
||||
|
||||
class TBGatewayService:
|
||||
def __init__(self, config_file=None):
|
||||
self.stopped = False
|
||||
@@ -88,7 +98,6 @@ class TBGatewayService:
|
||||
self.name = ''.join(choice(ascii_lowercase) for _ in range(64))
|
||||
self.__rpc_register_queue = Queue(-1)
|
||||
self.__rpc_requests_in_progress = {}
|
||||
self.__connected_devices_file = "connected_devices.json"
|
||||
self.tb_client = TBClient(self.__config["thingsboard"], self._config_dir)
|
||||
try:
|
||||
self.tb_client.disconnect()
|
||||
@@ -142,6 +151,12 @@ class TBGatewayService:
|
||||
self.__connected_devices = {}
|
||||
self.__load_persistent_devices()
|
||||
self.__init_remote_configuration()
|
||||
self.__grpc_config = self.__config.get('grpc')
|
||||
self.__grpc_manager = None
|
||||
self.__grpc_connectors = {}
|
||||
if self.__grpc_config is not None and self.__grpc_config.get("enabled"):
|
||||
self.__grpc_manager = TBGRPCServerManager(self.__grpc_config)
|
||||
self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.send_to_storage)
|
||||
self._load_connectors()
|
||||
self._connect_with_connectors()
|
||||
self.__load_persistent_devices()
|
||||
@@ -150,11 +165,6 @@ class TBGatewayService:
|
||||
name="Send data to Thingsboard Thread")
|
||||
self._send_thread.start()
|
||||
self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 500) / 1000.0
|
||||
self.__grpc_config = self.__config.get('grpc')
|
||||
self.__grpc_manager = None
|
||||
if self.__grpc_config is not None and self.__grpc_config.get("enabled"):
|
||||
self.__grpc_manager = TBGRPCServerManager(self.__grpc_config)
|
||||
self.__grpc_manager.set_gateway_read_callbacks(self.__register_connector, self.send_to_storage)
|
||||
log.info("Gateway started.")
|
||||
|
||||
try:
|
||||
@@ -317,33 +327,61 @@ class TBGatewayService:
|
||||
def __check_shared_attributes(self):
|
||||
self.tb_client.client.request_attributes(callback=self._attributes_parse)
|
||||
|
||||
def __register_connector(self, connector_name, connector_key):
|
||||
# TODO IMPLEMENT
|
||||
pass
|
||||
def __register_connector(self, context, connector_key):
|
||||
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] not in self.available_connectors:
|
||||
target_connector = self.__grpc_connectors.get(connector_key)
|
||||
connector = GrpcConnector(self, target_connector['config'], context)
|
||||
connector.setName(target_connector['name'])
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
self.__grpc_manager.registration_finished(RegistrationStatus.SUCCESS, context, target_connector)
|
||||
elif self.__grpc_connectors.get(connector_key) is not None:
|
||||
self.__grpc_manager.registration_finished(RegistrationStatus.FAILURE, context, None)
|
||||
log.error("GRPC connector with key: %s - already registered!", connector_key)
|
||||
else:
|
||||
self.__grpc_manager.registration_finished(RegistrationStatus.NOT_FOUND, context, None)
|
||||
log.error("GRPC configuration for connector with key: %s - not found", connector_key)
|
||||
|
||||
def _load_connectors(self):
|
||||
self.connectors_configs = {}
|
||||
connectors_persistent_keys = self.__load_persistent_connector_keys()
|
||||
if self.__config.get("connectors"):
|
||||
for connector in self.__config['connectors']:
|
||||
try:
|
||||
connector_class = TBModuleLoader.import_module(connector["type"],
|
||||
self._default_connectors.get(connector["type"],
|
||||
connector.get("class")))
|
||||
self._implemented_connectors[connector["type"]] = connector_class
|
||||
connector_persistent_key = None
|
||||
if connector['type'] == "grpc" and self.__grpc_manager is None:
|
||||
log.error("Cannot load connector with name: %s and type grpc. GRPC server is disabled!", connector['name'])
|
||||
continue
|
||||
if connector['type'] != "grpc":
|
||||
connector_class = TBModuleLoader.import_module(connector['type'],
|
||||
self._default_connectors.get(connector['type'],
|
||||
connector.get('class')))
|
||||
self._implemented_connectors[connector['type']] = connector_class
|
||||
elif connector['type'] == "grpc":
|
||||
if connector.get('key') == "auto":
|
||||
if connectors_persistent_keys and connectors_persistent_keys.get(connector['name']) is not None:
|
||||
connector_persistent_key = connectors_persistent_keys[connector['name']]
|
||||
else:
|
||||
connector_persistent_key = "".join(choice(hexdigits) for _ in range(10))
|
||||
connectors_persistent_keys[connector['name']] = connector_persistent_key
|
||||
else:
|
||||
connector_persistent_key = connector['key']
|
||||
log.info("Connector key for GRPC connector with name [%s] is: [%s]", connector['name'], connector_persistent_key)
|
||||
config_file_path = self._config_dir + connector['configuration']
|
||||
with open(config_file_path, 'r', encoding="UTF-8") as conf_file:
|
||||
connector_conf = load(conf_file)
|
||||
if not self.connectors_configs.get(connector['type']):
|
||||
self.connectors_configs[connector['type']] = []
|
||||
connector_conf["name"] = connector["name"]
|
||||
self.connectors_configs[connector['type']].append({"name": connector["name"],
|
||||
"config": {connector[
|
||||
'configuration']: connector_conf},
|
||||
if connector['type'] != 'grpc':
|
||||
connector_conf["name"] = connector['name']
|
||||
self.connectors_configs[connector['type']].append({"name": connector['name'],
|
||||
"config": {connector['configuration']: connector_conf} if connector['type'] != 'grpc' else connector_conf,
|
||||
"config_updated": stat(config_file_path),
|
||||
"config_file_path": config_file_path})
|
||||
"config_file_path": config_file_path,
|
||||
"grpc_key": connector_persistent_key})
|
||||
except Exception as e:
|
||||
log.error("Error on loading connector:")
|
||||
log.exception(e)
|
||||
log.exception("Error on loading connector: %r", e)
|
||||
if connectors_persistent_keys:
|
||||
self.__save_persistent_keys(connectors_persistent_keys)
|
||||
else:
|
||||
log.error("Connectors - not found! Check your configuration!")
|
||||
self.__init_remote_configuration(force=True)
|
||||
@@ -352,26 +390,26 @@ class TBGatewayService:
|
||||
def _connect_with_connectors(self):
|
||||
for connector_type in self.connectors_configs:
|
||||
for connector_config in self.connectors_configs[connector_type]:
|
||||
for config in connector_config["config"]:
|
||||
connector = None
|
||||
try:
|
||||
if connector_config["config"][config] is not None:
|
||||
if self._implemented_connectors[connector_type]:
|
||||
connector = self._implemented_connectors[connector_type](self,
|
||||
connector_config["config"][
|
||||
config],
|
||||
connector_type)
|
||||
connector.setName(connector_config["name"])
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
connector.open()
|
||||
if connector_type.lower() != 'grpc':
|
||||
for config in connector_config["config"]:
|
||||
connector = None
|
||||
try:
|
||||
if connector_config["config"][config] is not None:
|
||||
if self._implemented_connectors[connector_type]:
|
||||
connector = self._implemented_connectors[connector_type](self, connector_config["config"][config], connector_type)
|
||||
connector.setName(connector_config["name"])
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
connector.open()
|
||||
else:
|
||||
log.warning("Connector implementation not found for %s", connector_config["name"])
|
||||
else:
|
||||
log.warning("Connector implementation not found for %s", connector_config["name"])
|
||||
else:
|
||||
log.info("Config not found for %s", connector_type)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
if connector is not None:
|
||||
connector.close()
|
||||
log.info("Config not found for %s", connector_type)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
if connector is not None:
|
||||
connector.close()
|
||||
else:
|
||||
self.__grpc_connectors.update({connector_config['grpc_key']: connector_config})
|
||||
|
||||
def check_connector_configuration_updates(self):
|
||||
configuration_changed = False
|
||||
@@ -757,18 +795,36 @@ class TBGatewayService:
|
||||
def get_devices(self):
|
||||
return self.__connected_devices
|
||||
|
||||
def __load_persistent_devices(self):
|
||||
devices = {}
|
||||
if self.__connected_devices_file in listdir(self._config_dir) and \
|
||||
path.getsize(self._config_dir + self.__connected_devices_file) > 0:
|
||||
def __load_persistent_connector_keys(self):
|
||||
persistent_keys = {}
|
||||
if PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME in listdir(self._config_dir) and \
|
||||
path.getsize(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME) > 0:
|
||||
try:
|
||||
with open(self._config_dir + self.__connected_devices_file) as devices_file:
|
||||
devices = load(devices_file)
|
||||
persistent_keys = load_file(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
log.debug("Loaded keys: %s", persistent_keys)
|
||||
else:
|
||||
log.debug("Persistent keys file not found")
|
||||
return persistent_keys
|
||||
|
||||
def __save_persistent_keys(self, persistent_keys):
|
||||
try:
|
||||
with open(self._config_dir + PERSISTENT_GRPC_CONNECTORS_KEY_FILENAME, 'w') as persistent_keys_file:
|
||||
persistent_keys_file.write(dumps(persistent_keys, indent=2, sort_keys=True))
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
|
||||
def __load_persistent_devices(self):
|
||||
devices = None
|
||||
if CONNECTED_DEVICES_FILENAME in listdir(self._config_dir) and \
|
||||
path.getsize(self._config_dir + CONNECTED_DEVICES_FILENAME) > 0:
|
||||
try:
|
||||
devices = load_file(self._config_dir + CONNECTED_DEVICES_FILENAME)
|
||||
except Exception as e:
|
||||
log.exception(e)
|
||||
else:
|
||||
connected_devices_file = open(self._config_dir + self.__connected_devices_file, 'w')
|
||||
connected_devices_file.close()
|
||||
open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w').close()
|
||||
|
||||
if devices is not None:
|
||||
log.debug("Loaded devices:\n %s", devices)
|
||||
@@ -785,7 +841,7 @@ class TBGatewayService:
|
||||
self.__connected_devices = {} if self.__connected_devices is None else self.__connected_devices
|
||||
|
||||
def __save_persistent_devices(self):
|
||||
with open(self._config_dir + self.__connected_devices_file, 'w') as config_file:
|
||||
with open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w') as config_file:
|
||||
try:
|
||||
data_to_save = {}
|
||||
for device in self.__connected_devices:
|
||||
|
||||
Reference in New Issue
Block a user