agentCourse / app.py
gabejavitt's picture
Update app.py
8cd5fe1 verified
raw
history blame
60.4 kB
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import io
import contextlib
from typing import TypedDict, Annotated
import torch
import json # For robust tool call parsing/generation if needed
# --- Multimodal & Web Tool Imports ---
from transformers import pipeline
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from bs4 import BeautifulSoup
# --- LangChain & LangGraph Imports ---
from langgraph.graph.message import add_messages
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage
from langgraph.prebuilt import ToolNode
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition
from langchain_huggingface import ChatHuggingFace
from langchain_huggingface import HuggingFaceEndpoint
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import tool, BaseTool
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- LangGraph Agent State ---
class AgentState(TypedDict):
# Running conversation history for the graph; the `add_messages` reducer
# appends new messages to the list instead of replacing it on each node update.
messages: Annotated[list[AnyMessage], add_messages]
# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class BasicAgent:
# --- Tool Definitions as Methods ---
# By making tools methods, they can access self.asr_pipeline
@tool
def search_tool(self, query: str) -> str:
    """Calls DuckDuckGo search and returns the results. Use this for recent information or general web searches."""
    # Log the query so tool invocations show up in the Space logs.
    print(f"--- Calling Search Tool with query: {query} ---")
    try:
        # The search wrapper is stateless, so a fresh instance per call is fine.
        return DuckDuckGoSearchRun().run(query)
    except Exception as e:
        return f"Error running search: {e}"
@tool
def code_interpreter(self, code: str) -> str:
    """
    Executes a string of Python code and returns its stdout, stderr, and any error.
    Use this for calculations, data manipulation (including pandas on dataframes read from files), list operations, string manipulations, or any other Python operation.
    The code runs in a sandboxed environment. 'pandas' (as pd) and 'openpyxl' are available.
    Ensure the code is complete and executable. If printing, use print().
    """
    print(f"--- Calling Code Interpreter with code:\n{code}\n---")
    output_stream = io.StringIO()
    error_stream = io.StringIO()
    try:
        # Capture anything the snippet prints on stdout/stderr.
        with contextlib.redirect_stdout(output_stream), contextlib.redirect_stderr(error_stream):
            # FIX: use one shared namespace for globals AND locals. The previous
            # exec(code, {"pd": pd}, {}) put top-level assignments into a separate
            # locals dict, so functions/comprehensions defined by the snippet
            # could not see those names (they resolve via globals) and raised
            # NameError. SECURITY NOTE(review): exec() of model-generated code is
            # not actually sandboxed despite the docstring — confine this Space.
            namespace = {"pd": pd}
            exec(code, namespace)
        stdout = output_stream.getvalue()
        stderr = error_stream.getvalue()
        if stderr:
            return f"Error: {stderr}\nStdout: {stdout}"
        if stdout:
            return f"Success:\n{stdout}"
        return "Success: Code executed without error and produced no stdout."
    except Exception as e:
        # Any exception raised by the snippet itself is reported, not propagated.
        return f"Execution failed with error: {str(e)}"
@tool
def read_file(self, path: str) -> str:
    """Reads the content of a file at the specified path. Use this to examine files provided in the question."""
    print(f"--- Calling Read File Tool at path: {path} ---")
    try:
        # Resolution order: app directory -> path as given -> bare filename in CWD.
        # realpath (rather than abspath) also resolves symlinked checkouts; this
        # matches the hardened duplicate of this tool later in the file.
        script_dir = os.path.dirname(os.path.realpath(__file__))
        full_path = os.path.join(script_dir, path)
        print(f"Attempting to read relative path: {full_path}")
        if not os.path.exists(full_path):
            # Fall back to the direct path (absolute, or relative to CWD).
            full_path = path
            print(f"Attempting to read direct path: {full_path}")
            if not os.path.exists(full_path):
                # Last resort: bare filename in CWD (GAIA often gives just a name).
                base_path = os.path.basename(path)
                print(f"Attempting to read basename path in cwd: {os.path.join(os.getcwd(), base_path)}")
                if os.path.exists(base_path):
                    full_path = base_path
                else:
                    # FIX: guard the directory listings — if listdir itself raised
                    # here it would mask the real "file not found" diagnostic.
                    try:
                        cwd_files = os.listdir(".")
                    except Exception:
                        cwd_files = ["Error listing CWD"]
                    try:
                        script_dir_files = os.listdir(script_dir)
                    except Exception:
                        script_dir_files = ["Error listing script dir"]
                    return (f"Error: File not found.\n"
                            f"Tried relative path: '{os.path.join(script_dir, path)}'\n"
                            f"Tried direct path: '{path}'\n"
                            f"Tried basename in CWD: '{base_path}'\n"
                            f"Files in current dir (.): {cwd_files}\n"
                            f"Files in script dir ({script_dir}): {script_dir_files}")
        print(f"Reading file: {full_path}")
        with open(full_path, 'r', encoding='utf-8') as f:
            return f.read()
    except Exception as e:
        return f"Error reading file {path}: {str(e)}"
@tool
def write_file(self, path: str, content: str) -> str:
    """Writes the given content to a file at the specified path relative to the app's directory. Creates directories if they don't exist."""
    print(f"--- Calling Write File Tool at path: {path} ---")
    try:
        # Anchor the write under the app's own directory, not the CWD.
        app_dir = os.path.dirname(os.path.abspath(__file__))
        target = os.path.join(app_dir, path)
        print(f"Writing file to: {target}")
        # Create any missing intermediate directories before opening.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, 'w', encoding='utf-8') as fh:
            fh.write(content)
        return f"Successfully wrote to file {path} (relative to app)."
    except Exception as e:
        return f"Error writing to file {path}: {str(e)}"
@tool
def list_directory(self, path: str = ".") -> str:
    """Lists the contents (files and directories) of a directory at the specified path relative to the app's directory."""
    print(f"--- Calling List Directory Tool at path: {path} ---")
    try:
        # Resolve the requested path against the app's own directory.
        app_dir = os.path.dirname(os.path.abspath(__file__))
        target = os.path.join(app_dir, path)
        print(f"Listing directory: {target}")
        if not os.path.isdir(target):
            return f"Error: '{path}' is not a valid directory relative to the app."
        entries = os.listdir(target)
        return "\n".join(entries) if entries else "Directory is empty."
    except Exception as e:
        return f"Error listing directory {path}: {str(e)}"
