"""
Certificate Authority Service

Provides PKI infrastructure for VPN client certificate management.
"""
|
|
import logging
import os
import subprocess
from datetime import datetime, timedelta

from cryptography import x509
from cryptography.x509.oid import NameOID, ExtensionOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
# Module-level logger, named after this module via ``__name__``; every method
# of CertificateAuthority reports progress and failures through it.
logger = logging.getLogger(__name__)
|
|
class CertificateAuthority:
    """File-backed certificate authority for VPN client certificates.

    All state lives in ``ca_dir``:

    * ``ca.crt`` / ``ca.key`` -- self-signed root certificate and its
      unencrypted RSA private key (PEM).
    * ``serial``              -- next serial number to hand out, as hex text.
    * ``index.txt``           -- one tab-separated row per issued certificate:
      ``status, expiry, revocation date, serial (hex), filename, subject``
      where status is ``V`` (valid), ``R`` (revoked) or ``E`` (expired).
    * ``crl.pem``             -- most recently generated revocation list.

    The class performs no locking; concurrent writers to the same ``ca_dir``
    are not coordinated.
    """

    # strftime/strptime layout of the expiry and revocation columns in index.txt.
    _INDEX_TIME_FORMAT = '%y%m%d%H%M%SZ'

    def __init__(self, ca_dir='/etc/vpn-ca'):
        """Open the CA stored in ``ca_dir``, creating whatever is missing.

        The directory is created with mode 0700 if absent, a new root CA is
        generated when ``ca.crt`` does not exist, and an empty ``index.txt``
        is created when there is none.

        Args:
            ca_dir: Directory holding all CA files.

        Raises:
            Exception: Whatever ``_create_root_ca`` or the filesystem raises.
        """
        self.ca_dir = ca_dir
        self.ca_cert_path = os.path.join(ca_dir, 'ca.crt')
        self.ca_key_path = os.path.join(ca_dir, 'ca.key')
        self.crl_path = os.path.join(ca_dir, 'crl.pem')
        self.serial_file = os.path.join(ca_dir, 'serial')
        self.index_file = os.path.join(ca_dir, 'index.txt')

        os.makedirs(ca_dir, mode=0o700, exist_ok=True)

        if not os.path.exists(self.ca_cert_path):
            self._create_root_ca()

        if not os.path.exists(self.index_file):
            with open(self.index_file, 'w') as f:
                f.write('')

    # ------------------------------------------------------------------
    # Index helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_serial(serial_number):
        """Return ``serial_number`` in the form used by the index serial column.

        Integers are rendered as upper-case hex padded to at least two digits
        (the same ``02X`` format used when rows are written). Anything else is
        treated as an already-hex string and only upper-cased and padded.
        """
        if isinstance(serial_number, int):
            return f'{serial_number:02X}'
        return str(serial_number).strip().upper().zfill(2)

    @staticmethod
    def _split_index_line(line):
        """Split one index row into its tab-separated fields ([] if blank)."""
        stripped = line.strip()
        return stripped.split('\t') if stripped else []

    def _highest_indexed_serial(self):
        """Return the largest serial recorded in the index, or 0 if none."""
        highest = 0
        try:
            with open(self.index_file, 'r') as f:
                for line in f:
                    fields = self._split_index_line(line)
                    if len(fields) < 4:
                        continue
                    try:
                        highest = max(highest, int(fields[3], 16))
                    except ValueError:
                        # Unparseable serial column: ignore that row.
                        continue
        except FileNotFoundError:
            pass
        return highest

    # ------------------------------------------------------------------
    # Root CA
    # ------------------------------------------------------------------

    def _create_root_ca(self):
        """Create the self-signed root certificate and its private key.

        Generates a 4096-bit RSA key and a certificate valid for 3650 days
        with serial 1, then initialises the serial file to ``02``.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            logger.info("Creating root CA certificate")

            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=4096
            )

            subject = issuer = x509.Name([
                x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Virtual"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "Internet"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "VPN Service CA"),
                x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "Certificate Authority"),
                x509.NameAttribute(NameOID.COMMON_NAME, "VPN Root CA"),
            ])

            now = datetime.utcnow()
            public_key = private_key.public_key()

            cert = x509.CertificateBuilder().subject_name(
                subject
            ).issuer_name(
                issuer
            ).public_key(
                public_key
            ).serial_number(
                1
            ).not_valid_before(
                now
            ).not_valid_after(
                now + timedelta(days=3650)
            ).add_extension(
                x509.BasicConstraints(ca=True, path_length=None),
                critical=True,
            ).add_extension(
                x509.KeyUsage(
                    key_cert_sign=True,
                    crl_sign=True,
                    digital_signature=False,
                    key_encipherment=False,
                    key_agreement=False,
                    data_encipherment=False,
                    content_commitment=False,
                    encipher_only=False,
                    decipher_only=False
                ),
                critical=True,
            ).add_extension(
                x509.SubjectKeyIdentifier.from_public_key(public_key),
                critical=False,
            ).sign(private_key, hashes.SHA256())

            key_pem = private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            )

            # Write the key first: __init__ treats an existing ca.crt as
            # "CA already set up", so the certificate must never exist
            # without its key. Create the key file with mode 0600 from the
            # start so it is never readable by others, even briefly.
            key_fd = os.open(
                self.ca_key_path,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(key_fd, 'wb') as f:
                f.write(key_pem)
            # The mode passed to os.open is ignored if the file already existed.
            os.chmod(self.ca_key_path, 0o600)

            with open(self.ca_cert_path, 'wb') as f:
                f.write(cert.public_bytes(serialization.Encoding.PEM))
            os.chmod(self.ca_cert_path, 0o644)

            # Serial 1 is the root itself; client certificates start at 2.
            with open(self.serial_file, 'w') as f:
                f.write('02')

            logger.info("Root CA certificate created successfully")

        except Exception as e:
            logger.error(f"Failed to create root CA: {e}")
            raise

    # ------------------------------------------------------------------
    # Issuing and revoking
    # ------------------------------------------------------------------

    def generate_client_certificate(self, username, email, validity_days=365):
        """Issue a client-auth certificate and a fresh 2048-bit RSA key.

        Args:
            username: Used as the certificate common name.
            email: Used as the certificate e-mail address attribute.
            validity_days: Lifetime of the certificate, counted from now.

        Returns:
            dict with ``certificate`` and ``private_key`` (PEM bytes, key
            unencrypted), ``serial_number`` (int), ``not_valid_before`` and
            ``not_valid_after``.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            logger.info(f"Generating client certificate for {username}")

            with open(self.ca_cert_path, 'rb') as f:
                ca_cert = x509.load_pem_x509_certificate(f.read())

            with open(self.ca_key_path, 'rb') as f:
                ca_private_key = serialization.load_pem_private_key(f.read(), password=None)

            client_private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048
            )
            client_public_key = client_private_key.public_key()

            serial_number = self._get_next_serial()

            subject = x509.Name([
                x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Virtual"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "Internet"),
                x509.NameAttribute(NameOID.ORGANIZATION_NAME, "VPN Service"),
                x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "VPN Clients"),
                x509.NameAttribute(NameOID.COMMON_NAME, username),
                x509.NameAttribute(NameOID.EMAIL_ADDRESS, email),
            ])

            now = datetime.utcnow()

            cert = x509.CertificateBuilder().subject_name(
                subject
            ).issuer_name(
                ca_cert.subject
            ).public_key(
                client_public_key
            ).serial_number(
                serial_number
            ).not_valid_before(
                now
            ).not_valid_after(
                now + timedelta(days=validity_days)
            ).add_extension(
                x509.BasicConstraints(ca=False, path_length=None),
                critical=True,
            ).add_extension(
                x509.KeyUsage(
                    key_cert_sign=False,
                    crl_sign=False,
                    digital_signature=True,
                    key_encipherment=True,
                    key_agreement=False,
                    data_encipherment=False,
                    content_commitment=False,
                    encipher_only=False,
                    decipher_only=False
                ),
                critical=True,
            ).add_extension(
                x509.ExtendedKeyUsage([
                    x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH,
                ]),
                critical=True,
            ).add_extension(
                x509.SubjectKeyIdentifier.from_public_key(client_public_key),
                critical=False,
            ).add_extension(
                x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_cert.public_key()),
                critical=False,
            ).sign(ca_private_key, hashes.SHA256())

            self._update_certificate_index(cert, 'V')

            logger.info(f"Client certificate generated successfully for {username} (Serial: {serial_number})")

            return {
                'certificate': cert.public_bytes(serialization.Encoding.PEM),
                'private_key': client_private_key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption()
                ),
                'serial_number': serial_number,
                'not_valid_before': cert.not_valid_before,
                'not_valid_after': cert.not_valid_after
            }

        except Exception as e:
            logger.error(f"Failed to generate client certificate: {e}")
            raise

    def revoke_certificate(self, serial_number, reason='unspecified'):
        """Mark a certificate as revoked and regenerate the CRL.

        Args:
            serial_number: Serial as an int, or as the hex string stored in
                the index.
            reason: Accepted for API compatibility; it is not written to the
                index or the CRL.

        Raises:
            Exception: Any failure is logged and re-raised. An unknown serial
                is not an error: it is logged as a warning and the CRL is
                still regenerated.
        """
        try:
            logger.info(f"Revoking certificate with serial {serial_number}")

            found = self._update_certificate_index_status(serial_number, 'R', reason)

            self._generate_crl()

            if found:
                logger.info(f"Certificate {serial_number} revoked successfully")
            else:
                logger.warning(f"Certificate {serial_number} not found in index; nothing was revoked")

        except Exception as e:
            logger.error(f"Failed to revoke certificate {serial_number}: {e}")
            raise

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_certificate_status(self, serial_number):
        """Look up a certificate's status in the index.

        Args:
            serial_number: Serial as an int, or as the hex string stored in
                the index.

        Returns:
            ``'valid'``, ``'revoked'`` or ``'expired'`` for the first matching
            row; ``'unknown'`` when there is no match, the status letter is
            unrecognised, or the index cannot be read.
        """
        labels = {'V': 'valid', 'R': 'revoked', 'E': 'expired'}
        wanted = self._normalize_serial(serial_number)
        try:
            with open(self.index_file, 'r') as f:
                for line in f:
                    fields = self._split_index_line(line)
                    # The serial is the fourth column, so four fields are required.
                    if len(fields) >= 4 and fields[3] == wanted and fields[0] in labels:
                        return labels[fields[0]]

            return 'unknown'

        except Exception as e:
            logger.error(f"Failed to get certificate status: {e}")
            return 'unknown'

    def list_certificates(self):
        """Return every complete index row as a dict.

        Rows with fewer than six fields are skipped.

        Returns:
            List of dicts with keys ``status``, ``expiry_date``,
            ``revocation_date`` (``None`` when empty), ``serial_number``,
            ``filename`` and ``subject``; ``[]`` if the index cannot be read.
        """
        try:
            certificates = []

            with open(self.index_file, 'r') as f:
                for line in f:
                    fields = self._split_index_line(line)
                    if len(fields) < 6:
                        continue
                    certificates.append({
                        'status': fields[0],
                        'expiry_date': fields[1],
                        'revocation_date': fields[2] if fields[2] else None,
                        'serial_number': fields[3],
                        'filename': fields[4],
                        'subject': fields[5]
                    })

            return certificates

        except Exception as e:
            logger.error(f"Failed to list certificates: {e}")
            return []

    # ------------------------------------------------------------------
    # Serial and index maintenance
    # ------------------------------------------------------------------

    def _get_next_serial(self):
        """Reserve and return the next certificate serial number.

        Reads the hex value in the serial file, writes back value + 1 and
        returns the value read. If the serial file is missing or unparseable
        the counter resumes after the highest serial found in the index
        (never below 2), so already-issued serials are not handed out again.
        """
        try:
            with open(self.serial_file, 'r') as f:
                serial = int(f.read().strip(), 16)
        except (FileNotFoundError, ValueError):
            serial = max(2, self._highest_indexed_serial() + 1)
            logger.warning(f"Serial file missing or invalid; resuming at {serial:02X}")

        with open(self.serial_file, 'w') as f:
            f.write(f'{serial + 1:02X}')

        return serial

    def _update_certificate_index(self, cert, status):
        """Append a row for ``cert`` to the index.

        Args:
            cert: The issued certificate.
            status: Status letter for the new row.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            expiry_date = cert.not_valid_after.strftime(self._INDEX_TIME_FORMAT)
            serial_hex = f'{cert.serial_number:02X}'
            # The subject contains caller-supplied text. Tabs and line breaks
            # are the index's delimiters, so neutralise them to keep a crafted
            # name from forging columns or whole rows.
            subject_str = cert.subject.rfc4514_string().translate(
                str.maketrans({'\t': ' ', '\n': ' ', '\r': ' '})
            )

            index_line = f"{status}\t{expiry_date}\t\t{serial_hex}\tunknown\t{subject_str}\n"

            with open(self.index_file, 'a') as f:
                f.write(index_line)

        except Exception as e:
            logger.error(f"Failed to update certificate index: {e}")
            raise

    def _update_certificate_index_status(self, serial_number, new_status, reason=None):
        """Change the status letter of the first row matching ``serial_number``.

        When ``new_status`` is ``'R'`` the revocation-date column is always
        set to the current UTC time, because CRL generation needs a date for
        every revoked row.

        Args:
            serial_number: Serial as an int or index-style hex string.
            new_status: New status letter.
            reason: Unused; kept for interface compatibility.

        Returns:
            True if a row was updated, False if no row matched.

        Raises:
            Exception: Any failure is logged and re-raised.
        """
        try:
            wanted = self._normalize_serial(serial_number)
            updated = False

            with open(self.index_file, 'r') as f:
                lines = f.readlines()

            for i, line in enumerate(lines):
                fields = self._split_index_line(line)
                if len(fields) >= 4 and fields[3] == wanted:
                    fields[0] = new_status
                    if new_status == 'R':
                        fields[2] = datetime.utcnow().strftime(self._INDEX_TIME_FORMAT)

                    lines[i] = '\t'.join(fields) + '\n'
                    updated = True
                    break

            if updated:
                with open(self.index_file, 'w') as f:
                    f.writelines(lines)

            return updated

        except Exception as e:
            logger.error(f"Failed to update certificate status: {e}")
            raise

    # ------------------------------------------------------------------
    # CRL
    # ------------------------------------------------------------------

    def _generate_crl(self):
        """Build a CRL from all ``R`` rows of the index and write ``crl.pem``.

        The CRL is signed with the CA key (SHA-256) and its next-update is
        30 days after generation. A revoked row without a usable revocation
        date is still listed, dated at generation time; a row whose serial
        cannot be parsed is skipped with a warning so one bad row does not
        block publication of the rest.

        Raises:
            Exception: Any other failure is logged and re-raised.
        """
        try:
            logger.info("Generating Certificate Revocation List")

            with open(self.ca_cert_path, 'rb') as f:
                ca_cert = x509.load_pem_x509_certificate(f.read())

            with open(self.ca_key_path, 'rb') as f:
                ca_private_key = serialization.load_pem_private_key(f.read(), password=None)

            now = datetime.utcnow()
            revoked_certs = []

            with open(self.index_file, 'r') as f:
                for line in f:
                    fields = self._split_index_line(line)
                    if len(fields) < 4 or fields[0] != 'R':
                        continue

                    try:
                        serial_number = int(fields[3], 16)
                    except ValueError:
                        logger.warning(f"Skipping revoked index entry with invalid serial {fields[3]!r}")
                        continue

                    try:
                        revocation_date = datetime.strptime(fields[2], self._INDEX_TIME_FORMAT)
                    except ValueError:
                        # Rows revoked without a recorded date must still
                        # appear in the CRL.
                        revocation_date = now

                    revoked_certs.append(
                        x509.RevokedCertificateBuilder().serial_number(
                            serial_number
                        ).revocation_date(
                            revocation_date
                        ).build()
                    )

            crl_builder = x509.CertificateRevocationListBuilder().issuer_name(
                ca_cert.subject
            ).last_update(
                now
            ).next_update(
                now + timedelta(days=30)
            )

            for revoked_cert in revoked_certs:
                crl_builder = crl_builder.add_revoked_certificate(revoked_cert)

            crl = crl_builder.sign(ca_private_key, hashes.SHA256())

            with open(self.crl_path, 'wb') as f:
                f.write(crl.public_bytes(serialization.Encoding.PEM))

            logger.info("CRL generated successfully")

        except Exception as e:
            logger.error(f"Failed to generate CRL: {e}")
            raise

    def get_ca_certificate(self):
        """Return the CA certificate as PEM bytes, or ``None`` on failure."""
        try:
            with open(self.ca_cert_path, 'rb') as f:
                return f.read()
        except Exception as e:
            logger.error(f"Failed to read CA certificate: {e}")
            return None

    def get_crl(self):
        """Return the CRL as PEM bytes, generating it first if none exists.

        Returns:
            PEM bytes, or ``None`` if the CRL cannot be generated or read.
        """
        try:
            if not os.path.exists(self.crl_path):
                self._generate_crl()
            with open(self.crl_path, 'rb') as f:
                return f.read()
        except Exception as e:
            logger.error(f"Failed to read CRL: {e}")
            return None

    # ------------------------------------------------------------------
    # Verification
    # ------------------------------------------------------------------

    def verify_certificate(self, cert_pem):
        """Check that a certificate was issued by this CA and is usable now.

        Verifies issuer name, signature, validity window and revocation
        status (from the index), in that order.

        Args:
            cert_pem: PEM-encoded certificate as bytes or str.

        Returns:
            ``(True, "Certificate valid")`` or ``(False, <reason>)``. Never
            raises; unexpected errors are logged and reported as the reason.
        """
        try:
            if isinstance(cert_pem, str):
                cert_pem = cert_pem.encode('ascii')
            cert = x509.load_pem_x509_certificate(cert_pem)

            with open(self.ca_cert_path, 'rb') as f:
                ca_cert = x509.load_pem_x509_certificate(f.read())

            if cert.issuer != ca_cert.subject:
                return False, "Certificate was not issued by this CA"

            # The CA created by this class has an RSA key and signs with
            # PKCS#1 v1.5; verify() raises InvalidSignature on mismatch.
            ca_cert.public_key().verify(
                cert.signature,
                cert.tbs_certificate_bytes,
                padding.PKCS1v15(),
                cert.signature_hash_algorithm,
            )

            now = datetime.utcnow()
            if now < cert.not_valid_before or now > cert.not_valid_after:
                return False, "Certificate expired or not yet valid"

            status = self.get_certificate_status(cert.serial_number)
            if status == 'revoked':
                return False, "Certificate revoked"

            return True, "Certificate valid"

        except Exception as e:
            logger.error(f"Certificate verification failed: {e}")
            # Some exceptions (e.g. InvalidSignature) carry no message.
            return False, str(e) or type(e).__name__

    # ------------------------------------------------------------------
    # Housekeeping
    # ------------------------------------------------------------------

    def cleanup_expired_certificates(self):
        """Flip ``V`` rows whose expiry date has passed to ``E``.

        Rows with an unparseable expiry date are left untouched. Best-effort:
        failures are logged and not raised.
        """
        try:
            logger.info("Cleaning up expired certificates")

            updated_count = 0

            with open(self.index_file, 'r') as f:
                lines = f.readlines()

            now = datetime.utcnow()

            for i, line in enumerate(lines):
                fields = self._split_index_line(line)
                if len(fields) < 2 or fields[0] != 'V':
                    continue
                try:
                    expiry_date = datetime.strptime(fields[1], self._INDEX_TIME_FORMAT)
                except ValueError:
                    continue
                if now > expiry_date:
                    fields[0] = 'E'
                    lines[i] = '\t'.join(fields) + '\n'
                    updated_count += 1

            if updated_count > 0:
                with open(self.index_file, 'w') as f:
                    f.writelines(lines)

            logger.info(f"Marked {updated_count} certificates as expired")

        except Exception as e:
            logger.error(f"Failed to cleanup expired certificates: {e}")

    def get_statistics(self):
        """Count index rows in total and per status.

        Returns:
            dict with ``total_certificates``, ``valid_certificates``,
            ``revoked_certificates`` and ``expired_certificates``; all zero
            if the index cannot be read.
        """
        counter_for_status = {
            'V': 'valid_certificates',
            'R': 'revoked_certificates',
            'E': 'expired_certificates',
        }
        stats = {
            'total_certificates': 0,
            'valid_certificates': 0,
            'revoked_certificates': 0,
            'expired_certificates': 0
        }
        try:
            with open(self.index_file, 'r') as f:
                for line in f:
                    fields = self._split_index_line(line)
                    if not fields:
                        continue
                    stats['total_certificates'] += 1
                    key = counter_for_status.get(fields[0])
                    if key is not None:
                        stats[key] += 1

            return stats

        except Exception as e:
            logger.error(f"Failed to get CA statistics: {e}")
            return {
                'total_certificates': 0,
                'valid_certificates': 0,
                'revoked_certificates': 0,
                'expired_certificates': 0
            }
|
|
|
|