Spaces:
Paused
Paused
| """ | |
| ISP API Routes | |
| Flask routes for the Virtual ISP Stack API endpoints | |
| """ | |
import asyncio
import json
import os
import time
from typing import Dict, Any

from flask import Blueprint, jsonify, request, Response, send_file
from flask_cors import cross_origin

# Import core modules
from core.dhcp_server import DHCPServer
from core.nat_engine import NATEngine
from core.firewall import FirewallEngine, FirewallRule, FirewallRuleBuilder, FirewallAction, FirewallDirection
from core.tcp_engine import TCPEngine
from core.virtual_router import VirtualRouter
from core.socket_translator import SocketTranslator
from core.packet_bridge import PacketBridge
from core.session_tracker import SessionTracker, SessionType, SessionState
from core.logger import VirtualISPLogger, LogLevel, LogCategory, LogFilter
from core.openvpn_manager import OpenVPNManager, initialize_openvpn_manager, get_openvpn_manager
from core.traffic_router import TrafficRouter
# Flask blueprint, registered under the name "isp_api".
isp_api = Blueprint("isp_api", __name__)
# Module-level engine handles. They are None until init_engines() assigns
# them, which is why the handlers in this module test them before use.
nat_engine: NATEngine = None
traffic_router: TrafficRouter = None
logger: VirtualISPLogger = None
def init_engines(config: Dict[str, Any]):
    """Create and start the logger, NAT engine, OpenVPN manager and traffic router.

    The created objects are stored in the module-level ``logger``,
    ``nat_engine`` and ``traffic_router`` names.

    Args:
        config: Mapping that may contain "nat", "openvpn" and
            "traffic_router" sub-dictionaries. A missing section is
            passed to the corresponding component as ``{}``.

    Returns:
        True when every step completed, False when any step raised. The
        failure is printed and, if the logger already exists, also logged.
    """
    global nat_engine, traffic_router, logger
    try:
        # The logger comes first so later steps can report through it.
        logger = VirtualISPLogger({})
        logger.start()

        nat_engine = NATEngine(config.get("nat", {}))

        initialize_openvpn_manager(config.get("openvpn", {}))

        # The traffic router needs the NAT engine it will route through.
        traffic_router = TrafficRouter(config.get("traffic_router", {}))
        traffic_router.set_components(nat_engine=nat_engine)

        nat_engine.start()

        # traffic_router.start() is a coroutine, so it is driven to
        # completion on a dedicated event loop.
        # NOTE(review): the loop is neither stored nor closed; if start()
        # schedules background tasks on it they will not run once
        # run_until_complete() returns -- confirm against TrafficRouter.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(traffic_router.start())

        logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "All engines initialized and started")
        return True
    except Exception as e:
        # stdout is kept because the logger itself may be what failed.
        print(f"Error initializing engines: {e}")
        if logger:
            try:
                logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Engine initialization failed: {str(e)}")
            except Exception:
                # Best-effort only: a broken logger must not turn the
                # documented False return into an exception.
                pass
        return False
