"""Gradio app that runs a Claude-backed agent on GAIA questions and submits the answers.

The module fetches the question list from the scoring API, lets ``GAIAAgent``
answer each question (optionally with an attached file), and posts all answers
back to the scoring endpoint.
"""

import base64
import io
import os
import re
import traceback
from typing import Any, Dict, List, Optional

import anthropic
import gradio as gr
import pandas as pd
import requests

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Model identifier passed to the Anthropic Messages API.
MODEL_NAME = "claude-sonnet-4-6"

# Attached text is cut to this many characters before being sent to the model.
MAX_FILE_TEXT_CHARS = 10000

# At most this many non-empty rows per sheet are rendered for the model.
MAX_EXCEL_ROWS = 200

IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".gif", ".webp")
MEDIA_EXTENSIONS = (
    ".mp3", ".wav", ".m4a", ".flac", ".ogg",
    ".mp4", ".mov", ".avi", ".mkv", ".webm",
)

# The system prompt asks for the final answer inside <answer> tags and this
# pattern extracts it; the two must always name the same tag.
ANSWER_TAG_PATTERN = re.compile(r"<answer>(.*?)</answer>", re.DOTALL | re.IGNORECASE)
FINAL_ANSWER_PATTERN = re.compile(
    r"(?:final answer|the answer is)[:\s]+(.+?)(?:\n|$)", re.IGNORECASE
)

SYSTEM_PROMPT = """You are an expert AI assistant solving GAIA benchmark tasks with maximum accuracy.

GAIA evaluation uses EXACT STRING MATCHING — your final answer format is absolutely critical.

## Step-by-step approach:
1. Read the question carefully
2. Identify the answer type: number, word, list, date, etc.
3. If a file/image/table is attached — analyze it first
4. Think step by step, show reasoning
5. Write the final answer in <answer> tags

## Special question types — handle carefully:

### Reversed/encoded text
If the question text itself looks garbled or reversed (like ".rewsna eht..."), reverse it character by character to read it, then answer the actual question.
Example: ".dlrow olleh" reversed = "hello world."

### Python code files
Execute the logic mentally, trace through the code step by step, find the final output value.

### Excel/CSV/table data
Use the data provided to compute the answer. Show your calculation.

### YouTube/video questions
You cannot watch videos. Use your knowledge about the topic if possible, or state what you would need to find the answer.

### Chess positions
Analyze the board from the image carefully. Think about which move is best.

### Wikipedia questions
Use your training knowledge. Be precise about names, dates, counts.

## Final answer format — CRITICAL:
- Always end with: <answer>YOUR ANSWER HERE</answer>
- Numbers only (no units unless asked): <answer>42</answer>
- Lists comma-separated: <answer>apple, banana, orange</answer>
- Single word: <answer>photosynthesis</answer>
- Follow exact format requested in the question
- NO quotes, NO trailing punctuation inside the tags
- If unsure, give your best guess — never leave it empty"""


