import os
import re
import json
import tempfile
from pathlib import Path

import gradio as gr
import requests
import pandas as pd
from smolagents import CodeAgent, DuckDuckGoSearchTool, VisitWebpageTool, tool
from smolagents.models import InferenceClientModel

# ============================================================
# Constants
# ============================================================

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Ordered (marker, extension) pairs used to infer a file extension from a
# Content-Type header. Order matters: specific markers must come before the
# generic "text" marker, otherwise "text/html" would be saved as ".txt".
_CONTENT_TYPE_EXTENSIONS = (
    ("pdf", ".pdf"),
    ("json", ".json"),
    ("csv", ".csv"),
    ("html", ".html"),
    ("png", ".png"),
    ("jpeg", ".jpg"),
    ("jpg", ".jpg"),
    ("excel", ".xlsx"),
    ("spreadsheet", ".xlsx"),
    ("text", ".txt"),
)

# Extracts the file name from a Content-Disposition header, accepting both the
# plain `filename="x"` form and the RFC 5987 `filename*=UTF-8''x` form.
_DISPOSITION_FILENAME_RE = re.compile(
    r'''filename\*?=(?:[\w-]+'[^']*')?"?([^";]+)''',
    re.IGNORECASE,
)

# Prefixes a model tends to put in front of its answer. The `\b` keeps real
# answers that merely start with the same letters ("Answering machine") intact.
_ANSWER_PREFIX_PATTERNS = (
    re.compile(r"^\s*FINAL ANSWER\b\s*[:\-]?\s*", re.IGNORECASE),
    re.compile(r"^\s*Answer\b\s*[:\-]?\s*", re.IGNORECASE),
    re.compile(r"^\s*The answer is\b\s*[:\-]?\s*", re.IGNORECASE),
)


def _get_api_url() -> str:
    """Return the scoring API base URL without a trailing slash."""
    return os.getenv("SCORING_API_URL", DEFAULT_API_URL).rstrip("/")


def _guess_extension(headers) -> str:
    """Infer a file extension from HTTP response headers.

    The file name advertised in Content-Disposition wins when it carries a
    plain alphanumeric suffix; otherwise the Content-Type is matched against
    `_CONTENT_TYPE_EXTENSIONS`. Returns "" when nothing matches.
    """
    disposition = headers.get("content-disposition", "") or ""
    found = _DISPOSITION_FILENAME_RE.search(disposition)
    if found:
        suffix = Path(found.group(1).strip()).suffix
        # Only accept a short, plain suffix so header content cannot inject
        # odd characters into the local file name.
        if re.fullmatch(r"\.[A-Za-z0-9]{1,10}", suffix):
            return suffix.lower()

    content_type = (headers.get("content-type", "") or "").lower()
    for marker, extension in _CONTENT_TYPE_EXTENSIONS:
        if marker in content_type:
            return extension
    return ""


# ============================================================
# Helper tools
# ============================================================

@tool
def download_task_file(task_id: str) -> str:
    """
    Download the file attached to a GAIA task and return the local file path.
    Use this when the question references an attached file/document/image/data file.

    Args:
        task_id: The task id of the GAIA question.

    Returns:
        Local file path of the downloaded file, or a message if no file is available.
    """
    file_url = f"{_get_api_url()}/files/{task_id}"

    try:
        response = requests.get(file_url, timeout=60)
        if response.status_code != 200:
            return (
                f"No downloadable file found for task {task_id}. "
                f"HTTP {response.status_code}"
            )

        ext = _guess_extension(response.headers)

        # task_id is supplied by the model; keep only characters that are safe
        # in a file name so it cannot escape the temporary directory.
        safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", str(task_id)) or "task"

        # The directory is intentionally left in place: the agent reads the
        # file in later steps, after this tool has returned.
        tmp_dir = tempfile.mkdtemp(prefix="gaia_task_")
        file_path = os.path.join(tmp_dir, f"{safe_name}{ext}")

        with open(file_path, "wb") as f:
            f.write(response.content)

        return file_path

    except Exception as e:
        # Tools report failures as text so the agent can react instead of crashing.
        return f"Error downloading file for task {task_id}: {e}"


