Spaces:
Runtime error
Runtime error
File size: 6,816 Bytes
6a5b8d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
"""
IKEv2 Server Implementation for Outline VPN
"""
import os
import subprocess
import tempfile
from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta
class IKEv2Server:
def __init__(self, server_ip: str, logger):
self.server_ip = server_ip
self.logger = logger
self.ca_dir = "ca"
self.cert_dir = "certs"
self.config_dir = "config"
self._setup_directories()
self._initialize_ca()
def _setup_directories(self):
"""Create necessary directories"""
for directory in [self.ca_dir, self.cert_dir, self.config_dir]:
os.makedirs(directory, exist_ok=True)
def _initialize_ca(self):
"""Initialize Certificate Authority if not already done"""
ca_key = os.path.join(self.ca_dir, "ca.key")
ca_cert = os.path.join(self.ca_dir, "ca.crt")
if not os.path.exists(ca_key) or not os.path.exists(ca_cert):
# Generate CA private key
subprocess.run([
"openssl", "genrsa",
"-out", ca_key,
"4096"
], check=True)
# Generate CA certificate
subprocess.run([
"openssl", "req",
"-x509",
"-new",
"-nodes",
"-key", ca_key,
"-sha256",
"-days", "3650",
"-out", ca_cert,
"-subj", f"/CN=Outline VPN CA"
], check=True)
def generate_certificate(self, user_id: str) -> Dict[str, str]:
"""Generate client certificate for IKEv2"""
cert_name = f"client_{user_id}"
key_path = os.path.join(self.cert_dir, f"{cert_name}.key")
csr_path = os.path.join(self.cert_dir, f"{cert_name}.csr")
cert_path = os.path.join(self.cert_dir, f"{cert_name}.crt")
p12_path = os.path.join(self.cert_dir, f"{cert_name}.p12")
try:
# Generate client private key
subprocess.run([
"openssl", "genrsa",
"-out", key_path,
"2048"
], check=True)
# Generate CSR
subprocess.run([
"openssl", "req",
"-new",
"-key", key_path,
"-out", csr_path,
"-subj", f"/CN=client_{user_id}"
], check=True)
# Sign client certificate with CA
subprocess.run([
"openssl", "x509",
"-req",
"-in", csr_path,
"-CA", os.path.join(self.ca_dir, "ca.crt"),
"-CAkey", os.path.join(self.ca_dir, "ca.key"),
"-CAcreateserial",
"-out", cert_path,
"-days", "365",
"-sha256"
], check=True)
# Create PKCS12 bundle
export_password = str(uuid.uuid4())
subprocess.run([
"openssl", "pkcs12",
"-export",
"-in", cert_path,
"-inkey", key_path,
"-out", p12_path,
"-password", f"pass:{export_password}"
], check=True)
# Read certificate files
with open(cert_path, 'r') as f:
cert_data = f.read()
with open(key_path, 'r') as f:
key_data = f.read()
with open(os.path.join(self.ca_dir, "ca.crt"), 'r') as f:
ca_data = f.read()
return {
'certificate': cert_data,
'private_key': key_data,
'ca_certificate': ca_data,
'p12_bundle': p12_path,
'p12_password': export_password
}
except Exception as e:
self.logger.error("Error generating certificate: " + str(e))
raise
def generate_strongswan_config(self, user_id: str, psk: str) -> str:
"""Generate strongSwan configuration for a user"""
config = f"""
conn outline-{user_id}
auto=add
compress=no
type=tunnel
keyexchange=ikev2
fragmentation=yes
forceencaps=yes
# Local/Server configuration
left=%any
leftsubnet=0.0.0.0/0
leftcert=/etc/ipsec.d/certs/server.crt
leftsendcert=always
leftid=@outline.vpn
# Remote/Client configuration
right=%any
rightid=%any
rightauth=eap-mschapv2
rightsourceip=10.10.10.0/24
rightdns=8.8.8.8,8.8.4.4
# Security parameters
ike=aes256-sha256-modp2048,aes128-sha1-modp2048
esp=aes256-sha256,aes128-sha1
dpdaction=clear
dpddelay=300s
rekey=no
"""
config_path = os.path.join(self.config_dir, f"outline-{user_id}.conf")
with open(config_path, 'w') as f:
f.write(config)
return config_path
def add_user(self, user_id: str, username: str, password: str, psk: str):
"""Add a new VPN user"""
# Generate certificates
cert_data = self.generate_certificate(user_id)
# Generate strongSwan config
config_path = self.generate_strongswan_config(user_id, psk)
# Add user credentials to strongSwan secrets
secrets_path = os.path.join(self.config_dir, "ipsec.secrets")
with open(secrets_path, 'a') as f:
f.write(f'{username} : EAP "{password}"\n')
f.write(f'{self.server_ip} %any : PSK "{psk}"\n')
return cert_data
def remove_user(self, user_id: str):
"""Remove a VPN user"""
# Remove certificates
cert_name = f"client_{user_id}"
for ext in ['.key', '.csr', '.crt', '.p12']:
path = os.path.join(self.cert_dir, f"{cert_name}{ext}")
if os.path.exists(path):
os.remove(path)
# Remove config
config_path = os.path.join(self.config_dir, f"outline-{user_id}.conf")
if os.path.exists(config_path):
os.remove(config_path)
# Remove from secrets (would need to rewrite the file)
# This is a bit more complex and would require parsing and rewriting ipsec.secrets
async def start(self):
"""Start the IKEv2 service"""
self.logger.info("Starting IKEv2 service...")
# Placeholder for actual IKEv2 service startup logic
# This might involve starting strongSwan or similar
pass
async def stop(self):
"""Stop the IKEv2 service"""
self.logger.info("Stopping IKEv2 service...")
# Placeholder for actual IKEv2 service shutdown logic
pass
|