| """
|
| IKEv2 Server Implementation for Outline VPN
|
| """
|
|
|
| import os
|
| import subprocess
|
| import tempfile
|
| from typing import Dict, Optional
|
| import uuid
|
| from datetime import datetime, timedelta
|
|
|
| class IKEv2Server:
|
| def __init__(self, server_ip: str, logger):
|
| self.server_ip = server_ip
|
| self.logger = logger
|
| self.ca_dir = "ca"
|
| self.cert_dir = "certs"
|
| self.config_dir = "config"
|
| self._setup_directories()
|
| self._initialize_ca()
|
|
|
| def _setup_directories(self):
|
| """Create necessary directories"""
|
| for directory in [self.ca_dir, self.cert_dir, self.config_dir]:
|
| os.makedirs(directory, exist_ok=True)
|
|
|
| def _initialize_ca(self):
|
| """Initialize Certificate Authority if not already done"""
|
| ca_key = os.path.join(self.ca_dir, "ca.key")
|
| ca_cert = os.path.join(self.ca_dir, "ca.crt")
|
|
|
| if not os.path.exists(ca_key) or not os.path.exists(ca_cert):
|
|
|
| subprocess.run([
|
| "openssl", "genrsa",
|
| "-out", ca_key,
|
| "4096"
|
| ], check=True)
|
|
|
|
|
| subprocess.run([
|
| "openssl", "req",
|
| "-x509",
|
| "-new",
|
| "-nodes",
|
| "-key", ca_key,
|
| "-sha256",
|
| "-days", "3650",
|
| "-out", ca_cert,
|
| "-subj", f"/CN=Outline VPN CA"
|
| ], check=True)
|
|
|
| def generate_certificate(self, user_id: str) -> Dict[str, str]:
|
| """Generate client certificate for IKEv2"""
|
| cert_name = f"client_{user_id}"
|
| key_path = os.path.join(self.cert_dir, f"{cert_name}.key")
|
| csr_path = os.path.join(self.cert_dir, f"{cert_name}.csr")
|
| cert_path = os.path.join(self.cert_dir, f"{cert_name}.crt")
|
| p12_path = os.path.join(self.cert_dir, f"{cert_name}.p12")
|
|
|
| try:
|
|
|
| subprocess.run([
|
| "openssl", "genrsa",
|
| "-out", key_path,
|
| "2048"
|
| ], check=True)
|
|
|
|
|
| subprocess.run([
|
| "openssl", "req",
|
| "-new",
|
| "-key", key_path,
|
| "-out", csr_path,
|
| "-subj", f"/CN=client_{user_id}"
|
| ], check=True)
|
|
|
|
|
| subprocess.run([
|
| "openssl", "x509",
|
| "-req",
|
| "-in", csr_path,
|
| "-CA", os.path.join(self.ca_dir, "ca.crt"),
|
| "-CAkey", os.path.join(self.ca_dir, "ca.key"),
|
| "-CAcreateserial",
|
| "-out", cert_path,
|
| "-days", "365",
|
| "-sha256"
|
| ], check=True)
|
|
|
|
|
| export_password = str(uuid.uuid4())
|
| subprocess.run([
|
| "openssl", "pkcs12",
|
| "-export",
|
| "-in", cert_path,
|
| "-inkey", key_path,
|
| "-out", p12_path,
|
| "-password", f"pass:{export_password}"
|
| ], check=True)
|
|
|
|
|
| with open(cert_path, 'r') as f:
|
| cert_data = f.read()
|
| with open(key_path, 'r') as f:
|
| key_data = f.read()
|
| with open(os.path.join(self.ca_dir, "ca.crt"), 'r') as f:
|
| ca_data = f.read()
|
|
|
| return {
|
| 'certificate': cert_data,
|
| 'private_key': key_data,
|
| 'ca_certificate': ca_data,
|
| 'p12_bundle': p12_path,
|
| 'p12_password': export_password
|
| }
|
|
|
| except Exception as e:
|
| self.logger.error("Error generating certificate: " + str(e))
|
| raise
|
|
|
| def generate_strongswan_config(self, user_id: str, psk: str) -> str:
|
| """Generate strongSwan configuration for a user"""
|
| config = f"""
|
| conn outline-{user_id}
|
| auto=add
|
| compress=no
|
| type=tunnel
|
| keyexchange=ikev2
|
| fragmentation=yes
|
| forceencaps=yes
|
|
|
| # Local/Server configuration
|
| left=%any
|
| leftsubnet=0.0.0.0/0
|
| leftcert=/etc/ipsec.d/certs/server.crt
|
| leftsendcert=always
|
| leftid=@outline.vpn
|
|
|
| # Remote/Client configuration
|
| right=%any
|
| rightid=%any
|
| rightauth=eap-mschapv2
|
| rightsourceip=10.10.10.0/24
|
| rightdns=8.8.8.8,8.8.4.4
|
|
|
| # Security parameters
|
| ike=aes256-sha256-modp2048,aes128-sha1-modp2048
|
| esp=aes256-sha256,aes128-sha1
|
| dpdaction=clear
|
| dpddelay=300s
|
| rekey=no
|
| """
|
| config_path = os.path.join(self.config_dir, f"outline-{user_id}.conf")
|
| with open(config_path, 'w') as f:
|
| f.write(config)
|
|
|
| return config_path
|
|
|
| def add_user(self, user_id: str, username: str, password: str, psk: str):
|
| """Add a new VPN user"""
|
|
|
| cert_data = self.generate_certificate(user_id)
|
|
|
|
|
| config_path = self.generate_strongswan_config(user_id, psk)
|
|
|
|
|
| secrets_path = os.path.join(self.config_dir, "ipsec.secrets")
|
| with open(secrets_path, 'a') as f:
|
| f.write(f'{username} : EAP "{password}"\n')
|
| f.write(f'{self.server_ip} %any : PSK "{psk}"\n')
|
|
|
| return cert_data
|
|
|
| def remove_user(self, user_id: str):
|
| """Remove a VPN user"""
|
|
|
| cert_name = f"client_{user_id}"
|
| for ext in ['.key', '.csr', '.crt', '.p12']:
|
| path = os.path.join(self.cert_dir, f"{cert_name}{ext}")
|
| if os.path.exists(path):
|
| os.remove(path)
|
|
|
|
|
| config_path = os.path.join(self.config_dir, f"outline-{user_id}.conf")
|
| if os.path.exists(config_path):
|
| os.remove(config_path)
|
|
|
|
|
|
|
|
|
|
|
| async def start(self): |
| """Start the IKEv2 service""" |
| self.logger.info("Starting IKEv2 service...") |
| |
| |
| pass |
|
|
| async def stop(self): |
| """Stop the IKEv2 service""" |
| self.logger.info("Stopping IKEv2 service...") |
| |
| pass |
|
|
|
|
|
|