"""HCaptcha solver using Playwright browser automation.

Supports the ``HCaptchaTaskProxyless`` task type.

Strategy:
  1. Visit the target page with a realistic browser context.
  2. Click the hCaptcha checkbox.
  3. If a token is issued immediately, return it.
  4. If an image-selection challenge appears, extract the prompt and tile
     images, call ``ClassificationSolver`` for ``HCaptchaClassification``-style
     reasoning, click the matching tiles, submit the challenge, and continue
     polling for the token.
"""
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import base64 |
| import logging |
| from typing import Any |
| from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit |
|
|
| from playwright.async_api import Browser, ElementHandle, Frame, Page, Playwright, async_playwright |
|
|
| from ..core.config import Config |
| from .classification import ClassificationSolver |
|
|
# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)
|
|
# Init script added to every page before navigation (see ``add_init_script``
# in ``_solve_once``). It overrides a few ``navigator`` getters and defines a
# ``window.chrome`` stub.
_STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};
"""
|
|
# Page function returning the hCaptcha token string, or null when none is
# available. It reads the hidden response field first and falls back to
# ``window.hcaptcha.getResponse()``. Values of 20 characters or fewer are
# treated as "no token".
# The API call is wrapped in try/catch, matching _EXTRACT_HCAPTCHA_META_JS, so
# an exception thrown by the hCaptcha API yields null instead of rejecting the
# evaluation.
# NOTE(review): nothing in this file references this constant; confirm it is
# still used elsewhere before relying on it.
_EXTRACT_HCAPTCHA_TOKEN_JS = """
() => {
    const textarea = document.querySelector('[name="h-captcha-response"]')
        || document.querySelector('[name="g-recaptcha-response"]');
    if (textarea && textarea.value && textarea.value.length > 20) {
        return textarea.value;
    }
    try {
        if (window.hcaptcha && typeof window.hcaptcha.getResponse === 'function') {
            const resp = window.hcaptcha.getResponse();
            if (resp && resp.length > 20) return resp;
        }
    } catch (err) {
        return null;
    }
    return null;
}
"""
|
|
# Page function returning ``{token, respKey}``; either field may be null.
# The token comes from the hidden response field or, failing that, from
# ``window.hcaptcha.getResponse()``. ``respKey`` comes from
# ``window.hcaptcha.getRespKey()`` when that function exists. Any exception
# thrown by the hCaptcha API is swallowed and whatever was collected so far is
# returned.
_EXTRACT_HCAPTCHA_META_JS = """
() => {
    const result = { token: null, respKey: null };
    const textarea = document.querySelector('[name="h-captcha-response"]')
        || document.querySelector('[name="g-recaptcha-response"]');
    if (textarea && textarea.value && textarea.value.length > 20) {
        result.token = textarea.value;
    }
    try {
        if (window.hcaptcha) {
            if (!result.token && typeof window.hcaptcha.getResponse === 'function') {
                const response = window.hcaptcha.getResponse();
                if (response && response.length > 20) {
                    result.token = response;
                }
            }
            if (typeof window.hcaptcha.getRespKey === 'function') {
                const respKey = window.hcaptcha.getRespKey();
                if (respKey) {
                    result.respKey = String(respKey);
                }
            }
        }
    } catch (err) {
        return result;
    }
    return result;
}
"""
|
|
# Frame function returning the trimmed challenge prompt text, or null when no
# prompt element is found. Selectors are tried from most to least specific.
_QUESTION_JS = """
() => {
    const prompt = document.querySelector('.prompt-text')
        || document.querySelector('h2.prompt-text')
        || document.querySelector('.challenge-prompt')
        || document.querySelector('[class*="prompt"]');
    return prompt?.textContent?.trim() || null;
}
"""
|
|
# CSS selectors for the clickable tiles of an image-selection challenge.
# They are tried in order and the first selector that matches anything wins.
_CHALLENGE_TILE_SELECTORS = (
    ".task-grid .task-image",
    ".task-grid .task",
    ".task-grid .image",
    ".challenge-container .task-image",
    ".challenge-view .task-image",
    ".task-image",
    ".task",
)

# CSS selectors for the example image(s) shown next to the prompt.
_EXAMPLE_IMAGE_SELECTORS = (
    ".challenge-example .image",
    ".challenge-example",
    ".example-wrapper .image",
)

# CSS selectors for the button that submits the challenge.
_VERIFY_BUTTON_SELECTORS = (
    ".button-submit",
    'button[type="submit"]',
    'button[aria-label*="Verify"]',
)

# CSS selectors for the checkbox inside the hCaptcha checkbox iframe.
_CHECKBOX_SELECTORS = (
    "#checkbox",
    '[id="checkbox"]',
    'div[role="checkbox"]',
    'input[type="checkbox"]',
    ".checkbox",
    "[aria-checked]",
)

# Lower-case URL substrings suggesting that an iframe hosts the challenge.
_CHALLENGE_FRAME_HINTS = (
    "frame=challenge",
    "challenge",
    "hcaptcha-inner",
)

# Lower-case URL substrings suggesting that an iframe hosts the checkbox.
_CHECKBOX_FRAME_HINTS = (
    "frame=checkbox",
    "checkbox",
    "hcaptcha-invisible",
    "hcaptcha-checkbox",
)
|
|
|
|
class HCaptchaSolver:
    """Solves ``HCaptchaTaskProxyless`` tasks via Playwright.

    The solver either reuses a browser handed in by the caller or lazily
    launches its own Chromium instance on first use. Only a browser that the
    solver launched itself is closed by :meth:`stop`.
    """

    def __init__(
        self,
        config: Config,
        browser: Browser | None = None,
        classifier: ClassificationSolver | None = None,
    ) -> None:
        """Store collaborators; no browser is launched here.

        Args:
            config: Settings object. This class reads ``browser_headless``,
                ``browser_timeout``, ``captcha_retries`` and
                ``captcha_timeout`` from it.
            browser: Optional already-running browser. When supplied, the
                solver uses it and never closes it.
            classifier: Optional solver used by the image-selection fallback.
                Without it the fallback raises ``RuntimeError``.
        """
        self._config = config
        self._playwright: Playwright | None = None
        self._browser: Browser | None = browser
        # We only tear down what we created ourselves.
        self._owns_browser = browser is None
        self._classifier = classifier
        self._start_lock = asyncio.Lock()

    async def start(self) -> None:
        """Launch Chromium if no browser is available yet.

        Safe to call repeatedly: the browser is checked once without the lock
        and once more after acquiring it, so concurrent callers launch at most
        one browser.
        """
        if self._browser is not None:
            return

        async with self._start_lock:
            if self._browser is not None:
                return
            playwright = await async_playwright().start()
            try:
                browser = await playwright.chromium.launch(
                    headless=self._config.browser_headless,
                    args=[
                        "--disable-blink-features=AutomationControlled",
                        "--no-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-gpu",
                    ],
                )
            except Exception:
                # Do not leave the Playwright driver running if launch fails.
                await playwright.stop()
                raise
            self._playwright = playwright
            self._browser = browser
            log.info("HCaptchaSolver browser started lazily")

    async def stop(self) -> None:
        """Close the browser and Playwright driver if this solver owns them.

        The Playwright driver is stopped even when closing the browser raises,
        so a failing ``close()`` cannot leak the driver.
        """
        async with self._start_lock:
            if self._owns_browser:
                browser, self._browser = self._browser, None
                playwright, self._playwright = self._playwright, None
                try:
                    if browser is not None:
                        await browser.close()
                finally:
                    if playwright is not None:
                        await playwright.stop()
        log.info("HCaptchaSolver stopped")

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve one task, retrying up to ``config.captcha_retries`` times.

        Args:
            params: Task parameters. ``websiteURL`` and ``websiteKey`` are
                required; ``enterprisePayload.rqdata`` is optional.

        Returns:
            ``{"gRecaptchaResponse": token}``, plus ``"respKey"`` when the
            page exposed a non-empty response key.

        Raises:
            KeyError: If ``websiteURL`` or ``websiteKey`` is missing.
            RuntimeError: If every attempt failed; the last failure is chained
                as the cause.
        """
        await self.start()
        website_url = params["websiteURL"]
        website_key = params["websiteKey"]
        enterprise_payload = params.get("enterprisePayload") or {}
        rqdata = str(enterprise_payload.get("rqdata") or "").strip()
        if rqdata:
            # NOTE: rqdata is only logged; it is not forwarded to the browser flow.
            log.info("Received hCaptcha enterprisePayload.rqdata (len=%d)", len(rqdata))

        # Always try at least once, mirroring _solve_image_selection_challenge.
        attempts = max(1, self._config.captcha_retries)
        last_error: Exception | None = None
        for attempt in range(attempts):
            try:
                token, resp_key = await self._solve_once(website_url, website_key)
            except Exception as exc:
                last_error = exc
                log.warning(
                    "HCaptcha attempt %d/%d failed: %s",
                    attempt + 1,
                    attempts,
                    exc,
                )
                if attempt < attempts - 1:
                    await asyncio.sleep(2)
                continue

            solution: dict[str, Any] = {"gRecaptchaResponse": token}
            if resp_key:
                solution["respKey"] = resp_key
            return solution

        raise RuntimeError(
            f"HCaptcha failed after {attempts} attempts: {last_error}"
        ) from last_error

    async def _solve_once(self, website_url: str, website_key: str) -> tuple[str, str]:
        """Run a single solve attempt in a fresh browser context.

        Returns:
            ``(token, resp_key)``; ``resp_key`` is ``""`` when unavailable.

        Raises:
            RuntimeError: If the browser is not started, the checkbox cannot
                be found, the challenge is unsupported, or no valid token was
                obtained.
        """
        if self._browser is None:
            raise RuntimeError("HCaptchaSolver browser is not started")

        target_url = self._prepare_target_url(website_url, website_key)
        if target_url != website_url:
            log.info("Normalized hCaptcha target URL to honor requested sitekey: %s", target_url)

        context = await self._browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
            locale="en-US",
        )
        # Everything after context creation sits inside try/finally so the
        # context is closed even when page creation or script injection fails.
        try:
            page = await context.new_page()
            await page.add_init_script(_STEALTH_JS)

            timeout_ms = self._config.browser_timeout * 1000
            await page.goto(target_url, wait_until="networkidle", timeout=timeout_ms)
            await page.mouse.move(400, 300)
            await asyncio.sleep(1)

            await self._click_checkbox(page)

            # Fast path: the checkbox click alone may yield a token.
            token = await self._wait_for_token(page, seconds=4)
            if token:
                resp_key = await self._wait_for_resp_key(page, seconds=2)
                log.info("Got hCaptcha token directly after checkbox click (len=%d)", len(token))
                return token, resp_key

            # Slow path: classify and click the image-selection challenge.
            log.info(
                "No direct hCaptcha token after checkbox click, entering classification fallback"
            )
            resp_key = ""
            if await self._solve_image_selection_challenge(page):
                token = await self._wait_for_token(page)
                resp_key = await self._wait_for_resp_key(page, seconds=2)

            if not isinstance(token, str) or len(token) < 20:
                raise RuntimeError(f"Invalid hCaptcha token: {token!r}")

            log.info("Got hCaptcha token (len=%d)", len(token))
            return token, resp_key
        finally:
            await context.close()

    async def _click_checkbox(self, page: Page) -> None:
        """Locate the checkbox iframe and click the checkbox inside it.

        Raises:
            RuntimeError: If the frame or the checkbox element is not found.
        """
        frame = await self._find_frame(page, "checkbox", wait_seconds=10)
        if frame is None:
            raise RuntimeError(self._build_missing_frame_error(page, "checkbox"))

        checkbox = await self._find_checkbox_element(frame)
        if checkbox is None:
            raise RuntimeError(
                "Could not find hCaptcha checkbox element inside frame "
                f"{getattr(frame, 'url', None) or '<empty>'}"
            )

        await checkbox.click(timeout=10_000)
        log.info("Clicked hCaptcha checkbox")

    async def _wait_for_token(self, page: Page, *, seconds: int | None = None) -> str | None:
        """Poll once per second for a token.

        Args:
            page: Page to inspect.
            seconds: Number of polls. ``None`` or ``0`` falls back to
                ``config.captcha_timeout``; at least one poll always happens.

        Returns:
            The token, or ``None`` if none appeared in time.
        """
        polls = max(1, seconds or self._config.captcha_timeout)
        for _ in range(polls):
            token, _ = await self._extract_hcaptcha_meta(page)
            if isinstance(token, str) and len(token) > 20:
                return token
            await asyncio.sleep(1)
        return None

    async def _wait_for_resp_key(self, page: Page, *, seconds: int | None = None) -> str:
        """Poll once per second for the response key.

        Args:
            page: Page to inspect.
            seconds: Number of polls; ``None`` or ``0`` means 2.

        Returns:
            The response key, or ``""`` if none appeared in time.
        """
        polls = max(1, seconds or 2)
        for _ in range(polls):
            _, resp_key = await self._extract_hcaptcha_meta(page)
            if resp_key:
                return resp_key
            await asyncio.sleep(1)
        return ""

    async def _extract_hcaptcha_meta(self, page: Page) -> tuple[str | None, str]:
        """Read token and response key from the page and its hCaptcha frames.

        The page is evaluated first, then every frame whose URL looks
        hCaptcha-related. The first token and the first response key found
        win. Contexts that fail to evaluate are skipped.

        Returns:
            ``(token_or_None, resp_key_or_empty_string)``.
        """
        contexts: list[Page | Frame] = [page]
        for frame in page.frames:
            url = (getattr(frame, "url", None) or "").lower()
            if self._is_hcaptcha_related_frame(url):
                contexts.append(frame)

        token: str | None = None
        resp_key = ""
        for context in contexts:
            try:
                payload = await context.evaluate(_EXTRACT_HCAPTCHA_META_JS)
            except Exception:
                # Best effort: evaluation can fail for an individual frame.
                continue
            if not isinstance(payload, dict):
                continue
            current_token = payload.get("token")
            current_resp_key = payload.get("respKey")
            if not token and isinstance(current_token, str) and len(current_token) > 20:
                token = current_token
            if not resp_key and isinstance(current_resp_key, str) and current_resp_key.strip():
                resp_key = current_resp_key.strip()
            if token and resp_key:
                break
        return token, resp_key

    async def _find_frame(
        self, page: Page, frame_role: str, *, wait_seconds: int = 5
    ) -> Frame | None:
        """Poll every 0.5 s for the iframe playing ``frame_role``.

        Only non-main frames with an hCaptcha-related URL are considered.
        Within one polling pass the priority is:

        1. the first frame whose URL contains ``frame=<role>``,
        2. the first frame whose DOM looks like the role,
        3. the first frame whose URL contains any other role hint.

        Args:
            page: Page whose frames are scanned.
            frame_role: ``"checkbox"`` or ``"challenge"``; any other value
                never matches.
            wait_seconds: Approximate time budget; two passes per second.

        Returns:
            The matching frame, or ``None`` if nothing matched in time.
        """
        role_table = {
            "checkbox": (_CHECKBOX_FRAME_HINTS, "frame=checkbox"),
            "challenge": (_CHALLENGE_FRAME_HINTS, "frame=challenge"),
        }
        hints, exact_marker = role_table.get(frame_role, ((), ""))

        attempts = max(1, wait_seconds * 2)
        main_frame = getattr(page, "main_frame", None)
        for _ in range(attempts):
            hinted_match: Frame | None = None
            dom_match: Frame | None = None
            for frame in page.frames:
                if main_frame is not None and frame is main_frame:
                    continue
                url = (frame.url or "").lower()
                if not self._is_hcaptcha_related_frame(url):
                    continue

                # An exact URL marker beats everything else, so there is no
                # point in probing this or any later frame.
                if exact_marker and exact_marker in url:
                    return frame
                # Only the first DOM/hinted match is kept, so skip further
                # (comparatively expensive) DOM probes once one is found.
                if dom_match is None and await self._frame_has_role_dom(frame, frame_role):
                    dom_match = frame
                if hinted_match is None and any(hint in url for hint in hints):
                    hinted_match = frame

            if dom_match is not None:
                return dom_match
            if hinted_match is not None:
                return hinted_match
            await asyncio.sleep(0.5)
        return None

    async def _frame_has_role_dom(self, frame: Frame, frame_role: str) -> bool:
        """Return whether ``frame``'s DOM looks like ``frame_role``.

        A failing probe is treated as "no match" so that one unreadable frame
        does not abort the whole polling loop in :meth:`_find_frame`.
        """
        try:
            if frame_role == "checkbox":
                return await self._find_checkbox_element(frame) is not None
            if frame_role == "challenge":
                return await self._is_challenge_frame(frame)
        except Exception as exc:
            log.debug("Skipping hCaptcha %s probe for unreadable frame: %s", frame_role, exc)
        return False

    @staticmethod
    def _is_hcaptcha_related_frame(url: str) -> bool:
        """Return whether a lower-cased frame URL mentions hCaptcha.

        The single substring test also covers ``newassets.hcaptcha.com``,
        ``api.hcaptcha.com`` and ``js.stripe.com/v3/hcaptcha``.
        """
        return "hcaptcha" in url

    async def _find_checkbox_element(self, frame: Frame) -> ElementHandle[Any] | None:
        """Return the first element matching any checkbox selector, if any."""
        for selector in _CHECKBOX_SELECTORS:
            element = await frame.query_selector(selector)
            if element is not None:
                return element
        return None

    async def _is_challenge_frame(self, frame: Frame) -> bool:
        """Return whether the frame's DOM looks like a challenge.

        The frame qualifies if it shows a non-empty prompt, any tile element,
        a ``canvas``, or a verify/submit button.
        """
        prompt = await frame.evaluate(_QUESTION_JS)
        if isinstance(prompt, str) and prompt.strip():
            return True

        for selector in _CHALLENGE_TILE_SELECTORS:
            if await frame.query_selector_all(selector):
                return True

        if await frame.locator("canvas").count() > 0:
            return True

        for selector in _VERIFY_BUTTON_SELECTORS:
            if await frame.query_selector(selector) is not None:
                return True

        return False

    @staticmethod
    def _build_missing_frame_error(page: Page, frame_role: str) -> str:
        """Build an error message listing the URLs of all frames on the page."""
        frame_urls = [
            getattr(frame, "url", None) or "<empty>"
            for frame in page.frames
        ]
        return (
            f"Could not find hCaptcha {frame_role} frame; available frames={frame_urls}"
        )

    @staticmethod
    def _prepare_target_url(website_url: str, website_key: str) -> str:
        """Align the official demo URL with the requested sitekey.

        For ``accounts.hcaptcha.com/demo`` and ``demo.hcaptcha.com/demo`` the
        ``sitekey`` query parameter is set to ``website_key`` and ``hl=en`` is
        added when no ``hl`` is present, so the demo page exercises the
        requested key. Every other URL is returned unchanged.

        The host is compared via ``hostname`` rather than ``netloc`` so that an
        explicit port or userinfo does not defeat the match.
        """
        if not website_key:
            return website_url

        parsed = urlsplit(website_url)
        host = parsed.hostname or ""
        path = parsed.path.rstrip("/")
        is_official_demo = host in {"accounts.hcaptcha.com", "demo.hcaptcha.com"} and path == "/demo"
        if not is_official_demo:
            return website_url

        query = parse_qs(parsed.query, keep_blank_values=True)
        changed = False

        current_sitekey = query.get("sitekey", [None])[0]
        if current_sitekey != website_key:
            query["sitekey"] = [website_key]
            changed = True

        if "hl" not in query:
            query["hl"] = ["en"]
            changed = True

        if not changed:
            # Return the caller's string untouched, not a re-encoded copy.
            return website_url

        return urlunsplit(
            (
                parsed.scheme,
                parsed.netloc,
                parsed.path,
                urlencode(query, doseq=True),
                parsed.fragment,
            )
        )

    async def _solve_image_selection_challenge(self, page: Page) -> bool:
        """Run up to ``max(1, config.captcha_retries)`` classification rounds.

        Each round collects the challenge, asks the classifier which tiles to
        select, clicks them, submits, and waits briefly for a token.

        Returns:
            ``True`` once a token is visible on the page; ``False`` when the
            rounds are exhausted or a later round cannot read the challenge.

        Raises:
            RuntimeError: If no classifier was injected, or if the very first
                round cannot collect a supported challenge.
        """
        if self._classifier is None:
            raise RuntimeError(
                "Classification fallback is unavailable because no ClassificationSolver was injected"
            )

        rounds = max(1, self._config.captcha_retries)
        for round_index in range(rounds):
            token = await self._wait_for_token(page, seconds=1)
            if token:
                return True

            challenge = await self._collect_selection_challenge(page)
            if challenge is None:
                unsupported_reason = await self._describe_unsupported_challenge(page)
                log.warning(
                    "Could not collect hCaptcha image-selection challenge in round %d: %s",
                    round_index + 1,
                    unsupported_reason,
                )
                # A failure in the first round means the layout is unsupported;
                # in later rounds the caller decides based on the token.
                if round_index == 0:
                    raise RuntimeError(unsupported_reason)
                return False

            log.info(
                "Collected hCaptcha image-selection challenge in round %d: question=%r tiles=%d examples=%d",
                round_index + 1,
                challenge["question"],
                len(challenge["tiles"]),
                len(challenge["examples"]),
            )
            payload = self._build_classification_payload(
                question=challenge["question"],
                tile_images=challenge["tile_images"],
                examples=challenge["examples"],
            )
            result = await self._classifier.solve(payload)
            log.info("Classification solver returned raw result: %s", result)
            indices = self._extract_selection_indices(
                result=result,
                tile_count=len(challenge["tiles"]),
            )

            await self._click_selected_tiles(challenge["tiles"], indices)
            await self._click_verify_button(challenge["frame"])

            token = await self._wait_for_token(page, seconds=6)
            if token:
                return True

            log.info(
                "hCaptcha challenge round %d submitted without immediate token, retrying",
                round_index + 1,
            )

        return False

    async def _collect_selection_challenge(self, page: Page) -> dict[str, Any] | None:
        """Gather prompt, tiles and example images from the challenge frame.

        Returns:
            A dict with keys ``frame``, ``question``, ``tiles``,
            ``tile_images`` (base64 PNGs aligned index-for-index with
            ``tiles``) and ``examples``; or ``None`` when the frame, prompt or
            tiles are missing, or no tile could be captured.
        """
        frame = await self._find_frame(page, "challenge", wait_seconds=10)
        if frame is None:
            return None

        await asyncio.sleep(1)
        question = await frame.evaluate(_QUESTION_JS)
        if not isinstance(question, str) or not question.strip():
            return None

        tiles = await self._find_clickable_tiles(frame)
        if not tiles:
            return None

        # Keep handle and image together so that tiles whose capture failed
        # are dropped from both lists and indices stay aligned.
        tile_entries: list[tuple[ElementHandle[Any], str]] = []
        for tile in tiles:
            encoded = await self._capture_element_base64(tile)
            if encoded:
                tile_entries.append((tile, encoded))

        if not tile_entries:
            return None

        return {
            "frame": frame,
            "question": question.strip(),
            "tiles": [tile for tile, _ in tile_entries],
            "tile_images": [encoded for _, encoded in tile_entries],
            "examples": await self._extract_example_images(frame),
        }

    async def _find_clickable_tiles(self, frame: Frame) -> list[ElementHandle[Any]]:
        """Return the elements of the first tile selector that matches."""
        for selector in _CHALLENGE_TILE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            if elements:
                return elements
        return []

    async def _extract_example_images(self, frame: Frame) -> list[str]:
        """Return base64 PNGs from the first example selector yielding any."""
        examples: list[str] = []
        for selector in _EXAMPLE_IMAGE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            if not elements:
                continue
            for element in elements:
                encoded = await self._capture_element_base64(element)
                if encoded:
                    examples.append(encoded)
            if examples:
                break
        return examples

    async def _describe_unsupported_challenge(self, page: Page) -> str:
        """Explain why the challenge could not be collected.

        Produces a message closer to the real challenge type, so that a
        canvas/puzzle challenge is not misreported as a grid DOM problem.
        """
        frame = await self._find_frame(page, "challenge", wait_seconds=2)
        if frame is None:
            return (
                "hCaptcha challenge iframe disappeared before the built-in fallback "
                "could inspect it"
            )

        prompt = await frame.evaluate(_QUESTION_JS)
        prompt_text = prompt.strip().lower() if isinstance(prompt, str) else ""
        has_canvas = await frame.locator("canvas").count() > 0
        submit_locator = frame.locator(".button-submit")
        submit_text = (
            await submit_locator.first.inner_text()
            if await submit_locator.count() > 0
            else ""
        )

        if "puzzle piece" in prompt_text or (has_canvas and "skip" in submit_text.lower()):
            log.warning(
                "Detected unsupported hCaptcha canvas/puzzle challenge: prompt=%r submit=%r has_canvas=%s",
                prompt,
                submit_text,
                has_canvas,
            )
            return (
                "hCaptcha presented a canvas/puzzle challenge, which is not supported "
                "by the built-in HCaptchaClassification fallback"
            )

        log.warning(
            "Detected unsupported hCaptcha challenge layout: prompt=%r submit=%r has_canvas=%s",
            prompt,
            submit_text,
            has_canvas,
        )
        return (
            "hCaptcha image challenge detected, but the current DOM layout is not "
            "supported by the built-in classification fallback"
        )

    async def _capture_element_base64(self, element: ElementHandle[Any]) -> str | None:
        """Screenshot ``element`` as PNG and return it base64-encoded.

        Returns ``None`` when the screenshot fails.
        """
        try:
            image_bytes = await element.screenshot(type="png")
        except Exception:
            return None
        return base64.b64encode(image_bytes).decode("ascii")

    @staticmethod
    def _build_classification_payload(
        *, question: str, tile_images: list[str], examples: list[str]
    ) -> dict[str, Any]:
        """Build the classifier request; ``examples`` is omitted when empty."""
        payload: dict[str, Any] = {
            "type": "HCaptchaClassification",
            "question": question,
            "images": tile_images,
        }
        if examples:
            payload["examples"] = examples
        return payload

    @staticmethod
    def _extract_selection_indices(
        *, result: dict[str, Any], tile_count: int
    ) -> list[int]:
        """Turn a classifier result into unique, in-range tile indices.

        ``result["answer"]`` is preferred; ``result["objects"]`` is consulted
        only when ``answer`` is neither a bool nor a list. Accepted shapes:

        * a single ``bool``: selects tile 0 when true and there is exactly
          one tile;
        * a list consisting only of ``bool``: a per-tile mask, the positions
          holding ``True`` are selected;
        * a list of numbers: tile indices (floats are truncated). Booleans,
          non-numbers and non-finite floats inside such a list are ignored.

        Returns:
            Indices in first-seen order, de-duplicated and limited to
            ``0 <= idx < tile_count``.
        """

        def as_indices(raw: list[Any]) -> list[int]:
            # bool is a subclass of int, so a mask must be recognised before
            # the numeric branch or True/False would become indices 1/0.
            if raw and all(isinstance(item, bool) for item in raw):
                return [pos for pos, flag in enumerate(raw) if flag]
            parsed: list[int] = []
            for item in raw:
                if isinstance(item, bool) or not isinstance(item, (int, float)):
                    continue
                try:
                    parsed.append(int(item))
                except (ValueError, OverflowError):
                    # NaN and +/-inf cannot be converted to int.
                    continue
            return parsed

        raw_answer = result.get("answer")
        if isinstance(raw_answer, bool):
            candidates = [0] if raw_answer and tile_count == 1 else []
        elif isinstance(raw_answer, list):
            candidates = as_indices(raw_answer)
        else:
            raw_objects = result.get("objects")
            candidates = as_indices(raw_objects) if isinstance(raw_objects, list) else []

        selected: list[int] = []
        for idx in candidates:
            if 0 <= idx < tile_count and idx not in selected:
                selected.append(idx)
        return selected

    async def _click_selected_tiles(
        self,
        tiles: list[ElementHandle[Any]],
        indices: list[int],
    ) -> None:
        """Click the tiles at ``indices`` with a short pause after each."""
        for idx in indices:
            await tiles[idx].click(timeout=10_000)
            await asyncio.sleep(0.2)
        log.info("Clicked %d hCaptcha tile(s): %s", len(indices), indices)

    async def _click_verify_button(self, frame: Frame) -> None:
        """Click the first verify/submit button found in ``frame``.

        Raises:
            RuntimeError: If no selector matches a button.
        """
        for selector in _VERIFY_BUTTON_SELECTORS:
            button = await frame.query_selector(selector)
            if button is None:
                continue
            await button.click(timeout=10_000)
            await asyncio.sleep(1)
            log.info("Submitted hCaptcha challenge with selector %s", selector)
            return
        raise RuntimeError("Could not find hCaptcha verify/submit button")
|
|