File size: 6,464 Bytes
6a5b8d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""

TCP Forwarding Engine Module for Outline VPN



Handles TCP traffic forwarding and connection tracking with Shadowsocks protocol support

"""

import asyncio
import logging
import socket
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, Set, Tuple

from .shadowsocks_protocol import ShadowsocksProtocol

# Module-level logger, named after this module so its records can be
# filtered or configured per module.
logger = logging.getLogger(__name__)

@dataclass
class OutlineTCPConnection:
    """State tracked for one relayed client-to-target TCP connection.

    ``created_at`` and ``last_activity`` default to the time the instance is
    constructed.  They use ``default_factory`` because a plain
    ``datetime.now()`` default would be evaluated only once, when the class
    is defined, and every connection would then share that stale timestamp.
    """

    # Peer name of the client socket, indexed as (host, port).
    client_addr: Tuple[str, int]
    # (host, port) the engine connected to on behalf of the client.
    target_addr: Tuple[str, int]
    # Stream used to send data back to the client.
    client_writer: asyncio.StreamWriter
    # Stream used to send data on to the target.
    target_writer: asyncio.StreamWriter
    # Cipher helper used to decrypt client data and encrypt replies.
    shadowsocks: ShadowsocksProtocol
    # Running total of decrypted bytes forwarded from client to target.
    bytes_in: int = 0
    # Running total of encrypted bytes forwarded from target to client.
    bytes_out: int = 0
    # Naive local time at which this record was created.
    created_at: datetime = field(default_factory=datetime.now)
    # Naive local time of the most recent forwarded chunk.
    last_activity: datetime = field(default_factory=datetime.now)

class OutlineTCPForwardingEngine:
    """Relay TCP streams between clients and targets and keep track of them.

    Each relayed connection is stored in ``connections`` under the key
    ``"<client_host>:<client_port>-<target_host>:<target_port>"``.  An entry
    is removed when either forwarding direction finishes or when
    ``cleanup_inactive`` closes it for being idle.
    """

    def __init__(self, access_key: str):
        """Create empty tracking state and the shared Shadowsocks helper.

        Args:
            access_key: Passed unchanged to ``ShadowsocksProtocol``.
        """
        self.connections: Dict[str, OutlineTCPConnection] = {}
        self.active_ports: Set[int] = set()
        # Every mutation of ``connections`` happens while holding this lock.
        self._lock = asyncio.Lock()
        # Maximum number of bytes requested from a stream per read.
        self.buffer_size = 8192
        self.shadowsocks = ShadowsocksProtocol(access_key)
        # The event loop keeps only weak references to tasks, so forwarding
        # tasks are pinned here until they finish; otherwise they could be
        # garbage-collected while still running.
        self._forward_tasks: Set[asyncio.Task] = set()

    @staticmethod
    def _connection_id(client_addr: Tuple[str, int],
                       target_addr: Tuple[str, int]) -> str:
        """Build the ``connections`` key for a client/target address pair."""
        return (f"{client_addr[0]}:{client_addr[1]}-"
                f"{target_addr[0]}:{target_addr[1]}")

    @staticmethod
    async def _close_writer(writer: asyncio.StreamWriter) -> None:
        """Close *writer* and wait for it, logging failures instead of raising.

        Waiting on an already-broken stream can re-raise the transport error;
        that must not prevent the caller's remaining cleanup from running.
        """
        try:
            writer.close()
            await writer.wait_closed()
        except Exception as e:
            logger.debug(f"Ignoring error while closing stream: {e}")

    def _spawn_forwarder(self,
                         reader: asyncio.StreamReader,
                         writer: asyncio.StreamWriter,
                         conn: OutlineTCPConnection,
                         direction: str) -> None:
        """Start one forwarding task and hold a reference until it is done."""
        task = asyncio.create_task(
            self._forward_stream(reader, writer, conn, direction))
        self._forward_tasks.add(task)
        task.add_done_callback(self._forward_tasks.discard)

    async def _unregister(self, conn: OutlineTCPConnection) -> None:
        """Remove *conn* from ``connections`` if it is still the stored entry.

        The identity check keeps a finishing stream from evicting a newer
        connection that happens to reuse the same key.
        """
        conn_id = self._connection_id(conn.client_addr, conn.target_addr)
        async with self._lock:
            if self.connections.get(conn_id) is conn:
                del self.connections[conn_id]

    async def create_connection(self,
                                client_reader: asyncio.StreamReader,
                                client_writer: asyncio.StreamWriter,
                                target_host: str,
                                target_port: int) -> Optional[OutlineTCPConnection]:
        """Create a new TCP connection with Shadowsocks encryption.

        Connects to the target, registers the connection and starts one
        forwarding task per direction.

        Args:
            client_reader: Stream carrying data sent by the client.
            client_writer: Stream used to reply to the client.
            target_host: Host to connect to.
            target_port: Port to connect to.

        Returns:
            The registered connection, or ``None`` if setup failed.  On
            failure the client stream and, if it was already opened, the
            target stream are closed.
        """
        target_writer = None
        try:
            # Get client information
            client_addr = client_writer.get_extra_info('peername')

            # Connect to target
            target_reader, target_writer = await asyncio.open_connection(
                target_host, target_port
            )

            # Create connection object
            conn = OutlineTCPConnection(
                client_addr=client_addr,
                target_addr=(target_host, target_port),
                client_writer=client_writer,
                target_writer=target_writer,
                shadowsocks=self.shadowsocks
            )

            # Store connection
            conn_id = self._connection_id(client_addr, conn.target_addr)
            async with self._lock:
                self.connections[conn_id] = conn

            # Start bidirectional forwarding with encryption
            self._spawn_forwarder(client_reader, target_writer, conn, 'in')
            self._spawn_forwarder(target_reader, client_writer, conn, 'out')

            logger.info(f"Created Outline TCP connection: {conn_id}")
            return conn

        except Exception as e:
            logger.error(f"Error creating Outline TCP connection: {e}")
            # The target stream may already be open when a later step fails
            # (for example when the peer name is unavailable); close it too
            # so the socket is not leaked.
            if target_writer is not None:
                await self._close_writer(target_writer)
            if client_writer:
                await self._close_writer(client_writer)
            return None

    async def _forward_stream(self,
                              reader: asyncio.StreamReader,
                              writer: asyncio.StreamWriter,
                              conn: OutlineTCPConnection,
                              direction: str):
        """Forward data with Shadowsocks encryption/decryption.

        Reads chunks of up to ``buffer_size`` bytes from *reader* until it
        reports end of stream, transforms each chunk and writes it to
        *writer*.  When the loop ends for any reason, *writer* is closed and
        *conn* is removed from ``connections``.

        Args:
            reader: Stream to read from.
            writer: Stream to write the transformed data to.
            conn: Connection whose counters and activity time are updated.
            direction: ``'in'`` decrypts client data; any other value
                encrypts data going back to the client.
        """
        # NOTE(review): each chunk is handed to the cipher helper exactly as
        # read from the socket; whether the helper tolerates arbitrary TCP
        # chunk boundaries cannot be verified from this file - confirm.
        try:
            while True:
                data = await reader.read(self.buffer_size)
                if not data:
                    break

                # Handle encryption/decryption
                if direction == 'in':
                    # Decrypt incoming data from client
                    data = conn.shadowsocks._decrypt_packet(data)
                    conn.bytes_in += len(data)
                else:
                    # Encrypt outgoing data to client
                    data = conn.shadowsocks._encrypt_packet(data)
                    conn.bytes_out += len(data)

                writer.write(data)
                await writer.drain()
                conn.last_activity = datetime.now()

        except Exception as e:
            logger.error(f"Error forwarding Outline data: {e}")

        finally:
            # Deregistration must run even if closing the writer fails or
            # is interrupted, otherwise the entry would stay in
            # ``connections`` forever.
            try:
                await self._close_writer(writer)
            finally:
                await self._unregister(conn)

    async def cleanup_inactive(self, timeout: int = 300):
        """Clean up inactive connections.

        Runs until cancelled.  Once a minute, every connection whose last
        activity is more than *timeout* seconds old has both of its streams
        closed and is removed from ``connections``.

        Args:
            timeout: Maximum idle time in seconds before a connection is
                closed.
        """
        while True:
            try:
                now = datetime.now()

                async with self._lock:
                    # total_seconds() is required here: timedelta.seconds
                    # drops whole days, so a connection idle for more than
                    # 24 hours would look almost fresh.
                    stale_ids = [
                        conn_id
                        for conn_id, conn in self.connections.items()
                        if (now - conn.last_activity).total_seconds() > timeout
                    ]

                    for conn_id in stale_ids:
                        conn = self.connections[conn_id]
                        conn.client_writer.close()
                        conn.target_writer.close()
                        del self.connections[conn_id]
                        logger.info(f"Cleaned up inactive connection: {conn_id}")

                await asyncio.sleep(60)  # Check every minute

            except Exception as e:
                logger.error(f"Error in connection cleanup: {e}")
                await asyncio.sleep(60)  # Retry after error

    def get_connection_stats(self) -> Dict[str, Dict]:
        """Get statistics for all active connections.

        Returns:
            A dict keyed by connection id.  Each value holds the byte
            counters, the ISO-formatted creation and last-activity times and
            the client and target addresses rendered as ``"host:port"``.
        """
        return {
            conn_id: {
                'bytes_in': conn.bytes_in,
                'bytes_out': conn.bytes_out,
                'created_at': conn.created_at.isoformat(),
                'last_activity': conn.last_activity.isoformat(),
                'client_addr': f"{conn.client_addr[0]}:{conn.client_addr[1]}",
                'target_addr': f"{conn.target_addr[0]}:{conn.target_addr[1]}"
            }
            for conn_id, conn in self.connections.items()
        }