| """HTTP/WebSocket header builders for reverse-proxy requests. |
| |
| All values are sanitized to ASCII-safe Latin-1 before use. |
| """ |
|
|
| import base64 |
| import random |
| import re |
| import string |
| import uuid |
| from typing import Optional |
| from urllib.parse import urlparse |
|
|
|
|
| from app.platform.logging.logger import logger |
| from app.control.proxy.models import ProxyLease |
| from app.dataplane.proxy.adapters.profile import ProxyProfile, resolve_proxy_profile |
|
|
| |
| |
| |
|
|
| _CHAR_MAP = str.maketrans( |
| { |
| "\u2010": "-", |
| "\u2011": "-", |
| "\u2012": "-", |
| "\u2013": "-", |
| "\u2014": "-", |
| "\u2212": "-", |
| "\u2018": "'", |
| "\u2019": "'", |
| "\u201c": '"', |
| "\u201d": '"', |
| "\u00a0": " ", |
| "\u2007": " ", |
| "\u202f": " ", |
| "\u200b": "", |
| "\u200c": "", |
| "\u200d": "", |
| "\ufeff": "", |
| } |
| ) |
|
|
|
|
def _sanitize(value: Optional[str], *, field: str, strip_spaces: bool = False) -> str:
    """Normalize *value* into a Latin-1-encodable header string.

    Processing order:

    1. ``None`` becomes ``""``; anything else is passed through ``str()``.
    2. Typographic look-alikes are folded / zero-width characters removed
       via ``_CHAR_MAP``.
    3. Characters that cannot be encoded as Latin-1 are dropped.
    4. Whitespace is removed entirely (``strip_spaces=True``) or trimmed
       from both ends (default).

    Args:
        value: Raw value to clean; may be ``None``.
        field: Label used only in the debug log line.
        strip_spaces: Remove every whitespace run instead of just trimming.

    Returns:
        The cleaned string (possibly empty).
    """
    raw = "" if value is None else str(value)
    out = raw.translate(_CHAR_MAP)
    # Drop non-Latin-1 characters *before* trimming: if trimming ran first, a
    # value such as "\u4e2d abc" would lose the CJK character afterwards and
    # keep the now-leading space.
    out = out.encode("latin-1", errors="ignore").decode("latin-1")
    out = re.sub(r"\s+", "", out) if strip_spaces else out.strip()
    if out != raw:
        # Only lengths are logged so that tokens/cookies never reach the log.
        logger.debug(
            "header sanitized: field={} original_len={} sanitized_len={}",
            field,
            len(raw),
            len(out),
        )
    return out
|
|
|
|
| |
| |
| |
|
|
|
|
| def _statsig_id() -> str: |
| """Generate a Statsig evaluation fallback ID. |
| |
| The real browser's fetch interceptor tries to evaluate Statsig gates for |
| each request. When the Statsig SDK is not yet initialised (headless, |
| first paint, etc.) it catches the error and falls back to:: |
| |
| btoa("x1:" + error.toString()) |
| |
| The server accepts this fallback. We reproduce the exact format with |
| varied error messages to avoid a static fingerprint. |
| """ |
| if random.choice((True, False)): |
| rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=5)) |
| msg = f"x1:TypeError: Cannot read properties of null (reading 'children[\\'{rand}\\']')" |
| else: |
| rand = "".join(random.choices(string.ascii_lowercase, k=10)) |
| msg = f"x1:TypeError: Cannot read properties of undefined (reading '{rand}')" |
| return base64.b64encode(msg.encode()).decode() |
|
|
|
|
| |
| |
| |
|
|
|
|
| def _major_version(browser: Optional[str], ua: Optional[str]) -> Optional[str]: |
| for src in (browser or "", ua or ""): |
| m = re.search(r"(\d{2,3})", src) |
| if m: |
| return m.group(1) |
| return None |
|
|
|
|
| def _platform(ua: str) -> Optional[str]: |
| u = ua.lower() |
| if "windows" in u: |
| return "Windows" |
| if "mac os x" in u or "macintosh" in u: |
| return "macOS" |
| if "android" in u: |
| return "Android" |
| if "iphone" in u or "ipad" in u: |
| return "iOS" |
| if "linux" in u: |
| return "Linux" |
| return None |
|
|
|
|
| def _arch(ua: str) -> Optional[str]: |
| u = ua.lower() |
| if "aarch64" in u or "arm" in u: |
| return "arm" |
| if "x86_64" in u or "x64" in u or "win64" in u or "intel" in u: |
| return "x86" |
| return None |
|
|
|
|
def _client_hints(browser: Optional[str], ua: Optional[str]) -> dict[str, str]:
    """Build ``Sec-Ch-Ua-*`` client-hint headers for Chromium-family browsers.

    Hints are produced only when *browser* or *ua* identifies a Chromium-based
    browser, the UA is not Firefox or Safari-without-Chrome, and a major
    version can be determined. Otherwise an empty dict is returned.

    Every hint except ``Sec-Ch-Ua-Mobile`` (a structured-field boolean) is
    emitted as a double-quoted structured-field string, matching how
    ``Sec-Ch-Ua-Platform`` is formatted.

    Args:
        browser: Browser identifier (e.g. contains "chrome", "edge", "brave");
            may be ``None``.
        ua: Full User-Agent string; may be ``None``.

    Returns:
        Mapping of header name to value; empty when hints do not apply.
    """
    b = (browser or "").lower()
    u = (ua or "").lower()
    is_chromium = any(k in b for k in ("chrome", "chromium", "edge", "brave")) or any(
        k in u for k in ("chrome", "chromium", "edg")
    )
    if not is_chromium or "firefox" in u or ("safari" in u and "chrome" not in u):
        return {}
    ver = _major_version(browser, ua)
    if not ver:
        return {}

    if "edge" in b or "edg" in u:
        brand = "Microsoft Edge"
    elif "brave" in b:
        brand = "Brave"
    elif "chromium" in b:
        brand = "Chromium"
    else:
        brand = "Google Chrome"

    sec_ch_ua = f'"{brand}";v="{ver}", "Chromium";v="{ver}", "Not(A:Brand";v="24"'
    plat = _platform(ua or "")
    arch = _arch(ua or "")
    mobile = "?1" if ("mobile" in u or plat in ("Android", "iOS")) else "?0"

    hints: dict[str, str] = {
        "Sec-Ch-Ua": sec_ch_ua,
        "Sec-Ch-Ua-Mobile": mobile,
        # Empty model, serialized as an empty structured-field string.
        "Sec-Ch-Ua-Model": '""',
    }
    if plat:
        hints["Sec-Ch-Ua-Platform"] = f'"{plat}"'
    if arch:
        # Arch and bitness are structured-field strings, so they are quoted
        # in the same way as the platform value.
        hints["Sec-Ch-Ua-Arch"] = f'"{arch}"'
        hints["Sec-Ch-Ua-Bitness"] = '"64"'
    return hints
