# gaia-agent / agent.py
# DannyJun's picture
# Initial commit: GAIA agent code for AI Agents Course final
# 2a5f443 verified
"""
GAIA Final Challenge agent for the HF AI Agents course.
Uses Claude Haiku 4.5 with tool use.
"""
import os, sys, json, subprocess, tempfile, traceback, base64, mimetypes
# Emit UTF-8 on stdout so the non-ASCII characters this script prints (for
# example the "—" in tool output) are not rejected by a narrower default
# encoding. A replaced stdout object may not provide reconfigure(), so the
# call is guarded instead of failing at import time.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding='utf-8')
import requests
import anthropic
# Base URL of the scoring service; task attachments are fetched from
# f"{API_BASE}/files/<task_id>".
API_BASE = "https://agents-course-unit4-scoring.hf.space"
# Model id passed to every client.messages.create() call.
MODEL = "claude-haiku-4-5"
# Maximum number of model round-trips per question before a forced submit.
MAX_TURNS = 12
# Scratch directory: downloaded attachments are written here and run_python
# uses it as the working directory. GAIA_WORK_DIR overrides the location so
# the script is usable on other machines; the default is unchanged.
WORK_DIR = os.environ.get("GAIA_WORK_DIR", "C:/Users/22678/Downloads/test/test/gaia_work")
os.makedirs(WORK_DIR, exist_ok=True)
# Shared API client; no explicit credentials are passed here.
client = anthropic.Anthropic()
# ---------- TOOLS ----------
def tool_wikipedia_search(query: str) -> str:
    """Search English Wikipedia and return the top hits with snippets and URLs.

    Args:
        query: Free-text search string.

    Returns:
        A formatted list of up to five hits (title, page URL, search snippet),
        ``"No results for '<query>'."`` when nothing matches or the query is
        blank, or ``"Error: <reason>"`` when the request fails. Never raises.
    """
    # Function-scope imports keep the new dependencies local to this tool.
    from html import unescape
    from urllib.parse import quote

    # A blank query cannot match anything; skip the network round-trip.
    if not query or not query.strip():
        return f"No results for '{query}'."
    try:
        r = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params={
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
                "srlimit": 5,
            },
            timeout=20,
            headers={"User-Agent": "gaia-agent/0.1 (course exercise)"},
        )
        # Surface HTTP failures as a clear status message instead of a JSON
        # decoding error on an HTML error page.
        r.raise_for_status()
        results = r.json().get("query", {}).get("search", [])
        if not results:
            return f"No results for '{query}'."
        out = [f"Top {len(results)} Wikipedia hits for '{query}':"]
        for hit in results:
            title = hit["title"]
            snippet = hit.get("snippet", "").replace('<span class="searchmatch">', '**').replace("</span>", "**")
            # Snippets are HTML fragments; decode entities such as &quot;.
            snippet = unescape(snippet)
            # Percent-encode the title so characters like "?", "#" or "%"
            # stay part of the page name instead of starting a query/fragment.
            url = "https://en.wikipedia.org/wiki/" + quote(title.replace(" ", "_"), safe="/:(),")
            out.append(f"\n- **{title}** — {url}\n {snippet}")
        return "\n".join(out)
    except Exception as e:
        # Tools report failures as text so the agent loop can show them to the model.
        return f"Error: {e}"
def tool_fetch_url(url: str, max_chars: int = 8000) -> str:
    """Download ``url`` and return its text, with HTML markup removed.

    HTML responses (detected via the content type, a ``.html`` suffix, or a
    wikipedia.org URL) are reduced to their non-empty, stripped text lines
    after dropping script/style/nav/footer elements. Other responses are
    returned as-is. Output longer than ``max_chars`` is cut and annotated.
    Any failure is returned as an ``"Error fetching ..."`` string.
    """
    try:
        response = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0 gaia-agent"})
        content_type = response.headers.get("content-type", "")
        looks_like_html = "html" in content_type or url.endswith(".html") or "wikipedia.org" in url
        if not looks_like_html:
            body = response.text
        else:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(response.text, "html.parser")
            # Remove elements that carry no readable page content.
            for element in soup(["script", "style", "nav", "footer"]):
                element.decompose()
            stripped_lines = (raw.strip() for raw in soup.get_text(separator="\n").splitlines())
            body = "\n".join(kept for kept in stripped_lines if kept)
        overflow = len(body) - max_chars
        if overflow > 0:
            body = body[:max_chars] + f"\n[...truncated {overflow} chars]"
        return body
    except Exception as e:
        return f"Error fetching {url}: {e}"