@tool
def read_local_text_file(file_path: str) -> str:
    """
    Read a local text-like file and return its contents.
    Use this only for local TXT/JSON/CSV/HTML-like files after downloading them.

    Args:
        file_path: Path to a local file.

    Returns:
        File contents as text.
    """
    try:
        path = Path(file_path)
        if not path.exists():
            return f"File not found: {file_path}"

        # Strict UTF-8 first; on undecodable bytes, retry as UTF-8 while
        # dropping the bad bytes (explicit encoding, not the platform locale).
        try:
            return path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            return path.read_text(encoding="utf-8", errors="ignore")

    except Exception as e:
        return f"Error reading file {file_path}: {e}"


# ============================================================
# Agent
# ============================================================

SYSTEM_PROMPT = """
You are solving a GAIA benchmark question.

Rules:
1. Think carefully and use tools when needed.
2. If the question mentions an attached file, download it using the download_task_file tool.
3. If a downloaded file is text/csv/json/html-like, inspect it with read_local_text_file.
4. If web information is needed, use the search/browser tools.
5. Return ONLY the final answer.
6. Do NOT return explanations.
7. Do NOT return the words "FINAL ANSWER".
8. Do NOT add markdown, bullet points, or surrounding quotes unless the answer itself requires quotes.
9. Keep the answer as short and exact as possible.
"""


class BasicAgent:
    """Wrap a smolagents CodeAgent and normalise its output for exact-match scoring."""

    def __init__(self):
        # You can change the model if needed, but this works well on HF Spaces
        # and avoids the old HfApiModel import issue.
        model_id = os.getenv("MODEL_ID", "Qwen/Qwen2.5-72B-Instruct")

        self.model = InferenceClientModel(
            model_id=model_id,
            token=os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN"),
        )

        self.agent = CodeAgent(
            tools=[
                DuckDuckGoSearchTool(),
                VisitWebpageTool(),
                download_task_file,
                read_local_text_file,
            ],
            model=self.model,
            additional_authorized_imports=[
                "json",
                "re",
                "math",
                "statistics",
                "csv",
                "pandas",
                "pathlib",
            ],
            max_steps=12,
            verbosity_level=1,
        )

        print(f"BasicAgent initialized with model: {model_id}")

    def clean_final_answer(self, answer: str) -> str:
        """
        Clean the model output for exact-match scoring.

        Strips the boilerplate prefixes "FINAL ANSWER", "Answer" and
        "The answer is" (with an optional ':' or '-'), surrounding backticks,
        and one pair of matching outer quotes. Returns "" for None.
        """
        if answer is None:
            return ""

        answer = str(answer).strip()

        # Remove common prefixes the model may add. If removing a prefix would
        # leave nothing, the text *is* the answer, so keep it.
        for pattern in _ANSWER_PREFIX_PATTERNS:
            stripped = pattern.sub("", answer)
            if stripped.strip():
                answer = stripped

        # Remove enclosing markdown/code fences if any
        answer = answer.strip().strip("`").strip()

        # If it returns quoted answer like "Paris", remove only outer quotes
        if len(answer) >= 2 and answer[0] == answer[-1] and answer[0] in "\"'":
            answer = answer[1:-1].strip()

        return answer.strip()

    def __call__(self, question: str, task_id: str | None = None) -> str:
        """
        Run the agent on a question and return a clean final answer.

        Any exception raised by the agent is caught and returned as an
        "ERROR: ..." string so one failing task does not abort the whole run.
        """
        prompt = f"{SYSTEM_PROMPT}\n\nTask ID: {task_id}\nQuestion:\n{question}\n"
        print(f"Running agent for task_id={task_id}")

        try:
            result = self.agent.run(prompt)
            cleaned = self.clean_final_answer(result)
            print(f"Agent raw result: {result}")
            print(f"Agent cleaned result: {cleaned}")
            return cleaned
        except Exception as e:
            print(f"Agent failed on task {task_id}: {e}")
            return f"ERROR: {e}"


# ============================================================
# Main runner
# ============================================================

