"""Runtime-aware reCAPTCHA v3 solver using Playwright browser automation.

This module intentionally separates:

1. task normalization
2. runtime probing (standard v3 vs enterprise)
3. execution
4. result/artifact collection

That structure mirrors mainstream captcha providers more closely than a single
"get token if any string is returned" flow.
"""

from __future__ import annotations

import asyncio
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any
from urllib.parse import unquote, urlparse

from playwright.async_api import (
    Browser,
    Page,
    Request,
    Response,
    Playwright,
    async_playwright,
)

from ..core.config import Config

log = logging.getLogger(__name__)

_STANDARD_RUNTIME = "standard"
_ENTERPRISE_RUNTIME = "enterprise"
_DEFAULT_API_DOMAIN = "www.google.com"
_BROWSER_ACCEPT_LANGUAGE = "en-US,en;q=0.9"

# URL fragments of the reCAPTCHA endpoints whose response bodies are scanned
# for a token (see `_TOKEN_RESPONSE_PATTERNS`).
_TOKEN_RESPONSE_MARKERS = (
    "/recaptcha/api2/reload",
    "/recaptcha/api2/clr",
    "/recaptcha/enterprise/reload",
    "/recaptcha/enterprise/clr",
)

# The token is the value that follows the "rresp" marker in the response body;
# both double-quoted and single-quoted encodings are accepted.
_TOKEN_RESPONSE_PATTERNS = (
    re.compile(r'"rresp","([^"]+)"'),
    re.compile(r"'rresp','([^']+)'"),
)

_WAIT_FOR_STANDARD_RUNTIME_JS = """
() => (
    typeof window.grecaptcha !== 'undefined'
    && typeof window.grecaptcha.execute === 'function'
)
"""

_WAIT_FOR_ENTERPRISE_RUNTIME_JS = """
() => (
    typeof window.grecaptcha !== 'undefined'
    && typeof window.grecaptcha.enterprise !== 'undefined'
    && typeof window.grecaptcha.enterprise.execute === 'function'
)
"""

_RUNTIME_INSPECTION_JS = """
() => {
    const scripts = [...document.scripts]
        .map((script) => script.src)
        .filter((src) => typeof src === 'string' && src.includes('/recaptcha/'));
    const cfg = window.___grecaptcha_cfg || {};
    return {
        scripts,
        hasStandardExecute: typeof window.grecaptcha?.execute === 'function',
        hasEnterpriseExecute: typeof window.grecaptcha?.enterprise?.execute === 'function',
        enterpriseCfg: cfg.enterprise === true,
    };
}
"""

_ENSURE_SCRIPT_LOADED_JS = """
([scriptUrl]) => new Promise((resolve, reject) => {
    const existing = [...document.scripts].find((script) => script.src === scriptUrl);
    if (existing) {
        resolve(scriptUrl);
        return;
    }
    const script = document.createElement('script');
    script.src = scriptUrl;
    script.async = true;
    script.defer = true;
    script.onerror = () => reject(new Error(`Failed to load script: ${scriptUrl}`));
    script.onload = () => resolve(scriptUrl);
    document.head.appendChild(script);
})
"""

_EXECUTE_STANDARD_JS = """
([key, action]) => new Promise((resolve, reject) => {
    const gr = window.grecaptcha;
    if (!gr || typeof gr.execute !== 'function') {
        reject(new Error('grecaptcha.execute is not available'));
        return;
    }
    gr.ready(() => {
        const options = {};
        if (action) {
            options.action = action;
        }
        gr.execute(key, options).then(resolve).catch(reject);
    });
})
"""

_EXECUTE_ENTERPRISE_JS = """
([key, action, sToken]) => new Promise((resolve, reject) => {
    const gr = window.grecaptcha?.enterprise;
    if (!gr || typeof gr.execute !== 'function') {
        reject(new Error('grecaptcha.enterprise.execute is not available'));
        return;
    }
    gr.ready(() => {
        const options = {};
        if (action) {
            options.action = action;
        }
        if (sToken) {
            options.s = sToken;
        }
        gr.execute(key, options).then(resolve).catch(reject);
    });
})
"""

