Spaces:
Paused
Paused
| """ | |
| VPN Server Manager | |
| Manages multiple VPN protocols including OpenVPN, IKEv2/IPSec, and WireGuard | |
| """ | |
| import os | |
| import subprocess | |
| import logging | |
| import json | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| class VPNServerManager: | |
| """Manages VPN server configurations and operations""" | |
def __init__(self, config_dir='/etc/vpn-server'):
    """Create the configuration tree and initialise every VPN backend.

    Args:
        config_dir: Root directory under which the ``openvpn``, ``ipsec``
            and ``wireguard`` sub-directories are created.
    """
    self.config_dir = config_dir
    self.openvpn_dir, self.ipsec_dir, self.wireguard_dir = (
        os.path.join(config_dir, protocol)
        for protocol in ('openvpn', 'ipsec', 'wireguard')
    )

    # The root is listed first so the per-protocol folders land inside it.
    for path in (self.config_dir, self.openvpn_dir,
                 self.ipsec_dir, self.wireguard_dir):
        os.makedirs(path, mode=0o755, exist_ok=True)

    # Address handed out to clients, plus the port each protocol listens on.
    self.server_ip = self._get_server_ip()
    self.openvpn_port = 1194
    self.ipsec_port = 500
    self.wireguard_port = 51820

    # Writes configuration files and key material for all protocols.
    self._initialize_servers()
def _get_server_ip(self):
    """Return the address clients should use to reach this server.

    ``curl -s ifconfig.me`` is tried first to learn the public address. If
    that does not produce a valid IP address, the first address printed by
    ``hostname -I`` is used instead. ``127.0.0.1`` is the last resort.

    Returns:
        str: An IP address in textual form.
    """
    import ipaddress  # local import: only needed to validate candidates

    lookups = (
        ['curl', '-s', 'ifconfig.me'],  # public address
        ['hostname', '-I'],             # fallback: local addresses
    )
    for command in lookups:
        try:
            result = subprocess.run(command, capture_output=True,
                                    text=True, timeout=10)
        except (OSError, subprocess.SubprocessError) as exc:
            # Missing binary or timeout: fall through to the next lookup.
            logger.debug("IP lookup via %s failed: %s", command[0], exc)
            continue
        if result.returncode != 0:
            continue

        fields = result.stdout.split()
        if not fields:
            continue
        candidate = fields[0]
        try:
            # The value ends up in client configs, so refuse anything that is
            # not an address (e.g. an HTML error page returned by the service).
            ipaddress.ip_address(candidate)
        except ValueError:
            logger.debug("IP lookup via %s returned a non-address", command[0])
            continue
        return candidate

    return '127.0.0.1'
| def _initialize_servers(self): | |
| """Initialize VPN server configurations""" | |
| try: | |
| logger.info("Initializing VPN servers") | |
| # Initialize OpenVPN | |
| self._setup_openvpn_server() | |
| # Initialize IKEv2/IPSec | |
| self._setup_ipsec_server() | |
| # Initialize WireGuard | |
| self._setup_wireguard_server() | |
| logger.info("VPN servers initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize VPN servers: {e}") | |
| raise | |
def _setup_openvpn_server(self):
    """Write ``server.conf`` for OpenVPN and make sure its key material exists.

    The configuration is written to ``<openvpn_dir>/server.conf``. The
    client-config directory and ``/var/log/openvpn`` are created, and the
    server certificate, DH parameters and TLS-auth key are generated when
    missing.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        logger.info("Setting up OpenVPN server")

        # Paths are derived from the configured directory so that a
        # non-default ``config_dir`` yields a config pointing at the files
        # this class actually creates (identical output for the default).
        conf_dir = self.openvpn_dir
        ccd_dir = os.path.join(conf_dir, 'ccd')
        cert_path = os.path.join(conf_dir, 'server.crt')
        key_path = os.path.join(conf_dir, 'server.key')
        dh_path = os.path.join(conf_dir, 'dh.pem')
        ta_path = os.path.join(conf_dir, 'ta.key')

        # ``comp-lzo`` is intentionally not emitted: the client profiles
        # generated by this class do not enable compression, and the option
        # has to match on both ends (it is also deprecated upstream).
        server_conf = f"""