|
|
|
|
| |
| |
| |
|
|
|
|
def _resolve_profile(lease: ProxyLease | None) -> ProxyProfile:
    """Return the proxy profile for *lease* via ``resolve_proxy_profile``.

    The lease is forwarded unchanged, including when it is ``None``.
    """
    return resolve_proxy_profile(lease)
|
|
|
|
| |
| |
| |
|
|
|
|
# Matches an existing cf_clearance pair at the start of a cookie string or
# after a ";" separator; group 1 keeps the separator so it can be re-emitted.
_CF_CLEARANCE_RE = re.compile(r"(^|;\s*)cf_clearance=[^;]*")


def build_sso_cookie(
    sso_token: str,
    *,
    lease: ProxyLease | None = None,
    cf_cookies: str | None = None,
    cf_clearance: str | None = None,
) -> str:
    """Build the Cookie header value for an SSO-authenticated request.

    The result always starts with ``sso=<tok>; sso-rw=<tok>``. Cloudflare
    cookies are appended after that, with ``cf_clearance`` either replacing the
    first existing ``cf_clearance=`` pair in the cookie string or being added
    at the end.

    When *cf_cookies* / *cf_clearance* are not passed (``None``), the values
    come from the profile resolved for *lease*. Per the original notes, that
    profile is expected to carry the configured clearance (both
    ``proxy.clearance.cf_clearance`` and legacy ``proxy.cf_clearance`` paths).
    Historical bug: earlier v2.0 releases silently defaulted cf_clearance to the
    empty string when not passed explicitly, causing Cookies without a CF
    clearance token and immediate 403 from Cloudflare on every grok.com call.

    Args:
        sso_token: SSO token, with or without a leading ``"sso="``.
        lease: Proxy lease used to resolve the fallback profile.
        cf_cookies: Explicit Cloudflare cookie string; overrides the profile.
        cf_clearance: Explicit clearance value; overrides the profile.

    Returns:
        The complete Cookie header value.
    """
    # Sanitize before removing the prefix so that whitespace or invisible
    # characters pasted in front of the token cannot hide a leading "sso="
    # and yield "sso=sso=...".
    tok = _sanitize(sso_token, field="sso_token", strip_spaces=True)
    tok = tok.removeprefix("sso=")

    cookie = f"sso={tok}; sso-rw={tok}"
    profile = _resolve_profile(lease)
    eff_cookies = _sanitize(
        cf_cookies if cf_cookies is not None else profile.cf_cookies, field="cf_cookies"
    )
    eff_clearance = _sanitize(
        cf_clearance if cf_clearance is not None else profile.cf_clearance,
        field="cf_clearance",
        strip_spaces=True,
    )

    if eff_clearance and eff_cookies:
        # A callable replacement inserts the clearance literally; a template
        # string would interpret backslashes / "\g<...>" inside the value.
        merged, replaced = _CF_CLEARANCE_RE.subn(
            lambda m: f"{m.group(1)}cf_clearance={eff_clearance}",
            eff_cookies,
            count=1,
        )
        if replaced:
            eff_cookies = merged
        else:
            eff_cookies = f"{eff_cookies.rstrip('; ')}; cf_clearance={eff_clearance}"
    elif eff_clearance:
        eff_cookies = f"cf_clearance={eff_clearance}"

    if eff_cookies:
        cookie += f"; {eff_cookies}"
    return cookie
