""" ============================================ Core Scraper Engine - Orchestrates the entire scraping process for ONE novel - Login → Navigate → Scrape Chapter → Next Chapter → Repeat - Handles errors, captchas, and database saves - Each novel runs as an independent async task ============================================ """ import asyncio import logging import random import re from typing import Optional, Dict, Any from datetime import datetime, timezone from playwright.async_api import ( Page, TimeoutError as PlaywrightTimeout, Error as PlaywrightError, ) from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.exc import IntegrityError from app.config import settings from app.scraper.browser_manager import browser_manager from app.scraper.human_simulator import HumanSimulator from app.scraper.captcha_detector import captcha_detector from app.database.connection import get_session_factory from deep_translator import GoogleTranslator from app.database.crud import ( save_chapter, update_novel_status, increment_chapter_count, get_last_chapter_number, chapter_exists, ) from app.database.models import NovelStatus logger = logging.getLogger(__name__) # ============================================ # Shared State for UI Status Updates # ============================================ # This dictionary is read by the API routes to show live status scraper_status: Dict[int, Dict[str, Any]] = {} def update_status(novel_id: int, **kwargs): """Update the live status for a novel (read by UI).""" if novel_id not in scraper_status: scraper_status[novel_id] = {} scraper_status[novel_id].update(kwargs) scraper_status[novel_id]["last_updated"] = datetime.now(timezone.utc).isoformat() # ============================================ # Login Handlers # ============================================ async def attempt_login( page: Page, human: HumanSimulator, novel_id: int, url: str, email: Optional[str], password: Optional[str], ) -> bool: """ Attempt to log in to the novel website. Strategy: 1. Navigate to the URL 2. Look for common login form selectors 3. Fill in credentials 4. Submit the form 5. Verify login succeeded Returns True if login succeeded (or wasn't needed). """ if not email or not password: logger.info(f"Novel {novel_id}: No credentials provided, skipping login.") return True update_status(novel_id, phase="logging_in", message="Attempting login...") try: # Navigate to the page first logger.info(f"Novel {novel_id}: Navigating to {url} for login...") await page.goto(url, wait_until="domcontentloaded") await human.simulate_page_arrival() # --- Common login form selectors (try each one) --- login_selectors = [ # Email/username fields { "email": [ "input[name='email']", "input[type='email']", "input[name='username']", "input[name='log']", "input[id='email']", "input[id='username']", "#login-email", "input[placeholder*='email' i]", "input[placeholder*='username' i]", ], "password": [ "input[name='password']", "input[type='password']", "input[name='pwd']", "input[id='password']", "#login-password", ], "submit": [ "button[type='submit']", "input[type='submit']", "button.login-btn", "button.sign-in", "#login-submit", "button:has-text('Log In')", "button:has-text('Sign In')", "button:has-text('Login')", ], } ] # Try to find and fill the login form email_filled = False password_filled = False for selector_group in login_selectors: # Find email field for email_sel in selector_group["email"]: try: email_field = await page.query_selector(email_sel) if email_field: await human.type_like_human(email_sel, email) email_filled = True logger.info(f"Novel {novel_id}: Email filled using {email_sel}") break except Exception: continue # Find password field for pass_sel in selector_group["password"]: try: pass_field = await page.query_selector(pass_sel) if pass_field: await human.type_like_human(pass_sel, password) password_filled = True logger.info(f"Novel {novel_id}: Password filled using {pass_sel}") break except Exception: continue if email_filled and password_filled: break if not email_filled or not password_filled: logger.warning( f"Novel {novel_id}: Could not find login form fields. " f"Email found: {email_filled}, Password found: {password_filled}. " f"Proceeding without login." ) return True # Continue anyway, maybe login isn't needed # Submit the form await human.medium_delay("before clicking login") submitted = False for selector_group in login_selectors: for submit_sel in selector_group["submit"]: try: submit_btn = await page.query_selector(submit_sel) if submit_btn: await submit_btn.click() submitted = True logger.info(f"Novel {novel_id}: Login form submitted using {submit_sel}") break except Exception: continue if submitted: break if not submitted: # Try pressing Enter as fallback await page.keyboard.press("Enter") logger.info(f"Novel {novel_id}: Login submitted via Enter key") # Wait for navigation after login await human.long_delay("waiting for login response") # Check if login was successful (very basic check) page_content = await page.content() page_content_lower = page_content.lower() login_failed_indicators = [ "invalid password", "incorrect password", "login failed", "wrong password", "invalid credentials", "account not found", "error logging in", ] for indicator in login_failed_indicators: if indicator in page_content_lower: logger.error(f"Novel {novel_id}: Login appears to have failed ('{indicator}' found)") update_status(novel_id, phase="login_failed", message=f"Login failed: {indicator}") return False logger.info(f"Novel {novel_id}: Login appears successful! ✅") update_status(novel_id, phase="logged_in", message="Login successful") return True except PlaywrightTimeout: logger.error(f"Novel {novel_id}: Login page timed out") return False except Exception as e: logger.error(f"Novel {novel_id}: Login error: {e}") return False # ============================================ # Content Extraction # ============================================ async def extract_chapter_content( page: Page, novel_id: int, content_selector: str, ) -> Optional[Dict[str, str]]: """ Extract clean chapter text from the current page. Returns: { "title": "Chapter 123: The Battle", "content": "The full chapter text...", "url": "https://..." } or None if extraction failed """ try: # --- Try multiple content selectors (comma-separated) --- selectors = [s.strip() for s in content_selector.split(",")] content_element = None used_selector = None for selector in selectors: try: content_element = await page.query_selector(selector) if content_element: used_selector = selector break except Exception: continue if content_element is None: logger.warning(f"Novel {novel_id}: No content element found with selectors: {content_selector}") return None # --- Extract text content --- raw_text = await content_element.inner_text() if not raw_text or len(raw_text.strip()) < 50: logger.warning(f"Novel {novel_id}: Content too short ({len(raw_text.strip())} chars)") return None # --- Clean the text --- clean_text = clean_chapter_text(raw_text) # --- Try to get chapter title --- title = await extract_chapter_title(page) # --- Get current URL --- current_url = page.url logger.info( f"Novel {novel_id}: Extracted chapter " f"(title: '{title}', {len(clean_text)} chars, selector: {used_selector})" ) return { "title": title, "content": clean_text, "url": current_url, } except Exception as e: logger.error(f"Novel {novel_id}: Content extraction error: {e}") return None async def extract_chapter_title(page: Page) -> str: """ Try to extract the chapter title from the page. Tries multiple common selectors. """ title_selectors = [ "h1.chapter-title", "h1.entry-title", ".chapter-title", "h1", "h2.chapter-title", ".text-center h1", ".reader-title", "#chapter-heading", ".chr-title", ".title-chapter", ] for selector in title_selectors: try: element = await page.query_selector(selector) if element: title = await element.inner_text() title = title.strip() if title and len(title) < 200: # Reasonable title length return title except Exception: continue # Fallback: use page title try: page_title = await page.title() if page_title: return page_title.strip()[:200] except Exception: pass return "Untitled Chapter" def clean_chapter_text(raw_text: str) -> str: """ Clean raw scraped text: - Remove excessive whitespace - Remove common ad/junk text - Normalize line breaks """ if not raw_text: return "" text = raw_text # --- Remove common junk patterns --- junk_patterns = [ r"(?i)translator[:\s]*.*?(?=\n|$)", r"(?i)editor[:\s]*.*?(?=\n|$)", r"(?i)please read at.*?(?=\n|$)", r"(?i)support us at.*?(?=\n|$)", r"(?i)read (?:the )?latest (?:chapter )?at.*?(?=\n|$)", r"(?i)visit.*?for (?:the )?latest.*?(?=\n|$)", r"(?i)if you find any errors.*?(?=\n|$)", r"(?i)join our discord.*?(?=\n|$)", r"(?i)patreon\.com.*?(?=\n|$)", r"(?i)ko-fi\.com.*?(?=\n|$)", r"(?i)advertisement", r"(?i)sponsored content", ] for pattern in junk_patterns: text = re.sub(pattern, "", text) # --- Normalize whitespace --- # Replace multiple newlines with double newline (paragraph break) text = re.sub(r"\n{3,}", "\n\n", text) # Replace multiple spaces with single space text = re.sub(r" {2,}", " ", text) # Clean up lines lines = text.split("\n") cleaned_lines = [line.strip() for line in lines] text = "\n".join(cleaned_lines) # Remove leading/trailing whitespace text = text.strip() return text # ============================================ # Next Chapter Navigation # ============================================ async def click_next_chapter( page: Page, human: HumanSimulator, novel_id: int, next_button_selector: str, ) -> bool: """ Find and click the "Next Chapter" button. Returns True if navigation succeeded, False if no next chapter found. """ try: # --- Try multiple selectors (comma-separated) --- selectors = [s.strip() for s in next_button_selector.split(",")] next_button = None used_selector = None for selector in selectors: try: next_button = await page.query_selector(selector) if next_button: # Verify it's visible and clickable is_visible = await next_button.is_visible() is_enabled = await next_button.is_enabled() if is_visible and is_enabled: used_selector = selector break else: next_button = None except Exception: continue if next_button is None: # --- Fallback: Try text-based search --- text_patterns = [ "Next Chapter", "Next", "Next →", "Next >>", "→", ">>", "Continue Reading", "NEXT", ] for pattern in text_patterns: try: next_button = await page.query_selector(f"a:has-text('{pattern}')") if next_button: is_visible = await next_button.is_visible() if is_visible: used_selector = f"text='{pattern}'" break next_button = None except Exception: continue if next_button is None: logger.info(f"Novel {novel_id}: No 'Next Chapter' button found. Might be the last chapter.") return False # --- Get button position for human-like mouse movement --- try: box = await next_button.bounding_box() if box: center_x = int(box["x"] + box["width"] / 2) center_y = int(box["y"] + box["height"] / 2) await human.simulate_before_click(center_x, center_y) except Exception: pass # --- Click the button --- current_url = page.url await next_button.click() logger.info(f"Novel {novel_id}: Clicked 'Next Chapter' ({used_selector})") # --- Wait for navigation --- try: await page.wait_for_load_state("domcontentloaded", timeout=20000) except PlaywrightTimeout: logger.warning(f"Novel {novel_id}: Page load timeout after clicking next") # --- Verify we actually navigated --- new_url = page.url if new_url == current_url: # Sometimes click doesn't navigate, maybe it opens in same page via AJAX await asyncio.sleep(3) new_url = page.url if new_url == current_url: logger.warning(f"Novel {novel_id}: URL didn't change after clicking next") # Still might have loaded content via AJAX, so continue # --- Human delay after navigation --- await human.simulate_page_arrival() return True except PlaywrightTimeout: logger.warning(f"Novel {novel_id}: Timeout while clicking next chapter") return False except Exception as e: logger.error(f"Novel {novel_id}: Error clicking next chapter: {e}") return False # ============================================ # Main Scraping Loop (ONE Novel) # ============================================ # ============================================ # Helper: Translate to Hindi (Google Translate) # ============================================ def _translate_hindi(text: str) -> str: """Translate English text to Hindi in chunks.""" try: translator = GoogleTranslator(source='en', target='hi') chunk_size = 4000 if len(text) <= chunk_size: return translator.translate(text) or text chunks, current = [], "" for word in text.split(' '): if len(current) + len(word) + 1 <= chunk_size: current += (" " + word) if current else word else: if current: chunks.append(translator.translate(current) or current) current = word if current: chunks.append(translator.translate(current) or current) return " ".join(chunks) except Exception as e: logger.warning(f"Translation failed: {e} — saving English only") return "" async def scrape_novel(novel_id: int, novel_data: Dict[str, Any]): """ Main scraping function for a single novel. This runs as an independent async task. Flow: 1. Acquire semaphore slot 2. Create browser context 3. Login (if credentials provided) 4. Navigate to starting URL 5. Loop: a. Check for captcha/protection b. Extract chapter content c. Save to database d. Click next chapter e. Repeat 6. Clean up browser context """ url = novel_data["url"] title = novel_data.get("title", "Unknown Novel") email = novel_data.get("login_email") password = novel_data.get("login_password") next_btn_selector = novel_data.get( "next_button_selector", "a.next_page, a[rel='next'], .next-chap, button.next-chapter" ) content_sel = novel_data.get( "content_selector", ".chapter-content, .reading-content, #chapter-content, .text-left" ) update_status( novel_id, title=title, phase="queued", message="Waiting for available browser slot...", chapters_scraped=0, current_url=url, ) # --- Step 1: Acquire semaphore (wait for available slot) --- logger.info(f"Novel {novel_id} ({title}): Waiting for browser slot...") async with browser_manager.semaphore: logger.info(f"Novel {novel_id} ({title}): 🟢 Got browser slot! Starting...") context = None page = None session_factory = get_session_factory() try: # --- Step 2: Create browser context --- update_status(novel_id, phase="initializing", message="Creating browser...") context, page = await browser_manager.create_context_for_novel(novel_id) human = HumanSimulator(page) # --- Step 3: Update DB status --- async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.LOGGING_IN, current_url=url, ) await db.commit() # --- Step 4: Login --- login_success = await attempt_login( page, human, novel_id, url, email, password ) if not login_success: async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.FAILED, error_message="Login failed", ) await db.commit() update_status(novel_id, phase="failed", message="Login failed!") return # --- Step 5: Navigate to starting URL --- update_status(novel_id, phase="navigating", message=f"Going to {url}") logger.info(f"Novel {novel_id}: Navigating to starting URL: {url}") await page.goto(url, wait_until="domcontentloaded") await human.simulate_page_arrival() async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.SCRAPING, current_url=url, ) await db.commit() # --- Step 6: Get last scraped chapter number (for resume) --- async with session_factory() as db: chapter_number = await get_last_chapter_number(db, novel_id) chapter_number += 1 # Start from next chapter consecutive_failures = 0 max_consecutive_failures = 5 # --- Step 7: MAIN SCRAPING LOOP --- logger.info(f"Novel {novel_id}: Starting scraping loop from chapter {chapter_number}") while True: update_status( novel_id, phase="scraping", message=f"Processing chapter {chapter_number}...", current_url=page.url, chapters_scraped=chapter_number - 1, ) # --- 7a: Check for captcha/protection --- is_blocked, block_reason = await captcha_detector.check_for_protection( page, novel_id, content_sel ) if is_blocked: # First, try waiting for auto-resolution (Cloudflare) if "cloudflare" in block_reason.lower(): auto_resolved = await captcha_detector.wait_for_cloudflare_auto_resolve( page, novel_id, max_wait=15 ) if auto_resolved: is_blocked = False if is_blocked: # --- Need manual intervention --- update_status( novel_id, phase="captcha_detected", message=f"⚠️ Protection detected: {block_reason}", ) # Take screenshot and notify screenshot_file = await captcha_detector.handle_protection_detected( page, novel_id, block_reason, browser_manager ) # Update DB async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.PAUSED_CAPTCHA, error_message=block_reason, screenshot_path=screenshot_file, needs_intervention=True, ) await db.commit() update_status( novel_id, phase="waiting_intervention", message="Waiting for you to solve the captcha...", screenshot=screenshot_file, ) # Wait for user to solve it intervention_success = await captcha_detector.wait_for_intervention( novel_id, timeout_minutes=30 ) if not intervention_success: logger.error(f"Novel {novel_id}: Intervention timeout, stopping.") async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.FAILED, error_message="Captcha intervention timeout", ) await db.commit() update_status(novel_id, phase="failed", message="Captcha timeout!") return # Intervention completed, clear the flag async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.SCRAPING, needs_intervention=False, ) await db.commit() # Wait a moment and re-check await human.medium_delay("post-intervention") continue # Re-check the page # --- 7b: Extract chapter content --- chapter_data = await extract_chapter_content( page, novel_id, content_sel ) if chapter_data is None: consecutive_failures += 1 logger.warning( f"Novel {novel_id}: Failed to extract chapter {chapter_number} " f"(failure {consecutive_failures}/{max_consecutive_failures})" ) if consecutive_failures >= max_consecutive_failures: logger.error( f"Novel {novel_id}: Too many consecutive failures, stopping." ) async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.FAILED, error_message=f"Too many extraction failures at chapter {chapter_number}", ) await db.commit() update_status( novel_id, phase="failed", message=f"Failed after {max_consecutive_failures} consecutive extraction failures" ) return # Try clicking next anyway has_next = await click_next_chapter( page, human, novel_id, next_btn_selector ) if not has_next: break continue # Reset failure counter on success consecutive_failures = 0 # --- 7c: Save to database --- try: async with session_factory() as db: # Check if already saved (for resume scenarios) already_exists = await chapter_exists( db, novel_id, chapter_number ) if not already_exists: # --- Translate to Hindi in background thread --- loop = asyncio.get_event_loop() hindi_content = await loop.run_in_executor( None, _translate_hindi, chapter_data["content"] ) hindi_title = await loop.run_in_executor( None, _translate_hindi, chapter_data["title"] ) logger.info( f"Novel {novel_id}: 🇮🇳 Translated Ch {chapter_number} to Hindi" ) await save_chapter( db, novel_id=novel_id, chapter_number=chapter_number, content=chapter_data["content"], title=chapter_data["title"], url=chapter_data["url"], content_hindi=hindi_content or None, title_hindi=hindi_title or None, ) await increment_chapter_count(db, novel_id) await db.commit() logger.info( f"Novel {novel_id}: ✅ Saved Chapter {chapter_number} " f"- '{chapter_data['title']}'" ) else: logger.info( f"Novel {novel_id}: Chapter {chapter_number} already exists, skipping save" ) except IntegrityError: logger.warning( f"Novel {novel_id}: Chapter {chapter_number} duplicate, skipping" ) except Exception as e: logger.error( f"Novel {novel_id}: Database error saving chapter {chapter_number}: {e}" ) # Don't stop scraping for DB errors, log and continue update_status( novel_id, phase="scraping", message=f"✅ Chapter {chapter_number} saved! Moving to next...", chapters_scraped=chapter_number, last_chapter_title=chapter_data["title"], ) # --- 7d: Human-like reading delay --- await human.simulate_reading_chapter() # --- 7e: Click Next Chapter --- has_next = await click_next_chapter( page, human, novel_id, next_btn_selector ) if not has_next: logger.info( f"Novel {novel_id}: No next chapter found after chapter {chapter_number}. " f"Novel might be complete! 🎉" ) break chapter_number += 1 # --- Safety: Don't scrape more than 10000 chapters --- if chapter_number > 10000: logger.warning(f"Novel {novel_id}: Hit 10000 chapter limit, stopping") break # --- Step 8: Scraping Complete --- async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.COMPLETED, ) await db.commit() update_status( novel_id, phase="completed", message=f"🎉 Completed! {chapter_number - 1} chapters scraped.", chapters_scraped=chapter_number - 1, ) logger.info( f"Novel {novel_id} ({title}): 🎉 COMPLETED! " f"{chapter_number - 1} chapters scraped." ) except PlaywrightError as e: error_msg = f"Browser error: {str(e)}" logger.error(f"Novel {novel_id}: {error_msg}") async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.PAUSED_ERROR, error_message=error_msg, ) await db.commit() update_status(novel_id, phase="error", message=error_msg) except Exception as e: error_msg = f"Unexpected error: {str(e)}" logger.error(f"Novel {novel_id}: {error_msg}", exc_info=True) try: async with session_factory() as db: await update_novel_status( db, novel_id, NovelStatus.PAUSED_ERROR, error_message=error_msg, ) await db.commit() except Exception: pass update_status(novel_id, phase="error", message=error_msg) finally: # --- Always clean up the browser context --- await browser_manager.close_context(novel_id) captcha_detector.clear_intervention(novel_id) logger.info(f"Novel {novel_id}: Browser context cleaned up.") # ============================================ # Task Manager (Start Multiple Novels) # ============================================ # Store active tasks: {novel_id: asyncio.Task} active_tasks: Dict[int, asyncio.Task] = {} async def start_scraping_novel(novel_id: int, novel_data: Dict[str, Any]): """ Start scraping a novel as a background task. """ if novel_id in active_tasks: task = active_tasks[novel_id] if not task.done(): logger.warning(f"Novel {novel_id}: Already scraping, ignoring duplicate start") return False # Create and store the task task = asyncio.create_task( scrape_novel(novel_id, novel_data), name=f"scraper-novel-{novel_id}", ) active_tasks[novel_id] = task # Add callback to clean up when done def task_done_callback(t: asyncio.Task): try: exception = t.exception() if exception: logger.error( f"Novel {novel_id} task failed with exception: {exception}" ) except asyncio.CancelledError: logger.info(f"Novel {novel_id} task was cancelled") task.add_done_callback(task_done_callback) logger.info(f"Novel {novel_id}: Scraping task started! 🚀") return True async def stop_scraping_novel(novel_id: int) -> bool: """ Stop scraping a specific novel. """ if novel_id in active_tasks: task = active_tasks[novel_id] if not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass # Clean up browser await browser_manager.close_context(novel_id) logger.info(f"Novel {novel_id}: Scraping stopped ✋") update_status(novel_id, phase="stopped", message="Scraping stopped by user") return True return False async def stop_all_scraping(): """Stop all active scraping tasks.""" for novel_id in list(active_tasks.keys()): await stop_scraping_novel(novel_id) logger.info("All scraping tasks stopped.") def get_active_task_ids() -> list: """Get IDs of all novels currently being scraped.""" return [nid for nid, task in active_tasks.items() if not task.done()]