|
|
import os |
|
|
import gradio as gr |
|
|
import requests |
|
|
import pandas as pd |
|
|
import traceback |
|
|
import time |
|
|
import mimetypes |
|
|
from tempfile import NamedTemporaryFile |
|
|
|
|
|
|
|
|
from smolagents import CodeAgent, LiteLLMModel, tool |
|
|
from smolagents import DuckDuckGoSearchTool |
|
|
from unstructured.partition.auto import partition |
|
|
|
|
|
|
|
|
import speech_recognition as sr |
|
|
from pydub import AudioSegment |
|
|
|
|
|
|
|
|
# Base URL of the scoring service; the "/questions" and "/submit" endpoints
# used by run_and_submit_all are derived from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
|
|
|
|
|
|
|
|
@tool
def file_reader(file_path: str) -> str:
    """
    Reads and analyzes the content of a file and returns relevant text-based information.
    Supports:
    - Text files (PDF, TXT, CSV)
    - Images (PNG, JPG) with OCR
    - Audio (MP3, WAV) via speech recognition
    - Video (MP4, MOV) via speech recognition on audio track
    Can be used with a local file path or a web URL.

    Args:
        file_path (str): The local path or web URL of the file to be read.
    Returns:
        str: Extracted or transcribed content as text.
    """
    from urllib.parse import urlparse

    # Extensions speech_recognition's AudioFile can open without conversion.
    native_audio_suffixes = {".wav", ".aiff", ".aif", ".flac"}

    downloaded_path = None
    wav_path = None
    try:
        if file_path.startswith(("http://", "https://")):
            response = requests.get(file_path, timeout=20)
            response.raise_for_status()

            # Keep a meaningful extension on the temporary copy: type detection
            # below is extension based, so a suffix-less temp file would never
            # be recognised as audio or video.
            suffix = os.path.splitext(urlparse(file_path).path)[1]
            if not suffix:
                content_type = response.headers.get("Content-Type", "")
                content_type = content_type.split(";")[0].strip().lower()
                # A generic binary type carries no information; leave the
                # file without a suffix in that case.
                if content_type and content_type != "application/octet-stream":
                    suffix = mimetypes.guess_extension(content_type) or ""

            # The context manager closes the handle (the bare
            # NamedTemporaryFile(...).name idiom leaks it); the file itself is
            # removed in the finally clause.
            with NamedTemporaryFile(suffix=suffix, delete=False) as download:
                downloaded_path = download.name
                download.write(response.content)
            local_path = downloaded_path
        else:
            local_path = file_path

        mime_type, _ = mimetypes.guess_type(local_path)

        if mime_type and mime_type.startswith(("audio/", "video/")):
            audio_path = local_path
            extension = os.path.splitext(local_path)[1].lower()
            needs_conversion = (
                mime_type.startswith("video/")
                or extension not in native_audio_suffixes
            )
            if needs_conversion:
                with NamedTemporaryFile(suffix=".wav", delete=False) as wav_file:
                    wav_path = wav_file.name
                # No explicit format: the MIME subtype (e.g. "quicktime" for
                # .mov) is not always a valid decoder format name, so let the
                # decoder detect the container by itself.
                AudioSegment.from_file(local_path).export(wav_path, format="wav")
                audio_path = wav_path

            recognizer = sr.Recognizer()
            with sr.AudioFile(audio_path) as source:
                audio = recognizer.record(source)
            return recognizer.recognize_whisper(audio)

        # Everything else (documents, images, unknown types) goes through
        # unstructured's automatic partitioning.
        elements = partition(local_path)
        return "\n\n".join(str(element) for element in elements)

    except Exception as e:
        # Tool boundary: report the failure as text so the calling agent can
        # react to it instead of crashing.
        return f"Error reading or processing file '{file_path}': {e}"
    finally:
        for leftover in (downloaded_path, wav_path):
            if leftover and os.path.exists(leftover):
                os.remove(leftover)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GaiaSmolAgent:
    """Callable wrapper around a smolagents ``CodeAgent`` backed by a Gemini model.

    Attributes set by ``__init__``:
        system_prompt: Instruction text prepended to every question.
        agent: The configured ``CodeAgent`` instance.
    """

    def __init__(self):
        """
        Initializes the optimized agent.
        Now uses a more powerful model and the agent's native conversation memory.

        Raises:
            ValueError: If the ``GEMINI_API_KEY`` environment variable is
                missing or empty.
        """
        print("Initializing Optimized GaiaSmolAgent...")
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            raise ValueError("API key 'GEMINI_API_KEY' not found in environment secrets.")

        model = LiteLLMModel(
            model_id="gemini/gemini-1.5-pro-latest",
            api_key=api_key,
            temperature=0.0,
            timeout=120.0,
        )

        search_tool = DuckDuckGoSearchTool()
        # The prompt must advertise the name the tool is actually registered
        # under; a hard-coded name that differs from it would make the model
        # call a function that does not exist in the execution environment.
        search_tool_name = getattr(search_tool, "name", "web_search")

        self.system_prompt = f"""
        You are an expert-level research assistant AI. Your sole purpose is to answer the user's question by breaking it down into logical steps and using the provided tools. You will have access to the conversation history, so use it for context.

        **Available Tools:**
        - `{search_tool_name}(query: str) -> str`: Use this to find information, file URLs, or anything on the web.
        - `file_reader(file_path: str) -> str`: Use this to read the contents of a file from a local path or a web URL. It can read text, extract text from images (OCR), and transcribe audio from audio/video files.

        **Your Thought Process:**
        1. **Deconstruct the Goal:** Carefully analyze the question to understand what information is needed, considering the previous turns in the conversation.
        2. **Formulate a Plan:** Think step-by-step about which tools to use in what order. For example, you might need to search for a URL first, then read the content of that URL.
        3. **Execute & Analyze:** Call the necessary tools. Carefully examine the output of each tool to extract the required facts. You can write Python code to process the data returned by the tools.
        4. **Synthesize the Answer:** Once you have gathered sufficient information, formulate a final, concise answer to the original question.

        **CRITICAL INSTRUCTIONS:**
        - Your final action MUST be a single call to the `final_answer(answer: str)` function.
        - The `answer` argument must be a string containing only the definitive answer.
        - All code you write is executed in a restricted Python environment. You can define variables and write logic to process the tool outputs before calling `final_answer`.
        - Do not ask for clarification. Directly proceed to solve the problem.
        """

        self.agent = CodeAgent(
            model=model,
            tools=[file_reader, search_tool],
            add_base_tools=True,
            planning_interval=3,
        )

        print("Optimized GaiaSmolAgent initialized successfully with native memory and full multimodal capabilities.")

    def __call__(self, question: str, reset_memory: bool = False) -> str:
        """
        Directly runs the agent to generate and execute a plan to answer the question.
        It leverages the agent's built-in memory, controlled by the `reset` parameter.

        Args:
            question (str): The user's question.
            reset_memory (bool): If True, the agent's conversation memory will be cleared
                before running. Maps to the agent's `reset` parameter.

        Returns:
            str: The agent's final answer converted to a string, or a message
                starting with "FATAL AGENT ERROR:" if the run raised.
        """
        print(f"Optimized Agent received question: {question[:100]}...")

        try:
            full_prompt = f"{self.system_prompt}\n\nCURRENT TASK:\nUser Question: \"{question}\""
            final_answer = self.agent.run(full_prompt, reset=reset_memory)
        except Exception as e:
            # Boundary handler: callers receive a string in every case, so the
            # failure is logged with its traceback and reported as text.
            print(f"FATAL AGENT ERROR: An exception occurred during agent execution: {e}")
            print(traceback.format_exc())
            return f"FATAL AGENT ERROR: {e}"

        print(f"Optimized Agent returning final answer: {final_answer}")
        return str(final_answer)
