Spaces:
Runtime error
Runtime error
| """ | |
| GAIA Final Challenge agent for the HF AI Agents course. | |
| Uses Claude Haiku 4.5 with tool use. | |
| """ | |
| import os, sys, json, subprocess, tempfile, traceback, base64, mimetypes | |
| sys.stdout.reconfigure(encoding='utf-8') | |
| import requests | |
| import anthropic | |
# Base URL of the course scoring service; task attachments are fetched from
# f"{API_BASE}/files/<task_id>" by tool_download_task_file.
API_BASE = "https://agents-course-unit4-scoring.hf.space"
# Model identifier passed to every client.messages.create call in this file.
MODEL = "claude-haiku-4-5"
# Upper bound on agent-loop iterations per question (see solve_question).
MAX_TURNS = 12
# Scratch directory: downloaded attachments are written here and it is the
# working directory of the subprocesses started by tool_run_python.
WORK_DIR = "C:/Users/22678/Downloads/test/test/gaia_work"
# Created at import time so the tools can rely on it existing.
os.makedirs(WORK_DIR, exist_ok=True)
# Shared API client, constructed without explicit arguments.
# NOTE(review): presumably picks up its API key from the environment - confirm.
client = anthropic.Anthropic()
| # ---------- TOOLS ---------- | |
def tool_wikipedia_search(query: str) -> str:
    """Search English Wikipedia and return the top hits as a text digest.

    Args:
        query: Free-text search string forwarded to the MediaWiki search API.

    Returns:
        A multi-line string listing up to five hits (title, article URL and
        snippet), ``"No results for '<query>'."`` when the search is empty, or
        ``"Error: <details>"`` when the request or response parsing fails.
        The function never raises: the result is fed back to the model as a
        tool result.
    """
    # Function-scope imports, matching how this file imports optional helpers.
    import html
    from urllib.parse import quote

    try:
        r = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params={
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
                "srlimit": 5,
            },
            timeout=20,
            headers={"User-Agent": "gaia-agent/0.1 (course exercise)"},
        )
        # Surface HTTP failures (rate limiting, 5xx) as a readable error
        # instead of an opaque JSON decoding failure.
        r.raise_for_status()
        results = r.json().get("query", {}).get("search", [])
        if not results:
            return f"No results for '{query}'."
        out = [f"Top {len(results)} Wikipedia hits for '{query}':"]
        for hit in results:
            title = hit["title"]
            snippet = (
                hit.get("snippet", "")
                .replace('<span class="searchmatch">', "**")
                .replace("</span>", "**")
            )
            # Snippets are HTML fragments; decode entities such as &quot;.
            snippet = html.unescape(snippet)
            # Percent-encode the title so characters like '?', '#' or '%'
            # do not change the meaning of the URL when it is fetched later.
            page = quote(title.replace(" ", "_"), safe="/:(),")
            url = f"https://en.wikipedia.org/wiki/{page}"
            out.append(f"\n- **{title}** — {url}\n {snippet}")
        return "\n".join(out)
    except Exception as e:
        # Deliberately broad: tool failures are reported to the model as text.
        return f"Error: {e}"
def tool_fetch_url(url: str, max_chars: int = 8000) -> str:
    """Fetch a URL and return its text content, with HTML markup stripped.

    Args:
        url: Address to download.
        max_chars: Maximum number of characters of text to return; longer
            content is cut and a truncation marker is appended.

    Returns:
        The page text (HTML is reduced to its visible text, one non-empty
        line per row), or ``"Error fetching <url>: <details>"`` on any
        failure, including HTTP error statuses. The function never raises.
    """
    try:
        r = requests.get(url, timeout=30, headers={"User-Agent": "Mozilla/5.0 gaia-agent"})
        # Report 4xx/5xx explicitly rather than handing the model the body of
        # an error page as if it were the requested content.
        r.raise_for_status()
        ct = r.headers.get("content-type", "")
        if "html" in ct or url.endswith(".html") or "wikipedia.org" in url:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(r.text, "html.parser")
            # Drop non-content elements before extracting text.
            for s in soup(["script", "style", "nav", "footer"]):
                s.decompose()
            text = soup.get_text(separator="\n")
            text = "\n".join(line.strip() for line in text.splitlines() if line.strip())
        else:
            text = r.text
        if len(text) > max_chars:
            text = text[:max_chars] + f"\n[...truncated {len(text)-max_chars} chars]"
        return text
    except Exception as e:
        # Deliberately broad: tool failures are reported to the model as text.
        return f"Error fetching {url}: {e}"