def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetch all questions, run the agent, submit answers, and display results.

    Returns a `(status_message, results)` pair where `results` is a DataFrame
    of per-question answers, or None when the run stopped before any question
    was attempted (not logged in, agent init failure, fetch failure).
    """
    if not profile:
        return "Please login to Hugging Face first.", None

    username = profile.username.strip()
    print(f"User logged in: {username}")

    space_id = os.getenv("SPACE_ID")
    api_url = _get_api_url()

    if not space_id:
        # Fallback so submission still works locally if needed
        agent_code = "LOCAL_RUN_NO_SPACE_ID"
        print("SPACE_ID not found. Using LOCAL_RUN_NO_SPACE_ID")
    else:
        agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1) Build agent
    try:
        agent = BasicAgent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    # 2) Fetch questions
    print(f"Fetching questions from {questions_url}")
    try:
        response = requests.get(questions_url, timeout=60)
        response.raise_for_status()
        questions_data = response.json()

        if not isinstance(questions_data, list) or not questions_data:
            return "Questions endpoint returned empty/invalid data.", None

        print(f"Fetched {len(questions_data)} questions.")
    except Exception as e:
        return f"Error fetching questions: {e}", None

    # 3) Solve questions
    answers_payload = []
    results_log = []

    for item in questions_data:
        # A malformed (non-object) entry must not abort the whole run.
        if not isinstance(item, dict):
            results_log.append({
                "Task ID": "UNKNOWN",
                "Question": "",
                "Submitted Answer": "SKIPPED: Missing task_id or question",
            })
            continue

        task_id = item.get("task_id")
        question_text = item.get("question", "")

        if not task_id or not question_text:
            results_log.append({
                "Task ID": item.get("task_id", "UNKNOWN"),
                "Question": item.get("question", ""),
                "Submitted Answer": "SKIPPED: Missing task_id or question",
            })
            continue

        try:
            submitted_answer = agent(question_text, task_id=task_id)
        except Exception as e:
            submitted_answer = f"ERROR: {e}"

        # Show in the table exactly what is submitted for scoring.
        submitted_answer = str(submitted_answer).strip()

        answers_payload.append({
            "task_id": task_id,
            "submitted_answer": submitted_answer,
        })
        results_log.append({
            "Task ID": task_id,
            "Question": question_text,
            "Submitted Answer": submitted_answer,
        })

    if not answers_payload:
        return "No answers were generated.", pd.DataFrame(results_log)

    # 4) Submit
    submission_data = {
        "username": username,
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    print("Submitting payload...")
    print(json.dumps({
        "username": username,
        "agent_code": agent_code,
        "answers_count": len(answers_payload),
    }, indent=2))

    try:
        response = requests.post(submit_url, json=submission_data, timeout=180)
        response.raise_for_status()
        result_data = response.json()

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username', username)}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        return final_status, pd.DataFrame(results_log)

    except requests.exceptions.HTTPError as e:
        detail = f"HTTP {e.response.status_code}"
        # Prefer the structured error body; fall back to truncated raw text.
        try:
            detail_json = e.response.json()
            detail += f" | {detail_json}"
        except Exception:
            detail += f" | {e.response.text[:1000]}"
        return f"Submission failed: {detail}", pd.DataFrame(results_log)

    except Exception as e:
        return f"Submission failed: {e}", pd.DataFrame(results_log)


# ============================================================
# Gradio UI
# ============================================================

with gr.Blocks() as demo:
    gr.Markdown("# GAIA Unit 4 Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions**

        1. Login with your Hugging Face account.
        2. Click **Run Evaluation & Submit All Answers**.
        3. The app will fetch questions, run the agent, and submit the answers.
        """
    )

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=6, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: run_and_submit_all's only parameter is the
    # gr.OAuthProfile-annotated `profile`.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )


if __name__ == "__main__":
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)

    space_host = os.getenv("SPACE_HOST")
    space_id = os.getenv("SPACE_ID")

    if space_host:
        print(f"SPACE_HOST: {space_host}")
    else:
        print("SPACE_HOST not found.")

    if space_id:
        print(f"SPACE_ID: {space_id}")
        print(f"Repo Tree URL: https://huggingface.co/spaces/{space_id}/tree/main")
    else:
        print("SPACE_ID not found.")

    print("Launching app...")
    demo.launch(debug=True)