1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

[HaClient] Backport pre-requisites

- Add the possibility to specify a monitoring mode on subscription creation
- Add client and subscription methods to change the publishing and monitoring mode of an existing subscription
- Add a timeout on connect to force the connection to fail if the server can't process it.
- Add a shortcut for the service_level
This commit is contained in:
Julien Prigent
2020-10-28 03:57:19 -07:00
committed by oroulet
parent 9925e7d098
commit 6c504f2438
9 changed files with 154 additions and 12 deletions

View File

@@ -28,6 +28,7 @@ install:
- pip install pytest --upgrade
- pip install pytest-asyncio
- pip install pytest-mock
- pip install asynctest
- pip install cryptography
- pip install dataclasses
- pip install --editable .

View File

@@ -539,7 +539,7 @@ class Client:
"""
return Node(self.uaclient, nodeid)
async def create_subscription(self, period, handler):
async def create_subscription(self, period, handler, publishing=True):
"""
Create a subscription.
Returns a Subscription object which allows to subscribe to events or data changes on server.
@@ -558,7 +558,7 @@ class Client:
params.RequestedLifetimeCount = 10000
params.RequestedMaxKeepAliveCount = 3000
params.MaxNotificationsPerPublish = 10000
params.PublishingEnabled = True
params.PublishingEnabled = publishing
params.Priority = 0
subscription = Subscription(self.uaclient, params, handler)
await subscription.init()

View File

@@ -266,7 +266,10 @@ class UaClient:
async def connect_socket(self, host: str, port: int):
"""Connect to server socket."""
self.logger.info("opening connection")
await self.loop.create_connection(self._make_protocol, host, port)
# Timeout the connection when the server isn't available
await asyncio.wait_for(
self.loop.create_connection(self._make_protocol, host, port), self._timeout
)
def disconnect_socket(self):
if self.protocol and self.protocol.state == UASocketProtocol.CLOSED:
@@ -695,4 +698,28 @@ class UaClient:
response.ResponseHeader.ServiceResult.check()
return response.Results
async def set_monitoring_mode(self, params) -> ua.uatypes.StatusCode:
"""
Update the subscription monitoring mode
"""
self.logger.info("set_monitoring_mode")
request = ua.SetMonitoringModeRequest()
request.Parameters = params
data = await self.protocol.send_request(request)
response = struct_from_binary(ua.SetMonitoringModeResponse, data)
self.logger.debug(response)
response.ResponseHeader.ServiceResult.check()
return response.Parameters.Results
async def set_publishing_mode(self, params) -> ua.uatypes.StatusCode:
"""
Update the subscription publishing mode
"""
self.logger.info("set_publishing_mode")
request = ua.SetPublishingModeRequest()
request.Parameters = params
data = await self.protocol.send_request(request)
response = struct_from_binary(ua.SetPublishingModeResponse, data)
self.logger.debug(response)
response.ResponseHeader.ServiceResult.check()
return response.Parameters.Results

View File

@@ -27,6 +27,7 @@ class Shortcuts(object):
self.opc_binary = Node(server, ObjectIds.OPCBinarySchema_TypeSystem)
self.base_structure_type = Node(server, ObjectIds.Structure)
self.server_state = Node(server, ObjectIds.Server_ServerStatus_State)
self.service_level = Node(server, ObjectIds.Server_ServiceLevel)
self.HasComponent = Node(server, ObjectIds.HasComponent)
self.HasProperty = Node(server, ObjectIds.HasProperty)
self.Organizes = Node(server, ObjectIds.Organizes)

View File

