import os
import gradio as gr
import requests
import pandas as pd
import logging
import json
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# Stage 1: Import GAIAAgent (LangGraph-based agent)
from src.agent import GAIAAgent
# Import ground truth comparison
from src.utils.ground_truth import get_ground_truth
# Configure logging: timestamped INFO-level records for the application itself.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Third-party libraries are chatty at INFO; only surface WARNING and above.
for _noisy_logger_name in ("httpx", "urllib3", "huggingface_hub", "gradio"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)

# --- Constants ---
# Base URL of the scoring service; the /questions and /submit endpoints are
# derived from it in run_and_submit_all().
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Helper Functions ---
def check_api_keys():
    """Report which API keys are present in the environment.

    Returns:
        One "<label>: ✓ SET" or "<label>: ✗ MISSING" line per tracked key,
        joined with newlines. A variable that is set to an empty string is
        reported as missing.
    """
    # (environment variable to probe, label shown to the user)
    tracked_keys = (
        ("GOOGLE_API_KEY", "GOOGLE_API_KEY (Gemini)"),
        ("HF_TOKEN", "HF_TOKEN (HuggingFace)"),
        ("ANTHROPIC_API_KEY", "ANTHROPIC_API_KEY (Claude)"),
        ("TAVILY_API_KEY", "TAVILY_API_KEY (Search)"),
        ("EXA_API_KEY", "EXA_API_KEY (Search)"),
    )

    report_lines = []
    for env_name, label in tracked_keys:
        if os.getenv(env_name):
            state = "✓ SET"
        else:
            state = "✗ MISSING"
        report_lines.append(f"{label}: {state}")
    return "\n".join(report_lines)
def _build_export_data(
results_log: list,
submission_status: str,
execution_time: float = None,
submission_response: dict = None,
) -> dict:
"""Build canonical export data structure.
Single source of truth for both JSON and HTML exports.
Returns dict with metadata and results arrays.
Args:
results_log: List of question results (source of truth)
submission_status: Status message from submission
execution_time: Total execution time in seconds
submission_response: Response from GAIA API with correctness info
Returns:
Dict with {metadata: {...}, submission_status: str, results: [...]}
"""
from datetime import datetime
# Build metadata
metadata = {
"generated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
"total_questions": len(results_log),
}
if execution_time is not None:
metadata["execution_time_seconds"] = round(execution_time, 2)
metadata["execution_time_formatted"] = (
f"{int(execution_time // 60)}m {int(execution_time % 60)}s"
)
if submission_response:
metadata["score_percent"] = submission_response.get("score")
metadata["correct_count"] = submission_response.get("correct_count")
metadata["total_attempted"] = submission_response.get("total_attempted")
# Build results array with all fields from results_log
results_array = []
for result in results_log:
result_dict = {
"task_id": result.get("Task ID", "N/A"),
"question": result.get("Question", "N/A"),
"system_error": result.get("System Error", "no"),
"submitted_answer": result.get("Submitted Answer", "N/A"),
}
if result.get("System Error") == "yes" and result.get("Error Log"):
result_dict["error_log"] = result.get("Error Log")
if result.get("Correct?"):
result_dict["correct"] = (
True if result.get("Correct?") == "✅ Yes" else False
)
if result.get("Ground Truth Answer"):
result_dict["ground_truth_answer"] = result.get("Ground Truth Answer")
if result.get("annotator_metadata"):
result_dict["annotator_metadata"] = result.get("annotator_metadata")
results_array.append(result_dict)
return {
"metadata": metadata,
"submission_status": submission_status,
"results": results_array,
}
def export_results_to_json(
    results_log: list,
    submission_status: str,
    execution_time: float = None,
    submission_response: dict = None,
) -> str:
    """Write the evaluation results to a timestamped JSON file.

    The payload comes from _build_export_data(), the same builder the HTML
    export uses, and is saved as ``_cache/gaia_results_<timestamp>.json``
    under the current working directory (the directory is created on demand).

    Args:
        results_log: List of per-question results.
        submission_status: Status message from submission.
        execution_time: Total execution time in seconds.
        submission_response: Response from the GAIA API, if any.

    Returns:
        Path of the JSON file that was written.
    """
    payload = _build_export_data(
        results_log, submission_status, execution_time, submission_response
    )

    # The file name reuses the timestamp recorded in the export metadata.
    output_dir = os.path.join(os.getcwd(), "_cache")
    os.makedirs(output_dir, exist_ok=True)
    stamp = payload["metadata"]["timestamp"]
    filepath = os.path.join(output_dir, f"gaia_results_{stamp}.json")

    # ensure_ascii=False keeps non-ASCII answers readable in the file.
    with open(filepath, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2, ensure_ascii=False)

    logger.info(f"JSON exported to: {filepath}")
    return filepath
