from __future__ import annotations

# Lightweight web helpers: URL extraction, page fetching, web search
# (DuckDuckGo HTML results with a Wikipedia fallback) and YouTube transcript
# retrieval.

import logging
import re
from dataclasses import dataclass
from html.parser import HTMLParser
from typing import Iterable
from urllib.parse import parse_qs, quote, unquote, urlparse  # noqa: F401 -- unquote stays importable

import requests

_logger = logging.getLogger(__name__)

USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124 Safari/537.36"
)

# A URL runs until whitespace or a closing ")", ">" or "]".
_URL_RE = re.compile(r"https?://[^\s)>\]]+")
# Sentence punctuation that commonly trails a URL in prose.
_URL_TRAILING_PUNCTUATION = ".,;:!?'\""

_YOUTUBE_ID_RE = re.compile(r"[\w-]{11}")
_YOUTUBE_PATH_ID_RE = re.compile(r"/(?:shorts|embed|live|v)/([\w-]{11})")

# Characters left unescaped when a Wikipedia title is turned into a URL path.
_WIKIPEDIA_SAFE_CHARS = "/:(),"


@dataclass(frozen=True)
class SearchResult:
    """One search hit: a title, the target URL and an optional snippet."""

    title: str
    url: str
    snippet: str = ""


def extract_urls(text: str) -> list[str]:
    """Return every http(s) URL found in *text*, in order of appearance.

    Trailing sentence punctuation (``.``, ``,``, ``;``, ``:``, ``!``, ``?``
    and quotes) is stripped so that "see https://example.com." yields
    ``https://example.com``.
    """
    urls = []
    for candidate in _URL_RE.findall(text):
        url = candidate.rstrip(_URL_TRAILING_PUNCTUATION)
        # Drop matches that were nothing but punctuation after the scheme.
        if url.partition("://")[2]:
            urls.append(url)
    return urls


def fetch_url(url: str, *, timeout: int = 20, max_chars: int = 30_000) -> str:
    """Download *url* and return its text with whitespace collapsed.

    HTML responses are reduced to their visible text first. Output longer
    than *max_chars* is cut and suffixed with a truncation notice.

    Raises:
        requests.RequestException: on network failure or a non-2xx status.
    """
    response = requests.get(
        url,
        headers={"User-Agent": USER_AGENT},
        timeout=timeout,
    )
    response.raise_for_status()

    # Header values keep the server's casing, so compare case-insensitively.
    content_type = response.headers.get("content-type", "").lower()
    body = response.text
    if "html" in content_type:
        body = html_to_text(body)
    body = normalize_whitespace(body)

    if len(body) <= max_chars:
        return body
    return f"{body[:max_chars]}\n\n[truncated after {max_chars} characters]"


def web_search(query: str, *, max_results: int = 5, timeout: int = 20) -> list[SearchResult]:
    """Search the web for *query* and return at most *max_results* hits.

    DuckDuckGo is tried first. Wikipedia is used when DuckDuckGo yields no
    results or fails with a request error.

    Raises:
        requests.RequestException: if the Wikipedia fallback request fails.
    """
    if max_results <= 0:
        return []

    try:
        results = _duckduckgo_search(query, max_results=max_results, timeout=timeout)
    except requests.RequestException as exc:
        # A blocked or failing primary backend should not defeat the fallback.
        _logger.warning("DuckDuckGo search failed, falling back to Wikipedia: %s", exc)
        results = []

    if results:
        return results[:max_results]
    return _wikipedia_search(query, max_results=max_results, timeout=timeout)


def get_youtube_transcript(url_or_id: str) -> str:
    """Return the transcript of a YouTube video, one ``[start] text`` line per entry.

    Raises:
        ValueError: if no video id can be extracted from *url_or_id*.
        RuntimeError: if ``youtube-transcript-api`` is not installed.
    """
    video_id = extract_youtube_id(url_or_id)
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video id from {url_or_id!r}.")

    try:
        from youtube_transcript_api import YouTubeTranscriptApi
    except ImportError as exc:
        raise RuntimeError(
            "youtube-transcript-api is not installed, so YouTube transcripts "
            "cannot be fetched."
        ) from exc

    # Use the class-level ``get_transcript`` when the installed library has
    # it, otherwise the instance-based ``fetch``. Probing with getattr avoids
    # masking AttributeErrors raised inside the library call itself.
    legacy_get_transcript = getattr(YouTubeTranscriptApi, "get_transcript", None)
    if legacy_get_transcript is not None:
        transcript = legacy_get_transcript(video_id)
    else:
        transcript = YouTubeTranscriptApi().fetch(video_id).to_raw_data()

    return "\n".join(
        f"[{entry.get('start', 0):.1f}] {entry.get('text', '')}"
        for entry in transcript
    )


