HINTECH / routes /vpn_client.py
Factor Studios
Upload 73 files
aaaaa79 verified
"""
VPN Client Management Routes
Flask routes for VPN client creation, configuration, and management
"""
from flask import Blueprint, request, jsonify, send_file, current_app
from flask_cors import cross_origin
from models.enhanced_user import db, User, VPNClient, VPNSession
from routes.auth import token_required
import os
import json
import logging
import tempfile
import zipfile
from datetime import datetime
logger = logging.getLogger(__name__)
vpn_client_bp = Blueprint('vpn_client', __name__)
@vpn_client_bp.route('/vpn-clients', methods=['GET'])
@cross_origin()
@token_required
def list_vpn_clients(current_user):
"""List user's VPN clients"""
try:
clients = VPNClient.query.filter_by(user_id=current_user.id).all()
return jsonify({
'clients': [client.to_dict() for client in clients],
'total': len(clients),
'can_create_more': current_user.can_create_vpn_client()
}), 200
except Exception as e:
logger.error(f"VPN client listing error: {e}")
return jsonify({'error': 'Failed to retrieve VPN clients'}), 500
@vpn_client_bp.route('/vpn-clients', methods=['POST'])
@cross_origin()
@token_required
def create_vpn_client(current_user):
"""Create new VPN client configuration"""
try:
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
# Validate required fields
required_fields = ['client_name', 'protocol']
for field in required_fields:
if field not in data or not data[field]:
return jsonify({'error': f'{field} is required'}), 400
client_name = data['client_name'].strip()
protocol = data['protocol'].lower()
device_type = data.get('device_type', 'unknown')
# Validate protocol
supported_protocols = ['openvpn', 'ikev2', 'wireguard']
if protocol not in supported_protocols:
return jsonify({
'error': f'Unsupported protocol. Supported: {", ".join(supported_protocols)}'
}), 400
# Validate client name
if len(client_name) < 3 or len(client_name) > 100:
return jsonify({'error': 'Client name must be 3-100 characters'}), 400
# Check if user can create more clients
if not current_user.can_create_vpn_client():
return jsonify({
'error': 'Maximum number of VPN clients reached for your subscription'
}), 403
# Check if client name already exists for this user
existing_client = VPNClient.query.filter_by(
user_id=current_user.id,
client_name=client_name
).first()
if existing_client:
return jsonify({'error': 'Client name already exists'}), 409
# Create VPN client
vpn_client = VPNClient(
user_id=current_user.id,
client_name=client_name,
protocol=protocol,
device_type=device_type
)
db.session.add(vpn_client)
db.session.commit()
# Generate certificates and configuration
try:
config_result = generate_client_configuration(vpn_client, current_user)
# Update client with certificate information
vpn_client.certificate_serial = config_result.get('certificate_serial')
vpn_client.certificate_path = config_result.get('certificate_path')
vpn_client.private_key_path = config_result.get('private_key_path')
vpn_client.config_file_path = config_result.get('config_file_path')
if protocol == 'wireguard':
vpn_client.public_key = config_result.get('public_key')
db.session.commit()
except Exception as e:
logger.error(f"Configuration generation error: {e}")
db.session.rollback()
return jsonify({'error': 'Failed to generate client configuration'}), 500
logger.info(f"VPN client created: {client_name} ({protocol}) for user {current_user.username}")
return jsonify({
'message': 'VPN client created successfully',
'client': vpn_client.to_dict(),
'download_url': f'/api/vpn-clients/{vpn_client.id}/download'
}), 201
except Exception as e:
logger.error(f"VPN client creation error: {e}")
db.session.rollback()
return jsonify({'error': 'Failed to create VPN client'}), 500
@vpn_client_bp.route('/vpn-clients/<int:client_id>', methods=['GET'])
@cross_origin()
@token_required
def get_vpn_client(current_user, client_id):
"""Get VPN client details"""
try:
client = VPNClient.query.filter_by(
id=client_id,
user_id=current_user.id
).first()
if not client:
return jsonify({'error': 'VPN client not found'}), 404
return jsonify({
'client': client.to_dict()
}), 200
except Exception as e:
logger.error(f"VPN client retrieval error: {e}")
return jsonify({'error': 'Failed to retrieve VPN client'}), 500
@vpn_client_bp.route('/vpn-clients/<int:client_id>', methods=['PUT'])
@cross_origin()
@token_required
def update_vpn_client(current_user, client_id):
"""Update VPN client"""
try:
client = VPNClient.query.filter_by(
id=client_id,
user_id=current_user.id
).first()
if not client:
return jsonify({'error': 'VPN client not found'}), 404
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
# Update allowed fields
if 'client_name' in data:
new_name = data['client_name'].strip()
if len(new_name) < 3 or len(new_name) > 100:
return jsonify({'error': 'Client name must be 3-100 characters'}), 400
# Check if new name already exists for this user
existing_client = VPNClient.query.filter_by(
user_id=current_user.id,
client_name=new_name
).first()
if existing_client and existing_client.id != client.id:
return jsonify({'error': 'Client name already exists'}), 409
client.client_name = new_name
if 'device_type' in data:
client.device_type = data['device_type']
if 'is_active' in data:
client.is_active = bool(data['is_active'])
db.session.commit()
logger.info(f"VPN client updated: {client.client_name} for user {current_user.username}")
return jsonify({
'message': 'VPN client updated successfully',
'client': client.to_dict()
}), 200
except Exception as e:
logger.error(f"VPN client update error: {e}")
db.session.rollback()
return jsonify({'error': 'Failed to update VPN client'}), 500
@vpn_client_bp.route('/vpn-clients/<int:client_id>', methods=['DELETE'])
@cross_origin()
@token_required
def delete_vpn_client(current_user, client_id):
"""Delete VPN client"""
try:
client = VPNClient.query.filter_by(
id=client_id,
user_id=current_user.id
).first()
if not client:
return jsonify({'error': 'VPN client not found'}), 404
# Disconnect any active sessions
active_sessions = [s for s in client.sessions if s.is_active()]
for session in active_sessions:
session.disconnect('Client deleted')
# Remove certificate files
try:
if client.certificate_path and os.path.exists(client.certificate_path):
os.remove(client.certificate_path)
if client.private_key_path and os.path.exists(client.private_key_path):
os.remove(client.private_key_path)
if client.config_file_path and os.path.exists(client.config_file_path):
os.remove(client.config_file_path)
except Exception as e:
logger.warning(f"Failed to remove certificate files: {e}")
db.session.delete(client)
db.session.commit()
logger.info(f"VPN client deleted: {client.client_name} for user {current_user.username}")
return jsonify({'message': 'VPN client deleted successfully'}), 200
except Exception as e:
logger.error(f"VPN client deletion error: {e}")
db.session.rollback()
return jsonify({'error': 'Failed to delete VPN client'}), 500
@vpn_client_bp.route('/vpn-clients/<int:client_id>/download', methods=['GET'])
@cross_origin()
@token_required
def download_client_config(current_user, client_id):
"""Download VPN client configuration"""
try:
# Get VPN client
vpn_client = VPNClient.query.filter_by(
id=client_id,
user_id=current_user.id
).first()
if not vpn_client:
return jsonify({'error': 'VPN client not found'}), 404
# Generate configuration if not exists
if not vpn_client.config_data:
config_result = generate_client_configuration(vpn_client, current_user)
if not config_result.get('success'):
return jsonify({'error': config_result.get('error', 'Failed to generate configuration')}), 500
# Create response with configuration file
from flask import make_response
filename = config_result.get('filename', f'{vpn_client.client_name}-{vpn_client.protocol}.conf')
config_content = config_result.get('config', vpn_client.config_data)
response = make_response(config_content)
response.headers['Content-Type'] = 'application/octet-stream'
response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'
logger.info(f"Configuration downloaded for client {client_id} by user {current_user.username}")
return response
except Exception as e:
logger.error(f"Configuration download error: {e}")
return jsonify({'error': 'Failed to download configuration'}), 500
@vpn_client_bp.route('/vpn-clients/<int:client_id>/regenerate', methods=['POST'])
@cross_origin()
@token_required
def regenerate_vpn_config(current_user, client_id):
"""Regenerate VPN client configuration"""
try:
client = VPNClient.query.filter_by(
id=client_id,
user_id=current_user.id
).first()
if not client:
return jsonify({'error': 'VPN client not found'}), 404
# Disconnect any active sessions
active_sessions = [s for s in client.sessions if s.is_active()]
for session in active_sessions:
session.disconnect('Configuration regenerated')
# Remove old certificate files
try:
if client.certificate_path and os.path.exists(client.certificate_path):
os.remove(client.certificate_path)
if client.private_key_path and os.path.exists(client.private_key_path):
os.remove(client.private_key_path)
if client.config_file_path and os.path.exists(client.config_file_path):
os.remove(client.config_file_path)
except Exception as e:
logger.warning(f"Failed to remove old certificate files: {e}")
# Generate new configuration
config_result = generate_client_configuration(client, current_user)
# Update client with new certificate information
client.certificate_serial = config_result.get('certificate_serial')
client.certificate_path = config_result.get('certificate_path')
client.private_key_path = config_result.get('private_key_path')
client.config_file_path = config_result.get('config_file_path')
if client.protocol == 'wireguard':
client.public_key = config_result.get('public_key')
db.session.commit()
logger.info(f"Configuration regenerated: {client.client_name} for user {current_user.username}")
return jsonify({
'message': 'Configuration regenerated successfully',
'client': client.to_dict(),
'download_url': f'/api/vpn-clients/{client.id}/download'
}), 200
except Exception as e:
logger.error(f"Configuration regeneration error: {e}")
db.session.rollback()
return jsonify({'error': 'Failed to regenerate configuration'}), 500
@vpn_client_bp.route('/vpn-clients/<int:client_id>/sessions', methods=['GET'])
@cross_origin()
@token_required
def get_client_sessions(current_user, client_id):
"""Get VPN client session history"""
try:
client = VPNClient.query.filter_by(
id=client_id,
user_id=current_user.id
).first()
if not client:
return jsonify({'error': 'VPN client not found'}), 404
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
sessions = VPNSession.query.filter_by(client_id=client_id).order_by(
VPNSession.connected_at.desc()
).paginate(
page=page,
per_page=per_page,
error_out=False
)
return jsonify({
'sessions': [session.to_dict() for session in sessions.items],
'total': sessions.total,
'pages': sessions.pages,
'current_page': page,
'per_page': per_page,
'active_sessions': len([s for s in sessions.items if s.is_active()])
}), 200
except Exception as e:
logger.error(f"Client sessions retrieval error: {e}")
return jsonify({'error': 'Failed to retrieve sessions'}), 500
def generate_client_configuration(vpn_client, user):
"""Generate VPN client configuration and certificates"""
try:
# Initialize VPN server manager
vpn_manager = VPNServerManager()
# Generate client configuration for the specified protocol
config_data = vpn_manager.generate_client_config(
username=user.username,
protocol=vpn_client.protocol,
user_data={
'email': user.email,
'subscription_type': user.subscription_type
}
)
# Update VPN client record
vpn_client.server_ip = config_data.get('server_ip')
vpn_client.server_port = config_data.get('server_port')
vpn_client.config_data = config_data.get('config')
vpn_client.status = 'active'
vpn_client.created_at = datetime.utcnow()
db.session.commit()
logger.info(f"Generated {vpn_client.protocol} configuration for user {user.username}")
return {
'success': True,
'config': config_data.get('config'),
'filename': config_data.get('filename'),
'protocol': config_data.get('protocol'),
'server_info': {
'ip': config_data.get('server_ip'),
'port': config_data.get('server_port')
},
'additional_data': {
'ca_certificate': config_data.get('ca_certificate'),
'client_certificate': config_data.get('client_certificate'),
'client_private_key': config_data.get('client_private_key')
}
}
except Exception as e:
logger.error(f"Failed to generate client configuration: {e}")
db.session.rollback()
return {
'success': False,
'error': str(e)
}