| import os |
| import sys |
| import json |
| import base64 |
| import tempfile |
| import requests |
| import pandas as pd |
| import gradio as gr |
| import anthropic |
| from io import StringIO |
| from pathlib import Path |
|
|
| |
# Base URL of the scoring service. The evaluation run appends "/questions",
# "/files/<task_id>" and "/submit" to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
| |
| |
| |
|
|
def web_search(query: str) -> str:
    """Search the web using DuckDuckGo (no API key needed).

    Args:
        query: Free-text search query.

    Returns:
        Up to six results, each formatted as ``Title`` / ``URL`` / ``Snippet``
        lines and separated by blank lines; ``"No results found."`` when the
        search returns nothing; or a ``"Search error: ..."`` message when the
        query is empty or the search fails. The function never raises.
    """
    # An empty query cannot produce a useful result, so answer without a
    # network round trip.
    if not isinstance(query, str) or not query.strip():
        return "Search error: empty query"
    try:
        # Imported lazily so a missing optional dependency surfaces as a
        # search error instead of breaking module import.
        from duckduckgo_search import DDGS

        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=6))
        if not results:
            return "No results found."
        # .get() keeps a single result with a missing field from discarding
        # every other result.
        return "\n\n".join(
            f"Title: {r.get('title', '')}\n"
            f"URL: {r.get('href', '')}\n"
            f"Snippet: {r.get('body', '')}"
            for r in results
        )
    except Exception as e:
        return f"Search error: {e}"
|
|
|
|
def _html_to_text(html: str) -> str:
    """Strip markup from *html* and return whitespace-normalised visible text."""
    import re

    # Elements whose content is never useful as page text. Both extraction
    # paths below drop the same set so results do not depend on whether
    # BeautifulSoup is installed.
    skipped_tags = ("script", "style", "nav", "footer", "header")
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        from html.parser import HTMLParser

        class _Strip(HTMLParser):
            """Collect text that is not inside any skipped element."""

            def __init__(self):
                super().__init__()
                self.parts = []
                # Depth counter rather than a flag, so nested skipped
                # elements (e.g. <header> inside <nav>) are handled.
                self._skip_depth = 0

            def handle_starttag(self, tag, attrs):
                if tag in skipped_tags:
                    self._skip_depth += 1

            def handle_endtag(self, tag):
                # Never go below zero on a stray closing tag.
                if tag in skipped_tags and self._skip_depth:
                    self._skip_depth -= 1

            def handle_data(self, data):
                if not self._skip_depth:
                    self.parts.append(data)

        parser = _Strip()
        parser.feed(html)
        # close() flushes text still buffered at end of input; without it a
        # trailing fragment can be silently dropped.
        parser.close()
        text = " ".join(parser.parts)
    else:
        soup = BeautifulSoup(html, "html.parser")
        for tag in soup(list(skipped_tags)):
            tag.decompose()
        text = soup.get_text(separator=" ", strip=True)
    return re.sub(r"\s+", " ", text).strip()


def visit_webpage(url: str) -> str:
    """Fetch and return the text content of a webpage.

    Args:
        url: Full URL to fetch.

    Returns:
        The page's visible text with whitespace collapsed, truncated to 8000
        characters, or a ``"Failed to fetch <url>: ..."`` message if the
        request or the parsing fails. The function never raises.
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0 (compatible; GAIABot/1.0)"}
        resp = requests.get(url, headers=headers, timeout=15)
        resp.raise_for_status()
        return _html_to_text(resp.text)[:8000]
    except Exception as e:
        return f"Failed to fetch {url}: {e}"
|
|
|
|
def run_python(code: str) -> str:
    """Execute Python code in a fresh namespace and return what it printed.

    NOTE(security): this is NOT a sandbox. The code runs in-process via
    ``exec`` with the full set of builtins, so it can import modules, touch
    the filesystem and reach the network. Only feed it code from a source
    you are prepared to trust, or run the whole app in an isolated container.

    Args:
        code: Python source to execute. ``pd`` (pandas) is pre-bound.

    Returns:
        Captured stdout, followed by ``[stderr]: ...`` if anything was written
        to stderr. If the code raises, the output produced before the failure
        is kept and ``"<ExceptionType>: <message>"`` is appended. When nothing
        was printed and nothing failed, ``"(executed — no output)"``.
    """
    import contextlib

    buf_out, buf_err = StringIO(), StringIO()
    namespace = {"pd": pd, "__builtins__": __builtins__}
    failure = ""
    # The redirect context managers restore sys.stdout/sys.stderr on exit,
    # even if the executed code replaced them.
    with contextlib.redirect_stdout(buf_out), contextlib.redirect_stderr(buf_err):
        try:
            exec(code, namespace)
        # SystemExit is not an Exception subclass; without catching it here a
        # stray exit()/sys.exit() in the executed code would escape this
        # function and the caller's `except Exception` handlers.
        except (Exception, SystemExit) as exc:
            failure = f"{type(exc).__name__}: {exc}"

    out = buf_out.getvalue()
    err = buf_err.getvalue()
    if err:
        out += f"\n[stderr]: {err}"
    out = out.strip()

    if failure:
        # Keep whatever was printed before the error: it is often the most
        # useful clue for fixing the code.
        return f"{out}\n{failure}" if out else failure
    return out or "(executed — no output)"
|
|
|
|
def read_file_as_text(file_bytes: bytes, file_name: str) -> str:
    """Convert various file types to a text representation.

    The file type is chosen from the extension of ``file_name``:

    * plain-text and source formats are decoded as UTF-8;
    * ``.csv`` / ``.xlsx`` / ``.xls`` are loaded with pandas and rendered
      with at most 50 rows per table;
    * ``.pdf`` is extracted with ``pypdf`` when that package is installed;
    * audio files yield a placeholder, since no transcription is performed;
    * anything else is decoded as UTF-8 unless it contains NUL bytes, in
      which case a "[Binary file ...]" placeholder is returned.

    Args:
        file_bytes: Raw file content.
        file_name: Original file name; only its extension is inspected.

    Returns:
        A text rendering capped at 6000 characters (4000 for files with an
        unrecognised extension), a bracketed placeholder, or an
        ``"Error reading file ..."`` message. The function never raises.
    """
    import io

    text_limit = 6000
    ext = Path(file_name).suffix.lower()
    try:
        if ext in (".txt", ".py", ".md", ".json", ".xml", ".html", ".css", ".js"):
            return file_bytes.decode("utf-8", errors="replace")[:text_limit]

        if ext == ".csv":
            df = pd.read_csv(StringIO(file_bytes.decode("utf-8", errors="replace")))
            # max_rows bounds the row count only; wide tables still need the
            # same character cap the other branches apply.
            return df.to_string(max_rows=50)[:text_limit]

        if ext in (".xlsx", ".xls"):
            sheets = pd.read_excel(io.BytesIO(file_bytes), sheet_name=None)
            rendered = "\n\n".join(
                f"=== Sheet: {sheet} ===\n{frame.to_string(max_rows=50)}"
                for sheet, frame in sheets.items()
            )
            return rendered[:text_limit]

        if ext == ".pdf":
            try:
                import pypdf
            except ImportError:
                return "[PDF reading requires pypdf — install with: pip install pypdf]"
            reader = pypdf.PdfReader(io.BytesIO(file_bytes))
            # Guard against a page yielding None so one such page cannot
            # break the join.
            return "\n".join(
                page.extract_text() or "" for page in reader.pages
            )[:text_limit]

        if ext in (".mp3", ".wav", ".m4a", ".flac"):
            return (
                f"[Audio file: {file_name}, {len(file_bytes):,} bytes — "
                "transcription not available without Whisper API]"
            )

        # Unknown extension. Decoding with errors="replace" never fails, so
        # binary content has to be detected explicitly; NUL bytes are a
        # reliable sign that the data is not text.
        if b"\x00" in file_bytes:
            return f"[Binary file: {file_name}, {len(file_bytes):,} bytes]"
        return file_bytes.decode("utf-8", errors="replace")[:4000]
    except Exception as e:
        return f"Error reading file {file_name}: {e}"
|
|
|
|
| |
| |
| |
|
|
def _single_string_tool(name, description, param, param_description):
    """Build a tool definition that takes exactly one required string argument."""
    return {
        "name": name,
        "description": description,
        "input_schema": {
            "type": "object",
            "properties": {
                param: {"type": "string", "description": param_description},
            },
            "required": [param],
        },
    }


# Tool definitions passed to the model on every request. Each tool takes a
# single required string argument.
TOOLS = [
    _single_string_tool(
        "web_search",
        "Search the web for current information, facts, Wikipedia content, "
        "news, etc. Returns titles, URLs, and snippets.",
        "query",
        "The search query",
    ),
    _single_string_tool(
        "visit_webpage",
        "Fetch the full text of a specific webpage. Use when you need more "
        "detail than a search snippet, e.g. to read a Wikipedia article.",
        "url",
        "Full URL to fetch",
    ),
    _single_string_tool(
        "run_python",
        "Execute Python code. Great for arithmetic, counting, sorting, "
        "string manipulation, or processing data. Use print() for output. "
        "pandas (as pd) is pre-imported.",
        "code",
        "Python code to run. Always use print() to show results.",
    ),
]
|
|
# System prompt sent with every model request (passed as `system=` by
# GAIAAgent.solve). It names the three available tools and spells out the
# answer format the model is asked to follow.
SYSTEM_PROMPT = """You are an expert research assistant solving GAIA benchmark questions.
These are real-world questions requiring careful research and precise answers.

