| import os
|
| import argparse
|
| import requests
|
| import pandas as pd
|
| import json
|
| import time
|
| import warnings
|
| import logging
|
| from enum import Enum
|
| from colorama import init
|
|
|
|
|
# Set up colorama for terminal output. autoreset=True is passed so that
# styling is not expected to carry over between prints (see colorama docs).
init(autoreset=True)

# Hide warnings whose message matches "Invalid file descriptor".
warnings.filterwarnings('ignore', message='.*Invalid file descriptor.*')

# Only let ERROR-level (and above) records from the 'asyncio' logger through.
logging.getLogger('asyncio').setLevel(logging.ERROR)
|
|
|
|
|
| import config
|
|
|
|
|
|
|
| from gradioapp import create_ui
|
|
|
| from scorer import question_scorer
|
|
|
|
|
| from question_loader import QuestionLoader
|
| from result_formatter import ResultFormatter
|
| from agent_runner import AgentRunner
|
| from validators import InputValidator, ValidationError
|
| from utils import retry_with_backoff
|
| from langfuse_tracking import track_session
|
|
|
|
|
class RunMode(Enum):
    """Ways the application can be started.

    Members:
        UI: interactive interface mode (value ``"ui"``).
        CLI: command-line mode (value ``"cli"``).
    """

    UI = "ui"
    CLI = "cli"
|
|
|
|
|
@retry_with_backoff(max_retries=3, initial_delay=2.0)
def _submit_to_server(submit_url: str, submission_data: dict) -> dict:
    """POST the submission payload as JSON and return the decoded JSON reply.

    The call is wrapped by ``retry_with_backoff(max_retries=3, initial_delay=2.0)``;
    the exact retry policy lives in ``utils`` (presumably failed attempts are
    retried with growing delays - confirm there).

    Args:
        submit_url: Endpoint that receives the submission.
        submission_data: JSON-serialisable request body.

    Returns:
        dict: The server's response body decoded from JSON.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status (raised by ``raise_for_status``).
    """
    reply = requests.post(
        submit_url,
        json=submission_data,
        timeout=config.SUBMIT_TIMEOUT,
    )
    # Surface 4xx/5xx answers as exceptions before trying to decode the body.
    reply.raise_for_status()
    decoded = reply.json()
    return decoded
|
|
|
def _describe_http_error(response) -> str:
    """Summarise a failed HTTP response for the status message.

    Args:
        response: The ``requests`` response attached to the HTTPError.

    Returns:
        str: The status line, followed by the server's ``detail`` field when the
        body is a JSON object, otherwise by the first 500 characters of the body.
    """
    summary = f"Server responded with status {response.status_code}."
    try:
        payload = response.json()
    except ValueError:
        # requests' JSONDecodeError derives from ValueError, so this also covers
        # older requests releases that raise a plain ValueError here.
        return f"{summary} Response: {response.text[:500]}"

    if isinstance(payload, dict):
        return f"{summary} Detail: {payload.get('detail', response.text)}"

    # Valid JSON that is not an object (list, string, number) has no 'detail'
    # key; fall back to the raw body instead of failing on ``.get``.
    return f"{summary} Response: {response.text[:500]}"


