From 4501b57713a8a47f226b76fefbe02139cb42d519 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Fri, 6 Dec 2019 16:15:15 +0200 Subject: [PATCH] Some improvements for performence of mqtt connector, converter and storage. --- .../mqtt/json_mqtt_uplink_converter.py | 4 +- .../connectors/mqtt/mqtt_connector.py | 143 +++++++++--------- .../gateway/tb_gateway_service.py | 9 +- .../storage/event_storage_reader.py | 2 +- thingsboard_gateway/tb_utility/tb_utility.py | 48 +++--- 5 files changed, 111 insertions(+), 95 deletions(-) diff --git a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py index e1bcc445..8c679993 100644 --- a/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py +++ b/thingsboard_gateway/connectors/mqtt/json_mqtt_uplink_converter.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from json import dumps +from simplejson import dumps from re import search +from time import time +from pprint import pformat from thingsboard_gateway.connectors.mqtt.mqtt_uplink_converter import MqttUplinkConverter, log from thingsboard_gateway.tb_utility.tb_utility import TBUtility diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 98a1831c..a09312b4 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -13,9 +13,10 @@ # limitations under the License. import time +import timeit import string import random -import re +from re import match, fullmatch, search import ssl from paho.mqtt.client import Client from thingsboard_gateway.connectors.connector import Connector, log @@ -28,6 +29,7 @@ from json import loads class MqttConnector(Connector, Thread): def __init__(self, gateway, config, connector_type): super().__init__() + self.__log = log self.__connector_type = connector_type self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0} @@ -51,7 +53,7 @@ class MqttConnector(Connector, Thread): private_key = self.__broker["security"].get("privateKey") cert = self.__broker["security"].get("cert") if ca_cert is None or cert is None: - log.error("caCert and cert parameters must be in config if you need to use the SSL. Please add it and try again.") + self.__log.error("caCert and cert parameters must be in config if you need to use the SSL. Please add it and try again.") else: try: self._client.tls_set(ca_certs=ca_cert, @@ -61,7 +63,7 @@ class MqttConnector(Connector, Thread): tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) except Exception as e: - log.error("Cannot setup connection to broker %s using SSL. Please check your configuration.\nError: %s", + self.__log.error("Cannot setup connection to broker %s using SSL. Please check your configuration.\nError: %s", self.get_name(), e) self._client.tls_insecure_set(False) @@ -92,17 +94,16 @@ class MqttConnector(Connector, Thread): if not self._connected: time.sleep(10) except Exception as e: - log.error(e) + self.__log.error(e) time.sleep(10) except Exception as e: - log.error(e) + self.__log.error(e) try: self.close() except Exception as e: - log.debug(e) + self.__log.debug(e) while True: - time.sleep(.1) if self.__stopped: break @@ -110,7 +111,7 @@ class MqttConnector(Connector, Thread): self._client.loop_stop() self._client.disconnect() self.__stopped = True - log.info('%s has been stopped.', self.get_name()) + self.__log.info('%s has been stopped.', self.get_name()) def get_name(self): return self.name @@ -120,7 +121,7 @@ class MqttConnector(Connector, Thread): try: self.__subscribes_sent[message[1]] = topic except Exception as e: - log.exception(e) + self.__log.exception(e) def _on_connect(self, client, userdata, flags, rc, *extra_params): result_codes = { @@ -132,7 +133,7 @@ class MqttConnector(Connector, Thread): } if rc == 0: self._connected = True - log.info('%s connected to %s:%s - successfully.', + self.__log.info('%s connected to %s:%s - successfully.', self.get_name(), self.__broker["host"], self.__broker.get("port", "1883")) @@ -143,12 +144,12 @@ class MqttConnector(Connector, Thread): try: module = TBUtility.check_and_import(self.__connector_type, mapping["converter"]["extension"]) if module is not None: - log.debug('Custom converter for topic %s - found!', mapping["topicFilter"]) + self.__log.debug('Custom converter for topic %s - found!', mapping["topicFilter"]) converter = module(mapping) else: - log.error("\n\nCannot find extension module for %s topic.\n\Please check your configuration.\n", mapping["topicFilter"]) + self.__log.error("\n\nCannot find extension module for %s topic.\n\Please check your configuration.\n", mapping["topicFilter"]) except Exception as e: - log.exception(e) + self.__log.exception(e) else: converter = JsonMqttUplinkConverter(mapping) if converter is not None: @@ -159,43 +160,43 @@ class MqttConnector(Connector, Thread): self.__sub_topics[regex_topic].append({converter: None}) # self._client.subscribe(TBUtility.regex_to_topic(regex_topic)) self.__subscribe(mapping["topicFilter"]) - log.info('Connector "%s" subscribe to %s', + self.__log.info('Connector "%s" subscribe to %s', self.get_name(), TBUtility.regex_to_topic(regex_topic)) else: - log.error("Cannot find converter for %s topic", mapping["topicFilter"]) + self.__log.error("Cannot find converter for %s topic", mapping["topicFilter"]) except Exception as e: - log.exception(e) + self.__log.exception(e) try: for request in self.__service_config: if self.__service_config.get(request) is not None: for request_config in self.__service_config.get(request): self.__subscribe(request_config["topicFilter"]) except Exception as e: - log.error(e) + self.__log.error(e) else: if rc in result_codes: - log.error("%s connection FAIL with error %s %s!", self.get_name(), rc, result_codes[rc]) + self.__log.error("%s connection FAIL with error %s %s!", self.get_name(), rc, result_codes[rc]) else: - log.error("%s connection FAIL with unknown error!", self.get_name()) + self.__log.error("%s connection FAIL with unknown error!", self.get_name()) def _on_disconnect(self, *args): - log.debug('"%s" was disconnected.', self.get_name()) + self.__log.debug('"%s" was disconnected.', self.get_name()) def _on_log(self, *args): - log.debug(args) + self.__log.debug(args) def _on_subscribe(self, client, userdata, mid, granted_qos): try: if granted_qos[0] == 128: - log.error('"%s" subscription failed to topic %s subscription message id = %i', self.get_name(), self.__subscribes_sent.get(mid), mid) + self.__log.error('"%s" subscription failed to topic %s subscription message id = %i', self.get_name(), self.__subscribes_sent.get(mid), mid) else: - log.info('"%s" subscription success to topic %s, subscription message id = %i', self.get_name(), self.__subscribes_sent.get(mid), mid) + self.__log.info('"%s" subscription success to topic %s, subscription message id = %i', self.get_name(), self.__subscribes_sent.get(mid), mid) if self.__subscribes_sent.get is not None: del self.__subscribes_sent[mid] except Exception as e: - log.exception(e) + self.__log.exception(e) def __get_service_config(self, config): for service_config in self.__service_config: @@ -206,52 +207,58 @@ class MqttConnector(Connector, Thread): def _on_message(self, client, userdata, message): self.statistics['MessagesReceived'] += 1 + self.__log.error(self.statistics) content = self._decode(message) - regex_topic = [regex for regex in self.__sub_topics if re.fullmatch(regex, message.topic)] + regex_topic = [regex for regex in self.__sub_topics if fullmatch(regex, message.topic)] if regex_topic: - for regex in regex_topic: - if self.__sub_topics.get(regex): - for converter_value in range(len(self.__sub_topics.get(regex))): - if self.__sub_topics[regex][converter_value]: - for converter in self.__sub_topics.get(regex)[converter_value]: - converted_content = converter.convert(message.topic, content) - if converted_content and TBUtility.validate_converted_data(converted_content): - try: - self.__sub_topics[regex][converter_value][converter] = converted_content - except Exception as e: - log.exception(e) - if not self.__gateway.get_devices().get(converted_content["deviceName"]): - self.__gateway.add_device(converted_content["deviceName"], {"connector": None}) - self.__gateway.update_device(converted_content["deviceName"], "connector", self) - self.__gateway.send_to_storage(self.get_name(), converted_content) - self.statistics['MessagesSent'] += 1 - else: - continue - else: - log.error('Cannot find converter for topic:"%s"!', message.topic) + try: + for regex in regex_topic: + if self.__sub_topics.get(regex): + for converter_value in range(len(self.__sub_topics.get(regex))): + if self.__sub_topics[regex][converter_value]: + for converter in self.__sub_topics.get(regex)[converter_value]: + converted_content = converter.convert(message.topic, content) + if converted_content: # and TBUtility.validate_converted_data(converted_content): + try: + self.__sub_topics[regex][converter_value][converter] = converted_content + except Exception as e: + self.__log.exception(e) + if not self.__gateway.get_devices().get(converted_content["deviceName"]): + self.__gateway.add_device(converted_content["deviceName"], {"connector": None}) + self.__gateway.update_device(converted_content["deviceName"], "connector", self) + self.__gateway.send_to_storage(self.get_name(), converted_content) + self.statistics['MessagesSent'] += 1 + else: + continue + else: + self.__log.error('Cannot find converter for topic:"%s"!', message.topic) + return + except Exception as e: + log.exception(e) + return elif self.__service_config.get("connectRequests"): connect_requests = [connect_request for connect_request in self.__service_config.get("connectRequests")] if connect_requests: for request in connect_requests: if request.get("topicFilter"): if message.topic in request.get("topicFilter") or\ - (request.get("deviceNameTopicExpression") is not None and re.search(request.get("deviceNameTopicExpression"), message.topic)): + (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): founded_device_name = None if request.get("deviceNameJsonExpression"): founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) if request.get("deviceNameTopicExpression"): device_name_expression = request["deviceNameTopicExpression"] - founded_device_name = re.search(device_name_expression, message.topic) + founded_device_name = search(device_name_expression, message.topic) if founded_device_name is not None and founded_device_name not in self.__gateway.get_devices(): self.__gateway.add_device(founded_device_name, {"connector": self}) else: - log.error("Cannot find connect request for device from message from topic: %s and with data: %s", - message.topic, - content) + self.__log.error("Cannot find connect request for device from message from topic: %s and with data: %s", + message.topic, + content) else: - log.error("\"topicFilter\" in connect requests config not found.") + self.__log.error("\"topicFilter\" in connect requests config not found.") else: - log.error("Connection requests in config not found.") + self.__log.error("Connection requests in config not found.") elif self.__service_config.get("disconnectRequests") is not None: disconnect_requests = [disconnect_request for disconnect_request in self.__service_config.get("disconnectRequests")] @@ -259,27 +266,27 @@ class MqttConnector(Connector, Thread): for request in disconnect_requests: if request.get("topicFilter") is not None: if message.topic in request.get("topicFilter") or\ - (request.get("deviceNameTopicExpression") is not None and re.search(request.get("deviceNameTopicExpression"), message.topic)): + (request.get("deviceNameTopicExpression") is not None and search(request.get("deviceNameTopicExpression"), message.topic)): founded_device_name = None if request.get("deviceNameJsonExpression"): founded_device_name = TBUtility.get_value(request["deviceNameJsonExpression"], content) if request.get("deviceNameTopicExpression"): device_name_expression = request["deviceNameTopicExpression"] - founded_device_name = re.search(device_name_expression, message.topic) + founded_device_name = search(device_name_expression, message.topic) if founded_device_name is not None and founded_device_name in self.__gateway.get_devices(): self.__gateway.del_device(founded_device_name) else: - log.error("Cannot find connect request for device from message from topic: %s and with data: %s", + self.__log.error("Cannot find connect request for device from message from topic: %s and with data: %s", message.topic, content) else: - log.error("\"topicFilter\" in connect requests config not found.") + self.__log.error("\"topicFilter\" in connect requests config not found.") else: - log.error("Disconnection requests in config not found.") + self.__log.error("Disconnection requests in config not found.") elif message.topic in self.__gateway.rpc_requests_in_progress: self.__gateway.rpc_with_reply_processing(message.topic, content) else: - log.debug("Received message to topic \"%s\" with unknown interpreter data: \n\n\"%s\"", + self.__log.debug("Received message to topic \"%s\" with unknown interpreter data: \n\n\"%s\"", message.topic, content) @@ -287,7 +294,7 @@ class MqttConnector(Connector, Thread): attribute_updates_config = [update for update in self.__attribute_updates] if attribute_updates_config: for attribute_update in attribute_updates_config: - if re.match(attribute_update["deviceNameFilter"], content["device"]) and \ + if match(attribute_update["deviceNameFilter"], content["device"]) and \ content["data"].get(attribute_update["attributeFilter"]): topic = attribute_update["topicExpression"]\ .replace("${deviceName}", content["device"])\ @@ -299,22 +306,22 @@ class MqttConnector(Connector, Thread): .replace("${attributeKey}", attribute_update["attributeFilter"])\ .replace("${attributeValue}", content["data"][attribute_update["attributeFilter"]]) except Exception as e: - log.error(e) + self.__log.error(e) self._client.publish(topic, data).wait_for_publish() - log.debug("Attribute Update data: %s for device %s to topic: %s", + self.__log.debug("Attribute Update data: %s for device %s to topic: %s", data, content["device"], topic) else: - log.error("Not found deviceName by filter in message or attributeFilter in message with data: %s", + self.__log.error("Not found deviceName by filter in message or attributeFilter in message with data: %s", content) else: - log.error("Attribute updates config not found.") + self.__log.error("Attribute updates config not found.") def server_side_rpc_handler(self, content): for rpc_config in self.__server_side_rpc: - if re.search(rpc_config["deviceNameFilter"], content["device"]) \ - and re.search(rpc_config["methodFilter"], content["data"]["method"]) is not None: + if search(rpc_config["deviceNameFilter"], content["device"]) \ + and search(rpc_config["methodFilter"], content["data"]["method"]) is not None: # Subscribe to RPC response topic if rpc_config.get("responseTopicExpression"): topic_for_subscribe = rpc_config["responseTopicExpression"] \ @@ -331,7 +338,7 @@ class MqttConnector(Connector, Thread): # Maybe we need to wait for the command to execute successfully before publishing the request. self._client.subscribe(topic_for_subscribe) else: - log.error("Not found RPC response timeout in config, sending without waiting for response") + self.__log.error("Not found RPC response timeout in config, sending without waiting for response") # Publish RPC request if rpc_config.get("requestTopicExpression") is not None\ and rpc_config.get("valueExpression"): @@ -347,11 +354,11 @@ class MqttConnector(Connector, Thread): .replace("${params}", content["data"]["params"]) try: self._client.publish(topic, data_to_send) - log.debug("Send RPC with no response request to topic: %s with data %s", + self.__log.debug("Send RPC with no response request to topic: %s with data %s", topic, data_to_send) except Exception as e: - log.error(e) + self.__log.error(e) def rpc_cancel_processing(self, topic): self._client.unsubscribe(topic) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index ae52fedc..6b028497 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -46,6 +46,7 @@ class TBGatewayService: self.available_connectors = {} self.__connector_incoming_messages = {} self.__connected_devices = {} + self.__saved_devices = {} self.__events = [] self.__rpc_requests_in_progress = {} self.__connected_devices_file = "connected_devices.json" @@ -95,8 +96,7 @@ class TBGatewayService: summary_messages['SummarySent'] += telemetry[(connector+' MessagesSent').replace(' ', '')] self.tb_client.client.send_telemetry(summary_messages) gateway_statistic_send = int(time.time()*1000) - - time.sleep(.1) + time.sleep(.1) except Exception as e: log.exception(e) for device in self.__connected_devices: @@ -201,6 +201,7 @@ class TBGatewayService: if filtered_telemetry != {}: self.__published_events.append(self.tb_client.client.gw_send_telemetry(current_event["deviceName"], data_to_send)) + time.sleep(.001) if current_event.get("attributes"): # log.debug(current_event) attributes = {} @@ -219,6 +220,7 @@ class TBGatewayService: if filtered_attributes != {}: self.__published_events.append(self.tb_client.client.gw_send_attributes(current_event["deviceName"], data_to_send)) + time.sleep(.001) success = True for event in range(len(self.__published_events)): result = self.__published_events[event].get() @@ -283,8 +285,9 @@ class TBGatewayService: log.debug(content) def add_device(self, device_name, content, wait_for_publish=False): - if device_name not in self.__connected_devices: + if device_name not in self.__saved_devices: self.__connected_devices[device_name] = content + self.__saved_devices[device_name] = content if wait_for_publish: self.tb_client.client.gw_connect_device(device_name).wait_for_publish() else: diff --git a/thingsboard_gateway/storage/event_storage_reader.py b/thingsboard_gateway/storage/event_storage_reader.py index 0e0c65a6..8a6e5632 100644 --- a/thingsboard_gateway/storage/event_storage_reader.py +++ b/thingsboard_gateway/storage/event_storage_reader.py @@ -101,7 +101,7 @@ class EventStorageReader: self.delete_read_file(self.current_pos.get_file()) if len(self.files.get_data_files()) == 0: os.remove(self.settings.get_data_folder_path() + self.files.get_state_file()) - # self.write_info_to_state_file(self.current_pos) + self.write_info_to_state_file(self.new_pos) self.current_pos = copy.deepcopy(self.new_pos) self.current_batch = None except Exception as e: diff --git a/thingsboard_gateway/tb_utility/tb_utility.py b/thingsboard_gateway/tb_utility/tb_utility.py index f3c9b418..8715fdfa 100644 --- a/thingsboard_gateway/tb_utility/tb_utility.py +++ b/thingsboard_gateway/tb_utility/tb_utility.py @@ -12,15 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import re -import inspect -import importlib -import importlib.util +from os import path, listdir +from inspect import getmembers, isclass +from importlib import util import jsonpath_rw_ext as jp from logging import getLogger from json import dumps, loads from re import search +from time import time + log = getLogger("service") @@ -52,28 +52,28 @@ class TBUtility: @staticmethod def check_and_import(extension_type, module_name): try: - if os.path.exists('/var/lib/thingsboard_gateway/'+extension_type.lower()): + if path.exists('/var/lib/thingsboard_gateway/'+extension_type.lower()): custom_extension_path = '/var/lib/thingsboard_gateway/' + extension_type.lower() log.info('Extension %s - looking for class in %s', extension_type, custom_extension_path) else: - custom_extension_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)) + '/extensions/' + extension_type.lower()) + custom_extension_path = path.abspath(path.dirname(path.dirname(__file__)) + '/extensions/' + extension_type.lower()) log.info('Extension %s - looking for class in %s', extension_type, custom_extension_path) - for file in os.listdir(custom_extension_path): + for file in listdir(custom_extension_path): if not file.startswith('__') and file.endswith('.py'): try: - module_spec = importlib.util.spec_from_file_location(module_name, custom_extension_path + '/' + file) + module_spec = util.spec_from_file_location(module_name, custom_extension_path + '/' + file) log.debug(module_spec) if module_spec is None: log.error('Module: {} not found'.format(module_name)) return None else: - module = importlib.util.module_from_spec(module_spec) + module = util.module_from_spec(module_spec) log.debug(module) try: module_spec.loader.exec_module(module) except Exception as e: log.exception(e) - for extension_class in inspect.getmembers(module, inspect.isclass): + for extension_class in getmembers(module, isclass): if module_name in extension_class: return extension_class[1] except ImportError: @@ -85,6 +85,7 @@ class TBUtility: @staticmethod def get_value(expression, body={}, value_type="string", get_tag=False): + T0 = time()*1000 if isinstance(body, str): body = loads(body) if not expression: @@ -100,22 +101,25 @@ class TBUtility: target_str = str(expression[p1:p2]) if get_tag: return target_str - value = True full_value = None try: if value_type == "string": - value = jp.match1(target_str.split()[0], dumps(body)) - if value is None and body.get(target_str): - full_value = expression[0: min(abs(p1-2), 0)] + body[target_str] + expression[p2+1:len(expression)] - elif value is None: - full_value = expression[0: min(abs(p1-2), 0)] + jp.match1(target_str.split()[0], loads(body) if type(body) == str else body) + expression[p2+1:len(expression)] - else: - full_value = expression[0: min(abs(p1-2), 0)] + value + expression[p2+1:len(expression)] + # tg_sp = target_str.split()[0] + # log.warning(time() * 1000 - T0) + # value = jp.match1(tg_sp, body) + # full_value = expression[0: min(abs(p1 - 2), 0)] + jp.match1(target_str.split()[0], body) + expression[p2 + 1:len(expression)] + full_value = expression[0: min(abs(p1 - 2), 0)] + body[target_str.split()[0]] + expression[p2 + 1:len(expression)] + # log.warning(time() * 1000 - T0) + # if full_value is None: + # if body.get(target_str): + # full_value = expression[0: min(abs(p1-2), 0)] + body[target_str] + expression[p2+1:len(expression)] + # else: + # full_value = expression[0: min(abs(p1-2), 0)] + jp.match1(target_str.split()[0], dumps(body)) + expression[p2+1:len(expression)] else: - full_value = jp.match1(target_str.split()[0], loads(body) if type(body) == str else body) - + # full_value = jp.match1(target_str.split()[0], loads(body) if type(body) == str else body) + full_value = body[target_str.split()[0]] except TypeError: - if value is None: + if full_value is None: log.error('Value is None - Cannot find the pattern: %s in %s. Expression will be interpreted as value.', target_str, dumps(body)) full_value = expression except Exception as e: