| |
| import os |
| import re |
| import json |
| import time |
| import hashlib |
| import requests |
| from pathlib import Path |
|
|
| from smolagents import tool |
| from PyPDF2 import PdfReader |
| from ddgs import DDGS |
|
|
|
|
| |
| |
| |
# On-disk JSON cache of fetched pages, resolved relative to the current
# working directory.
_CACHE_PATH = Path(".page_cache.json")


def _load_cache() -> dict:
    """Load the page cache from ``_CACHE_PATH``.

    Returns:
        The cached mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        data = json.loads(_CACHE_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing/unreadable file, undecodable bytes or corrupt JSON
        # (JSONDecodeError and UnicodeDecodeError are ValueErrors):
        # behave as if nothing had been cached yet.
        return {}
    # Callers index the cache by string key and assign into it, so any
    # top-level JSON value other than an object is unusable.
    return data if isinstance(data, dict) else {}
|
|
def _save_cache(cache: dict) -> None:
    """Persist *cache* to ``_CACHE_PATH`` as indented JSON (best effort).

    The JSON is first written to a sibling ``.tmp`` file and then moved into
    place with ``os.replace``, so an interrupted write cannot leave a
    truncated, unparseable cache file behind. Failures are deliberately
    swallowed: the cache is an optimisation and must not break a fetch.

    Args:
        cache: Mapping to serialise; expected to be JSON-serialisable.
    """
    tmp_path = _CACHE_PATH.with_name(_CACHE_PATH.name + ".tmp")
    try:
        tmp_path.write_text(json.dumps(cache, indent=2), encoding="utf-8")
        os.replace(tmp_path, _CACHE_PATH)
    except (OSError, TypeError, ValueError):
        # Unwritable location or non-serialisable content: skip caching and
        # make sure no stray temp file is left next to the cache.
        try:
            tmp_path.unlink()
        except OSError:
            pass
|
|
|
|
| |
| |
| |
|
|
def classify_question(question: str) -> str:
    """Pick the handling route for a question.

    The lower-cased question is tested against each route in a fixed order
    and the first route that matches is returned:
    'reasoning', then 'youtube', then 'image', then 'wikipedia_log'.
    If none match, the route is 'web'.
    """
    lowered = question.lower()

    # Self-contained logic / categorisation / arithmetic style questions.
    reasoning_markers = (
        r"\btable\b.*\bset\b.*\{",
        r"\boperation\b.*\bset\b",
        r"grocery list",
        r"\bbotany\b",
        r"categoriz",
        r"\balphabetiz",
        r"\bcommutativ",
        r"\bassociativ",
        r"making a pie",
        r"shopping list.*(?:recipe|ingredient|pie)",
        r"recipe.*ingredient",
        r"\bconvert\b.*\bunits?\b",
        r"\bcalculat",
    )
    if any(re.search(marker, lowered) for marker in reasoning_markers):
        return "reasoning"

    # Questions that carry a YouTube link.
    if any(fragment in lowered for fragment in ("youtube.com/watch", "youtu.be/")):
        return "youtube"

    # Questions that refer to visual material.
    image_pattern = (
        r"\bimage\b|\bchess\b|\bboard\b.*\bposition\b|\bpicture\b|\bphoto\b|\bscreenshot\b"
    )
    if re.search(image_pattern, lowered) is not None:
        return "image"

    # Questions about Wikipedia featured-article nominations / promotions.
    featured_pattern = (
        r"featured article.*wikipedia.*nominated|nominated.*featured article.*wikipedia"
        r"|featured log|featured article.*promoted.*\d{4}|promoted.*featured article.*\d{4}"
    )
    if re.search(featured_pattern, lowered) is not None:
        return "wikipedia_log"

    return "web"
|
|
|
|
def build_search_query(question: str) -> str:
    """Condense a verbose question into a short search-engine query.

    Steps: drop parenthetical asides, strip a leading interrogative
    ("how many", "who is", ...), remove known filler phrases, collapse
    whitespace, strip trailing punctuation and keep at most the first eight
    words. The word 'wikipedia' is appended unless the shortened query
    already contains it.

    Args:
        question: The full natural-language question.

    Returns:
        A single-spaced query with no leading/trailing whitespace. When
        nothing of the question survives the clean-up the result is just
        ``"wikipedia"``.
    """
    q = question.strip()

    # Parenthetical asides such as "(included)" only add noise.
    q = re.sub(r"\(.*?\)", "", q).strip()

    # Remove the leading question word / imperative.
    q = re.sub(
        r"^(how many|what (is|was|are|were)|who (is|was)|when (did|was|is)|"
        r"which|where (is|was)|why|tell me|find|give me|list)\s+",
        "", q, flags=re.I,
    )

    # Phrases that describe the task rather than the subject being looked up.
    fillers = [
        r"studio albums? (?:were )?published by\s*",
        r"albums? (?:were )?released by\s*",
        r"were published by\s*",
        r"was born in\s*",
        r"between \d{4} and \d{4}[^.]*",
        r"you can use [^.]*",
        r"the latest \d{4} version[^.]*",
        r"surname of (?:the)?\s*",
        r"(?:licensed|compiled) by .*",
        r"from the chemistry materials?.*",
        r"in \d+\.\w+ exercises?.*",
        r"under the ck-12 .*",
        r"libretexts.*",
        r"mentioned in .*(?:exercises?|materials?)",
    ]
    for filler in fillers:
        q = re.sub(filler, " ", q, flags=re.I).strip()

    # Trailing punctuation and spaces are stripped together: removing a
    # final "." from "Name ." must not leave "Name " behind, otherwise the
    # appended keyword would be preceded by a double space.
    trailing = "?.,;: "
    q = re.sub(r"\s+", " ", q).strip().rstrip(trailing)

    # Keep at most eight words; the cut can expose new trailing punctuation
    # (e.g. "Sosa,"), so strip once more afterwards.
    q = " ".join(q.split()[:8]).rstrip(trailing)

    if "wikipedia" not in q.lower():
        # .strip() avoids a leading space when nothing is left of the question.
        q = f"{q} wikipedia".strip()
    return q
|
|
|
|
| def extract_best_url(search_output: str, question: str = "") -> str | None: |
| """ |
| Score URLs by keyword overlap with the question. |
| Avoids known useless domains; returns None if nothing looks good. |
| """ |
| BAD_DOMAINS = { |
| "youtube.com", "reddit.com", "facebook.com", "chegg.com", |
| "studyx.ai", "lespac.com", "fandom.com", "quora.com", |
| "answers.com", "yahoo.com", |
| } |
|
|
| blocks = re.split(r"\n\n+", search_output) |
| candidates: list[tuple[str, str]] = [] |
| for block in blocks: |
| urls = re.findall(r"https?://[^\s'\"<>)\]]+", block) |
| for url in urls: |
| url = url.rstrip(".,;:)\"'") |
| candidates.append((url, block.lower())) |
|
|
| if not candidates: |
| return None |
|
|
| stop = { |
| "how","many","what","was","is","are","were","the","a","an","of","in", |
| "by","to","and","or","you","can","use","between","included","latest", |
| "version","english","wikipedia","published","released","studio","albums", |
| "surname","mentioned","exercises","chemistry","licensed","compiled", |
| "materials","introductory","ck12","libretexts", |
| } |
| keywords = [ |
| w.lower() for w in re.findall(r"[A-Za-z]{3,}", question) |
| if w.lower() not in stop |
| ] |
|
|
| def score(url: str, ctx: str) -> int: |
| s = 0 |
| ul = url.lower() |
| if "wikipedia.org/wiki/" in ul: |
| s += 3 |
| if "disambiguation" in ul or "disambiguation" in ctx: |
| s -= 2 |
| for bad in BAD_DOMAINS: |
| if bad in ul: |
| s -= 5 |
| for kw in keywords: |
| if kw in ul: |
| s += 2 |
| elif kw in ctx: |
| s += 1 |
| return s |
|
|
| ranked = sorted(candidates, key=lambda x: score(x[0], x[1]), reverse=True) |
| best_url, best_ctx = ranked[0] |
| if score(best_url, best_ctx) < 0: |
| return None |
| return best_url |
|
|
|
|
| def _extract_youtube_id(text: str) -> str | None: |
| m = re.search(r"(?:v=|youtu\.be/)([A-Za-z0-9_-]{11})", text) |
| return m.group(1) if m else None |
|
|
|
|
| |
| |
| |
@tool
def web_search(query: str) -> str:
    """Search the web. Pass a SHORT query (4-8 words), never the full question.

    Args:
        query: Short search query, e.g. 'Mercedes Sosa discography wikipedia'
    """
    # NOTE(review): the docstring above is presumably consumed by @tool as the
    # tool description, so its wording is left untouched.
    #
    # Runs a DuckDuckGo text search (up to 10 results) and formats every hit
    # as a TITLE / SNIPPET / URL block, with blocks separated by blank lines.
    # Any failure is reported as a "Search error: ..." string, never raised.
    try:
        with DDGS() as client:
            hits = list(client.text(query, region="wt-wt", safesearch="off", max_results=10))
        if not hits:
            return "No results found."
        formatted = []
        for hit in hits:
            title = hit.get('title', '')
            snippet = hit.get('body', '')
            link = hit.get('href', '')
            formatted.append(f"TITLE: {title}\nSNIPPET: {snippet}\nURL: {link}")
        return "\n\n".join(formatted)
    except Exception as err:
        return f"Search error: {err}"
|
|
|
|
| |
| |
| |
def _fetch_page(url: str, retries: int = 3) -> str:
    """Fetch *url* and return up to 8000 characters of tag-stripped text.

    Successful results are stored in the JSON disk cache (keyed by the MD5
    hex digest of the URL) and served from there on later calls. Only
    timeouts are retried, with exponential back-off (1s, 2s, ...); HTTP
    errors and any other exception end the attempt immediately.

    Args:
        url: Address of the page to download.
        retries: Maximum number of download attempts.

    Returns:
        The page text, or a human-readable error message. This function
        reports failures through its return value and does not raise for
        network or parsing problems.
    """
    cache = _load_cache()
    # MD5 is used purely to derive a compact cache key, not for security.
    key = hashlib.md5(url.encode()).hexdigest()
    if key in cache:
        print(f"[visit_webpage] cache hit: {url}", flush=True)
        return cache[key]

    for attempt in range(retries):
        try:
            resp = requests.get(url, timeout=12, headers={"User-Agent": "Mozilla/5.0"})
            resp.raise_for_status()
            # Drop <style>/<script> bodies first so their contents do not end
            # up in the text; HTML tag names are case-insensitive, hence re.I.
            text = re.sub(r"<style[^>]*>.*?</style>", " ", resp.text, flags=re.S | re.I)
            text = re.sub(r"<script[^>]*>.*?</script>", " ", text, flags=re.S | re.I)
            text = re.sub(r"<[^>]+>", " ", text)
            text = re.sub(r"\s+", " ", text).strip()
            content = text[:8000]
            cache[key] = content
            _save_cache(cache)
            return content
        except requests.exceptions.Timeout:
            if attempt + 1 >= retries:
                # Last attempt: nothing left to retry, so do not sleep.
                break
            wait = 2 ** attempt
            print(f"[visit_webpage] timeout attempt {attempt+1}, retrying in {wait}s", flush=True)
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            return f"HTTP {e.response.status_code} error fetching {url}"
        except Exception as e:
            return f"Error fetching page: {e}"
    return f"Error: could not fetch {url} after {retries} attempts."
|
|
|
|
@tool
def visit_webpage(url: str) -> str:
    """Fetch the plain-text content of a webpage (disk-cached).

    Args:
        url: Full URL including https://
    """
    # Thin tool wrapper: downloading, caching and retrying all happen in
    # _fetch_page, which reports failures as an error string.
    page_text = _fetch_page(url)
    return page_text
|
|
|
|
| |
| |
| |
@tool
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the auto-generated transcript of a YouTube video.
    Use this for any question that asks about spoken dialogue or audio in a video.

    Args:
        video_url: Full YouTube URL, e.g. 'https://www.youtube.com/watch?v=1htKBjuUWec'
    """
    # NOTE(review): the docstring above is presumably consumed by @tool as the
    # tool description, so its wording is left untouched.
    #
    # Returns at most 8000 characters of transcript text, or a message
    # describing why no transcript could be obtained. Never raises.
    vid_id = _extract_youtube_id(video_url)
    if not vid_id:
        return f"Could not extract video ID from: {video_url}"

    # Strategy 1: the youtube_transcript_api package, if it is installed.
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Legacy interface: static call returning a list of dicts.
            pieces = [entry["text"] for entry in YouTubeTranscriptApi.get_transcript(vid_id)]
        else:
            # Newer releases dropped the static helper in favour of an
            # instance-level fetch() that yields snippet objects.
            pieces = [snippet.text for snippet in YouTubeTranscriptApi().fetch(vid_id)]
        text = " ".join(pieces)
        if text:
            return text[:8000]
    except Exception:
        # Deliberate best effort: missing package, disabled transcripts,
        # blocked requests, ... all fall through to the scraping strategy.
        pass

    # Strategy 2: read the caption track URL out of the watch page itself.
    try:
        resp = requests.get(
            f"https://www.youtube.com/watch?v={vid_id}",
            headers={"User-Agent": "Mozilla/5.0"}, timeout=12
        )
        resp.raise_for_status()
        cap_match = re.search(r'"captionTracks":\[.*?"baseUrl":"(.*?)"', resp.text)
        if cap_match:
            # The URL is embedded in JSON, where "&" is escaped as \u0026.
            cap_url = cap_match.group(1).replace("\\u0026", "&")
            cap_resp = requests.get(cap_url, timeout=10)
            # Without this check an HTTP error body would be returned as if
            # it were the transcript.
            cap_resp.raise_for_status()
            text = re.sub(r"<[^>]+>", " ", cap_resp.text)
            text = re.sub(r"\s+", " ", text).strip()
            if text:
                return text[:8000]
        return "No captions found for this video."
    except Exception as e:
        return f"Transcript fetch failed: {e}"
|
|
|
|
| |
| |
| |
@tool
def read_pdf(filepath: str) -> str:
    """Read and extract text from a local PDF file.

    Args:
        filepath: Absolute path to the PDF file on disk.
    """
    # NOTE(review): the docstring above is presumably consumed by @tool as the
    # tool description, so its wording is left untouched.
    #
    # Returns at most 15000 characters of extracted text, or a message
    # starting with "PDF" that explains why no text is available.
    max_chars = 15000
    try:
        if not os.path.exists(filepath):
            return f"PDF error: file not found at {filepath}"
        reader = PdfReader(filepath)
        page_texts = []
        joined_len = 0  # length the newline-joined text would have so far
        for page in reader.pages:
            page_text = page.extract_text() or ""
            joined_len += len(page_text) + (1 if page_texts else 0)
            page_texts.append(page_text)
            if joined_len >= max_chars:
                # Enough text for the truncated result; later pages would be
                # cut off anyway, so skip extracting them.
                break
        # Separate pages with a newline so the last word of one page does not
        # fuse with the first word of the next.
        text = "\n".join(page_texts)
        if not text.strip():
            return "PDF appears to be empty or image-only (no extractable text)."
        return text[:max_chars]
    except Exception as e:
        return f"PDF error: {e}"