def _host_matches(host: str, domain: str) -> bool:
    """Return True when *host* is *domain* itself or one of its subdomains."""
    return host == domain or host.endswith(f".{domain}")


def extract_youtube_id(url_or_id: str) -> str | None:
    """Extract the 11-character video id from a YouTube URL or bare id.

    Handles bare ids, ``youtu.be/<id>``, ``watch?v=<id>`` and the
    ``/shorts/``, ``/embed/``, ``/live/`` and ``/v/`` path forms, with or
    without a scheme. Returns ``None`` when nothing can be extracted.
    """
    candidate = url_or_id.strip()
    if _YOUTUBE_ID_RE.fullmatch(candidate):
        return candidate

    # Without a scheme urlparse would treat the host as part of the path.
    if "://" not in candidate and not candidate.startswith("//"):
        candidate = f"https://{candidate}"

    try:
        parsed = urlparse(candidate)
        host = parsed.hostname or ""
    except ValueError:
        # Malformed input such as an unbalanced IPv6 bracket.
        return None

    if _host_matches(host, "youtu.be"):
        short_id = parsed.path.lstrip("/")[:11]
        return short_id or None

    # Match on domain boundaries so look-alike hosts ("notyoutube.com",
    # "youtube.com.example.org") are rejected.
    if _host_matches(host, "youtube.com") or _host_matches(host, "youtube-nocookie.com"):
        query_id = parse_qs(parsed.query).get("v", [None])[0]
        if query_id:
            return query_id[:11]
        path_match = _YOUTUBE_PATH_ID_RE.search(parsed.path)
        if path_match:
            return path_match.group(1)

    return None


def html_to_text(html: str) -> str:
    """Return the visible text of *html*, one text node per line."""
    parser = _TextExtractor()
    parser.feed(html)
    # close() flushes text the parser is still buffering, e.g. trailing
    # content with a bare "&" that might have been a partial entity.
    parser.close()
    return parser.text()