def submit_and_score(username: str, results: list) -> str:
    """
    Submit answers to the GAIA scoring server and return status message.

    Every failure path (invalid username, empty payload, HTTP error, timeout,
    network error, unexpected exception) is reported through the returned
    message rather than by raising.

    Args:
        username: Hugging Face username for submission
        results: List of tuples (task_id, question_text, answer)

    Returns:
        str: Status message (success or error details)
    """
    try:
        username = InputValidator.validate_username(username)
    except ValidationError as e:
        error_msg = f"Invalid username: {e}"
        print(error_msg)
        return error_msg

    answers_payload = ResultFormatter.format_for_api(results)

    if not answers_payload:
        error_msg = "No answers to submit."
        print(error_msg)
        return error_msg

    space_id = config.SPACE_ID
    submit_url = f"{config.DEFAULT_API_URL}/submit"
    # Link to the Space's source tree, sent along as the agent's code reference.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    submission_data = {
        "username": username,
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    print(f"\n{'=' * config.SEPARATOR_WIDTH}")
    print(f"Submitting {len(answers_payload)} answers for user '{username}'...")
    print(f"{'=' * config.SEPARATOR_WIDTH}\n")

    print(f"Submitting to: {submit_url}")
    try:
        result_data = _submit_to_server(submit_url, submission_data)

        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status

    except requests.exceptions.HTTPError as e:
        status_message = f"Submission Failed: {_describe_http_error(e.response)}"
        print(status_message)
        return status_message

    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        return status_message

    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        return status_message

    except Exception as e:
        # Last-resort boundary: the caller displays the message instead of crashing.
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        return status_message
|
|
|
|
|
def run_and_submit_all(username: str, active_agent: str = None) -> tuple:
    """
    Load every question, run the agent on them, submit the answers and
    return the outcome together with a table of the results.

    Args:
        username: Hugging Face username for submission
        active_agent: The agent type to use; when falsy, the session metadata
            records config.ACTIVE_AGENT instead

    Returns:
        tuple: (status_message: str, results_df: pd.DataFrame). On a loading,
        validation or agent-initialisation failure the second item is None.
    """
    try:
        loaded = QuestionLoader().get_questions(test_mode=False)
    except Exception as load_error:
        return f"Error loading questions: {load_error}", None

    try:
        questions = InputValidator.validate_questions_data(loaded)
    except ValidationError as invalid:
        return f"Invalid questions data: {invalid}", None

    session_info = {
        "agent": active_agent or config.ACTIVE_AGENT,
        "username": username,
        "question_count": len(questions),
        "mode": "submission",
    }
    with track_session("Submit_All", session_info):
        runner = AgentRunner(active_agent=active_agent)
        results = runner.run_on_questions(questions)

    # A None result is treated as "the agent could not be initialised".
    if results is None:
        return "Error initializing agent.", None

    status_message = submit_and_score(username, results)

    display_rows = ResultFormatter.format_for_display(results)
    return status_message, pd.DataFrame(display_rows)
|
|
|
def _load_ground_truth(file_path: str = config.METADATA_FILE) -> dict:
    """Load ground truth data indexed by task_id.

    The file is read as JSON Lines (one JSON object per line). Blank lines,
    lines that are not valid JSON and lines that are not JSON objects are
    skipped individually, so one bad line no longer discards the records that
    follow it. Records lacking a task_id or a final answer are ignored.

    Args:
        file_path: Path to the metadata file

    Returns:
        dict: Mapping of task_id -> {"question": str, "answer": str}. Empty (or
        partial) when the file cannot be opened or read; the error is printed.
    """
    truth_mapping = {}
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_number, line in enumerate(f, start=1):
                if not line.strip():
                    # Blank separators / trailing newline carry no record.
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Skipping malformed ground truth line {line_number}: {e}")
                    continue
                if not isinstance(data, dict):
                    # Only JSON objects can hold the expected keys.
                    continue

                task_id = data.get("task_id")
                question = data.get("Question")
                answer = data.get("Final answer")
                if task_id and answer:
                    truth_mapping[task_id] = {
                        "question": question,
                        "answer": answer,
                    }
    except Exception as e:
        # Deliberately best-effort: verification proceeds with whatever loaded.
        print(f"Error loading ground truth: {e}")
    return truth_mapping