Strategy:
- Use web_search to find facts; follow up with visit_webpage for detail
- Use run_python for any calculation, counting, sorting, or data manipulation
- For files provided in the question, analyse them carefully
- Cross-check facts when accuracy is critical

Answer format (VERY IMPORTANT):
- Provide ONLY the final answer — no preamble, no explanation
- Give exactly what is asked: a number, a name, a date, a word, a short phrase
- Numbers: digits only, unless units are part of the question's expected format
- Lists: comma-separated values unless another format is specified
- Yes/No questions: just "Yes" or "No"

Think step by step, then output your final concise answer."""
|
|
|
|
| |
| |
| |
|
|
| class GAIAAgent: |
| """Agentic loop backed by Claude with tool use.""" |
|
|
| MAX_ITERATIONS = 15 |
|
|
| def __init__(self): |
| api_key = os.getenv("ANTHROPIC_API_KEY") |
| if not api_key: |
| raise EnvironmentError("ANTHROPIC_API_KEY environment variable not set.") |
| self.client = anthropic.Anthropic(api_key=api_key) |
| self.model = "claude-sonnet-4-20250514" |
| print(f"GAIAAgent initialised (model: {self.model})") |
|
|
| |
|
|
| def _dispatch_tool(self, name: str, inputs: dict) -> str: |
| if name == "web_search": |
| return web_search(inputs["query"]) |
| if name == "visit_webpage": |
| return visit_webpage(inputs["url"]) |
| if name == "run_python": |
| return run_python(inputs["code"]) |
| return f"[unknown tool: {name}]" |
|
|
| def _build_initial_content( |
| self, question: str, file_bytes: bytes | None, file_name: str | None |
| ) -> list: |
| """Return the content list for the first user message.""" |
| content = [] |
|
|
| if file_bytes and file_name: |
| ext = Path(file_name).suffix.lower() |
| image_exts = {".jpg", ".jpeg", ".png", ".gif", ".webp"} |
| if ext in image_exts: |
| media_map = { |
| ".jpg": "image/jpeg", ".jpeg": "image/jpeg", |
| ".png": "image/png", ".gif": "image/gif", |
| ".webp": "image/webp", |
| } |
| content.append({ |
| "type": "image", |
| "source": { |
| "type": "base64", |
| "media_type": media_map[ext], |
| "data": base64.b64encode(file_bytes).decode(), |
| }, |
| }) |
| content.append({ |
| "type": "text", |
| "text": f"The image above is the attached file '{file_name}'.\n\n{question}", |
| }) |
| else: |
| file_text = read_file_as_text(file_bytes, file_name) |
| content.append({ |
| "type": "text", |
| "text": ( |
| f"A file named '{file_name}' is attached. Its contents:\n\n" |
| f"{file_text}\n\n---\n\nQuestion: {question}" |
| ), |
| }) |
| else: |
| content.append({"type": "text", "text": question}) |
|
|
| return content |
|
|
| |
|
|
| def solve( |
| self, |
| question: str, |
| file_bytes: bytes | None = None, |
| file_name: str | None = None, |
| ) -> str: |
| print(f"\n[Agent] Question: {question[:120]}{'...' if len(question)>120 else ''}") |
| messages = [ |
| {"role": "user", "content": self._build_initial_content(question, file_bytes, file_name)} |
| ] |
|
|
| for iteration in range(self.MAX_ITERATIONS): |
| response = self.client.messages.create( |
| model=self.model, |
| max_tokens=4096, |
| system=SYSTEM_PROMPT, |
| tools=TOOLS, |
| messages=messages, |
| ) |
|
|
| if response.stop_reason == "end_turn": |
| for block in response.content: |
| if hasattr(block, "text"): |
| answer = block.text.strip() |
| print(f"[Agent] Answer: {answer[:100]}") |
| return answer |
| return "No answer generated." |
|
|
| if response.stop_reason == "tool_use": |
| tool_results = [] |
| for block in response.content: |
| if block.type == "tool_use": |
| print(f" [Tool] {block.name}({json.dumps(block.input)[:80]})") |
| result = self._dispatch_tool(block.name, block.input) |
| print(f" [Tool] → {result[:120]}") |
| tool_results.append({ |
| "type": "tool_result", |
| "tool_use_id": block.id, |
| "content": result, |
| }) |
| messages.append({"role": "assistant", "content": response.content}) |
| messages.append({"role": "user", "content": tool_results}) |
| else: |
| |
| print(f"[Agent] Unexpected stop_reason: {response.stop_reason}") |
| break |
|
|
| return "Could not determine answer within iteration limit." |
|
|
| def __call__(self, question: str) -> str: |
| """Compatibility shim for the template's agent(question) calls.""" |
| return self.solve(question) |
|
|
|
|
| |
| |
| |
|
|
def _download_task_file(api_url: str, task_id: str, file_name: str) -> bytes | None:
    """Download the attachment for ``task_id``; return None if the download fails."""
    try:
        file_resp = requests.get(f"{api_url}/files/{task_id}", timeout=30)
        file_resp.raise_for_status()
    except Exception as e:
        print(f" Could not download file for task {task_id}: {e}")
        return None
    file_bytes = file_resp.content
    print(f" Downloaded '{file_name}' ({len(file_bytes):,} bytes)")
    return file_bytes


