|
|
import os |
|
|
import gradio as gr |
|
|
import requests |
|
|
import pandas as pd |
|
|
import re |
|
|
import io |
|
|
import contextlib |
|
|
import json |
|
|
from huggingface_hub import InferenceClient |
|
|
from langchain_community.tools import DuckDuckGoSearchRun |
|
|
from PyPDF2 import PdfReader |
|
|
from docx import Document |
|
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
|
|
|
|
|
|
|
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2" |
|
|
PROMPT_TEMPLATE = """<s>[INST]You are a helpful assistant designed to answer questions accurately. You have access to the following tools: |
|
|
|
|
|
{tools_description} |
|
|
|
|
|
To answer the question, you must follow this format, thinking step by step. |
|
|
|
|
|
Thought: Your reasoning and plan for the next step. You can also write down observations here. |
|
|
Action: The tool to use, in the format `tool_name(arg_name="value")`. The available tools are: {tool_names}. |
|
|
Observation: The result from the tool. |
|
|
... (this Thought/Action/Observation can repeat N times) |
|
|
|
|
|
When you have the final answer, respond with: |
|
|
Thought: I have now found the final answer. |
|
|
Final Answer: The final answer. |
|
|
|
|
|
Important: |
|
|
- Do not use a tool if you are not sure about the parameters. |
|
|
- Do not make up file names. |
|
|
- If a tool is not available for a task (e.g., image analysis), state that you cannot answer. |
|
|
- If a tool returns an error, note it and try an alternative approach if possible. |
|
|
|
|
|
Question: {question} |
|
|
[/INST]{scratchpad}""" |
|
|
|
|
|
|
|
|
|
|
|
class WebSearchTool: |
|
|
"""A tool to search the web for information.""" |
|
|
def __init__(self): |
|
|
self.search = DuckDuckGoSearchRun() |
|
|
|
|
|
def __call__(self, query: str): |
|
|
print(f"--- Calling WebSearchTool with query: {query} ---") |
|
|
try: |
|
|
return self.search.run(query) |
|
|
except Exception as e: |
|
|
return f"Error during web search: {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'web_search(query: str) -> str - A tool to search the web for information. Use it to find up-to-date information or facts.' |
|
|
|
|
|
class PythonREPLTool: |
|
|
"""A tool to execute Python code.""" |
|
|
def __call__(self, code: str): |
|
|
print(f"--- Calling PythonREPLTool with code: {code} ---") |
|
|
if any(keyword in code for keyword in ["os", "sys", "subprocess", "eval", "exec"]): |
|
|
return "Error: Use of os, sys, subprocess, eval, or exec is not allowed for security reasons." |
|
|
|
|
|
local_vars = {} |
|
|
string_io = io.StringIO() |
|
|
try: |
|
|
with contextlib.redirect_stdout(string_io): |
|
|
exec(code, {}, local_vars) |
|
|
output = string_io.getvalue() |
|
|
if not output and local_vars: |
|
|
output = str(list(local_vars.values())[-1]) |
|
|
return output if output else "Code executed with no output." |
|
|
except Exception as e: |
|
|
return f"Error executing code: {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'python_repl(code: str) -> str - A Python REPL. Use it to perform calculations, data manipulation, etc. The result of the last line is returned.' |
|
|
|
|
|
class FileReaderTool: |
|
|
"""A tool to read the content of a file associated with a task.""" |
|
|
def __init__(self, api_url: str): |
|
|
self.api_url = api_url |
|
|
|
|
|
def __call__(self, task_id: str, file_name: str): |
|
|
print(f"--- Calling FileReaderTool for task_id: {task_id}, file_name: {file_name} ---") |
|
|
if file_name.endswith(('.mp3', '.wav', '.flac')): |
|
|
return "Error: This tool cannot read audio files. Use the `audio_transcriber` tool instead." |
|
|
|
|
|
file_url = f"{self.api_url}/files/{task_id}" |
|
|
|
|
|
try: |
|
|
response = requests.get(file_url, timeout=20) |
|
|
response.raise_for_status() |
|
|
file_content = io.BytesIO(response.content) |
|
|
|
|
|
content = "" |
|
|
if file_name.endswith('.pdf'): |
|
|
pdf = PdfReader(file_content) |
|
|
content = "".join(page.extract_text() for page in pdf.pages if page.extract_text()) |
|
|
elif file_name.endswith('.docx'): |
|
|
doc = Document(file_content) |
|
|
content = "\n".join(para.text for para in doc.paragraphs) |
|
|
elif file_name.endswith('.csv'): |
|
|
df = pd.read_csv(file_content) |
|
|
content = df.to_json(orient='records') |
|
|
elif file_name.endswith('.json'): |
|
|
data = json.load(file_content) |
|
|
content = json.dumps(data, indent=2) |
|
|
elif file_name.endswith('.txt'): |
|
|
content = file_content.read().decode('utf-8') |
|
|
elif file_name.endswith('.xlsx'): |
|
|
df = pd.read_excel(file_content, engine='openpyxl') |
|
|
content = df.to_json(orient='records') |
|
|
else: |
|
|
return f"Error: Unsupported file type for '{file_name}'. Supported types: .pdf, .docx, .csv, .json, .txt, .xlsx." |
|
|
return content if content else "File is empty." |
|
|
except Exception as e: |
|
|
return f"Error reading file '{file_name}': {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'file_reader(task_id: str, file_name: str) -> str - Reads content of text-based files (.pdf, .docx, .csv, .json, .txt, .xlsx). For audio, use audio_transcriber.' |
|
|
|
|
|
class AudioTranscriptionTool: |
|
|
"""A tool to transcribe audio files using the Hugging Face Inference API.""" |
|
|
def __init__(self, api_url: str, client: InferenceClient): |
|
|
self.api_url = api_url |
|
|
self.client = client |
|
|
|
|
|
def __call__(self, task_id: str, file_name: str): |
|
|
print(f"--- Calling AudioTranscriptionTool for task: {task_id}, file: {file_name} ---") |
|
|
file_url = f"{self.api_url}/files/{task_id}" |
|
|
try: |
|
|
response = requests.get(file_url, timeout=30) |
|
|
response.raise_for_status() |
|
|
audio_data = response.content |
|
|
|
|
|
transcription = self.client.automatic_speech_recognition(audio_data, model="openai/whisper-large-v2") |
|
|
if transcription and 'text' in transcription: |
|
|
return transcription['text'] |
|
|
else: |
|
|
return "Could not transcribe audio." |
|
|
except Exception as e: |
|
|
return f"Error during audio transcription: {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'audio_transcriber(task_id: str, file_name: str) -> str - Transcribes an audio file (.mp3, .wav) associated with the current task.' |
|
|
|
|
|
class YouTubeTranscriptTool: |
|
|
"""A tool to fetch the transcript of a YouTube video.""" |
|
|
def __call__(self, video_url: str): |
|
|
print(f"--- Calling YouTubeTranscriptTool for URL: {video_url} ---") |
|
|
match = re.search(r"v=([a-zA-Z0-9_-]+)", video_url) |
|
|
if not match: |
|
|
return "Error: Invalid YouTube URL. Could not extract video ID." |
|
|
video_id = match.group(1) |
|
|
try: |
|
|
transcript_list = YouTubeTranscriptApi.get_transcript(video_id) |
|
|
return " ".join([d['text'] for d in transcript_list]) |
|
|
except Exception as e: |
|
|
return f"Error fetching transcript for video {video_id}: {e}. The video might not have a transcript." |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'youtube_transcript_fetcher(video_url: str) -> str - Fetches the transcript of a YouTube video. Use for questions about video content.' |
|
|
|
|
|
|
|
|
class GaiaAgent: |
|
|
def __init__(self, hf_token: str, api_url: str, max_turns: int = 8): |
|
|
if not hf_token: |
|
|
raise ValueError("Hugging Face token is required for the Inference API.") |
|
|
self.llm_client = InferenceClient(model=MODEL_ID, token=hf_token) |
|
|
self.max_turns = max_turns |
|
|
|
|
|
self.tools = { |
|
|
"web_search": WebSearchTool(), |
|
|
"python_repl": PythonREPLTool(), |
|
|
"file_reader": FileReaderTool(api_url=api_url), |
|
|
"youtube_transcript_fetcher": YouTubeTranscriptTool(), |
|
|
"audio_transcriber": AudioTranscriptionTool(api_url=api_url, client=self.llm_client), |
|
|
} |
|
|
self.tools_description = "\n".join([f"- `{tool.description}`" for tool in self.tools.values()]) |
|
|
self.tool_names = ", ".join(self.tools.keys()) |
|
|
print("GaiaAgent initialized successfully with tools:", self.tool_names) |
|
|
|
|
|
def __call__(self, question: str, task_id: str) -> str: |
|
|
print(f"\n--- Running agent on task {task_id} ---") |
|
|
print(f"Question: {question[:100]}...") |
|
|
scratchpad = "" |
|
|
|
|
|
for turn in range(self.max_turns): |
|
|
print(f"Turn {turn + 1}/{self.max_turns}") |
|
|
prompt = PROMPT_TEMPLATE.format( |
|
|
tools_description=self.tools_description, |
|
|
tool_names=self.tool_names, |
|
|
question=question, |
|
|
scratchpad=scratchpad, |
|
|
) |
|
|
try: |
|
|
llm_output = self.llm_client.text_generation( |
|
|
prompt, max_new_tokens=1024, stop_sequences=["Observation:", "[/INST]"], temperature=0.1 |
|
|
).strip() |
|
|
except Exception as e: |
|
|
return f"Error: LLM call failed. {e}" |
|
|
|
|
|
print(f"LLM Output:\n{llm_output}") |
|
|
scratchpad += llm_output |
|
|
|
|
|
final_answer_match = re.search(r"Final Answer:\s*(.*)", scratchpad, re.DOTALL) |
|
|
action_match = re.search(r"Action:\s*([a-zA-Z0-9_]+)\((.*)\)", llm_output, re.DOTALL) |
|
|
|
|
|
if final_answer_match: |
|
|
return final_answer_match.group(1).strip() |
|
|
|
|
|
elif action_match: |
|
|
tool_name = action_match.group(1).strip() |
|
|
tool_args_str = action_match.group(2).strip() |
|
|
|
|
|
if tool_name not in self.tools: |
|
|
observation = f"Error: Unknown tool '{tool_name}'." |
|
|
else: |
|
|
try: |
|
|
args_dict = eval(f"dict({tool_args_str})", {"__builtins__": None}, {}) |
|
|
if tool_name in ['file_reader', 'audio_transcriber']: |
|
|
args_dict['task_id'] = task_id |
|
|
tool = self.tools[tool_name] |
|
|
observation = tool(**args_dict) |
|
|
except Exception as e: |
|
|
observation = f"Error executing tool '{tool_name}': {e}" |
|
|
|
|
|
print(f"Observation: {str(observation)[:200]}...") |
|
|
scratchpad += f"\nObservation: {str(observation)}\n" |
|
|
else: |
|
|
scratchpad += "\nObservation: No valid action or final answer found. Please format your response as either 'Action: tool_name(args)' or 'Final Answer: your_answer'." |
|
|
|
|
|
return "Agent stopped after reaching maximum turns." |
|
|
|
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None): |
|
|
hf_token = os.getenv("HF_TOKEN") |
|
|
if not hf_token: |
|
|
return "Error: `HF_TOKEN` secret not set. Please add it to your Space secrets.", None |
|
|
|
|
|
space_id = "ZeroTimo/RobotPai" |
|
|
if not space_id: |
|
|
return "Error: `SPACE_ID` not found. Are you in a Hugging Face Space?", None |
|
|
|
|
|
if not profile: |
|
|
return "Please Login to Hugging Face with the button to submit.", None |
|
|
|
|
|
username = profile.username |
|
|
api_url = DEFAULT_API_URL |
|
|
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" |
|
|
|
|
|
try: |
|
|
agent = GaiaAgent(hf_token=hf_token, api_url=api_url) |
|
|
except Exception as e: |
|
|
return f"Error initializing agent: {e}", None |
|
|
|
|
|
try: |
|
|
response = requests.get(f"{api_url}/questions", timeout=15) |
|
|
response.raise_for_status() |
|
|
questions_data = response.json() |
|
|
except Exception as e: |
|
|
return f"Error fetching questions: {e}", None |
|
|
|
|
|
results_log, answers_payload = [], [] |
|
|
for item in questions_data: |
|
|
task_id, question_text = item.get("task_id"), item.get("question") |
|
|
if not all([task_id, question_text]): continue |
|
|
try: |
|
|
answer = agent(question_text, task_id) |
|
|
answers_payload.append({"task_id": task_id, "submitted_answer": answer}) |
|
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": answer}) |
|
|
except Exception as e: |
|
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
|
|
|
if not answers_payload: |
|
|
return "Agent did not produce any answers.", pd.DataFrame(results_log) |
|
|
|
|
|
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
|
|
|
|
|
try: |
|
|
response = requests.post(f"{api_url}/submit", json=submission_data, timeout=120) |
|
|
response.raise_for_status() |
|
|
result_data = response.json() |
|
|
final_status = ( |
|
|
f"Submission Successful! Score: {result_data.get('score', 'N/A')}% " |
|
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')})" |
|
|
) |
|
|
return final_status, pd.DataFrame(results_log) |
|
|
except requests.exceptions.RequestException as e: |
|
|
error_detail = f"Server responded with status {e.response.status_code}. Response: {e.response.text[:500]}" if e.response else str(e) |
|
|
return f"Submission Failed: {error_detail}", pd.DataFrame(results_log) |
|
|
|
|
|
|
|
|
with gr.Blocks() as demo: |
|
|
gr.Markdown("# GAIA Agent Evaluation Runner") |
|
|
gr.Markdown( |
|
|
""" |
|
|
**Instructions:** |
|
|
1. **Add your HF Token**: Go to your Space's **Settings** and add a secret named `HF_TOKEN` with your Hugging Face `read` token. |
|
|
2. **Login**: Use the button below to login with your Hugging Face account. |
|
|
3. **Run**: Click 'Run Evaluation & Submit' to start the agent. This may take several minutes. |
|
|
""" |
|
|
) |
|
|
with gr.Row(): |
|
|
gr.LoginButton() |
|
|
run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary") |
|
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=4, interactive=False) |
|
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table]) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
if not os.getenv("HF_TOKEN"): |
|
|
print("⚠️ WARNING: `HF_TOKEN` secret not found. The agent will not run.") |
|
|
demo.launch() |