Nigou Julien
Build routed GAIA agent v1
07fb471
from __future__ import annotations
import re
from dataclasses import dataclass
from html.parser import HTMLParser
from typing import Iterable
from urllib.parse import parse_qs, unquote, urlparse
import requests
# Browser-like User-Agent header value sent by the HTTP helpers in this module.
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124 Safari/537.36"
)
@dataclass(frozen=True)
class SearchResult:
    """Immutable record describing one search hit."""

    # Title text of the hit.
    title: str
    # Address of the result page.
    url: str
    # Optional text excerpt; empty when the backend supplies none.
    snippet: str = ""
def _cut_at_unbalanced_paren(candidate: str) -> str:
    """Return *candidate* truncated just before its first unmatched ")"."""
    depth = 0
    for index, char in enumerate(candidate):
        if char == "(":
            depth += 1
        elif char == ")":
            if depth == 0:
                return candidate[:index]
            depth -= 1
    return candidate


def extract_urls(text: str) -> list[str]:
    """Return every http(s) URL found in *text*, in order of appearance.

    A URL ends at whitespace, ``>`` or ``]``. A closing parenthesis ends the
    URL only when it has no matching ``(`` inside the URL, so links such as
    ``https://en.wikipedia.org/wiki/Mercury_(planet)`` survive intact while
    ``(https://example.com)`` still yields ``https://example.com``.
    Trailing sentence punctuation and quotes are stripped because they almost
    always belong to the surrounding prose rather than to the link.

    Args:
        text: Arbitrary text that may contain URLs.

    Returns:
        The extracted URLs; duplicates are preserved.
    """
    urls = []
    for raw in re.findall(r"https?://[^\s>\]]+", text):
        url = _cut_at_unbalanced_paren(raw).rstrip(".,;:!?\"'")
        # Drop degenerate matches such as "https://..." that have no host left.
        if url.partition("://")[2]:
            urls.append(url)
    return urls
def fetch_url(url: str, *, timeout: int = 20, max_chars: int = 30_000) -> str:
    """Download *url* and return its text content, whitespace-normalized.

    HTML responses are reduced to their visible text first. The result is
    collapsed to single spaces and truncated to *max_chars* characters, with
    a trailing marker appended when truncation happened.

    Args:
        url: Address to fetch.
        timeout: Timeout in seconds passed to ``requests.get``.
        max_chars: Maximum number of content characters to keep.

    Returns:
        The (possibly truncated) text of the response.

    Raises:
        requests.RequestException: If the request fails or the server
            answers with an error status.
    """
    response = requests.get(
        url,
        headers={"User-Agent": USER_AGENT},
        timeout=timeout,
    )
    response.raise_for_status()

    # Media types are case-insensitive, so compare in lower case; otherwise a
    # header such as "Text/HTML" would skip the HTML-to-text conversion.
    content_type = response.headers.get("content-type", "").lower()
    body = response.text
    if "html" in content_type:
        body = html_to_text(body)
    body = normalize_whitespace(body)

    if len(body) <= max_chars:
        return body
    return f"{body[:max_chars]}\n\n[truncated after {max_chars} characters]"
def web_search(query: str, *, max_results: int = 5, timeout: int = 20) -> list[SearchResult]:
    """Search the web for *query*, preferring DuckDuckGo over Wikipedia.

    DuckDuckGo is tried first. When it returns no results, or its request
    fails, the English Wikipedia search API is used instead.

    Args:
        query: Search terms.
        max_results: Maximum number of results to return; values below 1
            yield an empty list without any network call.
        timeout: Per-request timeout in seconds.

    Returns:
        At most *max_results* search results.

    Raises:
        requests.RequestException: If the Wikipedia fallback request fails.
    """
    if max_results <= 0:
        return []

    try:
        results = _duckduckgo_search(query, max_results=max_results, timeout=timeout)
    except requests.RequestException:
        # A failing primary backend (HTTP error, timeout, connection problem)
        # should trigger the fallback just like an empty result page does.
        results = []

    if results:
        return results[:max_results]
    return _wikipedia_search(query, max_results=max_results, timeout=timeout)
def get_youtube_transcript(url_or_id: str) -> str:
    """Return the transcript of a YouTube video as timestamped lines.

    Each output line has the form ``[<start seconds, one decimal>] <text>``.

    Args:
        url_or_id: A YouTube URL or a bare video id.

    Returns:
        The transcript entries joined by newlines.

    Raises:
        ValueError: If no video id can be extracted from *url_or_id*.
        RuntimeError: If the ``youtube_transcript_api`` package is missing.
    """
    video_id = extract_youtube_id(url_or_id)
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video id from {url_or_id!r}.")

    # Imported lazily so the module stays usable without the optional package.
    try:
        from youtube_transcript_api import YouTubeTranscriptApi
    except ImportError as exc:
        raise RuntimeError(
            "youtube-transcript-api is not installed, so YouTube transcripts "
            "cannot be fetched."
        ) from exc

    # Try the class-level call first; on AttributeError fall back to the
    # instance-based API (presumably for library versions that no longer
    # provide get_transcript — confirm against the installed version).
    try:
        entries = YouTubeTranscriptApi.get_transcript(video_id)
    except AttributeError:
        entries = YouTubeTranscriptApi().fetch(video_id).to_raw_data()

    lines = []
    for entry in entries:
        lines.append(f"[{entry.get('start', 0):.1f}] {entry.get('text', '')}")
    return "\n".join(lines)
def extract_youtube_id(url_or_id: str) -> str | None:
    """Extract the 11-character video id from a YouTube URL or bare id.

    Recognized inputs (surrounding whitespace is ignored, the scheme is
    optional):

    * a bare 11-character id,
    * ``youtu.be/<id>``,
    * ``youtube.com`` / ``youtube-nocookie.com`` and their subdomains, using
      either the ``v`` query parameter or a ``/shorts/``, ``/embed/``,
      ``/live/`` or ``/v/`` path.

    Args:
        url_or_id: Video URL or id.

    Returns:
        The video id, or ``None`` when the input does not contain a
        well-formed id on a recognized host.
    """
    id_pattern = r"[A-Za-z0-9_-]{11}"

    candidate = url_or_id.strip()
    if re.fullmatch(id_pattern, candidate):
        return candidate

    try:
        parsed = urlparse(candidate)
        if not parsed.scheme and not parsed.netloc:
            # Scheme-less links ("youtu.be/<id>") parse as a bare path; retry
            # with a scheme so the host is recognized.
            parsed = urlparse(f"https://{candidate}")
        host = (parsed.hostname or "").lower()
    except ValueError:
        # urlparse rejects malformed hosts (e.g. unbalanced "[").
        return None
    if host.startswith("www."):
        host = host[4:]

    if host == "youtu.be":
        # Validate instead of blindly slicing so short or empty paths do not
        # produce a bogus id.
        match = re.match(id_pattern, parsed.path.lstrip("/"))
        return match.group(0) if match else None

    youtube_domains = ("youtube.com", "youtube-nocookie.com")
    on_youtube = host in youtube_domains or host.endswith(
        tuple(f".{domain}" for domain in youtube_domains)
    )
    if on_youtube:
        for value in parse_qs(parsed.query).get("v", []):
            match = re.match(id_pattern, value)
            if match:
                return match.group(0)
        match = re.search(rf"/(?:shorts|embed|live|v)/({id_pattern})", parsed.path)
        if match:
            return match.group(1)
    return None
def html_to_text(html: str) -> str:
    """Return the visible text of an HTML document or fragment.

    Args:
        html: HTML markup.

    Returns:
        The text collected by ``_TextExtractor``.
    """
    parser = _TextExtractor()
    parser.feed(html)
    # feed() may hold back trailing text that could be an unfinished character
    # reference (for example a fragment ending in "AT&T"); close() forces the
    # parser to flush it so the end of the input is not silently lost.
    parser.close()
    return parser.text()
def normalize_whitespace(text: str) -> str:
    """Collapse every run of whitespace in *text* to a single space.

    Leading and trailing whitespace is removed entirely.
    """
    words = text.split()
    return " ".join(words)
def _duckduckgo_search(
    query: str,
    *,
    max_results: int,
    timeout: int,
) -> list[SearchResult]:
    """Query DuckDuckGo's HTML endpoint and return the parsed result links.

    Args:
        query: Search terms, sent as the ``q`` parameter.
        max_results: Maximum number of results to return.
        timeout: Request timeout in seconds.

    Returns:
        Up to *max_results* results in page order.

    Raises:
        requests.RequestException: If the request fails or returns an error
            status.
    """
    request_headers = {"User-Agent": USER_AGENT}
    page = requests.get(
        "https://duckduckgo.com/html/",
        params={"q": query},
        headers=request_headers,
        timeout=timeout,
    )
    page.raise_for_status()

    link_parser = _DuckDuckGoParser()
    link_parser.feed(page.text)
    found = link_parser.results
    return found[:max_results]
