itsskofficial's picture
changed space
49bfb00
import os
import gradio as gr
import requests
import pandas as pd
import re
import io
import contextlib
import json
from huggingface_hub import InferenceClient
from langchain_community.tools import DuckDuckGoSearchRun
from PyPDF2 import PdfReader
from docx import Document
from youtube_transcript_api import YouTubeTranscriptApi
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2"
PROMPT_TEMPLATE = """<s>[INST]You are a helpful assistant designed to answer questions accurately. You have access to the following tools:
{tools_description}
To answer the question, you must follow this format, thinking step by step.
Thought: Your reasoning and plan for the next step. You can also write down observations here.
Action: The tool to use, in the format `tool_name(arg_name="value")`. The available tools are: {tool_names}.
Observation: The result from the tool.
... (this Thought/Action/Observation can repeat N times)
When you have the final answer, respond with:
Thought: I have now found the final answer.
Final Answer: The final answer.
Important:
- Do not use a tool if you are not sure about the parameters.
- Do not make up file names.
- If a tool is not available for a task (e.g., image analysis), state that you cannot answer.
- If a tool returns an error, note it and try an alternative approach if possible.
Question: {question}
[/INST]{scratchpad}"""
# --- Tool Definitions ---
class WebSearchTool:
"""A tool to search the web for information."""
def __init__(self):
self.search = DuckDuckGoSearchRun()
def __call__(self, query: str):
print(f"--- Calling WebSearchTool with query: {query} ---")
try:
return self.search.run(query)
except Exception as e:
return f"Error during web search: {e}"
@property
def description(self):
return 'web_search(query: str) -> str - A tool to search the web for information. Use it to find up-to-date information or facts.'
class PythonREPLTool:
"""A tool to execute Python code."""
def __call__(self, code: str):
print(f"--- Calling PythonREPLTool with code: {code} ---")
if any(keyword in code for keyword in ["os", "sys", "subprocess", "eval", "exec"]):
return "Error: Use of os, sys, subprocess, eval, or exec is not allowed for security reasons."
local_vars = {}
string_io = io.StringIO()
try:
with contextlib.redirect_stdout(string_io):
exec(code, {}, local_vars)
output = string_io.getvalue()
if not output and local_vars:
output = str(list(local_vars.values())[-1])
return output if output else "Code executed with no output."
except Exception as e:
return f"Error executing code: {e}"
@property
def description(self):
return 'python_repl(code: str) -> str - A Python REPL. Use it to perform calculations, data manipulation, etc. The result of the last line is returned.'
class FileReaderTool:
"""A tool to read the content of a file associated with a task."""
def __init__(self, api_url: str):
self.api_url = api_url
def __call__(self, task_id: str, file_name: str):
print(f"--- Calling FileReaderTool for task_id: {task_id}, file_name: {file_name} ---")
if file_name.endswith(('.mp3', '.wav', '.flac')):
return "Error: This tool cannot read audio files. Use the `audio_transcriber` tool instead."
file_url = f"{self.api_url}/files/{task_id}"
try:
response = requests.get(file_url, timeout=20)
response.raise_for_status()
file_content = io.BytesIO(response.content)
content = ""
if file_name.endswith('.pdf'):
pdf = PdfReader(file_content)
content = "".join(page.extract_text() for page in pdf.pages if page.extract_text())
elif file_name.endswith('.docx'):
doc = Document(file_content)
content = "\n".join(para.text for para in doc.paragraphs)
elif file_name.endswith('.csv'):
df = pd.read_csv(file_content)
content = df.to_json(orient='records') # Return JSON for easier processing
elif file_name.endswith('.json'):
data = json.load(file_content)
content = json.dumps(data, indent=2)
elif file_name.endswith('.txt'):
content = file_content.read().decode('utf-8')
elif file_name.endswith('.xlsx'):
df = pd.read_excel(file_content, engine='openpyxl')
content = df.to_json(orient='records') # Return JSON for easier processing
else:
return f"Error: Unsupported file type for '{file_name}'. Supported types: .pdf, .docx, .csv, .json, .txt, .xlsx."
return content if content else "File is empty."
except Exception as e:
return f"Error reading file '{file_name}': {e}"
@property
def description(self):
return 'file_reader(task_id: str, file_name: str) -> str - Reads content of text-based files (.pdf, .docx, .csv, .json, .txt, .xlsx). For audio, use audio_transcriber.'
class AudioTranscriptionTool:
"""A tool to transcribe audio files using the Hugging Face Inference API."""
def __init__(self, api_url: str, client: InferenceClient):
self.api_url = api_url
self.client = client
def __call__(self, task_id: str, file_name: str):
print(f"--- Calling AudioTranscriptionTool for task: {task_id}, file: {file_name} ---")
file_url = f"{self.api_url}/files/{task_id}"
try:
response = requests.get(file_url, timeout=30)
response.raise_for_status()
audio_data = response.content
# Specify Whisper-large-v2 for accurate transcription
transcription = self.client.automatic_speech_recognition(audio_data, model="openai/whisper-large-v2")
if transcription and 'text' in transcription:
return transcription['text']
else:
return "Could not transcribe audio."
except Exception as e:
return f"Error during audio transcription: {e}"
@property
def description(self):
return 'audio_transcriber(task_id: str, file_name: str) -> str - Transcribes an audio file (.mp3, .wav) associated with the current task.'
class YouTubeTranscriptTool:
"""A tool to fetch the transcript of a YouTube video."""
def __call__(self, video_url: str):
print(f"--- Calling YouTubeTranscriptTool for URL: {video_url} ---")
match = re.search(r"v=([a-zA-Z0-9_-]+)", video_url)
if not match:
return "Error: Invalid YouTube URL. Could not extract video ID."
video_id = match.group(1)
try:
transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
return " ".join([d['text'] for d in transcript_list])
except Exception as e:
return f"Error fetching transcript for video {video_id}: {e}. The video might not have a transcript."
@property
def description(self):
return 'youtube_transcript_fetcher(video_url: str) -> str - Fetches the transcript of a YouTube video. Use for questions about video content.'
# --- GAIA Agent Definition ---
class GaiaAgent:
def __init__(self, hf_token: str, api_url: str, max_turns: int = 8):
if not hf_token:
raise ValueError("Hugging Face token is required for the Inference API.")
self.llm_client = InferenceClient(model=MODEL_ID, token=hf_token)
self.max_turns = max_turns
self.tools = {
"web_search": WebSearchTool(),
"python_repl": PythonREPLTool(),
"file_reader": FileReaderTool(api_url=api_url),
"youtube_transcript_fetcher": YouTubeTranscriptTool(),
"audio_transcriber": AudioTranscriptionTool(api_url=api_url, client=self.llm_client),
}
self.tools_description = "\n".join([f"- `{tool.description}`" for tool in self.tools.values()])
self.tool_names = ", ".join(self.tools.keys())
print("GaiaAgent initialized successfully with tools:", self.tool_names)
def __call__(self, question: str, task_id: str) -> str:
print(f"\n--- Running agent on task {task_id} ---")
print(f"Question: {question[:100]}...")
scratchpad = ""
for turn in range(self.max_turns):
print(f"Turn {turn + 1}/{self.max_turns}")
prompt = PROMPT_TEMPLATE.format(
tools_description=self.tools_description,
tool_names=self.tool_names,
question=question,
scratchpad=scratchpad,
)
try:
llm_output = self.llm_client.text_generation(
prompt, max_new_tokens=1024, stop_sequences=["Observation:", "[/INST]"], temperature=0.1
).strip()
except Exception as e:
return f"Error: LLM call failed. {e}"
print(f"LLM Output:\n{llm_output}")
scratchpad += llm_output
final_answer_match = re.search(r"Final Answer:\s*(.*)", scratchpad, re.DOTALL)
action_match = re.search(r"Action:\s*([a-zA-Z0-9_]+)\((.*)\)", llm_output, re.DOTALL)
if final_answer_match:
return final_answer_match.group(1).strip()
elif action_match:
tool_name = action_match.group(1).strip()
tool_args_str = action_match.group(2).strip()
if tool_name not in self.tools:
observation = f"Error: Unknown tool '{tool_name}'."
else:
try:
args_dict = eval(f"dict({tool_args_str})", {"__builtins__": None}, {})
if tool_name in ['file_reader', 'audio_transcriber']:
args_dict['task_id'] = task_id
tool = self.tools[tool_name]
observation = tool(**args_dict)
except Exception as e:
observation = f"Error executing tool '{tool_name}': {e}"
print(f"Observation: {str(observation)[:200]}...")
scratchpad += f"\nObservation: {str(observation)}\n"
else:
scratchpad += "\nObservation: No valid action or final answer found. Please format your response as either 'Action: tool_name(args)' or 'Final Answer: your_answer'."
return "Agent stopped after reaching maximum turns."
# --- Main Submission Logic ---
def run_and_submit_all(profile: gr.OAuthProfile | None):
hf_token = os.getenv("HF_TOKEN")
if not hf_token:
return "Error: `HF_TOKEN` secret not set. Please add it to your Space secrets.", None
space_id = "ZeroTimo/RobotPai"
if not space_id:
return "Error: `SPACE_ID` not found. Are you in a Hugging Face Space?", None
if not profile:
return "Please Login to Hugging Face with the button to submit.", None
username = profile.username
api_url = DEFAULT_API_URL
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
try:
agent = GaiaAgent(hf_token=hf_token, api_url=api_url)
except Exception as e:
return f"Error initializing agent: {e}", None
try:
response = requests.get(f"{api_url}/questions", timeout=15)
response.raise_for_status()
questions_data = response.json()
except Exception as e:
return f"Error fetching questions: {e}", None
results_log, answers_payload = [], []
for item in questions_data:
task_id, question_text = item.get("task_id"), item.get("question")
if not all([task_id, question_text]): continue
try:
answer = agent(question_text, task_id)
answers_payload.append({"task_id": task_id, "submitted_answer": answer})
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": answer})
except Exception as e:
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
if not answers_payload:
return "Agent did not produce any answers.", pd.DataFrame(results_log)
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
try:
response = requests.post(f"{api_url}/submit", json=submission_data, timeout=120)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful! Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')})"
)
return final_status, pd.DataFrame(results_log)
except requests.exceptions.RequestException as e:
error_detail = f"Server responded with status {e.response.status_code}. Response: {e.response.text[:500]}" if e.response else str(e)
return f"Submission Failed: {error_detail}", pd.DataFrame(results_log)
# --- Gradio Interface ---
with gr.Blocks() as demo:
gr.Markdown("# GAIA Agent Evaluation Runner")
gr.Markdown(
"""
**Instructions:**
1. **Add your HF Token**: Go to your Space's **Settings** and add a secret named `HF_TOKEN` with your Hugging Face `read` token.
2. **Login**: Use the button below to login with your Hugging Face account.
3. **Run**: Click 'Run Evaluation & Submit' to start the agent. This may take several minutes.
"""
)
with gr.Row():
gr.LoginButton()
run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary")
status_output = gr.Textbox(label="Run Status / Submission Result", lines=4, interactive=False)
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table])
if __name__ == "__main__":
if not os.getenv("HF_TOKEN"):
print("⚠️ WARNING: `HF_TOKEN` secret not found. The agent will not run.")
demo.launch()