import datetime from pathlib import Path from typing import Any, Tuple from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import NameOID class CertificateManager: def __init__(self, cert_dir: str = "certs"): self.cert_dir = Path(cert_dir) self.cert_dir.mkdir(exist_ok=True) self.ca_key_path = self.cert_dir / "ca.key" self.ca_cert_path = self.cert_dir / "ca.crt" # Generate or load CA certificate if not self.ca_cert_path.exists() or not self.ca_key_path.exists(): self._generate_ca_cert() self._load_ca_cert() def _generate_ca_cert(self): """Generate a self-signed CA certificate""" # Generate private key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) # Write private key to file with open(self.ca_key_path, "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) ) # Create self-signed certificate subject = issuer = x509.Name( [ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Proxy CA"), x509.NameAttribute(NameOID.COMMON_NAME, "Proxy CA Root"), ] ) cert = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(issuer) .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(datetime.datetime.utcnow()) .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=3650)) .add_extension( x509.BasicConstraints(ca=True, path_length=None), critical=True ) .add_extension( x509.KeyUsage( digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=True, crl_sign=True, encipher_only=False, decipher_only=False, ), critical=True, ) .sign(private_key, hashes.SHA256(), default_backend()) ) # Write certificate to file with open(self.ca_cert_path, "wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) def _load_ca_cert(self): """Load the CA certificate and private key""" with open(self.ca_key_path, "rb") as f: self.ca_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() ) with open(self.ca_cert_path, "rb") as f: self.ca_cert = x509.load_pem_x509_certificate(f.read(), default_backend()) def get_domain_cert(self, domain: str) -> Tuple[Any, Any]: """Get or generate a certificate for the specified domain""" cert_path = self.cert_dir / f"{domain}.crt" key_path = self.cert_dir / f"{domain}.key" if cert_path.exists() and key_path.exists(): # Load existing certificate and key with open(key_path, "rb") as f: private_key = serialization.load_pem_private_key( f.read(), password=None, backend=default_backend() ) with open(cert_path, "rb") as f: cert = x509.load_pem_x509_certificate(f.read(), default_backend()) return private_key, cert # Generate new certificate return self._generate_domain_cert(domain) def _generate_domain_cert(self, domain: str) -> Tuple[Any, Any]: """Generate a certificate for the specified domain signed by the CA""" # Generate private key private_key = rsa.generate_private_key( public_exponent=65537, key_size=2048, backend=default_backend() ) # Write private key to file key_path = self.cert_dir / f"{domain}.key" with open(key_path, "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) ) # Create certificate subject = x509.Name( [ x509.NameAttribute(NameOID.COUNTRY_NAME, "US"), x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "California"), x509.NameAttribute(NameOID.LOCALITY_NAME, "San Francisco"), x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Proxy Server"), x509.NameAttribute(NameOID.COMMON_NAME, domain), ] ) cert = ( x509.CertificateBuilder() .subject_name(subject) .issuer_name(self.ca_cert.subject) .public_key(private_key.public_key()) .serial_number(x509.random_serial_number()) .not_valid_before(datetime.datetime.utcnow()) .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365)) .add_extension( x509.SubjectAlternativeName([x509.DNSName(domain)]), critical=False ) .sign(self.ca_key, hashes.SHA256(), default_backend()) # type: ignore[arg-type] ) # Write certificate to file cert_path = self.cert_dir / f"{domain}.crt" with open(cert_path, "wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) return private_key, cert