Spaces:
Running
Running
| from __future__ import annotations | |
| import re | |
| from langchain_core.tools import tool | |
| from tavily import TavilyClient # type:ignore | |
| from youtube_transcript_api import YouTubeTranscriptApi #type:ignore | |
| from config import TAVILY_API_KEY | |
# ── Web Search (Tavily) ───────────────────────────────────────
def run_web_search(
    query: str,
    api_key: str = "",
    *,
    search_depth: str = "advanced",
    topic: str = "general",
    max_results: int = 5,
    include_answer: bool = True,
) -> str:
    """Run a Tavily web search and return the results as one text block.

    Args:
        query: Search query forwarded to Tavily unchanged.
        api_key: The user's own Tavily key (BYOK). When empty, the shared
            server key ``TAVILY_API_KEY`` is used instead.
        search_depth: Forwarded to ``TavilyClient.search``.
        topic: Forwarded to ``TavilyClient.search``.
        max_results: Forwarded to ``TavilyClient.search``.
        include_answer: Forwarded to ``TavilyClient.search``; when the
            response carries an ``answer`` it is emitted first.

    Returns:
        A formatted block: an optional ``Answer: ...`` line followed by one
        card per result (title, optional relevance score, URL, snippet).
        This function never raises for search failures; instead it returns
        a human-readable message when no key is configured, when the Tavily
        call fails, or when nothing was found.
    """
    key = api_key or TAVILY_API_KEY
    if not key:
        return (
            "Web search is unavailable: no Tavily API key configured. "
            "Add a Tavily API key in Settings to enable web search."
        )

    try:
        client = TavilyClient(api_key=key)
        resp = client.search(
            query,
            search_depth=search_depth,
            topic=topic,
            max_results=max_results,
            include_answer=include_answer,
            include_raw_content=False,
            chunks_per_source=3,
        )
    except Exception as e:
        # Tool boundary: any client/network failure is reported to the caller
        # as text rather than propagated.
        print(f"[TAVILY SEARCH ERROR] {e}", flush=True)
        return f"Web search is temporarily unavailable. (Error: {e})"

    # Normalise once: anything that is not a dict is treated as an empty
    # response, and a null "results" value is treated as no results.
    data = resp if isinstance(resp, dict) else {}
    results = data.get("results") or []
    answer = data.get("answer")
    if not results and not answer:
        return "No search results found."

    blocks = []
    if answer:
        blocks.append(f"Answer: {answer}\n")
    for r in results:
        if not isinstance(r, dict):
            # Skip malformed entries instead of failing the whole search.
            continue
        title = r.get("title", "No Title")
        url = r.get("url", "")
        score = r.get("score", "")
        content = r.get("content", "")
        # Only numeric scores are rendered; a missing score adds nothing.
        score_str = f" (relevance: {score:.2f})" if isinstance(score, (int, float)) else ""
        blocks.append(f"Title: {title}{score_str}\nURL: {url}\nSnippet: {content}\n")

    if not blocks:
        return "No search results found."
    return "\n".join(blocks)
def web_search(query: str) -> str:
    """
    Search the internet for current information. Use when the student asks
    about recent events, specific facts, or anything not covered by the NCERT
    curriculum context. Input: a concise search query string.

    Args:
        query: The search query; passed to ``run_web_search`` with all of
            its default options (no per-user API key).

    Returns:
        The formatted text block produced by ``run_web_search``.
    """
    return run_web_search(query)
# ── YouTube Transcript ────────────────────────────────────────
def _extract_video_id(url_or_id: str) -> str | None:
    """Extract the 11-character YouTube video ID from a URL, text, or bare ID.

    Recognised forms are ``...?v=<id>``, ``youtu.be/<id>``, ``embed/<id>``,
    ``shorts/<id>`` and ``live/<id>`` anywhere in the input, or an input
    that (after stripping whitespace) is exactly an 11-character ID.

    Args:
        url_or_id: A YouTube URL, a message containing one, or a bare ID.

    Returns:
        The video ID, or ``None`` when no recognised form is present.
    """
    text = url_or_id.strip()

    # URL forms first: the ID is whatever follows one of the known markers.
    in_url = re.search(
        r"(?:v=|youtu\.be/|embed/|shorts/|live/)([A-Za-z0-9_-]{11})", text
    )
    if in_url:
        return in_url.group(1)

    # Otherwise accept the input only if the whole string is an ID.
    if re.fullmatch(r"[A-Za-z0-9_-]{11}", text):
        return text
    return None
