| """Runtime-aware reCAPTCHA v3 solver using Playwright browser automation. |
| |
| This module intentionally separates: |
| |
| 1. task normalization |
| 2. runtime probing (standard v3 vs enterprise) |
| 3. execution |
| 4. result/artifact collection |
| |
| That structure mirrors mainstream captcha providers more closely than a |
| single "get token if any string is returned" flow. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import re |
| import time |
| from dataclasses import dataclass, field |
| from typing import Any |
| from urllib.parse import unquote, urlparse |
|
|
| from playwright.async_api import Browser, Page, Request, Response, Playwright, async_playwright |
|
|
| from ..core.config import Config |
|
|
| log = logging.getLogger(__name__) |
|
|
# Identifiers for the two reCAPTCHA v3 runtimes this module distinguishes.
_STANDARD_RUNTIME = "standard"
_ENTERPRISE_RUNTIME = "enterprise"

# Host used to build loader URLs when a task does not name an API domain.
_DEFAULT_API_DOMAIN = "www.google.com"

# Accept-Language header value sent by every browser context.
_BROWSER_ACCEPT_LANGUAGE = "en-US,en;q=0.9"

# URL fragments marking responses whose body is scanned for a token.
_TOKEN_RESPONSE_MARKERS = (
    "/recaptcha/api2/reload",
    "/recaptcha/api2/clr",
    "/recaptcha/enterprise/reload",
    "/recaptcha/enterprise/clr",
)

# Patterns capturing the value that follows an "rresp" marker, in double- and
# single-quoted form; they are tried in this order.
_TOKEN_RESPONSE_PATTERNS = tuple(
    re.compile(source)
    for source in (
        r'"rresp","([^"]+)"',
        r"'rresp','([^']+)'",
    )
)
|
|
# Page predicate: true once `window.grecaptcha.execute` is a function.
# Passed to `page.wait_for_function` before executing the standard runtime.
_WAIT_FOR_STANDARD_RUNTIME_JS = """
() => (
    typeof window.grecaptcha !== 'undefined'
    && typeof window.grecaptcha.execute === 'function'
)
"""


# Page predicate: true once `window.grecaptcha.enterprise.execute` is a
# function. Passed to `page.wait_for_function` before executing the
# enterprise runtime.
_WAIT_FOR_ENTERPRISE_RUNTIME_JS = """
() => (
    typeof window.grecaptcha !== 'undefined'
    && typeof window.grecaptcha.enterprise !== 'undefined'
    && typeof window.grecaptcha.enterprise.execute === 'function'
)
"""


# Page probe returning an object with:
#   scripts              - src of every <script> whose URL contains '/recaptcha/'
#   hasStandardExecute   - whether grecaptcha.execute is a function
#   hasEnterpriseExecute - whether grecaptcha.enterprise.execute is a function
#   enterpriseCfg        - whether window.___grecaptcha_cfg.enterprise === true
_RUNTIME_INSPECTION_JS = """
() => {
    const scripts = [...document.scripts]
        .map((script) => script.src)
        .filter((src) => typeof src === 'string' && src.includes('/recaptcha/'));
    const cfg = window.___grecaptcha_cfg || {};
    return {
        scripts,
        hasStandardExecute: typeof window.grecaptcha?.execute === 'function',
        hasEnterpriseExecute: typeof window.grecaptcha?.enterprise?.execute === 'function',
        enterpriseCfg: cfg.enterprise === true,
    };
}
"""
|
|
# Injects a <script> tag for `scriptUrl` into document.head and resolves with
# the URL once it has loaded (rejects on a load error). If a script element
# with exactly that src is already in the document, it resolves immediately
# without checking whether that element finished loading.
_ENSURE_SCRIPT_LOADED_JS = """
([scriptUrl]) => new Promise((resolve, reject) => {
    const existing = [...document.scripts].find((script) => script.src === scriptUrl);
    if (existing) {
        resolve(scriptUrl);
        return;
    }
    const script = document.createElement('script');
    script.src = scriptUrl;
    script.async = true;
    script.defer = true;
    script.onerror = () => reject(new Error(`Failed to load script: ${scriptUrl}`));
    script.onload = () => resolve(scriptUrl);
    document.head.appendChild(script);
})
"""
|
|
# Calls `grecaptcha.execute(key, {action})` and resolves with its result.
#
# The execute call is wrapped in try/catch: if `ready` defers the callback and
# `execute` then throws synchronously, the exception would otherwise escape
# the promise executor and the promise would never settle, leaving
# `page.evaluate` waiting forever. `Promise.resolve(...)` also normalises a
# plain thenable so both outcomes reach resolve/reject.
_EXECUTE_STANDARD_JS = """
([key, action]) => new Promise((resolve, reject) => {
    const gr = window.grecaptcha;
    if (!gr || typeof gr.execute !== 'function') {
        reject(new Error('grecaptcha.execute is not available'));
        return;
    }
    const run = () => {
        try {
            const options = {};
            if (action) {
                options.action = action;
            }
            Promise.resolve(gr.execute(key, options)).then(resolve, reject);
        } catch (err) {
            reject(err);
        }
    };
    if (typeof gr.ready === 'function') {
        gr.ready(run);
    } else {
        run();
    }
})
"""


# Enterprise counterpart of the snippet above: calls
# `grecaptcha.enterprise.execute(key, {action, s})`, adding `s` only when an
# sToken was supplied. Uses the same try/catch guard so a synchronous throw
# rejects the promise instead of leaving it pending.
_EXECUTE_ENTERPRISE_JS = """
([key, action, sToken]) => new Promise((resolve, reject) => {
    const gr = window.grecaptcha?.enterprise;
    if (!gr || typeof gr.execute !== 'function') {
        reject(new Error('grecaptcha.enterprise.execute is not available'));
        return;
    }
    const run = () => {
        try {
            const options = {};
            if (action) {
                options.action = action;
            }
            if (sToken) {
                options.s = sToken;
            }
            Promise.resolve(gr.execute(key, options)).then(resolve, reject);
        } catch (err) {
            reject(err);
        }
    };
    if (typeof gr.ready === 'function') {
        gr.ready(run);
    } else {
        run();
    }
})
"""
|
|
# Reads a fingerprint from the page's `navigator` object. Used when no
# fingerprint headers were captured from network requests. The sec-ch-ua
# style values are rebuilt from `navigator.userAgentData` when it exists and
# stay empty strings otherwise.
_FALLBACK_FINGERPRINT_JS = """
() => {
    const uaData = navigator.userAgentData || null;
    let secChUa = '';
    let secChUaMobile = '';
    let secChUaPlatform = '';

    if (uaData) {
        if (Array.isArray(uaData.brands) && uaData.brands.length > 0) {
            secChUa = uaData.brands
                .map((item) => `"${item.brand}";v="${item.version}"`)
                .join(', ');
        }
        secChUaMobile = uaData.mobile ? '?1' : '?0';
        if (uaData.platform) {
            secChUaPlatform = `"${uaData.platform}"`;
        }
    }

    return {
        userAgent: navigator.userAgent || '',
        acceptLanguage: Array.isArray(navigator.languages) && navigator.languages.length > 0
            ? navigator.languages.join(',')
            : (navigator.language || ''),
        secChUa,
        secChUaMobile,
        secChUaPlatform,
    };
}
"""


# Init script registered on each page before navigation. It overrides
# navigator.webdriver, navigator.languages and navigator.plugins with fixed
# values and defines a stub `window.chrome` object.
_STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};
"""
|
|
|
|
@dataclass(frozen=True)
class BrowserFingerprint:
    """Browser identity values observed while a token was generated.

    Every field defaults to the empty string, which means "not observed".
    """

    user_agent: str = ""
    accept_language: str = ""
    sec_ch_ua: str = ""
    sec_ch_ua_mobile: str = ""
    sec_ch_ua_platform: str = ""

    def to_solution_fields(self) -> dict[str, Any]:
        """Return the non-empty fields keyed by their camelCase solution name."""
        candidates = (
            ("userAgent", self.user_agent),
            ("acceptLanguage", self.accept_language),
            ("secChUa", self.sec_ch_ua),
            ("secChUaMobile", self.sec_ch_ua_mobile),
            ("secChUaPlatform", self.sec_ch_ua_platform),
        )
        # Empty values are omitted so the payload only carries observed data.
        return {name: value for name, value in candidates if value}
