Spaces:
Running
Running
| """ | |
| Unified scraper. | |
| Calls the right parser based on the URL via the site registry. Tries | |
| ScraperAPI first (residential proxy → bypasses most blocks), falls back | |
| to Playwright (headless browser), then to plain requests as last resort. | |
| Site parsers all return the same ProductData shape — see src/sites/_base.py. | |
| """ | |
| import logging | |
| import random | |
| import re | |
| from typing import Optional | |
| from bs4 import BeautifulSoup | |
| from . import config | |
| from .sites import find_parser, generic | |
| from .sites._base import clean | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Desktop browser User-Agent strings; the fetchers pick one at random per request.
USER_AGENTS = [
    # Chrome 122 on Windows 10 (x64)
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36",
    # Safari 17.4 on macOS 14.4
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) "
    "Version/17.4 Safari/605.1.15",
    # Chrome 122 on Linux (x86_64)
    "Mozilla/5.0 (X11; Linux x86_64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/122.0.0.0 Safari/537.36",
]
def scrape_url(url: str) -> dict:
    """
    Fetch a product page and return its parsed data as a dict.

    Scraping pipeline (the first stage that yields non-empty HTML wins):
      1. ScraperAPI      (only when config.SCRAPERAPI_ENABLED)
      2. Playwright      (only when config.PLAYWRIGHT_ENABLED)
      3. Plain requests  (always tried as the last resort)

    The HTML is then handed to the URL-specific parser from the site
    registry, or to the generic parser when none matches or the specific
    one crashes.

    Args:
        url: Product page URL. Surrounding whitespace is stripped and
            "https://" is prepended when no http(s) scheme is present.

    Returns:
        On success, the parser's dict plus these keys set here:
        "context" (the joined text used for QA), "char_count",
        "scraper_used", "reviews", "review_count", and optionally
        "warning" (almost no text extracted) and "parse_error" (the
        site parser crashed and the generic one was used).
        On failure, a dict with an "error" key; when every fetch stage
        failed it also carries "scraper_used": "none".
    """
    # Strip before the emptiness check so a whitespace-only URL is rejected
    # instead of being turned into the bare string "https://".
    url = (url or "").strip()
    if not url:
        return {"error": "URL is required."}
    if not url.startswith(("http://", "https://")):
        url = "https://" + url

    html, method, last_error = _fetch_html(url)
    if not html:
        return {
            "error": f"All scrapers failed. Last: {last_error or 'unknown'}",
            "scraper_used": "none",
        }

    # ── Parse with the right site parser ──
    soup = BeautifulSoup(html, "html.parser")
    # Drop non-content tags before the parser sees the tree.
    for tag in soup(["script", "style", "noscript", "iframe"]):
        tag.decompose()

    parser = find_parser(url)
    if parser is None:
        parser = generic.parse
        logger.info(f"No site parser for {url} — using generic")
    else:
        logger.info(f"Using {parser.__module__} for {url}")

    try:
        data = parser(soup).to_dict()
    except Exception as e:
        logger.exception("Parser crashed; falling back to generic")
        data = generic.parse(soup).to_dict()
        data["parse_error"] = str(e)

    # ── Build the QA context (everything BERT can search over) ──
    context = _build_context(data)
    data["context"] = context
    data["char_count"] = len(context)
    data["scraper_used"] = method
    # `or []` also covers a parser that emits reviews=None, which would
    # otherwise crash len() below.
    data["reviews"] = data.get("reviews") or []
    data["review_count"] = len(data["reviews"])

    if len(context) < 50 and not data["reviews"]:
        data["warning"] = (
            f"Very little usable text was extracted with {method}. "
            "The site may have served a CAPTCHA or blocked content. "
            "Try the URL again, or paste the product description manually "
            "in Text mode."
        )

    # Use .get()/`or` so a missing source or a None title cannot turn a
    # successful scrape into an exception on the log line.
    logger.info(
        f"Scraped [{data.get('source', '?')} via {method}] "
        f"title={(data.get('title') or '?')[:60]!r} "
        f"chars={len(context)} reviews={data['review_count']}"
    )
    return data