def export_results_to_html(
    results_log: list,
    submission_status: str,
    execution_time: float = None,
    submission_response: dict = None,
) -> str:
    """Write the evaluation results to a timestamped HTML file.

    The data comes from _build_export_data(), the same builder the JSON export
    uses, and is rendered as one table row per question. The file is saved as
    ``_cache/gaia_results_<timestamp>.html`` under the current working
    directory (the directory is created on demand).

    Args:
        results_log: List of per-question results.
        submission_status: Status message from submission.
        execution_time: Total execution time in seconds.
        submission_response: Response from the GAIA API, if any.

    Returns:
        Path of the HTML file that was written.
    """
    import html as html_escape

    export_data = _build_export_data(
        results_log, submission_status, execution_time, submission_response
    )
    metadata = export_data.get("metadata", {})
    results_array = export_data.get("results", [])

    cache_dir = os.path.join(os.getcwd(), "_cache")
    os.makedirs(cache_dir, exist_ok=True)
    filepath = os.path.join(cache_dir, f"gaia_results_{metadata['timestamp']}.html")

    def escape(text):
        """Escape HTML special characters; None becomes an empty string."""
        if text is None:
            return ""
        return html_escape.escape(str(text))

    columns = (
        "#",
        "task_id",
        "question",
        "submitted_answer",
        "correct",
        "system_error",
        "error_log",
        "ground_truth_answer",
    )
    header_cells = "".join(f"<th>{name}</th>" for name in columns)

    # Emit a real HTML document: the previous template contained no markup,
    # so the ".html" file was only pipe-delimited text.
    html_parts = [
        "<!DOCTYPE html>",
        '<html lang="en">',
        "<head>",
        '<meta charset="utf-8">',
        "<title>GAIA Agent Evaluation Results</title>",
        "<style>"
        "table{border-collapse:collapse}"
        "th,td{border:1px solid #999;padding:4px 8px;vertical-align:top;text-align:left}"
        "</style>",
        "</head>",
        "<body>",
        "<h1>GAIA Agent Evaluation Results</h1>",
        "<h2>Results (matching JSON structure)</h2>",
        "<table>",
        f"<thead><tr>{header_cells}</tr></thead>",
        "<tbody>",
    ]

    for idx, result in enumerate(results_array, 1):
        # "correct" is a boolean when ground truth was available, else absent;
        # render it the way the JSON export would (true / false / null).
        correct = result.get("correct")
        if correct is True:
            correct_display = "true"
        elif correct is False:
            correct_display = "false"
        else:
            correct_display = "null"

        # Every value that originates from questions or agent output is
        # escaped before it is placed into the markup.
        cells = (
            idx,
            escape(result.get("task_id", "N/A")),
            escape(result.get("question", "N/A")),
            escape(result.get("submitted_answer", "N/A")),
            correct_display,
            escape(result.get("system_error", "no")),
            escape(result.get("error_log", "")) or "-",
            escape(result.get("ground_truth_answer", "N/A")),
        )
        row = "".join(f"<td>{cell}</td>" for cell in cells)
        html_parts.append(f"<tr>{row}</tr>")

    html_parts.extend(["</tbody>", "</table>", "</body>", "</html>"])

    with open(filepath, "w", encoding="utf-8") as f:
        f.write("\n".join(html_parts))

    logger.info(f"HTML exported to: {filepath}")
    return filepath