|
|
|
|
def build_http_headers(
    cookie_token: str,
    *,
    content_type: Optional[str] = None,
    origin: Optional[str] = None,
    referer: Optional[str] = None,
    lease: ProxyLease | None = None,
) -> dict[str, str]:
    """Assemble the header set for a standard HTTP reverse-proxy request.

    Args:
        cookie_token: SSO token used to build the ``Cookie`` header.
        content_type: Request content type; defaults to ``application/json``.
        origin: ``Origin`` value; defaults to ``https://grok.com``.
        referer: ``Referer`` value; defaults to ``https://grok.com/``.
        lease: Proxy lease used to resolve the user agent / browser profile.

    Returns:
        Header mapping including client hints (when applicable) and ``Cookie``.
    """
    profile = _resolve_profile(lease)
    user_agent_raw = profile.user_agent
    user_agent = _sanitize(user_agent_raw, field="user_agent")
    browser_name = profile.browser
    origin_value = _sanitize(origin or "https://grok.com", field="origin")
    referer_value = _sanitize(referer or "https://grok.com/", field="referer")

    # Media downloads are presented as document fetches; everything else
    # (including the JSON default) is a plain fetch.
    media_type = content_type or "application/json"
    if media_type in ("image/jpeg", "image/png", "video/mp4", "video/webm"):
        accept = (
            "text/html,application/xhtml+xml,application/xml;q=0.9,"
            "image/avif,image/webp,image/apng,*/*;q=0.8"
        )
        fetch_dest = "document"
    else:
        accept = "*/*"
        fetch_dest = "empty"

    # Sec-Fetch-Site is derived from hostnames only.
    origin_host = urlparse(origin_value).hostname
    referer_host = urlparse(referer_value).hostname
    if origin_host and origin_host == referer_host:
        fetch_site = "same-origin"
    else:
        fetch_site = "same-site"

    headers: dict[str, str] = {
        "Accept": accept,
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Baggage": (
            "sentry-environment=production,"
            "sentry-release=d6add6fb0460641fd482d767a335ef72b9b6abb8,"
            "sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c"
        ),
        "Content-Type": media_type,
        "Origin": origin_value,
        "Priority": "u=1, i",
        "Referer": referer_value,
        "Sec-Fetch-Dest": fetch_dest,
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": fetch_site,
        "User-Agent": user_agent,
        "x-statsig-id": _statsig_id(),
        "x-xai-request-id": str(uuid.uuid4()),
    }
    # Client hints are derived from the unsanitized user agent.
    for hint_name, hint_value in _client_hints(browser_name, user_agent_raw).items():
        headers[hint_name] = hint_value
    headers["Cookie"] = build_sso_cookie(cookie_token, lease=lease)

    logger.debug("http headers built: header_count={}", len(headers))
    return headers
|
|
|
|
def build_ws_headers(
    token: Optional[str] = None,
    *,
    origin: Optional[str] = None,
    extra: Optional[dict[str, str]] = None,
    lease: ProxyLease | None = None,
) -> dict[str, str]:
    """Assemble the header set for a WebSocket upgrade request.

    Args:
        token: SSO token; a ``Cookie`` header is added only when it is truthy.
        origin: ``Origin`` value; defaults to ``https://grok.com``.
        extra: Additional headers applied last, overriding earlier entries.
        lease: Proxy lease used to resolve the user agent / browser profile.

    Returns:
        Header mapping for the upgrade request.
    """
    profile = _resolve_profile(lease)
    user_agent_raw = profile.user_agent
    user_agent = _sanitize(user_agent_raw, field="user_agent")
    browser_name = profile.browser
    origin_value = _sanitize(origin or "https://grok.com", field="origin")

    base_headers: dict[str, str] = {
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Cache-Control": "no-cache",
        "Origin": origin_value,
        "Pragma": "no-cache",
        "User-Agent": user_agent,
    }
    # Client hints are derived from the unsanitized user agent.
    headers: dict[str, str] = {
        **base_headers,
        **_client_hints(browser_name, user_agent_raw),
    }
    if token:
        headers["Cookie"] = build_sso_cookie(token, lease=lease)
    if extra:
        headers.update(extra)
    return headers
|
|
|
|
# Names exported by ``from <module> import *``: the three header/cookie builders.
__all__ = ["build_http_headers", "build_sso_cookie", "build_ws_headers"]
|
|