|
|
|
def _verify_answers(results: list, log_output: list, runtime: tuple = None) -> None:
    """Score agent answers against the ground truth and log the outcome.

    Each result is compared with its reference answer via ``question_scorer``;
    per-task lines and, when at least one task was scored, a summary block are
    appended to ``log_output``.

    Args:
        results: List of tuples (task_id, question_text, answer)
        log_output: List that receives the verification log lines (mutated in place)
        runtime: Optional tuple of (minutes, seconds) for total runtime; only
            reported inside the summary block
    """
    reference = _load_ground_truth()
    emit = log_output.append
    emit("\n=== Verification Results ===")

    scored = 0
    hits = 0

    for task_id, question_text, answer in results:
        if task_id not in reference:
            # Nothing to compare against: log the task and move on.
            emit(f"Task ID: {task_id}")
            emit(f"Question: {question_text[:config.ERROR_MESSAGE_LENGTH]}...")
            emit("No ground truth found.\n")
            continue

        expected = reference[task_id]["answer"]
        # Both sides are passed to the scorer as strings.
        matched = question_scorer(str(answer), str(expected))

        if matched:
            hits += 1
        scored += 1

        verdict = '✓ Correct' if matched else '✗ Incorrect'
        emit(f"Task ID: {task_id}")
        emit(f"Question: {question_text[:config.ERROR_MESSAGE_LENGTH]}...")
        emit(f"Expected: {expected}")
        emit(f"Got: {answer}")
        emit(f"Match: {verdict}\n")

    # No summary (and no runtime line) unless something was actually scored.
    if not scored > 0:
        return

    accuracy = (hits / scored) * 100
    emit("=" * config.SEPARATOR_WIDTH)
    emit(f"SUMMARY: {hits}/{scored} correct ({accuracy:.1f}%)")
    if runtime:
        minutes, seconds = runtime
        emit(f"Runtime: {minutes}m {seconds}s")
    emit("=" * config.SEPARATOR_WIDTH)