def format_diagnostics(final_state: dict) -> str:
    """Render the agent's final state as a Markdown-flavoured text report.

    Sections appear in a fixed order: question, plan, selected tools, tool
    execution results (omitted when empty), evidence, errors (omitted when
    empty) and the final answer. Successful tool results are previewed up to
    100 characters and evidence items up to 150, each followed by "..." when
    truncated.

    Args:
        final_state: State dict read via the keys "question", "plan",
            "tool_calls", "tool_results", "evidence", "errors" and "answer".

    Returns:
        The report as a single newline-joined string.
    """
    lines = [f"**Question:** {final_state.get('question', 'N/A')}\n"]

    plan = final_state.get("plan", "No plan generated")
    lines.append(f"**Plan:**\n{plan}\n")

    # Tools the agent decided to call.
    calls = final_state.get("tool_calls", [])
    if not calls:
        lines.append("**Tools Selected:** None\n")
    else:
        lines.append(f"**Tools Selected:** {len(calls)} tool(s)")
        for idx, call in enumerate(calls, 1):
            tool_name = call.get("tool", "unknown")
            params = call.get("params", {})
            lines.append(f" {idx}. {tool_name}({params})")
        lines.append("")

    # Outcome of each tool execution; section is skipped when nothing ran.
    outcomes = final_state.get("tool_results", [])
    if outcomes:
        lines.append(f"**Tool Execution Results:** {len(outcomes)} result(s)")
        for idx, outcome in enumerate(outcomes, 1):
            tool_name = outcome.get("tool", "unknown")
            if outcome.get("status", "unknown") != "success":
                error = outcome.get("error", "Unknown error")
                lines.append(f" {idx}. {tool_name}: ✗ FAILED - {error}")
                continue
            result_preview = str(outcome.get("result", ""))
            if len(result_preview) > 100:
                result_preview = result_preview[:100] + "..."
            lines.append(f" {idx}. {tool_name}: ✓ SUCCESS")
            lines.append(f" Result: {result_preview}")
        lines.append("")

    # Evidence gathered from the tools.
    evidence = final_state.get("evidence", [])
    if not evidence:
        lines.append("**Evidence Collected:** None\n")
    else:
        lines.append(f"**Evidence Collected:** {len(evidence)} item(s)")
        for idx, ev in enumerate(evidence, 1):
            ev_preview = ev if len(ev) <= 150 else ev[:150] + "..."
            lines.append(f" {idx}. {ev_preview}")
        lines.append("")

    # Errors recorded during the run; section is skipped when there are none.
    errors = final_state.get("errors", [])
    if errors:
        lines.append(f"**Errors:** {len(errors)} error(s)")
        lines.extend(f" {idx}. {err}" for idx, err in enumerate(errors, 1))
        lines.append("")

    answer = final_state.get("answer", "No answer generated")
    lines.append(f"**Final Answer:** {answer}")
    return "\n".join(lines)
def download_task_file(
    task_id: str, file_name: str, save_dir: str = "_cache/gaia_files/"
):
    """Fetch the attachment of a GAIA question from the GAIA dataset on HuggingFace.

    The evaluation API's /files/{task_id} endpoint returns 404 because files are
    not hosted there, so the file is looked up in the "gaia-benchmark/GAIA"
    dataset repository instead, first under 2023/validation and then under
    2023/test. Downloaded files are kept in a flat cache directory next to
    this script and reused on later calls.

    Args:
        task_id: Question's task_id; combined with the extension of
            ``file_name`` to form the path inside the dataset.
        file_name: Original file name from the API (e.g. "task_id.png"); also
            the name of the cached copy.
        save_dir: Cache directory relative to this script (created if needed).

    Returns:
        Absolute path of the cached file, or None when no candidate path could
        be downloaded.
    """
    import shutil
    import tempfile

    from huggingface_hub import hf_hub_download

    repo_id = "gaia-benchmark/GAIA"

    # Dataset layout is 2023/<split>/<task_id><ext>; only the extension of the
    # API-provided name is reused. Validation is tried first.
    suffix = os.path.splitext(file_name)[1].lower()
    candidates = (
        f"2023/validation/{task_id}{suffix}",
        f"2023/test/{task_id}{suffix}",
    )

    # Anchor the cache on the script's own directory so the path does not
    # depend on the working directory (local run vs. HF Space).
    destination_dir = Path(__file__).parent.absolute() / save_dir
    destination_dir.mkdir(exist_ok=True, parents=True)
    target_path = str(destination_dir / file_name)

    if os.path.exists(target_path):
        logger.info(f"Using cached file for {task_id}: {target_path}")
        return target_path

    for dataset_path in candidates:
        try:
            logger.info(f"Attempting to download {dataset_path} from GAIA dataset...")
            # Download into a throwaway directory, then copy the file into the
            # flat cache before the temporary directory is removed.
            with tempfile.TemporaryDirectory() as scratch_dir:
                fetched = hf_hub_download(
                    repo_id=repo_id,
                    filename=dataset_path,
                    repo_type="dataset",
                    local_dir=scratch_dir,
                )
                shutil.copy(fetched, target_path)
        except Exception as e:
            # Best effort: any failure just moves on to the next candidate.
            logger.debug(f"Path {dataset_path} not found: {e}")
            continue
        logger.info(f"Downloaded file for {task_id}: {target_path}")
        return target_path

    logger.warning(f"File not found in GAIA dataset for task {task_id}")
    return None
