# Ghisalbertifederico's picture
# Update tools.py
# 95641ba verified
import io
import os
import re
import sys
import json
import base64
import subprocess
from config import GROQ_API_KEY, OPENROUTER_API_KEY
from functools import lru_cache
# Force UTF-8 output on Windows to avoid charmap crashes with Unicode characters.
# errors="replace" substitutes characters that still cannot be encoded instead
# of raising while printing.
if sys.platform == "win32":
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
import requests
from tempfile import NamedTemporaryFile
import pandas as pd
import markdownify
from langchain_community.document_loaders import WikipediaLoader
from langchain_core.tools import tool
from youtube_transcript_api import YouTubeTranscriptApi
# Import ddgs for web search (the standalone library, not langchain wrapper).
# Tries the `ddgs` package first, then `duckduckgo_search`; when neither can be
# imported, DDGS is set to None so callers can detect that search is unavailable.
try:
    from ddgs import DDGS
except ImportError:
    try:
        from duckduckgo_search import DDGS
    except ImportError:
        DDGS = None
# ──────────────────────────────────────────────────────────────────────────── #
# Wikipedia
# ──────────────────────────────────────────────────────────────────────────── #
@tool
def wikipedia_search(query: str, max_pages: int = 3) -> str:
    """Search Wikipedia for a query and return article summaries."""
    # Loads up to `max_pages` documents, joins their text with a horizontal-rule
    # separator and caps the result at 50,000 characters. Any failure is
    # reported as a string instead of being raised.
    print(f"[TOOL] wiki_search called with query: {query}")
    separator = "\n\n---\n\n"
    try:
        loader = WikipediaLoader(query=query, load_max_docs=max_pages)
        pages = loader.load()
        combined = separator.join(page.page_content for page in pages)
    except Exception as e:
        print(f"[TOOL] wiki_search error: {e}")
        return f"Wikipedia search failed: {e}"
    if not combined:
        return "No Wikipedia results found."
    return combined[:50_000]
# ──────────────────────────────────────────────────────────────────────────── #
# Web Search (ddgs library — direct, not langchain wrapper)
# ──────────────────────────────────────────────────────────────────────────── #
def _ddg_search_raw(query: str, k: int = 8) -> list[dict]:
    """Search DuckDuckGo using the ddgs library directly.

    Args:
        query: Search terms passed to ``DDGS().text``.
        k: Maximum number of hits to request and to return.

    Returns:
        A list of dicts with the keys ``"title"``, ``"snippet"`` and
        ``"link"`` (values truncated to 500 / 4000 / 300 characters).
        An empty list is returned when the library is not installed or the
        search raises; this function never propagates an exception.
    """
    if DDGS is None:
        print("[TOOL] DDG search unavailable — ddgs not installed")
        return []
    try:
        # Materialise before slicing: slicing assumes a list, but the call may
        # hand back a lazy iterator (or None) depending on the installed
        # library version — NOTE(review): confirm against the pinned version.
        results = list(DDGS().text(query, max_results=k) or ())[:k]
        # `or ""` (instead of a .get default) also covers keys that are
        # present but hold None, which would otherwise break the slice.
        return [
            {
                "title": (r.get("title") or "")[:500],
                "snippet": (r.get("body") or "")[:4000],
                "link": (r.get("href") or "")[:300],
            }
            for r in results
        ]
    except Exception as e:
        print(f"[TOOL] DDG search error: {e}")
        return []
@tool
def web_search(query: str, k: int = 8) -> str:
    """Search the web using DuckDuckGo and return results as JSON."""
    # Try the query as given; if that yields nothing and the query contains
    # quote characters, retry once with the quotes removed.
    attempts = [query]
    unquoted = re.sub(r'["\']', '', query)
    if unquoted != query:
        attempts.append(unquoted)

    for candidate in attempts:
        found = _ddg_search_raw(candidate, k)
        if found:
            return json.dumps(found, ensure_ascii=False)
    return "No search results found."
