"""HCaptcha solver using Playwright browser automation. Supports ``HCaptchaTaskProxyless`` task type. Strategy: 1. Visit the target page with a realistic browser context. 2. Click the hCaptcha checkbox. 3. If a token is issued immediately, return it. 4. If an image-selection challenge appears, extract the prompt + tile images, call ``ClassificationSolver`` for ``HCaptchaClassification``-style reasoning, click the matching tiles, submit the challenge, and continue polling for the token. """ from __future__ import annotations import asyncio import base64 import logging from typing import Any from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit from playwright.async_api import Browser, ElementHandle, Frame, Page, Playwright, async_playwright from ..core.config import Config from .classification import ClassificationSolver log = logging.getLogger(__name__) _STEALTH_JS = """ Object.defineProperty(navigator, 'webdriver', {get: () => undefined}); Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']}); Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]}); window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}}; """ _EXTRACT_HCAPTCHA_TOKEN_JS = """ () => { const textarea = document.querySelector('[name="h-captcha-response"]') || document.querySelector('[name="g-recaptcha-response"]'); if (textarea && textarea.value && textarea.value.length > 20) { return textarea.value; } if (window.hcaptcha && typeof window.hcaptcha.getResponse === 'function') { const resp = window.hcaptcha.getResponse(); if (resp && resp.length > 20) return resp; } return null; } """ _EXTRACT_HCAPTCHA_META_JS = """ () => { const result = { token: null, respKey: null }; const textarea = document.querySelector('[name="h-captcha-response"]') || document.querySelector('[name="g-recaptcha-response"]'); if (textarea && textarea.value && textarea.value.length > 20) { result.token = textarea.value; } try { if (window.hcaptcha) { if (!result.token && typeof window.hcaptcha.getResponse === 'function') { const response = window.hcaptcha.getResponse(); if (response && response.length > 20) { result.token = response; } } if (typeof window.hcaptcha.getRespKey === 'function') { const respKey = window.hcaptcha.getRespKey(); if (respKey) { result.respKey = String(respKey); } } } } catch (err) { return result; } return result; } """ _QUESTION_JS = """ () => { const prompt = document.querySelector('.prompt-text') || document.querySelector('h2.prompt-text') || document.querySelector('.challenge-prompt') || document.querySelector('[class*="prompt"]'); return prompt?.textContent?.trim() || null; } """ _CHALLENGE_TILE_SELECTORS = ( ".task-grid .task-image", ".task-grid .task", ".task-grid .image", ".challenge-container .task-image", ".challenge-view .task-image", ".task-image", ".task", ) _EXAMPLE_IMAGE_SELECTORS = ( ".challenge-example .image", ".challenge-example", ".example-wrapper .image", ) _VERIFY_BUTTON_SELECTORS = ( ".button-submit", 'button[type="submit"]', 'button[aria-label*="Verify"]', ) _CHECKBOX_SELECTORS = ( "#checkbox", '[id="checkbox"]', 'div[role="checkbox"]', 'input[type="checkbox"]', '.checkbox', '[aria-checked]', ) _CHALLENGE_FRAME_HINTS = ( "frame=challenge", "challenge", "hcaptcha-inner", ) _CHECKBOX_FRAME_HINTS = ( "frame=checkbox", "checkbox", "hcaptcha-invisible", "hcaptcha-checkbox", ) class HCaptchaSolver: """Solves ``HCaptchaTaskProxyless`` tasks via Playwright.""" def __init__( self, config: Config, browser: Browser | None = None, classifier: ClassificationSolver | None = None, ) -> None: self._config = config self._playwright: Playwright | None = None self._browser: Browser | None = browser self._owns_browser = browser is None self._classifier = classifier self._start_lock = asyncio.Lock() async def start(self) -> None: if self._browser is not None: return async with self._start_lock: if self._browser is not None: return playwright = await async_playwright().start() try: browser = await playwright.chromium.launch( headless=self._config.browser_headless, args=[ "--disable-blink-features=AutomationControlled", "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", ], ) except Exception: await playwright.stop() raise self._playwright = playwright self._browser = browser log.info("HCaptchaSolver browser started lazily") async def stop(self) -> None: async with self._start_lock: if self._owns_browser: if self._browser: await self._browser.close() self._browser = None if self._playwright: await self._playwright.stop() self._playwright = None log.info("HCaptchaSolver stopped") async def solve(self, params: dict[str, Any]) -> dict[str, Any]: await self.start() website_url = params["websiteURL"] website_key = params["websiteKey"] enterprise_payload = params.get("enterprisePayload") or {} rqdata = str(enterprise_payload.get("rqdata") or "").strip() if rqdata: log.info("Received hCaptcha enterprisePayload.rqdata (len=%d)", len(rqdata)) last_error: Exception | None = None for attempt in range(self._config.captcha_retries): try: token, resp_key = await self._solve_once(website_url, website_key) solution = {"gRecaptchaResponse": token} if resp_key: solution["respKey"] = resp_key return solution except Exception as exc: last_error = exc log.warning( "HCaptcha attempt %d/%d failed: %s", attempt + 1, self._config.captcha_retries, exc, ) if attempt < self._config.captcha_retries - 1: await asyncio.sleep(2) raise RuntimeError( f"HCaptcha failed after {self._config.captcha_retries} attempts: {last_error}" ) async def _solve_once(self, website_url: str, website_key: str) -> tuple[str, str]: assert self._browser is not None target_url = self._prepare_target_url(website_url, website_key) if target_url != website_url: log.info("Normalized hCaptcha target URL to honor requested sitekey: %s", target_url) context = await self._browser.new_context( user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/131.0.0.0 Safari/537.36" ), viewport={"width": 1920, "height": 1080}, locale="en-US", ) page = await context.new_page() await page.add_init_script(_STEALTH_JS) try: timeout_ms = self._config.browser_timeout * 1000 await page.goto(target_url, wait_until="networkidle", timeout=timeout_ms) await page.mouse.move(400, 300) await asyncio.sleep(1) await self._click_checkbox(page) # 先给低风险会话一个直接出 token 的机会。 token = await self._wait_for_token(page, seconds=4) if token: resp_key = await self._wait_for_resp_key(page, seconds=2) log.info("Got hCaptcha token directly after checkbox click (len=%d)", len(token)) return token, resp_key # 无头环境常见路径:进入图片 challenge,然后走 classification fallback。 log.info( "No direct hCaptcha token after checkbox click, entering classification fallback" ) fallback_handled = await self._solve_image_selection_challenge(page) if fallback_handled: token = await self._wait_for_token(page) resp_key = await self._wait_for_resp_key(page, seconds=2) if not isinstance(token, str) or len(token) < 20: raise RuntimeError(f"Invalid hCaptcha token: {token!r}") log.info("Got hCaptcha token (len=%d)", len(token)) return token, resp_key finally: await context.close() async def _click_checkbox(self, page: Page) -> None: frame = await self._find_frame(page, "checkbox", wait_seconds=10) if frame is None: raise RuntimeError(self._build_missing_frame_error(page, "checkbox")) checkbox = await self._find_checkbox_element(frame) if checkbox is None: raise RuntimeError( "Could not find hCaptcha checkbox element inside frame " f"{getattr(frame, 'url', None) or ''}" ) await checkbox.click(timeout=10_000) log.info("Clicked hCaptcha checkbox") async def _wait_for_token(self, page: Page, *, seconds: int | None = None) -> str | None: remaining = max(1, seconds or self._config.captcha_timeout) for _ in range(remaining): token, _ = await self._extract_hcaptcha_meta(page) if isinstance(token, str) and len(token) > 20: return token await asyncio.sleep(1) return None async def _wait_for_resp_key(self, page: Page, *, seconds: int | None = None) -> str: remaining = max(1, seconds or 2) last_resp_key = "" for _ in range(remaining): _, resp_key = await self._extract_hcaptcha_meta(page) if resp_key: return resp_key await asyncio.sleep(1) return last_resp_key async def _extract_hcaptcha_meta(self, page: Page) -> tuple[str | None, str]: contexts: list[Page | Frame] = [page] for frame in page.frames: url = (getattr(frame, "url", None) or "").lower() if self._is_hcaptcha_related_frame(url): contexts.append(frame) token: str | None = None resp_key = "" for context in contexts: try: payload = await context.evaluate(_EXTRACT_HCAPTCHA_META_JS) except Exception: continue if not isinstance(payload, dict): continue current_token = payload.get("token") current_resp_key = payload.get("respKey") if not token and isinstance(current_token, str) and len(current_token) > 20: token = current_token if not resp_key and isinstance(current_resp_key, str) and current_resp_key.strip(): resp_key = current_resp_key.strip() if token and resp_key: break return token, resp_key async def _find_frame( self, page: Page, frame_role: str, *, wait_seconds: int = 5 ) -> Frame | None: attempts = max(1, wait_seconds * 2) main_frame = getattr(page, "main_frame", None) for _ in range(attempts): exact_match: Frame | None = None hinted_match: Frame | None = None dom_match: Frame | None = None for frame in page.frames: if main_frame is not None and frame is main_frame: continue url = (frame.url or "").lower() if not self._is_hcaptcha_related_frame(url): continue if frame_role == "checkbox": if await self._find_checkbox_element(frame) is not None: dom_match = dom_match or frame if any(hint in url for hint in _CHECKBOX_FRAME_HINTS): if "frame=checkbox" in url: exact_match = exact_match or frame else: hinted_match = hinted_match or frame elif frame_role == "challenge": if await self._is_challenge_frame(frame): dom_match = dom_match or frame if any(hint in url for hint in _CHALLENGE_FRAME_HINTS): if "frame=challenge" in url: exact_match = exact_match or frame else: hinted_match = hinted_match or frame if exact_match is not None: return exact_match if dom_match is not None: return dom_match if hinted_match is not None: return hinted_match await asyncio.sleep(0.5) return None @staticmethod def _is_hcaptcha_related_frame(url: str) -> bool: return ( "hcaptcha" in url or "newassets.hcaptcha.com" in url or "api.hcaptcha.com" in url or "js.stripe.com/v3/hcaptcha" in url ) async def _find_checkbox_element(self, frame: Frame) -> ElementHandle[Any] | None: for selector in _CHECKBOX_SELECTORS: element = await frame.query_selector(selector) if element is not None: return element return None async def _is_challenge_frame(self, frame: Frame) -> bool: prompt = await frame.evaluate(_QUESTION_JS) if isinstance(prompt, str) and prompt.strip(): return True for selector in _CHALLENGE_TILE_SELECTORS: elements = await frame.query_selector_all(selector) if elements: return True if await frame.locator("canvas").count() > 0: return True for selector in _VERIFY_BUTTON_SELECTORS: if await frame.query_selector(selector) is not None: return True return False @staticmethod def _build_missing_frame_error(page: Page, frame_role: str) -> str: frame_urls = [ getattr(frame, "url", None) or "" for frame in page.frames ] return ( f"Could not find hCaptcha {frame_role} frame; available frames={frame_urls}" ) @staticmethod def _prepare_target_url(website_url: str, website_key: str) -> str: """为官方 demo 自动补齐/对齐 sitekey,确保按请求参数测试真实行为。""" if not website_key: return website_url parsed = urlsplit(website_url) host = parsed.netloc.lower() path = parsed.path.rstrip("/") is_official_demo = host in {"accounts.hcaptcha.com", "demo.hcaptcha.com"} and path == "/demo" if not is_official_demo: return website_url query = parse_qs(parsed.query, keep_blank_values=True) changed = False current_sitekey = query.get("sitekey", [None])[0] if current_sitekey != website_key: query["sitekey"] = [website_key] changed = True if "hl" not in query: query["hl"] = ["en"] changed = True if not changed: return website_url return urlunsplit( ( parsed.scheme, parsed.netloc, parsed.path, urlencode(query, doseq=True), parsed.fragment, ) ) async def _solve_image_selection_challenge(self, page: Page) -> bool: if self._classifier is None: raise RuntimeError( "Classification fallback is unavailable because no ClassificationSolver was injected" ) rounds = max(1, self._config.captcha_retries) for round_index in range(rounds): token = await self._wait_for_token(page, seconds=1) if token: return True challenge = await self._collect_selection_challenge(page) if challenge is None: unsupported_reason = await self._describe_unsupported_challenge(page) log.warning( "Could not collect hCaptcha image-selection challenge in round %d: %s", round_index + 1, unsupported_reason, ) if round_index == 0: raise RuntimeError(unsupported_reason) return False log.info( "Collected hCaptcha image-selection challenge in round %d: question=%r tiles=%d examples=%d", round_index + 1, challenge["question"], len(challenge["tiles"]), len(challenge["examples"]), ) payload = self._build_classification_payload( question=challenge["question"], tile_images=challenge["tile_images"], examples=challenge["examples"], ) result = await self._classifier.solve(payload) log.info("Classification solver returned raw result: %s", result) indices = self._extract_selection_indices( result=result, tile_count=len(challenge["tiles"]), ) await self._click_selected_tiles(challenge["tiles"], indices) await self._click_verify_button(challenge["frame"]) token = await self._wait_for_token(page, seconds=6) if token: return True log.info( "hCaptcha challenge round %d submitted without immediate token, retrying", round_index + 1, ) return False async def _collect_selection_challenge(self, page: Page) -> dict[str, Any] | None: frame = await self._find_frame(page, "challenge", wait_seconds=10) if frame is None: return None await asyncio.sleep(1) question = await frame.evaluate(_QUESTION_JS) if not isinstance(question, str) or not question.strip(): return None tiles = await self._find_clickable_tiles(frame) if not tiles: return None tile_entries: list[tuple[ElementHandle[Any], str]] = [] for tile in tiles: encoded = await self._capture_element_base64(tile) if encoded: tile_entries.append((tile, encoded)) if not tile_entries: return None return { "frame": frame, "question": question.strip(), "tiles": [tile for tile, _ in tile_entries], "tile_images": [encoded for _, encoded in tile_entries], "examples": await self._extract_example_images(frame), } async def _find_clickable_tiles(self, frame: Frame) -> list[ElementHandle[Any]]: for selector in _CHALLENGE_TILE_SELECTORS: elements = await frame.query_selector_all(selector) if elements: return elements return [] async def _extract_example_images(self, frame: Frame) -> list[str]: examples: list[str] = [] for selector in _EXAMPLE_IMAGE_SELECTORS: elements = await frame.query_selector_all(selector) if not elements: continue for element in elements: encoded = await self._capture_element_base64(element) if encoded: examples.append(encoded) if examples: break return examples async def _describe_unsupported_challenge(self, page: Page) -> str: """给出更贴近真实 challenge 类型的错误信息,避免把 canvas/puzzle 误报成网格 DOM 问题。""" frame = await self._find_frame(page, "challenge", wait_seconds=2) if frame is None: return ( "hCaptcha challenge iframe disappeared before the built-in fallback " "could inspect it" ) prompt = await frame.evaluate(_QUESTION_JS) prompt_text = prompt.strip().lower() if isinstance(prompt, str) else "" has_canvas = await frame.locator("canvas").count() > 0 submit_text = ( await frame.locator(".button-submit").first.inner_text() if await frame.locator(".button-submit").count() > 0 else "" ) if "puzzle piece" in prompt_text or (has_canvas and "skip" in submit_text.lower()): log.warning( "Detected unsupported hCaptcha canvas/puzzle challenge: prompt=%r submit=%r has_canvas=%s", prompt, submit_text, has_canvas, ) return ( "hCaptcha presented a canvas/puzzle challenge, which is not supported " "by the built-in HCaptchaClassification fallback" ) log.warning( "Detected unsupported hCaptcha challenge layout: prompt=%r submit=%r has_canvas=%s", prompt, submit_text, has_canvas, ) return ( "hCaptcha image challenge detected, but the current DOM layout is not " "supported by the built-in classification fallback" ) async def _capture_element_base64(self, element: ElementHandle[Any]) -> str | None: try: image_bytes = await element.screenshot(type="png") except Exception: return None return base64.b64encode(image_bytes).decode("ascii") @staticmethod def _build_classification_payload( *, question: str, tile_images: list[str], examples: list[str] ) -> dict[str, Any]: payload: dict[str, Any] = { "type": "HCaptchaClassification", "question": question, "images": tile_images, } if examples: payload["examples"] = examples return payload @staticmethod def _extract_selection_indices( *, result: dict[str, Any], tile_count: int ) -> list[int]: raw_answer = result.get("answer") if isinstance(raw_answer, bool): indices = [0] if raw_answer and tile_count == 1 else [] elif isinstance(raw_answer, list): indices = [int(idx) for idx in raw_answer if isinstance(idx, int | float)] else: raw_objects = result.get("objects") if isinstance(raw_objects, list): indices = [int(idx) for idx in raw_objects if isinstance(idx, int | float)] else: indices = [] deduped: list[int] = [] for idx in indices: if 0 <= idx < tile_count and idx not in deduped: deduped.append(idx) return deduped async def _click_selected_tiles( self, tiles: list[ElementHandle[Any]], indices: list[int], ) -> None: for idx in indices: await tiles[idx].click(timeout=10_000) await asyncio.sleep(0.2) log.info("Clicked %d hCaptcha tile(s): %s", len(indices), indices) async def _click_verify_button(self, frame: Frame) -> None: for selector in _VERIFY_BUTTON_SELECTORS: button = await frame.query_selector(selector) if button is None: continue await button.click(timeout=10_000) await asyncio.sleep(1) log.info("Submitted hCaptcha challenge with selector %s", selector) return raise RuntimeError("Could not find hCaptcha verify/submit button")