hemantvirmani's picture
fix runtime warnings that come on hugging face spaces
e378548
import os
import argparse
import requests
import pandas as pd
import json
import time
import warnings
import logging
from enum import Enum
from colorama import init
# Initialize colorama so ANSI color codes also render on Windows consoles.
# autoreset=True restores the default style after every print call, so
# callers never have to emit an explicit reset sequence.
init(autoreset=True)
# Suppress asyncio event loop cleanup noise (common on HF Spaces):
# - drop any warning whose message contains "Invalid file descriptor"
#   (the leading ".*" lets the pattern match anywhere in the message);
# - raise the "asyncio" logger threshold to ERROR so its warnings and
#   lower-severity records are not emitted.
warnings.filterwarnings('ignore', message='.*Invalid file descriptor.*')
logging.getLogger('asyncio').setLevel(logging.ERROR)
# Import configuration
import config
# Agent-related code is imported via agent_runner module
# Import Gradio UI creation function
from gradioapp import create_ui
# Import scoring function for answer verification
from scorer import question_scorer
# Import new utilities
from question_loader import QuestionLoader
from result_formatter import ResultFormatter
from agent_runner import AgentRunner
from validators import InputValidator, ValidationError
from utils import retry_with_backoff
from langfuse_tracking import track_session
# --- Run Modes ---
class RunMode(Enum):
    """Ways the application can be started from ``main``."""

    # Launch the Gradio web interface.
    UI = "ui"
    # Run the local test harness from the command line.
    CLI = "cli"
@retry_with_backoff(max_retries=3, initial_delay=2.0)
def _submit_to_server(submit_url: str, submission_data: dict) -> dict:
    """POST the submission payload and return the server's decoded JSON reply.

    The function is wrapped by the project's ``retry_with_backoff`` decorator
    (``max_retries=3``, ``initial_delay=2.0``); the exact retry policy lives
    in ``utils``.

    Args:
        submit_url: Full URL of the submission endpoint.
        submission_data: JSON-serializable request body.

    Returns:
        dict: The parsed JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status (raised by ``raise_for_status``).
    """
    reply = requests.post(
        submit_url,
        json=submission_data,
        timeout=config.SUBMIT_TIMEOUT,
    )
    # Turn 4xx/5xx answers into exceptions before trying to decode the body.
    reply.raise_for_status()
    return reply.json()
def _describe_http_error(error: "requests.exceptions.HTTPError") -> str:
    """Build a human-readable description of a failed HTTP submission."""
    response = error.response
    if response is None:
        # No response object attached; fall back to the exception text.
        return f"Server responded with an error: {error}"

    error_detail = f"Server responded with status {response.status_code}."
    try:
        error_json = response.json()
    except ValueError:
        # ValueError is the common base of json.JSONDecodeError,
        # simplejson's decoder error and requests' own JSONDecodeError, so
        # this works regardless of the installed requests version.
        return f"{error_detail} Response: {response.text[:500]}"

    if isinstance(error_json, dict):
        return f"{error_detail} Detail: {error_json.get('detail', response.text)}"
    # Valid JSON that is not an object (list, string, ...) has no 'detail'
    # key to read; show the raw body instead of failing on `.get`.
    return f"{error_detail} Response: {response.text[:500]}"


def submit_and_score(username: str, results: list) -> str:
    """
    Submit answers to the GAIA scoring server and return a status message.

    Every outcome, success or failure, is both printed and returned; the
    function itself does not raise for submission problems.

    Args:
        username: Hugging Face username for submission
        results: List of tuples (task_id, question_text, answer)

    Returns:
        str: Status message (success or error details)
    """
    # Validate username
    try:
        username = InputValidator.validate_username(username)
    except ValidationError as e:
        error_msg = f"Invalid username: {e}"
        print(error_msg)
        return error_msg

    # Format results for API submission
    answers_payload = ResultFormatter.format_for_api(results)
    if not answers_payload:
        error_msg = "No answers to submit."
        print(error_msg)
        return error_msg

    space_id = config.SPACE_ID
    submit_url = f"{config.DEFAULT_API_URL}/submit"
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    # Prepare submission data
    submission_data = {
        "username": username,
        "agent_code": agent_code,
        "answers": answers_payload,
    }

    print(f"\n{'=' * config.SEPARATOR_WIDTH}")
    print(f"Submitting {len(answers_payload)} answers for user '{username}'...")
    print(f"{'=' * config.SEPARATOR_WIDTH}\n")

    # Submit to server
    print(f"Submitting to: {submit_url}")
    try:
        result_data = _submit_to_server(submit_url, submission_data)
        # Built inside the try block on purpose: a reply that is not a JSON
        # object is then reported as an unexpected error, not raised.
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status
    except requests.exceptions.HTTPError as e:
        status_message = f"Submission Failed: {_describe_http_error(e)}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        # Outermost boundary for the UI: report the failure, do not crash.
        status_message = f"An unexpected error occurred during submission: {e}"

    print(status_message)
    return status_message