@tool
def audio_transcription_tool(self, file_path: str) -> str:
    """
    Transcribes an audio file (like .mp3 or .wav) using Whisper and returns the text content.
    Use this for questions involving audio file analysis.
    """
    print(f"--- Calling Audio Transcription Tool at path: {file_path} ---")
    # The Whisper pipeline is loaded lazily in __init__ and may be None.
    if not self.asr_pipeline:
        return "Error: Audio transcription pipeline is not available."
    try:
        # Resolution order: app directory -> path as given -> bare filename in CWD.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        full_path = os.path.join(script_dir, file_path)
        print(f"Attempting to transcribe relative path: {full_path}")
        if not os.path.exists(full_path):
            full_path = file_path
            print(f"Attempting to transcribe direct path: {full_path}")
            if not os.path.exists(full_path):
                base_path = os.path.basename(file_path)
                print(f"Attempting to transcribe basename path: {base_path}")
                if os.path.exists(base_path):
                    full_path = base_path
                else:
                    # FIX: guard the diagnostics so a listdir failure cannot
                    # mask the real "file not found" message.
                    try:
                        cwd_files = os.listdir(".")
                    except Exception:
                        cwd_files = ["Error listing CWD"]
                    try:
                        script_dir_files = os.listdir(script_dir)
                    except Exception:
                        script_dir_files = ["Error listing script dir"]
                    return (f"Error: Audio file not found.\n"
                            f"Tried: '{file_path}', '{os.path.join(script_dir, file_path)}', '{base_path}'.\n"
                            f"Files in current dir (.): {cwd_files}\n"
                            f"Files in script dir ({script_dir}): {script_dir_files}")
        print(f"Transcribing file: {full_path}")
        transcription = self.asr_pipeline(full_path)
        print("--- Transcription Complete ---")
        # FIX: use .get() — indexing transcription["text"] raised KeyError when
        # the pipeline output had no text, surfacing as an opaque error string.
        return transcription.get("text", "Error: Transcription failed to produce text.")
    except Exception as e:
        return f"Error during audio transcription: {str(e)}"
@tool
def get_youtube_transcript(self, video_url: str) -> str:
    """
    Fetches the transcript for a given YouTube video URL. Use this for questions about YouTube video content.
    """
    print(f"--- Calling YouTube Transcript Tool for URL: {video_url} ---")
    try:
        # Accept both long-form watch URLs and short youtu.be links.
        video_id = None
        if "watch?v=" in video_url:
            video_id = video_url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in video_url:
            video_id = video_url.split("youtu.be/")[1].split("?")[0]
        if not video_id:
            return f"Error: Could not extract video ID from URL: {video_url}"
        segments = YouTubeTranscriptApi.get_transcript(video_id)
        # Flatten the transcript segments into a single space-joined string.
        full_transcript = " ".join(seg["text"] for seg in segments)
        print("--- Transcript Fetched ---")
        # Truncate so a long transcript cannot blow out the LLM context.
        return full_transcript[:8000]
    except Exception as e:
        return f"Error fetching YouTube transcript: {str(e)}"
@tool
def scrape_web_page(self, url: str) -> str:
    """
    Fetches the primary text content of a given web page URL, removing navigation, footer, scripts, and styles.
    Use this when you need the full content of a webpage found via search.
    """
    print(f"--- Calling Web Scraper Tool for URL: {url} ---")
    try:
        # A browser-like UA avoids trivial bot blocks on many sites.
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        # Only parse HTML responses; bail out on PDFs, images, etc.
        if 'html' not in response.headers.get('Content-Type', '').lower():
            return f"Error: URL {url} did not return HTML content."
        soup = BeautifulSoup(response.text, 'html.parser')
        # Strip boilerplate elements that carry no article content.
        for boilerplate in soup(["script", "style", "nav", "footer", "aside", "header", "form"]):
            boilerplate.extract()
        # Heuristic: prefer an explicit main/article region over the full body.
        main_content = soup.find('main') or soup.find('article') or soup.find('div', role='main') or soup.body
        if not main_content:
            main_content = soup
        raw_text = main_content.get_text(separator='\n', strip=True)
        # Collapse runs of whitespace into clean, non-empty lines.
        stripped_lines = (ln.strip() for ln in raw_text.splitlines())
        pieces = (part.strip() for ln in stripped_lines for part in ln.split(" "))
        cleaned = '\n'.join(piece for piece in pieces if piece)
        print("--- Web Page Scraped ---")
        # Truncate to keep the LLM context bounded.
        return cleaned[:8000]
    except requests.exceptions.RequestException as e:
        return f"Error fetching web page {url}: {str(e)}"
    except Exception as e:
        return f"Error scraping web page {url}: {str(e)}"
