1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

Add type hints to the Node methods (#1432)

* Add type hints to the Node methods

* remove a duplicated import

* make `server` accept as Client
This commit is contained in:
Yuta Okamoto
2023-09-11 22:07:40 +09:00
committed by GitHub
parent 0cd5464240
commit daa74c2803
17 changed files with 328 additions and 148 deletions

View File

@@ -37,7 +37,7 @@ jobs:
ruff check .
- name: mypy
run: |
python -m pip install types-aiofiles types-python-dateutil types-pytz
python -m pip install types-aiofiles types-python-dateutil types-pytz typing-extensions
python -m mypy asyncua/
- name: Test with pytest
run: |

View File

@@ -22,7 +22,6 @@ from ..common.utils import create_nonce, ServiceError
from ..common.ua_utils import value_to_datavalue, copy_dataclass_attr
from ..crypto import uacrypto, security_policies
from ..crypto.validator import CertificateValidatorMethod
from ..crypto.uacrypto import x509
_logger = logging.getLogger(__name__)
@@ -88,7 +87,7 @@ class Client:
self._locale = ["en"]
self._watchdog_intervall = watchdog_intervall
self._closing: bool = False
self.certificate_validator: Optional[CertificateValidatorMethod]= None
self.certificate_validator: Optional[CertificateValidatorMethod] = None
"""hook to validate a certificate, raises a ServiceError when not valid"""
async def __aenter__(self):

View File

@@ -340,7 +340,7 @@ class SecureConnection:
Validates the symmetric header of the message chunk and revolves the
security token if needed.
"""
assert isinstance(security_hdr, ua.SymmetricAlgorithmHeader), f"Expected SymAlgHeader, got: {security_hdr}"
assert isinstance(security_hdr, ua.SymmetricAlgorithmHeader), f"Expected SymAlgHeader, got: {type(security_hdr)}"
if security_hdr.TokenId == self.security_token.TokenId:
return

View File

@@ -1,13 +1,20 @@
import logging
from __future__ import annotations
import logging
from typing import Any, List, Optional
import asyncua
from asyncua import ua
from asyncua.common.session_interface import AbstractSession
from .node_factory import make_node
_logger = logging.getLogger(__name__)
async def copy_node(parent, node, nodeid=None, recursive=True):
async def copy_node(
parent: asyncua.Node, node: asyncua.Node, nodeid: Optional[ua.NodeId] = None, recursive: bool = True
) -> List[asyncua.Node]:
"""
Copy a node or node tree as child of parent node
"""
@@ -18,7 +25,7 @@ async def copy_node(parent, node, nodeid=None, recursive=True):
return [make_node(parent.session, nid) for nid in added_nodeids]
async def _copy_node(session, parent_nodeid, rdesc, nodeid, recursive):
async def _copy_node(session: AbstractSession, parent_nodeid: ua.NodeId, rdesc: ua.ReferenceDescription, nodeid: ua.NodeId, recursive: bool):
addnode = ua.AddNodesItem()
addnode.RequestedNewNodeId = nodeid
addnode.BrowseName = rdesc.BrowseName
@@ -41,11 +48,16 @@ async def _copy_node(session, parent_nodeid, rdesc, nodeid, recursive):
return added_nodes
async def _rdesc_from_node(parent, node):
async def _rdesc_from_node(parent: asyncua.Node, node: asyncua.Node) -> ua.ReferenceDescription:
results = await node.read_attributes([
ua.AttributeIds.NodeClass, ua.AttributeIds.BrowseName, ua.AttributeIds.DisplayName,
])
nclass, qname, dname = [res.Value.Value for res in results]
variants: List[ua.Variant] = []
for res in results:
res.StatusCode.check()
assert res.Value is not None, "Value must not be None if the result is in Good status"
variants.append(res.Value)
nclass, qname, dname = [v.Value for v in variants]
rdesc = ua.ReferenceDescription()
rdesc.NodeId = node.nodeid
rdesc.BrowseName = qname
@@ -61,7 +73,7 @@ async def _rdesc_from_node(parent, node):
return rdesc
async def _read_and_copy_attrs(node_type, struct, addnode):
async def _read_and_copy_attrs(node_type: asyncua.Node, struct: Any, addnode: ua.AddNodesItem) -> None:
names = [name for name in struct.__dict__.keys() if not name.startswith("_") and name not in (
"BodyLength", "TypeId", "SpecifiedAttributes", "Encoding", "IsAbstract", "EventNotifier",
)]
@@ -69,10 +81,12 @@ async def _read_and_copy_attrs(node_type, struct, addnode):
results = await node_type.read_attributes(attrs)
for idx, name in enumerate(names):
if results[idx].StatusCode.is_good():
variant = results[idx].Value
assert variant is not None, "Value must not be None if the result is in Good status"
if name == "Value":
setattr(struct, name, results[idx].Value)
setattr(struct, name, variant)
else:
setattr(struct, name, results[idx].Value.Value)
setattr(struct, name, variant.Value)
else:
_logger.warning(f"Instantiate: while copying attributes from node type {str(node_type)},"
f" attribute {str(name)}, statuscode is {str(results[idx].StatusCode)}")

View File

@@ -2,10 +2,13 @@
Instantiate a new node and its child nodes from a node type.
"""
from __future__ import annotations
import logging
from typing import Union
from typing import List, Optional, Union
import asyncua
from asyncua import ua
from .ua_utils import get_node_supertypes, is_child_present
from .copy_node_util import _rdesc_from_node, _read_and_copy_attrs
@@ -19,7 +22,15 @@ async def is_abstract(node_type) -> bool:
return result.Value.Value
async def instantiate(parent, node_type, nodeid: ua.NodeId = None, bname: Union[str, ua.QualifiedName] = None, dname: ua.LocalizedText = None, idx: int = 0, instantiate_optional: bool = True):
async def instantiate(
parent: asyncua.Node,
node_type: asyncua.Node,
nodeid: Optional[ua.NodeId] = None,
bname: Optional[Union[ua.QualifiedName, str]] = None,
dname: Optional[ua.LocalizedText] = None,
idx: int = 0,
instantiate_optional: bool = True,
) -> List[asyncua.Node]:
"""
instantiate a node type under a parent node.
nodeid and browse name of new node can be specified, or just namespace index

View File

@@ -1,11 +1,16 @@
"""
High level functions to create nodes
"""
from __future__ import annotations
import logging
from enum import Enum
import inspect
from typing import Any, Iterable, List, Optional, Tuple, Union
import asyncua
from asyncua import ua
from asyncua.common.session_interface import AbstractSession
from .instantiate_util import instantiate
from .node_factory import make_node
@@ -40,7 +45,7 @@ def _parse_nodeid_qname(*args):
)
async def create_folder(parent, nodeid, bname):
async def create_folder(parent: asyncua.Node, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]) -> asyncua.Node:
"""
create a child node folder
arguments are nodeid, browsename
@@ -53,7 +58,13 @@ async def create_folder(parent, nodeid, bname):
)
async def create_object(parent, nodeid, bname, objecttype=None, instantiate_optional=True):
async def create_object(
parent: asyncua.Node,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
objecttype: Optional[Union[ua.NodeId, int]] = None,
instantiate_optional: bool = True,
) -> asyncua.Node:
"""
create a child node object
arguments are nodeid, browsename, [objecttype]
@@ -62,9 +73,9 @@ async def create_object(parent, nodeid, bname, objecttype=None, instantiate_opti
"""
nodeid, qname = _parse_nodeid_qname(nodeid, bname)
if objecttype is not None:
objecttype = make_node(parent.session, objecttype)
objecttype_node = make_node(parent.session, objecttype)
dname = ua.LocalizedText(qname.Name)
nodes = await instantiate(parent, objecttype, nodeid, bname=qname, dname=dname, instantiate_optional=instantiate_optional)
nodes = await instantiate(parent, objecttype_node, nodeid, bname=qname, dname=dname, instantiate_optional=instantiate_optional)
return nodes[0]
else:
return make_node(
@@ -73,7 +84,14 @@ async def create_object(parent, nodeid, bname, objecttype=None, instantiate_opti
)
async def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None):
async def create_property(
parent: asyncua.Node,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
val: Any,
varianttype: Optional[ua.VariantType] = None,
datatype: Optional[Union[ua.NodeId, int]] = None,
) -> asyncua.Node:
"""
create a child node property
args are nodeid, browsename, value, [variant type]
@@ -91,7 +109,14 @@ async def create_property(parent, nodeid, bname, val, varianttype=None, datatype
)
async def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None):
async def create_variable(
parent: asyncua.Node,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
val: Any,
varianttype: Optional[ua.VariantType] = None,
datatype: Optional[Union[ua.NodeId, int]] = None,
) -> asyncua.Node:
"""
create a child node variable
args are nodeid, browsename, value, [variant type], [data type]
@@ -110,7 +135,9 @@ async def create_variable(parent, nodeid, bname, val, varianttype=None, datatype
)
async def create_variable_type(parent, nodeid, bname, datatype):
async def create_variable_type(
parent: asyncua.Node, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], datatype: Union[ua.NodeId, int]
) -> asyncua.Node:
"""
Create a new variable type
args are nodeid, browsename and datatype
@@ -128,7 +155,9 @@ async def create_variable_type(parent, nodeid, bname, datatype):
)
async def create_reference_type(parent, nodeid, bname, symmetric=True, inversename=None):
async def create_reference_type(
parent: asyncua.Node, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], symmetric: bool = True, inversename: Optional[str] = None
) -> asyncua.Node:
"""
Create a new reference type
args are nodeid and browsename
@@ -141,7 +170,7 @@ async def create_reference_type(parent, nodeid, bname, symmetric=True, inversena
)
async def create_object_type(parent, nodeid, bname):
async def create_object_type(parent: asyncua.Node, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]):
"""
Create a new object type to be instantiated in address space.
arguments are nodeid, browsename
@@ -151,7 +180,7 @@ async def create_object_type(parent, nodeid, bname):
return make_node(parent.session, await _create_object_type(parent.session, parent.nodeid, nodeid, qname))
async def create_method(parent, *args):
async def create_method(parent: asyncua.Node, *args) -> asyncua.Node:
"""
create a child method object
This is only possible on server side!!
@@ -308,7 +337,9 @@ async def _create_variable_type(session, parentnodeid, nodeid, qname, datatype,
return results[0].AddedNodeId
async def create_data_type(parent, nodeid, bname, description=None):
async def create_data_type(
parent: asyncua.Node, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], description: Optional[str] = None
) -> asyncua.Node:
"""
Create a new data type to be used in new variables, etc ..
arguments are nodeid, browsename
@@ -349,7 +380,7 @@ async def create_data_type(parent, nodeid, bname, description=None):
return make_node(parent.session, new_node_id)
async def create_encoding(parent, nodeid, bname):
async def create_encoding(parent, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]) -> asyncua.Node:
"""
Create a new encoding object to be instantiated in address space.
arguments are nodeid, browsename
@@ -445,11 +476,11 @@ def _vtype_to_argument(vtype):
return arg
def _guess_datatype(variant):
def _guess_datatype(variant: ua.Variant):
if variant.VariantType == ua.VariantType.ExtensionObject:
if variant.Value is None:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
if type(variant.Value) in (list, tuple):
if isinstance(variant.Value, (list, tuple)):
if len(variant.Value) == 0:
raise ua.UaError("Cannot guess DataType from Null ExtensionObject")
extobj = variant.Value[0]
@@ -465,13 +496,15 @@ def _guess_datatype(variant):
return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
async def delete_nodes(session, nodes, recursive=False, delete_target_references=True):
async def delete_nodes(
session: AbstractSession, nodes: Iterable[asyncua.Node], recursive: bool = False, delete_target_references: bool = True
) -> Tuple[List[asyncua.Node], List[ua.StatusCode]]:
"""
Delete specified nodes. Optionally delete recursively all nodes with a
downward hierachic references to the node
return the list of deleted node and the result
"""
nodestodelete = []
nodestodelete: List[ua.DeleteNodesItem] = []
if recursive:
nodes = await _add_childs(nodes)
for mynode in nodes:
@@ -481,10 +514,10 @@ async def delete_nodes(session, nodes, recursive=False, delete_target_references
nodestodelete.append(it)
params = ua.DeleteNodesParameters()
params.NodesToDelete = nodestodelete
return nodes, await session.delete_nodes(params)
return list(nodes), await session.delete_nodes(params)
async def _add_childs(nodes):
async def _add_childs(nodes: Iterable[asyncua.Node]) -> Iterable[asyncua.Node]:
results = []
for mynode in nodes:
results += await _add_childs(await mynode.get_children())

View File

@@ -2,12 +2,16 @@
High level method related functions
"""
from asyncio import iscoroutinefunction
from __future__ import annotations
from asyncio import iscoroutinefunction
from typing import Any, Union
import asyncua
from asyncua import ua
async def call_method(parent, methodid, *args):
async def call_method(parent: asyncua.Node, methodid: Union[ua.NodeId, ua.QualifiedName, str], *args) -> Any:
"""
Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object
@@ -26,7 +30,7 @@ async def call_method(parent, methodid, *args):
return result.OutputArguments
async def call_method_full(parent, methodid, *args):
async def call_method_full(parent: asyncua.Node, methodid: Union[ua.NodeId, ua.QualifiedName, str], *args) -> ua.CallMethodResult:
"""
Call an OPC-UA method. methodid is browse name of child method or the
nodeid of method as a NodeId object

View File

@@ -3,8 +3,15 @@ High level node object, to access node attribute
and browse address space
"""
from datetime import datetime
import logging
from typing import Union
import sys
from typing import Any, Iterable, List, Optional, Set, Union, overload
if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal
from asyncua import ua
from asyncua.common.session_interface import AbstractSession
@@ -26,14 +33,14 @@ def _check_results(results, reqlen=1):
r.check()
def _to_nodeid(nodeid):
def _to_nodeid(nodeid: Union["Node", ua.NodeId, str, int]) -> ua.NodeId:
if isinstance(nodeid, int):
return ua.TwoByteNodeId(nodeid)
if isinstance(nodeid, Node):
return nodeid.nodeid
if isinstance(nodeid, ua.NodeId):
return nodeid
if type(nodeid) in (str, bytes):
if isinstance(nodeid, str):
return ua.NodeId.from_string(nodeid)
raise ua.UaError(f"Could not resolve '{nodeid}' to a type id")
@@ -78,59 +85,66 @@ class Node:
def __hash__(self):
return self.nodeid.__hash__()
async def read_browse_name(self):
async def read_browse_name(self) -> ua.QualifiedName:
"""
Get browse name of a node. A browse name is a QualifiedName object
composed of a string(name) and a namespace index.
"""
result = await self.read_attribute(ua.AttributeIds.BrowseName)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return result.Value.Value
async def read_display_name(self):
async def read_display_name(self) -> ua.LocalizedText:
"""
get DisplayName attribute of node
"""
result = await self.read_attribute(ua.AttributeIds.DisplayName)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return result.Value.Value
async def read_data_type(self):
async def read_data_type(self) -> ua.NodeId:
"""
get data type of node as NodeId
"""
result = await self.read_attribute(ua.AttributeIds.DataType)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return result.Value.Value
async def read_data_type_as_variant_type(self):
async def read_data_type_as_variant_type(self) -> ua.VariantType:
"""
get data type of node as VariantType
This only works if node is a variable, otherwise type
may not be convertible to VariantType
"""
result = await self.read_attribute(ua.AttributeIds.DataType)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return await data_type_to_variant_type(Node(self.session, result.Value.Value))
async def get_access_level(self):
async def get_access_level(self) -> Set[ua.AccessLevel]:
"""
Get the access level attribute of the node as a set of AccessLevel enum values.
"""
result = await self.read_attribute(ua.AttributeIds.AccessLevel)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return ua.AccessLevel.parse_bitfield(result.Value.Value)
async def get_user_access_level(self):
async def get_user_access_level(self) -> Set[ua.AccessLevel]:
"""
Get the user access level attribute of the node as a set of AccessLevel enum values.
"""
result = await self.read_attribute(ua.AttributeIds.UserAccessLevel)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return ua.AccessLevel.parse_bitfield(result.Value.Value)
async def read_event_notifier(self):
async def read_event_notifier(self) -> Set[ua.EventNotifier]:
"""
Get the event notifier attribute of the node as a set of EventNotifier enum values.
"""
result = await self.read_attribute(ua.AttributeIds.EventNotifier)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return ua.EventNotifier.parse_bitfield(result.Value.Value)
async def set_event_notifier(self, values):
async def set_event_notifier(self, values) -> None:
"""
Set the event notifier attribute.
@@ -139,22 +153,24 @@ class Node:
event_notifier_bitfield = ua.EventNotifier.to_bitfield(values)
await self.write_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(event_notifier_bitfield, ua.VariantType.Byte)))
async def read_node_class(self):
async def read_node_class(self) -> ua.NodeClass:
"""
get node class attribute of node
"""
result = await self.read_attribute(ua.AttributeIds.NodeClass)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return ua.NodeClass(result.Value.Value)
async def read_data_type_definition(self):
async def read_data_type_definition(self) -> ua.DataTypeDefinition:
"""
read data type definition attribute of node
only DataType nodes following spec >= 1.04 have that attribute
"""
result = await self.read_attribute(ua.AttributeIds.DataTypeDefinition)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return result.Value.Value
async def write_data_type_definition(self, sdef: ua.DataTypeDefinition):
async def write_data_type_definition(self, sdef: ua.DataTypeDefinition) -> None:
"""
write data type definition attribute of node
only DataType nodes following spec >= 1.04 have that attribute
@@ -162,14 +178,15 @@ class Node:
v = ua.Variant(sdef, ua.VariantType.ExtensionObject)
await self.write_attribute(ua.AttributeIds.DataTypeDefinition, ua.DataValue(v))
async def read_description(self):
async def read_description(self) -> ua.LocalizedText:
"""
get description attribute class of node
"""
result = await self.read_attribute(ua.AttributeIds.Description)
assert result.Value is not None, "Value must not be None if the result is in Good status"
return result.Value.Value
async def read_value(self):
async def read_value(self) -> Any:
"""
Get value of a node as a python type. Only variables ( and properties) have values.
An exception will be generated for other node types.
@@ -177,11 +194,12 @@ class Node:
Do not modify it if it is a mutable object unless you know what you are doing
"""
result = await self.read_data_value()
assert result.Value is not None, "Value must not be None if the result is in Good status"
return result.Value.Value
get_value = read_value # legacy compatibility
async def read_data_value(self, raise_on_bad_status=True):
async def read_data_value(self, raise_on_bad_status: bool = True) -> ua.DataValue:
"""
Get value of a node as a DataValue object. Only variables (and properties) have values.
An exception will be generated for other node types.
@@ -189,7 +207,7 @@ class Node:
"""
return await self.read_attribute(ua.AttributeIds.Value, None, raise_on_bad_status)
async def write_array_dimensions(self, value):
async def write_array_dimensions(self, value: int) -> None:
"""
Set attribute ArrayDimensions of node
make sure it has the correct data type
@@ -197,28 +215,30 @@ class Node:
v = ua.Variant(value, ua.VariantType.UInt32)
await self.write_attribute(ua.AttributeIds.ArrayDimensions, ua.DataValue(v))
async def read_array_dimensions(self):
async def read_array_dimensions(self) -> int:
"""
Read and return ArrayDimensions attribute of node
"""
res = await self.read_attribute(ua.AttributeIds.ArrayDimensions)
assert res.Value is not None, "Value must not be None if the result is in Good status"
return res.Value.Value
async def write_value_rank(self, value):
async def write_value_rank(self, value: int) -> None:
"""
Set attribute ValueRank of node
"""
v = ua.Variant(value, ua.VariantType.Int32)
await self.write_attribute(ua.AttributeIds.ValueRank, ua.DataValue(v))
async def read_value_rank(self):
async def read_value_rank(self) -> int:
"""
Read and return ValueRank attribute of node
"""
res = await self.read_attribute(ua.AttributeIds.ValueRank)
assert res.Value is not None, "Value must not be None if the result is in Good status"
return ua.ValueRank(res.Value.Value)
async def write_value(self, value, varianttype=None):
async def write_value(self, value: Any, varianttype: Optional[ua.VariantType] = None) -> None:
"""
Write value of a node. Only variables(properties) have values.
An exception will be generated for other node types.
@@ -237,7 +257,7 @@ class Node:
set_data_value = write_value # legacy compatibility
set_value = write_value # legacy compatibility
async def set_writable(self, writable=True):
async def set_writable(self, writable: bool = True) -> None:
"""
Set node as writable by clients.
A node is always writable on server side.
@@ -249,24 +269,26 @@ class Node:
await self.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
await self.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
async def set_attr_bit(self, attr, bit):
async def set_attr_bit(self, attr: ua.AttributeIds, bit: int) -> None:
dv = await self.read_attribute(attr)
assert dv.Value is not None, "Value must not be None if the result is in Good status"
val = ua.ua_binary.set_bit(dv.Value.Value, bit)
await self.write_attribute(attr, ua.DataValue(ua.Variant(val, dv.Value.VariantType)))
async def unset_attr_bit(self, attr, bit):
async def unset_attr_bit(self, attr: ua.AttributeIds, bit: int) -> None:
dv = await self.read_attribute(attr)
assert dv.Value is not None, "Value must not be None if the result is in Good status"
val = ua.ua_binary.unset_bit(dv.Value.Value, bit)
await self.write_attribute(attr, ua.DataValue(ua.Variant(val, dv.Value.VariantType)))
def set_read_only(self):
async def set_read_only(self) -> None:
"""
Set a node as read-only for clients.
A node is always writable on server side.
"""
return self.set_writable(False)
return await self.set_writable(False)
async def write_attribute(self, attributeid, datavalue, indexrange=None):
async def write_attribute(self, attributeid: ua.AttributeIds, datavalue: ua.DataValue, indexrange: Optional[str] = None) -> None:
"""
Set an attribute of a node
attributeid is a member of ua.AttributeIds
@@ -284,11 +306,11 @@ class Node:
result = await self.session.write(params)
result[0].check()
async def write_params(self, params):
async def write_params(self, params: ua.WriteParameters) -> List[ua.StatusCode]:
result = await self.session.write(params)
return result
async def read_attribute(self, attr, indexrange=None, raise_on_bad_status=True):
async def read_attribute(self, attr: ua.AttributeIds, indexrange: Optional[str] = None, raise_on_bad_status: bool = True) -> ua.DataValue:
"""
Read one attribute of a node
attributeid is a member of ua.AttributeIds
@@ -306,7 +328,7 @@ class Node:
result[0].StatusCode.check()
return result[0]
async def read_attributes(self, attrs):
async def read_attributes(self, attrs: Iterable[ua.AttributeIds]) -> List[ua.DataValue]:
"""
Read several attributes of a node
list of DataValue is returned
@@ -321,11 +343,13 @@ class Node:
results = await self.session.read(params)
return results
async def read_params(self, params):
async def read_params(self, params: ua.ReadParameters) -> List[ua.DataValue]:
result = await self.session.read(params)
return result
async def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
async def get_children(
self, refs: int = ua.ObjectIds.HierarchicalReferences, nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
) -> List["Node"]:
"""
Get all children of a node. By default hierarchical references and all node classes are returned.
Other reference types may be given:
@@ -349,38 +373,51 @@ class Node:
"""
return await self.get_referenced_nodes(refs, ua.BrowseDirection.Forward, nodeclassmask)
def get_properties(self):
async def get_properties(self) -> List["Node"]:
"""
return properties of node.
properties are child nodes with a reference of type HasProperty and a NodeClass of Variable
COROUTINE
"""
return self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)
return await self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)
def get_variables(self):
async def get_variables(self) -> List["Node"]:
"""
return variables of node.
variables are child nodes with a reference of type HasComponent and a NodeClass of Variable
"""
return self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Variable)
return await self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Variable)
def get_methods(self):
async def get_methods(self) -> List["Node"]:
"""
return methods of node.
methods are child nodes with a reference of type HasComponent and a NodeClass of Method
"""
return self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Method)
return await self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Method)
async def get_children_descriptions(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True, result_mask=ua.BrowseResultMask.All):
async def get_children_descriptions(
self,
refs: int = ua.ObjectIds.HierarchicalReferences,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
includesubtypes: bool = True,
result_mask: ua.BrowseResultMask = ua.BrowseResultMask.All
) -> List[ua.ReferenceDescription]:
return await self.get_references(refs, ua.BrowseDirection.Forward, nodeclassmask, includesubtypes, result_mask)
def get_encoding_refs(self):
return self.get_referenced_nodes(ua.ObjectIds.HasEncoding, ua.BrowseDirection.Forward)
async def get_encoding_refs(self) -> List["Node"]:
return await self.get_referenced_nodes(ua.ObjectIds.HasEncoding, ua.BrowseDirection.Forward)
def get_description_refs(self):
return self.get_referenced_nodes(ua.ObjectIds.HasDescription, ua.BrowseDirection.Forward)
async def get_description_refs(self) -> List["Node"]:
return await self.get_referenced_nodes(ua.ObjectIds.HasDescription, ua.BrowseDirection.Forward)
async def get_references(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True, result_mask=ua.BrowseResultMask.All):
async def get_references(
self,
refs: int = ua.ObjectIds.References,
direction: ua.BrowseDirection = ua.BrowseDirection.Both,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
includesubtypes: bool = True,
result_mask: ua.BrowseResultMask = ua.BrowseResultMask.All
) -> List[ua.ReferenceDescription]:
"""
returns references of the node based on specific filter defined with:
@@ -405,17 +442,24 @@ class Node:
references = await self._browse_next(results)
return references
async def _browse_next(self, results):
references = results[0].References
while results[0].ContinuationPoint:
async def _browse_next(self, results: Iterable[ua.BrowseResult]) -> List[ua.ReferenceDescription]:
head = next(iter(results))
references = head.References
while head.ContinuationPoint:
params = ua.BrowseNextParameters()
params.ContinuationPoints = [results[0].ContinuationPoint]
params.ContinuationPoints = [head.ContinuationPoint]
params.ReleaseContinuationPoints = False
results = await self.session.browse_next(params)
references.extend(results[0].References)
references.extend(head.References)
return references
async def get_referenced_nodes(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
async def get_referenced_nodes(
self,
refs: int = ua.ObjectIds.References,
direction: ua.BrowseDirection = ua.BrowseDirection.Both,
nodeclassmask: ua.NodeClass = ua.NodeClass.Unspecified,
includesubtypes: bool = True,
) -> List["Node"]:
"""
returns referenced nodes based on specific filter
Parameters are the same as for get_references
@@ -428,7 +472,7 @@ class Node:
nodes.append(node)
return nodes
async def read_type_definition(self):
async def read_type_definition(self) -> Optional[ua.NodeId]:
"""
returns type definition of the node.
"""
@@ -437,23 +481,30 @@ class Node:
return None
return references[0].NodeId
async def get_path(self, max_length=20, as_string=False):
@overload
async def get_path(self, max_length: int = 20, as_string: Literal[False] = False) -> List["Node"]:
...
@overload
async def get_path(self, max_length: int = 20, as_string: Literal[True] = True) -> List["str"]:
...
async def get_path(self, max_length: int = 20, as_string: bool = False) -> Union[List["Node"], List[str]]:
"""
Attempt to find path of node from root node and return it as a list of Nodes.
There might several possible paths to a node, this function will return one
Some nodes may be missing references, so this method may
return an empty list
Since address space may have circular references, a max length is specified
"""
path = await self._get_path(max_length)
path = [Node(self.session, ref.NodeId) for ref in path]
path.append(self)
nodes = [Node(self.session, ref.NodeId) for ref in path]
nodes.append(self)
if as_string:
path = [(await el.read_browse_name()).to_string() for el in path]
return path
return [(await el.read_browse_name()).to_string() for el in nodes]
return nodes
async def _get_path(self, max_length=20):
async def _get_path(self, max_length: int = 20) -> List[ua.ReferenceDescription]:
"""
Attempt to find path of node from root node and return it as a list of Nodes.
There might several possible paths to a node, this function will return one
@@ -474,7 +525,7 @@ class Node:
else:
return path
async def get_parent(self):
async def get_parent(self) -> Optional["Node"]:
"""
returns parent of the node.
A Node may have several parents, the first found is returned.
@@ -486,7 +537,21 @@ class Node:
return Node(self.session, refs[0].NodeId)
return None
async def get_child(self, path, return_all=False):
@overload
async def get_child(
self, path: Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]], return_all: Literal[False] = False
) -> "Node":
...
@overload
async def get_child(
self, path: Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]], return_all: Literal[True] = True
) -> List["Node"]:
...
async def get_child(
self, path: Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]], return_all: bool = False
) -> Union["Node", List["Node"]]:
"""
get a child specified by its path from this node.
A path might be:
@@ -495,20 +560,24 @@ class Node:
* a list of string
* a list of qualified names
"""
if type(path) not in (list, tuple):
if isinstance(path, (ua.QualifiedName, str)):
path = [path]
rpath = self._make_relative_path(path)
bpath = ua.BrowsePath()
bpath.StartingNode = self.nodeid
bpath.RelativePath = rpath
result = await self.session.translate_browsepaths_to_nodeids([bpath])
result = result[0]
results = await self.session.translate_browsepaths_to_nodeids([bpath])
result = results[0]
result.StatusCode.check()
if return_all:
return [Node(self.session, target.TargetId) for target in result.Targets]
return Node(self.session, result.Targets[0].TargetId)
async def get_children_by_path(self, paths, raise_on_partial_error=True):
async def get_children_by_path(
self,
paths: Iterable[Union[ua.QualifiedName, str, Iterable[Union[ua.QualifiedName, str]]]],
raise_on_partial_error: bool = True
) -> List[List[Optional["Node"]]]:
"""
get children specified by their paths from this node.
A path might be:
@@ -517,9 +586,9 @@ class Node:
* a list of string
* a list of qualified names
"""
bpaths = []
bpaths: List[ua.BrowsePath] = []
for path in paths:
if type(path) not in (list, tuple):
if isinstance(path, (ua.QualifiedName, str)):
path = [path]
rpath = self._make_relative_path(path)
bpath = ua.BrowsePath()
@@ -541,7 +610,7 @@ class Node:
for result in results
]
def _make_relative_path(self, path):
def _make_relative_path(self, path: Iterable[Union[ua.QualifiedName, str]]) -> ua.RelativePath:
rpath = ua.RelativePath()
for item in path:
el = ua.RelativePathElement()
@@ -555,7 +624,13 @@ class Node:
rpath.Elements.append(el)
return rpath
async def read_raw_history(self, starttime=None, endtime=None, numvalues=0, return_bounds=True):
async def read_raw_history(
self,
starttime: Optional[datetime] = None,
endtime: Optional[datetime] = None,
numvalues: int = 0,
return_bounds: bool = True
) -> List[ua.DataValue]:
"""
Read raw history of a node
result code from server is checked and an exception is raised in case of error
@@ -580,7 +655,7 @@ class Node:
result = await self.history_read(details, continuation_point)
result.StatusCode.check()
continuation_point = result.ContinuationPoint
history.extend(result.HistoryData.DataValues)
history.extend(result.HistoryData.DataValues) # type: ignore[attr-defined]
# No more data available
if continuation_point is None:
break
@@ -589,7 +664,7 @@ class Node:
break
return history
async def history_read(self, details, continuation_point=None):
async def history_read(self, details: ua.ReadRawModifiedDetails, continuation_point: Optional[bytes] = None) -> ua.HistoryReadResult:
"""
Read raw history of a node, low-level function
result code from server is checked and an exception is raised in case of error
@@ -605,7 +680,13 @@ class Node:
params.NodesToRead.append(valueid)
return (await self.session.history_read(params))[0]
async def read_event_history(self, starttime=None, endtime=None, numvalues=0, evtypes=ua.ObjectIds.BaseEventType):
async def read_event_history(
self,
starttime: datetime = None,
endtime: datetime = None,
numvalues: int = 0,
evtypes: Union["Node", ua.NodeId, str, int, Iterable[Union["Node", ua.NodeId, str, int]]] = ua.ObjectIds.BaseEventType
) -> List[Event]:
"""
Read event history of a source node
result code from server is checked and an exception is raised in case of error
@@ -622,19 +703,19 @@ class Node:
else:
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
if not isinstance(evtypes, (list, tuple)):
if isinstance(evtypes, (Node, ua.NodeId, str, int)):
evtypes = [evtypes]
evtypes = [Node(self.session, evtype) for evtype in evtypes]
evfilter = await get_filter_from_event_type(evtypes)
evtype_nodes = [Node(self.session, evtype) for evtype in evtypes]
evfilter = await get_filter_from_event_type(evtype_nodes)
details.Filter = evfilter
result = await self.history_read_events(details)
result.StatusCode.check()
event_res = []
for res in result.HistoryData.Events:
for res in result.HistoryData.Events: # type: ignore[attr-defined]
event_res.append(Event.from_event_fields(evfilter.SelectClauses, res.EventFields))
return event_res
async def history_read_events(self, details):
async def history_read_events(self, details: Iterable[ua.ReadEventDetails]) -> ua.HistoryReadResult:
"""
Read event history of a node, low-level function
result code from server is checked and an exception is raised in case of error
@@ -649,7 +730,7 @@ class Node:
params.NodesToRead.append(valueid)
return (await self.session.history_read(params))[0]
async def delete(self, delete_references=True, recursive=False):
async def delete(self, delete_references: bool = True, recursive: bool = False) -> List["Node"]:
"""
Delete node from address space
"""
@@ -658,7 +739,7 @@ class Node:
r.check()
return nodes
def _fill_delete_reference_item(self, rdesc, bidirectional=False):
def _fill_delete_reference_item(self, rdesc: ua.ReferenceDescription, bidirectional: bool = False):
ditem = ua.DeleteReferencesItem()
ditem.SourceNodeId = self.nodeid
ditem.TargetNodeId = rdesc.NodeId
@@ -667,7 +748,9 @@ class Node:
ditem.DeleteBidirectional = bidirectional
return ditem
async def delete_reference(self, target, reftype, forward=True, bidirectional=True):
async def delete_reference(
self, target: Union["Node", ua.NodeId, str, int], reftype: int, forward: bool = True, bidirectional: bool = True
) -> None:
"""
Delete given node's references from address space
"""
@@ -682,7 +765,7 @@ class Node:
ditem = self._fill_delete_reference_item(rdesc, bidirectional)
(await self.session.delete_references([ditem]))[0].check()
async def add_reference(self, target, reftype, forward=True, bidirectional=True):
async def add_reference(self, target: Union["Node", ua.NodeId, str, int], reftype, forward=True, bidirectional=True) -> None:
"""
Add reference to node
"""
@@ -702,7 +785,7 @@ class Node:
results = await self.session.add_references(params)
_check_results(results, len(params))
async def set_modelling_rule(self, mandatory: bool):
async def set_modelling_rule(self, mandatory: bool) -> None:
"""
Add a modelling rule reference to Node.
When creating a new object type, its variable and child nodes will not
@@ -717,37 +800,59 @@ class Node:
rule = ua.ObjectIds.ModellingRule_Mandatory if mandatory else ua.ObjectIds.ModellingRule_Optional
await self.add_reference(rule, ua.ObjectIds.HasModellingRule, True, False)
async def add_folder(self, nodeid, bname):
async def add_folder(self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]) -> "Node":
return await create_folder(self, nodeid, bname)
async def add_object(self, nodeid, bname, objecttype=None, instantiate_optional=True):
async def add_object(
self,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
objecttype: Optional[int] = None,
instantiate_optional: bool = True,
) -> "Node":
return await create_object(self, nodeid, bname, objecttype, instantiate_optional)
async def add_variable(self, nodeid, bname, val, varianttype=None, datatype=None):
async def add_variable(
self,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
val: Any,
varianttype: Optional[ua.VariantType] = None,
datatype: Optional[Union[ua.NodeId, int]] = None,
) -> "Node":
return await create_variable(self, nodeid, bname, val, varianttype, datatype)
async def add_object_type(self, nodeid, bname):
async def add_object_type(self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str]) -> "Node":
return await create_object_type(self, nodeid, bname)
async def add_variable_type(self, nodeid, bname, datatype):
async def add_variable_type(self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], datatype: Union[ua.NodeId, int]) -> "Node":
return await create_variable_type(self, nodeid, bname, datatype)
async def add_data_type(self, nodeid, bname, description=None):
async def add_data_type(self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], description: Optional[str] = None) -> "Node":
return await create_data_type(self, nodeid, bname, description=description)
async def add_property(self, nodeid, bname, val, varianttype=None, datatype=None):
async def add_property(
self,
nodeid: Union[ua.NodeId, str],
bname: Union[ua.QualifiedName, str],
val: Any,
varianttype: Optional[ua.VariantType] = None,
datatype: Optional[Union[ua.NodeId, int]] = None,
) -> "Node":
return await create_property(self, nodeid, bname, val, varianttype, datatype)
async def add_method(self, *args):
async def add_method(self, *args) -> "Node":
return await create_method(self, *args)
async def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None):
async def add_reference_type(
self, nodeid: Union[ua.NodeId, str], bname: Union[ua.QualifiedName, str], symmetric: bool = True, inversename: Optional[str] = None
) -> "Node":
return await create_reference_type(self, nodeid, bname, symmetric, inversename)
async def call_method(self, methodid, *args):
async def call_method(self, methodid: Union[ua.NodeId, ua.QualifiedName, str], *args) -> Any:
return await call_method(self, methodid, *args)
async def register(self):
async def register(self) -> None:
"""
Register node for faster read and write access (if supported by server)
Rmw: This call modifies the nodeid of the node, the original nodeid is
@@ -757,7 +862,7 @@ class Node:
self.basenodeid = self.nodeid
self.nodeid = nodeid
async def unregister(self):
async def unregister(self) -> None:
if self.basenodeid is None:
return
await self.session.unregister_nodes([self.nodeid])
@@ -765,7 +870,7 @@ class Node:
self.basenodeid = None
@staticmethod
def new_node(session, nodeid: ua.NodeId):
def new_node(session, nodeid: ua.NodeId) -> "Node":
"""
Helper function to init nodes with out importing Node
"""

