Spaces:
No application file
No application file
import json
import logging
import os
import re
import shutil
import tempfile
from typing import Optional

import gradio as gr
import pandas as pd
import requests

# Import the optimized agent from the separate module
from agent_enhanced import GAIAAgent
# ============ CONFIGURATION ============
# Base URL of the GAIA scoring API (questions, files, submissions).
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Set up logging: a single module-level logger shared by every helper below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| # ============ API INTERACTION ============ | |
def fetch_questions(api_url: str = DEFAULT_API_URL, max_retries: int = 3) -> list:
    """Fetch all questions from the GAIA API with retry logic.

    Retries up to ``max_retries`` times on request errors; the last failure
    is re-raised to the caller.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            resp = requests.get(f"{api_url}/questions", timeout=30)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.RequestException as exc:
            logger.warning(f"Attempt {attempt + 1} failed: {exc}")
            attempt += 1
            # Exhausted all retries: surface the error instead of hiding it.
            if attempt == max_retries:
                raise
    return []
def fetch_random_question(api_url: str = DEFAULT_API_URL, max_retries: int = 3) -> dict:
    """Fetch a random question from the GAIA API with retry logic.

    Retries up to ``max_retries`` times on request errors; the last failure
    is re-raised to the caller.
    """
    attempt = 0
    while attempt < max_retries:
        try:
            resp = requests.get(f"{api_url}/random-question", timeout=30)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.RequestException as exc:
            logger.warning(f"Attempt {attempt + 1} failed: {exc}")
            attempt += 1
            # Exhausted all retries: surface the error instead of hiding it.
            if attempt == max_retries:
                raise
    return {}
def _safe_filename_from_disposition(disposition: str, task_id: str) -> str:
    """Extract a safe local filename from a Content-Disposition header value.

    Falls back to a task-derived default when no usable name is present.
    ``os.path.basename`` strips any directory components so a hostile
    server-supplied name (e.g. ``../../etc/passwd``) cannot escape the
    temp directory; the ``;`` split drops trailing parameters.
    """
    fallback = f"task_{task_id}_file"
    if 'filename=' not in disposition:
        return fallback
    candidate = disposition.split('filename=')[1].split(';')[0].strip().strip('"')
    candidate = os.path.basename(candidate)
    return candidate or fallback


def fetch_file(task_id: str, api_url: str = DEFAULT_API_URL, max_retries: int = 3) -> Optional[str]:
    """Fetch a file associated with a task with retry logic.

    Returns the local path of the downloaded file, or ``None`` when the task
    has no file (HTTP 404) or when every attempt fails. Unlike the raising
    fetch helpers, a failure here is non-fatal for the caller.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(f"{api_url}/files/{task_id}", timeout=30)
            if response.status_code == 200:
                # Save to a fresh temp dir; one directory per downloaded file.
                filename = _safe_filename_from_disposition(
                    response.headers.get('content-disposition', ''), task_id
                )
                temp_dir = tempfile.mkdtemp()
                file_path = os.path.join(temp_dir, filename)
                with open(file_path, 'wb') as f:
                    f.write(response.content)
                logger.info(f"Downloaded file: {file_path}")
                return file_path
            elif response.status_code == 404:
                # 404 is the API's way of saying "this task has no file".
                logger.info(f"No file found for task {task_id}")
                return None
            else:
                # Unexpected status (e.g. 5xx): log and retry instead of
                # silently looping as the previous version did.
                logger.warning(
                    f"File fetch attempt {attempt + 1} got HTTP {response.status_code}"
                )
        except requests.exceptions.RequestException as e:
            logger.warning(f"File fetch attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                logger.error(f"Failed to fetch file for task {task_id}: {e}")
                return None
    return None
def submit_answers(username: str, agent_code: str, answers: list, api_url: str = DEFAULT_API_URL, max_retries: int = 3) -> dict:
    """Submit answers to the GAIA API with retry logic.

    Retries up to ``max_retries`` times on request errors; the last failure
    is re-raised to the caller.
    """
    payload = {"username": username, "agent_code": agent_code, "answers": answers}
    attempt = 0
    while attempt < max_retries:
        try:
            resp = requests.post(f"{api_url}/submit", json=payload, timeout=60)
            resp.raise_for_status()
            return resp.json()
        except requests.exceptions.RequestException as exc:
            logger.warning(f"Submission attempt {attempt + 1} failed: {exc}")
            attempt += 1
            # Exhausted all retries: surface the error instead of hiding it.
            if attempt == max_retries:
                raise
    return {}
| # ============ ANSWER VALIDATION ============ | |
def validate_answer_format(answer: str) -> tuple[bool, str]:
    """Validate answer format and return (is_valid, warning_message).

    Flags empty answers, known verbosity prefixes (e.g. "FINAL ANSWER:"),
    and answers that look like explanations rather than bare answers.
    These are heuristics: a False result is a warning, not a hard failure.
    """
    if not answer or answer.strip() == "":
        return False, "Warning: Answer is empty"
    # Check for common prefixes that should be removed.
    # strip() first so a leading space cannot defeat the prefix check.
    prefixes = ["FINAL ANSWER:", "The answer is:", "Answer:", "final answer:"]
    answer_lower = answer.strip().lower()
    for prefix in prefixes:
        if answer_lower.startswith(prefix.lower()):
            return False, f"Warning: Answer contains prefix '{prefix}' which will be removed. Consider removing it."
    # Check for explanations (multiple sentences or explanatory connectives).
    # Word-boundary matching avoids false positives such as "sincerely"
    # containing "since"; matching on the lowercased text also catches
    # capitalized "Because"/"Since".
    if answer.count('.') > 1 or re.search(r'\b(because|since)\b', answer_lower):
        return False, "Warning: Answer may contain explanations. Only the answer should be submitted."
    return True, ""
| # ============ GRADIO INTERFACE ============ | |
def run_agent_on_questions(openai_api_key: str, progress=gr.Progress()):
    """Run the agent on all GAIA questions.

    Returns a (results, answers) pair: a DataFrame summarizing each question
    and the list of ``{"task_id", "submitted_answer"}`` dicts ready for
    submission. On failure, returns (error_message_string, None) instead.
    """
    if not openai_api_key:
        return "Please provide your OpenAI API key.", None
    try:
        # Initialize agent
        progress(0, desc="Initializing agent...")
        agent = GAIAAgent(api_key=openai_api_key)
        # Fetch questions
        progress(0.05, desc="Fetching questions from API...")
        questions = fetch_questions()
        if not questions:
            return "Error: Failed to fetch questions from API. Please try again.", None
        total_questions = len(questions)
        results = []
        answers_for_submission = []
        for i, q in enumerate(questions):
            progress((i + 1) / total_questions, desc=f"Processing question {i+1}/{total_questions}...")
            task_id = q.get("task_id", "")
            question_text = q.get("question", "")
            # Check if there's an associated file
            file_path = None
            if q.get("file_name"):
                progress((i + 0.5) / total_questions, desc=f"Downloading file for question {i+1}...")
                file_path = fetch_file(task_id)
            # Run agent; a per-question failure becomes an "Error: ..." answer
            # so one bad question cannot abort the whole benchmark run.
            try:
                progress((i + 0.7) / total_questions, desc=f"Agent reasoning for question {i+1}...")
                answer = agent.run(question_text, task_id, file_path)
                # Validate answer format
                is_valid, warning = validate_answer_format(answer)
                if not is_valid:
                    logger.warning(f"Question {i+1} ({task_id}): {warning}")
            except Exception as e:
                logger.error(f"Error processing question {i+1} ({task_id}): {e}")
                answer = f"Error: {str(e)}"
            results.append({
                "Task ID": task_id,
                "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
                "Answer": answer,
                # NOTE(review): both status markers appear mojibake-garbled in
                # this copy (likely check/cross emoji originally) — confirm
                # against the canonical file before relying on them.
                "Status": "β" if answer and not answer.startswith("Error:") else "β"
            })
            answers_for_submission.append({
                "task_id": task_id,
                "submitted_answer": answer
            })
            # Cleanup: fetch_file creates one temp dir per file, so removing
            # the whole directory tree covers both the file and its parent
            # (the old remove+rmdir dance failed if the dir was non-empty).
            if file_path and os.path.exists(file_path):
                shutil.rmtree(os.path.dirname(file_path), ignore_errors=True)
        df = pd.DataFrame(results)
        progress(1.0, desc="Complete!")
        return df, answers_for_submission
    except Exception as e:
        logger.error(f"Error in run_agent_on_questions: {e}")
        return f"Error: {str(e)}", None
def submit_to_leaderboard(username: str, space_url: str, answers_json: str):
    """Submit answers to the leaderboard.

    Validates the answers payload, normalizes the Space URL, posts the
    submission, and returns a markdown summary (or an error string).
    """
    # All three inputs are required before a submission makes sense.
    if not username or not space_url or not answers_json:
        return "Please fill in all fields and run the agent first."
    try:
        parsed = json.loads(answers_json) if isinstance(answers_json, str) else answers_json
        if not isinstance(parsed, list) or not parsed:
            return "Error: Answers must be a non-empty list. Please run the agent first."
        # Validate answer format before submission; format problems are
        # collected as non-fatal warnings shown alongside the score.
        warnings = []
        for entry in parsed:
            if "task_id" not in entry or "submitted_answer" not in entry:
                return "Error: Invalid answer format. Each answer must have 'task_id' and 'submitted_answer'."
            ok, note = validate_answer_format(entry.get("submitted_answer", ""))
            if not ok:
                warnings.append(f"Task {entry.get('task_id')}: {note}")
        # The scoring API expects the Space URL to end with /tree/main.
        if not space_url.endswith("/tree/main"):
            space_url = space_url.rstrip("/") + "/tree/main"
        # Submit to API
        result = submit_answers(username, space_url, parsed)
        score = result.get("score", 0)
        correct = result.get("correct_count", 0)
        total = result.get("total_attempted", 0)
        warning_text = ""
        if warnings:
            # Show at most five warnings and summarize the rest.
            warning_text = f"\n\nβ οΈ **Warnings:**\n" + "\n".join(f"- {w}" for w in warnings[:5])
            if len(warnings) > 5:
                warning_text += f"\n- ... and {len(warnings) - 5} more warnings"
        return f"""
## Submission Successful! π
**Score:** {score:.1%}
**Correct:** {correct}/{total}
{'π Congratulations! You passed the 30% threshold!' if score >= 0.3 else 'π Keep improving! You need 30% to earn your certificate.'}
{warning_text}
Check the [leaderboard](https://huggingface.co/spaces/agents-course/Students_leaderboard) to see your ranking!
"""
    except json.JSONDecodeError as e:
        return f"Error: Invalid JSON format. Please run the agent first.\nDetails: {str(e)}"
    except Exception as e:
        logger.error(f"Submission error: {e}")
        return f"Submission error: {str(e)}"
| def test_single_question(openai_api_key: str): | |
| """Test the agent on a single random question.""" | |
| if not openai_api_key: | |
| return "Please provide your OpenAI API key.", "", "", "" | |
| try: | |
| agent = GAIAAgent(api_key=openai_api_key) | |
| question_data = fetch_random_question() | |
| if not question_data: | |
| return "Error: Failed to fetch question from API.", "", "", "" | |
| task_id = question_data.get("task_id", "") | |
| question_text = question_data.get("question", "") | |
| file_path = None | |
| if question_data.get("file_name"): | |
| file_path = fetch_file(task_id) | |
| answer = agent.run(question_text, task_id, file_path) | |
| # Validate answer format | |
| is_valid, warning = validate_answer_format(answer) | |
| validation_status = "β Valid format" if is_valid else f"β οΈ {warning}" | |
| # Cleanup temp file | |
| if file_path and os.path.exists(file_path): | |
| try: | |
| os.remove(file_path) | |
| temp_dir = os.path.dirname(file_path) | |
| if os.path.exists(temp_dir): | |
| try: | |
| os.rmdir(temp_dir) | |
| except: | |
| pass | |
| except Exception as e: | |
| logger.warning(f"Failed to cleanup file: {e}") | |
| return question_text, answer, task_id, validation_status | |
| except Exception as e: | |
| logger.error(f"Error in test_single_question: {e}") | |
| return f"Error: {str(e)}", "", "", "" | |
| # ============ BUILD GRADIO APP ============ | |
# Declarative Gradio UI: one shared API-key textbox plus three tabs
# (single-question test, full benchmark run, leaderboard submission).
# Markdown literals are kept flush-left so their rendering is unambiguous.
with gr.Blocks(title="GAIA Agent - LangGraph", theme=gr.themes.Soft()) as demo:
    # NOTE(review): the emoji throughout these markdown strings appear
    # mojibake-garbled in this copy; confirm against the canonical file.
    gr.Markdown("""
# π€ GAIA Benchmark Agent (LangGraph)
This agent uses **LangGraph** to solve GAIA benchmark questions. It has access to:
- π Web Search (DuckDuckGo)
- π Wikipedia Search
- π Python Code Execution
- π File Reading (PDF, Text, Excel)
- π’ Calculator
## Instructions
1. Enter your OpenAI API key
2. Test with a single question or run on all questions
3. Submit your answers to the leaderboard
""")
    with gr.Row():
        # Shared input: consumed by both the test tab and the benchmark tab.
        openai_key = gr.Textbox(
            label="OpenAI API Key",
            type="password",
            placeholder="sk-...",
            info="Required for GPT-4o"
        )
    with gr.Tabs():
        # Tab 1: run the agent on one random question to sanity-check setup.
        with gr.TabItem("π§ͺ Test Single Question"):
            test_btn = gr.Button("Fetch & Solve Random Question", variant="primary")
            test_question = gr.Textbox(label="Question", lines=5, interactive=False)
            test_answer = gr.Textbox(label="Agent's Answer", lines=3, interactive=False)
            test_task_id = gr.Textbox(label="Task ID", interactive=False)
            test_validation = gr.Textbox(label="Answer Validation", interactive=False)
            test_btn.click(
                test_single_question,
                inputs=[openai_key],
                outputs=[test_question, test_answer, test_task_id, test_validation]
            )
        # Tab 2: run the agent over every question; answers are parked in
        # gr.State so the submission tab can pick them up.
        with gr.TabItem("π Run Full Benchmark"):
            run_btn = gr.Button("Run Agent on All Questions", variant="primary")
            results_table = gr.Dataframe(label="Results")
            answers_state = gr.State()
            run_btn.click(
                run_agent_on_questions,
                inputs=[openai_key],
                outputs=[results_table, answers_state]
            )
        # Tab 3: submit the collected answers to the scoring API.
        with gr.TabItem("π€ Submit to Leaderboard"):
            gr.Markdown("""
### Submit Your Results
After running the full benchmark, fill in your details and submit to the leaderboard.
**Requirements:**
- Your HuggingFace username
- Your Space URL (must end with `/tree/main`)
- Answers will be auto-filled after running the benchmark
""")
            with gr.Row():
                username_input = gr.Textbox(
                    label="HuggingFace Username",
                    placeholder="your-username",
                    info="Your HuggingFace account username"
                )
                space_url_input = gr.Textbox(
                    label="Your Space URL",
                    placeholder="https://huggingface.co/spaces/your-username/your-space",
                    info="Full URL to your Space (will auto-append /tree/main if needed)"
                )
            answers_input = gr.Textbox(
                label="Answers JSON (auto-filled after running benchmark)",
                lines=10,
                placeholder="Run the full benchmark first...",
                info="This will be automatically populated after running the benchmark"
            )
            submit_btn = gr.Button("Submit to Leaderboard", variant="primary")
            submit_result = gr.Markdown()
            # Auto-fill answers when benchmark completes: mirror the State
            # value into the visible JSON textbox whenever it changes.
            def format_answers(answers):
                # Pretty-print the list of answer dicts; empty string when
                # the benchmark has not been run yet.
                if answers:
                    return json.dumps(answers, indent=2)
                return ""
            answers_state.change(format_answers, inputs=[answers_state], outputs=[answers_input])
            submit_btn.click(
                submit_to_leaderboard,
                inputs=[username_input, space_url_input, answers_input],
                outputs=[submit_result]
            )
    gr.Markdown("""
---
### π Tips for Better Scores
**Answer Formatting:**
- Answers are matched **exactly** (character-for-character), so precision is critical
- Do NOT include prefixes like "FINAL ANSWER:" or "The answer is:"
- For lists: use comma-separated format with NO spaces (e.g., "item1,item2,item3")
- For numbers: just the number, no units unless specified
- Check the validation status in the test tab
**Agent Capabilities:**
- Uses GPT-4o for optimal reasoning
- Automatically reads files (PDFs, Excel, text) when available
- Web search for current information
- Wikipedia for factual lookups
- Python execution for calculations
**Best Practices:**
1. Test with a single question first to verify the agent works
2. Run the full benchmark (takes ~10-15 minutes)
3. Review answers before submission
4. Ensure your Space is public for verification
### π Links
- [GAIA Benchmark](https://huggingface.co/spaces/gaia-benchmark/leaderboard)
- [Student Leaderboard](https://huggingface.co/spaces/agents-course/Students_leaderboard)
- [Course Unit 4](https://huggingface.co/learn/agents-course/en/unit4/hands-on)
- [API Documentation](https://agents-course-unit4-scoring.hf.space/docs)
""")

if __name__ == "__main__":
    # For HuggingFace Spaces, use share=False
    # For local development, you can use share=True to get a public link
    demo.launch(server_name="0.0.0.0", server_port=7860)