dragg2 committed on
Commit
fed6868
·
verified ·
1 Parent(s): f0f845d

Upload hcaptcha.py

Browse files
Files changed (1) hide show
  1. src/services/hcaptcha.py +187 -12
src/services/hcaptcha.py CHANGED
@@ -49,6 +49,36 @@ _EXTRACT_HCAPTCHA_TOKEN_JS = """
49
  }
50
  """
51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
  _QUESTION_JS = """
53
  () => {
54
  const prompt = document.querySelector('.prompt-text')
@@ -81,6 +111,28 @@ _VERIFY_BUTTON_SELECTORS = (
81
  'button[aria-label*="Verify"]',
82
  )
83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
 
85
  class HCaptchaSolver:
86
  """Solves ``HCaptchaTaskProxyless`` tasks via Playwright."""
@@ -123,12 +175,19 @@ class HCaptchaSolver:
123
  async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
124
  website_url = params["websiteURL"]
125
  website_key = params["websiteKey"]
 
 
 
 
126
 
127
  last_error: Exception | None = None
128
  for attempt in range(self._config.captcha_retries):
129
  try:
130
- token = await self._solve_once(website_url, website_key)
131
- return {"gRecaptchaResponse": token}
 
 
 
132
  except Exception as exc:
133
  last_error = exc
134
  log.warning(
@@ -144,7 +203,7 @@ class HCaptchaSolver:
144
  f"HCaptcha failed after {self._config.captcha_retries} attempts: {last_error}"
145
  )
146
 
147
- async def _solve_once(self, website_url: str, website_key: str) -> str:
148
  assert self._browser is not None
149
  target_url = self._prepare_target_url(website_url, website_key)
150
  if target_url != website_url:
@@ -173,8 +232,9 @@ class HCaptchaSolver:
173
  # 先给低风险会话一个直接出 token 的机会。
174
  token = await self._wait_for_token(page, seconds=4)
175
  if token:
 
176
  log.info("Got hCaptcha token directly after checkbox click (len=%d)", len(token))
177
- return token
178
 
179
  # 无头环境常见路径:进入图片 challenge,然后走 classification fallback。
180
  log.info(
@@ -183,23 +243,27 @@ class HCaptchaSolver:
183
  fallback_handled = await self._solve_image_selection_challenge(page)
184
  if fallback_handled:
185
  token = await self._wait_for_token(page)
 
186
 
187
  if not isinstance(token, str) or len(token) < 20:
188
  raise RuntimeError(f"Invalid hCaptcha token: {token!r}")
189
 
190
  log.info("Got hCaptcha token (len=%d)", len(token))
191
- return token
192
  finally:
193
  await context.close()
194
 
195
  async def _click_checkbox(self, page: Page) -> None:
196
  frame = await self._find_frame(page, "checkbox", wait_seconds=10)
197
  if frame is None:
198
- raise RuntimeError("Could not find hCaptcha checkbox frame")
199
 
200
- checkbox = await frame.query_selector("#checkbox")
201
  if checkbox is None:
202
- raise RuntimeError("Could not find hCaptcha checkbox element")
 
 
 
203
 
204
  await checkbox.click(timeout=10_000)
205
  log.info("Clicked hCaptcha checkbox")
@@ -207,24 +271,135 @@ class HCaptchaSolver:
207
  async def _wait_for_token(self, page: Page, *, seconds: int | None = None) -> str | None:
208
  remaining = max(1, seconds or self._config.captcha_timeout)
209
  for _ in range(remaining):
210
- token = await page.evaluate(_EXTRACT_HCAPTCHA_TOKEN_JS)
211
  if isinstance(token, str) and len(token) > 20:
212
  return token
213
  await asyncio.sleep(1)
214
  return None
215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
  async def _find_frame(
217
  self, page: Page, frame_role: str, *, wait_seconds: int = 5
218
  ) -> Frame | None:
219
  attempts = max(1, wait_seconds * 2)
 
220
  for _ in range(attempts):
 
 
 
221
  for frame in page.frames:
222
- url = frame.url or ""
223
- if "hcaptcha" in url and f"frame={frame_role}" in url:
224
- return frame
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
  await asyncio.sleep(0.5)
226
  return None
227
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
228
  @staticmethod
229
  def _prepare_target_url(website_url: str, website_key: str) -> str:
230
  """为官方 demo 自动补齐/对齐 sitekey,确保按请求参数测试真实行为。"""
 
49
  }
50
  """
