# tools.py
"""Helper functions and smolagents tools for answering GAIA-style questions.

Contents:
  * a small JSON disk cache for fetched pages,
  * pure helpers used by agent.py (question routing, query building,
    URL ranking),
  * ``@tool`` wrappers: web search, page fetch, YouTube transcript, PDF text.
"""
import hashlib
import json
import os
import re
import time
from pathlib import Path

import requests
from ddgs import DDGS
from PyPDF2 import PdfReader
from smolagents import tool

# ──────────────────────────────────────────────────────────────────────────────
# Disk cache
# ──────────────────────────────────────────────────────────────────────────────
_CACHE_PATH = Path(".page_cache.json")


def _load_cache() -> dict:
    """Return the on-disk page cache, or an empty dict if it is missing/corrupt."""
    try:
        data = json.loads(_CACHE_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, unreadable file, or invalid JSON: start with a fresh cache.
        return {}
    # A valid-JSON-but-not-an-object file would break `key in cache` lookups.
    return data if isinstance(data, dict) else {}


def _save_cache(cache: dict) -> None:
    """Persist the page cache; best-effort, a failed write is silently ignored."""
    try:
        _CACHE_PATH.write_text(json.dumps(cache, indent=2), encoding="utf-8")
    except (OSError, TypeError, ValueError):
        # Caching is an optimisation only; never fail a fetch because of it.
        pass


# ──────────────────────────────────────────────────────────────────────────────
# Helpers used by agent.py (no LLM, no @tool)
# ──────────────────────────────────────────────────────────────────────────────
_REASONING_PATTERNS = (
    r"\btable\b.*\bset\b.*\{",
    r"\boperation\b.*\bset\b",
    r"grocery list",
    r"\bbotany\b",
    r"categoriz",
    r"\balphabetiz",
    r"\bcommutativ",
    r"\bassociativ",
    r"making a pie",
    r"shopping list.*(?:recipe|ingredient|pie)",
    r"recipe.*ingredient",
    r"\bconvert\b.*\bunits?\b",
    r"\bcalculat",
)

_IMAGE_PATTERN = (
    r"\bimage\b|\bchess\b|\bboard\b.*\bposition\b|\bpicture\b|\bphoto\b|\bscreenshot\b"
)

_WIKIPEDIA_LOG_PATTERN = (
    r"featured article.*wikipedia.*nominated|nominated.*featured article.*wikipedia"
    r"|featured log|featured article.*promoted.*\d{4}|promoted.*featured article.*\d{4}"
)


def classify_question(question: str) -> str:
    """
    Route to one of: 'reasoning', 'youtube', 'image', 'wikipedia_log', 'web'.
    Checked in order; first match wins.
    """
    q = question.lower()

    if any(re.search(pat, q) for pat in _REASONING_PATTERNS):
        return "reasoning"
    if "youtube.com/watch" in q or "youtu.be/" in q:
        return "youtube"
    if re.search(_IMAGE_PATTERN, q):
        return "image"
    if re.search(_WIKIPEDIA_LOG_PATTERN, q):
        return "wikipedia_log"
    return "web"


# Leading question words that carry no search value.
_QUESTION_STARTER = (
    r"^(how many|what (is|was|are|were)|who (is|was)|when (did|was|is)|"
    r"which|where (is|was)|why|tell me|find|give me|list)\s+"
)

# Known filler — order matters (longer patterns first).
_FILLERS = (
    r"studio albums? (?:were )?published by\s*",
    r"albums? (?:were )?released by\s*",
    r"were published by\s*",
    r"was born in\s*",
    r"between \d{4} and \d{4}[^.]*",
    r"you can use [^.]*",
    r"the latest \d{4} version[^.]*",
    r"surname of (?:the)?\s*",
    r"(?:licensed|compiled) by .*",  # drop "licensed by X …"
    r"from the chemistry materials?.*",
    r"in \d+\.\w+ exercises?.*",
    r"under the ck-12 .*",
    r"libretexts.*",
    r"mentioned in .*(?:exercises?|materials?)",
)


def build_search_query(question: str) -> str:
    """
    Turn a verbose GAIA question into a tight 4-8 word DDG query.
    Always appends 'wikipedia' to surface the right article first.

    Steps: drop parenthetical hints, drop a leading question phrase, remove
    known filler phrases, collapse whitespace, trim trailing punctuation and
    cap the result at 8 words. An empty result yields just 'wikipedia'.
    """
    q = question.strip()

    # Remove parenthetical hints
    q = re.sub(r"\(.*?\)", "", q).strip()

    # Drop question-word starters
    q = re.sub(_QUESTION_STARTER, "", q, flags=re.I)

    for filler in _FILLERS:
        q = re.sub(filler, " ", q, flags=re.I).strip()

    q = re.sub(r"\s+", " ", q).strip().rstrip("?.,;:")

    # Cap at 8 words so DDG returns precise results
    words = q.split()
    if len(words) > 8:
        q = " ".join(words[:8])

    if "wikipedia" not in q.lower():
        # .strip() avoids a leading space when everything was filtered out.
        q = f"{q} wikipedia".strip()
    return q


_BAD_DOMAINS = frozenset({
    "youtube.com", "reddit.com", "facebook.com", "chegg.com",
    "studyx.ai", "lespac.com", "fandom.com", "quora.com",
    "answers.com", "yahoo.com",
})

# Question words that should not count as keyword matches when ranking URLs.
_STOPWORDS = frozenset({
    "how", "many", "what", "was", "is", "are", "were", "the", "a", "an", "of", "in",
    "by", "to", "and", "or", "you", "can", "use", "between", "included", "latest",
    "version", "english", "wikipedia", "published", "released", "studio", "albums",
    "surname", "mentioned", "exercises", "chemistry", "licensed", "compiled",
    "materials", "introductory", "ck12", "libretexts",
})


def extract_best_url(search_output: str, question: str = "") -> str | None:
    """
    Score URLs by keyword overlap with the question.
    Avoids known useless domains; returns None if nothing looks good.

    ``search_output`` is split into blank-line-separated blocks; every URL
    found in a block is scored together with that block's lower-cased text as
    context. Scoring: +3 for a Wikipedia article URL, -2 if the URL or context
    mentions 'disambiguation', -5 per blacklisted domain, and per question
    keyword +2 if it appears in the URL, else +1 if it appears in the context.
    The highest-scoring URL wins (earliest on ties); a negative best score
    means no acceptable URL and yields None.
    """
    candidates: list[tuple[str, str]] = []
    for block in re.split(r"\n\n+", search_output):
        ctx = block.lower()
        for url in re.findall(r"https?://[^\s'\"<>)\]]+", block):
            # Trailing punctuation is almost always sentence syntax, not URL.
            candidates.append((url.rstrip(".,;:)\"'"), ctx))

    if not candidates:
        return None

    keywords = [
        w.lower()
        for w in re.findall(r"[A-Za-z]{3,}", question)
        if w.lower() not in _STOPWORDS
    ]

    def score(url: str, ctx: str) -> int:
        s = 0
        ul = url.lower()
        if "wikipedia.org/wiki/" in ul:
            s += 3
        if "disambiguation" in ul or "disambiguation" in ctx:
            s -= 2
        for bad in _BAD_DOMAINS:
            if bad in ul:
                s -= 5
        for kw in keywords:
            if kw in ul:
                s += 2
            elif kw in ctx:
                s += 1
        return s

    # Score each candidate exactly once; strict '>' keeps the earliest URL on
    # ties, matching a stable descending sort.
    best_url: str | None = None
    best_score: int | None = None
    for url, ctx in candidates:
        current = score(url, ctx)
        if best_score is None or current > best_score:
            best_url, best_score = url, current

    if best_score is None or best_score < 0:
        return None
    return best_url


def _extract_youtube_id(text: str) -> str | None:
    """Return the 11-character video ID from a YouTube URL in *text*, or None."""
    m = re.search(r"(?:v=|youtu\.be/)([A-Za-z0-9_-]{11})", text)
    return m.group(1) if m else None


# ──────────────────────────────────────────────────────────────────────────────
# Tool: web_search
# ──────────────────────────────────────────────────────────────────────────────
@tool
def web_search(query: str) -> str:
    """Search the web. Pass a SHORT query (4-8 words), never the full question.

    Args:
        query: Short search query, e.g. 'Mercedes Sosa discography wikipedia'
    """
    try:
        with DDGS() as ddgs:
            results = list(
                ddgs.text(query, region="wt-wt", safesearch="off", max_results=10)
            )
        if not results:
            return "No results found."
        # Blank-line-separated blocks; extract_best_url() splits on these.
        return "\n\n".join(
            f"TITLE: {r.get('title','')}\nSNIPPET: {r.get('body','')}\nURL: {r.get('href','')}"
            for r in results
        )
    except Exception as e:
        # Tools report failures as text so the agent can react to them.
        return f"Search error: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# Tool: visit_webpage
# ──────────────────────────────────────────────────────────────────────────────
def _fetch_page(url: str, retries: int = 3) -> str:
    """Fetch *url* and return up to 8000 characters of its visible text.

    Successful results are cached on disk keyed by the MD5 of the URL.
    Timeouts are retried with exponential backoff (1s, 2s, ...) up to
    *retries* attempts; HTTP errors and any other failure return an error
    string immediately. Errors are never cached.
    """
    cache = _load_cache()
    key = hashlib.md5(url.encode()).hexdigest()
    if key in cache:
        print(f"[visit_webpage] cache hit: {url}", flush=True)
        return cache[key]

    for attempt in range(retries):
        try:
            resp = requests.get(url, timeout=12, headers={"User-Agent": "Mozilla/5.0"})
            resp.raise_for_status()
            # Remove <script>/<style> elements including their contents first,
            # otherwise JS/CSS source would survive the generic tag strip below.
            text = re.sub(
                r"<script[^>]*>.*?</script>", " ", resp.text, flags=re.S | re.I
            )
            text = re.sub(r"<style[^>]*>.*?</style>", " ", text, flags=re.S | re.I)
            text = re.sub(r"<[^>]+>", " ", text)
            text = re.sub(r"\s+", " ", text).strip()
            content = text[:8000]
            cache[key] = content
            _save_cache(cache)
            return content
        except requests.exceptions.Timeout:
            if attempt + 1 >= retries:
                # No retry follows, so do not waste time sleeping.
                break
            wait = 2 ** attempt
            print(
                f"[visit_webpage] timeout attempt {attempt+1}, retrying in {wait}s",
                flush=True,
            )
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            return f"HTTP {e.response.status_code} error fetching {url}"
        except Exception as e:
            return f"Error fetching page: {e}"

    return f"Error: could not fetch {url} after {retries} attempts."


@tool
def visit_webpage(url: str) -> str:
    """Fetch the plain-text content of a webpage (disk-cached).

    Args:
        url: Full URL including https://
    """
    return _fetch_page(url)


# ──────────────────────────────────────────────────────────────────────────────
# Tool: get_youtube_transcript
# ──────────────────────────────────────────────────────────────────────────────
@tool
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the auto-generated transcript of a YouTube video.
    Use this for any question that asks about spoken dialogue or audio in a video.

    Args:
        video_url: Full YouTube URL, e.g. 'https://www.youtube.com/watch?v=1htKBjuUWec'
    """
    vid_id = _extract_youtube_id(video_url)
    if not vid_id:
        return f"Could not extract video ID from: {video_url}"

    # Primary: youtube-transcript-api (pip install youtube-transcript-api)
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Legacy static API (pre-1.x releases, deprecated afterwards).
            entries = YouTubeTranscriptApi.get_transcript(vid_id)
        else:
            # Newer releases dropped the static helper in favour of
            # instance.fetch(); to_raw_data() yields the same list of dicts.
            entries = YouTubeTranscriptApi().fetch(vid_id).to_raw_data()
        text = " ".join(e["text"] for e in entries)
        return text[:8000]
    except Exception:
        # Library missing, captions disabled, request blocked, ...: fall back.
        pass

    # Fallback: scrape caption track from page source
    try:
        resp = requests.get(
            f"https://www.youtube.com/watch?v={vid_id}",
            headers={"User-Agent": "Mozilla/5.0"},
            timeout=12,
        )
        cap_match = re.search(r'"captionTracks":\[.*?"baseUrl":"(.*?)"', resp.text)
        if cap_match:
            # The URL is embedded in JSON, so '&' arrives escaped as \u0026.
            cap_url = cap_match.group(1).replace("\\u0026", "&")
            cap_resp = requests.get(cap_url, timeout=10)
            text = re.sub(r"<[^>]+>", " ", cap_resp.text)
            text = re.sub(r"\s+", " ", text).strip()
            return text[:8000]
        return "No captions found for this video."
    except Exception as e:
        return f"Transcript fetch failed: {e}"


# ──────────────────────────────────────────────────────────────────────────────
# Tool: read_pdf
# ──────────────────────────────────────────────────────────────────────────────
@tool
def read_pdf(filepath: str) -> str:
    """Read and extract text from a local PDF file.

    Args:
        filepath: Absolute path to the PDF file on disk.
    """
    try:
        if not os.path.exists(filepath):
            return f"PDF error: file not found at {filepath}"
        reader = PdfReader(filepath)
        # extract_text() may return None for image-only pages.
        text = "".join(page.extract_text() or "" for page in reader.pages)
        if not text.strip():
            return "PDF appears to be empty or image-only (no extractable text)."
        return text[:15000]
    except Exception as e:
        return f"PDF error: {e}"