"""
Outline VPN Configuration Manager
"""

import hashlib
import json
import os
from dataclasses import asdict, dataclass, field
from typing import Dict, List, Optional


def _new_access_key_salt() -> str:
    """Return a fresh random salt as a 64-character hex string."""
    return os.urandom(32).hex()


@dataclass
class OutlineConfig:
    """Server-level settings stored under the ``server`` key of the config file."""

    port: int = 443
    cipher: str = "chacha20-ietf-poly1305"
    timeout: int = 600
    workers: int = 4
    bind_address: str = "0.0.0.0"
    # A default_factory makes each default-constructed config draw its own
    # salt. A plain default would be evaluated once, at class-definition
    # time, and shared by every instance created in the process.
    access_key_salt: str = field(default_factory=_new_access_key_salt)


@dataclass
class UserConfig:
    """Per-user record stored in the ``users`` list of the config file."""

    user_id: str
    access_key: str
    data_limit: Optional[int] = None
    expiry_date: Optional[str] = None
    is_active: bool = True
    last_connection: Optional[str] = None
    bandwidth_usage: int = 0


class OutlineManager:
    """Manage the server configuration and user list backed by a JSON file."""

    def __init__(self, config_path: str = "config/outline_config.json"):
        """Load the configuration at ``config_path``, creating it if absent.

        Args:
            config_path: Path of the JSON configuration file.
        """
        self.config_path = config_path
        self.config = OutlineConfig()
        self.users: List[UserConfig] = []
        self._load_config()

    def _load_config(self):
        """Load configuration from file or create default"""
        # A bare filename has an empty dirname, and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        config_dir = os.path.dirname(self.config_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        if os.path.exists(self.config_path):
            with open(self.config_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.config = OutlineConfig(**data.get('server', {}))
            self.users = [UserConfig(**u) for u in data.get('users', [])]
        else:
            # First run: persist the defaults (including the generated salt).
            self.save_config()

    def save_config(self):
        """Save current configuration to file"""
        data = {
            'server': asdict(self.config),
            'users': [asdict(u) for u in self.users],
        }
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)

    def add_user(self, user_id: str, data_limit: Optional[int] = None) -> UserConfig:
        """Add a new user and generate their access key

        Args:
            user_id: Identifier of the new user; must not already be present.
            data_limit: Optional usage limit stored on the user record.

        Returns:
            The newly created ``UserConfig``.

        Raises:
            ValueError: If a user with ``user_id`` already exists.
        """
        if any(u.user_id == user_id for u in self.users):
            raise ValueError(f"User {user_id} already exists")

        user = UserConfig(
            user_id=user_id,
            access_key=self._generate_access_key(user_id),
            data_limit=data_limit,
        )
        self.users.append(user)
        self.save_config()
        return user

    def remove_user(self, user_id: str) -> bool:
        """Remove a user by their ID

        Returns:
            True if a user was removed (and the file rewritten), else False.
        """
        initial_length = len(self.users)
        self.users = [u for u in self.users if u.user_id != user_id]
        if len(self.users) < initial_length:
            self.save_config()
            return True
        return False

    def get_user_by_key(self, access_key: str) -> Optional[UserConfig]:
        """Find user by their access key

        Only active users are returned; a deactivated user's key yields None.
        """
        for user in self.users:
            if user.access_key == access_key and user.is_active:
                return user
        return None

    def update_user_bandwidth(self, user_id: str, bytes_used: int):
        """Update user's bandwidth usage

        Adds ``bytes_used`` to the user's running total, deactivates the user
        once the total reaches ``data_limit``, and saves the configuration.
        Does nothing if no user has the given ``user_id``.
        """
        for user in self.users:
            if user.user_id == user_id:
                user.bandwidth_usage += bytes_used
                # Truthiness test: a data_limit of None or 0 is not enforced.
                if user.data_limit and user.bandwidth_usage >= user.data_limit:
                    user.is_active = False
                self.save_config()
                break

    def _generate_access_key(self, user_id: str) -> str:
        """Generate a unique access key for a user

        The key is the first 32 hex characters of SHA-256 over the user ID
        concatenated with the configured salt, so it is deterministic for a
        given (user_id, salt) pair.
        """
        key = hashlib.sha256(
            f"{user_id}{self.config.access_key_salt}".encode()
        ).hexdigest()
        return key[:32]  # Return first 32 chars as access key

    def get_server_stats(self) -> Dict:
        """Get server statistics

        Returns:
            A dict with ``total_users``, ``active_users`` and
            ``total_bandwidth`` (sum of every user's ``bandwidth_usage``).
        """
        return {
            "total_users": len(self.users),
            "active_users": sum(1 for u in self.users if u.is_active),
            "total_bandwidth": sum(u.bandwidth_usage for u in self.users),
        }


def generate_openvpn_certificates(config_id: str) -> Dict:
    """Placeholder for OpenVPN certificate generation"""
    # config_id is currently unused; fixed placeholder values are returned.
    return {"cert": "placeholder_cert", "key": "placeholder_key"}


def generate_openvpn_config(config_id: str, server_ip: str) -> str:
    """Placeholder for OpenVPN config generation"""
    # config_id is currently unused; only server_ip is interpolated.
    return (
        "client\n"
        "dev tun\n"
        "proto udp\n"
        f"remote {server_ip} 1194\n"
        "resolv-retry infinite\n"
        "nobind\n"
        "persist-key\n"
        "persist-tun\n"
        "remote-cert-tls server\n"
        "cipher AES-256-CBC\n"
        "verb 3"
    )


def generate_wireguard_keys(config_id: str) -> Dict:
    """Placeholder for WireGuard key generation"""
    # config_id is currently unused; fixed placeholder values are returned.
    return {
        "server_public": "placeholder_server_public",
        "client_private": "placeholder_client_private",
        "client_public": "placeholder_client_public",
    }