""" TCP Forwarding Engine Module for Outline VPN Handles TCP traffic forwarding and connection tracking with Shadowsocks protocol support """ import asyncio import logging import socket from typing import Dict, Set, Optional, Tuple from dataclasses import dataclass from datetime import datetime from .shadowsocks_protocol import ShadowsocksProtocol logger = logging.getLogger(__name__) @dataclass class OutlineTCPConnection: client_addr: Tuple[str, int] target_addr: Tuple[str, int] client_writer: asyncio.StreamWriter target_writer: asyncio.StreamWriter shadowsocks: ShadowsocksProtocol bytes_in: int = 0 bytes_out: int = 0 created_at: datetime = datetime.now() last_activity: datetime = datetime.now() class OutlineTCPForwardingEngine: def __init__(self, access_key: str): self.connections: Dict[str, OutlineTCPConnection] = {} self.active_ports: Set[int] = set() self._lock = asyncio.Lock() self.buffer_size = 8192 self.shadowsocks = ShadowsocksProtocol(access_key) async def create_connection(self, client_reader: asyncio.StreamReader, client_writer: asyncio.StreamWriter, target_host: str, target_port: int) -> Optional[OutlineTCPConnection]: """Create a new TCP connection with Shadowsocks encryption""" try: # Get client information client_addr = client_writer.get_extra_info('peername') # Connect to target target_reader, target_writer = await asyncio.open_connection( target_host, target_port ) # Create connection object conn = OutlineTCPConnection( client_addr=client_addr, target_addr=(target_host, target_port), client_writer=client_writer, target_writer=target_writer, shadowsocks=self.shadowsocks ) # Store connection conn_id = f"{client_addr[0]}:{client_addr[1]}-{target_host}:{target_port}" async with self._lock: self.connections[conn_id] = conn # Start bidirectional forwarding with encryption asyncio.create_task(self._forward_stream( client_reader, target_writer, conn, 'in')) asyncio.create_task(self._forward_stream( target_reader, client_writer, conn, 'out')) logger.info(f"Created Outline TCP connection: {conn_id}") return conn except Exception as e: logger.error(f"Error creating Outline TCP connection: {e}") if client_writer: client_writer.close() await client_writer.wait_closed() return None async def _forward_stream(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, conn: OutlineTCPConnection, direction: str): """Forward data with Shadowsocks encryption/decryption""" try: while True: data = await reader.read(self.buffer_size) if not data: break # Handle encryption/decryption if direction == 'in': # Decrypt incoming data from client data = conn.shadowsocks._decrypt_packet(data) conn.bytes_in += len(data) else: # Encrypt outgoing data to client data = conn.shadowsocks._encrypt_packet(data) conn.bytes_out += len(data) writer.write(data) await writer.drain() conn.last_activity = datetime.now() except Exception as e: logger.error(f"Error forwarding Outline data: {e}") finally: writer.close() await writer.wait_closed() # Clean up connection conn_id = (f"{conn.client_addr[0]}:{conn.client_addr[1]}-" f"{conn.target_addr[0]}:{conn.target_addr[1]}") async with self._lock: if conn_id in self.connections: del self.connections[conn_id] async def cleanup_inactive(self, timeout: int = 300): """Clean up inactive connections""" while True: try: now = datetime.now() to_remove = [] async with self._lock: for conn_id, conn in self.connections.items(): if (now - conn.last_activity).seconds > timeout: to_remove.append(conn_id) for conn_id in to_remove: if conn_id in self.connections: conn = self.connections[conn_id] conn.client_writer.close() conn.target_writer.close() del self.connections[conn_id] logger.info(f"Cleaned up inactive connection: {conn_id}") await asyncio.sleep(60) # Check every minute except Exception as e: logger.error(f"Error in connection cleanup: {e}") await asyncio.sleep(60) # Retry after error def get_connection_stats(self) -> Dict[str, Dict]: """Get statistics for all active connections""" stats = {} for conn_id, conn in self.connections.items(): stats[conn_id] = { 'bytes_in': conn.bytes_in, 'bytes_out': conn.bytes_out, 'created_at': conn.created_at.isoformat(), 'last_activity': conn.last_activity.isoformat(), 'client_addr': f"{conn.client_addr[0]}:{conn.client_addr[1]}", 'target_addr': f"{conn.target_addr[0]}:{conn.target_addr[1]}" } return stats