Spaces:
Runtime error
Runtime error
| from flask import Flask, render_template, jsonify, request, redirect, url_for, flash, session | |
| from flask_login import LoginManager, login_user, logout_user, login_required, current_user | |
| from functools import wraps | |
| import asyncio | |
| import threading | |
| import os | |
| import json | |
| import uuid | |
| import bcrypt | |
| from datetime import datetime, timedelta | |
| import logging | |
| from typing import Dict, Optional | |
| import sys | |
| from pathlib import Path | |
| sys.path.append(str(Path(__file__).parent.parent)) | |
| # Import core VPN components | |
| from core.shadowsocks_protocol import ShadowsocksProtocol | |
| from core.tcp_forward import OutlineTCPForwardingEngine | |
| from core.session_tracker import SessionTracker, UnifiedSession, SessionType, SessionState | |
| from core.logger import LogManager, LogCategory, LogLevel | |
| from core.outline_server import OutlineServer | |
| from core.ikev2_server import IKEv2Server | |
| from core.models.user import User, UserSession, UserRole, UserStatus | |
| from core.database import SessionLocal, engine | |
| from core.database_init import init_db | |
| from core.services.user_service import UserService | |
| from core.outline_config import generate_openvpn_certificates, generate_openvpn_config, generate_wireguard_keys | |
| # For IP detection | |
| import socket | |
| import requests | |
| app = Flask(__name__) | |
| app.secret_key = os.urandom(24) | |
| app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=30) | |
| # Initialize Flask-Login | |
| login_manager = LoginManager() | |
| login_manager.init_app(app) | |
| login_manager.login_view = "login" | |
| login_manager.login_message = "Please log in to access this page." | |
| def load_user(user_id): | |
| db = SessionLocal() | |
| try: | |
| return db.query(User).get(int(user_id)) | |
| finally: | |
| db.close() | |
| # Global VPN server state | |
| vpn_server: Optional[OutlineServer] = None | |
| session_tracker: Optional[SessionTracker] = None | |
| logger: Optional[LogManager] = None | |
| # Initialize database | |
| init_db() | |
| CONFIG_DIR = 'config' | |
| USERS_FILE = os.path.join(CONFIG_DIR, 'users.json') | |
| os.makedirs(CONFIG_DIR, exist_ok=True) | |
| def load_users(): | |
| if os.path.exists(USERS_FILE): | |
| with open(USERS_FILE, 'r') as f: | |
| return json.load(f) | |
| return {} | |
| def save_users(users): | |
| with open(USERS_FILE, 'w') as f: | |
| json.dump(users, f) | |
| def get_server_ip(): | |
| """Get the server's public IP address""" | |
| try: | |
| # First try to get public IP from external service | |
| response = requests.get('https://api.ipify.org', timeout=5) | |
| if response.status_code == 200: | |
| return response.text.strip() | |
| except: | |
| pass | |
| try: | |
| # Try another public IP service as backup | |
| response = requests.get('https://ifconfig.me', timeout=5) | |
| if response.status_code == 200: | |
| return response.text.strip() | |
| except: | |
| pass | |
| # Fallback: Get local IP | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| s.connect(('8.8.8.8', 80)) | |
| local_ip = s.getsockname()[0] | |
| s.close() | |
| return local_ip | |
| except: | |
| # Last resort fallback | |
| return '127.0.0.1' | |
| def initialize_ikev2_server(): | |
| """Initialize IKEv2 server""" | |
| global ikev2_server | |
| server_ip = get_server_ip() | |
| ikev2_server = IKEv2Server(server_ip, logger) | |
| logger.log(LogLevel.INFO, LogCategory.SYSTEM, "app", "IKEv2 server initialized") | |
| def initialize_vpn_server(): | |
| """Initialize the VPN server components""" | |
| global vpn_server, session_tracker, logger, ikev2_server | |
| # Initialize logger | |
| logger = LogManager() | |
| logger.log(LogLevel.INFO, LogCategory.SYSTEM, "app", "Initializing VPN server") | |
| # Initialize session tracker | |
| session_tracker = SessionTracker() | |
| # Initialize IKEv2 server | |
| initialize_ikev2_server() | |
| # Initialize VPN server | |
| server_ip = get_server_ip() | |
| vpn_server_config = { | |
| "server": { | |
| "host": server_ip, # Use automatically detected server IP | |
| "port": 8388, # Default Shadowsocks port | |
| "virtual_network": "10.7.0.0/24", # Virtual network for client IPs | |
| "protocols": { | |
| "shadowsocks": { | |
| "enabled": True, | |
| "port": 8388 | |
| }, | |
| "wireguard": { | |
| "enabled": True, | |
| "port": 51820 | |
| }, | |
| "openvpn": { | |
| "enabled": True, | |
| "port": 1194 | |
| }, | |
| "ikev2": { | |
| "enabled": True, | |
| "port": 500 | |
| } | |
| } | |
| }, | |
| "security": { | |
| "cipher": "aes-256-gcm", | |
| "auth": "sha256", | |
| "enable_perfect_forward_secrecy": True | |
| } | |
| } | |
| vpn_server = OutlineServer(vpn_server_config) | |
| # Start the VPN server in a separate thread | |
| def run_server(): | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| loop.run_until_complete(vpn_server.start()) | |
| loop.run_forever() | |
| server_thread = threading.Thread(target=run_server, daemon=True) | |
| server_thread.start() | |
| logger.log(LogLevel.INFO, LogCategory.SYSTEM, "app", f"VPN server initialized and started on {server_ip}") | |
| def load_users(): | |
| if os.path.exists(USERS_FILE): | |
| with open(USERS_FILE, 'r') as f: | |
| return json.load(f) | |
| return {} | |
| def save_users(users): | |
| with open(USERS_FILE, 'w') as f: | |
| json.dump(users, f) | |
| def login_required(f): | |
| def decorated_function(*args, **kwargs): | |
| if 'user_id' not in session: | |
| return redirect(url_for('login')) | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| def index(): | |
| if 'user_id' in session: | |
| return redirect(url_for('dashboard')) | |
| return render_template('index.html') | |
| def login(): | |
| if current_user.is_authenticated: | |
| return redirect(url_for("dashboard")) | |
| if request.method == "POST": | |
| email = request.form.get("email") | |
| password = request.form.get("password") | |
| remember_me = request.form.get("remember_me") == "on" | |
| db = SessionLocal() | |
| try: | |
| user_service = UserService(db) | |
| success, message, user = user_service.authenticate_user(email, password) | |
| if success: | |
| if user.status == UserStatus.LOCKED: | |
| flash('Your account is locked. Please try again later or contact support.') | |
| return redirect(url_for('login')) | |
| # Create a new session | |
| ip_address = request.remote_addr | |
| device_info = request.user_agent.string | |
| user_service.create_session(user, ip_address, device_info) | |
| # Log in the user | |
| login_user(user, remember=remember_me) | |
| # Record successful login | |
| user.record_login_attempt(success=True) | |
| db.commit() | |
| next_page = request.args.get('next') | |
| if not next_page or not next_page.startswith('/'): | |
| next_page = url_for('dashboard') | |
| return redirect(next_page) | |
| else: | |
| if user: | |
| # Record failed attempt | |
| user.record_login_attempt(success=False) | |
| db.commit() | |
| flash(message) | |
| return redirect(url_for('login')) | |
| finally: | |
| db.close() | |
| return render_template('login.html') | |
| def signup(): | |
| if request.method == 'POST': | |
| users = load_users() | |
| email = request.form.get('email') | |
| password = request.form.get('password') | |
| if email in users: | |
| flash('Email already registered') | |
| return redirect(url_for('signup')) | |
| # Hash password and create user | |
| hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() | |
| users[email] = { | |
| 'password': hashed, | |
| 'created_at': datetime.now().isoformat(), | |
| 'config_id': str(uuid.uuid4()) | |
| } | |
| save_users(users) | |
| # Create user's VPN configuration | |
| create_user_config(users[email]['config_id']) | |
| session['user_id'] = email | |
| return redirect(url_for('dashboard')) | |
| return render_template('signup.html') | |
| def dashboard(): | |
| users = load_users() | |
| user = users[session['user_id']] | |
| stats = get_user_stats(user['config_id']) | |
| return render_template('dashboard.html', user=user, stats=stats) | |
| def download_config(): | |
| users = load_users() | |
| user = users[session['user_id']] | |
| config_path = os.path.join(CONFIG_DIR, f"{user['config_id']}.json") | |
| if not os.path.exists(config_path): | |
| flash('Configuration not found') | |
| return redirect(url_for('dashboard')) | |
| with open(config_path, 'r') as f: | |
| config = json.load(f) | |
| return jsonify(config) | |
| def get_stats(): | |
| users = load_users() | |
| user = users[session['user_id']] | |
| return jsonify(get_user_stats(user['config_id'])) | |
| def get_server_ip(): | |
| """Get the server's public IP address""" | |
| try: | |
| # First try to get public IP from external service | |
| response = requests.get('https://api.ipify.org') | |
| if response.status_code == 200: | |
| return response.text.strip() | |
| except: | |
| pass | |
| # Fallback: Get local IP | |
| try: | |
| s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
| s.connect(('8.8.8.8', 80)) | |
| local_ip = s.getsockname()[0] | |
| s.close() | |
| return local_ip | |
| except: | |
| return '127.0.0.1' # Last resort fallback | |
| def initialize_ikev2_server(): | |
| """Initialize IKEv2 server""" | |
| global ikev2_server | |
| server_ip = get_server_ip() | |
| ikev2_server = IKEv2Server(server_ip, logger) | |
| logger.log(LogLevel.INFO, LogCategory.SYSTEM, "app", "IKEv2 server initialized") | |
| def generate_ikev2_certificate(config_id: str) -> Dict: | |
| """Generate IKEv2 certificates for a user""" | |
| username = f"user_{config_id[:8]}" | |
| password = str(uuid.uuid4()) | |
| psk = str(uuid.uuid4()) | |
| try: | |
| cert_data = ikev2_server.add_user(config_id, username, password, psk) | |
| logger.info(LogCategory.SYSTEM, "app", f"Generated IKEv2 certificates for user {config_id}") | |
| return cert_data | |
| except Exception as e: | |
| logger.error(LogCategory.SYSTEM, "app", f"Failed to generate IKEv2 certificates: {e}") | |
| return None | |
| def create_user_config(config_id): | |
| """Create Outline VPN configuration for a new user""" | |
| if not os.path.exists(CONFIG_DIR): | |
| os.makedirs(CONFIG_DIR) | |
| server_ip = get_server_ip() | |
| access_key = str(uuid.uuid4()) | |
| # Outline/Shadowsocks config | |
| ss_config = { | |
| 'id': config_id, | |
| 'server': { | |
| 'host': server_ip, | |
| 'port': 8388 # Shadowsocks port | |
| }, | |
| 'access_key': access_key, | |
| 'protocol': 'shadowsocks', | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # IKEv2 config (Windows 10/11, Android 10+) | |
| ikev2_config = { | |
| 'id': f"{config_id}_ikev2", | |
| 'server': { | |
| 'host': server_ip, | |
| 'port': 500 # IKEv2 port | |
| }, | |
| 'credentials': { | |
| 'username': f"user_{config_id[:8]}", | |
| 'password': str(uuid.uuid4()), | |
| }, | |
| 'psk': str(uuid.uuid4()), # Pre-shared key | |
| 'certificate': generate_ikev2_certificate(config_id), | |
| 'protocol': 'ikev2', | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # L2TP/IPsec config (Windows, Android) | |
| l2tp_config = { | |
| 'id': f"{config_id}_l2tp", | |
| 'server': { | |
| 'host': server_ip, | |
| 'ports': { | |
| 'l2tp': 1701, | |
| 'ipsec': [500, 4500] # IPsec ports for NAT traversal | |
| } | |
| }, | |
| 'credentials': { | |
| 'username': f"user_{config_id[:8]}", | |
| 'password': str(uuid.uuid4()) | |
| }, | |
| 'ipsec': { | |
| 'psk': str(uuid.uuid4()), # Pre-shared key for IPsec | |
| 'encryption': 'aes-256-cbc', | |
| 'hash': 'sha256' | |
| }, | |
| 'protocol': 'l2tp_ipsec', | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # PPTP config (Legacy support - Windows, Android) | |
| pptp_config = { | |
| 'id': f"{config_id}_pptp", | |
| 'server': { | |
| 'host': server_ip, | |
| 'port': 1723 # PPTP port | |
| }, | |
| 'credentials': { | |
| 'username': f"user_{config_id[:8]}", | |
| 'password': str(uuid.uuid4()) | |
| }, | |
| 'protocol': 'pptp', | |
| 'encryption': 'require-mppe', # Maximum PPTP security | |
| 'warning': 'PPTP is considered less secure, use IKEv2 or L2TP/IPsec when possible', | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # OpenVPN config (Universal support) | |
| openvpn_config = { | |
| 'id': f"{config_id}_openvpn", | |
| 'server': { | |
| 'host': server_ip, | |
| 'port': 1194, # OpenVPN default port | |
| 'protocol': 'udp' # UDP for better performance | |
| }, | |
| 'credentials': { | |
| 'username': f"user_{config_id[:8]}", | |
| 'password': str(uuid.uuid4()) | |
| }, | |
| 'certificates': generate_openvpn_certificates(config_id), | |
| 'protocol': 'openvpn', | |
| 'created_at': datetime.now().isoformat(), | |
| 'config_file': generate_openvpn_config(config_id, server_ip) | |
| } | |
| # WireGuard config (Built-in Windows 11, Android, iOS) | |
| wireguard_config = { | |
| 'id': f"{config_id}_wireguard", | |
| 'server': { | |
| 'host': server_ip, | |
| 'port': 51820, # WireGuard default port | |
| 'public_key': generate_wireguard_keys(config_id)['server_public'], | |
| 'allowed_ips': ['0.0.0.0/0', '::/0'] # Route all traffic | |
| }, | |
| 'client': { | |
| 'private_key': generate_wireguard_keys(config_id)['client_private'], | |
| 'public_key': generate_wireguard_keys(config_id)['client_public'], | |
| 'address': f'10.7.0.{2 + len(load_users())}', # Unique IP for each client | |
| 'dns': ['1.1.1.1', '8.8.8.8'] | |
| }, | |
| 'protocol': 'wireguard', | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # L2TP/IPsec config (Built-in Windows, Android, iOS) | |
| l2tp_config = { | |
| 'id': f"{config_id}_l2tp", | |
| 'server': { | |
| 'host': server_ip, | |
| 'port': 1701, # L2TP port | |
| }, | |
| 'credentials': { | |
| 'username': f"user_{config_id[:8]}", | |
| 'password': str(uuid.uuid4()) | |
| }, | |
| 'ipsec': { | |
| 'psk': str(uuid.uuid4()) # Pre-shared key for IPsec | |
| }, | |
| 'protocol': 'l2tp_ipsec', | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| # Combined config with all supported protocols | |
| config = { | |
| 'id': config_id, | |
| 'protocols': { | |
| 'shadowsocks': ss_config, | |
| 'ikev2': ikev2_config, | |
| 'l2tp': l2tp_config, | |
| 'pptp': pptp_config | |
| }, | |
| 'recommended_protocol': { | |
| 'windows': 'ikev2', | |
| 'android': 'ikev2', | |
| 'fallback': 'l2tp' | |
| }, | |
| 'created_at': datetime.now().isoformat() | |
| } | |
| config_path = os.path.join(CONFIG_DIR, f"{config_id}.json") | |
| with open(config_path, 'w') as f: | |
| json.dump(config, f) | |
| def get_user_stats(config_id): | |
| """Get real VPN usage statistics for a user from all active sessions""" | |
| try: | |
| if not session_tracker: | |
| logger.error(LogCategory.SYSTEM, "app", "Session tracker not initialized") | |
| return None | |
| # Get all sessions for this user | |
| user_sessions = session_tracker.get_user_sessions(config_id) | |
| if not user_sessions: | |
| return { | |
| 'bytes_sent': 0, | |
| 'bytes_received': 0, | |
| 'connected_since': None, | |
| 'last_seen': None, | |
| 'status': 'disconnected', | |
| 'active_sessions': [], | |
| 'protocols': [] | |
| } | |
| # Aggregate stats from all active sessions | |
| total_bytes_sent = 0 | |
| total_bytes_received = 0 | |
| earliest_connection = None | |
| latest_seen = None | |
| active_sessions = [] | |
| used_protocols = set() | |
| for sess in user_sessions: | |
| # Update totals | |
| total_bytes_sent += sess.bytes_out | |
| total_bytes_received += sess.bytes_in | |
| # Track connection times | |
| session_start = datetime.fromtimestamp(sess.start_time) | |
| session_last_seen = datetime.fromtimestamp(sess.last_seen) | |
| if not earliest_connection or session_start < earliest_connection: | |
| earliest_connection = session_start | |
| if not latest_seen or session_last_seen > latest_seen: | |
| latest_seen = session_last_seen | |
| # Track protocols | |
| used_protocols.add(sess.protocol) | |
| # Get session details | |
| session_info = { | |
| 'id': sess.session_id, | |
| 'protocol': sess.protocol, | |
| 'assigned_ip': sess.assigned_ip, | |
| 'connected_since': session_start.isoformat(), | |
| 'last_seen': session_last_seen.isoformat(), | |
| 'bytes_sent': sess.bytes_out, | |
| 'bytes_received': sess.bytes_in, | |
| 'is_offline': sess.is_offline | |
| } | |
| active_sessions.append(session_info) | |
| # Determine overall status | |
| current_time = datetime.now() | |
| is_active = any( | |
| (current_time - datetime.fromtimestamp(s.last_seen)).total_seconds() < 300 # 5 minutes | |
| for s in user_sessions | |
| ) | |
| status = 'active' if is_active else 'offline' | |
| if not is_active and any(s.is_offline for s in user_sessions): | |
| status = 'offline_available' | |
| return { | |
| 'bytes_sent': total_bytes_sent, | |
| 'bytes_received': total_bytes_received, | |
| 'connected_since': earliest_connection.isoformat() if earliest_connection else None, | |
| 'last_seen': latest_seen.isoformat() if latest_seen else None, | |
| 'status': status, | |
| 'active_sessions': active_sessions, | |
| 'protocols': list(used_protocols) | |
| } | |
| except Exception as e: | |
| logger.error(LogCategory.SYSTEM, "app", f"Error getting user stats: {e}") | |
| return None | |
| def logout(): | |
| if current_user.is_authenticated: | |
| db = SessionLocal() | |
| try: | |
| # Find and end the current session | |
| current_session = ( | |
| db.query(UserSession) | |
| .filter(UserSession.user_id == current_user.id) | |
| .order_by(UserSession.created_at.desc()) | |
| .first() | |
| ) | |
| if current_session: | |
| current_session.expires_at = datetime.utcnow() | |
| db.commit() | |
| finally: | |
| db.close() | |
| logout_user() | |
| flash('You have been logged out.') | |
| return redirect(url_for('index')) | |
| def forgot_password(): | |
| if request.method == 'POST': | |
| email = request.form.get('email') | |
| db = SessionLocal() | |
| try: | |
| user = db.query(User).filter(User.username == email).first() | |
| if user: | |
| # Generate password reset token | |
| user_service = UserService(db) | |
| reset_token = user_service.generate_reset_token() | |
| user.reset_token = reset_token | |
| user.reset_token_expires = datetime.utcnow() + timedelta(hours=24) | |
| db.commit() | |
| # TODO: Send reset email with token | |
| # For now, just show the token (in production, you'd send this via email) | |
| flash(f'Password reset link has been sent to your email address.') | |
| else: | |
| # To prevent user enumeration, show the same message | |
| flash(f'Password reset link has been sent to your email address.') | |
| return redirect(url_for('login')) | |
| finally: | |
| db.close() | |
| return render_template('forgot_password.html') | |
| if __name__ == '__main__': | |
| # Initialize the VPN server first | |
| initialize_vpn_server() | |
| # Run Flask development server | |
| app.run(host="0.0.0.0", port=7860, debug=True) | |
| # If you want to use Uvicorn (production), uncomment these lines and comment out app.run(): | |
| # from asgiref.wsgi import WsgiToAsgi | |
| # asgi_app = WsgiToAsgi(app) | |
| # import uvicorn | |
| # uvicorn.run(asgi_app, host="0.0.0.0", port=7860) | |
| def shutdown_vpn_server(exception=None): | |
| global vpn_server | |
| if vpn_server and vpn_server.is_running: | |
| logger.log(LogLevel.INFO, LogCategory.SYSTEM, "app", "Shutting down VPN server...") | |
| asyncio.run(vpn_server.stop()) | |
| logger.log(LogLevel.INFO, LogCategory.SYSTEM, "app", "VPN server shut down.") | |