1
0
mirror of https://github.com/FreeOpcUa/opcua-asyncio synced 2025-10-29 17:07:18 +08:00

support user auth certificate chain

This commit is contained in:
Hryhorii Biloshenko
2025-09-17 16:20:30 +02:00
parent b3be188414
commit 93cb5a2be2

View File

@@ -81,6 +81,7 @@ class Client:
self.uaclient.pre_request_hook = self.check_connection
self.user_certificate: Optional[x509.Certificate] = None
self.user_private_key: Optional[PrivateKeyTypes] = None
self.user_certificate_chain: List[x509.Certificate] = []
self._server_nonce = None
self._session_counter = 1
self.nodes: Shortcuts = Shortcuts(self.uaclient)
@@ -260,10 +261,18 @@ class Client:
async def load_client_certificate(self, path: str, extension: Optional[str] = None) -> None:
"""
load our certificate from file, either pem or der
Load our certificate from file, either pem or der
"""
self.user_certificate = await uacrypto.load_certificate(path, extension)
async def load_client_chain(self, certs: Iterable[uacrypto.CertProperties]) -> None:
"""
Load intermediate chain certificates, either pem or der
"""
self.user_certificate_chain = await asyncio.gather(
*(uacrypto.load_certificate(cert.path_or_content, cert.extension) for cert in certs)
)
async def load_private_key(
self, path: Path, password: Optional[Union[str, bytes]] = None, extension: Optional[str] = None
) -> None:
@@ -334,7 +343,10 @@ class Client:
await self.create_session()
try:
await self.activate_session(
username=self._username, password=self._password, certificate=self.user_certificate
username=self._username,
password=self._password,
certificate=self.user_certificate,
certificate_chain=self.user_certificate_chain,
)
except Exception:
# clean up session
@@ -654,6 +666,7 @@ class Client:
username: Optional[str] = None,
password: Optional[str] = None,
certificate: Optional[x509.Certificate] = None,
certificate_chain: Optional[x509.Certificate] = None,
) -> ua.ActivateSessionResult:
"""
Activate session using either username and password or private_key
@@ -674,7 +687,7 @@ class Client:
if not username and not (user_certificate and self.user_private_key):
self._add_anonymous_auth(params)
elif user_certificate:
self._add_certificate_auth(params, user_certificate, challenge)
self._add_certificate_auth(params, user_certificate, challenge, certificate_chain)
else:
self._add_user_auth(params, username, password)
res = await self.uaclient.activate_session(params)
@@ -685,9 +698,12 @@ class Client:
params.UserIdentityToken = ua.AnonymousIdentityToken()
params.UserIdentityToken.PolicyId = self.server_policy(ua.UserTokenType.Anonymous).PolicyId
def _add_certificate_auth(self, params, certificate, challenge):
def _add_certificate_auth(self, params, certificate, challenge, certificate_chain=None):
params.UserIdentityToken = ua.X509IdentityToken()
params.UserIdentityToken.CertificateData = uacrypto.der_from_x509(certificate)
certificate_chain = certificate_chain or []
for cert in certificate_chain:
params.UserIdentityToken.CertificateData += uacrypto.der_from_x509(cert)
# specs part 4, 5.6.3.1: the data to sign is created by appending
# the last serverNonce to the serverCertificate
policy = self.server_policy(ua.UserTokenType.Certificate)