1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

cleanup, docs, typings

This commit is contained in:
Christian Bergmiller 2019-05-02 21:57:25 +02:00 committed by oroulet
parent ea86c3177e
commit ca635d0bba
9 changed files with 58 additions and 56 deletions

View File

@ -163,7 +163,7 @@ class Subscription:
:param nodes: One Node or an Iterable of Nodes
:param attr: The Node attribute you want to subscribe to
:param queuesize: 0 or 1 for default queue size (shall be 1 - noe queuing), n for FIFO queue
:param queuesize: 0 or 1 for default queue size (shall be 1 - no queuing), n for FIFO queue
:return: Handle for changing/cancelling of the subscription
"""
return await self._subscribe(nodes, attr, queuesize=queuesize)

View File

@ -1,10 +1,10 @@
import logging
from datetime import datetime
import uuid
from typing import Optional
from asyncua import ua
from asyncua import Node
from ..common import events, event_objects
from ..common import events, event_objects, Node
class EventGenerator:
@ -22,8 +22,8 @@ class EventGenerator:
def __init__(self, isession):
self.logger = logging.getLogger(__name__)
self.isession = isession
self.event = None
self.emitting_node = None
self.event: Optional[event_objects.BaseEvent] = None
self.emitting_node: Optional[Node] = None
async def init(self, etype=None, emitting_node=ua.ObjectIds.Server):
node = None

View File

@ -3,6 +3,7 @@ Internal server implementing opcu-ua interface.
Can be used on server side or to implement binary/https opc-ua servers
"""
import asyncio
from datetime import datetime, timedelta
from copy import copy
from struct import unpack_from
@ -47,8 +48,8 @@ class InternalServer:
"""
There is one `InternalServer` for every `Server`.
"""
def __init__(self, loop):
self.loop = loop
def __init__(self, loop: asyncio.AbstractEventLoop):
self.loop: asyncio.AbstractEventLoop = loop
self.logger = logging.getLogger(__name__)
self.server_callback_dispatcher = CallbackDispatcher()
self.endpoints = []
@ -66,9 +67,7 @@ class InternalServer:
self.asyncio_transports = []
self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
self.user_manager = default_user_manager # defined at the end of this file
# create a session to use on server side
self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
@ -255,7 +254,7 @@ class InternalServer:
def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
"""
directly write datavalue to the Attribute, bypasing some checks and structure creation
directly write datavalue to the Attribute, bypassing some checks and structure creation
so it is a little faster
"""
self.aspace.set_attribute_value(nodeid, attr, datavalue)
@ -263,7 +262,7 @@ class InternalServer:
def set_user_manager(self, user_manager):
"""
set up a function which that will check for authorize users. Input function takes username
and password as paramters and returns True of user is allowed access, False otherwise.
and password as parameters and returns True of user is allowed access, False otherwise.
"""
self.user_manager = user_manager
@ -271,27 +270,25 @@ class InternalServer:
"""
unpack the username and password for the benefit of the user defined user manager
"""
userName = token.UserName
passwd = token.Password
user_name = token.UserName
password = token.Password
# decrypt password if we can
if str(token.EncryptionAlgorithm) != "None":
if not uacrypto:
return False
try:
if token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-1_5":
raw_pw = uacrypto.decrypt_rsa15(self.private_key, passwd)
raw_pw = uacrypto.decrypt_rsa15(self.private_key, password)
elif token.EncryptionAlgorithm == "http://www.w3.org/2001/04/xmlenc#rsa-oaep":
raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, passwd)
raw_pw = uacrypto.decrypt_rsa_oaep(self.private_key, password)
else:
self.logger.warning("Unknown password encoding %s", token.EncryptionAlgorithm)
return False
length = unpack_from('<I', raw_pw)[0] - len(isession.nonce)
passwd = raw_pw[4:4 + length]
passwd = passwd.decode('utf-8')
password = raw_pw[4:4 + length]
password = password.decode('utf-8')
except Exception:
self.logger.exception("Unable to decrypt password")
return False
# call user_manager
return self.user_manager(self, isession, userName, passwd)
return self.user_manager(self, isession, user_name, password)

