Spaces:
Paused
Paused
| """ | |
| VPN Client Management Routes | |
| Flask routes for VPN client creation, configuration, and management | |
| """ | |
| from flask import Blueprint, request, jsonify, send_file, current_app | |
| from flask_cors import cross_origin | |
| from models.enhanced_user import db, User, VPNClient, VPNSession | |
| from routes.auth import token_required | |
| import os | |
| import json | |
| import logging | |
| import tempfile | |
| import zipfile | |
| from datetime import datetime | |
# Module-level logger; the view functions in this module log through it.
logger = logging.getLogger(__name__)

# Flask blueprint registered under the name 'vpn_client'.
vpn_client_bp = Blueprint('vpn_client', __name__)
def list_vpn_clients(current_user):
    """Return every VPN client owned by ``current_user``.

    The JSON payload carries the serialized clients, their count, and the
    result of ``current_user.can_create_vpn_client()``. Any exception is
    logged and answered with a generic 500 error.
    """
    try:
        owned = VPNClient.query.filter_by(user_id=current_user.id).all()
        serialized = [entry.to_dict() for entry in owned]
        payload = {
            'clients': serialized,
            'total': len(owned),
            'can_create_more': current_user.can_create_vpn_client(),
        }
        return jsonify(payload), 200
    except Exception as e:
        logger.error(f"VPN client listing error: {e}")
        return jsonify({'error': 'Failed to retrieve VPN clients'}), 500
def create_vpn_client(current_user):
    """Create a new VPN client for ``current_user`` and generate its config.

    Expects a JSON object body with ``client_name`` and ``protocol``
    (both required strings) and an optional ``device_type`` (defaults to
    ``'unknown'``).

    Returns a ``(response, status)`` tuple:
        201 -- client created and configuration generated
        400 -- missing/malformed body, missing or non-string field,
               unsupported protocol, or name outside 3-100 characters
        403 -- ``current_user.can_create_vpn_client()`` is false
        409 -- the user already has a client with this name
        500 -- configuration generation failed or an unexpected error
    """
    try:
        # silent=True makes a malformed or non-JSON body come back as None
        # so it is answered with 400 below instead of raising into the
        # generic 500 handler.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Validate required fields
        for field in ('client_name', 'protocol'):
            if not data.get(field):
                return jsonify({'error': f'{field} is required'}), 400
            if not isinstance(data[field], str):
                return jsonify({'error': f'{field} must be a string'}), 400

        client_name = data['client_name'].strip()
        protocol = data['protocol'].strip().lower()
        device_type = data.get('device_type', 'unknown')

        # Validate protocol
        supported_protocols = ['openvpn', 'ikev2', 'wireguard']
        if protocol not in supported_protocols:
            return jsonify({
                'error': f'Unsupported protocol. Supported: {", ".join(supported_protocols)}'
            }), 400

        # Validate client name
        if not 3 <= len(client_name) <= 100:
            return jsonify({'error': 'Client name must be 3-100 characters'}), 400

        # Check if user can create more clients
        if not current_user.can_create_vpn_client():
            return jsonify({
                'error': 'Maximum number of VPN clients reached for your subscription'
            }), 403

        # Check if client name already exists for this user
        existing_client = VPNClient.query.filter_by(
            user_id=current_user.id,
            client_name=client_name
        ).first()
        if existing_client:
            return jsonify({'error': 'Client name already exists'}), 409

        # Create VPN client
        vpn_client = VPNClient(
            user_id=current_user.id,
            client_name=client_name,
            protocol=protocol,
            device_type=device_type
        )
        db.session.add(vpn_client)
        db.session.commit()

        # Generate certificates and configuration
        try:
            config_result = generate_client_configuration(vpn_client, current_user)
            # generate_client_configuration reports failure through the
            # 'success' flag rather than by raising, so check it here;
            # otherwise a failed generation would be answered with 201.
            if not config_result.get('success'):
                raise RuntimeError(config_result.get('error', 'unknown error'))

            # Update client with certificate information
            vpn_client.certificate_serial = config_result.get('certificate_serial')
            vpn_client.certificate_path = config_result.get('certificate_path')
            vpn_client.private_key_path = config_result.get('private_key_path')
            vpn_client.config_file_path = config_result.get('config_file_path')
            if protocol == 'wireguard':
                vpn_client.public_key = config_result.get('public_key')
            db.session.commit()
        except Exception as e:
            logger.error(f"Configuration generation error: {e}")
            db.session.rollback()
            # The client row was committed above, so the rollback alone
            # leaves it behind and a retry would hit the 409 name check.
            # Remove it on a best-effort basis.
            try:
                db.session.delete(vpn_client)
                db.session.commit()
            except Exception as cleanup_error:
                logger.warning(
                    f"Failed to remove VPN client after configuration failure: {cleanup_error}"
                )
                db.session.rollback()
            return jsonify({'error': 'Failed to generate client configuration'}), 500

        logger.info(f"VPN client created: {client_name} ({protocol}) for user {current_user.username}")
        return jsonify({
            'message': 'VPN client created successfully',
            'client': vpn_client.to_dict(),
            'download_url': f'/api/vpn-clients/{vpn_client.id}/download'
        }), 201
    except Exception as e:
        logger.error(f"VPN client creation error: {e}")
        db.session.rollback()
        return jsonify({'error': 'Failed to create VPN client'}), 500