def normalize_whitespace(text: str) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends."""
    return re.sub(r"\s+", " ", text).strip()


def _duckduckgo_search(
    query: str,
    *,
    max_results: int,
    timeout: int,
) -> list[SearchResult]:
    """Query DuckDuckGo's HTML page and parse up to *max_results* results.

    Raises:
        requests.RequestException: on network failure or a non-2xx status.
    """
    response = requests.get(
        "https://duckduckgo.com/html/",
        params={"q": query},
        headers={"User-Agent": USER_AGENT},
        timeout=timeout,
    )
    response.raise_for_status()

    parser = _DuckDuckGoParser()
    parser.feed(response.text)
    parser.close()  # flush any text still buffered by the parser
    return parser.results[:max_results]


def _wikipedia_search(
    query: str,
    *,
    max_results: int,
    timeout: int,
) -> list[SearchResult]:
    """Search English Wikipedia through its ``api.php`` search endpoint.

    Raises:
        requests.RequestException: on network failure or a non-2xx status.
    """
    response = requests.get(
        "https://en.wikipedia.org/w/api.php",
        params={
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "srlimit": max_results,
        },
        headers={"User-Agent": USER_AGENT},
        timeout=timeout,
    )
    response.raise_for_status()
    payload = response.json()

    results = []
    for item in payload.get("query", {}).get("search", []):
        title = item.get("title", "")
        if not title:
            continue
        # Percent-encode so titles containing "?", "#" or "%" still yield a
        # working link instead of being read as a query string or fragment.
        page_path = quote(title.replace(" ", "_"), safe=_WIKIPEDIA_SAFE_CHARS)
        results.append(
            SearchResult(
                title=title,
                url=f"https://en.wikipedia.org/wiki/{page_path}",
                snippet=html_to_text(item.get("snippet", "")),
            )
        )
    return results


class _TextExtractor(HTMLParser):
    """Collect the text nodes of an HTML document.

    Text inside ``script``, ``style``, ``noscript`` and ``svg`` elements is
    ignored. Every remaining non-blank text node is stripped and becomes one
    line of the output.
    """

    _SKIPPED_TAGS = frozenset({"script", "style", "noscript", "svg"})

    def __init__(self) -> None:
        super().__init__()
        self._chunks: list[str] = []
        # Number of currently open skipped elements.
        self._skip_depth = 0

    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
        if tag in self._SKIPPED_TAGS:
            self._skip_depth += 1

    def handle_endtag(self, tag: str) -> None:
        # Guard against stray closing tags driving the depth negative.
        if tag in self._SKIPPED_TAGS and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data: str) -> None:
        if self._skip_depth:
            return
        stripped = data.strip()
        if stripped:
            self._chunks.append(stripped)

    def text(self) -> str:
        """Return the collected text nodes joined by newlines."""
        return "\n".join(self._chunks)


class _DuckDuckGoParser(HTMLParser):
    """Parse result anchors out of DuckDuckGo's HTML results page.

    Anchors carrying the ``result__a`` class give the title and URL. An
    anchor carrying ``result__snippet`` that follows an accepted title is
    stored as that result's snippet (this markup is assumed, not confirmed;
    when it is absent the snippet simply stays empty).
    """

    def __init__(self) -> None:
        super().__init__()
        self.results: list[SearchResult] = []
        self._active_href: str | None = None
        self._active_chunks: list[str] = []
        self._in_snippet = False
        # True between an accepted title anchor and its snippet anchor.
        self._snippet_pending = False

    def handle_starttag(self, tag: str, attrs: Iterable[tuple[str, str | None]]) -> None:
        if tag != "a":
            return
        attr_map = {key: value or "" for key, value in attrs}
        classes = attr_map.get("class", "").split()
        href = attr_map.get("href", "")

        if "result__a" in classes and href:
            self._active_href = _unwrap_duckduckgo_url(href)
            self._active_chunks = []
            self._in_snippet = False
            # A snippet may only attach to the result this anchor produces.
            self._snippet_pending = False
        elif "result__snippet" in classes and self._snippet_pending:
            self._in_snippet = True
            self._active_chunks = []

    def handle_data(self, data: str) -> None:
        if self._active_href is not None or self._in_snippet:
            self._active_chunks.append(data)

    def handle_endtag(self, tag: str) -> None:
        if tag != "a":
            return

        if self._active_href is not None:
            title = normalize_whitespace(" ".join(self._active_chunks))
            # Only absolute http(s) targets with a non-empty title are kept.
            if title and self._active_href.startswith("http"):
                self.results.append(SearchResult(title=title, url=self._active_href))
                self._snippet_pending = True
            self._active_href = None
            self._active_chunks = []
        elif self._in_snippet:
            snippet = normalize_whitespace(" ".join(self._active_chunks))
            if snippet and self.results:
                latest = self.results[-1]
                # SearchResult is frozen, so replace the entry rather than mutate it.
                self.results[-1] = SearchResult(
                    title=latest.title,
                    url=latest.url,
                    snippet=snippet,
                )
            self._in_snippet = False
            self._snippet_pending = False
            self._active_chunks = []


def _unwrap_duckduckgo_url(url: str) -> str:
    """Return the real target of a DuckDuckGo redirect link.

    Redirect links carry the destination in the ``uddg`` query parameter.
    Any other URL is returned unchanged, apart from protocol-relative URLs
    gaining an ``https:`` prefix.
    """
    if url.startswith("//"):
        url = f"https:{url}"

    try:
        parsed = urlparse(url)
        host = parsed.hostname or ""
    except ValueError:
        # Leave malformed links untouched; the caller filters them later.
        return url

    is_redirect_host = _host_matches(host, "duckduckgo.com")
    is_relative_redirect = not parsed.netloc and parsed.path == "/l/"
    if is_redirect_host or is_relative_redirect:
        target = parse_qs(parsed.query).get("uddg", [None])[0]
        if target:
            # parse_qs already percent-decodes values. Decoding again would
            # corrupt escapes that belong to the target URL ("%2520" -> " ").
            return target
    return url