|
|
|
|
@dataclass(frozen=True)
class RecaptchaSessionArtifacts:
    """Values of the `recaptcha-ca-t` / `recaptcha-ca-e` session cookies.

    A field is ``None`` when the corresponding cookie was not collected.
    """

    recaptcha_ca_t: str | None = None
    recaptcha_ca_e: str | None = None

    def to_solution_fields(self) -> dict[str, Any]:
        """Return the collected cookies keyed by their cookie name."""
        by_cookie_name = (
            ("recaptcha-ca-t", self.recaptcha_ca_t),
            ("recaptcha-ca-e", self.recaptcha_ca_e),
        )
        # Missing (None) and empty values are left out of the payload.
        return {name: value for name, value in by_cookie_name if value}
|
|
|
|
@dataclass(frozen=True)
class RecaptchaTaskProfile:
    """Normalized internal task view.

    Built by `RecaptchaV3Solver._build_task_profile` from the raw task
    parameters; immutable once created.
    """

    # Task "type" string; defaults to "RecaptchaV3TaskProxyless" when absent.
    task_type: str
    # Page to open, from the required "websiteURL" parameter.
    website_url: str
    # Site key, from the required "websiteKey" parameter.
    website_key: str
    # Stripped "pageAction" parameter; empty string when none was given.
    page_action: str
    # "enterprise" when the task type contains "enterprise", else "standard".
    requested_runtime: str
    # Normalized "apiDomain" parameter, or the default domain when empty.
    api_domain: str
    # Stripped "s" entry of "enterprisePayload"; empty string when absent.
    enterprise_s_token: str
    # Truthiness of the "isSession" parameter.
    wants_session_artifacts: bool