# ──────────────────────────────────────────────────────────────────────────── #
# Visit Webpage (fetch actual page content)
# ──────────────────────────────────────────────────────────────────────────── #
@tool
def visit_webpage(url: str) -> str:
    """Fetch the content of a webpage URL and return cleaned text.

    Args:
        url: The URL to fetch.
    Returns:
        The main text content of the page, truncated to ~40k chars.
    """
    print(f"[TOOL] visit_webpage: {url}")
    try:
        # A browser-like User-Agent is sent with the request.
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        resp = requests.get(url, headers=headers, timeout=20)
        resp.raise_for_status()

        # When the server declares no charset, requests falls back to
        # ISO-8859-1 for text/* responses, which garbles UTF-8 pages.
        # Use the encoding detected from the body in that case.
        if "charset" not in resp.headers.get("Content-Type", "").lower():
            resp.encoding = resp.apparent_encoding
        html = resp.text

        # `strip` below only drops the listed tags themselves; remove the
        # script/style bodies up front so JS/CSS source does not leak into
        # the returned text.
        html = re.sub(r"(?is)<(script|style)\b[^>]*>.*?</\1\s*>", "", html)

        text = markdownify.markdownify(html, strip=["img", "script", "style"])
        # Collapse runs of 3+ newlines into a single blank line.
        text = re.sub(r'\n{3,}', '\n\n', text).strip()
        return text[:40_000]
    except Exception as e:
        # Errors are returned as text rather than raised.
        print(f"[TOOL] visit_webpage error: {e}")
        return f"Could not fetch {url}: {e}"
# ──────────────────────────────────────────────────────────────────────────── #
# YouTube Transcript
# ──────────────────────────────────────────────────────────────────────────── #
@tool
def get_youtube_transcript(video_url: str) -> str:
    """Fetch the transcript/captions of a YouTube video.

    Args:
        video_url: Full YouTube URL or just the video ID.
    Returns:
        The full transcript as a single string, or TRANSCRIPT_UNAVAILABLE.
    """
    candidate = video_url.strip()
    # Recognise watch (?v=), short-link (youtu.be/), embed, shorts, live and
    # legacy /v/ URLs. Anything else is assumed to already be a bare video ID.
    match = re.search(
        r"(?:v=|youtu\.be/|/embed/|/shorts/|/live/|/v/)([A-Za-z0-9_-]{11})",
        candidate,
    )
    video_id = match.group(1) if match else candidate

    # Try new API first, then old API; each helper returns "" on failure.
    for attempt_fn in (_fetch_transcript_new_api, _fetch_transcript_old_api):
        result = attempt_fn(video_id)
        if result and result != "TRANSCRIPT_UNAVAILABLE":
            print(f"[TOOL] YouTube transcript: {len(result)} chars")
            return result
    return "TRANSCRIPT_UNAVAILABLE"
def _fetch_transcript_new_api(video_id: str) -> str:
    """Fetch a transcript through an instance-level ``fetch`` call.

    Returns the entry texts joined by single spaces, or "" if anything raises.
    """
    try:
        snippets = YouTubeTranscriptApi().fetch(video_id)
        pieces = []
        for snippet in snippets:
            # Entries may expose ``.text`` as an attribute or behave like a mapping.
            if hasattr(snippet, 'text'):
                pieces.append(snippet.text)
            else:
                pieces.append(snippet.get("text", ""))
        return " ".join(pieces)
    except Exception:
        return ""
def _fetch_transcript_old_api(video_id: str) -> str:
    """Fetch a transcript through the class-level ``get_transcript`` call.

    Returns the entry texts joined by single spaces, or "" if anything raises.
    """
    try:
        texts = []
        for item in YouTubeTranscriptApi.get_transcript(video_id):
            texts.append(item["text"])
        return " ".join(texts)
    except Exception:
        return ""
# ──────────────────────────────────────────────────────────────────────────── #
# Image Description (Vision model)
# ──────────────────────────────────────────────────────────────────────────── #
def _guess_image_mime(img_bytes: bytes) -> str:
    """Return a MIME type inferred from the image's leading magic bytes.

    Falls back to ``image/png`` when the signature is not recognised.
    """
    head = bytes(img_bytes[:12])
    if head.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if head.startswith((b"GIF87a", b"GIF89a")):
        return "image/gif"
    if head[:4] == b"RIFF" and head[8:12] == b"WEBP":
        return "image/webp"
    if head.startswith(b"BM"):
        return "image/bmp"
    return "image/png"


