dragg2 committed on
Commit
f0f845d
·
verified ·
1 Parent(s): 30c93e8

Delete hcaptcha.py

Browse files
Files changed (1) hide show
  1. hcaptcha.py +0 -577
hcaptcha.py DELETED
@@ -1,577 +0,0 @@
1
- """HCaptcha solver using Playwright browser automation.
2
-
3
- Supports ``HCaptchaTaskProxyless`` task type.
4
-
5
- Strategy:
6
- 1. Visit the target page with a realistic browser context.
7
- 2. Click the hCaptcha checkbox.
8
- 3. If a token is issued immediately, return it.
9
- 4. If an image-selection challenge appears, extract the prompt + tile images,
10
- call ``ClassificationSolver`` for ``HCaptchaClassification``-style
11
- reasoning, click the matching tiles, submit the challenge, and continue
12
- polling for the token.
13
- """
14
-
15
- from __future__ import annotations
16
-
17
- import asyncio
18
- import base64
19
- import logging
20
- from typing import Any
21
- from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
22
-
23
- from playwright.async_api import Browser, ElementHandle, Frame, Page, Playwright, async_playwright
24
-
25
- from ..core.config import Config
26
- from .classification import ClassificationSolver
27
-
28
# Module-level logger used by every method of HCaptchaSolver.
log = logging.getLogger(__name__)

# Init script installed on each new page by ``_solve_once`` (via
# ``page.add_init_script``). It overrides ``navigator.webdriver``,
# ``navigator.languages`` and ``navigator.plugins`` and defines a stub
# ``window.chrome`` object.
_STEALTH_JS = """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
Object.defineProperty(navigator, 'languages', {get: () => ['en-US', 'en']});
Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
window.chrome = {runtime: {}, loadTimes: () => {}, csi: () => {}};
"""

# Evaluated in the main page by ``_wait_for_token``. Returns the value of the
# ``h-captcha-response`` (or ``g-recaptcha-response``) field when it is longer
# than 20 characters, otherwise the result of ``window.hcaptcha.getResponse()``
# under the same length check, otherwise ``null``.
_EXTRACT_HCAPTCHA_TOKEN_JS = """
() => {
const textarea = document.querySelector('[name="h-captcha-response"]')
|| document.querySelector('[name="g-recaptcha-response"]');
if (textarea && textarea.value && textarea.value.length > 20) {
return textarea.value;
}
if (window.hcaptcha && typeof window.hcaptcha.getResponse === 'function') {
const resp = window.hcaptcha.getResponse();
if (resp && resp.length > 20) return resp;
}
return null;
}
"""

# Evaluated inside a candidate challenge frame. Returns the trimmed text of
# the first element matching one of the prompt selectors, or ``null``.
_QUESTION_JS = """
() => {
const prompt = document.querySelector('.prompt-text')
|| document.querySelector('h2.prompt-text')
|| document.querySelector('.challenge-prompt')
|| document.querySelector('[class*="prompt"]');
return prompt?.textContent?.trim() || null;
}
"""

# CSS selectors for the clickable challenge tiles. They are tried in order
# and the first selector that matches at least one element is used.
_CHALLENGE_TILE_SELECTORS = (
    ".task-grid .task-image",
    ".task-grid .task",
    ".task-grid .image",
    ".challenge-container .task-image",
    ".challenge-view .task-image",
    ".task-image",
    ".task",
)

# CSS selectors for the example image(s) shown with a challenge. Tried in
# order; the first selector that yields a successful screenshot wins.
_EXAMPLE_IMAGE_SELECTORS = (
    ".challenge-example .image",
    ".challenge-example",
    ".example-wrapper .image",
)

# CSS selectors for the challenge submit button, tried in order.
_VERIFY_BUTTON_SELECTORS = (
    ".button-submit",
    'button[type="submit"]',
    'button[aria-label*="Verify"]',
)

# CSS selectors for the checkbox element inside the checkbox frame, tried in
# order.
_CHECKBOX_SELECTORS = (
    "#checkbox",
    '[id="checkbox"]',
    'div[role="checkbox"]',
    'input[type="checkbox"]',
    '.checkbox',
    '[aria-checked]',
)

# Substrings looked for in the lower-cased URL of a frame when searching for
# the challenge frame. ``frame=challenge`` is treated as an exact match by
# ``_find_frame``; the others only count as hints.
_CHALLENGE_FRAME_HINTS = (
    "frame=challenge",
    "challenge",
    "hcaptcha-inner",
)