View File

@ -5,6 +5,7 @@ from typing import Coroutine, Iterable, Optional
from asyncua import ua
from ..common.callback import CallbackType, ServerItemCallback
from ..common.utils import create_nonce, ServiceError
from .address_space import AddressSpace
from .users import User
from .subscription_service import SubscriptionService
@ -22,12 +23,13 @@ class InternalSession:
_counter = 10
_auth_counter = 1000
def __init__(self, internal_server, aspace, submgr: SubscriptionService, name, user=User.Anonymous, external=False):
def __init__(self, internal_server, aspace: AddressSpace, submgr: SubscriptionService, name, user=User.Anonymous,
external=False):
self.logger = logging.getLogger(__name__)
self.iserver = internal_server
# define if session is external, we need to copy some objects if it is internal
self.external = external
self.aspace = aspace
self.aspace: AddressSpace = aspace
self.subscription_service: SubscriptionService = submgr
self.name = name
self.user = user
@ -119,7 +121,7 @@ class InternalSession:
result = await self.subscription_service.create_subscription(params, callback)
return result
async def create_monitored_items(self, params):
async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
"""Returns Future"""
subscription_result = await self.subscription_service.create_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionCreated,

View File

@ -8,6 +8,7 @@ import asyncio
from typing import Union, Iterable, Dict, List
from asyncua import ua
from .monitored_item_service import MonitoredItemService
from .address_space import AddressSpace
class InternalSubscription:
@ -15,13 +16,13 @@ class InternalSubscription:
"""
def __init__(self, subservice, data: ua.CreateSubscriptionResult, addressspace, callback=None):
def __init__(self, loop: asyncio.AbstractEventLoop, data: ua.CreateSubscriptionResult, aspace: AddressSpace,
callback=None):
self.logger = logging.getLogger(__name__)
self.aspace = addressspace
self.subservice = subservice
self.loop: asyncio.AbstractEventLoop = loop
self.data: ua.CreateSubscriptionResult = data
self.pub_result_callback = callback
self.monitored_item_srv = MonitoredItemService(self, addressspace)
self.monitored_item_srv = MonitoredItemService(self, aspace)
self._triggered_datachanges: Dict[int, List[ua.MonitoredItemNotification]] = {}
self._triggered_events: Dict[int, List[ua.EventFieldList]] = {}
self._triggered_statuschanges: list = []
@ -38,7 +39,7 @@ class InternalSubscription:
async def start(self):
self.logger.debug("starting subscription %s", self.data.SubscriptionId)
if self.data.RevisedPublishingInterval > 0.0:
self._task = self.subservice.loop.create_task(self._subscription_loop())
self._task = self.loop.create_task(self._subscription_loop())
async def stop(self):
self.logger.info("stopping internal subscription %s", self.data.SubscriptionId)
@ -162,21 +163,21 @@ class InternalSubscription:
self.logger.info("Error request to re-published non existing ack %s in subscription %s", nb, self)
return ua.NotificationMessage()
def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize):
def enqueue_datachange_event(self, mid: int, eventdata: ua.MonitoredItemNotification, maxsize: int):
"""
Enqueue a monitored item data change.
:param mid: Monitored Item Id
:param eventdata: Monitored Item Notification
:param maxsize:
:param maxsize: Max queue size (0: No limit)
"""
self._enqueue_event(mid, eventdata, maxsize, self._triggered_datachanges)
def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize):
def enqueue_event(self, mid: int, eventdata: ua.EventFieldList, maxsize: int):
"""
Enqueue a event.
:param mid: Monitored Item Id
:param eventdata: Event Field List
:param maxsize:
:param maxsize: Max queue size (0: No limit)
"""
self._enqueue_event(mid, eventdata, maxsize, self._triggered_events)
@ -188,7 +189,7 @@ class InternalSubscription:
self._triggered_statuschanges.append(code)
self._trigger_publish()
def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size,
def _enqueue_event(self, mid: int, eventdata: Union[ua.MonitoredItemNotification, ua.EventFieldList], size: int,
queue: dict):
if mid not in queue:
# New Monitored Item Id
@ -196,6 +197,7 @@ class InternalSubscription:
self._trigger_publish()
return
if size != 0:
# Limit queue size
if len(queue[mid]) >= size:
queue[mid].pop(0)
queue[mid].append(eventdata)

View File

@ -4,6 +4,7 @@ server side implementation of a subscription object
import logging
from asyncua import ua
from typing import Dict
from .address_space import AddressSpace
@ -44,9 +45,9 @@ class MonitoredItemService:
self.logger = logging.getLogger(f"{__name__}.{isub.data.SubscriptionId}")
self.isub = isub
self.aspace: AddressSpace = aspace
self._monitored_items = {}
self._monitored_items: Dict[int, MonitoredItemData] = {}
self._monitored_events = {}
self._monitored_datachange = {}
self._monitored_datachange: Dict[int, int] = {}
self._monitored_item_counter = 111
def __str__(self):
@ -55,7 +56,7 @@ class MonitoredItemService:
def delete_all_monitored_items(self):
self.delete_monitored_items([mdata.monitored_item_id for mdata in self._monitored_items.values()])
async def create_monitored_items(self, params):
async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
results = []
for item in params.ItemsToCreate:
if item.ItemToMonitor.AttributeId == ua.AttributeIds.EventNotifier:
@ -65,7 +66,7 @@ class MonitoredItemService:
results.append(result)
return results
def modify_monitored_items(self, params):
def modify_monitored_items(self, params: ua.ModifyMonitoredItemsParameters):
results = []
for item in params.ItemsToModify:
results.append(self._modify_monitored_item(item))
@ -76,7 +77,7 @@ class MonitoredItemService:
variant = self.aspace.get_attribute_value(nodeid, attr)
self.datachange_callback(handle, variant)
def _modify_monitored_item(self, params):
def _modify_monitored_item(self, params: ua.MonitoredItemModifyRequest):
for mdata in self._monitored_items.values():
result = ua.MonitoredItemModifyResult()
if mdata.monitored_item_id == params.MonitoredItemId:
@ -90,7 +91,7 @@ class MonitoredItemService:
result.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
return result
def _commit_monitored_item(self, result, mdata):
def _commit_monitored_item(self, result, mdata: MonitoredItemData):
if result.StatusCode.is_good():
self._monitored_items[result.MonitoredItemId] = mdata
self._monitored_item_counter += 1
@ -110,7 +111,7 @@ class MonitoredItemService:
mdata.filter = params.RequestedParameters.Filter
return result, mdata
def _create_events_monitored_item(self, params):
def _create_events_monitored_item(self, params: ua.MonitoredItemCreateRequest):
self.logger.info("request to subscribe to events for node %s and attribute %s", params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
@ -129,7 +130,7 @@ class MonitoredItemService:
self._monitored_events[params.ItemToMonitor.NodeId].append(result.MonitoredItemId)
return result
def _create_data_change_monitored_item(self, params):
def _create_data_change_monitored_item(self, params: ua.MonitoredItemCreateRequest):
self.logger.info("request to subscribe to datachange for node %s and attribute %s", params.ItemToMonitor.NodeId,
params.ItemToMonitor.AttributeId)
@ -155,7 +156,7 @@ class MonitoredItemService:
results.append(self._delete_monitored_items(mid))
return results
def _delete_monitored_items(self, mid):
def _delete_monitored_items(self, mid: int):
if mid not in self._monitored_items:
return ua.StatusCode(ua.StatusCodes.BadMonitoredItemIdInvalid)
for k, v in self._monitored_events.items():
@ -172,7 +173,7 @@ class MonitoredItemService:
self._monitored_items.pop(mid)
return ua.StatusCode()
def datachange_callback(self, handle, value, error=None):
def datachange_callback(self, handle: int, value, error=None):
if error:
self.logger.info("subscription %s: datachange callback called with handle '%s' and error '%s'", self,
handle, error)
@ -214,7 +215,7 @@ class MonitoredItemService:
self._trigger_event(event, mid)
return True
def _trigger_event(self, event, mid):
def _trigger_event(self, event, mid: int):
if mid not in self._monitored_items:
self.logger.debug("Could not find monitored items for id %s for event %s in subscription %s", mid, event,
self)
@ -233,7 +234,7 @@ class MonitoredItemService:
class WhereClauseEvaluator:
def __init__(self, logger, aspace, whereclause):
def __init__(self, logger, aspace: AddressSpace, whereclause):
self.logger = logger
self.elements = whereclause.Elements
self._aspace = aspace

View File

@ -6,10 +6,9 @@ import asyncio
import logging
from datetime import timedelta, datetime
from urllib.parse import urlparse
from typing import Coroutine
from typing import Coroutine, Optional
from asyncua import ua
# from asyncua.binary_server import BinaryServer
from .binary_server_asyncio import BinaryServer
from .internal_server import InternalServer
from .event_generator import EventGenerator
@ -65,8 +64,8 @@ class Server:
:ivar nodes: shortcuts to common nodes - `Shortcuts` instance
"""
def __init__(self, iserver: InternalServer = None, loop=None):
self.loop = loop or asyncio.get_event_loop()
def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
self.logger = logging.getLogger(__name__)
self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
self._application_uri = "urn:freeopcua:python:server"
@ -76,7 +75,7 @@ class Server:
self.application_type = ua.ApplicationType.ClientAndServer
self.default_timeout: int = 60 * 60 * 1000
self.iserver = iserver if iserver else InternalServer(self.loop)
self.bserver: BinaryServer = None
self.bserver: Optional[BinaryServer] = None
self._discovery_clients = {}
self._discovery_period = 60
self._policies = []