|
|
|
|
@dataclass
class RecaptchaRuntimeEvidence:
    """Signals gathered about which reCAPTCHA runtime a page is running.

    Holds the chosen runtime kind with the reason for the choice, plus the
    script URLs, observed request URLs, response statuses and page flags the
    choice was based on.
    """

    runtime_kind: str
    detection_reason: str
    scripts: list[str] = field(default_factory=list)
    request_urls: list[str] = field(default_factory=list)
    response_statuses: dict[str, int] = field(default_factory=dict)
    has_standard_execute: bool = False
    has_enterprise_execute: bool = False
    enterprise_cfg: bool = False
    api_domain: str = _DEFAULT_API_DOMAIN

    def all_urls(self) -> list[str]:
        """Return script URLs, request URLs and response URLs, in that order."""
        combined = list(self.scripts)
        combined.extend(self.request_urls)
        # Iterating the dict yields its keys, i.e. the response URLs.
        combined.extend(self.response_statuses)
        return combined
|
|
|
|
@dataclass(frozen=True)
class RecaptchaV3SolveResult:
    """Final execution artifacts returned by `_solve_once()`."""

    # Token selected from the execute() result and the network-observed token.
    token: str
    # Runtime kind reported by the probe that runs after execute.
    runtime_kind: str
    # Fingerprint from captured request headers, or read from the page.
    fingerprint: BrowserFingerprint
    # Session cookies; left at defaults unless the task asked for them.
    session_artifacts: RecaptchaSessionArtifacts
    # Wall-clock time the result was assembled, in milliseconds since epoch.
    create_time_ms: int
