Spaces:
Runtime error
Runtime error
| """ | |
| Socket Translator Module | |
| Bridges virtual connections to real host sockets: | |
| - Map virtual connections to host sockets/HTTP clients | |
| - Bidirectional data streaming | |
| - Connection lifecycle management | |
| - Protocol translation (TCP/UDP to host sockets) | |
| """ | |
| import socket | |
| import threading | |
| import time | |
| import asyncio | |
| import aiohttp | |
| import ssl | |
| from typing import Dict, Optional, Callable, Tuple, Any | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import urllib.parse | |
| import json | |
| from .tcp_engine import TCPConnection | |
| class ConnectionType(Enum): | |
| TCP_SOCKET = "TCP_SOCKET" | |
| UDP_SOCKET = "UDP_SOCKET" | |
| HTTP_CLIENT = "HTTP_CLIENT" | |
| HTTPS_CLIENT = "HTTPS_CLIENT" | |
| class SocketConnection: | |
| """Represents a socket connection""" | |
| connection_id: str | |
| connection_type: ConnectionType | |
| virtual_connection: Optional[TCPConnection] | |
| host_socket: Optional[socket.socket] | |
| remote_host: str | |
| remote_port: int | |
| created_time: float | |
| last_activity: float | |
| bytes_sent: int = 0 | |
| bytes_received: int = 0 | |
| is_connected: bool = False | |
| error_count: int = 0 | |
| def update_activity(self, bytes_transferred: int = 0, direction: str = 'sent'): | |
| """Update connection activity""" | |
| self.last_activity = time.time() | |
| if direction == 'sent': | |
| self.bytes_sent += bytes_transferred | |
| else: | |
| self.bytes_received += bytes_transferred | |
| def to_dict(self) -> Dict: | |
| """Convert to dictionary""" | |
| return { | |
| 'connection_id': self.connection_id, | |
| 'connection_type': self.connection_type.value, | |
| 'remote_host': self.remote_host, | |
| 'remote_port': self.remote_port, | |
| 'created_time': self.created_time, | |
| 'last_activity': self.last_activity, | |
| 'bytes_sent': self.bytes_sent, | |
| 'bytes_received': self.bytes_received, | |
| 'is_connected': self.is_connected, | |
| 'error_count': self.error_count, | |
| 'duration': time.time() - self.created_time | |
| } | |
| class SocketTranslator: | |
| """Manages socket translations and connections""" | |
| _instance = None | |
| _lock = threading.Lock() | |
| def __new__(cls): | |
| with cls._lock: | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._initialized = False | |
| return cls._instance | |
| def __init__(self): | |
| if self._initialized: | |
| return | |
| self.connections: Dict[str, SocketConnection] = {} | |
| self.lock = threading.Lock() | |
| self.cleanup_thread = threading.Thread(target=self._cleanup_loop) | |
| self.cleanup_thread.daemon = True | |
| self.running = True | |
| self.cleanup_thread.start() | |
| # SSL context for HTTPS | |
| self.ssl_context = ssl.create_default_context() | |
| self.ssl_context.check_hostname = False | |
| self.ssl_context.verify_mode = ssl.CERT_NONE | |
| # Async event loop | |
| self.loop = asyncio.new_event_loop() | |
| self.async_thread = threading.Thread(target=self._run_async_loop) | |
| self.async_thread.daemon = True | |
| self.async_thread.start() | |
| self._initialized = True | |
| def create_connection(self, connection_type: ConnectionType, | |
| remote_host: str, remote_port: int, | |
| virtual_connection: Optional[TCPConnection] = None) -> str: | |
| """Create a new connection""" | |
| connection = SocketConnection( | |
| connection_id=str(time.time_ns()), | |
| connection_type=connection_type, | |
| virtual_connection=virtual_connection, | |
| host_socket=None, | |
| remote_host=remote_host, | |
| remote_port=remote_port, | |
| created_time=time.time(), | |
| last_activity=time.time() | |
| ) | |
| with self.lock: | |
| self.connections[connection.connection_id] = connection | |
| return connection.connection_id | |
| async def connect(self, connection_id: str) -> bool: | |
| """Establish connection""" | |
| connection = self.connections.get(connection_id) | |
| if not connection: | |
| return False | |
| try: | |
| if connection.connection_type in (ConnectionType.TCP_SOCKET, ConnectionType.UDP_SOCKET): | |
| # Create socket | |
| sock_type = socket.SOCK_STREAM if connection.connection_type == ConnectionType.TCP_SOCKET else socket.SOCK_DGRAM | |
| sock = socket.socket(socket.AF_INET, sock_type) | |
| sock.setblocking(False) | |
| # Connect | |
| try: | |
| sock.connect((connection.remote_host, connection.remote_port)) | |
| except BlockingIOError: | |
| pass # Expected for non-blocking socket | |
| connection.host_socket = sock | |
| connection.is_connected = True | |
| # Start monitoring | |
| asyncio.create_task(self._monitor_socket(connection)) | |
| elif connection.connection_type in (ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT): | |
| # HTTP(S) connection will be made per request | |
| connection.is_connected = True | |
| connection.update_activity() | |
| return True | |
| except Exception as e: | |
| print(f"Connection error: {e}") | |
| connection.error_count += 1 | |
| return False | |
| async def send_data(self, connection_id: str, data: bytes) -> bool: | |
| """Send data through connection""" | |
| connection = self.connections.get(connection_id) | |
| if not connection or not connection.is_connected: | |
| return False | |
| try: | |
| if connection.connection_type in (ConnectionType.TCP_SOCKET, ConnectionType.UDP_SOCKET): | |
| await self.loop.sock_sendall(connection.host_socket, data) | |
| connection.update_activity(len(data), 'sent') | |
| elif connection.connection_type in (ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT): | |
| # Parse HTTP request | |
| request = self._parse_http_request(data) | |
| if not request: | |
| return False | |
| # Make HTTP request | |
| async with aiohttp.ClientSession() as session: | |
| url = f"{'https' if connection.connection_type == ConnectionType.HTTPS_CLIENT else 'http'}://{connection.remote_host}:{connection.remote_port}{request['path']}" | |
| async with session.request( | |
| method=request['method'], | |
| url=url, | |
| headers=request['headers'], | |
| data=request.get('body', b''), | |
| ssl=self.ssl_context if connection.connection_type == ConnectionType.HTTPS_CLIENT else None | |
| ) as response: | |
| # Forward response to virtual connection | |
| response_data = await response.read() | |
| if connection.virtual_connection: | |
| connection.virtual_connection.send_data(response_data) | |
| connection.update_activity(len(data), 'sent') | |
| connection.update_activity(len(response_data), 'received') | |
| return True | |
| except Exception as e: | |
| print(f"Send error: {e}") | |
| connection.error_count += 1 | |
| return False | |
| def close_connection(self, connection_id: str): | |
| """Close a connection""" | |
| connection = self.connections.get(connection_id) | |
| if connection: | |
| if connection.host_socket: | |
| try: | |
| connection.host_socket.close() | |
| except: | |
| pass | |
| connection.is_connected = False | |
| connection.update_activity() | |
| def get_connection(self, connection_id: str) -> Optional[SocketConnection]: | |
| """Get connection by ID""" | |
| return self.connections.get(connection_id) | |
| def _run_async_loop(self): | |
| """Run async event loop""" | |
| asyncio.set_event_loop(self.loop) | |
| self.loop.run_forever() | |
| async def _monitor_socket(self, connection: SocketConnection): | |
| """Monitor socket for incoming data""" | |
| while connection.is_connected: | |
| try: | |
| data = await self.loop.sock_recv(connection.host_socket, 8192) | |
| if not data: | |
| break | |
| connection.update_activity(len(data), 'received') | |
| # Forward data to virtual connection | |
| if connection.virtual_connection: | |
| connection.virtual_connection.send_data(data) | |
| except Exception as e: | |
| print(f"Monitor error: {e}") | |
| connection.error_count += 1 | |
| break | |
| connection.is_connected = False | |
| def _cleanup_loop(self): | |
| """Background cleanup loop""" | |
| while self.running: | |
| time.sleep(60) # Run every minute | |
| try: | |
| self._cleanup_connections() | |
| except Exception as e: | |
| print(f"Cleanup error: {e}") | |
| def _cleanup_connections(self): | |
| """Clean up inactive connections""" | |
| current_time = time.time() | |
| to_remove = [] | |
| with self.lock: | |
| for connection_id, connection in self.connections.items(): | |
| # Remove if: | |
| # 1. Not connected and inactive for 5 minutes | |
| # 2. Connected but inactive for 30 minutes | |
| # 3. Too many errors | |
| if ((not connection.is_connected and current_time - connection.last_activity > 300) or | |
| (connection.is_connected and current_time - connection.last_activity > 1800) or | |
| connection.error_count > 5): | |
| self.close_connection(connection_id) | |
| to_remove.append(connection_id) | |
| for connection_id in to_remove: | |
| del self.connections[connection_id] | |
| def _parse_http_request(self, data: bytes) -> Optional[Dict]: | |
| """Parse HTTP request from raw data""" | |
| try: | |
| # Split into lines | |
| lines = data.decode('utf-8', errors='ignore').split('\r\n') | |
| if not lines: | |
| return None | |
| # Parse request line | |
| request_line = lines[0].split(' ') | |
| if len(request_line) < 3: | |
| return None | |
| method, path, version = request_line[0], request_line[1], request_line[2] | |
| # Parse headers | |
| headers = {} | |
| i = 1 | |
| while i < len(lines): | |
| line = lines[i].strip() | |
| if not line: | |
| break | |
| if ':' in line: | |
| key, value = line.split(':', 1) | |
| headers[key.strip()] = value.strip() | |
| i += 1 | |
| # Get body | |
| body = '\r\n'.join(lines[i+1:]).encode('utf-8') if i+1 < len(lines) else b'' | |
| return { | |
| 'method': method, | |
| 'path': path, | |
| 'version': version, | |
| 'headers': headers, | |
| 'body': body | |
| } | |
| except Exception as e: | |
| print(f"HTTP parse error: {e}") | |
| return None | |
| def shutdown(self): | |
| """Shutdown the translator""" | |
| self.running = False | |
| if self.cleanup_thread.is_alive(): | |
| self.cleanup_thread.join() | |
| # Close all connections | |
| with self.lock: | |
| for connection_id in list(self.connections.keys()): | |
| self.close_connection(connection_id) | |
| self.connections.clear() | |
| # Stop async loop | |
| self.loop.stop() | |
| if self.async_thread.is_alive(): | |
| self.async_thread.join() | |