def tool_download_task_file(task_id: str) -> str:
    """Download the file attached to a GAIA task into WORK_DIR.

    The local file name is taken from the response's Content-Disposition
    header when present, otherwise from ``task_id``. In both cases only the
    final path component is used, so neither the server nor the caller can
    make the file land outside WORK_DIR.

    Args:
        task_id: Identifier of the task whose attachment should be fetched.

    Returns:
        ``"Downloaded to <path> (<n> bytes)"`` on success, or
        ``"Error: <details>"`` on any failure. The function never raises.
    """
    # Function-scope import, matching how this file imports optional helpers.
    from email.message import Message

    def _leaf(name) -> str:
        # Keep only the last path component, treating '\\' as a separator too.
        return os.path.basename(str(name).replace("\\", "/")).strip()

    try:
        r = requests.get(f"{API_BASE}/files/{task_id}", timeout=60)
        r.raise_for_status()
        # Let the stdlib header parser deal with quoting and extra parameters
        # (e.g. 'attachment; filename="a.xlsx"; size=3').
        cd = r.headers.get("content-disposition", "")
        header = Message()
        if cd:
            header["content-disposition"] = cd
        fname = _leaf(header.get_filename() or "")
        if fname in ("", ".", ".."):
            fname = _leaf(task_id)
        if fname in ("", ".", ".."):
            fname = "task_file"
        local = os.path.join(WORK_DIR, fname)
        with open(local, "wb") as f:
            f.write(r.content)
        return f"Downloaded to {local} ({len(r.content)} bytes)"
    except Exception as e:
        # Deliberately broad: tool failures are reported to the model as text.
        return f"Error: {e}"
def tool_run_python(code: str, working_file: str = "") -> str:
    """Run Python in a subprocess and report its output.

    When ``working_file`` is non-empty and ends in ``.py`` that file is run
    as-is and ``code`` is ignored. Otherwise ``code`` is written to a
    temporary ``.py`` file, executed, and the temporary file is removed.

    Returns:
        A report containing stdout, stderr and the return code, the string
        ``"Error: Timed out after 60s"`` on timeout, or
        ``"Error: <details>"`` plus a traceback for any other failure.
    """

    def _execute(script_path: str) -> str:
        # One interpreter invocation, run inside WORK_DIR with a 60 s budget.
        proc = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=WORK_DIR,
        )
        return f"stdout:\n{proc.stdout}\n\nstderr:\n{proc.stderr}\nreturncode={proc.returncode}"

    try:
        run_existing = bool(working_file) and working_file.endswith(".py")
        if run_existing:
            return _execute(working_file)

        # delete=False: the file must outlive the handle so the child
        # interpreter can open it; it is removed explicitly afterwards.
        handle = tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        )
        with handle:
            handle.write(code)
            script_path = handle.name
        try:
            return _execute(script_path)
        finally:
            os.unlink(script_path)
    except subprocess.TimeoutExpired:
        return "Error: Timed out after 60s"
    except Exception as e:
        return f"Error: {e}\n{traceback.format_exc()}"
def _youtube_video_id(video_url: str) -> str:
    """Return the video id found in *video_url*, or "" when there is none."""
    from urllib.parse import parse_qs, urlparse

    raw = (video_url or "").strip()
    if not raw:
        return ""
    # urlparse only recognises the host when a scheme is present.
    parsed = urlparse(raw if "://" in raw else f"https://{raw}")
    host = (parsed.hostname or "").lower()
    segments = [part for part in parsed.path.split("/") if part]

    if host == "youtu.be" or host.endswith(".youtu.be"):
        # Short links: https://youtu.be/<id>
        return segments[0] if segments else ""
    if host in ("youtube.com", "youtube-nocookie.com") or host.endswith(
        (".youtube.com", ".youtube-nocookie.com")
    ):
        ids = parse_qs(parsed.query).get("v")
        if ids:
            # Standard links: /watch?v=<id>
            return ids[0]
        if len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
            return segments[1]
    # Last resort: the original "v=" split, kept for inputs the URL parser
    # cannot make sense of.
    if "v=" in raw:
        return raw.split("v=")[1].split("&")[0]
    return ""