# Default configuration: one section per component, keyed the same way
# init_engines() looks sections up ("nat", "traffic_router").
# NOTE(review): session_timeout is presumably in seconds -- confirm against NATEngine.
DEFAULT_CONFIG = dict(
    nat=dict(
        port_range_start=10000,
        port_range_end=65535,
        session_timeout=300,
    ),
    traffic_router=dict(
        vpn_interface="eth0",
        internet_interface="eth1",
    ),
)
def get_status():
    """Report which components are running, plus NAT and traffic-router statistics.

    Returns:
        A JSON response ``{"status": "success", "system_status": {...}}``.
        If anything raises, ``{"status": "error", "message": ...}`` with
        HTTP status 500.
    """
    try:
        report = {"components": {}, "stats": {}, "timestamp": time.time()}

        # A component that has not been created yet is reported as not running.
        for label, engine in (
            ("nat_engine", nat_engine),
            ("traffic_router", traffic_router),
            ("logger", logger),
        ):
            report["components"][label] = engine.is_running if engine else False

        if nat_engine:
            session_count = len(nat_engine.get_sessions())
        else:
            session_count = 0
        report["stats"]["nat_sessions"] = session_count

        # Router statistics are included only once the router exists.
        if traffic_router:
            report["stats"]["traffic_router"] = traffic_router.get_stats()

        payload = {"status": "success", "system_status": report}
        return jsonify(payload)
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure), 500
def update_config():
    """Acknowledge a configuration payload.

    Placeholder: the posted JSON is only written to the log (when a logger
    exists); no component is actually reconfigured.

    Returns:
        A JSON success message, or ``{"status": "error", ...}`` with HTTP
        status 500 if reading or logging the payload raises.
    """
    try:
        posted = request.get_json()
        if logger:
            logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "Configuration updated", metadata=posted)
        ack = {"status": "success", "message": "Configuration updated successfully"}
        return jsonify(ack)
    except Exception as exc:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Configuration update failed: {str(exc)}")
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure), 500
| # NAT endpoints | |
def get_nat_sessions():
    """Return the NAT engine's session table together with its size.

    Returns:
        ``{"status": "success", "sessions": ..., "count": ...}`` as JSON,
        or an error payload with HTTP status 500 when the NAT engine is
        missing or the lookup raises.
    """
    try:
        if nat_engine:
            table = nat_engine.get_sessions()
            payload = {"status": "success", "sessions": table, "count": len(table)}
            return jsonify(payload)
        return jsonify({"status": "error", "message": "NAT engine not initialized"}), 500
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure), 500
def get_nat_stats():
    """Return whatever ``nat_engine.get_stats()`` reports.

    Returns:
        ``{"status": "success", "stats": ...}`` as JSON, or an error
        payload with HTTP status 500 when the NAT engine is missing or the
        call raises.
    """
    try:
        if nat_engine:
            payload = {"status": "success", "stats": nat_engine.get_stats()}
            return jsonify(payload)
        return jsonify({"status": "error", "message": "NAT engine not initialized"}), 500
    except Exception as exc:
        failure = {"status": "error", "message": str(exc)}
        return jsonify(failure), 500
def start_openvpn():
    """Ask the OpenVPN manager to start its server.

    Returns:
        A JSON success message, or an error payload with HTTP status 500
        when the manager is missing or ``start_server()`` raises.
    """
    try:
        manager = get_openvpn_manager()
        if manager:
            manager.start_server()
            if logger:
                logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "OpenVPN server started via API")
            return jsonify({"status": "success", "message": "OpenVPN server started successfully"})
        return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
    except Exception as exc:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to start OpenVPN server via API: {str(exc)}")
        return jsonify({"status": "error", "message": str(exc)}), 500
def get_openvpn_status():
    """Return the OpenVPN server status and the connected clients.

    The status object is exposed through its ``__dict__``.

    Returns:
        ``{"status": "success", "server_status": ..., "connected_clients": ...}``
        as JSON, or an error payload with HTTP status 500 when the manager
        is missing or a lookup raises.
    """
    try:
        manager = get_openvpn_manager()
        if manager:
            server_state = manager.get_server_status()
            client_list = manager.get_connected_clients()
            payload = {
                "status": "success",
                "server_status": server_state.__dict__,
                "connected_clients": client_list,
            }
            return jsonify(payload)
        return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
    except Exception as exc:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to get OpenVPN server status: {str(exc)}")
        return jsonify({"status": "error", "message": str(exc)}), 500