_FALLBACK_FINGERPRINT_JS = """
() => {
    const uaData = navigator.userAgentData || null;
    let secChUa = '';
    let secChUaMobile = '';
    let secChUaPlatform = '';
    if (uaData) {
        if (Array.isArray(uaData.brands) && uaData.brands.length > 0) {
            secChUa = uaData.brands
                .map((item) => `"${item.brand}";v="${item.version}"`)
                .join(', ');
        }
        secChUaMobile = uaData.mobile ? '?1' : '?0';
        if (uaData.platform) {
            secChUaPlatform = `"${uaData.platform}"`;
        }
    }
    return {
        userAgent: navigator.userAgent || '',
        acceptLanguage: Array.isArray(navigator.languages) && navigator.languages.length > 0
            ? navigator.languages.join(',')
            : (navigator.language || ''),
        secChUa,
        secChUaMobile,
        secChUaPlatform,
    };
}
"""

# Basic anti-detection init script
_STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};
"""


@dataclass(frozen=True)
class BrowserFingerprint:
    """Actual browser fingerprint observed during token generation."""

    user_agent: str = ""
    accept_language: str = ""
    sec_ch_ua: str = ""
    sec_ch_ua_mobile: str = ""
    sec_ch_ua_platform: str = ""

    def to_solution_fields(self) -> dict[str, Any]:
        """Return the non-empty fingerprint fields keyed by their solution names."""
        candidates = (
            ("userAgent", self.user_agent),
            ("acceptLanguage", self.accept_language),
            ("secChUa", self.sec_ch_ua),
            ("secChUaMobile", self.sec_ch_ua_mobile),
            ("secChUaPlatform", self.sec_ch_ua_platform),
        )
        return {key: value for key, value in candidates if value}


@dataclass(frozen=True)
class RecaptchaSessionArtifacts:
    """Session cookies that some mature vendors return for reCAPTCHA v3."""

    recaptcha_ca_t: str | None = None
    recaptcha_ca_e: str | None = None

    def to_solution_fields(self) -> dict[str, Any]:
        """Return the cookies that are set, keyed by their cookie names."""
        payload: dict[str, Any] = {}
        if self.recaptcha_ca_t:
            payload["recaptcha-ca-t"] = self.recaptcha_ca_t
        if self.recaptcha_ca_e:
            payload["recaptcha-ca-e"] = self.recaptcha_ca_e
        return payload


@dataclass(frozen=True)
class RecaptchaTaskProfile:
    """Normalized internal task view."""

    task_type: str
    website_url: str
    website_key: str
    page_action: str
    requested_runtime: str
    api_domain: str
    enterprise_s_token: str
    wants_session_artifacts: bool


@dataclass
class RecaptchaRuntimeEvidence:
    """Runtime signals collected before/after execute."""

    runtime_kind: str
    detection_reason: str
    scripts: list[str] = field(default_factory=list)
    request_urls: list[str] = field(default_factory=list)
    response_statuses: dict[str, int] = field(default_factory=dict)
    has_standard_execute: bool = False
    has_enterprise_execute: bool = False
    enterprise_cfg: bool = False
    api_domain: str = _DEFAULT_API_DOMAIN

    def all_urls(self) -> list[str]:
        """Return script URLs, request URLs and response URLs as one list."""
        return [*self.scripts, *self.request_urls, *self.response_statuses.keys()]


@dataclass(frozen=True)
class RecaptchaV3SolveResult:
    """Final execution artifacts returned by `_solve_once()`."""

    token: str
    runtime_kind: str
    fingerprint: BrowserFingerprint
    session_artifacts: RecaptchaSessionArtifacts
    create_time_ms: int


class RecaptchaNetworkObserver:
    """Collect reCAPTCHA request evidence and the actual network fingerprint."""

    def __init__(self) -> None:
        self.request_urls: list[str] = []
        self.response_statuses: dict[str, int] = {}
        self._fingerprint = BrowserFingerprint()
        self._network_token = ""
        self._token_source_url = ""
        # Strong references to in-flight body readers so they are not garbage
        # collected before they finish; each task removes itself when done.
        self._response_tasks: set[asyncio.Task[None]] = set()

    @staticmethod
    def _is_relevant_url(url: str) -> bool:
        """Return True when *url* contains a ``/recaptcha/`` path segment."""
        return "/recaptcha/" in url

    @staticmethod
    def _is_token_response_url(url: str) -> bool:
        """Return True when *url* matches one of the token-bearing endpoints."""
        return any(marker in url for marker in _TOKEN_RESPONSE_MARKERS)

    @staticmethod
    def extract_token_from_body(body: str) -> str:
        """Return the first ``rresp`` token found in *body*, or ``""`` if none."""
        for pattern in _TOKEN_RESPONSE_PATTERNS:
            match = pattern.search(body)
            if match:
                return match.group(1)
        return ""

    def bind(self, page: Page) -> None:
        """Subscribe to the request and response events of *page*."""
        page.on("request", self._capture_request)
        page.on("response", self._capture_response)

    def _capture_request(self, request: Request) -> None:
        """Record a reCAPTCHA request URL and refresh the header fingerprint."""
        url = request.url
        if not self._is_relevant_url(url):
            return
        self.request_urls.append(url)

        headers = {
            str(key).lower(): str(value)
            for key, value in (request.headers or {}).items()
        }
        # Headers missing from this request keep the previously seen value.
        previous = self._fingerprint
        self._fingerprint = BrowserFingerprint(
            user_agent=headers.get("user-agent", previous.user_agent),
            accept_language=headers.get(
                "accept-language",
                previous.accept_language,
            ),
            sec_ch_ua=headers.get("sec-ch-ua", previous.sec_ch_ua),
            sec_ch_ua_mobile=headers.get(
                "sec-ch-ua-mobile",
                previous.sec_ch_ua_mobile,
            ),
            sec_ch_ua_platform=headers.get(
                "sec-ch-ua-platform",
                previous.sec_ch_ua_platform,
            ),
        )

    def _capture_response(self, response: Response) -> None:
        """Record the response status and schedule a body read for token URLs."""
        url = response.url
        if not self._is_relevant_url(url):
            return
        self.response_statuses[url] = response.status
        if self._is_token_response_url(url):
            # This is a synchronous event handler, so the body is read in a
            # background task that `flush()` can later await.
            task = asyncio.create_task(self._capture_response_body(response))
            self._response_tasks.add(task)
            task.add_done_callback(self._response_tasks.discard)

    def snapshot_fingerprint(self) -> BrowserFingerprint:
        """Return the most recently observed header fingerprint."""
        return self._fingerprint

    @property
    def network_token(self) -> str:
        """Token extracted from a network response body, or ``""``."""
        return self._network_token

    @property
    def token_source_url(self) -> str:
        """URL of the response the network token was extracted from."""
        return self._token_source_url

    async def flush(self) -> None:
        """Wait for all pending response-body readers to finish."""
        if not self._response_tasks:
            return
        # Snapshot the set: done-callbacks mutate it while gather is running.
        await asyncio.gather(*tuple(self._response_tasks), return_exceptions=True)

    async def _capture_response_body(self, response: Response) -> None:
        """Read *response* and remember its token, if it carries one."""
        try:
            body = await response.text()
        except Exception as exc:
            # Best effort: an unreadable body only means no network token.
            log.debug(
                "Failed to read reCAPTCHA response body from %s: %s",
                response.url,
                exc,
            )
            return

        token = self.extract_token_from_body(body)
        if not token:
            return
        self._network_token = token
        self._token_source_url = response.url


class RecaptchaV3Solver:
    """Solves reCAPTCHA v3 tasks via runtime-aware Playwright automation."""

    def __init__(self, config: Config) -> None:
        self._config = config
        self._playwright: Playwright | None = None
        self._browser: Browser | None = None
        self._start_lock = asyncio.Lock()

    @staticmethod
    def _build_proxy_settings(raw_proxy_url: str) -> dict[str, str]:
        """Convert a proxy URL into the dict passed as the launch ``proxy`` option.

        Raises:
            ValueError: if the URL lacks a scheme, a host or a port.
        """
        parsed = urlparse(raw_proxy_url.strip())
        if not parsed.scheme or not parsed.hostname or not parsed.port:
            raise ValueError(
                "BROWSER_PROXY_URL must use a full URL such as socks5://user:pass@host:port"
            )
        # `hostname` strips the brackets from IPv6 literals; they must be
        # restored or the host and port become ambiguous in the server URL.
        host = parsed.hostname
        if ":" in host:
            host = f"[{host}]"
        payload = {"server": f"{parsed.scheme}://{host}:{parsed.port}"}
        if parsed.username:
            payload["username"] = unquote(parsed.username)
        if parsed.password:
            payload["password"] = unquote(parsed.password)
        return payload

    @staticmethod
    def _extract_browser_major_version(browser_version: str) -> str:
        """Return the first run of digits in *browser_version*, or ``"131"``."""
        match = re.search(r"(\d+)", browser_version)
        return match.group(1) if match else "131"

    @staticmethod
    def _build_chromium_user_agent(browser_version: str) -> str:
        """Build a Windows Chrome user-agent string for the given version."""
        major = RecaptchaV3Solver._extract_browser_major_version(browser_version)
        return (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            f"Chrome/{major}.0.0.0 Safari/537.36"
        )

    def _resolve_browser_version(self) -> str:
        """Return the browser's version string, or ``""`` when unavailable."""
        if self._browser is None:
            return ""
        version_attr = getattr(self._browser, "version", "")
        # `version` is tolerated both as a plain attribute and as a callable.
        if callable(version_attr):
            try:
                return str(version_attr())
            except Exception:
                return ""
        return str(version_attr or "")

    def _build_browser_context_options(self) -> dict[str, Any]:
        """Return the keyword arguments used for every new browser context."""
        return {
            "user_agent": self._build_chromium_user_agent(
                self._resolve_browser_version()
            ),
            "viewport": {"width": 1920, "height": 1080},
            "locale": "en-US",
            "extra_http_headers": {
                "Accept-Language": _BROWSER_ACCEPT_LANGUAGE,
            },
        }

    async def start(self) -> None:
        """Launch Playwright and Chromium once; later calls are no-ops."""
        if self._browser is not None:
            return
        async with self._start_lock:
            # Re-check under the lock: another caller may have finished
            # starting the browser while this one was waiting.
            if self._browser is not None:
                return

            playwright = await async_playwright().start()
            launch_options: dict[str, Any] = {
                "headless": self._config.browser_headless,
                "args": [
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            }
            try:
                if self._config.browser_proxy_url:
                    launch_options["proxy"] = self._build_proxy_settings(
                        self._config.browser_proxy_url
                    )
                browser = await playwright.chromium.launch(**launch_options)
            except Exception:
                # Do not leave the Playwright driver running when the proxy
                # settings are invalid or the browser fails to launch.
                await playwright.stop()
                raise

            self._playwright = playwright
            self._browser = browser
            log.info(
                "Playwright browser started lazily (headless=%s proxy=%s)",
                self._config.browser_headless,
                "configured" if self._config.browser_proxy_url else "none",
            )

    async def stop(self) -> None:
        """Close the browser and stop Playwright, releasing both handles."""
        async with self._start_lock:
            # Detach the handles first so a failing close() cannot leave stale
            # references behind on the instance.
            browser, self._browser = self._browser, None
            playwright, self._playwright = self._playwright, None
            try:
                if browser is not None:
                    await browser.close()
            finally:
                # Stop the driver even when closing the browser raised.
                if playwright is not None:
                    await playwright.stop()
        log.info("Playwright browser stopped")

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve one reCAPTCHA v3 task, retrying on failure.

        Args:
            params: raw task parameters; see `_build_task_profile` for the
                keys that are read.

        Returns:
            The solution payload built by `_build_solution_payload`.

        Raises:
            RuntimeError: when every attempt failed; the last failure is
                attached as ``__cause__``.
        """
        await self.start()
        profile = self._build_task_profile(params)

        # Always make at least one attempt, even if the configured count is
        # zero or negative.
        attempts = max(1, self._config.captcha_retries)
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                result = await self._solve_once(profile)
                return self._build_solution_payload(result)
            except Exception as exc:
                last_error = exc
                log.warning(
                    "reCAPTCHA v3 attempt %d/%d failed for %s (%s): %s",
                    attempt + 1,
                    attempts,
                    profile.website_url,
                    profile.task_type,
                    exc,
                )
                if attempt < attempts - 1:
                    await asyncio.sleep(2)

        raise RuntimeError(
            f"reCAPTCHA v3 failed after {attempts} attempts: {last_error}"
        ) from last_error

    @staticmethod
    def _build_task_profile(params: dict[str, Any]) -> RecaptchaTaskProfile:
        """Normalize raw task parameters into a `RecaptchaTaskProfile`.

        Raises:
            KeyError: if ``websiteURL`` or ``websiteKey`` is missing.
        """
        task_type = str(params.get("type") or "RecaptchaV3TaskProxyless")
        website_url = str(params["websiteURL"])
        website_key = str(params["websiteKey"])
        page_action = str(params.get("pageAction") or "").strip()
        # The task type name is the only hint for the requested runtime.
        requested_runtime = (
            _ENTERPRISE_RUNTIME
            if "enterprise" in task_type.lower()
            else _STANDARD_RUNTIME
        )

        enterprise_payload = params.get("enterprisePayload") or {}
        enterprise_s_token = ""
        if isinstance(enterprise_payload, dict):
            enterprise_s_token = str(enterprise_payload.get("s") or "").strip()

        return RecaptchaTaskProfile(
            task_type=task_type,
            website_url=website_url,
            website_key=website_key,
            page_action=page_action,
            requested_runtime=requested_runtime,
            api_domain=RecaptchaV3Solver._normalize_api_domain(
                str(params.get("apiDomain") or "")
            ),
            enterprise_s_token=enterprise_s_token,
            wants_session_artifacts=bool(params.get("isSession")),
        )

    @staticmethod
    def _normalize_api_domain(raw_domain: str) -> str:
        """Reduce a domain or URL to its host part, defaulting when empty."""
        value = raw_domain.strip()
        if not value:
            return _DEFAULT_API_DOMAIN
        # Prepend a scheme so urlparse puts a bare domain into `netloc`.
        parsed = urlparse(value if "://" in value else f"https://{value}")
        return parsed.netloc or parsed.path or _DEFAULT_API_DOMAIN

    @staticmethod
    def _build_loader_url(
        runtime_kind: str,
        api_domain: str,
        website_key: str,
    ) -> str:
        """Return the loader script URL for the given runtime and site key."""
        domain = RecaptchaV3Solver._normalize_api_domain(api_domain)
        if runtime_kind == _ENTERPRISE_RUNTIME:
            return f"https://{domain}/recaptcha/enterprise.js?render={website_key}"
        return f"https://{domain}/recaptcha/api.js?render={website_key}"

    @staticmethod
    def _determine_runtime_kind(
        *,
        requested_runtime: str,
        scripts: list[str],
        request_urls: list[str],
        has_standard_execute: bool,
        has_enterprise_execute: bool,
        enterprise_cfg: bool,
    ) -> tuple[str, str]:
        """Pick the runtime to use and explain why.

        Enterprise signals are checked before standard ones; the task hint is
        used only when no evidence is available.

        Returns:
            A ``(runtime_kind, detection_reason)`` tuple.
        """
        evidence_urls = [*scripts, *request_urls]

        if has_enterprise_execute or enterprise_cfg:
            return _ENTERPRISE_RUNTIME, "page exposes grecaptcha.enterprise.execute"
        if any("/recaptcha/enterprise" in url for url in evidence_urls):
            return _ENTERPRISE_RUNTIME, "enterprise script or network evidence detected"
        if has_standard_execute:
            return _STANDARD_RUNTIME, "page exposes grecaptcha.execute"
        if any(
            "/recaptcha/api.js" in url or "/recaptcha/api2/" in url
            for url in evidence_urls
        ):
            return _STANDARD_RUNTIME, "standard script or network evidence detected"
        return requested_runtime, "no runtime evidence; falling back to task hint"

    @staticmethod
    def _has_runtime_network_evidence(
        runtime_kind: str,
        urls: list[str],
    ) -> bool:
        """Return True when *urls* contains a URL typical for *runtime_kind*."""
        if runtime_kind == _ENTERPRISE_RUNTIME:
            return any("/recaptcha/enterprise" in url for url in urls)
        return any(
            "/recaptcha/api.js" in url or "/recaptcha/api2/" in url
            for url in urls
        )

    @staticmethod
    def _extract_api_domain(urls: list[str], fallback: str) -> str:
        """Return the host of the first reCAPTCHA URL, else the fallback."""
        for url in urls:
            if "/recaptcha/" not in url:
                continue
            parsed = urlparse(url)
            if parsed.netloc:
                return parsed.netloc
        return RecaptchaV3Solver._normalize_api_domain(fallback)

    @staticmethod
    def _build_solution_payload(result: RecaptchaV3SolveResult) -> dict[str, Any]:
        """Flatten a solve result into the dict returned by `solve()`."""
        payload: dict[str, Any] = {
            "gRecaptchaResponse": result.token,
            "createTime": result.create_time_ms,
            "runtimeKind": result.runtime_kind,
        }
        payload.update(result.fingerprint.to_solution_fields())
        payload.update(result.session_artifacts.to_solution_fields())
        return payload

    @staticmethod
    def _select_best_token(
        execute_token: str | None,
        network_token: str | None,
        network_token_source: str,
    ) -> str:
        """Choose between the token returned by execute and the network token.

        The network token wins whenever it is present; otherwise the execute
        token is returned. Non-string inputs are treated as empty.
        """
        # The execute result comes straight from page.evaluate and may be
        # None or a non-string value; only strings can be tokens.
        execute_value = execute_token.strip() if isinstance(execute_token, str) else ""
        observed_value = network_token.strip() if isinstance(network_token, str) else ""

        if not observed_value:
            return execute_value
        if execute_value and execute_value != observed_value:
            log.warning(
                "reCAPTCHA execute token differed from network token; using network token from %s",
                network_token_source or "unknown-source",
            )
        return observed_value

    async def _solve_once(self, profile: RecaptchaTaskProfile) -> RecaptchaV3SolveResult:
        """Run a single solve attempt in a fresh browser context.

        Raises:
            RuntimeError: if the browser is not started, the token is invalid,
                or no network evidence of the runtime was observed.
        """
        if self._browser is None:
            # An explicit check rather than `assert`, which is stripped
            # under `python -O`.
            raise RuntimeError("Browser is not started; call start() first")

        context = await self._browser.new_context(
            **self._build_browser_context_options()
        )
        # Everything after context creation runs inside the try block so the
        # context is closed even when page setup fails.
        try:
            page = await context.new_page()
            observer = RecaptchaNetworkObserver()
            observer.bind(page)
            await page.add_init_script(_STEALTH_JS)

            timeout_ms = self._config.browser_timeout * 1000
            await page.goto(
                profile.website_url,
                wait_until="networkidle",
                timeout=timeout_ms,
            )
            await self._simulate_human_activity(page)

            initial_runtime = await self._probe_runtime(page, profile, observer)
            if initial_runtime.runtime_kind != profile.requested_runtime:
                log.info(
                    "Runtime probe selected %s for %s (requested=%s, reason=%s)",
                    initial_runtime.runtime_kind,
                    profile.website_url,
                    profile.requested_runtime,
                    initial_runtime.detection_reason,
                )

            execute_token = await self._execute_for_runtime(
                page,
                profile,
                initial_runtime,
            )
            await observer.flush()

            token = self._select_best_token(
                execute_token=execute_token,
                network_token=observer.network_token,
                network_token_source=observer.token_source_url,
            )
            if not isinstance(token, str) or len(token) < 20:
                raise RuntimeError(f"Invalid token received: {token!r}")

            # Short pause before the second flush and the final runtime probe.
            await asyncio.sleep(0.35)
            await observer.flush()

            final_runtime = await self._probe_runtime(page, profile, observer)
            if not self._has_runtime_network_evidence(
                final_runtime.runtime_kind,
                final_runtime.all_urls(),
            ):
                raise RuntimeError(
                    f"No {final_runtime.runtime_kind} runtime evidence observed after execute"
                )

            fingerprint = await self._capture_fingerprint(page, observer)
            session_artifacts = (
                await self._capture_session_artifacts(context)
                if profile.wants_session_artifacts
                else RecaptchaSessionArtifacts()
            )
            result = RecaptchaV3SolveResult(
                token=token,
                runtime_kind=final_runtime.runtime_kind,
                fingerprint=fingerprint,
                session_artifacts=session_artifacts,
                create_time_ms=int(time.time() * 1000),
            )
            log.info(
                "Got reCAPTCHA v3 token for %s (runtime=%s len=%d ua=%s ca_t=%s ca_e=%s)",
                profile.website_url,
                result.runtime_kind,
                len(result.token),
                "yes" if result.fingerprint.user_agent else "no",
                "yes" if result.session_artifacts.recaptcha_ca_t else "no",
                "yes" if result.session_artifacts.recaptcha_ca_e else "no",
            )
            return result
        finally:
            await context.close()

    async def _simulate_human_activity(self, page: Page) -> None:
        """Move the mouse twice with short pauses in between."""
        await page.mouse.move(400, 300)
        await asyncio.sleep(1)
        await page.mouse.move(600, 400)
        await asyncio.sleep(0.5)

    async def _probe_runtime(
        self,
        page: Page,
        profile: RecaptchaTaskProfile,
        observer: RecaptchaNetworkObserver,
    ) -> RecaptchaRuntimeEvidence:
        """Inspect the page and the observed traffic to classify the runtime."""
        raw = await page.evaluate(_RUNTIME_INSPECTION_JS)
        scripts = list(raw.get("scripts") or [])
        request_urls = list(observer.request_urls)
        has_standard_execute = bool(raw.get("hasStandardExecute"))
        has_enterprise_execute = bool(raw.get("hasEnterpriseExecute"))
        enterprise_cfg = bool(raw.get("enterpriseCfg"))

        runtime_kind, detection_reason = self._determine_runtime_kind(
            requested_runtime=profile.requested_runtime,
            scripts=scripts,
            request_urls=request_urls,
            has_standard_execute=has_standard_execute,
            has_enterprise_execute=has_enterprise_execute,
            enterprise_cfg=enterprise_cfg,
        )
        return RecaptchaRuntimeEvidence(
            runtime_kind=runtime_kind,
            detection_reason=detection_reason,
            scripts=scripts,
            request_urls=request_urls,
            response_statuses=dict(observer.response_statuses),
            has_standard_execute=has_standard_execute,
            has_enterprise_execute=has_enterprise_execute,
            enterprise_cfg=enterprise_cfg,
            api_domain=self._extract_api_domain(
                [*scripts, *request_urls],
                fallback=profile.api_domain,
            ),
        )

    async def _execute_for_runtime(
        self,
        page: Page,
        profile: RecaptchaTaskProfile,
        runtime: RecaptchaRuntimeEvidence,
    ) -> str:
        """Ensure the selected runtime is loaded, then call its ``execute``."""
        loader_domain = runtime.api_domain or profile.api_domain

        if runtime.runtime_kind == _ENTERPRISE_RUNTIME:
            await self._ensure_runtime_loaded(
                page=page,
                ready_expression=_WAIT_FOR_ENTERPRISE_RUNTIME_JS,
                loader_url=self._build_loader_url(
                    _ENTERPRISE_RUNTIME,
                    loader_domain,
                    profile.website_key,
                ),
            )
            return await page.evaluate(
                _EXECUTE_ENTERPRISE_JS,
                [
                    profile.website_key,
                    profile.page_action,
                    profile.enterprise_s_token,
                ],
            )

        await self._ensure_runtime_loaded(
            page=page,
            ready_expression=_WAIT_FOR_STANDARD_RUNTIME_JS,
            loader_url=self._build_loader_url(
                _STANDARD_RUNTIME,
                loader_domain,
                profile.website_key,
            ),
        )
        return await page.evaluate(
            _EXECUTE_STANDARD_JS,
            [profile.website_key, profile.page_action],
        )

    async def _ensure_runtime_loaded(
        self,
        *,
        page: Page,
        ready_expression: str,
        loader_url: str,
    ) -> None:
        """Wait for the runtime; inject the loader script if it is not ready."""
        try:
            await page.wait_for_function(ready_expression, timeout=5_000)
            return
        except Exception:
            # The first wait failed (e.g. timed out): fall through and inject
            # the loader script ourselves.
            log.info("reCAPTCHA runtime not ready, injecting %s", loader_url)

        await page.evaluate(_ENSURE_SCRIPT_LOADED_JS, [loader_url])
        await page.wait_for_function(ready_expression, timeout=10_000)

    async def _capture_fingerprint(
        self,
        page: Page,
        observer: RecaptchaNetworkObserver,
    ) -> BrowserFingerprint:
        """Prefer the fingerprint seen on the wire; else read it from the page."""
        network_fp = observer.snapshot_fingerprint()
        if network_fp.user_agent or network_fp.sec_ch_ua or network_fp.accept_language:
            return network_fp

        fallback = await page.evaluate(_FALLBACK_FINGERPRINT_JS)
        return BrowserFingerprint(
            user_agent=str(fallback.get("userAgent") or ""),
            accept_language=str(fallback.get("acceptLanguage") or ""),
            sec_ch_ua=str(fallback.get("secChUa") or ""),
            sec_ch_ua_mobile=str(fallback.get("secChUaMobile") or ""),
            sec_ch_ua_platform=str(fallback.get("secChUaPlatform") or ""),
        )

    async def _capture_session_artifacts(
        self,
        context: Any,
    ) -> RecaptchaSessionArtifacts:
        """Read the ``recaptcha-ca-t`` / ``recaptcha-ca-e`` cookies of *context*."""
        cookies = await context.cookies()
        recaptcha_ca_t = None
        recaptcha_ca_e = None
        for cookie in cookies:
            name = str(cookie.get("name") or "")
            value = str(cookie.get("value") or "")
            if not value:
                continue
            if name == "recaptcha-ca-t":
                recaptcha_ca_t = value
            elif name == "recaptcha-ca-e":
                recaptcha_ca_e = value
        return RecaptchaSessionArtifacts(
            recaptcha_ca_t=recaptcha_ca_t,
            recaptcha_ca_e=recaptcha_ca_e,
        )