1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

User management with certificates (#209)

* getting started

* remove ridiculous change to generated code

* remove ridiculous change to generated code

* checkpoint

* another checkpoint

* found the error, tests now pass

* improve error when person with incorrect credentials tries to connect

* add better error handling for when a user does something they aren't permitted to do

* pass tests

* add tests for simple permissions

* add third certificate and some documentation

* clean up pull request

* remove pointless import

* hopefully pass tests

* implement changes requested

* add dataclasses to requirements for python 3.6

* put dataclasses in the travis file instead of the dev_requirements.txt

* implement user manager in iserver instead of using the certificate handler

* uncomment some code

* add permissions=None to base security policy object

* pass crypto connect tests

* fix tests

* hopefully pass all the tests now

* actually pass tests this time

* remove certificate handler and deal with permissions exclusively with CertificateUserManager

* remove commented out code
This commit is contained in:
JoeyFaulkner
2020-06-11 11:33:51 +02:00
committed by GitHub
parent b0b54cb954
commit 47920a7fe0
22 changed files with 578 additions and 155 deletions

View File

@@ -12,5 +12,6 @@ install:
- pip install pytest --upgrade
- pip install pytest-asyncio
- pip install cryptography
- pip install dataclasses
# command to run tests
script: pytest -v -s

View File

@@ -1,23 +0,0 @@
from asyncua.crypto import uacrypto
import sys
import logging
sys.path.append('..')
class CertificateHandler:
def __init__(self):
self._trusted_certificates = {}
async def trust_certificate(self, certificate_path: str, format: str = None, label: str = None):
certificate = await uacrypto.load_certificate(certificate_path, format)
if label is None:
label = certificate_path
if label in self._trusted_certificates:
logging.warning(f"certificate with label {label} "
f"attempted to be added multiple times, only the last version will be kept.")
self._trusted_certificates[label] = uacrypto.der_from_x509(certificate)
def __contains__(self, certificate):
return any(certificate == prospective_cert
for prospective_cert
in self._trusted_certificates.values())

View File

@@ -0,0 +1,69 @@
from asyncua import ua
from asyncua.server.users import UserRole
WRITE_TYPES = [
ua.ObjectIds.WriteRequest_Encoding_DefaultBinary,
ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary,
ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary,
ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary,
ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary,
ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary,
ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary,
ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary,
ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary
]
READ_TYPES = [
ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary,
ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary,
ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary,
ua.ObjectIds.ReadRequest_Encoding_DefaultBinary,
ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary,
ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary,
ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary,
ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary,
ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary,
ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary,
ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary,
ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary,
ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary,
ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary,
ua.ObjectIds.PublishRequest_Encoding_DefaultBinary,
ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary,
ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary,
ua.ObjectIds.CallRequest_Encoding_DefaultBinary,
ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary,
ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary
]
class PermissionRuleset:
"""
Base class for permission ruleset
"""
def check_validity(self, user, action_type, body):
raise NotImplementedError
class SimpleRoleRuleset(PermissionRuleset):
"""
Standard simple role-based ruleset.
Admins alone can write, admins and users can read, and anonymous users can't do anything.
"""
def __init__(self):
write_ids = list(map(ua.NodeId, WRITE_TYPES))
read_ids = list(map(ua.NodeId, READ_TYPES))
self._permission_dict = {
UserRole.Admin: set().union(write_ids, read_ids),
UserRole.User: set().union(read_ids),
UserRole.Anonymous: set()
}
def check_validity(self, user, action_type_id, body):
if action_type_id in self._permission_dict[user.role]:
return True
else:
return False

View File

@@ -2,7 +2,8 @@ import logging
import struct
from abc import ABCMeta, abstractmethod
from ..ua import CryptographyNone, SecurityPolicy, MessageSecurityMode, UaError
from ..ua import CryptographyNone, SecurityPolicy, MessageSecurityMode, UaError, uaerrors
try:
from ..crypto import uacrypto
CRYPTOGRAPHY_AVAILABLE = True
@@ -562,7 +563,8 @@ class SecurityPolicyBasic256Sha256(SecurityPolicy):
def encrypt_asymmetric(pubkey, data):
return uacrypto.encrypt_rsa_oaep(pubkey, data)
def __init__(self, peer_cert, host_cert, client_pk, mode, certificate_handler=None):
def __init__(self, peer_cert, host_cert, client_pk, mode,
permission_ruleset=None):
require_cryptography(self)
if isinstance(peer_cert, bytes):
peer_cert = uacrypto.x509_from_der(peer_cert)
@@ -580,8 +582,11 @@ class SecurityPolicyBasic256Sha256(SecurityPolicy):
self.Mode = mode
self.peer_certificate = uacrypto.der_from_x509(peer_cert)
self.host_certificate = uacrypto.der_from_x509(host_cert)
if certificate_handler:
assert self.peer_certificate in certificate_handler
if permission_ruleset is None:
from asyncua.crypto.permission_rules import SimpleRoleRuleset
permission_ruleset = SimpleRoleRuleset()
self.permissions = permission_ruleset
def make_local_symmetric_key(self, secret, seed):
# specs part 6, 6.7.5

View File

@@ -8,7 +8,7 @@ from concurrent.futures import ThreadPoolExecutor
from functools import partial
from asyncua import ua
from .users import User
from .users import User, UserRole
_logger = logging.getLogger(__name__)
@@ -53,11 +53,11 @@ class AttributeService:
res.append(self._aspace.read_attribute_value(readvalue.NodeId, readvalue.AttributeId))
return res
async def write(self, params, user=User.Admin):
async def write(self, params, user=User(role=UserRole.Admin)):
#self.logger.debug("write %s as user %s", params, user)
res = []
for writevalue in params.NodesToWrite:
if user != User.Admin:
if user.role != UserRole.Admin:
if writevalue.AttributeId != ua.AttributeIds.Value:
res.append(ua.StatusCode(ua.StatusCodes.BadUserAccessDenied))
continue
@@ -199,13 +199,13 @@ class NodeManagementService:
self.logger = logging.getLogger(__name__)
self._aspace: "AddressSpace" = aspace
def add_nodes(self, addnodeitems, user=User.Admin):
def add_nodes(self, addnodeitems, user=User(role=UserRole.Admin)):
results = []
for item in addnodeitems:
results.append(self._add_node(item, user))
return results
def try_add_nodes(self, addnodeitems, user=User.Admin, check=True):
def try_add_nodes(self, addnodeitems, user=User(role=UserRole.Admin), check=True):
for item in addnodeitems:
ret = self._add_node(item, user, check=check)
if not ret.StatusCode.is_good():
@@ -215,7 +215,7 @@ class NodeManagementService:
#self.logger.debug("Adding node %s %s", item.RequestedNewNodeId, item.BrowseName)
result = ua.AddNodesResult()
if not user == User.Admin:
if not user.role == UserRole.Admin:
result.StatusCode = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
return result
@@ -319,14 +319,14 @@ class NodeManagementService:
addref.TargetNodeClass = ua.NodeClass.DataType
self._add_reference_no_check(nodedata, addref)
def delete_nodes(self, deletenodeitems, user=User.Admin):
def delete_nodes(self, deletenodeitems, user=User(role=UserRole.Admin)):
results = []
for item in deletenodeitems.NodesToDelete:
results.append(self._delete_node(item, user))
return results
def _delete_node(self, item, user):
if user != User.Admin:
if user.role != UserRole.Admin:
return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
if item.NodeId not in self._aspace:
@@ -355,13 +355,13 @@ class NodeManagementService:
self.logger.exception("Error calling delete node callback callback %s, %s, %s", nodedata,
ua.AttributeIds.Value, ex)
def add_references(self, refs, user=User.Admin):
def add_references(self, refs, user=User(role=UserRole.Admin)):
result = []
for ref in refs:
result.append(self._add_reference(ref, user))
return result
def try_add_references(self, refs, user=User.Admin):
def try_add_references(self, refs, user=User(role=UserRole.Admin)):
for ref in refs:
if not self._add_reference(ref, user).is_good():
yield ref
@@ -372,7 +372,7 @@ class NodeManagementService:
return ua.StatusCode(ua.StatusCodes.BadSourceNodeIdInvalid)
if addref.TargetNodeId not in self._aspace:
return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
if user != User.Admin:
if user.role != UserRole.Admin:
return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
return self._add_reference_no_check(sourcedata, addref)
@@ -391,10 +391,10 @@ class NodeManagementService:
rdesc.BrowseName = bname
dname = self._aspace.read_attribute_value(addref.TargetNodeId, ua.AttributeIds.DisplayName).Value.Value
if dname:
rdesc.DisplayName = dname
rdesc.DisplayUser = dname
return self._add_unique_reference(sourcedata, rdesc)
def delete_references(self, refs, user=User.Admin):
def delete_references(self, refs, user=User(role=UserRole.Admin)):
result = []
for ref in refs:
result.append(self._delete_reference(ref, user))
@@ -419,7 +419,7 @@ class NodeManagementService:
return ua.StatusCode(ua.StatusCodes.BadTargetNodeIdInvalid)
if item.ReferenceTypeId not in self._aspace:
return ua.StatusCode(ua.StatusCodes.BadReferenceTypeIdInvalid)
if user != User.Admin:
if user.role != UserRole.Admin:
return ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
if item.DeleteBidirectional:

View File

@@ -13,13 +13,14 @@ from urllib.parse import urlparse
from typing import Coroutine
from asyncua import ua
from .user_managers import PermissiveUserManager, UserManager
from ..common.callback import CallbackDispatcher
from ..common.node import Node
from .history import HistoryManager
from .address_space import AddressSpace, AttributeService, ViewService, NodeManagementService, MethodService
from .subscription_service import SubscriptionService
from .standard_address_space import standard_address_space
from .users import User
from .users import User, UserRole
from .internal_session import InternalSession
try:
@@ -28,6 +29,8 @@ except ImportError:
logging.getLogger(__name__).warning("cryptography is not installed, use of crypto disabled")
uacrypto = False
logger = logging.getLogger()
class ServerDesc:
def __init__(self, serv, cap=None):
@@ -35,21 +38,12 @@ class ServerDesc:
self.Capabilities = cap
def default_user_manager(iserver, isession, username, password):
"""
Default user_manager, does nothing much but check for admin
"""
if iserver.allow_remote_admin and username in ("admin", "Admin"):
isession.user = User.Admin
return True
class InternalServer:
"""
There is one `InternalServer` for every `Server`.
"""
def __init__(self, loop: asyncio.AbstractEventLoop):
def __init__(self, loop: asyncio.AbstractEventLoop, user_manager: UserManager = None):
self.loop: asyncio.AbstractEventLoop = loop
self.logger = logging.getLogger(__name__)
self.server_callback_dispatcher = CallbackDispatcher()
@@ -68,9 +62,13 @@ class InternalServer:
self.asyncio_transports = []
self.subscription_service: SubscriptionService = SubscriptionService(self.loop, self.aspace)
self.history_manager = HistoryManager(self)
self.user_manager = default_user_manager # defined at the end of this file
if user_manager is None:
logger.warning("No user manager specified. Using default permissive manager instead.")
user_manager = PermissiveUserManager()
self.user_manager = user_manager
# create a session to use on server side
self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal", user=User.Admin)
self.isession = InternalSession(self, self.aspace, self.subscription_service, "Internal",
user=User(role=UserRole.Admin))
self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
async def init(self, shelffile=None):
@@ -228,7 +226,7 @@ class InternalServer:
def register_server2(self, params):
return self.register_server(params.Server, params.DiscoveryConfiguration)
def create_session(self, name, user=User.Anonymous, external=False):
def create_session(self, name, user=User(role=UserRole.Anonymous), external=False):
return InternalSession(self, self.aspace, self.subscription_service, name, user=user, external=external)
async def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
@@ -300,6 +298,7 @@ class InternalServer:
"""
user_name = token.UserName
password = token.Password
# decrypt password if we can
if str(token.EncryptionAlgorithm) != "None":
if not uacrypto:

View File

@@ -6,7 +6,7 @@ from asyncua import ua
from ..common.callback import CallbackType, ServerItemCallback
from ..common.utils import create_nonce, ServiceError
from .address_space import AddressSpace
from .users import User
from .users import User, UserRole
from .subscription_service import SubscriptionService
@@ -25,7 +25,8 @@ class InternalSession:
_counter = 10
_auth_counter = 1000
def __init__(self, internal_server, aspace: AddressSpace, submgr: SubscriptionService, name, user=User.Anonymous, external=False):
def __init__(self, internal_server, aspace: AddressSpace, submgr: SubscriptionService, name,
user=User(role=UserRole.Anonymous), external=False):
self.logger = logging.getLogger(__name__)
self.iserver = internal_server
# define if session is external, we need to copy some objects if it is internal
@@ -46,7 +47,6 @@ class InternalSession:
def __str__(self):
return f'InternalSession(name:{self.name}, user:{self.user}, id:{self.session_id}, auth_token:{self.auth_token})'
async def get_endpoints(self, params=None, sockname=None):
return await self.iserver.get_endpoints(params, sockname)
@@ -72,7 +72,7 @@ class InternalSession:
self.state = SessionState.Closed
await self.delete_subscriptions(self.subscriptions)
def activate_session(self, params):
def activate_session(self, params, peer_certificate):
self.logger.info('activate session')
result = ua.ActivateSessionResult()
if self.state != SessionState.Created:
@@ -86,9 +86,19 @@ class InternalSession:
self.state = SessionState.Activated
InternalSession._current_connections += 1
id_token = params.UserIdentityToken
if isinstance(id_token, ua.UserNameIdentityToken):
if self.iserver.check_user_token(self, id_token) is False:
if self.iserver.user_manager is not None:
if isinstance(id_token, ua.UserNameIdentityToken):
username = id_token.UserName
password = id_token.Password
else:
username, password = None, None
user = self.iserver.user_manager.get_user(self.iserver, username=username, password=password,
certificate=peer_certificate)
if user is None:
raise ServiceError(ua.StatusCodes.BadUserAccessDenied)
else:
self.user = user
self.logger.info("Activated internal session %s for user %s", self.name, self.user)
return result
@@ -100,7 +110,12 @@ class InternalSession:
return await self.iserver.history_manager.read_history(params)
async def write(self, params):
return await self.iserver.attribute_service.write(params, self.user)
if self.user is None:
user = User()
else:
user = self.user
write_result = await self.iserver.attribute_service.write(params, user=user)
return write_result
async def browse(self, params):
return self.iserver.view_service.browse(params)
@@ -136,13 +151,13 @@ class InternalSession:
"""Returns Future"""
subscription_result = await self.subscription_service.create_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionCreated,
ServerItemCallback(params, subscription_result))
ServerItemCallback(params, subscription_result))
return subscription_result
async def modify_monitored_items(self, params):
subscription_result = self.subscription_service.modify_monitored_items(params)
self.iserver.server_callback_dispatcher.dispatch(CallbackType.ItemSubscriptionModified,
ServerItemCallback(params, subscription_result))
ServerItemCallback(params, subscription_result))
return subscription_result
def republish(self, params):

View File

@@ -72,7 +72,7 @@ class Server:
:ivar nodes: shortcuts to common nodes - `Shortcuts` instance
"""
def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None):
def __init__(self, iserver: InternalServer = None, loop: asyncio.AbstractEventLoop = None, user_manager=None):
self.loop: asyncio.AbstractEventLoop = loop or asyncio.get_event_loop()
_logger = logging.getLogger(__name__)
self.endpoint = urlparse("opc.tcp://0.0.0.0:4840/freeopcua/server/")
@@ -82,7 +82,7 @@ class Server:
self.manufacturer_name = "FreeOpcUa"
self.application_type = ua.ApplicationType.ClientAndServer
self.default_timeout: int = 60 * 60 * 1000
self.iserver = iserver if iserver else InternalServer(self.loop)
self.iserver = iserver if iserver else InternalServer(self.loop, user_manager=user_manager)
self.bserver: Optional[BinaryServer] = None
self._discovery_clients = {}
self._discovery_period = 60
@@ -94,7 +94,7 @@ class Server:
ua.SecurityPolicyType.Basic256Sha256_Sign
]
# allow all certificates by default
self._certificate_handler = None
self._permission_ruleset = None
self._policyIDs = ["Anonymous", "Basic256Sha256", "Username"]
self.certificate = None
@@ -245,7 +245,7 @@ class Server:
async def get_endpoints(self) -> Coroutine:
return await self.iserver.get_endpoints()
def set_security_policy(self, security_policy, certificate_handler=None):
def set_security_policy(self, security_policy, permission_ruleset=None):
"""
Method setting up the security policies for connections
to the server, where security_policy is a list of integers.
@@ -264,7 +264,7 @@ class Server:
"""
self._security_policy = security_policy
self._certificate_handler = certificate_handler
self._permission_ruleset = permission_ruleset
def set_security_IDs(self, policy_ids):
"""
@@ -303,12 +303,14 @@ class Server:
self._policies.append(
ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.SignAndEncrypt, self.certificate,
self.iserver.private_key, certificate_handler=self._certificate_handler))
self.iserver.private_key,
permission_ruleset=self._permission_ruleset))
if ua.SecurityPolicyType.Basic256Sha256_Sign in self._security_policy:
self._set_endpoints(security_policies.SecurityPolicyBasic256Sha256, ua.MessageSecurityMode.Sign)
self._policies.append(
ua.SecurityPolicyFactory(security_policies.SecurityPolicyBasic256Sha256,
ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key, certificate_handler=self._certificate_handler))
ua.MessageSecurityMode.Sign, self.certificate, self.iserver.private_key,
permission_ruleset=self._permission_ruleset))
def _set_endpoints(self, policy=ua.SecurityPolicy, mode=ua.MessageSecurityMode.None_):
idtokens = []

View File

@@ -84,7 +84,11 @@ class UaProcessor:
self.send_response(requestdata.requesthdr.RequestHandle, requestdata.seqhdr, response)
async def process(self, header, body):
msg = self._connection.receive_from_header_and_body(header, body)
try:
msg = self._connection.receive_from_header_and_body(header, body)
except ua.uaerrors.BadUserAccessDenied as e:
_logger.warning("Unauthenticated user attempted to connect")
return False
if isinstance(msg, ua.Message):
if header.MessageType == ua.MessageType.SecureOpen:
self.open_secure_channel(msg.SecurityHeader(), msg.SequenceHeader(), msg.body())
@@ -124,6 +128,15 @@ class UaProcessor:
_logger.error("sending service fault response: %s (%s)", status.doc, status.name)
self.send_response(requesthdr.RequestHandle, seqhdr, response)
return True
except ua.uaerrors.BadUserAccessDenied as e:
if self.session:
user = self.session.user
else:
user = 'Someone'
_logger.warning("%s attempted to do something they are not permitted to do", user)
response = ua.ServiceFault()
response.ResponseHeader.ServiceResult = ua.StatusCode(ua.StatusCodes.BadUserAccessDenied)
self.send_response(requesthdr.RequestHandle, seqhdr, response)
except Exception:
_logger.exception('Error while processing message')
response = ua.ServiceFault()
@@ -132,8 +145,20 @@ class UaProcessor:
return True
async def _process_message(self, typeid, requesthdr, seqhdr, body):
if typeid in [ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary),
ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary)]:
# The connection is first created without a user being attached, and then during activation the
user = None
elif self.session is None:
raise ua.uaerrors.BadUserAccessDenied
else:
user = self.session.user
if self._connection.security_policy.permissions is not None:
if self._connection.security_policy.permissions.check_validity(user, typeid, body) is False:
raise ua.uaerrors.BadUserAccessDenied
if typeid == ua.NodeId(ua.ObjectIds.CreateSessionRequest_Encoding_DefaultBinary):
_logger.info("Create session request")
_logger.info("Create session request (%s)", user)
params = struct_from_binary(ua.CreateSessionParameters, body)
# create the session on server
self.session = self.iserver.create_session(self.name, external=True)
@@ -153,36 +178,36 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CloseSessionRequest_Encoding_DefaultBinary):
_logger.info("Close session request")
_logger.info("Close session request (%s)", user)
if self.session:
deletesubs = ua.ua_binary.Primitives.Boolean.unpack(body)
await self.session.close_session(deletesubs)
else:
_logger.info("Request to close non-existing session")
_logger.info("Request to close non-existing session (%s)", user)
response = ua.CloseSessionResponse()
_logger.info("sending close session response")
_logger.info("sending close session response (%s)", user)
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ActivateSessionRequest_Encoding_DefaultBinary):
_logger.info("Activate session request")
_logger.info("Activate session request (%s)", user)
params = struct_from_binary(ua.ActivateSessionParameters, body)
if not self.session:
_logger.info("request to activate non-existing session")
_logger.info("request to activate non-existing session (%s)", user)
raise ServiceError(ua.StatusCodes.BadSessionIdInvalid)
if self._connection.security_policy.host_certificate is None:
data = self.session.nonce
else:
data = self._connection.security_policy.host_certificate + self.session.nonce
self._connection.security_policy.asymmetric_cryptography.verify(data, params.ClientSignature.Signature)
result = self.session.activate_session(params)
result = self.session.activate_session(params, self._connection.security_policy.peer_certificate)
response = ua.ActivateSessionResponse()
response.Parameters = result
#_logger.info("sending read response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ReadRequest_Encoding_DefaultBinary):
_logger.info("Read request")
_logger.info("Read request (%s)", user)
params = struct_from_binary(ua.ReadParameters, body)
results = await self.session.read(params)
response = ua.ReadResponse()
@@ -191,7 +216,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.WriteRequest_Encoding_DefaultBinary):
_logger.info("Write request")
_logger.info("Write request (%s)", user)
params = struct_from_binary(ua.WriteParameters, body)
results = await self.session.write(params)
response = ua.WriteResponse()
@@ -200,7 +225,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.BrowseRequest_Encoding_DefaultBinary):
_logger.info("Browse request")
_logger.info("Browse request (%s)", user)
params = struct_from_binary(ua.BrowseParameters, body)
results = await self.session.browse(params)
response = ua.BrowseResponse()
@@ -209,7 +234,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.GetEndpointsRequest_Encoding_DefaultBinary):
_logger.info("get endpoints request")
_logger.info("get endpoints request (%s)", user)
params = struct_from_binary(ua.GetEndpointsParameters, body)
endpoints = await self.iserver.get_endpoints(params, sockname=self.sockname)
response = ua.GetEndpointsResponse()
@@ -218,7 +243,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.FindServersRequest_Encoding_DefaultBinary):
_logger.info("find servers request")
_logger.info("find servers request (%s)", user)
params = struct_from_binary(ua.FindServersParameters, body)
servers = self.iserver.find_servers(params)
response = ua.FindServersResponse()
@@ -227,7 +252,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterServerRequest_Encoding_DefaultBinary):
_logger.info("register server request")
_logger.info("register server request %s", user)
serv = struct_from_binary(ua.RegisteredServer, body)
self.iserver.register_server(serv)
response = ua.RegisterServerResponse()
@@ -235,7 +260,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterServer2Request_Encoding_DefaultBinary):
_logger.info("register server 2 request")
_logger.info("register server 2 request %s", user)
params = struct_from_binary(ua.RegisterServer2Parameters, body)
results = self.iserver.register_server2(params)
response = ua.RegisterServer2Response()
@@ -244,7 +269,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.TranslateBrowsePathsToNodeIdsRequest_Encoding_DefaultBinary):
_logger.info("translate browsepaths to nodeids request")
_logger.info("translate browsepaths to nodeids request (%s)", user)
params = struct_from_binary(ua.TranslateBrowsePathsToNodeIdsParameters, body)
paths = await self.session.translate_browsepaths_to_nodeids(params.BrowsePaths)
response = ua.TranslateBrowsePathsToNodeIdsResponse()
@@ -253,7 +278,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddNodesRequest_Encoding_DefaultBinary):
_logger.info("add nodes request")
_logger.info("add nodes request (%s)", user)
params = struct_from_binary(ua.AddNodesParameters, body)
results = await self.session.add_nodes(params.NodesToAdd)
response = ua.AddNodesResponse()
@@ -262,7 +287,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteNodesRequest_Encoding_DefaultBinary):
_logger.info("delete nodes request")
_logger.info("delete nodes request (%s)", user)
params = struct_from_binary(ua.DeleteNodesParameters, body)
results = await self.session.delete_nodes(params)
response = ua.DeleteNodesResponse()
@@ -271,7 +296,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.AddReferencesRequest_Encoding_DefaultBinary):
_logger.info("add references request")
_logger.info("add references request (%s)", user)
params = struct_from_binary(ua.AddReferencesParameters, body)
results = await self.session.add_references(params.ReferencesToAdd)
response = ua.AddReferencesResponse()
@@ -280,7 +305,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteReferencesRequest_Encoding_DefaultBinary):
_logger.info("delete references request")
_logger.info("delete references request (%s)", user)
params = struct_from_binary(ua.DeleteReferencesParameters, body)
results = await self.session.delete_references(params.ReferencesToDelete)
response = ua.DeleteReferencesResponse()
@@ -289,7 +314,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateSubscriptionRequest_Encoding_DefaultBinary):
_logger.info("create subscription request")
_logger.info("create subscription request (%s)", user)
params = struct_from_binary(ua.CreateSubscriptionParameters, body)
result = await self.session.create_subscription(params, callback=self.forward_publish_response)
response = ua.CreateSubscriptionResponse()
@@ -298,7 +323,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteSubscriptionsRequest_Encoding_DefaultBinary):
_logger.info("delete subscriptions request")
_logger.info("delete subscriptions request (%s)", user)
params = struct_from_binary(ua.DeleteSubscriptionsParameters, body)
results = await self.session.delete_subscriptions(params.SubscriptionIds)
response = ua.DeleteSubscriptionsResponse()
@@ -307,7 +332,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CreateMonitoredItemsRequest_Encoding_DefaultBinary):
_logger.info("create monitored items request")
_logger.info("create monitored items request (%s)", user)
params = struct_from_binary(ua.CreateMonitoredItemsParameters, body)
results = await self.session.create_monitored_items(params)
response = ua.CreateMonitoredItemsResponse()
@@ -316,7 +341,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.ModifyMonitoredItemsRequest_Encoding_DefaultBinary):
_logger.info("modify monitored items request")
_logger.info("modify monitored items request (%s)", user)
params = struct_from_binary(ua.ModifyMonitoredItemsParameters, body)
results = await self.session.modify_monitored_items(params)
response = ua.ModifyMonitoredItemsResponse()
@@ -325,7 +350,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.DeleteMonitoredItemsRequest_Encoding_DefaultBinary):
_logger.info("delete monitored items request")
_logger.info("delete monitored items request (%s)", user)
params = struct_from_binary(ua.DeleteMonitoredItemsParameters, body)
results = await self.session.delete_monitored_items(params)
response = ua.DeleteMonitoredItemsResponse()
@@ -334,7 +359,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.HistoryReadRequest_Encoding_DefaultBinary):
_logger.info("history read request")
_logger.info("history read request (%s)", user)
params = struct_from_binary(ua.HistoryReadParameters, body)
results = await self.session.history_read(params)
response = ua.HistoryReadResponse()
@@ -343,7 +368,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.RegisterNodesRequest_Encoding_DefaultBinary):
_logger.info("register nodes request")
_logger.info("register nodes request (%s)", user)
params = struct_from_binary(ua.RegisterNodesParameters, body)
_logger.info("Node registration not implemented")
response = ua.RegisterNodesResponse()
@@ -352,14 +377,14 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.UnregisterNodesRequest_Encoding_DefaultBinary):
_logger.info("unregister nodes request")
_logger.info("unregister nodes request (%s)", user)
params = struct_from_binary(ua.UnregisterNodesParameters, body)
response = ua.UnregisterNodesResponse()
#_logger.info("sending unregister nodes response")
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.PublishRequest_Encoding_DefaultBinary):
_logger.debug("publish request")
_logger.debug("publish request (%s)", user)
if not self.session:
return False
params = struct_from_binary(ua.PublishParameters, body)
@@ -378,7 +403,7 @@ class UaProcessor:
#_logger.debug("publish forward to server")
elif typeid == ua.NodeId(ua.ObjectIds.RepublishRequest_Encoding_DefaultBinary):
_logger.info("re-publish request")
_logger.info("re-publish request (%s)", user)
params = struct_from_binary(ua.RepublishParameters, body)
msg = self.session.republish(params)
response = ua.RepublishResponse()
@@ -386,14 +411,14 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.CloseSecureChannelRequest_Encoding_DefaultBinary):
_logger.info("close secure channel request")
_logger.info("close secure channel request (%s)", user)
self._connection.close()
response = ua.CloseSecureChannelResponse()
self.send_response(requesthdr.RequestHandle, seqhdr, response)
return False
elif typeid == ua.NodeId(ua.ObjectIds.CallRequest_Encoding_DefaultBinary):
_logger.info("call request")
_logger.info("call request (%s)", user)
params = struct_from_binary(ua.CallParameters, body)
results = await self.session.call(params.MethodsToCall)
response = ua.CallResponse()
@@ -401,7 +426,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.SetMonitoringModeRequest_Encoding_DefaultBinary):
_logger.info("set monitoring mode request")
_logger.info("set monitoring mode request (%s)", user)
params = struct_from_binary(ua.SetMonitoringModeParameters, body)
# FIXME: Implement SetMonitoringMode
# For now send dummy results to keep clients happy
@@ -415,7 +440,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
elif typeid == ua.NodeId(ua.ObjectIds.SetPublishingModeRequest_Encoding_DefaultBinary):
_logger.info("set publishing mode request")
_logger.info("set publishing mode request (%s)", user)
params = struct_from_binary(ua.SetPublishingModeParameters, body)
# FIXME: Implement SetPublishingMode
# For now send dummy results to keep clients happy
@@ -429,7 +454,7 @@ class UaProcessor:
self.send_response(requesthdr.RequestHandle, seqhdr, response)
else:
_logger.warning("Unknown message received %s", typeid)
_logger.warning("Unknown message received %s (%s)", typeid, user)
raise ServiceError(ua.StatusCodes.BadServiceUnsupported)
return True

View File

@@ -0,0 +1,55 @@
from asyncua.crypto import uacrypto
import logging
from asyncua.server.users import UserRole, User
class UserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
raise NotImplementedError
class PermissiveUserManager:
def get_user(self, iserver, username=None, password=None, certificate=None):
"""
Default user_manager, does nothing much but check for admin
"""
if username and iserver.allow_remote_admin and username in ("admin", "Admin"):
return User(role=UserRole.Admin)
else:
return User(role=UserRole.User)
class CertificateUserManager:
"""
Certificate user manager, takes a certificate handler with its associated users and provides those users.
"""
def __init__(self):
self._trusted_certificates = {}
async def add_role(self, certificate_path: str, user_role: UserRole, name: str, format: str = None):
certificate = await uacrypto.load_certificate(certificate_path, format)
if name is None:
raise KeyError
user = User(role=user_role, name=name)
if name in self._trusted_certificates:
logging.warning(f"certificate with name {name} "
f"attempted to be added multiple times, only the last version will be kept.")
self._trusted_certificates[name] = {'certificate': uacrypto.der_from_x509(certificate), 'user': user}
def get_user(self, iserver, username=None, password=None, certificate=None):
if certificate is None:
return None
correct_users = [prospective_certificate['user'] for prospective_certificate in self._trusted_certificates.values()
if certificate == prospective_certificate['certificate']]
if len(correct_users) == 0:
return None
else:
return correct_users[0]
async def add_user(self, certificate_path: str, name: str, format: str = None):
await self.add_role(certificate_path=certificate_path, user_role=UserRole.User, name=name, format=format)
async def add_admin(self, certificate_path: str, name:str, format: str = None):
await self.add_role(certificate_path=certificate_path, user_role=UserRole.Admin, name=name, format=format)

View File

@@ -3,12 +3,19 @@ Implement user management here.
"""
from enum import Enum
from dataclasses import dataclass
class User(Enum):
class UserRole(Enum):
"""
Define some default users.
User Roles
"""
Admin = 0
Anonymous = 1
User = 3
@dataclass
class User:
role: UserRole = UserRole.Anonymous
name: str = None

