Spaces:
Paused
Paused
File size: 4,946 Bytes
aaaaa79 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
"""
Traffic Router Module
Handles routing of all client traffic through VPN for free data access using async TCP sockets.
Optimized for performance and scalability.
"""
import asyncio
import socket
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)


class TrafficRouter:
    """Manages traffic routing for VPN clients using async TCP sockets.

    Two asyncio TCP servers are opened: one on the "VPN" endpoint and one on
    the "Internet" endpoint. Data read from a connection is optionally passed
    through ``nat_engine`` (``process_outbound_packet`` for VPN-side data,
    ``process_inbound_packet`` for Internet-side data) and then handed to
    ``_forward_data``.

    Config keys read (all optional): ``vpn_host`` (default ``"127.0.0.1"``),
    ``vpn_port`` (default ``9000``), ``internet_host`` (default ``"0.0.0.0"``)
    and ``internet_port`` (default ``9001``).
    """

    def __init__(self, config: Dict[str, Any], nat_engine: Any = None):
        """Store configuration and initialise idle state; no sockets are opened.

        Args:
            config: Settings mapping; see the class docstring for the keys read.
            nat_engine: Optional object providing ``process_outbound_packet``
                and ``process_inbound_packet``. When ``None``, data is
                forwarded unmodified.
        """
        self.config = config
        self.is_running = False
        self.vpn_host = self.config.get("vpn_host", "127.0.0.1")
        self.vpn_port = self.config.get("vpn_port", 9000)
        self.internet_host = self.config.get("internet_host", "0.0.0.0")
        self.internet_port = self.config.get("internet_port", 9001)
        self.nat_engine = nat_engine
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.vpn_server = None
        self.internet_server = None
        # StreamWriter objects of the connections currently being handled.
        self.connections = set()
        self.stats = {
            "total_connections": 0,
            "active_connections": 0,
            "bytes_forwarded": 0,
            "errors": 0,
        }

    async def start(self):
        """Start the traffic router using asyncio TCP servers.

        Returns:
            ``True`` once both servers are listening, or immediately if the
            router is already running.

        Raises:
            Exception: Whatever ``asyncio.start_server`` raises (typically
                ``OSError`` when an address cannot be bound). The router is
                rolled back to the stopped state before re-raising.
        """
        if self.is_running:
            logger.warning("Traffic Router is already running")
            return True

        # The flag is raised before the listeners exist because handlers loop
        # on ``self.is_running``; a client accepted on the first server while
        # the second is still being created must not be dropped immediately.
        self.is_running = True
        try:
            self.loop = asyncio.get_running_loop()
            self.vpn_server = await asyncio.start_server(
                lambda r, w: self._handle_connection(r, w, "VPN"),
                self.vpn_host, self.vpn_port)
            self.internet_server = await asyncio.start_server(
                lambda r, w: self._handle_connection(r, w, "Internet"),
                self.internet_host, self.internet_port)
        except Exception:
            # Roll back so a failed start does not leave a half-open router
            # that claims to be running and still holds a listening socket.
            self.is_running = False
            await self._close_servers()
            raise

        logger.info(
            "Traffic Router started on TCP endpoints: %s:%s and %s:%s",
            self.vpn_host, self.vpn_port,
            self.internet_host, self.internet_port)
        return True

    async def stop(self):
        """Stop the traffic router and close all tracked connections.

        Returns:
            Always ``True``.
        """
        logger.info("Stopping Traffic Router...")
        self.is_running = False
        await self._close_servers()
        logger.info("Traffic Router stopped")
        return True

    async def _close_servers(self):
        """Close both listeners and every tracked connection, then wait.

        Connections are closed *before* awaiting ``wait_closed()`` on the
        servers: on newer Python versions that call does not return while
        connections are still active, so waiting first could block forever.
        """
        servers = [
            server
            for server in (self.vpn_server, self.internet_server)
            if server is not None
        ]
        self.vpn_server = None
        self.internet_server = None

        for server in servers:
            server.close()
        # Snapshot the set: handlers remove their writer as they finish.
        for conn in list(self.connections):
            conn.close()
        for server in servers:
            await server.wait_closed()

    def _apply_nat(self, data: bytes, source_name: str):
        """Return the payload to forward for ``data``.

        Without a NAT engine the data is returned unchanged. With one, the
        engine's result for the matching direction is returned; an unknown
        ``source_name`` yields ``None``. A falsy result means "do not forward".
        """
        if self.nat_engine is None:
            return data
        if source_name == "VPN":
            return self.nat_engine.process_outbound_packet(data)
        if source_name == "Internet":
            return self.nat_engine.process_inbound_packet(data)
        return None

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, source_name: str):
        """Handle a new connection and forward data asynchronously.

        Reads chunks of up to 4096 bytes until EOF or until the router is
        stopped. Exceptions raised while reading or forwarding are counted in
        ``stats["errors"]`` and logged rather than propagated.

        Args:
            reader: Stream to read incoming data from.
            writer: Stream paired with ``reader``; tracked in
                ``self.connections`` and closed when the handler exits.
            source_name: ``"VPN"`` or ``"Internet"``; selects the NAT direction.
        """
        peer = writer.get_extra_info("peername")
        logger.info("Accepted connection from %s on %s", peer, source_name)
        self.connections.add(writer)
        self.stats["total_connections"] += 1
        self.stats["active_connections"] = len(self.connections)
        try:
            while self.is_running:
                data = await reader.read(4096)
                if not data:
                    break
                payload = self._apply_nat(data, source_name)
                if payload:
                    await self._forward_data(payload, source_name)
                    # Count only what was actually handed to the forwarder.
                    self.stats["bytes_forwarded"] += len(payload)
        except Exception as e:
            self.stats["errors"] += 1
            logger.error("Error in %s connection: %s", source_name, e)
        finally:
            # Untrack first so a failure while closing cannot leak the writer.
            self.connections.discard(writer)
            self.stats["active_connections"] = len(self.connections)
            writer.close()
            try:
                await writer.wait_closed()
            except OSError as e:
                # The peer may already have reset the connection; closing is
                # best-effort at this point.
                logger.debug(
                    "Error while closing %s connection: %s", source_name, e)

    async def _forward_data(self, data: bytes, source_name: str):
        """Forward data to the opposite endpoint.

        Placeholder: no data is transmitted yet, the event is only logged at
        debug level. Real forwarding (e.g. connection pooling or load
        balancing) is expected to be implemented here.
        """
        logger.debug("Forwarded %d bytes from %s", len(data), source_name)

    def get_stats(self) -> Dict[str, Any]:
        """Get traffic router statistics.

        Returns:
            A new dict with the running flag, the configured endpoints and the
            counters. ``active_connections`` is the number of connections
            tracked at the time of the call.
        """
        return {
            "is_running": self.is_running,
            "vpn_host": self.vpn_host,
            "vpn_port": self.vpn_port,
            "internet_host": self.internet_host,
            "internet_port": self.internet_port,
            "total_connections": self.stats["total_connections"],
            "active_connections": len(self.connections),
            "bytes_forwarded": self.stats["bytes_forwarded"],
            "errors": self.stats["errors"],
        }

    def set_components(self, nat_engine: Any = None):
        """Set references to other components for inter-operation.

        Args:
            nat_engine: Replacement NAT engine. ``None`` leaves the current
                engine untouched.
        """
        if nat_engine is not None:
            self.nat_engine = nat_engine
|