1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

Fix #71 (missing publish call in internal server subscriptions) (#80)

* add missing publish call

* added InternalSubscription is_for_client flag

* rename is_for_client to na_acks, use isession.external prop

* docs
This commit is contained in:
cbergmiller
2019-09-02 11:51:16 +02:00
committed by oroulet
parent 52f1453b77
commit e487f7b9aa
6 changed files with 46 additions and 9 deletions

View File

@@ -526,7 +526,7 @@ class UaClient:
else:
try:
callback(response.Parameters)
except Exception: # we call client code, catch everything!
except Exception: # we call user code, catch everything!
self.logger.exception("Exception while calling user callback: %s")
# Repeat with acknowledgement
ack = ua.SubscriptionAcknowledgement()

View File

@@ -119,7 +119,7 @@ class InternalSession:
return self.iserver.method_service.call(params)
async def create_subscription(self, params, callback=None):
result = await self.subscription_service.create_subscription(params, callback)
result = await self.subscription_service.create_subscription(params, callback, external=self.external)
self.subscriptions.append(result.SubscriptionId)
return result

View File

@@ -13,11 +13,19 @@ from .address_space import AddressSpace
class InternalSubscription:
"""
Server internal subscription.
Runs the publication loop and stores the Publication Results until they are acknowledged.
"""
def __init__(self, loop: asyncio.AbstractEventLoop, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
callback=None):
callback=None, no_acks=False):
"""
:param loop: Event loop instance
:param data: Create Subscription Result
:param aspace: Server Address Space
:param callback: Callback for publishing
:param no_acks: If true no acknowledging will be expected (for server internal subscriptions)
"""
self.logger = logging.getLogger(__name__)
self.loop: asyncio.AbstractEventLoop = loop
self.data: ua.CreateSubscriptionResult = data
@@ -32,6 +40,7 @@ class InternalSubscription:
self._keep_alive_count = 0
self._publish_cycles_count = 0
self._task = None
self.no_acks = no_acks
def __str__(self):
return f"Subscription(id:{self.data.SubscriptionId})"
@@ -92,10 +101,14 @@ class InternalSubscription:
self.monitored_item_srv.trigger_statuschange(ua.StatusCode(ua.StatusCodes.BadTimeout))
result = None
if self.has_published_results():
self._publish_cycles_count += 1
if not self.no_acks:
self._publish_cycles_count += 1
result = self._pop_publish_result()
if result is not None:
self.logger.info('publish_results for %s', self.data.SubscriptionId)
# The callback can be:
# Subscription.publish_callback -> server internal subscription
# UaProcessor.forward_publish_response -> client subscription
self.pub_result_callback(result)
def _pop_publish_result(self) -> ua.PublishResult:
@@ -111,7 +124,8 @@ class InternalSubscription:
self._keep_alive_count = 0
self._startup = False
result.NotificationMessage.SequenceNumber = self._notification_seq
if result.NotificationMessage.NotificationData:
if result.NotificationMessage.NotificationData and not self.no_acks:
# Acknowledgement is only expected when the Subscription is for a client.
self._notification_seq += 1
self._not_acknowledged_results[result.NotificationMessage.SequenceNumber] = result
result.MoreNotifications = False

View File

@@ -28,7 +28,7 @@ class SubscriptionService:
def active_subscription_ids(self):
return self.subscriptions.keys()
async def create_subscription(self, params, callback=None):
async def create_subscription(self, params, callback=None, external=False):
self.logger.info("create subscription")
result = ua.CreateSubscriptionResult()
result.RevisedPublishingInterval = params.RequestedPublishingInterval
@@ -36,7 +36,7 @@ class SubscriptionService:
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
internal_sub = InternalSubscription(self.loop, result, self.aspace, callback)
internal_sub = InternalSubscription(self.loop, result, self.aspace, callback=callback, no_acks=not external)
await internal_sub.start()
self.subscriptions[result.SubscriptionId] = internal_sub
return result

View File

@@ -100,7 +100,6 @@ async def opc(request):
await clt.disconnect()
await srv.stop()
elif request.param == 'server':
# start our own server
# start our own server
srv = Server()
await srv.init()

View File

@@ -637,3 +637,27 @@ async def test_several_different_events_2(opc):
assert ev1s[0].PropertyNum3 is None
await sub.unsubscribe(handle)
await sub.delete()
async def test_internal_server_subscription(opc):
"""
Test that an internal server subscription is handled correctly when
data of a node changes (by external client and internally).
"""
sub_handler = MySubHandler2()
uri = 'http://examples.freeopcua.github.io'
idx = await opc.server.register_namespace(uri)
objects = opc.server.get_objects_node()
sub_obj = await objects.add_object(idx, 'SubTestObject')
sub_var = await sub_obj.add_variable(idx, 'SubTestVariable', 0)
sub = await opc.server.create_subscription(1, sub_handler)
# Server subscribes to own variable data changes
await sub.subscribe_data_change([sub_var])
client_var = await opc.opc.nodes.objects.get_child([f"{idx}:SubTestObject", f"{idx}:SubTestVariable"])
for i in range(10):
await client_var.set_value(i)
await asyncio.sleep(0.01)
assert [v for n, v in sub_handler.results] == list(range(10))
internal_sub = opc.server.iserver.subscription_service.subscriptions[sub.subscription_id]
# Check that the results are not left un-acknowledged on internal Server Subscriptions.
assert len(internal_sub._not_acknowledged_results) == 0