def tool_download_task_file(task_id: str) -> str:
    """Download the file attached to a GAIA task into WORK_DIR.

    Args:
        task_id: Identifier of the task whose attachment is requested.

    Returns:
        ``"Downloaded to <path> (<n> bytes)"`` on success, otherwise a string
        starting with ``"Error:"``. Never raises.
    """
    # Function-scope import keeps the new dependency local to this tool.
    from email.message import Message

    if not task_id or not str(task_id).strip():
        return "Error: task_id is required"
    try:
        r = requests.get(f"{API_BASE}/files/{task_id}", timeout=60)
        if not r.ok:
            # Include the server's reply in the error: the system prompt tells
            # the model to react to a specific server message, which
            # raise_for_status() would have hidden behind a generic status line.
            return f"Error: HTTP {r.status_code} for task {task_id}: {r.text[:300]}"
        # Parse Content-Disposition with the stdlib header parser so quoted
        # values, trailing parameters and RFC 2231 "filename*=" forms work.
        cd = r.headers.get("content-disposition", "")
        fname = ""
        if cd:
            header = Message()
            header["content-disposition"] = cd
            fname = header.get_filename() or ""
        # The header is server-controlled: keep only the last path component
        # so a name like "../../x" cannot escape WORK_DIR.
        fname = os.path.basename(fname.replace("\\", "/"))
        if fname in ("", ".", ".."):
            fname = os.path.basename(str(task_id).replace("\\", "/")) or "task_file"
        local = os.path.join(WORK_DIR, fname)
        with open(local, "wb") as f:
            f.write(r.content)
        return f"Downloaded to {local} ({len(r.content)} bytes)"
    except Exception as e:
        return f"Error: {e}"
def tool_run_python(code: str, working_file: str = "") -> str:
    """Execute Python code in a subprocess and return its captured output.

    Args:
        code: Source to run. It is written to a temporary ``.py`` file that is
            deleted afterwards. Ignored when ``working_file`` ends in ``.py``.
        working_file: Optional path to an existing ``.py`` file to run instead.

    Returns:
        A string with the child's stdout, stderr and return code, or a string
        starting with ``"Error:"`` on timeout (60 s) or any other failure.
    """

    def _run(script_path: str) -> str:
        # Make the child write UTF-8 and decode it as UTF-8 here, rather than
        # relying on the locale encoding on both sides; undecodable bytes are
        # replaced instead of aborting the capture.
        child_env = dict(os.environ, PYTHONIOENCODING="utf-8")
        proc = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=60,
            cwd=WORK_DIR,
            env=child_env,
        )
        return f"stdout:\n{proc.stdout}\n\nstderr:\n{proc.stderr}\nreturncode={proc.returncode}"

    try:
        if working_file and working_file.endswith(".py"):
            return _run(working_file)
        # delete=False: the file must be closed before the child opens it,
        # so it is removed explicitly in the finally clause below.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False, encoding="utf-8") as f:
            f.write(code)
            tmp = f.name
        try:
            return _run(tmp)
        finally:
            os.unlink(tmp)
    except subprocess.TimeoutExpired:
        return "Error: Timed out after 60s"
    except Exception as e:
        return f"Error: {e}\n{traceback.format_exc()}"
