| import io |
| import os |
| import re |
| import sys |
| import json |
| import base64 |
| import subprocess |
| from config import GROQ_API_KEY, OPENROUTER_API_KEY |
| from functools import lru_cache |
|
|
| |
# On Windows, switch stdout/stderr to UTF-8 with replacement of unencodable
# characters (presumably so the [TOOL] log lines cannot fail on text the
# default console encoding cannot represent).
if sys.platform == "win32":
    for _stream in (sys.stdout, sys.stderr):
        try:
            _stream.reconfigure(encoding="utf-8", errors="replace")
        except AttributeError:
            # The stream is None or was replaced by an object that has no
            # reconfigure(); leave it untouched rather than fail at import.
            pass
|
|
| import requests |
| from tempfile import NamedTemporaryFile |
| import pandas as pd |
| import markdownify |
| from langchain_community.document_loaders import WikipediaLoader |
| from langchain_core.tools import tool |
| from youtube_transcript_api import YouTubeTranscriptApi |
|
|
| |
| try: |
| from ddgs import DDGS |
| except ImportError: |
| try: |
| from duckduckgo_search import DDGS |
| except ImportError: |
| DDGS = None |
|
|
|
|
| |
| |
| |
@tool
def wikipedia_search(query: str, max_pages: int = 3) -> str:
    """Look up a query on Wikipedia and return the text of the matching articles.

    Args:
        query: Search terms handed to the Wikipedia loader.
        max_pages: Maximum number of articles to load.

    Returns:
        The loaded article texts joined by "---" dividers and capped at
        50,000 characters, a "no results" notice when nothing was loaded,
        or a failure message when the lookup raised.
    """
    print(f"[TOOL] wiki_search called with query: {query}")
    try:
        loader = WikipediaLoader(query=query, load_max_docs=max_pages)
        divider = "\n\n---\n\n"
        combined = divider.join(page.page_content for page in loader.load())
    except Exception as e:
        print(f"[TOOL] wiki_search error: {e}")
        return f"Wikipedia search failed: {e}"

    if not combined:
        return "No Wikipedia results found."
    return combined[:50_000]
|
|
|
|
| |
| |
| |
def _ddg_search_raw(query: str, k: int = 8) -> list[dict]:
    """Run a DuckDuckGo text search through the module-level ``DDGS`` client.

    Args:
        query: The search query.
        k: Maximum number of hits to request and return.

    Returns:
        A list of ``{"title", "snippet", "link"}`` dicts with each field
        truncated (500 / 4000 / 300 characters). An empty list is returned
        when the client library is unavailable or the search fails; this
        function never raises for search errors.
    """
    if DDGS is None:
        print("[TOOL] DDG search unavailable — ddgs not installed")
        return []
    try:
        # The client may hand back a list or a lazy iterator depending on the
        # installed package/version; materialise it so slicing always works.
        results = list(DDGS().text(query, max_results=k) or [])
        return [
            {
                # `or ""` also covers keys that are present but set to None,
                # which would otherwise make the slice raise and discard
                # every hit via the except below.
                "title": (r.get("title") or "")[:500],
                "snippet": (r.get("body") or "")[:4000],
                "link": (r.get("href") or "")[:300],
            }
            for r in results[:k]
        ]
    except Exception as e:
        print(f"[TOOL] DDG search error: {e}")
        return []
|
|
|
|
@tool
def web_search(query: str, k: int = 8) -> str:
    """Run a DuckDuckGo web search and return the hits as a JSON string.

    Args:
        query: The search query.
        k: Maximum number of results to return.

    Returns:
        A JSON array of result objects, or a "no results" notice when both
        the original query and its quote-free variant come back empty.
    """
    found = _ddg_search_raw(query, k)

    if not found:
        # Retry once with single and double quotes removed, but only when
        # that actually changes the query.
        unquoted = re.sub(r'["\']', '', query)
        if unquoted != query:
            found = _ddg_search_raw(unquoted, k)

    if not found:
        return "No search results found."
    return json.dumps(found, ensure_ascii=False)