51
 
52
+ _EXTRACT_HCAPTCHA_META_JS = """
53
+ () => {
54
+ const result = { token: null, respKey: null };
55
+ const textarea = document.querySelector('[name="h-captcha-response"]')
56
+ || document.querySelector('[name="g-recaptcha-response"]');
57
+ if (textarea && textarea.value && textarea.value.length > 20) {
58
+ result.token = textarea.value;
59
+ }
60
+ try {
61
+ if (window.hcaptcha) {
62
+ if (!result.token && typeof window.hcaptcha.getResponse === 'function') {
63
+ const response = window.hcaptcha.getResponse();
64
+ if (response && response.length > 20) {
65
+ result.token = response;
66
+ }
67
+ }
68
+ if (typeof window.hcaptcha.getRespKey === 'function') {
69
+ const respKey = window.hcaptcha.getRespKey();
70
+ if (respKey) {
71
+ result.respKey = String(respKey);
72
+ }
73
+ }
74
+ }
75
+ } catch (err) {
76
+ return result;
77
+ }
78
+ return result;
79
+ }
80
+ """
81
+
82
  _QUESTION_JS = """
83
  () => {
84
  const prompt = document.querySelector('.prompt-text')
 
111
  'button[aria-label*="Verify"]',
112
  )
113
 
114
+ _CHECKBOX_SELECTORS = (
115
+ "#checkbox",
116
+ '[id="checkbox"]',
117
+ 'div[role="checkbox"]',
118
+ 'input[type="checkbox"]',
119
+ '.checkbox',
120
+ '[aria-checked]',
121
+ )
122
+
123
+ _CHALLENGE_FRAME_HINTS = (
124
+ "frame=challenge",
125
+ "challenge",
126
+ "hcaptcha-inner",
127
+ )
128
+
129
+ _CHECKBOX_FRAME_HINTS = (
130
+ "frame=checkbox",
131
+ "checkbox",
132
+ "hcaptcha-invisible",
133
+ "hcaptcha-checkbox",
134
+ )
135
+
136
 
137
  class HCaptchaSolver:
138
  """Solves ``HCaptchaTaskProxyless`` tasks via Playwright."""
 
175
  async def solve(self, params: dict[str, Any]) -> dict[str, Any]:
176
  website_url = params["websiteURL"]
177
  website_key = params["websiteKey"]
178
+ enterprise_payload = params.get("enterprisePayload") or {}
179
+ rqdata = str(enterprise_payload.get("rqdata") or "").strip()
180
+ if rqdata:
181
+ log.info("Received hCaptcha enterprisePayload.rqdata (len=%d)", len(rqdata))
182
 
183
  last_error: Exception | None = None
184
  for attempt in range(self._config.captcha_retries):
185
  try:
186
+ token, resp_key = await self._solve_once(website_url, website_key)
187
+ solution = {"gRecaptchaResponse": token}
188
+ if resp_key:
189
+ solution["respKey"] = resp_key
190
+ return solution
191
  except Exception as exc:
192
  last_error = exc
193
  log.warning(
 
203
  f"HCaptcha failed after {self._config.captcha_retries} attempts: {last_error}"
204
  )
205
 
206
+ async def _solve_once(self, website_url: str, website_key: str) -> tuple[str, str]:
207
  assert self._browser is not None
208
  target_url = self._prepare_target_url(website_url, website_key)
209
  if target_url != website_url:
 
232
  # 先给低风险会话一个直接出 token 的机会。
233
  token = await self._wait_for_token(page, seconds=4)
234
  if token:
235
+ resp_key = await self._wait_for_resp_key(page, seconds=2)
236
  log.info("Got hCaptcha token directly after checkbox click (len=%d)", len(token))
237
+ return token, resp_key
238
 
239
  # 无头环境常见路径:进入图片 challenge,然后走 classification fallback。
240
  log.info(
 
243
  fallback_handled = await self._solve_image_selection_challenge(page)
244
  if fallback_handled:
245
  token = await self._wait_for_token(page)
246
+ resp_key = await self._wait_for_resp_key(page, seconds=2)
247
 
248
  if not isinstance(token, str) or len(token) < 20:
249
  raise RuntimeError(f"Invalid hCaptcha token: {token!r}")
250
 
251
  log.info("Got hCaptcha token (len=%d)", len(token))
252
+ return token, resp_key
253
  finally:
254
  await context.close()
255
 
256
  async def _click_checkbox(self, page: Page) -> None:
257
  frame = await self._find_frame(page, "checkbox", wait_seconds=10)
258
  if frame is None:
259
+ raise RuntimeError(self._build_missing_frame_error(page, "checkbox"))
260
 
261
+ checkbox = await self._find_checkbox_element(frame)
262
  if checkbox is None:
263
+ raise RuntimeError(
264
+ "Could not find hCaptcha checkbox element inside frame "
265
+ f"{getattr(frame, 'url', None) or '<empty>'}"
266
+ )
267
 
268
  await checkbox.click(timeout=10_000)
269
  log.info("Clicked hCaptcha checkbox")
 
271
  async def _wait_for_token(self, page: Page, *, seconds: int | None = None) -> str | None:
272
  remaining = max(1, seconds or self._config.captcha_timeout)
273
  for _ in range(remaining):
274
+ token, _ = await self._extract_hcaptcha_meta(page)
275
  if isinstance(token, str) and len(token) > 20:
276
  return token
277
  await asyncio.sleep(1)
278
  return None
279
 
280
+ async def _wait_for_resp_key(self, page: Page, *, seconds: int | None = None) -> str:
281
+ remaining = max(1, seconds or 2)
282
+ last_resp_key = ""
283
+ for _ in range(remaining):
284
+ _, resp_key = await self._extract_hcaptcha_meta(page)
285
+ if resp_key:
286
+ return resp_key
287
+ await asyncio.sleep(1)
288
+ return last_resp_key
289
+
290
async def _extract_hcaptcha_meta(self, page: Page) -> tuple[str | None, str]:
    """Collect the hCaptcha token and respKey from the page and its frames.

    ``_EXTRACT_HCAPTCHA_META_JS`` is evaluated first on *page* and then on
    every frame whose lower-cased URL passes ``_is_hcaptcha_related_frame``.
    The first valid token and the first non-blank respKey win; scanning
    stops as soon as both are known.

    Returns:
        ``(token, resp_key)`` where ``token`` is a string longer than 20
        characters or ``None``, and ``resp_key`` is a stripped string or
        ``""``.
    """
    targets: list[Page | Frame] = [page]
    for frame in page.frames:
        frame_url = (getattr(frame, "url", None) or "").lower()
        if self._is_hcaptcha_related_frame(frame_url):
            targets.append(frame)

    token: str | None = None
    resp_key = ""
    for target in targets:
        try:
            payload = await target.evaluate(_EXTRACT_HCAPTCHA_META_JS)
        except Exception as exc:
            # Best effort: one target failing to evaluate must not stop
            # the scan of the others, but leave a trace for debugging.
            log.debug(
                "hCaptcha meta extraction failed for %s: %s",
                getattr(target, "url", None) or "<empty>",
                exc,
            )
            continue
        if not isinstance(payload, dict):
            continue

        found_token = payload.get("token")
        found_resp_key = payload.get("respKey")
        if not token and isinstance(found_token, str) and len(found_token) > 20:
            token = found_token
        if not resp_key and isinstance(found_resp_key, str) and found_resp_key.strip():
            resp_key = found_resp_key.strip()
        if token and resp_key:
            break
    return token, resp_key
315
+
316
  async def _find_frame(
317
  self, page: Page, frame_role: str, *, wait_seconds: int = 5
318
  ) -> Frame | None:
319
  attempts = max(1, wait_seconds * 2)
320
+ main_frame = getattr(page, "main_frame", None)
321
  for _ in range(attempts):
322
+ exact_match: Frame | None = None
323
+ hinted_match: Frame | None = None
324
+ dom_match: Frame | None = None
325
  for frame in page.frames:
326
+ if main_frame is not None and frame is main_frame:
327
+ continue
328
+ url = (frame.url or "").lower()
329
+ if not self._is_hcaptcha_related_frame(url):
330
+ continue
331
+
332
+ if frame_role == "checkbox":
333
+ if await self._find_checkbox_element(frame) is not None:
334
+ dom_match = dom_match or frame
335
+ if any(hint in url for hint in _CHECKBOX_FRAME_HINTS):
336
+ if "frame=checkbox" in url:
337
+ exact_match = exact_match or frame
338
+ else:
339
+ hinted_match = hinted_match or frame
340
+ elif frame_role == "challenge":
341
+ if await self._is_challenge_frame(frame):
342
+ dom_match = dom_match or frame
343
+ if any(hint in url for hint in _CHALLENGE_FRAME_HINTS):
344
+ if "frame=challenge" in url:
345
+ exact_match = exact_match or frame
346
+ else:
347
+ hinted_match = hinted_match or frame
348
+
349
+ if exact_match is not None:
350
+ return exact_match
351
+ if dom_match is not None:
352
+ return dom_match
353
+ if hinted_match is not None:
354
+ return hinted_match
355
  await asyncio.sleep(0.5)
356
  return None
357
 
358
+ @staticmethod
359
+ def _is_hcaptcha_related_frame(url: str) -> bool:
360
+ return (
361
+ "hcaptcha" in url
362
+ or "newassets.hcaptcha.com" in url
363
+ or "api.hcaptcha.com" in url
364
+ or "js.stripe.com/v3/hcaptcha" in url
365
+ )
366
+
367
async def _find_checkbox_element(self, frame: Frame) -> ElementHandle | None:
    """Return the first element in *frame* matching a checkbox selector.

    The selectors in ``_CHECKBOX_SELECTORS`` are tried in order, so earlier
    entries take precedence over later ones.

    Returns:
        The matching element handle, or ``None`` when no selector matches.
    """
    # NOTE(review): the annotation is plain ``ElementHandle`` on the
    # assumption that Playwright's class is not generic - confirm.
    for selector in _CHECKBOX_SELECTORS:
        element = await frame.query_selector(selector)
        if element is not None:
            return element
    return None
373
+
374
async def _is_challenge_frame(self, frame: Frame) -> bool:
    """Report whether *frame*'s DOM looks like an hCaptcha challenge.

    Checks run in this order and the first hit returns ``True``:
    a non-blank string from ``_QUESTION_JS``, any element matching a
    selector in ``_CHALLENGE_TILE_SELECTORS``, at least one ``canvas``
    element, or an element matching a selector in
    ``_VERIFY_BUTTON_SELECTORS``.
    """
    question = await frame.evaluate(_QUESTION_JS)
    if isinstance(question, str) and question.strip():
        return True

    for tile_selector in _CHALLENGE_TILE_SELECTORS:
        if await frame.query_selector_all(tile_selector):
            return True

    canvas_count = await frame.locator("canvas").count()
    if canvas_count > 0:
        return True

    for button_selector in _VERIFY_BUTTON_SELECTORS:
        button = await frame.query_selector(button_selector)
        if button is not None:
            return True

    return False
392
+
393
+ @staticmethod
394
+ def _build_missing_frame_error(page: Page, frame_role: str) -> str:
395
+ frame_urls = [
396
+ getattr(frame, "url", None) or "<empty>"
397
+ for frame in page.frames
398
+ ]
399
+ return (
400
+ f"Could not find hCaptcha {frame_role} frame; available frames={frame_urls}"
401
+ )
402
+
403
  @staticmethod
404
  def _prepare_target_url(website_url: str, website_key: str) -> str:
405
  """为官方 demo 自动补齐/对齐 sitekey,确保按请求参数测试真实行为。"""