| import os |
| import re |
| import json |
| import tempfile |
| from pathlib import Path |
|
|
| import gradio as gr |
| import requests |
| import pandas as pd |
|
|
| from smolagents import CodeAgent, DuckDuckGoSearchTool, VisitWebpageTool, tool |
| from smolagents.models import InferenceClientModel |
|
|
|
|
| |
| |
| |
# Default base URL of the scoring service; the SCORING_API_URL environment variable, when set, is used instead.
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
|
| |
| |
| |
|
|
@tool
def download_task_file(task_id: str) -> str:
    """
    Download the file attached to a GAIA task and return the local file path.
    Use this when the question references an attached file/document/image/data file.

    Args:
        task_id: The task id of the GAIA question.

    Returns:
        Local file path of the downloaded file, or a message if no file is available.
    """
    api_url = os.getenv("SCORING_API_URL", DEFAULT_API_URL)
    file_url = f"{api_url}/files/{task_id}"

    # Ordered (marker, extension) pairs: the first marker found in the
    # Content-Type decides the extension. "html" has to come before "text",
    # otherwise a "text/html" response would be saved as .txt.
    extension_by_marker = (
        ("pdf", ".pdf"),
        ("json", ".json"),
        ("csv", ".csv"),
        ("html", ".html"),
        ("text", ".txt"),
        ("png", ".png"),
        ("jpeg", ".jpg"),
        ("jpg", ".jpg"),
        ("excel", ".xlsx"),
        ("spreadsheet", ".xlsx"),
    )

    # Broad catch on purpose: this is a tool boundary, and the agent is meant
    # to receive a readable message instead of an exception.
    try:
        response = requests.get(file_url, timeout=60)
        if response.status_code != 200:
            return f"No downloadable file found for task {task_id}. HTTP {response.status_code}"

        content_type = response.headers.get("content-type", "").lower()
        ext = next(
            (suffix for marker, suffix in extension_by_marker if marker in content_type),
            "",
        )

        if not ext:
            # Unrecognised content type (e.g. audio or a generic octet-stream):
            # fall back to the extension of the filename the server advertised,
            # if it sent a Content-Disposition header with one.
            disposition = response.headers.get("content-disposition", "")
            advertised = re.search(r'filename="?([^";]+)"?', disposition, flags=re.IGNORECASE)
            if advertised:
                ext = Path(advertised.group(1).strip()).suffix

        # task_id comes from the model; keep only its last path component so
        # the file is always created inside the fresh temporary directory.
        safe_name = Path(str(task_id)).name or "task_file"
        tmp_dir = tempfile.mkdtemp(prefix="gaia_task_")
        file_path = os.path.join(tmp_dir, f"{safe_name}{ext}")

        with open(file_path, "wb") as f:
            f.write(response.content)

        return file_path
    except Exception as e:
        return f"Error downloading file for task {task_id}: {e}"
|
|
|
|
@tool
def read_local_text_file(file_path: str) -> str:
    """
    Read a local text-like file and return its contents.
    Use this only for local TXT/JSON/CSV/HTML-like files after downloading them.

    Args:
        file_path: Path to a local file.

    Returns:
        File contents as text, or a message when the file is missing or unreadable.
    """
    try:
        target = Path(file_path)
        if target.exists():
            # First attempt: strict UTF-8. If that fails for any reason, retry
            # with the default encoding while dropping undecodable bytes.
            try:
                contents = target.read_text(encoding="utf-8")
            except Exception:
                contents = target.read_text(errors="ignore")
            return contents
        return f"File not found: {file_path}"
    except Exception as exc:
        return f"Error reading file {file_path}: {exc}"
