# Nfff / app /scraper /captcha_detector.py
# Ruhivig65's picture
# Upload 5 files
# 14345e3 verified
"""
============================================
Captcha & Anti-Bot Detection System
- Detects Cloudflare challenges
- Detects common captcha types
- Takes screenshots for manual intervention
- Manages the intervention workflow
============================================
"""
import asyncio
import os
import time
import logging
from typing import Optional, Dict, Tuple
from datetime import datetime
from playwright.async_api import Page, TimeoutError as PlaywrightTimeout
from app.config import settings
logger = logging.getLogger(__name__)
# ============================================
# Known Detection Patterns
# ============================================
# Lower-case fragments that are looked for (by substring match) in the
# lower-cased page title to recognise a Cloudflare interstitial.
CLOUDFLARE_INDICATORS = [
    "just a moment",
    "attention required",
    "checking your browser",
    "please wait",
    "ddos protection",
    "cloudflare",
    "security check",
    "ray id",
    "please verify you are a human",
]
# CSS selectors whose presence on a page signals a captcha or challenge.
# NOTE: order is significant - the auto-resolve check in CaptchaDetector only
# probes the first six entries (the Turnstile group plus the first three
# reCAPTCHA entries), so keep those at the top.
CAPTCHA_SELECTORS = [
    # --- Cloudflare Turnstile ---
    "iframe[src*='challenges.cloudflare.com']",
    "#turnstile-wrapper",
    ".cf-turnstile",

    # --- Google reCAPTCHA ---
    "iframe[src*='google.com/recaptcha']",
    ".g-recaptcha",
    "#recaptcha",
    "iframe[title*='reCAPTCHA']",

    # --- hCaptcha ---
    "iframe[src*='hcaptcha.com']",
    ".h-captcha",

    # --- Generic captcha markers ---
    "[class*='captcha']",
    "[id*='captcha']",
    "[class*='challenge']",
    "img[src*='captcha']",

    # --- Common anti-bot interstitial pages ---
    ".cf-browser-verification",
    "#cf-challenge-running",
    ".ray_id",
    "#challenge-form",
    "#challenge-stage",
]
# Lower-case phrases looked for (by substring match) in the page body text to
# recognise a block / rate-limit page.
#
# The list is scanned in order and the first hit is reported as the reason, so
# a specific phrase must come BEFORE any shorter phrase it contains
# ("403 forbidden" before "forbidden", "you have been blocked" before
# "blocked"); otherwise the specific phrase can never be the one reported.
BLOCK_PAGE_INDICATORS = [
    "access denied",
    "403 forbidden",
    "forbidden",
    "you have been blocked",
    "blocked",
    "your ip has been",
    "rate limit",
    "too many requests",
    "bot detected",
]
class CaptchaDetector:
    """
    Detects anti-bot protections and manages manual intervention.

    Pending interventions are kept in an in-memory dict keyed by novel id.
    Entries are created by ``handle_protection_detected``, flagged as done by
    ``mark_intervention_complete`` and removed by ``clear_intervention``.
    """

    def __init__(self):
        # {novel_id: {"waiting": bool, "screenshot": str, "screenshot_path": str,
        #             "reason": str, "timestamp": float, "page_url": str}}
        self._interventions: Dict[int, Dict] = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _first_matching_selector(page, selectors) -> Optional[str]:
        """Return the first selector in *selectors* that matches an element on *page*, else None."""
        for selector in selectors:
            try:
                if await page.query_selector(selector):
                    return selector
            except Exception as exc:
                # A selector that cannot be queried is treated as "not present".
                logger.debug("Selector %r could not be queried: %s", selector, exc)
                continue
        return None

    async def _diagnose_missing_content(self, page, novel_id: int) -> str:
        """Explain why the expected content is missing; return a human-readable reason."""
        # --- Step 2: Check page title for Cloudflare indicators ---
        page_title = (await page.title()).lower()
        if any(indicator in page_title for indicator in CLOUDFLARE_INDICATORS):
            return f"Cloudflare detected (title: '{page_title}')"

        # --- Step 3: Check for captcha elements on the page ---
        selector = await self._first_matching_selector(page, CAPTCHA_SELECTORS)
        if selector is not None:
            return f"Captcha element found: {selector}"

        # --- Step 4: Check page body text for block indicators ---
        # Only the first 2000 characters of the lower-cased body text are scanned.
        try:
            body_text = await page.evaluate(
                "document.body ? document.body.innerText.toLowerCase().substring(0, 2000) : ''"
            )
            for indicator in BLOCK_PAGE_INDICATORS:
                if indicator in body_text:
                    return f"Block page detected (text contains: '{indicator}')"
        except Exception as exc:
            # Best effort: fall through to the remaining checks.
            logger.debug("Novel %s: body text check failed: %s", novel_id, exc)

        # --- Step 5: Check if page is mostly empty (another sign of blocking) ---
        try:
            body_length = await page.evaluate(
                "document.body ? document.body.innerText.length : 0"
            )
            if body_length < 100:
                return "Page appears empty (possible block)"
        except Exception as exc:
            # Best effort: fall through to the generic reason.
            logger.debug("Novel %s: body length check failed: %s", novel_id, exc)

        # --- Step 6: Content not found but no known protection either ---
        # This might be a layout issue, not a captcha.
        return "Content selector not found (possible layout change)"

    # ------------------------------------------------------------------
    # Detection
    # ------------------------------------------------------------------
    async def check_for_protection(
        self,
        page: Page,
        novel_id: int,
        expected_content_selector: str,
    ) -> Tuple[bool, str]:
        """
        Check if the current page has anti-bot protection.

        Logic:
        1. Wait for the expected content (chapter text) to be attached
        2. If content is found -> no protection, continue
        3. If content is NOT found -> check title, captcha selectors, body
           text and body length for known captcha/block patterns
        4. Return (is_blocked, reason)

        Note that a missing content selector is always reported as blocked,
        even when no known protection pattern matches, and that any
        unexpected error during the check is reported as blocked as well.

        Returns:
            (is_blocked: bool, reason: str)
        """
        try:
            # --- Step 1: Quick check - is the expected content there? ---
            try:
                await page.wait_for_selector(
                    expected_content_selector,
                    timeout=settings.CAPTCHA_CHECK_TIMEOUT * 1000,  # seconds -> ms
                    state="attached",
                )
            except PlaywrightTimeout:
                logger.warning(
                    "Novel %s: Expected content not found within %ss, "
                    "checking for protection...",
                    novel_id,
                    settings.CAPTCHA_CHECK_TIMEOUT,
                )
            else:
                # Content found! No protection detected
                return False, "content_found"

            reason = await self._diagnose_missing_content(page, novel_id)
            logger.warning("Novel %s: %s", novel_id, reason)
            return True, reason
        except Exception as e:
            logger.error("Novel %s: Error during protection check: %s", novel_id, e)
            return True, f"Error during check: {e}"

    # ------------------------------------------------------------------
    # Intervention workflow
    # ------------------------------------------------------------------
    async def handle_protection_detected(
        self,
        page: Page,
        novel_id: int,
        reason: str,
        browser_manager,
    ) -> str:
        """
        Handle detected protection:
        1. Take a screenshot via ``browser_manager.take_screenshot``
        2. Store intervention info (marked as waiting)
        3. Return the screenshot filename

        If the screenshot call returns None, the placeholder filename
        "screenshot_failed.png" is stored and returned instead.

        The frontend will poll for this and show the screenshot to the user.
        """
        # Generate screenshot filename
        filename = f"novel_{novel_id}_captcha_{int(time.time())}.png"

        # Take screenshot
        screenshot_path = await browser_manager.take_screenshot(novel_id, filename)
        if screenshot_path is None:
            screenshot_path = "screenshot_failed"
            filename = "screenshot_failed.png"

        # Store intervention info
        page_url = page.url
        self._interventions[novel_id] = {
            "waiting": True,
            "screenshot": filename,
            "screenshot_path": screenshot_path,
            "reason": reason,
            "timestamp": time.time(),
            "page_url": page_url,
        }
        logger.info(
            "🚨 Novel %s: Intervention required!\n"
            " Reason: %s\n"
            " Screenshot: %s\n"
            " URL: %s",
            novel_id,
            reason,
            filename,
            page_url,
        )
        return filename

    async def wait_for_intervention(
        self,
        novel_id: int,
        timeout_minutes: int = 30,
        poll_interval: float = 3,
    ) -> bool:
        """
        Wait for manual intervention to be completed.

        The frontend will:
        1. See the screenshot
        2. User clicks on captcha in the UI
        3. UI sends click coordinates to backend
        4. Backend clicks on the actual page
        5. This method polls until the intervention is no longer waiting

        A novel with no intervention entry (never registered, or already
        cleared) is treated as resolved and returns True immediately.

        Args:
            novel_id: Novel whose intervention is awaited.
            timeout_minutes: Give up after this many minutes.
            poll_interval: Seconds to sleep between checks.

        Returns:
            True if intervention succeeded (continue scraping)
            False if timeout (give up)
        """
        logger.info("Novel %s: Waiting for manual intervention...", novel_id)
        timeout = timeout_minutes * 60  # Convert to seconds
        # Monotonic clock: the timeout must not be affected by wall-clock jumps.
        start_time = time.monotonic()
        while True:
            # Check if intervention is still needed
            intervention = self._interventions.get(novel_id, {})
            if not intervention.get("waiting", False):
                logger.info("Novel %s: Intervention completed! ✅", novel_id)
                return True

            # Check timeout
            if time.monotonic() - start_time > timeout:
                logger.warning(
                    "Novel %s: Intervention timeout after %s minutes",
                    novel_id,
                    timeout_minutes,
                )
                return False

            # Wait before checking again
            await asyncio.sleep(poll_interval)

    def mark_intervention_complete(self, novel_id: int) -> None:
        """
        Mark that manual intervention has been completed for a novel.
        Called after the user successfully solves the captcha.
        Does nothing when no intervention is recorded for the novel.
        """
        if novel_id in self._interventions:
            self._interventions[novel_id]["waiting"] = False
            logger.info("Novel %s: Intervention marked as complete", novel_id)

    def get_intervention_status(self, novel_id: int) -> Optional[Dict]:
        """Get the current intervention status for a novel (None if there is none)."""
        return self._interventions.get(novel_id)

    def get_all_interventions(self) -> Dict:
        """Get all interventions that are still waiting (for the UI dashboard)."""
        return {
            nid: info
            for nid, info in self._interventions.items()
            if info.get("waiting", False)
        }

    def clear_intervention(self, novel_id: int) -> None:
        """Clear intervention data for a novel (no-op if there is none)."""
        self._interventions.pop(novel_id, None)

    # ------------------------------------------------------------------
    # Cloudflare auto-resolution
    # ------------------------------------------------------------------
    async def wait_for_cloudflare_auto_resolve(
        self,
        page: Page,
        novel_id: int,
        max_wait: int = 15,
    ) -> bool:
        """
        Sometimes Cloudflare resolves automatically after a few seconds.
        Wait and check if it resolves on its own before flagging for intervention.

        The page is polled once per second for up to ``max_wait`` seconds. It
        counts as resolved when the title contains no Cloudflare indicator and
        none of the first six CAPTCHA_SELECTORS entries is present.

        Returns:
            True if auto-resolved
            False if still blocked
        """
        logger.info(
            "Novel %s: Waiting up to %ss for auto-resolution...", novel_id, max_wait
        )
        for waited in range(1, max_wait + 1):
            await asyncio.sleep(1)

            # Check if Cloudflare challenge is gone. A resolving challenge
            # navigates the page, which can make this call fail transiently;
            # treat that as "not settled yet" and retry on the next tick
            # instead of aborting the whole wait.
            try:
                title = (await page.title()).lower()
            except Exception as exc:
                logger.debug(
                    "Novel %s: could not read page title (%s), retrying...",
                    novel_id,
                    exc,
                )
                continue

            if any(indicator in title for indicator in CLOUDFLARE_INDICATORS):
                continue

            # Check if any of the main captcha elements are still present
            still_present = await self._first_matching_selector(
                page, CAPTCHA_SELECTORS[:6]
            )
            if still_present is None:
                logger.info(
                    "Novel %s: Cloudflare auto-resolved after %ss! ✅",
                    novel_id,
                    waited,
                )
                return True

        logger.warning("Novel %s: Cloudflare did NOT auto-resolve", novel_id)
        return False
# ============================================
# Global Singleton Instance
# ============================================
# Module-level detector instance, created at import time with an empty
# in-memory intervention table.
captcha_detector = CaptchaDetector()