File size: 11,844 Bytes
2cbacd8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
import os
import gradio as gr
import requests
import pandas as pd
import traceback
import time
# Import smol-agent and tool components
from smolagents import CodeAgent, LiteLLMModel, tool
from smolagents import DuckDuckGoSearchTool
from unstructured.partition.auto import partition
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Tool Definition ---
@tool
def file_reader(file_path: str) -> str:
"""Reads the content of a file and returns its text content.
This tool supports various file types like PDF, TXT, CSV, etc., from either
a local path or a web URL.
Args:
file_path (str): The local path or web URL of the file to be read.
"""
try:
if file_path.startswith("http://") or file_path.startswith("https://"):
response = requests.get(file_path, timeout=20)
response.raise_for_status()
with open("temp_file", "wb") as f:
f.write(response.content)
elements = partition("temp_file")
os.remove("temp_file") # Clean up
else:
elements = partition(file_path)
return "\n\n".join([str(el) for el in elements])
except Exception as e:
return f"Error reading or processing file '{file_path}': {e}"
# --- Agent Class (Now using a free Open-Source LLM) ---
class GaiaSmolAgent:
def __init__(self):
#print("Initializing GaiaSmolAgent with a free Open-Source LLM via Groq...")
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("API key 'GEMINI_API_KEY' not found in environment secrets.")
#model = InferenceClientModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct", provider="together")
self.planner_model = LiteLLMModel(
#model_id="groq/llama3-8b-8192",
model_id="gemini/gemini-1.5-pro-latest",
api_key=api_key,
temperature=0.0,
)
# Initialize the agent with the tools it can use.
self.executor_agent = CodeAgent(
model=self.planner_model,
tools=[file_reader, DuckDuckGoSearchTool()],
add_base_tools=True, # Provides a python interpreter
)
print("GaiaSmolAgent initialized successfully.")
def _generate_script(self, question: str) -> str:
"""Generates a self-contained Python script to answer the question."""
print(f"Generating script for question: {question[:100]}...")
prompt = f"""
You are an expert Python programmer. Your task is to write a single, self-contained Python script to answer the user's question.
You have access to the following functions which are pre-imported and ready to use:
- `duck_duck_go_search(query: str) -> str`: Searches the web and returns a string with the results.
- `file_reader(file_path: str) -> str`: Reads a file and returns its contents as a string.
CRITICAL INSTRUCTIONS:
1. Your output must be ONLY the Python code for the script. Do not add any explanation or markdown formatting like ```python.
2. The script MUST end with a call to a function `final_answer(answer: str)`.
3. The `answer` passed to `final_answer` must be a single, concise string.
4. All logic, including processing the string outputs from the tools, must be included in this single script. State is preserved within the script.
Question: "{question}"
Example for "What is the capital of France?":
search_result = duck_duck_go_search("capital of France")
# In a real scenario, you would parse this string to find the answer.
# For this example, we'll just summarize the string.
answer = "Based on the search, the capital is likely Paris." # Replace with actual logic
final_answer(answer)
Now, write the Python script to answer the user's question.
"""
messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
response_object = self.planner_model.generate(messages)
# --- THIS IS THE FIX ---
# The response is an object, not a string. We need to access its .content attribute.
response_content = response_object.content
if "```python" in response_content:
response_content = response_content.split("```python")[1].split("```")[0].strip()
print(f"--- Generated Script ---\n{response_content}\n------------------------")
return response_content
def __call__(self, question: str) -> str:
"""Generates and executes a single script to answer the question."""
print(f"Agent received question: {question[:100]}...")
try:
script_to_execute = self._generate_script(question)
final_answer = self.executor_agent.run(script_to_execute)
except Exception as e:
print(f"FATAL AGENT ERROR: An exception occurred during agent execution: {e}")
print(traceback.format_exc()) # Print the full traceback for debugging
return f"FATAL AGENT ERROR: {e}"
print(f"Agent returning final answer: {final_answer}")
return str(final_answer)
# --- Main Application Logic (Unchanged) ---
def run_and_submit_all( profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the BasicAgent on them, submits all answers,
and displays the results.
"""
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID") # Get the SPACE_ID for sending link to the code
if profile:
username= f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent ( modify this part to create your agent)
try:
agent = BasicAgent()
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None
# In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public)
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
print(f"Fetched {len(questions_data)} questions.")
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response from questions endpoint: {e}")
print(f"Response text: {response.text[:500]}")
return f"Error decoding server response for questions: {e}", None
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred fetching questions: {e}", None
# 3. Run your Agent
results_log = []
answers_payload = []
print(f"Running agent on {len(questions_data)} questions...")
for item in questions_data:
task_id = item.get("task_id")
question_text = item.get("question")
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
try:
submitted_answer = agent(question_text)
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Gradio Interface (Updated Instructions) ---
with gr.Blocks() as demo:
gr.Markdown("# GAIA Agent Evaluation Runner (smol-agent)")
gr.Markdown(
"""
**Instructions:**
1. Ensure you have added your **GEMINI API key** (as `GEMINI_API_KEY`) in the Space's secrets.
2. Log in to your Hugging Face account using the button below.
3. Click 'Run Evaluation & Submit All Answers' to run your agent and see the score.
"""
)
gr.LoginButton()
run_button = gr.Button("Run Evaluation & Submit All Answers")
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table]
)
if __name__ == "__main__":
print("Launching Gradio Interface for GAIA Agent Evaluation...")
demo.launch(debug=True, share=False)
|