def _wikipedia_search(
    query: str,
    *,
    max_results: int,
    timeout: int,
) -> list[SearchResult]:
    """Search English Wikipedia through the MediaWiki ``list=search`` API.

    Args:
        query: Search terms, sent as ``srsearch``.
        max_results: Maximum number of hits requested (``srlimit``).
        timeout: Request timeout in seconds.

    Returns:
        One result per hit, with the article URL derived from the page title
        and the API snippet reduced to plain text.

    Raises:
        requests.RequestException: If the request fails or returns an error
            status.
    """
    # Imported here because the module-level imports do not include quote.
    from urllib.parse import quote

    response = requests.get(
        "https://en.wikipedia.org/w/api.php",
        params={
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "srlimit": max_results,
        },
        headers={"User-Agent": USER_AGENT},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()

    results = []
    for item in payload.get("query", {}).get("search", []):
        title = item.get("title", "")
        if not title:
            # Without a title there is no article to link to.
            continue
        # Percent-encode the title: characters such as "?", "#" or "%" would
        # otherwise be read as query/fragment delimiters and point at the
        # wrong page (e.g. "C#" or titles ending in a question mark).
        page_name = quote(title.replace(" ", "_"), safe="/:(),")
        results.append(
            SearchResult(
                title=title,
                url=f"https://en.wikipedia.org/wiki/{page_name}",
                snippet=html_to_text(item.get("snippet", "")),
            )
        )
    return results
class _TextExtractor(HTMLParser):
    """HTML parser that collects text found outside script-like elements.

    Text inside ``script``, ``style``, ``noscript`` and ``svg`` elements is
    discarded. ``text()`` emits each non-blank text chunk on its own line;
    the newline markers recorded for block-level tags are whitespace-only
    and are therefore filtered out by ``text()`` as well.
    """

    # Elements whose content is never emitted.
    _HIDDEN_TAGS = frozenset({"script", "style", "noscript", "svg"})
    # Tags that record a newline marker when opened / when closed.
    _BREAK_ON_OPEN = frozenset({"p", "br", "li", "tr", "h1", "h2", "h3", "h4"})
    _BREAK_ON_CLOSE = frozenset({"p", "li", "tr"})

    def __init__(self) -> None:
        super().__init__()
        self._chunks: list[str] = []
        # Number of currently open hidden elements; text is kept only at 0.
        self._skip_depth = 0

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        """Track entry into hidden elements and mark block starts."""
        if tag in self._HIDDEN_TAGS:
            self._skip_depth += 1
        if tag in self._BREAK_ON_OPEN:
            self._chunks.append("\n")

    def handle_endtag(self, tag: str) -> None:
        """Track exit from hidden elements and mark block ends."""
        # The depth guard keeps stray closing tags from driving it negative.
        if self._skip_depth and tag in self._HIDDEN_TAGS:
            self._skip_depth -= 1
        if tag in self._BREAK_ON_CLOSE:
            self._chunks.append("\n")

    def handle_data(self, data: str) -> None:
        """Record text unless it sits inside a hidden element."""
        if self._skip_depth:
            return
        self._chunks.append(data)

    def text(self) -> str:
        """Return the collected chunks, stripped, one per line."""
        stripped = (chunk.strip() for chunk in self._chunks)
        return "\n".join(piece for piece in stripped if piece)
class _DuckDuckGoParser(HTMLParser):
    """Collect result links from a DuckDuckGo HTML results page.

    An anchor counts as a result when its ``class`` attribute contains
    ``result__a`` and it has a non-empty ``href``. The anchor text becomes
    the title; results are stored in ``results`` in document order.
    """

    def __init__(self) -> None:
        super().__init__()
        self.results: list[SearchResult] = []
        # Target of the result anchor currently being read, if any.
        self._active_href: str | None = None
        # Text fragments seen inside that anchor so far.
        self._active_chunks: list[str] = []

    def handle_starttag(self, tag: str, attrs: Iterable[tuple[str, str | None]]) -> None:
        """Start capturing when a result anchor opens."""
        if tag == "a":
            attributes = {name: (value or "") for name, value in attrs}
            link = attributes.get("href", "")
            if link and "result__a" in attributes.get("class", ""):
                self._active_href = _unwrap_duckduckgo_url(link)
                self._active_chunks = []

    def handle_data(self, data: str) -> None:
        """Accumulate anchor text while a result anchor is open."""
        if self._active_href:
            self._active_chunks.append(data)

    def handle_endtag(self, tag: str) -> None:
        """Finish the current result when its anchor closes."""
        target = self._active_href
        if tag != "a" or not target:
            return
        title = normalize_whitespace(" ".join(self._active_chunks))
        # Only links with a title and an http(s) target are kept.
        if title and target.startswith("http"):
            self.results.append(SearchResult(title=title, url=target))
        self._active_href = None
        self._active_chunks = []
def _unwrap_duckduckgo_url(url: str) -> str:
    """Resolve a DuckDuckGo redirect link to the URL it points at.

    Protocol-relative links (``//host/...``) are given an ``https:`` scheme.
    When the link is on ``duckduckgo.com`` (or a subdomain) and carries a
    ``uddg`` query parameter, that parameter's value is returned; any other
    link is returned unchanged apart from the added scheme.

    Args:
        url: The ``href`` taken from a result anchor.

    Returns:
        The destination URL.
    """
    if url.startswith("//"):
        url = f"https:{url}"

    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if host == "duckduckgo.com" or host.endswith(".duckduckgo.com"):
        targets = parse_qs(parsed.query).get("uddg")
        if targets:
            # parse_qs has already percent-decoded the value once; decoding it
            # again would corrupt escapes that belong to the target URL itself
            # (e.g. "%2520" must stay "%20", not become a space).
            return targets[0]
    return url