def run_and_submit_all(username: str, active_agent: str = None) -> tuple:
    """
    Load the online question set, answer every question with the agent,
    submit the answers for scoring and build a table of the results.

    Args:
        username: Hugging Face username for submission
        active_agent: Agent type handed to AgentRunner. When None, the
            session metadata records config.ACTIVE_AGENT instead.

    Returns:
        tuple: (status_message: str, results_df: pd.DataFrame). On any early
        failure the second element is None.
    """
    # Submission always works on the online question set.
    try:
        fetched_questions = QuestionLoader().get_questions(test_mode=False)
    except Exception as exc:
        return f"Error loading questions: {exc}", None

    try:
        questions_data = InputValidator.validate_questions_data(fetched_questions)
    except ValidationError as exc:
        return f"Invalid questions data: {exc}", None

    # Metadata attached to the Langfuse session that wraps the agent run.
    session_metadata = {
        "agent": active_agent or config.ACTIVE_AGENT,
        "username": username,
        "question_count": len(questions_data),
        "mode": "submission"
    }
    with track_session("Submit_All", session_metadata):
        runner = AgentRunner(active_agent=active_agent)
        results = runner.run_on_questions(questions_data)

    if results is None:
        return "Error initializing agent.", None

    # submit_and_score formats the payload itself, so raw results go in.
    status_message = submit_and_score(username, results)

    # Separate, display-oriented formatting for the UI table.
    display_rows = ResultFormatter.format_for_display(results)
    return status_message, pd.DataFrame(display_rows)
def _load_ground_truth(file_path: str = config.METADATA_FILE) -> dict:
    """Load ground truth data indexed by task_id.

    The file is read as JSON Lines (one JSON object per line). Blank lines
    are ignored. Lines that are not valid JSON, or not JSON objects, are
    reported and skipped, so one bad record no longer discards everything
    that follows it. Records without a task_id or a final answer are
    skipped as well.

    Args:
        file_path: Path to the metadata file

    Returns:
        dict: Mapping of task_id -> {"question": str, "answer": str}.
        Empty (or partial) if the file cannot be read.
    """
    truth_mapping = {}
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_number, line in enumerate(f, start=1):
                if not line.strip():
                    # Tolerate blank lines, e.g. a trailing newline at EOF.
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Skipping malformed ground truth line {line_number}: {e}")
                    continue
                if not isinstance(data, dict):
                    print(f"Skipping non-object ground truth line {line_number}")
                    continue

                task_id = data.get("task_id")
                answer = data.get("Final answer")
                if task_id and answer:
                    truth_mapping[task_id] = {
                        "question": data.get("Question"),
                        "answer": answer,
                    }
    except Exception as e:
        # Deliberately best-effort: verification runs after a long agent
        # session, so a broken metadata file must not discard the results.
        # Report the problem and return whatever was parsed so far.
        print(f"Error loading ground truth: {e}")
    return truth_mapping