|
|
|
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the GaiaSmolAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in user's OAuth profile, or None when nobody is
            logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame with one row per processed question, or None when the run
        stopped before any question was processed.
    """
    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    username = f"{profile.username}"
    print(f"User logged in: {username}")

    space_id = os.getenv("SPACE_ID")
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the agent.
    try:
        agent = GaiaSmolAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # Link to the Space's code, sent along with the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch the questions.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError is a subclass of RequestException in requests, so it has
    # to be handled first or this branch can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the agent on every question.
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            # Each question is an independent task: start from a clean memory
            # so earlier questions cannot leak into (or bloat) this one.
            submitted_answer = agent(question_text, reset_memory=True)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    # The log is complete at this point; every exit below shows the same table.
    results_df = pd.DataFrame(results_log)

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", results_df

    # 4. Prepare the submission.
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit.
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Shared tail of every failure branch.
    print(status_message)
    return status_message, results_df
|
|
|
|
|
|
|
|
|
|
|
# Gradio UI: title, instructions, login button, run button and two outputs.
with gr.Blocks() as demo:
    page_title = "# GAIA Agent Evaluation Runner (smol-agent)"
    instructions_text = """
        **Instructions:**
        1. Ensure you have added your **GEMINI API key** (as `GEMINI_API_KEY`) in the Space's secrets.
        2. Log in to your Hugging Face account using the button below.
        3. Click 'Run Evaluation & Submit All Answers' to run your agent and see the score.
        """

    # Components are created in display order.
    gr.Markdown(page_title)
    gr.Markdown(instructions_text)
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # The handler's two return values map onto the status box and the table.
    evaluation_outputs = [status_output, results_table]
    run_button.click(fn=run_and_submit_all, outputs=evaluation_outputs)
|
|
|
|
|
# Start the web interface only when this file is executed as a script.
if __name__ == "__main__":
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    launch_options = {"debug": True, "share": False}
    demo.launch(**launch_options)
|
|
|