|
|
|
|
class RecaptchaNetworkObserver:
    """Collect reCAPTCHA request evidence and the actual network fingerprint.

    After :meth:`bind`, every request and response of the page whose URL
    contains ``/recaptcha/`` is recorded. Bodies of token-bearing responses
    are read in background tasks; call :meth:`flush` to wait for them.
    """

    def __init__(self) -> None:
        self.request_urls: list[str] = []
        self.response_statuses: dict[str, int] = {}
        self._fingerprint = BrowserFingerprint()
        self._network_token = ""
        self._token_source_url = ""
        # Strong references keep in-flight body readers from being collected.
        self._response_tasks: set[asyncio.Task[None]] = set()

    @staticmethod
    def _is_relevant_url(url: str) -> bool:
        """Return True when *url* contains the ``/recaptcha/`` path fragment."""
        return "/recaptcha/" in url

    @staticmethod
    def _is_token_response_url(url: str) -> bool:
        """Return True when *url* matches one of the token response markers."""
        return any(marker in url for marker in _TOKEN_RESPONSE_MARKERS)

    @staticmethod
    def extract_token_from_body(body: str) -> str:
        """Return the first ``rresp`` value found in *body*, or ``""``."""
        for pattern in _TOKEN_RESPONSE_PATTERNS:
            match = pattern.search(body)
            if match:
                return match.group(1)
        return ""

    def bind(self, page: Page) -> None:
        """Subscribe to the request and response events of *page*."""
        page.on("request", self._capture_request)
        page.on("response", self._capture_response)

    def _capture_request(self, request: Request) -> None:
        """Record a relevant request URL and refresh the header fingerprint."""
        url = request.url
        if not self._is_relevant_url(url):
            return
        self.request_urls.append(url)
        headers = {
            str(key).lower(): str(value)
            for key, value in (request.headers or {}).items()
        }
        previous = self._fingerprint
        # A header missing from this request keeps the value seen earlier.
        self._fingerprint = BrowserFingerprint(
            user_agent=headers.get("user-agent", previous.user_agent),
            accept_language=headers.get(
                "accept-language",
                previous.accept_language,
            ),
            sec_ch_ua=headers.get("sec-ch-ua", previous.sec_ch_ua),
            sec_ch_ua_mobile=headers.get(
                "sec-ch-ua-mobile",
                previous.sec_ch_ua_mobile,
            ),
            sec_ch_ua_platform=headers.get(
                "sec-ch-ua-platform",
                previous.sec_ch_ua_platform,
            ),
        )

    def _capture_response(self, response: Response) -> None:
        """Record a relevant response status and schedule a body read."""
        url = response.url
        if not self._is_relevant_url(url):
            return
        self.response_statuses[url] = response.status
        if self._is_token_response_url(url):
            # The handler is synchronous, so the body is read in a task.
            task = asyncio.create_task(self._capture_response_body(response))
            self._response_tasks.add(task)
            task.add_done_callback(self._response_tasks.discard)

    def snapshot_fingerprint(self) -> BrowserFingerprint:
        """Return the fingerprint built from the headers captured so far."""
        return self._fingerprint

    @property
    def network_token(self) -> str:
        """Token most recently extracted from a response body, or ``""``."""
        return self._network_token

    @property
    def token_source_url(self) -> str:
        """URL of the response that supplied :attr:`network_token`."""
        return self._token_source_url

    async def flush(self) -> None:
        """Wait until every scheduled response-body read has finished.

        Loops because new reads may be scheduled while earlier ones are being
        awaited; a single gather over a snapshot would miss those. Errors
        raised by the readers are collected and ignored.
        """
        while self._response_tasks:
            pending = tuple(self._response_tasks)
            await asyncio.gather(*pending, return_exceptions=True)
            # Drop awaited tasks explicitly so the loop ends even if their
            # done-callbacks have not run yet.
            self._response_tasks.difference_update(pending)

    async def _capture_response_body(self, response: Response) -> None:
        """Read *response* and remember its token if one is present.

        Best effort: a failed body read is logged at debug level and ignored.
        """
        try:
            body = await response.text()
        except Exception as exc:
            log.debug(
                "Failed to read reCAPTCHA response body from %s: %s",
                response.url,
                exc,
            )
            return

        token = self.extract_token_from_body(body)
        if not token:
            return

        self._network_token = token
        self._token_source_url = response.url