View File

@@ -1,6 +1,11 @@
from __future__ import annotations
import asyncua
from asyncua import ua
from asyncua.common.session_interface import AbstractSession
def make_node(session, nodeid):
def make_node(session: AbstractSession, nodeid: ua.NodeId) -> asyncua.Node:
"""
Node factory
Needed no break cyclical import of `Node`

View File

@@ -92,7 +92,7 @@ class AbstractSession(ABC):
'''
@abstractmethod
async def history_read(self, params: ua.HistoryReadParameters) -> ua.HistoryReadResult:
async def history_read(self, params: ua.HistoryReadParameters) -> List[ua.HistoryReadResult]:
'''
https://reference.opcfoundation.org/Core/Part4/v104/5.10.3/

View File

@@ -370,6 +370,7 @@ async def _get_parent_types(node: Node):
async def load_custom_struct(node: Node) -> Any:
sdef = await node.read_data_type_definition()
assert isinstance(sdef, ua.StructureDefinition), f"Expected StructureDefinition, got: {type(sdef)}"
name = (await node.read_browse_name()).Name
for parent in await _get_parent_types(node):
parent_sdef = await parent.read_data_type_definition()

View File

@@ -245,7 +245,7 @@ class Subscription:
"""
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
if type(evtypes) not in (list, tuple) and evtypes == ua.ObjectIds.BaseEventType:
if not isinstance(evtypes, (list, tuple)) and evtypes == ua.ObjectIds.BaseEventType:
# Remove where clause for base event type, for servers that have problems with long WhereClauses.
# Also because BaseEventType wants every event we can ommit it. Issue: #1205
where_clause_generation = False
@@ -313,7 +313,7 @@ class Subscription:
# Return results for multiple nodes
return mids
# Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
if type(mids[0]) == ua.StatusCode:
if isinstance(mids[0], ua.StatusCode):
mids[0].check()
return mids[0] # type: ignore

