Spaces:
Runtime error
Runtime error
"""
Outline VPN Configuration Manager
"""
import hashlib
import json
import os
from dataclasses import dataclass
from typing import List, Optional, Dict
@dataclass
class OutlineConfig:
    """Server-wide settings, stored under the ``server`` key of the config file.

    Declared as a dataclass so that it can be rebuilt from the saved JSON
    mapping with ``OutlineConfig(**data)`` and so that every field shows up
    in the instance ``__dict__`` when the configuration is written back.
    """

    port: int = 443
    cipher: str = "chacha20-ietf-poly1305"
    timeout: int = 600
    workers: int = 4
    bind_address: str = "0.0.0.0"
    # Evaluated once, when the class is defined: every instance created in
    # the same process without an explicit salt shares this random value.
    access_key_salt: str = os.urandom(32).hex()
@dataclass
class UserConfig:
    """One user record, stored as an entry of the ``users`` list in the config file.

    Declared as a dataclass so records can be created with keyword arguments
    and round-tripped through JSON via ``UserConfig(**entry)`` and the
    instance ``__dict__``.
    """

    user_id: str
    access_key: str
    # Compared against ``bandwidth_usage``; None means no limit is enforced.
    data_limit: Optional[int] = None
    # Stored as given; nothing in this module interprets the value.
    expiry_date: Optional[str] = None
    is_active: bool = True
    # Stored as given; nothing in this module interprets the value.
    last_connection: Optional[str] = None
    # Running total of the amounts reported through the manager.
    bandwidth_usage: int = 0
class OutlineManager:
    """Manage the server settings and user list kept in a single JSON file.

    The file holds two top-level keys: ``server`` (the fields of an
    ``OutlineConfig``) and ``users`` (a list of ``UserConfig`` field
    mappings). Every mutating method rewrites the whole file.
    """

    def __init__(self, config_path: str = "config/outline_config.json"):
        """Bind the manager to ``config_path`` and load or create that file.

        Args:
            config_path: Location of the JSON configuration file.
        """
        self.config_path = config_path
        self.config = OutlineConfig()
        self.users: List[UserConfig] = []
        self._load_config()

    def _load_config(self):
        """Load configuration from file, or write the defaults if it is missing."""
        # dirname() is "" for a bare filename and makedirs("") raises
        # FileNotFoundError, so only create a directory when one is named.
        config_dir = os.path.dirname(self.config_path)
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)

        if not os.path.exists(self.config_path):
            self.save_config()
            return

        with open(self.config_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Missing sections fall back to the defaults / an empty user list.
        self.config = OutlineConfig(**data.get('server', {}))
        self.users = [UserConfig(**u) for u in data.get('users', [])]

    def save_config(self):
        """Save the current server settings and user list to the config file."""
        data = {
            'server': vars(self.config),
            'users': [vars(u) for u in self.users],
        }
        with open(self.config_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4)

    def add_user(self, user_id: str, data_limit: Optional[int] = None) -> UserConfig:
        """Add a new user, generate their access key and persist the change.

        Args:
            user_id: Identifier for the new user; must not already be present.
            data_limit: Optional usage limit stored on the user record.

        Returns:
            The newly created user record.

        Raises:
            ValueError: If a user with ``user_id`` already exists.
        """
        if any(u.user_id == user_id for u in self.users):
            raise ValueError(f"User {user_id} already exists")

        user = UserConfig(
            user_id=user_id,
            access_key=self._generate_access_key(user_id),
            data_limit=data_limit,
        )
        self.users.append(user)
        self.save_config()
        return user

    def remove_user(self, user_id: str) -> bool:
        """Remove a user by their ID.

        Returns:
            True if a user was removed (and the file rewritten), else False.
        """
        remaining = [u for u in self.users if u.user_id != user_id]
        removed = len(remaining) < len(self.users)
        self.users = remaining
        if removed:
            self.save_config()
        return removed

    def get_user_by_key(self, access_key: str) -> Optional[UserConfig]:
        """Return the active user holding ``access_key``, or None if there is none.

        Inactive users are never returned, even when the key matches.
        """
        for user in self.users:
            if user.is_active and user.access_key == access_key:
                return user
        return None

    def update_user_bandwidth(self, user_id: str, bytes_used: int):
        """Add ``bytes_used`` to a user's usage and persist the new total.

        The user is deactivated once usage reaches a non-zero ``data_limit``.
        Nothing happens if no user has the given ``user_id``.
        """
        for user in self.users:
            if user.user_id != user_id:
                continue
            user.bandwidth_usage += bytes_used
            # A data_limit of None or 0 is treated as "no limit".
            if user.data_limit and user.bandwidth_usage >= user.data_limit:
                user.is_active = False
            self.save_config()
            break

    def _generate_access_key(self, user_id: str) -> str:
        """Derive the access key for ``user_id`` from the configured salt.

        The key is the first 32 hex characters of SHA-256 over the user ID
        followed by the salt, so the same ID and salt always give the same key.
        """
        material = f"{user_id}{self.config.access_key_salt}".encode()
        return hashlib.sha256(material).hexdigest()[:32]

    def get_server_stats(self) -> Dict:
        """Return the user count, active-user count and summed bandwidth usage."""
        return {
            "total_users": len(self.users),
            "active_users": sum(1 for u in self.users if u.is_active),
            "total_bandwidth": sum(u.bandwidth_usage for u in self.users),
        }
def generate_openvpn_certificates(config_id: str) -> Dict[str, str]:
    """Placeholder for OpenVPN certificate generation.

    ``config_id`` is accepted but not used yet: every call returns the same
    fixed placeholder strings under the ``cert`` and ``key`` entries.
    """
    return {"cert": "placeholder_cert", "key": "placeholder_key"}
def generate_openvpn_config(
    config_id: str,
    server_ip: str,
    port: int = 1194,
    proto: str = "udp",
) -> str:
    """Placeholder for OpenVPN config generation.

    Args:
        config_id: Accepted but not used yet.
        server_ip: Address written into the ``remote`` directive.
        port: Port written into the ``remote`` directive.
        proto: Value written into the ``proto`` directive.

    Returns:
        The client directives joined by newlines, with no trailing newline.
    """
    directives = [
        "client",
        "dev tun",
        f"proto {proto}",
        f"remote {server_ip} {port}",
        "resolv-retry infinite",
        "nobind",
        "persist-key",
        "persist-tun",
        "remote-cert-tls server",
        "cipher AES-256-CBC",
        "verb 3",
    ]
    return "\n".join(directives)
def generate_wireguard_keys(config_id: str) -> Dict[str, str]:
    """Placeholder for WireGuard key generation.

    ``config_id`` is accepted but not used yet: every call returns the same
    fixed placeholder strings rather than real key material.
    """
    return {
        "server_public": "placeholder_server_public",
        "client_private": "placeholder_client_private",
        "client_public": "placeholder_client_public",
    }