# mycaptcha/src/services/hcaptcha.py
# Uploaded by dragg2 ("Upload 33 files", commit c00180e, verified).
"""HCaptcha solver using Playwright browser automation.
Supports ``HCaptchaTaskProxyless`` task type.
Strategy:
1. Visit the target page with a realistic browser context.
2. Click the hCaptcha checkbox.
3. If a token is issued immediately, return it.
4. If an image-selection challenge appears, extract the prompt + tile images,
call ``ClassificationSolver`` for ``HCaptchaClassification``-style
reasoning, click the matching tiles, submit the challenge, and continue
polling for the token.
"""
from __future__ import annotations
import asyncio
import base64
import logging
from typing import Any
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
from playwright.async_api import Browser, ElementHandle, Frame, Page, Playwright, async_playwright
from ..core.config import Config
from .classification import ClassificationSolver
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# JavaScript registered on every new page through ``page.add_init_script``.
# It redefines the getters for ``navigator.webdriver`` (returns undefined),
# ``navigator.languages`` (['en-US', 'en']) and ``navigator.plugins``
# (a five-element array), and installs a stub ``window.chrome`` object.
_STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};
"""
# JavaScript function that returns the hCaptcha response token or null.
# It first reads the ``h-captcha-response`` (or ``g-recaptcha-response``)
# form field, then falls back to ``window.hcaptcha.getResponse()``; a value
# is only accepted when it is longer than 20 characters.
# NOTE(review): nothing in the visible part of this module references this
# constant; ``_EXTRACT_HCAPTCHA_META_JS`` covers the same lookup.
_EXTRACT_HCAPTCHA_TOKEN_JS = """
() => {
const textarea = document.querySelector('[name="h-captcha-response"]')
|| document.querySelector('[name="g-recaptcha-response"]');
if (textarea && textarea.value && textarea.value.length > 20) {
return textarea.value;
}
if (window.hcaptcha && typeof window.hcaptcha.getResponse === 'function') {
const resp = window.hcaptcha.getResponse();
if (resp && resp.length > 20) return resp;
}
return null;
}
"""
# JavaScript function that returns ``{token, respKey}`` for the current
# document. ``token`` comes from the response form field or, failing that,
# from ``window.hcaptcha.getResponse()`` (both must exceed 20 characters).
# ``respKey`` is the stringified result of ``window.hcaptcha.getRespKey()``
# when that function exists. Any exception thrown by the hcaptcha API is
# swallowed and whatever was collected so far is returned.
_EXTRACT_HCAPTCHA_META_JS = """
() => {
const result = { token: null, respKey: null };
const textarea = document.querySelector('[name="h-captcha-response"]')
|| document.querySelector('[name="g-recaptcha-response"]');
if (textarea && textarea.value && textarea.value.length > 20) {
result.token = textarea.value;
}
try {
if (window.hcaptcha) {
if (!result.token && typeof window.hcaptcha.getResponse === 'function') {
const response = window.hcaptcha.getResponse();
if (response && response.length > 20) {
result.token = response;
}
}
if (typeof window.hcaptcha.getRespKey === 'function') {
const respKey = window.hcaptcha.getRespKey();
if (respKey) {
result.respKey = String(respKey);
}
}
}
} catch (err) {
return result;
}
return result;
}
"""
# JavaScript function that returns the trimmed text of the challenge prompt,
# or null when no prompt element is found or its text is empty. Selectors
# are tried from the most specific (``.prompt-text``) to the loosest (any
# element whose class attribute contains "prompt").
_QUESTION_JS = """
() => {
const prompt = document.querySelector('.prompt-text')
|| document.querySelector('h2.prompt-text')
|| document.querySelector('.challenge-prompt')
|| document.querySelector('[class*="prompt"]');
return prompt?.textContent?.trim() || null;
}
"""
# CSS selectors for the clickable tiles of an image-selection challenge.
# They are tried in order and the first selector that matches any element
# is used.
_CHALLENGE_TILE_SELECTORS = (
".task-grid .task-image",
".task-grid .task",
".task-grid .image",
".challenge-container .task-image",
".challenge-view .task-image",
".task-image",
".task",
)
# CSS selectors for the example images shown next to the challenge prompt,
# tried in order.
_EXAMPLE_IMAGE_SELECTORS = (
".challenge-example .image",
".challenge-example",
".example-wrapper .image",
)
# CSS selectors for the button that submits the challenge, tried in order.
_VERIFY_BUTTON_SELECTORS = (
".button-submit",
'button[type="submit"]',
'button[aria-label*="Verify"]',
)
# CSS selectors for the checkbox inside the hCaptcha checkbox frame, tried
# in order from the most specific to the loosest.
_CHECKBOX_SELECTORS = (
"#checkbox",
'[id="checkbox"]',
'div[role="checkbox"]',
'input[type="checkbox"]',
'.checkbox',
'[aria-checked]',
)
# Substrings looked for in a lower-cased frame URL to recognise the
# challenge frame; "frame=challenge" is treated as the exact match.
_CHALLENGE_FRAME_HINTS = (
"frame=challenge",
"challenge",
"hcaptcha-inner",
)
# Substrings looked for in a lower-cased frame URL to recognise the
# checkbox frame; "frame=checkbox" is treated as the exact match.
_CHECKBOX_FRAME_HINTS = (
"frame=checkbox",
"checkbox",
"hcaptcha-invisible",
"hcaptcha-checkbox",
)
class HCaptchaSolver:
    """Solves ``HCaptchaTaskProxyless`` tasks via Playwright.

    The solver opens the target page in a fresh browser context, clicks the
    hCaptcha checkbox and polls for a response token. When no token shows up
    it collects the image-selection challenge (prompt, tile screenshots,
    example screenshots), asks the injected ``ClassificationSolver`` which
    tiles to click, submits the challenge and polls again.

    A browser may be injected; otherwise Chromium is launched on first use
    and closed again by :meth:`stop`.
    """

    def __init__(
        self,
        config: Config,
        browser: Browser | None = None,
        classifier: ClassificationSolver | None = None,
    ) -> None:
        """Store collaborators; no browser is launched here.

        Args:
            config: Settings object; this class reads ``browser_headless``,
                ``browser_timeout``, ``captcha_retries`` and
                ``captcha_timeout`` from it.
            browser: Optional already-running browser. When given, the solver
                never closes it.
            classifier: Optional solver used for the image-selection
                fallback; without it the fallback raises ``RuntimeError``.
        """
        self._config = config
        self._playwright: Playwright | None = None
        self._browser: Browser | None = browser
        # Only a browser launched by this instance is closed in ``stop``.
        self._owns_browser = browser is None
        self._classifier = classifier
        self._start_lock = asyncio.Lock()

    async def start(self) -> None:
        """Launch Chromium if no browser is available yet.

        The browser reference is checked before and after taking the lock so
        that concurrent callers launch at most one browser.
        """
        if self._browser is not None:
            return
        async with self._start_lock:
            if self._browser is not None:
                return
            playwright = await async_playwright().start()
            try:
                browser = await playwright.chromium.launch(
                    headless=self._config.browser_headless,
                    args=[
                        "--disable-blink-features=AutomationControlled",
                        "--no-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-gpu",
                    ],
                )
            except Exception:
                # Do not leave the Playwright driver running when the
                # browser itself failed to launch.
                await playwright.stop()
                raise
            self._playwright = playwright
            self._browser = browser
            log.info("HCaptchaSolver browser started lazily")

    async def stop(self) -> None:
        """Close the browser and Playwright driver owned by this instance.

        An injected browser is left untouched. The Playwright driver is
        stopped even when closing the browser raises, and both references
        are always cleared so a later :meth:`start` can launch again.
        """
        async with self._start_lock:
            if self._owns_browser:
                try:
                    if self._browser is not None:
                        await self._browser.close()
                finally:
                    self._browser = None
                    if self._playwright is not None:
                        try:
                            await self._playwright.stop()
                        finally:
                            self._playwright = None
            log.info("HCaptchaSolver stopped")

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve one task, retrying up to ``config.captcha_retries`` times.

        Args:
            params: Task parameters. ``websiteURL`` and ``websiteKey`` are
                required; ``enterprisePayload.rqdata`` is only logged (it is
                not forwarded to the page by this method).

        Returns:
            ``{"gRecaptchaResponse": token}``, plus ``"respKey"`` when the
            page exposed one.

        Raises:
            KeyError: A required parameter is missing.
            RuntimeError: Every attempt failed; the last underlying error is
                attached as ``__cause__``.
        """
        await self.start()
        website_url = params["websiteURL"]
        website_key = params["websiteKey"]
        enterprise_payload = params.get("enterprisePayload") or {}
        rqdata = str(enterprise_payload.get("rqdata") or "").strip()
        if rqdata:
            log.info("Received hCaptcha enterprisePayload.rqdata (len=%d)", len(rqdata))

        retries = self._config.captcha_retries
        last_error: Exception | None = None
        for attempt in range(retries):
            try:
                token, resp_key = await self._solve_once(website_url, website_key)
            except Exception as exc:
                last_error = exc
                log.warning(
                    "HCaptcha attempt %d/%d failed: %s",
                    attempt + 1,
                    retries,
                    exc,
                )
                # Pause between attempts, but not after the final one.
                if attempt < retries - 1:
                    await asyncio.sleep(2)
            else:
                solution = {"gRecaptchaResponse": token}
                if resp_key:
                    solution["respKey"] = resp_key
                return solution
        raise RuntimeError(
            f"HCaptcha failed after {retries} attempts: {last_error}"
        ) from last_error

    async def _solve_once(self, website_url: str, website_key: str) -> tuple[str, str]:
        """Run a single solve attempt in a fresh browser context.

        Returns:
            ``(token, resp_key)``; ``resp_key`` is ``""`` when unavailable.

        Raises:
            RuntimeError: The browser is not started, the checkbox could not
                be found, the challenge is unsupported, or no valid token
                was obtained.
        """
        # An explicit check instead of ``assert`` so it survives ``python -O``.
        if self._browser is None:
            raise RuntimeError("HCaptchaSolver browser is not started; call start() first")

        target_url = self._prepare_target_url(website_url, website_key)
        if target_url != website_url:
            log.info("Normalized hCaptcha target URL to honor requested sitekey: %s", target_url)

        context = await self._browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
            locale="en-US",
        )
        try:
            # Page creation lives inside the ``try`` so the context is
            # closed even when it fails.
            page = await context.new_page()
            await page.add_init_script(_STEALTH_JS)

            timeout_ms = self._config.browser_timeout * 1000
            await page.goto(target_url, wait_until="networkidle", timeout=timeout_ms)
            await page.mouse.move(400, 300)
            await asyncio.sleep(1)
            await self._click_checkbox(page)

            # Give low-risk sessions a chance to receive a token right away.
            token = await self._wait_for_token(page, seconds=4)
            if token:
                resp_key = await self._wait_for_resp_key(page, seconds=2)
                log.info("Got hCaptcha token directly after checkbox click (len=%d)", len(token))
                return token, resp_key

            # Common path in headless environments: an image challenge
            # appears and is handled by the classification fallback.
            log.info(
                "No direct hCaptcha token after checkbox click, entering classification fallback"
            )
            resp_key = ""
            fallback_handled = await self._solve_image_selection_challenge(page)
            if fallback_handled:
                token = await self._wait_for_token(page)
                resp_key = await self._wait_for_resp_key(page, seconds=2)

            if not isinstance(token, str) or len(token) < 20:
                raise RuntimeError(f"Invalid hCaptcha token: {token!r}")
            log.info("Got hCaptcha token (len=%d)", len(token))
            return token, resp_key
        finally:
            await context.close()

    async def _click_checkbox(self, page: Page) -> None:
        """Locate the checkbox frame and click the checkbox inside it.

        Raises:
            RuntimeError: The frame or the checkbox element was not found.
        """
        frame = await self._find_frame(page, "checkbox", wait_seconds=10)
        if frame is None:
            raise RuntimeError(self._build_missing_frame_error(page, "checkbox"))
        checkbox = await self._find_checkbox_element(frame)
        if checkbox is None:
            raise RuntimeError(
                "Could not find hCaptcha checkbox element inside frame "
                f"{getattr(frame, 'url', None) or '<empty>'}"
            )
        await checkbox.click(timeout=10_000)
        log.info("Clicked hCaptcha checkbox")

    async def _wait_for_token(self, page: Page, *, seconds: int | None = None) -> str | None:
        """Poll once per second for a token longer than 20 characters.

        Args:
            page: Page to inspect.
            seconds: Number of polls; a falsy value means
                ``config.captcha_timeout``. At least one poll is made.

        Returns:
            The token, or ``None`` when none appeared in time.
        """
        polls = max(1, seconds or self._config.captcha_timeout)
        for _ in range(polls):
            token, _resp_key = await self._extract_hcaptcha_meta(page)
            if isinstance(token, str) and len(token) > 20:
                return token
            await asyncio.sleep(1)
        return None

    async def _wait_for_resp_key(self, page: Page, *, seconds: int | None = None) -> str:
        """Poll once per second for the hCaptcha response key.

        Args:
            page: Page to inspect.
            seconds: Number of polls; a falsy value means 2.

        Returns:
            The response key, or ``""`` when none appeared in time.
        """
        polls = max(1, seconds or 2)
        for _ in range(polls):
            _token, resp_key = await self._extract_hcaptcha_meta(page)
            if resp_key:
                return resp_key
            await asyncio.sleep(1)
        return ""

    async def _extract_hcaptcha_meta(self, page: Page) -> tuple[str | None, str]:
        """Read token and response key from the page and its hCaptcha frames.

        The page itself is inspected first, then every frame whose URL looks
        hCaptcha-related. The first token and first response key found win.

        Returns:
            ``(token, resp_key)`` with ``None`` / ``""`` for missing values.
        """
        contexts: list[Page | Frame] = [page]
        contexts.extend(
            frame
            for frame in page.frames
            if self._is_hcaptcha_related_frame((getattr(frame, "url", None) or "").lower())
        )

        token: str | None = None
        resp_key = ""
        for context in contexts:
            try:
                payload = await context.evaluate(_EXTRACT_HCAPTCHA_META_JS)
            except Exception:
                # Best effort: a frame that cannot be evaluated right now is
                # skipped and the remaining contexts are still inspected.
                continue
            if not isinstance(payload, dict):
                continue
            current_token = payload.get("token")
            current_resp_key = payload.get("respKey")
            if not token and isinstance(current_token, str) and len(current_token) > 20:
                token = current_token
            if not resp_key and isinstance(current_resp_key, str) and current_resp_key.strip():
                resp_key = current_resp_key.strip()
            if token and resp_key:
                break
        return token, resp_key

    async def _find_frame(
        self, page: Page, frame_role: str, *, wait_seconds: int = 5
    ) -> Frame | None:
        """Poll every 0.5 s for the hCaptcha frame playing ``frame_role``.

        Within one poll the preference order is: a frame whose URL contains
        the exact ``frame=<role>`` marker, then a frame whose DOM looks like
        the role, then a frame whose URL contains any looser hint. The main
        frame and frames that are not hCaptcha-related are ignored.

        Args:
            page: Page whose frames are searched.
            frame_role: ``"checkbox"`` or ``"challenge"``; any other value
                never matches.
            wait_seconds: Approximate time budget; two polls per second.

        Returns:
            The matching frame, or ``None`` when the budget is exhausted.
        """
        if frame_role == "checkbox":
            hints: tuple[str, ...] = _CHECKBOX_FRAME_HINTS
            exact_hint = "frame=checkbox"
        elif frame_role == "challenge":
            hints = _CHALLENGE_FRAME_HINTS
            exact_hint = "frame=challenge"
        else:
            hints = ()
            exact_hint = ""

        attempts = max(1, wait_seconds * 2)
        main_frame = getattr(page, "main_frame", None)
        for _ in range(attempts):
            candidates: list[tuple[Frame, str]] = []
            for frame in page.frames:
                if main_frame is not None and frame is main_frame:
                    continue
                url = (frame.url or "").lower()
                if self._is_hcaptcha_related_frame(url):
                    candidates.append((frame, url))

            # 1) Exact URL marker: needs no round trip to the browser.
            if exact_hint:
                for frame, url in candidates:
                    if exact_hint in url:
                        return frame
            # 2) DOM inspection.
            for frame, _url in candidates:
                if await self._frame_dom_matches_role(frame, frame_role):
                    return frame
            # 3) Looser URL hints.
            for frame, url in candidates:
                if any(hint in url for hint in hints):
                    return frame
            await asyncio.sleep(0.5)
        return None

    async def _frame_dom_matches_role(self, frame: Frame, frame_role: str) -> bool:
        """Return whether the DOM of ``frame`` looks like ``frame_role``.

        Errors raised while inspecting the frame count as "no match", so one
        unusable frame does not abort the polling loop in ``_find_frame``.
        """
        try:
            if frame_role == "checkbox":
                return await self._find_checkbox_element(frame) is not None
            if frame_role == "challenge":
                return await self._is_challenge_frame(frame)
        except Exception as exc:
            log.debug("Skipping hCaptcha frame that could not be inspected: %s", exc)
        return False

    @staticmethod
    def _is_hcaptcha_related_frame(url: str) -> bool:
        """Return whether ``url`` (expected lower-cased) mentions hCaptcha.

        A plain substring test; it also covers ``newassets.hcaptcha.com``,
        ``api.hcaptcha.com`` and ``js.stripe.com/v3/hcaptcha``.
        """
        return "hcaptcha" in url

    async def _find_checkbox_element(self, frame: Frame) -> ElementHandle[Any] | None:
        """Return the first element matching any checkbox selector, or ``None``."""
        for selector in _CHECKBOX_SELECTORS:
            element = await frame.query_selector(selector)
            if element is not None:
                return element
        return None

    async def _is_challenge_frame(self, frame: Frame) -> bool:
        """Return whether ``frame`` shows signs of a challenge.

        Signs, checked in order: a non-empty prompt, any tile element, any
        ``canvas`` element, any verify/submit button.
        """
        prompt = await frame.evaluate(_QUESTION_JS)
        if isinstance(prompt, str) and prompt.strip():
            return True
        for selector in _CHALLENGE_TILE_SELECTORS:
            if await frame.query_selector_all(selector):
                return True
        if await frame.locator("canvas").count() > 0:
            return True
        for selector in _VERIFY_BUTTON_SELECTORS:
            if await frame.query_selector(selector) is not None:
                return True
        return False

    @staticmethod
    def _build_missing_frame_error(page: Page, frame_role: str) -> str:
        """Build an error message listing the URLs of all frames on ``page``."""
        frame_urls = [
            getattr(frame, "url", None) or "<empty>"
            for frame in page.frames
        ]
        return (
            f"Could not find hCaptcha {frame_role} frame; available frames={frame_urls}"
        )

    @staticmethod
    def _prepare_target_url(website_url: str, website_key: str) -> str:
        """Align the official demo URL with the requested sitekey.

        Only ``/demo`` on ``accounts.hcaptcha.com`` or ``demo.hcaptcha.com``
        is rewritten: the ``sitekey`` query parameter is set to
        ``website_key`` and ``hl=en`` is added when no ``hl`` is present, so
        the demo behaves according to the request parameters. Every other
        URL, and any call with an empty ``website_key``, is returned as is.
        """
        if not website_key:
            return website_url

        parsed = urlsplit(website_url)
        # ``hostname`` (already lower-cased) ignores port and userinfo, so
        # ``accounts.hcaptcha.com:443`` is recognised as well.
        host = parsed.hostname or ""
        path = parsed.path.rstrip("/")
        is_official_demo = host in {"accounts.hcaptcha.com", "demo.hcaptcha.com"} and path == "/demo"
        if not is_official_demo:
            return website_url

        query = parse_qs(parsed.query, keep_blank_values=True)
        changed = False
        current_sitekey = query.get("sitekey", [None])[0]
        if current_sitekey != website_key:
            query["sitekey"] = [website_key]
            changed = True
        if "hl" not in query:
            query["hl"] = ["en"]
            changed = True
        if not changed:
            return website_url

        return urlunsplit(
            (
                parsed.scheme,
                parsed.netloc,
                parsed.path,
                urlencode(query, doseq=True),
                parsed.fragment,
            )
        )

    async def _solve_image_selection_challenge(self, page: Page) -> bool:
        """Work through image-selection rounds until a token appears.

        Each round collects the challenge, asks the classifier which tiles
        to click, clicks them, submits, and waits briefly for a token. At
        most ``max(1, config.captcha_retries)`` rounds are played.

        Returns:
            ``True`` when a token was observed, ``False`` otherwise.

        Raises:
            RuntimeError: No classifier was injected, or the challenge could
                not be collected in the very first round.
        """
        if self._classifier is None:
            raise RuntimeError(
                "Classification fallback is unavailable because no ClassificationSolver was injected"
            )

        rounds = max(1, self._config.captcha_retries)
        for round_index in range(rounds):
            token = await self._wait_for_token(page, seconds=1)
            if token:
                return True

            challenge = await self._collect_selection_challenge(page)
            if challenge is None:
                unsupported_reason = await self._describe_unsupported_challenge(page)
                log.warning(
                    "Could not collect hCaptcha image-selection challenge in round %d: %s",
                    round_index + 1,
                    unsupported_reason,
                )
                # Only the first round reports the reason as an error; in a
                # later round the caller just sees "no token".
                if round_index == 0:
                    raise RuntimeError(unsupported_reason)
                return False

            log.info(
                "Collected hCaptcha image-selection challenge in round %d: question=%r tiles=%d examples=%d",
                round_index + 1,
                challenge["question"],
                len(challenge["tiles"]),
                len(challenge["examples"]),
            )
            payload = self._build_classification_payload(
                question=challenge["question"],
                tile_images=challenge["tile_images"],
                examples=challenge["examples"],
            )
            result = await self._classifier.solve(payload)
            log.info("Classification solver returned raw result: %s", result)
            indices = self._extract_selection_indices(
                result=result,
                tile_count=len(challenge["tiles"]),
            )
            await self._click_selected_tiles(challenge["tiles"], indices)
            await self._click_verify_button(challenge["frame"])

            token = await self._wait_for_token(page, seconds=6)
            if token:
                return True
            log.info(
                "hCaptcha challenge round %d submitted without immediate token, retrying",
                round_index + 1,
            )
        return False

    async def _collect_selection_challenge(self, page: Page) -> dict[str, Any] | None:
        """Gather prompt, tiles and screenshots of the current challenge.

        Returns:
            A dict with ``frame``, ``question``, ``tiles``, ``tile_images``
            (base64 PNGs, index-aligned with ``tiles``) and ``examples``; or
            ``None`` when the frame, the prompt, the tiles or every tile
            screenshot is missing. Tiles whose screenshot failed are dropped
            from both lists.
        """
        frame = await self._find_frame(page, "challenge", wait_seconds=10)
        if frame is None:
            return None
        await asyncio.sleep(1)

        question = await frame.evaluate(_QUESTION_JS)
        if not isinstance(question, str) or not question.strip():
            return None

        tiles = await self._find_clickable_tiles(frame)
        if not tiles:
            return None

        tile_entries: list[tuple[ElementHandle[Any], str]] = []
        for tile in tiles:
            encoded = await self._capture_element_base64(tile)
            if encoded:
                tile_entries.append((tile, encoded))
        if not tile_entries:
            return None

        return {
            "frame": frame,
            "question": question.strip(),
            "tiles": [tile for tile, _ in tile_entries],
            "tile_images": [encoded for _, encoded in tile_entries],
            "examples": await self._extract_example_images(frame),
        }

    async def _find_clickable_tiles(self, frame: Frame) -> list[ElementHandle[Any]]:
        """Return the elements of the first tile selector that matches anything."""
        for selector in _CHALLENGE_TILE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            if elements:
                return elements
        return []

    async def _extract_example_images(self, frame: Frame) -> list[str]:
        """Return base64 screenshots of the challenge's example images.

        Selectors are tried in order; the search stops at the first selector
        that yields at least one successful screenshot.
        """
        examples: list[str] = []
        for selector in _EXAMPLE_IMAGE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            for element in elements:
                encoded = await self._capture_element_base64(element)
                if encoded:
                    examples.append(encoded)
            if examples:
                break
        return examples

    async def _describe_unsupported_challenge(self, page: Page) -> str:
        """Explain why the current challenge could not be collected.

        The message distinguishes a canvas/puzzle challenge from an
        unrecognised grid layout, so a puzzle is not misreported as a DOM
        layout problem.
        """
        frame = await self._find_frame(page, "challenge", wait_seconds=2)
        if frame is None:
            return (
                "hCaptcha challenge iframe disappeared before the built-in fallback "
                "could inspect it"
            )

        prompt = await frame.evaluate(_QUESTION_JS)
        prompt_text = prompt.strip().lower() if isinstance(prompt, str) else ""
        has_canvas = await frame.locator("canvas").count() > 0
        submit_text = ""
        if await frame.locator(".button-submit").count() > 0:
            submit_text = await frame.locator(".button-submit").first.inner_text()

        if "puzzle piece" in prompt_text or (has_canvas and "skip" in submit_text.lower()):
            log.warning(
                "Detected unsupported hCaptcha canvas/puzzle challenge: prompt=%r submit=%r has_canvas=%s",
                prompt,
                submit_text,
                has_canvas,
            )
            return (
                "hCaptcha presented a canvas/puzzle challenge, which is not supported "
                "by the built-in HCaptchaClassification fallback"
            )

        log.warning(
            "Detected unsupported hCaptcha challenge layout: prompt=%r submit=%r has_canvas=%s",
            prompt,
            submit_text,
            has_canvas,
        )
        return (
            "hCaptcha image challenge detected, but the current DOM layout is not "
            "supported by the built-in classification fallback"
        )

    async def _capture_element_base64(self, element: ElementHandle[Any]) -> str | None:
        """Screenshot ``element`` as PNG and return it base64-encoded.

        Returns ``None`` when the screenshot fails; callers treat that as
        "skip this element".
        """
        try:
            image_bytes = await element.screenshot(type="png")
        except Exception:
            return None
        return base64.b64encode(image_bytes).decode("ascii")

    @staticmethod
    def _build_classification_payload(
        *, question: str, tile_images: list[str], examples: list[str]
    ) -> dict[str, Any]:
        """Build the ``HCaptchaClassification`` request for the classifier.

        ``examples`` is only included when it is non-empty.
        """
        payload: dict[str, Any] = {
            "type": "HCaptchaClassification",
            "question": question,
            "images": tile_images,
        }
        if examples:
            payload["examples"] = examples
        return payload

    @staticmethod
    def _coerce_tile_indices(raw: Any) -> list[int]:
        """Turn a raw classifier list into candidate tile indices.

        A non-empty list made up solely of booleans is read as a per-tile
        mask (positions holding ``True`` are selected) instead of coercing
        ``True``/``False`` to the indices 1/0. Otherwise integers and floats
        are truncated to ``int``; booleans, non-numbers and non-finite
        floats are skipped. Anything that is not a list yields ``[]``.
        """
        if not isinstance(raw, list):
            return []
        if raw and all(isinstance(item, bool) for item in raw):
            return [position for position, flag in enumerate(raw) if flag]

        indices: list[int] = []
        for item in raw:
            if isinstance(item, bool) or not isinstance(item, (int, float)):
                continue
            try:
                indices.append(int(item))
            except (ValueError, OverflowError):
                # ``int(nan)`` / ``int(inf)``: not a usable index.
                continue
        return indices

    @staticmethod
    def _extract_selection_indices(
        *, result: dict[str, Any], tile_count: int
    ) -> list[int]:
        """Extract the tile indices to click from a classifier result.

        ``result["answer"]`` is used when it is a bool or a list; otherwise
        ``result["objects"]`` is consulted. A bare ``True`` selects tile 0
        only when there is exactly one tile.

        Returns:
            In-range indices (``0 <= idx < tile_count``) in first-seen order
            without duplicates; ``[]`` when nothing usable was returned.
        """
        if not isinstance(result, dict):
            return []

        raw_answer = result.get("answer")
        if isinstance(raw_answer, bool):
            candidates = [0] if raw_answer and tile_count == 1 else []
        elif isinstance(raw_answer, list):
            candidates = HCaptchaSolver._coerce_tile_indices(raw_answer)
        else:
            candidates = HCaptchaSolver._coerce_tile_indices(result.get("objects"))

        selected: list[int] = []
        for idx in candidates:
            if 0 <= idx < tile_count and idx not in selected:
                selected.append(idx)
        return selected

    async def _click_selected_tiles(
        self,
        tiles: list[ElementHandle[Any]],
        indices: list[int],
    ) -> None:
        """Click ``tiles[idx]`` for every index, pausing 0.2 s after each click."""
        for idx in indices:
            await tiles[idx].click(timeout=10_000)
            await asyncio.sleep(0.2)
        log.info("Clicked %d hCaptcha tile(s): %s", len(indices), indices)

    async def _click_verify_button(self, frame: Frame) -> None:
        """Click the first verify/submit button found in ``frame``.

        Raises:
            RuntimeError: None of the known button selectors matched.
        """
        for selector in _VERIFY_BUTTON_SELECTORS:
            button = await frame.query_selector(selector)
            if button is None:
                continue
            await button.click(timeout=10_000)
            await asyncio.sleep(1)
            log.info("Submitted hCaptcha challenge with selector %s", selector)
            return
        raise RuntimeError("Could not find hCaptcha verify/submit button")