"""Gradio app that runs a smolagents-based agent on the GAIA course questions.

The app fetches the question list from the scoring API, asks a Gemini model
(through LiteLLM) to write one Python script per question, lets a smolagents
``CodeAgent`` work on that script, and submits the collected answers.
"""

import os
import re
import tempfile
import time
import traceback
from urllib.parse import urlparse

import gradio as gr
import pandas as pd
import requests

# Import smol-agent and tool components
from smolagents import CodeAgent, LiteLLMModel, tool
from smolagents import DuckDuckGoSearchTool
from unstructured.partition.auto import partition

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Matches a markdown fence with any (or no) language tag and captures its body.
# Used as a fallback when the model wraps its script in something other than
# the exact "```python" fence.
_GENERIC_FENCE_RE = re.compile(r"```[A-Za-z0-9_+-]*[ \t]*\r?\n(.*?)```", re.DOTALL)

# Prompt sent to the planner model; ``{question}`` is the only placeholder.
# NOTE(review): the prompt advertises the search tool as `duck_duck_go_search`;
# confirm this matches the name DuckDuckGoSearchTool registers in the installed
# smolagents version.
_SCRIPT_PROMPT_TEMPLATE = """
You are an expert Python programmer. Your task is to write a single, self-contained Python script to answer the user's question.

You have access to the following functions which are pre-imported and ready to use:
- `duck_duck_go_search(query: str) -> str`: Searches the web and returns a string with the results.
- `file_reader(file_path: str) -> str`: Reads a file and returns its contents as a string.

CRITICAL INSTRUCTIONS:
1. Your output must be ONLY the Python code for the script. Do not add any explanation or markdown formatting like ```python.
2. The script MUST end with a call to a function `final_answer(answer: str)`.
3. The `answer` passed to `final_answer` must be a single, concise string.
4. All logic, including processing the string outputs from the tools, must be included in this single script. State is preserved within the script.

Question: "{question}"

Example for "What is the capital of France?":
search_result = duck_duck_go_search("capital of France")
# In a real scenario, you would parse this string to find the answer.
# For this example, we'll just summarize the string.
answer = "Based on the search, the capital is likely Paris." # Replace with actual logic
final_answer(answer)

Now, write the Python script to answer the user's question.
"""


# --- Tool Definition ---
@tool
def file_reader(file_path: str) -> str:
    """Reads the content of a file and returns its text content.

    This tool supports various file types like PDF, TXT, CSV, etc., from either a local path or a web URL.

    Args:
        file_path (str): The local path or web URL of the file to be read.
    """
    try:
        if file_path.startswith(("http://", "https://")):
            response = requests.get(file_path, timeout=20)
            response.raise_for_status()

            # Download to a unique temporary file instead of a fixed
            # "temp_file" in the working directory: no collisions between
            # calls, and the file is removed even when parsing fails.
            # The URL's extension is kept on the assumption that it helps
            # file-type detection in `partition` - TODO confirm.
            suffix = os.path.splitext(urlparse(file_path).path)[1]
            tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
            try:
                with tmp:
                    tmp.write(response.content)
                elements = partition(tmp.name)
            finally:
                os.remove(tmp.name)
        else:
            elements = partition(file_path)
        return "\n\n".join(str(el) for el in elements)
    except Exception as e:
        # The tool reports failures as text so the calling agent can react
        # to them instead of crashing.
        return f"Error reading or processing file '{file_path}': {e}"


