Translaterpeed / app /scraper /scraper_engine.py
Ruhivig65's picture
Upload 5 files
5a22919 verified
"""
============================================
Core Scraper Engine
- Orchestrates the entire scraping process for ONE novel
- Login → Navigate → Scrape Chapter → Next Chapter → Repeat
- Handles errors, captchas, and database saves
- Each novel runs as an independent async task
============================================
"""
import asyncio
import logging
import random
import re
from typing import Optional, Dict, Any
from datetime import datetime, timezone
from playwright.async_api import (
Page,
TimeoutError as PlaywrightTimeout,
Error as PlaywrightError,
)
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from app.config import settings
from app.scraper.browser_manager import browser_manager
from app.scraper.human_simulator import HumanSimulator
from app.scraper.captcha_detector import captcha_detector
from app.database.connection import get_session_factory
from deep_translator import GoogleTranslator
from app.database.crud import (
save_chapter,
update_novel_status,
increment_chapter_count,
get_last_chapter_number,
chapter_exists,
)
from app.database.models import NovelStatus
logger = logging.getLogger(__name__)
# ============================================
# Shared State for UI Status Updates
# ============================================
# This dictionary is read by the API routes to show live status
scraper_status: Dict[int, Dict[str, Any]] = {}
def update_status(novel_id: int, **kwargs):
"""Update the live status for a novel (read by UI)."""
if novel_id not in scraper_status:
scraper_status[novel_id] = {}
scraper_status[novel_id].update(kwargs)
scraper_status[novel_id]["last_updated"] = datetime.now(timezone.utc).isoformat()
# ============================================
# Login Handlers
# ============================================
async def attempt_login(
page: Page,
human: HumanSimulator,
novel_id: int,
url: str,
email: Optional[str],
password: Optional[str],
) -> bool:
"""
Attempt to log in to the novel website.
Strategy:
1. Navigate to the URL
2. Look for common login form selectors
3. Fill in credentials
4. Submit the form
5. Verify login succeeded
Returns True if login succeeded (or wasn't needed).
"""
if not email or not password:
logger.info(f"Novel {novel_id}: No credentials provided, skipping login.")
return True
update_status(novel_id, phase="logging_in", message="Attempting login...")
try:
# Navigate to the page first
logger.info(f"Novel {novel_id}: Navigating to {url} for login...")
await page.goto(url, wait_until="domcontentloaded")
await human.simulate_page_arrival()
# --- Common login form selectors (try each one) ---
login_selectors = [
# Email/username fields
{
"email": [
"input[name='email']",
"input[type='email']",
"input[name='username']",
"input[name='log']",
"input[id='email']",
"input[id='username']",
"#login-email",
"input[placeholder*='email' i]",
"input[placeholder*='username' i]",
],
"password": [
"input[name='password']",
"input[type='password']",
"input[name='pwd']",
"input[id='password']",
"#login-password",
],
"submit": [
"button[type='submit']",
"input[type='submit']",
"button.login-btn",
"button.sign-in",
"#login-submit",
"button:has-text('Log In')",
"button:has-text('Sign In')",
"button:has-text('Login')",
],
}
]
# Try to find and fill the login form
email_filled = False
password_filled = False
for selector_group in login_selectors:
# Find email field
for email_sel in selector_group["email"]:
try:
email_field = await page.query_selector(email_sel)
if email_field:
await human.type_like_human(email_sel, email)
email_filled = True
logger.info(f"Novel {novel_id}: Email filled using {email_sel}")
break
except Exception:
continue
# Find password field
for pass_sel in selector_group["password"]:
try:
pass_field = await page.query_selector(pass_sel)
if pass_field:
await human.type_like_human(pass_sel, password)
password_filled = True
logger.info(f"Novel {novel_id}: Password filled using {pass_sel}")
break
except Exception:
continue
if email_filled and password_filled:
break
if not email_filled or not password_filled:
logger.warning(
f"Novel {novel_id}: Could not find login form fields. "
f"Email found: {email_filled}, Password found: {password_filled}. "
f"Proceeding without login."
)
return True # Continue anyway, maybe login isn't needed
# Submit the form
await human.medium_delay("before clicking login")
submitted = False
for selector_group in login_selectors:
for submit_sel in selector_group["submit"]:
try:
submit_btn = await page.query_selector(submit_sel)
if submit_btn:
await submit_btn.click()
submitted = True
logger.info(f"Novel {novel_id}: Login form submitted using {submit_sel}")
break
except Exception:
continue
if submitted:
break
if not submitted:
# Try pressing Enter as fallback
await page.keyboard.press("Enter")
logger.info(f"Novel {novel_id}: Login submitted via Enter key")
# Wait for navigation after login
await human.long_delay("waiting for login response")
# Check if login was successful (very basic check)
page_content = await page.content()
page_content_lower = page_content.lower()
login_failed_indicators = [
"invalid password",
"incorrect password",
"login failed",
"wrong password",
"invalid credentials",
"account not found",
"error logging in",
]
for indicator in login_failed_indicators:
if indicator in page_content_lower:
logger.error(f"Novel {novel_id}: Login appears to have failed ('{indicator}' found)")
update_status(novel_id, phase="login_failed", message=f"Login failed: {indicator}")
return False
logger.info(f"Novel {novel_id}: Login appears successful! ✅")
update_status(novel_id, phase="logged_in", message="Login successful")
return True
except PlaywrightTimeout:
logger.error(f"Novel {novel_id}: Login page timed out")
return False
except Exception as e:
logger.error(f"Novel {novel_id}: Login error: {e}")
return False
# ============================================
# Content Extraction
# ============================================
async def extract_chapter_content(
page: Page,
novel_id: int,
content_selector: str,
) -> Optional[Dict[str, str]]:
"""
Extract clean chapter text from the current page.
Returns:
{
"title": "Chapter 123: The Battle",
"content": "The full chapter text...",
"url": "https://..."
}
or None if extraction failed
"""
try:
# --- Try multiple content selectors (comma-separated) ---
selectors = [s.strip() for s in content_selector.split(",")]
content_element = None
used_selector = None
for selector in selectors:
try:
content_element = await page.query_selector(selector)
if content_element:
used_selector = selector
break
except Exception:
continue
if content_element is None:
logger.warning(f"Novel {novel_id}: No content element found with selectors: {content_selector}")
return None
# --- Extract text content ---
raw_text = await content_element.inner_text()
if not raw_text or len(raw_text.strip()) < 50:
logger.warning(f"Novel {novel_id}: Content too short ({len(raw_text.strip())} chars)")
return None
# --- Clean the text ---
clean_text = clean_chapter_text(raw_text)
# --- Try to get chapter title ---
title = await extract_chapter_title(page)
# --- Get current URL ---
current_url = page.url
logger.info(
f"Novel {novel_id}: Extracted chapter "
f"(title: '{title}', {len(clean_text)} chars, selector: {used_selector})"
)
return {
"title": title,
"content": clean_text,
"url": current_url,
}
except Exception as e:
logger.error(f"Novel {novel_id}: Content extraction error: {e}")
return None
async def extract_chapter_title(page: Page) -> str:
"""
Try to extract the chapter title from the page.
Tries multiple common selectors.
"""
title_selectors = [
"h1.chapter-title",
"h1.entry-title",
".chapter-title",
"h1",
"h2.chapter-title",
".text-center h1",
".reader-title",
"#chapter-heading",
".chr-title",
".title-chapter",
]
for selector in title_selectors:
try:
element = await page.query_selector(selector)
if element:
title = await element.inner_text()
title = title.strip()
if title and len(title) < 200: # Reasonable title length
return title
except Exception:
continue
# Fallback: use page title
try:
page_title = await page.title()
if page_title:
return page_title.strip()[:200]
except Exception:
pass
return "Untitled Chapter"
def clean_chapter_text(raw_text: str) -> str:
"""
Clean raw scraped text:
- Remove excessive whitespace
- Remove common ad/junk text
- Normalize line breaks
"""
if not raw_text:
return ""
text = raw_text
# --- Remove common junk patterns ---
junk_patterns = [
r"(?i)translator[:\s]*.*?(?=\n|$)",
r"(?i)editor[:\s]*.*?(?=\n|$)",
r"(?i)please read at.*?(?=\n|$)",
r"(?i)support us at.*?(?=\n|$)",
r"(?i)read (?:the )?latest (?:chapter )?at.*?(?=\n|$)",
r"(?i)visit.*?for (?:the )?latest.*?(?=\n|$)",
r"(?i)if you find any errors.*?(?=\n|$)",
r"(?i)join our discord.*?(?=\n|$)",
r"(?i)patreon\.com.*?(?=\n|$)",
r"(?i)ko-fi\.com.*?(?=\n|$)",
r"(?i)advertisement",
r"(?i)sponsored content",
]
for pattern in junk_patterns:
text = re.sub(pattern, "", text)
# --- Normalize whitespace ---
# Replace multiple newlines with double newline (paragraph break)
text = re.sub(r"\n{3,}", "\n\n", text)
# Replace multiple spaces with single space
text = re.sub(r" {2,}", " ", text)
# Clean up lines
lines = text.split("\n")
cleaned_lines = [line.strip() for line in lines]
text = "\n".join(cleaned_lines)
# Remove leading/trailing whitespace
text = text.strip()
return text
# ============================================
# Next Chapter Navigation
# ============================================
async def click_next_chapter(
page: Page,
human: HumanSimulator,
novel_id: int,
next_button_selector: str,
) -> bool:
"""
Find and click the "Next Chapter" button.
Returns True if navigation succeeded, False if no next chapter found.
"""
try:
# --- Try multiple selectors (comma-separated) ---
selectors = [s.strip() for s in next_button_selector.split(",")]
next_button = None
used_selector = None
for selector in selectors:
try:
next_button = await page.query_selector(selector)
if next_button:
# Verify it's visible and clickable
is_visible = await next_button.is_visible()
is_enabled = await next_button.is_enabled()
if is_visible and is_enabled:
used_selector = selector
break
else:
next_button = None
except Exception:
continue
if next_button is None:
# --- Fallback: Try text-based search ---
text_patterns = [
"Next Chapter",
"Next",
"Next →",
"Next >>",
"→",
">>",
"Continue Reading",
"NEXT",
]
for pattern in text_patterns:
try:
next_button = await page.query_selector(f"a:has-text('{pattern}')")
if next_button:
is_visible = await next_button.is_visible()
if is_visible:
used_selector = f"text='{pattern}'"
break
next_button = None
except Exception:
continue
if next_button is None:
logger.info(f"Novel {novel_id}: No 'Next Chapter' button found. Might be the last chapter.")
return False
# --- Get button position for human-like mouse movement ---
try:
box = await next_button.bounding_box()
if box:
center_x = int(box["x"] + box["width"] / 2)
center_y = int(box["y"] + box["height"] / 2)
await human.simulate_before_click(center_x, center_y)
except Exception:
pass
# --- Click the button ---
current_url = page.url
await next_button.click()
logger.info(f"Novel {novel_id}: Clicked 'Next Chapter' ({used_selector})")
# --- Wait for navigation ---
try:
await page.wait_for_load_state("domcontentloaded", timeout=20000)
except PlaywrightTimeout:
logger.warning(f"Novel {novel_id}: Page load timeout after clicking next")
# --- Verify we actually navigated ---
new_url = page.url
if new_url == current_url:
# Sometimes click doesn't navigate, maybe it opens in same page via AJAX
await asyncio.sleep(3)
new_url = page.url
if new_url == current_url:
logger.warning(f"Novel {novel_id}: URL didn't change after clicking next")
# Still might have loaded content via AJAX, so continue
# --- Human delay after navigation ---
await human.simulate_page_arrival()
return True
except PlaywrightTimeout:
logger.warning(f"Novel {novel_id}: Timeout while clicking next chapter")
return False
except Exception as e:
logger.error(f"Novel {novel_id}: Error clicking next chapter: {e}")
return False
# ============================================
# Main Scraping Loop (ONE Novel)
# ============================================
# ============================================
# Helper: Translate to Hindi (Google Translate)
# ============================================
def _translate_hindi(text: str) -> str:
"""Translate English text to Hindi in chunks."""
try:
translator = GoogleTranslator(source='en', target='hi')
chunk_size = 4000
if len(text) <= chunk_size:
return translator.translate(text) or text
chunks, current = [], ""
for word in text.split(' '):
if len(current) + len(word) + 1 <= chunk_size:
current += (" " + word) if current else word
else:
if current:
chunks.append(translator.translate(current) or current)
current = word
if current:
chunks.append(translator.translate(current) or current)
return " ".join(chunks)
except Exception as e:
logger.warning(f"Translation failed: {e} — saving English only")
return ""
async def scrape_novel(novel_id: int, novel_data: Dict[str, Any]):
"""
Main scraping function for a single novel.
This runs as an independent async task.
Flow:
1. Acquire semaphore slot
2. Create browser context
3. Login (if credentials provided)
4. Navigate to starting URL
5. Loop:
a. Check for captcha/protection
b. Extract chapter content
c. Save to database
d. Click next chapter
e. Repeat
6. Clean up browser context
"""
url = novel_data["url"]
title = novel_data.get("title", "Unknown Novel")
email = novel_data.get("login_email")
password = novel_data.get("login_password")
next_btn_selector = novel_data.get(
"next_button_selector",
"a.next_page, a[rel='next'], .next-chap, button.next-chapter"
)
content_sel = novel_data.get(
"content_selector",
".chapter-content, .reading-content, #chapter-content, .text-left"
)
update_status(
novel_id,
title=title,
phase="queued",
message="Waiting for available browser slot...",
chapters_scraped=0,
current_url=url,
)
# --- Step 1: Acquire semaphore (wait for available slot) ---
logger.info(f"Novel {novel_id} ({title}): Waiting for browser slot...")
async with browser_manager.semaphore:
logger.info(f"Novel {novel_id} ({title}): 🟢 Got browser slot! Starting...")
context = None
page = None
session_factory = get_session_factory()
try:
# --- Step 2: Create browser context ---
update_status(novel_id, phase="initializing", message="Creating browser...")
context, page = await browser_manager.create_context_for_novel(novel_id)
human = HumanSimulator(page)
# --- Step 3: Update DB status ---
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.LOGGING_IN,
current_url=url,
)
await db.commit()
# --- Step 4: Login ---
login_success = await attempt_login(
page, human, novel_id, url, email, password
)
if not login_success:
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.FAILED,
error_message="Login failed",
)
await db.commit()
update_status(novel_id, phase="failed", message="Login failed!")
return
# --- Step 5: Navigate to starting URL ---
update_status(novel_id, phase="navigating", message=f"Going to {url}")
logger.info(f"Novel {novel_id}: Navigating to starting URL: {url}")
await page.goto(url, wait_until="domcontentloaded")
await human.simulate_page_arrival()
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.SCRAPING,
current_url=url,
)
await db.commit()
# --- Step 6: Get last scraped chapter number (for resume) ---
async with session_factory() as db:
chapter_number = await get_last_chapter_number(db, novel_id)
chapter_number += 1 # Start from next chapter
consecutive_failures = 0
max_consecutive_failures = 5
# --- Step 7: MAIN SCRAPING LOOP ---
logger.info(f"Novel {novel_id}: Starting scraping loop from chapter {chapter_number}")
while True:
update_status(
novel_id,
phase="scraping",
message=f"Processing chapter {chapter_number}...",
current_url=page.url,
chapters_scraped=chapter_number - 1,
)
# --- 7a: Check for captcha/protection ---
is_blocked, block_reason = await captcha_detector.check_for_protection(
page, novel_id, content_sel
)
if is_blocked:
# First, try waiting for auto-resolution (Cloudflare)
if "cloudflare" in block_reason.lower():
auto_resolved = await captcha_detector.wait_for_cloudflare_auto_resolve(
page, novel_id, max_wait=15
)
if auto_resolved:
is_blocked = False
if is_blocked:
# --- Need manual intervention ---
update_status(
novel_id,
phase="captcha_detected",
message=f"⚠️ Protection detected: {block_reason}",
)
# Take screenshot and notify
screenshot_file = await captcha_detector.handle_protection_detected(
page, novel_id, block_reason, browser_manager
)
# Update DB
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.PAUSED_CAPTCHA,
error_message=block_reason,
screenshot_path=screenshot_file,
needs_intervention=True,
)
await db.commit()
update_status(
novel_id,
phase="waiting_intervention",
message="Waiting for you to solve the captcha...",
screenshot=screenshot_file,
)
# Wait for user to solve it
intervention_success = await captcha_detector.wait_for_intervention(
novel_id, timeout_minutes=30
)
if not intervention_success:
logger.error(f"Novel {novel_id}: Intervention timeout, stopping.")
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.FAILED,
error_message="Captcha intervention timeout",
)
await db.commit()
update_status(novel_id, phase="failed", message="Captcha timeout!")
return
# Intervention completed, clear the flag
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.SCRAPING,
needs_intervention=False,
)
await db.commit()
# Wait a moment and re-check
await human.medium_delay("post-intervention")
continue # Re-check the page
# --- 7b: Extract chapter content ---
chapter_data = await extract_chapter_content(
page, novel_id, content_sel
)
if chapter_data is None:
consecutive_failures += 1
logger.warning(
f"Novel {novel_id}: Failed to extract chapter {chapter_number} "
f"(failure {consecutive_failures}/{max_consecutive_failures})"
)
if consecutive_failures >= max_consecutive_failures:
logger.error(
f"Novel {novel_id}: Too many consecutive failures, stopping."
)
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.FAILED,
error_message=f"Too many extraction failures at chapter {chapter_number}",
)
await db.commit()
update_status(
novel_id, phase="failed",
message=f"Failed after {max_consecutive_failures} consecutive extraction failures"
)
return
# Try clicking next anyway
has_next = await click_next_chapter(
page, human, novel_id, next_btn_selector
)
if not has_next:
break
continue
# Reset failure counter on success
consecutive_failures = 0
# --- 7c: Save to database ---
try:
async with session_factory() as db:
# Check if already saved (for resume scenarios)
already_exists = await chapter_exists(
db, novel_id, chapter_number
)
if not already_exists:
# --- Translate to Hindi in background thread ---
loop = asyncio.get_event_loop()
hindi_content = await loop.run_in_executor(
None, _translate_hindi, chapter_data["content"]
)
hindi_title = await loop.run_in_executor(
None, _translate_hindi, chapter_data["title"]
)
logger.info(
f"Novel {novel_id}: 🇮🇳 Translated Ch {chapter_number} to Hindi"
)
await save_chapter(
db,
novel_id=novel_id,
chapter_number=chapter_number,
content=chapter_data["content"],
title=chapter_data["title"],
url=chapter_data["url"],
content_hindi=hindi_content or None,
title_hindi=hindi_title or None,
)
await increment_chapter_count(db, novel_id)
await db.commit()
logger.info(
f"Novel {novel_id}: ✅ Saved Chapter {chapter_number} "
f"- '{chapter_data['title']}'"
)
else:
logger.info(
f"Novel {novel_id}: Chapter {chapter_number} already exists, skipping save"
)
except IntegrityError:
logger.warning(
f"Novel {novel_id}: Chapter {chapter_number} duplicate, skipping"
)
except Exception as e:
logger.error(
f"Novel {novel_id}: Database error saving chapter {chapter_number}: {e}"
)
# Don't stop scraping for DB errors, log and continue
update_status(
novel_id,
phase="scraping",
message=f"✅ Chapter {chapter_number} saved! Moving to next...",
chapters_scraped=chapter_number,
last_chapter_title=chapter_data["title"],
)
# --- 7d: Human-like reading delay ---
await human.simulate_reading_chapter()
# --- 7e: Click Next Chapter ---
has_next = await click_next_chapter(
page, human, novel_id, next_btn_selector
)
if not has_next:
logger.info(
f"Novel {novel_id}: No next chapter found after chapter {chapter_number}. "
f"Novel might be complete! 🎉"
)
break
chapter_number += 1
# --- Safety: Don't scrape more than 10000 chapters ---
if chapter_number > 10000:
logger.warning(f"Novel {novel_id}: Hit 10000 chapter limit, stopping")
break
# --- Step 8: Scraping Complete ---
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.COMPLETED,
)
await db.commit()
update_status(
novel_id,
phase="completed",
message=f"🎉 Completed! {chapter_number - 1} chapters scraped.",
chapters_scraped=chapter_number - 1,
)
logger.info(
f"Novel {novel_id} ({title}): 🎉 COMPLETED! "
f"{chapter_number - 1} chapters scraped."
)
except PlaywrightError as e:
error_msg = f"Browser error: {str(e)}"
logger.error(f"Novel {novel_id}: {error_msg}")
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.PAUSED_ERROR,
error_message=error_msg,
)
await db.commit()
update_status(novel_id, phase="error", message=error_msg)
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
logger.error(f"Novel {novel_id}: {error_msg}", exc_info=True)
try:
async with session_factory() as db:
await update_novel_status(
db, novel_id, NovelStatus.PAUSED_ERROR,
error_message=error_msg,
)
await db.commit()
except Exception:
pass
update_status(novel_id, phase="error", message=error_msg)
finally:
# --- Always clean up the browser context ---
await browser_manager.close_context(novel_id)
captcha_detector.clear_intervention(novel_id)
logger.info(f"Novel {novel_id}: Browser context cleaned up.")
# ============================================
# Task Manager (Start Multiple Novels)
# ============================================
# Store active tasks: {novel_id: asyncio.Task}
active_tasks: Dict[int, asyncio.Task] = {}
async def start_scraping_novel(novel_id: int, novel_data: Dict[str, Any]):
"""
Start scraping a novel as a background task.
"""
if novel_id in active_tasks:
task = active_tasks[novel_id]
if not task.done():
logger.warning(f"Novel {novel_id}: Already scraping, ignoring duplicate start")
return False
# Create and store the task
task = asyncio.create_task(
scrape_novel(novel_id, novel_data),
name=f"scraper-novel-{novel_id}",
)
active_tasks[novel_id] = task
# Add callback to clean up when done
def task_done_callback(t: asyncio.Task):
try:
exception = t.exception()
if exception:
logger.error(
f"Novel {novel_id} task failed with exception: {exception}"
)
except asyncio.CancelledError:
logger.info(f"Novel {novel_id} task was cancelled")
task.add_done_callback(task_done_callback)
logger.info(f"Novel {novel_id}: Scraping task started! 🚀")
return True
async def stop_scraping_novel(novel_id: int) -> bool:
"""
Stop scraping a specific novel.
"""
if novel_id in active_tasks:
task = active_tasks[novel_id]
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Clean up browser
await browser_manager.close_context(novel_id)
logger.info(f"Novel {novel_id}: Scraping stopped ✋")
update_status(novel_id, phase="stopped", message="Scraping stopped by user")
return True
return False
async def stop_all_scraping():
"""Stop all active scraping tasks."""
for novel_id in list(active_tasks.keys()):
await stop_scraping_novel(novel_id)
logger.info("All scraping tasks stopped.")
def get_active_task_ids() -> list:
"""Get IDs of all novels currently being scraped."""
return [nid for nid, task in active_tasks.items() if not task.done()]