Spaces:
Runtime error
Runtime error
File size: 6,464 Bytes
6a5b8d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
"""
TCP Forwarding Engine Module for Outline VPN
Handles TCP traffic forwarding and connection tracking with Shadowsocks protocol support
"""
import asyncio
import logging
import socket
from typing import Dict, Set, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
from .shadowsocks_protocol import ShadowsocksProtocol
logger = logging.getLogger(__name__)
@dataclass
class OutlineTCPConnection:
client_addr: Tuple[str, int]
target_addr: Tuple[str, int]
client_writer: asyncio.StreamWriter
target_writer: asyncio.StreamWriter
shadowsocks: ShadowsocksProtocol
bytes_in: int = 0
bytes_out: int = 0
created_at: datetime = datetime.now()
last_activity: datetime = datetime.now()
class OutlineTCPForwardingEngine:
def __init__(self, access_key: str):
self.connections: Dict[str, OutlineTCPConnection] = {}
self.active_ports: Set[int] = set()
self._lock = asyncio.Lock()
self.buffer_size = 8192
self.shadowsocks = ShadowsocksProtocol(access_key)
async def create_connection(self,
client_reader: asyncio.StreamReader,
client_writer: asyncio.StreamWriter,
target_host: str,
target_port: int) -> Optional[OutlineTCPConnection]:
"""Create a new TCP connection with Shadowsocks encryption"""
try:
# Get client information
client_addr = client_writer.get_extra_info('peername')
# Connect to target
target_reader, target_writer = await asyncio.open_connection(
target_host, target_port
)
# Create connection object
conn = OutlineTCPConnection(
client_addr=client_addr,
target_addr=(target_host, target_port),
client_writer=client_writer,
target_writer=target_writer,
shadowsocks=self.shadowsocks
)
# Store connection
conn_id = f"{client_addr[0]}:{client_addr[1]}-{target_host}:{target_port}"
async with self._lock:
self.connections[conn_id] = conn
# Start bidirectional forwarding with encryption
asyncio.create_task(self._forward_stream(
client_reader, target_writer, conn, 'in'))
asyncio.create_task(self._forward_stream(
target_reader, client_writer, conn, 'out'))
logger.info(f"Created Outline TCP connection: {conn_id}")
return conn
except Exception as e:
logger.error(f"Error creating Outline TCP connection: {e}")
if client_writer:
client_writer.close()
await client_writer.wait_closed()
return None
async def _forward_stream(self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
conn: OutlineTCPConnection,
direction: str):
"""Forward data with Shadowsocks encryption/decryption"""
try:
while True:
data = await reader.read(self.buffer_size)
if not data:
break
# Handle encryption/decryption
if direction == 'in':
# Decrypt incoming data from client
data = conn.shadowsocks._decrypt_packet(data)
conn.bytes_in += len(data)
else:
# Encrypt outgoing data to client
data = conn.shadowsocks._encrypt_packet(data)
conn.bytes_out += len(data)
writer.write(data)
await writer.drain()
conn.last_activity = datetime.now()
except Exception as e:
logger.error(f"Error forwarding Outline data: {e}")
finally:
writer.close()
await writer.wait_closed()
# Clean up connection
conn_id = (f"{conn.client_addr[0]}:{conn.client_addr[1]}-"
f"{conn.target_addr[0]}:{conn.target_addr[1]}")
async with self._lock:
if conn_id in self.connections:
del self.connections[conn_id]
async def cleanup_inactive(self, timeout: int = 300):
"""Clean up inactive connections"""
while True:
try:
now = datetime.now()
to_remove = []
async with self._lock:
for conn_id, conn in self.connections.items():
if (now - conn.last_activity).seconds > timeout:
to_remove.append(conn_id)
for conn_id in to_remove:
if conn_id in self.connections:
conn = self.connections[conn_id]
conn.client_writer.close()
conn.target_writer.close()
del self.connections[conn_id]
logger.info(f"Cleaned up inactive connection: {conn_id}")
await asyncio.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Error in connection cleanup: {e}")
await asyncio.sleep(60) # Retry after error
def get_connection_stats(self) -> Dict[str, Dict]:
"""Get statistics for all active connections"""
stats = {}
for conn_id, conn in self.connections.items():
stats[conn_id] = {
'bytes_in': conn.bytes_in,
'bytes_out': conn.bytes_out,
'created_at': conn.created_at.isoformat(),
'last_activity': conn.last_activity.isoformat(),
'client_addr': f"{conn.client_addr[0]}:{conn.client_addr[1]}",
'target_addr': f"{conn.target_addr[0]}:{conn.target_addr[1]}"
}
return stats
|