"""HTTP/WebSocket header builders for reverse-proxy requests. All values are sanitized to ASCII-safe Latin-1 before use. """ import base64 import random import re import string import uuid from typing import Optional from urllib.parse import urlparse from app.platform.logging.logger import logger from app.platform.config.snapshot import get_config from app.control.proxy.models import ProxyLease from app.dataplane.proxy.adapters.profile import ProxyProfile, resolve_proxy_profile # --------------------------------------------------------------------------- # Unicode → ASCII normalisation map # --------------------------------------------------------------------------- _CHAR_MAP = str.maketrans( { "\u2010": "-", "\u2011": "-", "\u2012": "-", "\u2013": "-", "\u2014": "-", "\u2212": "-", "\u2018": "'", "\u2019": "'", "\u201c": '"', "\u201d": '"', "\u00a0": " ", "\u2007": " ", "\u202f": " ", "\u200b": "", "\u200c": "", "\u200d": "", "\ufeff": "", } ) def _sanitize(value: Optional[str], *, field: str, strip_spaces: bool = False) -> str: raw = "" if value is None else str(value) out = raw.translate(_CHAR_MAP) out = re.sub(r"\s+", "", out) if strip_spaces else out.strip() out = out.encode("latin-1", errors="ignore").decode("latin-1") if out != raw: logger.debug( "header sanitized: field={} original_len={} sanitized_len={}", field, len(raw), len(out), ) return out # --------------------------------------------------------------------------- # Statsig / request-id generation # --------------------------------------------------------------------------- def _statsig_id() -> str: cfg = get_config() if cfg.get_bool("features.dynamic_statsig", False): if random.choice((True, False)): rand = "".join(random.choices(string.ascii_lowercase + string.digits, k=5)) msg = f"e:TypeError: Cannot read properties of null (reading 'children['{rand}']')" else: rand = "".join(random.choices(string.ascii_lowercase, k=10)) msg = f"e:TypeError: Cannot read properties of undefined (reading '{rand}')" return base64.b64encode(msg.encode()).decode() return ( "ZTpUeXBlRXJyb3I6IENhbm5vdCByZWFkIHByb3BlcnRpZXMgb2YgdW5kZWZpbmVkIChyZWFkaW5nICdjaGls" "ZE5vZGVzJyk=" ) # --------------------------------------------------------------------------- # Client-hints helpers # --------------------------------------------------------------------------- def _major_version(browser: Optional[str], ua: Optional[str]) -> Optional[str]: for src in (browser or "", ua or ""): m = re.search(r"(\d{2,3})", src) if m: return m.group(1) return None def _platform(ua: str) -> Optional[str]: u = ua.lower() if "windows" in u: return "Windows" if "mac os x" in u or "macintosh" in u: return "macOS" if "android" in u: return "Android" if "iphone" in u or "ipad" in u: return "iOS" if "linux" in u: return "Linux" return None def _arch(ua: str) -> Optional[str]: u = ua.lower() if "aarch64" in u or "arm" in u: return "arm" if "x86_64" in u or "x64" in u or "win64" in u or "intel" in u: return "x86" return None def _client_hints(browser: Optional[str], ua: Optional[str]) -> dict[str, str]: b = (browser or "").lower() u = (ua or "").lower() is_chromium = any(k in b for k in ("chrome", "chromium", "edge", "brave")) or any( k in u for k in ("chrome", "chromium", "edg") ) if not is_chromium or "firefox" in u or ("safari" in u and "chrome" not in u): return {} ver = _major_version(browser, ua) if not ver: return {} if "edge" in b or "edg" in u: brand = "Microsoft Edge" elif "brave" in b: brand = "Brave" elif "chromium" in b: brand = "Chromium" else: brand = "Google Chrome" sec_ch_ua = f'"{brand}";v="{ver}", "Chromium";v="{ver}", "Not(A:Brand";v="24"' plat = _platform(ua or "") arch = _arch(ua or "") mobile = "?1" if ("mobile" in u or plat in ("Android", "iOS")) else "?0" hints: dict[str, str] = { "Sec-Ch-Ua": sec_ch_ua, "Sec-Ch-Ua-Mobile": mobile, "Sec-Ch-Ua-Model": "", } if plat: hints["Sec-Ch-Ua-Platform"] = f'"{plat}"' if arch: hints["Sec-Ch-Ua-Arch"] = arch hints["Sec-Ch-Ua-Bitness"] = "64" return hints # --------------------------------------------------------------------------- # Lease resolution # --------------------------------------------------------------------------- def _resolve_profile(lease: ProxyLease | None) -> ProxyProfile: return resolve_proxy_profile(lease) # --------------------------------------------------------------------------- # Public builders # --------------------------------------------------------------------------- def build_sso_cookie( sso_token: str, *, lease: ProxyLease | None = None, cf_cookies: str | None = None, cf_clearance: str | None = None, ) -> str: """Build the Cookie header value for an SSO-authenticated request. When *cf_clearance* is not provided, the value is resolved from the lease's cf_cookies profile or falls back to the config's cf_clearance (supporting both ``proxy.clearance.cf_clearance`` and legacy ``proxy.cf_clearance`` paths). Historical bug: earlier v2.0 releases silently defaulted cf_clearance to the empty string when not passed explicitly, causing Cookies without a CF clearance token and immediate 403 from Cloudflare on every grok.com call. """ tok = sso_token[4:] if sso_token.startswith("sso=") else sso_token tok = _sanitize(tok, field="sso_token", strip_spaces=True) cookie = f"sso={tok}; sso-rw={tok}" profile = _resolve_profile(lease) eff_cookies = _sanitize( cf_cookies if cf_cookies is not None else profile.cf_cookies, field="cf_cookies" ) eff_clearance = _sanitize( cf_clearance if cf_clearance is not None else profile.cf_clearance, field="cf_clearance", strip_spaces=True, ) if eff_clearance and eff_cookies: if re.search(r"(?:^|;\s*)cf_clearance=", eff_cookies): eff_cookies = re.sub( r"(^|;\s*)cf_clearance=[^;]*", r"\1cf_clearance=" + eff_clearance, eff_cookies, count=1, ) else: eff_cookies = f"{eff_cookies.rstrip('; ')}; cf_clearance={eff_clearance}" elif eff_clearance: eff_cookies = f"cf_clearance={eff_clearance}" if eff_cookies: cookie += f"; {eff_cookies}" return cookie def build_http_headers( cookie_token: str, *, content_type: Optional[str] = None, origin: Optional[str] = None, referer: Optional[str] = None, lease: ProxyLease | None = None, ) -> dict[str, str]: """Build headers for a standard HTTP reverse-proxy request.""" profile = _resolve_profile(lease) raw_ua = profile.user_agent ua = _sanitize(raw_ua, field="user_agent") browser = profile.browser org = _sanitize(origin or "https://grok.com", field="origin") ref = _sanitize(referer or "https://grok.com/", field="referer") ct = content_type or "application/json" if ct == "application/json": accept = "*/*" fd = "empty" elif ct in ("image/jpeg", "image/png", "video/mp4", "video/webm"): accept = ( "text/html,application/xhtml+xml,application/xml;q=0.9," "image/avif,image/webp,image/apng,*/*;q=0.8" ) fd = "document" else: accept = "*/*" fd = "empty" org_host = urlparse(org).hostname ref_host = urlparse(ref).hostname site = "same-origin" if org_host and org_host == ref_host else "same-site" headers: dict[str, str] = { "Accept": accept, "Accept-Encoding": "gzip, deflate, br, zstd", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Baggage": ( "sentry-environment=production," "sentry-release=d6add6fb0460641fd482d767a335ef72b9b6abb8," "sentry-public_key=b311e0f2690c81f25e2c4cf6d4f7ce1c" ), "Content-Type": ct, "Origin": org, "Priority": "u=1, i", "Referer": ref, "Sec-Fetch-Dest": fd, "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": site, "User-Agent": ua, "x-statsig-id": _statsig_id(), "x-xai-request-id": str(uuid.uuid4()), } headers.update(_client_hints(browser, raw_ua)) headers["Cookie"] = build_sso_cookie(cookie_token, lease=lease) logger.debug("http headers built: header_count={}", len(headers)) return headers def build_ws_headers( token: Optional[str] = None, *, origin: Optional[str] = None, extra: Optional[dict[str, str]] = None, lease: ProxyLease | None = None, ) -> dict[str, str]: """Build headers for a WebSocket upgrade request.""" profile = _resolve_profile(lease) raw_ua = profile.user_agent ua = _sanitize(raw_ua, field="user_agent") browser = profile.browser org = _sanitize(origin or "https://grok.com", field="origin") headers: dict[str, str] = { "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Cache-Control": "no-cache", "Origin": org, "Pragma": "no-cache", "User-Agent": ua, } headers.update(_client_hints(browser, raw_ua)) if token: headers["Cookie"] = build_sso_cookie(token, lease=lease) if extra: headers.update(extra) return headers __all__ = ["build_http_headers", "build_sso_cookie", "build_ws_headers"]