1
0
mirror of https://github.com/thingsboard/thingsboard-gateway synced 2025-10-26 22:31:42 +08:00

Revert "Revert "Fixes for PR #880""

This reverts commit 53628473e1.
This commit is contained in:
samson0v
2022-07-15 14:49:40 +03:00
parent 53628473e1
commit 1317a7b603
3 changed files with 36 additions and 43 deletions

View File

@@ -53,6 +53,7 @@ setup(
'thingsboard_gateway.extensions.socket', 'thingsboard_gateway.extensions.xmpp',
],
install_requires=[
'pyopenssl',
'jsonpath-rw',
'regex',
'pip',

View File

@@ -22,7 +22,7 @@ from thingsboard_gateway.tb_utility.tb_utility import TBUtility
log = logging.getLogger("tb_connection")
CHECK_CERT_PERIOD = 24
CHECK_CERT_PERIOD = 86400
CERTIFICATE_DAYS_LEFT = 3
@@ -86,20 +86,20 @@ class TBClient(threading.Thread):
if time() - self._last_cert_check_time >= CHECK_CERT_PERIOD:
if self.__cert:
log.info('Will generate new certificate')
new_cert, new_key = TBUtility.check_certificate(self.__cert, days_left=CERTIFICATE_DAYS_LEFT)
new_cert = TBUtility.check_certificate(self.__cert, key=self.__private_key,
days_left=CERTIFICATE_DAYS_LEFT)
if new_cert:
self.client.send_attributes({'newCertificate': new_cert, 'newKey': new_key})
self.client.send_attributes({'newCertificate': new_cert})
if self.__ca_cert:
log.info('Will generate bew CA')
new_ca_cert, new_ca_key = TBUtility.check_certificate(self.__ca_cert,
days_left=CERTIFICATE_DAYS_LEFT)
is_outdated = TBUtility.check_certificate(self.__ca_cert, generate_new=False,
days_left=CERTIFICATE_DAYS_LEFT)
if new_ca_cert:
self.client.send_attributes({'newCACertificate': new_ca_cert, 'newCAKey': new_ca_key})
if is_outdated:
self.client.send_attributes({'CACertificate': 'CA certificate will outdated soon'})
sleep(.2)
sleep(10)
def pause(self):
self.__paused = True

View File

@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import ssl
from logging import getLogger
from re import search, findall
import OpenSSL
from jsonpath_rw import parse
from simplejson import JSONDecodeError, dumps, loads
@@ -173,49 +173,41 @@ class TBUtility:
return list(dictionary.values())[list(dictionary.values()).index(value)]
@staticmethod
def generate_certificate(old_certificate):
try:
import OpenSSL
except ImportError:
TBUtility.install_package('pyopenssl')
import OpenSSL
def generate_certificate(old_certificate_path, old_key_path, old_certificate):
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_EC, 2048)
cert = OpenSSL.crypto.X509()
subject = cert.get_subject()
subject.C = old_certificate['issuer'][0][-1][-1]
subject.ST = old_certificate['issuer'][1][-1][-1]
subject.L = old_certificate['issuer'][2][-1][-1]
subject.O = old_certificate['issuer'][3][-1][-1]
subject.OU = old_certificate['issuer'][4][-1][-1]
subject.CN = old_certificate['issuer'][5][-1][-1]
subject.emailAddress = old_certificate['issuer'][6][-1][-1]
cert.set_version(2)
cert.set_issuer(subject)
cert.set_subject(subject)
cert.set_serial_number(int(123123123))
cert.set_version(old_certificate.get_version())
cert.set_issuer(old_certificate.get_issuer())
cert.set_subject(old_certificate.get_subject())
cert.set_serial_number(old_certificate.get_serial_number())
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(31536000)
cert.set_pubkey(key)
cert.sign(key, 'sha256')
cert.sign(key, old_certificate.get_signature_algorithm().decode())
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert).decode("utf-8")
key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key).decode("utf-8")
return cert, key
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
with open(old_certificate_path, 'wb+') as f:
f.write(cert)
key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
with open(old_key_path, 'wb+') as f:
f.write(key)
return cert
@staticmethod
def check_certificate(certificate, days_left=3):
ctx = ssl.SSLContext()
ctx.load_verify_locations(certificate)
certificate_details = ctx.get_ca_certs()
cert_detail_dict = certificate_details[0]
not_after_str = cert_detail_dict['notAfter'].split(' GMT')[0]
not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y')
def check_certificate(certificate, key=None, generate_new=True, days_left=3):
cert_detail = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM,
open(certificate, 'rb').read()
)
not_after = datetime.datetime.strptime(cert_detail.get_notAfter().decode('utf-8')[:-1], '%Y%m%d%H%M%S')
if not_after - datetime.datetime.now() <= datetime.timedelta(days=days_left):
return TBUtility.generate_certificate(cert_detail_dict)
if generate_new:
return TBUtility.generate_certificate(certificate, key, cert_detail)
else:
return True