|
|
|
|
| |
| |
| |
@tool
def visit_webpage(url: str) -> str:
    """Fetch the content of a webpage URL and return cleaned text.

    Args:
        url: The URL to fetch.

    Returns:
        The page converted to Markdown with scripts, styles and images
        removed, truncated to ~40k chars, or a "Could not fetch" message
        when the request or the conversion fails.
    """
    print(f"[TOOL] visit_webpage: {url}")
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers, timeout=20)
        resp.raise_for_status()

        # Without a declared charset, requests assumes ISO-8859-1 for text
        # responses, which garbles UTF-8 pages; detect it from the body.
        if "charset" not in resp.headers.get("Content-Type", "").lower():
            resp.encoding = resp.apparent_encoding

        # markdownify's `strip` only drops the tag markup and keeps the inner
        # text, so remove <script>/<style> elements together with their
        # bodies before converting.
        html = re.sub(r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>", "", resp.text)

        text = markdownify.markdownify(html, strip=["img", "script", "style"])
        text = re.sub(r'\n{3,}', '\n\n', text).strip()
        return text[:40_000]
    except Exception as e:
        print(f"[TOOL] visit_webpage error: {e}")
        return f"Could not fetch {url}: {e}"
|
|
|
|
| |
| |
| |
@tool
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the transcript/captions of a YouTube video.

    Args:
        video_url: Full YouTube URL (watch, youtu.be, shorts, embed, live
            or /v/ form) or just the video ID.

    Returns:
        The full transcript as a single string, or TRANSCRIPT_UNAVAILABLE.
    """
    video_url = video_url.strip()
    # Pull the 11-character ID out of the common URL shapes; anything that
    # does not match is assumed to already be a bare video ID.
    match = re.search(
        r"(?:v=|youtu\.be/|/(?:shorts|embed|live|v)/)([A-Za-z0-9_-]{11})",
        video_url,
    )
    video_id = match.group(1) if match else video_url

    # Try the instance-based client first, then the older static interface.
    for fetch in (_fetch_transcript_new_api, _fetch_transcript_old_api):
        transcript = fetch(video_id)
        if transcript and transcript != "TRANSCRIPT_UNAVAILABLE":
            print(f"[TOOL] YouTube transcript: {len(transcript)} chars")
            return transcript

    return "TRANSCRIPT_UNAVAILABLE"
|
|
|
|
def _fetch_transcript_new_api(video_id: str) -> str:
    """Fetch a transcript via the instance-based ``YouTubeTranscriptApi().fetch``.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The transcript text joined with single spaces, or an empty string
        when the call fails for any reason (the failure is logged).
    """
    try:
        entries = YouTubeTranscriptApi().fetch(video_id)
        # Entries may be objects exposing `.text` or plain dicts.
        return " ".join(
            e.text if hasattr(e, "text") else e.get("text", "")
            for e in entries
        )
    except Exception as e:
        # Best-effort: the caller falls back to another fetch strategy, but
        # log the reason like the other tools do instead of hiding it.
        print(f"[TOOL] YouTube transcript (fetch API) error: {e}")
        return ""
|
|
|
|
def _fetch_transcript_old_api(video_id: str) -> str:
    """Fetch a transcript via the static ``YouTubeTranscriptApi.get_transcript``.

    Args:
        video_id: The YouTube video ID.

    Returns:
        The transcript text joined with single spaces, or an empty string
        when the call fails for any reason (the failure is logged).
    """
    try:
        entries = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join(e["text"] for e in entries)
    except Exception as e:
        # Best-effort fallback path: report the reason, never raise.
        print(f"[TOOL] YouTube transcript (get_transcript API) error: {e}")
        return ""
|
|
|
|
| |
| |
| |
def _sniff_image_mime(data: bytes) -> str:
    """Guess an image MIME type from its leading bytes; PNG when unrecognised."""
    if data.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if data.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    if data.startswith(b"BM"):
        return "image/bmp"
    # PNG, and the historical default for anything else.
    return "image/png"


@tool
def describe_image(img_bytes: bytes, question: str) -> str:
    """Use a vision model to interpret or answer questions about an image file.

    Args:
        img_bytes: Raw image bytes.
        question: Specific question to ask about the image content.

    Returns:
        A text description or answer about the image content, or
        IMAGE_DESCRIPTION_UNAVAILABLE when no model produced a usable answer.
    """
    if not img_bytes:
        # Nothing to send; avoid spending three long requests on empty input.
        print("[TOOL] describe_image called with empty image data")
        return "IMAGE_DESCRIPTION_UNAVAILABLE"

    # Label the data URL with the detected format instead of always "png".
    mime_type = _sniff_image_mime(img_bytes)
    image_data = base64.standard_b64encode(img_bytes).decode("utf-8")

    # Models are tried in order until one returns a usable answer.
    models_to_try = [
        "google/gemini-2.0-flash-001",
        "qwen/qwen-2.5-vl-72b-instruct",
        "nvidia/nemotron-nano-12b-v2-vl:free",
    ]

    # Everything except the model name is identical for each attempt, so
    # build it once outside the loop.
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{image_data}"}},
                {"type": "text", "text": (
                    f"{question}\n\n"
                    "Be extremely specific and precise. "
                    "If this is a chess position, list ALL pieces with their exact square coordinates in algebraic notation. "
                    "If there is text in the image, transcribe it exactly. "
                    "If there are numbers, list them all."
                )},
            ],
        }
    ]
    headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json"}

    for model in models_to_try:
        try:
            resp = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                json={"model": model, "messages": messages, "max_tokens": 2048},
                headers=headers, timeout=90,
            )
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            # Answers of 10 characters or fewer are treated as unusable.
            if content and len(content.strip()) > 10:
                print(f"[TOOL] describe_image success with {model}")
                return content
            print(f"[TOOL] describe_image got an empty or too-short answer from {model}")
        except Exception as e:
            print(f"[TOOL] describe_image failed with {model}: {e}")
            continue

    return "IMAGE_DESCRIPTION_UNAVAILABLE"
|
|
|
|
| |
| |
| |
def _sniff_audio_suffix(data: bytes) -> str:
    """Guess an audio file extension from its leading bytes; ".mp3" when unrecognised."""
    if data[:4] == b"RIFF" and data[8:12] == b"WAVE":
        return ".wav"
    if data[:4] == b"fLaC":
        return ".flac"
    if data[:4] == b"OggS":
        return ".ogg"
    if data[4:8] == b"ftyp":
        return ".m4a"
    # MP3, and the historical default for anything else.
    return ".mp3"


@tool
def transcribe_audio(audio_bytes: bytes) -> str:
    """Transcribe an audio file (.mp3, .wav, .m4a, .flac) to text using Whisper.

    Args:
        audio_bytes: Raw bytes of the audio file.

    Returns:
        The transcribed text, or a string starting with
        "TRANSCRIPTION_ERROR:" when the input is empty or the request fails.
    """
    if not audio_bytes:
        return "TRANSCRIPTION_ERROR: empty audio input"

    # The upload's filename is the only format hint sent with the request,
    # so give it an extension that matches the actual content rather than
    # always ".mp3".
    filename = "audio" + _sniff_audio_suffix(audio_bytes)
    headers = {"Authorization": f"Bearer {GROQ_API_KEY}"}
    try:
        # The bytes are uploaded straight from memory; no temporary file is
        # needed for a multipart upload.
        resp = requests.post(
            "https://api.groq.com/openai/v1/audio/transcriptions",
            headers=headers,
            files={"file": (filename, audio_bytes)},
            data={"model": "whisper-large-v3"},
            timeout=120,
        )
        resp.raise_for_status()
        # `or ""` also covers a response whose "text" field is null.
        text = resp.json().get("text") or ""
        print(f"[TOOL] transcribe_audio: {len(text)} chars")
        return text
    except Exception as e:
        print(f"[TOOL] transcribe_audio error: {e}")
        return f"TRANSCRIPTION_ERROR: {e}"
|
|
|
|
| |
| |
| |
@tool
def run_python_file(code: str) -> str:
    """Execute Python code and return its printed output.

    Args:
        code: The Python source code to execute.

    Returns:
        The last non-blank line of stdout; if there is no stdout, stderr
        prefixed with "py_stderr: " (first 2000 characters); an empty string
        if the script printed nothing; or a "py_error: ..." message on
        timeout or any other failure.
    """
    # NOTE(security): this runs the given code unsandboxed with the current
    # interpreter; only pass code from a trusted source.
    path = None  # known before `finally` even if creating/writing the file fails
    try:
        # Python reads source files as UTF-8 by default, so write the script
        # as UTF-8 rather than in the platform's locale encoding.
        with NamedTemporaryFile(
            delete=False, suffix=".py", mode="w", encoding="utf-8"
        ) as f:
            path = f.name
            f.write(code)
        proc = subprocess.run(
            [sys.executable, path],
            capture_output=True,
            text=True,
            errors="replace",  # undecodable output must not abort the call
            timeout=45,
        )
        stdout = proc.stdout.strip()
        stderr = proc.stderr.strip()
        if stdout:
            lines = [line for line in stdout.splitlines() if line.strip()]
            return lines[-1] if lines else stdout
        if stderr:
            return f"py_stderr: {stderr[:2000]}"
        return ""
    except subprocess.TimeoutExpired:
        return "py_error: execution timed out after 45s"
    except Exception as exc:
        return f"py_error: {exc}"
    finally:
        # Best-effort cleanup of the temporary script.
        if path is not None:
            try:
                os.unlink(path)
            except OSError:
                pass
|
|
|
|
| |
| |
| |
@tool
def read_task_file(xls_bytes: bytes) -> str:
    """Read the contents of a file attached to the task.
    Supports Excel (.xlsx/.xls), CSV, PDF, and plain text.

    Args:
        xls_bytes: Raw bytes of the file.

    Returns:
        The file contents as text. Workbooks and CSV files are rendered as
        tables (a workbook with several sheets gets one labelled table per
        sheet), PDFs as their extracted text, anything else as UTF-8 text
        with undecodable bytes replaced. A "Could not read" message is
        returned when the input is not bytes, or is a workbook/PDF that
        yields no content.
    """
    unreadable = "Could not read the attached file in any supported format."
    if not isinstance(xls_bytes, (bytes, bytearray, memoryview)):
        return unreadable
    data = bytes(xls_bytes)

    # Pick the parser from the file signature instead of letting every
    # parser guess: the CSV parser accepts almost any text, which would
    # otherwise turn plain-text files into one-column tables.
    is_workbook = data.startswith(
        (b"PK\x03\x04", b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1")  # zip (xlsx) / OLE2 (xls)
    )
    is_pdf = b"%PDF-" in data[:1024]

    if is_workbook:
        try:
            # sheet_name=None loads every sheet as {sheet name: DataFrame}.
            sheets = pd.read_excel(io.BytesIO(data), sheet_name=None)
        except Exception as e:
            print(f"[TOOL] read_task_file: workbook could not be parsed: {e}")
            return unreadable
        if len(sheets) == 1:
            # Single sheet: same output as reading just the first sheet.
            return next(iter(sheets.values())).to_string(index=False)
        if sheets:
            return "\n\n".join(
                f"=== Sheet: {name} ===\n{frame.to_string(index=False)}"
                for name, frame in sheets.items()
            )
        return unreadable

    if is_pdf:
        try:
            from pypdf import PdfReader
            reader = PdfReader(io.BytesIO(data))
            pages = [page.extract_text() or "" for page in reader.pages]
            text = "\n".join(pages).strip()
            if text:
                return text
        except Exception as e:
            print(f"[TOOL] read_task_file: PDF could not be parsed: {e}")
        # Decoding raw PDF bytes as text would only produce noise.
        return unreadable

    try:
        df = pd.read_csv(io.BytesIO(data))
        # A single parsed column means the content is most likely not
        # delimited data; returning the raw text below is more faithful.
        if df.shape[1] > 1:
            return df.to_string(index=False)
    except Exception:
        # Not parseable as CSV; fall through to plain text.
        pass

    return data.decode("utf-8", errors="replace")
|
|
|
|
from tempfile import gettempdir

# Directory "gaia_files" under the temp dir, created at import time
# (presumably the place where task attachments are downloaded — confirm
# against the code that uses it). A non-empty TMPDIR still takes precedence;
# otherwise the platform's temp directory is used instead of a hard-coded
# "/tmp", which does not exist on Windows.
_DOWNLOAD_DIR = os.path.join(os.environ.get("TMPDIR") or gettempdir(), "gaia_files")
os.makedirs(_DOWNLOAD_DIR, exist_ok=True)
|
|