def test_single_question(question: str, llm_provider: str):
    """Run the agent on one question and collect diagnostics for the debug tab.

    Args:
        question: Question text typed into the UI.
        llm_provider: Provider name selected in the UI; its lower-cased form
            is written to the LLM_PROVIDER environment variable.

    Returns:
        Tuple of (answer, diagnostics text, API key status text). Blank input
        yields a prompt to enter a question; any exception is reported in the
        first two elements instead of being raised.
    """
    if not (question and question.strip()):
        return "Please enter a question.", "", check_api_keys()

    try:
        # The UI selection overrides whatever .env configured.
        os.environ["LLM_PROVIDER"] = llm_provider.lower()
        logger.info(f"UI Config: LLM_PROVIDER={llm_provider}")

        agent = GAIAAgent()
        # Calling the agent also stores its final state on agent.last_state.
        answer = agent(question)
        state = agent.last_state or {}

        header = f"**LLM Provider:** {llm_provider}\n\n"
        report = header + format_diagnostics(state)
        return answer, report, check_api_keys()
    except Exception as e:
        logger.error(f"Error in test_single_question: {e}", exc_info=True)
        detail = str(e)
        return f"ERROR: {detail}", f"Exception occurred: {detail}", check_api_keys()
# --- GAIA Agent (Replaced BasicAgent) ---
# LangGraph-based agent with sequential workflow
# Stage 1: Placeholder nodes, returns fixed answer
# Stage 2: Tool integration
# Stage 3: Planning and reasoning logic
# Stage 4: Error handling and robustness
# Stage 5: Performance optimization
# Stage 6: Async processing with ThreadPoolExecutor
def a_determine_status(answer: str) -> tuple[bool, str | None]:
"""Determine if response is system error or AI answer.
Returns:
(is_system_error, error_log)
- is_system_error: True if system error, False if AI answer
- error_log: Full error message if system error, None otherwise
"""
if not answer:
return True, "Empty answer"
answer_lower = answer.lower().strip()
# System/technical errors from our code
if answer_lower.startswith("error:") or "no evidence collected" in answer_lower:
return True, answer # Full error message as log
# Everything else is an AI response (including "Unable to answer")
return False, None
def process_single_question(agent, item, index, total):
    """Run the agent on one question and return a result dict; never raises for agent or download failures.

    If the item names an attachment, it is downloaded first and its path is
    handed to the agent. A failed or crashing download is logged and the agent
    is still run without a file, so one bad attachment cannot abort the whole
    evaluation.

    Args:
        agent: Callable invoked as ``agent(question, file_path=...)``.
        item: Question dict with "task_id", "question" and optional "file_name".
        index: Question index (0-based), used for progress logging.
        total: Total number of questions, used for progress logging.

    Returns:
        dict with keys "task_id", "question", "answer", "system_error"
        ("yes"/"no"), "error_log" and "error" (True when the item was invalid
        or the agent raised).
    """
    task_id = item.get("task_id")
    question_text = item.get("question")
    file_name = item.get("file_name")
    progress = f"[{index + 1}/{total}]"

    def build_result(answer, failed):
        """Assemble the result dict, classifying the answer via a_determine_status."""
        is_error, error_log = a_determine_status(answer)
        return {
            "task_id": task_id,
            "question": question_text,
            "answer": answer,
            "system_error": "yes" if is_error else "no",
            "error_log": error_log,
            "error": failed,
        }

    if not task_id or question_text is None:
        return build_result("ERROR: Missing task_id or question", True)

    # str() keeps the log prefix safe even if the API sends a non-string id.
    short_id = str(task_id)[:8]

    # Download the attachment, if any. This runs in a worker thread, so an
    # uncaught exception here would surface from future.result() in the caller.
    file_path = None
    if file_name:
        try:
            file_path = download_task_file(task_id, file_name)
        except Exception as e:
            logger.warning(f"{progress} File download raised for {short_id}: {e}")
        if file_path:
            logger.info(f"{progress} File downloaded: {file_path}")
        else:
            logger.warning(f"{progress} File expected but not downloaded")

    try:
        logger.info(f"{progress} Processing {short_id}...")
        submitted_answer = agent(question_text, file_path=file_path)
        logger.info(f"{progress} Completed {short_id}")
        return build_result(submitted_answer, False)
    except Exception as e:
        logger.error(f"{progress} Error {short_id}: {e}")
        return build_result(f"ERROR: {str(e)}", True)