def tool_youtube_transcript(video_url: str) -> str:
    """Fetch the transcript of a YouTube video.

    Args:
        video_url: A YouTube URL. ``watch?v=<id>``, ``youtu.be/<id>`` and
            ``/embed/<id>``, ``/shorts/<id>``, ``/live/<id>``, ``/v/<id>``
            paths are recognised.

    Returns:
        One ``"[<start>s] <text>"`` line per transcript entry, or a string
        starting with ``"Error:"`` when no video id can be found or the
        transcript cannot be fetched. Never raises.
    """
    # Function-scope import keeps the new dependency local to this tool.
    from urllib.parse import parse_qs, urlparse

    try:
        raw = (video_url or "").strip()
        parsed = urlparse(raw)
        host = (parsed.hostname or "").lower()
        segments = [seg for seg in parsed.path.split("/") if seg]
        query_ids = parse_qs(parsed.query).get("v", [])
        if query_ids:
            vid = query_ids[0]
        elif host == "youtu.be" or host.endswith(".youtu.be"):
            vid = segments[0] if segments else ""
        elif len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
            vid = segments[1]
        elif "v=" in raw:
            # Last resort: the plain substring split, for inputs that are not
            # well-formed URLs but still contain "v=<id>".
            vid = raw.split("v=")[1].split("&")[0]
        else:
            vid = ""
        if not vid:
            return f"Error: could not find a YouTube video id in {video_url!r}"

        from youtube_transcript_api import YouTubeTranscriptApi

        # Older releases expose a static get_transcript(); newer ones use an
        # instance with fetch(). Support both so the tool keeps working.
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            transcript = YouTubeTranscriptApi.get_transcript(vid)
        else:
            transcript = YouTubeTranscriptApi().fetch(vid).to_raw_data()
        return "\n".join(f"[{t['start']:.1f}s] {t['text']}" for t in transcript)
    except Exception as e:
        return f"Error: {e}"
def _one_string_arg(arg_name: str, arg_description: str = "") -> dict:
    """Return the input schema of a tool taking one required string argument."""
    arg_spec = {"type": "string"}
    if arg_description:
        arg_spec["description"] = arg_description
    return {"type": "object", "properties": {arg_name: arg_spec}, "required": [arg_name]}


