mirror of
				https://github.com/thingsboard/thingsboard-gateway
				synced 2025-10-26 22:31:42 +08:00 
			
		
		
		
	
							parent
							
								
									4ad52750b1
								
							
						
					
					
						commit
						53628473e1
					
				
							
								
								
									
										1
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								setup.py
									
									
									
									
									
								
							|  | @ -53,7 +53,6 @@ setup( | |||
|               'thingsboard_gateway.extensions.socket', 'thingsboard_gateway.extensions.xmpp', | ||||
|               ], | ||||
|     install_requires=[ | ||||
|         'pyopenssl', | ||||
|         'jsonpath-rw', | ||||
|         'regex', | ||||
|         'pip', | ||||
|  |  | |||
|  | @ -22,7 +22,7 @@ from thingsboard_gateway.tb_utility.tb_utility import TBUtility | |||
| 
 | ||||
| log = logging.getLogger("tb_connection") | ||||
| 
 | ||||
| CHECK_CERT_PERIOD = 86400 | ||||
| CHECK_CERT_PERIOD = 24 | ||||
| CERTIFICATE_DAYS_LEFT = 3 | ||||
| 
 | ||||
| 
 | ||||
|  | @ -86,20 +86,20 @@ class TBClient(threading.Thread): | |||
|             if time() - self._last_cert_check_time >= CHECK_CERT_PERIOD: | ||||
|                 if self.__cert: | ||||
|                     log.info('Will generate new certificate') | ||||
|                     new_cert = TBUtility.check_certificate(self.__cert, key=self.__private_key, | ||||
|                                                            days_left=CERTIFICATE_DAYS_LEFT) | ||||
|                     new_cert, new_key = TBUtility.check_certificate(self.__cert, days_left=CERTIFICATE_DAYS_LEFT) | ||||
| 
 | ||||
|                     if new_cert: | ||||
|                         self.client.send_attributes({'newCertificate': new_cert}) | ||||
|                         self.client.send_attributes({'newCertificate': new_cert, 'newKey': new_key}) | ||||
| 
 | ||||
|                 if self.__ca_cert: | ||||
|                     is_outdated = TBUtility.check_certificate(self.__ca_cert, generate_new=False, | ||||
|                                                               days_left=CERTIFICATE_DAYS_LEFT) | ||||
|                     log.info('Will generate bew CA') | ||||
|                     new_ca_cert, new_ca_key = TBUtility.check_certificate(self.__ca_cert, | ||||
|                                                                           days_left=CERTIFICATE_DAYS_LEFT) | ||||
| 
 | ||||
|                     if is_outdated: | ||||
|                         self.client.send_attributes({'CACertificate': 'CA certificate will outdated soon'}) | ||||
|                     if new_ca_cert: | ||||
|                         self.client.send_attributes({'newCACertificate': new_ca_cert, 'newCAKey': new_ca_key}) | ||||
| 
 | ||||
|             sleep(10) | ||||
|             sleep(.2) | ||||
| 
 | ||||
|     def pause(self): | ||||
|         self.__paused = True | ||||
|  |  | |||
|  | @ -12,10 +12,10 @@ | |||
| #     See the License for the specific language governing permissions and | ||||
| #     limitations under the License. | ||||
| import datetime | ||||
| import ssl | ||||
| from logging import getLogger | ||||
| from re import search, findall | ||||
| 
 | ||||
| import OpenSSL | ||||
| from jsonpath_rw import parse | ||||
| from simplejson import JSONDecodeError, dumps, loads | ||||
| 
 | ||||
|  | @ -173,41 +173,49 @@ class TBUtility: | |||
|         return list(dictionary.values())[list(dictionary.values()).index(value)] | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def generate_certificate(old_certificate_path, old_key_path, old_certificate): | ||||
|     def generate_certificate(old_certificate): | ||||
|         try: | ||||
|             import OpenSSL | ||||
|         except ImportError: | ||||
|             TBUtility.install_package('pyopenssl') | ||||
|             import OpenSSL | ||||
| 
 | ||||
|         key = OpenSSL.crypto.PKey() | ||||
|         key.generate_key(OpenSSL.crypto.TYPE_EC, 2048) | ||||
| 
 | ||||
|         cert = OpenSSL.crypto.X509() | ||||
| 
 | ||||
|         cert.set_version(old_certificate.get_version()) | ||||
|         cert.set_issuer(old_certificate.get_issuer()) | ||||
|         cert.set_subject(old_certificate.get_subject()) | ||||
|         cert.set_serial_number(old_certificate.get_serial_number()) | ||||
|         subject = cert.get_subject() | ||||
|         subject.C = old_certificate['issuer'][0][-1][-1] | ||||
|         subject.ST = old_certificate['issuer'][1][-1][-1] | ||||
|         subject.L = old_certificate['issuer'][2][-1][-1] | ||||
|         subject.O = old_certificate['issuer'][3][-1][-1] | ||||
|         subject.OU = old_certificate['issuer'][4][-1][-1] | ||||
|         subject.CN = old_certificate['issuer'][5][-1][-1] | ||||
|         subject.emailAddress = old_certificate['issuer'][6][-1][-1] | ||||
| 
 | ||||
|         cert.set_version(2) | ||||
|         cert.set_issuer(subject) | ||||
|         cert.set_subject(subject) | ||||
|         cert.set_serial_number(int(123123123)) | ||||
|         cert.gmtime_adj_notBefore(0) | ||||
|         cert.gmtime_adj_notAfter(31536000) | ||||
|         cert.set_pubkey(key) | ||||
|         cert.sign(key, old_certificate.get_signature_algorithm().decode()) | ||||
|         cert.sign(key, 'sha256') | ||||
| 
 | ||||
|         cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) | ||||
|         with open(old_certificate_path, 'wb+') as f: | ||||
|             f.write(cert) | ||||
| 
 | ||||
|         key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key) | ||||
|         with open(old_key_path, 'wb+') as f: | ||||
|             f.write(key) | ||||
| 
 | ||||
|         return cert | ||||
|         cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert).decode("utf-8") | ||||
|         key = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key).decode("utf-8") | ||||
|         return cert, key | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def check_certificate(certificate, key=None, generate_new=True, days_left=3): | ||||
|         cert_detail = OpenSSL.crypto.load_certificate( | ||||
|             OpenSSL.crypto.FILETYPE_PEM, | ||||
|             open(certificate, 'rb').read() | ||||
|         ) | ||||
|         not_after = datetime.datetime.strptime(cert_detail.get_notAfter().decode('utf-8')[:-1], '%Y%m%d%H%M%S') | ||||
|     def check_certificate(certificate, days_left=3): | ||||
|         ctx = ssl.SSLContext() | ||||
|         ctx.load_verify_locations(certificate) | ||||
|         certificate_details = ctx.get_ca_certs() | ||||
| 
 | ||||
|         cert_detail_dict = certificate_details[0] | ||||
|         not_after_str = cert_detail_dict['notAfter'].split(' GMT')[0] | ||||
|         not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y') | ||||
| 
 | ||||
|         if not_after - datetime.datetime.now() <= datetime.timedelta(days=days_left): | ||||
|             if generate_new: | ||||
|                 return TBUtility.generate_certificate(certificate, key, cert_detail) | ||||
|             else: | ||||
|                 return True | ||||
|             return TBUtility.generate_certificate(cert_detail_dict) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 samson0v
						samson0v