| """Pluggable web search backends. |
| |
| All backends implement the same interface: |
| search(query: str) -> List[str] |
| returning a list of text chunks (typically "title: content"). |
| |
| Backends: |
| - WikipediaSearch — free, no key, encyclopedic content |
| - TavilySearch — AI-tuned web search (1000 q/month free, key required) |
| - BraveSearch — general web (2000 q/month free, key required) |
| - PlaywrightBingSearch — scrape Bing via headless Chromium (no key needed) |
| - CompositeSearch — fallback chain, or merged results, across multiple backends |
| |
| For production, you typically want Tavily or Brave as the primary backend |
| (broader, more recent than Wikipedia). Wikipedia is great as a fallback or |
| for queries where encyclopedic accuracy matters. |
| """ |
|
|
| from typing import List, Optional |
| import os |
| import re |
| import time |
| import json |
| import urllib.parse |
| import urllib.request |
|
|
|
|
| |
| |
| |
class WikipediaSearch:
    """Wikipedia full-text search; returns short summary chunks.

    ``search`` asks the MediaWiki search API for matching page titles, then
    fetches the REST summary of each title and returns
    ``"<title>: <extract>"`` strings.

    Args:
        n_results: Maximum number of titles requested from the search API.
        timeout: Per-request timeout in seconds.
        max_retries: Maximum attempts per HTTP request. Values below 1 are
            treated as 1 so that a request is always attempted.
    """

    SEARCH_URL = "https://en.wikipedia.org/w/api.php"
    SUMMARY_URL = "https://en.wikipedia.org/api/rest_v1/page/summary/{}"
    UA = "sp-distill-runtime/1.0 (https://huggingface.co/baya1116/hypernet-sp-distill)"

    def __init__(self, n_results: int = 3, timeout: float = 8.0, max_retries: int = 3):
        self.n_results = n_results
        self.timeout = timeout
        self.max_retries = max_retries

    def _http_get(self, url: str) -> dict:
        """GET ``url`` and return the decoded JSON body.

        HTTP 429 responses are retried with exponential backoff
        (1s, 2s, 4s, ...); any other HTTP error is raised immediately.
        Non-HTTP failures are retried after a fixed 1s pause. The last
        failure is re-raised once the attempts are exhausted.
        """
        # Clamp so max_retries <= 0 still performs one request instead of
        # falling through the loop with nothing to raise.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            is_last = attempt == attempts - 1
            try:
                req = urllib.request.Request(url, headers={"User-Agent": self.UA})
                with urllib.request.urlopen(req, timeout=self.timeout) as resp:
                    return json.loads(resp.read().decode("utf-8"))
            except urllib.error.HTTPError as e:
                if e.code != 429 or is_last:
                    raise
                time.sleep(2 ** attempt)
            except Exception:
                if is_last:
                    raise
                time.sleep(1)
        # The final attempt always returns or raises inside the loop.
        raise RuntimeError("unreachable: retry loop ended without a result")

    def search(self, query: str) -> List[str]:
        """Return ``"<title>: <extract>"`` chunks for ``query``.

        Returns an empty list when the title search fails. Titles whose
        summary cannot be fetched, or whose extract is 50 characters or
        shorter, are skipped.
        """
        params = {
            "action": "query", "list": "search",
            "srsearch": query, "srlimit": self.n_results, "format": "json",
        }
        url = f"{self.SEARCH_URL}?{urllib.parse.urlencode(params)}"
        try:
            data = self._http_get(url)
            titles = [h["title"] for h in data.get("query", {}).get("search", [])]
        except Exception as e:
            print(f" [WikipediaSearch] search failed: {e}")
            return []

        chunks: List[str] = []
        for title in titles:
            try:
                # safe="" so that "/" inside a title (e.g. "AC/DC") is sent as
                # %2F instead of being read as a path separator.
                slug = urllib.parse.quote(title.replace(" ", "_"), safe="")
                summary = self._http_get(self.SUMMARY_URL.format(slug))
                extract = summary.get("extract") or ""
            except Exception as e:
                # Best effort: one bad page must not discard the other results.
                print(f" [WikipediaSearch] summary for {title!r} failed: {e}")
                continue
            if len(extract) > 50:
                chunks.append(f"{title}: {extract}")
        return chunks