# Tool definitions passed to the model as ``tools=``. Each entry gives the
# tool's name, the description the model sees, and the JSON schema of its input.
TOOLS = [
    {
        "name": "wikipedia_search",
        "description": "Search English Wikipedia and get top 5 results with snippets and URLs. Use this FIRST for any factual question.",
        "input_schema": _one_string_arg("query"),
    },
    {
        "name": "fetch_url",
        "description": "Fetch a URL (usually a Wikipedia page) and return its cleaned text content.",
        "input_schema": _one_string_arg("url"),
    },
    {
        "name": "download_task_file",
        "description": "Download the file attached to the current GAIA task. Returns the local file path.",
        "input_schema": _one_string_arg("task_id"),
    },
    {
        "name": "run_python",
        "description": "Execute Python code OR run an existing .py file. For .xlsx parsing, use pandas. For .py files just pass working_file=<path>.",
        # The only tool with two arguments, so its schema is spelled out.
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to run (ignored if working_file is set)"},
                "working_file": {"type": "string", "description": "Path to a .py file to execute directly"},
            },
            "required": ["code"],
        },
    },
    {
        "name": "youtube_transcript",
        "description": "Fetch transcript of a YouTube video by URL.",
        "input_schema": _one_string_arg("video_url"),
    },
    {
        "name": "submit_final_answer",
        "description": "Submit your final answer. The `answer` string will be scored via exact match - no preamble, no explanation. Call this exactly once at the end.",
        "input_schema": _one_string_arg(
            "answer",
            "The final answer string, formatted exactly as the question requests",
        ),
    },
]
# Dispatch table: tool name -> adapter that unpacks the model-supplied input
# dict and calls the matching tool_* function. There is deliberately no entry
# for "submit_final_answer".
TOOL_FNS = dict(
    wikipedia_search=lambda args: tool_wikipedia_search(args["query"]),
    fetch_url=lambda args: tool_fetch_url(args["url"]),
    download_task_file=lambda args: tool_download_task_file(args["task_id"]),
    # Both run_python arguments are optional here and default to "".
    run_python=lambda args: tool_run_python(args.get("code", ""), args.get("working_file", "")),
    youtube_transcript=lambda args: tool_youtube_transcript(args["video_url"]),
)
# System prompt sent with every request. It describes the per-question
# workflow, the answer-format rules for exact-match scoring, and requires the
# model to finish by calling the submit_final_answer tool.
# NOTE(review): the prompt allows "up to 10 tool calls" while the agent loop
# runs for MAX_TURNS = 12 turns — confirm the two limits are meant to differ.
SYSTEM = """You are a research agent solving GAIA benchmark questions for an EXACT-MATCH scoring system.
CRITICAL: You MUST end every task by calling the `submit_final_answer` tool with the clean answer string.
The `answer` argument is what gets scored - no preamble, no explanation, exact format only.
Workflow:
1. For ANY factual / lookup question (people, dates, statistics, geography, articles, history, sports, etc.):
ALWAYS call wikipedia_search FIRST. Do not answer from memory - your memory is often wrong on specifics.
Then call fetch_url on the most relevant Wikipedia URL to read details.
2. For attached file questions: call download_task_file. If it returns "No file path associated",
the file is permanently unavailable - just guess in the right format.
3. For pure reasoning (math, logic, reversed text, group theory): you may answer directly, but use run_python to verify.
4. For YouTube questions: try youtube_transcript with the URL.
Format rules (CRITICAL for exact-match):
- "comma-separated list, alphabetical order" → "apple, banana, cherry" (lowercase, space after comma)
- "first name only" → just one word like "Sarah"
- "IOC country code" → 3 uppercase letters like "USA"
- "USD with two decimal places" → "1234.56" (no $ sign unless asked)
- "just the city name without abbreviations" → "Boston" (full name, no state)
- "last names only, in Roman characters" → "Smith, Jones"
- Numeric → bare number, no unit unless requested
- Never include "FINAL ANSWER:" or quotes
- If you can't determine the answer, still submit your best guess in the correct format
You can use up to 10 tool calls. Then you MUST call submit_final_answer."""
def solve_question(q: dict) -> str:
    """Run the agent loop for a single question and return the final answer.

    Args:
        q: Question record. ``task_id`` and ``question`` are required;
            ``file_name`` is optional and may be empty or null.

    Returns:
        The ``answer`` the model passes to ``submit_final_answer`` (stripped),
        or ``"(no answer)"`` if it never submits one, even after a forced
        final attempt.

    Raises:
        Whatever ``client.messages.create`` raises inside the main loop; only
        the forced final attempt is best-effort.
    """
    task_id = q["task_id"]
    question = q["question"]
    # A null file_name would otherwise crash on .lower() below.
    file_name = q.get("file_name") or ""

    user_content = f"task_id: {task_id}\n\nQuestion:\n{question}"
    if file_name:
        user_content += f"\n\nAttached file: {file_name} (call download_task_file with the task_id above to get it)"

    # Image attachments are sent inline with the first message so the model
    # can see them without a tool call.
    image_content = None
    if file_name.lower().endswith((".png", ".jpg", ".jpeg")):
        # Download first; the image is attached only if it ends up stored
        # under the question's file_name inside WORK_DIR.
        tool_download_task_file(task_id)
        local_img = os.path.join(WORK_DIR, file_name)
        if os.path.exists(local_img):
            with open(local_img, "rb") as f:
                img_data = base64.standard_b64encode(f.read()).decode("utf-8")
            media_type = mimetypes.guess_type(local_img)[0] or "image/png"
            image_content = {"type": "image", "source": {"type": "base64", "media_type": media_type, "data": img_data}}

    if image_content:
        first_content = [image_content, {"type": "text", "text": user_content}]
    else:
        first_content = user_content
    messages = [{"role": "user", "content": first_content}]

    for turn in range(MAX_TURNS):
        resp = client.messages.create(
            model=MODEL,
            max_tokens=4096,
            system=SYSTEM,
            tools=TOOLS,
            messages=messages,
        )
        if resp.stop_reason == "tool_use":
            messages.append({"role": "assistant", "content": resp.content})
            tool_results = []
            for block in resp.content:
                if block.type != "tool_use":
                    continue
                if block.name == "submit_final_answer":
                    # str() guards against a model that sends a bare number.
                    final_answer = str(block.input.get("answer", "")).strip()
                    print(f" [turn {turn}] >>> submit_final_answer: {final_answer!r}")
                    return final_answer
                print(f" [turn {turn}] tool: {block.name}({json.dumps(block.input)[:120]})")
                try:
                    result = TOOL_FNS[block.name](block.input)
                except Exception as e:
                    # Covers unknown tool names and missing arguments too.
                    result = f"Tool error: {e}"
                if len(result) > 12000:
                    result = result[:12000] + "\n[truncated]"
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": result})
            messages.append({"role": "user", "content": tool_results})
            continue

        # The model stopped without submitting: nudge it and loop again.
        print(f" [turn {turn}] end_turn without submit, forcing final answer...")
        messages.append({"role": "assistant", "content": resp.content})
        messages.append({"role": "user", "content": "You did not call submit_final_answer. Please call it now with your best answer in the exact format requested."})

    # Turn budget exhausted: one last request that forces the submit tool.
    messages.append({"role": "user", "content": "Max turns reached. Call submit_final_answer NOW with your best guess in the right format."})
    try:
        resp = client.messages.create(
            model=MODEL,
            max_tokens=512,
            system=SYSTEM,
            tools=TOOLS,
            messages=messages,
            tool_choice={"type": "tool", "name": "submit_final_answer"},
        )
        for block in resp.content:
            if block.type == "tool_use" and block.name == "submit_final_answer":
                return str(block.input.get("answer", "")).strip()
    except Exception as e:
        # Best-effort attempt: report the failure instead of hiding it, then
        # fall through to the placeholder answer.
        print(f" [forced submit] failed: {e}")
    return "(no answer)"