def _verify_answers(results: list, log_output: list, runtime: tuple = None) -> None:
    """Score answers against the local ground truth and log the outcome.

    For every result a small report is appended to ``log_output``. Results
    with a ground-truth entry are compared using the official GAIA
    ``question_scorer``; results without one are only listed. When at least
    one answer was scored, a summary block (and the runtime, if given) is
    appended at the end.

    Args:
        results: List of tuples (task_id, question_text, answer)
        log_output: List that receives the report lines (mutated in place)
        runtime: Optional tuple of (minutes, seconds) for total runtime
    """
    ground_truth = _load_ground_truth()
    emit = log_output.append
    emit("\n=== Verification Results ===")

    correct_count = 0
    total_count = 0
    for task_id, question_text, answer in results:
        truth_data = ground_truth.get(task_id)

        if truth_data is None:
            # Nothing to compare against: list the task and move on.
            emit(f"Task ID: {task_id}")
            emit(f"Question: {question_text[:config.ERROR_MESSAGE_LENGTH]}...")
            emit("No ground truth found.\n")
            continue

        correct_answer = truth_data["answer"]
        # The official scorer normalizes numbers, lists and strings itself,
        # so both sides are simply passed in as text.
        is_correct = question_scorer(str(answer), str(correct_answer))
        correct_count += 1 if is_correct else 0
        total_count += 1

        verdict = '✓ Correct' if is_correct else '✗ Incorrect'
        emit(f"Task ID: {task_id}")
        emit(f"Question: {question_text[:config.ERROR_MESSAGE_LENGTH]}...")
        emit(f"Expected: {correct_answer}")
        emit(f"Got: {answer}")
        emit(f"Match: {verdict}\n")

    # No scored answers means there is nothing to summarize.
    if not total_count:
        return

    accuracy = correct_count / total_count * 100
    emit("=" * config.SEPARATOR_WIDTH)
    emit(f"SUMMARY: {correct_count}/{total_count} correct ({accuracy:.1f}%)")
    if runtime:
        minutes, seconds = runtime
        emit(f"Runtime: {minutes}m {seconds}s")
    emit("=" * config.SEPARATOR_WIDTH)
def run_test_code(filter=None, active_agent=None) -> pd.DataFrame:
    """Run the agent on locally loaded questions and verify its answers.

    Args:
        filter: Optional tuple/list of question indices to test (e.g., (4, 7, 15)).
            If None, processes all questions.
        active_agent: Optional agent type passed to AgentRunner. If None, the
            session metadata records config.ACTIVE_AGENT.

    Returns:
        pd.DataFrame: One column of log lines (progress and verification
        output), or a single-row frame holding an error message.
    """
    started_at = time.time()
    logs_for_display = ["=== Processing Example Questions One by One ==="]

    # Test runs use the offline question set.
    try:
        questions_data = QuestionLoader().get_questions(test_mode=True)
    except Exception as exc:
        return pd.DataFrame([f"Error loading questions: {exc}"])

    try:
        questions_data = InputValidator.validate_questions_data(questions_data)
    except ValidationError as exc:
        return pd.DataFrame([f"Invalid questions data: {exc}"])

    # The validator returns the indices to use (or None for "everything").
    try:
        indices = InputValidator.validate_filter_indices(filter, len(questions_data))
    except ValidationError as exc:
        return pd.DataFrame([f"Invalid filter: {exc}"])

    if indices is None:
        questions_to_process = questions_data
        logs_for_display.append(f"Testing all {len(questions_to_process)} questions")
    else:
        questions_to_process = [questions_data[position] for position in indices]
        logs_for_display.append(
            f"Testing {len(questions_to_process)} selected questions (indices: {indices})"
        )

    # Metadata attached to the Langfuse session that wraps the agent run.
    session_metadata = {
        "agent": active_agent or config.ACTIVE_AGENT,
        "question_count": len(questions_to_process),
        "filter": str(indices) if indices else "all",
        "mode": "test"
    }
    with track_session("Test_Run", session_metadata):
        runner = AgentRunner(active_agent=active_agent)
        results = runner.run_on_questions(questions_to_process)

    if results is None:
        return pd.DataFrame(["Error initializing agent."])

    logs_for_display.append("\n=== Completed Example Questions ===")

    # Split the wall-clock duration into whole minutes and whole seconds.
    whole_minutes, leftover_seconds = divmod(time.time() - started_at, 60)
    runtime = (int(whole_minutes), int(leftover_seconds))

    _verify_answers(results, logs_for_display, runtime=runtime)
    return pd.DataFrame(logs_for_display)
