""" Socket Translator Module Bridges virtual connections to real host sockets: - Map virtual connections to host sockets/HTTP clients - Bidirectional data streaming - Connection lifecycle management - Protocol translation (TCP/UDP to host sockets) """ import socket import threading import time import asyncio import aiohttp import ssl from typing import Dict, Optional, Callable, Tuple, Any from dataclasses import dataclass from enum import Enum import urllib.parse import json from .tcp_engine import TCPConnection class ConnectionType(Enum): TCP_SOCKET = "TCP_SOCKET" UDP_SOCKET = "UDP_SOCKET" HTTP_CLIENT = "HTTP_CLIENT" HTTPS_CLIENT = "HTTPS_CLIENT" @dataclass class SocketConnection: """Represents a socket connection""" connection_id: str connection_type: ConnectionType virtual_connection: Optional[TCPConnection] host_socket: Optional[socket.socket] remote_host: str remote_port: int created_time: float last_activity: float bytes_sent: int = 0 bytes_received: int = 0 is_connected: bool = False error_count: int = 0 def update_activity(self, bytes_transferred: int = 0, direction: str = 'sent'): """Update connection activity""" self.last_activity = time.time() if direction == 'sent': self.bytes_sent += bytes_transferred else: self.bytes_received += bytes_transferred def to_dict(self) -> Dict: """Convert to dictionary""" return { 'connection_id': self.connection_id, 'connection_type': self.connection_type.value, 'remote_host': self.remote_host, 'remote_port': self.remote_port, 'created_time': self.created_time, 'last_activity': self.last_activity, 'bytes_sent': self.bytes_sent, 'bytes_received': self.bytes_received, 'is_connected': self.is_connected, 'error_count': self.error_count, 'duration': time.time() - self.created_time } class HTTPRequest: """Represents an HTTP request""" def __init__(self, method: str = 'GET', path: str = '/', headers: Dict[str, str] = None, body: bytes = b''): self.method = method.upper() self.path = path self.headers = headers or {} self.body = body self.version = 'HTTP/1.1' @classmethod def parse(cls, data: bytes) -> Optional['HTTPRequest']: """Parse HTTP request from raw data""" try: lines = data.decode('utf-8', errors='ignore').split('\r\n') if not lines: return None # Parse request line request_line = lines[0].split(' ') if len(request_line) < 3: return None method, path, version = request_line[0], request_line[1], request_line[2] # Parse headers headers = {} body_start = 1 for i, line in enumerate(lines[1:], 1): if line == '': body_start = i + 1 break if ':' in line: key, value = line.split(':', 1) headers[key.strip().lower()] = value.strip() # Parse body body_lines = lines[body_start:] body = '\r\n'.join(body_lines).encode('utf-8') return cls(method, path, headers, body) except Exception: return None def to_bytes(self) -> bytes: """Convert to raw HTTP request""" request_line = f"{self.method} {self.path} {self.version}\r\n" # Add default headers if 'host' not in self.headers: self.headers['host'] = 'localhost' if 'user-agent' not in self.headers: self.headers['user-agent'] = 'VirtualISP/1.0' if self.body and 'content-length' not in self.headers: self.headers['content-length'] = str(len(self.body)) # Build headers header_lines = [] for key, value in self.headers.items(): header_lines.append(f"{key}: {value}\r\n") # Combine all parts request_data = request_line + ''.join(header_lines) + '\r\n' return request_data.encode('utf-8') + self.body class HTTPResponse: """Represents an HTTP response""" def __init__(self, status_code: int = 200, reason: str = 'OK', headers: Dict[str, str] = None, body: bytes = b''): self.status_code = status_code self.reason = reason self.headers = headers or {} self.body = body self.version = 'HTTP/1.1' @classmethod def parse(cls, data: bytes) -> Optional['HTTPResponse']: """Parse HTTP response from raw data""" try: lines = data.decode('utf-8', errors='ignore').split('\r\n') if not lines: return None # Parse status line status_line = lines[0].split(' ', 2) if len(status_line) < 3: return None version, status_code, reason = status_line[0], int(status_line[1]), status_line[2] # Parse headers headers = {} body_start = 1 for i, line in enumerate(lines[1:], 1): if line == '': body_start = i + 1 break if ':' in line: key, value = line.split(':', 1) headers[key.strip().lower()] = value.strip() # Parse body body_lines = lines[body_start:] body = '\r\n'.join(body_lines).encode('utf-8') return cls(status_code, reason, headers, body) except Exception: return None def to_bytes(self) -> bytes: """Convert to raw HTTP response""" status_line = f"{self.version} {self.status_code} {self.reason}\r\n" # Add default headers if 'content-length' not in self.headers and self.body: self.headers['content-length'] = str(len(self.body)) if 'server' not in self.headers: self.headers['server'] = 'VirtualISP/1.0' # Build headers header_lines = [] for key, value in self.headers.items(): header_lines.append(f"{key}: {value}\r\n") # Combine all parts response_data = status_line + ''.join(header_lines) + '\r\n' return response_data.encode('utf-8') + self.body class SocketTranslator: """Socket translator implementation""" def __init__(self, config: Dict): self.config = config self.connections: Dict[str, SocketConnection] = {} self.lock = threading.Lock() # Configuration self.connect_timeout = config.get('connect_timeout', 10) self.read_timeout = config.get('read_timeout', 30) self.max_connections = config.get('max_connections', 1000) self.buffer_size = config.get('buffer_size', 8192) # HTTP client session self.http_session = None self.loop = None # Statistics self.stats = { 'total_connections': 0, 'active_connections': 0, 'failed_connections': 0, 'bytes_transferred': 0, 'http_requests': 0, 'tcp_connections': 0, 'udp_connections': 0 } # Background tasks self.running = False self.cleanup_thread = None async def _init_http_session(self): """Initialize HTTP client session""" connector = aiohttp.TCPConnector( limit=100, limit_per_host=10, ttl_dns_cache=300, use_dns_cache=True, ) timeout = aiohttp.ClientTimeout( total=self.read_timeout, connect=self.connect_timeout ) self.http_session = aiohttp.ClientSession( connector=connector, timeout=timeout, headers={'User-Agent': 'VirtualISP/1.0'} ) def _is_http_request(self, data: bytes) -> bool: """Check if data looks like an HTTP request""" try: first_line = data.split(b'\r\n')[0].decode('utf-8', errors='ignore') methods = ['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'PATCH', 'TRACE'] return any(first_line.startswith(method + ' ') for method in methods) except: return False def _determine_connection_type(self, remote_host: str, remote_port: int, data: bytes = b'') -> ConnectionType: """Determine the appropriate connection type""" # Check for HTTP/HTTPS based on port and data if remote_port == 80 or (data and self._is_http_request(data)): return ConnectionType.HTTP_CLIENT elif remote_port == 443: return ConnectionType.HTTPS_CLIENT else: return ConnectionType.TCP_SOCKET def create_connection(self, virtual_conn: TCPConnection, remote_host: str, remote_port: int, initial_data: bytes = b'') -> Optional[SocketConnection]: """Create a new socket connection""" connection_id = f"{virtual_conn.connection_id}->{remote_host}:{remote_port}" # Check connection limit with self.lock: if len(self.connections) >= self.max_connections: return None # Determine connection type conn_type = self._determine_connection_type(remote_host, remote_port, initial_data) # Create socket connection socket_conn = SocketConnection( connection_id=connection_id, connection_type=conn_type, virtual_connection=virtual_conn, host_socket=None, remote_host=remote_host, remote_port=remote_port, created_time=time.time(), last_activity=time.time() ) with self.lock: self.connections[connection_id] = socket_conn # Establish connection based on type if conn_type in [ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT]: success = self._create_http_connection(socket_conn, initial_data) else: success = self._create_tcp_connection(socket_conn, initial_data) if success: self.stats['total_connections'] += 1 self.stats['active_connections'] = len(self.connections) if conn_type in [ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT]: self.stats['http_requests'] += 1 else: self.stats['tcp_connections'] += 1 else: self.stats['failed_connections'] += 1 with self.lock: if connection_id in self.connections: del self.connections[connection_id] return None return socket_conn def _create_tcp_connection(self, socket_conn: SocketConnection, initial_data: bytes) -> bool: """Create TCP socket connection""" try: # Create socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(self.connect_timeout) # Connect sock.connect((socket_conn.remote_host, socket_conn.remote_port)) sock.settimeout(self.read_timeout) socket_conn.host_socket = sock socket_conn.is_connected = True # Send initial data if any if initial_data: sock.send(initial_data) socket_conn.update_activity(len(initial_data), 'sent') # Start background thread for receiving data thread = threading.Thread( target=self._tcp_receive_loop, args=(socket_conn,), daemon=True ) thread.start() return True except Exception as e: print(f"Failed to create TCP connection to {socket_conn.remote_host}:{socket_conn.remote_port}: {e}") socket_conn.error_count += 1 return False def _create_http_connection(self, socket_conn: SocketConnection, initial_data: bytes) -> bool: """Create HTTP connection""" try: # Parse HTTP request http_request = HTTPRequest.parse(initial_data) if not http_request: return False # Set host header http_request.headers['host'] = socket_conn.remote_host # Start async HTTP request if self.loop and not self.loop.is_closed(): asyncio.run_coroutine_threadsafe( self._handle_http_request(socket_conn, http_request), self.loop ) else: # Fallback to sync HTTP handling return self._handle_http_request_sync(socket_conn, http_request) return True except Exception as e: print(f"Failed to create HTTP connection to {socket_conn.remote_host}:{socket_conn.remote_port}: {e}") socket_conn.error_count += 1 return False async def _handle_http_request(self, socket_conn: SocketConnection, http_request: HTTPRequest): """Handle HTTP request asynchronously""" try: if not self.http_session: await self._init_http_session() # Build URL scheme = 'https' if socket_conn.connection_type == ConnectionType.HTTPS_CLIENT else 'http' url = f"{scheme}://{socket_conn.remote_host}:{socket_conn.remote_port}{http_request.path}" # Make request async with self.http_session.request( method=http_request.method, url=url, headers=http_request.headers, data=http_request.body ) as response: # Read response response_body = await response.read() # Create HTTP response http_response = HTTPResponse( status_code=response.status, reason=response.reason or 'OK', headers=dict(response.headers), body=response_body ) # Send response back to virtual connection response_data = http_response.to_bytes() if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: socket_conn.virtual_connection.on_data_received(response_data) socket_conn.update_activity(len(response_data), 'received') self.stats['bytes_transferred'] += len(response_data) except Exception as e: print(f"HTTP request failed: {e}") socket_conn.error_count += 1 # Send error response error_response = HTTPResponse( status_code=500, reason='Internal Server Error', body=f"Error: {str(e)}".encode('utf-8') ) response_data = error_response.to_bytes() if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: socket_conn.virtual_connection.on_data_received(response_data) def _handle_http_request_sync(self, socket_conn: SocketConnection, http_request: HTTPRequest) -> bool: """Handle HTTP request synchronously (fallback)""" try: # Use urllib for sync HTTP requests scheme = 'https' if socket_conn.connection_type == ConnectionType.HTTPS_CLIENT else 'http' url = f"{scheme}://{socket_conn.remote_host}:{socket_conn.remote_port}{http_request.path}" import urllib.request import urllib.error # Create request req = urllib.request.Request( url, data=http_request.body if http_request.body else None, headers=http_request.headers, method=http_request.method ) # Make request with urllib.request.urlopen(req, timeout=self.read_timeout) as response: response_body = response.read() # Create HTTP response http_response = HTTPResponse( status_code=response.getcode(), reason='OK', headers=dict(response.headers), body=response_body ) # Send response back to virtual connection response_data = http_response.to_bytes() if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: socket_conn.virtual_connection.on_data_received(response_data) socket_conn.update_activity(len(response_data), 'received') self.stats['bytes_transferred'] += len(response_data) return True except Exception as e: print(f"Sync HTTP request failed: {e}") socket_conn.error_count += 1 return False def _tcp_receive_loop(self, socket_conn: SocketConnection): """Background loop for receiving TCP data""" sock = socket_conn.host_socket if not sock: return try: while socket_conn.is_connected: try: data = sock.recv(self.buffer_size) if not data: break # Forward data to virtual connection if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: socket_conn.virtual_connection.on_data_received(data) socket_conn.update_activity(len(data), 'received') self.stats['bytes_transferred'] += len(data) except socket.timeout: continue except Exception as e: print(f"TCP receive error: {e}") break finally: self._close_connection(socket_conn.connection_id) def send_data(self, connection_id: str, data: bytes) -> bool: """Send data through socket connection""" with self.lock: socket_conn = self.connections.get(connection_id) if not socket_conn or not socket_conn.is_connected: return False try: if socket_conn.connection_type in [ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT]: # For HTTP connections, treat as new request return self._create_http_connection(socket_conn, data) else: # TCP connection if socket_conn.host_socket: socket_conn.host_socket.send(data) socket_conn.update_activity(len(data), 'sent') self.stats['bytes_transferred'] += len(data) return True except Exception as e: print(f"Failed to send data: {e}") socket_conn.error_count += 1 self._close_connection(connection_id) return False def _close_connection(self, connection_id: str): """Close socket connection""" with self.lock: socket_conn = self.connections.get(connection_id) if not socket_conn: return # Close socket if socket_conn.host_socket: try: socket_conn.host_socket.close() except: pass socket_conn.is_connected = False # Remove from connections del self.connections[connection_id] self.stats['active_connections'] = len(self.connections) def close_connection(self, connection_id: str) -> bool: """Manually close connection""" self._close_connection(connection_id) return True def get_connection(self, connection_id: str) -> Optional[SocketConnection]: """Get socket connection""" with self.lock: return self.connections.get(connection_id) def get_connections(self) -> Dict[str, Dict]: """Get all socket connections""" with self.lock: return { conn_id: conn.to_dict() for conn_id, conn in self.connections.items() } def get_stats(self) -> Dict: """Get socket translator statistics""" with self.lock: stats = self.stats.copy() stats['active_connections'] = len(self.connections) return stats def _cleanup_loop(self): """Background cleanup loop""" while self.running: try: current_time = time.time() expired_connections = [] with self.lock: for conn_id, conn in self.connections.items(): # Close connections that have been inactive too long if current_time - conn.last_activity > self.read_timeout * 2: expired_connections.append(conn_id) for conn_id in expired_connections: self._close_connection(conn_id) time.sleep(30) # Cleanup every 30 seconds except Exception as e: print(f"Socket translator cleanup error: {e}") time.sleep(5) def start(self): """Start socket translator""" self.running = True # Start event loop for async HTTP try: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) # Start cleanup thread self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) self.cleanup_thread.start() print("Socket translator started") except Exception as e: print(f"Failed to start socket translator: {e}") def stop(self): """Stop socket translator""" self.running = False # Close all connections with self.lock: connection_ids = list(self.connections.keys()) for conn_id in connection_ids: self._close_connection(conn_id) # Close HTTP session if self.http_session: asyncio.run_coroutine_threadsafe(self.http_session.close(), self.loop) # Close event loop if self.loop and not self.loop.is_closed(): self.loop.call_soon_threadsafe(self.loop.stop) # Wait for cleanup thread if self.cleanup_thread: self.cleanup_thread.join() print("Socket translator stopped")