@@ -159,7 +159,9 @@ class Subscription:
async def subscribe_data_change(self,
nodes: Union[Node, Iterable[Node]],
attr=ua.AttributeIds.Value,
queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
queuesize=0,
monitoring=ua.MonitoringMode.Reporting,
) -> Union[int, List[Union[int, ua.StatusCode]]]:
"""
Subscribe to data change events of one or multiple nodes.
The default attribute used for the subscription is `Value`.
@@ -176,7 +178,9 @@ class Subscription:
:param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
:return: Handle for changing/cancelling of the subscription
"""
return await self._subscribe(nodes, attr, queuesize=queuesize)
return await self._subscribe(
nodes, attr, queuesize=queuesize, monitoring=monitoring
)
async def subscribe_events(self,
sourcenode: Node = ua.ObjectIds.Server,
@@ -205,14 +209,20 @@ class Subscription:
evfilter = await get_filter_from_event_type(evtypes)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
async def _subscribe(self, nodes: Union[Node, Iterable[Node]],
attr, mfilter=None, queuesize=0) -> Union[int, List[Union[int, ua.StatusCode]]]:
async def _subscribe(self,
nodes: Union[Node, Iterable[Node]],
attr=ua.AttributeIds.Value,
mfilter=None,
queuesize=0,
monitoring=ua.MonitoringMode.Reporting,
) -> Union[int, List[Union[int, ua.StatusCode]]]:
"""
Private low level method for subscribing.
:param nodes: One Node or an Iterable og Nodes.
:param attr: ua.AttributeId
:param mfilter: MonitoringFilter
:param queuesize: queue size
:param monitoring: ua.MonitoringMode
:return: Integer handle or if multiple Nodes were given a List of Integer handles/ua.StatusCode
"""
is_list = True
@@ -224,7 +234,9 @@ class Subscription:
# Create List of MonitoredItemCreateRequest
mirs = []
for node in nodes:
mir = self._make_monitored_item_request(node, attr, mfilter, queuesize)
mir = self._make_monitored_item_request(
node, attr, mfilter, queuesize, monitoring
)
mirs.append(mir)
# Await MonitoredItemCreateResult
mids = await self.create_monitored_items(mirs)
@@ -236,7 +248,12 @@ class Subscription:
mids[0].check()
return mids[0]
def _make_monitored_item_request(self, node: Node, attr, mfilter, queuesize) -> ua.MonitoredItemCreateRequest:
def _make_monitored_item_request(self,
node: Node,
attr,
mfilter,
queuesize,
monitoring) -> ua.MonitoredItemCreateRequest:
rv = ua.ReadValueId()
rv.NodeId = node.nodeid
rv.AttributeId = attr
@@ -251,7 +268,7 @@ class Subscription:
mparams.Filter = mfilter
mir = ua.MonitoredItemCreateRequest()
mir.ItemToMonitor = rv
mir.MonitoringMode = ua.MonitoringMode.Reporting
mir.MonitoringMode = monitoring
mir.RequestedParameters = mparams
return mir
@@ -370,3 +387,40 @@ class Subscription:
# absolute float value or from 0 to 100 for percentage deadband
deadband_filter.DeadbandValue = deadband_val
return self._subscribe(var, attr, deadband_filter, queuesize)
async def set_monitoring_mode(self, monitoring: ua.MonitoringMode) -> ua.uatypes.StatusCode:
"""
The monitoring mode parameter is used
to enable/disable the sampling of MonitoredItems
(Samples don't queue on the server side)
:param monitoring: The monitoring mode to apply
:return: Return a Set Monitoring Mode Result
"""
node_handles = []
for mi in self._monitored_items.values():
node_handles.append(mi.server_handle)
params = ua.SetMonitoringModeParameters()
params.SubscriptionId = self.subscription_id
params.MonitoredItemIds = node_handles
params.MonitoringMode = monitoring
return await self.server.set_monitoring_mode(params)
async def set_publishing_mode(self, publishing: bool) -> ua.uatypes.StatusCode:
"""
Disable publishing of NotificationMessages for the subscription,
but doesn't discontinue the sending of keep-alive Messages,
nor change the monitoring mode.
:param publishing: The publishing mode to apply
:return: Return a Set Publishing Mode Result
"""
self.logger.info("set_publishing_mode")
params = ua.SetPublishingModeParameters()
params.SubscriptionIds = [self.subscription_id]
params.PublishingEnabled = publishing
result = await self.server.set_publishing_mode(params)
if result[0].is_good():
self.parameters.PublishingEnabled = publishing
return result

View File

@@ -3,3 +3,4 @@ pytest-asyncio
coverage
pytest-cov
pytest-mock
asynctest

View File

@@ -41,5 +41,5 @@ setup(
]
},
setup_requires=[] + pytest_runner,
tests_require=['pytest', 'pytest-mock'],
tests_require=['pytest', 'pytest-mock', 'asynctest'],
)