def get_vpn_client(current_user, client_id):
    """Return the details of one VPN client owned by ``current_user``.

    Responds with 200 and the serialized client, 404 when no client with
    ``client_id`` belongs to the user, or 500 on an unexpected error.
    """
    try:
        query = VPNClient.query
        match = query.filter_by(id=client_id, user_id=current_user.id).first()
        if match:
            return jsonify({'client': match.to_dict()}), 200
        return jsonify({'error': 'VPN client not found'}), 404
    except Exception as e:
        logger.error(f"VPN client retrieval error: {e}")
        return jsonify({'error': 'Failed to retrieve VPN client'}), 500
def update_vpn_client(current_user, client_id):
    """Update the editable fields of a VPN client owned by ``current_user``.

    Accepts a JSON object body; only ``client_name``, ``device_type`` and
    ``is_active`` are applied, other keys are ignored.

    Returns a ``(response, status)`` tuple:
        200 -- client updated
        400 -- missing/malformed body or invalid ``client_name``
        404 -- no client with ``client_id`` belongs to the user
        409 -- another client of this user already uses the new name
        500 -- unexpected error (the session is rolled back)
    """
    try:
        client = VPNClient.query.filter_by(
            id=client_id,
            user_id=current_user.id
        ).first()
        if not client:
            return jsonify({'error': 'VPN client not found'}), 404

        # silent=True turns an unparsable or non-JSON body into None so it
        # is reported as a 400 rather than falling into the 500 handler.
        data = request.get_json(silent=True)
        if not data or not isinstance(data, dict):
            return jsonify({'error': 'No data provided'}), 400

        # Update allowed fields
        if 'client_name' in data:
            raw_name = data['client_name']
            # A non-string value would fail on .strip() and surface as a
            # 500; reject it as a client error instead.
            if not isinstance(raw_name, str):
                return jsonify({'error': 'client_name must be a string'}), 400
            new_name = raw_name.strip()
            if not 3 <= len(new_name) <= 100:
                return jsonify({'error': 'Client name must be 3-100 characters'}), 400

            # Check if new name already exists for this user
            existing_client = VPNClient.query.filter_by(
                user_id=current_user.id,
                client_name=new_name
            ).first()
            if existing_client and existing_client.id != client.id:
                return jsonify({'error': 'Client name already exists'}), 409
            client.client_name = new_name

        if 'device_type' in data:
            client.device_type = data['device_type']

        if 'is_active' in data:
            # NOTE(review): bool() treats any non-empty string, including
            # "false", as True -- confirm callers always send JSON booleans.
            client.is_active = bool(data['is_active'])

        db.session.commit()
        logger.info(f"VPN client updated: {client.client_name} for user {current_user.username}")
        return jsonify({
            'message': 'VPN client updated successfully',
            'client': client.to_dict()
        }), 200
    except Exception as e:
        logger.error(f"VPN client update error: {e}")
        db.session.rollback()
        return jsonify({'error': 'Failed to update VPN client'}), 500
def delete_vpn_client(current_user, client_id):
    """Delete a VPN client owned by ``current_user``.

    Active sessions of the client are disconnected, the certificate,
    private key and config files referenced by the record are removed
    from disk (best effort), and the record itself is deleted.

    Returns 200 on success, 404 when no client with ``client_id`` belongs
    to the user, or 500 on an unexpected error (the session is rolled
    back).
    """
    try:
        client = VPNClient.query.filter_by(
            id=client_id,
            user_id=current_user.id
        ).first()
        if not client:
            return jsonify({'error': 'VPN client not found'}), 404

        # Read the name now so the log line after the commit does not need
        # to touch the deleted instance.
        client_name = client.client_name

        # Disconnect any active sessions
        for session in [s for s in client.sessions if s.is_active()]:
            session.disconnect('Client deleted')

        # Remove certificate files. Each file is handled on its own so that
        # a failure on one (e.g. a permission error) does not leave the
        # remaining files, such as the private key, on disk.
        for path in (
            client.certificate_path,
            client.private_key_path,
            client.config_file_path,
        ):
            if not path:
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone; nothing to clean up.
                pass
            except OSError as e:
                logger.warning(f"Failed to remove certificate files: {e}")

        db.session.delete(client)
        db.session.commit()
        logger.info(f"VPN client deleted: {client_name} for user {current_user.username}")
        return jsonify({'message': 'VPN client deleted successfully'}), 200
    except Exception as e:
        logger.error(f"VPN client deletion error: {e}")
        db.session.rollback()
        return jsonify({'error': 'Failed to delete VPN client'}), 500
