HINTECH / routes /isp_api.py
Factor Studios
Upload 73 files
aaaaa79 verified
"""
ISP API Routes
Flask routes for the Virtual ISP Stack API endpoints
"""
from flask import Blueprint, jsonify, request, Response, send_file
from flask_cors import cross_origin
import json
import time
from typing import Dict, Any
import asyncio
# Import core modules
from core.dhcp_server import DHCPServer
from core.nat_engine import NATEngine
from core.firewall import FirewallEngine, FirewallRule, FirewallRuleBuilder, FirewallAction, FirewallDirection
from core.tcp_engine import TCPEngine
from core.virtual_router import VirtualRouter
from core.socket_translator import SocketTranslator
from core.packet_bridge import PacketBridge
from core.session_tracker import SessionTracker, SessionType, SessionState
from core.logger import VirtualISPLogger, LogLevel, LogCategory, LogFilter
from core.openvpn_manager import OpenVPNManager, initialize_openvpn_manager, get_openvpn_manager
from core.traffic_router import TrafficRouter
# Create blueprint
isp_api = Blueprint("isp_api", __name__)
# Global instances (will be initialized by main app)
nat_engine: NATEngine = None
traffic_router: TrafficRouter = None
logger: VirtualISPLogger = None
def init_engines(config: Dict[str, Any]):
"""Initialize all ISP stack engines including traffic router"""
global nat_engine, traffic_router, logger
try:
# Initialize logger first
logger = VirtualISPLogger({})
logger.start()
# Initialize NAT engine
nat_engine = NATEngine(config.get("nat", {}))
# Initialize OpenVPN Manager
initialize_openvpn_manager(config.get("openvpn", {}))
# Initialize traffic router with NAT engine
traffic_router = TrafficRouter(config.get("traffic_router", {}))
traffic_router.set_components(nat_engine=nat_engine)
# Start all engines
nat_engine.start()
# Run traffic router in a new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(traffic_router.start())
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "All engines initialized and started")
return True
except Exception as e:
print(f"Error initializing engines: {e}")
return False
# Default configuration
DEFAULT_CONFIG = {
"nat": {
"port_range_start": 10000,
"port_range_end": 65535,
"session_timeout": 300
},
"traffic_router": {
"vpn_interface": "eth0",
"internet_interface": "eth1",
},
}
@isp_api.route("/api/status", methods=["GET"])
@cross_origin()
def get_status():
"""Get system status including traffic router"""
try:
# Collect status from all components
status = {
"components": {},
"stats": {},
"timestamp": time.time(),
}
# Component status
status["components"]["nat_engine"] = nat_engine.is_running if nat_engine else False
status["components"]["traffic_router"] = traffic_router.is_running if traffic_router else False
status["components"]["logger"] = logger.is_running if logger else False
# Statistics
status["stats"]["nat_sessions"] = len(nat_engine.get_sessions()) if nat_engine else 0
# Traffic router stats
if traffic_router:
traffic_stats = traffic_router.get_stats()
status["stats"]["traffic_router"] = traffic_stats
return jsonify({
"status": "success",
"system_status": status
})
except Exception as e:
return jsonify({
"status": "error",
"message": str(e)
}), 500
@isp_api.route("/api/config", methods=["POST"])
@cross_origin()
def update_config():
"""Update system configuration"""
try:
config_data = request.get_json()
# Here you would update the actual configuration
# For now, just return success
if logger:
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "Configuration updated", metadata=config_data)
return jsonify({
"status": "success",
"message": "Configuration updated successfully"
})
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Configuration update failed: {str(e)}")
return jsonify({
"status": "error",
"message": str(e)
}), 500
# NAT endpoints
@isp_api.route("/nat/sessions", methods=["GET"])
@cross_origin()
def get_nat_sessions():
"""Get NAT session table"""
try:
if not nat_engine:
return jsonify({"status": "error", "message": "NAT engine not initialized"}), 500
sessions = nat_engine.get_sessions()
return jsonify({
"status": "success",
"sessions": sessions,
"count": len(sessions)
})
except Exception as e:
return jsonify({
"status": "error",
"message": str(e)
}), 500
@isp_api.route("/nat/stats", methods=["GET"])
@cross_origin()
def get_nat_stats():
"""Get NAT statistics"""
try:
if not nat_engine:
return jsonify({"status": "error", "message": "NAT engine not initialized"}), 500
stats = nat_engine.get_stats()
return jsonify({
"status": "success",
"stats": stats
})
except Exception as e:
return jsonify({
"status": "error",
"message": str(e)
}), 500
@isp_api.route("/openvpn/start", methods=["POST"])
@cross_origin()
def start_openvpn():
"""Start the OpenVPN server."""
try:
openvpn_manager = get_openvpn_manager()
if not openvpn_manager:
return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
openvpn_manager.start_server()
if logger:
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", "OpenVPN server started via API")
return jsonify({"status": "success", "message": "OpenVPN server started successfully"})
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to start OpenVPN server via API: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
@isp_api.route("/openvpn/status", methods=["GET"])
@cross_origin()
def get_openvpn_status():
"""Get OpenVPN server status."""
try:
openvpn_manager = get_openvpn_manager()
if not openvpn_manager:
return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
status = openvpn_manager.get_server_status()
clients = openvpn_manager.get_connected_clients()
return jsonify({
"status": "success",
"server_status": status.__dict__,
"connected_clients": clients
})
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to get OpenVPN server status: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
@isp_api.route("/openvpn/simulate_client", methods=["POST"])
@cross_origin()
def simulate_openvpn_client():
"""Simulate a VPN client connection."""
try:
openvpn_manager = get_openvpn_manager()
if not openvpn_manager:
return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
data = request.get_json()
client_id = data.get("client_id")
ip_address = data.get("ip_address")
if not client_id or not ip_address:
return jsonify({"status": "error", "message": "client_id and ip_address are required"}), 400
# Directly add client to the manager's clients dictionary
from core.openvpn_manager import VPNClient
client = VPNClient(
client_id=client_id,
common_name=client_id,
ip_address=ip_address,
connected_at=time.time(),
bytes_received=0,
bytes_sent=0,
status="connected",
routed_through_vpn=True
)
openvpn_manager.clients[client_id] = client
if logger:
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Simulated VPN client connection: {client_id} ({ip_address})")
return jsonify({"status": "success", "message": f"Simulated client {client_id} connected with IP {ip_address}"})
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to simulate VPN client connection: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
@isp_api.route("/get_engines", methods=["GET"])
@cross_origin()
def get_engines():
"""Get references to NAT engine and Traffic Router for testing purposes."""
try:
global nat_engine, traffic_router
return jsonify({
"status": "success",
"nat_engine_initialized": nat_engine is not None,
"traffic_router_initialized": traffic_router is not None
})
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
@isp_api.route("/openvpn/generate_client_config", methods=["POST"])
@cross_origin()
def generate_client_config_route():
"""Generate OpenVPN client configuration file."""
try:
openvpn_manager = get_openvpn_manager()
if not openvpn_manager:
return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
data = request.get_json()
client_name = data.get("client_name")
server_ip = data.get("server_ip")
if not client_name or not server_ip:
return jsonify({"status": "error", "message": "client_name and server_ip are required"}), 400
config_content = openvpn_manager.generate_client_config(client_name, server_ip)
if logger:
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Generated client config for {client_name}")
return jsonify({"status": "success", "config": config_content})
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to generate client config: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
# Email configuration (replace with your actual details)
SMTP_SERVER = "smtp.example.com"
SMTP_PORT = 587
EMAIL_ADDRESS = "your_email@example.com"
EMAIL_PASSWORD = "your_email_password"
@isp_api.route("/api/openvpn/email_config", methods=["POST"])
@cross_origin()
def email_client_config():
"""Email OpenVPN client configuration file to the user."""
try:
data = request.get_json()
recipient_email = data.get("email")
client_name = data.get("client_name")
server_ip = data.get("server_ip")
if not recipient_email or not client_name or not server_ip:
return jsonify({"status": "error", "message": "email, client_name, and server_ip are required"}), 400
openvpn_manager = get_openvpn_manager()
if not openvpn_manager:
return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
config_content = openvpn_manager.generate_client_config(client_name, server_ip)
config_filename = f"{client_name}.ovpn"
config_path = os.path.join(openvpn_manager.config_storage_path, config_filename)
with open(config_path, "w") as f:
f.write(config_content)
msg = MIMEMultipart()
msg["From"] = EMAIL_ADDRESS
msg["To"] = recipient_email
msg["Subject"] = "Your OpenVPN Configuration File"
body = "Please find your OpenVPN configuration file attached."
msg.attach(MIMEText(body, "plain"))
with open(config_path, "rb") as attachment:
part = MIMEBase("application", "octet-stream")
part.set_payload(attachment.read())
encoders.encode_base64(part)
part.add_header("Content-Disposition", f"attachment; filename= {config_filename}")
msg.attach(part)
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
server.send_message(msg)
os.remove(config_path) # Clean up the generated file
if logger:
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Emailed client config to {recipient_email} for {client_name}")
return jsonify({"status": "success", "message": f"Configuration emailed to {recipient_email}"})
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to email client config: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
@isp_api.route("/api/openvpn/download_config/<client_name>", methods=["GET"])
@cross_origin()
def download_client_config(client_name):
"""Download OpenVPN client configuration file."""
try:
openvpn_manager = get_openvpn_manager()
if not openvpn_manager:
return jsonify({"status": "error", "message": "OpenVPN manager not initialized"}), 500
# Generate the config (or retrieve if already generated)
# For simplicity, we'll regenerate it here. In a real app, you might store and retrieve.
server_ip = request.host.split(":")[0] # Get current host IP
config_content = openvpn_manager.generate_client_config(client_name, server_ip)
config_filename = f"{client_name}.ovpn"
config_path = os.path.join(openvpn_manager.config_storage_path, config_filename)
with open(config_path, "w") as f:
f.write(config_content)
if logger:
logger.log(LogLevel.INFO, LogCategory.SYSTEM, "api", f"Serving client config for download: {client_name}")
return send_file(config_path, as_attachment=True, download_name=config_filename)
except Exception as e:
if logger:
logger.log(LogLevel.ERROR, LogCategory.SYSTEM, "api", f"Failed to download client config: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500