1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Fixed calculating data size and optimized MQTT Connector

This commit is contained in:
samson0v
2022-07-22 13:17:47 +03:00
parent 5a1ef00005
commit c0d9c388c2
2 changed files with 200 additions and 180 deletions

View File

@@ -137,6 +137,10 @@ class MqttConnector(Connector, Thread):
self.__max_msg_number_for_worker = config.get('maxMessageNumberPerWorker', 10)
self.__max_number_of_workers = config.get('maxNumberOfWorkers', 100)
self._on_message_queue = Queue()
self._on_message_thread = Thread(name='On Message', target=self._process_on_message, daemon=True)
self._on_message_thread.start()
def load_handlers(self, handler_flavor, mandatory_keys, accepted_handlers_list):
if handler_flavor not in self.config:
self.__log.error("'%s' section missing from configuration", handler_flavor)
@@ -372,195 +376,211 @@ class MqttConnector(Connector, Thread):
self.__workers_thread_pool.remove(worker)
def _on_message(self, client, userdata, message):
self.statistics['MessagesReceived'] += 1
content = TBUtility.decode(message)
self._on_message_queue.put((client, userdata, message))
# Check if message topic exists in mappings "i.e., I'm posting telemetry/attributes" ---------------------------
topic_handlers = [regex for regex in self.__mapping_sub_topics if fullmatch(regex, message.topic)]
def _process_on_message(self):
while not self.__stopped:
if not self._on_message_queue.empty():
client, userdata, message = self._on_message_queue.get()
if topic_handlers:
# Note: every topic may be associated to one or more converter. This means that a single MQTT message
# may produce more than one message towards ThingsBoard. This also means that I cannot return after
# the first successful conversion: I got to use all the available ones.
# I will use a flag to understand whether at least one converter succeeded
request_handled = False
self.statistics['MessagesReceived'] += 1
content = TBUtility.decode(message)
for topic in topic_handlers:
available_converters = self.__mapping_sub_topics[topic]
for converter in available_converters:
try:
if isinstance(content, list):
for item in content:
request_handled = self.put_data_to_convert(converter, message, item)
if not request_handled:
self.__log.error(
'Cannot find converter for the topic:"%s"! Client: %s, User data: %s',
message.topic,
str(client),
str(userdata))
# Check if message topic exists in mappings "i.e., I'm posting telemetry/attributes" -------------------
topic_handlers = [regex for regex in self.__mapping_sub_topics if fullmatch(regex, message.topic)]
if topic_handlers:
# Note: every topic may be associated to one or more converter.
# This means that a single MQTT message
# may produce more than one message towards ThingsBoard. This also means that I cannot return after
# the first successful conversion: I got to use all the available ones.
# I will use a flag to understand whether at least one converter succeeded
request_handled = False
for topic in topic_handlers:
available_converters = self.__mapping_sub_topics[topic]
for converter in available_converters:
try:
if isinstance(content, list):
for item in content:
request_handled = self.put_data_to_convert(converter, message, item)
if not request_handled:
self.__log.error(
'Cannot find converter for the topic:"%s"! Client: %s, User data: %s',
message.topic,
str(client),
str(userdata))
else:
request_handled = self.put_data_to_convert(converter, message, content)
except Exception as e:
log.exception(e)
if not request_handled:
self.__log.error('Cannot find converter for the topic:"%s"! Client: %s, User data: %s',
message.topic,
str(client),
str(userdata))
# Note: if I'm in this branch, this was for sure a telemetry/attribute push message
# => Execution must end here both in case of failure and success
continue
# Check if message topic exists in connection handlers "i.e., I'm connecting a device" -----------------
topic_handlers = [regex for regex in self.__connect_requests_sub_topics if
fullmatch(regex, message.topic)]
if topic_handlers:
for topic in topic_handlers:
handler = self.__connect_requests_sub_topics[topic]
found_device_name = None
found_device_type = 'default'
# Get device name, either from topic or from content
if handler.get("deviceNameTopicExpression"):
device_name_match = search(handler["deviceNameTopicExpression"], message.topic)
if device_name_match is not None:
found_device_name = device_name_match.group(0)
elif handler.get("deviceNameJsonExpression"):
found_device_name = TBUtility.get_value(handler["deviceNameJsonExpression"], content)
# Get device type (if any), either from topic or from content
if handler.get("deviceTypeTopicExpression"):
device_type_match = search(handler["deviceTypeTopicExpression"], message.topic)
found_device_type = device_type_match.group(0) if device_type_match is not None else \
handler[
"deviceTypeTopicExpression"]
elif handler.get("deviceTypeJsonExpression"):
found_device_type = TBUtility.get_value(handler["deviceTypeJsonExpression"], content)
if found_device_name is None:
self.__log.error("Device name missing from connection request")
continue
# Note: device must be added even if it is already known locally: else ThingsBoard
# will not send RPCs and attribute updates
self.__log.info("Connecting device %s of type %s", found_device_name, found_device_type)
self.__gateway.add_device(found_device_name, {"connector": self}, device_type=found_device_type)
# Note: if I'm in this branch, this was for sure a connection message
# => Execution must end here both in case of failure and success
continue
# Check if message topic exists in disconnection handlers "i.e., I'm disconnecting a device" -----------
topic_handlers = [regex for regex in self.__disconnect_requests_sub_topics if
fullmatch(regex, message.topic)]
if topic_handlers:
for topic in topic_handlers:
handler = self.__disconnect_requests_sub_topics[topic]
found_device_name = None
found_device_type = 'default'
# Get device name, either from topic or from content
if handler.get("deviceNameTopicExpression"):
device_name_match = search(handler["deviceNameTopicExpression"], message.topic)
if device_name_match is not None:
found_device_name = device_name_match.group(0)
elif handler.get("deviceNameJsonExpression"):
found_device_name = TBUtility.get_value(handler["deviceNameJsonExpression"], content)
# Get device type (if any), either from topic or from content
if handler.get("deviceTypeTopicExpression"):
device_type_match = search(handler["deviceTypeTopicExpression"], message.topic)
if device_type_match is not None:
found_device_type = device_type_match.group(0)
elif handler.get("deviceTypeJsonExpression"):
found_device_type = TBUtility.get_value(handler["deviceTypeJsonExpression"], content)
if found_device_name is None:
self.__log.error("Device name missing from disconnection request")
continue
if found_device_name in self.__gateway.get_devices():
self.__log.info("Disconnecting device %s of type %s", found_device_name, found_device_type)
self.__gateway.del_device(found_device_name)
else:
request_handled = self.put_data_to_convert(converter, message, content)
self.__log.info("Device %s was not connected", found_device_name)
break
# Note: if I'm in this branch, this was for sure a disconnection message
# => Execution must end here both in case of failure and success
continue
# Check if message topic exists in attribute request handlers "i.e., I'm asking for a shared attribute"
topic_handlers = [regex for regex in self.__attribute_requests_sub_topics if
fullmatch(regex, message.topic)]
if topic_handlers:
try:
for topic in topic_handlers:
handler = self.__attribute_requests_sub_topics[topic]
found_device_name = None
found_attribute_names = None
# Get device name, either from topic or from content
if handler.get("deviceNameTopicExpression"):
device_name_match = search(handler["deviceNameTopicExpression"], message.topic)
if device_name_match is not None:
found_device_name = device_name_match.group(0)
elif handler.get("deviceNameJsonExpression"):
found_device_name = TBUtility.get_value(handler["deviceNameJsonExpression"], content)
# Get attribute name, either from topic or from content
if handler.get("attributeNameTopicExpression"):
attribute_name_match = search(handler["attributeNameTopicExpression"], message.topic)
if attribute_name_match is not None:
found_attribute_names = attribute_name_match.group(0)
elif handler.get("attributeNameJsonExpression"):
found_attribute_names = list(filter(lambda x: x is not None,
TBUtility.get_values(
handler["attributeNameJsonExpression"],
content)))
if found_device_name is None:
self.__log.error("Device name missing from attribute request")
continue
if found_attribute_names is None:
self.__log.error("Attribute name missing from attribute request")
continue
self.__log.info("Will retrieve attribute %s of %s", found_attribute_names,
found_device_name)
self.__gateway.tb_client.client.gw_request_shared_attributes(
found_device_name,
found_attribute_names,
lambda data, *args: self.notify_attribute(
data,
found_attribute_names,
handler.get("topicExpression"),
handler.get("valueExpression"),
handler.get('retain', False)))
break
except Exception as e:
log.exception(e)
if not request_handled:
self.__log.error('Cannot find converter for the topic:"%s"! Client: %s, User data: %s',
# Note: if I'm in this branch, this was for sure an attribute request message
# => Execution must end here both in case of failure and success
continue
# Check if message topic exists in RPC handlers --------------------------------------------------------
# The gateway is expecting for this message => no wildcards here, the topic must be evaluated as is
if self.__gateway.is_rpc_in_progress(message.topic):
log.info("RPC response arrived. Forwarding it to thingsboard.")
self.__gateway.rpc_with_reply_processing(message.topic, content)
continue
self.__log.debug("Received message to topic \"%s\" with unknown interpreter data: \n\n\"%s\"",
message.topic,
str(client),
str(userdata))
content)
# Note: if I'm in this branch, this was for sure a telemetry/attribute push message
# => Execution must end here both in case of failure and success
return None
# Check if message topic exists in connection handlers "i.e., I'm connecting a device" -------------------------
topic_handlers = [regex for regex in self.__connect_requests_sub_topics if fullmatch(regex, message.topic)]
if topic_handlers:
for topic in topic_handlers:
handler = self.__connect_requests_sub_topics[topic]
found_device_name = None
found_device_type = 'default'
# Get device name, either from topic or from content
if handler.get("deviceNameTopicExpression"):
device_name_match = search(handler["deviceNameTopicExpression"], message.topic)
if device_name_match is not None:
found_device_name = device_name_match.group(0)
elif handler.get("deviceNameJsonExpression"):
found_device_name = TBUtility.get_value(handler["deviceNameJsonExpression"], content)
# Get device type (if any), either from topic or from content
if handler.get("deviceTypeTopicExpression"):
device_type_match = search(handler["deviceTypeTopicExpression"], message.topic)
found_device_type = device_type_match.group(0) if device_type_match is not None else handler[
"deviceTypeTopicExpression"]
elif handler.get("deviceTypeJsonExpression"):
found_device_type = TBUtility.get_value(handler["deviceTypeJsonExpression"], content)
if found_device_name is None:
self.__log.error("Device name missing from connection request")
continue
# Note: device must be added even if it is already known locally: else ThingsBoard
# will not send RPCs and attribute updates
self.__log.info("Connecting device %s of type %s", found_device_name, found_device_type)
self.__gateway.add_device(found_device_name, {"connector": self}, device_type=found_device_type)
# Note: if I'm in this branch, this was for sure a connection message
# => Execution must end here both in case of failure and success
return None
# Check if message topic exists in disconnection handlers "i.e., I'm disconnecting a device" -------------------
topic_handlers = [regex for regex in self.__disconnect_requests_sub_topics if fullmatch(regex, message.topic)]
if topic_handlers:
for topic in topic_handlers:
handler = self.__disconnect_requests_sub_topics[topic]
found_device_name = None
found_device_type = 'default'
# Get device name, either from topic or from content
if handler.get("deviceNameTopicExpression"):
device_name_match = search(handler["deviceNameTopicExpression"], message.topic)
if device_name_match is not None:
found_device_name = device_name_match.group(0)
elif handler.get("deviceNameJsonExpression"):
found_device_name = TBUtility.get_value(handler["deviceNameJsonExpression"], content)
# Get device type (if any), either from topic or from content
if handler.get("deviceTypeTopicExpression"):
device_type_match = search(handler["deviceTypeTopicExpression"], message.topic)
if device_type_match is not None:
found_device_type = device_type_match.group(0)
elif handler.get("deviceTypeJsonExpression"):
found_device_type = TBUtility.get_value(handler["deviceTypeJsonExpression"], content)
if found_device_name is None:
self.__log.error("Device name missing from disconnection request")
continue
if found_device_name in self.__gateway.get_devices():
self.__log.info("Disconnecting device %s of type %s", found_device_name, found_device_type)
self.__gateway.del_device(found_device_name)
else:
self.__log.info("Device %s was not connected", found_device_name)
break
# Note: if I'm in this branch, this was for sure a disconnection message
# => Execution must end here both in case of failure and success
return None
# Check if message topic exists in attribute request handlers "i.e., I'm asking for a shared attribute" --------
topic_handlers = [regex for regex in self.__attribute_requests_sub_topics if fullmatch(regex, message.topic)]
if topic_handlers:
try:
for topic in topic_handlers:
handler = self.__attribute_requests_sub_topics[topic]
found_device_name = None
found_attribute_names = None
# Get device name, either from topic or from content
if handler.get("deviceNameTopicExpression"):
device_name_match = search(handler["deviceNameTopicExpression"], message.topic)
if device_name_match is not None:
found_device_name = device_name_match.group(0)
elif handler.get("deviceNameJsonExpression"):
found_device_name = TBUtility.get_value(handler["deviceNameJsonExpression"], content)
# Get attribute name, either from topic or from content
if handler.get("attributeNameTopicExpression"):
attribute_name_match = search(handler["attributeNameTopicExpression"], message.topic)
if attribute_name_match is not None:
found_attribute_names = attribute_name_match.group(0)
elif handler.get("attributeNameJsonExpression"):
found_attribute_names = list(filter(lambda x: x is not None,
TBUtility.get_values(handler["attributeNameJsonExpression"],
content)))
if found_device_name is None:
self.__log.error("Device name missing from attribute request")
continue
if found_attribute_names is None:
self.__log.error("Attribute name missing from attribute request")
continue
self.__log.info("Will retrieve attribute %s of %s", found_attribute_names, found_device_name)
self.__gateway.tb_client.client.gw_request_shared_attributes(
found_device_name,
found_attribute_names,
lambda data, *args: self.notify_attribute(
data,
found_attribute_names,
handler.get("topicExpression"),
handler.get("valueExpression"),
handler.get('retain', False)))
break
except Exception as e:
log.exception(e)
# Note: if I'm in this branch, this was for sure an attribute request message
# => Execution must end here both in case of failure and success
return None
# Check if message topic exists in RPC handlers ----------------------------------------------------------------
# The gateway is expecting for this message => no wildcards here, the topic must be evaluated as is
if self.__gateway.is_rpc_in_progress(message.topic):
log.info("RPC response arrived. Forwarding it to thingsboard.")
self.__gateway.rpc_with_reply_processing(message.topic, content)
return None
self.__log.debug("Received message to topic \"%s\" with unknown interpreter data: \n\n\"%s\"",
message.topic,
content)
sleep(.2)
def notify_attribute(self, incoming_data, attribute_name, topic_expression, value_expression, retain):
if incoming_data.get("device") is None or incoming_data.get("value", incoming_data.get('values')) is None:

View File

@@ -697,7 +697,7 @@ class TBGatewayService:
@staticmethod
def __get_data_size(data: dict):
return getsizeof(str(data))
return getsizeof(data)
@staticmethod
def __convert_telemetry_to_ts(data):