|
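"""Web scraping utilities for source collection.

Combines Google Custom Search (with round-robin API credentials) and the Brave
Search API with a tiered page scraper: plain requests first, then cloudscraper,
then Playwright for JS-heavy pages, all bounded by an overall scraping deadline.
"""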
|
from typing import Dict, List, Optional |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
import time |
|
|
from app.config import API_KEYS, SEARCH_ENGINE_IDS |
|
|
import logging |
|
|
import hashlib |
|
|
from functools import lru_cache |
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError as FuturesTimeoutError |
|
|
from requests.adapters import HTTPAdapter |
|
|
from urllib3.util.retry import Retry |
|
|
from urllib.parse import urlparse |
|
|
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError |
|
|
import re |
|
|
import threading |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
import cloudscraper |
|
|
CLOUDSCRAPER_AVAILABLE = True |
|
|
except Exception: |
|
|
CLOUDSCRAPER_AVAILABLE = False |
|
|
|
|
|
|
|
|
MAX_WORKERS = 8          # parallel scraping threads
REQUEST_TIMEOUT = 12     # per-request timeout (seconds)
PER_URL_TIMEOUT = 14     # maximum wait per scraped URL (seconds)
POLITENESS_DELAY = 0.1   # minimum delay between hits to the same domain (seconds)
GOOGLE_NUM_DEFAULT = 10  # default number of Google results per query
MIN_TEXT_LENGTH = 700    # minimum characters for a scrape to count as useful
GOOGLE_API_TIMEOUT = 6   # Google Custom Search API timeout (seconds)

# Brave Search API credentials and endpoint
BRAVE_API_KEY = "BSAE_jMY2tpTa_jYwCkcaiddxmzLs7m"
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
|
|
|
|
|
_api_key_index = 0 |
|
|
_api_key_lock = threading.Lock() |
|
|
|
|
|
def _get_next_api_credentials(): |
|
|
"""Get next API key and search engine ID in round-robin fashion""" |
|
|
global _api_key_index |
|
|
with _api_key_lock: |
|
|
key = API_KEYS[_api_key_index] |
|
|
engine_id = SEARCH_ENGINE_IDS[_api_key_index] |
|
|
_api_key_index = (_api_key_index + 1) % len(API_KEYS) |
|
|
return key, engine_id |
|
|
|
|
|
|
|
|
MULTI_QUERY_TIMEOUT = 180  # overall time budget for a scraping run (seconds)
|
|
|
|
|
|
|
|
BLACKLIST_DOMAINS = { |
|
|
'neurips.cc', |
|
|
'icml.cc', |
|
|
'jmlr.org', |
|
|
'researchgate.net', |
|
|
'arxiv.org', |
|
|
'springer.com', |
|
|
'nature.com', |
|
|
'nips.cc', |
|
|
'iccv2023.thecvf.com' |
|
|
} |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
logger = logging.getLogger("scraper") |
|
|
|
|
|
|
|
|
def _make_session() -> requests.Session: |
|
|
s = requests.Session() |
|
|
retries = Retry( |
|
|
total=1, |
|
|
backoff_factor=0.1, |
|
|
status_forcelist=[429, 500, 502, 503, 504], |
|
|
allowed_methods=["GET", "POST"], |
|
|
respect_retry_after_header=False |
|
|
) |
|
|
s.mount("https://", HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=20)) |
|
|
s.mount("http://", HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=20)) |
|
|
s.headers.update({ |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
|
|
"Accept-Language": "en-US,en;q=0.5", |
|
|
"Connection": "keep-alive", |
|
|
}) |
|
|
return s |
|
|
|
|
|
_SESSION = _make_session() |
|
|
|
|
|
|
|
|
_scraping_start_time = None |
|
|
_scraping_deadline = None |
|
|
|
|
|
def _set_scraping_deadline(): |
|
|
"""Set the scraping deadline to 3 minutes from now""" |
|
|
global _scraping_start_time, _scraping_deadline |
|
|
_scraping_start_time = time.time() |
|
|
_scraping_deadline = _scraping_start_time + MULTI_QUERY_TIMEOUT |
|
|
|
|
|
def _time_remaining() -> float: |
|
|
"""Get remaining time in seconds""" |
|
|
if _scraping_deadline is None: |
|
|
return MULTI_QUERY_TIMEOUT |
|
|
remaining = _scraping_deadline - time.time() |
|
|
return max(0, remaining) |
|
|
|
|
|
def _is_timeout_exceeded() -> bool: |
|
|
"""Check if timeout has been exceeded""" |
|
|
return _time_remaining() <= 0 |
|
|
|
|
|
|
|
|
def _normalize_whitespace(s: str) -> str: |
|
|
return " ".join(s.split()) |
|
|
|
|
|
def _text_hash(text: str) -> str: |
|
|
return hashlib.sha256(text.encode("utf-8")).hexdigest() |
|
|
|
|
|
def _clean_soup(soup: BeautifulSoup, prefer_main: bool = True, max_chars: Optional[int] = None) -> str: |
|
|
for junk in soup(["script", "style", "nav", "footer", "noscript", "header"]): |
|
|
junk.decompose() |
|
|
parts = [] |
|
|
if prefer_main: |
|
|
main = soup.find(["main", "article", "section"]) |
|
|
if main: |
|
|
elems = main.find_all(["p", "h1", "h2", "h3", "li"]) |
|
|
else: |
|
|
elems = soup.find_all(["p", "h1", "h2", "h3"]) |
|
|
else: |
|
|
elems = soup.find_all(["p", "li"]) |
|
|
for el in elems: |
|
|
t = el.get_text(separator=" ", strip=True) |
|
|
if t and len(t) > 30: |
|
|
parts.append(t) |
|
|
text = _normalize_whitespace(" ".join(parts)) |
|
|
if max_chars and len(text) > max_chars: |
|
|
return text[:max_chars] |
|
|
return text |
|
|
|
|
|
|
|
|
@lru_cache(maxsize=256) |
|
|
def google_search(query: str, num_results: int = GOOGLE_NUM_DEFAULT) -> List[Dict]:
|
|
api_key, search_engine_id = _get_next_api_credentials() |
|
|
|
|
|
if not api_key or not search_engine_id: |
|
|
logger.warning("Missing API_KEY or SEARCH_ENGINE_ID in config.py") |
|
|
return [] |
|
|
|
|
|
url = "https://www.googleapis.com/customsearch/v1" |
|
|
params = {"key": api_key, "cx": search_engine_id, "q": query, "num": num_results} |
|
|
try: |
|
|
r = _SESSION.get(url, params=params, timeout=GOOGLE_API_TIMEOUT) |
|
|
r.raise_for_status() |
|
|
data = r.json() |
|
|
items = data.get("items", []) or [] |
|
|
out = [] |
|
|
for i in items: |
|
|
out.append({ |
|
|
"link": i.get("link"), |
|
|
"title": i.get("title", ""), |
|
|
"snippet": i.get("snippet", "") |
|
|
}) |
|
|
logger.info(f"google_search: got {len(out)} items for '{query[:60]}'") |
|
|
return out |
|
|
except Exception as e: |
|
|
logger.warning(f"google_search failed: {e}") |
|
|
return [] |
|
|
|
|
|
|
|
|
def scrape_with_cloudscraper(url: str, timeout: int = 8) -> str:
|
|
if not CLOUDSCRAPER_AVAILABLE: |
|
|
return "" |
|
|
try: |
|
|
logger.debug(f"cloudscraper: GET {url}") |
|
|
scraper = cloudscraper.create_scraper() |
|
|
r = scraper.get(url, timeout=timeout) |
|
|
r.raise_for_status() |
|
|
soup = BeautifulSoup(r.text, "html.parser") |
|
|
text = _clean_soup(soup, prefer_main=True, max_chars=20000) |
|
|
if text and len(text) > MIN_TEXT_LENGTH: |
|
|
logger.info(f" β
Scraped {len(text)} chars (cloudscraper) for {url}") |
|
|
return text |
|
|
return "" |
|
|
except Exception as e: |
|
|
logger.debug(f"cloudscraper error for {url}: {e}") |
|
|
return "" |
|
|
|
|
|
|
|
|
def scrape_with_playwright(url: str, timeout: int = 8) -> str:
|
|
"""Scrape JS-heavy pages with aggressive waiting and content extraction.""" |
|
|
try: |
|
|
logger.debug(f"Playwright: GET {url}") |
|
|
with sync_playwright() as p: |
|
|
browser = p.chromium.launch( |
|
|
headless=True, |
|
|
args=[ |
|
|
"--disable-blink-features=AutomationControlled", |
|
|
"--disable-dev-shm-usage", |
|
|
"--no-sandbox", |
|
|
"--disable-gpu" |
|
|
] |
|
|
) |
|
|
context = browser.new_context( |
|
|
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" |
|
|
) |
|
|
context.set_default_timeout(timeout * 1000) |
|
|
page = context.new_page() |
|
|
|
|
|
try: |
|
|
|
|
|
page.goto(url, wait_until="networkidle", timeout=timeout * 1000) |
|
|
|
|
|
|
|
|
try: |
|
|
page.wait_for_selector("article, [role='article'], .post-content, .blog-content, main", timeout=3000) |
|
|
                except Exception:
|
|
pass |
|
|
|
|
|
|
|
|
page.wait_for_timeout(3000) |
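                # Auto-scroll to the bottom of the page so lazy-loaded content renders before extraction.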
|
|
|
|
|
|
|
|
page.evaluate(""" |
|
|
async () => { |
|
|
await new Promise((resolve) => { |
|
|
let totalHeight = 0; |
|
|
const distance = 100; |
|
|
const timer = setInterval(() => { |
|
|
window.scrollBy(0, distance); |
|
|
totalHeight += distance; |
|
|
if (totalHeight >= document.body.scrollHeight) { |
|
|
clearInterval(timer); |
|
|
resolve(); |
|
|
} |
|
|
}, 100); |
|
|
}); |
|
|
} |
|
|
""") |
|
|
|
|
|
|
|
|
page.wait_for_timeout(1500) |
|
|
|
|
|
|
|
|
page.evaluate("window.scrollTo(0, 0)") |
|
|
page.wait_for_timeout(500) |
|
|
|
|
|
html = page.content() |
|
|
finally: |
|
|
context.close() |
|
|
browser.close() |
|
|
|
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
text = _clean_soup(soup, prefer_main=True, max_chars=25000) |
|
|
|
|
|
if text and len(text) > MIN_TEXT_LENGTH: |
|
|
logger.info(f" β
Scraped {len(text)} chars (Playwright) for {url}") |
|
|
return text |
|
|
|
|
|
|
|
|
logger.debug(f"Initial extraction got {len(text)} chars, trying aggressive extraction") |
|
|
text_aggressive = _clean_soup(soup, prefer_main=False, max_chars=25000) |
|
|
if text_aggressive and len(text_aggressive) > MIN_TEXT_LENGTH and len(text_aggressive) > len(text): |
|
|
logger.info(f" β
Scraped {len(text_aggressive)} chars (Playwright aggressive) for {url}") |
|
|
return text_aggressive |
|
|
|
|
|
logger.debug(f"Playwright extraction minimal for {url}: {len(text)} chars") |
|
|
return text if text else "" |
|
|
|
|
|
    except Exception as e:
|
|
logger.debug(f"Playwright error for {url}: {e}") |
|
|
return "" |
|
|
|
|
|
|
|
|
def scrape_page(url: str, timeout: int = 5) -> str:
    """Scrape pages in order: requests → cloudscraper → Playwright."""
|
|
|
|
|
if _is_timeout_exceeded(): |
|
|
logger.warning(f"Scraping timeout exceeded, skipping {url}") |
|
|
return "" |
|
|
|
|
|
try: |
|
|
domain = urlparse(url).netloc.lower() |
|
|
|
|
|
|
|
|
if any(bd in domain for bd in BLACKLIST_DOMAINS): |
|
|
logger.info(f"Skipping blacklisted domain: {domain}") |
|
|
return "" |
|
|
|
|
|
|
|
|
headers = { |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", |
|
|
"Accept-Language": "en-US,en;q=0.5", |
|
|
} |
|
|
try: |
|
|
logger.debug(f"requests: GET {url}") |
|
|
r = _SESSION.get(url, headers=headers, timeout=timeout, allow_redirects=True) |
|
|
r.raise_for_status() |
|
|
soup = BeautifulSoup(r.text, "html.parser") |
|
|
text = _clean_soup(soup, prefer_main=True, max_chars=20000) |
|
|
if text and len(text) > MIN_TEXT_LENGTH: |
|
|
logger.info(f" β
Scraped {len(text)} chars (requests) for {url}") |
|
|
return text |
|
|
except Exception as e: |
|
|
logger.debug(f"requests failed for {url}: {e}") |
|
|
|
|
|
|
|
|
if _is_timeout_exceeded(): |
|
|
logger.warning(f"Scraping timeout exceeded, stopping fallback methods for {url}") |
|
|
return "" |
|
|
|
|
|
|
|
|
        if CLOUDSCRAPER_AVAILABLE:
            text = scrape_with_cloudscraper(url, timeout=timeout)
            if text:
                return text
|
|
|
|
|
|
|
|
if _is_timeout_exceeded(): |
|
|
logger.warning(f"Scraping timeout exceeded, skipping Playwright for {url}") |
|
|
return "" |
|
|
|
|
|
|
|
|
try: |
|
|
logger.debug(f"Playwright: GET {url}") |
|
|
res = scrape_with_playwright(url, timeout=12) |
|
|
if res and len(res) > MIN_TEXT_LENGTH: |
|
|
return res |
|
|
except Exception as e: |
|
|
logger.debug(f"Playwright failed for {url}: {e}") |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"All scrapers failed for {url}: {e}") |
|
|
|
|
|
return "" |
|
|
|
|
|
|
|
|
def fetch_sources(query: str, num_results: int = 10) -> List[Dict]:
|
|
"""Fetch sources for a single query""" |
|
|
logger.info(f"Fetching sources for query: '{query[:60]}'") |
|
|
items = google_search(query, num_results=num_results) |
|
|
if not items: |
|
|
logger.warning("No URLs returned from Google Search") |
|
|
return [] |
|
|
|
|
|
urls = [it["link"] for it in items if it.get("link")] |
|
|
snippets = {it["link"]: it.get("snippet", "") for it in items if it.get("link")} |
|
|
domain_last_hit: Dict[str, float] = {} |
|
|
|
|
|
def _scrape_task(u: str) -> Dict: |
|
|
try: |
|
|
dom = urlparse(u).netloc |
|
|
last = domain_last_hit.get(dom, 0.0) |
|
|
now = time.time() |
|
|
delta = now - last |
|
|
if delta < POLITENESS_DELAY: |
|
|
time.sleep(POLITENESS_DELAY - delta) |
|
|
domain_last_hit[dom] = time.time() |
|
|
logger.info(f"Scraping URL: {u}") |
|
|
|
|
|
text = scrape_page(u, timeout=REQUEST_TIMEOUT) |
|
|
if text: |
|
|
return {"url": u, "content": text, "method": "scraped", "len": len(text), "hash": _text_hash(text[:4000])} |
|
|
else: |
|
|
return {"url": u, "content": "", "method": "failed", "len": 0, "hash": ""} |
|
|
except Exception as e: |
|
|
logger.debug(f"Exception in _scrape_task for {u}: {e}") |
|
|
return {"url": u, "content": "", "method": "error", "len": 0, "hash": ""} |
|
|
|
|
|
results = [] |
|
|
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex: |
|
|
future_to_url = {ex.submit(_scrape_task, u): u for u in urls} |
|
|
for fut in as_completed(future_to_url): |
|
|
u = future_to_url[fut] |
|
|
try: |
|
|
r = fut.result(timeout=PER_URL_TIMEOUT) |
|
|
if r.get("content"): |
|
|
results.append(r) |
|
|
except FuturesTimeoutError: |
|
|
logger.warning(f"Timeout scraping {u}") |
|
|
except Exception as e: |
|
|
logger.warning(f"Error scraping {u}: {e}") |
|
|
|
|
|
if not results: |
|
|
logger.info("No pages scraped; using snippets") |
|
|
for u in urls: |
|
|
snip = snippets.get(u, "") |
|
|
if snip: |
|
|
results.append({"url": u, "content": snip, "method": "google_snippet", "len": len(snip), "hash": _text_hash(snip)}) |
|
|
return results |
|
|
|
|
|
|
|
|
seen: Dict[str, Dict] = {} |
|
|
for r in results: |
|
|
h = r.get("hash") or _text_hash(r.get("content", "")) |
|
|
cur = seen.get(h) |
|
|
if not cur or (r.get("len", 0) > cur.get("len", 0)): |
|
|
seen[h] = r |
|
|
|
|
|
deduped = list(seen.values()) |
|
|
|
|
|
|
|
|
if len(deduped) < num_results: |
|
|
for u in urls: |
|
|
if any(x["url"] == u for x in deduped): |
|
|
continue |
|
|
snip = snippets.get(u, "") |
|
|
if snip: |
|
|
deduped.append({"url": u, "content": snip, "method": "google_snippet", "len": len(snip), "hash": _text_hash(snip)}) |
|
|
if len(deduped) >= num_results: |
|
|
break |
|
|
|
|
|
logger.info(f"Fetched {len(deduped)} sources for query") |
|
|
return deduped |
|
|
|
|
|
|
|
|
def fetch_sources_multi_query(query: str, num_results: int = 10) -> List[Dict[str, str]]: |
|
|
""" |
|
|
Accept a single pre-generated query and fetch sources. |
|
|
NO internal query generation - just scrape this query. |
|
|
""" |
|
|
_set_scraping_deadline() |
|
|
|
|
|
logger.info(f"Processing single query with overall 3-minute timeout") |
|
|
|
|
|
all_sources: Dict[str, Dict] = {} |
|
|
DOC_EXTENSIONS = [".pdf", ".doc", ".docx", ".odf", ".xls", ".xlsx", ".ppt", ".pptx"] |
|
|
lock = threading.Lock() |
|
|
|
|
|
def _process_url(u: str, items: List) -> Optional[Dict]: |
|
|
"""Process single URL with timeout check""" |
|
|
|
|
|
if _is_timeout_exceeded(): |
|
|
return None |
|
|
|
|
|
if u in all_sources: |
|
|
return None |
|
|
if any(ext in u.lower() for ext in DOC_EXTENSIONS): |
|
|
logger.info(f"Skipping document URL: {u}") |
|
|
return None |
|
|
|
|
|
logger.info(f"Scraping URL: {u}") |
|
|
|
|
|
try: |
|
|
|
|
|
text_content = scrape_page(u, timeout=REQUEST_TIMEOUT) |
|
|
if text_content and len(text_content) > MIN_TEXT_LENGTH: |
|
|
logger.info(f" β
Scraped {len(text_content)} chars for {u}") |
|
|
return {"url": u, "content": text_content, "source_url": u} |
|
|
|
|
|
|
|
|
snippet = next((it.get("snippet", "") for it in items if it.get("link") == u), "") |
|
|
if snippet and len(snippet) > 50: |
|
|
logger.info(f" β οΈ Using snippet ({len(snippet)} chars) for {u}") |
|
|
return {"url": u, "content": snippet, "source_url": u} |
|
|
|
|
|
logger.warning(f" β No content extracted for {u}") |
|
|
return None |
|
|
|
|
|
except Exception as e: |
|
|
logger.debug(f"Error scraping {u}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
if _is_timeout_exceeded(): |
|
|
logger.warning(f"Timeout exceeded, skipping query") |
|
|
return [] |
|
|
|
|
|
logger.info(f"Query: '{query[:60]}'") |
|
|
items = google_search(query, num_results=num_results) |
|
|
if not items: |
|
|
return [] |
|
|
|
|
|
urls = [it["link"] for it in items if it.get("link")] |
|
|
|
|
|
|
|
|
with ThreadPoolExecutor(max_workers=4) as url_ex: |
|
|
url_futures = {url_ex.submit(_process_url, u, items): u for u in urls} |
|
|
for fut in as_completed(url_futures): |
|
|
|
|
|
if _is_timeout_exceeded(): |
|
|
logger.warning(f"Timeout exceeded, stopping URL processing") |
|
|
for f in url_futures: |
|
|
f.cancel() |
|
|
break |
|
|
|
|
|
try: |
|
|
result = fut.result(timeout=PER_URL_TIMEOUT) |
|
|
if result: |
|
|
with lock: |
|
|
all_sources[result['url']] = result |
|
|
logger.info(f" β
Added: {result['url'][:50]}") |
|
|
except FuturesTimeoutError: |
|
|
logger.warning(f"Timeout on URL") |
|
|
except Exception as e: |
|
|
logger.debug(f"Error: {e}") |
|
|
time.sleep(0.05) |
|
|
|
|
|
res = list(all_sources.values()) |
|
|
elapsed = time.time() - _scraping_start_time if _scraping_start_time else 0 |
|
|
logger.info(f"Sources for this query: {len(res)} (elapsed: {elapsed:.1f}s)") |
|
|
return res |
|
|
|
|
|
|
|
|
def fetch_brave_sources(query: str, num_results: int = 5) -> List[Dict[str, str]]:
|
|
"""Fetch from Brave Search API.""" |
|
|
headers = { |
|
|
"Accept": "application/json", |
|
|
"X-API-KEY": BRAVE_API_KEY |
|
|
} |
|
|
params = {"q": query, "count": num_results} |
|
|
|
|
|
try: |
|
|
resp = requests.get(BRAVE_ENDPOINT, headers=headers, params=params, timeout=6) |
|
|
resp.raise_for_status() |
|
|
data = resp.json() |
|
|
results = [] |
|
|
        # Brave returns web results under data["web"]["results"],
        # each with "title", "url", and "description" fields.
        for item in data.get("web", {}).get("results", []):
            results.append({
                "title": item.get("title") or item.get("url") or "Unknown",
                "content": item.get("description") or "",
            })
|
|
return results |
|
|
except Exception as e: |
|
|
        logger.warning(f"Brave search failed for query '{query}': {e}")
|
|
return [] |
|
|
|
|
|
def prepare_brave_query(text: str, min_len: int = 20, max_len: int = 200) -> Optional[str]:
|
|
t = re.sub(r"\s+", " ", text).strip() |
|
|
if len(t) < min_len: |
|
|
return None |
|
|
if len(t) > max_len: |
|
|
t = t[:max_len].rsplit(" ", 1)[0] |
|
|
return t |
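

# Minimal usage sketch (assumes app.config provides valid API_KEYS / SEARCH_ENGINE_IDS
# and that Playwright browsers are installed); the query below is only an example.
if __name__ == "__main__":
    demo_sources = fetch_sources_multi_query("graph neural networks for recommendation", num_results=5)
    for src in demo_sources:
        print(src["url"], len(src["content"]))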