|
|
|
|
class RecaptchaV3Solver:
    """Solves reCAPTCHA v3 tasks via runtime-aware Playwright automation.

    One Chromium instance is launched lazily by :meth:`start` and shared by
    every :meth:`solve` call. Each attempt runs in its own browser context,
    which is closed when the attempt ends.
    """

    def __init__(self, config: Config) -> None:
        self._config = config
        self._playwright: Playwright | None = None
        self._browser: Browser | None = None
        # Serialises start()/stop() so concurrent callers share one browser.
        self._start_lock = asyncio.Lock()

    @staticmethod
    def _build_proxy_settings(raw_proxy_url: str) -> dict[str, str]:
        """Convert a proxy URL into Playwright's ``proxy`` launch option.

        Returns a dict with ``server`` and, when the URL carries them,
        percent-decoded ``username`` / ``password``.

        Raises:
            ValueError: if scheme, host or port is missing or malformed.
        """
        message = (
            "BROWSER_PROXY_URL must use a full URL such as socks5://user:pass@host:port"
        )
        try:
            parsed = urlparse(raw_proxy_url.strip())
            # `.port` raises ValueError for non-numeric or out-of-range ports.
            port = parsed.port
        except ValueError:
            raise ValueError(message) from None
        if not parsed.scheme or not parsed.hostname or not port:
            raise ValueError(message)

        host = parsed.hostname
        if ":" in host:
            # urlparse strips the brackets of IPv6 literals; restore them so
            # the address cannot be confused with the port separator.
            host = f"[{host}]"

        payload = {"server": f"{parsed.scheme}://{host}:{port}"}
        if parsed.username:
            payload["username"] = unquote(parsed.username)
        if parsed.password:
            payload["password"] = unquote(parsed.password)
        return payload

    @staticmethod
    def _extract_browser_major_version(browser_version: str) -> str:
        """Return the first run of digits in *browser_version*, else "131"."""
        match = re.search(r"(\d+)", browser_version)
        return match.group(1) if match else "131"

    @staticmethod
    def _build_chromium_user_agent(browser_version: str) -> str:
        """Build a Windows Chrome user agent for the browser's major version."""
        major = RecaptchaV3Solver._extract_browser_major_version(browser_version)
        return (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            f"Chrome/{major}.0.0.0 Safari/537.36"
        )

    def _resolve_browser_version(self) -> str:
        """Return the running browser's version string, or "" if unavailable."""
        if self._browser is None:
            return ""

        # `version` is read defensively: it may be an attribute or a callable.
        version_attr = getattr(self._browser, "version", "")
        if callable(version_attr):
            try:
                return str(version_attr())
            except Exception:
                return ""
        return str(version_attr or "")

    def _build_browser_context_options(self) -> dict[str, Any]:
        """Return the keyword arguments used for every new browser context."""
        return {
            "user_agent": self._build_chromium_user_agent(
                self._resolve_browser_version()
            ),
            "viewport": {"width": 1920, "height": 1080},
            "locale": "en-US",
            "extra_http_headers": {
                "Accept-Language": _BROWSER_ACCEPT_LANGUAGE,
            },
        }

    async def start(self) -> None:
        """Launch the shared browser if it is not running yet.

        Safe to call concurrently and repeatedly; only the first caller
        launches.

        Raises:
            ValueError: if the configured proxy URL is invalid.
        """
        if self._browser is not None:
            return

        async with self._start_lock:
            # Re-check: another caller may have launched while we waited.
            if self._browser is not None:
                return

            launch_options: dict[str, Any] = {
                "headless": self._config.browser_headless,
                "args": [
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            }
            # Validate the proxy before starting the Playwright driver, so an
            # invalid URL cannot leave a started driver behind.
            if self._config.browser_proxy_url:
                launch_options["proxy"] = self._build_proxy_settings(
                    self._config.browser_proxy_url
                )

            playwright = await async_playwright().start()
            try:
                browser = await playwright.chromium.launch(**launch_options)
            except BaseException:
                # Includes cancellation: never leave the driver running.
                await playwright.stop()
                raise

            self._playwright = playwright
            self._browser = browser
            log.info(
                "Playwright browser started lazily (headless=%s proxy=%s)",
                self._config.browser_headless,
                "configured" if self._config.browser_proxy_url else "none",
            )

    async def stop(self) -> None:
        """Close the browser and stop the Playwright driver, if running."""
        async with self._start_lock:
            # Detach the handles first so state is reset even if closing fails.
            browser, self._browser = self._browser, None
            playwright, self._playwright = self._playwright, None
            try:
                if browser is not None:
                    await browser.close()
            finally:
                # Stop the driver even when closing the browser raised.
                if playwright is not None:
                    await playwright.stop()
            log.info("Playwright browser stopped")

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve one task and return the solution payload.

        At least one attempt is always made, even if the configured retry
        count is zero or negative. Attempts are separated by a 2 second pause.

        Raises:
            KeyError: if ``websiteURL`` or ``websiteKey`` is missing.
            RuntimeError: if every attempt failed; the last error is chained.
        """
        # Validate the task before paying for a browser launch.
        profile = self._build_task_profile(params)
        await self.start()

        attempts = max(1, self._config.captcha_retries)
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                result = await self._solve_once(profile)
                return self._build_solution_payload(result)
            except Exception as exc:
                last_error = exc
                log.warning(
                    "reCAPTCHA v3 attempt %d/%d failed for %s (%s): %s",
                    attempt + 1,
                    attempts,
                    profile.website_url,
                    profile.task_type,
                    exc,
                )
                if attempt < attempts - 1:
                    await asyncio.sleep(2)

        raise RuntimeError(
            f"reCAPTCHA v3 failed after {attempts} attempts: {last_error}"
        ) from last_error

    @staticmethod
    def _build_task_profile(params: dict[str, Any]) -> RecaptchaTaskProfile:
        """Normalize raw task parameters into a :class:`RecaptchaTaskProfile`.

        Raises:
            KeyError: if ``websiteURL`` or ``websiteKey`` is missing.
        """
        task_type = str(params.get("type") or "RecaptchaV3TaskProxyless")
        website_url = str(params["websiteURL"])
        website_key = str(params["websiteKey"])
        page_action = str(params.get("pageAction") or "").strip()
        requested_runtime = (
            _ENTERPRISE_RUNTIME
            if "enterprise" in task_type.lower()
            else _STANDARD_RUNTIME
        )
        enterprise_payload = params.get("enterprisePayload") or {}
        enterprise_s_token = ""
        if isinstance(enterprise_payload, dict):
            enterprise_s_token = str(enterprise_payload.get("s") or "").strip()

        return RecaptchaTaskProfile(
            task_type=task_type,
            website_url=website_url,
            website_key=website_key,
            page_action=page_action,
            requested_runtime=requested_runtime,
            api_domain=RecaptchaV3Solver._normalize_api_domain(
                str(params.get("apiDomain") or "")
            ),
            enterprise_s_token=enterprise_s_token,
            wants_session_artifacts=bool(params.get("isSession")),
        )

    @staticmethod
    def _normalize_api_domain(raw_domain: str) -> str:
        """Reduce a domain or URL to its host part.

        Accepts bare hosts, full URLs and protocol-relative ``//host`` input.
        Returns the default API domain when nothing usable is given.
        """
        value = raw_domain.strip()
        if not value:
            return _DEFAULT_API_DOMAIN
        if "://" not in value:
            # Leading slashes (protocol-relative input) would otherwise make
            # urlparse see an empty host and put "//host" into the path.
            value = f"https://{value.lstrip('/')}"
        parsed = urlparse(value)
        return parsed.netloc or parsed.path or _DEFAULT_API_DOMAIN

    @staticmethod
    def _build_loader_url(
        runtime_kind: str,
        api_domain: str,
        website_key: str,
    ) -> str:
        """Return the ``enterprise.js`` or ``api.js`` loader URL for the key."""
        domain = RecaptchaV3Solver._normalize_api_domain(api_domain)
        if runtime_kind == _ENTERPRISE_RUNTIME:
            return f"https://{domain}/recaptcha/enterprise.js?render={website_key}"
        return f"https://{domain}/recaptcha/api.js?render={website_key}"

    @staticmethod
    def _determine_runtime_kind(
        *,
        requested_runtime: str,
        scripts: list[str],
        request_urls: list[str],
        has_standard_execute: bool,
        has_enterprise_execute: bool,
        enterprise_cfg: bool,
    ) -> tuple[str, str]:
        """Pick the runtime kind from the evidence and explain the choice.

        Checks run in priority order: enterprise page flags, enterprise URLs,
        standard page flag, standard URLs, then the task's requested runtime.
        Returns ``(runtime_kind, reason)``.
        """
        evidence_urls = [*scripts, *request_urls]
        if has_enterprise_execute or enterprise_cfg:
            return _ENTERPRISE_RUNTIME, "page exposes grecaptcha.enterprise.execute"
        if any("/recaptcha/enterprise" in url for url in evidence_urls):
            return _ENTERPRISE_RUNTIME, "enterprise script or network evidence detected"
        if has_standard_execute:
            return _STANDARD_RUNTIME, "page exposes grecaptcha.execute"
        if any(
            "/recaptcha/api.js" in url or "/recaptcha/api2/" in url
            for url in evidence_urls
        ):
            return _STANDARD_RUNTIME, "standard script or network evidence detected"
        return requested_runtime, "no runtime evidence; falling back to task hint"

    @staticmethod
    def _has_runtime_network_evidence(
        runtime_kind: str,
        urls: list[str],
    ) -> bool:
        """Return True when *urls* contain a URL typical of *runtime_kind*."""
        if runtime_kind == _ENTERPRISE_RUNTIME:
            return any("/recaptcha/enterprise" in url for url in urls)
        return any(
            "/recaptcha/api.js" in url or "/recaptcha/api2/" in url
            for url in urls
        )

    @staticmethod
    def _extract_api_domain(urls: list[str], fallback: str) -> str:
        """Return the host of the first reCAPTCHA URL, else the fallback."""
        for url in urls:
            if "/recaptcha/" not in url:
                continue
            parsed = urlparse(url)
            if parsed.netloc:
                return parsed.netloc
        return RecaptchaV3Solver._normalize_api_domain(fallback)

    @staticmethod
    def _build_solution_payload(result: RecaptchaV3SolveResult) -> dict[str, Any]:
        """Flatten a solve result into the dict returned by :meth:`solve`."""
        payload: dict[str, Any] = {
            "gRecaptchaResponse": result.token,
            "createTime": result.create_time_ms,
            "runtimeKind": result.runtime_kind,
        }
        payload.update(result.fingerprint.to_solution_fields())
        payload.update(result.session_artifacts.to_solution_fields())
        return payload

    @staticmethod
    def _select_best_token(
        execute_token: str,
        network_token: str,
        network_token_source: str,
    ) -> str:
        """Choose between the execute() result and the network-observed token.

        The network token wins whenever one was observed; a mismatch is
        logged. Non-string inputs (for example ``None`` from a page script
        that resolved without a value) are treated as empty.
        """
        execute_value = execute_token.strip() if isinstance(execute_token, str) else ""
        observed_value = network_token.strip() if isinstance(network_token, str) else ""
        if not observed_value:
            return execute_value
        if execute_value and execute_value != observed_value:
            log.warning(
                "reCAPTCHA execute token differed from network token; using network token from %s",
                network_token_source or "unknown-source",
            )
        return observed_value

    async def _solve_once(self, profile: RecaptchaTaskProfile) -> RecaptchaV3SolveResult:
        """Run one attempt in a fresh browser context.

        Raises:
            RuntimeError: if the browser is not started, the token is shorter
                than 20 characters, or no URL matching the detected runtime
                was observed after execute.
        """
        if self._browser is None:
            raise RuntimeError("Browser is not started; call start() before solving")

        context = await self._browser.new_context(
            **self._build_browser_context_options()
        )
        # Everything after context creation is inside the try block so the
        # context is closed even if page setup fails.
        try:
            page = await context.new_page()
            observer = RecaptchaNetworkObserver()
            observer.bind(page)
            await page.add_init_script(_STEALTH_JS)

            timeout_ms = self._config.browser_timeout * 1000
            await page.goto(
                profile.website_url,
                wait_until="networkidle",
                timeout=timeout_ms,
            )
            await self._simulate_human_activity(page)

            initial_runtime = await self._probe_runtime(page, profile, observer)
            if initial_runtime.runtime_kind != profile.requested_runtime:
                log.info(
                    "Runtime probe selected %s for %s (requested=%s, reason=%s)",
                    initial_runtime.runtime_kind,
                    profile.website_url,
                    profile.requested_runtime,
                    initial_runtime.detection_reason,
                )

            execute_token = await self._execute_for_runtime(
                page,
                profile,
                initial_runtime,
            )
            await observer.flush()
            token = self._select_best_token(
                execute_token=execute_token,
                network_token=observer.network_token,
                network_token_source=observer.token_source_url,
            )
            if not isinstance(token, str) or len(token) < 20:
                raise RuntimeError(f"Invalid token received: {token!r}")

            # Short pause before the second flush and the final probe.
            await asyncio.sleep(0.35)
            await observer.flush()
            final_runtime = await self._probe_runtime(page, profile, observer)
            if not self._has_runtime_network_evidence(
                final_runtime.runtime_kind,
                final_runtime.all_urls(),
            ):
                raise RuntimeError(
                    f"No {final_runtime.runtime_kind} runtime evidence observed after execute"
                )

            fingerprint = await self._capture_fingerprint(page, observer)
            session_artifacts = (
                await self._capture_session_artifacts(context)
                if profile.wants_session_artifacts
                else RecaptchaSessionArtifacts()
            )
            result = RecaptchaV3SolveResult(
                token=token,
                runtime_kind=final_runtime.runtime_kind,
                fingerprint=fingerprint,
                session_artifacts=session_artifacts,
                create_time_ms=int(time.time() * 1000),
            )
            log.info(
                "Got reCAPTCHA v3 token for %s (runtime=%s len=%d ua=%s ca_t=%s ca_e=%s)",
                profile.website_url,
                result.runtime_kind,
                len(result.token),
                "yes" if result.fingerprint.user_agent else "no",
                "yes" if result.session_artifacts.recaptcha_ca_t else "no",
                "yes" if result.session_artifacts.recaptcha_ca_e else "no",
            )
            return result
        finally:
            await context.close()

    async def _simulate_human_activity(self, page: Page) -> None:
        """Move the mouse between two fixed points with short pauses."""
        await page.mouse.move(400, 300)
        await asyncio.sleep(1)
        await page.mouse.move(600, 400)
        await asyncio.sleep(0.5)

    async def _probe_runtime(
        self,
        page: Page,
        profile: RecaptchaTaskProfile,
        observer: RecaptchaNetworkObserver,
    ) -> RecaptchaRuntimeEvidence:
        """Inspect the page and the observer and decide the runtime kind."""
        raw = await page.evaluate(_RUNTIME_INSPECTION_JS)
        scripts = list(raw.get("scripts") or [])
        request_urls = list(observer.request_urls)
        has_standard_execute = bool(raw.get("hasStandardExecute"))
        has_enterprise_execute = bool(raw.get("hasEnterpriseExecute"))
        enterprise_cfg = bool(raw.get("enterpriseCfg"))

        runtime_kind, detection_reason = self._determine_runtime_kind(
            requested_runtime=profile.requested_runtime,
            scripts=scripts,
            request_urls=request_urls,
            has_standard_execute=has_standard_execute,
            has_enterprise_execute=has_enterprise_execute,
            enterprise_cfg=enterprise_cfg,
        )
        return RecaptchaRuntimeEvidence(
            runtime_kind=runtime_kind,
            detection_reason=detection_reason,
            scripts=scripts,
            request_urls=request_urls,
            response_statuses=dict(observer.response_statuses),
            has_standard_execute=has_standard_execute,
            has_enterprise_execute=has_enterprise_execute,
            enterprise_cfg=enterprise_cfg,
            api_domain=self._extract_api_domain(
                [*scripts, *request_urls],
                fallback=profile.api_domain,
            ),
        )

    async def _execute_for_runtime(
        self,
        page: Page,
        profile: RecaptchaTaskProfile,
        runtime: RecaptchaRuntimeEvidence,
    ) -> str:
        """Ensure the detected runtime is loaded, then call its execute().

        The in-page call is bounded by ``browser_timeout`` seconds so a
        promise that never settles cannot hang the attempt. A timeout of 0
        means no limit, matching how Playwright treats 0 for navigation.

        Raises:
            RuntimeError: if execute() does not settle within the timeout.
        """
        loader_domain = runtime.api_domain or profile.api_domain
        if runtime.runtime_kind == _ENTERPRISE_RUNTIME:
            loader_kind = _ENTERPRISE_RUNTIME
            ready_expression = _WAIT_FOR_ENTERPRISE_RUNTIME_JS
            execute_script = _EXECUTE_ENTERPRISE_JS
            execute_args = [
                profile.website_key,
                profile.page_action,
                profile.enterprise_s_token,
            ]
        else:
            loader_kind = _STANDARD_RUNTIME
            ready_expression = _WAIT_FOR_STANDARD_RUNTIME_JS
            execute_script = _EXECUTE_STANDARD_JS
            execute_args = [profile.website_key, profile.page_action]

        await self._ensure_runtime_loaded(
            page=page,
            ready_expression=ready_expression,
            loader_url=self._build_loader_url(
                loader_kind,
                loader_domain,
                profile.website_key,
            ),
        )

        timeout_s = self._config.browser_timeout or None
        try:
            return await asyncio.wait_for(
                page.evaluate(execute_script, execute_args),
                timeout=timeout_s,
            )
        except asyncio.TimeoutError as exc:
            raise RuntimeError(
                f"grecaptcha execute did not settle within {timeout_s}s"
            ) from exc

    async def _ensure_runtime_loaded(
        self,
        *,
        page: Page,
        ready_expression: str,
        loader_url: str,
    ) -> None:
        """Wait for the runtime; inject the loader script if it is absent.

        Waits up to 5 seconds for *ready_expression*. On any failure the
        loader is injected and the wait is repeated for up to 10 seconds; a
        failure of that second wait propagates to the caller.
        """
        try:
            await page.wait_for_function(ready_expression, timeout=5_000)
            return
        except Exception:
            # Best effort: any failure of the first wait triggers injection.
            log.info("reCAPTCHA runtime not ready, injecting %s", loader_url)

        await page.evaluate(_ENSURE_SCRIPT_LOADED_JS, [loader_url])
        await page.wait_for_function(ready_expression, timeout=10_000)

    async def _capture_fingerprint(
        self,
        page: Page,
        observer: RecaptchaNetworkObserver,
    ) -> BrowserFingerprint:
        """Return the network fingerprint, else one read from the page."""
        network_fp = observer.snapshot_fingerprint()
        if network_fp.user_agent or network_fp.sec_ch_ua or network_fp.accept_language:
            return network_fp

        # Nothing was captured from request headers: ask the page instead.
        fallback = await page.evaluate(_FALLBACK_FINGERPRINT_JS)
        return BrowserFingerprint(
            user_agent=str(fallback.get("userAgent") or ""),
            accept_language=str(fallback.get("acceptLanguage") or ""),
            sec_ch_ua=str(fallback.get("secChUa") or ""),
            sec_ch_ua_mobile=str(fallback.get("secChUaMobile") or ""),
            sec_ch_ua_platform=str(fallback.get("secChUaPlatform") or ""),
        )

    async def _capture_session_artifacts(
        self,
        context: Any,
    ) -> RecaptchaSessionArtifacts:
        """Collect the ``recaptcha-ca-t`` / ``recaptcha-ca-e`` cookie values.

        Cookies with an empty value are ignored. If a name occurs more than
        once, the last occurrence wins.
        """
        cookies = await context.cookies()
        recaptcha_ca_t = None
        recaptcha_ca_e = None
        for cookie in cookies:
            name = str(cookie.get("name") or "")
            value = str(cookie.get("value") or "")
            if not value:
                continue
            if name == "recaptcha-ca-t":
                recaptcha_ca_t = value
            elif name == "recaptcha-ca-e":
                recaptcha_ca_e = value
        return RecaptchaSessionArtifacts(
            recaptcha_ca_t=recaptcha_ca_t,
            recaptcha_ca_e=recaptcha_ca_e,
        )
|
|