def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable, using *default* if unset or malformed."""
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning(f"Ignoring non-integer value for {name}: {raw!r}")
        return default


def _select_questions(questions_data: list, question_limit, task_ids: str) -> list:
    """Apply the debug filters; explicit task IDs take precedence over the limit."""
    target_ids = {tid.strip() for tid in (task_ids or "").split(",") if tid.strip()}
    if target_ids:
        # Filter the FULL list so a targeted ID is found even when it lies
        # beyond the question limit.
        selected = [
            q
            for q in questions_data
            if isinstance(q, dict) and q.get("task_id") in target_ids
        ]
        missing_ids = target_ids - {q.get("task_id") for q in selected}
        if missing_ids:
            logger.warning(f"Task IDs not found: {missing_ids}")
        logger.warning(
            f"DEBUG MODE: Targeted {len(selected)}/{len(questions_data)} questions by task_id"
        )
        print(
            f"DEBUG MODE: Processing {len(selected)} targeted questions "
            f"({len(missing_ids)} IDs not found: {missing_ids})"
        )
        return selected

    # A cleared gr.Number arrives as None; treat it like 0 ("no UI limit") and
    # fall back to the DEBUG_QUESTION_LIMIT environment variable.
    limit = int(question_limit or 0)
    if limit <= 0:
        limit = _env_int("DEBUG_QUESTION_LIMIT", 0)
    if limit > 0:
        logger.warning(f"DEBUG MODE: Limited to first {limit} questions")
        print(
            f"DEBUG MODE: Processing only {limit} questions (set to 0 to process all)"
        )
        return questions_data[:limit]
    return questions_data


def run_and_submit_all(
    llm_provider: str,
    video_mode: str = "Transcript",
    question_limit: int = 0,
    task_ids: str = "",
    profile: gr.OAuthProfile | None = None,
):
    """Fetch the questions, run the GAIAAgent on them, submit the answers and export results.

    Args:
        llm_provider: LLM provider to use (written to LLM_PROVIDER, lower-cased).
        video_mode: YouTube processing mode ("Transcript" or "Frames"),
            written to YOUTUBE_MODE, lower-cased.
        question_limit: Limit number of questions (0 or None = use
            DEBUG_QUESTION_LIMIT from the environment, which defaults to all).
        task_ids: Comma-separated task IDs to target (overrides question_limit).
        profile: OAuth profile for HF login; None when the user is not logged in.

    Returns:
        Tuple ``(status message, JSON export path, HTML export path)``. Both
        paths are None when the run stops before any question is processed.
    """
    start_time = time.time()

    # SPACE_ID is used to build the public link to this Space's code.
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None, None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # UI selections override whatever .env configured.
    os.environ["LLM_PROVIDER"] = llm_provider.lower()
    logger.info(f"UI Config for Full Evaluation: LLM_PROVIDER={llm_provider}")
    os.environ["YOUTUBE_MODE"] = video_mode.lower()
    logger.info(f"UI Config for Full Evaluation: YOUTUBE_MODE={video_mode}")

    # 1. Instantiate the agent
    try:
        logger.info("Initializing GAIAAgent...")
        agent = GAIAAgent()
        logger.info("GAIAAgent initialized successfully")
    except Exception as e:
        logger.error(f"Error instantiating agent: {e}")
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None, None

    # When running as a Hugging Face Space this link points at the codebase
    # (useful for others, so please keep the Space public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    except requests.exceptions.JSONDecodeError as e:
        # Must precede RequestException: requests' JSONDecodeError derives
        # from it, so the broader handler would otherwise shadow this one.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None, None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None, None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None, None

    if not questions_data or not isinstance(questions_data, list):
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None, None

    questions_data = _select_questions(questions_data, question_limit, task_ids)
    total = len(questions_data)
    print(f"Processing {total} questions.")

    # 2.5. Load ground truth for local comparison (validation set only)
    ground_truth = get_ground_truth()
    if ground_truth.load_validation_set():
        logger.info("Ground truth loaded - per-question correctness will be available")
    else:
        logger.warning("Ground truth not loaded - per-question correctness unavailable")

    # 3. Run the agent concurrently
    max_workers = max(1, _env_int("MAX_CONCURRENT_WORKERS", 5))
    logger.info(f"Running agent on {total} questions with {max_workers} workers...")

    # question index -> (results-table entry, submission payload or None)
    completed = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(process_single_question, agent, item, idx, total): idx
            for idx, item in enumerate(questions_data)
        }

        for future in as_completed(future_to_index):
            idx = future_to_index[future]
            try:
                result = future.result()
            except Exception as e:
                # A crashed worker must not discard every other answer;
                # record it as a system error for this question only.
                logger.error(f"[{idx + 1}/{total}] Worker crashed: {e}", exc_info=True)
                item = questions_data[idx]
                item = item if isinstance(item, dict) else {}
                message = f"ERROR: {e}"
                result = {
                    "task_id": item.get("task_id"),
                    "question": item.get("question"),
                    "answer": message,
                    "system_error": "yes",
                    "error_log": message,
                    "error": True,
                }

            task_id = result["task_id"]
            has_system_error = result.get("system_error") == "yes"

            # Compare with ground truth if available (None = unknown).
            is_correct = ground_truth.compare_answer(task_id, result["answer"])
            gt_answer = ground_truth.get_answer(task_id)
            metadata_item = ground_truth.metadata.get(task_id, {})
            annotator_metadata = metadata_item.get("Annotator Metadata", {})

            result_entry = {
                "Task ID": task_id,
                "Question": result["question"],
                "System Error": result.get("system_error", "no"),
                "Submitted Answer": "" if has_system_error else result["answer"],
            }
            if has_system_error and result.get("error_log"):
                result_entry["Error Log"] = result["error_log"]
            if is_correct is not None:
                result_entry["Correct?"] = "✅ Yes" if is_correct else "❌ No"
                result_entry["Ground Truth Answer"] = gt_answer
                result_entry["annotator_metadata"] = annotator_metadata

            # Only answers without a system error are submitted.
            payload = None
            if result.get("system_error") == "no":
                payload = {"task_id": task_id, "submitted_answer": result["answer"]}

            completed[idx] = (result_entry, payload)
            logger.info(f"Progress: {len(completed)}/{total} questions processed")

    # Futures finish in arbitrary order; restore the original question order
    # so exports and the submission are deterministic.
    results_log = []
    answers_payload = []
    for idx in sorted(completed):
        result_entry, payload = completed[idx]
        results_log.append(result_entry)
        if payload is not None:
            answers_payload.append(payload)

    def finish(status: str, submission_response=None):
        """Export results to JSON and HTML and build the Gradio return tuple."""
        execution_time = time.time() - start_time
        logger.info(
            f"Total execution time: {execution_time:.2f} seconds ({int(execution_time // 60)}m {int(execution_time % 60)}s)"
        )
        json_path = export_results_to_json(
            results_log, status, execution_time, submission_response
        )
        html_path = export_results_to_html(
            results_log, status, execution_time, submission_response
        )
        return status, json_path, html_path

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return finish("Agent did not produce any answers to submit.")

    # 4. Prepare submission
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        # NOTE: the summary fields read below are the only score data used
        # from the response; per-question correctness in the exports comes
        # solely from the local ground-truth comparison above.
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
    else:
        print("Submission successful.")
        return finish(final_status, result_data)

    # Every failure path still exports what was collected.
    print(status_message)
    return finish(status_message)
# --- Build Gradio Interface using Blocks ---
# Two tabs: "Full Evaluation" runs run_and_submit_all() and exposes the export
# files; "Test & Debug" runs test_single_question() on one typed question.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
**Stage 4 Progress:** Adding diagnostics, error handling, and fallback mechanisms.
"""
    )
    with gr.Tabs():
        # Tab 1: Full Evaluation (primary functionality)
        with gr.Tab("📊 Full Evaluation"):
            # NOTE(review): the text below says questions are processed
            # "sequentially", but run_and_submit_all() uses a
            # ThreadPoolExecutor - consider updating the wording.
            gr.Markdown(
                """
**Quick Start:**
1. **Log in** to your Hugging Face account (uses your username for leaderboard submission)
2. **Select LLM Provider** (Gemini/HuggingFace/Groq/Claude)
3. **Click "Run Evaluation & Submit All Answers"**
**What happens:**
- Fetches GAIA benchmark questions
- Runs your agent on each question using selected LLM
- Submits answers to official leaderboard
- Returns downloadable results (JSON + HTML)
**Expectations:**
- Full evaluation takes time (agent processes all questions sequentially)
- Download files appear below when complete
"""
            )
            # run_and_submit_all() declares a gr.OAuthProfile parameter that is
            # not listed in `inputs` below; presumably Gradio supplies it from
            # this login - confirm against the Gradio OAuth docs.
            gr.LoginButton()
            with gr.Row():
                eval_llm_provider_dropdown = gr.Dropdown(
                    label="LLM Provider for Evaluation",
                    choices=["Gemini", "HuggingFace", "Groq", "Claude"],
                    value="HuggingFace",
                    info="Select which LLM to use for all questions",
                )
                eval_video_mode = gr.Radio(
                    label="YouTube Processing Mode",
                    choices=["Transcript", "Frames"],
                    value="Transcript",
                    info="Transcript: Audio/subtitle extraction (fast) | Frames: Visual analysis with vision models (slower)",
                )
                eval_question_limit = gr.Number(
                    label="Question Limit (Debug)",
                    value=0,
                    precision=0,
                    minimum=0,
                    maximum=165,
                    info="Limit questions for testing (0 = process all)",
                )
            with gr.Row():
                eval_task_ids = gr.Textbox(
                    label="Target Task IDs (Debug)",
                    value="",
                    placeholder="task_id1, task_id2, ...",
                    info="Comma-separated task IDs to run (overrides question limit)",
                    lines=1,
                )
            run_button = gr.Button("Run Evaluation & Submit All Answers")
            status_output = gr.Textbox(
                label="Run Status / Submission Result", lines=5, interactive=False
            )
            # Export buttons - JSON and HTML
            json_export = gr.File(label="Download JSON Results", type="filepath")
            html_export = gr.File(label="Download HTML Results", type="filepath")
            # Inputs map positionally onto run_and_submit_all's first four
            # parameters; its three return values fill the outputs in order.
            run_button.click(
                fn=run_and_submit_all,
                inputs=[
                    eval_llm_provider_dropdown,
                    eval_video_mode,
                    eval_question_limit,
                    eval_task_ids,
                ],
                outputs=[status_output, json_export, html_export],
            )
        # Tab 2: Test Single Question (debugging/diagnostics)
        with gr.Tab("🔍 Test & Debug"):
            gr.Markdown("""
**Test Mode:** Run the agent on a single question and see detailed diagnostics.
This mode shows:
- API key status
- Execution plan
- Tools selected and executed
- Evidence collected
- Errors encountered
- Final answer
""")
            test_question_input = gr.Textbox(
                label="Enter Test Question",
                placeholder="e.g., What is the capital of France?",
                lines=3,
            )
            with gr.Row():
                llm_provider_dropdown = gr.Dropdown(
                    label="LLM Provider",
                    choices=["Gemini", "HuggingFace", "Groq", "Claude"],
                    value="HuggingFace",
                    info="Select which LLM to use for this test",
                )
            test_button = gr.Button("Run Test", variant="primary")
            with gr.Row():
                with gr.Column(scale=1):
                    test_answer_output = gr.Textbox(
                        label="Answer", lines=3, interactive=False
                    )
                    test_api_status = gr.Textbox(
                        label="API Keys Status", lines=5, interactive=False
                    )
                with gr.Column(scale=2):
                    test_diagnostics_output = gr.Textbox(
                        label="Execution Diagnostics", lines=20, interactive=False
                    )
            # test_single_question returns (answer, diagnostics, api_status),
            # matching the order of the outputs list.
            test_button.click(
                fn=test_single_question,
                inputs=[
                    test_question_input,
                    llm_provider_dropdown,
                ],
                outputs=[test_answer_output, test_diagnostics_output, test_api_status],
            )
if __name__ == "__main__":
    # Startup banner; the closing rule further down has the same total width.
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)

    # SPACE_HOST and SPACE_ID are only reported here for information.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    if not space_id_startup:
        print(
            "ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined."
        )
    else:
        # Repo URLs can only be derived when the Space id is known.
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(
            f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main"
        )

    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)