|
|
""" |
|
|
Socket Translator Module |
|
|
|
|
|
Bridges virtual connections to real host sockets: |
|
|
- Map virtual connections to host sockets/HTTP clients |
|
|
- Bidirectional data streaming |
|
|
- Connection lifecycle management |
|
|
- Protocol translation (TCP/UDP to host sockets) |
|
|
""" |
|
|
|
|
|
import socket |
|
|
import threading |
|
|
import time |
|
|
import asyncio |
|
|
import aiohttp |
|
|
import ssl |
|
|
from typing import Dict, Optional, Callable, Tuple, Any |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
import urllib.parse |
|
|
import json |
|
|
|
|
|
from .tcp_engine import TCPConnection |
|
|
|
|
|
|
|
|
class ConnectionType(Enum): |
|
|
TCP_SOCKET = "TCP_SOCKET" |
|
|
UDP_SOCKET = "UDP_SOCKET" |
|
|
HTTP_CLIENT = "HTTP_CLIENT" |
|
|
HTTPS_CLIENT = "HTTPS_CLIENT" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class SocketConnection: |
|
|
"""Represents a socket connection""" |
|
|
connection_id: str |
|
|
connection_type: ConnectionType |
|
|
virtual_connection: Optional[TCPConnection] |
|
|
host_socket: Optional[socket.socket] |
|
|
remote_host: str |
|
|
remote_port: int |
|
|
created_time: float |
|
|
last_activity: float |
|
|
bytes_sent: int = 0 |
|
|
bytes_received: int = 0 |
|
|
is_connected: bool = False |
|
|
error_count: int = 0 |
|
|
|
|
|
def update_activity(self, bytes_transferred: int = 0, direction: str = 'sent'): |
|
|
"""Update connection activity""" |
|
|
self.last_activity = time.time() |
|
|
if direction == 'sent': |
|
|
self.bytes_sent += bytes_transferred |
|
|
else: |
|
|
self.bytes_received += bytes_transferred |
|
|
|
|
|
def to_dict(self) -> Dict: |
|
|
"""Convert to dictionary""" |
|
|
return { |
|
|
'connection_id': self.connection_id, |
|
|
'connection_type': self.connection_type.value, |
|
|
'remote_host': self.remote_host, |
|
|
'remote_port': self.remote_port, |
|
|
'created_time': self.created_time, |
|
|
'last_activity': self.last_activity, |
|
|
'bytes_sent': self.bytes_sent, |
|
|
'bytes_received': self.bytes_received, |
|
|
'is_connected': self.is_connected, |
|
|
'error_count': self.error_count, |
|
|
'duration': time.time() - self.created_time |
|
|
} |
|
|
|
|
|
|
|
|
class HTTPRequest: |
|
|
"""Represents an HTTP request""" |
|
|
|
|
|
def __init__(self, method: str = 'GET', path: str = '/', headers: Dict[str, str] = None, body: bytes = b''): |
|
|
self.method = method.upper() |
|
|
self.path = path |
|
|
self.headers = headers or {} |
|
|
self.body = body |
|
|
self.version = 'HTTP/1.1' |
|
|
|
|
|
@classmethod |
|
|
def parse(cls, data: bytes) -> Optional['HTTPRequest']: |
|
|
"""Parse HTTP request from raw data""" |
|
|
try: |
|
|
lines = data.decode('utf-8', errors='ignore').split('\r\n') |
|
|
if not lines: |
|
|
return None |
|
|
|
|
|
|
|
|
request_line = lines[0].split(' ') |
|
|
if len(request_line) < 3: |
|
|
return None |
|
|
|
|
|
method, path, version = request_line[0], request_line[1], request_line[2] |
|
|
|
|
|
|
|
|
headers = {} |
|
|
body_start = 1 |
|
|
for i, line in enumerate(lines[1:], 1): |
|
|
if line == '': |
|
|
body_start = i + 1 |
|
|
break |
|
|
if ':' in line: |
|
|
key, value = line.split(':', 1) |
|
|
headers[key.strip().lower()] = value.strip() |
|
|
|
|
|
|
|
|
body_lines = lines[body_start:] |
|
|
body = '\r\n'.join(body_lines).encode('utf-8') |
|
|
|
|
|
return cls(method, path, headers, body) |
|
|
|
|
|
except Exception: |
|
|
return None |
|
|
|
|
|
def to_bytes(self) -> bytes: |
|
|
"""Convert to raw HTTP request""" |
|
|
request_line = f"{self.method} {self.path} {self.version}\r\n" |
|
|
|
|
|
|
|
|
if 'host' not in self.headers: |
|
|
self.headers['host'] = 'localhost' |
|
|
if 'user-agent' not in self.headers: |
|
|
self.headers['user-agent'] = 'VirtualISP/1.0' |
|
|
if self.body and 'content-length' not in self.headers: |
|
|
self.headers['content-length'] = str(len(self.body)) |
|
|
|
|
|
|
|
|
header_lines = [] |
|
|
for key, value in self.headers.items(): |
|
|
header_lines.append(f"{key}: {value}\r\n") |
|
|
|
|
|
|
|
|
request_data = request_line + ''.join(header_lines) + '\r\n' |
|
|
return request_data.encode('utf-8') + self.body |
|
|
|
|
|
|
|
|
class HTTPResponse: |
|
|
"""Represents an HTTP response""" |
|
|
|
|
|
def __init__(self, status_code: int = 200, reason: str = 'OK', headers: Dict[str, str] = None, body: bytes = b''): |
|
|
self.status_code = status_code |
|
|
self.reason = reason |
|
|
self.headers = headers or {} |
|
|
self.body = body |
|
|
self.version = 'HTTP/1.1' |
|
|
|
|
|
@classmethod |
|
|
def parse(cls, data: bytes) -> Optional['HTTPResponse']: |
|
|
"""Parse HTTP response from raw data""" |
|
|
try: |
|
|
lines = data.decode('utf-8', errors='ignore').split('\r\n') |
|
|
if not lines: |
|
|
return None |
|
|
|
|
|
|
|
|
status_line = lines[0].split(' ', 2) |
|
|
if len(status_line) < 3: |
|
|
return None |
|
|
|
|
|
version, status_code, reason = status_line[0], int(status_line[1]), status_line[2] |
|
|
|
|
|
|
|
|
headers = {} |
|
|
body_start = 1 |
|
|
for i, line in enumerate(lines[1:], 1): |
|
|
if line == '': |
|
|
body_start = i + 1 |
|
|
break |
|
|
if ':' in line: |
|
|
key, value = line.split(':', 1) |
|
|
headers[key.strip().lower()] = value.strip() |
|
|
|
|
|
|
|
|
body_lines = lines[body_start:] |
|
|
body = '\r\n'.join(body_lines).encode('utf-8') |
|
|
|
|
|
return cls(status_code, reason, headers, body) |
|
|
|
|
|
except Exception: |
|
|
return None |
|
|
|
|
|
def to_bytes(self) -> bytes: |
|
|
"""Convert to raw HTTP response""" |
|
|
status_line = f"{self.version} {self.status_code} {self.reason}\r\n" |
|
|
|
|
|
|
|
|
if 'content-length' not in self.headers and self.body: |
|
|
self.headers['content-length'] = str(len(self.body)) |
|
|
if 'server' not in self.headers: |
|
|
self.headers['server'] = 'VirtualISP/1.0' |
|
|
|
|
|
|
|
|
header_lines = [] |
|
|
for key, value in self.headers.items(): |
|
|
header_lines.append(f"{key}: {value}\r\n") |
|
|
|
|
|
|
|
|
response_data = status_line + ''.join(header_lines) + '\r\n' |
|
|
return response_data.encode('utf-8') + self.body |
|
|
|
|
|
|
|
|
class SocketTranslator: |
|
|
"""Socket translator implementation""" |
|
|
|
|
|
def __init__(self, config: Dict): |
|
|
self.config = config |
|
|
self.connections: Dict[str, SocketConnection] = {} |
|
|
self.lock = threading.Lock() |
|
|
|
|
|
|
|
|
self.connect_timeout = config.get('connect_timeout', 10) |
|
|
self.read_timeout = config.get('read_timeout', 30) |
|
|
self.max_connections = config.get('max_connections', 1000) |
|
|
self.buffer_size = config.get('buffer_size', 8192) |
|
|
|
|
|
|
|
|
self.http_session = None |
|
|
self.loop = None |
|
|
|
|
|
|
|
|
self.stats = { |
|
|
'total_connections': 0, |
|
|
'active_connections': 0, |
|
|
'failed_connections': 0, |
|
|
'bytes_transferred': 0, |
|
|
'http_requests': 0, |
|
|
'tcp_connections': 0, |
|
|
'udp_connections': 0 |
|
|
} |
|
|
|
|
|
|
|
|
self.running = False |
|
|
self.cleanup_thread = None |
|
|
|
|
|
async def _init_http_session(self): |
|
|
"""Initialize HTTP client session""" |
|
|
connector = aiohttp.TCPConnector( |
|
|
limit=100, |
|
|
limit_per_host=10, |
|
|
ttl_dns_cache=300, |
|
|
use_dns_cache=True, |
|
|
) |
|
|
|
|
|
timeout = aiohttp.ClientTimeout( |
|
|
total=self.read_timeout, |
|
|
connect=self.connect_timeout |
|
|
) |
|
|
|
|
|
self.http_session = aiohttp.ClientSession( |
|
|
connector=connector, |
|
|
timeout=timeout, |
|
|
headers={'User-Agent': 'VirtualISP/1.0'} |
|
|
) |
|
|
|
|
|
def _is_http_request(self, data: bytes) -> bool: |
|
|
"""Check if data looks like an HTTP request""" |
|
|
try: |
|
|
first_line = data.split(b'\r\n')[0].decode('utf-8', errors='ignore') |
|
|
methods = ['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'OPTIONS', 'PATCH', 'TRACE'] |
|
|
return any(first_line.startswith(method + ' ') for method in methods) |
|
|
except: |
|
|
return False |
|
|
|
|
|
def _determine_connection_type(self, remote_host: str, remote_port: int, data: bytes = b'') -> ConnectionType: |
|
|
"""Determine the appropriate connection type""" |
|
|
|
|
|
if remote_port == 80 or (data and self._is_http_request(data)): |
|
|
return ConnectionType.HTTP_CLIENT |
|
|
elif remote_port == 443: |
|
|
return ConnectionType.HTTPS_CLIENT |
|
|
else: |
|
|
return ConnectionType.TCP_SOCKET |
|
|
|
|
|
def create_connection(self, virtual_conn: TCPConnection, remote_host: str, remote_port: int, |
|
|
initial_data: bytes = b'') -> Optional[SocketConnection]: |
|
|
"""Create a new socket connection""" |
|
|
connection_id = f"{virtual_conn.connection_id}->{remote_host}:{remote_port}" |
|
|
|
|
|
|
|
|
with self.lock: |
|
|
if len(self.connections) >= self.max_connections: |
|
|
return None |
|
|
|
|
|
|
|
|
conn_type = self._determine_connection_type(remote_host, remote_port, initial_data) |
|
|
|
|
|
|
|
|
socket_conn = SocketConnection( |
|
|
connection_id=connection_id, |
|
|
connection_type=conn_type, |
|
|
virtual_connection=virtual_conn, |
|
|
host_socket=None, |
|
|
remote_host=remote_host, |
|
|
remote_port=remote_port, |
|
|
created_time=time.time(), |
|
|
last_activity=time.time() |
|
|
) |
|
|
|
|
|
with self.lock: |
|
|
self.connections[connection_id] = socket_conn |
|
|
|
|
|
|
|
|
if conn_type in [ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT]: |
|
|
success = self._create_http_connection(socket_conn, initial_data) |
|
|
else: |
|
|
success = self._create_tcp_connection(socket_conn, initial_data) |
|
|
|
|
|
if success: |
|
|
self.stats['total_connections'] += 1 |
|
|
self.stats['active_connections'] = len(self.connections) |
|
|
|
|
|
if conn_type in [ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT]: |
|
|
self.stats['http_requests'] += 1 |
|
|
else: |
|
|
self.stats['tcp_connections'] += 1 |
|
|
else: |
|
|
self.stats['failed_connections'] += 1 |
|
|
with self.lock: |
|
|
if connection_id in self.connections: |
|
|
del self.connections[connection_id] |
|
|
return None |
|
|
|
|
|
return socket_conn |
|
|
|
|
|
def _create_tcp_connection(self, socket_conn: SocketConnection, initial_data: bytes) -> bool: |
|
|
"""Create TCP socket connection""" |
|
|
try: |
|
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
|
sock.settimeout(self.connect_timeout) |
|
|
|
|
|
|
|
|
sock.connect((socket_conn.remote_host, socket_conn.remote_port)) |
|
|
sock.settimeout(self.read_timeout) |
|
|
|
|
|
socket_conn.host_socket = sock |
|
|
socket_conn.is_connected = True |
|
|
|
|
|
|
|
|
if initial_data: |
|
|
sock.send(initial_data) |
|
|
socket_conn.update_activity(len(initial_data), 'sent') |
|
|
|
|
|
|
|
|
thread = threading.Thread( |
|
|
target=self._tcp_receive_loop, |
|
|
args=(socket_conn,), |
|
|
daemon=True |
|
|
) |
|
|
thread.start() |
|
|
|
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Failed to create TCP connection to {socket_conn.remote_host}:{socket_conn.remote_port}: {e}") |
|
|
socket_conn.error_count += 1 |
|
|
return False |
|
|
|
|
|
def _create_http_connection(self, socket_conn: SocketConnection, initial_data: bytes) -> bool: |
|
|
"""Create HTTP connection""" |
|
|
try: |
|
|
|
|
|
http_request = HTTPRequest.parse(initial_data) |
|
|
if not http_request: |
|
|
return False |
|
|
|
|
|
|
|
|
http_request.headers['host'] = socket_conn.remote_host |
|
|
|
|
|
|
|
|
if self.loop and not self.loop.is_closed(): |
|
|
asyncio.run_coroutine_threadsafe( |
|
|
self._handle_http_request(socket_conn, http_request), |
|
|
self.loop |
|
|
) |
|
|
else: |
|
|
|
|
|
return self._handle_http_request_sync(socket_conn, http_request) |
|
|
|
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Failed to create HTTP connection to {socket_conn.remote_host}:{socket_conn.remote_port}: {e}") |
|
|
socket_conn.error_count += 1 |
|
|
return False |
|
|
|
|
|
async def _handle_http_request(self, socket_conn: SocketConnection, http_request: HTTPRequest): |
|
|
"""Handle HTTP request asynchronously""" |
|
|
try: |
|
|
if not self.http_session: |
|
|
await self._init_http_session() |
|
|
|
|
|
|
|
|
scheme = 'https' if socket_conn.connection_type == ConnectionType.HTTPS_CLIENT else 'http' |
|
|
url = f"{scheme}://{socket_conn.remote_host}:{socket_conn.remote_port}{http_request.path}" |
|
|
|
|
|
|
|
|
async with self.http_session.request( |
|
|
method=http_request.method, |
|
|
url=url, |
|
|
headers=http_request.headers, |
|
|
data=http_request.body |
|
|
) as response: |
|
|
|
|
|
response_body = await response.read() |
|
|
|
|
|
|
|
|
http_response = HTTPResponse( |
|
|
status_code=response.status, |
|
|
reason=response.reason or 'OK', |
|
|
headers=dict(response.headers), |
|
|
body=response_body |
|
|
) |
|
|
|
|
|
|
|
|
response_data = http_response.to_bytes() |
|
|
if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: |
|
|
socket_conn.virtual_connection.on_data_received(response_data) |
|
|
|
|
|
socket_conn.update_activity(len(response_data), 'received') |
|
|
self.stats['bytes_transferred'] += len(response_data) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"HTTP request failed: {e}") |
|
|
socket_conn.error_count += 1 |
|
|
|
|
|
|
|
|
error_response = HTTPResponse( |
|
|
status_code=500, |
|
|
reason='Internal Server Error', |
|
|
body=f"Error: {str(e)}".encode('utf-8') |
|
|
) |
|
|
|
|
|
response_data = error_response.to_bytes() |
|
|
if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: |
|
|
socket_conn.virtual_connection.on_data_received(response_data) |
|
|
|
|
|
def _handle_http_request_sync(self, socket_conn: SocketConnection, http_request: HTTPRequest) -> bool: |
|
|
"""Handle HTTP request synchronously (fallback)""" |
|
|
try: |
|
|
|
|
|
scheme = 'https' if socket_conn.connection_type == ConnectionType.HTTPS_CLIENT else 'http' |
|
|
url = f"{scheme}://{socket_conn.remote_host}:{socket_conn.remote_port}{http_request.path}" |
|
|
|
|
|
import urllib.request |
|
|
import urllib.error |
|
|
|
|
|
|
|
|
req = urllib.request.Request( |
|
|
url, |
|
|
data=http_request.body if http_request.body else None, |
|
|
headers=http_request.headers, |
|
|
method=http_request.method |
|
|
) |
|
|
|
|
|
|
|
|
with urllib.request.urlopen(req, timeout=self.read_timeout) as response: |
|
|
response_body = response.read() |
|
|
|
|
|
|
|
|
http_response = HTTPResponse( |
|
|
status_code=response.getcode(), |
|
|
reason='OK', |
|
|
headers=dict(response.headers), |
|
|
body=response_body |
|
|
) |
|
|
|
|
|
|
|
|
response_data = http_response.to_bytes() |
|
|
if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: |
|
|
socket_conn.virtual_connection.on_data_received(response_data) |
|
|
|
|
|
socket_conn.update_activity(len(response_data), 'received') |
|
|
self.stats['bytes_transferred'] += len(response_data) |
|
|
|
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Sync HTTP request failed: {e}") |
|
|
socket_conn.error_count += 1 |
|
|
return False |
|
|
|
|
|
def _tcp_receive_loop(self, socket_conn: SocketConnection): |
|
|
"""Background loop for receiving TCP data""" |
|
|
sock = socket_conn.host_socket |
|
|
if not sock: |
|
|
return |
|
|
|
|
|
try: |
|
|
while socket_conn.is_connected: |
|
|
try: |
|
|
data = sock.recv(self.buffer_size) |
|
|
if not data: |
|
|
break |
|
|
|
|
|
|
|
|
if socket_conn.virtual_connection and socket_conn.virtual_connection.on_data_received: |
|
|
socket_conn.virtual_connection.on_data_received(data) |
|
|
|
|
|
socket_conn.update_activity(len(data), 'received') |
|
|
self.stats['bytes_transferred'] += len(data) |
|
|
|
|
|
except socket.timeout: |
|
|
continue |
|
|
except Exception as e: |
|
|
print(f"TCP receive error: {e}") |
|
|
break |
|
|
|
|
|
finally: |
|
|
self._close_connection(socket_conn.connection_id) |
|
|
|
|
|
def send_data(self, connection_id: str, data: bytes) -> bool: |
|
|
"""Send data through socket connection""" |
|
|
with self.lock: |
|
|
socket_conn = self.connections.get(connection_id) |
|
|
|
|
|
if not socket_conn or not socket_conn.is_connected: |
|
|
return False |
|
|
|
|
|
try: |
|
|
if socket_conn.connection_type in [ConnectionType.HTTP_CLIENT, ConnectionType.HTTPS_CLIENT]: |
|
|
|
|
|
return self._create_http_connection(socket_conn, data) |
|
|
else: |
|
|
|
|
|
if socket_conn.host_socket: |
|
|
socket_conn.host_socket.send(data) |
|
|
socket_conn.update_activity(len(data), 'sent') |
|
|
self.stats['bytes_transferred'] += len(data) |
|
|
return True |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Failed to send data: {e}") |
|
|
socket_conn.error_count += 1 |
|
|
self._close_connection(connection_id) |
|
|
|
|
|
return False |
|
|
|
|
|
def _close_connection(self, connection_id: str): |
|
|
"""Close socket connection""" |
|
|
with self.lock: |
|
|
socket_conn = self.connections.get(connection_id) |
|
|
if not socket_conn: |
|
|
return |
|
|
|
|
|
|
|
|
if socket_conn.host_socket: |
|
|
try: |
|
|
socket_conn.host_socket.close() |
|
|
except: |
|
|
pass |
|
|
|
|
|
socket_conn.is_connected = False |
|
|
|
|
|
|
|
|
del self.connections[connection_id] |
|
|
|
|
|
self.stats['active_connections'] = len(self.connections) |
|
|
|
|
|
def close_connection(self, connection_id: str) -> bool: |
|
|
"""Manually close connection""" |
|
|
self._close_connection(connection_id) |
|
|
return True |
|
|
|
|
|
def get_connection(self, connection_id: str) -> Optional[SocketConnection]: |
|
|
"""Get socket connection""" |
|
|
with self.lock: |
|
|
return self.connections.get(connection_id) |
|
|
|
|
|
def get_connections(self) -> Dict[str, Dict]: |
|
|
"""Get all socket connections""" |
|
|
with self.lock: |
|
|
return { |
|
|
conn_id: conn.to_dict() |
|
|
for conn_id, conn in self.connections.items() |
|
|
} |
|
|
|
|
|
def get_stats(self) -> Dict: |
|
|
"""Get socket translator statistics""" |
|
|
with self.lock: |
|
|
stats = self.stats.copy() |
|
|
stats['active_connections'] = len(self.connections) |
|
|
|
|
|
return stats |
|
|
|
|
|
def _cleanup_loop(self): |
|
|
"""Background cleanup loop""" |
|
|
while self.running: |
|
|
try: |
|
|
current_time = time.time() |
|
|
expired_connections = [] |
|
|
|
|
|
with self.lock: |
|
|
for conn_id, conn in self.connections.items(): |
|
|
|
|
|
if current_time - conn.last_activity > self.read_timeout * 2: |
|
|
expired_connections.append(conn_id) |
|
|
|
|
|
for conn_id in expired_connections: |
|
|
self._close_connection(conn_id) |
|
|
|
|
|
time.sleep(30) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Socket translator cleanup error: {e}") |
|
|
time.sleep(5) |
|
|
|
|
|
def start(self): |
|
|
"""Start socket translator""" |
|
|
self.running = True |
|
|
|
|
|
|
|
|
try: |
|
|
self.loop = asyncio.new_event_loop() |
|
|
asyncio.set_event_loop(self.loop) |
|
|
|
|
|
|
|
|
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) |
|
|
self.cleanup_thread.start() |
|
|
|
|
|
print("Socket translator started") |
|
|
except Exception as e: |
|
|
print(f"Failed to start socket translator: {e}") |
|
|
|
|
|
def stop(self): |
|
|
"""Stop socket translator""" |
|
|
self.running = False |
|
|
|
|
|
|
|
|
with self.lock: |
|
|
connection_ids = list(self.connections.keys()) |
|
|
|
|
|
for conn_id in connection_ids: |
|
|
self._close_connection(conn_id) |
|
|
|
|
|
|
|
|
if self.http_session: |
|
|
asyncio.run_coroutine_threadsafe(self.http_session.close(), self.loop) |
|
|
|
|
|
|
|
|
if self.loop and not self.loop.is_closed(): |
|
|
self.loop.call_soon_threadsafe(self.loop.stop) |
|
|
|
|
|
|
|
|
if self.cleanup_thread: |
|
|
self.cleanup_thread.join() |
|
|
|
|
|
print("Socket translator stopped") |
|
|
|
|
|
|