|
|
|
|
| |
| |
| |
class TavilySearch:
    """Tavily web search — designed for LLM RAG.

    Env: set TAVILY_API_KEY or pass api_key= explicitly.
    Free tier: 1000 queries/month at https://tavily.com/.

    Args:
        api_key: Tavily API key; falls back to the TAVILY_API_KEY env var.
        n_results: Value sent as ``max_results``.
        search_depth: Value sent as ``search_depth``.
        timeout: Request timeout in seconds.

    Raises:
        ValueError: If no API key is given and TAVILY_API_KEY is unset.
    """

    ENDPOINT = "https://api.tavily.com/search"

    def __init__(self, api_key: Optional[str] = None, n_results: int = 3,
                 search_depth: str = "basic", timeout: float = 10.0):
        self.api_key = api_key or os.environ.get("TAVILY_API_KEY")
        if not self.api_key:
            raise ValueError("Tavily API key required (env TAVILY_API_KEY or api_key=)")
        self.n_results = n_results
        self.search_depth = search_depth
        self.timeout = timeout

    def _post(self, payload: dict) -> dict:
        """POST ``payload`` as JSON to ENDPOINT and return the decoded reply."""
        req = urllib.request.Request(
            self.ENDPOINT,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))

    def search(self, query: str) -> List[str]:
        """Return ``"<title>: <content>"`` chunks (or bare content if untitled).

        Returns an empty list when the request fails. Results whose content
        is 50 characters or shorter are dropped.
        """
        payload = {
            "api_key": self.api_key,
            "query": query,
            "max_results": self.n_results,
            "search_depth": self.search_depth,
            "include_answer": False,
            "include_raw_content": False,
        }
        try:
            data = self._post(payload)
        except Exception as e:
            print(f" [TavilySearch] failed: {e}")
            return []

        # The reply is untrusted JSON: any field may be missing or null, so
        # normalise before calling str methods on it.
        results = data.get("results") if isinstance(data, dict) else None
        if not isinstance(results, list):
            return []

        chunks: List[str] = []
        for item in results:
            if not isinstance(item, dict):
                continue
            title = str(item.get("title") or "").strip()
            content = str(item.get("content") or "").strip()
            if len(content) > 50:
                chunks.append(f"{title}: {content}" if title else content)
        return chunks
|
|
|
|
| |
| |
| |
class BraveSearch:
    """Brave Search API — general web; returns title + snippet.

    Env: set BRAVE_API_KEY or pass api_key= explicitly.
    Free tier: 2000 queries/month at https://api.search.brave.com/.

    Returns snippet-level chunks. For full-content RAG, fetch+scrape the URLs
    separately (not done here to keep the dependency surface small).

    Args:
        api_key: Brave API key; falls back to the BRAVE_API_KEY env var.
        n_results: Value sent as ``count`` and the cap on results inspected.
        timeout: Request timeout in seconds.

    Raises:
        ValueError: If no API key is given and BRAVE_API_KEY is unset.
    """

    ENDPOINT = "https://api.search.brave.com/res/v1/web/search"

    def __init__(self, api_key: Optional[str] = None, n_results: int = 3,
                 timeout: float = 10.0):
        self.api_key = api_key or os.environ.get("BRAVE_API_KEY")
        if not self.api_key:
            raise ValueError("Brave API key required (env BRAVE_API_KEY or api_key=)")
        self.n_results = n_results
        self.timeout = timeout

    def _get(self, url: str) -> dict:
        """GET ``url`` with the subscription token and return decoded JSON."""
        req = urllib.request.Request(
            url,
            headers={
                "Accept": "application/json",
                "X-Subscription-Token": self.api_key,
            },
        )
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))

    def search(self, query: str) -> List[str]:
        """Return ``"<title>: <description>"`` chunks (or bare description).

        Returns an empty list when the request fails. Results whose
        description is 30 characters or shorter are dropped.
        """
        params = {"q": query, "count": self.n_results}
        url = f"{self.ENDPOINT}?{urllib.parse.urlencode(params)}"
        try:
            data = self._get(url)
        except Exception as e:
            print(f" [BraveSearch] failed: {e}")
            return []

        # The reply is untrusted JSON: "web", "results" and the per-result
        # fields may each be missing or null.
        web = data.get("web") if isinstance(data, dict) else None
        results = web.get("results") if isinstance(web, dict) else None
        if not isinstance(results, list):
            return []

        chunks: List[str] = []
        for item in results[: self.n_results]:
            if not isinstance(item, dict):
                continue
            title = str(item.get("title") or "").strip()
            desc = str(item.get("description") or "").strip()
            if len(desc) > 30:
                chunks.append(f"{title}: {desc}" if title else desc)
        return chunks