View File

@ -7,6 +7,7 @@ import logging
from typing import Dict, Iterable
from asyncua import ua
from .address_space import AddressSpace
from .internal_subscription import InternalSubscription
@ -16,10 +17,10 @@ class SubscriptionService:
There is one `SubscriptionService` instance for every `Server`/`InternalServer`.
"""
def __init__(self, loop, aspace):
def __init__(self, loop: asyncio.AbstractEventLoop, aspace: AddressSpace):
self.logger = logging.getLogger(__name__)
self.loop = loop
self.aspace = aspace
self.loop: asyncio.AbstractEventLoop = loop
self.aspace: AddressSpace = aspace
self.subscriptions: Dict[int, InternalSubscription] = {}
self._sub_id_counter = 77
@ -35,7 +36,7 @@ class SubscriptionService:
result.RevisedMaxKeepAliveCount = params.RequestedMaxKeepAliveCount
self._sub_id_counter += 1
result.SubscriptionId = self._sub_id_counter
internal_sub = InternalSubscription(self, result, self.aspace, callback)
internal_sub = InternalSubscription(self.loop, result, self.aspace, callback)
await internal_sub.start()
self.subscriptions[result.SubscriptionId] = internal_sub
return result
@ -59,7 +60,7 @@ class SubscriptionService:
for subid, sub in self.subscriptions.items():
sub.publish([ack.SequenceNumber for ack in acks if ack.SubscriptionId == subid])
async def create_monitored_items(self, params):
async def create_monitored_items(self, params: ua.CreateMonitoredItemsParameters):
self.logger.info("create monitored items")
if params.SubscriptionId not in self.subscriptions:
res = []

View File

@ -1,5 +1,5 @@
"""
Implement user managent here
Implement user management here.
"""
from enum import Enum