| """
|
| TCP Forwarding Engine Module for Outline VPN
|
|
|
| Handles TCP traffic forwarding and connection tracking with Shadowsocks protocol support
|
| """
|
|
|
| import asyncio
|
| import logging
|
| import socket
|
| from typing import Dict, Set, Optional, Tuple
|
| from dataclasses import dataclass
|
| from datetime import datetime
|
| from .shadowsocks_protocol import ShadowsocksProtocol
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| @dataclass
|
| class OutlineTCPConnection:
|
| client_addr: Tuple[str, int]
|
| target_addr: Tuple[str, int]
|
| client_writer: asyncio.StreamWriter
|
| target_writer: asyncio.StreamWriter
|
| shadowsocks: ShadowsocksProtocol
|
| bytes_in: int = 0
|
| bytes_out: int = 0
|
| created_at: datetime = datetime.now()
|
| last_activity: datetime = datetime.now()
|
|
|
| class OutlineTCPForwardingEngine:
|
| def __init__(self, access_key: str):
|
| self.connections: Dict[str, OutlineTCPConnection] = {}
|
| self.active_ports: Set[int] = set()
|
| self._lock = asyncio.Lock()
|
| self.buffer_size = 8192
|
| self.shadowsocks = ShadowsocksProtocol(access_key)
|
|
|
| async def create_connection(self,
|
| client_reader: asyncio.StreamReader,
|
| client_writer: asyncio.StreamWriter,
|
| target_host: str,
|
| target_port: int) -> Optional[OutlineTCPConnection]:
|
| """Create a new TCP connection with Shadowsocks encryption"""
|
| try:
|
|
|
| client_addr = client_writer.get_extra_info('peername')
|
|
|
|
|
| target_reader, target_writer = await asyncio.open_connection(
|
| target_host, target_port
|
| )
|
|
|
|
|
| conn = OutlineTCPConnection(
|
| client_addr=client_addr,
|
| target_addr=(target_host, target_port),
|
| client_writer=client_writer,
|
| target_writer=target_writer,
|
| shadowsocks=self.shadowsocks
|
| )
|
|
|
|
|
| conn_id = f"{client_addr[0]}:{client_addr[1]}-{target_host}:{target_port}"
|
| async with self._lock:
|
| self.connections[conn_id] = conn
|
|
|
|
|
| asyncio.create_task(self._forward_stream(
|
| client_reader, target_writer, conn, 'in'))
|
| asyncio.create_task(self._forward_stream(
|
| target_reader, client_writer, conn, 'out'))
|
|
|
| logger.info(f"Created Outline TCP connection: {conn_id}")
|
| return conn
|
|
|
| except Exception as e:
|
| logger.error(f"Error creating Outline TCP connection: {e}")
|
| if client_writer:
|
| client_writer.close()
|
| await client_writer.wait_closed()
|
| return None
|
|
|
| async def _forward_stream(self,
|
| reader: asyncio.StreamReader,
|
| writer: asyncio.StreamWriter,
|
| conn: OutlineTCPConnection,
|
| direction: str):
|
| """Forward data with Shadowsocks encryption/decryption"""
|
| try:
|
| while True:
|
| data = await reader.read(self.buffer_size)
|
| if not data:
|
| break
|
|
|
|
|
| if direction == 'in':
|
|
|
| data = conn.shadowsocks._decrypt_packet(data)
|
| conn.bytes_in += len(data)
|
| else:
|
|
|
| data = conn.shadowsocks._encrypt_packet(data)
|
| conn.bytes_out += len(data)
|
|
|
| writer.write(data)
|
| await writer.drain()
|
| conn.last_activity = datetime.now()
|
|
|
| except Exception as e:
|
| logger.error(f"Error forwarding Outline data: {e}")
|
|
|
| finally:
|
| writer.close()
|
| await writer.wait_closed()
|
|
|
| conn_id = (f"{conn.client_addr[0]}:{conn.client_addr[1]}-"
|
| f"{conn.target_addr[0]}:{conn.target_addr[1]}")
|
| async with self._lock:
|
| if conn_id in self.connections:
|
| del self.connections[conn_id]
|
|
|
| async def cleanup_inactive(self, timeout: int = 300):
|
| """Clean up inactive connections"""
|
| while True:
|
| try:
|
| now = datetime.now()
|
| to_remove = []
|
|
|
| async with self._lock:
|
| for conn_id, conn in self.connections.items():
|
| if (now - conn.last_activity).seconds > timeout:
|
| to_remove.append(conn_id)
|
|
|
| for conn_id in to_remove:
|
| if conn_id in self.connections:
|
| conn = self.connections[conn_id]
|
| conn.client_writer.close()
|
| conn.target_writer.close()
|
| del self.connections[conn_id]
|
| logger.info(f"Cleaned up inactive connection: {conn_id}")
|
|
|
| await asyncio.sleep(60)
|
|
|
| except Exception as e:
|
| logger.error(f"Error in connection cleanup: {e}")
|
| await asyncio.sleep(60)
|
|
|
| def get_connection_stats(self) -> Dict[str, Dict]:
|
| """Get statistics for all active connections"""
|
| stats = {}
|
| for conn_id, conn in self.connections.items():
|
| stats[conn_id] = {
|
| 'bytes_in': conn.bytes_in,
|
| 'bytes_out': conn.bytes_out,
|
| 'created_at': conn.created_at.isoformat(),
|
| 'last_activity': conn.last_activity.isoformat(),
|
| 'client_addr': f"{conn.client_addr[0]}:{conn.client_addr[1]}",
|
| 'target_addr': f"{conn.target_addr[0]}:{conn.target_addr[1]}"
|
| }
|
| return stats
|
|
|