# Substrings looked for in the lower-cased URL of a frame when searching for
# the checkbox frame. ``frame=checkbox`` is treated as an exact match by
# ``_find_frame``; the others only count as hints.
_CHECKBOX_FRAME_HINTS = (
    "frame=checkbox",
    "checkbox",
    "hcaptcha-invisible",
    "hcaptcha-checkbox",
)
105
-
106
-
107
class HCaptchaSolver:
    """Solves ``HCaptchaTaskProxyless`` tasks via Playwright.

    Lifecycle: call :meth:`start` once (a Chromium instance is launched unless
    a ``browser`` was injected), call :meth:`solve` any number of times, then
    call :meth:`stop`. A solver that launched its own browser can be started
    again after it has been stopped.

    The configuration object is read for ``browser_headless``,
    ``browser_timeout``, ``captcha_retries`` and ``captcha_timeout``.
    """

    def __init__(
        self,
        config: Config,
        browser: Browser | None = None,
        classifier: ClassificationSolver | None = None,
    ) -> None:
        """Store collaborators; no browser is launched here.

        Args:
            config: Runtime configuration (see the class docstring for the
                attributes that are read).
            browser: Optional already-running browser. When given, this
                solver never closes it.
            classifier: Optional image classifier used when hCaptcha shows an
                image-selection challenge. Without it, the fallback raises.
        """
        self._config = config
        self._playwright: Playwright | None = None
        self._browser: Browser | None = browser
        # Only a browser launched by start() is closed by stop().
        self._owns_browser = browser is None
        self._classifier = classifier

    async def start(self) -> None:
        """Launch Chromium unless a browser is already available."""
        if self._browser is not None:
            return
        self._playwright = await async_playwright().start()
        try:
            self._browser = await self._playwright.chromium.launch(
                headless=self._config.browser_headless,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--no-sandbox",
                    "--disable-dev-shm-usage",
                    "--disable-gpu",
                ],
            )
        except BaseException:
            # Do not leave the Playwright driver running when the launch
            # fails (or is cancelled); otherwise nothing would ever stop it.
            playwright, self._playwright = self._playwright, None
            await playwright.stop()
            raise
        log.info("HCaptchaSolver browser started")

    async def stop(self) -> None:
        """Close the browser and Playwright driver if this solver owns them.

        The handles are cleared first so that a later :meth:`start` launches
        a fresh browser instead of reusing a closed one, and Playwright is
        stopped even when closing the browser raises.
        """
        if self._owns_browser:
            browser, playwright = self._browser, self._playwright
            self._browser = None
            self._playwright = None
            try:
                if browser is not None:
                    await browser.close()
            finally:
                if playwright is not None:
                    await playwright.stop()
        log.info("HCaptchaSolver stopped")

    async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
        """Solve one task, retrying up to ``config.captcha_retries`` times.

        Args:
            params: Task parameters; ``websiteURL`` and ``websiteKey`` are
                required.

        Returns:
            ``{"gRecaptchaResponse": <token>}``.

        Raises:
            KeyError: If a required parameter is missing.
            RuntimeError: If every attempt failed; the last failure is
                attached as ``__cause__``.
        """
        website_url = params["websiteURL"]
        website_key = params["websiteKey"]
        retries = self._config.captcha_retries

        last_error: Exception | None = None
        for attempt in range(retries):
            try:
                token = await self._solve_once(website_url, website_key)
            except Exception as exc:  # retry boundary: any failure triggers a retry
                last_error = exc
                log.warning(
                    "HCaptcha attempt %d/%d failed: %s",
                    attempt + 1,
                    retries,
                    exc,
                )
                # No point sleeping after the final attempt.
                if attempt < retries - 1:
                    await asyncio.sleep(2)
            else:
                return {"gRecaptchaResponse": token}

        raise RuntimeError(
            f"HCaptcha failed after {retries} attempts: {last_error}"
        ) from last_error

    async def _solve_once(self, website_url: str, website_key: str) -> str:
        """Run a single solve attempt in a fresh browser context.

        Raises:
            RuntimeError: If the browser was not started, a required frame or
                element cannot be found, or no valid token is obtained.
        """
        if self._browser is None:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise RuntimeError("HCaptchaSolver browser is not started; call start() first")

        target_url = self._prepare_target_url(website_url, website_key)
        if target_url != website_url:
            log.info("Normalized hCaptcha target URL to honor requested sitekey: %s", target_url)

        context = await self._browser.new_context(
            user_agent=(
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/131.0.0.0 Safari/537.36"
            ),
            viewport={"width": 1920, "height": 1080},
            locale="en-US",
        )

        # Everything after the context exists runs under try/finally so the
        # context is closed even if page creation itself fails.
        try:
            page = await context.new_page()
            await page.add_init_script(_STEALTH_JS)

            timeout_ms = self._config.browser_timeout * 1000
            await page.goto(target_url, wait_until="networkidle", timeout=timeout_ms)
            await page.mouse.move(400, 300)
            await asyncio.sleep(1)

            await self._click_checkbox(page)

            # Give low-risk sessions a chance to receive a token directly.
            token = await self._wait_for_token(page, seconds=4)
            if token:
                log.info("Got hCaptcha token directly after checkbox click (len=%d)", len(token))
                return token

            # Common path in headless environments: an image challenge
            # appears, so fall back to the classification solver.
            log.info(
                "No direct hCaptcha token after checkbox click, entering classification fallback"
            )
            fallback_handled = await self._solve_image_selection_challenge(page)
            if fallback_handled:
                token = await self._wait_for_token(page)

            if not isinstance(token, str) or len(token) < 20:
                raise RuntimeError(f"Invalid hCaptcha token: {token!r}")

            log.info("Got hCaptcha token (len=%d)", len(token))
            return token
        finally:
            await context.close()

    async def _click_checkbox(self, page: Page) -> None:
        """Locate the checkbox frame and click the checkbox inside it.

        Raises:
            RuntimeError: If the frame or the checkbox element is not found.
        """
        frame = await self._find_frame(page, "checkbox", wait_seconds=10)
        if frame is None:
            raise RuntimeError(self._build_missing_frame_error(page, "checkbox"))

        checkbox = await self._find_checkbox_element(frame)
        if checkbox is None:
            raise RuntimeError(
                "Could not find hCaptcha checkbox element inside frame "
                f"{getattr(frame, 'url', None) or '<empty>'}"
            )

        await checkbox.click(timeout=10_000)
        log.info("Clicked hCaptcha checkbox")

    async def _wait_for_token(self, page: Page, *, seconds: int | None = None) -> str | None:
        """Poll the page once per second until a token appears.

        Args:
            page: Page to evaluate the token-extraction script in.
            seconds: Number of polls; ``None`` means ``config.captcha_timeout``.
                Values below 1 are raised to 1 so the page is always checked
                at least once.

        Returns:
            The token, or ``None`` if none appeared in time.
        """
        budget = self._config.captcha_timeout if seconds is None else seconds
        for _ in range(max(1, budget)):
            token = await page.evaluate(_EXTRACT_HCAPTCHA_TOKEN_JS)
            if isinstance(token, str) and len(token) > 20:
                return token
            await asyncio.sleep(1)
        return None

    async def _find_frame(
        self, page: Page, frame_role: str, *, wait_seconds: int = 5
    ) -> Frame | None:
        """Find the hCaptcha frame playing ``frame_role``, polling every 0.5 s.

        Within one poll, candidates are ranked: a URL containing
        ``frame=<role>`` beats a frame whose DOM looks right, which beats a
        frame whose URL merely contains a weaker hint.

        Args:
            page: Page whose frames are inspected (the main frame is skipped).
            frame_role: ``"checkbox"`` or ``"challenge"``; any other value
                never matches.
            wait_seconds: Approximate time to keep polling.

        Returns:
            The best matching frame, or ``None`` when the wait elapsed.
        """
        url_hints = {
            "checkbox": _CHECKBOX_FRAME_HINTS,
            "challenge": _CHALLENGE_FRAME_HINTS,
        }.get(frame_role, ())
        exact_marker = f"frame={frame_role}"

        attempts = max(1, wait_seconds * 2)
        main_frame = getattr(page, "main_frame", None)
        for _ in range(attempts):
            exact_match: Frame | None = None
            hinted_match: Frame | None = None
            dom_match: Frame | None = None
            for frame in page.frames:
                if main_frame is not None and frame is main_frame:
                    continue
                url = (frame.url or "").lower()
                if not self._is_hcaptcha_related_frame(url):
                    continue

                # ``x = x or frame`` keeps the first candidate of each kind.
                if await self._frame_matches_dom(frame, frame_role):
                    dom_match = dom_match or frame
                if any(hint in url for hint in url_hints):
                    if exact_marker in url:
                        exact_match = exact_match or frame
                    else:
                        hinted_match = hinted_match or frame

            best = exact_match or dom_match or hinted_match
            if best is not None:
                return best
            await asyncio.sleep(0.5)
        return None

    async def _frame_matches_dom(self, frame: Frame, frame_role: str) -> bool:
        """Return whether the frame's DOM looks like the requested role."""
        if frame_role == "checkbox":
            return await self._find_checkbox_element(frame) is not None
        if frame_role == "challenge":
            return await self._is_challenge_frame(frame)
        return False

    @staticmethod
    def _is_hcaptcha_related_frame(url: str) -> bool:
        """Return whether a (lower-cased) frame URL belongs to hCaptcha.

        A plain substring test is enough: it already covers
        ``newassets.hcaptcha.com``, ``api.hcaptcha.com`` and
        ``js.stripe.com/v3/hcaptcha``.
        """
        return "hcaptcha" in url

    async def _find_checkbox_element(self, frame: Frame) -> ElementHandle[Any] | None:
        """Return the first element matching a checkbox selector, if any."""
        for selector in _CHECKBOX_SELECTORS:
            element = await frame.query_selector(selector)
            if element is not None:
                return element
        return None

    async def _is_challenge_frame(self, frame: Frame) -> bool:
        """Return whether the frame shows a prompt, tiles, a canvas or a submit button."""
        prompt = await frame.evaluate(_QUESTION_JS)
        if isinstance(prompt, str) and prompt.strip():
            return True

        for selector in _CHALLENGE_TILE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            if elements:
                return True

        if await frame.locator("canvas").count() > 0:
            return True

        for selector in _VERIFY_BUTTON_SELECTORS:
            if await frame.query_selector(selector) is not None:
                return True

        return False

    @staticmethod
    def _build_missing_frame_error(page: Page, frame_role: str) -> str:
        """Build an error message listing the URLs of all frames on the page."""
        frame_urls = [
            getattr(frame, "url", None) or "<empty>"
            for frame in page.frames
        ]
        return (
            f"Could not find hCaptcha {frame_role} frame; available frames={frame_urls}"
        )

    @staticmethod
    def _prepare_target_url(website_url: str, website_key: str) -> str:
        """Align the official demo URL with the requested sitekey.

        Only ``/demo`` on ``accounts.hcaptcha.com`` or ``demo.hcaptcha.com``
        is rewritten: ``sitekey`` is set to ``website_key`` and ``hl=en`` is
        added when no ``hl`` is present, so the run exercises the behaviour
        for the requested parameters. Every other URL is returned unchanged.

        Args:
            website_url: URL supplied by the task.
            website_key: Requested sitekey; when empty nothing is rewritten.

        Returns:
            The URL to navigate to.
        """
        if not website_key:
            return website_url

        parsed = urlsplit(website_url)
        # ``hostname`` (unlike ``netloc``) excludes port and credentials, so
        # ``accounts.hcaptcha.com:443`` is still recognised as the demo host.
        host = (parsed.hostname or "").lower()
        path = parsed.path.rstrip("/")
        is_official_demo = host in {"accounts.hcaptcha.com", "demo.hcaptcha.com"} and path == "/demo"
        if not is_official_demo:
            return website_url

        query = parse_qs(parsed.query, keep_blank_values=True)
        changed = False

        current_sitekey = query.get("sitekey", [None])[0]
        if current_sitekey != website_key:
            query["sitekey"] = [website_key]
            changed = True

        if "hl" not in query:
            query["hl"] = ["en"]
            changed = True

        if not changed:
            return website_url

        return urlunsplit(
            (
                parsed.scheme,
                parsed.netloc,
                parsed.path,
                urlencode(query, doseq=True),
                parsed.fragment,
            )
        )

    async def _solve_image_selection_challenge(self, page: Page) -> bool:
        """Answer image-selection challenges with the injected classifier.

        Runs up to ``max(1, config.captcha_retries)`` rounds; each round
        collects the challenge, asks the classifier, clicks the chosen tiles
        and submits.

        Returns:
            ``True`` once a token is visible on the page, ``False`` if the
            rounds were used up or a later round could not be collected.

        Raises:
            RuntimeError: If no classifier was injected, or the very first
                challenge cannot be collected (unsupported layout).
        """
        if self._classifier is None:
            raise RuntimeError(
                "Classification fallback is unavailable because no ClassificationSolver was injected"
            )

        rounds = max(1, self._config.captcha_retries)
        for round_index in range(rounds):
            token = await self._wait_for_token(page, seconds=1)
            if token:
                return True

            challenge = await self._collect_selection_challenge(page)
            if challenge is None:
                unsupported_reason = await self._describe_unsupported_challenge(page)
                log.warning(
                    "Could not collect hCaptcha image-selection challenge in round %d: %s",
                    round_index + 1,
                    unsupported_reason,
                )
                # Failing in the first round means the challenge type itself
                # is unsupported; later rounds just give up quietly.
                if round_index == 0:
                    raise RuntimeError(unsupported_reason)
                return False

            log.info(
                "Collected hCaptcha image-selection challenge in round %d: question=%r tiles=%d examples=%d",
                round_index + 1,
                challenge["question"],
                len(challenge["tiles"]),
                len(challenge["examples"]),
            )
            payload = self._build_classification_payload(
                question=challenge["question"],
                tile_images=challenge["tile_images"],
                examples=challenge["examples"],
            )
            result = await self._classifier.solve(payload)
            log.info("Classification solver returned raw result: %s", result)
            indices = self._extract_selection_indices(
                result=result,
                tile_count=len(challenge["tiles"]),
            )

            await self._click_selected_tiles(challenge["tiles"], indices)
            await self._click_verify_button(challenge["frame"])

            token = await self._wait_for_token(page, seconds=6)
            if token:
                return True

            log.info(
                "hCaptcha challenge round %d submitted without immediate token, retrying",
                round_index + 1,
            )

        return False

    async def _collect_selection_challenge(self, page: Page) -> dict[str, Any] | None:
        """Gather the prompt, tile handles and screenshots of the current challenge.

        Returns:
            A dict with keys ``frame``, ``question``, ``tiles``,
            ``tile_images`` (base64 PNGs, parallel to ``tiles``) and
            ``examples``; or ``None`` when the frame, prompt or tiles are
            missing. Tiles whose screenshot failed are left out of both lists.
        """
        frame = await self._find_frame(page, "challenge", wait_seconds=10)
        if frame is None:
            return None

        await asyncio.sleep(1)
        question = await frame.evaluate(_QUESTION_JS)
        if not isinstance(question, str) or not question.strip():
            return None

        tiles = await self._find_clickable_tiles(frame)
        if not tiles:
            return None

        tile_entries: list[tuple[ElementHandle[Any], str]] = []
        for tile in tiles:
            encoded = await self._capture_element_base64(tile)
            if encoded:
                tile_entries.append((tile, encoded))

        if not tile_entries:
            return None

        return {
            "frame": frame,
            "question": question.strip(),
            "tiles": [tile for tile, _ in tile_entries],
            "tile_images": [encoded for _, encoded in tile_entries],
            "examples": await self._extract_example_images(frame),
        }

    async def _find_clickable_tiles(self, frame: Frame) -> list[ElementHandle[Any]]:
        """Return the elements of the first tile selector that matches anything."""
        for selector in _CHALLENGE_TILE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            if elements:
                return elements
        return []

    async def _extract_example_images(self, frame: Frame) -> list[str]:
        """Return base64 screenshots from the first example selector that yields any."""
        examples: list[str] = []
        for selector in _EXAMPLE_IMAGE_SELECTORS:
            elements = await frame.query_selector_all(selector)
            if not elements:
                continue
            for element in elements:
                encoded = await self._capture_element_base64(element)
                if encoded:
                    examples.append(encoded)
            if examples:
                break
        return examples

    async def _describe_unsupported_challenge(self, page: Page) -> str:
        """Explain why the current challenge could not be collected.

        The message reflects the actual challenge type so that canvas/puzzle
        challenges are not misreported as a grid DOM problem.
        """
        frame = await self._find_frame(page, "challenge", wait_seconds=2)
        if frame is None:
            return (
                "hCaptcha challenge iframe disappeared before the built-in fallback "
                "could inspect it"
            )

        prompt = await frame.evaluate(_QUESTION_JS)
        prompt_text = prompt.strip().lower() if isinstance(prompt, str) else ""
        has_canvas = await frame.locator("canvas").count() > 0
        submit_button = frame.locator(".button-submit")
        submit_text = (
            await submit_button.first.inner_text()
            if await submit_button.count() > 0
            else ""
        )

        if "puzzle piece" in prompt_text or (has_canvas and "skip" in submit_text.lower()):
            log.warning(
                "Detected unsupported hCaptcha canvas/puzzle challenge: prompt=%r submit=%r has_canvas=%s",
                prompt,
                submit_text,
                has_canvas,
            )
            return (
                "hCaptcha presented a canvas/puzzle challenge, which is not supported "
                "by the built-in HCaptchaClassification fallback"
            )

        log.warning(
            "Detected unsupported hCaptcha challenge layout: prompt=%r submit=%r has_canvas=%s",
            prompt,
            submit_text,
            has_canvas,
        )
        return (
            "hCaptcha image challenge detected, but the current DOM layout is not "
            "supported by the built-in classification fallback"
        )

    async def _capture_element_base64(self, element: ElementHandle[Any]) -> str | None:
        """Screenshot an element as PNG and return it base64-encoded.

        Best effort: returns ``None`` when the screenshot fails; the failure
        is logged at debug level instead of being dropped silently.
        """
        try:
            image_bytes = await element.screenshot(type="png")
        except Exception as exc:
            log.debug("Could not screenshot hCaptcha element: %s", exc)
            return None
        return base64.b64encode(image_bytes).decode("ascii")

    @staticmethod
    def _build_classification_payload(
        *, question: str, tile_images: list[str], examples: list[str]
    ) -> dict[str, Any]:
        """Build the classifier request; ``examples`` is included only when non-empty."""
        payload: dict[str, Any] = {
            "type": "HCaptchaClassification",
            "question": question,
            "images": tile_images,
        }
        if examples:
            payload["examples"] = examples
        return payload

    @staticmethod
    def _coerce_indices(values: list[Any]) -> list[int]:
        """Turn a raw classifier list into candidate tile indices.

        A non-empty list consisting only of booleans is read as a per-tile
        mask (position ``i`` selected when ``values[i]`` is true). Otherwise
        integers and integral floats are taken as indices; booleans,
        fractional or non-finite floats and any other values are ignored.
        """
        if values and all(isinstance(value, bool) for value in values):
            return [position for position, flag in enumerate(values) if flag]

        picked: list[int] = []
        for value in values:
            # ``bool`` is a subclass of ``int``; never treat it as an index.
            if isinstance(value, bool):
                continue
            if isinstance(value, int):
                picked.append(value)
            elif isinstance(value, float) and value.is_integer():
                # is_integer() is False for nan/inf, which int() would reject.
                picked.append(int(value))
        return picked

    @staticmethod
    def _extract_selection_indices(
        *, result: dict[str, Any], tile_count: int
    ) -> list[int]:
        """Extract the tile indices to click from a classifier result.

        ``result["answer"]`` is used when it is a bool or a list; otherwise
        ``result["objects"]`` is used when it is a list. A scalar ``True``
        selects tile 0 only when there is exactly one tile.

        Returns:
            Unique indices within ``range(tile_count)``, in first-seen order.
        """
        raw_answer = result.get("answer")
        if isinstance(raw_answer, bool):
            indices = [0] if raw_answer and tile_count == 1 else []
        elif isinstance(raw_answer, list):
            indices = HCaptchaSolver._coerce_indices(raw_answer)
        else:
            raw_objects = result.get("objects")
            if isinstance(raw_objects, list):
                indices = HCaptchaSolver._coerce_indices(raw_objects)
            else:
                indices = []

        deduped: list[int] = []
        for idx in indices:
            if 0 <= idx < tile_count and idx not in deduped:
                deduped.append(idx)
        return deduped

    async def _click_selected_tiles(
        self,
        tiles: list[ElementHandle[Any]],
        indices: list[int],
    ) -> None:
        """Click the tiles at ``indices``, pausing briefly between clicks."""
        for idx in indices:
            await tiles[idx].click(timeout=10_000)
            await asyncio.sleep(0.2)
        log.info("Clicked %d hCaptcha tile(s): %s", len(indices), indices)

    async def _click_verify_button(self, frame: Frame) -> None:
        """Click the first submit button found in the challenge frame.

        Raises:
            RuntimeError: If none of the known button selectors matches.
        """
        for selector in _VERIFY_BUTTON_SELECTORS:
            button = await frame.query_selector(selector)
            if button is None:
                continue
            await button.click(timeout=10_000)
            await asyncio.sleep(1)
            log.info("Submitted hCaptcha challenge with selector %s", selector)
            return
        raise RuntimeError("Could not find hCaptcha verify/submit button")