""" Unified scraper. Calls the right parser based on the URL via the site registry. Tries ScraperAPI first (residential proxy → bypasses most blocks), falls back to Playwright (headless browser), then to plain requests as last resort. Site parsers all return the same ProductData shape — see src/sites/_base.py. """ import logging import random import re from typing import Optional from bs4 import BeautifulSoup from . import config from .sites import find_parser, generic from .sites._base import clean logger = logging.getLogger(__name__) USER_AGENTS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/605.1.15 " "(KHTML, like Gecko) Version/17.4 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", ] def scrape_url(url: str) -> dict: """ Scraping pipeline: 1. ScraperAPI (residential proxy) 2. Playwright (headless Chromium) 3. Plain requests (last resort) First one that returns valid HTML wins. Then the URL-specific parser extracts product data from that HTML. """ if not url: return {"error": "URL is required."} url = url.strip() if not url.startswith(("http://", "https://")): url = "https://" + url last_error: Optional[str] = None html: Optional[str] = None method: Optional[str] = None # 1) ScraperAPI if config.SCRAPERAPI_ENABLED: try: html, err = _fetch_scraperapi(url) if html: method = "scraperapi" else: last_error = err logger.warning(f"ScraperAPI failed: {err}") except Exception as e: last_error = f"ScraperAPI crashed: {e}" logger.exception(last_error) # 2) Playwright if not html and config.PLAYWRIGHT_ENABLED: try: html, err = _fetch_playwright(url) if html: method = "playwright" else: last_error = err logger.warning(f"Playwright failed: {err}") except Exception as e: last_error = f"Playwright crashed: {e}" logger.exception(last_error) # 3) Plain requests if not html: try: html, err = _fetch_requests(url) if html: method = "requests" else: last_error = err except Exception as e: last_error = f"Requests crashed: {e}" logger.exception(last_error) if not html: return { "error": f"All scrapers failed. Last: {last_error or 'unknown'}", "scraper_used": "none", } # ── Parse with the right site parser ── soup = BeautifulSoup(html, "html.parser") for tag in soup(["script", "style", "noscript", "iframe"]): tag.decompose() parser = find_parser(url) if parser is None: parser = generic.parse logger.info(f"No site parser for {url} — using generic") else: logger.info(f"Using {parser.__module__} for {url}") try: data = parser(soup).to_dict() except Exception as e: logger.exception("Parser crashed; falling back to generic") data = generic.parse(soup).to_dict() data["parse_error"] = str(e) # ── Build the QA context (everything BERT can search over) ── parts = [] if data.get("title"): parts.append(f"Product: {data['title']}.") if data.get("features"): parts.append(f"Features: {data['features']}") if data.get("description"): parts.append(f"Description: {data['description']}") if data.get("specs"): parts.append(f"Specifications: {data['specs']}") if data.get("materials"): parts.append(f"Materials: {data['materials']}") if data.get("sizes"): parts.append(f"Available sizes: {data['sizes']}") if data.get("return_policy"): parts.append(f"Return policy: {data['return_policy']}") if data.get("rating_text"): parts.append(f"Rating: {data['rating_text']}") context = clean(" ".join(parts), limit=20000) data["context"] = context data["char_count"] = len(context) data["scraper_used"] = method data["reviews"] = data.get("reviews", []) data["review_count"] = len(data["reviews"]) if len(context) < 50 and not data["reviews"]: data["warning"] = ( f"Very little usable text was extracted with {method}. " "The site may have served a CAPTCHA or blocked content. " "Try the URL again, or paste the product description manually " "in Text mode." ) logger.info( f"Scraped [{data['source']} via {method}] " f"title={data.get('title', '?')[:60]!r} " f"chars={len(context)} reviews={data['review_count']}" ) return data # ──────────────────────────────────────────────────────────────────── # Fetchers # ──────────────────────────────────────────────────────────────────── def _fetch_scraperapi(url: str) -> tuple[Optional[str], Optional[str]]: """Returns (html, None) on success, (None, error_str) on failure.""" import requests from urllib.parse import urlencode if not config.SCRAPERAPI_KEY: return None, "ScraperAPI key not configured." params = { "api_key": config.SCRAPERAPI_KEY, "url": url, "country_code": "in", "render": "true" if config.SCRAPERAPI_RENDER_JS else "false", "keep_headers": "false", } request_url = f"https://api.scraperapi.com?{urlencode(params)}" try: logger.info(f"ScraperAPI fetching: {url}") resp = requests.get(request_url, timeout=config.SCRAPERAPI_TIMEOUT) except requests.exceptions.Timeout: return None, f"ScraperAPI timeout ({config.SCRAPERAPI_TIMEOUT}s)" except requests.exceptions.RequestException as e: return None, f"ScraperAPI network: {e}" if resp.status_code == 401: return None, "ScraperAPI key rejected (401)" if resp.status_code == 403: return None, "ScraperAPI denied (403) — out of credits?" if resp.status_code == 429: return None, "ScraperAPI rate limited (429)" if resp.status_code >= 400: return None, f"ScraperAPI HTTP {resp.status_code}" html = resp.text or "" if not html.strip(): return None, "ScraperAPI returned empty body" return html, None def _fetch_playwright(url: str) -> tuple[Optional[str], Optional[str]]: try: from playwright.sync_api import sync_playwright, TimeoutError as PWTimeoutError except ImportError: return None, "Playwright not installed" with sync_playwright() as pw: try: browser = pw.chromium.launch( headless=config.PLAYWRIGHT_HEADLESS, args=[ "--no-sandbox", "--disable-dev-shm-usage", "--disable-blink-features=AutomationControlled", ], ) except Exception as e: return None, f"Browser launch failed: {e}" try: context = browser.new_context( user_agent=random.choice(USER_AGENTS), viewport={"width": 1280, "height": 900}, locale="en-US", ) context.route( "**/*", lambda route: ( route.abort() if route.request.resource_type in {"image", "media", "font"} else route.continue_() ), ) page = context.new_page() page.set_default_timeout(config.PLAYWRIGHT_TIMEOUT_MS) try: page.goto(url, wait_until="domcontentloaded", timeout=config.PLAYWRIGHT_TIMEOUT_MS) except PWTimeoutError: return None, f"Page load timed out ({config.PLAYWRIGHT_TIMEOUT_MS//1000}s)" head = page.content()[:5000].lower() if any(p in head for p in [ "enter the characters you see below", "type the characters", "automated access", ]): return None, "Site served a CAPTCHA" try: page.wait_for_load_state("networkidle", timeout=8000) except PWTimeoutError: pass # Auto-scroll for lazy-loaded reviews (helpful on Flipkart, Myntra) try: for _ in range(3): page.mouse.wheel(0, 1500) page.wait_for_timeout(400) except Exception: pass html = page.content() finally: try: context.close() except Exception: pass browser.close() return html, None def _fetch_requests(url: str) -> tuple[Optional[str], Optional[str]]: """Plain requests — last-resort fallback.""" import requests headers = { "User-Agent": random.choice(USER_AGENTS), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9", "Connection": "keep-alive", } try: resp = requests.get(url, headers=headers, timeout=config.SCRAPE_TIMEOUT) except requests.exceptions.Timeout: return None, "Request timeout" except requests.exceptions.RequestException as e: return None, f"Request error: {e}" if resp.status_code == 403: return None, "Site blocked the request (HTTP 403)" if resp.status_code >= 400: return None, f"HTTP {resp.status_code}" return resp.text or "", None