# (dict key, template) pairs, in the order the sections appear in the context.
_CONTEXT_FIELDS = (
    ("title", "Product: {}."),
    ("features", "Features: {}"),
    ("description", "Description: {}"),
    ("specs", "Specifications: {}"),
    ("materials", "Materials: {}"),
    ("sizes", "Available sizes: {}"),
    ("return_policy", "Return policy: {}"),
    ("rating_text", "Rating: {}"),
)


def _build_context(data: dict) -> str:
    """Join the non-empty product fields into one labelled text blob."""
    parts = [
        template.format(data[key])
        for key, template in _CONTEXT_FIELDS
        if data.get(key)
    ]
    return clean(" ".join(parts), limit=20000)


def _attempt_fetch(label: str, fetcher, url: str) -> tuple[Optional[str], Optional[str]]:
    """Run one fetcher; return (html, None) or (None, non-empty error string)."""
    try:
        html, err = fetcher(url)
    except Exception as e:
        message = f"{label} crashed: {e}"
        logger.exception(message)
        return None, message
    if html:
        return html, None
    # A fetcher may report "no HTML" without an error string; make sure the
    # caller always has something to show.
    err = err or f"{label} returned no HTML"
    logger.warning(f"{label} failed: {err}")
    return None, err


def _fetch_html(url: str) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Try each enabled fetch stage in order; return (html, method, last_error)."""
    last_error: Optional[str] = None

    # 1) ScraperAPI
    if config.SCRAPERAPI_ENABLED:
        html, last_error = _attempt_fetch("ScraperAPI", _fetch_scraperapi, url)
        if html:
            return html, "scraperapi", None

    # 2) Playwright
    if config.PLAYWRIGHT_ENABLED:
        html, last_error = _attempt_fetch("Playwright", _fetch_playwright, url)
        if html:
            return html, "playwright", None

    # 3) Plain requests
    html, last_error = _attempt_fetch("Requests", _fetch_requests, url)
    if html:
        return html, "requests", None

    return None, None, last_error
| # ──────────────────────────────────────────────────────────────────── | |
| # Fetchers | |
| # ──────────────────────────────────────────────────────────────────── | |
def _fetch_scraperapi(url: str) -> tuple[Optional[str], Optional[str]]:
    """
    Fetch *url* through the ScraperAPI HTTP endpoint.

    Returns (html, None) on success, (None, error_str) on failure.
    Failure covers: no API key configured, timeout, any other requests
    network error, an HTTP status >= 400, or an empty response body.
    The API key is never included in the returned error string.
    """
    import requests
    from urllib.parse import quote_plus, urlencode

    api_key = config.SCRAPERAPI_KEY
    if not api_key:
        return None, "ScraperAPI key not configured."

    params = {
        "api_key": api_key,
        "url": url,
        "country_code": "in",
        "render": "true" if config.SCRAPERAPI_RENDER_JS else "false",
        "keep_headers": "false",
    }
    request_url = f"https://api.scraperapi.com?{urlencode(params)}"

    try:
        logger.info(f"ScraperAPI fetching: {url}")
        resp = requests.get(request_url, timeout=config.SCRAPERAPI_TIMEOUT)
    except requests.exceptions.Timeout:
        return None, f"ScraperAPI timeout ({config.SCRAPERAPI_TIMEOUT}s)"
    except requests.exceptions.RequestException as e:
        # The key travels in the query string, and requests/urllib3 exception
        # text can include the request URL. Redact the key (raw and
        # URL-encoded forms) so it cannot leak into logs or the error that is
        # returned to the caller.
        detail = str(e)
        for secret in {str(api_key), quote_plus(str(api_key))}:
            detail = detail.replace(secret, "***")
        return None, f"ScraperAPI network: {detail}"

    # Statuses that get a specific message; any other status >= 400 gets the
    # generic one below.
    status_errors = {
        401: "ScraperAPI key rejected (401)",
        403: "ScraperAPI denied (403) — out of credits?",
        429: "ScraperAPI rate limited (429)",
    }
    status = resp.status_code
    if status in status_errors:
        return None, status_errors[status]
    if status >= 400:
        return None, f"ScraperAPI HTTP {status}"

    html = resp.text or ""
    if not html.strip():
        return None, "ScraperAPI returned empty body"
    return html, None
| def _fetch_playwright(url: str) -> tuple[Optional[str], Optional[str]]: | |
| try: | |
| from playwright.sync_api import sync_playwright, TimeoutError as PWTimeoutError | |
| except ImportError: | |
| return None, "Playwright not installed" | |
| with sync_playwright() as pw: | |
| try: | |
| browser = pw.chromium.launch( | |
| headless=config.PLAYWRIGHT_HEADLESS, | |
| args=[ | |
| "--no-sandbox", | |
| "--disable-dev-shm-usage", | |
| "--disable-blink-features=AutomationControlled", | |
| ], | |
| ) | |
| except Exception as e: | |
| return None, f"Browser launch failed: {e}" | |
| try: | |
| context = browser.new_context( | |
| user_agent=random.choice(USER_AGENTS), | |
| viewport={"width": 1280, "height": 900}, | |
| locale="en-US", | |
| ) | |
| context.route( | |
| "**/*", | |
| lambda route: ( | |
| route.abort() | |
| if route.request.resource_type in {"image", "media", "font"} | |
| else route.continue_() | |
| ), | |
| ) | |
| page = context.new_page() | |
| page.set_default_timeout(config.PLAYWRIGHT_TIMEOUT_MS) | |
| try: | |
| page.goto(url, wait_until="domcontentloaded", | |
| timeout=config.PLAYWRIGHT_TIMEOUT_MS) | |
| except PWTimeoutError: | |
| return None, f"Page load timed out ({config.PLAYWRIGHT_TIMEOUT_MS//1000}s)" | |
| head = page.content()[:5000].lower() | |
| if any(p in head for p in [ | |
| "enter the characters you see below", | |
| "type the characters", | |
| "automated access", | |
| ]): | |
| return None, "Site served a CAPTCHA" | |
| try: | |
| page.wait_for_load_state("networkidle", timeout=8000) | |
| except PWTimeoutError: | |
| pass | |
| # Auto-scroll for lazy-loaded reviews (helpful on Flipkart, Myntra) | |
| try: | |
| for _ in range(3): | |
| page.mouse.wheel(0, 1500) | |
| page.wait_for_timeout(400) | |
| except Exception: | |
| pass | |
| html = page.content() | |
| finally: | |
| try: | |
| context.close() | |
| except Exception: | |
| pass | |
| browser.close() | |
| return html, None | |
def _fetch_requests(url: str) -> tuple[Optional[str], Optional[str]]:
    """
    Plain requests — last-resort fallback.

    Sends a single GET with a randomly chosen browser User-Agent.

    Returns (html, None) on success, (None, error_str) on failure.
    Failure covers: timeout, any other requests network error, an HTTP
    status >= 400, or an empty response body.
    """
    import requests

    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
    }

    try:
        resp = requests.get(url, headers=headers,
                            timeout=config.SCRAPE_TIMEOUT)
    except requests.exceptions.Timeout:
        return None, "Request timeout"
    except requests.exceptions.RequestException as e:
        return None, f"Request error: {e}"

    if resp.status_code == 403:
        return None, "Site blocked the request (HTTP 403)"
    if resp.status_code >= 400:
        return None, f"HTTP {resp.status_code}"

    html = resp.text or ""
    # Report an empty body as a failure with a message, so the caller does
    # not receive ("", None) and lose the reason the fetch produced nothing.
    if not html.strip():
        return None, "Site returned empty body"
    return html, None