1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

refactor: Enhance user manager classes with type hints and abstract base class implementation

This commit is contained in:
Paul Bouillon
2025-04-29 15:52:19 -04:00
parent b7e19a8920
commit ea0c9a763a

View File

@@ -1,18 +1,21 @@
import logging
from pathlib import Path
from typing import Union
from typing import Optional
from cryptography.x509 import Certificate
from asyncua.crypto import uacrypto
from asyncua.crypto.permission_rules import User, UserRole
from asyncua.server.base import AbstractUserManager, AbstractInternalServer
class UserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
raise NotImplementedError
class PermissiveUserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
class PermissiveUserManager(AbstractUserManager):
def get_user(
self,
iserver: AbstractInternalServer,
username: Optional[str] = None,
password: Optional[str] = None,
certificate: Optional[Certificate] = None,
) -> User:
"""
Default user_manager, does nothing much but check for admin
"""
@@ -22,15 +25,21 @@ class PermissiveUserManager:
return User(role=UserRole.User)
class CertificateUserManager:
class CertificateUserManager(AbstractUserManager):
"""
Certificate user manager, takes a certificate handler with its associated users and provides those users.
"""
def __init__(self):
self._trusted_certificates = {}
def __init__(self) -> None:
self._trusted_certificates: dict[str, dict] = {}
async def add_role(self, certificate_path: Path, user_role: UserRole, name: str, format: Union[str, None] = None):
async def add_role(
self,
certificate_path: Path,
user_role: UserRole,
name: str,
format: Optional[str] = None,
) -> None:
certificate = await uacrypto.load_certificate(certificate_path, format)
if name is None:
raise KeyError
@@ -44,10 +53,16 @@ class CertificateUserManager:
)
self._trusted_certificates[name] = {"certificate": uacrypto.der_from_x509(certificate), "user": user}
def get_user(self, iserver, username=None, password=None, certificate=None):
def get_user(
self,
iserver: AbstractInternalServer,
username: Optional[str] = None,
password: Optional[str] = None,
certificate: Optional[Certificate] = None,
) -> Optional[User]:
if certificate is None:
return None
correct_users = [
correct_users: list[User] = [
prospective_certificate["user"]
for prospective_certificate in self._trusted_certificates.values()
if certificate == prospective_certificate["certificate"]
@@ -57,8 +72,8 @@ class CertificateUserManager:
else:
return correct_users[0]
async def add_user(self, certificate_path: Path, name: str, format: Union[str, None] = None):
async def add_user(self, certificate_path: Path, name: str, format: Optional[str] = None):
await self.add_role(certificate_path=certificate_path, user_role=UserRole.User, name=name, format=format)
async def add_admin(self, certificate_path: Path, name: str, format: Union[str, None] = None):
async def add_admin(self, certificate_path: Path, name: str, format: Optional[str] = None):
await self.add_role(certificate_path=certificate_path, user_role=UserRole.Admin, name=name, format=format)