import io
import os
import re
import sys
import json
import base64
import subprocess
from functools import lru_cache
from tempfile import NamedTemporaryFile

from config import GROQ_API_KEY, OPENROUTER_API_KEY

# Force UTF-8 output on Windows to avoid charmap crashes with Unicode characters.
# The streams may have been replaced by objects without ``reconfigure`` (or be
# None), so guard the call instead of letting the import fail.
if sys.platform == "win32":
    for _stream in (sys.stdout, sys.stderr):
        _reconfigure = getattr(_stream, "reconfigure", None)
        if _reconfigure is not None:
            _reconfigure(encoding="utf-8", errors="replace")

import requests
import pandas as pd
import markdownify
from langchain_community.document_loaders import WikipediaLoader
from langchain_core.tools import tool
from youtube_transcript_api import YouTubeTranscriptApi

# Import ddgs for web search (the standalone library, not langchain wrapper)
try:
    from ddgs import DDGS
except ImportError:
    try:
        from duckduckgo_search import DDGS
    except ImportError:
        DDGS = None

# NOTE: the docstrings of the @tool functions below are exposed by LangChain as
# the tool descriptions, so their wording is kept stable; implementation notes
# live in `#` comments instead.


# ──────────────────────────────────────────────────────────────────────────── #
# Wikipedia
# ──────────────────────────────────────────────────────────────────────────── #

@tool
def wikipedia_search(query: str, max_pages: int = 3) -> str:
    """Search Wikipedia for a query and return article summaries."""
    print(f"[TOOL] wiki_search called with query: {query}")
    try:
        docs = WikipediaLoader(query=query, load_max_docs=max_pages).load()
        joined = "\n\n---\n\n".join(d.page_content for d in docs)
        # Cap the payload at 50k characters.
        return joined[:50_000] if joined else "No Wikipedia results found."
    except Exception as e:
        # Tool boundary: report the failure as text rather than raising.
        print(f"[TOOL] wiki_search error: {e}")
        return f"Wikipedia search failed: {e}"


# ──────────────────────────────────────────────────────────────────────────── #
# Web Search (ddgs library — direct, not langchain wrapper)
# ──────────────────────────────────────────────────────────────────────────── #

def _ddg_search_raw(query: str, k: int = 8) -> list[dict]:
    """Search DuckDuckGo using the ddgs library directly.

    Returns at most ``k`` hits as dicts with ``title``, ``snippet`` and
    ``link`` keys (each truncated), or an empty list when the library is
    missing or the search fails.
    """
    if DDGS is None:
        print("[TOOL] DDG search unavailable — ddgs not installed")
        return []
    try:
        results = DDGS().text(query, max_results=k)
        # Materialise first: the result may be an iterator rather than a list,
        # and iterators cannot be sliced.
        rows = list(results or [])[:k]
        return [
            {
                # `or ""` also covers fields that are present but None, which
                # would otherwise raise on slicing and discard every result.
                "title": (r.get("title") or "")[:500],
                "snippet": (r.get("body") or "")[:4000],
                "link": (r.get("href") or "")[:300],
            }
            for r in rows
        ]
    except Exception as e:
        print(f"[TOOL] DDG search error: {e}")
        return []


@tool
def web_search(query: str, k: int = 8) -> str:
    """Search the web using DuckDuckGo and return results as JSON."""
    # Try the query verbatim, then (only if it differs) with quotes removed.
    candidates = [query]
    simplified = re.sub(r'["\']', '', query)
    if simplified != query:
        candidates.append(simplified)

    for candidate in candidates:
        hits = _ddg_search_raw(candidate, k)
        if hits:
            return json.dumps(hits, ensure_ascii=False)
    return "No search results found."


# ──────────────────────────────────────────────────────────────────────────── #
# Visit Webpage (fetch actual page content)
# ──────────────────────────────────────────────────────────────────────────── #

@tool
def visit_webpage(url: str) -> str:
    """Fetch the content of a webpage URL and return cleaned text.

    Args:
        url: The URL to fetch.

    Returns:
        The main text content of the page, truncated to ~80k chars.
    """
    print(f"[TOOL] visit_webpage: {url}")
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers, timeout=20)
        resp.raise_for_status()
        text = markdownify.markdownify(resp.text, strip=["img", "script", "style"])
        # Collapse runs of three or more newlines into a single blank line.
        text = re.sub(r'\n{3,}', '\n\n', text).strip()
        return text[:80_000]
    except Exception as e:
        print(f"[TOOL] visit_webpage error: {e}")
        return f"Could not fetch {url}: {e}"


# ──────────────────────────────────────────────────────────────────────────── #
# YouTube Transcript
# ──────────────────────────────────────────────────────────────────────────── #

