import os import datetime from pathlib import Path from cryptography import x509 from cryptography.x509.oid import NameOID from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.backends import default_backend class CertificateManager: def __init__(self, cert_dir='certs'): self.cert_dir = Path(cert_dir) self.cert_dir.mkdir(exist_ok=True) self.ca_key_path = self.cert_dir / 'ca.key' self.ca_cert_path = self.cert_dir / 'ca.crt' # Generate or load CA certificate if not self.ca_cert_path.exists() or not self.ca_key_path.exists(): self._generate_ca_cert() self._load_ca_cert() def _generate_ca_cert(self): """Generate a self-signed CA certificate""" # Generate private key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) # Write private key to file with open(self.ca_key_path, 'wb') as f: f.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) # Create self-signed certificate subject = issuer = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Proxy CA"), x509.NameAttribute(NameOID.COMMON_NAME, "Proxy CA Root"), ]) cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( issuer ).public_key( private_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=3650) ).add_extension( x509.BasicConstraints(ca=True, path_length=None), critical=True ).add_extension( x509.KeyUsage( digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=True, encipher_only=False, decipher_only=False ), critical=True ).sign(private_key, hashes.SHA256(), default_backend()) # Write certificate to file with open(self.ca_cert_path, 'wb') as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) def _load_ca_cert(self): """Load the CA certificate and private key""" with open(self.ca_key_path, 'rb') as f: self.ca_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() ) with open(self.ca_cert_path, 'rb') as f: self.ca_cert = x509.load_pem_x509_certificate( f.read(), default_backend() ) def get_domain_cert(self, domain): """Get or generate a certificate for the specified domain""" cert_path = self.cert_dir / f"{domain}.crt" key_path = self.cert_dir / f"{domain}.key" if cert_path.exists() and key_path.exists(): # Load existing certificate and key with open(key_path, 'rb') as f: private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() ) with open(cert_path, 'rb') as f: cert = x509.load_pem_x509_certificate( f.read(), default_backend() ) return private_key, cert # Generate new certificate return self._generate_domain_cert(domain) def _generate_domain_cert(self, domain): """Generate a certificate for the specified domain signed by the CA""" # Generate private key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) # Write private key to file key_path = self.cert_dir / f"{domain}.key" with open(key_path, 'wb') as f: f.write(private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() )) # Create certificate subject = x509.Name([ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Proxy Server"), x509.NameAttribute(NameOID.COMMON_NAME, domain), ]) cert = x509.CertificateBuilder().subject_name( subject ).issuer_name( self.ca_cert.subject ).public_key( private_key.public_key() ).serial_number( x509.random_serial_number() ).not_valid_before( datetime.datetime.utcnow() ).not_valid_after( datetime.datetime.utcnow() + datetime.timedelta(days=365) ).add_extension( x509.SubjectAlternativeName([x509.DNSName(domain)]), critical=False ).sign(self.ca_key, hashes.SHA256(), default_backend()) # Write certificate to file cert_path = self.cert_dir / f"{domain}.crt" with open(cert_path, 'wb') as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) return private_key, cert