def yt_transcript(youtube_url: str) -> str:
    """
    Fetch the full transcript of a YouTube video.

    Args:
        youtube_url: A YouTube URL or video ID; forwarded unchanged to
            ``fetch_yt_transcript``.

    Returns:
        The transcript text, or a message starting with
        ``TRANSCRIPT_UNAVAILABLE:`` when it could not be retrieved.
    """
    return fetch_yt_transcript(youtube_url)
def fetch_yt_transcript(youtube_url: str) -> str:
    """Fetch the transcript of a YouTube video given its URL or ID.

    Args:
        youtube_url: A YouTube URL, a message containing one, or a bare
            video ID (resolved via ``_extract_video_id``).

    Returns:
        The transcript segments joined with single spaces. On any failure
        this function does not raise; it returns a string starting with
        ``TRANSCRIPT_UNAVAILABLE:`` that explains the reason (no video ID,
        empty transcript, captions disabled/missing, rate limiting, or the
        raw error text).
    """
    video_id = _extract_video_id(youtube_url)
    if not video_id:
        print(f"[YT TRANSCRIPT] No video ID found in input: {youtube_url[:80]}", flush=True)
        return "TRANSCRIPT_UNAVAILABLE: Could not extract a valid YouTube video ID from the message."

    # Preferred transcript languages, in priority order.
    langs = ["en", "hi", "en-IN", "en-US"]
    try:
        # youtube-transcript-api ≥ 1.0 replaced the static `get_transcript`
        # with an instance method `.fetch()`. Support both APIs.
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            fetched = YouTubeTranscriptApi.get_transcript(video_id, languages=langs)  # type:ignore[attr-defined]
        else:
            fetched = YouTubeTranscriptApi().fetch(video_id, languages=langs)
        # Segments are dicts (`seg["text"]`) in the older API and snippet
        # objects with a `.text` attribute in the newer one; accept either
        # and treat missing/None text as empty.
        transcript = " ".join(
            (seg.get("text") if isinstance(seg, dict) else getattr(seg, "text", None)) or ""
            for seg in fetched
        )
    except Exception as exc:
        print(f"[YT TRANSCRIPT EXCEPTION] video_id={video_id} | error={exc}", flush=True)
        # Error text can embed the video URL/ID; remove the ID before keyword
        # matching so an ID that happens to contain e.g. "429" is not
        # misclassified as rate limiting.
        err = str(exc).lower().replace(video_id.lower(), "")
        if "disabled" in err or "no transcript" in err or "no element" in err:
            return (
                "TRANSCRIPT_UNAVAILABLE: This video has no transcript available "
                "(subtitles are disabled or no captions exist for this video)."
            )
        if "too many requests" in err or "429" in err:
            return (
                "TRANSCRIPT_UNAVAILABLE: YouTube is rate-limiting transcript requests right now. "
                "Please try again in a few minutes."
            )
        return (
            f"TRANSCRIPT_UNAVAILABLE: Could not retrieve transcript. Reason: {exc}"
        )

    if not transcript.strip():
        print(f"[YT TRANSCRIPT] Empty transcript for video_id={video_id}", flush=True)
        return "TRANSCRIPT_UNAVAILABLE: Transcript is empty for this video."
    print(f"[YT TRANSCRIPT] OK — {len(transcript)} chars for video_id={video_id}", flush=True)
    return transcript
# ── Exported list ─────────────────────────────────────────────
# The two tool entry points this module exports, as plain callables.
# NOTE(review): `tool` is imported from langchain_core.tools at the top of the
# file, but no decorator is visible on these functions here — confirm whether
# the consumer of TOOLS wraps them itself.
TOOLS = [web_search, yt_transcript]