# HINTECH / core / traffic_router.py
# Factor Studios
# Upload 73 files
# aaaaa79 verified
"""
Traffic Router Module
Handles routing of all client traffic through VPN for free data access using async TCP sockets.
Optimized for performance and scalability.
"""
import asyncio
import socket
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)


class TrafficRouter:
    """Accept TCP connections on a VPN-side and an Internet-side listener.

    Every chunk read from a connection is optionally passed through a NAT
    engine (``process_outbound_packet`` for the "VPN" listener,
    ``process_inbound_packet`` for the "Internet" listener) and then handed
    to :meth:`_forward_data`, which is currently only a logging placeholder.

    Attributes:
        config: The configuration mapping given to the constructor.
        is_running: True while the listeners are (being) served.
        connections: Stream writers of the currently open connections.
        stats: Counters for connections, forwarded bytes and errors.
    """

    def __init__(self, config: Dict[str, Any], nat_engine: Any = None):
        """Store the configuration; no sockets are opened here.

        Args:
            config: Mapping read for ``vpn_host``, ``vpn_port``,
                ``internet_host`` and ``internet_port``; missing keys fall
                back to 127.0.0.1:9000 and 0.0.0.0:9001 respectively.
            nat_engine: Optional object providing
                ``process_outbound_packet`` / ``process_inbound_packet``.
        """
        self.config = config
        self.is_running = False
        self.vpn_host = self.config.get("vpn_host", "127.0.0.1")
        self.vpn_port = self.config.get("vpn_port", 9000)
        self.internet_host = self.config.get("internet_host", "0.0.0.0")
        self.internet_port = self.config.get("internet_port", 9001)
        self.nat_engine = nat_engine
        self.loop = None
        self.vpn_server = None
        self.internet_server = None
        self.connections = set()
        self.stats = {
            "total_connections": 0,
            "active_connections": 0,
            "bytes_forwarded": 0,
            "errors": 0,
        }

    async def start(self) -> bool:
        """Start both TCP listeners.

        Returns:
            True once the router is running (also when it already was).

        Raises:
            OSError: If a listener cannot be bound. Anything opened so far
                is closed again and ``is_running`` is reset before the
                error propagates.
        """
        if self.is_running:
            logger.warning("Traffic Router is already running")
            return True

        self.loop = asyncio.get_running_loop()
        # Handlers loop on ``is_running``; it has to be set before the first
        # listener can accept a connection.
        self.is_running = True
        try:
            self.vpn_server = await asyncio.start_server(
                lambda r, w: self._handle_connection(r, w, "VPN"),
                self.vpn_host,
                self.vpn_port,
            )
            self.internet_server = await asyncio.start_server(
                lambda r, w: self._handle_connection(r, w, "Internet"),
                self.internet_host,
                self.internet_port,
            )
        except Exception:
            # Do not leave a half-started router behind.
            await self._shutdown()
            raise

        logger.info(
            "Traffic Router started on TCP endpoints: %s:%s and %s:%s",
            self.vpn_host,
            self.vpn_port,
            self.internet_host,
            self.internet_port,
        )
        return True

    async def stop(self) -> bool:
        """Stop both listeners and close every open connection.

        Returns:
            Always True.
        """
        logger.info("Stopping Traffic Router...")
        await self._shutdown()
        logger.info("Traffic Router stopped")
        return True

    async def _shutdown(self) -> None:
        """Close listeners and client connections, then wait for the listeners."""
        self.is_running = False
        servers = [
            server
            for server in (self.vpn_server, self.internet_server)
            if server is not None
        ]
        self.vpn_server = None
        self.internet_server = None

        # Stop accepting new connections first.
        for server in servers:
            server.close()
        # Close the clients *before* waiting on the listeners: since
        # Python 3.12 ``Server.wait_closed()`` also waits for the active
        # connections, so waiting first would block on idle clients.
        for conn in list(self.connections):
            conn.close()
        for server in servers:
            await server.wait_closed()

    async def _handle_connection(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        source_name: str,
    ) -> None:
        """Serve one accepted connection until EOF, an error, or shutdown.

        Args:
            reader: Stream the client data is read from (4096-byte reads).
            writer: Stream writer of the same connection; tracked in
                ``self.connections`` and closed when the handler ends.
            source_name: "VPN" or "Internet", i.e. the listener that
                accepted the connection.
        """
        peer = writer.get_extra_info("peername")
        logger.info("Accepted connection from %s on %s", peer, source_name)
        self.connections.add(writer)
        self.stats["total_connections"] += 1
        self.stats["active_connections"] = len(self.connections)
        try:
            while self.is_running:
                data = await reader.read(4096)
                if not data:
                    break  # EOF: the peer closed the connection
                payload = self._apply_nat(data, source_name)
                if not payload:
                    continue  # dropped by the NAT engine, nothing to forward
                await self._forward_data(payload, source_name)
                # Count only what was actually handed to the forwarder.
                self.stats["bytes_forwarded"] += len(payload)
        except Exception as e:
            self.stats["errors"] += 1
            logger.error("Error in %s connection: %s", source_name, e)
        finally:
            # Untrack first so a failing close cannot leak the entry.
            self.connections.discard(writer)
            self.stats["active_connections"] = len(self.connections)
            writer.close()
            try:
                await writer.wait_closed()
            except Exception as close_error:
                # A reset while closing is harmless; the connection is gone.
                logger.debug(
                    "Ignoring error while closing %s connection: %s",
                    source_name,
                    close_error,
                )

    def _apply_nat(self, data: bytes, source_name: str) -> Any:
        """Return the data to forward, or a falsy value to drop the chunk.

        Without a NAT engine the data is passed through unchanged. With an
        engine, a ``source_name`` other than "VPN"/"Internet" yields None.
        """
        if self.nat_engine is None:
            return data
        if source_name == "VPN":
            return self.nat_engine.process_outbound_packet(data)
        if source_name == "Internet":
            return self.nat_engine.process_inbound_packet(data)
        return None

    async def _forward_data(self, data: bytes, source_name: str) -> None:
        """Forward data to the opposite endpoint.

        Placeholder: nothing is transmitted yet, the event is only logged.
        Real forwarding (e.g. connection pooling or load balancing) still
        has to be implemented here.
        """
        logger.debug("Forwarded %d bytes from %s", len(data), source_name)

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the endpoints and traffic counters."""
        return {
            "is_running": self.is_running,
            "vpn_host": self.vpn_host,
            "vpn_port": self.vpn_port,
            "internet_host": self.internet_host,
            "internet_port": self.internet_port,
            "total_connections": self.stats["total_connections"],
            "active_connections": len(self.connections),
            "bytes_forwarded": self.stats["bytes_forwarded"],
            "errors": self.stats["errors"],
        }

    def set_components(self, nat_engine: Any = None) -> None:
        """Set references to other components for inter-operation.

        Args:
            nat_engine: New NAT engine; ``None`` keeps the current one.
        """
        # Identity check: an engine object that happens to be falsy
        # (e.g. defines ``__len__``) must still be accepted.
        if nat_engine is not None:
            self.nat_engine = nat_engine