NNT2 / core /traffic_router.py
Fred808's picture
Upload 54 files
47a9eda verified
"""
Traffic Router Module
Handles routing of all client traffic through VPN for free data access
"""
import os
import json
import subprocess
import threading
import time
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
import ipaddress
logger = logging.getLogger(__name__)
@dataclass
class RoutingRule:
"""Represents a traffic routing rule"""
rule_id: str
source_network: str
destination_network: str
gateway: str
interface: str
priority: int
enabled: bool = True
created_at: float = None
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
@dataclass
class ClientRoute:
"""Represents a client's routing configuration"""
client_id: str
client_ip: str
vpn_gateway: str
original_gateway: str
routes_applied: List[str]
status: str = "active"
created_at: float = None
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
class TrafficRouter:
"""Manages traffic routing for VPN clients to enable free data access"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.routing_rules: Dict[str, RoutingRule] = {}
self.client_routes: Dict[str, ClientRoute] = {}
self.vpn_network = ipaddress.IPv4Network("10.8.0.0/24")
self.vpn_gateway = "10.8.0.1"
self.is_running = False
# Integration with other components
self.nat_engine = None
self.firewall = None
self.dhcp_server = None
# Default routing rules for free data access
self.default_rules = [
{
"rule_id": "default_vpn_route",
"source_network": "0.0.0.0/0",
"destination_network": "0.0.0.0/0",
"gateway": self.vpn_gateway,
"interface": "tun0",
"priority": 100
},
{
"rule_id": "local_network_direct",
"source_network": "192.168.0.0/16",
"destination_network": "192.168.0.0/16",
"gateway": "direct",
"interface": "eth0",
"priority": 50
},
{
"rule_id": "vpn_network_direct",
"source_network": "10.8.0.0/24",
"destination_network": "10.8.0.0/24",
"gateway": "direct",
"interface": "tun0",
"priority": 50
}
]
logger.info("Traffic Router initialized")
def set_components(self, nat_engine=None, firewall=None, dhcp_server=None):
"""Set references to other ISP stack components"""
self.nat_engine = nat_engine
self.firewall = firewall
self.dhcp_server = dhcp_server
logger.info("Traffic Router components set")
def start(self):
"""Start the traffic router"""
try:
if self.is_running:
logger.warning("Traffic Router is already running")
return True
# Initialize default routing rules
self._initialize_default_rules()
# Setup VPN routing infrastructure
self._setup_vpn_routing()
self.is_running = True
logger.info("Traffic Router started successfully")
return True
except Exception as e:
logger.error(f"Error starting Traffic Router: {e}")
return False
def stop(self):
"""Stop the traffic router"""
try:
if not self.is_running:
logger.warning("Traffic Router is not running")
return True
# Clean up all client routes
for client_id in list(self.client_routes.keys()):
self.remove_client_route(client_id)
# Clean up routing rules
self._cleanup_routing_rules()
self.is_running = False
logger.info("Traffic Router stopped")
return True
except Exception as e:
logger.error(f"Error stopping Traffic Router: {e}")
return False
def _initialize_default_rules(self):
"""Initialize default routing rules"""
try:
for rule_config in self.default_rules:
rule = RoutingRule(**rule_config)
self.routing_rules[rule.rule_id] = rule
logger.debug(f"Initialized routing rule: {rule.rule_id}")
logger.info(f"Initialized {len(self.default_rules)} default routing rules")
except Exception as e:
logger.error(f"Error initializing default rules: {e}")
def _setup_vpn_routing(self):
"""Setup VPN routing infrastructure"""
try:
# Enable IP forwarding
self._enable_ip_forwarding()
# Setup iptables rules for VPN traffic
self._setup_iptables_rules()
# Configure routing tables
self._configure_routing_tables()
logger.info("VPN routing infrastructure setup completed")
except Exception as e:
logger.error(f"Error setting up VPN routing: {e}")
def _enable_ip_forwarding(self):
"""Enable IP forwarding on the system"""
try:
# Enable IPv4 forwarding
subprocess.run([
"/usr/bin/sudo", "sysctl", "-w", "net.ipv4.ip_forward=1"
], check=True, capture_output=True)
# Make it persistent
with open("/tmp/99-ip-forward.conf", "w") as f:
f.write("net.ipv4.ip_forward=1\n")
subprocess.run([
"/usr/bin/sudo", "cp", "/tmp/99-ip-forward.conf", "/etc/sysctl.d/"
], check=True, capture_output=True)
logger.info("IP forwarding enabled")
except subprocess.CalledProcessError as e:
logger.warning(f"Could not enable IP forwarding: {e}")
except Exception as e:
logger.error(f"Error enabling IP forwarding: {e}")
def _setup_iptables_rules(self):
"""Setup iptables rules for VPN traffic routing"""
try:
# Note: In a production environment, these would be actual iptables commands
# For this implementation, we'll log the commands that would be executed
iptables_rules = [
# Allow VPN traffic
"iptables -A INPUT -i tun0 -j ACCEPT",
"iptables -A FORWARD -i tun0 -j ACCEPT",
"iptables -A OUTPUT -o tun0 -j ACCEPT",
# NAT rules for VPN clients
"iptables -t nat -A POSTROUTING -s 10.8.0.0/24 -o eth0 -j MASQUERADE",
"iptables -t nat -A POSTROUTING -s 10.8.0.0/24 -o tun0 -j MASQUERADE",
# Forward VPN traffic
"iptables -A FORWARD -s 10.8.0.0/24 -j ACCEPT",
"iptables -A FORWARD -d 10.8.0.0/24 -j ACCEPT",
# Mark VPN traffic for special routing
"iptables -t mangle -A PREROUTING -s 10.8.0.0/24 -j MARK --set-mark 100",
"iptables -t mangle -A OUTPUT -s 10.8.0.0/24 -j MARK --set-mark 100"
]
for rule in iptables_rules:
logger.debug(f"Would execute: {rule}")
# In a real implementation, you would execute these with subprocess
# subprocess.run(rule.split(), check=True, capture_output=True)
logger.info("Iptables rules configured for VPN routing")
except Exception as e:
logger.error(f"Error setting up iptables rules: {e}")
def _configure_routing_tables(self):
"""Configure custom routing tables for VPN traffic"""
try:
# Create custom routing table for VPN traffic
routing_commands = [
# Add custom routing table
"echo '100 vpn_table' >> /etc/iproute2/rt_tables",
# Add routes to custom table
"ip route add default via 10.8.0.1 dev tun0 table vpn_table",
"ip route add 10.8.0.0/24 dev tun0 table vpn_table",
# Add routing rules
"ip rule add fwmark 100 table vpn_table",
"ip rule add from 10.8.0.0/24 table vpn_table"
]
for cmd in routing_commands:
logger.debug(f"Would execute: {cmd}")
# In a real implementation, you would execute these with subprocess
# subprocess.run(cmd.split(), check=True, capture_output=True)
logger.info("Custom routing tables configured")
except Exception as e:
logger.error(f"Error configuring routing tables: {e}")
def add_client_route(self, client_id: str, client_ip: str, original_gateway: str = None) -> bool:
"""Add routing configuration for a VPN client"""
try:
if client_id in self.client_routes:
logger.warning(f"Client route already exists for {client_id}")
return True
# Create client route configuration
client_route = ClientRoute(
client_id=client_id,
client_ip=client_ip,
vpn_gateway=self.vpn_gateway,
original_gateway=original_gateway or "auto",
routes_applied=[]
)
# Apply routing rules for this client
routes_applied = self._apply_client_routing_rules(client_route)
client_route.routes_applied = routes_applied
# Store client route
self.client_routes[client_id] = client_route
# Integrate with NAT engine if available
if self.nat_engine:
self._configure_nat_for_client(client_route)
logger.info(f"Added routing configuration for client {client_id} ({client_ip})")
return True
except Exception as e:
logger.error(f"Error adding client route for {client_id}: {e}")
return False
def remove_client_route(self, client_id: str) -> bool:
"""Remove routing configuration for a VPN client"""
try:
if client_id not in self.client_routes:
logger.warning(f"No route configuration found for client {client_id}")
return True
client_route = self.client_routes[client_id]
# Remove applied routing rules
self._remove_client_routing_rules(client_route)
# Remove NAT configuration if available
if self.nat_engine:
self._remove_nat_for_client(client_route)
# Remove from storage
del self.client_routes[client_id]
logger.info(f"Removed routing configuration for client {client_id}")
return True
except Exception as e:
logger.error(f"Error removing client route for {client_id}: {e}")
return False
def _apply_client_routing_rules(self, client_route: ClientRoute) -> List[str]:
"""Apply routing rules for a specific client"""
applied_routes = []
try:
# Route all client traffic through VPN
route_commands = [
f"ip route add default via {client_route.vpn_gateway} dev tun0 src {client_route.client_ip}",
f"ip route add {client_route.client_ip}/32 dev tun0",
f"iptables -t nat -A POSTROUTING -s {client_route.client_ip} -j MASQUERADE"
]
for cmd in route_commands:
logger.debug(f"Would execute for client {client_route.client_id}: {cmd}")
applied_routes.append(cmd)
# In a real implementation, you would execute these with subprocess
logger.debug(f"Applied {len(applied_routes)} routing rules for client {client_route.client_id}")
except Exception as e:
logger.error(f"Error applying routing rules for client {client_route.client_id}: {e}")
return applied_routes
def _remove_client_routing_rules(self, client_route: ClientRoute):
"""Remove routing rules for a specific client"""
try:
# Remove the applied routes (reverse of apply)
for route_cmd in client_route.routes_applied:
# Convert add commands to delete commands
delete_cmd = route_cmd.replace(" add ", " del ").replace(" -A ", " -D ")
logger.debug(f"Would execute for cleanup: {delete_cmd}")
# In a real implementation, you would execute these with subprocess
logger.debug(f"Removed routing rules for client {client_route.client_id}")
except Exception as e:
logger.error(f"Error removing routing rules for client {client_route.client_id}: {e}")
def _configure_nat_for_client(self, client_route: ClientRoute):
"""Configure NAT for VPN client"""
try:
if not self.nat_engine:
return
# Register client with NAT engine for special handling
nat_config = {
"client_id": client_route.client_id,
"client_ip": client_route.client_ip,
"vpn_gateway": client_route.vpn_gateway,
"routing_mode": "vpn_tunnel"
}
# This would integrate with the existing NAT engine
logger.debug(f"Would configure NAT for VPN client: {nat_config}")
except Exception as e:
logger.error(f"Error configuring NAT for client {client_route.client_id}: {e}")
def _remove_nat_for_client(self, client_route: ClientRoute):
"""Remove NAT configuration for VPN client"""
try:
if not self.nat_engine:
return
# Remove client from NAT engine
logger.debug(f"Would remove NAT configuration for client {client_route.client_id}")
except Exception as e:
logger.error(f"Error removing NAT for client {client_route.client_id}: {e}")
def _cleanup_routing_rules(self):
"""Clean up all routing rules"""
try:
# Remove custom routing table entries
cleanup_commands = [
"ip rule del fwmark 100 table vpn_table",
"ip rule del from 10.8.0.0/24 table vpn_table",
"ip route flush table vpn_table"
]
for cmd in cleanup_commands:
logger.debug(f"Would execute for cleanup: {cmd}")
# In a real implementation, you would execute these with subprocess
logger.info("Routing rules cleaned up")
except Exception as e:
logger.error(f"Error cleaning up routing rules: {e}")
def get_client_routes(self) -> List[Dict[str, Any]]:
"""Get list of all client routes"""
return [asdict(route) for route in self.client_routes.values()]
def get_routing_rules(self) -> List[Dict[str, Any]]:
"""Get list of all routing rules"""
return [asdict(rule) for rule in self.routing_rules.values()]
def get_stats(self) -> Dict[str, Any]:
"""Get traffic router statistics"""
return {
"is_running": self.is_running,
"total_clients": len(self.client_routes),
"active_clients": len([r for r in self.client_routes.values() if r.status == "active"]),
"total_rules": len(self.routing_rules),
"enabled_rules": len([r for r in self.routing_rules.values() if r.enabled]),
"vpn_network": str(self.vpn_network),
"vpn_gateway": self.vpn_gateway
}
def update_client_status(self, client_id: str, status: str) -> bool:
"""Update the status of a client route"""
try:
if client_id not in self.client_routes:
return False
self.client_routes[client_id].status = status
logger.debug(f"Updated client {client_id} status to {status}")
return True
except Exception as e:
logger.error(f"Error updating client status: {e}")
return False
def is_client_routed(self, client_id: str) -> bool:
"""Check if a client is currently routed through VPN"""
return (client_id in self.client_routes and
self.client_routes[client_id].status == "active")
def get_client_route_info(self, client_id: str) -> Optional[Dict[str, Any]]:
"""Get routing information for a specific client"""
if client_id in self.client_routes:
return asdict(self.client_routes[client_id])
return None