# --- End of Tool Definitions ---
def __init__(self):
# NOTE(review): the entire BasicAgent class (including this __init__) is
# redefined later in this file; this first copy appears to be shadowed dead
# code — confirm and delete one of the two definitions.
# Loads the ASR pipeline, reads the HF API token, collects the @tool methods,
# and builds the system prompt. (This first revision stops before LLM/graph setup.)
print("BasicAgent (LangGraph) initializing...")
# 1. Initialize ASR Pipeline *inside* init - DELAYED LOADING
# ==================== MOVED HERE ====================
self.asr_pipeline = None # Initialize as None first
try:
print("Loading ASR (Whisper) pipeline...")
# Decide device based on availability
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device} for ASR.")
self.asr_pipeline = pipeline(
"automatic-speech-recognition",
model="openai/whisper-base",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, # Use float16 only if CUDA available
device=device # Explicitly set device
)
print("✅ ASR (Whisper) pipeline loaded successfully.")
except Exception as e:
# Keep the agent usable even if Whisper cannot be loaded; only the audio tool breaks.
print(f"⚠️ Warning: Could not load ASR pipeline. Audio tool will not work. Error: {e}")
self.asr_pipeline = None # Ensure it's None if loading fails
# ====================================================
# 2. Get API Token from Space Secrets
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
if not HUGGINGFACEHUB_API_TOKEN:
raise ValueError("HUGGINGFACEHUB_API_TOKEN secret is not set! Please add it to your Space secrets.")
# 3. Collect Tool Methods
# LangChain tools expect functions or objects with a 'run' method.
# The @tool decorator makes our methods compatible.
# NOTE(review): @tool on instance methods exposes `self` as a tool argument
# in LangChain's schema — verify tool invocation actually works at runtime.
self.tools = [
self.search_tool, # References the method
self.code_interpreter,
self.read_file,
self.write_file,
self.list_directory,
self.audio_transcription_tool,
self.get_youtube_transcript,
self.scrape_web_page
]
# 4. Define the Improved System Prompt
# NOTE(review): the loop variable `tool` below shadows the imported @tool decorator.
tool_descriptions = "\n".join([f"- {tool.name}: {tool.description}" for tool in self.tools])
self.system_prompt = f"""You are a highly intelligent and meticulous AI assistant built to answer questions from the GAIA benchmark.
Your primary goal is to provide **only the concise, factual, and direct answer** to the user's question, exactly matching the format required by the benchmark (e.g., a name, a number, a specific string format, a comma-separated list).
**CRITICAL INSTRUCTIONS:**
* **DO NOT** include conversational filler (e.g., "Sure, I can help...", "The answer is...", "Here is the information...").
* **DO NOT** explain your reasoning or the steps you took unless the question *explicitly* asks for it.
* **DO NOT** repeat the question in your final answer.
* **FINAL ANSWER FORMAT:** Your final response must contain *only* the answer itself.
You have access to the following tools to gather information and perform actions:
{tool_descriptions}
**TOOL USAGE PROTOCOL:**
* To use a tool, you MUST respond ONLY with a single JSON object formatted exactly like this:
```json
{{
"tool": "tool_name",
"tool_input": {{ "arg_name1": "value1", "arg_name2": "value2", ... }}
}}
```
* Replace `tool_name` with the exact name of the tool you want to use.
* Provide the required arguments within the `tool_input` dictionary. Ensure argument names and value types match the tool description precisely.
* Do not add any text before or after the JSON tool call block.
**REASONING PROCESS:**
1. Carefully analyze the user's question to understand the specific information required and the expected answer format. Check if any files are attached (mentioned like `[Attached File: filename.ext]`).
2. Break down the problem into logical steps.
3. Determine if any tools are necessary. Use `read_file` for attached files, `audio_transcription_tool` for audio, `get_youtube_transcript` for YouTube URLs, `search_tool` for web info, `scrape_web_page` to read content from URLs found via search, and `code_interpreter` for calculations or data processing.
4. If a tool is needed, call it using the specified JSON format. Wait for the tool's output.
5. Analyze the tool's output. If the answer is found, proceed to step 7.
6. If more information or steps are needed, use another tool (step 4) or continue reasoning based on the gathered information. Pay close attention to previous tool results.
7. Once you have derived the final, definitive answer that meets the question's requirements, output **ONLY** that answer and nothing else. Stop the process.
"""
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import io
import contextlib
from typing import TypedDict, Annotated
import torch
import json # For robust tool call parsing/generation if needed
import re # For finding JSON
import uuid # For generating tool call IDs
# --- Multimodal & Web Tool Imports ---
from transformers import pipeline
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from bs4 import BeautifulSoup
# --- LangChain & LangGraph Imports ---
from langgraph.graph.message import add_messages
# Make sure to import ToolCall
from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage, ToolCall
from langgraph.prebuilt import ToolNode
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition
from langchain_huggingface import ChatHuggingFace
from langchain_huggingface import HuggingFaceEndpoint
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import tool, BaseTool
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- LangGraph Agent State ---
class AgentState(TypedDict):
# Running conversation history for the graph; the `add_messages` reducer
# appends new messages to the list instead of replacing it on each node update.
messages: Annotated[list[AnyMessage], add_messages]
# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class BasicAgent:
# --- Tool Definitions as Methods ---
# By making tools methods, they can access self.asr_pipeline
@tool
def search_tool(self, query: str) -> str:
    """Calls DuckDuckGo search and returns the results. Use this for recent information or general web searches."""
    # Log the query so tool invocations show up in the Space logs.
    print(f"--- Calling Search Tool with query: {query} ---")
    try:
        # The search wrapper is stateless, so a fresh instance per call is fine.
        return DuckDuckGoSearchRun().run(query)
    except Exception as e:
        return f"Error running search: {e}"
@tool
def code_interpreter(self, code: str) -> str:
    """
    Executes a string of Python code and returns its stdout, stderr, and any error.
    Use this for calculations, data manipulation (including pandas on dataframes read from files), list operations, string manipulations, or any other Python operation.
    The code runs in a sandboxed environment. 'pandas' (as pd) and 'openpyxl' are available.
    Ensure the code is complete and executable. If printing, use print().
    """
    print(f"--- Calling Code Interpreter with code:\n{code}\n---")
    output_stream = io.StringIO()
    error_stream = io.StringIO()
    try:
        # Capture anything the snippet prints on stdout/stderr.
        with contextlib.redirect_stdout(output_stream), contextlib.redirect_stderr(error_stream):
            # FIX: use one shared namespace for globals AND locals. The previous
            # exec(code, {"pd": pd}, {}) put top-level assignments into a separate
            # locals dict, so functions/comprehensions defined by the snippet
            # could not see those names (they resolve via globals) and raised
            # NameError. SECURITY NOTE(review): exec() of model-generated code is
            # not actually sandboxed despite the docstring — confine this Space.
            namespace = {"pd": pd}
            exec(code, namespace)
        stdout = output_stream.getvalue()
        stderr = error_stream.getvalue()
        if stderr:
            return f"Error: {stderr}\nStdout: {stdout}"
        if stdout:
            return f"Success:\n{stdout}"
        return "Success: Code executed without error and produced no stdout."
    except Exception as e:
        # Any exception raised by the snippet itself is reported, not propagated.
        return f"Execution failed with error: {str(e)}"
