| """
|
| Socket Translator Module
|
|
|
| Bridges virtual connections to real host sockets:
|
| - Map virtual connections to host sockets/HTTP clients
|
| - Bidirectional data streaming
|
| - Connection lifecycle management
|
| - Protocol translation (TCP/UDP to host sockets)
|
| """
|
|
|
| import socket
|
| import threading
|
| import time
|
| import asyncio
|
| import aiohttp
|
| import ssl
|
| from typing import Dict, Optional, Callable, Tuple, Any
|
| from dataclasses import dataclass
|
| from enum import Enum
|
| import urllib.parse
|
| import json
|
|
|
| from .tcp_engine import TCPConnection
|
|
|
|
|
| class ConnectionType(Enum):
|
| TCP_SOCKET = "TCP_SOCKET"
|
| UDP_SOCKET = "UDP_SOCKET"
|
| HTTP_CLIENT = "HTTP_CLIENT"
|
| HTTPS_CLIENT = "HTTPS_CLIENT"
|
|
|
|
|
| @dataclass
|
| class SocketConnection:
|
| """Represents a socket connection"""
|
| connection_id: str
|
| connection_type: ConnectionType
|
| virtual_connection: Optional[TCPConnection]
|
| host_socket: Optional[socket.socket]
|
| remote_host: str
|
| remote_port: int
|
| created_time: float
|
| last_activity: float
|
| bytes_sent: int = 0
|
| bytes_received: int = 0
|
| is_connected: bool = False
|
| error_count: int = 0
|
|
|
| def update_activity(self, bytes_transferred: int = 0, direction: str = 'sent'):
|
| """Update connection activity"""
|
| self.last_activity = time.time()
|
| if direction == 'sent':
|
| self.bytes_sent += bytes_transferred
|
| else:
|
| self.bytes_received += bytes_transferred
|
|
|
| def to_dict(self) -> Dict:
|
| """Convert to dictionary"""
|
| return {
|
| 'connection_id': self.connection_id,
|
| 'connection_type': self.connection_type.value,
|
| 'remote_host': self.remote_host,
|
| 'remote_port': self.remote_port,
|
| 'created_time': self.created_time,
|
| 'last_activity': self.last_activity,
|
| 'bytes_sent': self.bytes_sent,
|
| 'bytes_received': self.bytes_received,
|
| 'is_connected': self.is_connected,
|
| 'error_count': self.error_count,
|
| 'duration': time.time() - self.created_time
|
| }
|
|
|
|
|
| class SocketTranslator:
|
| """Manages socket translations and connections"""
|
|
|
| _instance = None
|
| _lock = threading.Lock()
|
|
|
| def __new__(cls):
|
| with cls._lock:
|
| if cls._instance is None:
|
| cls._instance = super().__new__(cls)
|
| cls._instance._initialized = False
|
| return cls._instance
|
|
|
| def __init__(self):
|
| if self._initialized:
|
| return
|
|
|
| self.connections: Dict[str, SocketConnection] = {}
|
| self.lock = threading.Lock()
|
| self.cleanup_thread = threading.Thread(target=self._cleanup_loop)
|
| self.cleanup_thread.daemon = True
|
| self.running = True
|
| self.cleanup_thread.start()
|
|
|
|
|
| self.ssl_context = ssl.create_default_context()
|
| self.ssl_context.check_hostname = False
|
| self.ssl_context.verify_mode = ssl.CERT_NONE
|
|
|
|
|
| self.loop = asyncio.new_event_loop()
|
| self.async_thread = threading.Thread(target=self._run_async_loop)
|
| self.async_thread.daemon = True
|
| self.async_thread.start()
|
|
|
| self._initialized = True
|
|
|
| def create_connection(self, connection_type: ConnectionType,
|
| remote_host: str, remote_port: int,
|
| virtual_connection: Optional[TCPConnection] = None) -> str:
|
| """Create a new connection"""
|
| connection = SocketConnection(
|
| connection_id=str(time.time_ns()),
|
| connection_type=connection_type,
|
| virtual_connection=virtual_connection,
|
| host_socket=None,
|
| remote_host=remote_host,
|
| remote_port=remote_port,
|
| created_time=time.time(),
|
| last_activity=time.time()
|
| )
|
|
|
| with self.lock:
|
| self.connections[connection.connection_id] = connection
|
|
|
| return connection.connection_id
|
|
|
| async def connect(self, connection_id: str) -> bool:
|
| """Establish connection"""
|
| connection = self.connections.get(connection_id)
|
| if not connection:
|
| return False
|
|
|
| try:
|
| if connection.connection_type in (ConnectionType.TCP_SOCKET, ConnectionType.UDP_SOCKET):
|
|
|
| sock_type = socket.SOCK_STREAM if connection.connection_type == ConnectionType.TCP_SOCKET else socket.SOCK_DGRAM
|
| sock = socket.socket(socket.AF_INET, sock_type)
|
| sock.setblocking(False)
|
|
|
|
|
| try:
|
| sock.connect((connection.remote_host, connection.remote_port))
|
| except BlockingIOError:
|
| pass
|
|
|
| connection.host_socket = sock
|
| connection.is_connected = True
|
|
|
|
|
| asyncio.create_task(self._monitor_socket(connection))
|
|
|
| elif connection.connection_type in (ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT):
|
|
|
| connection.is_connected = True
|
|
|
| connection.update_activity()
|
| return True
|
|
|
| except Exception as e:
|
| print(f"Connection error: {e}")
|
| connection.error_count += 1
|
| return False
|
|
|
| async def send_data(self, connection_id: str, data: bytes) -> bool:
|
| """Send data through connection"""
|
| connection = self.connections.get(connection_id)
|
| if not connection or not connection.is_connected:
|
| return False
|
|
|
| try:
|
| if connection.connection_type in (ConnectionType.TCP_SOCKET, ConnectionType.UDP_SOCKET):
|
| await self.loop.sock_sendall(connection.host_socket, data)
|
| connection.update_activity(len(data), 'sent')
|
|
|
| elif connection.connection_type in (ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT):
|
|
|
| request = self._parse_http_request(data)
|
| if not request:
|
| return False
|
|
|
|
|
| async with aiohttp.ClientSession() as session:
|
| url = f"{'https' if connection.connection_type == ConnectionType.HTTPS_CLIENT else 'http'}://{connection.remote_host}:{connection.remote_port}{request['path']}"
|
|
|
| async with session.request(
|
| method=request['method'],
|
| url=url,
|
| headers=request['headers'],
|
| data=request.get('body', b''),
|
| ssl=self.ssl_context if connection.connection_type == ConnectionType.HTTPS_CLIENT else None
|
| ) as response:
|
|
|
| response_data = await response.read()
|
| if connection.virtual_connection:
|
| connection.virtual_connection.send_data(response_data)
|
|
|
| connection.update_activity(len(data), 'sent')
|
| connection.update_activity(len(response_data), 'received')
|
|
|
| return True
|
|
|
| except Exception as e:
|
| print(f"Send error: {e}")
|
| connection.error_count += 1
|
| return False
|
|
|
| def close_connection(self, connection_id: str):
|
| """Close a connection"""
|
| connection = self.connections.get(connection_id)
|
| if connection:
|
| if connection.host_socket:
|
| try:
|
| connection.host_socket.close()
|
| except:
|
| pass
|
| connection.is_connected = False
|
| connection.update_activity()
|
|
|
| def get_connection(self, connection_id: str) -> Optional[SocketConnection]:
|
| """Get connection by ID"""
|
| return self.connections.get(connection_id)
|
|
|
| def _run_async_loop(self):
|
| """Run async event loop"""
|
| asyncio.set_event_loop(self.loop)
|
| self.loop.run_forever()
|
|
|
| async def _monitor_socket(self, connection: SocketConnection):
|
| """Monitor socket for incoming data"""
|
| while connection.is_connected:
|
| try:
|
| data = await self.loop.sock_recv(connection.host_socket, 8192)
|
| if not data:
|
| break
|
|
|
| connection.update_activity(len(data), 'received')
|
|
|
|
|
| if connection.virtual_connection:
|
| connection.virtual_connection.send_data(data)
|
|
|
| except Exception as e:
|
| print(f"Monitor error: {e}")
|
| connection.error_count += 1
|
| break
|
|
|
| connection.is_connected = False
|
|
|
| def _cleanup_loop(self):
|
| """Background cleanup loop"""
|
| while self.running:
|
| time.sleep(60)
|
| try:
|
| self._cleanup_connections()
|
| except Exception as e:
|
| print(f"Cleanup error: {e}")
|
|
|
| def _cleanup_connections(self):
|
| """Clean up inactive connections"""
|
| current_time = time.time()
|
| to_remove = []
|
|
|
| with self.lock:
|
| for connection_id, connection in self.connections.items():
|
|
|
|
|
|
|
|
|
| if ((not connection.is_connected and current_time - connection.last_activity > 300) or
|
| (connection.is_connected and current_time - connection.last_activity > 1800) or
|
| connection.error_count > 5):
|
| self.close_connection(connection_id)
|
| to_remove.append(connection_id)
|
|
|
| for connection_id in to_remove:
|
| del self.connections[connection_id]
|
|
|
| def _parse_http_request(self, data: bytes) -> Optional[Dict]:
|
| """Parse HTTP request from raw data"""
|
| try:
|
|
|
| lines = data.decode('utf-8', errors='ignore').split('\r\n')
|
| if not lines:
|
| return None
|
|
|
|
|
| request_line = lines[0].split(' ')
|
| if len(request_line) < 3:
|
| return None
|
|
|
| method, path, version = request_line[0], request_line[1], request_line[2]
|
|
|
|
|
| headers = {}
|
| i = 1
|
| while i < len(lines):
|
| line = lines[i].strip()
|
| if not line:
|
| break
|
| if ':' in line:
|
| key, value = line.split(':', 1)
|
| headers[key.strip()] = value.strip()
|
| i += 1
|
|
|
|
|
| body = '\r\n'.join(lines[i+1:]).encode('utf-8') if i+1 < len(lines) else b''
|
|
|
| return {
|
| 'method': method,
|
| 'path': path,
|
| 'version': version,
|
| 'headers': headers,
|
| 'body': body
|
| }
|
|
|
| except Exception as e:
|
| print(f"HTTP parse error: {e}")
|
| return None
|
|
|
| def shutdown(self):
|
| """Shutdown the translator"""
|
| self.running = False
|
| if self.cleanup_thread.is_alive():
|
| self.cleanup_thread.join()
|
|
|
|
|
| with self.lock:
|
| for connection_id in list(self.connections.keys()):
|
| self.close_connection(connection_id)
|
| self.connections.clear()
|
|
|
|
|
| self.loop.stop()
|
| if self.async_thread.is_alive():
|
| self.async_thread.join()
|
|
|