import os
import gradio as gr
import requests
import inspect
import pandas as pd
import logging
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# Stage 1: Import GAIAAgent (LangGraph-based agent)
from src.agent import GAIAAgent
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Helper Functions ---
def check_api_keys():
"""Check which API keys are configured."""
keys_status = {
"GOOGLE_API_KEY (Gemini)": "✓ SET" if os.getenv("GOOGLE_API_KEY") else "✗ MISSING",
"HF_TOKEN (HuggingFace)": "✓ SET" if os.getenv("HF_TOKEN") else "✗ MISSING",
"ANTHROPIC_API_KEY (Claude)": "✓ SET" if os.getenv("ANTHROPIC_API_KEY") else "✗ MISSING",
"TAVILY_API_KEY (Search)": "✓ SET" if os.getenv("TAVILY_API_KEY") else "✗ MISSING",
"EXA_API_KEY (Search)": "✓ SET" if os.getenv("EXA_API_KEY") else "✗ MISSING",
}
return "\n".join([f"{k}: {v}" for k, v in keys_status.items()])
def export_results_to_json(results_log: list, submission_status: str, execution_time: float = None,
submission_response: dict = None) -> str:
"""Export evaluation results to JSON file for easy processing.
- Local: Saves to ~/Downloads/gaia_results_TIMESTAMP.json
- HF Spaces: Saves to ./exports/gaia_results_TIMESTAMP.json
- Format: Clean JSON with full error messages, no truncation
Args:
results_log: List of question results
submission_status: Status message from submission
execution_time: Total execution time in seconds
submission_response: Response from GAIA API with correctness info
"""
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"gaia_results_{timestamp}.json"
# Detect environment: HF Spaces or local
if os.getenv("SPACE_ID"):
# HF Spaces: save to local exports directory for Gradio to serve
export_dir = os.path.join(os.getcwd(), "exports")
os.makedirs(export_dir, exist_ok=True)
filepath = os.path.join(export_dir, filename)
else:
# Local: save to Downloads folder
downloads_dir = os.path.expanduser("~/Downloads")
filepath = os.path.join(downloads_dir, filename)
# Extract correctness info from submission response if available
correct_task_ids = set()
if submission_response and "results" in submission_response:
# If API provides per-question results
for item in submission_response.get("results", []):
if item.get("correct"):
correct_task_ids.add(item.get("task_id"))
# Build JSON structure
metadata = {
"generated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"timestamp": timestamp,
"total_questions": len(results_log)
}
# Add execution time if available
if execution_time is not None:
metadata["execution_time_seconds"] = round(execution_time, 2)
metadata["execution_time_formatted"] = f"{int(execution_time // 60)}m {int(execution_time % 60)}s"
# Add score info if available
if submission_response:
metadata["score_percent"] = submission_response.get("score")
metadata["correct_count"] = submission_response.get("correct_count")
metadata["total_attempted"] = submission_response.get("total_attempted")
export_data = {
"metadata": metadata,
"submission_status": submission_status,
"results": [
{
"task_id": result.get("Task ID", "N/A"),
"question": result.get("Question", "N/A"),
"submitted_answer": result.get("Submitted Answer", "N/A"),
"correct": result.get("Task ID") in correct_task_ids if correct_task_ids else None
}
for result in results_log
]
}
# Write JSON file with pretty formatting
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
logger.info(f"Results exported to: {filepath}")
return filepath
def format_diagnostics(final_state: dict) -> str:
"""Format agent state for diagnostic display."""
diagnostics = []
# Question
diagnostics.append(f"**Question:** {final_state.get('question', 'N/A')}\n")
# Plan
plan = final_state.get('plan', 'No plan generated')
diagnostics.append(f"**Plan:**\n{plan}\n")
# Tool calls
tool_calls = final_state.get('tool_calls', [])
if tool_calls:
diagnostics.append(f"**Tools Selected:** {len(tool_calls)} tool(s)")
for idx, tc in enumerate(tool_calls, 1):
tool_name = tc.get('tool', 'unknown')
params = tc.get('params', {})
diagnostics.append(f" {idx}. {tool_name}({params})")
diagnostics.append("")
else:
diagnostics.append("**Tools Selected:** None\n")
# Tool results
tool_results = final_state.get('tool_results', [])
if tool_results:
diagnostics.append(f"**Tool Execution Results:** {len(tool_results)} result(s)")
for idx, tr in enumerate(tool_results, 1):
tool_name = tr.get('tool', 'unknown')
status = tr.get('status', 'unknown')
if status == 'success':
result_str = str(tr.get('result', ''))
result_preview = result_str[:100] + "..." if len(result_str) > 100 else result_str
diagnostics.append(f" {idx}. {tool_name}: ✓ SUCCESS")
diagnostics.append(f" Result: {result_preview}")
else:
error = tr.get('error', 'Unknown error')
diagnostics.append(f" {idx}. {tool_name}: ✗ FAILED - {error}")
diagnostics.append("")
# Evidence
evidence = final_state.get('evidence', [])
if evidence:
diagnostics.append(f"**Evidence Collected:** {len(evidence)} item(s)")
for idx, ev in enumerate(evidence, 1):
ev_preview = ev[:150] + "..." if len(ev) > 150 else ev
diagnostics.append(f" {idx}. {ev_preview}")
diagnostics.append("")
else:
diagnostics.append("**Evidence Collected:** None\n")
# Errors
errors = final_state.get('errors', [])
if errors:
diagnostics.append(f"**Errors:** {len(errors)} error(s)")
for idx, err in enumerate(errors, 1):
diagnostics.append(f" {idx}. {err}")
diagnostics.append("")
# Answer
answer = final_state.get('answer', 'No answer generated')
diagnostics.append(f"**Final Answer:** {answer}")
return "\n".join(diagnostics)
def test_single_question(question: str, llm_provider: str, enable_fallback: bool):
"""Test agent with a single question and return diagnostics."""
if not question or not question.strip():
return "Please enter a question.", "", check_api_keys()
try:
# Set LLM provider from UI selection (overrides .env)
os.environ["LLM_PROVIDER"] = llm_provider.lower()
os.environ["ENABLE_LLM_FALLBACK"] = "true" if enable_fallback else "false"
logger.info(f"UI Config: LLM_PROVIDER={llm_provider}, ENABLE_LLM_FALLBACK={enable_fallback}")
# Initialize agent
agent = GAIAAgent()
# Run agent (this stores final_state in agent.last_state)
answer = agent(question)
# Get final state from agent
final_state = agent.last_state or {}
# Format diagnostics with LLM provider info
provider_info = f"**LLM Provider:** {llm_provider} (Fallback: {'Enabled' if enable_fallback else 'Disabled'})\n\n"
diagnostics = provider_info + format_diagnostics(final_state)
api_status = check_api_keys()
return answer, diagnostics, api_status
except Exception as e:
logger.error(f"Error in test_single_question: {e}", exc_info=True)
return f"ERROR: {str(e)}", f"Exception occurred: {str(e)}", check_api_keys()
# --- GAIA Agent (Replaced BasicAgent) ---
# LangGraph-based agent with sequential workflow
# Stage 1: Placeholder nodes, returns fixed answer
# Stage 2: Tool integration
# Stage 3: Planning and reasoning logic
# Stage 4: Error handling and robustness
# Stage 5: Performance optimization
# Stage 6: Async processing with ThreadPoolExecutor
def process_single_question(agent, item, index, total):
"""Process single question with agent, return result with error handling.
Args:
agent: GAIAAgent instance
item: Question item dict with task_id and question
index: Question index (0-based)
total: Total number of questions
Returns:
dict: Result containing task_id, question, answer, and error flag
"""
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
return {
"task_id": task_id,
"question": question_text,
"answer": "ERROR: Missing task_id or question",
"error": True
}
try:
logger.info(f"[{index+1}/{total}] Processing {task_id[:8]}...")
submitted_answer = agent(question_text)
logger.info(f"[{index+1}/{total}] Completed {task_id[:8]}")
return {
"task_id": task_id,
"question": question_text,
"answer": submitted_answer,
"error": False
}
except Exception as e:
logger.error(f"[{index+1}/{total}] Error {task_id[:8]}: {e}")
return {
"task_id": task_id,
"question": question_text,
"answer": f"ERROR: {str(e)}",
"error": True
}
def run_and_submit_all(llm_provider: str, enable_fallback: bool, profile: gr.OAuthProfile | None = None):
"""
Fetches all questions, runs the GAIAAgent on them, submits all answers,
and displays the results.
"""
# Start execution timer
start_time = time.time()
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for linking to the code
if profile:
username = f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None, ""
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# Set LLM provider from UI selection (overrides .env)
os.environ["LLM_PROVIDER"] = llm_provider.lower()
os.environ["ENABLE_LLM_FALLBACK"] = "true" if enable_fallback else "false"
logger.info(f"UI Config for Full Evaluation: LLM_PROVIDER={llm_provider}, ENABLE_LLM_FALLBACK={enable_fallback}")
# 1. Instantiate Agent (Stage 1: GAIAAgent with LangGraph)
try:
logger.info("Initializing GAIAAgent...")
agent = GAIAAgent()
logger.info("GAIAAgent initialized successfully")
except Exception as e:
logger.error(f"Error instantiating agent: {e}")
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None, ""
# When the app runs as a Hugging Face Space, this link points to your codebase (useful for others, so please keep it public).
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
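# Assumed shape of the questions payload, inferred from how items are consumed below:
# [{"task_id": "<id>", "question": "<question text>", ...}, ...]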
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None, ""
print(f"Fetched {len(questions_data)} questions.")
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None, ""
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response from questions endpoint: {e}")
print(f"Response text: {response.text[:500]}")
return f"Error decoding server response for questions: {e}", None, ""
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred fetching questions: {e}", None, ""
# 3. Run your Agent (Stage 6: Concurrent processing)
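# Worker count defaults to 5 and can be tuned via the MAX_CONCURRENT_WORKERS env var.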
max_workers = int(os.getenv("MAX_CONCURRENT_WORKERS", "5"))
results_log = []
answers_payload = []
logger.info(f"Running agent on {len(questions_data)} questions with {max_workers} workers...")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all questions for concurrent processing
future_to_index = {
executor.submit(process_single_question, agent, item, idx, len(questions_data)): idx
for idx, item in enumerate(questions_data)
}
# Collect results as they complete
for future in as_completed(future_to_index):
result = future.result()
# Add to results log
results_log.append({
"Task ID": result["task_id"],
"Question": result["question"],
"Submitted Answer": result["answer"],
})
# Add to submission payload if no error
if not result["error"]:
answers_payload.append({
"task_id": result["task_id"],
"submitted_answer": result["answer"]
})
# Log progress
logger.info(f"Progress: {len(results_log)}/{len(questions_data)} questions processed")
if not answers_payload:
print("Agent did not produce any answers to submit.")
status_message = "Agent did not produce any answers to submit."
results_df = pd.DataFrame(results_log)
execution_time = time.time() - start_time
export_path = export_results_to_json(results_log, status_message, execution_time)
return status_message, results_df, export_path
# 4. Prepare Submission
submission_data = {
"username": username.strip(),
"agent_code": agent_code,
"answers": answers_payload,
}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
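# Assumed shape of the scoring API response, inferred from the accesses below
# (the per-question "results" list may or may not be present):
# {"username": "...", "score": 55, "correct_count": 11, "total_attempted": 20,
#  "message": "...", "results": [{"task_id": "...", "correct": true}, ...]}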
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
execution_time = time.time() - start_time
logger.info(f"Total execution time: {execution_time:.2f} seconds ({int(execution_time // 60)}m {int(execution_time % 60)}s)")
# Extract correct task_ids from result_data if available
correct_task_ids = set()
if "results" in result_data:
for item in result_data.get("results", []):
if item.get("correct"):
correct_task_ids.add(item.get("task_id"))
# Add "Correct?" column to results
for result in results_log:
task_id = result.get("Task ID")
if correct_task_ids:
result["Correct?"] = "✅ Yes" if task_id in correct_task_ids else "❌ No"
else:
# If no per-question data, show summary info
result["Correct?"] = f"See summary: {result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct"
results_df = pd.DataFrame(results_log)
# Export to JSON with execution time and submission response
export_path = export_results_to_json(results_log, final_status, execution_time, result_data)
return final_status, results_df, export_path
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
execution_time = time.time() - start_time
results_df = pd.DataFrame(results_log)
export_path = export_results_to_json(results_log, status_message, execution_time)
return status_message, results_df, export_path
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
execution_time = time.time() - start_time
results_df = pd.DataFrame(results_log)
export_path = export_results_to_json(results_log, status_message, execution_time)
return status_message, results_df, export_path
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
execution_time = time.time() - start_time
results_df = pd.DataFrame(results_log)
export_path = export_results_to_json(results_log, status_message, execution_time)
return status_message, results_df, export_path
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
execution_time = time.time() - start_time
results_df = pd.DataFrame(results_log)
export_path = export_results_to_json(results_log, status_message, execution_time)
return status_message, results_df, export_path
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
gr.Markdown("# GAIA Agent Evaluation Runner (Stage 4: MVP - Real Integration)")
gr.Markdown(
"""
**Stage 4 Progress:** Adding diagnostics, error handling, and fallback mechanisms.
"""
)
with gr.Tabs():
# Tab 1: Test Single Question (NEW - for diagnostics)
with gr.Tab("🔍 Test & Debug"):
gr.Markdown("""
**Test Mode:** Run the agent on a single question and see detailed diagnostics.
This mode shows:
- API key status
- Execution plan
- Tools selected and executed
- Evidence collected
- Errors encountered
- Final answer
""")
test_question_input = gr.Textbox(
label="Enter Test Question",
placeholder="e.g., What is the capital of France?",
lines=3
)
with gr.Row():
llm_provider_dropdown = gr.Dropdown(
label="LLM Provider",
choices=["Gemini", "HuggingFace", "Groq", "Claude"],
value="Groq",
info="Select which LLM to use for this test"
)
enable_fallback_checkbox = gr.Checkbox(
label="Enable Fallback",
value=False,
info="If enabled, falls back to other providers on failure"
)
test_button = gr.Button("Run Test", variant="primary")
with gr.Row():
with gr.Column(scale=1):
test_answer_output = gr.Textbox(
label="Answer",
lines=3,
interactive=False
)
test_api_status = gr.Textbox(
label="API Keys Status",
lines=5,
interactive=False
)
with gr.Column(scale=2):
test_diagnostics_output = gr.Textbox(
label="Execution Diagnostics",
lines=20,
interactive=False
)
test_button.click(
fn=test_single_question,
inputs=[test_question_input, llm_provider_dropdown, enable_fallback_checkbox],
outputs=[test_answer_output, test_diagnostics_output, test_api_status]
)
# Tab 2: Full Evaluation (existing functionality)
with gr.Tab("📊 Full Evaluation"):
gr.Markdown(
"""
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
Once you click the submit button, it can take quite some time (this is how long the agent needs to work through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to address the long-running submission step, you could cache the answers and submit them in a separate action, or answer the questions asynchronously.
"""
)
gr.LoginButton()
with gr.Row():
eval_llm_provider_dropdown = gr.Dropdown(
label="LLM Provider for Evaluation",
choices=["Gemini", "HuggingFace", "Groq", "Claude"],
value="Groq",
info="Select which LLM to use for all questions"
)
eval_enable_fallback_checkbox = gr.Checkbox(
label="Enable Fallback",
value=True,
info="Recommended: Enable fallback for production evaluation"
)
run_button = gr.Button("Run Evaluation & Submit All Answers")
status_output = gr.Textbox(
label="Run Status / Submission Result", lines=5, interactive=False
)
# Removed max_rows=10 from DataFrame constructor
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
export_output = gr.File(
label="Download Results",
type="filepath"
)
run_button.click(
fn=run_and_submit_all,
inputs=[eval_llm_provider_dropdown, eval_enable_fallback_checkbox],
outputs=[status_output, results_table, export_output]
)
if __name__ == "__main__":
print("\n" + "-" * 30 + " App Starting " + "-" * 30)
# Check for SPACE_HOST and SPACE_ID at startup for information
space_host_startup = os.getenv("SPACE_HOST")
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup
if space_host_startup:
print(f"✅ SPACE_HOST found: {space_host_startup}")
print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
else:
print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
if space_id_startup: # Print repo URLs if SPACE_ID is found
print(f"✅ SPACE_ID found: {space_id_startup}")
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
print(
f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main"
)
else:
print(
"ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined."
)
print("-" * (60 + len(" App Starting ")) + "\n")
print("Launching Gradio Interface for Basic Agent Evaluation...")
demo.launch(debug=True, share=False)