diff --git a/setup.py b/setup.py index 28587ef8..36a0df76 100644 --- a/setup.py +++ b/setup.py @@ -39,15 +39,15 @@ setup( 'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', 'thingsboard_gateway.gateway.shell', 'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.socket', - 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.opcua_asyncio', 'thingsboard_gateway.connectors.xmpp', + 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.xmpp', 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.connectors.ocpp', 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet', 'thingsboard_gateway.connectors.bacnet.bacnet_utilities', 'thingsboard_gateway.connectors.odbc', 'thingsboard_gateway.connectors.rest', 'thingsboard_gateway.connectors.snmp', 'thingsboard_gateway.connectors.ftp', 'thingsboard_gateway.tb_utility', 'thingsboard_gateway.extensions', 'thingsboard_gateway.extensions.mqtt', 'thingsboard_gateway.extensions.modbus', 'thingsboard_gateway.extensions.opcua', - 'thingsboard_gateway.extensions.opcua_asyncio', 'thingsboard_gateway.extensions.ocpp', - 'thingsboard_gateway.extensions.ble', 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request', + 'thingsboard_gateway.extensions.ocpp', 'thingsboard_gateway.extensions.ble', + 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request', 'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc', 'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp', 'thingsboard_gateway.extensions.socket', 'thingsboard_gateway.extensions.xmpp', diff --git a/thingsboard_gateway/connectors/opcua/__init__.py b/thingsboard_gateway/connectors/opcua/__init__.py index 620ae837..e69de29b 100644 --- a/thingsboard_gateway/connectors/opcua/__init__.py +++ b/thingsboard_gateway/connectors/opcua/__init__.py @@ -1,13 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/thingsboard_gateway/connectors/opcua_asyncio/device.py b/thingsboard_gateway/connectors/opcua/device.py similarity index 100% rename from thingsboard_gateway/connectors/opcua_asyncio/device.py rename to thingsboard_gateway/connectors/opcua/device.py diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 6ad5d686..f9d27810 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -12,239 +12,124 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import re -import time -from uuid import UUID -from concurrent.futures import CancelledError, TimeoutError as FuturesTimeoutError -from copy import deepcopy from random import choice from string import ascii_lowercase from threading import Thread -from cachetools import cached, TTLCache - -import regex -from simplejson import dumps +from time import sleep, monotonic +from queue import Queue +from thingsboard_gateway.connectors.connector import Connector +from thingsboard_gateway.connectors.opcua.device import Device from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader from thingsboard_gateway.tb_utility.tb_utility import TBUtility -from thingsboard_gateway.gateway.statistics_service import StatisticsService from thingsboard_gateway.tb_utility.tb_logger import init_logger try: - from opcua import Client, Node, ua + import asyncua except ImportError: print("OPC-UA library not found") - TBUtility.install_package("opcua") - from opcua import Client, Node, ua + TBUtility.install_package("asyncua") + import asyncua -try: - from opcua.crypto import uacrypto -except ImportError: - TBUtility.install_package("cryptography") - from opcua.crypto import uacrypto +from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256, SecurityPolicyBasic256, \ + SecurityPolicyBasic128Rsa15 +from asyncua.ua.uaerrors import UaStatusCodeError, BadNodeIdUnknown, BadConnectionClosed, \ + BadInvalidState, BadSessionClosed, BadAttributeIdInvalid, BadCommunicationError, BadOutOfService -from thingsboard_gateway.connectors.connector import Connector -from thingsboard_gateway.connectors.opcua.opcua_uplink_converter import OpcUaUplinkConverter +DEFAULT_UPLINK_CONVERTER = 'OpcUaUplinkConverter' + +SECURITY_POLICIES = { + "Basic128Rsa15": SecurityPolicyBasic128Rsa15, + "Basic256": SecurityPolicyBasic256, + "Basic256Sha256": SecurityPolicyBasic256Sha256, +} + +MESSAGE_SECURITY_MODES = { + "None": asyncua.ua.MessageSecurityMode.None_, + "Sign": asyncua.ua.MessageSecurityMode.Sign, + "SignAndEncrypt": asyncua.ua.MessageSecurityMode.SignAndEncrypt +} -class OpcUaConnector(Thread, Connector): +class OpcUaConnector(Connector, Thread): def __init__(self, gateway, config, connector_type): - self._connector_type = connector_type self.statistics = {'MessagesReceived': 0, 'MessagesSent': 0} super().__init__() + self._connector_type = connector_type self.__gateway = gateway - self._config = config - self.__id = self._config.get('id') - self.__server_conf = config.get("server") - self.name = self._config.get("name", - 'OPC-UA ' + ''.join(choice(ascii_lowercase) for _ in range(5)) + " Connector") - self._log = init_logger(self.__gateway, self.name, self._config.get('logLevel', 'INFO'), - enable_remote_logging=self._config.get('enableRemoteLogging', False)) - self._log.warning("OPC-UA Connector is deprecated and will be removed in the release v.4.0") - self.__interest_nodes = [] - self.__available_object_resources = {} - self.__show_map = self.__server_conf.get("showMap", False) - self.__previous_scan_time = 0 - for mapping in self.__server_conf["mapping"]: - if mapping.get("deviceNodePattern") is not None: - self.__interest_nodes.append({mapping["deviceNodePattern"]: mapping}) - else: - self._log.error( - "deviceNodePattern in mapping: %s - not found, add property deviceNodePattern to processing this mapping", - dumps(mapping)) + self.__config = config + self.__id = self.__config.get('id') + self.__server_conf = config['server'] + self.name = self.__config.get("name", 'OPC-UA Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) + self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + if "opc.tcp" not in self.__server_conf.get("url"): self.__opcua_url = "opc.tcp://" + self.__server_conf.get("url") else: self.__opcua_url = self.__server_conf.get("url") - self.client = None - self.__connected = False + self.__data_to_send = Queue(-1) + self.__sub_data_to_convert = Queue(-1) - self.__sub_handler = SubHandler(self) - self.data_to_send = [] + self.__loop = asyncio.new_event_loop() + + self.__client = None + self.__subscription = None + + self.__connected = False self.__stopped = False self.daemon = True - def is_connected(self): - return self.__connected - - def is_stopped(self): - return self.__stopped - - def get_type(self): - return self._connector_type + self.__device_nodes = [] + self.__last_poll = 0 def open(self): self.__stopped = False self.start() - self._log.info("Starting OPC-UA Connector") + self.__log.info("Starting OPC-UA Connector (Async IO)") - def __create_client(self): - if self.client: - try: - # Always try to disconnect to release resource on client and server side! - self.client.disconnect() - except: - pass - self.client = None - - self.client = Client(self.__opcua_url, timeout=self.__server_conf.get("timeoutInMillis", 4000) / 1000) - - if self.__server_conf.get('uri'): - self.client.application_uri = self.__server_conf['uri'] - - if self.__server_conf["identity"].get("type") == "cert.PEM": - self.__set_auth_settings_by_cert() - if self.__server_conf["identity"].get("username"): - self.__set_auth_settings_by_username() - - self.__available_object_resources = {} - self.__opcua_nodes = {} - self._subscribed = {} - self.__sub = None - self.__connected = False - - def __connect(self): - self.__create_client() - - while not self.__connected and not self.__stopped: - try: - self.client.connect() - try: - self.client.load_type_definitions() - except Exception as e: - self._log.error("Error on loading type definitions:\n %s", e) - self._log.debug(self.client.get_namespace_array()[-1]) - self._log.debug(self.client.get_namespace_index(self.client.get_namespace_array()[-1])) - - self.__initialize_client() - - if not self.__server_conf.get("disableSubscriptions", False): - self.__sub = self.client.create_subscription(self.__server_conf.get("subCheckPeriodInMillis", 500), - self.__sub_handler) - - self.__connected = True - self._log.info("OPC-UA connector %s connected to server %s", self.get_name(), - self.__server_conf.get("url")) - except ConnectionRefusedError: - self._log.error("Connection refused on connection to OPC-UA server with url %s", - self.__server_conf.get("url")) - time.sleep(10) - except OSError: - self._log.error("Connection refused on connection to OPC-UA server with url %s", - self.__server_conf.get("url")) - time.sleep(10) - except Exception as e: - self._log.error("error on connection to OPC-UA server.\n %s", e) - time.sleep(10) - - def run(self): - while not self.__stopped: - try: - time.sleep(.2) - self.__check_connection() - if not self.__connected and not self.__stopped: - self.__connect() - elif not self.__stopped: - if self.__server_conf.get("disableSubscriptions", False) and time.time() * 1000 - self.__previous_scan_time > self.__server_conf.get( - "scanPeriodInMillis", 60000): - self.scan_nodes_from_config() - self.__previous_scan_time = time.time() * 1000 - # giusguerrini, 2020-09-24: Fix: flush event set and send all data to platform, - # so data_to_send doesn't grow indefinitely in case of more than one value change - # per cycle, and platform doesn't lose events. - # NOTE: possible performance improvement: use a map to store only one event per - # variable to reduce frequency of messages to platform. - while self.data_to_send: - self.__gateway.send_to_storage(self.get_name(), self.get_id(), self.data_to_send.pop()) - if self.__stopped: - self.close() - break - except (KeyboardInterrupt, SystemExit): - self.close() - raise - except FuturesTimeoutError: - self.__check_connection() - except Exception as e: - self._log.error("Connection failed on connection to OPC-UA server with url %s\n %s", - self.__server_conf.get("url"), e) - - time.sleep(10) - - def __set_auth_settings_by_cert(self): - try: - ca_cert = self.__server_conf["identity"].get("caCert") - private_key = self.__server_conf["identity"].get("privateKey") - cert = self.__server_conf["identity"].get("cert") - security_mode = self.__server_conf["identity"].get("mode", "SignAndEncrypt") - policy = self.__server_conf["security"] - if cert is None or private_key is None: - self._log.exception("Error in ssl configuration - cert or privateKey parameter not found") - raise RuntimeError("Error in ssl configuration - cert or privateKey parameter not found") - security_string = policy + ',' + security_mode + ',' + cert + ',' + private_key - if ca_cert is not None: - security_string = security_string + ',' + ca_cert - self.client.set_security_string(security_string) - - except Exception as e: - self._log.exception(e) - - def __set_auth_settings_by_username(self): - self.client.set_user(self.__server_conf["identity"].get("username")) - if self.__server_conf["identity"].get("password"): - self.client.set_password(self.__server_conf["identity"].get("password")) - - def __check_connection(self): - try: - if self.client: - node = self.client.get_root_node() - node.get_children() - else: - self.__connected = False - except ConnectionRefusedError: - self.__connected = False - except OSError: - self.__connected = False - except FuturesTimeoutError: - self.__connected = False - except AttributeError: - self.__connected = False - except Exception as e: - self.__connected = False - self._log.exception(e) + def get_type(self): + return self._connector_type def close(self): + task = self.__loop.create_task(self.__reset_nodes()) + + while not task.done(): + sleep(.2) + self.__stopped = True - if self.client: + self.__connected = False + self.__log.info('%s has been stopped.', self.get_name()) + self.__log.stop() + + async def __reset_node(self, node): + node['valid'] = False + if node.get('sub_on', False): try: - # Always try to disconnect to release resource on client and server side! - self.client.disconnect() + if self.__subscription: + await self.__subscription.unsubscribe(node['subscription']) except: pass - self.__connected = False - self._log.info('%s has been stopped.', self.get_name()) - self._log.stop() + node['subscription'] = None + node['sub_on'] = False + + async def __reset_nodes(self, device_name=None): + for device in self.__device_nodes: + if device_name is None or device.name == device_name: + for section in ('attributes', 'timeseries'): + for node in device.values.get(section, []): + await self.__reset_node(node) + + if device_name is None and self.__subscription: + try: + await self.__subscription.delete() + except: + pass + self.__subscription = None def get_id(self): return self.__id @@ -252,28 +137,340 @@ class OpcUaConnector(Thread, Connector): def get_name(self): return self.name - @StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') - def on_attributes_update(self, content): - self._log.debug(content) - try: - for server_variables in self.__available_object_resources[content["device"]]['variables']: - for attribute in content["data"]: - for variable in server_variables: - if attribute == variable: - try: - if isinstance(content["data"][variable], int): - dv = ua.DataValue(ua.Variant(content["data"][variable], server_variables[ - variable].get_data_type_as_variant_type())) - server_variables[variable].set_value(dv) - else: - server_variables[variable].set_value(content["data"][variable]) - except Exception: - server_variables[variable].set_attribute(ua.AttributeIds.Value, - ua.DataValue(content["data"][variable])) - except Exception as e: - self._log.exception(e) + def is_connected(self): + return self.__connected + + def is_stopped(self): + return self.__stopped + + def get_config(self): + return self.__server_conf + + def run(self): + data_send_thread = Thread(name='Send Data Thread', target=self.__send_data, daemon=True) + data_send_thread.start() + + if not self.__server_conf.get('disableSubscriptions', False): + sub_data_convert_thread = Thread(name='Sub Data Convert Thread', target=self.__convert_sub_data, + daemon=True) + sub_data_convert_thread.start() + + self.__loop.run_until_complete(self.start_client()) + + async def start_client(self): + while not self.__stopped: + self.__client = asyncua.Client(url=self.__opcua_url, + timeout=self.__server_conf.get('timeoutInMillis', 4000) / 1000) + + if self.__server_conf["identity"].get("type") == "cert.PEM": + await self.__set_auth_settings_by_cert() + if self.__server_conf["identity"].get("username"): + self.__set_auth_settings_by_username() + + try: + async with self.__client: + self.__connected = True + + try: + await self.__client.load_data_type_definitions() + except Exception as e: + self.__log.error("Error on loading type definitions:\n %s", e) + + while not self.__stopped: + if monotonic() - self.__last_poll >= self.__server_conf.get('scanPeriodInMillis', 5000) / 1000: + await self.__scan_device_nodes() + await self.__poll_nodes() + self.__last_poll = monotonic() + + await asyncio.sleep(.2) + except (ConnectionError, BadSessionClosed): + self.__log.warning('Connection lost for %s', self.get_name()) + except asyncio.exceptions.TimeoutError: + self.__log.warning('Failed to connect %s', self.get_name()) + except Exception as e: + self.__log.exception(e) + finally: + await self.__reset_nodes() + self.__connected = False + await asyncio.sleep(1) + + async def __set_auth_settings_by_cert(self): + try: + ca_cert = self.__server_conf["identity"].get("caCert") + private_key = self.__server_conf["identity"].get("privateKey") + cert = self.__server_conf["identity"].get("cert") + policy = self.__server_conf["security"] + mode = self.__server_conf["identity"].get("mode", "SignAndEncrypt") + + if cert is None or private_key is None: + self.__log.exception("Error in ssl configuration - cert or privateKey parameter not found") + raise RuntimeError("Error in ssl configuration - cert or privateKey parameter not found") + + await self.__client.set_security( + SECURITY_POLICIES[policy], + certificate=cert, + private_key=private_key, + server_certificate=ca_cert, + mode=MESSAGE_SECURITY_MODES[mode] + ) + except Exception as e: + self.__log.exception(e) + + def __set_auth_settings_by_username(self): + self.__client.set_user(self.__server_conf["identity"].get("username")) + if self.__server_conf["identity"].get("password"): + self.__client.set_password(self.__server_conf["identity"].get("password")) + + def __load_converter(self, device): + converter_class_name = device.get('converter', DEFAULT_UPLINK_CONVERTER) + module = TBModuleLoader.import_module(self._connector_type, converter_class_name) + + if module: + self.__log.debug('Converter %s for device %s - found!', converter_class_name, self.name) + return module + + self.__log.error("Cannot find converter for %s device", self.name) + return None + + @staticmethod + def is_regex_pattern(pattern): + return not re.fullmatch(pattern, pattern) + + async def __find_nodes(self, node_list, current_parent_node, nodes): + assert len(node_list) > 0 + final = [] + + children = await current_parent_node.get_children() + for node in children: + child_node = await node.read_browse_name() + + if re.fullmatch(re.escape(node_list[0]), child_node.Name) or node_list[0].split(':')[-1] == child_node.Name: + new_nodes = [*nodes, f'{child_node.NamespaceIndex}:{child_node.Name}'] + if len(node_list) == 1: + final.append(new_nodes) + else: + final.extend(await self.__find_nodes(node_list[1:], current_parent_node=node, nodes=new_nodes)) + + return final + + async def find_nodes(self, node_pattern, current_parent_node=None, nodes=[]): + node_list = node_pattern.split('\\.') + + if current_parent_node is None: + current_parent_node = self.__client.nodes.root + + if len(node_list) > 0 and node_list[0].lower() == 'root': + node_list = node_list[1:] + + return await self.__find_nodes(node_list, current_parent_node, nodes) + + async def find_node_name_space_index(self, path): + if isinstance(path, str): + path = path.split('\\.') + + # find unresolved nodes + u_node_count = len(tuple(filter(lambda u_node: len(u_node.split(':')) < 2, path))) + + resolved = path[:-u_node_count] + resolved_level = len(path) - u_node_count + parent_node = await self.__client.nodes.root.get_child(resolved) + + unresolved = path[resolved_level:] + return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved) + + async def __scan_device_nodes(self): + existing_devices = list(map(lambda dev: dev.name, self.__device_nodes)) + + scanned_devices = [] + for device in self.__server_conf.get('mapping', []): + nodes = await self.find_nodes(device['deviceNodePattern']) + self.__log.debug('Found devices: %s', nodes) + + device_names = await self._get_device_info_by_pattern(device['deviceNamePattern']) + + for device_name in device_names: + scanned_devices.append(device_name) + if device_name not in existing_devices: + for node in nodes: + converter = self.__load_converter(device) + device_type = await self._get_device_info_by_pattern( + device.get('deviceTypePattern', 'default'), + get_first=True) + device_config = {**device, 'device_name': device_name, 'device_type': device_type} + self.__device_nodes.append( + Device(path=node, name=device_name, config=device_config, + converter=converter(device_config, self.__log), + converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get( + 'disableSubscriptions', + False) else None, logger=self.__log)) + self.__log.info('Added device node: %s', device_name) + + for device_name in existing_devices: + if device_name not in scanned_devices: + await self.__reset_nodes(device_name) + + self.__log.debug('Device nodes: %s', self.__device_nodes) + + async def _get_device_info_by_pattern(self, pattern, get_first=False): + result = [] + + search_result = re.search(r"\${([A-Za-z.:\\\d]+)}", pattern) + if search_result: + try: + group = search_result.group(0) + node_path = search_result.group(1) + except IndexError: + self.__log.error('Invalid pattern: %s', pattern) + return result + + nodes = await self.find_nodes(node_path) + self.__log.debug('Found device name nodes: %s', nodes) + + for node in nodes: + try: + var = await self.__client.nodes.root.get_child(node) + value = await var.read_value() + result.append(pattern.replace(group, str(value))) + except Exception as e: + self.__log.exception(e) + continue + else: + result.append(pattern) + + return result[0] if len(result) > 0 and get_first else result + + def __convert_sub_data(self): + while not self.__stopped: + if not self.__sub_data_to_convert.empty(): + sub_node, data = self.__sub_data_to_convert.get() + + for device in self.__device_nodes: + for section in ('attributes', 'timeseries'): + for node in device.values.get(section, []): + if node.get('id') == sub_node.__str__(): + device.converter_for_sub.convert(config={'section': section, 'key': node['key']}, + val=data.monitored_item.Value) + converter_data = device.converter_for_sub.get_data() + + if converter_data: + self.__data_to_send.put(*converter_data) + device.converter_for_sub.clear_data() + else: + sleep(.2) + + async def __poll_nodes(self): + for device in self.__device_nodes: + for section in ('attributes', 'timeseries'): + for node in device.values.get(section, []): + try: + path = node.get('qualified_path', node['path']) + if isinstance(path, str) and re.match(r"(ns=\d+;[isgb]=[^}]+)", path): + var = self.__client.get_node(path) + else: + if len(path[-1].split(':')) != 2: + qualified_path = await self.find_node_name_space_index(path) + if len(qualified_path) == 0: + if node.get('valid', True): + self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name, + node['key'], node['path']) + await self.__reset_node(node) + continue + elif len(qualified_path) > 1: + self.__log.warning( + 'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name, + node['key'], node['path'], qualified_path) + node['qualified_path'] = qualified_path[0] + path = qualified_path[0] + + var = await self.__client.nodes.root.get_child(path) + + if (node.get('valid') is None or + (node.get('valid') and self.__server_conf.get('disableSubscriptions', False))): + value = await var.read_data_value() + device.converter.convert(config={'section': section, 'key': node['key']}, val=value) + + if not self.__server_conf.get('disableSubscriptions', False) and not node.get('sub_on', + False): + if self.__subscription is None: + self.__subscription = await self.__client.create_subscription(1, SubHandler( + self.__sub_data_to_convert, self.__log)) + handle = await self.__subscription.subscribe_data_change(var) + node['subscription'] = handle + node['sub_on'] = True + node['id'] = var.nodeid.to_string() + self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name, + node['key'], node['id']) + + node['valid'] = True + except ConnectionError: + raise + except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid, + BadCommunicationError, BadOutOfService): + if node.get('valid', True): + self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name, + node['key'], node['path']) + await self.__reset_node(node) + except UaStatusCodeError as uae: + if node.get('valid', True): + self.__log.exception('Node status code error: %s', uae) + await self.__reset_node(node) + except Exception as e: + if node.get('valid', True): + self.__log.exception(e) + await self.__reset_node(node) + + converter_data = device.converter.get_data() + if converter_data: + self.__data_to_send.put(*converter_data) + + device.converter.clear_data() + + def __send_data(self): + while not self.__stopped: + if not self.__data_to_send.empty(): + data = self.__data_to_send.get() + self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1 + self.__log.debug(data) + self.__gateway.send_to_storage(self.get_name(), self.get_id(), data) + self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 + self.__log.debug('Data to ThingsBoard %s', data) + else: + sleep(.2) + + async def get_shared_attr_node_id(self, path, result={}): + try: + q_path = await self.find_node_name_space_index(path) + var = await self.__client.nodes.root.get_child(q_path[0]) + result['result'] = var + except Exception as e: + result['error'] = e.__str__() + + def on_attributes_update(self, content): + self.__log.debug(content) + try: + device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0] + + for (key, value) in content['data'].items(): + for attr_update in device.config['attributes_updates']: + if attr_update['attributeOnThingsBoard'] == key: + result = {} + task = self.__loop.create_task( + self.get_shared_attr_node_id( + device.path + attr_update['attributeOnDevice'].replace('\\', '').split('.'), result)) + + while not task.done(): + sleep(.1) + + if result.get('error'): + self.__log.error('Node not found! (%s)', result['error']) + return + + node_id = result['result'] + self.__loop.create_task(self.__write_value(node_id, value)) + return + except Exception as e: + self.__log.exception(e) - @StatisticsService.CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') def server_side_rpc_handler(self, content): try: if content.get('data') is None: @@ -295,6 +492,7 @@ class OpcUaConnector(Thread, Connector): full_path = '' args_list = [] device = content.get('device') + try: args_list = content['data']['params'].split(';') @@ -303,474 +501,111 @@ class OpcUaConnector(Thread, Connector): else: full_path = args_list[0].split('=')[-1] except IndexError: - self._log.error('Not enough arguments. Expected min 2.') + self.__log.error('Not enough arguments. Expected min 2.') self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'), content={content['data'][ 'method']: 'Not enough arguments. Expected min 2.', 'code': 400}) - node_list = [] - self.__search_node(current_node=device, fullpath=full_path, result=node_list) - - node = None - try: - node = node_list[0] - except IndexError: - self.__gateway.send_rpc_reply(device=device, req_id=content['data'].get('id'), - content={content['data']['method']: 'Node didn\'t find!', - 'code': 500}) - + result = {} if rpc_method == 'get': - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={content['data']['method']: node.get_value(), 'code': 200}) - else: - try: - value = args_list[2].split('=')[-1] - node.set_value(value) - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={'success': 'true', 'code': 200}) - except ValueError: - self._log.error('Method SET take three arguments!') - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={'error': 'Method SET take three arguments!', - 'code': 400}) - except ua.UaStatusCodeError: - self._log.error('Write method doesn\'t allow!') - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={'error': 'Write method doesn\'t allow!', 'code': 400}) + task = self.__loop.create_task(self.__read_value(full_path, result)) - for method in self.__available_object_resources[content["device"]]['methods']: - if rpc_method is not None and method.get(rpc_method) is not None: - arguments_from_config = method["arguments"] - arguments = content["data"].get("params") if content["data"].get( - "params") is not None else arguments_from_config - try: - if isinstance(arguments, list): - result = method["node"].call_method(method[rpc_method], *arguments) - elif arguments is not None: - try: - result = method["node"].call_method(method[rpc_method], arguments) - except ua.UaStatusCodeError as e: - if "BadTypeMismatch" in str(e) and isinstance(arguments, int): - result = method["node"].call_method(method[rpc_method], float(arguments)) - else: - result = method["node"].call_method(method[rpc_method]) + while not task.done(): + sleep(.2) + elif rpc_method == 'set': + value = args_list[2].split('=')[-1] + task = self.__loop.create_task(self.__write_value(full_path, value, result)) - self.__gateway.send_rpc_reply(content["device"], - content["data"]["id"], - {content["data"]["method"]: result, "code": 200}) - self._log.debug("method %s result is: %s", method[rpc_method], result) - except Exception as e: - self._log.exception(e) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], - {"error": str(e), "code": 500}) - else: - self._log.error("Method %s not found for device %s", rpc_method, content["device"]) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], - {"error": "%s - Method not found" % rpc_method, "code": 404}) - except Exception as e: - self._log.exception(e) + while not task.done(): + sleep(.2) - def __initialize_client(self): - self.__opcua_nodes["root"] = self.client.get_objects_node() - self.__opcua_nodes["objects"] = self.client.get_objects_node() - self.scan_nodes_from_config() - self.__previous_scan_time = time.time() * 1000 - self._log.debug('Subscriptions: %s', self.subscribed) - self._log.debug("Available methods: %s", self.__available_object_resources) + self.__gateway.send_rpc_reply(device=device, + req_id=content['data'].get('id'), + content={content['data']['method']: result}) + else: + device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0] + + for rpc in device.config['rpc_methods']: + if rpc['method'] == content["data"]['method']: + arguments_from_config = rpc["arguments"] + arguments = content["data"].get("params") if content["data"].get( + "params") is not None else arguments_from_config + method_name = content['data']['method'] - def scan_nodes_from_config(self): - try: - if self.__interest_nodes: - for device_object in self.__interest_nodes: - for current_device in device_object: try: - device_configuration = device_object[current_device] - devices_info_array = self.__search_general_info(device_configuration) - for device_info in devices_info_array: - if device_info is not None and device_info.get("deviceNode") is not None: - self.__search_nodes_and_subscribe(device_info) - self.__save_methods(device_info) - self.__search_attribute_update_variables(device_info) - else: - self._log.error("Device node is None, please check your configuration.") - self._log.debug("Current device node is: %s", - str(device_configuration.get("deviceNodePattern"))) - break - except BrokenPipeError: - self._log.debug("Broken Pipe. Connection lost.") - except OSError: - self._log.debug("Stop on scanning.") - except FuturesTimeoutError: - self.__check_connection() + result = {} + task = self.__loop.create_task( + self.__call_method(device.path, method_name, arguments, result)) + + while not task.done(): + sleep(.2) + + self.__gateway.send_rpc_reply(content["device"], + content["data"]["id"], + {content["data"]["method"]: result, "code": 200}) + self.__log.debug("method %s result is: %s", rpc['method'], result) except Exception as e: - self._log.exception(e) - self._log.debug(self.__interest_nodes) - except Exception as e: - self._log.exception(e) - - def __search_nodes_and_subscribe(self, device_info): - sub_nodes = [] - information_types = {"attributes": "attributes", "timeseries": "telemetry"} - for information_type in information_types: - for information in device_info["configuration"][information_type]: - config_path = TBUtility.get_value(information["path"], get_tag=True) - information_path = self._check_path(config_path, device_info["deviceNode"]) - information["path"] = '${%s}' % information_path - information_key = information['key'] - information_nodes = [] - self.__search_node(device_info["deviceNode"], information_path, result=information_nodes) - if len(information_nodes) == 0: - self._log.error("No nodes found for: %s - %s - %s", information_type, information["key"], information_path) - - for information_node in information_nodes: - changed_key = False - - if information_node is not None: - try: - information_value = information_node.get_value() - except: - self._log.error("Err get_value: %s", str(information_node)) - continue - - if device_info.get("uplink_converter") is None: - configuration = {**device_info["configuration"], - "deviceName": device_info["deviceName"], - "deviceType": device_info["deviceType"]} - if device_info["configuration"].get('converter') is None: - converter = OpcUaUplinkConverter(configuration, self._log) - else: - converter = TBModuleLoader.import_module(self._connector_type, - device_info["configuration"].get('converter'))( - configuration, self._log) - device_info["uplink_converter"] = converter - else: - converter = device_info["uplink_converter"] - - self.subscribed[information_key] = {"converter": converter, - "information_node": information_node, - "information": information, - "information_type": information_type} - - # Use Node name if param "key" not found in config - if not information.get('key'): - information['key'] = information_node.get_browse_name().Name - self.subscribed[information_key]['key'] = information['key'] - changed_key = True - - self._log.debug("Node for %s \"%s\" with path: %s - FOUND! Current values is: %s", - information_type, - information_key, - information_path, - str(information_value)) - - if not device_info.get(information_types[information_type]): - device_info[information_types[information_type]] = [] - - converted_data = converter.convert((information, information_type), information_value) - self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1 - self.data_to_send.append(converted_data) - self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - self._log.debug("Data to ThingsBoard: %s", converted_data) - - if not self.__server_conf.get("disableSubscriptions", False): - sub_nodes.append(information_node) + self.__log.exception(e) + self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], + {"error": str(e), "code": 500}) else: - self._log.error("Node for %s \"%s\" with path %s - NOT FOUND!", information_type, - information['key'], information_path) + self.__log.error("Method %s not found for device %s", rpc_method, content["device"]) + self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], + {"error": "%s - Method not found" % rpc_method, + "code": 404}) - if changed_key: - information['key'] = None + except Exception as e: + self.__log.exception(e) - if not self.__server_conf.get("disableSubscriptions", False): - if self.__sub is None: - self.__sub = self.client.create_subscription(self.__server_conf.get("subCheckPeriodInMillis", 500), - self.__sub_handler) - if sub_nodes: - self.__sub.subscribe_data_change(sub_nodes) - self._log.debug("Added subscription to nodes: %s", str(sub_nodes)) - - def __save_methods(self, device_info): + async def __write_value(self, path, value, result={}): try: - if self.__available_object_resources.get(device_info["deviceName"]) is None: - self.__available_object_resources[device_info["deviceName"]] = {} - if self.__available_object_resources[device_info["deviceName"]].get("methods") is None: - self.__available_object_resources[device_info["deviceName"]]["methods"] = [] - if device_info["configuration"].get("rpc_methods", []): - node = device_info["deviceNode"] - for method_object in device_info["configuration"]["rpc_methods"]: - method_node_path = self._check_path(method_object["method"], node) - methods = [] - self.__search_node(node, method_node_path, True, result=methods) - for method in methods: - if method is not None: - node_method_name = method.get_display_name().Text - self.__available_object_resources[device_info["deviceName"]]["methods"].append( - {node_method_name: method, "node": node, "arguments": method_object.get("arguments")}) - else: - self._log.error("Node for method with path %s - NOT FOUND!", method_node_path) - except Exception as e: - self._log.exception(e) + var = path + if isinstance(path, str): + var = self.__client.get_node(path.replace('\\.', '.')) - def __search_attribute_update_variables(self, device_info): + await var.write_value(value) + except Exception as e: + result['error'] = e.__str__() + + async def __read_value(self, path, result={}): try: - if device_info["configuration"].get("attributes_updates", []): - node = device_info["deviceNode"] - device_name = device_info["deviceName"] - if self.__available_object_resources.get(device_name) is None: - self.__available_object_resources[device_name] = {} - if self.__available_object_resources[device_name].get("variables") is None: - self.__available_object_resources[device_name]["variables"] = [] - for attribute_update in device_info["configuration"]["attributes_updates"]: - attribute_path = self._check_path(attribute_update["attributeOnDevice"], node) - attribute_nodes = [] - self.__search_node(node, attribute_path, result=attribute_nodes) - for attribute_node in attribute_nodes: - if attribute_node is not None: - if self.get_node_path(attribute_node) == attribute_path: - self.__available_object_resources[device_name]["variables"].append( - {attribute_update["attributeOnThingsBoard"]: attribute_node}) - else: - self._log.error("Attribute update node with path \"%s\" - NOT FOUND!", attribute_path) + var = self.__client.get_node(path) + result['value'] = await var.read_value() except Exception as e: - self._log.exception(e) + result['error'] = e.__str__() - def __search_general_info(self, device): - result = [] - match_devices = [] - self.__search_node(self.__opcua_nodes["root"], TBUtility.get_value(device["deviceNodePattern"], get_tag=True), - result=match_devices) - if len(match_devices) == 0: - self._log.warning("Device node not found with expression: %s", - TBUtility.get_value(device["deviceNodePattern"], get_tag=True)) - for device_node in match_devices: - if device_node is not None: - result_device_dict = {"deviceName": None, "deviceType": None, "deviceNode": device_node, - "configuration": deepcopy(device)} - name_pattern_config = device["deviceNamePattern"] - name_expression = TBUtility.get_value(name_pattern_config, get_tag=True) - if "${" in name_pattern_config and "}" in name_pattern_config: - self._log.debug("Looking for device name") - device_name_from_node = "" - if name_expression == "$DisplayName": - device_name_from_node = device_node.get_display_name().Text - elif name_expression == "$BrowseName": - device_name_from_node = device_node.get_browse_name().Name - elif name_expression == "$NodeId.Identifier": - device_name_from_node = str(device_node.nodeid.Identifier) - else: - name_path = self._check_path(name_expression, device_node) - device_name_node = [] - self.__search_node(device_node, name_path, result=device_name_node) - if len(device_name_node) == 0: - self._log.warn("Device name node - not found, skipping device...") - continue - device_name_node = device_name_node[0] - if device_name_node is not None: - device_name_from_node = device_name_node.get_value() - if device_name_from_node == "": - self._log.error("Device name node not found with expression: %s", name_expression) - return None - full_device_name = name_pattern_config.replace("${" + name_expression + "}", - str(device_name_from_node)).replace( - name_expression, str(device_name_from_node)) - else: - full_device_name = name_expression - result_device_dict["deviceName"] = full_device_name - self._log.debug("Device name: %s", full_device_name) - if device.get("deviceTypePattern"): - device_type_expression = TBUtility.get_value(device["deviceTypePattern"], - get_tag=True) - if "${" in device_type_expression and "}" in device_type_expression: - type_path = self._check_path(device_type_expression, device_node) - device_type_node = [] - self.__search_node(device_node, type_path, result=device_type_node) - device_type_node = device_type_node[0] - if device_type_node is not None: - device_type = device_type_node.get_value() - full_device_type = device_type_expression.replace("${" + device_type_expression + "}", - device_type).replace( - device_type_expression, - device_type) - else: - self._log.error("Device type node not found with expression: %s", device_type_expression) - full_device_type = "default" - else: - full_device_type = device_type_expression - result_device_dict["deviceType"] = full_device_type - self._log.debug("Device type: %s", full_device_type) - else: - result_device_dict["deviceType"] = "default" - result.append(result_device_dict) - else: - self._log.error("Device node not found with expression (2): %s", - TBUtility.get_value(device["deviceNodePattern"], get_tag=True)) - return result - - @cached(cache=TTLCache(maxsize=1000, ttl=10 * 60)) - def get_node_path(self, node: Node): - return '\\.'.join(node.get_browse_name().Name for node in node.get_path(200000)) - - def __search_node(self, current_node, fullpath, search_method=False, result=None): - if result is None: - result = [] + async def __call_method(self, path, method_name, arguments, result={}): try: - if regex.match(r"ns=\d*;[isgb]=.*", fullpath, regex.IGNORECASE): - if self.__show_map: - self._log.debug("Looking for node with config") - node = self.client.get_node(fullpath) - if node is None: - self._log.warning("NODE NOT FOUND - using configuration %s", fullpath) - else: - self._log.debug("Found in %s", node) - - # this unnecessary code is added to fix the issue with the to_string method of the NodeId class - # and can be deleted after the fix of the issue in the library - if node.nodeid.NodeIdType == ua.NodeIdType.Guid: - node.nodeid = ua.NodeId(UUID(node.nodeid.Identifier), node.nodeid.NamespaceIndex, - nodeidtype=ua.NodeIdType.Guid) - elif node.nodeid.NodeIdType == ua.NodeIdType.ByteString: - node.nodeid = ua.NodeId(node.nodeid.Identifier.encode('utf-8'), node.nodeid.NamespaceIndex, - nodeidtype=ua.NodeIdType.ByteString) - # -------------------------------------------------------------------------------------------------- - - result.append(node) - else: - fullpath_pattern = regex.compile(fullpath) - full1 = fullpath.replace('\\\\.', '.') - # current_node_path = '\\.'.join(char.split(":")[1] for char in current_node.get_path(200000, True)) - current_node_path = self.get_node_path(current_node) - # we are allways the parent - child_node_parent_class = current_node.get_node_class() - new_parent = current_node - for child_node in current_node.get_children(): - new_node_class = child_node.get_node_class() - # this will not change you can do it outside th loop - # basis Description of node.get_parent() function, sometime child_node.get_parent() return None - # new_parent = child_node.get_parent() - # if (new_parent is None): - # child_node_parent_class = current_node.get_node_class() - # else: - # child_node_parent_class = child_node.get_parent().get_node_class() - # current_node_path = '\\.'.join(char.split(":")[1] for char in current_node.get_path(200000, True)) - # new_node_path = '\\\\.'.join(char.split(":")[1] for char in child_node.get_path(200000, True)) - new_node_path = self.get_node_path(child_node) - if child_node_parent_class == ua.NodeClass.View and new_parent is not None: - parent_path = self.get_node_path(new_parent) - # parent_path = '\\.'.join(char.split(":")[1] for char in new_parent.get_path(200000, True)) - fullpath = fullpath.replace(current_node_path, parent_path) - nnp1 = new_node_path.replace('\\\\.', '.') - nnp2 = new_node_path.replace('\\\\', '\\') - if self.__show_map: - self._log.debug("SHOW MAP: Current node path: %s", new_node_path) - regex_fullmatch = regex.fullmatch(fullpath_pattern, nnp1) or \ - nnp2 == full1 or \ - nnp2 == fullpath or \ - nnp1 == full1 - if regex_fullmatch: - if self.__show_map: - self._log.debug("SHOW MAP: Current node path: %s - NODE FOUND", nnp2) - result.append(child_node) - else: - regex_search = fullpath_pattern.fullmatch(nnp1, partial=True) or \ - nnp2 in full1 or \ - nnp1 in full1 - if regex_search: - if self.__show_map: - self._log.debug("SHOW MAP: Current node path: %s - NODE FOUND", new_node_path) - if new_node_class == ua.NodeClass.Object: - if self.__show_map: - self._log.debug("SHOW MAP: Search for %s in %s", fullpath, new_node_path) - self.__search_node(child_node, fullpath, result=result) - elif new_node_class == ua.NodeClass.Variable: - if self.__show_map: - self._log.debug("SHOW MAP: Search for %s in %s", fullpath, new_node_path) - self.__search_node(child_node, fullpath, result=result) - elif new_node_class == ua.NodeClass.Method and search_method: - self._log.debug("Found in %s", new_node_path) - result.append(child_node) - except CancelledError: - self._log.error("Request during search has been canceled by the OPC-UA server.") - except BrokenPipeError: - self._log.error("Broken Pipe. Connection lost.") - except OSError: - self._log.debug("Stop on scanning.") + var = await self.__client.nodes.root.get_child(path) + method_id = '{}:{}'.format(var.nodeid.NamespaceIndex, method_name) + result['result'] = await var.call_method(method_id, *arguments) except Exception as e: - self._log.exception(e) - - def _check_path(self, config_path, node): - if regex.match(r"ns=\d*;[isgb]=.*", config_path, regex.IGNORECASE): - return config_path - if re.search(r"^root", config_path.lower()) is None: - node_path = self.get_node_path(node) - # node_path = '\\\\.'.join(char.split(":")[1] for char in node.get_path(200000, True)) - if config_path[:2] != '\\.': - information_path = node_path + '\\.' + config_path - else: - information_path = node_path + config_path - else: - information_path = config_path - return information_path.replace('\\', '\\\\') - - @property - def subscribed(self): - return self._subscribed - - def get_config(self): - return self.__server_conf - - def get_converters(self): - return [item['converter'] for _, item in self.subscribed.items()] + result['error'] = e.__str__() def update_converter_config(self, converter_name, config): - self._log.debug('Received remote converter configuration update for %s with configuration %s', converter_name, + self.__log.debug('Received remote converter configuration update for %s with configuration %s', converter_name, config) - converters = self.get_converters() - for converter_class_obj in converters: - converter_class_name = converter_class_obj.__class__.__name__ - converter_obj = converter_class_obj - if converter_class_name == converter_name: - converter_obj.config = config - self._log.info('Updated converter configuration for: %s with configuration %s', - converter_name, converter_obj.config) + for device in self.__device_nodes: + if device.converter.__class__.__name__ == converter_name: + device.config.update(config) + device.load_values() + self.__log.info('Updated converter configuration for: %s with configuration %s', + converter_name, device.config) for node_config in self.__server_conf['mapping']: - if node_config['deviceNodePattern'] == config['deviceNodePattern']: + if node_config['deviceNodePattern'] == device.config['deviceNodePattern']: node_config.update(config) self.__gateway.update_connector_config_file(self.name, {'server': self.__server_conf}) -class SubHandler(object): - def __init__(self, connector: OpcUaConnector): - self.connector = connector +class SubHandler: + def __init__(self, queue, logger): + self.__log = logger + self.__queue = queue - def datachange_notification(self, node, val, data): - try: - self.connector._log.debug("Python: New data change event on node %s, with val: %s and data %s", node, val, - str(data)) - subscriptions = list( - filter(lambda node_info: node_info["information_node"] == node, self.connector.subscribed.values())) - for subscription in subscriptions: - converted_data = subscription["converter"].convert( - (subscription["information"], subscription["information_type"]), val, data, - key=subscription.get('key')) - self.connector.statistics['MessagesReceived'] = self.connector.statistics['MessagesReceived'] + 1 - self.connector.data_to_send.append(converted_data) - self.connector.statistics['MessagesSent'] = self.connector.statistics['MessagesSent'] + 1 - self.connector._log.debug("[SUBSCRIPTION] Data to ThingsBoard: %s", converted_data) - except KeyError: - self.connector.scan_nodes_from_config() - except Exception as e: - self.connector._log.exception(e) - - def event_notification(self, event): - try: - self.connector._log.debug("Python: New event %s", event) - except Exception as e: - self.connector._log.exception(e) + def datachange_notification(self, node, _, data): + self.__log.debug("New data change event %s %s", node, data) + self.__queue.put((node, data)) diff --git a/thingsboard_gateway/connectors/opcua/opcua_converter.py b/thingsboard_gateway/connectors/opcua/opcua_converter.py index 5d6df2fc..f05fc76c 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_converter.py @@ -17,5 +17,5 @@ from thingsboard_gateway.connectors.converter import Converter, abstractmethod class OpcUaConverter(Converter): @abstractmethod - def convert(self, config, val, data = None): + def convert(self, config, val): pass diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index e6e37ee8..0ac706dc 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -16,50 +16,79 @@ from time import time from datetime import timezone from thingsboard_gateway.connectors.opcua.opcua_converter import OpcUaConverter -from thingsboard_gateway.tb_utility.tb_utility import TBUtility -from thingsboard_gateway.gateway.statistics_service import StatisticsService +from asyncua.ua.uatypes import LocalizedText, VariantType + +DATA_TYPES = { + 'attributes': 'attributes', + 'timeseries': 'telemetry' +} class OpcUaUplinkConverter(OpcUaConverter): def __init__(self, config, logger): self._log = logger self.__config = config + self.data = { + 'deviceName': self.__config['device_name'], + 'deviceType': self.__config['device_type'], + 'attributes': [], + 'telemetry': [], + } + self._last_node_timestamp = 0 - @property - def config(self): - return self.__config + def clear_data(self): + self.data = { + 'deviceName': self.__config['device_name'], + 'deviceType': self.__config['device_type'], + 'attributes': [], + 'telemetry': [], + } - @config.setter - def config(self, value): - self.__config = value + def get_data(self): + if len(self.data['attributes']) or len(self.data['telemetry']): + data_list = [] + device_names = self.__config.get('device_names') + if device_names: + for device in device_names: + self.data['deviceName'] = device + data_list.append(self.data) - @StatisticsService.CollectStatistics(start_stat_type='receivedBytesFromDevices', - end_stat_type='convertedBytesFromDevice') - def convert(self, config, val, data=None, key=None): - information = config[0] - information_type = config[1] - device_name = self.__config["deviceName"] - result = {"deviceName": device_name, - "deviceType": self.__config.get("deviceType", "OPC-UA Device"), - "attributes": [], - "telemetry": [], } - try: - information_types = {"attributes": "attributes", "timeseries": "telemetry"} - path = TBUtility.get_value(information["path"], get_tag=True) - full_key = key if key else information["key"] - full_value = information["path"].replace("${"+path+"}", str(val)) - if information_type == 'timeseries' and data is not None and not self.__config.get( - 'subOverrideServerTime', False): - # Note: SourceTimestamp and ServerTimestamp may be naive datetime objects, hence for the timestamp() the tz must first be overwritten to UTC (which it is according to the spec) - if data.monitored_item.Value.SourceTimestamp is not None: - timestamp = int(data.monitored_item.Value.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp()*1000) - elif data.monitored_item.Value.ServerTimestamp is not None: - timestamp = int(data.monitored_item.Value.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp()*1000) + return data_list + + return [self.data] + + return None + + def convert(self, config, val): + if not val: + return + + data = val.Value.Value + + if data is not None: + if isinstance(data, LocalizedText): + data = data.Text + elif val.Value.VariantType == VariantType.ExtensionObject: + data = str(data) + elif val.Value.VariantType == VariantType.DateTime: + if data.tzinfo is None: + data = data.replace(tzinfo=timezone.utc) + data = data.isoformat() + + if config['section'] == 'timeseries': + if val.SourceTimestamp and int(val.SourceTimestamp.replace( + tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp: + timestamp = int(val.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) + self._last_node_timestamp = timestamp + elif val.ServerTimestamp and int(val.ServerTimestamp.replace( + tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp: + timestamp = int(val.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) + self._last_node_timestamp = timestamp else: - timestamp = int(time()*1000) - result[information_types[information_type]].append({"ts": timestamp, 'values': {full_key: full_value}}) + timestamp = int(time() * 1000) + + self.data[DATA_TYPES[config['section']]].append({'ts': timestamp, 'values': {config['key']: data}}) else: - result[information_types[information_type]].append({full_key: full_value}) - return result - except Exception as e: - self._log.exception(e) + self.data[DATA_TYPES[config['section']]].append({config['key']: data}) + + self._log.debug('Converted data: %s', self.data) diff --git a/thingsboard_gateway/connectors/opcua_asyncio/__init__.py b/thingsboard_gateway/connectors/opcua_asyncio/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py b/thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py deleted file mode 100644 index fd3274de..00000000 --- a/thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py +++ /dev/null @@ -1,611 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import re -from random import choice -from string import ascii_lowercase -from threading import Thread -from time import sleep, monotonic -from queue import Queue - -from thingsboard_gateway.connectors.connector import Connector -from thingsboard_gateway.connectors.opcua_asyncio.device import Device -from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader -from thingsboard_gateway.tb_utility.tb_utility import TBUtility -from thingsboard_gateway.tb_utility.tb_logger import init_logger - -try: - import asyncua -except ImportError: - print("OPC-UA library not found") - TBUtility.install_package("asyncua") - import asyncua - -from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256, SecurityPolicyBasic256, \ - SecurityPolicyBasic128Rsa15 -from asyncua.ua.uaerrors import UaStatusCodeError, BadNodeIdUnknown, BadConnectionClosed, \ - BadInvalidState, BadSessionClosed, BadAttributeIdInvalid, BadCommunicationError, BadOutOfService - -DEFAULT_UPLINK_CONVERTER = 'OpcUaUplinkConverter' - -SECURITY_POLICIES = { - "Basic128Rsa15": SecurityPolicyBasic128Rsa15, - "Basic256": SecurityPolicyBasic256, - "Basic256Sha256": SecurityPolicyBasic256Sha256, -} - -MESSAGE_SECURITY_MODES = { - "None": asyncua.ua.MessageSecurityMode.None_, - "Sign": asyncua.ua.MessageSecurityMode.Sign, - "SignAndEncrypt": asyncua.ua.MessageSecurityMode.SignAndEncrypt -} - - -class OpcUaConnectorAsyncIO(Connector, Thread): - def __init__(self, gateway, config, connector_type): - self.statistics = {'MessagesReceived': 0, - 'MessagesSent': 0} - super().__init__() - self._connector_type = connector_type - self.__gateway = gateway - self.__config = config - self.__id = self.__config.get('id') - self.__server_conf = config['server'] - self.name = self.__config.get("name", 'OPC-UA Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) - self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) - - if "opc.tcp" not in self.__server_conf.get("url"): - self.__opcua_url = "opc.tcp://" + self.__server_conf.get("url") - else: - self.__opcua_url = self.__server_conf.get("url") - - self.__data_to_send = Queue(-1) - self.__sub_data_to_convert = Queue(-1) - - self.__loop = asyncio.new_event_loop() - - self.__client = None - self.__subscription = None - - self.__connected = False - self.__stopped = False - self.daemon = True - - self.__device_nodes = [] - self.__last_poll = 0 - - def open(self): - self.__stopped = False - self.start() - self.__log.info("Starting OPC-UA Connector (Async IO)") - - def get_type(self): - return self._connector_type - - def close(self): - task = self.__loop.create_task(self.__reset_nodes()) - - while not task.done(): - sleep(.2) - - self.__stopped = True - self.__connected = False - self.__log.info('%s has been stopped.', self.get_name()) - self.__log.stop() - - async def __reset_node(self, node): - node['valid'] = False - if node.get('sub_on', False): - try: - if self.__subscription: - await self.__subscription.unsubscribe(node['subscription']) - except: - pass - node['subscription'] = None - node['sub_on'] = False - - async def __reset_nodes(self, device_name=None): - for device in self.__device_nodes: - if device_name is None or device.name == device_name: - for section in ('attributes', 'timeseries'): - for node in device.values.get(section, []): - await self.__reset_node(node) - - if device_name is None and self.__subscription: - try: - await self.__subscription.delete() - except: - pass - self.__subscription = None - - def get_id(self): - return self.__id - - def get_name(self): - return self.name - - def is_connected(self): - return self.__connected - - def is_stopped(self): - return self.__stopped - - def get_config(self): - return self.__server_conf - - def run(self): - data_send_thread = Thread(name='Send Data Thread', target=self.__send_data, daemon=True) - data_send_thread.start() - - if not self.__server_conf.get('disableSubscriptions', False): - sub_data_convert_thread = Thread(name='Sub Data Convert Thread', target=self.__convert_sub_data, - daemon=True) - sub_data_convert_thread.start() - - self.__loop.run_until_complete(self.start_client()) - - async def start_client(self): - while not self.__stopped: - self.__client = asyncua.Client(url=self.__opcua_url, - timeout=self.__server_conf.get('timeoutInMillis', 4000) / 1000) - - if self.__server_conf["identity"].get("type") == "cert.PEM": - await self.__set_auth_settings_by_cert() - if self.__server_conf["identity"].get("username"): - self.__set_auth_settings_by_username() - - try: - async with self.__client: - self.__connected = True - - try: - await self.__client.load_data_type_definitions() - except Exception as e: - self.__log.error("Error on loading type definitions:\n %s", e) - - while not self.__stopped: - if monotonic() - self.__last_poll >= self.__server_conf.get('scanPeriodInMillis', 5000) / 1000: - await self.__scan_device_nodes() - await self.__poll_nodes() - self.__last_poll = monotonic() - - await asyncio.sleep(.2) - except (ConnectionError, BadSessionClosed): - self.__log.warning('Connection lost for %s', self.get_name()) - except asyncio.exceptions.TimeoutError: - self.__log.warning('Failed to connect %s', self.get_name()) - except Exception as e: - self.__log.exception(e) - finally: - await self.__reset_nodes() - self.__connected = False - await asyncio.sleep(1) - - async def __set_auth_settings_by_cert(self): - try: - ca_cert = self.__server_conf["identity"].get("caCert") - private_key = self.__server_conf["identity"].get("privateKey") - cert = self.__server_conf["identity"].get("cert") - policy = self.__server_conf["security"] - mode = self.__server_conf["identity"].get("mode", "SignAndEncrypt") - - if cert is None or private_key is None: - self.__log.exception("Error in ssl configuration - cert or privateKey parameter not found") - raise RuntimeError("Error in ssl configuration - cert or privateKey parameter not found") - - await self.__client.set_security( - SECURITY_POLICIES[policy], - certificate=cert, - private_key=private_key, - server_certificate=ca_cert, - mode=MESSAGE_SECURITY_MODES[mode] - ) - except Exception as e: - self.__log.exception(e) - - def __set_auth_settings_by_username(self): - self.__client.set_user(self.__server_conf["identity"].get("username")) - if self.__server_conf["identity"].get("password"): - self.__client.set_password(self.__server_conf["identity"].get("password")) - - def __load_converter(self, device): - converter_class_name = device.get('converter', DEFAULT_UPLINK_CONVERTER) - module = TBModuleLoader.import_module(self._connector_type, converter_class_name) - - if module: - self.__log.debug('Converter %s for device %s - found!', converter_class_name, self.name) - return module - - self.__log.error("Cannot find converter for %s device", self.name) - return None - - @staticmethod - def is_regex_pattern(pattern): - return not re.fullmatch(pattern, pattern) - - async def __find_nodes(self, node_list, current_parent_node, nodes): - assert len(node_list) > 0 - final = [] - - children = await current_parent_node.get_children() - for node in children: - child_node = await node.read_browse_name() - - if re.fullmatch(re.escape(node_list[0]), child_node.Name) or node_list[0].split(':')[-1] == child_node.Name: - new_nodes = [*nodes, f'{child_node.NamespaceIndex}:{child_node.Name}'] - if len(node_list) == 1: - final.append(new_nodes) - else: - final.extend(await self.__find_nodes(node_list[1:], current_parent_node=node, nodes=new_nodes)) - - return final - - async def find_nodes(self, node_pattern, current_parent_node=None, nodes=[]): - node_list = node_pattern.split('\\.') - - if current_parent_node is None: - current_parent_node = self.__client.nodes.root - - if len(node_list) > 0 and node_list[0].lower() == 'root': - node_list = node_list[1:] - - return await self.__find_nodes(node_list, current_parent_node, nodes) - - async def find_node_name_space_index(self, path): - if isinstance(path, str): - path = path.split('\\.') - - # find unresolved nodes - u_node_count = len(tuple(filter(lambda u_node: len(u_node.split(':')) < 2, path))) - - resolved = path[:-u_node_count] - resolved_level = len(path) - u_node_count - parent_node = await self.__client.nodes.root.get_child(resolved) - - unresolved = path[resolved_level:] - return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved) - - async def __scan_device_nodes(self): - existing_devices = list(map(lambda dev: dev.name, self.__device_nodes)) - - scanned_devices = [] - for device in self.__server_conf.get('mapping', []): - nodes = await self.find_nodes(device['deviceNodePattern']) - self.__log.debug('Found devices: %s', nodes) - - device_names = await self._get_device_info_by_pattern(device['deviceNamePattern']) - - for device_name in device_names: - scanned_devices.append(device_name) - if device_name not in existing_devices: - for node in nodes: - converter = self.__load_converter(device) - device_type = await self._get_device_info_by_pattern( - device.get('deviceTypePattern', 'default'), - get_first=True) - device_config = {**device, 'device_name': device_name, 'device_type': device_type} - self.__device_nodes.append( - Device(path=node, name=device_name, config=device_config, - converter=converter(device_config, self.__log), - converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get( - 'disableSubscriptions', - False) else None, logger=self.__log)) - self.__log.info('Added device node: %s', device_name) - - for device_name in existing_devices: - if device_name not in scanned_devices: - await self.__reset_nodes(device_name) - - self.__log.debug('Device nodes: %s', self.__device_nodes) - - async def _get_device_info_by_pattern(self, pattern, get_first=False): - result = [] - - search_result = re.search(r"\${([A-Za-z.:\\\d]+)}", pattern) - if search_result: - try: - group = search_result.group(0) - node_path = search_result.group(1) - except IndexError: - self.__log.error('Invalid pattern: %s', pattern) - return result - - nodes = await self.find_nodes(node_path) - self.__log.debug('Found device name nodes: %s', nodes) - - for node in nodes: - try: - var = await self.__client.nodes.root.get_child(node) - value = await var.read_value() - result.append(pattern.replace(group, str(value))) - except Exception as e: - self.__log.exception(e) - continue - else: - result.append(pattern) - - return result[0] if len(result) > 0 and get_first else result - - def __convert_sub_data(self): - while not self.__stopped: - if not self.__sub_data_to_convert.empty(): - sub_node, data = self.__sub_data_to_convert.get() - - for device in self.__device_nodes: - for section in ('attributes', 'timeseries'): - for node in device.values.get(section, []): - if node.get('id') == sub_node.__str__(): - device.converter_for_sub.convert(config={'section': section, 'key': node['key']}, - val=data.monitored_item.Value) - converter_data = device.converter_for_sub.get_data() - - if converter_data: - self.__data_to_send.put(*converter_data) - device.converter_for_sub.clear_data() - else: - sleep(.2) - - async def __poll_nodes(self): - for device in self.__device_nodes: - for section in ('attributes', 'timeseries'): - for node in device.values.get(section, []): - try: - path = node.get('qualified_path', node['path']) - if isinstance(path, str) and re.match(r"(ns=\d+;[isgb]=[^}]+)", path): - var = self.__client.get_node(path) - else: - if len(path[-1].split(':')) != 2: - qualified_path = await self.find_node_name_space_index(path) - if len(qualified_path) == 0: - if node.get('valid', True): - self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name, - node['key'], node['path']) - await self.__reset_node(node) - continue - elif len(qualified_path) > 1: - self.__log.warning( - 'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name, - node['key'], node['path'], qualified_path) - node['qualified_path'] = qualified_path[0] - path = qualified_path[0] - - var = await self.__client.nodes.root.get_child(path) - - if (node.get('valid') is None or - (node.get('valid') and self.__server_conf.get('disableSubscriptions', False))): - value = await var.read_data_value() - device.converter.convert(config={'section': section, 'key': node['key']}, val=value) - - if not self.__server_conf.get('disableSubscriptions', False) and not node.get('sub_on', - False): - if self.__subscription is None: - self.__subscription = await self.__client.create_subscription(1, SubHandler( - self.__sub_data_to_convert, self.__log)) - handle = await self.__subscription.subscribe_data_change(var) - node['subscription'] = handle - node['sub_on'] = True - node['id'] = var.nodeid.to_string() - self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name, - node['key'], node['id']) - - node['valid'] = True - except ConnectionError: - raise - except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid, - BadCommunicationError, BadOutOfService): - if node.get('valid', True): - self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name, - node['key'], node['path']) - await self.__reset_node(node) - except UaStatusCodeError as uae: - if node.get('valid', True): - self.__log.exception('Node status code error: %s', uae) - await self.__reset_node(node) - except Exception as e: - if node.get('valid', True): - self.__log.exception(e) - await self.__reset_node(node) - - converter_data = device.converter.get_data() - if converter_data: - self.__data_to_send.put(*converter_data) - - device.converter.clear_data() - - def __send_data(self): - while not self.__stopped: - if not self.__data_to_send.empty(): - data = self.__data_to_send.get() - self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1 - self.__log.debug(data) - self.__gateway.send_to_storage(self.get_name(), self.get_id(), data) - self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - self.__log.debug('Data to ThingsBoard %s', data) - else: - sleep(.2) - - async def get_shared_attr_node_id(self, path, result={}): - try: - q_path = await self.find_node_name_space_index(path) - var = await self.__client.nodes.root.get_child(q_path[0]) - result['result'] = var - except Exception as e: - result['error'] = e.__str__() - - def on_attributes_update(self, content): - self.__log.debug(content) - try: - device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0] - - for (key, value) in content['data'].items(): - for attr_update in device.config['attributes_updates']: - if attr_update['attributeOnThingsBoard'] == key: - result = {} - task = self.__loop.create_task( - self.get_shared_attr_node_id( - device.path + attr_update['attributeOnDevice'].replace('\\', '').split('.'), result)) - - while not task.done(): - sleep(.1) - - if result.get('error'): - self.__log.error('Node not found! (%s)', result['error']) - return - - node_id = result['result'] - self.__loop.create_task(self.__write_value(node_id, value)) - return - except Exception as e: - self.__log.exception(e) - - def server_side_rpc_handler(self, content): - try: - if content.get('data') is None: - content['data'] = {'params': content['params'], 'method': content['method']} - - rpc_method = content["data"].get("method") - - # check if RPC type is connector RPC (can be only 'get' or 'set') - try: - (connector_type, rpc_method_name) = rpc_method.split('_') - if connector_type == self._connector_type: - rpc_method = rpc_method_name - content['device'] = content['params'].split(' ')[0].split('=')[-1] - except (ValueError, IndexError): - pass - - # firstly check if a method is not service - if rpc_method == 'set' or rpc_method == 'get': - full_path = '' - args_list = [] - device = content.get('device') - - try: - args_list = content['data']['params'].split(';') - - if 'ns' in content['data']['params']: - full_path = ';'.join([item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)]) - else: - full_path = args_list[0].split('=')[-1] - except IndexError: - self.__log.error('Not enough arguments. Expected min 2.') - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={content['data'][ - 'method']: 'Not enough arguments. Expected min 2.', - 'code': 400}) - - result = {} - if rpc_method == 'get': - task = self.__loop.create_task(self.__read_value(full_path, result)) - - while not task.done(): - sleep(.2) - elif rpc_method == 'set': - value = args_list[2].split('=')[-1] - task = self.__loop.create_task(self.__write_value(full_path, value, result)) - - while not task.done(): - sleep(.2) - - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={content['data']['method']: result}) - else: - device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0] - - for rpc in device.config['rpc_methods']: - if rpc['method'] == content["data"]['method']: - arguments_from_config = rpc["arguments"] - arguments = content["data"].get("params") if content["data"].get( - "params") is not None else arguments_from_config - method_name = content['data']['method'] - - try: - result = {} - task = self.__loop.create_task( - self.__call_method(device.path, method_name, arguments, result)) - - while not task.done(): - sleep(.2) - - self.__gateway.send_rpc_reply(content["device"], - content["data"]["id"], - {content["data"]["method"]: result, "code": 200}) - self.__log.debug("method %s result is: %s", rpc['method'], result) - except Exception as e: - self.__log.exception(e) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], - {"error": str(e), "code": 500}) - else: - self.__log.error("Method %s not found for device %s", rpc_method, content["device"]) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], - {"error": "%s - Method not found" % rpc_method, - "code": 404}) - - except Exception as e: - self.__log.exception(e) - - async def __write_value(self, path, value, result={}): - try: - var = path - if isinstance(path, str): - var = self.__client.get_node(path.replace('\\.', '.')) - - await var.write_value(value) - except Exception as e: - result['error'] = e.__str__() - - async def __read_value(self, path, result={}): - try: - var = self.__client.get_node(path) - result['value'] = await var.read_value() - except Exception as e: - result['error'] = e.__str__() - - async def __call_method(self, path, method_name, arguments, result={}): - try: - var = await self.__client.nodes.root.get_child(path) - method_id = '{}:{}'.format(var.nodeid.NamespaceIndex, method_name) - result['result'] = await var.call_method(method_id, *arguments) - except Exception as e: - result['error'] = e.__str__() - - def update_converter_config(self, converter_name, config): - self.__log.debug('Received remote converter configuration update for %s with configuration %s', converter_name, - config) - for device in self.__device_nodes: - if device.converter.__class__.__name__ == converter_name: - device.config.update(config) - device.load_values() - self.__log.info('Updated converter configuration for: %s with configuration %s', - converter_name, device.config) - - for node_config in self.__server_conf['mapping']: - if node_config['deviceNodePattern'] == device.config['deviceNodePattern']: - node_config.update(config) - - self.__gateway.update_connector_config_file(self.name, {'server': self.__server_conf}) - - -class SubHandler: - def __init__(self, queue, logger): - self.__log = logger - self.__queue = queue - - def datachange_notification(self, node, _, data): - self.__log.debug("New data change event %s %s", node, data) - self.__queue.put((node, data)) diff --git a/thingsboard_gateway/connectors/opcua_asyncio/opcua_converter.py b/thingsboard_gateway/connectors/opcua_asyncio/opcua_converter.py deleted file mode 100644 index f05fc76c..00000000 --- a/thingsboard_gateway/connectors/opcua_asyncio/opcua_converter.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from thingsboard_gateway.connectors.converter import Converter, abstractmethod - - -class OpcUaConverter(Converter): - @abstractmethod - def convert(self, config, val): - pass diff --git a/thingsboard_gateway/connectors/opcua_asyncio/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua_asyncio/opcua_uplink_converter.py deleted file mode 100644 index 8ca8e3b8..00000000 --- a/thingsboard_gateway/connectors/opcua_asyncio/opcua_uplink_converter.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from time import time -from datetime import timezone - -from thingsboard_gateway.connectors.opcua_asyncio.opcua_converter import OpcUaConverter -from asyncua.ua.uatypes import LocalizedText, VariantType - -DATA_TYPES = { - 'attributes': 'attributes', - 'timeseries': 'telemetry' -} - - -class OpcUaUplinkConverter(OpcUaConverter): - def __init__(self, config, logger): - self._log = logger - self.__config = config - self.data = { - 'deviceName': self.__config['device_name'], - 'deviceType': self.__config['device_type'], - 'attributes': [], - 'telemetry': [], - } - self._last_node_timestamp = 0 - - def clear_data(self): - self.data = { - 'deviceName': self.__config['device_name'], - 'deviceType': self.__config['device_type'], - 'attributes': [], - 'telemetry': [], - } - - def get_data(self): - if len(self.data['attributes']) or len(self.data['telemetry']): - data_list = [] - device_names = self.__config.get('device_names') - if device_names: - for device in device_names: - self.data['deviceName'] = device - data_list.append(self.data) - - return data_list - - return [self.data] - - return None - - def convert(self, config, val): - if not val: - return - - data = val.Value.Value - - if data is not None: - if isinstance(data, LocalizedText): - data = data.Text - elif val.Value.VariantType == VariantType.ExtensionObject: - data = str(data) - elif val.Value.VariantType == VariantType.DateTime: - if data.tzinfo is None: - data = data.replace(tzinfo=timezone.utc) - data = data.isoformat() - - if config['section'] == 'timeseries': - if val.SourceTimestamp and int(val.SourceTimestamp.replace( - tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp: - timestamp = int(val.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) - self._last_node_timestamp = timestamp - elif val.ServerTimestamp and int(val.ServerTimestamp.replace( - tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp: - timestamp = int(val.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) - self._last_node_timestamp = timestamp - else: - timestamp = int(time() * 1000) - - self.data[DATA_TYPES[config['section']]].append({'ts': timestamp, 'values': {config['key']: data}}) - else: - self.data[DATA_TYPES[config['section']]].append({config['key']: data}) - - self._log.debug('Converted data: %s', self.data) diff --git a/thingsboard_gateway/extensions/opcua_asyncio/__init__.py b/thingsboard_gateway/extensions/opcua_asyncio/__init__.py deleted file mode 100644 index 620ae837..00000000 --- a/thingsboard_gateway/extensions/opcua_asyncio/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 0feb233c..0824213c 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -73,8 +73,7 @@ main_handler = logging.handlers.MemoryHandler(-1) DEFAULT_CONNECTORS = { "mqtt": "MqttConnector", "modbus": "ModbusConnector", - "opcua": "OpcUaConnectorAsyncIO", - "opcua_asyncio": "OpcUaConnectorAsyncIO", + "opcua": "OpcUaConnector", "ble": "BLEConnector", "request": "RequestConnector", "can": "CanConnector", @@ -780,8 +779,8 @@ class TBGatewayService: connector_type = connector["type"].lower() if connector.get("type") is not None else None # can be removed in future releases - if connector_type == 'opcua': - connector_type = 'opcua_asyncio' + if connector_type == 'opcua_asyncio': + connector_type = 'opcua' if connector_type is None: log.error("Connector type is not defined!")