def tool_youtube_transcript(video_url: str) -> str:
    """Fetch the transcript of a YouTube video.

    Accepts ``watch?v=``, ``youtu.be/``, ``/shorts/``, ``/embed/`` and
    ``/live/`` style URLs.

    Args:
        video_url: URL of the video.

    Returns:
        One line per caption entry formatted as ``[<start>s] <text>``, or
        ``"Error: <details>"`` when the id cannot be extracted or the
        transcript cannot be retrieved. The function never raises.
    """
    try:
        vid = _youtube_video_id(video_url)
        if not vid:
            return f"Error: could not extract a YouTube video id from {video_url!r}"

        from youtube_transcript_api import YouTubeTranscriptApi

        # Older releases expose the static get_transcript(); newer ones
        # replaced it with an instance-level fetch() returning an object
        # that converts to the same list-of-dicts shape.
        if hasattr(YouTubeTranscriptApi, "get_transcript"):
            transcript = YouTubeTranscriptApi.get_transcript(vid)
        else:
            transcript = YouTubeTranscriptApi().fetch(vid).to_raw_data()
        return "\n".join(f"[{t['start']:.1f}s] {t['text']}" for t in transcript)
    except Exception as e:
        # Deliberately broad: tool failures are reported to the model as text.
        return f"Error: {e}"