@tool
def read_file(self, path: str) -> str:
"""Reads the content of a file at the specified path. Use this to examine files provided in the question."""
print(f"--- Calling Read File Tool at path: {path} ---")
try:
# Try finding the file relative to the app directory first
# Use os.path.dirname(os.path.realpath(__file__)) for robustness in different execution contexts
script_dir = os.path.dirname(os.path.realpath(__file__))
full_path = os.path.join(script_dir, path)
print(f"Attempting to read relative path: {full_path}")
if not os.path.exists(full_path):
# If not found, try the direct path (might be absolute or relative to cwd)
full_path = path
print(f"Attempting to read direct path: {full_path}")
if not os.path.exists(full_path):
# Try basename for GAIA questions providing just the filename
base_path = os.path.basename(path)
print(f"Attempting to read basename path in cwd: {os.path.join(os.getcwd(), base_path)}")
if os.path.exists(base_path): # Check relative to CWD
full_path = base_path
else:
# List files in current and script directory for debugging.
# Each listdir is individually guarded so the diagnostics cannot
# raise and mask the real "file not found" condition.
try:
cwd_files = os.listdir(".")
except Exception:
cwd_files = ["Error listing CWD"]
try:
script_dir_files = os.listdir(script_dir)
except Exception:
script_dir_files = ["Error listing script dir"]
return (f"Error: File not found.\n"
f"Tried relative path: '{os.path.join(script_dir, path)}'\n"
f"Tried direct path: '{path}'\n"
f"Tried basename in CWD: '{base_path}'\n"
f"Files in current dir (.): {cwd_files}\n"
f"Files in script dir ({script_dir}): {script_dir_files}")
print(f"Reading file: {full_path}")
# Text-mode UTF-8 read; binary files will raise and be reported below.
with open(full_path, 'r', encoding='utf-8') as f:
return f.read()
except Exception as e:
return f"Error reading file {path}: {str(e)}"
@tool
def write_file(self, path: str, content: str) -> str:
    """Writes the given content to a file at the specified path relative to the app's directory. Creates directories if they don't exist."""
    print(f"--- Calling Write File Tool at path: {path} ---")
    try:
        # Anchor the write under the app's own directory, not the CWD.
        app_dir = os.path.dirname(os.path.realpath(__file__))
        target = os.path.join(app_dir, path)
        print(f"Writing file to: {target}")
        # Create any missing intermediate directories before opening.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, 'w', encoding='utf-8') as fh:
            fh.write(content)
        return f"Successfully wrote to file {path} (relative to app)."
    except Exception as e:
        return f"Error writing to file {path}: {str(e)}"
@tool
def list_directory(self, path: str = ".") -> str:
    """Lists the contents (files and directories) of a directory at the specified path relative to the app's directory."""
    print(f"--- Calling List Directory Tool at path: {path} ---")
    try:
        # Resolve the requested path against the app's own directory.
        app_dir = os.path.dirname(os.path.realpath(__file__))
        target = os.path.join(app_dir, path)
        print(f"Listing directory: {target}")
        if not os.path.isdir(target):
            return f"Error: '{path}' is not a valid directory relative to the app."
        entries = os.listdir(target)
        return "\n".join(entries) if entries else "Directory is empty."
    except Exception as e:
        return f"Error listing directory {path}: {str(e)}"
@tool
def audio_transcription_tool(self, file_path: str) -> str:
"""
Transcribes an audio file (like .mp3 or .wav) using Whisper and returns the text content.
Use this for questions involving audio file analysis.
"""
print(f"--- Calling Audio Transcription Tool at path: {file_path} ---")
# Access the pipeline via self; it is loaded lazily in __init__ and may be None.
if not self.asr_pipeline:
return "Error: Audio transcription pipeline is not available."
try:
# Resolution order: app directory -> path as given -> bare filename in CWD.
script_dir = os.path.dirname(os.path.realpath(__file__))
full_path = os.path.join(script_dir, file_path)
print(f"Attempting to transcribe relative path: {full_path}")
if not os.path.exists(full_path):
# If not found, try the direct path
full_path = file_path
print(f"Attempting to transcribe direct path: {full_path}")
if not os.path.exists(full_path):
# Try basename for GAIA questions
base_path = os.path.basename(file_path)
print(f"Attempting to transcribe basename path in CWD: {os.path.join(os.getcwd(), base_path)}")
if os.path.exists(base_path): # Check relative to CWD
full_path = base_path
else:
# Diagnostics are individually guarded so a listdir failure
# cannot mask the real "file not found" error message.
try:
cwd_files = os.listdir(".")
except Exception:
cwd_files = ["Error listing CWD"]
try:
script_dir_files = os.listdir(script_dir)
except Exception:
script_dir_files = ["Error listing script dir"]
return (f"Error: Audio file not found.\n"
f"Tried relative path: '{os.path.join(script_dir, file_path)}'\n"
f"Tried direct path: '{file_path}'\n"
f"Tried basename in CWD: '{base_path}'\n"
f"Files in current dir (.): {cwd_files}\n"
f"Files in script dir ({script_dir}): {script_dir_files}")
print(f"Transcribing file: {full_path}")
# Important: Ensure the pipeline can handle the file path directly
transcription = self.asr_pipeline(full_path)
print("--- Transcription Complete ---")
# The output structure might vary slightly based on pipeline version,
# so .get() is used instead of indexing to avoid a KeyError.
return transcription.get("text", "Error: Transcription failed to produce text.")
except Exception as e:
import traceback
print(f"Error during audio transcription: {e}")
traceback.print_exc()
return f"Error during audio transcription: {str(e)}"
@tool
def get_youtube_transcript(self, video_url: str) -> str:
    """
    Fetches the transcript for a given YouTube video URL. Use this for questions about YouTube video content.
    """
    print(f"--- Calling YouTube Transcript Tool for URL: {video_url} ---")
    try:
        # Accept both long-form watch URLs and short youtu.be links.
        video_id = None
        if "watch?v=" in video_url:
            video_id = video_url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in video_url:
            video_id = video_url.split("youtu.be/")[1].split("?")[0]
        if not video_id:
            return f"Error: Could not extract video ID from URL: {video_url}"
        segments = YouTubeTranscriptApi.get_transcript(video_id)
        # Flatten the transcript segments into a single space-joined string.
        full_transcript = " ".join(seg["text"] for seg in segments)
        print("--- Transcript Fetched ---")
        # Truncate so a long transcript cannot blow out the LLM context.
        return full_transcript[:8000]
    except Exception as e:
        return f"Error fetching YouTube transcript: {str(e)}"