View File

@@ -2,15 +2,19 @@
from a list of nodes in the address space, build an XML file
format is the one from opc-ua specification
"""
from __future__ import annotations
import logging
import asyncio
import functools
from collections import OrderedDict
from typing import Any, Union
import xml.etree.ElementTree as Et
import base64
from dataclasses import is_dataclass
from enum import Enum
import asyncua
from asyncua import ua
from asyncua.ua.uatypes import type_string_from_type
from asyncua.ua.uaerrors import UaError
@@ -35,7 +39,7 @@ class XmlExporter:
]
}
def __init__(self, server, export_values: bool = False):
def __init__(self, server: Union[asyncua.Server, asyncua.Client], export_values: bool = False):
"""
param: export_values: exports values from variants (CustomDataTypes are not support!)
"""
@@ -442,11 +446,12 @@ class XmlExporter:
val_el = Et.SubElement(el, 'Value')
await self._value_to_etree(val_el, dtype_name, dtype, var.Value.Value)
async def _value_to_etree(self, el, type_name, dtype, val):
async def _value_to_etree(self, el: Et.Element, type_name: str, dtype: ua.NodeId, val: Any) -> None:
if val is None:
return
if isinstance(val, (list, tuple)):
assert isinstance(dtype.Identifier, int), f"Expected int, got {type(dtype.Identifier)}"
if dtype.NamespaceIndex == 0 and dtype.Identifier <= 21:
elname = "uax:ListOf" + type_name
else: # this is an extensionObject:
@@ -470,7 +475,7 @@ class XmlExporter:
else:
await self._extobj_to_etree(el, type_name, dtype, val)
async def _extobj_to_etree(self, val_el, name, dtype, val):
async def _extobj_to_etree(self, val_el: Et.Element, name: str, dtype: ua.NodeId, val: Any) -> None:
if "=" in name:
try:
name = ua.extension_objects_by_datatype[dtype].__name__
@@ -489,7 +494,7 @@ class XmlExporter:
struct_el = Et.SubElement(body_el, "uax:" + name)
await self._all_fields_to_etree(struct_el, val)
async def _all_fields_to_etree(self, struct_el, val):
async def _all_fields_to_etree(self, struct_el: Et.Element, val: Any) -> None:
# TODO: adding the 'ua' module to the globals to resolve the type hints might not be enough.
# it is possible that the type annotations also refere to classes defined in other modules.
for field in fields_with_resolved_types(val, globalns={"ua": ua}):

View File

@@ -2,12 +2,15 @@
add nodes defined in XML to address space
format is the one from opc-ua specification
"""
from __future__ import annotations
import logging
import uuid
from typing import Union, Dict, List, Tuple
from typing import Set, Union, Dict, List, Tuple
from dataclasses import fields, is_dataclass
from asyncua.common.structures104 import load_custom_struct_xml_import, load_enum_xml_import, load_basetype_alias_xml_import
from asyncua import ua
import asyncua
from asyncua import ua, Node
from asyncua.ua.uatypes import type_is_union, types_from_union, type_is_list, type_from_list
from .xmlparser import XMLParser, ua_type_to_python
from ..ua.uaerrors import UaError
@@ -21,7 +24,7 @@ def _parse_version(version_string: str) -> List[int]:
class XmlImporter:
def __init__(self, server, strict_mode=True):
def __init__(self, server: Union[asyncua.Server, asyncua.Client], strict_mode: bool = True):
'''
strict_mode: stop on an error, if False only an error message is logged,
but the import continues
@@ -156,7 +159,7 @@ class XmlImporter:
await self._check_if_namespace_meta_information_is_added()
return nodes
async def _add_missing_reverse_references(self, new_nodes):
async def _add_missing_reverse_references(self, new_nodes: List[Node]) -> Set[Node]:
__unidirectional_types = {ua.ObjectIds.GuardVariableType, ua.ObjectIds.HasGuard,
ua.ObjectIds.TransitionVariableType, ua.ObjectIds.StateMachineType,
ua.ObjectIds.StateVariableType, ua.ObjectIds.TwoStateVariableType,
@@ -176,7 +179,7 @@ class XmlImporter:
dangling_refs_to_missing_nodes.discard(ref.NodeId)
if ref.ReferenceTypeId.NamespaceIndex != 0 or ref.ReferenceTypeId.Identifier not in __unidirectional_types:
ref_key = (new_node_id, ref.NodeId, ref.ReferenceTypeId)
ref_key: RefSpecKey = (new_node_id, ref.NodeId, ref.ReferenceTypeId)
node_reference_map[ref_key] = ref
for node in dangling_refs_to_missing_nodes:

View File

@@ -140,7 +140,7 @@ class InternalSession(AbstractSession):
ServerItemCallback(params, results, user, self.external))
return results
async def history_read(self, params) -> ua.HistoryReadResult:
async def history_read(self, params) -> List[ua.HistoryReadResult]:
return await self.iserver.history_manager.read_history(params)
async def write(self, params):

View File

@@ -20,17 +20,17 @@ logger = logging.getLogger('__name__')
T = TypeVar('T')
def test_bit(data, offset):
def test_bit(data: int, offset: int) -> int:
mask = 1 << offset
return data & mask
def set_bit(data, offset):
def set_bit(data: int, offset: int) -> int:
mask = 1 << offset
return data | mask
def unset_bit(data, offset):
def unset_bit(data: int, offset: int) -> int:
mask = 1 << offset
return data & ~mask

View File

@@ -23,7 +23,7 @@ setup(
packages=find_packages(exclude=["tests"]),
provides=["asyncua"],
license="GNU Lesser General Public License v3 or later",
install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", "cryptography", "sortedcontainers", "importlib-metadata;python_version<'3.8'", "pyOpenSSL"],
install_requires=["aiofiles", "aiosqlite", "python-dateutil", "pytz", "cryptography", "sortedcontainers", "importlib-metadata;python_version<'3.8'", "pyOpenSSL", "typing-extensions"],
classifiers=[
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",