View File

@@ -2,6 +2,7 @@
from asyncua.ua.uaerrors import UaStatusCodeError
class StatusCodes:
Good = 0
Uncertain = 0x40000000

View File

@@ -228,6 +228,8 @@ class SecurityPolicy:
self.Mode = auto.MessageSecurityMode.None_
self.peer_certificate = None
self.host_certificate = None
self.user = None
self.permissions = None
def make_local_symmetric_key(self, secret, seed):
pass
@@ -243,12 +245,13 @@ class SecurityPolicyFactory:
SecurityPolicy for every client and client's certificate
"""
def __init__(self, cls=SecurityPolicy, mode=auto.MessageSecurityMode.None_, certificate=None, private_key=None, certificate_handler=None):
def __init__(self, cls=SecurityPolicy, mode=auto.MessageSecurityMode.None_, certificate=None, private_key=None,
permission_ruleset=None):
self.cls = cls
self.mode = mode
self.certificate = certificate
self.private_key = private_key
self.certificate_handler = certificate_handler
self.permission_ruleset = permission_ruleset
def matches(self, uri, mode=None):
return self.cls.URI == uri and (mode is None or self.mode == mode)
@@ -257,7 +260,8 @@ class SecurityPolicyFactory:
if self.cls is SecurityPolicy:
return self.cls()
else:
return self.cls(peer_certificate, self.certificate, self.private_key, self.mode, certificate_handler=self.certificate_handler)
return self.cls(peer_certificate, self.certificate, self.private_key, self.mode,
permission_ruleset=self.permission_ruleset)
class Message:

View File

@@ -1,4 +1,4 @@
pytest
pytest-asyncio
coverage
pytest-cov
pytest-cov

60
docs/user_management.md Normal file
View File

@@ -0,0 +1,60 @@
# User Management
## Overview
Currently user management on OPC-UA servers here is done exclusively through certificates, though there is the potential to create new user management objects.
How this works in practice is that every user generates a certificate/private key pair, and then the certificate is given
to the OPC-UA server. The administrator of the OPC-UA server enters the certificate into the `certificate_user_manager` with a `UserRole`
and a `name`. When a user connects with this certificate, every action they do will be associated with their name in the logs,
and the `permission_ruleset` will determine whether a user with that role can carry out that action.
## Usage
an example of usage is in `examples/server-with-encryption.py`:
```python
user_manager = CertificateUserManager()
await user_manager.add_user("certificates/peer-certificate-example-1.der", name='user1')
server = Server(user_manager=user_manager)
await server.init()
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt],
permission_ruleset=SimpleRoleRuleset())
```
We can see here that a certificate user manager object is made, and told to assign peer-certificate-example-1 with User credentials.
When the client wants to actually carry out some actions, the user manager `CertificateUserManager` will use the
certificate handler to associate a user to the certificate, and then the `permission_ruleset` will determine if they are
allowed to do the action.
## Custom permission rules
The permission ruleset object has been designed in a way to allow new rulesets to be made easily. For example, lets look
at the implementation of `SimpleRoleRuleset`:
```python
class SimpleRoleRuleset(PermissionRuleset):
"""
Standard simple role-based ruleset.
Admins alone can write, admins and users can read, and anonymous users can't do anything.
"""
def __init__(self):
write_ids = list(map(ua.NodeId, WRITE_TYPES))
read_ids = list(map(ua.NodeId, READ_TYPES))
self._permission_dict = {
UserRole.Admin: set().union(write_ids, read_ids),
UserRole.User: set().union(read_ids),
UserRole.Anonymous: set()
}
def check_validity(self, user, action_type_id, body):
if action_type_id in self._permission_dict[user.role]:
return True
else:
return False
```
all that is needed to create a permission ruleset is to create a function `check_validity` which takes information about
the user and the action type, and returns `True` if it is allowed, or `False` if it isn't. In this case, we simply take the
user role and compare the action it wants to do with a list of actions stored in a dictionary. A more complex ruleset could use the body
of the request to determine some users as being able to write some variables, but not others. Another potential option is
having more user roles than those we have set here.

Binary file not shown.

View File

@@ -0,0 +1,52 @@
-----BEGIN PRIVATE KEY-----
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCoLlw1UN2MaMc8
dMxbtXItWD1INAh7QLcocdBwATrghPAGlZkAupNXWi4kkMRGjy84BxN1T7QMOCZo
RLksEUpL8SpQ3boYgN9GkQZxE78NozE1OSGuzTZuCRLYQNvkFHM2gChQn0cg3OG8
PzxtBQtpeawYq2n3TS0YNKt6SnnYFOEFVNPu5cNTTFT/9epCXA9MEI+ZCtq/OIyg
2RE0X8t9wnmp4f0In/ZeDGxBgRYb6ODg0jgpQ0+Rc5+KtrYoOQ5dFxeelo1qOp+l
bOlQ8hV8cihTpi3dUPyIclE0nmw7lHduFfgMveBExNBySewwGsVNeHbDDTBKpFaQ
vC/Db0zWNc87i8O6i2zsheD80dmmPOYYFIljzyTuf5J2UX2N7QlV37nh5+x4M64w
tIfcdFaZsu2wisSjV6Fm5HZuauCZXhjM0Pgd1dno6dYH5Rbmv1TTrzZLxQJJEIrb
iTwIezWRIbq+GJ/1cF8D6qeMljvLKmIB8JaZ2fDnlDQ82WeedELsXUv9/fp5dlC4
3coBikYBDlzyIkdrTcSh51WerZ9RtrNsadhf15YdcwMmHLJ7DNt5OPA7DoPPnh4N
bkj4HAZ9b8dPttHM1wpI2LNDiDuYi46TvUM4kVe40CZEXwsXQWsquQJQIt3Cf6PI
OdZikCwduR1JLqfpoA9o0DBnmORi5wIDAQABAoICAAwUQFUn+LGT+PNTpDXWA1xU
YRplelJ35G/INONGg1cK1T5z0ihmPhzhAj1oFGDI+eg5QeGEFvdltQR6Ix6SI1bu
/E4QjsQhxWeJFQBcHbW6Cde7wbqPTKB8kFjt1507BsdcTwjzV6V5l9DwbcOawCYU
MhMXhKXZaiq55bMD1Fp7rWm3nP0kjXHXIqe7ntajTAn3iojAadOgGSGYVY8Abloa
9KgJ7iWHb6dzlvbhcZgRks0y7AuypO1ac5P/6XdcGbb/6JdgUsnVag/0kZFZwcdN
X64aWmLx5SFbrI9t7XIeCLHkbAtmO8xUonAC9S7DTXZx932s3dIOmZUNnxSXUtpV
9mTMhfmXLYqIAC9sr8wr2S/1jRs6MFgu8NI9S/q+Mkt/4pfZJsVNco6C3hkyFUpq
yU8OgXQSZ4YLdRkaidI4m7cDCV9Akt8XmA2XCwcqLySeHrZbBpIgCGKOkVPwVg1t
BqkhhYNEhgV3BeCIbHniSKWuydiaSvq2YT+gidcGcoGh3PHcngTGMXuI+u0hdcNO
YcLlsWIlcpGKNcZy8HA4i8wV+pLpqze+hJvA7Tx91YmUFwdsbH2N4dY/mOIvfVxF
TDkb6tsJf/WYtSxhTdIP4LQtllfHBZLxS36/nAQMwV2ywJSCRrW4RTBLE6sjMcKV
j0ghiNraUQSUnx4Z4zDJAoIBAQDRbwgPHRPgoi341uJ/apwtDMwSftAV8bYu98st
EUJGqJTQiq1wuVmx7VLcv5hOtfwvfGuimroIzK9QyJJP6wLmuKf3js9ryg8SUtAm
70adJhpqz5rwcNaSxzQYVacLR+H1hUP7uAfU1WZgE0yaL6yIMpmvunPfefdN0rRg
jJHoLXUeUu2FIbK0De8T7pgxxrgJ4pXqXVVLARFcxpLMjXLVRpYVrFrxLeYOhOG6
ixJiH7kOyREPKTNNw3MYmsjXlUZVM4l1dLqbhxRNbsq0k0Gz0d7PUhyfvIymT5ss
PSjUs7go4dLFkql8fCMcCGzSP+kmv7/nWhiS08lWsdA2+o0DAoIBAQDNkzqrp1oU
8sn/RIu53aucXZOmZUCamnsWMTPBlvy5hSfXUuW6HdRBsWxCdOMqOU6X9hdJRXRG
UGxcTimSgZfbZZpymVB75OdACWFz1BNeQ3S5pEkS9EKm9kGhhzrlIhJswvvQ8lPa
cpw7ztlW+2f126cn0Clzm3yFCXrjDFjyB91dZfZV79CzPHCRipYV0w3WIvHr3bHI
23D0UxN0Wyz7NCGqxlI5qqepb9b7XOcq3ie86SKFlgCjRd/RWXYdCUtopnYisPBG
jaRvtKryGnpRy2NjLAt/wT7TSO0DcjLpE8Wo93e3SsWHe84394AAVbp0kYwP8E3c
WI/+btWL8FNNAoIBAQCLP12GXvj2ZB5Y0tbpjMKmGVod+f70eC9/bvXB0zXTsQaw
2WOUDUibyoVLjIAB5XH1SK5wAJwseZsfBRxy0nitWp0jNeqnakM3FaScHn8+wTTT
S5FiyEOCd7wlad2U3IhaK+8VkDh0xCY7PtmxsnzuWWZhtoCQV5GcRrnzb8MbMg4q
yUxvNgaRPKp3AC5k+hEnUWYH22J4ag9XNhgotOti7nmfk2cr8C6D/Mnws8LXRuxN
7lXPVL9ttsPx2ggrfm6y+38rfx8LhXZ0PXO9OCN5IdMY2Sl682w4r7kmYsuhEJUj
aXIwNuFFiWv2mYtxB1lHkDpR4k9wYCv/3l3oEBJtAoIBAFhQGXJIcIBzVKee0M/G
FMUVxipDcR95ocsE1BEkRemE9HqiGCJ+UgLur21VPYmUGNlbVOGbPro+Lm7w/Rb9
KTmNSm5UwqiWy2lNHlpeDvv9ypU9qE7GNCP5wnQb3gc1PekPUpKOp8s+VTO0RtYB
JsYN2J9X45bQgwu76NjFIyjGjChldw+v+F4nZsx/Vr6FbnpBhOAo2wlIjBvJ3J+t
04MPAC+XBxfS4jf93cMaRAy8ZyHWnYf5yshZai9LWVxb89M/P5zjmjUVAEEkwhND
oKymzUl7UG1PnSezOm6yjoBmiBFwTjOloXn0i8CrSIfcTD9rj1oBh9FTEnjhMEfv
tDkCggEAYw4p4JsVWi08xkoJO8R93fziBZaQdBObiIM1gzDX6amxBjHK5C2nOQpf
9I+q0/Xaf3PfCixCmFYP/bdHUW3JlmWl5xkZ8jsoO/zs7MRljqnjLxMZpH9kSllx
m5ITaItw2zubLHIefEL2HYsp65Mkp/dWoPPeirgeO5ojd7FmUkFdAXSmk3l9NYEj
mqR88FXX4491+WBhYxw5A5oKF5gJsP3S+RHYQu9J1Xc4ZAcCBp2MnSlW9xgy1RZz
Vqcrkuc675O3LQFGN03fmYVJYNhMFzv4YZFMQgLr8mQQJGAxxrui8fBilKDWgErO
rTvgWzBzQqWMmzFItbDLerUpY7u2fg==
-----END PRIVATE KEY-----

View File

@@ -8,8 +8,9 @@ from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger("asyncua")
cert = "certificates/peer-certificate-example-1.der"
private_key = "certificates/peer-private-key-example-1.pem"
cert_idx = 1
cert = f"certificates/peer-certificate-example-{cert_idx}.der"
private_key = f"certificates/peer-private-key-example-{cert_idx}.pem"
async def task(loop):
@@ -19,12 +20,15 @@ async def task(loop):
await client.set_security(
SecurityPolicyBasic256Sha256,
certificate_path=cert,
private_key_path=private_key
private_key_path=private_key,
server_certificate_path="certificate-example.der"
)
await client.connect()
root = client.nodes.root
print(await root.get_children())
objects = client.nodes.objects
child = await objects.get_child(['0:MyObject', '0:MyVariable'])
print(await child.get_value())
await child.set_value(42)
print(await child.get_value())
except Exception:
_logger.exception('error')
finally:

View File

@@ -1,9 +1,12 @@
import logging
import asyncio
import sys
sys.path.insert(0, "..")
from asyncua import ua, Server
from asyncua.common.methods import uamethod
logging.basicConfig(level=logging.INFO)
_logger = logging.getLogger('asyncua')

View File

@@ -3,52 +3,50 @@ import sys
import logging
sys.path.insert(0, "..")
from asyncua.crypto.certificate_handler import CertificateHandler
from asyncua import Server
from asyncua import ua
from asyncua.crypto.permission_rules import SimpleRoleRuleset
from asyncua.server.users import UserRole
from asyncua.server.user_managers import CertificateUserManager
logging.basicConfig(level=logging.INFO)
async def main():
server = Server()
cert_handler = CertificateHandler()
cert_user_manager = CertificateUserManager()
await cert_user_manager.add_user("certificates/peer-certificate-example-1.der", name='test_user')
server = Server(user_manager=cert_user_manager)
await server.init()
await cert_handler.trust_certificate("certificates/peer-certificate-example-1.der")
server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
server.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt],
certificate_handler=cert_handler)
permission_ruleset=SimpleRoleRuleset())
# load server certificate and private key. This enables endpoints
# with signing and encryption.
await server.load_certificate("certificate-example.der")
await server.load_private_key("private-key-example.pem")
# setup our own namespace, not really necessary but should as spec
uri = "http://examples.freeopcua.github.io"
idx = await server.register_namespace(uri)
idx = 0
# populating our address space
myobj = await server.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 6.7)
myvar = await myobj.add_variable(idx, "MyVariable", 0.0)
await myvar.set_writable() # Set MyVariable to be writable by clients
# starting!
async with server:
count = 0
while True:
await asyncio.sleep(1)
count += 0.1
current_val = await myvar.get_value()
count = current_val + 0.1
await myvar.write_value(count)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())

View File

@@ -10,7 +10,7 @@ else:
from asyncua import Client
from asyncua import Server
from asyncua import ua
from asyncua.crypto.certificate_handler import CertificateHandler
from asyncua.server.user_managers import CertificateUserManager
try:
from asyncua.crypto import uacrypto
@@ -55,19 +55,20 @@ encrypted_private_key_peer_creds = {
@pytest.fixture(params=srv_crypto_params)
async def srv_crypto_encrypted_key_one_cert(request):
srv = Server()
peer_certificate = encrypted_private_key_peer_creds["certificate"]
cert_handler = CertificateHandler()
user_manager = CertificateUserManager()
key, cert = request.param
await cert_handler.trust_certificate(peer_certificate)
await user_manager.add_admin(peer_certificate, 'test1')
srv = Server(user_manager=user_manager)
await srv.init()
srv.set_endpoint(uri_crypto_cert)
srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt],
certificate_handler=cert_handler)
srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
await srv.load_certificate(cert)
await srv.load_private_key(key)
await srv.start()
yield srv
yield srv, cert
# stop the server
await srv.stop()
@@ -82,26 +83,27 @@ async def srv_crypto_all_certs(request):
await srv.load_certificate(cert)
await srv.load_private_key(key)
await srv.start()
yield srv
yield srv, cert
# stop the server
await srv.stop()
@pytest.fixture(params=srv_crypto_params)
async def srv_crypto_one_cert(request):
srv = Server()
peer_certificate = peer_creds["certificate"]
cert_handler = CertificateHandler()
user_manager = CertificateUserManager()
key, cert = request.param
await cert_handler.trust_certificate(peer_certificate)
await user_manager.add_admin(peer_certificate, 'test1')
srv = Server(user_manager=user_manager)
await srv.init()
srv.set_endpoint(uri_crypto_cert)
srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt],
certificate_handler=cert_handler)
srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt])
await srv.load_certificate(cert)
await srv.load_private_key(key)
await srv.start()
yield srv
yield srv, cert
# stop the server
await srv.stop()
@@ -132,36 +134,42 @@ async def test_nocrypto_fail(srv_no_crypto):
async def test_basic256(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
await clt.set_security_string(
f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem")
f"Basic256Sha256,Sign,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem,{cert}")
async with clt:
assert await clt.nodes.objects.get_children()
async def test_basic256_encrypt(srv_crypto_all_certs):
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
await clt.set_security_string(
f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem")
f"Basic256Sha256,SignAndEncrypt,{EXAMPLE_PATH}certificate-example.der,{EXAMPLE_PATH}private-key-example.pem,{cert}")
async with clt:
assert await clt.nodes.objects.get_children()
async def test_basic256_encrypt_success(srv_crypto_all_certs):
clt = Client(uri_crypto)
_, cert = srv_crypto_all_certs
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
f"{EXAMPLE_PATH}certificate-example.der",
f"{EXAMPLE_PATH}private-key-example.pem",
None,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
cert,
ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
assert await clt.nodes.objects.get_children()
async def test_basic256_encrypt_fail(srv_crypto_all_certs):
# FIXME: how to make it fail???
_, cert = srv_crypto_all_certs
clt = Client(uri_crypto)
with pytest.raises(ua.UaError):
await clt.set_security(
@@ -169,17 +177,20 @@ async def test_basic256_encrypt_fail(srv_crypto_all_certs):
f"{EXAMPLE_PATH}certificate-example.der",
f"{EXAMPLE_PATH}private-key-example.pem",
None,
None,
mode=ua.MessageSecurityMode.None_
)
async def test_certificate_handling_success(srv_crypto_one_cert):
_, cert = srv_crypto_one_cert
clt = Client(uri_crypto_cert)
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
peer_creds['certificate'],
peer_creds['private_key'],
None,
cert,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
@@ -187,13 +198,14 @@ async def test_certificate_handling_success(srv_crypto_one_cert):
async def test_encrypted_private_key_handling_success(srv_crypto_encrypted_key_one_cert):
_, cert = srv_crypto_encrypted_key_one_cert
clt = Client(uri_crypto_cert)
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
encrypted_private_key_peer_creds['certificate'],
encrypted_private_key_peer_creds['private_key'],
encrypted_private_key_peer_creds['password'],
None,
cert,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
@@ -201,9 +213,10 @@ async def test_encrypted_private_key_handling_success(srv_crypto_encrypted_key_o
async def test_certificate_handling_failure(srv_crypto_one_cert):
_, cert = srv_crypto_one_cert
clt = Client(uri_crypto_cert)
with pytest.raises(TimeoutError):
with pytest.raises(ua.uaerrors.BadUserAccessDenied):
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
unauthorized_peer_creds['certificate'],
@@ -211,20 +224,22 @@ async def test_certificate_handling_failure(srv_crypto_one_cert):
None,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
assert await clt.get_objects_node().get_children()
async def test_encrypted_private_key_handling_failure(srv_crypto_one_cert):
_, cert = srv_crypto_one_cert
clt = Client(uri_crypto_cert)
with pytest.raises(TimeoutError):
with pytest.raises(ua.uaerrors.BadUserAccessDenied):
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
unauthorized_peer_creds['certificate'],
unauthorized_peer_creds['private_key'],
None, # Pass None for an empty password to test incorrect password.
None,
cert,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
@@ -232,6 +247,7 @@ async def test_encrypted_private_key_handling_failure(srv_crypto_one_cert):
async def test_certificate_handling_mismatched_creds(srv_crypto_one_cert):
_, cert = srv_crypto_one_cert
clt = Client(uri_crypto_cert)
with pytest.raises(TimeoutError):
await clt.set_security(
@@ -239,6 +255,7 @@ async def test_certificate_handling_mismatched_creds(srv_crypto_one_cert):
peer_creds['certificate'],
unauthorized_peer_creds['private_key'],
None,
cert,
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:

129
tests/test_permissions.py Normal file
View File

@@ -0,0 +1,129 @@
import os
import pytest
import sys
from asyncua import Client
from asyncua import Server
from asyncua import ua
from asyncua.crypto.permission_rules import SimpleRoleRuleset
from asyncua.server.users import UserRole
from asyncua.server.user_managers import CertificateUserManager
try:
from asyncua.crypto import uacrypto
from asyncua.crypto import security_policies
except ImportError:
print("WARNING: CRYPTO NOT AVAILABLE, CRYPTO TESTS DISABLED!!")
disable_crypto_tests = True
else:
disable_crypto_tests = False
pytestmark = pytest.mark.asyncio
port_num1 = 48515
port_num2 = 48512
port_num3 = 48516
uri_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num1)
uri_no_crypto = "opc.tcp://127.0.0.1:{0:d}".format(port_num2)
uri_crypto_cert = "opc.tcp://127.0.0.1:{0:d}".format(port_num3)
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
EXAMPLE_PATH = os.path.join(BASE_DIR, "examples") + os.sep
srv_crypto_params = [(f"{EXAMPLE_PATH}private-key-example.pem",
f"{EXAMPLE_PATH}certificate-example.der"),]
admin_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-1.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-1.pem"
}
user_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-2.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-2.pem"
}
anonymous_peer_creds = {
"certificate": f"{EXAMPLE_PATH}certificates/peer-certificate-example-3.der",
"private_key": f"{EXAMPLE_PATH}certificates/peer-private-key-example-3.pem"
}
@pytest.fixture(params=srv_crypto_params)
async def srv_crypto_one_cert(request):
cert_user_manager = CertificateUserManager()
admin_peer_certificate = admin_peer_creds["certificate"]
user_peer_certificate = user_peer_creds["certificate"]
anonymous_peer_certificate = anonymous_peer_creds["certificate"]
key, cert = request.param
await cert_user_manager.add_admin(admin_peer_certificate, name='Admin')
await cert_user_manager.add_user(user_peer_certificate, name='User')
await cert_user_manager.add_role(anonymous_peer_certificate, name='Anonymous', user_role=UserRole.Anonymous)
srv = Server(user_manager=cert_user_manager)
srv.set_endpoint(uri_crypto_cert)
srv.set_security_policy([ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt],
permission_ruleset=SimpleRoleRuleset())
await srv.init()
await srv.load_certificate(cert)
await srv.load_private_key(key)
idx = 0
myobj = await srv.nodes.objects.add_object(idx, "MyObject")
myvar = await myobj.add_variable(idx, "MyVariable", 0.0)
await myvar.set_writable() # Set MyVariable to be writable by clients
await srv.start()
yield srv
# stop the server
await srv.stop()
async def test_permissions_admin(srv_crypto_one_cert):
clt = Client(uri_crypto_cert)
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
admin_peer_creds['certificate'],
admin_peer_creds['private_key'],
None,
server_certificate_path=srv_crypto_params[0][1],
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
assert await clt.get_objects_node().get_children()
objects = clt.nodes.objects
child = await objects.get_child(['0:MyObject', '0:MyVariable'])
await child.read_value()
await child.set_value(42)
async def test_permissions_user(srv_crypto_one_cert):
clt = Client(uri_crypto_cert)
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
user_peer_creds['certificate'],
user_peer_creds['private_key'],
None,
server_certificate_path=srv_crypto_params[0][1],
mode=ua.MessageSecurityMode.SignAndEncrypt
)
async with clt:
assert await clt.get_objects_node().get_children()
objects = clt.nodes.objects
child = await objects.get_child(['0:MyObject', '0:MyVariable'])
await child.read_value()
with pytest.raises(ua.uaerrors.BadUserAccessDenied):
await child.set_value(42)
async def test_permissions_anonymous(srv_crypto_one_cert):
clt = Client(uri_crypto_cert)
await clt.set_security(
security_policies.SecurityPolicyBasic256Sha256,
anonymous_peer_creds['certificate'],
anonymous_peer_creds['private_key'],
None,
server_certificate_path=srv_crypto_params[0][1],
mode=ua.MessageSecurityMode.SignAndEncrypt
)
await clt.connect()
with pytest.raises(ua.uaerrors.BadUserAccessDenied):
await clt.get_endpoints()