Spaces:
Runtime error
Runtime error
"""
TCP Forwarding Engine Module for Outline VPN.

Handles TCP traffic forwarding and connection tracking with Shadowsocks
protocol support.
"""
import asyncio
import logging
import socket
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Set, Optional, Tuple

from .shadowsocks_protocol import ShadowsocksProtocol
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class OutlineTCPConnection:
    """State tracked for one client <-> target TCP pairing.

    Instances are built with keyword arguments by the forwarding engine,
    which is why this must be a real dataclass: without the decorator the
    annotated names below are not constructor parameters.
    """

    # (host, port) as reported by the client's transport ("peername").
    client_addr: Tuple[str, int]
    # (host, port) of the outbound connection.
    target_addr: Tuple[str, int]
    # Stream used to send data back to the client.
    client_writer: asyncio.StreamWriter
    # Stream used to send data on to the target.
    target_writer: asyncio.StreamWriter
    # Quoted forward reference: the annotation is never evaluated at class
    # creation, so this class does not depend on import order.
    shadowsocks: "ShadowsocksProtocol"
    # Byte counters, updated by the forwarding loops.
    bytes_in: int = 0
    bytes_out: int = 0
    # default_factory gives every instance its own timestamp; a plain
    # ``datetime.now()`` default would be evaluated once, at import time,
    # and shared by all connections.
    created_at: datetime = field(default_factory=datetime.now)
    last_activity: datetime = field(default_factory=datetime.now)
