"""HTTP/WebSocket header builders for reverse-proxy requests. All values are sanitized to ASCII-safe Latin-1 before use. """ import base64 import random import re import string import uuid from typing import Optional from urllib.parse import urlparse from app.platform.logging.logger import logger from app.control.proxy.models import ProxyLease from app.dataplane.proxy.adapters.profile import ProxyProfile, resolve_proxy_profile # --------------------------------------------------------------------------- # Unicode → ASCII normalisation map # --------------------------------------------------------------------------- _CHAR_MAP = str.maketrans( { "\u2010": "-", "\u2011": "-", "\u2012": "-", "\u2013": "-", "\u2014": "-", "\u2212": "-", "\u2018": "'", "\u2019": "'", "\u201c": '"', "\u201d": '"', "\u00a0": " ", "\u2007": " ", "\u202f": " ", "\u200b": "", "\u200c": "", "\u200d": "", "\ufeff": "", } ) def _sanitize(value: Optional[str], *, field: str, strip_spaces: bool = False) -> str: raw = "" if value is None else str(value) out = raw.translate(_CHAR_MAP) out = re.sub(r"\s+", "", out) if strip_spaces else out.strip() out = out.encode("latin-1", errors="ignore").decode("latin-1") if out != raw: logger.debug( "header sanitized: field={} original_len={} sanitized_len={}", field, len(raw), len(out), ) return out # --------------------------------------------------------------------------- # Statsig / request-id generation # --------------------------------------------------------------------------- def _statsig_id() -> str: """Generate a Statsig evaluation fallback ID. The real browser's fetch interceptor tries to evaluate Statsig gates for each request. When the Statsig SDK is not yet initialised (headless, first paint, etc.) it catches the error and falls back to:: btoa("x1:" + error.toString()) The server accepts this fallback. We reproduce the exact format with varied error messages to avoid a static fingerprint. """ if random.choice((True, False)): rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=5)) msg = f"x1:TypeError: Cannot read properties of null (reading 'children[\\'{rand}\\']')" else: rand = "".join(random.choices(string.ascii_lowercase, k=10)) msg = f"x1:TypeError: Cannot read properties of undefined (reading '{rand}')" return base64.b64encode(msg.encode()).decode() # --------------------------------------------------------------------------- # Client-hints helpers # --------------------------------------------------------------------------- def _major_version(browser: Optional[str], ua: Optional[str]) -> Optional[str]: for src in (browser or "", ua or ""): m = re.search(r"(\d{2,3})", src) if m: return m.group(1) return None def _platform(ua: str) -> Optional[str]: u = ua.lower() if "windows" in u: return "Windows" if "mac os x" in u or "macintosh" in u: return "macOS" if "android" in u: return "Android" if "iphone" in u or "ipad" in u: return "iOS" if "linux" in u: return "Linux" return None def _arch(ua: str) -> Optional[str]: u = ua.lower() if "aarch64" in u or "arm" in u: return "arm" if "x86_64" in u or "x64" in u or "win64" in u or "intel" in u: return "x86" return None def _client_hints(browser: Optional[str], ua: Optional[str]) -> dict[str, str]: b = (browser or "").lower() u = (ua or "").lower() is_chromium = any(k in b for k in ("chrome", "chromium", "edge", "brave")) or any( k in u for k in ("chrome", "chromium", "edg") ) if not is_chromium or "firefox" in u or ("safari" in u and "chrome" not in u): return {} ver = _major_version(browser, ua) if not ver: return {} if "edge" in b or "edg" in u: brand = "Microsoft Edge" elif "brave" in b: brand = "Brave" elif "chromium" in b: brand = "Chromium" else: brand = "Google Chrome" sec_ch_ua = f'"{brand}";v="{ver}", "Chromium";v="{ver}", "Not(A:Brand";v="24"' plat = _platform(ua or "") arch = _arch(ua or "") mobile = "?1" if ("mobile" in u or plat in ("Android", "iOS")) else "?0" hints: dict[str, str] = { "Sec-Ch-Ua": sec_ch_ua, "Sec-Ch-Ua-Mobile": mobile, "Sec-Ch-Ua-Model": "", } if plat: hints["Sec-Ch-Ua-Platform"] = f'"{plat}"' if arch: hints["Sec-Ch-Ua-Arch"] = arch hints["Sec-Ch-Ua-Bitness"] = "64" return hints # --------------------------------------------------------------------------- # Lease resolution # --------------------------------------------------------------------------- def _resolve_profile(lease: ProxyLease | None) -> ProxyProfile: return resolve_proxy_profile(lease) # --------------------------------------------------------------------------- # Public builders # --------------------------------------------------------------------------- def build_sso_cookie( sso_token: str, *, lease: ProxyLease | None = None, cf_cookies: str | None = None, cf_clearance: str | None = None, ) -> str: """Build the Cookie header value for an SSO-authenticated request. When *cf_clearance* is not provided, the value is resolved from the lease's cf_cookies profile or falls back to the config's cf_clearance (supporting both ``proxy.clearance.cf_clearance`` and legacy ``proxy.cf_clearance`` paths). Historical bug: earlier v2.0 releases silently defaulted cf_clearance to the empty string when not passed explicitly, causing Cookies without a CF clearance token and immediate 403 from Cloudflare on every grok.com call. """ tok = sso_token[4:] if sso_token.startswith("sso=") else sso_token tok = _sanitize(tok, field="sso_token", strip_spaces=True) cookie = f"sso={tok}; sso-rw={tok}" profile = _resolve_profile(lease) eff_cookies = _sanitize( cf_cookies if cf_cookies is not None else profile.cf_cookies, field="cf_cookies" ) eff_clearance = _sanitize( cf_clearance if cf_clearance is not None else profile.cf_clearance, field="cf_clearance", strip_spaces=True, ) if eff_clearance and eff_cookies: if re.search(r"(?:^|;\s*)cf_clearance=", eff_cookies): eff_cookies = re.sub( r"(^|;\s*)cf_clearance=[^;]*", r"\1cf_clearance=" + eff_clearance, eff_cookies, count=1, ) else: eff_cookies = f"{eff_cookies.rstrip('; ')}; cf_clearance={eff_clearance}" elif eff_clearance: eff_cookies = f"cf_clearance={eff_clearance}" if eff_cookies: cookie += f"; {eff_cookies}" return cookie def build_http_headers( cookie_token: str, *, content_type: Optional[str] = None, origin: Optional[str] = None, referer: Optional[str] = None, lease: ProxyLease | None = None, ) -> dict[str, str]: """Build headers for a standard HTTP reverse-proxy request.""" profile = _resolve_profile(lease) raw_ua = profile.user_agent ua = _sanitize(raw_ua, field="user_agent") browser = profile.browser org = _sanitize(origin or "https://grok.com", field="origin") ref = _sanitize(referer or "https://grok.com/", field="referer") ct = content_type or "application/json" if ct == "application/json": accept = "*/*" fd = "empty" elif ct in ("image/jpeg", "image/png", "video/mp4", "video/webm"): accept = ( "text/html,application/xhtml+xml,application/xml;q=0.9," "image/avif,image/webp,image/apng,*/*;q=0.8" ) fd = "document" else: accept = "*/*" fd = "empty" org_host = urlparse(org).hostname ref_host = urlparse(ref).hostname site = "same-origin" if org_host and org_host == ref_host else "same-site" headers: dict[str, str] = { "Accept": accept, "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Baggage": ( "sentry-environment=production," "sentry-release=d6add6fb0460641fd482d767a335ef72b9b6abb8," "sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c" ), "Content-Type": ct, "Origin": org, "Priority": "u=1, i", "Referer": ref, "Sec-Fetch-Dest": fd, "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": site, "User-Agent": ua, "x-statsig-id": _statsig_id(), "x-xai-request-id": str(uuid.uuid4()), } headers.update(_client_hints(browser, raw_ua)) headers["Cookie"] = build_sso_cookie(cookie_token, lease=lease) logger.debug("http headers built: header_count={}", len(headers)) return headers def build_ws_headers( token: Optional[str] = None, *, origin: Optional[str] = None, extra: Optional[dict[str, str]] = None, lease: ProxyLease | None = None, ) -> dict[str, str]: """Build headers for a WebSocket upgrade request.""" profile = _resolve_profile(lease) raw_ua = profile.user_agent ua = _sanitize(raw_ua, field="user_agent") browser = profile.browser org = _sanitize(origin or "https://grok.com", field="origin") headers: dict[str, str] = { "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Cache-Control": "no-cache", "Origin": org, "Pragma": "no-cache", "User-Agent": ua, } headers.update(_client_hints(browser, raw_ua)) if token: headers["Cookie"] = build_sso_cookie(token, lease=lease) if extra: headers.update(extra) return headers __all__ = ["build_http_headers", "build_sso_cookie", "build_ws_headers"]