# Tool definitions advertised to the model. Each entry has a name, a
# description and a JSON-schema description of its input object.
# "submit_final_answer" is declared here but has no entry in TOOL_FNS: the
# agent loop intercepts it to end the run.
TOOLS = [
    {
        "name": "wikipedia_search",
        "description": "Search English Wikipedia and get top 5 results with snippets and URLs. Use this FIRST for any factual question.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
            },
            "required": ["query"],
        },
    },
    {
        "name": "fetch_url",
        "description": "Fetch a URL (usually a Wikipedia page) and return its cleaned text content.",
        "input_schema": {
            "type": "object",
            "properties": {
                "url": {"type": "string"},
            },
            "required": ["url"],
        },
    },
    {
        "name": "download_task_file",
        "description": "Download the file attached to the current GAIA task. Returns the local file path.",
        "input_schema": {
            "type": "object",
            "properties": {
                "task_id": {"type": "string"},
            },
            "required": ["task_id"],
        },
    },
    {
        "name": "run_python",
        "description": "Execute Python code OR run an existing .py file. For .xlsx parsing, use pandas. For .py files just pass working_file=<path>.",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {
                    "type": "string",
                    "description": "Python code to run (ignored if working_file is set)",
                },
                "working_file": {
                    "type": "string",
                    "description": "Path to a .py file to execute directly",
                },
            },
            "required": ["code"],
        },
    },
    {
        "name": "youtube_transcript",
        "description": "Fetch transcript of a YouTube video by URL.",
        "input_schema": {
            "type": "object",
            "properties": {
                "video_url": {"type": "string"},
            },
            "required": ["video_url"],
        },
    },
    {
        "name": "submit_final_answer",
        "description": "Submit your final answer. The `answer` string will be scored via exact match - no preamble, no explanation. Call this exactly once at the end.",
        "input_schema": {
            "type": "object",
            "properties": {
                "answer": {
                    "type": "string",
                    "description": "The final answer string, formatted exactly as the question requests",
                },
            },
            "required": ["answer"],
        },
    },
]
# Dispatch table: tool name -> adapter that unpacks the model-supplied input
# dict and calls the matching tool function. Only run_python tolerates
# missing keys; the others raise KeyError when their argument is absent.
TOOL_FNS = dict(
    wikipedia_search=lambda args: tool_wikipedia_search(args["query"]),
    fetch_url=lambda args: tool_fetch_url(args["url"]),
    download_task_file=lambda args: tool_download_task_file(args["task_id"]),
    run_python=lambda args: tool_run_python(
        args.get("code", ""),
        args.get("working_file", ""),
    ),
    youtube_transcript=lambda args: tool_youtube_transcript(args["video_url"]),
)
# System prompt sent with every agent-loop model call. It describes the tool
# workflow, the exact-match answer formatting rules, and the requirement to
# finish by calling submit_final_answer.
# NOTE(review): the "10 tool calls" figure in the text is not derived from
# MAX_TURNS (12) - confirm whether the two are meant to agree.
SYSTEM = """You are a research agent solving GAIA benchmark questions for an EXACT-MATCH scoring system.
CRITICAL: You MUST end every task by calling the `submit_final_answer` tool with the clean answer string.
The `answer` argument is what gets scored - no preamble, no explanation, exact format only.
Workflow:
1. For ANY factual / lookup question (people, dates, statistics, geography, articles, history, sports, etc.):
ALWAYS call wikipedia_search FIRST. Do not answer from memory - your memory is often wrong on specifics.
Then call fetch_url on the most relevant Wikipedia URL to read details.
2. For attached file questions: call download_task_file. If it returns "No file path associated",
the file is permanently unavailable - just guess in the right format.
3. For pure reasoning (math, logic, reversed text, group theory): you may answer directly, but use run_python to verify.
4. For YouTube questions: try youtube_transcript with the URL.
Format rules (CRITICAL for exact-match):
- "comma-separated list, alphabetical order" → "apple, banana, cherry" (lowercase, space after comma)
- "first name only" → just one word like "Sarah"
- "IOC country code" → 3 uppercase letters like "USA"
- "USD with two decimal places" → "1234.56" (no $ sign unless asked)
- "just the city name without abbreviations" → "Boston" (full name, no state)
- "last names only, in Roman characters" → "Smith, Jones"
- Numeric → bare number, no unit unless requested
- Never include "FINAL ANSWER:" or quotes
- If you can't determine the answer, still submit your best guess in the correct format
You can use up to 10 tool calls. Then you MUST call submit_final_answer."""
def solve_question(q: dict) -> str:
    """Run the agent loop for a single question and return the final answer.

    The model is called up to MAX_TURNS times. Tool calls are executed via
    TOOL_FNS and their results fed back; the run ends as soon as the model
    calls ``submit_final_answer``. If the loop is exhausted, one extra call
    forces that tool via ``tool_choice``.

    Args:
        q: Question record with ``task_id`` and ``question`` keys and an
            optional ``file_name`` key naming an attachment.

    Returns:
        The submitted answer with surrounding whitespace removed, or
        ``"(no answer)"`` when no answer could be obtained.

    Raises:
        Whatever ``client.messages.create`` raises inside the main loop;
        only the final forced call is guarded.
    """
    task_id = q["task_id"]
    question = q["question"]
    # Treat a missing or null file_name the same way.
    file_name = q.get("file_name", "") or ""

    user_content = f"task_id: {task_id}\n\nQuestion:\n{question}"
    if file_name:
        user_content += f"\n\nAttached file: {file_name} (call download_task_file with the task_id above to get it)"

    # Image attachments are embedded directly in the first message.
    image_content = None
    if file_name.lower().endswith((".png", ".jpg", ".jpeg")):
        tool_download_task_file(task_id)
        # The downloader names the file after the Content-Disposition header
        # or, failing that, the task_id, so look under both names.
        candidates = (
            os.path.join(WORK_DIR, file_name),
            os.path.join(WORK_DIR, task_id),
        )
        local_img = next((p for p in candidates if os.path.isfile(p)), None)
        if local_img is not None:
            with open(local_img, "rb") as f:
                img_data = base64.standard_b64encode(f.read()).decode("utf-8")
            # Guess from file_name: the on-disk name may have no extension.
            media_type = mimetypes.guess_type(file_name)[0] or "image/png"
            image_content = {
                "type": "image",
                "source": {"type": "base64", "media_type": media_type, "data": img_data},
            }

    if image_content:
        first_content = [image_content, {"type": "text", "text": user_content}]
    else:
        first_content = user_content
    messages = [{"role": "user", "content": first_content}]

    for turn in range(MAX_TURNS):
        resp = client.messages.create(
            model=MODEL,
            max_tokens=4096,
            system=SYSTEM,
            tools=TOOLS,
            messages=messages,
        )

        if resp.stop_reason == "tool_use":
            messages.append({"role": "assistant", "content": resp.content})
            tool_results = []
            for block in resp.content:
                if block.type != "tool_use":
                    continue
                if block.name == "submit_final_answer":
                    # str(): tolerate a non-string answer from the model.
                    final_answer = str(block.input.get("answer", "")).strip()
                    print(f" [turn {turn}] >>> submit_final_answer: {final_answer!r}")
                    return final_answer
                print(f" [turn {turn}] tool: {block.name}({json.dumps(block.input)[:120]})")
                try:
                    result = TOOL_FNS[block.name](block.input)
                except Exception as e:
                    result = f"Tool error: {e}"
                if len(result) > 12000:
                    result = result[:12000] + "\n[truncated]"
                tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": result})
            messages.append({"role": "user", "content": tool_results})
            continue

        # The model stopped without submitting: nudge it to call the tool.
        print(f" [turn {turn}] end_turn without submit, forcing final answer...")
        # Replay only non-empty text. A turn cut off by the token limit can
        # carry a tool_use block that will never get a tool_result, and an
        # empty assistant message is rejected by the API.
        text_blocks = [b for b in resp.content if b.type == "text" and b.text.strip()]
        if text_blocks:
            messages.append({"role": "assistant", "content": text_blocks})
        messages.append({"role": "user", "content": "You did not call submit_final_answer. Please call it now with your best answer in the exact format requested."})

    # Loop exhausted: make one last call that can only produce the submit tool.
    messages.append({"role": "user", "content": "Max turns reached. Call submit_final_answer NOW with your best guess in the right format."})
    try:
        resp = client.messages.create(
            model=MODEL,
            max_tokens=512,
            system=SYSTEM,
            tools=TOOLS,
            messages=messages,
            tool_choice={"type": "tool", "name": "submit_final_answer"},
        )
        for block in resp.content:
            if block.type == "tool_use" and block.name == "submit_final_answer":
                return str(block.input.get("answer", "")).strip()
    except Exception as e:
        # Best effort: fall through to the placeholder, but leave a trace.
        print(f" forced submit_final_answer failed: {e}")
    return "(no answer)"