|
|
|
|
| |
| |
| |
| |
class PlaywrightBingSearch:
    """Scrape Bing search results via headless Chromium. No API key.

    Setup once:
        pip install playwright beautifulsoup4 lxml
        playwright install chromium

    Browser is started once and kept alive for the lifetime of this object;
    call `.close()` (or use the object as a context manager, or rely on
    __del__) when done. Each search opens a new page, navigates, and parses
    results with bs4.

    Bing is currently the most scrape-friendly major engine (Google blocks
    headless aggressively, DDG returns anti-bot challenges).

    Args:
        n_results: Maximum number of chunks returned per search.
        timeout_ms: Navigation timeout passed to ``page.goto``.
        wait_ms: Extra wait after DOM load before reading the page content.
        region: Value sent as Bing's ``cc`` query parameter.

    Raises:
        ImportError: If playwright is not installed.
    """

    USER_AGENT = ("Mozilla/5.0 (X11; Linux x86_64; rv:120.0) "
                  "Gecko/20100101 Firefox/120.0")

    def __init__(self, n_results: int = 3, timeout_ms: int = 20000,
                 wait_ms: int = 1500, region: str = "us"):
        try:
            from playwright.sync_api import sync_playwright
        except ImportError as e:
            raise ImportError(
                "Install playwright first:\n"
                " pip install playwright beautifulsoup4 lxml\n"
                " playwright install chromium"
            ) from e
        self.n_results = n_results
        self.timeout_ms = timeout_ms
        self.wait_ms = wait_ms
        self.region = region
        # Pre-set the handles so close()/__del__ work on a half-built object.
        self._p = None
        self._browser = None
        self._ctx = None
        try:
            self._p = sync_playwright().start()
            # NOTE(security): certificate errors are ignored by both the
            # browser and the context; kept as-is to preserve behavior.
            self._browser = self._p.chromium.launch(
                headless=True,
                args=["--ignore-certificate-errors"],
            )
            self._ctx = self._browser.new_context(
                ignore_https_errors=True,
                user_agent=self.USER_AGENT,
            )
        except Exception:
            # Do not leak the driver/browser if a later startup step fails.
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def close(self):
        """Release context, browser and driver; safe to call repeatedly."""
        for attr, method in (("_ctx", "close"), ("_browser", "close"), ("_p", "stop")):
            handle = getattr(self, attr, None)
            if handle is None:
                continue
            # Clear first so a second close() never touches the same handle.
            setattr(self, attr, None)
            try:
                getattr(handle, method)()
            except Exception:
                pass  # best-effort teardown; keep releasing the rest

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def search(self, query: str) -> List[str]:
        """Return up to ``n_results`` ``"<title>: <snippet>"`` chunks.

        Returns an empty list when navigation fails. Hits without a title or
        caption, or with a snippet of 30 characters or fewer, are skipped and
        do not count toward ``n_results``.

        Raises:
            RuntimeError: If the object has already been closed.
        """
        from bs4 import BeautifulSoup
        if getattr(self, "_ctx", None) is None:
            raise RuntimeError("PlaywrightBingSearch is closed")
        page = self._ctx.new_page()
        try:
            url = ("https://www.bing.com/search?"
                   + urllib.parse.urlencode({"q": query, "cc": self.region}))
            page.goto(url, wait_until="domcontentloaded", timeout=self.timeout_ms)
            page.wait_for_timeout(self.wait_ms)
            html = page.content()
        except Exception as e:
            print(f" [PlaywrightBingSearch] failed: {e}")
            return []
        finally:
            try:
                page.close()
            except Exception:
                pass

        soup = BeautifulSoup(html, "lxml")
        chunks: List[str] = []
        for li in soup.select("li.b_algo"):
            # Cap on accepted chunks, not on raw hits, so skipped entries do
            # not shrink the result below n_results.
            if len(chunks) >= self.n_results:
                break
            heading = li.select_one("h2")
            caption = li.select_one(".b_caption") or li.select_one("p")
            if not heading or not caption:
                continue
            title = heading.get_text(strip=True)
            snippet = caption.get_text(" ", strip=True)
            if len(snippet) > 30:
                chunks.append(f"{title}: {snippet}")
        return chunks
|
|
|
|
| |
| |
| |
class CompositeSearch:
    """Combine several backends behind the common ``search`` interface.

    Modes:
        "fallback": query backends in order and return the first non-empty
            result.
        "merge": query every backend, concatenate the results in backend
            order and drop chunks whose first 200 characters were already
            seen (let BGE rerank pick best).

    A backend that raises is reported on stdout and treated as returning
    nothing.

    Args:
        backends: Objects exposing ``search(query) -> List[str]``.
        mode: Either "fallback" or "merge".

    Raises:
        ValueError: If ``mode`` is not one of the supported modes.
    """

    _MODES = ("fallback", "merge")

    def __init__(self, backends: list, mode: str = "fallback"):
        # Explicit raise rather than assert: asserts vanish under `python -O`.
        if mode not in self._MODES:
            raise ValueError(f"mode must be one of {self._MODES}, got {mode!r}")
        self.backends = backends
        self.mode = mode

    def _safe_search(self, backend, query: str) -> List[str]:
        """Run one backend, turning exceptions and empty results into []."""
        try:
            return backend.search(query) or []
        except Exception as e:
            print(f" [Composite] {type(backend).__name__} threw: {e}")
            return []

    def search(self, query: str) -> List[str]:
        """Return chunks for ``query`` according to the configured mode."""
        if self.mode == "fallback":
            for backend in self.backends:
                chunks = self._safe_search(backend, query)
                if chunks:
                    return chunks
            return []

        merged: List[str] = []
        seen = set()
        for backend in self.backends:
            for chunk in self._safe_search(backend, query):
                # Same leading text from two backends counts as a duplicate.
                key = chunk[:200]
                if key not in seen:
                    seen.add(key)
                    merged.append(chunk)
        return merged
|
|