1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

changes from code review, added docs

This commit is contained in:
Christian Bergmiller 2019-05-02 18:48:37 +02:00 committed by oroulet
parent f3dba9955e
commit ea86c3177e
3 changed files with 17 additions and 11 deletions

View File

@ -7,14 +7,10 @@ from typing import Dict, List
from asyncua import ua
from typing import Optional
from ..ua.ua_binary import struct_from_binary, uatcp_to_binary, struct_to_binary, nodeid_from_binary, header_from_binary
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed
from ..ua.uaerrors import BadTimeout, BadNoSubscription, BadSessionClosed, UaStructParsingError
from ..common.connection import SecureConnection
class ResponseParseError(ValueError):
"""Error while parsing responses."""
class UASocketProtocol(asyncio.Protocol):
"""
Handle socket connection and send ua messages.
@ -442,7 +438,7 @@ class UaClient:
self.logger.info("create_subscription success SubscriptionId %s", response.Parameters.SubscriptionId)
if not self._publish_task or self._publish_task.done():
# Start the publish cycle if it is not yet running
self._publish_task = self.loop.create_task(self._publish_cycle())
self._publish_task = self.loop.create_task(self._publish_loop())
return response.Parameters
async def delete_subscriptions(self, subscription_ids):
@ -473,10 +469,10 @@ class UaClient:
response = struct_from_binary(ua.PublishResponse, data)
except Exception:
self.logger.exception("Error parsing notification from server")
raise ResponseParseError
raise UaStructParsingError
return response
async def _publish_cycle(self):
async def _publish_loop(self):
"""
Start publish cycle that sends a publish request and waits for the publish response in an endless loop.
Forward the `PublishResult` to the matching `Subscription` by callback.
@ -505,7 +501,7 @@ class UaClient:
self.logger.info("BadNoSubscription received, ignoring because it's probably valid.")
# End task
return
except ResponseParseError:
except UaStructParsingError:
ack = None
continue
subscription_id = response.Parameters.SubscriptionId

View File

@ -48,11 +48,17 @@ class InternalSubscription:
self.monitored_item_srv.delete_all_monitored_items()
def _trigger_publish(self):
if self._task and self.data.RevisedPublishingInterval <= 0.0: # ToDo: check against Spec.
"""
Trigger immediate publication (if requested by the PublishingInterval).
"""
if self._task and self.data.RevisedPublishingInterval <= 0.0:
# Publish immediately (as fast as possible)
self.publish_results()
async def _subscription_loop(self):
"""Publication cycle."""
"""
Start the publication loop running at the RevisedPublishingInterval.
"""
try:
while True:
await asyncio.sleep(self.data.RevisedPublishingInterval / 1000.0)

View File

@ -84,3 +84,7 @@ class UaStatusCodeError(_AutoRegister("Meta", (UaError,), {})):
class UaStringParsingError(UaError):
pass
class UaStructParsingError(UaError):
pass