Spaces:
Paused
Paused
| """ | |
| VPN Server Management Routes | |
| Provides API endpoints for managing VPN servers and monitoring connections | |
| """ | |
| import logging | |
| from flask import Blueprint, request, jsonify, current_app | |
| from functools import wraps | |
| from models.enhanced_user import User | |
| from services.vpn_server_manager import VPNServerManager | |
| import jwt | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint registered under the name 'vpn_server'.
# NOTE(review): no route registrations on this blueprint are visible in this
# view of the file - confirm where the view functions below are attached.
vpn_server_bp = Blueprint('vpn_server', __name__)
def token_required(f):
    """Decorator that requires a valid JWT and injects the matching user.

    The token is read from the ``Authorization`` header; an optional
    ``Bearer `` prefix is stripped. It is decoded with the application's
    ``SECRET_KEY`` using HS256, and the ``user_id`` claim is used to load a
    ``User``. The loaded user is passed to the wrapped view as its first
    positional argument.

    Args:
        f: View function whose first parameter receives the loaded user.

    Returns:
        The wrapped view. It answers with a JSON error body and HTTP 401 when
        the header is missing, the token is expired or invalid, the
        ``user_id`` claim is absent, or no such user exists.
    """
    # wraps() keeps the view's __name__/__doc__. Without it every decorated
    # view is called "decorated", and Flask (which derives endpoint names from
    # __name__) cannot register more than one of them.
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': 'Token is missing'}), 401

        if token.startswith('Bearer '):
            token = token[7:]

        try:
            data = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            # Must precede InvalidTokenError, which is its base class.
            return jsonify({'error': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        # A correctly signed token without a user_id claim is still unusable;
        # answer 401 instead of letting a KeyError surface as a 500.
        user_id = data.get('user_id')
        if user_id is None:
            return jsonify({'error': 'Invalid token'}), 401

        current_user = User.query.get(user_id)
        if not current_user:
            return jsonify({'error': 'Invalid token'}), 401

        return f(current_user, *args, **kwargs)

    return decorated
def get_server_status(current_user):
    """Report the status of every VPN protocol together with server details.

    Args:
        current_user: Not used by the body; presumably the user injected by
            ``token_required`` - confirm against the route registration.

    Returns:
        A JSON response with ``success``, ``status`` and ``server_info`` keys,
        or a JSON error body with HTTP 500 if anything raises.
    """
    try:
        manager = VPNServerManager()
        payload = {
            'success': True,
            'status': manager.get_server_status(),
            'server_info': manager.get_server_info(),
        }
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Server status error: {e}")
        return jsonify({'error': 'Failed to get server status'}), 500
def start_server(current_user, protocol):
    """Start the VPN server that handles ``protocol``.

    Args:
        current_user: Not used by the body.
        protocol: Protocol name, matched case-insensitively against
            ``openvpn``, ``ikev2``, ``ipsec`` and ``wireguard``.

    Returns:
        A JSON success message when the manager reports success; a JSON error
        with HTTP 400 for an unknown protocol; a JSON error with HTTP 500 when
        the manager reports failure or anything raises.
    """
    # Manager method to call for each accepted protocol name.
    starters = {
        'openvpn': 'start_openvpn_server',
        'ikev2': 'start_ipsec_server',
        'ipsec': 'start_ipsec_server',
        'wireguard': 'start_wireguard_server',
    }
    try:
        # NOTE: no admin-privilege check is performed yet; every caller that
        # reaches this function is allowed to start a server.
        manager = VPNServerManager()
        method_name = starters.get(protocol.lower())
        if method_name is None:
            return jsonify({'error': 'Unsupported protocol'}), 400

        started = getattr(manager, method_name)()
        if not started:
            failure = {
                'success': False,
                'error': f'Failed to start {protocol.upper()} server'
            }
            return jsonify(failure), 500

        return jsonify({
            'success': True,
            'message': f'{protocol.upper()} server started successfully'
        })
    except Exception as e:
        logger.error(f"Server start error: {e}")
        return jsonify({'error': 'Failed to start server'}), 500
def stop_server(current_user, protocol):
    """Stop the VPN server that handles ``protocol``.

    Args:
        current_user: Not used by the body.
        protocol: Protocol name, matched case-insensitively against
            ``openvpn``, ``ikev2``, ``ipsec`` and ``wireguard``.

    Returns:
        A JSON success message when the manager reports success; a JSON error
        with HTTP 400 for an unknown protocol; a JSON error with HTTP 500 when
        the manager reports failure or anything raises.
    """
    # Manager method to call for each accepted protocol name.
    stoppers = {
        'openvpn': 'stop_openvpn_server',
        'ikev2': 'stop_ipsec_server',
        'ipsec': 'stop_ipsec_server',
        'wireguard': 'stop_wireguard_server',
    }
    try:
        manager = VPNServerManager()
        method_name = stoppers.get(protocol.lower())
        if method_name is None:
            return jsonify({'error': 'Unsupported protocol'}), 400

        stopped = getattr(manager, method_name)()
        if not stopped:
            failure = {
                'success': False,
                'error': f'Failed to stop {protocol.upper()} server'
            }
            return jsonify(failure), 500

        return jsonify({
            'success': True,
            'message': f'{protocol.upper()} server stopped successfully'
        })
    except Exception as e:
        logger.error(f"Server stop error: {e}")
        return jsonify({'error': 'Failed to stop server'}), 500
def restart_server(current_user, protocol):
    """Restart the VPN server that handles ``protocol`` (stop, then start).

    Args:
        current_user: Not used by the body.
        protocol: Protocol name, matched case-insensitively against
            ``openvpn``, ``ikev2``, ``ipsec`` and ``wireguard``.

    Returns:
        A JSON success message when the start step reports success; a JSON
        error with HTTP 400 for an unknown protocol; a JSON error with
        HTTP 500 when the start step reports failure or anything raises.
    """
    # (stop method, start method) on the manager for each accepted name.
    restart_steps = {
        'openvpn': ('stop_openvpn_server', 'start_openvpn_server'),
        'ikev2': ('stop_ipsec_server', 'start_ipsec_server'),
        'ipsec': ('stop_ipsec_server', 'start_ipsec_server'),
        'wireguard': ('stop_wireguard_server', 'start_wireguard_server'),
    }
    try:
        manager = VPNServerManager()
        steps = restart_steps.get(protocol.lower())
        if steps is None:
            return jsonify({'error': 'Unsupported protocol'}), 400

        stop_name, start_name = steps
        # The stop result is intentionally not inspected; only the outcome of
        # the subsequent start decides the response.
        getattr(manager, stop_name)()
        restarted = getattr(manager, start_name)()

        if not restarted:
            failure = {
                'success': False,
                'error': f'Failed to restart {protocol.upper()} server'
            }
            return jsonify(failure), 500

        return jsonify({
            'success': True,
            'message': f'{protocol.upper()} server restarted successfully'
        })
    except Exception as e:
        logger.error(f"Server restart error: {e}")
        return jsonify({'error': 'Failed to restart server'}), 500
def get_connected_clients(current_user, protocol):
    """List the clients connected through ``protocol``.

    Args:
        current_user: Not used by the body.
        protocol: Protocol name; its lower-cased form is looked up in the
            mapping returned by the manager's ``get_server_status()``.

    Returns:
        A JSON response with ``protocol``, ``clients``, ``client_count`` and
        ``running``; a JSON error with HTTP 400 when the status mapping has no
        (or an empty) entry for the protocol; a JSON error with HTTP 500 if
        anything raises.
    """
    try:
        manager = VPNServerManager()
        entry = manager.get_server_status().get(protocol.lower())
        if entry:
            return jsonify({
                'success': True,
                'protocol': protocol.upper(),
                'clients': entry.get('clients', []),
                'client_count': entry.get('client_count', 0),
                'running': entry.get('running', False)
            })
        return jsonify({'error': 'Unsupported protocol'}), 400
    except Exception as e:
        logger.error(f"Connected clients error: {e}")
        return jsonify({'error': 'Failed to get connected clients'}), 500
def get_server_info(current_user):
    """Return the manager's server information.

    Args:
        current_user: Not used by the body.

    Returns:
        A JSON response with ``success`` and ``server_info`` keys, or a JSON
        error body with HTTP 500 if anything raises.
    """
    try:
        info = VPNServerManager().get_server_info()
        body = {'success': True, 'server_info': info}
        return jsonify(body)
    except Exception as e:
        logger.error(f"Server info error: {e}")
        return jsonify({'error': 'Failed to get server info'}), 500
def get_supported_protocols():
    """Return a static description of the supported VPN protocols.

    Returns:
        A JSON response with ``success`` and a ``protocols`` list. Each entry
        has ``name``, ``key``, ``description``, ``port``, ``transport`` and
        ``features``. A JSON error body with HTTP 500 is returned if building
        the response raises.
    """
    try:
        # (name, key, description, port, features); every entry uses UDP.
        catalog = (
            (
                'OpenVPN',
                'openvpn',
                'Secure, reliable, and widely supported VPN protocol',
                1194,
                [
                    'Cross-platform compatibility',
                    'Strong encryption (AES-256)',
                    'Certificate-based authentication',
                    'Firewall-friendly',
                ],
            ),
            (
                'IKEv2/IPSec',
                'ikev2',
                'Fast and secure protocol, ideal for mobile devices',
                500,
                [
                    'Excellent mobile support',
                    'Fast reconnection',
                    'Built into most devices',
                    'Strong security',
                ],
            ),
            (
                'WireGuard',
                'wireguard',
                'Modern, fast, and lightweight VPN protocol',
                51820,
                [
                    'Exceptional performance',
                    'Minimal codebase',
                    'Modern cryptography',
                    'Low battery usage',
                ],
            ),
        )
        protocols = [
            {
                'name': name,
                'key': key,
                'description': description,
                'port': port,
                'transport': 'UDP',
                'features': features,
            }
            for name, key, description, port, features in catalog
        ]
        return jsonify({'success': True, 'protocols': protocols})
    except Exception as e:
        logger.error(f"Protocols info error: {e}")
        return jsonify({'error': 'Failed to get protocols info'}), 500
def _tail_matching_lines(path, limit, needle=None):
    """Return the last ``limit`` lines of ``path`` that contain ``needle``.

    Args:
        path: Path of the text file to read.
        limit: Maximum number of lines to return; 0 yields an empty list.
        needle: Substring a line must contain to be kept, or ``None`` to keep
            every line.

    Returns:
        The retained lines, oldest first, without their trailing newline.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    from collections import deque

    # A bounded deque keeps only the newest ``limit`` matches, so the file is
    # streamed once and memory use does not grow with the log size.
    recent = deque(maxlen=limit)
    # errors='replace' keeps one undecodable byte from discarding the result.
    with open(path, 'r', encoding='utf-8', errors='replace') as handle:
        for line in handle:
            if needle is None or needle in line:
                recent.append(line.rstrip('\n'))
    return list(recent)


def get_server_logs(current_user, protocol):
    """Return the most recent log lines for ``protocol``.

    The ``lines`` query parameter (default 50) selects how many lines to
    return and is clamped to the range 0..1000. OpenVPN lines come from its
    own log file; IKEv2/IPSec and WireGuard lines are the syslog entries
    containing ``charon`` and ``wireguard`` respectively.

    The log file is read directly in Python rather than through a shell
    pipeline, so no command string is ever built from request data.

    Args:
        current_user: Not used by the body.
        protocol: Protocol name, matched case-insensitively against
            ``openvpn``, ``ikev2``, ``ipsec`` and ``wireguard``.

    Returns:
        A JSON response with ``protocol``, ``logs`` and ``line_count``
        (``logs`` is empty when the file cannot be read); a JSON error with
        HTTP 400 for an unknown protocol; a JSON error with HTTP 500 if
        anything else raises.
    """
    # protocol -> (log file, substring that marks its entries or None for all)
    log_sources = {
        'openvpn': ('/var/log/openvpn/openvpn.log', None),
        'ikev2': ('/var/log/syslog', 'charon'),  # strongSwan logs to syslog
        'ipsec': ('/var/log/syslog', 'charon'),
        'wireguard': ('/var/log/syslog', 'wireguard'),  # WireGuard logs to syslog
    }
    try:
        lines = request.args.get('lines', 50, type=int)
        # Clamp both ends: at most 1000 lines, and never a negative count.
        lines = max(0, min(lines, 1000))

        source = log_sources.get(protocol.lower())
        if source is None:
            return jsonify({'error': 'Unsupported protocol'}), 400
        log_file, needle = source

        try:
            logs = _tail_matching_lines(log_file, lines, needle)
        except OSError as e:
            # Best effort: a missing or unreadable log yields an empty result,
            # but the reason is recorded instead of being silently dropped.
            logger.warning(f"Could not read log file {log_file}: {e}")
            logs = []

        return jsonify({
            'success': True,
            'protocol': protocol.upper(),
            'logs': logs,
            'line_count': len(logs)
        })
    except Exception as e:
        logger.error(f"Server logs error: {e}")
        return jsonify({'error': 'Failed to get server logs'}), 500
def get_server_statistics(current_user):
    """Summarise client counts and running state across all protocols.

    Args:
        current_user: Not used by the body.

    Returns:
        A JSON response with ``success`` and ``statistics``. ``statistics``
        holds an ``overview`` (total clients, running servers, total servers,
        server IP) and a per-protocol ``protocols`` mapping. A JSON error
        body with HTTP 500 is returned if anything raises.
    """
    try:
        manager = VPNServerManager()
        status = manager.get_server_status()

        # Aggregate figures across every protocol entry.
        client_counts = [entry.get('client_count', 0) for entry in status.values()]
        running_flags = [entry.get('running', False) for entry in status.values()]
        overview = {
            'total_clients': sum(client_counts),
            'running_servers': sum(1 for flag in running_flags if flag),
            'total_servers': len(status),
            'server_ip': manager.server_ip
        }

        # Per-protocol breakdown, with defaults for any missing field.
        per_protocol = {
            name: {
                'running': entry.get('running', False),
                'port': entry.get('port', 0),
                'client_count': entry.get('client_count', 0),
                'clients': entry.get('clients', [])
            }
            for name, entry in status.items()
        }

        statistics = {'overview': overview, 'protocols': per_protocol}
        return jsonify({
            'success': True,
            'statistics': statistics
        })
    except Exception as e:
        logger.error(f"Server statistics error: {e}")
        return jsonify({'error': 'Failed to get server statistics'}), 500