@tool
def describe_image(img_bytes: bytes, question: str) -> str:
    """Use a vision model to interpret or answer questions about an image file.

    Args:
        img_bytes: Raw image bytes.
        question: Specific question to ask about the image content.
    Returns:
        A text description or answer about the image content.
    """
    image_data = base64.standard_b64encode(img_bytes).decode("utf-8")
    # Label the data URL with the detected format instead of always claiming PNG.
    data_url = f"data:{_guess_image_mime(img_bytes)};base64,{image_data}"

    # The prompt and headers do not depend on the model, so build them once.
    prompt = (
        f"{question}\n\n"
        "Be extremely specific and precise. "
        "If this is a chess position, list ALL pieces with their exact square coordinates in algebraic notation. "
        "If there is text in the image, transcribe it exactly. "
        "If there are numbers, list them all."
    )
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image_url", "image_url": {"url": data_url}},
                {"type": "text", "text": prompt},
            ],
        }
    ]
    headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json"}

    # Models are tried in order; the first usable answer wins.
    models_to_try = [
        "google/gemini-2.0-flash-001",
        "qwen/qwen-2.5-vl-72b-instruct",
        "nvidia/nemotron-nano-12b-v2-vl:free",
    ]
    for model in models_to_try:
        try:
            payload = {"model": model, "messages": messages, "max_tokens": 2048}
            resp = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                json=payload, headers=headers, timeout=90,
            )
            resp.raise_for_status()
            content = resp.json()["choices"][0]["message"]["content"]
            # Replies of 10 characters or fewer are treated as unusable and
            # the next model is tried.
            if content and len(content.strip()) > 10:
                print(f"[TOOL] describe_image success with {model}")
                return content
        except Exception as e:
            print(f"[TOOL] describe_image failed with {model}: {e}")
            continue
    return "IMAGE_DESCRIPTION_UNAVAILABLE"
# ──────────────────────────────────────────────────────────────────────────── #
# Audio Transcription (Whisper via Groq)
# ──────────────────────────────────────────────────────────────────────────── #
def _guess_audio_suffix(audio_bytes: bytes) -> str:
    """Return a file extension inferred from the audio's leading magic bytes.

    Falls back to ``.mp3`` when the signature is not recognised.
    """
    head = bytes(audio_bytes[:12])
    if head[:4] == b"RIFF" and head[8:12] == b"WAVE":
        return ".wav"
    if head[:4] == b"fLaC":
        return ".flac"
    if head[4:8] == b"ftyp":
        return ".m4a"
    if head[:4] == b"OggS":
        return ".ogg"
    return ".mp3"


@tool
def transcribe_audio(audio_bytes: bytes) -> str:
    """Transcribe an audio file (.mp3, .wav, .m4a, .flac) to text using Whisper."""
    # Returns the transcript text, or a string starting with
    # "TRANSCRIPTION_ERROR: " when the request fails.
    headers = {"Authorization": f"Bearer {GROQ_API_KEY}"}
    # Upload straight from memory: no temporary file to create, reopen or
    # clean up. The filename carries an extension matching the detected
    # container instead of always claiming .mp3.
    upload_name = f"audio{_guess_audio_suffix(audio_bytes)}"
    try:
        resp = requests.post(
            "https://api.groq.com/openai/v1/audio/transcriptions",
            headers=headers,
            files={"file": (upload_name, audio_bytes)},
            data={"model": "whisper-large-v3"},
            timeout=120,
        )
        resp.raise_for_status()
        text = resp.json().get("text", "")
        print(f"[TOOL] transcribe_audio: {len(text)} chars")
        return text
    except Exception as e:
        print(f"[TOOL] transcribe_audio error: {e}")
        return f"TRANSCRIPTION_ERROR: {e}"