@tool
def scrape_web_page(self, url: str) -> str:
    """
    Fetches the primary text content of a given web page URL, removing navigation, footer, scripts, and styles.
    Use this when you need the full content of a webpage found via search.
    """
    print(f"--- Calling Web Scraper Tool for URL: {url} ---")
    try:
        # A browser-like UA avoids trivial bot blocks on many sites.
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
        response = requests.get(url, headers=headers, timeout=15)
        response.raise_for_status()
        # Only parse HTML responses; bail out on PDFs, images, etc.
        if 'html' not in response.headers.get('Content-Type', '').lower():
            return f"Error: URL {url} did not return HTML content."
        soup = BeautifulSoup(response.text, 'html.parser')
        # Strip boilerplate/interactive elements that carry no article content.
        for boilerplate in soup(["script", "style", "nav", "footer", "aside", "header", "form", "button", "input"]):
            boilerplate.extract()
        # Heuristic: prefer an explicit main/article region over the full body.
        main_content = soup.find('main') or soup.find('article') or soup.find('div', role='main') or soup.body
        if not main_content:
            main_content = soup
        raw_text = main_content.get_text(separator='\n', strip=True)
        # Collapse runs of whitespace into clean, non-empty lines.
        stripped_lines = (ln.strip() for ln in raw_text.splitlines())
        pieces = (part.strip() for ln in stripped_lines for part in ln.split(" "))
        cleaned = '\n'.join(piece for piece in pieces if piece)
        print("--- Web Page Scraped ---")
        # Truncate to keep the LLM context bounded.
        return cleaned[:8000]
    except requests.exceptions.RequestException as e:
        return f"Error fetching web page {url}: {str(e)}"
    except Exception as e:
        return f"Error scraping web page {url}: {str(e)}"
