# Scraped page header (not part of the module): uploaded by harini-012,
# commit 122389a (verified) "Update tools.py", file size 12.5 kB.
# tools.py
import os
import re
import json
import time
import hashlib
import requests
from pathlib import Path
from smolagents import tool
from PyPDF2 import PdfReader
from ddgs import DDGS
# ──────────────────────────────────────────────────────────────────────────────
# Disk cache
# ──────────────────────────────────────────────────────────────────────────────
# JSON file holding cached page text; the path is relative, so it resolves
# against the process's current working directory.
_CACHE_PATH = Path(".page_cache.json")
def _load_cache() -> dict:
    """Load the on-disk page cache.

    Returns:
        The mapping stored in ``_CACHE_PATH``, or an empty dict when the file
        is missing, unreadable, not valid JSON, or does not contain a JSON
        object.
    """
    try:
        data = json.loads(_CACHE_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing/unreadable file or corrupt JSON: start with an empty cache.
        return {}
    # Callers look entries up by key, so anything other than a JSON object
    # (e.g. a list or null) is unusable and treated like a missing cache.
    return data if isinstance(data, dict) else {}
def _save_cache(cache: dict) -> None:
    """Persist *cache* to ``_CACHE_PATH`` as indented JSON (best effort).

    The JSON is written to a sibling temporary file which is then moved over
    the real cache file, so an interrupted write cannot leave truncated JSON
    behind. Failures are swallowed on purpose: the cache is an optimisation
    and must never make the calling fetch fail.

    Args:
        cache: Mapping to serialise.
    """
    tmp_path = _CACHE_PATH.with_name(_CACHE_PATH.name + ".tmp")
    try:
        tmp_path.write_text(json.dumps(cache, indent=2), encoding="utf-8")
        tmp_path.replace(_CACHE_PATH)
    except (OSError, TypeError, ValueError):
        # Unwritable location or non-serialisable value: skip persisting and
        # remove any partial temporary file.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
# ──────────────────────────────────────────────────────────────────────────────
# Helpers used by agent.py (no LLM, no @tool)
# ──────────────────────────────────────────────────────────────────────────────
def classify_question(question: str) -> str:
    """Pick the handling route for a question.

    Routes are tested in a fixed order and the first match wins:
    'reasoning', 'youtube', 'image', 'wikipedia_log', then the 'web' fallback.

    Args:
        question: Raw question text; may span several lines.

    Returns:
        One of 'reasoning', 'youtube', 'image', 'wikipedia_log', 'web'.
        Empty or missing input falls through to 'web'.
    """
    if not question:
        return "web"
    q = question.lower()

    # re.S (DOTALL) lets the ".*" gaps bridge line breaks, so a question whose
    # trigger words sit on different lines is still recognised.
    reasoning_patterns = (
        r"\btable\b.*\bset\b.*\{",
        r"\boperation\b.*\bset\b",
        r"grocery list",
        r"\bbotany\b",
        r"categoriz",
        r"\balphabetiz",
        r"\bcommutativ",
        r"\bassociativ",
        r"making a pie",
        r"shopping list.*(?:recipe|ingredient|pie)",
        r"recipe.*ingredient",
        r"\bconvert\b.*\bunits?\b",
        r"\bcalculat",
    )
    if any(re.search(pat, q, flags=re.S) for pat in reasoning_patterns):
        return "reasoning"

    if "youtube.com/watch" in q or "youtu.be/" in q:
        return "youtube"

    if re.search(
        r"\bimage\b|\bchess\b|\bboard\b.*\bposition\b|\bpicture\b|\bphoto\b|\bscreenshot\b",
        q,
        flags=re.S,
    ):
        return "image"

    if re.search(
        r"featured article.*wikipedia.*nominated|nominated.*featured article.*wikipedia"
        r"|featured log|featured article.*promoted.*\d{4}|promoted.*featured article.*\d{4}",
        q,
        flags=re.S,
    ):
        return "wikipedia_log"

    return "web"
def build_search_query(question: str) -> str:
    """Condense a verbose question into a short search query.

    Parenthetical hints, a leading question phrase, and a list of known
    filler phrases are removed; the remainder is capped at 8 words. The word
    'wikipedia' is appended unless the query already contains it.

    Args:
        question: Full question text.

    Returns:
        A single-spaced query with no leading/trailing whitespace or trailing
        punctuation. If nothing survives the stripping, just 'wikipedia'.
    """
    q = question.strip()
    # Remove parenthetical hints
    q = re.sub(r"\(.*?\)", "", q).strip()
    # Drop question-word starters
    q = re.sub(
        r"^(how many|what (is|was|are|were)|who (is|was)|when (did|was|is)|"
        r"which|where (is|was)|why|tell me|find|give me|list)\s+",
        "", q, flags=re.I,
    )
    # Drop known filler — order matters (longer patterns first)
    fillers = [
        r"studio albums? (?:were )?published by\s*",
        r"albums? (?:were )?released by\s*",
        r"were published by\s*",
        r"was born in\s*",
        r"between \d{4} and \d{4}[^.]*",
        r"you can use [^.]*",
        r"the latest \d{4} version[^.]*",
        r"surname of (?:the)?\s*",
        r"(?:licensed|compiled) by .*",  # drop "licensed by X …"
        r"from the chemistry materials?.*",
        r"in \d+\.\w+ exercises?.*",
        r"under the ck-12 .*",
        r"libretexts.*",
        r"mentioned in .*(?:exercises?|materials?)",
    ]
    for filler in fillers:
        q = re.sub(filler, " ", q, flags=re.I).strip()

    # Splitting on whitespace and re-joining normalises spacing; the cap of
    # 8 words keeps the query precise.
    words = q.split()[:8]
    # Strip trailing punctuation (and any space exposed by it) after the cap,
    # so neither "Sosa ." nor a cut-off "theta," leaks into the query.
    q = " ".join(words).rstrip("?.,;: ")

    if "wikipedia" not in q.lower():
        # .strip() avoids a leading space when nothing survived the stripping.
        q = f"{q} wikipedia".strip()
    return q
def extract_best_url(search_output: str, question: str = "") -> str | None:
"""
Score URLs by keyword overlap with the question.
Avoids known useless domains; returns None if nothing looks good.
"""
BAD_DOMAINS = {
"youtube.com", "reddit.com", "facebook.com", "chegg.com",
"studyx.ai", "lespac.com", "fandom.com", "quora.com",
"answers.com", "yahoo.com",
}
blocks = re.split(r"\n\n+", search_output)
candidates: list[tuple[str, str]] = []
for block in blocks:
urls = re.findall(r"https?://[^\s'\"<>)\]]+", block)
for url in urls:
url = url.rstrip(".,;:)\"'")
candidates.append((url, block.lower()))
if not candidates:
return None
stop = {
"how","many","what","was","is","are","were","the","a","an","of","in",
"by","to","and","or","you","can","use","between","included","latest",
"version","english","wikipedia","published","released","studio","albums",
"surname","mentioned","exercises","chemistry","licensed","compiled",
"materials","introductory","ck12","libretexts",
}
keywords = [
w.lower() for w in re.findall(r"[A-Za-z]{3,}", question)
if w.lower() not in stop
]
def score(url: str, ctx: str) -> int:
s = 0
ul = url.lower()
if "wikipedia.org/wiki/" in ul:
s += 3
if "disambiguation" in ul or "disambiguation" in ctx:
s -= 2
for bad in BAD_DOMAINS:
if bad in ul:
s -= 5
for kw in keywords:
if kw in ul:
s += 2
elif kw in ctx:
s += 1
return s
ranked = sorted(candidates, key=lambda x: score(x[0], x[1]), reverse=True)
best_url, best_ctx = ranked[0]
if score(best_url, best_ctx) < 0:
return None
return best_url
def _extract_youtube_id(text: str) -> str | None:
m = re.search(r"(?:v=|youtu\.be/)([A-Za-z0-9_-]{11})", text)
return m.group(1) if m else None
# ──────────────────────────────────────────────────────────────────────────────
# Tool: web_search
# ──────────────────────────────────────────────────────────────────────────────
@tool
def web_search(query: str) -> str:
    """Search the web. Pass a SHORT query (4-8 words), never the full question.
    Args:
        query: Short search query, e.g. 'Mercedes Sosa discography wikipedia'
    """
    # Any failure is reported as text so the calling agent can react to it
    # instead of crashing.
    try:
        with DDGS() as client:
            hits = list(client.text(query, region="wt-wt", safesearch="off", max_results=10))
        if not hits:
            return "No results found."
        # One TITLE/SNIPPET/URL block per hit, separated by blank lines.
        formatted = []
        for hit in hits:
            formatted.append(
                f"TITLE: {hit.get('title','')}\nSNIPPET: {hit.get('body','')}\nURL: {hit.get('href','')}"
            )
        return "\n\n".join(formatted)
    except Exception as e:
        return f"Search error: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# Tool: visit_webpage
# ──────────────────────────────────────────────────────────────────────────────
def _fetch_page(url: str, retries: int = 3) -> str:
    """Download *url* and return its visible text, using the disk cache.

    Args:
        url: Address to fetch.
        retries: Maximum number of attempts. Only timeouts are retried, with
            an exponentially growing pause (1s, 2s, ...) between attempts.

    Returns:
        Up to 8000 characters of tag-stripped, whitespace-collapsed text.
        On failure an error message string is returned instead of raising;
        error strings are not written to the cache.
    """
    cache = _load_cache()
    # MD5 only serves to derive a fixed-length cache key from the URL.
    key = hashlib.md5(url.encode()).hexdigest()
    if key in cache:
        print(f"[visit_webpage] cache hit: {url}", flush=True)
        return cache[key]

    for attempt in range(retries):
        try:
            resp = requests.get(url, timeout=12, headers={"User-Agent": "Mozilla/5.0"})
            resp.raise_for_status()
            # Remove <style>/<script> elements including their bodies first;
            # re.I also catches upper-case tags such as <SCRIPT>.
            text = re.sub(r"<style[^>]*>.*?</style>", " ", resp.text, flags=re.S | re.I)
            text = re.sub(r"<script[^>]*>.*?</script>", " ", text, flags=re.S | re.I)
            text = re.sub(r"<[^>]+>", " ", text)
            text = re.sub(r"\s+", " ", text).strip()
            content = text[:8000]
            cache[key] = content
            _save_cache(cache)
            return content
        except requests.exceptions.Timeout:
            if attempt + 1 >= retries:
                # Out of attempts: sleeping again would only delay the error.
                break
            wait = 2 ** attempt
            print(f"[visit_webpage] timeout attempt {attempt+1}, retrying in {wait}s", flush=True)
            time.sleep(wait)
        except requests.exceptions.HTTPError as e:
            return f"HTTP {e.response.status_code} error fetching {url}"
        except Exception as e:
            return f"Error fetching page: {e}"
    return f"Error: could not fetch {url} after {retries} attempts."
@tool
def visit_webpage(url: str) -> str:
    """Fetch the plain-text content of a webpage (disk-cached).
    Args:
        url: Full URL including https://
    """
    # Fetching, caching and error reporting all live in the shared helper.
    page_text = _fetch_page(url)
    return page_text
# ──────────────────────────────────────────────────────────────────────────────
# Tool: get_youtube_transcript
# ──────────────────────────────────────────────────────────────────────────────
@tool
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the auto-generated transcript of a YouTube video.
    Use this for any question that asks about spoken dialogue or audio in a video.
    Args:
        video_url: Full YouTube URL, e.g. 'https://www.youtube.com/watch?v=1htKBjuUWec'
    """
    vid_id = _extract_youtube_id(video_url)
    if not vid_id:
        return f"Could not extract video ID from: {video_url}"

    # Primary: youtube-transcript-api (pip install youtube-transcript-api)
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            # Older releases: static helper returning a list of dicts.
            entries = YouTubeTranscriptApi.get_transcript(vid_id)
            text = " ".join(e["text"] for e in entries)
        else:
            # Newer releases expose an instance-level fetch() whose result
            # iterates over snippet objects carrying a .text attribute.
            snippets = YouTubeTranscriptApi().fetch(vid_id)
            text = " ".join(s.text for s in snippets)
        if text.strip():
            return text[:8000]
        # An empty transcript is useless; try the scraper below instead.
    except Exception:
        # Library missing, API mismatch or no transcript available:
        # deliberately fall through to the scraping fallback.
        pass

    # Fallback: scrape caption track from page source
    try:
        resp = requests.get(
            f"https://www.youtube.com/watch?v={vid_id}",
            headers={"User-Agent": "Mozilla/5.0"}, timeout=12
        )
        resp.raise_for_status()
        cap_match = re.search(r'"captionTracks":\[.*?"baseUrl":"(.*?)"', resp.text)
        if cap_match:
            # The URL is embedded in JSON, where '&' is escaped as \u0026.
            cap_url = cap_match.group(1).replace("\\u0026", "&")
            cap_resp = requests.get(cap_url, timeout=10)
            # Without this check an HTTP error page would be stripped of tags
            # and returned as if it were the transcript.
            cap_resp.raise_for_status()
            text = re.sub(r"<[^>]+>", " ", cap_resp.text)
            text = re.sub(r"\s+", " ", text).strip()
            if text:
                return text[:8000]
        return "No captions found for this video."
    except Exception as e:
        return f"Transcript fetch failed: {e}"
# ──────────────────────────────────────────────────────────────────────────────
# Tool: read_pdf
# ──────────────────────────────────────────────────────────────────────────────
@tool
def read_pdf(filepath: str) -> str:
    """Read and extract text from a local PDF file.
    Args:
        filepath: Absolute path to the PDF file on disk.
    """
    # Every failure is reported as text so the calling agent can react to it.
    try:
        if not os.path.exists(filepath):
            return f"PDF error: file not found at {filepath}"
        reader = PdfReader(filepath)
        # Join pages with a newline so the last word of one page does not
        # fuse with the first word of the next; pages without extractable
        # text contribute an empty string.
        text = "\n".join(page.extract_text() or "" for page in reader.pages)
        if not text.strip():
            return "PDF appears to be empty or image-only (no extractable text)."
        # Cap the amount of text handed back to the caller.
        return text[:15000]
    except Exception as e:
        return f"PDF error: {e}"