# ──────────────────────────────────────────────────────────────────────────── #
# Python Execution
# ──────────────────────────────────────────────────────────────────────────── #
@tool
def run_python_file(code: str) -> str:
    """Execute Python code and return its printed output.

    Args:
        code: The Python source code to execute.
    Returns:
        The last line of stdout, or stderr if no stdout.
    """
    # SECURITY: `code` is executed unsandboxed with this process's privileges.
    # Only expose this tool where running arbitrary code is acceptable.
    path = None  # stays None if the temp file could not be created
    try:
        # Python source files are UTF-8 by default, so write the script as
        # UTF-8 regardless of the platform's locale encoding.
        with NamedTemporaryFile(
            delete=False, suffix=".py", mode="w", encoding="utf-8"
        ) as f:
            path = f.name
            f.write(code)

        # Make the child emit UTF-8 and decode it as UTF-8 here, so Unicode
        # output does not crash or get garbled on non-UTF-8 consoles.
        child_env = {**os.environ, "PYTHONIOENCODING": "utf-8"}
        proc = subprocess.run(
            [sys.executable, path],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=45,
            env=child_env,
        )
        stdout = proc.stdout.strip()
        stderr = proc.stderr.strip()
        if stdout:
            # Only the last non-blank line is returned.
            lines = [line for line in stdout.splitlines() if line.strip()]
            return lines[-1] if lines else stdout
        if stderr:
            return f"py_stderr: {stderr[:2000]}"
        return ""
    except subprocess.TimeoutExpired:
        return "py_error: execution timed out after 45s"
    except Exception as exc:
        return f"py_error: {exc}"
    finally:
        # Best-effort cleanup; skipped when no temp file was ever created.
        if path is not None:
            try:
                os.unlink(path)
            except OSError:
                pass
# ──────────────────────────────────────────────────────────────────────────── #
# File Reading (Excel / CSV / PDF / Text)
# ──────────────────────────────────────────────────────────────────────────── #
@tool
def read_task_file(xls_bytes: bytes) -> str:
    """Read the contents of a file attached to the task.
    Supports Excel (.xlsx/.xls), CSV, PDF, and plain text.

    Args:
        xls_bytes: Raw bytes of the file.
    Returns:
        The file contents as text.
    """
    unreadable = "Could not read the attached file in any supported format."

    # PDFs are recognised by signature and handled first, so they can neither
    # be mis-parsed by the lenient CSV reader nor, when no text can be
    # extracted, be dumped as decoded raw bytes.
    if b"%PDF-" in xls_bytes[:1024]:
        try:
            from pypdf import PdfReader

            reader = PdfReader(io.BytesIO(xls_bytes))
            pages = [page.extract_text() or "" for page in reader.pages]
            text = "\n".join(pages).strip()
        except Exception as exc:
            print(f"[TOOL] read_task_file PDF error: {exc}")
            text = ""
        return text or unreadable

    # Try Excel. sheet_name=None loads every sheet; a single-sheet workbook
    # renders exactly as a plain table, several sheets get a header each.
    try:
        sheets = pd.read_excel(io.BytesIO(xls_bytes), sheet_name=None)
        if len(sheets) == 1:
            return next(iter(sheets.values())).to_string(index=False)
        if sheets:
            return "\n\n".join(
                f"=== Sheet: {name} ===\n{frame.to_string(index=False)}"
                for name, frame in sheets.items()
            )
    except Exception:
        pass  # not a workbook: fall through to the next format probe

    # Try CSV
    try:
        df = pd.read_csv(io.BytesIO(xls_bytes))
        return df.to_string(index=False)
    except Exception:
        pass  # not parseable as CSV: fall through to plain text

    # Fallback: decode as UTF-8 text (undecodable bytes are replaced)
    try:
        return xls_bytes.decode("utf-8", errors="replace")
    except Exception:
        return unreadable
# Scratch directory created at import time. Its consumers are outside this
# view — presumably downloaded task attachments, judging by the name.
# The root is the first non-empty of TMPDIR, TEMP, TMP, falling back to /tmp.
# `or` (rather than a .get default) skips variables that are set but empty,
# which would otherwise produce a relative "gaia_files" directory in the CWD.
_DOWNLOAD_DIR = os.path.join(
    os.environ.get("TMPDIR")
    or os.environ.get("TEMP")
    or os.environ.get("TMP")
    or "/tmp",
    "gaia_files",
)
os.makedirs(_DOWNLOAD_DIR, exist_ok=True)