def simulate_openvpn_client():
    """Register a simulated, already-connected VPN client with the OpenVPN manager.

    Expects a JSON object with ``client_id`` and ``ip_address``. The client
    is inserted directly into ``openvpn_manager.clients``; no real
    connection is made.

    Returns:
        A JSON success message; HTTP 400 when the body is missing, is not a
        JSON object, or lacks a required field; HTTP 500 when the manager
        is missing or anything else raises.
    """
    try:
        openvpn_manager = get_openvpn_manager()
        if not openvpn_manager:
            return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500

        # silent=True makes a missing or malformed body come back as None
        # instead of raising, so the caller receives the 400 below rather
        # than a 500 from the generic handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        client_id = data.get("client_id")
        ip_address = data.get("ip_address")
        if not client_id or not ip_address:
            return jsonify({"status": "error", "message": "client_id and ip_address are required"}), 400

        # Directly add client to the manager's clients dictionary
        from core.openvpn_manager import VPNClient
        client = VPNClient(
            client_id=client_id,
            common_name=client_id,
            ip_address=ip_address,
            connected_at=time.time(),
            bytes_received=0,
            bytes_sent=0,
            status="connected",
            routed_through_vpn=True,
        )
        openvpn_manager.clients[client_id] = client

        if logger:
            logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Simulated VPN client connection: {client_id} ({ip_address})")
        return jsonify({"status": "success", "message": f"Simulated client {client_id} connected with IP {ip_address}"})
    except Exception as e:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to simulate VPN client connection: {str(e)}")
        return jsonify({"status": "error", "message": str(e)}), 500
def get_engines():
    """Report whether the NAT engine and traffic router have been created.

    Only two booleans are returned, not the engine objects themselves.

    Returns:
        ``{"status": "success", "nat_engine_initialized": bool,
        "traffic_router_initialized": bool}`` as JSON, or an error payload
        with HTTP status 500 if building the response raises.
    """
    try:
        nat_ready = nat_engine is not None
        router_ready = traffic_router is not None
        payload = {
            "status": "success",
            "nat_engine_initialized": nat_ready,
            "traffic_router_initialized": router_ready,
        }
        return jsonify(payload)
    except Exception as exc:
        return jsonify({"status": "error", "message": str(exc)}), 500
def generate_client_config_route():
    """Generate an OpenVPN client configuration and return it in the response.

    Expects a JSON object with ``client_name`` and ``server_ip``.

    Returns:
        ``{"status": "success", "config": ...}`` as JSON; HTTP 400 when the
        body is missing, is not a JSON object, or lacks a required field;
        HTTP 500 when the manager is missing or generation raises.
    """
    try:
        openvpn_manager = get_openvpn_manager()
        if not openvpn_manager:
            return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500

        # silent=True makes a missing or malformed body come back as None
        # instead of raising, so the caller receives the 400 below rather
        # than a 500 from the generic handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        client_name = data.get("client_name")
        server_ip = data.get("server_ip")
        if not client_name or not server_ip:
            return jsonify({"status": "error", "message": "client_name and server_ip are required"}), 400

        config_content = openvpn_manager.generate_client_config(client_name, server_ip)
        if logger:
            logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Generated client config for {client_name}")
        return jsonify({"status": "success", "config": config_content})
    except Exception as e:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to generate client config: {str(e)}")
        return jsonify({"status": "error", "message": str(e)}), 500
