1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

cleanup, respect subscription id 0 to end publish cycle as specified

This commit is contained in:
Christian Bergmiller 2019-05-02 16:58:52 +02:00 committed by oroulet
parent 614249ddfa
commit f3dba9955e
4 changed files with 19 additions and 13 deletions

View File

@ -464,6 +464,7 @@ class UaClient:
"""
Send a PublishRequest to the server.
"""
self.logger.debug('publish %r', acks)
request = ua.PublishRequest()
request.Parameters.SubscriptionAcknowledgements = acks if acks else []
data = await self.protocol.send_request(request, timeout=0)
@ -505,16 +506,21 @@ class UaClient:
# End task
return
except ResponseParseError:
# send publish request to server so he does stop sending notifications
ack = None
continue
# look for matching subscription callback
subscription_id = response.Parameters.SubscriptionId
if not subscription_id:
# The value 0 is used to indicate that there were no Subscriptions defined for which a
# response could be sent. See Spec. Part 4 - Section 5.13.5 "Publish"
# End task
return
try:
callback = self._subscription_callbacks[subscription_id]
except KeyError:
self.logger.warning("Received data for unknown subscription %s active are %s", subscription_id,
self._subscription_callbacks.keys())
self.logger.warning(
"Received data for unknown subscription %s active are %s", subscription_id,
self._subscription_callbacks.keys()
)
else:
try:
callback(response.Parameters)

View File

@ -38,7 +38,7 @@ class SubHandler:
class SubscriptionItemData:
"""
To store useful data from a monitored item
To store useful data from a monitored item.
"""
def __init__(self):
@ -51,7 +51,7 @@ class SubscriptionItemData:
class DataChangeNotif:
"""
To be send to clients for every datachange notification from server
To be send to clients for every datachange notification from server.
"""
def __init__(self, subscription_data, monitored_item):
@ -59,7 +59,7 @@ class DataChangeNotif:
self.subscription_data = subscription_data
def __str__(self):
return "DataChangeNotification({0}, {1})".format(self.subscription_data, self.monitored_item)
return f"DataChangeNotification({self.subscription_data}, {self.monitored_item})"
__repr__ = __str__
@ -73,13 +73,13 @@ class Subscription:
:param server: `InternalSession` or `UAClient`
"""
def __init__(self, server, params, handler, loop=None):
def __init__(self, server, params: ua.CreateSubscriptionParameters, handler: SubHandler, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.logger = logging.getLogger(__name__)
self.server = server
self._client_handle = 200
self._handler = handler
self.parameters = params # move to data class
self._handler: SubHandler = handler
self.parameters: ua.CreateSubscriptionParameters = params # move to data class
self._monitored_items = {}
self.subscription_id = None

View File

@ -59,7 +59,7 @@ class InternalSession:
return result
async def close_session(self, delete_subs=True):
self.logger.info('close session %s')
self.logger.info('close session %s', self.name)
self.state = SessionState.Closed
await self.delete_subscriptions(list(self.subscription_service.subscriptions.keys()))

View File

@ -22,7 +22,6 @@ class InternalSubscription:
self.data: ua.CreateSubscriptionResult = data
self.pub_result_callback = callback
self.monitored_item_srv = MonitoredItemService(self, addressspace)
self.task = None
self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
self._triggered_statuschanges: list = []
@ -45,10 +44,11 @@ class InternalSubscription:
self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
self._task.cancel()
await self._task
self._task = None
self.monitored_item_srv.delete_all_monitored_items()
def _trigger_publish(self):
if self._task and self.data.RevisedPublishingInterval <= 0.0:
if self._task and self.data.RevisedPublishingInterval <= 0.0: # ToDo: check against Spec.
self.publish_results()
async def _subscription_loop(self):