# --- End of Tool Definitions ---
def __init__(self):
# Sets up the agent end to end: (1) lazily load Whisper ASR, (2) read the HF
# API token, (3) collect @tool methods, (4) build the system prompt,
# (5) create the HF endpoint LLM, (6) bind tools, then (7+) build the graph.
print("BasicAgent (LangGraph) initializing...")
# 1. Initialize ASR Pipeline *inside* init - DELAYED LOADING
self.asr_pipeline = None # Initialize as None first
try:
print("Loading ASR (Whisper) pipeline...")
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device} for ASR.")
self.asr_pipeline = pipeline(
"automatic-speech-recognition",
model="openai/whisper-base",
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
device=device
)
print("✅ ASR (Whisper) pipeline loaded successfully.")
except Exception as e:
# ASR failure is non-fatal: only the audio transcription tool degrades.
print(f"⚠️ Warning: Could not load ASR pipeline. Audio tool will not work. Error: {e}")
import traceback
traceback.print_exc() # Print full traceback for ASR load error
self.asr_pipeline = None
# ====================================================
# 2. Get API Token from Space Secrets
HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
if not HUGGINGFACEHUB_API_TOKEN:
raise ValueError("HUGGINGFACEHUB_API_TOKEN secret is not set! Please add it to your Space secrets.")
# 3. Collect Tool Methods
# NOTE(review): @tool on instance methods exposes `self` as a tool argument
# in LangChain's schema — verify tool invocation actually works at runtime.
self.tools = [
self.search_tool,
self.code_interpreter,
self.read_file,
self.write_file,
self.list_directory,
self.audio_transcription_tool,
self.get_youtube_transcript,
self.scrape_web_page
]
# 4. Define the Improved System Prompt with Placeholders
# NOTE(review): the loop variable `tool` below shadows the imported @tool decorator.
tool_descriptions = "\n".join([f"- {tool.name}: {tool.description}" for tool in self.tools])
self.system_prompt = f"""You are a highly intelligent and meticulous AI assistant built to answer questions from the GAIA benchmark.
Your primary goal is to provide **only the concise, factual, and direct answer** to the user's question, exactly matching the format required by the benchmark (e.g., a name, a number, a specific string format, a comma-separated list).
**CRITICAL INSTRUCTIONS:**
* **DO NOT** include conversational filler (e.g., "Sure, I can help...", "The answer is...", "Here is the information...").
* **DO NOT** explain your reasoning or the steps you took unless the question *explicitly* asks for it.
* **DO NOT** repeat the question in your final answer.
* **FINAL ANSWER FORMAT:** Your final response must contain *only* the answer itself.
You have access to the following tools to gather information and perform actions:
{tool_descriptions}
**TOOL USAGE PROTOCOL:**
* To use a tool, you MUST respond ONLY with a single JSON object formatted exactly like this:
```json
{{
"tool": "tool_name",
"tool_input": {{ "arg_name1": "value1", "arg_name2": "value2", ... }}
}}
```
* Replace `tool_name` with the exact name of the tool you want to use.
* Provide the required arguments within the `tool_input` dictionary. Ensure argument names and value types match the tool description precisely.
* Do not add any text before or after the JSON tool call block.
**REASONING PROCESS:**
1. Carefully analyze the user's question to understand the specific information required and the expected answer format. Check if any files are attached (mentioned like `[Attached File: filename.ext]`).
2. Break down the problem into logical steps.
3. Determine if any tools are necessary. Use `read_file` for attached files, `audio_transcription_tool` for audio, `get_youtube_transcript` for YouTube URLs, `search_tool` for web info, `scrape_web_page` to read content from URLs found via search, and `code_interpreter` for calculations or data processing.
4. If a tool is needed, call it using the specified JSON format. Wait for the tool's output.
5. Analyze the tool's output. If the answer is found, proceed to step 7.
6. If more information or steps are needed, use another tool (step 4) or continue reasoning based on the gathered information. Pay close attention to previous tool results.
7. Once you have derived the final, definitive answer that meets the question's requirements, output **ONLY** that answer and nothing else. Stop the process.
"""
# 5. Initialize the LLM (Using Mistral Instruct)
print("Initializing LLM Endpoint...")
llm = HuggingFaceEndpoint(
repo_id="mistralai/Mistral-7B-Instruct-v0.2", # Switched model
huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN,
max_new_tokens=2048,
temperature=0.01,
)
chat_llm = ChatHuggingFace(llm=llm)
print("✅ LLM Endpoint initialized.")
# 6. Bind tools to the LLM
# We still bind tools, but we'll manually parse if it fails
self.llm_with_tools = chat_llm.bind_tools(self.tools)
print("✅ Tools bound to LLM.")
# 7. Define the Agent Node with Manual Tool Parsing
# ==================== NODE WITH PLACEHOLDER REGEX ====================
def agent_node(state: AgentState):
    """One reasoning step: invoke the tool-bound LLM, then — if the provider
    did not populate `tool_calls` natively — fall back to manually extracting
    a JSON tool call from the raw message text so `tools_condition` can route."""
    print("--- Running Agent Node ---")
    messages_with_prompt = state["messages"]
    # Invoke the LLM (which has tools bound)
    ai_message: AIMessage = self.llm_with_tools.invoke(messages_with_prompt)
    print(f"AI Message Raw Content: {ai_message.content}")
    # --- Manual Tool Call Parsing Logic ---
    tool_calls = []
    # Ideal case: bind_tools already produced structured tool_calls.
    if ai_message.tool_calls:
        print(f"SUCCESS: bind_tools correctly parsed tool_calls: {ai_message.tool_calls}")
        tool_calls = ai_message.tool_calls
    # Fallback: look for a JSON tool-call block inside the plain-text content.
    elif isinstance(ai_message.content, str):
        print("Attempting manual JSON parsing from content...")
        # FIX: the previous revision shipped an unfilled placeholder here
        # (re.search(r"...") with no subject string), so the fallback never
        # fired. Group 1 captures a JSON object inside a ```json fenced block;
        # group 2 captures a bare top-level object — matching the
        # `group(1) or group(2)` consumer below. DOTALL spans newlines.
        json_match = re.search(
            r"```(?:json)?\s*(\{.*?\})\s*```|(\{.*\})",
            ai_message.content,
            re.DOTALL,
        )
        if json_match:
            # Extract the first valid group that contains JSON
            json_str = json_match.group(1) or json_match.group(2)
            if json_str:
                try:
                    # Strip potential stray whitespace before validating shape.
                    json_str_cleaned = json_str.strip()
                    # Basic validation: starts with { or [ and ends with } or ]
                    if (json_str_cleaned.startswith('{') and json_str_cleaned.endswith('}')) or \
                       (json_str_cleaned.startswith('[') and json_str_cleaned.endswith(']')):
                        data = json.loads(json_str_cleaned)
                        # Single tool call: {"tool": ..., "tool_input": {...}}
                        if isinstance(data, dict) and "tool" in data and "tool_input" in data:
                            tool_name = data.get("tool")
                            tool_input = data.get("tool_input")
                            if isinstance(tool_name, str) and isinstance(tool_input, dict):
                                call_id = f"tool_{uuid.uuid4()}"  # Generate unique ID
                                tool_calls.append(ToolCall(name=tool_name, args=tool_input, id=call_id))
                                print(f"Manually parsed Single Tool Call: ID={call_id}, Name={tool_name}, Args={tool_input}")
                                ai_message.content = ""  # Clear content after successful parse
                            else:
                                print("Parsed JSON dict, but incorrect tool name type or tool_input is not a dict.")
                        # Multiple tool calls: a JSON list of the same dict shape.
                        elif isinstance(data, list):
                            print("Attempting to parse list as multiple tool calls...")
                            parsed_list_ok = True
                            temp_tool_calls = []
                            for item in data:
                                if isinstance(item, dict) and "tool" in item and "tool_input" in item:
                                    tool_name = item.get("tool")
                                    tool_input = item.get("tool_input")
                                    if isinstance(tool_name, str) and isinstance(tool_input, dict):
                                        call_id = f"tool_{uuid.uuid4()}"
                                        temp_tool_calls.append(ToolCall(name=tool_name, args=tool_input, id=call_id))
                                        print(f"Manually parsed Multi-Tool Call item: ID={call_id}, Name={tool_name}, Args={tool_input}")
                                    else:
                                        parsed_list_ok = False
                                        print("Parsed JSON list item, but incorrect tool name type or tool_input is not a dict.")
                                        break
                                else:
                                    parsed_list_ok = False
                                    print("Parsed JSON list item, but not a valid tool call structure (missing 'tool' or 'tool_input').")
                                    break
                            # Only accept the batch if every item parsed cleanly.
                            if parsed_list_ok and temp_tool_calls:
                                tool_calls.extend(temp_tool_calls)
                                ai_message.content = ""  # Clear content if list successfully parsed
                        else:
                            print("Parsed JSON, but incorrect structure (neither dict with tool/tool_input nor list of such dicts).")
                    else:
                        print(f"Skipping manual parse: Cleaned JSON string ('{json_str_cleaned[:50]}...') does not start/end correctly with braces/brackets.")
                except json.JSONDecodeError as e:
                    print(f"Manual JSON parsing failed: {e}. String was: '{json_str[:500]}...'")
                except Exception as e:
                    print(f"Unexpected error during manual parsing: {e}")
                    import traceback
                    traceback.print_exc()
            else:
                print("Regex matched, but no JSON content found in capture groups.")
        else:
            print("No JSON block found in content for manual parsing.")
    else:
        print("AI Message content is not a string, skipping manual parse.")
    # --- End Manual Parsing ---
    # Attach manually parsed calls (if any) so tools_condition routes to "tools".
    if tool_calls and not ai_message.tool_calls:
        ai_message.tool_calls = tool_calls
        ai_message.invalid_tool_calls = []  # clear stale invalid-call entries
    # Log final interpretation
    if ai_message.tool_calls:
        print(f"AI Message contains tool calls (after manual check): {ai_message.tool_calls}")
    elif ai_message.invalid_tool_calls:
        print(f"AI Message contains INVALID tool calls: {ai_message.invalid_tool_calls}")
    else:
        print(f"AI Message Interpreted Content (no tool calls): {ai_message.pretty_repr()}")
    return {"messages": [ai_message]}