View File

@@ -17,7 +17,11 @@ Opc = namedtuple('opc', ['opc', 'server'])
def pytest_generate_tests(metafunc):
if 'opc' in metafunc.fixturenames:
mark = metafunc.definition.get_closest_marker('parametrize')
# override the opc parameters when explicilty provided
if getattr(mark, "args", None) and "opc" in mark.args:
pass
elif "opc" in metafunc.fixturenames:
metafunc.parametrize('opc', ['client', 'server'], indirect=True)
elif 'history' in metafunc.fixturenames:
metafunc.parametrize('history', ['dict', 'sqlite'], indirect=True)

View File

@@ -3,6 +3,7 @@ import pytest
from copy import copy
from asyncio import Future, sleep, wait_for, TimeoutError
from datetime import datetime, timedelta
from asynctest import CoroutineMock
import asyncua
from asyncua import ua
@@ -233,6 +234,59 @@ async def test_subscription_data_change(opc):
await sub.unsubscribe(handle1) # sub does not exist anymore
@pytest.mark.parametrize("opc", ["client"], indirect=True)
async def test_create_subscription_publishing(opc):
"""
Test the publishing argument is set during subscription creation
"""
myhandler = MySubHandler()
o = opc.opc.nodes.objects
v = await o.add_variable(3, 'SubscriptionVariable', 123)
# publishing default to True
sub = await opc.opc.create_subscription(100, myhandler)
assert sub.parameters.PublishingEnabled == True
sub = await opc.opc.create_subscription(100, myhandler, publishing=False)
assert sub.parameters.PublishingEnabled == False
@pytest.mark.parametrize("opc", ["client"], indirect=True)
async def test_set_monitoring_mode(opc, mocker):
"""
test set_monitoring_mode parameter for all MIs of a subscription
"""
myhandler = MySubHandler()
o = opc.opc.nodes.objects
monitoring_mode = ua.SetMonitoringModeParameters()
mock_set_monitoring = mocker.patch.object(ua, "SetMonitoringModeParameters", return_value=monitoring_mode)
mock_client_monitoring = mocker.patch("asyncua.client.ua_client.UaClient.set_monitoring_mode", new=CoroutineMock())
v = await o.add_variable(3, 'SubscriptionVariable', 123)
sub = await opc.opc.create_subscription(100, myhandler)
await sub.set_monitoring_mode(ua.MonitoringMode.Disabled)
assert monitoring_mode.MonitoringMode == ua.MonitoringMode.Disabled
await sub.set_monitoring_mode(ua.MonitoringMode.Reporting)
assert monitoring_mode.MonitoringMode == ua.MonitoringMode.Reporting
@pytest.mark.parametrize("opc", ["client"], indirect=True)
async def test_set_publishing_mode(opc, mocker):
"""
test flipping the publishing parameter for an existing subscription
"""
myhandler = MySubHandler()
o = opc.opc.nodes.objects
publishing_mode = ua.SetPublishingModeParameters()
mock_set_monitoring = mocker.patch.object(ua, "SetPublishingModeParameters", return_value=publishing_mode)
mock_client_monitoring = mocker.patch("asyncua.client.ua_client.UaClient.set_publishing_mode", new=CoroutineMock())
v = await o.add_variable(3, 'SubscriptionVariable', 123)
sub = await opc.opc.create_subscription(100, myhandler)
await sub.set_publishing_mode(False)
assert publishing_mode.PublishingEnabled == False
await sub.set_publishing_mode(True)
assert publishing_mode.PublishingEnabled == True
async def test_subscription_data_change_bool(opc):
"""
test subscriptions. This is far too complicated for