from typing import Dict, List, Optional import requests from bs4 import BeautifulSoup import time from app.config import API_KEYS, SEARCH_ENGINE_IDS import logging import hashlib from functools import lru_cache from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError as FuturesTimeoutError from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from urllib.parse import urlparse from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError import re import threading from ..logger import logger logger.info("Scraper started") # Optional fallback try: import cloudscraper CLOUDSCRAPER_AVAILABLE = True except Exception: CLOUDSCRAPER_AVAILABLE = False # ---- OPTIMIZED Configuration ---- MAX_WORKERS = 8 # Increased for parallelism REQUEST_TIMEOUT = 12 # Reduced from 6 - fail faster PER_URL_TIMEOUT = 14 # Reduced from 10 POLITENESS_DELAY = 0.1 # Reduced from 0.25 GOOGLE_NUM_DEFAULT = 10 MIN_TEXT_LENGTH = 700 GOOGLE_API_TIMEOUT = 6 BRAVE_API_KEY = "BSAE_jMY2tpTa_jYwCkcaiddxmzLs7m" BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search" _api_key_index = 0 _api_key_lock = threading.Lock() def _get_next_api_credentials(): """Get next API key and search engine ID in round-robin fashion""" global _api_key_index with _api_key_lock: key = API_KEYS[_api_key_index] engine_id = SEARCH_ENGINE_IDS[_api_key_index] _api_key_index = (_api_key_index + 1) % len(API_KEYS) return key, engine_id # ✅ HARD TIMEOUT: 3 minutes (180 seconds) for ALL scraping MULTI_QUERY_TIMEOUT = 180 # Blacklist slow/unscrapeable domains BLACKLIST_DOMAINS = { 'neurips.cc', 'icml.cc', 'jmlr.org', 'researchgate.net', 'arxiv.org', 'springer.com', 'nature.com', 'nips.cc', 'iccv2023.thecvf.com' } # Logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("scraper") # ---- Session ---- def _make_session() -> requests.Session: s = requests.Session() retries = Retry( total=1, # Only 1 retry - fail fast backoff_factor=0.1, # Minimal backoff status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "POST"], respect_retry_after_header=False # Don't wait for Retry-After ) s.mount("https://", HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=20)) s.mount("http://", HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=20)) s.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Connection": "keep-alive", }) return s _SESSION = _make_session() # ---- Global timeout tracking ---- _scraping_start_time = None _scraping_deadline = None def _set_scraping_deadline(): """Set the scraping deadline to 3 minutes from now""" global _scraping_start_time, _scraping_deadline _scraping_start_time = time.time() _scraping_deadline = _scraping_start_time + MULTI_QUERY_TIMEOUT def _time_remaining() -> float: """Get remaining time in seconds""" if _scraping_deadline is None: return MULTI_QUERY_TIMEOUT remaining = _scraping_deadline - time.time() return max(0, remaining) def _is_timeout_exceeded() -> bool: """Check if timeout has been exceeded""" return _time_remaining() <= 0 # ---- Helpers ---- def _normalize_whitespace(s: str) -> str: return " ".join(s.split()) def _text_hash(text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest() def _clean_soup(soup: BeautifulSoup, prefer_main: bool = True, max_chars: Optional[int] = None) -> str: for junk in soup(["script", "style", "nav", "footer", "noscript", "header"]): junk.decompose() parts = [] if prefer_main: main = soup.find(["main", "article", "section"]) if main: elems = main.find_all(["p", "h1", "h2", "h3", "li"]) else: elems = soup.find_all(["p", "h1", "h2", "h3"]) else: elems = soup.find_all(["p", "li"]) for el in elems: t = el.get_text(separator=" ", strip=True) if t and len(t) > 30: parts.append(t) text = _normalize_whitespace(" ".join(parts)) if max_chars and len(text) > max_chars: return text[:max_chars] return text # ---- Google Search ---- @lru_cache(maxsize=256) def google_search(query: str, num_results: int = 10): api_key, search_engine_id = _get_next_api_credentials() if not api_key or not search_engine_id: logger.warning("Missing API_KEY or SEARCH_ENGINE_ID in config.py") return [] url = "https://www.googleapis.com/customsearch/v1" params = {"key": api_key, "cx": search_engine_id, "q": query, "num": num_results} try: r = _SESSION.get(url, params=params, timeout=GOOGLE_API_TIMEOUT) r.raise_for_status() data = r.json() items = data.get("items", []) or [] out = [] for i in items: out.append({ "link": i.get("link"), "title": i.get("title", ""), "snippet": i.get("snippet", "") }) logger.info(f"google_search: got {len(out)} items for '{query[:60]}'") return out except Exception as e: logger.warning(f"google_search failed: {e}") return [] # ---- IMPROVED: Cloudscraper with early exit ---- def scrape_with_cloudscraper(url: str, timeout: int = 8): if not CLOUDSCRAPER_AVAILABLE: return "" try: logger.debug(f"cloudscraper: GET {url}") scraper = cloudscraper.create_scraper() r = scraper.get(url, timeout=timeout) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") text = _clean_soup(soup, prefer_main=True, max_chars=20000) if text and len(text) > MIN_TEXT_LENGTH: logger.info(f" ✅ Scraped {len(text)} chars (cloudscraper) for {url}") return text return "" except Exception as e: logger.debug(f"cloudscraper error for {url}: {e}") return "" # ---- IMPROVED: Playwright with hard timeout ---- def scrape_with_playwright(url: str, timeout: int = 8): """Scrape JS-heavy pages with aggressive waiting and content extraction.""" try: logger.debug(f"Playwright: GET {url}") with sync_playwright() as p: browser = p.chromium.launch( headless=True, args=[ "--disable-blink-features=AutomationControlled", "--disable-dev-shm-usage", "--no-sandbox", "--disable-gpu" ] ) context = browser.new_context( user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" ) context.set_default_timeout(timeout * 1000) page = context.new_page() try: # Navigate with longer timeout for JS-heavy sites page.goto(url, wait_until="networkidle", timeout=timeout * 1000) # Wait for common content containers to load try: page.wait_for_selector("article, [role='article'], .post-content, .blog-content, main", timeout=3000) except: pass # Selector may not exist, continue anyway # Extended wait for dynamic content page.wait_for_timeout(3000) # 3 seconds for rendering # Scroll down to load lazy-loaded images/content page.evaluate(""" async () => { await new Promise((resolve) => { let totalHeight = 0; const distance = 100; const timer = setInterval(() => { window.scrollBy(0, distance); totalHeight += distance; if (totalHeight >= document.body.scrollHeight) { clearInterval(timer); resolve(); } }, 100); }); } """) # Wait after scrolling page.wait_for_timeout(1500) # Scroll back to top page.evaluate("window.scrollTo(0, 0)") page.wait_for_timeout(500) html = page.content() finally: context.close() browser.close() soup = BeautifulSoup(html, "html.parser") text = _clean_soup(soup, prefer_main=True, max_chars=25000) if text and len(text) > MIN_TEXT_LENGTH: logger.info(f" ✅ Scraped {len(text)} chars (Playwright) for {url}") return text # If we got very little content, try a less aggressive cleanup logger.debug(f"Initial extraction got {len(text)} chars, trying aggressive extraction") text_aggressive = _clean_soup(soup, prefer_main=False, max_chars=25000) if text_aggressive and len(text_aggressive) > MIN_TEXT_LENGTH and len(text_aggressive) > len(text): logger.info(f" ✅ Scraped {len(text_aggressive)} chars (Playwright aggressive) for {url}") return text_aggressive logger.debug(f"Playwright extraction minimal for {url}: {len(text)} chars") return text if text else "" except (PlaywrightTimeoutError, Exception) as e: logger.debug(f"Playwright error for {url}: {e}") return "" # ---- IMPROVED: Smart fallback strategy ---- def scrape_page(url: str, timeout: int = 5): """Scrape pages in order: requests → cloudscraper → Playwright.""" # ✅ HARD CHECK: Exit immediately if timeout exceeded if _is_timeout_exceeded(): logger.warning(f"Scraping timeout exceeded, skipping {url}") return "" try: domain = urlparse(url).netloc.lower() # Skip blacklisted domains entirely if any(bd in domain for bd in BLACKLIST_DOMAINS): logger.info(f"Skipping blacklisted domain: {domain}") return "" # --- 1️⃣ Requests (fastest) --- headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", } try: logger.debug(f"requests: GET {url}") r = _SESSION.get(url, headers=headers, timeout=timeout, allow_redirects=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") text = _clean_soup(soup, prefer_main=True, max_chars=20000) if text and len(text) > MIN_TEXT_LENGTH: logger.info(f" ✅ Scraped {len(text)} chars (requests) for {url}") return text except Exception as e: logger.debug(f"requests failed for {url}: {e}") # ✅ HARD CHECK: Exit if timeout exceeded between attempts if _is_timeout_exceeded(): logger.warning(f"Scraping timeout exceeded, stopping fallback methods for {url}") return "" # --- 2️⃣ Cloudscraper (medium) --- if CLOUDSCRAPER_AVAILABLE: try: logger.debug(f"cloudscraper: GET {url}") scraper = cloudscraper.create_scraper() r = scraper.get(url, timeout=timeout) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") text = _clean_soup(soup, prefer_main=True, max_chars=20000) if text and len(text) > MIN_TEXT_LENGTH: logger.info(f" ✅ Scraped {len(text)} chars (cloudscraper) for {url}") return text except Exception as e: logger.debug(f"cloudscraper failed for {url}: {e}") # ✅ HARD CHECK: Exit if timeout exceeded before Playwright if _is_timeout_exceeded(): logger.warning(f"Scraping timeout exceeded, skipping Playwright for {url}") return "" # --- 3️⃣ Playwright (heaviest) --- try: logger.debug(f"Playwright: GET {url}") res = scrape_with_playwright(url, timeout=12) if res and len(res) > MIN_TEXT_LENGTH: return res except Exception as e: logger.debug(f"Playwright failed for {url}: {e}") except Exception as e: logger.debug(f"All scrapers failed for {url}: {e}") return "" # ---- IMPROVED: Parallel fetch with HARD OVERALL TIMEOUT ---- def fetch_sources(query: str, num_results: int = 10): """Fetch sources for a single query""" logger.info(f"Fetching sources for query: '{query[:60]}'") items = google_search(query, num_results=num_results) if not items: logger.warning("No URLs returned from Google Search") return [] urls = [it["link"] for it in items if it.get("link")] snippets = {it["link"]: it.get("snippet", "") for it in items if it.get("link")} domain_last_hit: Dict[str, float] = {} def _scrape_task(u: str) -> Dict: try: dom = urlparse(u).netloc last = domain_last_hit.get(dom, 0.0) now = time.time() delta = now - last if delta < POLITENESS_DELAY: time.sleep(POLITENESS_DELAY - delta) domain_last_hit[dom] = time.time() logger.info(f"Scraping URL: {u}") text = scrape_page(u, timeout=REQUEST_TIMEOUT) if text: return {"url": u, "content": text, "method": "scraped", "len": len(text), "hash": _text_hash(text[:4000])} else: return {"url": u, "content": "", "method": "failed", "len": 0, "hash": ""} except Exception as e: logger.debug(f"Exception in _scrape_task for {u}: {e}") return {"url": u, "content": "", "method": "error", "len": 0, "hash": ""} results = [] with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex: future_to_url = {ex.submit(_scrape_task, u): u for u in urls} for fut in as_completed(future_to_url): u = future_to_url[fut] try: r = fut.result(timeout=PER_URL_TIMEOUT) if r.get("content"): results.append(r) except FuturesTimeoutError: logger.warning(f"Timeout scraping {u}") except Exception as e: logger.warning(f"Error scraping {u}: {e}") if not results: logger.info("No pages scraped; using snippets") for u in urls: snip = snippets.get(u, "") if snip: results.append({"url": u, "content": snip, "method": "google_snippet", "len": len(snip), "hash": _text_hash(snip)}) return results # Deduplicate seen: Dict[str, Dict] = {} for r in results: h = r.get("hash") or _text_hash(r.get("content", "")) cur = seen.get(h) if not cur or (r.get("len", 0) > cur.get("len", 0)): seen[h] = r deduped = list(seen.values()) # Fill from snippets if needed if len(deduped) < num_results: for u in urls: if any(x["url"] == u for x in deduped): continue snip = snippets.get(u, "") if snip: deduped.append({"url": u, "content": snip, "method": "google_snippet", "len": len(snip), "hash": _text_hash(snip)}) if len(deduped) >= num_results: break logger.info(f"Fetched {len(deduped)} sources for query") return deduped # ---- NEW: Multi-query fetch - accepts pre-generated queries ---- def fetch_sources_multi_query(query: str, num_results: int = 10) -> List[Dict[str, str]]: """ Accept a single pre-generated query and fetch sources. NO internal query generation - just scrape this query. """ _set_scraping_deadline() logger.info(f"Processing single query with overall 3-minute timeout") all_sources: Dict[str, Dict] = {} DOC_EXTENSIONS = [".pdf", ".doc", ".docx", ".odf", ".xls", ".xlsx", ".ppt", ".pptx"] lock = threading.Lock() def _process_url(u: str, items: List) -> Optional[Dict]: """Process single URL with timeout check""" # ✅ HARD CHECK: Exit immediately if timeout exceeded if _is_timeout_exceeded(): return None if u in all_sources: return None if any(ext in u.lower() for ext in DOC_EXTENSIONS): logger.info(f"Skipping document URL: {u}") return None logger.info(f"Scraping URL: {u}") try: # Try scraping the page text_content = scrape_page(u, timeout=REQUEST_TIMEOUT) if text_content and len(text_content) > MIN_TEXT_LENGTH: logger.info(f" ✅ Scraped {len(text_content)} chars for {u}") return {"url": u, "content": text_content, "source_url": u} # If scraping failed, try snippet as fallback snippet = next((it.get("snippet", "") for it in items if it.get("link") == u), "") if snippet and len(snippet) > 50: logger.info(f" ⚠️ Using snippet ({len(snippet)} chars) for {u}") return {"url": u, "content": snippet, "source_url": u} logger.warning(f" ❌ No content extracted for {u}") return None except Exception as e: logger.debug(f"Error scraping {u}: {e}") return None # ✅ HARD CHECK: Exit if timeout exceeded if _is_timeout_exceeded(): logger.warning(f"Timeout exceeded, skipping query") return [] logger.info(f"Query: '{query[:60]}'") items = google_search(query, num_results=num_results) if not items: return [] urls = [it["link"] for it in items if it.get("link")] # Process URLs for this query in parallel with ThreadPoolExecutor(max_workers=4) as url_ex: url_futures = {url_ex.submit(_process_url, u, items): u for u in urls} for fut in as_completed(url_futures): # ✅ HARD CHECK: Exit if timeout exceeded if _is_timeout_exceeded(): logger.warning(f"Timeout exceeded, stopping URL processing") for f in url_futures: f.cancel() break try: result = fut.result(timeout=PER_URL_TIMEOUT) if result: with lock: all_sources[result['url']] = result logger.info(f" ✅ Added: {result['url'][:50]}") except FuturesTimeoutError: logger.warning(f"Timeout on URL") except Exception as e: logger.debug(f"Error: {e}") time.sleep(0.05) res = list(all_sources.values()) elapsed = time.time() - _scraping_start_time if _scraping_start_time else 0 logger.info(f"Sources for this query: {len(res)} (elapsed: {elapsed:.1f}s)") return res def fetch_brave_sources(query: str, num_results: int = 5) -> list[dict]: """Fetch from Brave Search API.""" headers = { "Accept": "application/json", "X-API-KEY": BRAVE_API_KEY } params = {"q": query, "count": num_results} try: resp = requests.get(BRAVE_ENDPOINT, headers=headers, params=params, timeout=6) resp.raise_for_status() data = resp.json() results = [] for item in data.get("webPages", []): results.append({ "title": item.get("url") or "Unknown", "content": item.get("snippet") or "", }) return results except Exception as e: logging.warning(f"Brave search failed for query '{query}': {e}") return [] def prepare_brave_query(text: str, min_len: int = 20, max_len: int = 200) -> str: t = re.sub(r"\s+", " ", text).strip() if len(t) < min_len: return None if len(t) > max_len: t = t[:max_len].rsplit(" ", 1)[0] return t