Spaces:
Sleeping
Sleeping
| """ | |
| GAIA Agent - Gradio Application | |
| This is the main entry point for the Hugging Face Space. | |
| It provides a Gradio interface for running the GAIA evaluation | |
| and submitting answers to the scoring API. | |
| LOCAL DEBUGGING: | |
| 1. Create a .env file with your API keys | |
| 2. Run: python app.py | |
| 3. Open http://localhost:7860 in your browser | |
| """ | |
| import os | |
| import tempfile | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from dotenv import load_dotenv | |
# Load environment variables from .env file (for local development)
load_dotenv()

# Use the ReAct agent (multi-step reasoning)
# NOTE: imported after load_dotenv() so the agent module sees the .env keys
# at import time.
from agent import run_agent, run_agent_verbose

# ============== CONFIGURATION ==============
# Scoring-API root; override via GAIA_API_BASE to point at a mirror.
API_BASE = os.getenv("GAIA_API_BASE", "https://agents-course-unit4-scoring.hf.space")
# Set DEBUG_MODE=true to launch Gradio in debug mode (see demo.launch below).
DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"
| # ============== FILE HANDLING ============== | |
def fetch_task_file(task_id: str, file_name: str = "") -> Optional[str]:
    """
    Fetch a file attached to a GAIA task and save it locally.

    Args:
        task_id: The GAIA task ID
        file_name: Expected filename (helps determine file type)

    Returns:
        Local file path if successful, None if no file or error
    """
    if not file_name:
        return None
    try:
        url = f"{API_BASE}/files/{task_id}"
        print(f"π₯ Fetching file from: {url}")
        response = requests.get(url, timeout=60)
        if response.status_code == 200:
            # Try to get filename from content-disposition header.
            content_disp = response.headers.get('content-disposition', '')
            if 'filename=' in content_disp:
                # Take only the filename parameter value (stop at any trailing
                # ';param'), strip surrounding quotes, then basename() it so a
                # hostile header cannot smuggle path separators and write
                # outside the temp directory. The old code kept everything
                # after 'filename=' verbatim, including ';charset=...' tails
                # and directory components.
                raw = content_disp.split('filename=')[1].split(';')[0].strip().strip('"\'')
                filename = os.path.basename(raw) or file_name
            else:
                filename = file_name
            # Save to temp directory
            file_path = os.path.join(tempfile.gettempdir(), filename)
            with open(file_path, 'wb') as f:
                f.write(response.content)
            file_size = len(response.content)
            print(f"β File saved: {file_path} ({file_size} bytes)")
            return file_path
        else:
            print(f"β οΈ File fetch failed: HTTP {response.status_code}")
            return None
    except Exception as e:
        print(f"β Error fetching file: {e}")
        return None
| # ============== API FUNCTIONS ============== | |
def fetch_questions() -> List[Dict[str, Any]]:
    """Fetch all GAIA questions from the evaluation API."""
    try:
        resp = requests.get(f"{API_BASE}/questions", timeout=30)
        if resp.status_code == 200:
            return resp.json()
        # Non-200: log and fall through to the empty-list return.
        print(f"Failed to fetch questions: {resp.status_code}")
    except Exception as exc:
        # Network/JSON errors are logged and treated as "no questions".
        print(f"Error fetching questions: {exc}")
    return []
def fetch_random_question() -> Optional[Dict[str, Any]]:
    """Fetch a single random question for testing."""
    try:
        resp = requests.get(f"{API_BASE}/random-question", timeout=30)
        # Any non-200 response falls through to the None return below.
        return resp.json() if resp.status_code == 200 else None
    except Exception as exc:
        print(f"Error fetching random question: {exc}")
        return None
def submit_answers(username: str, agent_code_url: str, answers: List[Dict[str, str]]) -> Optional[Dict[str, Any]]:
    """Submit answers to the scoring API.

    Returns the API's result dict on success, None on any failure.
    """
    try:
        response = requests.post(
            f"{API_BASE}/submit",
            json={
                "username": username,
                "agent_code": agent_code_url,
                "answers": answers,
            },
            timeout=120,
        )
        if response.status_code != 200:
            print(f"Submission failed: {response.status_code} - {response.text}")
            return None
        return response.json()
    except Exception as exc:
        print(f"Error submitting answers: {exc}")
        return None
