mirror of
https://github.com/thingsboard/thingsboard-gateway
synced 2025-10-26 22:31:42 +08:00
Added statistics collecting in GRPC connector
This commit is contained in:
@@ -14,13 +14,17 @@
|
||||
|
||||
|
||||
from thingsboard_gateway.connectors.connector import Connector
|
||||
from thingsboard_gateway.gateway.constant_enums import DownlinkMessageType
|
||||
from thingsboard_gateway.gateway.grpc_service.tb_grpc_manager import TBGRPCServerManager
|
||||
from thingsboard_gateway.gateway.grpc_service.grpc_downlink_converter import GrpcDownlinkConverter
|
||||
|
||||
|
||||
class GrpcConnector(Connector):
|
||||
def __init__(self, gateway, config, tb_grpc_server_manager: TBGRPCServerManager):
|
||||
def __init__(self, gateway, config, tb_grpc_server_manager: TBGRPCServerManager, session_id):
|
||||
self.name = None
|
||||
self.__server_manager = tb_grpc_server_manager
|
||||
self.__session_id = session_id
|
||||
self.__downlink_converter = GrpcDownlinkConverter()
|
||||
|
||||
def setName(self, name):
|
||||
self.name = name
|
||||
@@ -29,8 +33,9 @@ class GrpcConnector(Connector):
|
||||
pass
|
||||
|
||||
def close(self):
|
||||
# send unregister
|
||||
pass
|
||||
converter_config = {"message_type": DownlinkMessageType.UnregisterConnectorMsg}
|
||||
message_to_connector = self.__downlink_converter.convert(converter_config, "")
|
||||
self.__server_manager.write(self.name, message_to_connector, self.__session_id)
|
||||
|
||||
def get_name(self):
|
||||
return self.name
|
||||
@@ -39,9 +44,15 @@ class GrpcConnector(Connector):
|
||||
pass
|
||||
|
||||
def on_attributes_update(self, content):
|
||||
# send updated
|
||||
pass
|
||||
converter_config = {"message_type": DownlinkMessageType.GatewayAttributeUpdateNotificationMsg}
|
||||
message_to_connector = self.__downlink_converter.convert(converter_config, content)
|
||||
self.__server_manager.write(self.name, message_to_connector, self.__session_id)
|
||||
|
||||
def server_side_rpc_handler(self, content):
|
||||
# send command
|
||||
pass
|
||||
converter_config = {"message_type": DownlinkMessageType.GatewayDeviceRpcRequestMsg}
|
||||
message_to_connector = self.__downlink_converter.convert(converter_config, content)
|
||||
self.__server_manager.write(self.name, message_to_connector, self.__session_id)
|
||||
|
||||
@property
|
||||
def statistics(self):
|
||||
return self.__server_manager.get_connector_statistics(self.__session_id)
|
||||
|
||||
@@ -11,6 +11,10 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from time import time
|
||||
from typing import Union
|
||||
|
||||
from simplejson import dumps
|
||||
|
||||
from thingsboard_gateway.connectors.converter import Converter, log
|
||||
from thingsboard_gateway.gateway.constant_enums import DownlinkMessageType
|
||||
@@ -59,7 +63,19 @@ class GrpcDownlinkConverter(Converter):
|
||||
|
||||
@staticmethod
|
||||
def __convert_gateway_attribute_update_notification_msg(basic_msg, msg, additional_data=None):
|
||||
pass
|
||||
ts = int(time()*1000)
|
||||
gw_attr_upd_notify_msg = GatewayAttributeUpdateNotificationMsg()
|
||||
gw_attr_upd_notify_msg.deviceName = msg['device']
|
||||
attr_notify_msg = AttributeUpdateNotificationMsg()
|
||||
for shared_attribute in msg['data']:
|
||||
ts_kv_proto = TsKvProto()
|
||||
ts_kv_proto.ts = ts
|
||||
kv = GrpcDownlinkConverter.__get_key_value_proto_value(shared_attribute, msg['data'][shared_attribute])
|
||||
ts_kv_proto.kv.MergeFrom(kv)
|
||||
attr_notify_msg.sharedUpdated.extend([ts_kv_proto])
|
||||
gw_attr_upd_notify_msg.notificationMsg.MergeFrom(attr_notify_msg)
|
||||
basic_msg.gatewayAttributeUpdateNotificationMsg.MergeFrom(gw_attr_upd_notify_msg)
|
||||
return basic_msg
|
||||
|
||||
@staticmethod
|
||||
def __convert_gateway_attribute_response_msg(basic_msg, msg, additional_data=None):
|
||||
@@ -67,7 +83,19 @@ class GrpcDownlinkConverter(Converter):
|
||||
|
||||
@staticmethod
|
||||
def __convert_gateway_device_rpc_request_msg(basic_msg, msg, additional_data=None):
|
||||
pass
|
||||
msg_data = msg['data']
|
||||
gw_to_device_rpc = GatewayDeviceRpcRequestMsg()
|
||||
gw_to_device_rpc.deviceName = msg['device']
|
||||
rpc_request_msg = ToDeviceRpcRequestMsg()
|
||||
rpc_request_msg.requestId = msg_data['id']
|
||||
rpc_request_msg.methodName = msg_data['method']
|
||||
if isinstance(msg_data['params'], dict):
|
||||
rpc_request_msg.params = dumps(msg_data['params'])
|
||||
else:
|
||||
rpc_request_msg.params = str(msg_data['params'])
|
||||
gw_to_device_rpc.rpcRequestMsg.MergeFrom(rpc_request_msg)
|
||||
basic_msg.gatewayDeviceRpcRequestMsg.MergeFrom(gw_to_device_rpc)
|
||||
return basic_msg
|
||||
|
||||
@staticmethod
|
||||
def __convert_unregister_connector_msg(basic_msg, msg, additional_data=None):
|
||||
@@ -76,3 +104,24 @@ class GrpcDownlinkConverter(Converter):
|
||||
unreg_msg = UnregisterConnectorMsg()
|
||||
unreg_msg.connectorKey = msg
|
||||
basic_msg.unregisterConnectorMsg.MergeFrom(unreg_msg)
|
||||
|
||||
@staticmethod
|
||||
def __get_key_value_proto_value(key: str, value: Union[str, bool, int, float, dict]) -> KeyValueProto:
|
||||
key_value_proto = KeyValueProto()
|
||||
key_value_proto.key = key
|
||||
if isinstance(value, bool):
|
||||
key_value_proto.type = KeyValueType.BOOLEAN
|
||||
key_value_proto.bool_v = value
|
||||
elif isinstance(value, int):
|
||||
key_value_proto.type = KeyValueType.LONG_V
|
||||
key_value_proto.long_v = value
|
||||
elif isinstance(value, float):
|
||||
key_value_proto.type = KeyValueType.DOUBLE_V
|
||||
key_value_proto.double_v = value
|
||||
elif isinstance(value, str):
|
||||
key_value_proto.type = KeyValueType.STRING_V
|
||||
key_value_proto.string_v = value
|
||||
elif isinstance(value, dict):
|
||||
key_value_proto.type = KeyValueType.JSON_V
|
||||
key_value_proto.json_v = dumps(value)
|
||||
return key_value_proto
|
||||
|
||||
@@ -37,6 +37,9 @@ class Status(Enum):
|
||||
SUCCESS = 3
|
||||
|
||||
|
||||
DEFAULT_STATISTICS_DICT = {"MessagesReceived": 0, "MessagesSent": 0}
|
||||
|
||||
|
||||
class TBGRPCServerManager(Thread):
|
||||
def __init__(self, gateway, config):
|
||||
super().__init__()
|
||||
@@ -88,27 +91,32 @@ class TBGRPCServerManager(Thread):
|
||||
data = self.__uplink_converter.convert(None, msg.gatewayTelemetryMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
|
||||
outgoing_message = True
|
||||
self.__increase_incoming_statistic(session_id)
|
||||
if msg.HasField("gatewayAttributesMsg"):
|
||||
data = self.__uplink_converter.convert(None, msg.gatewayAttributesMsg)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
|
||||
outgoing_message = True
|
||||
self.__increase_incoming_statistic(session_id)
|
||||
if msg.HasField("gatewayClaimMsg"):
|
||||
message_for_conversion = msg.gatewayClaimMsg
|
||||
data = self.__uplink_converter.convert(None, message_for_conversion)
|
||||
result_status = self.__gateway.send_to_storage(self.sessions[session_id]['name'], data)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
|
||||
self.__increase_incoming_statistic(session_id)
|
||||
if msg.HasField("connectMsg"):
|
||||
message_for_conversion = msg.connectMsg
|
||||
data = self.__uplink_converter.convert(None, message_for_conversion)
|
||||
data['name'] = self.sessions[session_id]['name']
|
||||
result_status = self.__gateway.add_device_async(data)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
|
||||
self.__increase_incoming_statistic(session_id)
|
||||
if msg.HasField("disconnectMsg"):
|
||||
message_for_conversion = msg.disconnectMsg
|
||||
data = self.__uplink_converter.convert(None, message_for_conversion)
|
||||
data['name'] = self.sessions[session_id]['name']
|
||||
result_status = self.__gateway.del_device_async(data)
|
||||
outgoing_message = self.__downlink_converter.convert(downlink_converter_config, result_status)
|
||||
self.__increase_incoming_statistic(session_id)
|
||||
if msg.HasField("gatewayRpcResponseMsg"):
|
||||
pass
|
||||
if msg.HasField("gatewayAttributeRequestMsg"):
|
||||
@@ -122,12 +130,13 @@ class TBGRPCServerManager(Thread):
|
||||
except ValueError as e:
|
||||
log.error("Received unknown GRPC message!", e)
|
||||
|
||||
def write(self, connector_name, msg: FromServiceMessage):
|
||||
def write(self, connector_name, msg: FromServiceMessage, session_id=None):
|
||||
log.debug("[GRPC] outgoing message: %s", msg)
|
||||
session_id = self.__connectors_sessions.get(connector_name)
|
||||
|
||||
if session_id is None:
|
||||
session_id = self.__connectors_sessions.get(connector_name)
|
||||
if session_id is not None:
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
self.__increase_outgoing_statistic(session_id)
|
||||
else:
|
||||
log.warning("Cannot write to connector with name %s, session is not found. Client is not registered!", connector_name)
|
||||
|
||||
@@ -136,7 +145,7 @@ class TBGRPCServerManager(Thread):
|
||||
additional_message.registerConnectorMsg.MergeFrom(RegisterConnectorMsg())
|
||||
if registration_result == Status.SUCCESS:
|
||||
connector_name = connector_configuration['name']
|
||||
self.sessions[session_id] = {"config": connector_configuration, "name": connector_name}
|
||||
self.sessions[session_id] = {"config": connector_configuration, "name": connector_name, "statistics": DEFAULT_STATISTICS_DICT}
|
||||
self.__connectors_sessions[connector_name] = session_id
|
||||
msg = self.__grpc_server.get_response("SUCCESS", additional_message)
|
||||
configuration_msg = ConnectorConfigurationMsg()
|
||||
@@ -168,6 +177,20 @@ class TBGRPCServerManager(Thread):
|
||||
msg = self.__grpc_server.get_response("FAILURE", additional_message)
|
||||
self.__grpc_server.write(session_id, msg)
|
||||
|
||||
def get_connector_statistics(self, session_id):
|
||||
if session_id in self.sessions:
|
||||
return self.sessions[session_id].get('statistics', DEFAULT_STATISTICS_DICT)
|
||||
else:
|
||||
return DEFAULT_STATISTICS_DICT
|
||||
|
||||
def __increase_incoming_statistic(self, session_id):
|
||||
if session_id in self.sessions:
|
||||
self.sessions[session_id]['statistics']["MessagesReceived"] += 1
|
||||
|
||||
def __increase_outgoing_statistic(self, session_id):
|
||||
if session_id in self.sessions:
|
||||
self.sessions[session_id]['statistics']["MessagesSent"] += 1
|
||||
|
||||
async def serve(self, config):
|
||||
self.__aio_server = grpc.aio.server(
|
||||
options=(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from queue import SimpleQueue
|
||||
from threading import RLock, Thread
|
||||
from time import sleep
|
||||
|
||||
import thingsboard_gateway.gateway.proto.messages_pb2_grpc as messages_pb2_grpc
|
||||
from thingsboard_gateway.gateway.proto.messages_pb2 import *
|
||||
@@ -40,6 +41,8 @@ class TBGRPCServer(messages_pb2_grpc.TBGatewayProtoServiceServicer):
|
||||
if not self.__read_queue.empty():
|
||||
context, request = self.__read_queue.get()
|
||||
self._read_callback(context, request)
|
||||
else:
|
||||
sleep(.2)
|
||||
|
||||
@staticmethod
|
||||
def get_response(status, connector_message):
|
||||
|
||||
@@ -24,7 +24,6 @@ from threading import RLock, Thread
|
||||
from time import sleep, time
|
||||
|
||||
from simplejson import JSONDecodeError, dumps, load, loads
|
||||
from thingsboard_gateway.gateway.constants import CONNECTED_DEVICES_FILENAME
|
||||
from yaml import safe_load
|
||||
|
||||
from thingsboard_gateway.gateway.constant_enums import DeviceActions
|
||||
@@ -335,7 +334,7 @@ class TBGatewayService:
|
||||
def __register_connector(self, session_id, connector_key):
|
||||
if self.__grpc_connectors.get(connector_key) is not None and self.__grpc_connectors[connector_key]['name'] not in self.available_connectors:
|
||||
target_connector = self.__grpc_connectors.get(connector_key)
|
||||
connector = GrpcConnector(self, target_connector['config'], self.__grpc_manager)
|
||||
connector = GrpcConnector(self, target_connector['config'], self.__grpc_manager, session_id)
|
||||
connector.setName(target_connector['name'])
|
||||
self.available_connectors[connector.get_name()] = connector
|
||||
self.__grpc_manager.registration_finished(Status.SUCCESS, session_id, target_connector)
|
||||
@@ -471,14 +470,11 @@ class TBGatewayService:
|
||||
data_array = event if isinstance(event, list) else [event]
|
||||
for data in data_array:
|
||||
if not connector_name == self.name:
|
||||
device_type_is_required = True
|
||||
if data.get('deviceType') is None and data['deviceName'] in self.get_devices():
|
||||
device_type_is_required = False
|
||||
if 'telemetry' not in data:
|
||||
data['telemetry'] = []
|
||||
if 'attributes' not in data:
|
||||
data['attributes'] = []
|
||||
if not TBUtility.validate_converted_data(data, device_type_is_required):
|
||||
if not TBUtility.validate_converted_data(data):
|
||||
log.error("Data from %s connector is invalid.", connector_name)
|
||||
continue
|
||||
if data["deviceName"] not in self.get_devices() and self.tb_client.is_connected():
|
||||
|
||||
Reference in New Issue
Block a user