# --- Agent Class (Gemini via LiteLLM) ---
class GaiaSmolAgent:
    """Answers a question by generating a script and handing it to a CodeAgent.

    Attributes set in ``__init__``:
        planner_model: ``LiteLLMModel`` used to write the script.
        executor_agent: ``CodeAgent`` that receives the generated script.
    """

    def __init__(self):
        """Build the planner model and the executor agent.

        Raises:
            ValueError: If the ``GEMINI_API_KEY`` environment variable is
                missing or empty.
        """
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("API key 'GEMINI_API_KEY' not found in environment secrets.")

        self.planner_model = LiteLLMModel(
            model_id="gemini/gemini-1.5-pro-latest",
            api_key=api_key,
            temperature=0.0,
        )

        # Initialize the agent with the tools it can use.
        self.executor_agent = CodeAgent(
            model=self.planner_model,
            tools=[file_reader, DuckDuckGoSearchTool()],
            add_base_tools=True,  # Provides a python interpreter
        )
        print("GaiaSmolAgent initialized successfully.")

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Return the code inside a markdown fence, or ``text`` if unfenced.

        A "```python" fence is handled exactly as before (everything between
        the tag and the next "```", stripped). Fences with another or no
        language tag are unwrapped as well, because models frequently ignore
        the "no markdown" instruction.
        """
        if "```python" in text:
            return text.split("```python", 1)[1].split("```", 1)[0].strip()
        fenced = _GENERIC_FENCE_RE.search(text)
        if fenced:
            return fenced.group(1).strip()
        return text

    def _generate_script(self, question: str) -> str:
        """Generates a self-contained Python script to answer the question.

        Args:
            question: The question text, inserted verbatim into the prompt.

        Returns:
            The model's reply with any surrounding markdown fence removed.
        """
        print(f"Generating script for question: {question[:100]}...")
        prompt = _SCRIPT_PROMPT_TEMPLATE.format(question=question)
        messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
        response_object = self.planner_model.generate(messages)

        # The response is a message object, not a string; its text lives in
        # `.content`. Fall back to "" so a missing content does not raise.
        response_content = self._strip_code_fence(response_object.content or "")

        print(f"--- Generated Script ---\n{response_content}\n------------------------")
        return response_content

    def __call__(self, question: str) -> str:
        """Generates and executes a single script to answer the question.

        Args:
            question: The question text.

        Returns:
            The agent's answer as a string, or a string starting with
            ``"FATAL AGENT ERROR: "`` if generation or execution raised.
        """
        print(f"Agent received question: {question[:100]}...")
        try:
            script_to_execute = self._generate_script(question)
            final_answer = self.executor_agent.run(script_to_execute)
        except Exception as e:
            print(f"FATAL AGENT ERROR: An exception occurred during agent execution: {e}")
            print(traceback.format_exc())  # Print the full traceback for debugging
            return f"FATAL AGENT ERROR: {e}"

        print(f"Agent returning final answer: {final_answer}")
        return str(final_answer)


# --- Main Application Logic ---
def _fetch_questions(questions_url: str):
    """Fetch the question list.

    Returns:
        ``(questions, None)`` on success, or ``(None, error_message)`` when
        the request fails, the body is not JSON, or the list is empty.
    """
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return None, "Fetched questions list is empty or invalid format."
        print(f"Fetched {len(questions_data)} questions.")
        return questions_data, None
    # JSONDecodeError is a RequestException subclass in requests, so it must
    # be listed first or this handler can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return None, f"Error decoding server response for questions: {e}"
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return None, f"Error fetching questions: {e}"
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return None, f"An unexpected error occurred fetching questions: {e}"


def _run_agent_on_questions(agent, questions_data):
    """Run ``agent`` on every question.

    Returns:
        ``(answers_payload, results_log)``: the list of answers to submit and
        the per-question rows shown in the results table. Items without a
        ``task_id`` or ``question`` are skipped; a question whose agent call
        raises is logged but not submitted.
    """
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
    return answers_payload, results_log


def _submit_answers(submit_url: str, submission_data: dict) -> str:
    """POST the answers and return the status text to show the user."""
    print(f"Submitting {len(submission_data['answers'])} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
    print(status_message)
    return status_message


def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the GaiaSmolAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in.

    Returns:
        A ``(status_text, results)`` pair for the two Gradio outputs;
        ``results`` is a DataFrame of per-question answers, or None when the
        run stopped before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent
    try:
        agent = GaiaSmolAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When running as a Hugging Face Space, this link points to the codebase
    # (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    questions_data, fetch_error = _fetch_questions(questions_url)
    if fetch_error is not None:
        return fetch_error, None

    # 3. Run your Agent
    answers_payload, results_log = _run_agent_on_questions(agent, questions_data)
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    status_message = _submit_answers(submit_url, submission_data)
    return status_message, pd.DataFrame(results_log)


# --- Gradio Interface ---
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner (smol-agent)")
    gr.Markdown(
        """
        **Instructions:**
        1. Ensure you have added your **GEMINI API key** (as `GEMINI_API_KEY`) in the Space's secrets.
        2. Log in to your Hugging Face account using the button below.
        3. Click 'Run Evaluation & Submit All Answers' to run your agent and see the score.
        """
    )
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

if __name__ == "__main__":
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    demo.launch(debug=True, share=False)