mirror of
https://github.com/FreeOpcUa/opcua-asyncio
synced 2025-10-29 17:07:18 +08:00
make it possible to load one custom structure
This commit is contained in:
parent
c0ddec78bf
commit
55a4223c93
|
|
@ -4,7 +4,7 @@ from datetime import datetime
|
|||
import uuid
|
||||
import logging
|
||||
import re
|
||||
from typing import Union, List, TYPE_CHECKING, Tuple, Optional
|
||||
from typing import Union, List, TYPE_CHECKING, Tuple, Optional, Any
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from asyncua import ua
|
||||
|
|
@ -272,12 +272,37 @@ async def _recursive_parse(server, base_node, dtypes, parent_sdef=None, add_exis
|
|||
await _recursive_parse(server, server.get_node(desc.NodeId), dtypes, parent_sdef=sdef, add_existing=add_existing)
|
||||
|
||||
|
||||
async def _get_parent_types(node: Node):
|
||||
parents = []
|
||||
tmp_node = node
|
||||
for _ in range(10):
|
||||
refs = await tmp_node.get_references(refs=ua.ObjectIds.HasSubtype, direction=ua.BrowseDirection.Inverse)
|
||||
if not refs or refs[0].NodeId.NamespaceIndex == 0 and refs[0].NodeId.Identifier == 22:
|
||||
return parents
|
||||
tmp_node = ua.Node(tmp_node.server, refs[0])
|
||||
parents.append(tmp_node)
|
||||
logger.warning("Went 10 layers up while look of subtype of given node %s, something is wrong: %s", node, parents)
|
||||
|
||||
|
||||
async def load_custom_struct(node: Node) -> Any:
|
||||
sdef = await node.read_data_type_definition()
|
||||
name = (await node.read_browse_name()).Name
|
||||
for parent in await _get_parent_types(node):
|
||||
parent_sdef = await parent.read_data_type_definition()
|
||||
for f in reversed(parent_sdef.fields):
|
||||
sdef.Fields.insert(0, f)
|
||||
env = await _generate_object(name, sdef, data_type=node.nodeid)
|
||||
struct = env[name]
|
||||
ua.register_extension_object(name, sdef.DefaultEncodingId, struct, node.nodeid)
|
||||
return env[name]
|
||||
|
||||
|
||||
async def load_data_type_definitions(server: Union["Server", "Client"], base_node: Node = None, overwrite_existing=False) -> None:
|
||||
"""
|
||||
Read DataTypeDefition attribute on all Structure and Enumeration defined
|
||||
on server and generate Python objects in ua namespace to be used to talk with server
|
||||
"""
|
||||
await load_enums(server) # we need all enums to generate structure code
|
||||
new_objects = await load_enums(server) # we need all enums to generate structure code
|
||||
if base_node is None:
|
||||
base_node = server.nodes.base_structure_type
|
||||
dtypes = []
|
||||
|
|
@ -286,9 +311,11 @@ async def load_data_type_definitions(server: Union["Server", "Client"], base_nod
|
|||
for dts in dtypes:
|
||||
try:
|
||||
env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
|
||||
ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.desc.NodeId)
|
||||
ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.data_type)
|
||||
new_objects[dts.name] = env[dts.name]
|
||||
except NotImplementedError:
|
||||
logger.exception("Structure type %s not implemented", dts.sdef)
|
||||
return new_objects
|
||||
|
||||
|
||||
async def _read_data_type_definition(server, desc: ua.BrowseDescription, read_existing: bool = False):
|
||||
|
|
@ -336,6 +363,7 @@ class {name}(IntEnum):
|
|||
async def load_enums(server: Union["Server", "Client"], base_node: Node = None) -> None:
|
||||
if base_node is None:
|
||||
base_node = server.nodes.enum_data_type
|
||||
new_enums = {}
|
||||
for desc in await base_node.get_children_descriptions(refs=ua.ObjectIds.HasSubtype):
|
||||
name = clean_name(desc.BrowseName.Name)
|
||||
if hasattr(ua, name):
|
||||
|
|
@ -346,3 +374,5 @@ async def load_enums(server: Union["Server", "Client"], base_node: Node = None)
|
|||
continue
|
||||
env = await _generate_object(name, edef, enum=True)
|
||||
ua.register_enum(name, desc.NodeId, env[name])
|
||||
new_enums[name] = env[name]
|
||||
return new_enums
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import asyncio
|
|||
import logging
|
||||
|
||||
from asyncua import Client, Node, ua
|
||||
from asyncua.common.structures104 import load_custom_struct
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
_logger = logging.getLogger('asyncua')
|
||||
|
|
@ -21,5 +22,9 @@ async def main():
|
|||
my_struct_opt = await client.nodes.objects.get_child(f"{idx}:my_struct_optional")
|
||||
print("STRUCT WITH OPTIA VALUE", await my_struct_opt.read_value())
|
||||
|
||||
# loading one specific custom struct
|
||||
mystructnode = await client.nodes.base_structure_type.get_child(f"{idx}:MyStruct")
|
||||
my_type = await load_custom_struct(mystructnode)
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
|
|
|
|||
|
|
@ -33,7 +33,10 @@ async def main():
|
|||
"tutu",
|
||||
])
|
||||
|
||||
await server.load_data_type_definitions()
|
||||
custom_objs = await server.load_data_type_definitions()
|
||||
print("Custom objects on server")
|
||||
for name, obj in custom_objs.items():
|
||||
print(" ", obj)
|
||||
|
||||
valnode = await server.nodes.objects.add_variable(idx, "my_enum", ua.MyEnum.toto)
|
||||
await server.nodes.objects.add_variable(idx, "my_struct", ua.Variant(ua.MyStruct(), ua.VariantType.ExtensionObject))
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user