def main() -> None:
    """Parse command-line arguments and start the app in UI or CLI test mode.

    Without ``--test``/``--testall`` the Gradio interface is launched. With
    either flag the local test harness runs on the selected questions, the
    results are printed, and the function returns.
    """
    # Single source of truth for the agent names accepted on the command
    # line; the argparse choices and the error message are derived from it.
    agent_mapping = {
        'langgraph': config.AGENT_LANGGRAPH,
        'reactlangg': config.AGENT_REACT_LANGGRAPH,
        'llamaindex': config.AGENT_LLAMAINDEX,
    }

    parser = argparse.ArgumentParser(description="Run the agent application.")
    parser.add_argument(
        "--test",
        type=str,
        nargs='?',
        const='default',
        help=(
            "Run local tests on selected questions and exit. Optionally provide "
            "comma-separated question indices (e.g., --test 2,4,6). If no indices "
            "provided, uses default test questions."
        ),
    )
    parser.add_argument(
        "--testall",
        action="store_true",
        help="Run local tests on all questions and exit.",
    )
    parser.add_argument(
        "--agent",
        # argparse converts with `type` before checking `choices`, so
        # lowercasing here is what actually makes the flag case-insensitive.
        type=str.lower,
        choices=list(agent_mapping),
        help=(
            "Agent to use in CLI mode (case-insensitive). "
            "Options: langgraph, reactlangg, llamaindex. "
            "Default: uses config.ACTIVE_AGENT"
        ),
    )
    args = parser.parse_args()

    active_agent = None
    if args.agent:
        active_agent = agent_mapping.get(args.agent)
        if not active_agent:
            # Defensive: only reachable if a config constant is empty.
            print(f"Error: Unknown agent '{args.agent}'. Valid options: {', '.join(agent_mapping)}")
            return
        print(f"[CLI] Using agent: {active_agent}")

    print(f"\n{'-' * 30} App Starting {'-' * 30}")

    # Determine run mode. `is not None` (not truthiness) so that an empty
    # value such as `--test ""` is reported as invalid indices below instead
    # of silently launching the UI.
    test_requested = args.test is not None
    run_mode = RunMode.CLI if (test_requested or args.testall) else RunMode.UI

    # Print environment info only in UI mode
    if run_mode == RunMode.UI:
        space_host = config.SPACE_HOST
        space_id = config.SPACE_ID

        if space_host:
            print(f"[OK] SPACE_HOST found: {space_host}")
            print(f" Runtime URL should be: https://{space_host}.hf.space")
        else:
            print("[INFO] SPACE_HOST environment variable not found (running locally?).")

        if space_id:
            print(f"[OK] SPACE_ID found: {space_id}")
            print(f" Repo URL: https://huggingface.co/spaces/{space_id}")
            print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id}/tree/main")
        else:
            print("[INFO] SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule: same width as the "App Starting" banner printed above.
    print(f"{'-' * (60 + len(' App Starting '))}\n")

    # Execute based on run mode
    if run_mode == RunMode.UI:
        print("Launching Gradio Interface for Basic Agent Evaluation...")
        grTestApp = create_ui(run_and_submit_all, run_test_code)
        grTestApp.launch()
        return

    # RunMode.CLI: determine the test filter from the flag that was used.
    if test_requested:
        if args.test == 'default':
            # Flag given without indices: use the configured default set.
            test_filter = config.DEFAULT_TEST_FILTER
        else:
            # Parse comma-separated indices
            try:
                test_filter = tuple(int(idx.strip()) for idx in args.test.split(','))
            except ValueError:
                print(f"Error: Invalid test indices '{args.test}'. Must be comma-separated integers (e.g., 2,4,6)")
                return
    else:  # args.testall
        test_filter = None  # Test all questions

    print(f"Running test code on {len(test_filter) if test_filter else 'ALL'} questions (CLI mode)...")
    result = run_test_code(filter=test_filter, active_agent=active_agent)

    # Print results
    if isinstance(result, pd.DataFrame):
        ResultFormatter.print_dataframe(result)
    else:
        print(result)
# Run the application only when this file is executed as a script, so that
# importing the module does not parse arguments or launch anything.
if __name__ == "__main__":
    main()