From 3fe5430897898189665892a9f6e5b07352388c15 Mon Sep 17 00:00:00 2001 From: zbeacon Date: Wed, 22 Jan 2020 10:35:34 +0200 Subject: [PATCH] Added more logging and improvements for OPC-UA Connector and improvements for remote configuration --- .../connectors/opcua/opcua_connector.py | 41 +++++++++++-------- .../gateway/tb_gateway_remote_configurator.py | 4 +- .../gateway/tb_gateway_service.py | 21 ++++++---- .../tb_client/tb_device_mqtt.py | 8 +++- thingsboard_gateway/tb_utility/tb_utility.py | 2 +- 5 files changed, 47 insertions(+), 29 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 1a04f7da..4c3fc985 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -68,7 +68,7 @@ class OpcUaConnector(Thread, Connector): if self.__server_conf["identity"].get("password"): self.client.set_password(self.__server_conf["identity"].get("password")) - self.setName(self.__server_conf.get("name", 'OPC-UA Default ' + ''.join(choice(ascii_lowercase) for _ in range(5))) + " Connector") + # self.setName(self.__server_conf.get("name", 'OPC-UA Default ' + ''.join(choice(ascii_lowercase) for _ in range(5))) + " Connector") self.__opcua_nodes = {} self._subscribed = {} self.data_to_send = [] @@ -173,15 +173,18 @@ class OpcUaConnector(Thread, Connector): for childId in node.get_children(): ch = self.client.get_node(childId) current_var_path = '.'.join(x.split(":")[1] for x in ch.get_path(20000, True)) + if self.__server_conf.get("showMap"): + log.info("Looking for name: %s", current_var_path) if self.__interest_nodes: if ch.get_node_class() == ua.NodeClass.Object: for interest_node in self.__interest_nodes: for int_node in interest_node: subrecursion_level = recursion_level if subrecursion_level != recursion_level + len(interest_node[int_node]["deviceNamePattern"].split("\\.")): - if ch.get_display_name().Text in TBUtility.get_value(interest_node[int_node]["deviceNamePattern"], get_tag=True).split('.') or \ - re.search(TBUtility.get_value(interest_node[int_node]["deviceNodePattern"], get_tag=True), ch.get_display_name().Text): - self.__search_name(ch, subrecursion_level+1) + if ch.get_display_name().Text in TBUtility.get_value(interest_node[int_node]["deviceNamePattern"], get_tag=True).split('\\.') or \ + re.search(TBUtility.get_value(interest_node[int_node]["deviceNodePattern"].split("\\.")[recursion_level+1], get_tag=True), ch.get_display_name().Text): + if interest_node[int_node].get("deviceName") is None: + self.__search_name(ch, subrecursion_level+1) else: return elif ch.get_node_class() == ua.NodeClass.Variable: @@ -212,13 +215,16 @@ class OpcUaConnector(Thread, Connector): if not self.__gateway.get_devices().get(full_device_name): self.__gateway.add_device(full_device_name, {"connector": None}) self.__gateway.update_device(full_device_name, "connector", self) + return else: try: - if re.search(int_node.split('\\.')[recursion_level-2], ch.get_display_name().Text): - self.__search_name(ch, recursion_level+1) + if re.search(int_node.split('\\.')[recursion_level], ch.get_display_name().Text): + if interest_node[int_node].get("deviceName") is None: + self.__search_name(ch, recursion_level+1) except IndexError: if re.search(int_node.split('\\.')[-1], ch.get_display_name().Text): - self.__search_name(ch, recursion_level+1) + if interest_node[int_node].get("deviceName") is None: + self.__search_name(ch, recursion_level+1) except Exception as e: log.exception(e) @@ -241,12 +247,13 @@ class OpcUaConnector(Thread, Connector): for interest_node in self.__interest_nodes: for int_node in interest_node: try: - name_to_check = int_node.split('\\.')[recursion_level-1] if '\\.' in int_node else int_node - name_to_check = int_node.split('.')[recursion_level-1] if '.' in int_node else name_to_check + name_to_check = int_node.split('\\.')[recursion_level+1] if '\\.' in int_node else int_node + # name_to_check = int_node.split('.')[recursion_level] if '.' in int_node else name_to_check except IndexError: name_to_check = int_node.split('\\.')[-1] if '\\.' in int_node else int_node - name_to_check = int_node.split('.')[-1] if '.' in int_node else name_to_check - if re.search(name_to_check, ch.get_display_name().Text): + # name_to_check = int_node.split('.')[-1] if '.' in int_node else name_to_check + log.debug("%s\t%s", name_to_check, ch.get_display_name().Text) + if re.search(name_to_check.replace("\\", "\\\\"), ch.get_display_name().Text): try: methods = ch.get_methods() for method in methods: @@ -260,7 +267,7 @@ class OpcUaConnector(Thread, Connector): self.__search_tags(ch, subrecursion_level+1, sub) else: return - self.__search_tags(ch, recursion_level+1, sub) + # self.__search_tags(ch, recursion_level+1, sub) elif ch.get_node_class() == ua.NodeClass.Variable: try: for interest_node in self.__interest_nodes: @@ -273,8 +280,8 @@ class OpcUaConnector(Thread, Connector): except Exception as e: log.exception(e) name_to_check = int_node.split('\\.')[-1] if '\\.' in int_node else int_node - name_to_check = int_node.split('.')[-1] if '.' in int_node else name_to_check - if re.search(name_to_check.replace('$', ''), current_var_path): + # name_to_check = int_node.split('.')[-1] if '.' in int_node else name_to_check + if re.search(name_to_check.replace('$', ''), current_var_path.split(".")[-2]): tags = [] if interest_node[int_node].get("attributes"): tags.extend(interest_node[int_node]["attributes"]) @@ -283,11 +290,11 @@ class OpcUaConnector(Thread, Connector): for tag in tags: target = TBUtility.get_value(tag["path"], get_tag=True) try: - tag_name_for_check = target.split('\\.')[recursion_level-1] if '\\.' in target else target - tag_name_for_check = target.split('.')[recursion_level-1] if '.' in target else tag_name_for_check + tag_name_for_check = target.split('\\.')[recursion_level] if '\\.' in target else target + # tag_name_for_check = target.split('.')[recursion_level] if '.' in target else tag_name_for_check except IndexError: tag_name_for_check = target.split('\\.')[-1] if '\\.' in target else target - tag_name_for_check = target.split('.')[-1] if '.' in target else tag_name_for_check + # tag_name_for_check = target.split('.')[-1] if '.' in target else tag_name_for_check current_node_name = ch.get_display_name().Text if current_node_name == tag_name_for_check: sub.subscribe_data_change(ch) diff --git a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py index 9dd59e9a..bba99970 100644 --- a/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/gateway/tb_gateway_remote_configurator.py @@ -47,7 +47,7 @@ class RemoteConfigurator: self.__new_configuration = loads(decoded_configuration) self.__old_connectors_configs = self.__gateway._connectors_configs self.__new_general_configuration_file = self.__new_configuration.get("thingsboard") - if self.__old_configuration != self.__new_configuration: + if self.__old_configuration != decoded_configuration: log.info("Remote configuration received: \n %s", decoded_configuration) self.__process_connectors_configuration() self.__old_configuration = self.__new_configuration @@ -95,7 +95,7 @@ class RemoteConfigurator: def __apply_new_connectors_configuration(self): try: - self.__gateway._load_connectors(self.__new_configuration["thingsboard"]) + self.__gateway._load_connectors(self.__new_configuration["thingsboard"], False) for connector_name in self.__gateway.available_connectors: try: self.__gateway.available_connectors[connector_name].close() diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 8fd18c11..8e22034c 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -111,8 +111,8 @@ class TBGatewayService: break if self.__remote_configurator is not None and not self.__request_config_after_connect and \ self.tb_client.is_connected() and not self.tb_client.client.get_subscriptions_in_progress(): - self.__check_shared_attributes() self.__request_config_after_connect = True + self.__check_shared_attributes() if cur_time - gateway_statistic_send > 60.0 and self.tb_client.is_connected(): @@ -132,7 +132,7 @@ class TBGatewayService: str(connector_camel_case + ' EventsSent').replace(' ', '')] self.tb_client.client.send_telemetry(summary_messages) gateway_statistic_send = time.time() - self.__check_shared_attributes() + # self.__check_shared_attributes() except KeyboardInterrupt as e: log.info("Stopping...") self.__close_connectors() @@ -162,6 +162,10 @@ class TBGatewayService: if self.__remote_configurator is not None and shared_attributes.get("configuration"): try: self.__remote_configurator.process_configuration(shared_attributes.get("configuration")) + + self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, + name="Send data to Thingsboard Thread") + self.__send_thread.start() except Exception as e: log.exception(e) if client_attributes is not None: @@ -169,6 +173,10 @@ class TBGatewayService: if self.__remote_configurator is not None and content.get("configuration"): try: self.__remote_configurator.process_configuration(content.get("configuration")) + + self.__send_thread = Thread(target=self.__read_data_from_storage, daemon=True, + name="Send data to Thingsboard Thread") + self.__send_thread.start() except Exception as e: log.exception(e) remote_logging_level = shared_attributes.get('RemoteLoggingLevel') if shared_attributes is not None else content.get("RemoteLoggingLevel") @@ -215,12 +223,11 @@ class TBGatewayService: with open(self._config_dir + connector['configuration'], 'r') as conf_file: try: connector_conf = load(conf_file) + if not self._connectors_configs.get(connector['type']): + self._connectors_configs[connector['type']] = [] + self._connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}}) except Exception as e: log.exception(e) - log.error("Cannot read from file: %s", conf_file) - if not self._connectors_configs.get(connector['type']): - self._connectors_configs[connector['type']] = [] - self._connectors_configs[connector['type']].append({"name": connector["name"], "config": {connector['configuration']: connector_conf}}) except Exception as e: log.exception(e) @@ -349,8 +356,8 @@ class TBGatewayService: else: time.sleep(.01) else: - self.__request_config_after_connect = False time.sleep(.1) + self.__request_config_after_connect = False except Exception as e: log.exception(e) time.sleep(1) diff --git a/thingsboard_gateway/tb_client/tb_device_mqtt.py b/thingsboard_gateway/tb_client/tb_device_mqtt.py index 7d800754..4c2eefb0 100644 --- a/thingsboard_gateway/tb_client/tb_device_mqtt.py +++ b/thingsboard_gateway/tb_client/tb_device_mqtt.py @@ -167,7 +167,8 @@ class TBDeviceMqttClient: pass def _on_disconnect(self, client, userdata, rc): - self.__is_connected = False + log.debug(client) + log.debug("Disconnected") def _on_connect(self, client, userdata, flags, rc, *extra_params): result_codes = { @@ -182,6 +183,7 @@ class TBDeviceMqttClient: if rc == 0: self.__is_connected = True log.info("connection SUCCESS") + log.debug(client) self._client.subscribe(ATTRIBUTES_TOPIC, qos=1) self._client.subscribe(ATTRIBUTES_TOPIC + "/response/+", 1) self._client.subscribe(RPC_REQUEST_TOPIC + '+') @@ -212,7 +214,9 @@ class TBDeviceMqttClient: def disconnect(self): self._client.disconnect() - log.info("Disconnected from ThingsBoard!") + log.debug(self._client) + log.debug("Disconnecting from ThingsBoard") + self.__is_connected = False def _on_message(self, client, userdata, message): content = TBUtility.decode(message) diff --git a/thingsboard_gateway/tb_utility/tb_utility.py b/thingsboard_gateway/tb_utility/tb_utility.py index f6cb86c5..a3bbcf5e 100644 --- a/thingsboard_gateway/tb_utility/tb_utility.py +++ b/thingsboard_gateway/tb_utility/tb_utility.py @@ -74,7 +74,7 @@ class TBUtility: return None else: module = util.module_from_spec(module_spec) - log.debug(module) + log.debug(str(module)) try: module_spec.loader.exec_module(module) except Exception as e: