import base64
import io
import os
import re
from typing import Optional, Dict, Any

import anthropic
import gradio as gr
import pandas as pd
import requests
|
|
# Base URL of the scoring service; the code below requests "/questions",
# "/files/<task_id>" and "/submit" relative to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
class GAIAAgent:
    """Answer GAIA benchmark questions with Claude, attaching the task file if any.

    Calling an instance with a question (and optionally a task id) downloads the
    task's attachment from ``<api_url>/files/<task_id>``, turns it into message
    content blocks (image, PDF document, or inlined text), sends one request to
    the Anthropic Messages API and returns the text found in the reply's
    ``<answer>`` tags.
    """

    # Filename suffixes used when the Content-Type header is not specific enough.
    _IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".webp")
    _MEDIA_EXTENSIONS = (
        ".mp3", ".wav", ".m4a", ".flac", ".ogg",
        ".mp4", ".mov", ".avi", ".mkv", ".webm",
    )
    # Per-sheet row cap and per-file character cap for inlined attachments.
    _MAX_SHEET_ROWS = 200
    _MAX_FILE_CHARS = 10000

    def __init__(self):
        """Create the Anthropic client and the per-instance file cache.

        Raises:
            ValueError: if ``ANTHROPIC_API_KEY`` is not set in the environment.
        """
        print("Initializing GAIA Agent powered by Claude...")
        self.claude_key = os.environ.get("ANTHROPIC_API_KEY")
        if not self.claude_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")

        self.client = anthropic.Anthropic(api_key=self.claude_key)
        self.api_url = DEFAULT_API_URL
        # task_id -> file_info dict, filled by fetch_file on successful downloads.
        self.file_cache = {}

        self.system_prompt = """You are an expert AI assistant solving GAIA benchmark tasks with maximum accuracy.

GAIA evaluation uses EXACT STRING MATCHING — your final answer format is absolutely critical.

## Step-by-step approach:
1. Read the question carefully
2. Identify the answer type: number, word, list, date, etc.
3. If a file/image/table is attached — analyze it first
4. Think step by step, show reasoning
5. Write the final answer in <answer> tags

## Special question types — handle carefully:

### Reversed/encoded text
If the question text itself looks garbled or reversed (like ".rewsna eht..."),
reverse it character by character to read it, then answer the actual question.
Example: ".dlrow olleh" reversed = "hello world."

### Python code files
Execute the logic mentally, trace through the code step by step, find the final output value.

### Excel/CSV/table data
Use the data provided to compute the answer. Show your calculation.

### YouTube/video questions
You cannot watch videos. Use your knowledge about the topic if possible,
or state what you would need to find the answer.

### Chess positions
Analyze the board from the image carefully. Think about which move is best.

### Wikipedia questions
Use your training knowledge. Be precise about names, dates, counts.

## Final answer format — CRITICAL:
- Always end with: <answer>YOUR ANSWER HERE</answer>
- Numbers only (no units unless asked): <answer>42</answer>
- Lists comma-separated: <answer>apple, banana, orange</answer>
- Single word: <answer>photosynthesis</answer>
- Follow exact format requested in the question
- NO quotes, NO trailing punctuation inside the tags
- If unsure, give your best guess — never leave it empty"""

    # ------------------------------------------------------------------
    # Attachment handling
    # ------------------------------------------------------------------
    def fetch_file(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Download and classify the attachment for ``task_id``.

        Returns the cached entry when one exists. Otherwise requests
        ``<api_url>/files/<task_id>`` and returns a dict with ``content``,
        ``content_type``, ``filename``, ``size``, ``type`` and either
        ``base64`` (images, PDFs) or ``text`` (everything else). Returns
        ``None`` on a non-200 status or on any error; failures are not cached.
        """
        if task_id in self.file_cache:
            return self.file_cache[task_id]

        print(f"Fetching file for task: {task_id}")
        try:
            response = requests.get(f"{self.api_url}/files/{task_id}", timeout=15)

            if response.status_code != 200:
                print(f"No file for task {task_id}, status: {response.status_code}")
                return None

            file_content = response.content
            content_type = response.headers.get("Content-Type", "").lower()
            filename = self._filename_from_disposition(
                response.headers.get("Content-Disposition", "")
            )
            print(f"File: type={content_type}, name={filename}, size={len(file_content)}")

            file_info = self._classify_file(file_content, content_type, filename)
            self.file_cache[task_id] = file_info
            return file_info

        except Exception as e:
            # Best effort: a missing or unreadable attachment must not abort the task.
            print(f"Error fetching file for {task_id}: {e}")
            return None

    @staticmethod
    def _filename_from_disposition(content_disp: str) -> str:
        """Extract the ``filename=`` value from a Content-Disposition header.

        Stops at the closing quote (or at the next ``;`` for unquoted values) so
        trailing parameters do not end up inside the name. Returns ``""`` when
        the header carries no filename.
        """
        quoted = re.search(r'filename="([^"]*)"', content_disp)
        if quoted:
            return quoted.group(1).strip()
        bare = re.search(r"filename=([^;]+)", content_disp)
        if bare:
            return bare.group(1).strip().strip('"')
        return ""

    @staticmethod
    def _decode_text(data: bytes) -> str:
        """Decode bytes as UTF-8, then cp1252, then latin-1.

        latin-1 maps every byte value, so it has to be tried last; placing it
        earlier would make any later encoding unreachable.
        """
        for encoding in ("utf-8", "cp1252"):
            try:
                return data.decode(encoding)
            except UnicodeDecodeError:
                continue
        return data.decode("latin-1")

    def _classify_file(self, file_content: bytes, content_type: str, filename: str) -> Dict[str, Any]:
        """Build the ``file_info`` dict for a downloaded attachment.

        The kind is chosen from the (lower-cased) Content-Type and the filename
        suffix, in this order: image, pdf, excel, csv, audio/video, binary, text.
        """
        lower_name = filename.lower()
        file_info = {
            "content": file_content,
            "content_type": content_type,
            "filename": filename,
            "size": len(file_content),
        }

        if "image" in content_type or lower_name.endswith(self._IMAGE_EXTENSIONS):
            file_info["base64"] = base64.b64encode(file_content).decode("utf-8")
            file_info["type"] = "image"

        elif "pdf" in content_type or lower_name.endswith(".pdf"):
            file_info["base64"] = base64.b64encode(file_content).decode("utf-8")
            file_info["type"] = "pdf"

        elif ("spreadsheet" in content_type or "excel" in content_type
              or lower_name.endswith((".xlsx", ".xls"))):
            file_info["type"] = "excel"
            file_info["text"] = self._parse_excel(file_content, filename)

        elif "csv" in content_type or lower_name.endswith(".csv"):
            file_info["type"] = "text"
            file_info["text"] = self._decode_text(file_content)

        elif ("audio" in content_type or "video" in content_type
              or lower_name.endswith(self._MEDIA_EXTENSIONS)):
            # Name the file when the header is generic (e.g. application/octet-stream).
            specific = "audio" in content_type or "video" in content_type
            label = content_type if specific else (filename or content_type)
            file_info["type"] = "media"
            file_info["text"] = f"[{label} file, {len(file_content)} bytes — cannot process directly]"

        elif b"\x00" in file_content:
            # NUL bytes are treated as the marker of a non-text payload; decoding
            # such data would only feed mojibake to the model.
            file_info["type"] = "binary"
            file_info["text"] = f"[Binary file, {len(file_content)} bytes]"

        else:
            file_info["type"] = "text"
            file_info["text"] = self._decode_text(file_content)

        return file_info

    def _parse_excel(self, content: bytes, filename: str) -> str:
        """Convert Excel to readable text representation.

        Tries openpyxl first (tab-separated rows, blank rows skipped, at most
        ``_MAX_SHEET_ROWS`` rows per sheet). If openpyxl is missing or fails,
        falls back to ``pandas.read_excel`` over all sheets. Never raises; on
        total failure returns a bracketed error description. ``filename`` is
        accepted for interface compatibility and is not used.
        """
        limit = self._MAX_SHEET_ROWS
        try:
            import openpyxl

            wb = openpyxl.load_workbook(io.BytesIO(content), data_only=True)
            result = []
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                result.append(f"=== Sheet: {sheet_name} ===")
                rows = [
                    "\t".join("" if v is None else str(v) for v in row)
                    for row in ws.iter_rows(values_only=True)
                    if any(cell is not None for cell in row)
                ]
                result.append("\n".join(rows[:limit]))
                # Count the rows actually collected, not ws.max_row, so blank
                # rows that were skipped do not inflate the remainder.
                if len(rows) > limit:
                    result.append(f"... ({len(rows) - limit} more rows)")
            return "\n\n".join(result)
        except Exception as primary_err:
            try:
                # sheet_name=None makes pandas return {sheet name: DataFrame}.
                sheets = pd.read_excel(io.BytesIO(content), sheet_name=None)
                parts = []
                for name, df in sheets.items():
                    parts.append(f"=== Sheet: {name} ===")
                    parts.append(df.to_string(max_rows=limit))
                return "\n\n".join(parts)
            except Exception as e2:
                if isinstance(primary_err, ImportError):
                    return f"[Could not parse Excel: {e2}]"
                return f"[Could not parse Excel: {primary_err}, {e2}]"

    # ------------------------------------------------------------------
    # Answer extraction
    # ------------------------------------------------------------------
    def extract_answer(self, response_text: str) -> str:
        """Pull the final answer out of the model's reply.

        Order of preference: the last ``<answer>...</answer>`` pair (earlier
        pairs may be drafts inside the reasoning), then the text following
        "final answer" / "the answer is", then the last non-empty line, then
        the stripped reply itself.
        """
        tagged = re.findall(r"<answer>(.*?)</answer>", response_text, re.DOTALL | re.IGNORECASE)
        if tagged:
            answer = tagged[-1].strip()
            print(f"Extracted from tags: {repr(answer)}")
            return answer

        match = re.search(
            r"(?:final answer|the answer is)[:\s]+(.+?)(?:\n|$)", response_text, re.IGNORECASE
        )
        if match:
            return match.group(1).strip().strip("\"'")

        lines = [line.strip() for line in response_text.strip().split("\n") if line.strip()]
        if lines:
            return lines[-1].strip("\"'.,")

        return response_text.strip()

    # ------------------------------------------------------------------
    # Request construction
    # ------------------------------------------------------------------
    @staticmethod
    def _image_media_type(content_type: str, filename: str) -> str:
        """Choose the ``media_type`` for an image block; defaults to PNG.

        Both the Content-Type and the filename suffix are consulted for every
        format, because attachments are classified as images by suffix alone
        when the header is generic.
        """
        lower_name = filename.lower()
        if "jpeg" in content_type or "jpg" in content_type or lower_name.endswith((".jpg", ".jpeg")):
            return "image/jpeg"
        if "png" in content_type or lower_name.endswith(".png"):
            return "image/png"
        if "gif" in content_type or lower_name.endswith(".gif"):
            return "image/gif"
        if "webp" in content_type or lower_name.endswith(".webp"):
            return "image/webp"
        return "image/png"

    def _build_user_content(self, question: str, task_id: Optional[str]) -> list:
        """Assemble the user-message content blocks: question text plus attachment."""
        user_content = []

        # Heuristic for questions written backwards: hand the model the
        # reversed text as well so it does not have to decode it itself.
        reversed_hint = ""
        stripped = question.strip()
        if stripped.endswith("fI") or ".rewsna" in question or stripped.startswith("."):
            reversed_q = question[::-1]
            reversed_hint = f"\n\nNOTE: This question appears to be written in reverse. Reversed, it reads:\n\"{reversed_q}\"\nPlease answer the reversed version."

        user_content.append({
            "type": "text",
            "text": f"Question: {question}{reversed_hint}"
        })

        file_info = self.fetch_file(task_id) if task_id else None
        if not file_info:
            return user_content

        ftype = file_info.get("type", "unknown")

        if ftype == "image":
            media_type = self._image_media_type(
                file_info.get("content_type", ""), file_info.get("filename", "")
            )
            user_content.append({
                "type": "image",
                "source": {"type": "base64", "media_type": media_type, "data": file_info["base64"]}
            })
            user_content.append({"type": "text", "text": "The image above is part of this question. Analyze it carefully."})
            print("Attached image")

        elif ftype == "pdf":
            user_content.append({
                "type": "document",
                "source": {"type": "base64", "media_type": "application/pdf", "data": file_info["base64"]}
            })
            user_content.append({"type": "text", "text": "The PDF above is part of this question. Read it carefully."})
            print("Attached PDF")

        elif ftype in ("text", "excel") and "text" in file_info:
            file_text = file_info["text"]
            if len(file_text) > self._MAX_FILE_CHARS:
                file_text = (
                    file_text[:self._MAX_FILE_CHARS]
                    + f"\n...[truncated, total {len(file_info['text'])} chars]"
                )
            label = "Excel/spreadsheet" if ftype == "excel" else "file"
            user_content.append({
                "type": "text",
                "text": f"\nAttached {label} content:\n```\n{file_text}\n```"
            })
            print(f"Attached {ftype} ({len(file_info['text'])} chars)")

        elif ftype in ("media", "binary"):
            # Content cannot be shown; tell the model an attachment exists.
            user_content.append({
                "type": "text",
                "text": f"\nNote: {file_info.get('text', 'A media file is attached but cannot be processed directly.')}"
            })

        return user_content

    def __call__(self, question: str, task_id: str = None) -> str:
        """Answer one question and return the extracted answer string.

        Never raises: API failures are returned as ``"API_ERROR: ..."`` and any
        other failure as ``"ERROR: ..."`` (message truncated to 100 characters).
        """
        print(f"\n{'='*60}")
        print(f"Task: {task_id}")
        print(f"Q: {question[:200]}")

        try:
            user_content = self._build_user_content(question, task_id)

            response = self.client.messages.create(
                model="claude-sonnet-4-6",
                system=self.system_prompt,
                messages=[{"role": "user", "content": user_content}],
                temperature=0,
                max_tokens=4096
            )

            if not response.content:
                print("ERROR: Empty response")
                return "ERROR: empty response"

            # Gather every block that carries text rather than only the first
            # one, so a leading non-text block does not yield an empty answer.
            texts = [getattr(block, "text", None) for block in response.content]
            raw_answer = "\n".join(t for t in texts if t).strip()

            if not raw_answer:
                print("ERROR: Empty text in response")
                return "ERROR: empty text"

            print(f"Raw ({len(raw_answer)} chars): {raw_answer[:400]}")
            final = self.extract_answer(raw_answer)
            print(f"Final: {repr(final)}")
            return final

        except anthropic.APIError as e:
            print(f"API error: {e}")
            return f"API_ERROR: {str(e)[:100]}"
        except Exception as e:
            print(f"Error task {task_id}: {e}")
            import traceback
            traceback.print_exc()
            return f"ERROR: {str(e)[:100]}"
|
|
|
|
class BasicAgent(GAIAAgent):
    """Alias of GAIAAgent; adds no behavior of its own."""
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every fetched question and submit the answers.

    Args:
        profile: the logged-in Hugging Face profile, or None when the user has
            not logged in.

    Returns:
        A ``(status_message, results)`` pair. ``results`` is a DataFrame with
        one row per processed question, or None when the run stopped before
        any question was processed.
    """
    space_id = os.getenv("SPACE_ID")

    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    # Link to this Space's code, sent along with the submission.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []
    answers_payload = []

    for item in questions_data:
        # Skip malformed entries instead of failing on .get() below.
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue
        try:
            submitted_answer = agent(question_text, task_id)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100],
                "Submitted Answer": submitted_answer
            })
        except Exception as e:
            # Failed tasks are logged for display but not submitted.
            print(f"Error on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100],
                "Submitted Answer": f"AGENT ERROR: {e}"
            })

    if not answers_payload:
        return "Agent did not produce any answers.", pd.DataFrame(results_log)

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }

    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Status {e.response.status_code}."
        try:
            error_detail += f" {e.response.json().get('detail', '')}"
        except Exception:
            # Error body was not JSON (or not a JSON object): show raw text.
            error_detail += f" {e.response.text[:200]}"
        return f"Submission Failed: {error_detail}", pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission Failed: {e}", pd.DataFrame(results_log)
|
|
|
|
# Gradio UI: login button, a run button, and two outputs (status text and a
# table of the answers) filled in by run_and_submit_all.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Benchmark Agent Evaluation")
    gr.Markdown("1. Log in to Hugging Face.\n2. Click **Run Evaluation & Submit All Answers**.")
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    # No explicit inputs are wired here; the handler's only parameter is the
    # OAuth profile declared in its signature.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])


if __name__ == "__main__":
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)