mycaptcha / src /services /recaptcha_v3.py
dragg2's picture
Upload 33 files
c00180e verified
"""Runtime-aware reCAPTCHA v3 solver using Playwright browser automation.
This module intentionally separates:
1. task normalization
2. runtime probing (standard v3 vs enterprise)
3. execution
4. result/artifact collection
That structure mirrors mainstream captcha providers more closely than a
single "get token if any string is returned" flow.
"""
from __future__ import annotations
import asyncio
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any
from urllib.parse import unquote, urlparse
from playwright.async_api import Browser, Page, Request, Response, Playwright, async_playwright
from ..core.config import Config
log = logging.getLogger(__name__)
# Identifiers for the two reCAPTCHA v3 runtime flavours the solver distinguishes.
_STANDARD_RUNTIME = "standard"
_ENTERPRISE_RUNTIME = "enterprise"
# Host used to build the loader URL when a task supplies no (usable) apiDomain.
_DEFAULT_API_DOMAIN = "www.google.com"
# Accept-Language header value sent by every browser context the solver opens.
_BROWSER_ACCEPT_LANGUAGE = "en-US,en;q=0.9"
# URL substrings marking responses whose bodies are scanned for a token.
_TOKEN_RESPONSE_MARKERS = (
"/recaptcha/api2/reload",
"/recaptcha/api2/clr",
"/recaptcha/enterprise/reload",
"/recaptcha/enterprise/clr",
)
# Patterns that capture the value following "rresp" in such a body; the first
# handles double-quoted payloads, the second single-quoted ones.
_TOKEN_RESPONSE_PATTERNS = (
re.compile(r'"rresp","([^"]+)"'),
re.compile(r"'rresp','([^']+)'"),
)
# Browser-side predicate: true once `window.grecaptcha.execute` is a function.
# Used as the readiness expression for the standard runtime.
_WAIT_FOR_STANDARD_RUNTIME_JS = """
() => (
typeof window.grecaptcha !== 'undefined'
&& typeof window.grecaptcha.execute === 'function'
)
"""
# Browser-side predicate: true once `window.grecaptcha.enterprise.execute` is
# a function. Used as the readiness expression for the enterprise runtime.
_WAIT_FOR_ENTERPRISE_RUNTIME_JS = """
() => (
typeof window.grecaptcha !== 'undefined'
&& typeof window.grecaptcha.enterprise !== 'undefined'
&& typeof window.grecaptcha.enterprise.execute === 'function'
)
"""
# Browser-side probe returning an object with:
#   scripts              - `src` of every <script> whose URL contains '/recaptcha/'
#   hasStandardExecute   - whether `grecaptcha.execute` is a function
#   hasEnterpriseExecute - whether `grecaptcha.enterprise.execute` is a function
#   enterpriseCfg        - whether `window.___grecaptcha_cfg.enterprise === true`
_RUNTIME_INSPECTION_JS = """
() => {
const scripts = [...document.scripts]
.map((script) => script.src)
.filter((src) => typeof src === 'string' && src.includes('/recaptcha/'));
const cfg = window.___grecaptcha_cfg || {};
return {
scripts,
hasStandardExecute: typeof window.grecaptcha?.execute === 'function',
hasEnterpriseExecute: typeof window.grecaptcha?.enterprise?.execute === 'function',
enterpriseCfg: cfg.enterprise === true,
};
}
"""
# Browser-side loader taking `[scriptUrl]`. Resolves immediately when a
# <script> with exactly that `src` already exists; otherwise appends one to
# <head> and resolves on `load` / rejects on `error`.
_ENSURE_SCRIPT_LOADED_JS = """
([scriptUrl]) => new Promise((resolve, reject) => {
const existing = [...document.scripts].find((script) => script.src === scriptUrl);
if (existing) {
resolve(scriptUrl);
return;
}
const script = document.createElement('script');
script.src = scriptUrl;
script.async = true;
script.defer = true;
script.onerror = () => reject(new Error(`Failed to load script: ${scriptUrl}`));
script.onload = () => resolve(scriptUrl);
document.head.appendChild(script);
})
"""
# Browser-side executor taking `[key, action]`. Rejects when
# `grecaptcha.execute` is unavailable; otherwise waits for `grecaptcha.ready`
# and calls `execute(key, options)`, setting `options.action` only when
# `action` is truthy.
_EXECUTE_STANDARD_JS = """
([key, action]) => new Promise((resolve, reject) => {
const gr = window.grecaptcha;
if (!gr || typeof gr.execute !== 'function') {
reject(new Error('grecaptcha.execute is not available'));
return;
}
gr.ready(() => {
const options = {};
if (action) {
options.action = action;
}
gr.execute(key, options).then(resolve).catch(reject);
});
})
"""
# Enterprise counterpart of the executor above, taking `[key, action, sToken]`.
# Works against `grecaptcha.enterprise` and additionally sets `options.s`
# when `sToken` is truthy.
_EXECUTE_ENTERPRISE_JS = """
([key, action, sToken]) => new Promise((resolve, reject) => {
const gr = window.grecaptcha?.enterprise;
if (!gr || typeof gr.execute !== 'function') {
reject(new Error('grecaptcha.enterprise.execute is not available'));
return;
}
gr.ready(() => {
const options = {};
if (action) {
options.action = action;
}
if (sToken) {
options.s = sToken;
}
gr.execute(key, options).then(resolve).catch(reject);
});
})
"""
# Browser-side fingerprint derived from `navigator` (user agent, languages and
# `userAgentData` brands/mobile/platform rendered in sec-ch-ua header form).
# Evaluated only when no fingerprint was captured from network request headers.
_FALLBACK_FINGERPRINT_JS = """
() => {
const uaData = navigator.userAgentData || null;
let secChUa = '';
let secChUaMobile = '';
let secChUaPlatform = '';
if (uaData) {
if (Array.isArray(uaData.brands) && uaData.brands.length > 0) {
secChUa = uaData.brands
.map((item) => `"${item.brand}";v="${item.version}"`)
.join(', ');
}
secChUaMobile = uaData.mobile ? '?1' : '?0';
if (uaData.platform) {
secChUaPlatform = `"${uaData.platform}"`;
}
}
return {
userAgent: navigator.userAgent || '',
acceptLanguage: Array.isArray(navigator.languages) && navigator.languages.length > 0
? navigator.languages.join(',')
: (navigator.language || ''),
secChUa,
secChUaMobile,
secChUaPlatform,
};
}
"""
# Basic anti-detection init script: redefines `navigator.webdriver`,
# `navigator.languages` and `navigator.plugins`, and installs a stub
# `window.chrome` object. Registered on the page via `add_init_script`.
_STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};
"""
@dataclass(frozen=True)
class BrowserFingerprint:
    """Browser identity headers seen while a token was being generated.

    Every field defaults to the empty string, which means "not observed".
    """

    user_agent: str = ""
    accept_language: str = ""
    sec_ch_ua: str = ""
    sec_ch_ua_mobile: str = ""
    sec_ch_ua_platform: str = ""

    def to_solution_fields(self) -> dict[str, Any]:
        """Return the non-empty fields keyed by their solution-payload names.

        Empty values are omitted entirely; the remaining keys keep the
        declaration order of the dataclass fields.
        """
        named_values = (
            ("userAgent", self.user_agent),
            ("acceptLanguage", self.accept_language),
            ("secChUa", self.sec_ch_ua),
            ("secChUaMobile", self.sec_ch_ua_mobile),
            ("secChUaPlatform", self.sec_ch_ua_platform),
        )
        return {label: value for label, value in named_values if value}
@dataclass(frozen=True)
class RecaptchaSessionArtifacts:
    """Values of the `recaptcha-ca-t` / `recaptcha-ca-e` session cookies.

    A field is `None` when the corresponding cookie was not collected.
    """

    recaptcha_ca_t: str | None = None
    recaptcha_ca_e: str | None = None

    def to_solution_fields(self) -> dict[str, Any]:
        """Return the cookies that have a truthy value, keyed by cookie name."""
        cookie_values = (
            ("recaptcha-ca-t", self.recaptcha_ca_t),
            ("recaptcha-ca-e", self.recaptcha_ca_e),
        )
        return {cookie: value for cookie, value in cookie_values if value}
@dataclass(frozen=True)
class RecaptchaTaskProfile:
    """Immutable, normalized view of one incoming solve request.

    All fields are required; instances are built from the raw task
    parameters before any browser work starts.
    """

    # Raw task type string supplied by the caller (or the default).
    task_type: str
    # Page to open and the site key to execute against.
    website_url: str
    website_key: str
    # Action name handed to `execute`; may be empty.
    page_action: str
    # Runtime hinted by the task type ("standard" or "enterprise").
    requested_runtime: str
    # Host from which the reCAPTCHA loader script is fetched.
    api_domain: str
    # Optional enterprise `s` token; empty when absent.
    enterprise_s_token: str
    # Whether session cookies should be collected after execution.
    wants_session_artifacts: bool
@dataclass
class RecaptchaRuntimeEvidence:
    """Signals gathered from the page and network around an execute call.

    `runtime_kind` and `detection_reason` record the decision that was
    taken; the remaining fields keep the raw evidence behind it.
    """

    runtime_kind: str
    detection_reason: str
    # reCAPTCHA script URLs found in the document.
    scripts: list[str] = field(default_factory=list)
    # reCAPTCHA request URLs seen on the network.
    request_urls: list[str] = field(default_factory=list)
    # HTTP status per reCAPTCHA response URL.
    response_statuses: dict[str, int] = field(default_factory=dict)
    has_standard_execute: bool = False
    has_enterprise_execute: bool = False
    enterprise_cfg: bool = False
    api_domain: str = _DEFAULT_API_DOMAIN

    def all_urls(self) -> list[str]:
        """Return script, request and response URLs as one new list, in that order."""
        combined: list[str] = []
        combined.extend(self.scripts)
        combined.extend(self.request_urls)
        # Iterating the dict yields its keys, i.e. the response URLs.
        combined.extend(self.response_statuses)
        return combined
@dataclass(frozen=True)
class RecaptchaV3SolveResult:
    """Immutable bundle of everything a single `_solve_once()` run produced."""

    # The token selected for the solution.
    token: str
    # Runtime kind ("standard" / "enterprise") determined after execution.
    runtime_kind: str
    # Browser identity observed while solving.
    fingerprint: BrowserFingerprint
    # Session cookies, empty unless the task asked for them.
    session_artifacts: RecaptchaSessionArtifacts
    # Wall-clock creation time in milliseconds since the epoch.
    create_time_ms: int
class RecaptchaNetworkObserver:
    """Records reCAPTCHA traffic on a page: URLs, statuses, headers and tokens.

    Once bound to a page it keeps every reCAPTCHA request URL, the status of
    every reCAPTCHA response, the most recent browser fingerprint seen in
    request headers, and the latest token found in a token-bearing response.
    """

    def __init__(self) -> None:
        self._response_tasks: set[asyncio.Task[None]] = set()
        self._token_source_url = ""
        self._network_token = ""
        self._fingerprint = BrowserFingerprint()
        self.response_statuses: dict[str, int] = {}
        self.request_urls: list[str] = []

    @staticmethod
    def _is_relevant_url(url: str) -> bool:
        """Tell whether *url* belongs to reCAPTCHA traffic."""
        return url.find("/recaptcha/") != -1

    @staticmethod
    def _is_token_response_url(url: str) -> bool:
        """Tell whether *url* is one of the endpoints that return a token."""
        for marker in _TOKEN_RESPONSE_MARKERS:
            if marker in url:
                return True
        return False

    @staticmethod
    def extract_token_from_body(body: str) -> str:
        """Return the first token matched in *body*, or "" when none matches."""
        hits = (pattern.search(body) for pattern in _TOKEN_RESPONSE_PATTERNS)
        first_hit = next((hit for hit in hits if hit), None)
        return first_hit.group(1) if first_hit else ""

    def bind(self, page: Page) -> None:
        """Subscribe to the request and response events of *page*."""
        page.on("request", self._capture_request)
        page.on("response", self._capture_response)

    def _capture_request(self, request: Request) -> None:
        """Log a reCAPTCHA request URL and refresh the header fingerprint."""
        target = request.url
        if not self._is_relevant_url(target):
            return
        self.request_urls.append(target)

        lowered: dict[str, str] = {}
        for name, value in (request.headers or {}).items():
            lowered[str(name).lower()] = str(value)

        # Headers absent from this request keep their previously seen value.
        previous = self._fingerprint
        self._fingerprint = BrowserFingerprint(
            user_agent=lowered.get("user-agent", previous.user_agent),
            accept_language=lowered.get("accept-language", previous.accept_language),
            sec_ch_ua=lowered.get("sec-ch-ua", previous.sec_ch_ua),
            sec_ch_ua_mobile=lowered.get("sec-ch-ua-mobile", previous.sec_ch_ua_mobile),
            sec_ch_ua_platform=lowered.get(
                "sec-ch-ua-platform", previous.sec_ch_ua_platform
            ),
        )

    def _capture_response(self, response: Response) -> None:
        """Log a reCAPTCHA response status and schedule a token scan if due."""
        target = response.url
        if self._is_relevant_url(target):
            self.response_statuses[target] = response.status
            if not self._is_token_response_url(target):
                return
            # The body is read in a background task; the task is tracked so
            # flush() can wait for it and is dropped from the set once done.
            reader = asyncio.create_task(self._capture_response_body(response))
            self._response_tasks.add(reader)
            reader.add_done_callback(self._response_tasks.discard)

    def snapshot_fingerprint(self) -> BrowserFingerprint:
        """Return the fingerprint as currently observed."""
        return self._fingerprint

    @property
    def network_token(self) -> str:
        """Latest token extracted from a response body ("" if none)."""
        return self._network_token

    @property
    def token_source_url(self) -> str:
        """URL of the response that supplied `network_token` ("" if none)."""
        return self._token_source_url

    async def flush(self) -> None:
        """Wait for the body-reading tasks that are pending right now."""
        pending = tuple(self._response_tasks)
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

    async def _capture_response_body(self, response: Response) -> None:
        """Read a response body and remember the token it carries, if any."""
        try:
            text = await response.text()
        except Exception as exc:
            log.debug(
                "Failed to read reCAPTCHA response body from %s: %s",
                response.url,
                exc,
            )
            return
        found = self.extract_token_from_body(text)
        if found:
            self._network_token = found
            self._token_source_url = response.url
class RecaptchaV3Solver:
    """Solves reCAPTCHA v3 tasks via runtime-aware Playwright automation.

    One Chromium browser is launched lazily and shared; every solve attempt
    runs in its own browser context, which is always closed afterwards.
    """

    def __init__(self, config: Config) -> None:
        self._config = config
        self._playwright: Playwright | None = None
        self._browser: Browser | None = None
        # Serializes start() and stop() so the browser is launched only once.
        self._start_lock = asyncio.Lock()

    @staticmethod
    def _build_proxy_settings(raw_proxy_url: str) -> dict[str, str]:
        """Convert a proxy URL into a Playwright `proxy` launch option.

        Args:
            raw_proxy_url: Full proxy URL including scheme, host and port;
                credentials may be percent-encoded.

        Returns:
            A dict with `server` and, when present in the URL, `username`
            and `password` (percent-decoded).

        Raises:
            ValueError: If scheme, host or port is missing.
        """
        parsed = urlparse(raw_proxy_url.strip())
        if not parsed.scheme or not parsed.hostname or not parsed.port:
            raise ValueError(
                "BROWSER_PROXY_URL must use a full URL such as socks5://user:pass@host:port"
            )
        payload = {"server": f"{parsed.scheme}://{parsed.hostname}:{parsed.port}"}
        if parsed.username:
            payload["username"] = unquote(parsed.username)
        if parsed.password:
            payload["password"] = unquote(parsed.password)
        return payload

    @staticmethod
    def _extract_browser_major_version(browser_version: str) -> str:
        """Return the first run of digits in *browser_version*, or "131"."""
        match = re.search(r"(\d+)", browser_version)
        return match.group(1) if match else "131"

    @staticmethod
    def _build_chromium_user_agent(browser_version: str) -> str:
        """Build a Windows Chrome user-agent string for the given version."""
        major = RecaptchaV3Solver._extract_browser_major_version(browser_version)
        return (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            f"Chrome/{major}.0.0.0 Safari/537.36"
        )

    def _resolve_browser_version(self) -> str:
        """Return the running browser's version string, or "" if unknown."""
        if self._browser is None:
            return ""
        # `version` is read defensively: it is accepted both as a plain
        # attribute/property and as a callable.
        version_attr = getattr(self._browser, "version", "")
        if callable(version_attr):
            try:
                return str(version_attr())
            except Exception:
                return ""
        return str(version_attr or "")

    def _build_browser_context_options(self) -> dict[str, Any]:
        """Return the keyword arguments used for every new browser context."""
        return {
            "user_agent": self._build_chromium_user_agent(
                self._resolve_browser_version()
            ),
            "viewport": {"width": 1920, "height": 1080},
            "locale": "en-US",
            "extra_http_headers": {
                "Accept-Language": _BROWSER_ACCEPT_LANGUAGE,
            },
        }

    async def start(self) -> None:
        """Launch Playwright and Chromium if they are not running yet.

        Raises:
            ValueError: If the configured proxy URL is malformed.
            Exception: Whatever the browser launch raises; the Playwright
                driver is stopped before the error propagates.
        """
        if self._browser is not None:
            return
        async with self._start_lock:
            # Re-check under the lock: another caller may have finished
            # launching while this one was waiting.
            if self._browser is not None:
                return
            playwright = await async_playwright().start()
            launch_options: dict[str, Any] = {
                "headless": self._config.browser_headless,
                "args": [
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            }
            if self._config.browser_proxy_url:
                launch_options["proxy"] = self._build_proxy_settings(
                    self._config.browser_proxy_url
                )
            try:
                browser = await playwright.chromium.launch(**launch_options)
            except Exception:
                await playwright.stop()
                raise
            self._playwright = playwright
            self._browser = browser
            log.info(
                "Playwright browser started lazily (headless=%s proxy=%s)",
                self._config.browser_headless,
                "configured" if self._config.browser_proxy_url else "none",
            )

    async def stop(self) -> None:
        """Close the browser and stop Playwright; safe to call when idle.

        The Playwright driver is stopped even if closing the browser raises,
        and both references are cleared up front so a failed shutdown never
        leaves stale handles behind.
        """
        async with self._start_lock:
            browser, self._browser = self._browser, None
            playwright, self._playwright = self._playwright, None
            try:
                if browser:
                    await browser.close()
            finally:
                if playwright:
                    await playwright.stop()
            log.info("Playwright browser stopped")

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve one task, retrying up to `config.captcha_retries` times.

        Args:
            params: Raw task parameters (see `_build_task_profile`).

        Returns:
            The solution payload built by `_build_solution_payload`.

        Raises:
            KeyError: If `websiteURL` or `websiteKey` is missing.
            RuntimeError: If every attempt failed; the last attempt's
                exception is attached as `__cause__`.
        """
        await self.start()
        profile = self._build_task_profile(params)
        retries = self._config.captcha_retries
        last_error: Exception | None = None
        for attempt in range(retries):
            try:
                result = await self._solve_once(profile)
                return self._build_solution_payload(result)
            except Exception as exc:
                last_error = exc
                log.warning(
                    "reCAPTCHA v3 attempt %d/%d failed for %s (%s): %s",
                    attempt + 1,
                    retries,
                    profile.website_url,
                    profile.task_type,
                    exc,
                )
                # Pause between attempts, but not after the final one.
                if attempt < retries - 1:
                    await asyncio.sleep(2)
        raise RuntimeError(
            f"reCAPTCHA v3 failed after {retries} attempts: {last_error}"
        ) from last_error

    @staticmethod
    def _build_task_profile(params: dict[str, Any]) -> RecaptchaTaskProfile:
        """Normalize raw task parameters into a `RecaptchaTaskProfile`.

        Reads `type`, `websiteURL`, `websiteKey`, `pageAction`,
        `enterprisePayload["s"]`, `apiDomain` and `isSession`. The enterprise
        runtime is requested when the task type contains "enterprise"
        (case-insensitive).

        Raises:
            KeyError: If `websiteURL` or `websiteKey` is missing.
        """
        task_type = str(params.get("type") or "RecaptchaV3TaskProxyless")
        website_url = str(params["websiteURL"])
        website_key = str(params["websiteKey"])
        page_action = str(params.get("pageAction") or "").strip()
        requested_runtime = (
            _ENTERPRISE_RUNTIME
            if "enterprise" in task_type.lower()
            else _STANDARD_RUNTIME
        )
        enterprise_payload = params.get("enterprisePayload") or {}
        enterprise_s_token = ""
        if isinstance(enterprise_payload, dict):
            enterprise_s_token = str(enterprise_payload.get("s") or "").strip()
        return RecaptchaTaskProfile(
            task_type=task_type,
            website_url=website_url,
            website_key=website_key,
            page_action=page_action,
            requested_runtime=requested_runtime,
            api_domain=RecaptchaV3Solver._normalize_api_domain(
                str(params.get("apiDomain") or "")
            ),
            enterprise_s_token=enterprise_s_token,
            wants_session_artifacts=bool(params.get("isSession")),
        )

    @staticmethod
    def _normalize_api_domain(raw_domain: str) -> str:
        """Reduce a domain or URL to its host part, defaulting when empty."""
        value = raw_domain.strip()
        if not value:
            return _DEFAULT_API_DOMAIN
        # A scheme is prepended to bare hosts so urlparse fills in `netloc`.
        parsed = urlparse(value if "://" in value else f"https://{value}")
        return parsed.netloc or parsed.path or _DEFAULT_API_DOMAIN

    @staticmethod
    def _build_loader_url(
        runtime_kind: str,
        api_domain: str,
        website_key: str,
    ) -> str:
        """Return the loader script URL for the given runtime and site key."""
        domain = RecaptchaV3Solver._normalize_api_domain(api_domain)
        if runtime_kind == _ENTERPRISE_RUNTIME:
            return f"https://{domain}/recaptcha/enterprise.js?render={website_key}"
        return f"https://{domain}/recaptcha/api.js?render={website_key}"

    @staticmethod
    def _determine_runtime_kind(
        *,
        requested_runtime: str,
        scripts: list[str],
        request_urls: list[str],
        has_standard_execute: bool,
        has_enterprise_execute: bool,
        enterprise_cfg: bool,
    ) -> tuple[str, str]:
        """Pick the runtime kind from the evidence, strongest signal first.

        Enterprise evidence (page API or config flag, then URLs) wins over
        standard evidence; with no evidence at all the task hint is used.

        Returns:
            A `(runtime_kind, reason)` pair.
        """
        evidence_urls = [*scripts, *request_urls]
        if has_enterprise_execute or enterprise_cfg:
            return _ENTERPRISE_RUNTIME, "page exposes grecaptcha.enterprise.execute"
        if any("/recaptcha/enterprise" in url for url in evidence_urls):
            return _ENTERPRISE_RUNTIME, "enterprise script or network evidence detected"
        if has_standard_execute:
            return _STANDARD_RUNTIME, "page exposes grecaptcha.execute"
        if any(
            "/recaptcha/api.js" in url or "/recaptcha/api2/" in url
            for url in evidence_urls
        ):
            return _STANDARD_RUNTIME, "standard script or network evidence detected"
        return requested_runtime, "no runtime evidence; falling back to task hint"

    @staticmethod
    def _has_runtime_network_evidence(
        runtime_kind: str,
        urls: list[str],
    ) -> bool:
        """Tell whether any of *urls* belongs to the given runtime kind."""
        if runtime_kind == _ENTERPRISE_RUNTIME:
            return any("/recaptcha/enterprise" in url for url in urls)
        return any(
            "/recaptcha/api.js" in url or "/recaptcha/api2/" in url
            for url in urls
        )

    @staticmethod
    def _extract_api_domain(urls: list[str], fallback: str) -> str:
        """Return the host of the first reCAPTCHA URL, else the fallback."""
        for url in urls:
            if "/recaptcha/" not in url:
                continue
            parsed = urlparse(url)
            if parsed.netloc:
                return parsed.netloc
        return RecaptchaV3Solver._normalize_api_domain(fallback)

    @staticmethod
    def _build_solution_payload(result: RecaptchaV3SolveResult) -> dict[str, Any]:
        """Flatten a solve result into the dict returned to the caller."""
        payload: dict[str, Any] = {
            "gRecaptchaResponse": result.token,
            "createTime": result.create_time_ms,
            "runtimeKind": result.runtime_kind,
        }
        payload.update(result.fingerprint.to_solution_fields())
        payload.update(result.session_artifacts.to_solution_fields())
        return payload

    @staticmethod
    def _select_best_token(
        execute_token: str,
        network_token: str,
        network_token_source: str,
    ) -> str:
        """Choose between the token returned by execute and the network one.

        The network token wins whenever one was observed; a warning is logged
        when both exist and differ. Non-string inputs (for example `None`
        from a page promise that resolved without a value) count as empty,
        so the caller's own validity check reports the problem.
        """
        execute_value = execute_token.strip() if isinstance(execute_token, str) else ""
        observed_value = network_token.strip() if isinstance(network_token, str) else ""
        if not observed_value:
            return execute_value
        if execute_value and execute_value != observed_value:
            log.warning(
                "reCAPTCHA execute token differed from network token; using network token from %s",
                network_token_source or "unknown-source",
            )
        return observed_value

    async def _solve_once(self, profile: RecaptchaTaskProfile) -> RecaptchaV3SolveResult:
        """Run one full solve attempt inside a fresh browser context.

        Raises:
            RuntimeError: If the browser is not running, the token is
                invalid, or no network evidence for the detected runtime
                was observed after execution.
        """
        if self._browser is None:
            raise RuntimeError("Browser is not started; call start() first")
        context = await self._browser.new_context(
            **self._build_browser_context_options()
        )
        # Everything after context creation sits inside the try so the
        # context is closed even when page setup itself fails.
        try:
            page = await context.new_page()
            observer = RecaptchaNetworkObserver()
            observer.bind(page)
            await page.add_init_script(_STEALTH_JS)
            timeout_ms = self._config.browser_timeout * 1000
            await page.goto(
                profile.website_url,
                wait_until="networkidle",
                timeout=timeout_ms,
            )
            await self._simulate_human_activity(page)
            initial_runtime = await self._probe_runtime(page, profile, observer)
            if initial_runtime.runtime_kind != profile.requested_runtime:
                log.info(
                    "Runtime probe selected %s for %s (requested=%s, reason=%s)",
                    initial_runtime.runtime_kind,
                    profile.website_url,
                    profile.requested_runtime,
                    initial_runtime.detection_reason,
                )
            execute_token = await self._execute_for_runtime(
                page,
                profile,
                initial_runtime,
            )
            await observer.flush()
            token = self._select_best_token(
                execute_token=execute_token,
                network_token=observer.network_token,
                network_token_source=observer.token_source_url,
            )
            if not isinstance(token, str) or len(token) < 20:
                raise RuntimeError(f"Invalid token received: {token!r}")
            # Short grace period for trailing reCAPTCHA traffic before the
            # evidence is re-collected.
            await asyncio.sleep(0.35)
            await observer.flush()
            final_runtime = await self._probe_runtime(page, profile, observer)
            if not self._has_runtime_network_evidence(
                final_runtime.runtime_kind,
                final_runtime.all_urls(),
            ):
                raise RuntimeError(
                    f"No {final_runtime.runtime_kind} runtime evidence observed after execute"
                )
            fingerprint = await self._capture_fingerprint(page, observer)
            session_artifacts = (
                await self._capture_session_artifacts(context)
                if profile.wants_session_artifacts
                else RecaptchaSessionArtifacts()
            )
            result = RecaptchaV3SolveResult(
                token=token,
                runtime_kind=final_runtime.runtime_kind,
                fingerprint=fingerprint,
                session_artifacts=session_artifacts,
                create_time_ms=int(time.time() * 1000),
            )
            log.info(
                "Got reCAPTCHA v3 token for %s (runtime=%s len=%d ua=%s ca_t=%s ca_e=%s)",
                profile.website_url,
                result.runtime_kind,
                len(result.token),
                "yes" if result.fingerprint.user_agent else "no",
                "yes" if result.session_artifacts.recaptcha_ca_t else "no",
                "yes" if result.session_artifacts.recaptcha_ca_e else "no",
            )
            return result
        finally:
            await context.close()

    async def _simulate_human_activity(self, page: Page) -> None:
        """Move the mouse between two points with short pauses."""
        await page.mouse.move(400, 300)
        await asyncio.sleep(1)
        await page.mouse.move(600, 400)
        await asyncio.sleep(0.5)

    async def _probe_runtime(
        self,
        page: Page,
        profile: RecaptchaTaskProfile,
        observer: RecaptchaNetworkObserver,
    ) -> RecaptchaRuntimeEvidence:
        """Inspect the page and observed traffic and decide the runtime kind."""
        raw = await page.evaluate(_RUNTIME_INSPECTION_JS)
        scripts = list(raw.get("scripts") or [])
        # Copy the observer's state so the evidence is a stable snapshot.
        request_urls = list(observer.request_urls)
        has_standard_execute = bool(raw.get("hasStandardExecute"))
        has_enterprise_execute = bool(raw.get("hasEnterpriseExecute"))
        enterprise_cfg = bool(raw.get("enterpriseCfg"))
        runtime_kind, detection_reason = self._determine_runtime_kind(
            requested_runtime=profile.requested_runtime,
            scripts=scripts,
            request_urls=request_urls,
            has_standard_execute=has_standard_execute,
            has_enterprise_execute=has_enterprise_execute,
            enterprise_cfg=enterprise_cfg,
        )
        return RecaptchaRuntimeEvidence(
            runtime_kind=runtime_kind,
            detection_reason=detection_reason,
            scripts=scripts,
            request_urls=request_urls,
            response_statuses=dict(observer.response_statuses),
            has_standard_execute=has_standard_execute,
            has_enterprise_execute=has_enterprise_execute,
            enterprise_cfg=enterprise_cfg,
            api_domain=self._extract_api_domain(
                [*scripts, *request_urls],
                fallback=profile.api_domain,
            ),
        )

    async def _execute_for_runtime(
        self,
        page: Page,
        profile: RecaptchaTaskProfile,
        runtime: RecaptchaRuntimeEvidence,
    ) -> str:
        """Make sure the detected runtime is loaded, then call its execute.

        Returns:
            Whatever the in-page execute promise resolved to (expected to be
            the token string).
        """
        loader_domain = runtime.api_domain or profile.api_domain
        if runtime.runtime_kind == _ENTERPRISE_RUNTIME:
            await self._ensure_runtime_loaded(
                page=page,
                ready_expression=_WAIT_FOR_ENTERPRISE_RUNTIME_JS,
                loader_url=self._build_loader_url(
                    _ENTERPRISE_RUNTIME,
                    loader_domain,
                    profile.website_key,
                ),
            )
            return await page.evaluate(
                _EXECUTE_ENTERPRISE_JS,
                [
                    profile.website_key,
                    profile.page_action,
                    profile.enterprise_s_token,
                ],
            )
        await self._ensure_runtime_loaded(
            page=page,
            ready_expression=_WAIT_FOR_STANDARD_RUNTIME_JS,
            loader_url=self._build_loader_url(
                _STANDARD_RUNTIME,
                loader_domain,
                profile.website_key,
            ),
        )
        return await page.evaluate(
            _EXECUTE_STANDARD_JS,
            [profile.website_key, profile.page_action],
        )

    async def _ensure_runtime_loaded(
        self,
        *,
        page: Page,
        ready_expression: str,
        loader_url: str,
    ) -> None:
        """Wait for the runtime; inject the loader script if it never shows.

        Raises:
            Exception: Whatever the injection or the second wait raises when
                the runtime is still unavailable afterwards.
        """
        try:
            await page.wait_for_function(ready_expression, timeout=5_000)
            return
        except Exception:
            # Deliberate best-effort: any failure of the first wait falls
            # through to injecting the loader ourselves.
            log.info("reCAPTCHA runtime not ready, injecting %s", loader_url)
        await page.evaluate(_ENSURE_SCRIPT_LOADED_JS, [loader_url])
        await page.wait_for_function(ready_expression, timeout=10_000)

    async def _capture_fingerprint(
        self,
        page: Page,
        observer: RecaptchaNetworkObserver,
    ) -> BrowserFingerprint:
        """Prefer the fingerprint seen on the network; else ask the page."""
        network_fp = observer.snapshot_fingerprint()
        if network_fp.user_agent or network_fp.sec_ch_ua or network_fp.accept_language:
            return network_fp
        fallback = await page.evaluate(_FALLBACK_FINGERPRINT_JS)
        return BrowserFingerprint(
            user_agent=str(fallback.get("userAgent") or ""),
            accept_language=str(fallback.get("acceptLanguage") or ""),
            sec_ch_ua=str(fallback.get("secChUa") or ""),
            sec_ch_ua_mobile=str(fallback.get("secChUaMobile") or ""),
            sec_ch_ua_platform=str(fallback.get("secChUaPlatform") or ""),
        )

    async def _capture_session_artifacts(
        self,
        context: Any,
    ) -> RecaptchaSessionArtifacts:
        """Collect the `recaptcha-ca-t` / `recaptcha-ca-e` cookies, if set.

        Cookies with an empty value are ignored; when a name occurs more
        than once the last non-empty value wins.
        """
        cookies = await context.cookies()
        recaptcha_ca_t = None
        recaptcha_ca_e = None
        for cookie in cookies:
            name = str(cookie.get("name") or "")
            value = str(cookie.get("value") or "")
            if not value:
                continue
            if name == "recaptcha-ca-t":
                recaptcha_ca_t = value
            elif name == "recaptcha-ca-e":
                recaptcha_ca_e = value
        return RecaptchaSessionArtifacts(
            recaptcha_ca_t=recaptcha_ca_t,
            recaptcha_ca_e=recaptcha_ca_e,
        )