Diego-Fco's picture
Clean project structure with English comments
b712b2b
"""
GAIA Agent - Main Gradio Interface
Enhanced agent with question-type specific strategies
"""
import os
import re
import json
import time
import gradio as gr
import requests
import pandas as pd
# Import local modules
from config import DEFAULT_API_URL
from agent import EnhancedAgent
from utils import detect_question_type, download_file_for_task
# ============================================================================
# MAIN GRADIO FUNCTIONS
# ============================================================================
def _save_diagnostics(diagnostics):
    """Write the per-question diagnostics to a timestamped JSON file (best effort)."""
    try:
        ts = time.strftime("%Y%m%d_%H%M%S")
        diag_path = f"diagnostics_{ts}.json"
        with open(diag_path, "w", encoding="utf-8") as f:
            json.dump(diagnostics, f, ensure_ascii=False, indent=2)
        print(f"\nπŸ“Š Diagnostics saved: {diag_path}")
    except (OSError, TypeError, ValueError) as e:
        # Diagnostics are a convenience; failing to write them must not
        # prevent the answers from being submitted.
        print(f"⚠️ Error saving diagnostics: {e}")


def _submit_answers(submit_url, submission_data, username, fallback_total, results_df):
    """POST the answers and turn the server reply into a (status, table) pair."""
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
    except (requests.RequestException, TypeError, ValueError) as e:
        # Network failure, or a payload that could not be serialized to JSON.
        return f"Submission Failed: {e}", results_df

    # A 4xx/5xx reply frequently still carries a JSON body; without this check
    # it would be reported as a successful submission with score "N/A".
    if not response.ok:
        return f"Submission Failed (Server Error): {response.text}", results_df

    try:
        result = response.json()
        score = result.get('score', 'N/A')
        correct = result.get('correct_count', '?')
        total = result.get('total_count', fallback_total)
    except (ValueError, AttributeError):
        # Body was not JSON, or was JSON but not an object.
        return f"Submission Failed (Server Error): {response.text}", results_df

    status_msg = (
        "βœ… Submission Successful!\n"
        f"πŸ“Š Score: {score}%\n"
        f"βœ“ Correct: {correct}/{total}\n"
        f"πŸ‘€ Username: {username}\n"
    )
    print(status_msg)
    return status_msg, results_df


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on all questions and submit the results.

    Fetches the question list from ``DEFAULT_API_URL``, solves each question
    with a single ``EnhancedAgent`` instance, writes a diagnostics JSON file,
    and posts the answers to the ``/submit`` endpoint.

    Args:
        profile: The logged-in Hugging Face profile, or ``None`` when the user
            has not logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a
        ``pandas.DataFrame`` with one row per processed question, or ``None``
        when the run stopped before any question was processed (not logged
        in, or the question list could not be fetched or was empty).
    """
    if not profile:
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"

    space_id = os.getenv("SPACE_ID")
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    # Load questions
    try:
        print("πŸ“₯ Loading questions from server...")
        response = requests.get(questions_url, timeout=15)
        # Surface HTTP errors here instead of trying to iterate an error body.
        response.raise_for_status()
        questions_data = response.json()
    except Exception as e:
        return f"Error fetching questions: {e}", None
    if not isinstance(questions_data, list) or not questions_data:
        return "Fetched questions list is empty or invalid format.", None
    total_questions = len(questions_data)
    print(f" βœ“ {total_questions} questions loaded")

    # Create agent (reusable)
    print("\nπŸ€– Creating agent...")
    agent = EnhancedAgent()

    results_log = []
    answers_payload = []
    diagnostics = []
    # Compiled once: the pattern does not change between questions.
    url_regex = re.compile(r"https?://[\w\-\./?&=%#]+")

    print(f"\n{'='*80}")
    print(f"πŸš€ Starting processing of {total_questions} questions")
    print(f"{'='*80}\n")

    for i, item in enumerate(questions_data):
        if not isinstance(item, dict):
            continue
        task_id = item.get("task_id")
        # `or ""` also covers keys that are present but null, which would
        # otherwise break the slicing and len() calls below.
        question_text = item.get("question") or ""
        file_name = item.get("file_name") or ""
        if not task_id:
            continue

        print(f"\n{'='*80}")
        print(f"[{i+1}/{total_questions}] Task: {task_id}")
        print(f"{'='*80}")
        print(f"❓ Question: {question_text[:150]}...")

        # Detect question type
        question_type = detect_question_type(question_text, file_name)
        print(f"πŸ” Detected type: {question_type}")
        if file_name:
            print(f"πŸ“Ž Expected file: {file_name}")

        # Download file if exists.
        # NOTE(review): the download is attempted for every task; presumably
        # the helper returns a falsy value when there is no attachment - confirm.
        local_file = download_file_for_task(task_id)

        # Show URLs found in the question
        for url in url_regex.findall(question_text):
            print(f" πŸ”— URL found: {url}")

        # Execute agent
        start_time = time.time()
        print(f"βš™οΈ Processing with strategy '{question_type}'...")
        try:
            submitted_answer, _ = agent.solve(
                question_text,
                local_file,
                question_type,
            )
            # Safety cleanup: cap the answer length at 200 characters.
            if len(submitted_answer) > 200:
                submitted_answer = submitted_answer[:200]
        except Exception as e:
            # One failing question must not abort the whole evaluation run.
            print(f"❌ Error: {e}")
            submitted_answer = "Error"
        elapsed = time.time() - start_time

        print(f"\nβœ… Answer: {submitted_answer}")
        print(f"⏱️ Time: {elapsed:.1f}s")

        # Save results
        answers_payload.append({
            "task_id": task_id,
            "submitted_answer": submitted_answer,
        })
        results_log.append({
            "Task ID": task_id,
            "Index": i,
            "Type": question_type,
            "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
            "File": file_name or "N/A",
            "Answer": submitted_answer,
            "Time (s)": round(elapsed, 1),
        })
        diagnostics.append({
            "index": i,
            "task_id": task_id,
            "question_type": question_type,
            "question": question_text,
            "file_name": file_name,
            "answer": submitted_answer,
            "elapsed_seconds": round(elapsed, 1),
        })

        # Clean up temporary file
        if local_file and os.path.exists(local_file):
            try:
                os.remove(local_file)
            except OSError as e:
                # Best effort: a leftover temp file is not worth failing the run.
                print(f"Warning: could not remove temporary file {local_file}: {e}")

    # Save diagnostics
    _save_diagnostics(diagnostics)

    # Built once so that every return path below shares the same table.
    results_df = pd.DataFrame(results_log)
    if not answers_payload:
        return "Agent did not produce any answers to submit.", results_df

    # Submit results
    print(f"\n{'='*80}")
    print("πŸ“€ Submitting answers to server...")
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    return _submit_answers(
        submit_url, submission_data, username, total_questions, results_df
    )
# ============================================================================
# GRADIO INTERFACE
# ============================================================================
# Build the Gradio UI: intro text, login button, run button, status box and
# results table, with the run button wired to run_and_submit_all.
demo = gr.Blocks()

with demo:
    _intro_markdown = """
