Spaces:
Paused
Paused
| """ | |
| Traffic Router Module | |
| Handles routing of all client traffic through VPN for free data access using async TCP sockets. | |
| Optimized for performance and scalability. | |
| """ | |
| import asyncio | |
| import socket | |
| import logging | |
| from typing import Dict, Any, Optional | |
# Module-level logger shared by every TrafficRouter instance.
logger = logging.getLogger(__name__)


class TrafficRouter:
    """Accept TCP connections on a VPN-side and an Internet-side endpoint.

    Each accepted connection is read in chunks of up to 4096 bytes. When a
    NAT engine is attached, chunks from the "VPN" endpoint go through
    ``process_outbound_packet`` and chunks from the "Internet" endpoint go
    through ``process_inbound_packet``; a falsy result drops the chunk.
    Surviving chunks are handed to ``_forward_data``.

    Attributes:
        config: The configuration mapping passed to the constructor.
        is_running: True between a successful ``start()`` and ``stop()``.
        vpn_host / vpn_port: Listen address of the VPN-side endpoint
            (config keys ``vpn_host`` / ``vpn_port``, defaults
            ``127.0.0.1`` / ``9000``).
        internet_host / internet_port: Listen address of the Internet-side
            endpoint (config keys ``internet_host`` / ``internet_port``,
            defaults ``0.0.0.0`` / ``9001``).
        nat_engine: Optional object providing ``process_outbound_packet``
            and ``process_inbound_packet``.
        connections: Set of stream writers for currently open connections.
        stats: Counters: ``total_connections``, ``active_connections``,
            ``bytes_forwarded`` and ``errors``.
    """

    def __init__(self, config: Dict[str, Any], nat_engine: Any = None):
        """Store configuration and initialise counters; opens no sockets.

        Args:
            config: Mapping that may contain ``vpn_host``, ``vpn_port``,
                ``internet_host`` and ``internet_port``.
            nat_engine: Optional packet translator (see class docstring).
        """
        self.config = config
        self.is_running = False
        self.vpn_host = self.config.get("vpn_host", "127.0.0.1")
        self.vpn_port = self.config.get("vpn_port", 9000)
        self.internet_host = self.config.get("internet_host", "0.0.0.0")
        self.internet_port = self.config.get("internet_port", 9001)
        self.nat_engine = nat_engine
        self.loop = None
        self.vpn_server = None
        self.internet_server = None
        self.connections = set()
        self.stats = {
            "total_connections": 0,
            "active_connections": 0,
            "bytes_forwarded": 0,
            "errors": 0,
        }

    async def start(self):
        """Start both listening endpoints.

        Returns:
            True once both listeners are up, or immediately if the router
            is already running.

        Raises:
            Whatever ``asyncio.start_server`` raises (for example
            ``OSError`` when a port is already in use). In that case the
            router is rolled back to the stopped state before re-raising.
        """
        if self.is_running:
            logger.warning("Traffic Router is already running")
            return True

        # We are inside a coroutine, so a running loop always exists.
        self.loop = asyncio.get_running_loop()

        # Connection handlers loop on is_running, so the flag has to be set
        # before the first listener can accept a client.
        self.is_running = True
        try:
            self.vpn_server = await asyncio.start_server(
                lambda r, w: self._handle_connection(r, w, "VPN"),
                self.vpn_host, self.vpn_port)
            self.internet_server = await asyncio.start_server(
                lambda r, w: self._handle_connection(r, w, "Internet"),
                self.internet_host, self.internet_port)
        except BaseException:
            # Partial start: release the first listener and clear the flag
            # so the port is not left bound and start() can be retried.
            self.is_running = False
            if self.vpn_server is not None:
                self.vpn_server.close()
            self.vpn_server = None
            self.internet_server = None
            raise

        logger.info(
            "Traffic Router started on TCP endpoints: %s:%s and %s:%s",
            self.vpn_host, self.vpn_port,
            self.internet_host, self.internet_port,
        )
        return True

    async def stop(self):
        """Stop accepting connections and close the open ones.

        Returns:
            True, always.
        """
        logger.info("Stopping Traffic Router...")
        self.is_running = False

        servers = [
            server
            for server in (self.vpn_server, self.internet_server)
            if server
        ]

        # 1. Stop accepting new clients.
        for server in servers:
            server.close()

        # 2. Close established connections *before* waiting on the servers:
        #    on Python 3.12+ Server.wait_closed() does not return while
        #    connections are still open, so the reverse order can hang.
        for conn in list(self.connections):
            conn.close()

        # 3. Now the listeners can finish shutting down.
        for server in servers:
            await server.wait_closed()

        logger.info("Traffic Router stopped")
        return True

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, source_name: str):
        """Serve one accepted connection until EOF, error or router stop.

        Args:
            reader: Stream the client's data is read from.
            writer: Stream writer for the same connection; tracked in
                ``self.connections`` and closed on exit.
            source_name: ``"VPN"`` or ``"Internet"``; selects the NAT
                direction and is passed on to ``_forward_data``.
        """
        peer = writer.get_extra_info("peername")
        logger.info("Accepted connection from %s on %s", peer, source_name)

        self.connections.add(writer)
        self.stats["total_connections"] += 1
        self.stats["active_connections"] = len(self.connections)
        try:
            while self.is_running:
                data = await reader.read(4096)
                if not data:
                    # Empty read means the peer closed the connection.
                    break

                payload = data
                if self.nat_engine:
                    if source_name == "VPN":
                        payload = self.nat_engine.process_outbound_packet(data)
                    elif source_name == "Internet":
                        payload = self.nat_engine.process_inbound_packet(data)
                    else:
                        # Unknown direction: nothing to translate with.
                        payload = None

                if not payload:
                    # The NAT engine rejected the chunk; it is neither
                    # forwarded nor counted.
                    continue

                await self._forward_data(payload, source_name)
                # Count what was actually handed to _forward_data.
                self.stats["bytes_forwarded"] += len(payload)
        except Exception as e:
            self.stats["errors"] += 1
            logger.error("Error in %s connection: %s", source_name, e)
        finally:
            # Untrack first so bookkeeping stays correct even when the
            # close handshake below fails.
            self.connections.discard(writer)
            self.stats["active_connections"] = len(self.connections)
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as e:
                # The connection is gone either way (e.g. reset by peer);
                # do not let this escape the handler task.
                logger.debug(
                    "Ignoring error while closing %s connection: %s",
                    source_name, e,
                )

    async def _forward_data(self, data: bytes, source_name: str):
        """Forward data to the opposite endpoint.

        Placeholder: no bytes are sent anywhere yet; the call only emits a
        debug log entry. Real forwarding (connection pooling, load
        balancing, ...) is expected to be implemented here.

        Args:
            data: Payload to forward.
            source_name: Endpoint the payload arrived on.
        """
        logger.debug("Forwarded %s bytes from %s", len(data), source_name)

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the router's configuration and counters.

        ``active_connections`` is computed from the live connection set.
        """
        return {
            "is_running": self.is_running,
            "vpn_host": self.vpn_host,
            "vpn_port": self.vpn_port,
            "internet_host": self.internet_host,
            "internet_port": self.internet_port,
            "total_connections": self.stats["total_connections"],
            "active_connections": len(self.connections),
            "bytes_forwarded": self.stats["bytes_forwarded"],
            "errors": self.stats["errors"],
        }

    def set_components(self, nat_engine: Any = None):
        """Attach collaborating components.

        Args:
            nat_engine: Replacement NAT engine. A falsy value leaves the
                current engine untouched.
        """
        if nat_engine:
            self.nat_engine = nat_engine