def download_client_config(current_user, client_id):
    """Send a VPN client's configuration as a file download.

    If the client has no stored ``config_data`` the configuration is
    generated first; otherwise the stored data is served as-is.

    Returns the attachment response on success, 404 when no client with
    ``client_id`` belongs to the user, or 500 when generation fails or an
    unexpected error occurs.
    """
    try:
        # Get VPN client
        vpn_client = VPNClient.query.filter_by(
            id=client_id,
            user_id=current_user.id
        ).first()
        if not vpn_client:
            return jsonify({'error': 'VPN client not found'}), 404

        # Start from an empty result so the lookups below also work when
        # the configuration already exists and nothing is generated.
        config_result = {}

        # Generate configuration if not exists
        if not vpn_client.config_data:
            config_result = generate_client_configuration(vpn_client, current_user)
            if not config_result.get('success'):
                # Keep the internal error text in the log only; it is the
                # str() of an arbitrary exception and is not sent to the
                # client.
                logger.error(
                    f"Configuration generation failed for client {client_id}: "
                    f"{config_result.get('error')}"
                )
                return jsonify({'error': 'Failed to generate configuration'}), 500

        # Create response with configuration file
        from flask import make_response

        # `or` rather than a .get() default: the generator may return the
        # keys with a None value.
        filename = config_result.get('filename') or (
            f'{vpn_client.client_name}-{vpn_client.protocol}.conf'
        )
        config_content = config_result.get('config') or vpn_client.config_data

        # The default filename embeds the user-chosen client name. Replace
        # characters that would break out of the quoted header value or
        # look like a path.
        safe_filename = ''.join(
            '_' if ch in '"\\/' or ord(ch) < 32 or ord(ch) == 127 else ch
            for ch in str(filename)
        )

        response = make_response(config_content)
        response.headers['Content-Type'] = 'application/octet-stream'
        response.headers['Content-Disposition'] = f'attachment; filename="{safe_filename}"'
        logger.info(f"Configuration downloaded for client {client_id} by user {current_user.username}")
        return response
    except Exception as e:
        logger.error(f"Configuration download error: {e}")
        return jsonify({'error': 'Failed to download configuration'}), 500
def regenerate_vpn_config(current_user, client_id):
    """Regenerate the configuration of a VPN client owned by ``current_user``.

    Active sessions are disconnected, the files referenced by the record
    are removed (best effort), a new configuration is generated and the
    record's certificate fields are updated from the result.

    Returns 200 with the updated client and a download URL, 404 when no
    client with ``client_id`` belongs to the user, or 500 when generation
    fails or an unexpected error occurs (the session is rolled back).
    """
    try:
        client = VPNClient.query.filter_by(
            id=client_id,
            user_id=current_user.id
        ).first()
        if not client:
            return jsonify({'error': 'VPN client not found'}), 404

        # Disconnect any active sessions
        for session in [s for s in client.sessions if s.is_active()]:
            session.disconnect('Configuration regenerated')

        # Remove old certificate files. Each file is handled separately so
        # one failing removal does not skip the others.
        for path in (
            client.certificate_path,
            client.private_key_path,
            client.config_file_path,
        ):
            if not path:
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone; nothing to clean up.
                pass
            except OSError as e:
                logger.warning(f"Failed to remove old certificate files: {e}")

        # Generate new configuration
        config_result = generate_client_configuration(client, current_user)
        # Failure is reported through the 'success' flag, not an exception.
        # Without this check the certificate fields would be overwritten
        # with None and the request answered with 200.
        if not config_result.get('success'):
            logger.error(f"Configuration regeneration error: {config_result.get('error')}")
            db.session.rollback()
            return jsonify({'error': 'Failed to regenerate configuration'}), 500

        # Update client with new certificate information
        client.certificate_serial = config_result.get('certificate_serial')
        client.certificate_path = config_result.get('certificate_path')
        client.private_key_path = config_result.get('private_key_path')
        client.config_file_path = config_result.get('config_file_path')
        if client.protocol == 'wireguard':
            client.public_key = config_result.get('public_key')

        db.session.commit()
        logger.info(f"Configuration regenerated: {client.client_name} for user {current_user.username}")
        return jsonify({
            'message': 'Configuration regenerated successfully',
            'client': client.to_dict(),
            'download_url': f'/api/vpn-clients/{client.id}/download'
        }), 200
    except Exception as e:
        logger.error(f"Configuration regeneration error: {e}")
        db.session.rollback()
        return jsonify({'error': 'Failed to regenerate configuration'}), 500