# πŸ€– GAIA Agent - Optimized for Files, YouTube and Logic
This agent uses question-specific strategies:
- πŸ“Š **Excel/CSV Files**: Reads and analyzes data with pandas
- 🎬 **YouTube**: Searches for transcripts and online discussions
- πŸ–ΌοΈ **Images**: Searches for information on the web
- 🎡 **Audio**: Searches for transcripts online
- πŸ“ **Wikipedia**: Navigates and extracts information
- πŸ”’ **Counting**: Lists items and counts programmatically
- πŸ”„ **Text Manipulation**: Handles reversed text, opposites, etc.
"""
    gr.Markdown(_intro_markdown)
    gr.LoginButton()

    # NOTE(review): the row is assumed to contain only the run button; the
    # nesting could not be recovered from the flattened source - confirm.
    with gr.Row():
        run_button = gr.Button(
            "πŸš€ Run Evaluation & Submit All Answers",
            variant="primary",
        )

    status_output = gr.Textbox(
        label="πŸ“‹ Status",
        lines=6,
        interactive=False,
    )

    # Column order of the results table shown after a run.
    _result_headers = ["Task ID", "Index", "Type", "Question", "File", "Answer", "Time (s)"]
    results_table = gr.DataFrame(
        label="πŸ“Š Detailed Results",
        wrap=True,
        headers=_result_headers,
    )

    # No explicit inputs are passed to the callback.
    run_button.click(
        run_and_submit_all,
        outputs=[status_output, results_table],
    )

# Launch the app only when the file is executed as a script.
if __name__ == "__main__":
    print("πŸš€ Starting GAIA Agent...")
    demo.launch()