| # ============== LOCAL DEBUG FUNCTIONS ============== | |
def run_single_question_local(question_text: str, task_id: str, file_name: str) -> Tuple[str, str, str]:
    """Run the agent on a manually entered question (local debugging helper).

    Returns (question, answer, status) for display in the Gradio UI.
    """
    if not question_text.strip():
        return "Please enter a question", "", ""

    task_id = task_id.strip() or "local_test"
    file_name = file_name.strip() or None

    sep = "=" * 60
    print(f"\n{sep}")
    print("LOCAL DEBUG - Running agent")
    print(f"Task ID: {task_id}")
    print(f"Question: {question_text[:200]}...")
    print(f"File: {file_name or 'None'}")
    print(f"{sep}\n")

    # Only fetch a remote file for real task IDs; the "local_test" placeholder
    # has no attachment on the scoring server.
    local_file_path = None
    if file_name and task_id != "local_test":
        local_file_path = fetch_task_file(task_id, file_name)

    try:
        answer = run_agent_verbose(question_text, task_id, file_name, local_file_path)
        return question_text, answer, f"Processed task: {task_id}"
    except Exception as exc:
        import traceback
        details = traceback.format_exc()
        print(f"Error:\n{details}")
        return question_text, f"Error: {exc}\n\nDetails:\n{details}", "Failed"
def run_random_question() -> Tuple[str, str, str, str, str]:
    """Fetch one random question from the API and run the agent on it.

    Returns (question, task_id, file_name, answer, status) for the UI.
    """
    data = fetch_random_question()
    if not data:
        return "Failed to fetch question", "", "", "", ""

    task_id = data.get("task_id", "unknown")
    question = data.get("question", "")
    file_name = data.get("file_name", "")
    level = data.get("Level", "?")

    sep = "=" * 60
    print(f"\n{sep}")
    print("RANDOM QUESTION from API")
    print(f"Task ID: {task_id}")
    print(f"Level: {level}")
    print(f"Question: {question[:200]}...")
    print(f"File: {file_name or 'None'}")
    print(f"{sep}\n")

    # Download any attachment first so the agent can read it from disk.
    local_file_path = fetch_task_file(task_id, file_name) if file_name else None

    try:
        answer = run_agent_verbose(question, task_id, file_name or None, local_file_path)
        return question, task_id, file_name or "", answer, f"β Task: {task_id} | Level: {level}"
    except Exception as exc:
        import traceback
        details = traceback.format_exc()
        print(f"Error:\n{details}")
        return question, task_id, file_name or "", f"Error: {exc}", "β Failed"
def run_specific_question(task_id_input: str) -> Tuple[str, str, str, str, str]:
    """Run the agent on a specific question looked up by task ID.

    Returns (question, task_id, file_name, answer, status) for the UI.
    """
    wanted = task_id_input.strip()
    if not wanted:
        return "Please enter a task ID", "", "", "", ""

    # Fetch the full question list and scan it for the requested ID.
    match = next((q for q in fetch_questions() if q.get("task_id") == wanted), None)
    if match is None:
        return f"Task ID not found: {wanted}", wanted, "", "", "β Not found"

    task_id = match.get("task_id", "")
    question = match.get("question", "")
    file_name = match.get("file_name", "")
    level = match.get("Level", "?")

    sep = "=" * 60
    print(f"\n{sep}")
    print(f"SPECIFIC QUESTION: {task_id}")
    print(f"Level: {level}")
    print(f"Question: {question[:200]}...")
    print(f"File: {file_name or 'None'}")
    print(f"{sep}\n")

    # Download the attachment first so the agent gets a local path.
    local_file_path = fetch_task_file(task_id, file_name) if file_name else None

    try:
        answer = run_agent(question, task_id, file_name or None, local_file_path)
        return question, task_id, file_name or "", answer, f"β Completed | Level: {level}"
    except Exception as exc:
        import traceback
        details = traceback.format_exc()
        print(f"Error:\n{details}")
        return question, task_id, file_name or "", f"Error: {exc}", "β Failed"