# OpenVPN Server Configuration
port {self.openvpn_port}
proto udp
dev tun
# Certificates and keys
ca /etc/vpn-ca/ca.crt
cert {cert_path}
key {key_path}
dh {dh_path}
# Network configuration
server 10.8.0.0 255.255.255.0
ifconfig-pool-persist /var/log/openvpn/ipp.txt
# Client configuration
client-config-dir {ccd_dir}
client-to-client
# Security
tls-auth {ta_path} 0
cipher AES-256-GCM
auth SHA256
tls-version-min 1.2
# Networking
push "redirect-gateway def1 bypass-dhcp"
push "dhcp-option DNS 8.8.8.8"
push "dhcp-option DNS 8.8.4.4"
# Connection settings
keepalive 10 120
persist-key
persist-tun
# Logging
status /var/log/openvpn/openvpn-status.log
log-append /var/log/openvpn/openvpn.log
verb 3
mute 20
# Management interface
management 127.0.0.1 7505
# User/group
user nobody
group nogroup
"""
        server_conf_path = os.path.join(conf_dir, 'server.conf')
        with open(server_conf_path, 'w') as f:
            f.write(server_conf)

        # Per-client overrides live here (see ``client-config-dir`` above).
        os.makedirs(ccd_dir, mode=0o755, exist_ok=True)

        # Status file, log file and ipp.txt are all written below this path.
        os.makedirs('/var/log/openvpn', mode=0o755, exist_ok=True)

        # Generate server certificates if they don't exist
        self._generate_openvpn_certificates()
        logger.info("OpenVPN server configuration created")
    except Exception as e:
        logger.error(f"Failed to setup OpenVPN server: {e}")
        raise
def _generate_openvpn_certificates(self):
    """Create the OpenVPN server certificate, DH parameters and TLS-auth key.

    Each artefact is only generated when it is missing from ``openvpn_dir``.
    The certificate and key are treated as a pair: if either file is absent
    both are re-issued.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        server_cert_path = os.path.join(self.openvpn_dir, 'server.crt')
        server_key_path = os.path.join(self.openvpn_dir, 'server.key')
        dh_path = os.path.join(self.openvpn_dir, 'dh.pem')
        ta_key_path = os.path.join(self.openvpn_dir, 'ta.key')

        # Generate server certificate using our CA
        from services.certificate_authority import CertificateAuthority
        ca = CertificateAuthority()

        if not (os.path.exists(server_cert_path)
                and os.path.exists(server_key_path)):
            logger.info("Generating OpenVPN server certificate")
            # NOTE(review): this issues the server certificate through the
            # *client* certificate helper. Generated client profiles use
            # ``remote-cert-tls server``; confirm the CA sets the server
            # extended-key-usage on this certificate.
            cert_data = ca.generate_client_certificate(
                username='openvpn-server',
                email='server@vpn.local',
                validity_days=3650
            )
            with open(server_cert_path, 'wb') as f:
                f.write(cert_data['certificate'])

            # Create the key file with owner-only permissions from the start
            # so the private key is never readable by other users.
            key_fd = os.open(server_key_path,
                             os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(key_fd, 'wb') as f:
                f.write(cert_data['private_key'])
            # The mode passed to os.open is ignored for a pre-existing file.
            os.chmod(server_key_path, 0o600)

        # Generate DH parameters
        if not os.path.exists(dh_path):
            logger.info("Generating DH parameters (this may take a while)")
            subprocess.run(['openssl', 'dhparam', '-out', dh_path, '2048'],
                           check=True)

        # Generate TLS auth key
        if not os.path.exists(ta_key_path):
            logger.info("Generating TLS auth key")
            subprocess.run(['openvpn', '--genkey', '--secret', ta_key_path],
                           check=True)
            os.chmod(ta_key_path, 0o600)
    except Exception as e:
        logger.error(f"Failed to generate OpenVPN certificates: {e}")
        raise
def _setup_ipsec_server(self):
    """Write the strongSwan ``ipsec.conf`` / ``ipsec.secrets`` files.

    Both files are written into ``ipsec_dir``; afterwards the IKEv2 server
    certificate is generated if it does not exist yet.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        logger.info("Setting up IKEv2/IPSec server")

        # Section parameters are indented: ipsec.conf only treats lines that
        # begin with whitespace as belonging to the preceding section header.
        #
        # NOTE(review): the proposal lists still offer 3DES, SHA-1 and
        # modp1024 for legacy clients; consider dropping them.
        # NOTE(review): the peer is authenticated with EAP-MSCHAPv2 here,
        # while the generated client instructions describe certificate
        # authentication. Confirm which of the two is intended.
        ipsec_conf = """
# strongSwan IPSec configuration
config setup
    charondebug="ike 1, knl 1, cfg 0"
    uniqueids=no

conn ikev2-vpn
    auto=add
    compress=no
    type=tunnel
    keyexchange=ikev2
    fragmentation=yes
    forceencaps=yes
    # Server side
    left=%any
    leftid=@vpn.server.local
    leftcert=server.crt
    leftsendcert=always
    leftsubnet=0.0.0.0/0
    # Client side
    right=%any
    rightid=%any
    rightsourceip=10.10.10.0/24
    rightdns=8.8.8.8,8.8.4.4
    # Security
    ike=chacha20poly1305-sha256-curve25519-prfsha256,aes256gcm16-sha384-prfsha384-ecp384,aes256-sha1-modp1024,aes128-sha1-modp1024,3des-sha1-modp1024!
    esp=chacha20poly1305-sha256,aes256gcm16-ecp384,aes256-sha256,aes256-sha1,3des-sha1!
    # Authentication
    leftauth=pubkey
    rightauth=eap-mschapv2
    rightsendcert=never
    eap_identity=%identity
    # Other settings
    dpdaction=clear
    dpddelay=300s
    rekey=no
"""
        ipsec_conf_path = os.path.join(self.ipsec_dir, 'ipsec.conf')
        with open(ipsec_conf_path, 'w') as f:
            f.write(ipsec_conf)

        # strongSwan secrets configuration
        ipsec_secrets = """
# strongSwan IPSec secrets
: RSA "server.key"
"""
        ipsec_secrets_path = os.path.join(self.ipsec_dir, 'ipsec.secrets')
        # Create the secrets file owner-only from the start instead of
        # tightening the permissions after the content has been written.
        secrets_fd = os.open(ipsec_secrets_path,
                             os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(secrets_fd, 'w') as f:
            f.write(ipsec_secrets)
        # The mode passed to os.open is ignored for a pre-existing file.
        os.chmod(ipsec_secrets_path, 0o600)

        # Generate IPSec certificates
        self._generate_ipsec_certificates()
        logger.info("IKEv2/IPSec server configuration created")
    except Exception as e:
        logger.error(f"Failed to setup IKEv2/IPSec server: {e}")
        raise
def _generate_ipsec_certificates(self):
    """Provide the CA certificate and server key pair for IKEv2/IPSec.

    The CA certificate is copied from ``/etc/vpn-ca/ca.crt`` into
    ``ipsec_dir`` when missing. The server certificate and key are treated
    as a pair: if either file is absent both are re-issued.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        server_cert_path = os.path.join(self.ipsec_dir, 'server.crt')
        server_key_path = os.path.join(self.ipsec_dir, 'server.key')
        ca_cert_path = os.path.join(self.ipsec_dir, 'ca.crt')

        # Copy CA certificate
        if not os.path.exists(ca_cert_path):
            shutil.copy('/etc/vpn-ca/ca.crt', ca_cert_path)

        # Generate server certificate using our CA
        if not (os.path.exists(server_cert_path)
                and os.path.exists(server_key_path)):
            logger.info("Generating IKEv2 server certificate")
            from services.certificate_authority import CertificateAuthority
            ca = CertificateAuthority()
            # NOTE(review): issued through the *client* certificate helper;
            # confirm the result is acceptable as an IKEv2 server identity.
            cert_data = ca.generate_client_certificate(
                username='ipsec-server',
                email='ipsec@vpn.local',
                validity_days=3650
            )
            with open(server_cert_path, 'wb') as f:
                f.write(cert_data['certificate'])

            # Create the key file with owner-only permissions from the start
            # so the private key is never readable by other users.
            key_fd = os.open(server_key_path,
                             os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(key_fd, 'wb') as f:
                f.write(cert_data['private_key'])
            # The mode passed to os.open is ignored for a pre-existing file.
            os.chmod(server_key_path, 0o600)
    except Exception as e:
        logger.error(f"Failed to generate IKEv2 certificates: {e}")
        raise
| def _setup_wireguard_server(self): | |
| """Set up WireGuard server configuration""" | |
| try: | |
| logger.info("Setting up WireGuard server") | |
| # Generate server keys if they don't exist | |
| server_private_key_path = os.path.join(self.wireguard_dir, 'server_private.key') | |
| server_public_key_path = os.path.join(self.wireguard_dir, 'server_public.key') | |
| if not os.path.exists(server_private_key_path): | |
| # Generate private key | |
| result = subprocess.run(['wg', 'genkey'], capture_output=True, text=True) | |
| if result.returncode == 0: | |
| private_key = result.stdout.strip() | |
| with open(server_private_key_path, 'w') as f: | |
| f.write(private_key) | |
| os.chmod(server_private_key_path, 0o600) | |
| # Generate public key | |
| result = subprocess.run(['wg', 'pubkey'], | |
| input=private_key, | |
| capture_output=True, text=True) | |
| if result.returncode == 0: | |
| public_key = result.stdout.strip() | |
| with open(server_public_key_path, 'w') as f: | |
| f.write(public_key) | |
| # Read keys | |
| with open(server_private_key_path, 'r') as f: | |
| server_private_key = f.read().strip() | |
| with open(server_public_key_path, 'r') as f: | |
| server_public_key = f.read().strip() | |
| # WireGuard server configuration | |
| wg_conf = f""" | |
| [Interface] | |
| PrivateKey = {server_private_key} | |
| Address = 10.13.13.1/24 | |
| ListenPort = {self.wireguard_port} | |
| PostUp = iptables -A FORWARD -i %i -j ACCEPT; iptables -A FORWARD -o %i -j ACCEPT; iptables -t nat -A POSTROUTING -o eth0 -j MASQUERADE | |
| PostDown = iptables -D FORWARD -i %i -j ACCEPT; iptables -D FORWARD -o %i -j ACCEPT; iptables -t nat -D POSTROUTING -o eth0 -j MASQUERADE | |
| # Client configurations will be added here | |
| """ | |
| wg_conf_path = os.path.join(self.wireguard_dir, 'wg0.conf') | |
| with open(wg_conf_path, 'w') as f: | |
| f.write(wg_conf) | |
| logger.info("WireGuard server configuration created") | |
| except Exception as e: | |
| logger.error(f"Failed to setup WireGuard server: {e}") | |
| # WireGuard might not be available, continue without it | |
| logger.warning("WireGuard setup failed, continuing without WireGuard support") | |
def start_openvpn_server(self):
    """Enable IPv4 forwarding and launch OpenVPN as a daemon.

    Returns:
        bool: ``True`` when ``openvpn`` exited with status 0, ``False`` on a
        non-zero exit status or when any step raised.
    """
    try:
        logger.info("Starting OpenVPN server")

        # Routing between tunnel and uplink needs kernel forwarding.
        subprocess.run(['sudo', 'sysctl', '-w', 'net.ipv4.ip_forward=1'], check=True)

        launch = subprocess.run(
            ['sudo', 'openvpn', '--config',
             os.path.join(self.openvpn_dir, 'server.conf'), '--daemon'],
            capture_output=True, text=True)
        if launch.returncode != 0:
            logger.error(f"Failed to start OpenVPN server: {launch.stderr}")
            return False

        logger.info("OpenVPN server started successfully")
        return True
    except Exception as e:
        logger.error(f"Error starting OpenVPN server: {e}")
        return False
def start_ipsec_server(self):
    """Enable IPv4 forwarding and start the ``strongswan-starter`` service.

    Returns:
        bool: ``True`` when ``systemctl start`` exited with status 0,
        ``False`` on a non-zero exit status or when any step raised.
    """
    try:
        logger.info("Starting IKEv2/IPSec server")

        # Routing between tunnel and uplink needs kernel forwarding.
        subprocess.run(['sudo', 'sysctl', '-w', 'net.ipv4.ip_forward=1'], check=True)

        launch = subprocess.run(
            ['sudo', 'systemctl', 'start', 'strongswan-starter'],
            capture_output=True, text=True)
        if launch.returncode != 0:
            logger.error(f"Failed to start IKEv2/IPSec server: {launch.stderr}")
            return False

        logger.info("IKEv2/IPSec server started successfully")
        return True
    except Exception as e:
        logger.error(f"Error starting IKEv2/IPSec server: {e}")
        return False
def start_wireguard_server(self):
    """Enable IPv4 forwarding and bring the WireGuard interface up.

    Returns:
        bool: ``True`` when ``wg-quick up`` exited with status 0, ``False``
        on a non-zero exit status or when any step raised.
    """
    try:
        logger.info("Starting WireGuard server")

        # Routing between tunnel and uplink needs kernel forwarding.
        subprocess.run(['sudo', 'sysctl', '-w', 'net.ipv4.ip_forward=1'], check=True)

        launch = subprocess.run(
            ['sudo', 'wg-quick', 'up',
             os.path.join(self.wireguard_dir, 'wg0.conf')],
            capture_output=True, text=True)
        if launch.returncode != 0:
            logger.error(f"Failed to start WireGuard server: {launch.stderr}")
            return False

        logger.info("WireGuard server started successfully")
        return True
    except Exception as e:
        logger.error(f"Error starting WireGuard server: {e}")
        return False
def stop_openvpn_server(self):
    """Kill the OpenVPN process that was started from ``server.conf``.

    The exit status of ``pkill`` is not inspected.

    Returns:
        bool: ``True`` unless running the command raised.
    """
    kill_command = ['sudo', 'pkill', '-f', 'openvpn.*server.conf']
    try:
        subprocess.run(kill_command, check=False)
        logger.info("OpenVPN server stopped")
        outcome = True
    except Exception as e:
        logger.error(f"Error stopping OpenVPN server: {e}")
        outcome = False
    return outcome
def stop_ipsec_server(self):
    """Stop the ``strongswan-starter`` service.

    The exit status of ``systemctl`` is not inspected.

    Returns:
        bool: ``True`` unless running the command raised.
    """
    stop_command = ['sudo', 'systemctl', 'stop', 'strongswan-starter']
    try:
        subprocess.run(stop_command, check=False)
        logger.info("IKEv2/IPSec server stopped")
        outcome = True
    except Exception as e:
        logger.error(f"Error stopping IKEv2/IPSec server: {e}")
        outcome = False
    return outcome
def stop_wireguard_server(self):
    """Take the WireGuard interface described by ``wg0.conf`` down.

    The exit status of ``wg-quick`` is not inspected.

    Returns:
        bool: ``True`` unless running the command raised.
    """
    try:
        subprocess.run(
            ['sudo', 'wg-quick', 'down',
             os.path.join(self.wireguard_dir, 'wg0.conf')],
            check=False)
        logger.info("WireGuard server stopped")
        outcome = True
    except Exception as e:
        logger.error(f"Error stopping WireGuard server: {e}")
        outcome = False
    return outcome
def get_server_status(self):
    """Collect the status dictionaries of all three VPN backends.

    Returns:
        dict: Keys ``'openvpn'``, ``'ipsec'`` and ``'wireguard'``, each
        mapped to the result of the matching ``_check_*_status`` method.
    """
    status = {}
    status['openvpn'] = self._check_openvpn_status()
    status['ipsec'] = self._check_ipsec_status()
    status['wireguard'] = self._check_wireguard_status()
    return status
| def _check_openvpn_status(self): | |
| """Check OpenVPN server status""" | |
| try: | |
| result = subprocess.run(['pgrep', '-f', 'openvpn.*server.conf'], | |
| capture_output=True, text=True) | |
| if result.returncode == 0: | |
| # Get connected clients | |
| try: | |
| with open('/var/log/openvpn/openvpn-status.log', 'r') as f: | |
| content = f.read() | |
| # Parse client connections | |
| clients = [] | |
| in_client_section = False | |
| for line in content.split('\n'): | |
| if line.startswith('Common Name,Real Address'): | |
| in_client_section = True | |
| continue | |
| elif line.startswith('ROUTING TABLE'): | |
| in_client_section = False | |
| break | |
| elif in_client_section and line.strip(): | |
| parts = line.split(',') | |
| if len(parts) >= 4: | |
| clients.append({ | |
| 'common_name': parts[0], | |
| 'real_address': parts[1], | |
| 'bytes_received': parts[2], | |
| 'bytes_sent': parts[3], | |
| 'connected_since': parts[4] if len(parts) > 4 else 'Unknown' | |
| }) | |
| return { | |
| 'running': True, | |
| 'port': self.openvpn_port, | |
| 'clients': clients, | |
| 'client_count': len(clients) | |
| } | |
| except: | |
| return { | |
| 'running': True, | |
| 'port': self.openvpn_port, | |
| 'clients': [], | |
| 'client_count': 0 | |
| } | |
| else: | |
| return { | |
| 'running': False, | |
| 'port': self.openvpn_port, | |
| 'clients': [], | |
| 'client_count': 0 | |
| } | |
| except Exception as e: | |
| logger.error(f"Error checking OpenVPN status: {e}") | |
| return { | |
| 'running': False, | |
| 'port': self.openvpn_port, | |
| 'clients': [], | |
| 'client_count': 0, | |
| 'error': str(e) | |
| } | |
def _check_ipsec_status(self):
    """Report whether strongSwan is active and list established SAs.

    The service state comes from ``systemctl is-active strongswan-starter``.
    When active, every ``swanctl --list-sas`` output line that contains
    ``ESTABLISHED`` is reported as one client entry.

    Returns:
        dict: ``running``, ``port``, ``clients`` and ``client_count``; an
        ``error`` entry is added when the service check itself failed.
    """
    try:
        result = subprocess.run(
            ['sudo', 'systemctl', 'is-active', 'strongswan-starter'],
            capture_output=True, text=True)
        running = result.stdout.strip() == 'active'

        clients = []
        if running:
            try:
                sas = subprocess.run(['sudo', 'swanctl', '--list-sas'],
                                     capture_output=True, text=True)
            except (OSError, subprocess.SubprocessError) as e:
                # Listing clients is best effort; the service state above is
                # still valid, so report it with an empty client list.
                logger.warning("Could not list IKEv2 security associations: %s", e)
            else:
                if sas.returncode == 0:
                    # This is a simplified parser: one entry per matching line.
                    clients = [
                        {'connection': line.strip(), 'status': 'ESTABLISHED'}
                        for line in sas.stdout.splitlines()
                        if 'ESTABLISHED' in line
                    ]

        return {
            'running': running,
            'port': self.ipsec_port,
            'clients': clients,
            'client_count': len(clients)
        }
    except Exception as e:
        logger.error(f"Error checking IKEv2/IPSec status: {e}")
        return {
            'running': False,
            'port': self.ipsec_port,
            'clients': [],
            'client_count': 0,
            'error': str(e)
        }
def _check_wireguard_status(self):
    """Report whether the ``wg0`` interface is up and list its peers.

    The output of ``sudo wg show`` is parsed: each ``peer:`` line starts a
    new entry, and the following ``endpoint:``, ``allowed ips:`` and
    ``latest handshake:`` lines are attached to it.

    Returns:
        dict: ``running``, ``port``, ``clients`` and ``client_count``; an
        ``error`` entry is added when the check raised.
    """
    try:
        shown = subprocess.run(['sudo', 'wg', 'show'],
                               capture_output=True, text=True)
        if shown.returncode != 0 or 'wg0' not in shown.stdout:
            return {
                'running': False,
                'port': self.wireguard_port,
                'clients': [],
                'client_count': 0
            }

        # Output label -> key used in the per-peer dictionary.
        detail_keys = {
            'endpoint': 'endpoint',
            'allowed ips': 'allowed_ips',
            'latest handshake': 'latest_handshake',
        }
        peers = []
        peer = None
        for raw_line in shown.stdout.split('\n'):
            # Only the first colon separates label and value; endpoints and
            # IPv6 addresses contain further colons.
            label, colon, value = raw_line.strip().partition(':')
            if not colon:
                continue
            if label == 'peer':
                if peer:
                    peers.append(peer)
                peer = {'public_key': value.strip()}
            elif peer and label in detail_keys:
                peer[detail_keys[label]] = value.strip()
        if peer:
            peers.append(peer)

        return {
            'running': True,
            'port': self.wireguard_port,
            'clients': peers,
            'client_count': len(peers)
        }
    except Exception as e:
        logger.error(f"Error checking WireGuard status: {e}")
        return {
            'running': False,
            'port': self.wireguard_port,
            'clients': [],
            'client_count': 0,
            'error': str(e)
        }
def generate_client_config(self, username: str, protocol: str, user_data: dict) -> dict:
    """Build a client configuration for *username* using *protocol*.

    Args:
        username: Name of the user the configuration is issued for.
        protocol: ``'openvpn'``, ``'ikev2'``, ``'ipsec'`` or ``'wireguard'``
            (matched case-insensitively).
        user_data: Extra user attributes passed on to the protocol builder.

    Returns:
        dict: Whatever the protocol-specific builder returns.

    Raises:
        ValueError: If *protocol* is not one of the supported names.
        Exception: Any builder failure, after it has been logged.
    """
    builder_names = {
        'openvpn': '_generate_openvpn_client_config',
        'ikev2': '_generate_ipsec_client_config',
        'ipsec': '_generate_ipsec_client_config',
        'wireguard': '_generate_wireguard_client_config',
    }
    try:
        builder_name = builder_names.get(protocol.lower())
        if builder_name is None:
            raise ValueError(f"Unsupported protocol: {protocol}")
        return getattr(self, builder_name)(username, user_data)
    except Exception as e:
        logger.error(f"Failed to generate client config for {username}: {e}")
        raise
def _generate_openvpn_client_config(self, username: str, user_data: dict) -> dict:
    """Issue a client certificate and assemble an inline ``.ovpn`` profile.

    Args:
        username: Name used for the certificate and the file name.
        user_data: May contain ``'email'``; ``<username>@vpn.local`` is used
            when it does not.

    Returns:
        dict: ``config`` (profile text), ``filename``, ``protocol``,
        ``server_ip`` and ``server_port``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        from services.certificate_authority import CertificateAuthority

        issued = CertificateAuthority().generate_client_certificate(
            username=username,
            email=user_data.get('email', f'{username}@vpn.local')
        )

        # CA certificate and TLS-auth key are embedded in the profile.
        with open('/etc/vpn-ca/ca.crt', 'r') as ca_file:
            ca_cert = ca_file.read()
        with open(os.path.join(self.openvpn_dir, 'ta.key'), 'r') as ta_file:
            ta_key = ta_file.read()

        client_cert = issued['certificate'].decode('utf-8')
        client_key = issued['private_key'].decode('utf-8')

        client_config = f"""
# OpenVPN Client Configuration for {username}
client
dev tun
proto udp
remote {self.server_ip} {self.openvpn_port}
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
auth SHA256
tls-version-min 1.2
key-direction 1
verb 3
<ca>
{ca_cert}
</ca>
<cert>
{client_cert}
</cert>
<key>
{client_key}
</key>
<tls-auth>
{ta_key}
</tls-auth>
"""
        profile = {'config': client_config}
        profile['filename'] = f'{username}-openvpn.ovpn'
        profile['protocol'] = 'OpenVPN'
        profile['server_ip'] = self.server_ip
        profile['server_port'] = self.openvpn_port
        return profile
    except Exception as e:
        logger.error(f"Failed to generate OpenVPN client config: {e}")
        raise
def _generate_ipsec_client_config(self, username: str, user_data: dict) -> dict:
    """Issue a client certificate and per-platform IKEv2 setup instructions.

    Args:
        username: Name used for the certificate, local ID and file name.
        user_data: May contain ``'email'``; ``<username>@vpn.local`` is used
            when it does not.

    Returns:
        dict: ``config`` (instruction text), ``ca_certificate``,
        ``client_certificate``, ``client_private_key``, ``filename``,
        ``protocol``, ``server_ip`` and ``server_port``.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    try:
        # Read CA certificate
        with open('/etc/vpn-ca/ca.crt', 'r') as f:
            ca_cert = f.read()

        # Generate client certificate
        from services.certificate_authority import CertificateAuthority
        ca = CertificateAuthority()
        cert_data = ca.generate_client_certificate(
            username=username,
            email=user_data.get('email', f'{username}@vpn.local')
        )

        # For mobile clients, we'll provide instructions and certificates.
        # The Android step names the certificate-based ("RSA") profile type:
        # the pre-shared-key type does not use the certificates issued here.
        config_instructions = f"""
IKEv2/IPSec VPN Configuration for {username}
Server Details:
- Server Address: {self.server_ip}
- Remote ID: vpn.server.local
- Local ID: {username}@vpn.local
Authentication:
- Type: Certificate
- Install the CA certificate and client certificate on your device
For iOS:
1. Install the CA certificate first
2. Install the client certificate
3. Go to Settings > General > VPN & Device Management
4. Add VPN Configuration > IKEv2
5. Enter server details above
For Android:
1. Install the CA certificate in Settings > Security > Encryption & credentials
2. Install the client certificate
3. Go to Settings > Network & internet > VPN
4. Add VPN profile with IKEv2/IPSec RSA
For Windows:
1. Install certificates in Certificate Manager
2. Add VPN connection with IKEv2 protocol
3. Use certificate authentication
For macOS:
1. Install certificates in Keychain Access
2. Add VPN configuration in Network preferences
3. Select IKEv2 and certificate authentication
"""
        return {
            'config': config_instructions,
            'ca_certificate': ca_cert,
            'client_certificate': cert_data['certificate'].decode('utf-8'),
            'client_private_key': cert_data['private_key'].decode('utf-8'),
            'filename': f'{username}-ikev2-config.txt',
            'protocol': 'IKEv2/IPSec',
            'server_ip': self.server_ip,
            'server_port': self.ipsec_port
        }
    except Exception as e:
        logger.error(f"Failed to generate IKEv2 client config: {e}")
        raise
def _generate_wireguard_client_config(self, username: str, user_data: dict) -> dict:
    """Create a WireGuard key pair and client profile, and register the peer.

    Args:
        username: Name used for the peer comment and the file name.
        user_data: Accepted for symmetry with the other builders; not used.

    Returns:
        dict: ``config`` (profile text), ``filename``, ``protocol``,
        ``server_ip``, ``server_port`` and ``client_public_key``.

    Raises:
        Exception: Any failure (key generation, exhausted address pool,
            unreadable server key) is logged and re-raised.
    """
    try:
        # Generate client keys
        result = subprocess.run(['wg', 'genkey'], capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError("Failed to generate WireGuard private key")
        client_private_key = result.stdout.strip()

        # The private key goes in via stdin so it never shows up in argv.
        result = subprocess.run(['wg', 'pubkey'],
                                input=client_private_key,
                                capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError("Failed to generate WireGuard public key")
        client_public_key = result.stdout.strip()

        # Read server public key
        with open(os.path.join(self.wireguard_dir, 'server_public.key'), 'r') as f:
            server_public_key = f.read().strip()

        client_ip = self._allocate_wireguard_ip(username)

        # Generate client configuration
        client_config = f"""
[Interface]
PrivateKey = {client_private_key}
Address = {client_ip}/32
DNS = 8.8.8.8
[Peer]
PublicKey = {server_public_key}
Endpoint = {self.server_ip}:{self.wireguard_port}
AllowedIPs = 0.0.0.0/0
PersistentKeepalive = 25
"""
        # Add client to server configuration
        self._add_wireguard_peer(username, client_public_key, client_ip)

        return {
            'config': client_config,
            'filename': f'{username}-wireguard.conf',
            'protocol': 'WireGuard',
            'server_ip': self.server_ip,
            'server_port': self.wireguard_port,
            'client_public_key': client_public_key
        }
    except Exception as e:
        logger.error(f"Failed to generate WireGuard client config: {e}")
        raise


def _allocate_wireguard_ip(self, username: str) -> str:
    """Pick a free address in 10.13.13.10-209 for *username*.

    The starting point is derived from a SHA-256 digest of the name, so it
    is the same in every process (the built-in ``hash()`` of a string is
    randomised per interpreter start). Addresses already listed as
    ``AllowedIPs`` in ``wg0.conf`` are skipped.

    Raises:
        RuntimeError: If all 200 addresses are already assigned.
    """
    import hashlib  # local imports: only needed for address allocation
    import re

    used_hosts = set()
    try:
        with open(os.path.join(self.wireguard_dir, 'wg0.conf'), 'r') as f:
            for match in re.finditer(
                    r'AllowedIPs\s*=\s*10\.13\.13\.(\d+)/32', f.read()):
                used_hosts.add(int(match.group(1)))
    except OSError:
        # No server configuration yet means no address is taken.
        pass

    digest = hashlib.sha256(username.encode('utf-8')).digest()
    start = int.from_bytes(digest[:4], 'big') % 200
    for offset in range(200):
        host = (start + offset) % 200 + 10
        if host not in used_hosts:
            return f"10.13.13.{host}"
    raise RuntimeError("WireGuard address pool 10.13.13.10-209 is exhausted")
| def _add_wireguard_peer(self, username: str, client_public_key: str, client_ip: str): | |
| """Add WireGuard peer to server configuration""" | |
| try: | |
| wg_conf_path = os.path.join(self.wireguard_dir, 'wg0.conf') | |
| peer_config = f""" | |
| # Client: {username} | |
| [Peer] | |
| PublicKey = {client_public_key} | |
| AllowedIPs = {client_ip}/32 | |
| """ | |
| with open(wg_conf_path, 'a') as f: | |
| f.write(peer_config) | |
| # If WireGuard is running, add peer dynamically | |
| try: | |
| subprocess.run(['sudo', 'wg', 'set', 'wg0', 'peer', client_public_key, | |
| 'allowed-ips', f'{client_ip}/32'], check=True) | |
| except: | |
| # Server might not be running, that's okay | |
| pass | |
| except Exception as e: | |
| logger.error(f"Failed to add WireGuard peer: {e}") | |
| raise | |
def revoke_client_access(self, username: str, protocol: str):
    """Revoke *username*'s access for the given *protocol*.

    Args:
        username: Name of the user whose access is withdrawn.
        protocol: ``'openvpn'``, ``'ikev2'``, ``'ipsec'`` or ``'wireguard'``
            (matched case-insensitively).

    Returns:
        Whatever the protocol-specific revocation method returns.

    Raises:
        ValueError: If *protocol* is not one of the supported names.
        Exception: Any revocation failure, after it has been logged.
    """
    handler_names = {
        'openvpn': '_revoke_openvpn_client',
        'ikev2': '_revoke_ipsec_client',
        'ipsec': '_revoke_ipsec_client',
        'wireguard': '_revoke_wireguard_client',
    }
    try:
        handler_name = handler_names.get(protocol.lower())
        if handler_name is None:
            raise ValueError(f"Unsupported protocol: {protocol}")
        return getattr(self, handler_name)(username)
    except Exception as e:
        logger.error(f"Failed to revoke client access for {username}: {e}")
        raise
def _revoke_openvpn_client(self, username: str):
    """Revoke every certificate whose subject names *username*.

    The name has to appear in the subject as a whole token, so revoking
    ``bob`` does not touch ``bobby`` or ``openvpn-server``. All matching
    certificates are revoked, because a new one is issued each time a client
    configuration is generated.

    Args:
        username: Name the certificates were issued for.

    Returns:
        bool: ``True`` if at least one certificate was revoked, ``False``
        if none matched or *username* is empty.

    Raises:
        Exception: Any CA failure is logged and re-raised.
    """
    try:
        import re  # local import: only needed for the subject match

        if not username:
            # An empty name would match every subject.
            logger.warning("No username given for certificate revocation")
            return False

        from services.certificate_authority import CertificateAuthority
        ca = CertificateAuthority()

        # NOTE(review): assumes 'subject' is a text form such as
        # "CN=<username>,..." — confirm against CertificateAuthority.
        whole_name = re.compile(
            r'(?<![\w.-])' + re.escape(username) + r'(?![\w.-])')

        revoked = 0
        for cert in ca.list_certificates():
            if whole_name.search(cert.get('subject', '')):
                ca.revoke_certificate(cert['serial_number'])
                revoked += 1

        if revoked:
            logger.info(f"Revoked OpenVPN certificate for {username}")
            return True
        logger.warning(f"No OpenVPN certificate found for {username}")
        return False
    except Exception as e:
        logger.error(f"Failed to revoke OpenVPN client: {e}")
        raise
| def _revoke_ipsec_client(self, username: str): | |
| """Revoke IKEv2/IPSec client certificate""" | |
| try: | |
| # Same as OpenVPN since we use the same CA | |
| return self._revoke_openvpn_client(username) | |
| except Exception as e: | |
| logger.error(f"Failed to revoke IKEv2 client: {e}") | |
| raise | |
| def _revoke_wireguard_client(self, username: str): | |
| """Remove WireGuard client from server configuration""" | |
| try: | |
| wg_conf_path = os.path.join(self.wireguard_dir, 'wg0.conf') | |
| # Read current configuration | |
| with open(wg_conf_path, 'r') as f: | |
| lines = f.readlines() | |
| # Remove client section | |
| new_lines = [] | |
| skip_section = False | |
| for line in lines: | |
| if line.strip() == f'# Client: {username}': | |
| skip_section = True | |
| continue | |
| elif line.strip().startswith('# Client:') and skip_section: | |
| skip_section = False | |
| elif line.strip().startswith('[Peer]') and not skip_section: | |
| new_lines.append(line) | |
| elif not skip_section: | |
| new_lines.append(line) | |
| # Write updated configuration | |
| with open(wg_conf_path, 'w') as f: | |
| f.writelines(new_lines) | |
| logger.info(f"Removed WireGuard client {username}") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to revoke WireGuard client: {e}") | |
| raise | |
def get_server_info(self):
    """Describe the server address and the port of every protocol.

    Returns:
        dict: ``server_ip`` plus a ``protocols`` mapping whose ``openvpn``,
        ``ikev2`` and ``wireguard`` entries each hold ``port`` and
        ``protocol`` (always ``'UDP'``).
    """
    server_ip = self.server_ip
    listening_ports = (
        ('openvpn', self.openvpn_port),
        ('ikev2', self.ipsec_port),
        ('wireguard', self.wireguard_port),
    )
    protocols = {}
    for name, port in listening_ports:
        protocols[name] = {'port': port, 'protocol': 'UDP'}
    return {'server_ip': server_ip, 'protocols': protocols}