# System prompt for the second-pass extractor below. The function referenced
# this name without it being defined, so any non-blank call raised NameError.
EXTRACTOR_SYSTEM = (
    "You extract the final answer from an agent's reasoning for an exact-match "
    "scoring system. Output ONLY the answer string in the exact format the "
    "question requests - no preamble, no explanation, no quotes."
)


def extract_clean_answer(question: str, agent_response: str) -> str:
    """Second-pass cleanup: extract just the answer in the requested format.

    Args:
        question: The original question text.
        agent_response: The agent's free-form reasoning or answer.

    Returns:
        The model's extracted answer with one pair of surrounding quotes
        removed. A blank ``agent_response`` is returned unchanged without
        calling the model.
    """
    if not agent_response.strip():
        return agent_response
    resp = client.messages.create(
        model=MODEL,
        max_tokens=200,
        system=EXTRACTOR_SYSTEM,
        messages=[{
            "role": "user",
            "content": f"QUESTION:\n{question}\n\nAGENT'S REASONING:\n{agent_response}\n\nNow output ONLY the final answer string (no quotes, no preamble):",
        }],
    )
    text = "".join(b.text for b in resp.content if b.type == "text").strip()
    # Strip one pair of matching surrounding quotes, if present.
    if (text.startswith('"') and text.endswith('"')) or (text.startswith("'") and text.endswith("'")):
        text = text[1:-1]
    return text
def main(
    questions_path: str = "C:/Users/22678/Downloads/test/test/gaia_questions.json",
    out_path: str = "C:/Users/22678/Downloads/test/test/gaia_answers.json",
) -> None:
    """Answer the questions in ``questions_path`` and store them in ``out_path``.

    Command-line arguments (``sys.argv[1:]``), if any, select which questions
    to run: each may be a full task id, a 1-based index (``"3"``) or ``"Q3"``.
    Without arguments, every question that has no stored answer yet is run.

    Args:
        questions_path: JSON file holding the list of question records.
        out_path: JSON file mapping task id to answer. Existing content is
            loaded first, so finished questions are skipped on a re-run.
    """
    with open(questions_path, "r", encoding="utf-8") as f:
        questions = json.load(f)
    only = sys.argv[1:] if len(sys.argv) > 1 else None
    results = {}
    if os.path.exists(out_path):
        with open(out_path, "r", encoding="utf-8") as f:
            results = json.load(f)
    for i, q in enumerate(questions):
        tid = q["task_id"]
        if only and tid not in only and str(i+1) not in only and f"Q{i+1}" not in only:
            continue
        if tid in results and not only:
            print(f"Q{i+1} {tid[:8]} already answered, skipping")
            continue
        print(f"\n{'='*60}\nQ{i+1} task_id={tid[:8]} file={q.get('file_name','')}\n{'='*60}")
        print(f"Q: {q['question'][:200]}")
        try:
            answer = solve_question(q)
            print(f"\n>>> FINAL: {answer}")
            results[tid] = answer
        except Exception as e:
            # Record the failure as the answer so the run continues.
            print(f"\nERROR: {e}")
            traceback.print_exc()
            results[tid] = f"(error: {e})"
        # Persist after every question so an interrupted run keeps its progress.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n\nSaved {len(results)} answers to {out_path}")


if __name__ == "__main__":
    main()