@tool
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the transcript/captions of a YouTube video.

    Args:
        video_url: Full YouTube URL or just the video ID.

    Returns:
        The full transcript as a single string, or TRANSCRIPT_UNAVAILABLE.
    """
    # Accept watch, short-link, shorts, embed and live URL forms; anything
    # else is passed through unchanged and treated as a bare video ID.
    match = re.search(
        r"(?:v=|youtu\.be/|/shorts/|/embed/|/live/)([A-Za-z0-9_-]{11})",
        video_url,
    )
    video_id = match.group(1) if match else video_url

    # Try new API first, then old API
    for attempt_fn in (_fetch_transcript_new_api, _fetch_transcript_old_api):
        result = attempt_fn(video_id)
        if result and result != "TRANSCRIPT_UNAVAILABLE":
            print(f"[TOOL] YouTube transcript: {len(result)} chars")
            return result
    return "TRANSCRIPT_UNAVAILABLE"


def _fetch_transcript_new_api(video_id: str) -> str:
    """Fetch a transcript via the instance-based ``fetch`` API; "" on failure."""
    try:
        ytt = YouTubeTranscriptApi()
        entries = ytt.fetch(video_id)
        # Entries may be objects with a ``text`` attribute or plain dicts.
        return " ".join(
            e.text if hasattr(e, 'text') else e.get("text", "")
            for e in entries
        )
    except Exception as e:
        # Best-effort: the caller falls back to the other API variant.
        print(f"[TOOL] YouTube transcript (new API) failed: {e}")
        return ""


def _fetch_transcript_old_api(video_id: str) -> str:
    """Fetch a transcript via the static ``get_transcript`` API; "" on failure."""
    try:
        entries = YouTubeTranscriptApi.get_transcript(video_id)
        return " ".join(e["text"] for e in entries)
    except Exception as e:
        # Best-effort: an empty string signals "unavailable" to the caller.
        print(f"[TOOL] YouTube transcript (old API) failed: {e}")
        return ""


# ──────────────────────────────────────────────────────────────────────────── #
# Image Description (Vision model)
# ──────────────────────────────────────────────────────────────────────────── #

def _sniff_image_mime(data: bytes) -> str:
    """Guess an image MIME type from magic bytes, defaulting to PNG."""
    if data[:3] == b"\xff\xd8\xff":
        return "image/jpeg"
    if data[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "image/webp"
    return "image/png"


@tool
def describe_image(img_bytes: bytes, question: str) -> str:
    """Use a vision model to interpret or answer questions about an image file.

    Args:
        img_bytes: Raw image bytes.
        question: Specific question to ask about the image content.

    Returns:
        A text description or answer about the image content.
    """
    image_data = base64.standard_b64encode(img_bytes).decode("utf-8")
    # Declare the real media type in the data URL instead of always claiming
    # PNG; unknown formats keep the previous "image/png" label.
    data_url = f"data:{_sniff_image_mime(img_bytes)};base64,{image_data}"

    models_to_try = [
        "google/gemini-2.0-flash-001",
        "qwen/qwen-2.5-vl-72b-instruct",
        "nvidia/nemotron-nano-12b-v2-vl:free",
    ]

    # Everything except the model name is identical across attempts.
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": data_url}},
                {"type": "text", "text": (
                    f"{question}\n\n"
                    "Be extremely specific and precise. "
                    "If this is a chess position, list ALL pieces with their exact square coordinates in algebraic notation. "
                    "If there is text in the image, transcribe it exactly. "
                    "If there are numbers, list them all."
                )},
            ],
        }
    ]
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }

    for model in models_to_try:
        try:
            payload = {"model": model, "messages": messages, "max_tokens": 2048}
            resp = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                json=payload,
                headers=headers,
                timeout=90,
            )
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            # Treat empty or very short (<= 10 chars) answers as a failed
            # attempt and move on to the next model.
            if content and len(content.strip()) > 10:
                print(f"[TOOL] describe_image success with {model}")
                return content
        except Exception as e:
            print(f"[TOOL] describe_image failed with {model}: {e}")
            continue
    return "IMAGE_DESCRIPTION_UNAVAILABLE"


# ──────────────────────────────────────────────────────────────────────────── #
# Audio Transcription (Whisper via Groq)
# ──────────────────────────────────────────────────────────────────────────── #

@tool
def transcribe_audio(audio_bytes: bytes) -> str:
    """Transcribe an audio file (.mp3, .wav, .m4a, .flac) to text using Whisper."""
    headers = {"Authorization": f"Bearer {GROQ_API_KEY}"}
    # delete=False so the file can be closed and reopened for upload; it is
    # removed in the `finally` below.
    tmp = NamedTemporaryFile(suffix=".mp3", delete=False)
    file_path = tmp.name
    try:
        # Write inside the try so a failed write is reported like any other
        # error and the temp file is still cleaned up.
        with tmp:
            tmp.write(audio_bytes)
        with open(file_path, "rb") as fh:
            resp = requests.post(
                "https://api.groq.com/openai/v1/audio/transcriptions",
                headers=headers,
                files={"file": (os.path.basename(file_path), fh)},
                data={"model": "whisper-large-v3"},
                timeout=120,
            )
        resp.raise_for_status()
        text = resp.json().get("text", "")
        print(f"[TOOL] transcribe_audio: {len(text)} chars")
        return text
    except Exception as e:
        print(f"[TOOL] transcribe_audio error: {e}")
        return f"TRANSCRIPTION_ERROR: {e}"
    finally:
        try:
            os.unlink(file_path)
        except OSError:
            pass


# ──────────────────────────────────────────────────────────────────────────── #
# Python Execution
# ──────────────────────────────────────────────────────────────────────────── #

@tool
def run_python_file(code: str) -> str:
    """Execute Python code and return its printed output.

    Args:
        code: The Python source code to execute.

    Returns:
        The last line of stdout, or stderr if no stdout.
    """
    # SECURITY(review): this runs arbitrary code with the privileges of the
    # current process and no sandboxing. Only use with trusted input or inside
    # an isolated environment.
    path = None  # stays None if the temp file could not be created
    try:
        # Python reads source files as UTF-8 by default, so write the script as
        # UTF-8 rather than in the locale encoding (which differs on Windows).
        with NamedTemporaryFile(
            delete=False, suffix=".py", mode="w", encoding="utf-8"
        ) as f:
            path = f.name
            f.write(code)
        proc = subprocess.run(
            [sys.executable, path],
            capture_output=True,
            text=True,
            # Make the child emit UTF-8 and decode it as UTF-8 so non-ASCII
            # output survives regardless of the platform's locale encoding.
            encoding="utf-8",
            errors="replace",
            env={**os.environ, "PYTHONIOENCODING": "utf-8"},
            timeout=45,
        )
        stdout = proc.stdout.strip()
        stderr = proc.stderr.strip()
        if stdout:
            # Only the last non-blank line is returned.
            lines = [line for line in stdout.splitlines() if line.strip()]
            return lines[-1] if lines else stdout
        elif stderr:
            return f"py_stderr: {stderr[:2000]}"
        else:
            return ""
    except subprocess.TimeoutExpired:
        return "py_error: execution timed out after 45s"
    except Exception as exc:
        return f"py_error: {exc}"
    finally:
        if path is not None:
            try:
                os.unlink(path)
            except OSError:
                pass


# ──────────────────────────────────────────────────────────────────────────── #
# File Reading (Excel / CSV / PDF / Text)
# ──────────────────────────────────────────────────────────────────────────── #

def _extract_pdf_text(data: bytes) -> str:
    """Return the concatenated, stripped text of all pages of a PDF.

    Raises whatever ``pypdf`` raises (including ImportError when it is not
    installed); callers treat any exception as "not a readable PDF".
    """
    from pypdf import PdfReader

    reader = PdfReader(io.BytesIO(data))
    pages = [page.extract_text() or "" for page in reader.pages]
    return "\n".join(pages).strip()


@tool
def read_task_file(xls_bytes: bytes) -> str:
    """Read the contents of a file attached to the task.

    Supports Excel (.xlsx/.xls), CSV, PDF, and plain text.

    Args:
        xls_bytes: Raw bytes of the file.

    Returns:
        The file contents as text.
    """
    # A PDF signature near the start means the PDF parser should get the first
    # shot; otherwise an ASCII-only PDF could be accepted by the CSV parser and
    # returned as a garbage table.
    looks_like_pdf = (
        isinstance(xls_bytes, (bytes, bytearray)) and b"%PDF-" in xls_bytes[:1024]
    )
    if looks_like_pdf:
        try:
            text = _extract_pdf_text(xls_bytes)
            if text:
                return text
        except Exception:
            pass  # fall through to the generic probing chain

    # Format probing: each parser failing simply means "not this format".
    # Try Excel first
    try:
        df = pd.read_excel(io.BytesIO(xls_bytes))
        return df.to_string(index=False)
    except Exception:
        pass

    # Try CSV
    try:
        df = pd.read_csv(io.BytesIO(xls_bytes))
        return df.to_string(index=False)
    except Exception:
        pass

    # Try PDF (already attempted above when the signature was present)
    if not looks_like_pdf:
        try:
            text = _extract_pdf_text(xls_bytes)
            if text:
                return text
        except Exception:
            pass

    # Fallback: decode as UTF-8 text
    try:
        return xls_bytes.decode("utf-8", errors="replace")
    except Exception:
        return "Could not read the attached file in any supported format."


_DOWNLOAD_DIR = os.path.join(os.environ.get("TMPDIR", "/tmp"), "gaia_files")
os.makedirs(_DOWNLOAD_DIR, exist_ok=True)