| import smtplib | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.base import MIMEBase | |
| from email.mime.text import MIMEText | |
| from email import encoders | |
# Email configuration. Each value may be overridden by an environment
# variable of the same name so that real credentials never have to be
# committed; the fallbacks are placeholders and must be replaced before
# mail can actually be sent.
SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.example.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
EMAIL_ADDRESS = os.environ.get("EMAIL_ADDRESS", "your_email@example.com")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD", "your_email_password")
def email_client_config():
    """Generate an OpenVPN client configuration and email it as an attachment.

    Expects a JSON object with ``email``, ``client_name`` and ``server_ip``.
    The configuration is attached straight from memory: no temporary file
    is written, so a failed send cannot leave a stray file behind and the
    caller-supplied ``client_name`` is never used as a filesystem path.

    Returns:
        A JSON success message; HTTP 400 when the body is missing, is not a
        JSON object, lacks a required field, or a header value contains a
        line break; HTTP 500 when the manager is missing or generating or
        sending the mail raises.
    """
    try:
        # silent=True makes a missing or malformed body come back as None
        # instead of raising, so the caller receives a 400 rather than a
        # 500 from the generic handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        recipient_email = data.get("email")
        client_name = data.get("client_name")
        server_ip = data.get("server_ip")
        if not recipient_email or not client_name or not server_ip:
            return jsonify({"status": "error", "message": "email, client_name, and server_ip are required"}), 400

        # Both values end up in mail headers; a line break would let the
        # caller inject additional headers.
        for header_value in (recipient_email, client_name):
            text = str(header_value)
            if "\r" in text or "\n" in text:
                return jsonify({"status": "error", "message": "email and client_name must not contain line breaks"}), 400

        openvpn_manager = get_openvpn_manager()
        if not openvpn_manager:
            return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500

        config_content = openvpn_manager.generate_client_config(client_name, server_ip)
        config_filename = f"{client_name}.ovpn"

        msg = MIMEMultipart()
        msg["From"] = EMAIL_ADDRESS
        msg["To"] = recipient_email
        msg["Subject"] = "Your OpenVPN Configuration File"
        body = "Please find your OpenVPN configuration file attached."
        msg.attach(MIMEText(body, "plain"))

        part = MIMEBase("application", "octet-stream")
        part.set_payload(config_content.encode("utf-8"))
        encoders.encode_base64(part)
        # Passing filename as a parameter lets the email package quote and
        # encode it instead of splicing it into the header by hand.
        part.add_header("Content-Disposition", "attachment", filename=config_filename)
        msg.attach(part)

        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
            server.send_message(msg)

        if logger:
            logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Emailed client config to {recipient_email} for {client_name}")
        return jsonify({"status": "success", "message": f"Configuration emailed to {recipient_email}"})
    except Exception as e:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to email client config: {str(e)}")
        return jsonify({"status": "error", "message": str(e)}), 500
def download_client_config(client_name):
    """Generate an OpenVPN client configuration and send it as a file download.

    The configuration is regenerated on every call, written to
    ``<config_storage_path>/<client_name>.ovpn`` and served from there. The
    server address placed in the configuration is taken from the request's
    Host header.

    Args:
        client_name: Name of the client; also used as the file's base name.

    Returns:
        The file as an attachment; HTTP 400 when ``client_name`` is empty
        or contains a path separator; HTTP 500 when the manager is missing
        or generating or serving the file raises.
    """
    try:
        openvpn_manager = get_openvpn_manager()
        if not openvpn_manager:
            return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500

        # client_name becomes part of a filesystem path below, so it must
        # not be able to point outside the storage directory.
        name_text = str(client_name) if client_name is not None else ""
        if not name_text or any(token in name_text for token in ("/", "\\", "\x00")):
            return jsonify({"status": "error", "message": "Invalid client_name"}), 400

        # Strip the port from the Host header. A bracketed IPv6 literal
        # ("[::1]:5000") contains colons itself, so take what is inside
        # the brackets instead of splitting on ":".
        host = request.host
        if host.startswith("["):
            server_ip = host[1:].split("]")[0]
        else:
            server_ip = host.split(":")[0]

        config_content = openvpn_manager.generate_client_config(client_name, server_ip)
        config_filename = f"{client_name}.ovpn"
        config_path = os.path.join(openvpn_manager.config_storage_path, config_filename)
        with open(config_path, "w", encoding="utf-8") as f:
            f.write(config_content)

        if logger:
            logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Serving client config for download: {client_name}")
        return send_file(config_path, as_attachment=True, download_name=config_filename)
    except Exception as e:
        if logger:
            logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to download client config: {str(e)}")
        return jsonify({"status": "error", "message": str(e)}), 500