# =======================================================
# 8. Define the Tool Node
tool_node = ToolNode(self.tools)
# 9. Create the Graph
print("Building agent graph...")
graph_builder = StateGraph(AgentState)
graph_builder.add_node("agent", agent_node)
graph_builder.add_node("tools", tool_node)
graph_builder.add_edge(START, "agent")
graph_builder.add_conditional_edges(
"agent",
tools_condition, # This condition checks ai_message.tool_calls
{
"tools": "tools",
"__end__": "__end__",
},
)
graph_builder.add_edge("tools", "agent")
# 10. Compile the graph and store it
self.graph = graph_builder.compile()
print("✅ Graph compiled successfully.")
def __call__(self, question: str) -> str:
print(f"\n--- Starting Agent Run for Question ---")
print(f"Agent received question (first 100 chars): {question[:100]}...")
# Prepare the input for the graph, including the system prompt
graph_input = {"messages": [
HumanMessage(content=self.system_prompt + "\n\nUser Question:\n" + question)
]}
final_answer_content = ""
# Stream the graph's execution
try:
# Use stream_mode="values" to get the full state at each step
for event in self.graph.stream(graph_input, stream_mode="values", config={"recursion_limit": 25}): # Increased recursion limit
# The 'event' dictionary holds the entire AgentState ('messages')
last_message = event["messages"][-1]
# Keep track of the latest AI response that isn't a tool call
if isinstance(last_message, AIMessage):
# Check if it has tool calls or invalid tool calls
has_calls = bool(last_message.tool_calls or last_message.invalid_tool_calls)
if not has_calls: # Only consider it final if no calls were attempted
# Ensure content is a string and not empty before assigning
if isinstance(last_message.content, str) and last_message.content.strip():
print(f"Potential Final AI Response: {last_message.content[:500]}...")
final_answer_content = last_message.content
# If content is empty after manual parsing cleared it, don't overwrite a previous potential answer
elif not isinstance(last_message.content, str) or not last_message.content.strip():
print("AI Message has no tool calls and empty/non-string content.")
else:
print(f"Non-string AI message content without tool calls: {last_message.content}")
elif isinstance(last_message, ToolMessage):
print(f"Tool Result ({last_message.tool_call_id}): {last_message.content[:500]}...")
# After a tool result, the next AI message might be the final one,
# so don't necessarily clear final_answer_content here. Let the loop find the *last* non-tool-call AI message.
# --- Add the cleaning step ---
cleaned_answer = final_answer_content.strip()
# More aggressive cleaning (optional, use with caution):
# Try to remove common conversational prefixes if they slipped through
prefixes_to_remove = [
"The answer is:", "Here is the answer:", "Based on the information:",
"Final Answer:", "Answer:"
]
# More thorough prefix removal
original_cleaned = cleaned_answer
for prefix in prefixes_to_remove:
if cleaned_answer.lower().startswith(prefix.lower()):
# Find where the actual answer starts after the prefix
potential_answer = cleaned_answer[len(prefix):].strip()
if potential_answer: # Only strip if there's content after the prefix
cleaned_answer = potential_answer
break # Stop after removing the first found prefix
# If nothing was stripped but prefixes exist, log it
if cleaned_answer == original_cleaned and any(cleaned_answer.lower().startswith(p.lower()) for p in prefixes_to_remove):
print(f"Warning: Prefix found but not stripped (maybe answer was empty after prefix?): '{original_cleaned[:100]}...'")
# Remove potential markdown code blocks only if the answer isn't expected to be code
# More robust check for code-like content
looks_like_code = any(kw in cleaned_answer for kw in ["def ", "import ", "print(", "for ", "while ", "if ", "class ", "=>", "dict(", "list["]) or cleaned_answer.count('\n') > 3 or (cleaned_answer.startswith('[') and cleaned_answer.endswith(']')) or (cleaned_answer.startswith('{') and cleaned_answer.endswith('}'))
if not looks_like_code:
# --- THIS IS THE LINE WITH THE SECOND PLACEHOLDER ---
cleaned_answer = [[[REGEX_PLACEHOLDER_SUB]]] # Replace this line manually
# Remove single backticks if they surround the whole answer
if cleaned_answer.startswith("`") and cleaned_answer.endswith("`"):
cleaned_answer = cleaned_answer[1:-1].strip()
print(f"Agent returning final answer (cleaned): '{cleaned_answer}'") # Add quotes for clarity
if not cleaned_answer and final_answer_content:
# If cleaning resulted in empty but original wasn't, return original
print("Warning: Agent produced an empty final answer after cleaning. Falling back to raw answer.")
return final_answer_content.strip() # Fallback if cleaning removed everything
# Handle case where agent legitimately produces no answer (e.g., error during loop)
return cleaned_answer if cleaned_answer else "AGENT FAILED TO PRODUCE ANSWER"
except Exception as e:
print(f"Error running agent graph: {e}")
import traceback
traceback.print_exc()
return f"AGENT GRAPH ERROR: {e}"
# --- (Original Template Code Starts Here - NO CHANGES NEEDED BELOW THIS LINE) ---
def run_and_submit_all( profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: Gradio OAuth profile of the logged-in user, or ``None`` when
            nobody is logged in (the run is aborted early in that case).

    Returns:
        A ``(status_message, results)`` tuple for the two Gradio outputs, where
        ``results`` is a pandas DataFrame of per-question answers (or ``None``
        when the run aborts before any questions were attempted).
    """
    # SPACE_ID identifies this Hugging Face Space; used to build the agent-code link below.
    space_id = os.getenv("SPACE_ID")
    if profile:
        username= f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # 1. Instantiate the agent (loads LLM endpoint, tools, and the ASR pipeline).
    print("Instantiating agent...")
    try:
        agent = BasicAgent()
        if agent.asr_pipeline is None:
            print("⚠️ ASR Pipeline failed to load during agent init. Audio questions will likely fail.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        import traceback
        traceback.print_exc() # Print full traceback for init errors
        return f"Error initializing agent: {e}", None
    print("Agent instantiated successfully.")
    # The scoring server receives this link so graders can inspect the agent's code.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(f"Agent code URL: {agent_code}")
    # 2. Fetch the question set from the scoring API.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=30)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run the agent on every question, accumulating the submission payload
    #    and a human-readable log for the results table.
    results_log = []
    answers_payload = []
    total_questions = len(questions_data)
    print(f"Running agent on {total_questions} questions...")
    # --- Limit for Testing ---
    # question_limit = 5 # Uncomment and set a number (e.g., 5) to test fewer questions
    # questions_to_run = questions_data[:question_limit]
    # print(f"--- RUNNING WITH QUESTION LIMIT: {question_limit} ---")
    questions_to_run = questions_data # Comment this line out if using the limit above
    for i, item in enumerate(questions_to_run):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item {i+1} with missing task_id or question: {item}")
            continue
        print(f"\n--- Running Task {i+1}/{len(questions_to_run)} (ID: {task_id}) ---")
        try:
            # If the question references an attached file, annotate the question
            # with whether the path is actually reachable so the agent can react.
            file_path = item.get("file_path")
            if file_path:
                # Check existence relative to script dir first, then CWD
                script_dir = os.path.dirname(os.path.realpath(__file__))
                potential_script_path = os.path.join(script_dir, file_path)
                potential_cwd_path = os.path.join(os.getcwd(), file_path) # Check CWD too
                if os.path.exists(potential_script_path):
                    file_context = f"[Attached File (exists): {file_path}]" # Path relative to script is good enough for agent
                elif os.path.exists(potential_cwd_path):
                    file_context = f"[Attached File (exists in cwd): {file_path}]" # Path relative to cwd
                else:
                    file_context = f"[Attached File (path provided): {file_path}]" # Agent needs to handle finding it
                question_text_with_context = f"{question_text}\n\n{file_context}"
                print(f"Question includes file reference: {file_path}")
            else:
                question_text_with_context = question_text
            submitted_answer = agent(question_text_with_context)
            # Ensure answer is a string, even if agent returns None or other types
            submitted_answer_str = str(submitted_answer) if submitted_answer is not None else ""
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer_str})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer_str})
            print(f"--- Task {task_id} Complete ---")
        except Exception as e:
            # A crash on one task must not abort the whole run: record the error
            # text as that task's answer and keep going.
            print(f"FATAL ERROR running agent graph on task {task_id}: {e}")
            import traceback
            traceback.print_exc()
            submitted_answer = f"AGENT CRASH ERROR: {e}"
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120) # Increased timeout
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        # Add score details if available
        # NOTE(review): assumes result_data['scores'] is a list of dicts with
        # 'task_id' and 'score' keys (1 = correct, 0 = wrong) — confirm with API.
        if 'scores' in result_data:
            scores_dict = {item['task_id']: item['score'] for item in result_data['scores']}
            results_df['Correct'] = results_df['Task ID'].map(lambda x: scores_dict.get(x, None))
            results_df['Correct'] = results_df['Correct'].apply(lambda x: 'Yes' if x == 1 else ('No' if x == 0 else 'N/A'))
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        # Surface the server's error detail when the submission is rejected.
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The submission request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error during submission - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission processing: {e}"
        print(status_message)
        import traceback
        traceback.print_exc()
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    # Page header and usage notes shown above the controls.
    gr.Markdown("# GAIA Agent Evaluation Runner (LangGraph + Mistral)") # Updated title
    gr.Markdown(
        """
        **Instructions:**
        1. Log in to your Hugging Face account using the button below.
        2. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, submit answers, and see the score.
        ---
        **Notes:**
        * The full evaluation can take **several hours**. Use the logs tab to monitor progress.
        * This agent uses `mistralai/Mistral-7B-Instruct-v0.2` and multiple tools.
        * Make sure your `HUGGINGFACEHUB_API_TOKEN` secret is set correctly in Settings.
        """
    )
    gr.LoginButton()
    evaluate_button = gr.Button("Run Evaluation & Submit All Answers")
    # Read-only widgets that receive the run status and the per-question results.
    status_box = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    answers_view = gr.DataFrame(label="Questions, Agent Answers, and Results", wrap=True)
    # Wire the button to the evaluation routine; Gradio injects the OAuth
    # profile automatically based on run_and_submit_all's signature.
    evaluate_button.click(fn=run_and_submit_all, outputs=[status_box, answers_view])
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Report Space-related environment variables so the logs show where we run.
    host_env = os.getenv("SPACE_HOST")
    repo_env = os.getenv("SPACE_ID")
    if host_env:
        print(f"✅ SPACE_HOST found: {host_env}")
        print(f" Runtime URL should be: https://{host_env}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if repo_env:
        print(f"✅ SPACE_ID found: {repo_env}")
        print(f" Repo URL: https://huggingface.co/spaces/{repo_env}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{repo_env}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    # Path diagnostics: help debug attached-file questions at runtime.
    print(f"Script directory (__file__): {os.path.dirname(os.path.realpath(__file__))}")
    print(f"Current working directory (os.getcwd()): {os.getcwd()}")
    try:
        # Listing may fail if the working directory was removed underneath us.
        print("Files in current working directory:", os.listdir("."))
    except FileNotFoundError:
        print("Warning: Could not list current working directory.")
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    # queue() serializes clicks so only one evaluation runs at a time.
    demo.queue().launch(debug=True, share=False)