ggload / app /services /reverse /utils /websocket.py
f2d90b38's picture
Upload 120 files
8cdca00 verified
"""
WebSocket helpers for reverse interfaces.
"""
import ssl
import certifi
import aiohttp
from aiohttp_socks import ProxyConnector
from typing import Mapping, Optional, Any
from urllib.parse import urlparse
from app.core.logger import logger
from app.core.config import get_config
def _default_ssl_context() -> ssl.SSLContext:
context = ssl.create_default_context()
context.load_verify_locations(certifi.where())
return context
def _normalize_socks_proxy(proxy_url: str) -> tuple[str, Optional[bool]]:
scheme = urlparse(proxy_url).scheme.lower()
rdns: Optional[bool] = None
base_scheme = scheme
if scheme == "socks5h":
base_scheme = "socks5"
rdns = True
elif scheme == "socks4a":
base_scheme = "socks4"
rdns = True
if base_scheme != scheme:
proxy_url = proxy_url.replace(f"{scheme}://", f"{base_scheme}://", 1)
return proxy_url, rdns
def resolve_proxy(proxy_url: Optional[str] = None, ssl_context: ssl.SSLContext = _default_ssl_context()) -> tuple[aiohttp.BaseConnector, Optional[str]]:
"""Resolve proxy connector.
Args:
proxy_url: Optional[str], the proxy URL. Defaults to None.
ssl_context: ssl.SSLContext, the SSL context. Defaults to _default_ssl_context().
Returns:
tuple[aiohttp.BaseConnector, Optional[str]]: The proxy connector and the proxy URL.
"""
if not proxy_url:
return aiohttp.TCPConnector(ssl=ssl_context), None
scheme = urlparse(proxy_url).scheme.lower()
if scheme.startswith("socks"):
normalized, rdns = _normalize_socks_proxy(proxy_url)
logger.info(f"Using SOCKS proxy: {proxy_url}")
try:
if rdns is not None:
return (
ProxyConnector.from_url(normalized, rdns=rdns, ssl=ssl_context),
None,
)
except TypeError:
return ProxyConnector.from_url(normalized, ssl=ssl_context), None
return ProxyConnector.from_url(normalized, ssl=ssl_context), None
logger.info(f"Using HTTP proxy: {proxy_url}")
return aiohttp.TCPConnector(ssl=ssl_context), proxy_url
class WebSocketConnection:
"""WebSocket connection wrapper."""
def __init__(self, session: aiohttp.ClientSession, ws: aiohttp.ClientWebSocketResponse) -> None:
self.session = session
self.ws = ws
async def close(self) -> None:
if not self.ws.closed:
await self.ws.close()
await self.session.close()
async def __aenter__(self) -> aiohttp.ClientWebSocketResponse:
return self.ws
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.close()
class WebSocketClient:
"""WebSocket client with proxy support."""
def __init__(self, proxy: Optional[str] = None) -> None:
self._proxy_override = proxy
self._ssl_context = _default_ssl_context()
async def connect(
self,
url: str,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
ws_kwargs: Optional[Mapping[str, object]] = None,
) -> WebSocketConnection:
"""Connect to the WebSocket.
Args:
url: str, the URL to connect to.
headers: Optional[Mapping[str, str]], the headers to send. Defaults to None.
ws_kwargs: Optional[Mapping[str, object]], extra ws_connect kwargs. Defaults to None.
Returns:
WebSocketConnection: The WebSocket connection.
"""
# Resolve proxy dynamically from config if not overridden
proxy_url = self._proxy_override or get_config("proxy.base_proxy_url")
connector, resolved_proxy = resolve_proxy(proxy_url, self._ssl_context)
logger.debug(f"WebSocket connect: proxy_url={proxy_url}, resolved_proxy={resolved_proxy}, connector={type(connector).__name__}")
# Build client timeout
total_timeout = (
float(timeout)
if timeout is not None
else float(get_config("voice.timeout") or 120)
)
client_timeout = aiohttp.ClientTimeout(total=total_timeout)
# Create session
session = aiohttp.ClientSession(connector=connector, timeout=client_timeout)
try:
# Cast to Any to avoid Pylance errors with **extra_kwargs
extra_kwargs: dict[str, Any] = dict(ws_kwargs or {})
ws = await session.ws_connect(
url,
headers=headers,
proxy=resolved_proxy,
ssl=self._ssl_context,
**extra_kwargs,
)
return WebSocketConnection(session, ws)
except Exception:
await session.close()
raise
__all__ = ["WebSocketClient", "WebSocketConnection", "resolve_proxy"]