def list_all_questions() -> pd.DataFrame:
    """Fetch and display all available questions.

    Returns:
        DataFrame with display-truncated task_id/question columns, or a
        single-column error frame if the API call failed.
    """
    questions = fetch_questions()
    if not questions:
        return pd.DataFrame({"error": ["Failed to fetch questions"]})

    def _trunc(text: str, limit: int) -> str:
        # Only mark with "..." when the value was actually shortened; the
        # previous version appended "..." unconditionally, making short
        # values look truncated (and inconsistent with the answer column
        # in run_full_evaluation_local).
        return text[:limit] + "..." if len(text) > limit else text

    data = [
        {
            "task_id": _trunc(q.get("task_id", ""), 20),
            "question": _trunc(q.get("question", ""), 80),
            "file": q.get("file_name", "") or "-",
            "level": q.get("Level", "?"),
        }
        for q in questions
    ]
    return pd.DataFrame(data)
def run_full_evaluation_local(username: str) -> Tuple[str, pd.DataFrame]:
    """
    Run full evaluation in local mode (without HF OAuth).

    Fetches every GAIA question, runs the agent on each one (pre-fetching any
    attached file), submits the collected answers under *username*, and
    returns a (status message, results table) pair for the Gradio UI.

    Args:
        username: HuggingFace username used in the submission payload.

    Returns:
        Tuple of (human-readable status string, DataFrame of per-task results).
    """
    if not username.strip():
        return "β Please enter your HuggingFace username", pd.DataFrame()
    username = username.strip()
    agent_code_url = f"https://huggingface.co/spaces/{username}/GAIA-Agent/tree/main"

    print(f"\n{'='*60}")
    print(f"FULL EVALUATION - LOCAL MODE")
    print(f"Username: {username}")
    print(f"Agent URL: {agent_code_url}")
    print(f"{'='*60}\n")

    # Fetch questions
    questions = fetch_questions()
    if not questions:
        return "β Failed to fetch questions from API.", pd.DataFrame()
    print(f"Fetched {len(questions)} questions")

    def _clip(text: str, limit: int) -> str:
        # Truncate for display; only append "..." when actually shortened.
        # (The previous code added "..." unconditionally to task_id/question,
        # mislabelling short values as truncated.)
        return text[:limit] + "..." if len(text) > limit else text

    # Process each question
    results = []
    answers_for_submission = []
    for i, q in enumerate(questions):
        task_id = q.get("task_id", "unknown")
        question = q.get("question", "")
        file_name = q.get("file_name", "")
        print(f"\n[{i+1}/{len(questions)}] Processing: {task_id}")
        print(f"Question: {question[:100]}...")
        # Pre-fetch file if attached
        local_file_path = None
        if file_name:
            local_file_path = fetch_task_file(task_id, file_name)
        try:
            answer = run_agent(question, task_id, file_name if file_name else None, local_file_path)
            print(f"Answer: {answer[:100]}...")
            results.append({
                "task_id": _clip(task_id, 15),
                "question": _clip(question, 60),
                "answer": _clip(answer, 80)
            })
            answers_for_submission.append({
                "task_id": task_id,
                "submitted_answer": answer
            })
        except Exception as e:
            print(f"Error: {e}")
            results.append({
                "task_id": _clip(task_id, 15),
                "question": _clip(question, 60),
                "answer": f"ERROR: {str(e)[:50]}"
            })
            # Submit an empty answer so the scoring API still counts the task.
            answers_for_submission.append({
                "task_id": task_id,
                "submitted_answer": ""
            })

    # Submit answers
    print(f"\n{'='*60}")
    print("Submitting answers...")
    print(f"{'='*60}\n")
    submission_result = submit_answers(username, agent_code_url, answers_for_submission)

    df = pd.DataFrame(results)
    if submission_result:
        score = submission_result.get("score", "N/A")
        correct = submission_result.get("correct_count", "?")
        total = submission_result.get("total_count", len(questions))
        status = f"β Submitted!\n\nπ Score: {score}\nβ Correct: {correct}/{total}"
        print(f"\nFinal Score: {score} ({correct}/{total})")
    else:
        status = "β Submission failed. Check logs for details."
    return status, df
def run_full_evaluation_hf(profile: gr.OAuthProfile = None) -> Tuple[str, pd.DataFrame]:
    """
    Run full evaluation with HuggingFace OAuth (for deployed Space).

    Gradio injects the OAuthProfile when the user is logged in; a None
    profile means no login, so we refuse to run.
    """
    if profile is not None:
        # Delegate to the local runner using the authenticated username.
        return run_full_evaluation_local(profile.username)
    return "β Please log in with your Hugging Face account first.", pd.DataFrame()
