""" ISP API Routes Flask routes for the Virtual ISP Stack API endpoints """ from flask import Blueprint, jsonify, request, Response, send_file from flask_cors import cross_origin import json import time from typing import Dict, Any import asyncio # Import core modules from core.dhcp_server import DHCPServer from core.nat_engine import NATEngine from core.firewall import FirewallEngine, FirewallRule, FirewallRuleBuilder, FirewallAction, FirewallDirection from core.tcp_engine import TCPEngine from core.virtual_router import VirtualRouter from core.socket_translator import SocketTranslator from core.packet_bridge import PacketBridge from core.session_tracker import SessionTracker, SessionType, SessionState from core.logger import VirtualISPLogger, LogLevel, LogCategory, LogFilter from core.openvpn_manager import OpenVPNManager, initialize_openvpn_manager, get_openvpn_manager from core.traffic_router import TrafficRouter # Create blueprint isp_api = Blueprint("isp_api", __name__) # Global instances (will be initialized by main app) nat_engine: NATEngine = None traffic_router: TrafficRouter = None logger: VirtualISPLogger = None def init_engines(config: Dict[str, Any]): """Initialize all ISP stack engines including traffic router""" global nat_engine, traffic_router, logger try: # Initialize logger first logger = VirtualISPLogger({}) logger.start() # Initialize NAT engine nat_engine = NATEngine(config.get("nat", {})) # Initialize OpenVPN Manager initialize_openvpn_manager(config.get("openvpn", {})) # Initialize traffic router with NAT engine traffic_router = TrafficRouter(config.get("traffic_router", {})) traffic_router.set_components(nat_engine=nat_engine) # Start all engines nat_engine.start() # Run traffic router in a new event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(traffic_router.start()) logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "All engines initialized and started") return True except Exception as e: print(f"Error initializing engines: {e}") return False # Default configuration DEFAULT_CONFIG = { "nat": { "port_range_start": 10000, "port_range_end": 65535, "session_timeout": 300 }, "traffic_router": { "vpn_interface": "eth0", "internet_interface": "eth1", }, } @isp_api.route("/api/status", methods=["GET"]) @cross_origin() def get_status(): """Get system status including traffic router""" try: # Collect status from all components status = { "components": {}, "stats": {}, "timestamp": time.time(), } # Component status status["components"]["nat_engine"] = nat_engine.is_running if nat_engine else False status["components"]["traffic_router"] = traffic_router.is_running if traffic_router else False status["components"]["logger"] = logger.is_running if logger else False # Statistics status["stats"]["nat_sessions"] = len(nat_engine.get_sessions()) if nat_engine else 0 # Traffic router stats if traffic_router: traffic_stats = traffic_router.get_stats() status["stats"]["traffic_router"] = traffic_stats return jsonify({ "status": "success", "system_status": status }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 @isp_api.route("/api/config", methods=["POST"]) @cross_origin() def update_config(): """Update system configuration""" try: config_data = request.get_json() # Here you would update the actual configuration # For now, just return success if logger: logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "Configuration updated", metadata=config_data) return jsonify({ "status": "success", "message": "Configuration updated successfully" }) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Configuration update failed: {str(e)}") return jsonify({ "status": "error", "message": str(e) }), 500 # NAT endpoints @isp_api.route("/nat/sessions", methods=["GET"]) @cross_origin() def get_nat_sessions(): """Get NAT session table""" try: if not nat_engine: return jsonify({"status": "error", "message": "NAT engine not initialized"}), 500 sessions = nat_engine.get_sessions() return jsonify({ "status": "success", "sessions": sessions, "count": len(sessions) }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 @isp_api.route("/nat/stats", methods=["GET"]) @cross_origin() def get_nat_stats(): """Get NAT statistics""" try: if not nat_engine: return jsonify({"status": "error", "message": "NAT engine not initialized"}), 500 stats = nat_engine.get_stats() return jsonify({ "status": "success", "stats": stats }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 @isp_api.route("/openvpn/start", methods=["POST"]) @cross_origin() def start_openvpn(): """Start the OpenVPN server.""" try: openvpn_manager = get_openvpn_manager() if not openvpn_manager: return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500 openvpn_manager.start_server() if logger: logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "OpenVPN server started via API") return jsonify({"status": "success", "message": "OpenVPN server started successfully"}) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to start OpenVPN server via API: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 @isp_api.route("/openvpn/status", methods=["GET"]) @cross_origin() def get_openvpn_status(): """Get OpenVPN server status.""" try: openvpn_manager = get_openvpn_manager() if not openvpn_manager: return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500 status = openvpn_manager.get_server_status() clients = openvpn_manager.get_connected_clients() return jsonify({ "status": "success", "server_status": status.__dict__, "connected_clients": clients }) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to get OpenVPN server status: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 @isp_api.route("/openvpn/simulate_client", methods=["POST"]) @cross_origin() def simulate_openvpn_client(): """Simulate a VPN client connection.""" try: openvpn_manager = get_openvpn_manager() if not openvpn_manager: return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500 data = request.get_json() client_id = data.get("client_id") ip_address = data.get("ip_address") if not client_id or not ip_address: return jsonify({"status": "error", "message": "client_id and ip_address are required"}), 400 # Directly add client to the manager's clients dictionary from core.openvpn_manager import VPNClient client = VPNClient( client_id=client_id, common_name=client_id, ip_address=ip_address, connected_at=time.time(), bytes_received=0, bytes_sent=0, status="connected", routed_through_vpn=True ) openvpn_manager.clients[client_id] = client if logger: logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Simulated VPN client connection: {client_id} ({ip_address})") return jsonify({"status": "success", "message": f"Simulated client {client_id} connected with IP {ip_address}"}) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to simulate VPN client connection: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 @isp_api.route("/get_engines", methods=["GET"]) @cross_origin() def get_engines(): """Get references to NAT engine and Traffic Router for testing purposes.""" try: global nat_engine, traffic_router return jsonify({ "status": "success", "nat_engine_initialized": nat_engine is not None, "traffic_router_initialized": traffic_router is not None }) except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 @isp_api.route("/openvpn/generate_client_config", methods=["POST"]) @cross_origin() def generate_client_config_route(): """Generate OpenVPN client configuration file.""" try: openvpn_manager = get_openvpn_manager() if not openvpn_manager: return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500 data = request.get_json() client_name = data.get("client_name") server_ip = data.get("server_ip") if not client_name or not server_ip: return jsonify({"status": "error", "message": "client_name and server_ip are required"}), 400 config_content = openvpn_manager.generate_client_config(client_name, server_ip) if logger: logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Generated client config for {client_name}") return jsonify({"status": "success", "config": config_content}) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to generate client config: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 import smtplib from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.text import MIMEText from email import encoders # Email configuration (replace with your actual details) SMTP_SERVER = "smtp.example.com" SMTP_PORT = 587 EMAIL_ADDRESS = "your_email@example.com" EMAIL_PASSWORD = "your_email_password" @isp_api.route("/api/openvpn/email_config", methods=["POST"]) @cross_origin() def email_client_config(): """Email OpenVPN client configuration file to the user.""" try: data = request.get_json() recipient_email = data.get("email") client_name = data.get("client_name") server_ip = data.get("server_ip") if not recipient_email or not client_name or not server_ip: return jsonify({"status": "error", "message": "email, client_name, and server_ip are required"}), 400 openvpn_manager = get_openvpn_manager() if not openvpn_manager: return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500 config_content = openvpn_manager.generate_client_config(client_name, server_ip) config_filename = f"{client_name}.ovpn" config_path = os.path.join(openvpn_manager.config_storage_path, config_filename) with open(config_path, "w") as f: f.write(config_content) msg = MIMEMultipart() msg["From"] = EMAIL_ADDRESS msg["To"] = recipient_email msg["Subject"] = "Your OpenVPN Configuration File" body = "Please find your OpenVPN configuration file attached." msg.attach(MIMEText(body, "plain")) with open(config_path, "rb") as attachment: part = MIMEBase("application", "octet-stream") part.set_payload(attachment.read()) encoders.encode_base64(part) part.add_header("Content-Disposition", f"attachment; filename= {config_filename}") msg.attach(part) with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: server.starttls() server.login(EMAIL_ADDRESS, EMAIL_PASSWORD) server.send_message(msg) os.remove(config_path) # Clean up the generated file if logger: logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Emailed client config to {recipient_email} for {client_name}") return jsonify({"status": "success", "message": f"Configuration emailed to {recipient_email}"}) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to email client config: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500 @isp_api.route("/api/openvpn/download_config/", methods=["GET"]) @cross_origin() def download_client_config(client_name): """Download OpenVPN client configuration file.""" try: openvpn_manager = get_openvpn_manager() if not openvpn_manager: return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500 # Generate the config (or retrieve if already generated) # For simplicity, we'll regenerate it here. In a real app, you might store and retrieve. server_ip = request.host.split(":")[0] # Get current host IP config_content = openvpn_manager.generate_client_config(client_name, server_ip) config_filename = f"{client_name}.ovpn" config_path = os.path.join(openvpn_manager.config_storage_path, config_filename) with open(config_path, "w") as f: f.write(config_content) if logger: logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Serving client config for download: {client_name}") return send_file(config_path, as_attachment=True, download_name=config_filename) except Exception as e: if logger: logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to download client config: {str(e)}") return jsonify({"status": "error", "message": str(e)}), 500