def extract_clean_answer(question: str, agent_response: str) -> str:
    """Ask the model to reduce an agent response to the bare answer string.

    A blank ``agent_response`` is returned unchanged without calling the
    model. Otherwise the model's text output is stripped of surrounding
    whitespace and of one pair of matching single or double quotes.

    NOTE(review): EXTRACTOR_SYSTEM is not defined in the part of the file
    visible here - confirm it exists before calling this function.
    """
    if agent_response.strip() == "":
        return agent_response

    prompt = f"QUESTION:\n{question}\n\nAGENT'S REASONING:\n{agent_response}\n\nNow output ONLY the final answer string (no quotes, no preamble):"
    reply = client.messages.create(
        model=MODEL,
        max_tokens=200,
        system=EXTRACTOR_SYSTEM,
        messages=[{"role": "user", "content": prompt}],
    )

    pieces = [part.text for part in reply.content if part.type == "text"]
    cleaned = "".join(pieces).strip()

    # Drop one layer of matching quotes around the whole answer.
    for quote_char in ('"', "'"):
        if cleaned.startswith(quote_char) and cleaned.endswith(quote_char):
            return cleaned[1:-1]
    return cleaned
def main(
    questions_path: str = "C:/Users/22678/Downloads/test/test/gaia_questions.json",
    out_path: str = "C:/Users/22678/Downloads/test/test/gaia_answers.json",
) -> None:
    """Answer the questions in *questions_path* and store them in *out_path*.

    Command-line arguments, if any, select which questions to run: each may
    be a task id, a 1-based index (``3``) or ``Q<index>`` (``Q3``). Without
    arguments every question not already present in the answers file is run.
    Answers are written after each question so an interrupted run keeps its
    progress.

    Args:
        questions_path: JSON file holding the list of question records.
        out_path: JSON file mapping task id to answer; read if it exists,
            then rewritten.
    """
    with open(questions_path, "r", encoding="utf-8") as f:
        questions = json.load(f)

    # An empty selection means "no filter".
    only = set(sys.argv[1:]) or None

    results = {}
    if os.path.exists(out_path):
        with open(out_path, "r", encoding="utf-8") as f:
            results = json.load(f)

    def _save() -> None:
        # Rewrite the whole answers file with the current results.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

    for i, q in enumerate(questions, start=1):
        tid = q["task_id"]
        if only and not ({tid, str(i), f"Q{i}"} & only):
            continue
        if tid in results and not only:
            print(f"Q{i} {tid[:8]} already answered, skipping")
            continue
        print(f"\n{'='*60}\nQ{i} task_id={tid[:8]} file={q.get('file_name','')}\n{'='*60}")
        print(f"Q: {q['question'][:200]}")
        try:
            answer = solve_question(q)
            print(f"\n>>> FINAL: {answer}")
            results[tid] = answer
        except Exception as e:
            # Top-level boundary: record the failure and move on.
            print(f"\nERROR: {e}")
            traceback.print_exc()
            results[tid] = f"(error: {e})"
        # Persist after every question so a crash does not lose progress.
        _save()

    _save()
    print(f"\n\nSaved {len(results)} answers to {out_path}")


if __name__ == "__main__":
    main()