class OutlineTCPForwardingEngine:
    """Forward TCP streams between clients and their requested targets.

    For every client the engine opens an outbound connection, registers
    the pair in ``connections`` and starts two background tasks that pump
    bytes in opposite directions through the shared ``ShadowsocksProtocol``
    instance.  Tracked pairs can be reported with
    ``get_connection_stats`` and reaped when idle with
    ``cleanup_inactive``.
    """

    def __init__(self, access_key: str):
        """Create an engine with an empty connection registry.

        Args:
            access_key: Passed unchanged to ``ShadowsocksProtocol``.
        """
        self.connections: Dict[str, OutlineTCPConnection] = {}
        self.active_ports: Set[int] = set()
        self._lock = asyncio.Lock()
        self.buffer_size = 8192
        self.shadowsocks = ShadowsocksProtocol(access_key)
        # Strong references to running forwarding tasks.  The event loop
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._tasks: Set[asyncio.Task] = set()

    @staticmethod
    def _connection_id(client_addr, target_addr) -> str:
        """Build the registry key ``"chost:cport-thost:tport"``."""
        return (f"{client_addr[0]}:{client_addr[1]}-"
                f"{target_addr[0]}:{target_addr[1]}")

    @staticmethod
    async def _close_writer(writer: asyncio.StreamWriter) -> None:
        """Close *writer* and wait for it, tolerating a broken peer."""
        writer.close()
        try:
            await writer.wait_closed()
        except Exception as e:
            # wait_closed() re-raises the error that killed the transport;
            # during teardown there is nothing left to do with it.
            logger.debug("Ignoring error while closing stream: %s", e)

    def _spawn(self, coro) -> None:
        """Run *coro* as a task and hold a reference until it is done."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _stale_connection_ids(self, now: datetime, timeout: float) -> list:
        """Return ids of connections idle for more than *timeout* seconds."""
        # total_seconds() covers the whole interval; timedelta.seconds is
        # only the sub-day component and wraps every 24 hours.
        return [
            conn_id
            for conn_id, conn in self.connections.items()
            if (now - conn.last_activity).total_seconds() > timeout
        ]

    async def create_connection(
            self,
            client_reader: asyncio.StreamReader,
            client_writer: asyncio.StreamWriter,
            target_host: str,
            target_port: int) -> Optional["OutlineTCPConnection"]:
        """Create a new TCP connection with Shadowsocks encryption.

        Connects to ``target_host:target_port``, registers the pairing and
        starts forwarding in both directions.

        Args:
            client_reader: Stream carrying data from the client.
            client_writer: Stream carrying data to the client.
            target_host: Host to connect to.
            target_port: Port to connect to.

        Returns:
            The registered connection, or ``None`` if setup failed.  On
            failure the error is logged and both the client stream and
            any already-opened target stream are closed.
        """
        target_writer = None
        try:
            # Get client information
            client_addr = client_writer.get_extra_info('peername')

            # Connect to target
            target_reader, target_writer = await asyncio.open_connection(
                target_host, target_port
            )

            # Create connection object
            conn = OutlineTCPConnection(
                client_addr=client_addr,
                target_addr=(target_host, target_port),
                client_writer=client_writer,
                target_writer=target_writer,
                shadowsocks=self.shadowsocks,
            )

            # Store connection
            conn_id = self._connection_id(client_addr, conn.target_addr)
            async with self._lock:
                self.connections[conn_id] = conn

            # Start bidirectional forwarding with encryption
            self._spawn(self._forward_stream(
                client_reader, target_writer, conn, 'in'))
            self._spawn(self._forward_stream(
                target_reader, client_writer, conn, 'out'))

            logger.info("Created Outline TCP connection: %s", conn_id)
            return conn
        except Exception as e:
            logger.error("Error creating Outline TCP connection: %s", e)
            # Close whatever was opened so neither socket leaks.
            for writer in (target_writer, client_writer):
                if writer:
                    await self._close_writer(writer)
            return None

    async def _forward_stream(self,
                              reader: asyncio.StreamReader,
                              writer: asyncio.StreamWriter,
                              conn: "OutlineTCPConnection",
                              direction: str):
        """Forward data with Shadowsocks encryption/decryption.

        Reads up to ``buffer_size`` bytes at a time until EOF or an error.
        When the loop ends, ``writer`` is closed and ``conn`` is removed
        from the registry.

        Args:
            reader: Stream to read from.
            writer: Stream to write the transformed data to.
            conn: Connection whose counters and activity time are updated.
            direction: ``'in'`` applies ``_decrypt_packet``; any other
                value applies ``_encrypt_packet``.
        """
        try:
            while True:
                data = await reader.read(self.buffer_size)
                if not data:
                    break

                # NOTE(review): chunks are whatever read() returns, not
                # protocol packets - confirm the cipher helpers cope with
                # arbitrary boundaries.
                if direction == 'in':
                    # Decrypt incoming data from client
                    data = conn.shadowsocks._decrypt_packet(data)
                    conn.bytes_in += len(data)
                else:
                    # Encrypt outgoing data to client
                    data = conn.shadowsocks._encrypt_packet(data)
                    conn.bytes_out += len(data)

                writer.write(data)
                await writer.drain()
                conn.last_activity = datetime.now()
        except Exception as e:
            logger.error("Error forwarding Outline data: %s", e)
        finally:
            # A failing close must not prevent the registry cleanup below.
            await self._close_writer(writer)

            # Clean up connection.  Both directions run this; the identity
            # check makes the second run a no-op and protects a newer
            # connection that happens to reuse the same id.
            conn_id = self._connection_id(conn.client_addr, conn.target_addr)
            async with self._lock:
                if self.connections.get(conn_id) is conn:
                    del self.connections[conn_id]

    async def cleanup_inactive(self, timeout: int = 300):
        """Clean up inactive connections.

        Runs forever: once a minute, closes both streams of every
        connection whose last activity is more than ``timeout`` seconds
        old and drops it from the registry.  Errors are logged and the
        loop continues.

        Args:
            timeout: Idle time in seconds after which a connection is
                closed.
        """
        while True:
            try:
                async with self._lock:
                    stale = self._stale_connection_ids(datetime.now(), timeout)
                    for conn_id in stale:
                        conn = self.connections.pop(conn_id)
                        conn.client_writer.close()
                        conn.target_writer.close()
                        logger.info(
                            "Cleaned up inactive connection: %s", conn_id)
            except Exception as e:
                logger.error("Error in connection cleanup: %s", e)

            # Check every minute (also the retry delay after an error).
            await asyncio.sleep(60)

    def get_connection_stats(self) -> Dict[str, Dict]:
        """Get statistics for all active connections.

        Returns:
            A dict keyed by connection id.  Each value holds the byte
            counters, ISO-formatted creation and last-activity times, and
            ``"host:port"`` strings for the client and target.
        """
        return {
            conn_id: {
                'bytes_in': conn.bytes_in,
                'bytes_out': conn.bytes_out,
                'created_at': conn.created_at.isoformat(),
                'last_activity': conn.last_activity.isoformat(),
                'client_addr': f"{conn.client_addr[0]}:{conn.client_addr[1]}",
                'target_addr': f"{conn.target_addr[0]}:{conn.target_addr[1]}",
            }
            for conn_id, conn in self.connections.items()
        }