""" GAIA Agent - HuggingFace Spaces Evaluation Runner 基于 LangGraph 的 GAIA benchmark 评估智能体 """ import os import time import gradio as gr import requests import pandas as pd from config import ( SCORING_API_URL, DEBUG, BATCH_QUESTION_DELAY, ) from agent import GaiaAgent # --- Constants --- DEFAULT_API_URL = SCORING_API_URL # --- Agent Wrapper --- class GAIAAgentWrapper: """ 包装 GaiaAgent,适配 HuggingFace Spaces 评估接口 """ def __init__(self): print("Initializing GAIA Agent...") self._agent = None @property def agent(self) -> GaiaAgent: """延迟初始化 Agent""" if self._agent is None: self._agent = GaiaAgent() print("GAIA Agent initialized.") return self._agent def __call__(self, question: str, task_id: str = "") -> str: """ 处理问题并返回答案 Args: question: 问题文本 task_id: 任务 ID(用于下载附件) Returns: 答案字符串 """ if DEBUG: print(f"Agent received question (first 100 chars): {question[:100]}...") try: if task_id: answer = self.agent(question, task_id=task_id) else: answer = self.agent(question) if DEBUG: print(f"Agent returning answer: {answer[:100] if len(answer) > 100 else answer}") return answer except Exception as e: error_msg = f"Agent error: {type(e).__name__}: {str(e)}" print(error_msg) return error_msg def run_and_submit_all(profile: gr.OAuthProfile | None): """ Fetches all questions, runs the GAIA Agent on them, submits all answers, and displays the results. """ # --- Determine HF Space Runtime URL and Repo URL --- space_id = os.getenv("SPACE_ID") if profile: username = f"{profile.username}" print(f"User logged in: {username}") else: print("User not logged in.") return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" # 1. Instantiate Agent try: agent = GAIAAgentWrapper() except Exception as e: print(f"Error instantiating agent: {e}") return f"Error initializing agent: {e}", None # Agent code link for HuggingFace Spaces agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "local" print(f"Agent code: {agent_code}") # 2. Fetch Questions print(f"Fetching questions from: {questions_url}") try: response = requests.get(questions_url, timeout=30) response.raise_for_status() questions_data = response.json() if not questions_data: print("Fetched questions list is empty.") return "Fetched questions list is empty or invalid format.", None print(f"Fetched {len(questions_data)} questions.") except requests.exceptions.RequestException as e: print(f"Error fetching questions: {e}") return f"Error fetching questions: {e}", None except requests.exceptions.JSONDecodeError as e: print(f"Error decoding JSON response from questions endpoint: {e}") return f"Error decoding server response for questions: {e}", None except Exception as e: print(f"An unexpected error occurred fetching questions: {e}") return f"An unexpected error occurred fetching questions: {e}", None # 3. Run Agent on all questions results_log = [] answers_payload = [] total_questions = len(questions_data) print(f"Running agent on {total_questions} questions...") for idx, item in enumerate(questions_data): task_id = item.get("task_id") question_text = item.get("question") if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue # Rate limit delay (skip first question) if idx > 0 and BATCH_QUESTION_DELAY > 0: print(f"Waiting {BATCH_QUESTION_DELAY}s before next question (rate limit)...") time.sleep(BATCH_QUESTION_DELAY) print(f"\n[{idx + 1}/{total_questions}] Processing task: {task_id}") try: submitted_answer = agent(question_text, task_id=task_id) answers_payload.append({ "task_id": task_id, "submitted_answer": submitted_answer }) results_log.append({ "Task ID": task_id, "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, "Submitted Answer": submitted_answer }) except Exception as e: error_msg = f"AGENT ERROR: {type(e).__name__}: {e}" print(f"Error running agent on task {task_id}: {e}") results_log.append({ "Task ID": task_id, "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, "Submitted Answer": error_msg }) if not answers_payload: print("Agent did not produce any answers to submit.") return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) # 4. Prepare Submission submission_data = { "username": username.strip(), "agent_code": agent_code, "answers": answers_payload } status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." print(status_update) # 5. Submit print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") results_df = pd.DataFrame(results_log) return final_status, results_df except requests.exceptions.HTTPError as e: error_detail = f"Server responded with status {e.response.status_code}." try: error_json = e.response.json() error_detail += f" Detail: {error_json.get('detail', e.response.text)}" except requests.exceptions.JSONDecodeError: error_detail += f" Response: {e.response.text[:500]}" status_message = f"Submission Failed: {error_detail}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.Timeout: status_message = "Submission Failed: The request timed out." print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except requests.exceptions.RequestException as e: status_message = f"Submission Failed: Network error - {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) results_df = pd.DataFrame(results_log) return status_message, results_df # --- Build Gradio Interface using Blocks --- with gr.Blocks(title="GAIA Agent Evaluation") as demo: gr.Markdown("# GAIA Agent Evaluation Runner") gr.Markdown( """ **GAIA Agent** - 基于 LangGraph 的智能体,支持: - RAG 知识库检索(高相似度直接返回答案) - 网络搜索(DuckDuckGo) - 文件处理(文本、ZIP、PDF、Excel) - 代码执行(沙箱环境) --- **Instructions:** 1. Log in to your Hugging Face account using the button below. 2. Click 'Run Evaluation & Submit All Answers' to start evaluation. 3. Wait for the agent to process all questions (this may take a while). """ ) gr.LoginButton() run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary") status_output = gr.Textbox( label="Run Status / Submission Result", lines=5, interactive=False ) results_table = gr.DataFrame( label="Questions and Agent Answers", wrap=True ) run_button.click( fn=run_and_submit_all, outputs=[status_output, results_table] ) if __name__ == "__main__": print("\n" + "-" * 30 + " GAIA Agent Starting " + "-" * 30) # Clear proxy settings for localhost os.environ['NO_PROXY'] = 'localhost,127.0.0.1' os.environ.pop('HTTP_PROXY', None) os.environ.pop('HTTPS_PROXY', None) os.environ.pop('http_proxy', None) os.environ.pop('https_proxy', None) # Check for SPACE_HOST and SPACE_ID at startup space_host_startup = os.getenv("SPACE_HOST") space_id_startup = os.getenv("SPACE_ID") if space_host_startup: print(f"SPACE_HOST found: {space_host_startup}") print(f"Runtime URL: https://{space_host_startup}.hf.space") else: print("SPACE_HOST not found (running locally)") if space_id_startup: print(f"SPACE_ID found: {space_id_startup}") print(f"Repo URL: https://huggingface.co/spaces/{space_id_startup}") else: print("SPACE_ID not found (running locally)") print("-" * (60 + len(" GAIA Agent Starting ")) + "\n") print("Launching GAIA Agent Evaluation Interface...") demo.launch(debug=True, share=False)