class GAIAAgent:
    """Answer GAIA questions with Claude, attaching the task file when one exists.

    Instances are callable: ``agent(question, task_id)`` returns the answer
    string to submit.
    """

    def __init__(self):
        """Create the Anthropic client.

        Raises:
            ValueError: if ``ANTHROPIC_API_KEY`` is not set in the environment.
        """
        print("Initializing GAIA Agent powered by Claude...")
        self.claude_key = os.environ.get("ANTHROPIC_API_KEY")
        if not self.claude_key:
            raise ValueError("ANTHROPIC_API_KEY not found in environment variables")
        self.client = anthropic.Anthropic(api_key=self.claude_key)
        self.api_url = DEFAULT_API_URL
        # task_id -> file_info dict; only successful downloads are cached.
        self.file_cache = {}
        self.system_prompt = SYSTEM_PROMPT

    def fetch_file(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Download and classify the file attached to *task_id*.

        Returns:
            A dict with ``content``, ``content_type``, ``filename``, ``size``
            and ``type`` (``image``, ``pdf``, ``excel``, ``text``, ``media`` or
            ``binary``), plus ``base64`` for images/PDFs or ``text`` for the
            other types. ``None`` when the endpoint does not answer 200 or
            any error occurs (best effort: the question is then asked
            without an attachment).
        """
        if task_id in self.file_cache:
            return self.file_cache[task_id]

        print(f"Fetching file for task: {task_id}")
        try:
            response = requests.get(f"{self.api_url}/files/{task_id}", timeout=15)
            if response.status_code != 200:
                print(f"No file for task {task_id}, status: {response.status_code}")
                return None

            file_content = response.content
            content_type = response.headers.get("Content-Type", "").lower()

            # Try to get filename from headers
            content_disp = response.headers.get("Content-Disposition", "")
            filename = ""
            if "filename=" in content_disp:
                # Drop any parameters that follow the filename ("...; size=12").
                raw_name = content_disp.split("filename=")[-1].split(";")[0]
                filename = raw_name.strip().strip('"')

            print(f"File: type={content_type}, name={filename}, size={len(file_content)}")

            file_info = self._build_file_info(file_content, content_type, filename)
            self.file_cache[task_id] = file_info
            return file_info
        except Exception as e:
            print(f"Error fetching file for {task_id}: {e}")
            return None

    def _build_file_info(self, file_content: bytes, content_type: str, filename: str) -> Dict[str, Any]:
        """Classify downloaded bytes and prepare the payload the model will receive."""
        name = filename.lower()
        file_info: Dict[str, Any] = {
            "content": file_content,
            "content_type": content_type,
            "filename": filename,
            "size": len(file_content),
        }

        # --- Image ---
        if "image" in content_type or name.endswith(IMAGE_EXTENSIONS):
            file_info["base64"] = base64.b64encode(file_content).decode("utf-8")
            file_info["type"] = "image"
        # --- PDF ---
        elif "pdf" in content_type or name.endswith(".pdf"):
            file_info["base64"] = base64.b64encode(file_content).decode("utf-8")
            file_info["type"] = "pdf"
        # --- Excel ---
        elif (
            "spreadsheet" in content_type
            or "excel" in content_type
            or name.endswith((".xlsx", ".xls"))
        ):
            file_info["type"] = "excel"
            file_info["text"] = self._parse_excel(file_content, filename)
        # --- CSV ---
        elif "csv" in content_type or name.endswith(".csv"):
            file_info["type"] = "text"
            decoded = self._decode_text(file_content)
            if decoded is None:
                decoded = file_content.decode("utf-8", errors="replace")
            file_info["text"] = decoded
        # --- Audio/video — can't process, note it ---
        elif any(x in content_type for x in ("audio", "video")) or name.endswith(MEDIA_EXTENSIONS):
            file_info["type"] = "media"
            label = content_type or "media"
            file_info["text"] = f"[{label} file, {len(file_content)} bytes — cannot process directly]"
        # --- Try text (covers .py, .txt, .json, .md, etc.) ---
        else:
            decoded = self._decode_text(file_content)
            if decoded is not None:
                file_info["type"] = "text"
                file_info["text"] = decoded
            else:
                # Binary fallback
                file_info["type"] = "binary"
                file_info["text"] = f"[Binary file, {len(file_content)} bytes]"

        return file_info

    @staticmethod
    def _decode_text(content: bytes) -> Optional[str]:
        """Decode *content* as text, or return ``None`` when it looks binary.

        UTF-8 is tried first. Content that is not valid UTF-8 and contains NUL
        bytes is treated as binary. Otherwise cp1252 is tried before latin-1,
        because latin-1 accepts every byte sequence and would make any later
        attempt unreachable.
        """
        try:
            return content.decode("utf-8")
        except UnicodeDecodeError:
            pass
        if b"\x00" in content:
            return None
        try:
            return content.decode("cp1252")
        except UnicodeDecodeError:
            return content.decode("latin-1")

    def _parse_excel(self, content: bytes, filename: str) -> str:
        """Convert Excel to readable text representation.

        Every sheet is rendered as tab-separated rows (empty rows skipped,
        at most ``MAX_EXCEL_ROWS`` rows per sheet). If openpyxl is missing or
        fails, pandas is used instead; if that fails too, a bracketed error
        note is returned rather than raising. *filename* is accepted for
        interface compatibility and is not used.
        """
        try:
            import openpyxl

            wb = openpyxl.load_workbook(io.BytesIO(content), data_only=True)
            result = []
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]
                result.append(f"=== Sheet: {sheet_name} ===")
                rows = [
                    "\t".join("" if v is None else str(v) for v in row)
                    for row in ws.iter_rows(values_only=True)
                    if any(cell is not None for cell in row)
                ]
                result.append("\n".join(rows[:MAX_EXCEL_ROWS]))  # limit rows
                # Count the rows actually collected: empty rows were skipped,
                # so the sheet's max_row would overstate what was left out.
                if len(rows) > MAX_EXCEL_ROWS:
                    result.append(f"... ({len(rows) - MAX_EXCEL_ROWS} more rows)")
            return "\n\n".join(result)
        except Exception as e:
            # Covers ImportError (openpyxl missing) as well as parse failures.
            try:
                df = pd.read_excel(io.BytesIO(content))
                return df.to_string(max_rows=MAX_EXCEL_ROWS)
            except Exception as e2:
                return f"[Could not parse Excel: {e}, {e2}]"

    def extract_answer(self, response_text: str) -> str:
        """Pull the final answer out of the model's full response.

        Tries, in order: the last ``<answer>...</answer>`` pair, a
        "final answer:" / "the answer is" phrase, then the last non-empty
        line. Returns the stripped response itself if nothing else applies.
        """
        # Primary: <answer> tags. The prompt asks the model to end with the
        # tagged answer, so the last occurrence is the one to trust.
        tagged = ANSWER_TAG_PATTERN.findall(response_text)
        if tagged:
            answer = tagged[-1].strip()
            print(f"Extracted from tags: {repr(answer)}")
            return answer

        # Fallback: "Final answer:" pattern
        match = FINAL_ANSWER_PATTERN.search(response_text)
        if match:
            return match.group(1).strip().strip("\"'")

        # Last resort: last non-empty line
        lines = [line.strip() for line in response_text.strip().split("\n") if line.strip()]
        if lines:
            return lines[-1].strip("\"'.,")
        return response_text.strip()

    @staticmethod
    def _image_media_type(content_type: str, filename: str) -> str:
        """Pick the media type for an image from its content type or extension.

        The extension is checked for every format so the declared media type
        agrees with how the file was classified as an image. Defaults to
        ``image/png`` when nothing matches.
        """
        name = filename.lower()
        if "jpeg" in content_type or "jpg" in content_type or name.endswith((".jpg", ".jpeg")):
            return "image/jpeg"
        if "png" in content_type or name.endswith(".png"):
            return "image/png"
        if "gif" in content_type or name.endswith(".gif"):
            return "image/gif"
        if "webp" in content_type or name.endswith(".webp"):
            return "image/webp"
        return "image/png"

    def _file_content_blocks(self, file_info: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Turn a ``fetch_file`` result into message content blocks for the model."""
        ftype = file_info.get("type", "unknown")
        ct = file_info.get("content_type", "")
        fname = file_info.get("filename", "")
        blocks: List[Dict[str, Any]] = []

        if ftype == "image":
            blocks.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": self._image_media_type(ct, fname),
                    "data": file_info["base64"],
                },
            })
            blocks.append({
                "type": "text",
                "text": "The image above is part of this question. Analyze it carefully.",
            })
            print("Attached image")
        elif ftype == "pdf":
            blocks.append({
                "type": "document",
                "source": {
                    "type": "base64",
                    "media_type": "application/pdf",
                    "data": file_info["base64"],
                },
            })
            blocks.append({
                "type": "text",
                "text": "The PDF above is part of this question. Read it carefully.",
            })
            print("Attached PDF")
        elif ftype in ("text", "excel") and "text" in file_info:
            file_text = file_info["text"]
            if len(file_text) > MAX_FILE_TEXT_CHARS:
                file_text = (
                    file_text[:MAX_FILE_TEXT_CHARS]
                    + f"\n...[truncated, total {len(file_info['text'])} chars]"
                )
            label = "Excel/spreadsheet" if ftype == "excel" else "file"
            blocks.append({
                "type": "text",
                "text": f"\nAttached {label} content:\n```\n{file_text}\n```",
            })
            print(f"Attached {ftype} ({len(file_info['text'])} chars)")
        elif ftype in ("media", "binary"):
            # The bytes cannot be sent; tell the model an attachment exists.
            blocks.append({
                "type": "text",
                "text": f"\nNote: {file_info.get('text', 'A media file is attached but cannot be processed directly.')}",
            })

        return blocks

    def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Answer *question*, attaching the file for *task_id* when available.

        Returns:
            The extracted final answer. On failure a string starting with
            ``ERROR:`` or ``API_ERROR:`` is returned instead of raising.
        """
        print(f"\n{'='*60}")
        print(f"Task: {task_id}")
        print(f"Q: {question[:200]}")

        try:
            user_content = []

            # Detect reversed text question and pre-reverse it
            reversed_hint = ""
            stripped = question.strip()
            # Heuristic: reversed sentences start with the final period or
            # contain recognisable reversed words.
            if stripped.endswith("fI") or ".rewsna" in question or stripped.startswith("."):
                reversed_q = question[::-1]
                reversed_hint = f"\n\nNOTE: This question appears to be written in reverse. Reversed, it reads:\n\"{reversed_q}\"\nPlease answer the reversed version."

            user_content.append({
                "type": "text",
                "text": f"Question: {question}{reversed_hint}",
            })

            # Fetch and attach file
            file_info = self.fetch_file(task_id) if task_id else None
            if file_info:
                user_content.extend(self._file_content_blocks(file_info))

            response = self.client.messages.create(
                model=MODEL_NAME,
                system=self.system_prompt,
                messages=[{"role": "user", "content": user_content}],
                temperature=0,
                max_tokens=4096,
            )

            if not response.content:
                print("ERROR: Empty response")
                return "ERROR: empty response"

            # Collect the text of every text block, not just the first one.
            texts = [getattr(block, "text", "") or "" for block in response.content]
            raw_answer = "\n".join(t for t in texts if t).strip()
            if not raw_answer:
                print("ERROR: Empty text in response")
                return "ERROR: empty text"

            print(f"Raw ({len(raw_answer)} chars): {raw_answer[:400]}")
            final = self.extract_answer(raw_answer)
            print(f"Final: {repr(final)}")
            return final

        except anthropic.APIError as e:
            print(f"API error: {e}")
            return f"API_ERROR: {str(e)[:100]}"
        except Exception as e:
            print(f"Error task {task_id}: {e}")
            traceback.print_exc()
            return f"ERROR: {str(e)[:100]}"


class BasicAgent(GAIAAgent):
    """Alias of ``GAIAAgent`` under the name the evaluation runner instantiates."""


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every question and submit the answers.

    Args:
        profile: the logged-in Hugging Face profile, or ``None`` when the
            user is not logged in.

    Returns:
        A ``(status_message, results)`` tuple, where ``results`` is a
        DataFrame of the per-question log or ``None`` when the run stopped
        before any question was processed.
    """
    # When SPACE_ID is unset the code link below contains the text "None".
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    results_log = []
    answers_payload = []

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue
        try:
            submitted_answer = agent(question_text, task_id)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100],
                "Submitted Answer": submitted_answer,
            })
        except Exception as e:
            # Logged for the results table only; nothing is submitted for this task.
            print(f"Error on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text[:100],
                "Submitted Answer": f"AGENT ERROR: {e}",
            })

    if not answers_payload:
        return "Agent did not produce any answers.", pd.DataFrame(results_log)

    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Status {e.response.status_code}."
        try:
            error_detail += f" {e.response.json().get('detail', '')}"
        except Exception:
            # Error body was not JSON (or not a dict); show its raw text instead.
            error_detail += f" {e.response.text[:200]}"
        return f"Submission Failed: {error_detail}", pd.DataFrame(results_log)
    except Exception as e:
        return f"Submission Failed: {e}", pd.DataFrame(results_log)


with gr.Blocks() as demo:
    gr.Markdown("# GAIA Benchmark Agent Evaluation")
    gr.Markdown("1. Log in to Hugging Face.\n2. Click **Run Evaluation & Submit All Answers**.")
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])


if __name__ == "__main__":
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)