| # ============== BUILD GRADIO INTERFACE ============== | |
def create_app():
    """Create and configure the Gradio application.

    Builds a four-tab Blocks UI (random-question quick test, debug by task
    ID, manual question entry, full evaluation) and wires each button to the
    runner functions defined above.

    Returns:
        The configured gr.Blocks demo (not yet launched).
    """
    # Check if running locally (no HF Space environment).
    # SPACE_ID is injected by the HuggingFace Spaces runtime; its absence
    # selects the manual-username variant of the Full Evaluation tab below.
    is_local = os.getenv("SPACE_ID") is None

    with gr.Blocks(title="GAIA Agent - Debug & Evaluation") as demo:
        gr.Markdown("""
        # π€ GAIA Agent - Debug & Evaluation Interface
        Built with **LangGraph** and **OpenAI GPT-4** for the HuggingFace Agents Course.
        """)

        # Show environment info (mode + whether the OpenAI key is present).
        env_info = "π₯οΈ **Local Mode**" if is_local else "βοΈ **HuggingFace Space Mode**"
        api_key_status = "β API Key Set" if os.getenv("OPENAI_API_KEY") else "β OPENAI_API_KEY not set!"
        gr.Markdown(f"""
        **Environment:** {env_info} | **OpenAI:** {api_key_status}
        ---
        """)

        with gr.Tabs():
            # ============== TAB 1: Quick Test ==============
            with gr.TabItem("π§ͺ Quick Test"):
                gr.Markdown("### Test with a random question from the GAIA API")
                with gr.Row():
                    random_btn = gr.Button("π² Fetch & Run Random Question", variant="primary")
                with gr.Row():
                    with gr.Column():
                        random_question = gr.Textbox(label="Question", lines=4, interactive=False)
                        random_task_id = gr.Textbox(label="Task ID", lines=1, interactive=False)
                        random_file = gr.Textbox(label="Attached File", lines=1, interactive=False)
                    with gr.Column():
                        random_answer = gr.Textbox(label="Agent Answer", lines=4, interactive=False)
                        random_status = gr.Textbox(label="Status", lines=1, interactive=False)
                # No inputs: run_random_question pulls its question from the API.
                random_btn.click(
                    fn=run_random_question,
                    outputs=[random_question, random_task_id, random_file, random_answer, random_status]
                )

            # ============== TAB 2: Debug Specific ==============
            with gr.TabItem("π Debug Specific Question"):
                gr.Markdown("### Run a specific question by Task ID")
                with gr.Row():
                    specific_task_input = gr.Textbox(
                        label="Task ID",
                        placeholder="e.g., 8e867cd7-cff9-4e6c-867a-ff5ddc2550be",
                        lines=1
                    )
                    specific_btn = gr.Button("βΆοΈ Run", variant="primary")
                with gr.Row():
                    with gr.Column():
                        specific_question = gr.Textbox(label="Question", lines=4, interactive=False)
                        specific_file = gr.Textbox(label="Attached File", lines=1, interactive=False)
                    with gr.Column():
                        specific_answer = gr.Textbox(label="Agent Answer", lines=4, interactive=False)
                        specific_status = gr.Textbox(label="Status", lines=1, interactive=False)
                # specific_task_input appears in outputs too: the handler
                # echoes back the canonical task id into the input box.
                specific_btn.click(
                    fn=run_specific_question,
                    inputs=[specific_task_input],
                    outputs=[specific_question, specific_task_input, specific_file, specific_answer, specific_status]
                )
                gr.Markdown("---")
                gr.Markdown("### All Available Questions")
                with gr.Row():
                    list_btn = gr.Button("π Load Question List")
                questions_table = gr.Dataframe(
                    headers=["task_id", "question", "file", "level"],
                    label="Questions",
                    wrap=True
                )
                list_btn.click(fn=list_all_questions, outputs=[questions_table])

            # ============== TAB 3: Manual Input ==============
            with gr.TabItem("βοΈ Manual Input"):
                gr.Markdown("### Test with custom question (for debugging)")
                with gr.Row():
                    with gr.Column():
                        manual_question = gr.Textbox(
                            label="Question",
                            lines=4,
                            placeholder="Enter your test question here..."
                        )
                        manual_task_id = gr.Textbox(
                            label="Task ID (optional)",
                            lines=1,
                            placeholder="test_001"
                        )
                        manual_file = gr.Textbox(
                            label="File Name (optional)",
                            lines=1,
                            placeholder="e.g., data.xlsx"
                        )
                    with gr.Column():
                        manual_answer = gr.Textbox(label="Agent Answer", lines=4, interactive=False)
                        manual_status = gr.Textbox(label="Status", lines=2, interactive=False)
                with gr.Row():
                    manual_btn = gr.Button("βΆοΈ Run Agent", variant="primary")
                # The question box is also an output so the handler can echo
                # back (or replace) the text it actually processed.
                manual_btn.click(
                    fn=run_single_question_local,
                    inputs=[manual_question, manual_task_id, manual_file],
                    outputs=[manual_question, manual_answer, manual_status]
                )

            # ============== TAB 4: Full Evaluation ==============
            with gr.TabItem("π Full Evaluation"):
                gr.Markdown("### Run all 20 questions and submit for scoring")
                if is_local:
                    # Local mode - manual username input (no OAuth available).
                    gr.Markdown("**Local Mode:** Enter your HuggingFace username to submit.")
                    with gr.Row():
                        username_input = gr.Textbox(
                            label="HuggingFace Username",
                            placeholder="your-username",
                            lines=1
                        )
                    with gr.Row():
                        full_eval_btn_local = gr.Button("π Run Full Evaluation & Submit", variant="primary")
                    with gr.Row():
                        status_output_local = gr.Textbox(
                            label="Status",
                            lines=4,
                            interactive=False,
                            placeholder="Click 'Run Full Evaluation' to start..."
                        )
                    with gr.Row():
                        results_table_local = gr.Dataframe(
                            headers=["task_id", "question", "answer"],
                            label="Results",
                            wrap=True
                        )
                    full_eval_btn_local.click(
                        fn=run_full_evaluation_local,
                        inputs=[username_input],
                        outputs=[status_output_local, results_table_local]
                    )
                else:
                    # HF Space mode - OAuth login; Gradio injects the
                    # OAuthProfile into run_full_evaluation_hf automatically.
                    gr.Markdown("**Space Mode:** Log in with HuggingFace to submit.")
                    with gr.Row():
                        login_btn = gr.LoginButton(variant="huggingface")
                    with gr.Row():
                        full_eval_btn_hf = gr.Button("π Run Full Evaluation & Submit", variant="primary")
                    with gr.Row():
                        status_output_hf = gr.Textbox(
                            label="Status",
                            lines=4,
                            interactive=False,
                            placeholder="Log in and click 'Run Full Evaluation' to start..."
                        )
                    with gr.Row():
                        results_table_hf = gr.Dataframe(
                            headers=["task_id", "question", "answer"],
                            label="Results",
                            wrap=True
                        )
                    full_eval_btn_hf.click(
                        fn=run_full_evaluation_hf,
                        outputs=[status_output_hf, results_table_hf]
                    )

        # Footer: resource links and local-setup instructions.
        gr.Markdown("""
        ---
        ### π Resources
        - [Course Page](https://huggingface.co/learn/agents-course/unit4/hands-on)
        - [API Docs](https://agents-course-unit4-scoring.hf.space/docs)
        - [Leaderboard](https://huggingface.co/spaces/agents-course/Students_leaderboard)
        ### π§ Local Setup
        ```bash
        # 1. Create .env file
        echo "OPENAI_API_KEY=sk-your-key-here" > .env
        # 2. Install dependencies
        pip install -r requirements.txt
        # 3. Run the app
        python app.py
        ```
        """)
    return demo
| # ============== MAIN ============== | |
if __name__ == "__main__":
    banner = "=" * 60
    print("\n" + banner)
    print("π€ GAIA Agent - Starting Gradio Interface")
    print(banner)

    # Warn early if the OpenAI key is missing so the user sees it before
    # the UI comes up.
    if os.getenv("OPENAI_API_KEY"):
        print("β OpenAI API Key detected")
    else:
        print("\nβ οΈ WARNING: OPENAI_API_KEY not set!")
        print(" Create a .env file with: OPENAI_API_KEY=sk-your-key")
        print(" Or set it as an environment variable.\n")

    print(f"π‘ GAIA API: {API_BASE}")
    print(banner + "\n")

    # Create and launch the app
    demo = create_app()
    demo.launch(
        server_name="0.0.0.0",  # Allow external connections
        server_port=7860,
        share=False,            # Set to True to get a public URL
        debug=DEBUG_MODE        # Enable debug mode for better error messages
    )