JRNET / core /ikev2_server.py
Factor Studios
Upload 96 files
6a5b8d8 verified
"""
IKEv2 Server Implementation for Outline VPN
"""
import os
import subprocess
import tempfile
from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta
class IKEv2Server:
def __init__(self, server_ip: str, logger):
self.server_ip = server_ip
self.logger = logger
self.ca_dir = "ca"
self.cert_dir = "certs"
self.config_dir = "config"
self._setup_directories()
self._initialize_ca()
def _setup_directories(self):
"""Create necessary directories"""
for directory in [self.ca_dir, self.cert_dir, self.config_dir]:
os.makedirs(directory, exist_ok=True)
def _initialize_ca(self):
"""Initialize Certificate Authority if not already done"""
ca_key = os.path.join(self.ca_dir, "ca.key")
ca_cert = os.path.join(self.ca_dir, "ca.crt")
if not os.path.exists(ca_key) or not os.path.exists(ca_cert):
# Generate CA private key
subprocess.run([
"openssl", "genrsa",
"-out", ca_key,
"4096"
], check=True)
# Generate CA certificate
subprocess.run([
"openssl", "req",
"-x509",
"-new",
"-nodes",
"-key", ca_key,
"-sha256",
"-days", "3650",
"-out", ca_cert,
"-subj", f"/CN=Outline VPN CA"
], check=True)
def generate_certificate(self, user_id: str) -> Dict[str, str]:
"""Generate client certificate for IKEv2"""
cert_name = f"client_{user_id}"
key_path = os.path.join(self.cert_dir, f"{cert_name}.key")
csr_path = os.path.join(self.cert_dir, f"{cert_name}.csr")
cert_path = os.path.join(self.cert_dir, f"{cert_name}.crt")
p12_path = os.path.join(self.cert_dir, f"{cert_name}.p12")
try:
# Generate client private key
subprocess.run([
"openssl", "genrsa",
"-out", key_path,
"2048"
], check=True)
# Generate CSR
subprocess.run([
"openssl", "req",
"-new",
"-key", key_path,
"-out", csr_path,
"-subj", f"/CN=client_{user_id}"
], check=True)
# Sign client certificate with CA
subprocess.run([
"openssl", "x509",
"-req",
"-in", csr_path,
"-CA", os.path.join(self.ca_dir, "ca.crt"),
"-CAkey", os.path.join(self.ca_dir, "ca.key"),
"-CAcreateserial",
"-out", cert_path,
"-days", "365",
"-sha256"
], check=True)
# Create PKCS12 bundle
export_password = str(uuid.uuid4())
subprocess.run([
"openssl", "pkcs12",
"-export",
"-in", cert_path,
"-inkey", key_path,
"-out", p12_path,
"-password", f"pass:{export_password}"
], check=True)
# Read certificate files
with open(cert_path, 'r') as f:
cert_data = f.read()
with open(key_path, 'r') as f:
key_data = f.read()
with open(os.path.join(self.ca_dir, "ca.crt"), 'r') as f:
ca_data = f.read()
return {
'certificate': cert_data,
'private_key': key_data,
'ca_certificate': ca_data,
'p12_bundle': p12_path,
'p12_password': export_password
}
except Exception as e:
self.logger.error("Error generating certificate: " + str(e))
raise
def generate_strongswan_config(self, user_id: str, psk: str) -> str:
"""Generate strongSwan configuration for a user"""
config = f"""
conn outline-{user_id}
auto=add
compress=no
type=tunnel
keyexchange=ikev2
fragmentation=yes
forceencaps=yes
# Local/Server configuration
left=%any
leftsubnet=0.0.0.0/0
leftcert=/etc/ipsec.d/certs/server.crt
leftsendcert=always
leftid=@outline.vpn
# Remote/Client configuration
right=%any
rightid=%any
rightauth=eap-mschapv2
rightsourceip=10.10.10.0/24
rightdns=8.8.8.8,8.8.4.4
# Security parameters
ike=aes256-sha256-modp2048,aes128-sha1-modp2048
esp=aes256-sha256,aes128-sha1
dpdaction=clear
dpddelay=300s
rekey=no
"""
config_path = os.path.join(self.config_dir, f"outline-{user_id}.conf")
with open(config_path, 'w') as f:
f.write(config)
return config_path
def add_user(self, user_id: str, username: str, password: str, psk: str):
"""Add a new VPN user"""
# Generate certificates
cert_data = self.generate_certificate(user_id)
# Generate strongSwan config
config_path = self.generate_strongswan_config(user_id, psk)
# Add user credentials to strongSwan secrets
secrets_path = os.path.join(self.config_dir, "ipsec.secrets")
with open(secrets_path, 'a') as f:
f.write(f'{username} : EAP "{password}"\n')
f.write(f'{self.server_ip} %any : PSK "{psk}"\n')
return cert_data
def remove_user(self, user_id: str):
"""Remove a VPN user"""
# Remove certificates
cert_name = f"client_{user_id}"
for ext in ['.key', '.csr', '.crt', '.p12']:
path = os.path.join(self.cert_dir, f"{cert_name}{ext}")
if os.path.exists(path):
os.remove(path)
# Remove config
config_path = os.path.join(self.config_dir, f"outline-{user_id}.conf")
if os.path.exists(config_path):
os.remove(config_path)
# Remove from secrets (would need to rewrite the file)
# This is a bit more complex and would require parsing and rewriting ipsec.secrets
async def start(self):
"""Start the IKEv2 service"""
self.logger.info("Starting IKEv2 service...")
# Placeholder for actual IKEv2 service startup logic
# This might involve starting strongSwan or similar
pass
async def stop(self):
"""Stop the IKEv2 service"""
self.logger.info("Stopping IKEv2 service...")
# Placeholder for actual IKEv2 service shutdown logic
pass