|
|
|
|
| |
| |
| |
|
|
# Instruction preamble placed at the start of every prompt that BasicAgent.__call__ sends to the agent.
| SYSTEM_PROMPT = """ |
| You are solving a GAIA benchmark question. |
| |
| Rules: |
| 1. Think carefully and use tools when needed. |
| 2. If the question mentions an attached file, download it using the download_task_file tool. |
| 3. If a downloaded file is text/csv/json/html-like, inspect it with read_local_text_file. |
| 4. If web information is needed, use the search/browser tools. |
| 5. Return ONLY the final answer. |
| 6. Do NOT return explanations. |
| 7. Do NOT return the words "FINAL ANSWER". |
| 8. Do NOT add markdown, bullet points, or surrounding quotes unless the answer itself requires quotes. |
| 9. Keep the answer as short and exact as possible. |
| """ |
|
|
| class BasicAgent: |
| def __init__(self): |
| |
| |
| model_id = os.getenv("MODEL_ID", "Qwen/Qwen2.5-72B-Instruct") |
|
|
| self.model = InferenceClientModel( |
| model_id=model_id, |
| token=os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN"), |
| ) |
|
|
| self.agent = CodeAgent( |
| tools=[ |
| DuckDuckGoSearchTool(), |
| VisitWebpageTool(), |
| download_task_file, |
| read_local_text_file, |
| ], |
| model=self.model, |
| additional_authorized_imports=[ |
| "json", |
| "re", |
| "math", |
| "statistics", |
| "csv", |
| "pandas", |
| "pathlib", |
| ], |
| max_steps=12, |
| verbosity_level=1, |
| ) |
|
|
| print(f"BasicAgent initialized with model: {model_id}") |
|
|
| def clean_final_answer(self, answer: str) -> str: |
| """ |
| Clean the model output for exact-match scoring. |
| """ |
| if answer is None: |
| return "" |
|
|
| answer = str(answer).strip() |
|
|
| |
| answer = re.sub(r"^\s*FINAL ANSWER\s*[:\-]?\s*", "", answer, flags=re.IGNORECASE) |
| answer = re.sub(r"^\s*Answer\s*[:\-]?\s*", "", answer, flags=re.IGNORECASE) |
| answer = re.sub(r"^\s*The answer is\s*", "", answer, flags=re.IGNORECASE) |
|
|
| |
| answer = answer.strip().strip("`").strip() |
|
|
| |
| if len(answer) >= 2 and ( |
| (answer.startswith('"') and answer.endswith('"')) or |
| (answer.startswith("'") and answer.endswith("'")) |
| ): |
| answer = answer[1:-1].strip() |
|
|
| return answer.strip() |
|
|
| def __call__(self, question: str, task_id: str | None = None) -> str: |
| """ |
| Run the agent on a question and return a clean final answer. |
| """ |
| prompt = f"{SYSTEM_PROMPT}\n\nTask ID: {task_id}\nQuestion:\n{question}\n" |
| print(f"Running agent for task_id={task_id}") |
|
|
| try: |
| result = self.agent.run(prompt) |
| cleaned = self.clean_final_answer(result) |
| print(f"Agent raw result: {result}") |
| print(f"Agent cleaned result: {cleaned}") |
| return cleaned |
| except Exception as e: |
| print(f"Agent failed on task {task_id}: {e}") |
| return f"ERROR: {e}" |
|
|
|
|
| |
| |
| |
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the agent, submit answers, and display results.

    Args:
        profile: OAuth profile of the logged-in Hugging Face user, or None when
            nobody is logged in. NOTE(review): the click handler declares no
            inputs, so Gradio presumably injects this from the type annotation;
            keep the annotation unchanged.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas DataFrame
        with one row per question, or None when the run stopped before any
        question was processed.
    """
    # Nothing can be submitted without a username, so check the login first.
    if not profile:
        return "Please login to Hugging Face first.", None
    username = profile.username.strip()
    print(f"User logged in: {username}")

    api_url = os.getenv("SCORING_API_URL", DEFAULT_API_URL)
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    space_id = os.getenv("SPACE_ID")
    if space_id:
        agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    else:
        agent_code = "LOCAL_RUN_NO_SPACE_ID"
        print("SPACE_ID not found. Using LOCAL_RUN_NO_SPACE_ID")

    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    print(f"Fetching questions from {questions_url}")
    try:
        response = requests.get(questions_url, timeout=60)
        response.raise_for_status()
        questions_data = response.json()

        if not isinstance(questions_data, list) or not questions_data:
            return "Questions endpoint returned empty/invalid data.", None

        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    answers_payload = []
    results_log = []

    for item in questions_data:
        # A malformed (non-dict) entry is logged and skipped instead of
        # aborting the whole run with an AttributeError.
        if isinstance(item, dict):
            task_id = item.get("task_id")
            question_text = item.get("question", "")
        else:
            task_id = None
            question_text = ""

        if not task_id or not question_text:
            results_log.append({
                "Task ID": task_id or "UNKNOWN",
                "Question": question_text or "",
                "Submitted Answer": "SKIPPED: Missing task_id or question",
            })
            continue

        try:
            submitted_answer = agent(question_text, task_id=task_id)
        except Exception as e:
            submitted_answer = f"ERROR: {e}"

        # Log exactly the string that is submitted.
        answer_text = str(submitted_answer).strip()
        answers_payload.append({
            "task_id": task_id,
            "submitted_answer": answer_text,
        })
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "Submitted Answer": answer_text,
        })

    if not answers_payload:
        return "No answers were generated.", pd.DataFrame(results_log)

    submission_data = {
        "username": username,
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    # Print a summary only; the full answers list would flood the log.
    print("Submitting payload...")
    print(json.dumps({
        "username": username,
        "agent_code": agent_code,
        "answers_count": len(answers_payload)
    }, indent=2))

    try:
        response = requests.post(submit_url, json=submission_data, timeout=180)
        response.raise_for_status()
        result_data = response.json()

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username', username)}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )

        return final_status, pd.DataFrame(results_log)

    except requests.exceptions.HTTPError as e:
        # Prefer the server's JSON error body; fall back to truncated raw text.
        detail = f"HTTP {e.response.status_code}"
        try:
            detail_json = e.response.json()
            detail += f" | {detail_json}"
        except Exception:
            detail += f" | {e.response.text[:1000]}"
        return f"Submission failed: {detail}", pd.DataFrame(results_log)

    except Exception as e:
        return f"Submission failed: {e}", pd.DataFrame(results_log)
|
|
|
|
| |
| |
| |
|
|
# Build the Gradio UI: a title, usage instructions, a login button, a run button and two output widgets.
| with gr.Blocks() as demo: |
| gr.Markdown("# GAIA Unit 4 Agent Evaluation Runner") |
| gr.Markdown( |
| """ |
| **Instructions** |
| 1. Login with your Hugging Face account. |
| 2. Click **Run Evaluation & Submit All Answers**. |
| 3. The app will fetch questions, run the agent, and submit the answers. |
| """ |
| ) |
|
|
# Login control plus the button that starts the evaluation run.
| gr.LoginButton() |
| run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
|
# Read-only status text and a table for the per-question results.
| status_output = gr.Textbox(label="Run Status / Submission Result", lines=6, interactive=False) |
| results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
# Clicking the button calls run_and_submit_all; its two return values fill the status box and the table.
# NOTE(review): no inputs are declared, so the handler's `profile` argument is presumably injected by Gradio from the login - confirm.
| run_button.click( |
| fn=run_and_submit_all, |
| outputs=[status_output, results_table] |
| ) |
|
|
if __name__ == "__main__":
    # Startup banner, then a short report of the Space-related environment
    # variables before the Gradio app is launched.
    rule = "-" * 30
    print("\n" + rule + " App Starting " + rule)

    space_host = os.getenv("SPACE_HOST")
    space_id = os.getenv("SPACE_ID")

    print(f"SPACE_HOST: {space_host}" if space_host else "SPACE_HOST not found.")

    if not space_id:
        print("SPACE_ID not found.")
    else:
        print(f"SPACE_ID: {space_id}")
        print(f"Repo Tree URL: https://huggingface.co/spaces/{space_id}/tree/main")

    print("Launching app...")
    demo.launch(debug=True)