Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import requests | |
| import pandas as pd | |
| import traceback | |
| import time | |
| import mimetypes | |
| from tempfile import NamedTemporaryFile | |
| # Import smol-agent and tool components | |
| from smolagents import CodeAgent, LiteLLMModel, tool | |
| from smolagents import DuckDuckGoSearchTool | |
| from unstructured.partition.auto import partition | |
| # Imports for advanced file processing | |
| import speech_recognition as sr | |
| from pydub import AudioSegment | |
# --- Constants ---
# Base URL of the scoring service; the "/questions" and "/submit" endpoints
# used by run_and_submit_all are built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
| # --- Tool Definition (Upgraded for Full Multimodality with pydub) --- | |
@tool
def file_reader(file_path: str) -> str:
    """
    Reads and analyzes the content of a file and returns relevant text-based information.

    Supports:
    - Text files (PDF, TXT, CSV)
    - Images (PNG, JPG) with OCR
    - Audio (MP3, WAV) via speech recognition
    - Video (MP4, MOV) via speech recognition on audio track

    Can be used with a local file path or a web URL.

    Args:
        file_path: The local path or web URL of the file to be read.

    Returns:
        str: Extracted or transcribed content as text. Any failure is reported
            as a returned error message instead of a raised exception.
    """
    # Function-scope import: URL parsing is only needed by this tool.
    from urllib.parse import urlparse

    temp_file_path = None
    audio_temp_path = None
    try:
        is_url = file_path.startswith(("http://", "https://"))

        # Guess the type from the *name*, not from the temp file: for URLs use
        # only the path component so a query string cannot hide the extension.
        type_hint_name = urlparse(file_path).path if is_url else file_path
        mime_type, _ = mimetypes.guess_type(type_hint_name)

        if is_url:
            response = requests.get(file_path, timeout=20)
            response.raise_for_status()

            if not mime_type:
                # Extension-less URL: fall back to the server-declared type.
                declared = response.headers.get("Content-Type", "")
                mime_type = declared.split(";")[0].strip().lower() or None

            # Keep a meaningful suffix on the temp file so downstream
            # libraries that look at the extension can identify the format.
            suffix = os.path.splitext(type_hint_name)[1]
            if not suffix and mime_type:
                suffix = mimetypes.guess_extension(mime_type) or ""

            with NamedTemporaryFile(suffix=suffix, delete=False) as downloaded:
                temp_file_path = downloaded.name
                downloaded.write(response.content)
            local_path = temp_file_path
        else:
            local_path = file_path

        if mime_type and mime_type.startswith(("audio/", "video/")):
            # sr.AudioFile only reads WAV/AIFF/FLAC, so every audio or video
            # input is first converted to WAV with pydub. No explicit
            # ``format`` is passed: MIME subtypes such as "quicktime" or
            # "mpeg" are not valid ffmpeg format names, so the format is left
            # for pydub/ffmpeg to detect.
            with NamedTemporaryFile(suffix=".wav", delete=False) as wav_file:
                audio_temp_path = wav_file.name
            # The handle is closed before exporting so the path can be
            # reopened for writing on every platform.
            AudioSegment.from_file(local_path).export(audio_temp_path, format="wav")

            recognizer = sr.Recognizer()
            with sr.AudioFile(audio_temp_path) as source:
                audio = recognizer.record(source)
            return recognizer.recognize_whisper(audio)

        # Default to handling text and images with OCR if not audio/video.
        elements = partition(local_path)
        return "\n\n".join(str(el) for el in elements)
    except Exception as e:
        # The agent consumes the return value, so report failures as text.
        return f"Error reading or processing file '{file_path}': {e}"
    finally:
        # Clean up the downloaded file and the temporary audio file, if any.
        for leftover in (temp_file_path, audio_temp_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
| # --- Agent Class (Updated with More Powerful Model and Tools) --- | |
class GaiaSmolAgent:
    """Wrapper around a smolagents ``CodeAgent`` configured for GAIA questions.

    Attributes are set in ``__init__``:
    - ``system_prompt``: instruction text prepended to every question.
    - ``agent``: the underlying ``CodeAgent`` instance.
    """

    def __init__(self):
        """
        Initializes the optimized agent.
        Now uses a more powerful model and the agent's native conversation memory.

        Raises:
            ValueError: If the ``GEMINI_API_KEY`` environment variable is not set.
        """
        print("Initializing Optimized GaiaSmolAgent...")
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("API key 'GEMINI_API_KEY' not found in environment secrets.")

        # Use a more powerful, "clever" model for better reasoning.
        model = LiteLLMModel(
            model_id="gemini/gemini-1.5-pro-latest",
            api_key=api_key,
            temperature=0.0,
            timeout=120.0,  # Add a timeout to prevent hanging
        )

        # Build the search tool first so the prompt can advertise the name it
        # is actually registered under instead of a hard-coded guess.
        search_tool = DuckDuckGoSearchTool()

        # --- CHANGE 1: ENHANCED SYSTEM PROMPT ---
        # A more detailed prompt that guides the agent on how to handle GAIA-specific challenges,
        # such as precise data extraction, calculations, and structured reasoning.
        self.system_prompt = f"""
You are an expert-level research assistant AI, specifically designed to solve challenging questions from the GAIA benchmark. Your goal is to provide a precise and accurate final answer by meticulously following a step-by-step plan.
**Available Tools:**
- `{search_tool.name}(query: str) -> str`: Use this for web searches to find information, URLs, facts, etc.
- `file_reader(file_path: str) -> str`: Use this to read content from local files or web URLs. It handles text, PDFs, images (OCR), audio, and video.
**Your Thought Process & Execution Strategy:**
1. **Analyze the Question:** First, break down the user's question to fully understand all its components, constraints, and the exact type of information required for the answer (e.g., a number, a date, a name).
2. **Formulate a Step-by-Step Plan:** Before using any tools, you MUST outline your plan in your thoughts. For example: "Step 1: Search for the document URL. Step 2: Use the file_reader to read the document. Step 3: Extract the specific data point. Step 4: Perform calculation if needed. Step 5: Provide the final answer."
3. **Execute and Verify:** Execute your plan one step at a time. After each tool call, review the output. Verify if the information obtained is sufficient and accurate. If a step fails or the result is not what you expected, REVISE your plan.
4. **Synthesize the Answer:** Once you have gathered and verified all necessary information, formulate the final answer. Use the Python interpreter for any calculations, data sorting, or text processing to ensure accuracy.
**CRITICAL INSTRUCTIONS:**
- **Precision is Key:** Pay close attention to the requested format of the final answer. If a question asks for a number, your final answer must be only that number.
- **Code for Calculations:** ALWAYS use the Python interpreter for any calculations, date comparisons, or data manipulation. Do not perform calculations in your head.
- **Autonomous Operation:** You must work autonomously. Make the most logical deduction based on the information you gather. Do not ask for clarification.
- **Final Answer:** Your final output MUST be a single call to the `final_answer(answer: str)` function with the precise answer.
"""

        # Initialize the agent with the file_reader tool and memory settings.
        self.agent = CodeAgent(
            model=model,
            tools=[file_reader, search_tool],
            add_base_tools=True,  # Also registers smolagents' default toolbox
            # --- CHANGE 2: MORE REACTIVE PLANNING ---
            # By setting planning_interval=1, the agent re-evaluates its plan
            # after every single step. This allows it to immediately course-correct
            # based on new information, which is vital for complex, multi-step tasks.
            planning_interval=1,
        )
        print("Optimized GaiaSmolAgent initialized successfully with enhanced prompt and reactive planning.")

    def __call__(self, question: str, reset_memory: bool = False) -> str:
        """
        Directly runs the agent to generate and execute a plan to answer the question.
        It leverages the agent's built-in memory, controlled by the `reset` parameter.

        Args:
            question (str): The user's question.
            reset_memory (bool): If True, the agent's conversation memory will be cleared
                before running. Maps to the agent's `reset` parameter.

        Returns:
            str: The agent's final answer converted to a string, or a
                "FATAL AGENT ERROR: ..." message if the run raised an exception.
        """
        print(f"Optimized Agent received question: {question[:100]}...")
        try:
            # Combine the system prompt with the current question. The agent will handle the history.
            full_prompt = f"{self.system_prompt}\n\nCURRENT TASK:\nUser Question: \"{question}\""
            # Use the agent's `reset` parameter to control conversation memory.
            # `reset=False` keeps the memory from previous calls.
            final_answer = self.agent.run(full_prompt, reset=reset_memory)
        except Exception as e:
            print(f"FATAL AGENT ERROR: An exception occurred during agent execution: {e}")
            print(traceback.format_exc())  # Print full traceback for easier debugging
            return f"FATAL AGENT ERROR: {e}"
        print(f"Optimized Agent returning final answer: {final_answer}")
        return str(final_answer)
| # --- Main Application Logic (Unchanged) --- | |
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the GaiaSmolAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile of the logged-in Hugging Face user, or None
            when nobody is logged in.

    Returns:
        tuple: ``(status_message, results)``. ``results`` is a pandas
            DataFrame with one row per attempted question, or None when the
            run stopped before any question was attempted.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent
    try:
        agent = GaiaSmolAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # When running as a Hugging Face Space, this link points to the codebase
    # (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError derives from RequestException, so it has to be caught
    # first; in the opposite order this handler can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            # The questions are independent tasks: start each one with a clean
            # memory so earlier questions neither leak into the answer nor
            # inflate the prompt.
            submitted_answer = agent(question_text, reset_memory=True)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports its message together with the per-question log.
    print(status_message)
    return status_message, pd.DataFrame(results_log)
| # --- Gradio Interface (Updated Instructions) --- | |
# Declarative UI: a title, usage instructions, a login button, and one action
# button whose handler fills the status box and the results table.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner (smol-agent)")
    gr.Markdown(
        """
**Instructions:**
1. Ensure you have added your **GEMINI API key** (as `GEMINI_API_KEY`) in the Space's secrets.
2. Log in to your Hugging Face account using the button below.
3. Click 'Run Evaluation & Submit All Answers' to run your agent and see the score.
"""
    )

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Output widgets, in the order of the handler's (status, results) return value.
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: the handler's only parameter is the OAuth profile.
    run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])

# Start the web server only when this file is executed as a script.
if __name__ == "__main__":
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)