1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Changed certificate generation algorithm

This commit is contained in:
samson0v
2022-07-18 11:34:12 +03:00
parent c45b011c16
commit a476487313
2 changed files with 22 additions and 21 deletions

View File

@@ -99,6 +99,8 @@ class TBClient(threading.Thread):
if is_outdated:
self.client.send_attributes({'CACertificate': 'CA certificate will outdated soon'})
self._last_cert_check_time = time()
sleep(10)
def pause(self):

View File

@@ -16,6 +16,10 @@ from logging import getLogger
from re import search, findall
import OpenSSL
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from jsonpath_rw import parse
from simplejson import JSONDecodeError, dumps, loads
@@ -174,25 +178,24 @@ class TBUtility:
@staticmethod
def generate_certificate(old_certificate_path, old_key_path, old_certificate):
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
key = ec.generate_private_key(ec.SECP256R1())
public_key = key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(old_certificate.subject)
builder = builder.issuer_name(old_certificate.issuer)
builder = builder.not_valid_before(datetime.datetime.today() - datetime.timedelta(days=1))
builder = builder.not_valid_after(datetime.datetime.today() + (datetime.timedelta(1, 0, 0) * 365))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(public_key)
certificate = builder.sign(private_key=key, algorithm=hashes.SHA256())
cert = OpenSSL.crypto.X509()
cert.set_version(old_certificate.get_version())
cert.set_issuer(old_certificate.get_issuer())
cert.set_subject(old_certificate.get_subject())
cert.set_serial_number(old_certificate.get_serial_number())
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(31536000)
cert.set_pubkey(key)
cert.sign(key, old_certificate.get_signature_algorithm().decode())
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
cert = certificate.public_bytes(serialization.Encoding.PEM)
with open(old_certificate_path, 'wb+') as f:
f.write(cert)
key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
key = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
with open(old_key_path, 'wb+') as f:
f.write(key)
@@ -200,13 +203,9 @@ class TBUtility:
@staticmethod
def check_certificate(certificate, key=None, generate_new=True, days_left=3):
cert_detail = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM,
open(certificate, 'rb').read()
)
not_after = datetime.datetime.strptime(cert_detail.get_notAfter().decode('utf-8')[:-1], '%Y%m%d%H%M%S')
cert_detail = x509.load_pem_x509_certificate(open(certificate, 'rb').read())
if not_after - datetime.datetime.now() <= datetime.timedelta(days=days_left):
if cert_detail.not_valid_after - datetime.datetime.now() <= datetime.timedelta(days=days_left):
if generate_new:
return TBUtility.generate_certificate(certificate, key, cert_detail)
else: