mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Improvements
This commit is contained in:
@@ -14,31 +14,38 @@
|
||||
|
||||
import asyncio
|
||||
import grpc
|
||||
import logging
|
||||
from time import sleep
|
||||
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2_grpc import add_TBGatewayProtoServiceServicer_to_server
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import FromConnectorMessage
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_server import TBGRPCServer
|
||||
|
||||
|
||||
log = logging.getLogger('service')
|
||||
|
||||
|
||||
class TBGRPCServerManager:
|
||||
def __init__(self, config):
|
||||
self.__aio_server = None
|
||||
self.__register_connector = None
|
||||
self.__send_data_to_storage = None
|
||||
self.stopped = False
|
||||
self._stopped = False
|
||||
self.__config = config
|
||||
self.__grpc_port = config['serverPort']
|
||||
self.__connectors_sessions = {}
|
||||
self.__grpc_server = TBGRPCServer(self.read_cb, self.write_cb)
|
||||
asyncio.run(self.serve(), debug=True)
|
||||
while not self.stopped:
|
||||
while not self._stopped:
|
||||
sleep(.1)
|
||||
|
||||
def write_cb(self):
|
||||
pass
|
||||
|
||||
def read_cb(self, msg):
|
||||
def read_cb(self, context, msg:FromConnectorMessage):
|
||||
#TODO parse incoming message
|
||||
self.__send_data_to_storage()
|
||||
self.write("", "")
|
||||
pass
|
||||
|
||||
def write(self, connector_name, data):
|
||||
# if self.__connectors_sessions.get(connector_name) is not None:
|
||||
@@ -52,8 +59,9 @@ class TBGRPCServerManager:
|
||||
await self.__aio_server.wait_for_termination()
|
||||
|
||||
def stop(self):
|
||||
self.stopped = True
|
||||
self.__aio_server.stop()
|
||||
self._stopped = True
|
||||
if self.__aio_server is not None:
|
||||
self.__aio_server.stop()
|
||||
|
||||
def set_gateway_read_callbacks(self, register, send_data_to_storage):
|
||||
self.__register_connector = register
|
||||
|
||||
@@ -33,7 +33,6 @@ class TBGRPCServer(messages_pb2_grpc.TBGatewayProtoServiceServicer):
|
||||
read_task = asyncio.create_task(self.__read_task(context, request_iterator))
|
||||
await read_task
|
||||
|
||||
@classmethod
|
||||
@staticmethod
|
||||
def get_response(name):
|
||||
msg = FromServiceMessage()
|
||||
|
||||
@@ -247,6 +247,7 @@ class TBGatewayService:
|
||||
self.stopped = True
|
||||
self.__updater.stop()
|
||||
log.info("Stopping...")
|
||||
self.__grpc_manager.stop()
|
||||
self.__close_connectors()
|
||||
self._event_storage.stop()
|
||||
log.info("The gateway has been stopped.")
|
||||
|
||||
Reference in New Issue
Block a user