mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Changed processing from stream-stream model to unary-unary in order to improve performance, like it is described in Best GRPC practics for Python
This commit is contained in:
@@ -28,16 +28,3 @@ class DownlinkMessageType(Enum):
|
||||
GatewayAttributeResponseMsg = 3,
|
||||
GatewayDeviceRpcRequestMsg = 4,
|
||||
UnregisterConnectorMsg = 5
|
||||
|
||||
|
||||
class UplinkMessageType(Enum):
|
||||
Response = 0,
|
||||
GatewayTelemetryMsg = 1,
|
||||
GatewayAttributesMsg = 2,
|
||||
GatewayClaimMsg = 3,
|
||||
RegisterConnectorMsg = 4,
|
||||
UnregisterConnectorMsg = 5,
|
||||
ConnectMsg = 6,
|
||||
DisconnectMsg = 7,
|
||||
GatewayRpcResponseMsg = 8,
|
||||
GatewayAttributesRequestMsg = 9
|
||||
|
||||
@@ -18,7 +18,6 @@ from thingsboard_gateway.gateway.proto.messages_pb2 import *
|
||||
|
||||
|
||||
class GrpcDownlinkConverter(Converter):
|
||||
|
||||
def __init__(self):
|
||||
self.__conversion_methods = {
|
||||
DownlinkMessageType.Response: self.__convert_response_msg,
|
||||
@@ -32,37 +31,46 @@ class GrpcDownlinkConverter(Converter):
|
||||
def convert(self, config, msg):
|
||||
try:
|
||||
basic_msg = FromServiceMessage()
|
||||
if not isinstance(config, list):
|
||||
config = [config]
|
||||
for conf in config:
|
||||
self.__conversion_methods[conf](basic_msg, msg)
|
||||
message_types = config.get("message_type")
|
||||
additional_message = config.get("additional_message")
|
||||
if not isinstance(message_types, list):
|
||||
message_types = [message_types]
|
||||
for message_type in message_types:
|
||||
self.__conversion_methods[message_type](basic_msg, msg, additional_message)
|
||||
return basic_msg
|
||||
except Exception as e:
|
||||
log.exception("[GRPC] ", e)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def __convert_response_msg(basic_msg, msg):
|
||||
def __convert_response_msg(basic_msg, msg, additional_message):
|
||||
if additional_message is not None:
|
||||
if additional_message.HasField('gatewayTelemetryMsg'):
|
||||
additional_message.gatewayTelemetryMsg.MergeFrom(GatewayTelemetryMsg())
|
||||
elif additional_message.HasField("gatewayAttributesMsg"):
|
||||
additional_message.gatewayTelemetryMsg.MergeFrom(GatewayTelemetryMsg())
|
||||
else:
|
||||
basic_msg.response.connectorMessage.MergeFrom(additional_message)
|
||||
basic_msg.response.status = ResponseStatus.Value(msg.name)
|
||||
|
||||
@staticmethod
|
||||
def __convert_connector_configuration_msg(basic_msg, msg):
|
||||
def __convert_connector_configuration_msg(basic_msg, msg, additional_data=None):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __convert_gateway_attribute_update_notification_msg(basic_msg, msg):
|
||||
def __convert_gateway_attribute_update_notification_msg(basic_msg, msg, additional_data=None):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __convert_gateway_attribute_response_msg(basic_msg, msg):
|
||||
def __convert_gateway_attribute_response_msg(basic_msg, msg, additional_data=None):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __convert_gateway_device_rpc_request_msg(basic_msg, msg):
|
||||
def __convert_gateway_device_rpc_request_msg(basic_msg, msg, additional_data=None):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __convert_unregister_connector_msg(basic_msg, msg):
|
||||
def __convert_unregister_connector_msg(basic_msg, msg, additional_data=None):
|
||||
if msg is None:
|
||||
msg = b''
|
||||
unreg_msg = UnregisterConnectorMsg()
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
# limitations under the License.
|
||||
|
||||
from thingsboard_gateway.connectors.converter import Converter, log
|
||||
from thingsboard_gateway.gateway.constant_enums import UplinkMessageType
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import ConnectMsg, DisconnectMsg, GatewayAttributesMsg, GatewayAttributesRequestMsg, GatewayClaimMsg, \
|
||||
GatewayRpcResponseMsg, GatewayTelemetryMsg, KeyValueProto, KeyValueType, Response
|
||||
|
||||
@@ -21,22 +20,22 @@ from thingsboard_gateway.gateway.proto.messages_pb2 import ConnectMsg, Disconnec
|
||||
class GrpcUplinkConverter(Converter):
|
||||
def __init__(self):
|
||||
self.__conversion_methods = {
|
||||
UplinkMessageType.Response: self.__convert_response_msg,
|
||||
UplinkMessageType.GatewayTelemetryMsg: self.__convert_gateway_telemetry_msg,
|
||||
UplinkMessageType.GatewayAttributesMsg: self.__convert_gateway_attributes_msg,
|
||||
UplinkMessageType.GatewayClaimMsg: self.__convert_gateway_claim_msg,
|
||||
UplinkMessageType.ConnectMsg: self.__convert_connect_msg,
|
||||
UplinkMessageType.DisconnectMsg: self.__convert_disconnect_msg,
|
||||
UplinkMessageType.GatewayRpcResponseMsg: self.__convert_gateway_rpc_response_msg,
|
||||
UplinkMessageType.GatewayAttributesRequestMsg: self.__convert_gateway_attributes_request_msg
|
||||
Response.DESCRIPTOR: self.__convert_response_msg,
|
||||
GatewayTelemetryMsg.DESCRIPTOR: self.__convert_gateway_telemetry_msg,
|
||||
GatewayAttributesMsg.DESCRIPTOR: self.__convert_gateway_attributes_msg,
|
||||
GatewayClaimMsg.DESCRIPTOR: self.__convert_gateway_claim_msg,
|
||||
ConnectMsg.DESCRIPTOR: self.__convert_connect_msg,
|
||||
DisconnectMsg.DESCRIPTOR: self.__convert_disconnect_msg,
|
||||
GatewayRpcResponseMsg.DESCRIPTOR: self.__convert_gateway_rpc_response_msg,
|
||||
GatewayAttributesRequestMsg.DESCRIPTOR: self.__convert_gateway_attributes_request_msg
|
||||
}
|
||||
|
||||
def convert(self, config, data):
|
||||
try:
|
||||
if self.__conversion_methods.get(config) is not None:
|
||||
return self.__conversion_methods[config](data)
|
||||
if self.__conversion_methods.get(data.DESCRIPTOR) is not None:
|
||||
return self.__conversion_methods[data.DESCRIPTOR](data)
|
||||
else:
|
||||
log.error("[GRPC] unknown uplink message type: %r", config)
|
||||
log.error("[GRPC] unknown uplink message descriptor: %r", data.DESCRIPTOR)
|
||||
return {}
|
||||
except Exception as e:
|
||||
log.exception("[GRPC] ", e)
|
||||
|
||||
@@ -21,7 +21,7 @@ from time import sleep
|
||||
import grpc
|
||||
from simplejson import dumps
|
||||
|
||||
from thingsboard_gateway.gateway.constant_enums import DownlinkMessageType, UplinkMessageType
|
||||
from thingsboard_gateway.gateway.constant_enums import DownlinkMessageType
|
||||
from thingsboard_gateway.gateway.grpc_service.grpc_downlink_converter import GrpcDownlinkConverter
|
||||
from thingsboard_gateway.gateway.grpc_service.grpc_uplink_converter import GrpcUplinkConverter
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_server import TBGRPCServer
|
||||
@@ -63,102 +63,110 @@ class TBGRPCServerManager(Thread):
|
||||
while not self._stopped:
|
||||
sleep(.01)
|
||||
|
||||
def incoming_messages_cb(self, context, msg: FromConnectorMessage):
|
||||
log.debug("Connected client with peer address: %s", context.peer())
|
||||
if context.peer() not in self.sessions:
|
||||
self.sessions[context.peer()] = {"context": context}
|
||||
else:
|
||||
log.debug("Existing client context is: %s", self.sessions[context.peer()])
|
||||
def incoming_messages_cb(self, session_id, msg: FromConnectorMessage):
|
||||
log.debug("Connected client with identifier: %s", session_id)
|
||||
# if session_id not in self.sessions:
|
||||
# self.sessions[session_id] = {"context": context}
|
||||
# else:
|
||||
# log.debug("Existing client context is: %s", self.sessions[session_id])
|
||||
# self.sessions[session_id]["context"] = context
|
||||
log.debug("[GRPC] incoming message: %s", msg)
|
||||
try:
|
||||
outgoing_message = None
|
||||
downlink_converter_config = {"message_type": [DownlinkMessageType.Response], "additional_message": msg}
|
||||
if msg.HasField("registerConnectorMsg"):
|
||||
self.__register_connector(context, msg.registerConnectorMsg.connectorKey)
|
||||
self.__register_connector(session_id, msg.registerConnectorMsg.connectorKey)
|
||||
outgoing_message = True
|
||||
elif msg.HasField("unregisterConnectorMsg"):
|
||||
self.__unregister_connector(context, msg.unregisterConnectorMsg.connectorKey)
|
||||
self.__unregister_connector(session_id, msg.unregisterConnectorMsg.connectorKey)
|
||||
outgoing_message = True
|
||||
elif self.sessions[context.peer()].get('name') is not None:
|
||||
elif self.sessions.get(session_id) is not None and self.sessions[session_id].get('name') is not None:
|
||||
if msg.HasField("response"):
|
||||
pass
|
||||
if msg.response.ByteSize() == 0:
|
||||
outgoing_message = True
|
||||
if msg.HasField("gatewayTelemetryMsg"):
|
||||
data = self.__uplink_converter.convert(UplinkMessageType.GatewayTelemetryMsg, msg.gatewayTelemetryMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[context.peer()]['name'], data)
|
||||
outgoing_message = self.__downlink_converter.convert([DownlinkMessageType.Response], result_status)
|
||||
data = self.__uplink_converter.convert(None, msg.gatewayTelemetryMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
|
||||
outgoing_message = True
|
||||
if msg.HasField("gatewayAttributesMsg"):
|
||||
data = self.__uplink_converter.convert(UplinkMessageType.GatewayAttributesMsg, msg.gatewayAttributesMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[context.peer()]['name'], data)
|
||||
outgoing_message = self.__downlink_converter.convert([DownlinkMessageType.Response], result_status)
|
||||
data = self.__uplink_converter.convert(None, msg.gatewayAttributesMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
|
||||
outgoing_message = True
|
||||
if msg.HasField("gatewayClaimMsg"):
|
||||
data = self.__uplink_converter.convert(UplinkMessageType.GatewayClaimMsg, msg.gatewayAttributesMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[context.peer()]['name'], data)
|
||||
outgoing_message = self.__downlink_converter.convert([DownlinkMessageType.Response], result_status)
|
||||
message_for_conversion = msg.gatewayClaimMsg
|
||||
data = self.__uplink_converter.convert(None, message_for_conversion)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
|
||||
if msg.HasField("connectMsg"):
|
||||
data = self.__uplink_converter.convert(UplinkMessageType.ConnectMsg, msg.connectMsg)
|
||||
data['name'] = self.sessions[context.peer()]['name']
|
||||
message_for_conversion = msg.connectMsg
|
||||
data = self.__uplink_converter.convert(None, message_for_conversion)
|
||||
data['name'] = self.sessions[session_id]['name']
|
||||
result_status = self.__gateway.add_device_async(data)
|
||||
outgoing_message = self.__downlink_converter.convert([DownlinkMessageType.Response], result_status)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
|
||||
if msg.HasField("disconnectMsg"):
|
||||
data = self.__uplink_converter.convert(UplinkMessageType.DisconnectMsg, msg.connectMsg)
|
||||
data['name'] = self.sessions[context.peer()]['name']
|
||||
message_for_conversion = msg.disconnectMsg
|
||||
data = self.__uplink_converter.convert(None, message_for_conversion)
|
||||
data['name'] = self.sessions[session_id]['name']
|
||||
result_status = self.__gateway.del_device_async(data)
|
||||
outgoing_message = self.__downlink_converter.convert([DownlinkMessageType.Response], result_status)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
|
||||
if msg.HasField("gatewayRpcResponseMsg"):
|
||||
pass
|
||||
if msg.HasField("gatewayAttributeRequestMsg"):
|
||||
pass
|
||||
else:
|
||||
outgoing_message = self.__downlink_converter.convert([DownlinkMessageType.UnregisterConnectorMsg], None)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, Status.FAILURE)
|
||||
if outgoing_message is None:
|
||||
log.debug("Cannot convert outgoing message!")
|
||||
elif isinstance(outgoing_message, FromServiceMessage):
|
||||
self.__grpc_server.write(context, outgoing_message)
|
||||
self.__grpc_server.write(session_id, outgoing_message)
|
||||
except ValueError as e:
|
||||
log.error("Received unknown GRPC message!", e)
|
||||
|
||||
def write(self, connector_name, msg: FromServiceMessage):
|
||||
log.debug("[GRPC] outgoing message: %s", msg)
|
||||
grpc_client_peer = self.__connectors_sessions.get(connector_name)
|
||||
session_id = self.__connectors_sessions.get(connector_name)
|
||||
|
||||
if grpc_client_peer is not None:
|
||||
grpc_session = self.sessions[grpc_client_peer]
|
||||
self.__grpc_server.write(grpc_session['context'], msg)
|
||||
if session_id is not None:
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
else:
|
||||
log.warning("Cannot write to connector with name %s, session is not found. Client is not registered!", connector_name)
|
||||
|
||||
def registration_finished(self, registration_result: Status, context, connector_configuration):
|
||||
def registration_finished(self, registration_result: Status, session_id, connector_configuration):
|
||||
additional_message = FromConnectorMessage()
|
||||
additional_message.registerConnectorMsg.MergeFrom(RegisterConnectorMsg())
|
||||
if registration_result == Status.SUCCESS:
|
||||
connector_name = connector_configuration['name']
|
||||
self.sessions[context.peer()].update({"config": connector_configuration,
|
||||
"name": connector_name})
|
||||
self.__connectors_sessions[connector_name] = context.peer()
|
||||
msg = self.__grpc_server.get_response("SUCCESS")
|
||||
self.sessions[session_id] = {"config": connector_configuration, "name": connector_name}
|
||||
self.__connectors_sessions[connector_name] = session_id
|
||||
msg = self.__grpc_server.get_response("SUCCESS", additional_message)
|
||||
configuration_msg = ConnectorConfigurationMsg()
|
||||
configuration_msg.connectorName = connector_name
|
||||
configuration_msg.configuration = dumps(connector_configuration['config'])
|
||||
msg.connectorConfigurationMsg.MergeFrom(configuration_msg)
|
||||
self.__grpc_server.write(context, msg)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
log.debug('Connector "%s" configuration sent!', connector_name)
|
||||
elif registration_result == Status.NOT_FOUND:
|
||||
msg = self.__grpc_server.get_response("NOT_FOUND")
|
||||
self.__grpc_server.write(context, msg)
|
||||
msg = self.__grpc_server.get_response("NOT_FOUND", additional_message)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
elif registration_result == Status.FAILURE:
|
||||
msg = self.__grpc_server.get_response("FAILURE")
|
||||
self.__grpc_server.write(context, msg)
|
||||
msg = self.__grpc_server.get_response("FAILURE", additional_message)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
|
||||
def unregister(self, unregistration_result: Status, context, connector):
|
||||
def unregister(self, unregistration_result: Status, session_id, connector):
|
||||
additional_message = FromConnectorMessage()
|
||||
additional_message.unregisterConnectorMsg.MergeFrom(UnregisterConnectorMsg())
|
||||
if unregistration_result == Status.SUCCESS:
|
||||
connector_name = connector.get_name()
|
||||
grpc_client_peer = self.__connectors_sessions.pop(connector_name)
|
||||
connector_session = self.sessions.pop(grpc_client_peer)
|
||||
msg = self.__grpc_server.get_response("SUCCESS")
|
||||
self.__grpc_server.write(context, msg)
|
||||
connector_session_id = self.__connectors_sessions.pop(connector_name)
|
||||
del self.sessions[connector_session_id]
|
||||
msg = self.__grpc_server.get_response("SUCCESS", additional_message)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
elif unregistration_result == Status.NOT_FOUND:
|
||||
msg = self.__grpc_server.get_response("NOT_FOUND")
|
||||
self.__grpc_server.write(context, msg)
|
||||
msg = self.__grpc_server.get_response("NOT_FOUND", additional_message)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
elif unregistration_result == Status.FAILURE:
|
||||
msg = self.__grpc_server.get_response("FAILURE")
|
||||
self.__grpc_server.write(context, msg)
|
||||
msg = self.__grpc_server.get_response("FAILURE", additional_message)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
|
||||
async def serve(self, config):
|
||||
self.__aio_server = grpc.aio.server(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import asyncio
|
||||
from threading import Thread
|
||||
from queue import SimpleQueue
|
||||
from threading import RLock, Thread
|
||||
|
||||
import thingsboard_gateway.gateway.proto.messages_pb2_grpc as messages_pb2_grpc
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import *
|
||||
@@ -9,40 +8,53 @@ from thingsboard_gateway.gateway.proto.messages_pb2 import *
|
||||
class TBGRPCServer(messages_pb2_grpc.TBGatewayProtoServiceServicer):
|
||||
def __init__(self, read_callback):
|
||||
self._read_callback = read_callback
|
||||
self.__write_queue = SimpleQueue()
|
||||
self.__writing_queue_creation_lock = RLock()
|
||||
self.__writing_queues = {}
|
||||
self.__read_queue = SimpleQueue()
|
||||
self.__processing_thread = Thread(target=self.__processing_read, daemon=True, name="Read processing thread")
|
||||
self.__processing_thread.start()
|
||||
|
||||
def write(self, context, msg: FromServiceMessage):
|
||||
self.__write_queue.put(msg, True, 10)
|
||||
def write(self, session_id, msg: FromServiceMessage):
|
||||
if session_id not in self.__writing_queues:
|
||||
with self.__writing_queue_creation_lock:
|
||||
self.__writing_queues[session_id] = SimpleQueue()
|
||||
self.__writing_queues[session_id].put(msg, True, 10)
|
||||
|
||||
async def __read_task(self, context, request_iter):
|
||||
async for msg in request_iter:
|
||||
self._read_callback(context, msg)
|
||||
def stream(self, request, context):
|
||||
session_id = self.get_session_id(context)
|
||||
self.__read_queue.put((session_id, request))
|
||||
data_to_send = None
|
||||
if self.__writing_queues.get(session_id) is not None and not self.__writing_queues[session_id].empty():
|
||||
data_to_send = self.__writing_queues[session_id].get_nowait()
|
||||
if self.__writing_queues[session_id].qsize() == 0: # TODO Check writing and removing at the same time
|
||||
with self.__writing_queue_creation_lock:
|
||||
del self.__writing_queues[session_id]
|
||||
if data_to_send is None:
|
||||
basic_msg = FromServiceMessage()
|
||||
basic_msg.response.MergeFrom(Response())
|
||||
data_to_send = basic_msg
|
||||
return data_to_send
|
||||
|
||||
def __processing_read(self):
|
||||
while True:
|
||||
if not self.__read_queue.empty():
|
||||
context, request = self.__read_queue.get()
|
||||
self._read_callback(context, request)
|
||||
|
||||
@staticmethod
|
||||
async def __write_task(context, msg: FromServiceMessage):
|
||||
await context.write(msg)
|
||||
|
||||
async def stream(self,
|
||||
request_iterator,
|
||||
context,
|
||||
options=(),
|
||||
channel_credentials=None,
|
||||
call_credentials=None,
|
||||
insecure=False,
|
||||
compression=None,
|
||||
wait_for_ready=None,
|
||||
timeout=None,
|
||||
metadata=None):
|
||||
if not self.__write_queue.empty():
|
||||
data_to_send = self.__write_queue.get_nowait()
|
||||
write_task = asyncio.create_task(self.__write_task(context, data_to_send))
|
||||
await write_task
|
||||
else:
|
||||
read_task = asyncio.create_task(self.__read_task(context, request_iterator))
|
||||
await read_task
|
||||
|
||||
@staticmethod
|
||||
def get_response(name):
|
||||
def get_response(status, connector_message):
|
||||
msg = FromServiceMessage()
|
||||
msg.response.status = ResponseStatus.Value(name)
|
||||
if isinstance(status, str):
|
||||
msg.response.status = ResponseStatus.Value(status)
|
||||
else:
|
||||
msg.response.status = status
|
||||
if connector_message is not None:
|
||||
msg.response.connectorMessage.MergeFrom(connector_message)
|
||||
return msg
|
||||
|
||||
@staticmethod
|
||||
def get_session_id(context):
|
||||
client_metadata = context.invocation_metadata()
|
||||
for key, value in client_metadata:
|
||||
if key == "identifier":
|
||||
return value
|
||||
|
||||
@@ -5,10 +5,12 @@ package messages;
|
||||
|
||||
message Response {
|
||||
ResponseStatus status = 1;
|
||||
FromServiceMessage serviceMessage = 2;
|
||||
FromConnectorMessage connectorMessage = 3;
|
||||
}
|
||||
|
||||
service TBGatewayProtoService {
|
||||
rpc stream(stream FromConnectorMessage) returns (stream FromServiceMessage);
|
||||
rpc stream(FromConnectorMessage) returns (FromServiceMessage);
|
||||
}
|
||||
|
||||
message FromConnectorMessage {
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -14,7 +14,7 @@ class TBGatewayProtoServiceStub(object):
|
||||
Args:
|
||||
channel: A grpc.Channel.
|
||||
"""
|
||||
self.stream = channel.stream_stream(
|
||||
self.stream = channel.unary_unary(
|
||||
'/messages.TBGatewayProtoService/stream',
|
||||
request_serializer=messages__pb2.FromConnectorMessage.SerializeToString,
|
||||
response_deserializer=messages__pb2.FromServiceMessage.FromString,
|
||||
@@ -24,7 +24,7 @@ class TBGatewayProtoServiceStub(object):
|
||||
class TBGatewayProtoServiceServicer(object):
|
||||
"""Missing associated documentation comment in .proto file."""
|
||||
|
||||
def stream(self, request_iterator, context):
|
||||
def stream(self, request, context):
|
||||
"""Missing associated documentation comment in .proto file."""
|
||||
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
|
||||
context.set_details('Method not implemented!')
|
||||
@@ -33,7 +33,7 @@ class TBGatewayProtoServiceServicer(object):
|
||||
|
||||
def add_TBGatewayProtoServiceServicer_to_server(servicer, server):
|
||||
rpc_method_handlers = {
|
||||
'stream': grpc.stream_stream_rpc_method_handler(
|
||||
'stream': grpc.unary_unary_rpc_method_handler(
|
||||
servicer.stream,
|
||||
request_deserializer=messages__pb2.FromConnectorMessage.FromString,
|
||||
response_serializer=messages__pb2.FromServiceMessage.SerializeToString,
|
||||
@@ -49,7 +49,7 @@ class TBGatewayProtoService(object):
|
||||
"""Missing associated documentation comment in .proto file."""
|
||||
|
||||
@staticmethod
|
||||
def stream(request_iterator,
|
||||
def stream(request,
|
||||
target,
|
||||
options=(),
|
||||
channel_credentials=None,
|
||||
@@ -59,7 +59,7 @@ class TBGatewayProtoService(object):
|
||||
wait_for_ready=None,
|
||||
timeout=None,
|
||||
metadata=None):
|
||||
return grpc.experimental.stream_stream(request_iterator, target, '/messages.TBGatewayProtoService/stream',
|
||||
return grpc.experimental.unary_unary(request, target, '/messages.TBGatewayProtoService/stream',
|
||||
messages__pb2.FromConnectorMessage.SerializeToString,
|
||||
messages__pb2.FromServiceMessage.FromString,
|
||||
options, channel_credentials,
|
||||
|
||||
@@ -332,33 +332,33 @@ class TBGatewayService:
|
||||
def __check_shared_attributes(self):
|
||||
self.tb_client.client.request_attributes(callback=self._attributes_parse)
|
||||
|
||||
def __register_connector(self, context, connector_key):
|
||||
def __register_connector(self, session_id, connector_key):
|
||||
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] not in self.available_connectors:
|
||||
target_connector = self.__grpc_connectors.get(connector_key)
|
||||
connector = GrpcConnector(self, target_connector['config'], self.__grpc_manager)
|
||||
connector.setName(target_connector['name'])
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
self.__grpc_manager.registration_finished(Status.SUCCESS, context, target_connector)
|
||||
self.__grpc_manager.registration_finished(Status.SUCCESS, session_id, target_connector)
|
||||
log.info("GRPC connector with key %s registered with name %s", connector_key, connector.get_name())
|
||||
elif self.__grpc_connectors.get(connector_key) is not None:
|
||||
self.__grpc_manager.registration_finished(Status.FAILURE, context, None)
|
||||
self.__grpc_manager.registration_finished(Status.FAILURE, session_id, None)
|
||||
log.error("GRPC connector with key: %s - already registered!", connector_key)
|
||||
else:
|
||||
self.__grpc_manager.registration_finished(Status.NOT_FOUND, context, None)
|
||||
self.__grpc_manager.registration_finished(Status.NOT_FOUND, session_id, None)
|
||||
log.error("GRPC configuration for connector with key: %s - not found", connector_key)
|
||||
|
||||
def __unregister_connector(self, context, connector_key):
|
||||
def __unregister_connector(self, session_id, connector_key):
|
||||
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] in self.available_connectors:
|
||||
connector_name = self.__grpc_connectors[connector_key]['name']
|
||||
target_connector: GrpcConnector = self.available_connectors.pop(connector_name)
|
||||
target_connector.close()
|
||||
self.__grpc_manager.unregister(Status.SUCCESS, context, target_connector)
|
||||
self.__grpc_manager.unregister(Status.SUCCESS, session_id, target_connector)
|
||||
log.info("GRPC connector with key %s and name %s - unregistered", connector_key, target_connector.get_name())
|
||||
elif self.__grpc_connectors.get(connector_key) is not None:
|
||||
self.__grpc_manager.unregister(Status.NOT_FOUND, context, None)
|
||||
self.__grpc_manager.unregister(Status.NOT_FOUND, session_id, None)
|
||||
log.error("GRPC connector with key: %s - is not registered!", connector_key)
|
||||
else:
|
||||
self.__grpc_manager.unregister(Status.FAILURE, context, None)
|
||||
self.__grpc_manager.unregister(Status.FAILURE, session_id, None)
|
||||
log.error("GRPC configuration for connector with key: %s - not found in configuration and not registered", connector_key)
|
||||
|
||||
def _load_connectors(self):
|
||||
@@ -471,13 +471,14 @@ class TBGatewayService:
|
||||
data_array = event if isinstance(event, list) else [event]
|
||||
for data in data_array:
|
||||
if not connector_name == self.name:
|
||||
device_type_is_required = True
|
||||
if data.get('deviceType') is None and data['deviceName'] in self.get_devices():
|
||||
data['deviceType'] = self.get_devices()[data['deviceName']].get('device_type', "default")
|
||||
device_type_is_required = False
|
||||
if 'telemetry' not in data:
|
||||
data['telemetry'] = []
|
||||
if 'attributes' not in data:
|
||||
data['attributes'] = []
|
||||
if not TBUtility.validate_converted_data(data):
|
||||
if not TBUtility.validate_converted_data(data, device_type_is_required):
|
||||
log.error("Data from %s connector is invalid.", connector_name)
|
||||
continue
|
||||
if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected():
|
||||
@@ -898,7 +899,7 @@ class TBGatewayService:
|
||||
log.debug("Loaded devices:\n %s", devices)
|
||||
for device_name in devices:
|
||||
try:
|
||||
if not isinstance(devices[device_name], tuple):
|
||||
if not isinstance(devices[device_name], list):
|
||||
open(self._config_dir + CONNECTED_DEVICES_FILENAME, 'w').close()
|
||||
log.debug("Old connected_devices file, new file will be created")
|
||||
return
|
||||
|
||||
@@ -38,11 +38,11 @@ class TBUtility:
|
||||
return content
|
||||
|
||||
@staticmethod
|
||||
def validate_converted_data(data):
|
||||
def validate_converted_data(data, device_type_is_required=True):
|
||||
error = None
|
||||
if error is None and not data.get("deviceName"):
|
||||
error = 'deviceName is empty in data: '
|
||||
if error is None and not data.get("deviceType"):
|
||||
if device_type_is_required and error is None and not data.get("deviceType"):
|
||||
error = 'deviceType is empty in data: '
|
||||
|
||||
if error is None:
|
||||
|
||||
Reference in New Issue
Block a user