""" Traffic Router Module Handles routing of all client traffic through VPN for free data access using async TCP sockets. Optimized for performance and scalability. """ import asyncio import socket import logging from typing import Dict, Any, Optional logger = logging.getLogger(__name__) class TrafficRouter: """Manages traffic routing for VPN clients using async TCP sockets""" def __init__(self, config: Dict[str, Any], nat_engine: Any = None): self.config = config self.is_running = False self.vpn_host = self.config.get("vpn_host", "127.0.0.1") self.vpn_port = self.config.get("vpn_port", 9000) self.internet_host = self.config.get("internet_host", "0.0.0.0") self.internet_port = self.config.get("internet_port", 9001) self.nat_engine = nat_engine self.loop = None self.vpn_server = None self.internet_server = None self.connections = set() self.stats = { "total_connections": 0, "active_connections": 0, "bytes_forwarded": 0, "errors": 0 } async def start(self): """Start the traffic router using asyncio TCP servers""" if self.is_running: logger.warning("Traffic Router is already running") return True self.is_running = True self.loop = asyncio.get_event_loop() self.vpn_server = await asyncio.start_server( lambda r, w: self._handle_connection(r, w, "VPN"), self.vpn_host, self.vpn_port) self.internet_server = await asyncio.start_server( lambda r, w: self._handle_connection(r, w, "Internet"), self.internet_host, self.internet_port) logger.info(f"Traffic Router started on TCP endpoints: {self.vpn_host}:{self.vpn_port} and {self.internet_host}:{self.internet_port}") return True async def stop(self): """Stop the traffic router""" logger.info("Stopping Traffic Router...") self.is_running = False if self.vpn_server: self.vpn_server.close() await self.vpn_server.wait_closed() if self.internet_server: self.internet_server.close() await self.internet_server.wait_closed() for conn in list(self.connections): conn.close() logger.info("Traffic Router stopped") return True async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, source_name: str): """Handle a new connection and forward data asynchronously.""" peer = writer.get_extra_info("peername") logger.info(f"Accepted connection from {peer} on {source_name}") self.connections.add(writer) try: while self.is_running: data = await reader.read(4096) if not data: break processed_data = None if self.nat_engine: if source_name == "VPN": processed_data = self.nat_engine.process_outbound_packet(data) elif source_name == "Internet": processed_data = self.nat_engine.process_inbound_packet(data) if processed_data: await self._forward_data(processed_data, source_name) elif not self.nat_engine: await self._forward_data(data, source_name) self.stats["bytes_forwarded"] += len(data) except Exception as e: self.stats["errors"] += 1 logger.error(f"Error in {source_name} connection: {e}") finally: writer.close() await writer.wait_closed() self.connections.discard(writer) async def _forward_data(self, data: bytes, source_name: str): """Forward data to the opposite endpoint.""" # This is a placeholder for actual forwarding logic. # You may want to implement connection pooling or load balancing here. # For demo, just log the forwarding event. logger.debug(f"Forwarded {len(data)} bytes from {source_name}") def get_stats(self) -> Dict[str, Any]: """Get traffic router statistics""" return { "is_running": self.is_running, "vpn_host": self.vpn_host, "vpn_port": self.vpn_port, "internet_host": self.internet_host, "internet_port": self.internet_port, "total_connections": self.stats["total_connections"], "active_connections": len(self.connections), "bytes_forwarded": self.stats["bytes_forwarded"], "errors": self.stats["errors"] } def set_components(self, nat_engine: Any = None): """Set references to other components for inter-operation.""" if nat_engine: self.nat_engine = nat_engine