""" GAIA Agent - Main Gradio Interface Enhanced agent with question-type specific strategies """ import os import re import json import time import gradio as gr import requests import pandas as pd # Importar mรณdulos locales from config import DEFAULT_API_URL from agent import EnhancedAgent from utils import detect_question_type, download_file_for_task # ============================================================================ # FUNCIONES PRINCIPALES DE GRADIO # ============================================================================ def run_and_submit_all(profile: gr.OAuthProfile | None): """Run the agent on all questions and submit the results.""" space_id = os.getenv("SPACE_ID") if profile: username = f"{profile.username}" else: return "Please Login to Hugging Face with the button.", None api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" submit_url = f"{api_url}/submit" agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" # Load questions try: print("๐Ÿ“ฅ Loading questions from server...") response = requests.get(questions_url, timeout=15) questions_data = response.json() print(f" โœ“ {len(questions_data)} questions loaded") except Exception as e: return f"Error fetching questions: {e}", None # Create agent (reusable) print("\n๐Ÿค– Creating agent...") agent = EnhancedAgent() results_log = [] answers_payload = [] diagnostics = [] print(f"\n{'='*80}") print(f"๐Ÿš€ Starting processing of {len(questions_data)} questions") print(f"{'='*80}\n") for i, item in enumerate(questions_data): task_id = item.get("task_id") question_text = item.get("question", "") file_name = item.get("file_name", "") if not task_id: continue print(f"\n{'='*80}") print(f"[{i+1}/{len(questions_data)}] Task: {task_id}") print(f"{'='*80}") print(f"โ“ Question: {question_text[:150]}...") # Detect question type question_type = detect_question_type(question_text, file_name) print(f"๐Ÿ” Detected type: {question_type}") if file_name: print(f"๐Ÿ“Ž Expected file: {file_name}") # Download file if exists local_file = download_file_for_task(task_id) # Show URLs found in the question url_pattern = r"https?://[\w\-\./?&=%#]+" found_urls = re.findall(url_pattern, question_text) for url in found_urls: print(f" ๐Ÿ”— URL found: {url}") # Execute agent start_time = time.time() print(f"โš™๏ธ Processing with strategy '{question_type}'...") try: submitted_answer, execution_logs = agent.solve( question_text, local_file, question_type ) # Limpieza de seguridad if len(submitted_answer) > 200: submitted_answer = submitted_answer[:200] except Exception as e: print(f"โŒ Error: {e}") submitted_answer = "Error" execution_logs = str(e) elapsed = time.time() - start_time print(f"\nโœ… Answer: {submitted_answer}") print(f"โฑ๏ธ Time: {elapsed:.1f}s") # Save results answers_payload.append({ "task_id": task_id, "submitted_answer": submitted_answer }) results_log.append({ "Task ID": task_id, "Index": i, "Type": question_type, "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text, "File": file_name or "N/A", "Answer": submitted_answer, "Time (s)": round(elapsed, 1) }) diagnostics.append({ "index": i, "task_id": task_id, "question_type": question_type, "question": question_text, "file_name": file_name, "answer": submitted_answer, "elapsed_seconds": round(elapsed, 1) }) # Clean up temporary file if local_file and os.path.exists(local_file): try: os.remove(local_file) except: pass # Save diagnostics try: ts = time.strftime("%Y%m%d_%H%M%S") diag_path = f"diagnostics_{ts}.json" with open(diag_path, "w", encoding="utf-8") as f: json.dump(diagnostics, f, ensure_ascii=False, indent=2) print(f"\n๐Ÿ“Š Diagnostics saved: {diag_path}") except Exception as e: print(f"โš ๏ธ Error saving diagnostics: {e}") # Submit results print(f"\n{'='*80}") print("๐Ÿ“ค Submitting answers to server...") submission_data = { "username": username.strip(), "agent_code": agent_code, "answers": answers_payload } try: response = requests.post(submit_url, json=submission_data, timeout=60) try: result = response.json() score = result.get('score', 'N/A') correct = result.get('correct_count', '?') total = result.get('total_count', len(questions_data)) status_msg = f"""โœ… Submission Successful! ๐Ÿ“Š Score: {score}% โœ“ Correct: {correct}/{total} ๐Ÿ‘ค Username: {username} """ print(status_msg) return status_msg, pd.DataFrame(results_log) except Exception: return f"Submission Failed (Server Error): {response.text}", pd.DataFrame(results_log) except Exception as e: return f"Submission Failed: {e}", pd.DataFrame(results_log) # ============================================================================ # INTERFAZ GRADIO # ============================================================================ with gr.Blocks() as demo: gr.Markdown(""" # ๐Ÿค– GAIA Agent - Optimized for Files, YouTube and Logic This agent uses question-specific strategies: - ๐Ÿ“Š **Excel/CSV Files**: Reads and analyzes data with pandas - ๐ŸŽฌ **YouTube**: Searches for transcripts and online discussions - ๐Ÿ–ผ๏ธ **Images**: Searches for information on the web - ๐ŸŽต **Audio**: Searches for transcripts online - ๐Ÿ“ **Wikipedia**: Navigates and extracts information - ๐Ÿ”ข **Counting**: Lists items and counts programmatically - ๐Ÿ”„ **Text Manipulation**: Handles reversed text, opposites, etc. """) gr.LoginButton() with gr.Row(): run_button = gr.Button("๐Ÿš€ Run Evaluation & Submit All Answers", variant="primary") status_output = gr.Textbox(label="๐Ÿ“‹ Status", lines=6, interactive=False) results_table = gr.DataFrame( label="๐Ÿ“Š Detailed Results", wrap=True, headers=["Task ID", "Index", "Type", "Question", "File", "Answer", "Time (s)"] ) run_button.click( fn=run_and_submit_all, outputs=[status_output, results_table] ) if __name__ == "__main__": print("๐Ÿš€ Starting GAIA Agent...") demo.launch()