|
|
|
def run_test_code(filter=None, active_agent=None) -> pd.DataFrame:
    """Run the agent on test questions and verify the answers locally.

    Args:
        filter: Optional tuple/list of question indices to test (e.g., (4, 7, 15)).
            If None, processes all questions. The name shadows the builtin but
            is kept because callers pass it by keyword.
        active_agent: Optional agent type to use (e.g., "LangGraph", "ReActLangGraph", "LLamaIndex").
            If None, the session metadata records config.ACTIVE_AGENT.

    Returns:
        pd.DataFrame: Log lines (progress, per-question verification, summary),
        or a single-row frame holding an error message when loading,
        validation or agent initialisation fails.
    """
    start_time = time.time()
    logs_for_display = ["=== Processing Example Questions One by One ==="]

    try:
        questions_data = QuestionLoader().get_questions(test_mode=True)
    except Exception as e:
        return pd.DataFrame([f"Error loading questions: {e}"])

    try:
        questions_data = InputValidator.validate_questions_data(questions_data)
    except ValidationError as e:
        return pd.DataFrame([f"Invalid questions data: {e}"])

    try:
        filter = InputValidator.validate_filter_indices(filter, len(questions_data))
    except ValidationError as e:
        return pd.DataFrame([f"Invalid filter: {e}"])

    if filter is not None:
        questions_to_process = [questions_data[i] for i in filter]
        logs_for_display.append(f"Testing {len(questions_to_process)} selected questions (indices: {filter})")
    else:
        questions_to_process = questions_data
        logs_for_display.append(f"Testing all {len(questions_to_process)} questions")

    with track_session("Test_Run", {
        "agent": active_agent or config.ACTIVE_AGENT,
        "question_count": len(questions_to_process),
        # Same None test as the selection above, so an empty filter is not
        # recorded as "all" while zero questions are processed.
        "filter": str(filter) if filter is not None else "all",
        "mode": "test",
    }):
        results = AgentRunner(active_agent=active_agent).run_on_questions(questions_to_process)

    # A None result is treated as "the agent could not be initialised".
    if results is None:
        return pd.DataFrame(["Error initializing agent."])

    logs_for_display.append("\n=== Completed Example Questions ===")

    # Wall-clock time since the function started, split into whole minutes/seconds.
    elapsed_time = time.time() - start_time
    minutes = int(elapsed_time // 60)
    seconds = int(elapsed_time % 60)

    _verify_answers(results, logs_for_display, runtime=(minutes, seconds))
    return pd.DataFrame(logs_for_display)
|
|
|
|
|
def main() -> None:
    """Main entry point for the application.

    Parses the command line and then either launches the Gradio interface
    (no test flag given) or runs the local test code in CLI mode
    (``--test [indices]`` or ``--testall``). ``--agent`` selects the agent that
    is passed to the CLI test run.
    """
    agent_choices = ['langgraph', 'reactlangg', 'llamaindex']

    parser = argparse.ArgumentParser(description="Run the agent application.")
    parser.add_argument("--test", type=str, nargs='?', const='default', help="Run local tests on selected questions and exit. Optionally provide comma-separated question indices (e.g., --test 2,4,6). If no indices provided, uses default test questions.")
    parser.add_argument("--testall", action="store_true", help="Run local tests on all questions and exit.")
    parser.add_argument(
        "--agent",
        # The type conversion runs before the ``choices`` check, which is what
        # makes the option case-insensitive as the help text promises.
        type=str.lower,
        choices=agent_choices,
        help=f"Agent to use in CLI mode (case-insensitive). Options: {', '.join(agent_choices)}. Default: uses config.ACTIVE_AGENT",
    )
    args = parser.parse_args()

    agent_mapping = {
        'langgraph': config.AGENT_LANGGRAPH,
        'reactlangg': config.AGENT_REACT_LANGGRAPH,
        'llamaindex': config.AGENT_LLAMAINDEX,
    }

    active_agent = None
    if args.agent:
        # args.agent is already lower-cased by the parser.
        active_agent = agent_mapping.get(args.agent)
        if not active_agent:
            print(f"Error: Unknown agent '{args.agent}'. Valid options: {', '.join(agent_mapping)}")
            return
        print(f"[CLI] Using agent: {active_agent}")

    print(f"\n{'-' * 30} App Starting {'-' * 30}")

    # ``is not None`` rather than truthiness: an explicit empty ``--test ""``
    # must be handled as a (bad) CLI request, not silently start the UI.
    test_requested = args.test is not None
    run_mode = RunMode.CLI if (test_requested or args.testall) else RunMode.UI

    if run_mode == RunMode.UI:
        space_host = config.SPACE_HOST
        space_id = config.SPACE_ID

        if space_host:
            print(f"[OK] SPACE_HOST found: {space_host}")
            print(f" Runtime URL should be: https://{space_host}.hf.space")
        else:
            print("[INFO] SPACE_HOST environment variable not found (running locally?).")

        if space_id:
            print(f"[OK] SPACE_ID found: {space_id}")
            print(f" Repo URL: https://huggingface.co/spaces/{space_id}")
            print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id}/tree/main")
        else:
            print("[INFO] SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule, as wide as the "App Starting" banner printed above.
    print(f"{'-' * (60 + len(' App Starting '))}\n")

    if run_mode == RunMode.UI:
        print("Launching Gradio Interface for Basic Agent Evaluation...")
        grTestApp = create_ui(run_and_submit_all, run_test_code)
        grTestApp.launch()

    else:
        if test_requested:
            if args.test == 'default':
                # Bare ``--test``: fall back to the configured default selection.
                test_filter = config.DEFAULT_TEST_FILTER
            else:
                try:
                    test_filter = tuple(int(idx.strip()) for idx in args.test.split(','))
                except ValueError:
                    print(f"Error: Invalid test indices '{args.test}'. Must be comma-separated integers (e.g., 2,4,6)")
                    return
        else:
            # ``--testall``: no filter means every question.
            test_filter = None

        print(f"Running test code on {len(test_filter) if test_filter else 'ALL'} questions (CLI mode)...")
        result = run_test_code(filter=test_filter, active_agent=active_agent)

        if isinstance(result, pd.DataFrame):
            ResultFormatter.print_dataframe(result)
        else:
            print(result)
|
|
|
|
|
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|