def _submit_answers(submit_url: str, submission: dict) -> str:
    """POST ``submission`` to the scoring service and return a status message."""
    try:
        resp = requests.post(submit_url, json=submission, timeout=120)
        resp.raise_for_status()
        result = resp.json()
        return (
            f"Submission Successful!\n"
            f"User: {result.get('username')}\n"
            f"Score: {result.get('score', 'N/A')}% "
            f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')} correct)\n"
            f"Message: {result.get('message', '')}"
        )
    except requests.exceptions.HTTPError as e:
        # Prefer the "detail" field of a JSON error body; otherwise fall back
        # to the first 500 characters of the raw body.
        try:
            detail = e.response.json().get("detail", e.response.text)
        except Exception:
            detail = e.response.text[:500]
        return f"Submission failed (HTTP {e.response.status_code}): {detail}"
    except Exception as e:
        return f"Submission error: {e}"


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Fetch questions, run the agent, submit answers, display results.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user
            has not logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame with
        one row per attempted question, or None when the run stopped before
        any question was attempted.
    """
    if not profile:
        return "Please log in to Hugging Face first.", None
    username = profile.username
    print(f"Logged in as: {username}")

    space_id = os.getenv("SPACE_ID")
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = GAIAAgent()
    except Exception as e:
        return f"Error initialising agent: {e}", None

    # Link to this Space's code, sent along with the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "unknown"

    print(f"Fetching questions from {questions_url} …")
    try:
        resp = requests.get(questions_url, timeout=15)
        resp.raise_for_status()
        questions_data = resp.json()
        if not questions_data:
            return "Questions list is empty.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []
    answers_payload = []

    for item in questions_data:
        # A non-dict entry has no .get(); skip it like any other malformed
        # item instead of letting it abort the run and lose earlier answers.
        if not isinstance(item, dict):
            print(f"Skipping malformed item: {item}")
            continue

        task_id = item.get("task_id")
        question_text = item.get("question")
        file_name = item.get("file_name", "")

        if not task_id or question_text is None:
            print(f"Skipping malformed item: {item}")
            continue

        file_bytes = _download_task_file(api_url, task_id, file_name) if file_name else None

        # A failure on one question is recorded as that question's answer so
        # the remaining questions still run.
        try:
            submitted_answer = agent.solve(question_text, file_bytes, file_name)
        except Exception as e:
            submitted_answer = f"AGENT ERROR: {e}"
            print(f" Agent error on {task_id}: {e}")

        answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        results_log.append({
            "Task ID": task_id,
            "Question": question_text[:120],
            "File": file_name or "—",
            "Submitted Answer": submitted_answer,
        })

    if not answers_payload:
        return "Agent produced no answers.", pd.DataFrame(results_log)

    submission = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Submitting {len(answers_payload)} answers …")
    status = _submit_answers(submit_url, submission)

    print(status)
    return status, pd.DataFrame(results_log)
|
|
|
|
| |
| |
| |
|
|
# Gradio UI: a login button, a run button, and two outputs (status text and a
# results table). Components are created in the order they are listed here.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
        **Setup:**
        1. Set `ANTHROPIC_API_KEY` as a Space secret.
        2. Log in with your Hugging Face account below.
        3. Click **Run Evaluation** to fetch questions, run the agent, and submit.

        The agent uses Claude with web search, code execution, and file analysis.
        """
    )

    gr.LoginButton()

    run_btn = gr.Button("Run Evaluation & Submit All Answers", variant="primary")
    status_box = gr.Textbox(label="Status / Result", lines=6, interactive=False)
    results_table = gr.DataFrame(label="Questions & Answers", wrap=True)

    # No `inputs` are wired here, although run_and_submit_all takes a
    # `profile` argument. NOTE(review): presumably Gradio injects it from the
    # OAuth login based on the parameter's type hint — confirm against the
    # Gradio documentation.
    run_btn.click(fn=run_and_submit_all, outputs=[status_box, results_table])
|
|
# Script entry point: print a short environment summary, then start the UI.
if __name__ == "__main__":
    divider = "=" * 60
    print("\n" + divider)

    space_host, space_id = os.getenv("SPACE_HOST"), os.getenv("SPACE_ID")
    for label, value in (("SPACE_HOST", space_host), ("SPACE_ID", space_id)):
        # Only variables that are actually set get reported.
        if value:
            print(f"{label} : {value}")

    key_status = (
        "✅ ANTHROPIC_API_KEY found."
        if os.getenv("ANTHROPIC_API_KEY")
        else "⚠️ ANTHROPIC_API_KEY is NOT set — agent will fail."
    )
    print(key_status)

    print(divider + "\n")
    demo.launch(debug=True, share=False)