def get_client_sessions(current_user, client_id):
    """Return a page of session history for a VPN client of ``current_user``.

    Query parameters ``page`` (default 1) and ``per_page`` (default 20)
    select the page; sessions are ordered by ``connected_at`` descending.

    Returns 200 with the serialized sessions and paging metadata, 404 when
    no client with ``client_id`` belongs to the user, or 500 on an
    unexpected error. ``active_sessions`` counts active sessions on the
    returned page only.
    """
    try:
        client = VPNClient.query.filter_by(
            id=client_id,
            user_id=current_user.id
        ).first()
        if not client:
            return jsonify({'error': 'VPN client not found'}), 404

        page = request.args.get('page', 1, type=int)
        per_page = request.args.get('per_page', 20, type=int)

        sessions = VPNSession.query.filter_by(client_id=client_id).order_by(
            VPNSession.connected_at.desc()
        ).paginate(
            page=page,
            per_page=per_page,
            error_out=False
        )

        page_items = sessions.items
        return jsonify({
            'sessions': [session.to_dict() for session in page_items],
            'total': sessions.total,
            'pages': sessions.pages,
            # Report the values the pagination object actually used: with
            # error_out=False an out-of-range request is adjusted rather
            # than rejected, so the raw query values can differ from them.
            'current_page': sessions.page,
            'per_page': sessions.per_page,
            'active_sessions': sum(1 for s in page_items if s.is_active())
        }), 200
    except Exception as e:
        logger.error(f"Client sessions retrieval error: {e}")
        return jsonify({'error': 'Failed to retrieve sessions'}), 500
def generate_client_configuration(vpn_client, user):
    """Generate the VPN configuration for ``vpn_client`` and store it.

    Asks ``VPNServerManager.generate_client_config`` for a configuration
    matching the client's protocol, copies the server address, port and
    config text onto ``vpn_client``, marks it ``'active'`` and commits.

    This function does not raise: every exception is logged, the session
    is rolled back, and a failure dict is returned instead.

    Returns:
        On success, a dict with ``success=True`` plus ``config``,
        ``filename``, ``protocol``, ``server_info``, ``additional_data``
        and the certificate fields read by the route handlers
        (``certificate_serial``, ``certificate_path``,
        ``private_key_path``, ``config_file_path``, ``public_key``).
        On failure, ``{'success': False, 'error': <message>}``.
    """
    try:
        # Initialize VPN server manager
        # NOTE(review): VPNServerManager is not imported in the visible part
        # of this module -- confirm where it comes from; if it is missing,
        # this call raises NameError and every generation reports failure.
        vpn_manager = VPNServerManager()

        # Generate client configuration for the specified protocol
        config_data = vpn_manager.generate_client_config(
            username=user.username,
            protocol=vpn_client.protocol,
            user_data={
                'email': user.email,
                'subscription_type': user.subscription_type
            }
        )
        if config_data is None:
            raise ValueError('VPN server manager returned no configuration')

        # Update VPN client record
        vpn_client.server_ip = config_data.get('server_ip')
        vpn_client.server_port = config_data.get('server_port')
        vpn_client.config_data = config_data.get('config')
        vpn_client.status = 'active'
        # NOTE(review): this also resets created_at when an existing client
        # is regenerated -- confirm that is intended.
        vpn_client.created_at = datetime.utcnow()
        db.session.commit()

        logger.info(f"Generated {vpn_client.protocol} configuration for user {user.username}")
        return {
            'success': True,
            'config': config_data.get('config'),
            'filename': config_data.get('filename'),
            'protocol': config_data.get('protocol'),
            'server_info': {
                'ip': config_data.get('server_ip'),
                'port': config_data.get('server_port')
            },
            'additional_data': {
                'ca_certificate': config_data.get('ca_certificate'),
                'client_certificate': config_data.get('client_certificate'),
                'client_private_key': config_data.get('client_private_key')
            },
            # The create/regenerate handlers read these keys from the
            # result. Pass them through so they are no longer always
            # absent. Presumably the manager supplies them under the same
            # names -- TODO confirm; a missing key still yields None.
            'certificate_serial': config_data.get('certificate_serial'),
            'certificate_path': config_data.get('certificate_path'),
            'private_key_path': config_data.get('private_key_path'),
            'config_file_path': config_data.get('config_file_path'),
            'public_key': config_data.get('public_key')
        }
    except Exception as e:
        logger.error(f"Failed to generate client configuration: {e}")
        db.session.rollback()
        return {
            'success': False,
            'error': str(e)
        }