Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import requests | |
| import inspect | |
| import pandas as pd | |
| import io | |
| import contextlib | |
| from typing import TypedDict, Annotated | |
| import torch | |
| import json # For robust tool call parsing/generation if needed | |
| # --- Multimodal & Web Tool Imports --- | |
| from transformers import pipeline | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import requests | |
| from bs4 import BeautifulSoup | |
| # --- LangChain & LangGraph Imports --- | |
| from langgraph.graph.message import add_messages | |
| from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.graph import START, StateGraph | |
| from langgraph.prebuilt import tools_condition | |
| from langchain_huggingface import ChatHuggingFace | |
| from langchain_huggingface import HuggingFaceEndpoint | |
| from langchain_community.tools import DuckDuckGoSearchRun | |
| from langchain_core.tools import tool, BaseTool | |
| # (Keep Constants as is) | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| # --- LangGraph Agent State --- | |
class AgentState(TypedDict):
    """Graph state for the LangGraph agent: a running list of chat messages.

    The ``add_messages`` annotation is LangGraph's reducer that appends new
    messages to the list on each graph step instead of replacing it.
    """
    messages: Annotated[list[AnyMessage], add_messages]
| # --- Basic Agent Definition --- | |
| # ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------ | |
| class BasicAgent: | |
| # --- Tool Definitions as Methods --- | |
| # By making tools methods, they can access self.asr_pipeline | |
| def search_tool(self, query: str) -> str: | |
| """Calls DuckDuckGo search and returns the results. Use this for recent information or general web searches.""" | |
| print(f"--- Calling Search Tool with query: {query} ---") | |
| try: | |
| search = DuckDuckGoSearchRun() | |
| return search.run(query) | |
| except Exception as e: | |
| return f"Error running search: {e}" | |
| def code_interpreter(self, code: str) -> str: | |
| """ | |
| Executes a string of Python code and returns its stdout, stderr, and any error. | |
| Use this for calculations, data manipulation (including pandas on dataframes read from files), list operations, string manipulations, or any other Python operation. | |
| The code runs in a sandboxed environment. 'pandas' (as pd) and 'openpyxl' are available. | |
| Ensure the code is complete and executable. If printing, use print(). | |
| """ | |
| print(f"--- Calling Code Interpreter with code:\n{code}\n---") | |
| output_stream = io.StringIO() | |
| error_stream = io.StringIO() | |
| try: | |
| # Use contextlib to redirect stdout and stderr | |
| with contextlib.redirect_stdout(output_stream), contextlib.redirect_stderr(error_stream): | |
| # Execute the code. Provide 'pd' (pandas) in the globals | |
| exec(code, {"pd": pd}, {}) | |
| stdout = output_stream.getvalue() | |
| stderr = error_stream.getvalue() | |
| if stderr: | |
| return f"Error: {stderr}\nStdout: {stdout}" | |
| if stdout: | |
| return f"Success:\n{stdout}" | |
| return "Success: Code executed without error and produced no stdout." | |
| except Exception as e: | |
| # Capture any exception during exec | |
| return f"Execution failed with error: {str(e)}" | |
| def read_file(self, path: str) -> str: | |
| """Reads the content of a file at the specified path. Use this to examine files provided in the question.""" | |
| print(f"--- Calling Read File Tool at path: {path} ---") | |
| try: | |
| # Try finding the file relative to the app directory first | |
| script_dir = os.path.dirname(os.path.abspath(__file__)) # Use absolute path | |
| full_path = os.path.join(script_dir, path) | |
| print(f"Attempting to read relative path: {full_path}") | |
| if not os.path.exists(full_path): | |
| # If not found, try the direct path (might be absolute or relative to cwd) | |
| full_path = path | |
| print(f"Attempting to read direct path: {full_path}") | |
| if not os.path.exists(full_path): | |
| # Try basename for GAIA questions providing just the filename | |
| base_path = os.path.basename(path) | |
| print(f"Attempting to read basename path: {base_path}") | |
| if os.path.exists(base_path): | |
| full_path = base_path | |
| else: | |
| # List files in current and script directory for debugging | |
| cwd_files = os.listdir(".") | |
| script_dir_files = os.listdir(script_dir) | |
| return (f"Error: File not found.\n" | |
| f"Tried: '{path}', '{os.path.join(script_dir, path)}', '{base_path}'.\n" | |
| f"Files in current dir (.): {cwd_files}\n" | |
| f"Files in script dir ({script_dir}): {script_dir_files}") | |
| print(f"Reading file: {full_path}") | |
| with open(full_path, 'r', encoding='utf-8') as f: | |
| return f.read() | |
| except Exception as e: | |
| return f"Error reading file {path}: {str(e)}" | |
| def write_file(self, path: str, content: str) -> str: | |
| """Writes the given content to a file at the specified path relative to the app's directory. Creates directories if they don't exist.""" | |
| print(f"--- Calling Write File Tool at path: {path} ---") | |
| try: | |
| # Ensure the directory exists | |
| script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| full_path = os.path.join(script_dir, path) # Write relative to script dir | |
| print(f"Writing file to: {full_path}") | |
| os.makedirs(os.path.dirname(full_path), exist_ok=True) | |
| with open(full_path, 'w', encoding='utf-8') as f: | |
| f.write(content) | |
| return f"Successfully wrote to file {path} (relative to app)." | |
| except Exception as e: | |
| return f"Error writing to file {path}: {str(e)}" | |
| def list_directory(self, path: str = ".") -> str: | |
| """Lists the contents (files and directories) of a directory at the specified path relative to the app's directory.""" | |
| print(f"--- Calling List Directory Tool at path: {path} ---") | |
| try: | |
| script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| full_path = os.path.join(script_dir, path) # List relative to script dir | |
| print(f"Listing directory: {full_path}") | |
| if not os.path.isdir(full_path): | |
| return f"Error: '{path}' is not a valid directory relative to the app." | |
| files = os.listdir(full_path) | |
| return "\n".join(files) if files else "Directory is empty." | |
| except Exception as e: | |
| return f"Error listing directory {path}: {str(e)}" | |
| def audio_transcription_tool(self, file_path: str) -> str: | |
| """ | |
| Transcribes an audio file (like .mp3 or .wav) using Whisper and returns the text content. | |
| Use this for questions involving audio file analysis. | |
| """ | |
| print(f"--- Calling Audio Transcription Tool at path: {file_path} ---") | |
| # Access the pipeline via self | |
| if not self.asr_pipeline: | |
| return "Error: Audio transcription pipeline is not available." | |
| try: | |
| # Try finding the file relative to the app directory first | |
| script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| full_path = os.path.join(script_dir, file_path) | |
| print(f"Attempting to transcribe relative path: {full_path}") | |
| if not os.path.exists(full_path): | |
| # If not found, try the direct path | |
| full_path = file_path | |
| print(f"Attempting to transcribe direct path: {full_path}") | |
| if not os.path.exists(full_path): | |
| # Try basename for GAIA questions | |
| base_path = os.path.basename(file_path) | |
| print(f"Attempting to transcribe basename path: {base_path}") | |
| if os.path.exists(base_path): | |
| full_path = base_path | |
| else: | |
| cwd_files = os.listdir(".") | |
| script_dir_files = os.listdir(script_dir) | |
| return (f"Error: Audio file not found.\n" | |
| f"Tried: '{file_path}', '{os.path.join(script_dir, file_path)}', '{base_path}'.\n" | |
| f"Files in current dir (.): {cwd_files}\n" | |
| f"Files in script dir ({script_dir}): {script_dir_files}") | |
| print(f"Transcribing file: {full_path}") | |
| # Use self.asr_pipeline | |
| transcription = self.asr_pipeline(full_path) | |
| print("--- Transcription Complete ---") | |
| return transcription["text"] | |
| except Exception as e: | |
| return f"Error during audio transcription: {str(e)}" | |
| def get_youtube_transcript(self, video_url: str) -> str: | |
| """ | |
| Fetches the transcript for a given YouTube video URL. Use this for questions about YouTube video content. | |
| """ | |
| print(f"--- Calling YouTube Transcript Tool for URL: {video_url} ---") | |
| try: | |
| # Extract video ID from URL more robustly | |
| video_id = None | |
| if "watch?v=" in video_url: | |
| video_id = video_url.split("v=")[1].split("&")[0] | |
| elif "youtu.be/" in video_url: | |
| video_id = video_url.split("youtu.be/")[1].split("?")[0] | |
| if not video_id: | |
| return f"Error: Could not extract video ID from URL: {video_url}" | |
| transcript_list = YouTubeTranscriptApi.get_transcript(video_id) | |
| # Combine all transcript parts into one string | |
| full_transcript = " ".join([item["text"] for item in transcript_list]) | |
| print("--- Transcript Fetched ---") | |
| # Return a limited amount to avoid overwhelming the context | |
| return full_transcript[:8000] | |
| except Exception as e: | |
| return f"Error fetching YouTube transcript: {str(e)}" | |
| def scrape_web_page(self, url: str) -> str: | |
| """ | |
| Fetches the primary text content of a given web page URL, removing navigation, footer, scripts, and styles. | |
| Use this when you need the full content of a webpage found via search. | |
| """ | |
| print(f"--- Calling Web Scraper Tool for URL: {url} ---") | |
| try: | |
| headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'} | |
| response = requests.get(url, headers=headers, timeout=15) # Increased timeout | |
| response.raise_for_status() # Raise an error for bad responses (4xx or 5xx) | |
| # Check content type to avoid parsing non-HTML | |
| if 'html' not in response.headers.get('Content-Type', '').lower(): | |
| return f"Error: URL {url} did not return HTML content." | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| # Remove common non-content tags | |
| for tag in soup(["script", "style", "nav", "footer", "aside", "header", "form"]): | |
| tag.extract() | |
| # Attempt to find the main content area (heuristics, may not always work) | |
| main_content = soup.find('main') or soup.find('article') or soup.find('div', role='main') or soup.body | |
| if not main_content: | |
| main_content = soup # Fallback to the whole soup if no main area found | |
| text = main_content.get_text(separator='\n', strip=True) | |
| # Clean up excessive whitespace | |
| lines = (line.strip() for line in text.splitlines()) | |
| chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
| text = '\n'.join(chunk for chunk in chunks if chunk) | |
| print("--- Web Page Scraped ---") | |
| # Limit context size | |
| return text[:8000] | |
| except requests.exceptions.RequestException as e: | |
| return f"Error fetching web page {url}: {str(e)}" | |
| except Exception as e: | |
| return f"Error scraping web page {url}: {str(e)}" | |
| # --- End of Tool Definitions --- | |
    def __init__(self):
        """Initialize the agent: lazily load the Whisper ASR pipeline, read the
        HF API token from the environment, collect the tool methods, and build
        the GAIA system prompt.

        Raises:
            ValueError: if the HUGGINGFACEHUB_API_TOKEN secret is not set.
        """
        print("BasicAgent (LangGraph) initializing...")
        # 1. Initialize ASR Pipeline *inside* init - DELAYED LOADING
        # ==================== MOVED HERE ====================
        self.asr_pipeline = None  # Initialize as None first
        try:
            print("Loading ASR (Whisper) pipeline...")
            # Decide device based on availability
            device = "cuda:0" if torch.cuda.is_available() else "cpu"
            print(f"Using device: {device} for ASR.")
            self.asr_pipeline = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-base",
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,  # Use float16 only if CUDA available
                device=device  # Explicitly set device
            )
            print("✅ ASR (Whisper) pipeline loaded successfully.")
        except Exception as e:
            # Model load is best-effort: the audio tool reports its own error
            # message when self.asr_pipeline stays None.
            print(f"⚠️ Warning: Could not load ASR pipeline. Audio tool will not work. Error: {e}")
            self.asr_pipeline = None  # Ensure it's None if loading fails
        # ====================================================
        # 2. Get API Token from Space Secrets
        HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
        if not HUGGINGFACEHUB_API_TOKEN:
            raise ValueError("HUGGINGFACEHUB_API_TOKEN secret is not set! Please add it to your Space secrets.")
        # 3. Collect Tool Methods
        # LangChain tools expect functions or objects with a 'run' method.
        # The @tool decorator makes our methods compatible.
        # NOTE(review): no @tool decorator is visible on the methods above, yet
        # `tool.name` / `tool.description` are read below — verify the methods
        # are actually wrapped somewhere, otherwise that line raises AttributeError.
        self.tools = [
            self.search_tool,  # References the method
            self.code_interpreter,
            self.read_file,
            self.write_file,
            self.list_directory,
            self.audio_transcription_tool,
            self.get_youtube_transcript,
            self.scrape_web_page
        ]
        # 4. Define the Improved System Prompt
        # The tool docstrings double as the descriptions injected here.
        tool_descriptions = "\n".join([f"- {tool.name}: {tool.description}" for tool in self.tools])
        self.system_prompt = f"""You are a highly intelligent and meticulous AI assistant built to answer questions from the GAIA benchmark.
Your primary goal is to provide **only the concise, factual, and direct answer** to the user's question, exactly matching the format required by the benchmark (e.g., a name, a number, a specific string format, a comma-separated list).
**CRITICAL INSTRUCTIONS:**
* **DO NOT** include conversational filler (e.g., "Sure, I can help...", "The answer is...", "Here is the information...").
* **DO NOT** explain your reasoning or the steps you took unless the question *explicitly* asks for it.
* **DO NOT** repeat the question in your final answer.
* **FINAL ANSWER FORMAT:** Your final response must contain *only* the answer itself.
You have access to the following tools to gather information and perform actions:
{tool_descriptions}
**TOOL USAGE PROTOCOL:**
* To use a tool, you MUST respond ONLY with a single JSON object formatted exactly like this:
```json
{{
"tool": "tool_name",
"tool_input": {{ "arg_name1": "value1", "arg_name2": "value2", ... }}
}}
```
* Replace `tool_name` with the exact name of the tool you want to use.
* Provide the required arguments within the `tool_input` dictionary. Ensure argument names and value types match the tool description precisely.
* Do not add any text before or after the JSON tool call block.
**REASONING PROCESS:**
1. Carefully analyze the user's question to understand the specific information required and the expected answer format. Check if any files are attached (mentioned like `[Attached File: filename.ext]`).
2. Break down the problem into logical steps.
3. Determine if any tools are necessary. Use `read_file` for attached files, `audio_transcription_tool` for audio, `get_youtube_transcript` for YouTube URLs, `search_tool` for web info, `scrape_web_page` to read content from URLs found via search, and `code_interpreter` for calculations or data processing.
4. If a tool is needed, call it using the specified JSON format. Wait for the tool's output.
5. Analyze the tool's output. If the answer is found, proceed to step 7.
6. If more information or steps are needed, use another tool (step 4) or continue reasoning based on the gathered information. Pay close attention to previous tool results.
7. Once you have derived the final, definitive answer that meets the question's requirements, output **ONLY** that answer and nothing else. Stop the process.
"""
| import os | |
| import gradio as gr | |
| import requests | |
| import inspect | |
| import pandas as pd | |
| import io | |
| import contextlib | |
| from typing import TypedDict, Annotated | |
| import torch | |
| import json # For robust tool call parsing/generation if needed | |
| import re # For finding JSON | |
| import uuid # For generating tool call IDs | |
| # --- Multimodal & Web Tool Imports --- | |
| from transformers import pipeline | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import requests | |
| from bs4 import BeautifulSoup | |
| # --- LangChain & LangGraph Imports --- | |
| from langgraph.graph.message import add_messages | |
| # Make sure to import ToolCall | |
| from langchain_core.messages import AnyMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage, ToolCall | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.graph import START, StateGraph | |
| from langgraph.prebuilt import tools_condition | |
| from langchain_huggingface import ChatHuggingFace | |
| from langchain_huggingface import HuggingFaceEndpoint | |
| from langchain_community.tools import DuckDuckGoSearchRun | |
| from langchain_core.tools import tool, BaseTool | |
| # (Keep Constants as is) | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| # --- LangGraph Agent State --- | |
class AgentState(TypedDict):
    """Graph state for the LangGraph agent: a running list of chat messages.

    The ``add_messages`` annotation is LangGraph's reducer that appends new
    messages to the list on each graph step instead of replacing it.
    """
    messages: Annotated[list[AnyMessage], add_messages]
| # --- Basic Agent Definition --- | |
| # ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------ | |
| class BasicAgent: | |
| # --- Tool Definitions as Methods --- | |
| # By making tools methods, they can access self.asr_pipeline | |
    def search_tool(self, query: str) -> str:
        """Calls DuckDuckGo search and returns the results. Use this for recent information or general web searches."""
        # The docstring above doubles as the tool description shown to the LLM.
        print(f"--- Calling Search Tool with query: {query} ---")
        try:
            # A new DuckDuckGoSearchRun wrapper is created on every call.
            search = DuckDuckGoSearchRun()
            return search.run(query)
        except Exception as e:
            # Return the failure as a string so the agent loop can continue.
            return f"Error running search: {e}"
| def code_interpreter(self, code: str) -> str: | |
| """ | |
| Executes a string of Python code and returns its stdout, stderr, and any error. | |
| Use this for calculations, data manipulation (including pandas on dataframes read from files), list operations, string manipulations, or any other Python operation. | |
| The code runs in a sandboxed environment. 'pandas' (as pd) and 'openpyxl' are available. | |
| Ensure the code is complete and executable. If printing, use print(). | |
| """ | |
| print(f"--- Calling Code Interpreter with code:\n{code}\n---") | |
| output_stream = io.StringIO() | |
| error_stream = io.StringIO() | |
| try: | |
| # Use contextlib to redirect stdout and stderr | |
| with contextlib.redirect_stdout(output_stream), contextlib.redirect_stderr(error_stream): | |
| # Execute the code. Provide 'pd' (pandas) in the globals | |
| exec(code, {"pd": pd}, {}) | |
| stdout = output_stream.getvalue() | |
| stderr = error_stream.getvalue() | |
| if stderr: | |
| return f"Error: {stderr}\nStdout: {stdout}" | |
| if stdout: | |
| return f"Success:\n{stdout}" | |
| return "Success: Code executed without error and produced no stdout." | |
| except Exception as e: | |
| # Capture any exception during exec | |
| return f"Execution failed with error: {str(e)}" | |
    def read_file(self, path: str) -> str:
        """Reads the content of a file at the specified path. Use this to examine files provided in the question."""
        # Resolution order: app directory, then the path as given, then the
        # bare filename in the CWD (GAIA tasks often supply just a filename).
        print(f"--- Calling Read File Tool at path: {path} ---")
        try:
            # Try finding the file relative to the app directory first
            # Use os.path.dirname(os.path.realpath(__file__)) for robustness in different execution contexts
            script_dir = os.path.dirname(os.path.realpath(__file__))
            full_path = os.path.join(script_dir, path)
            print(f"Attempting to read relative path: {full_path}")
            if not os.path.exists(full_path):
                # If not found, try the direct path (might be absolute or relative to cwd)
                full_path = path
                print(f"Attempting to read direct path: {full_path}")
            if not os.path.exists(full_path):
                # Try basename for GAIA questions providing just the filename
                base_path = os.path.basename(path)
                print(f"Attempting to read basename path in cwd: {os.path.join(os.getcwd(), base_path)}")
                if os.path.exists(base_path):  # Check relative to CWD
                    full_path = base_path
                else:
                    # List files in current and script directory for debugging;
                    # each listdir is guarded so the error report itself can't fail.
                    try:
                        cwd_files = os.listdir(".")
                    except Exception:
                        cwd_files = ["Error listing CWD"]
                    try:
                        script_dir_files = os.listdir(script_dir)
                    except Exception:
                        script_dir_files = ["Error listing script dir"]
                    return (f"Error: File not found.\n"
                            f"Tried relative path: '{os.path.join(script_dir, path)}'\n"
                            f"Tried direct path: '{path}'\n"
                            f"Tried basename in CWD: '{base_path}'\n"
                            f"Files in current dir (.): {cwd_files}\n"
                            f"Files in script dir ({script_dir}): {script_dir_files}")
            print(f"Reading file: {full_path}")
            with open(full_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            return f"Error reading file {path}: {str(e)}"
    def write_file(self, path: str, content: str) -> str:
        """Writes the given content to a file at the specified path relative to the app's directory. Creates directories if they don't exist."""
        print(f"--- Calling Write File Tool at path: {path} ---")
        try:
            # Ensure the directory exists
            # realpath resolves symlinks so writes land next to the actual script.
            script_dir = os.path.dirname(os.path.realpath(__file__))
            full_path = os.path.join(script_dir, path)  # Write relative to script dir
            print(f"Writing file to: {full_path}")
            # Create any missing parent directories before opening the file.
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)
            return f"Successfully wrote to file {path} (relative to app)."
        except Exception as e:
            return f"Error writing to file {path}: {str(e)}"
    def list_directory(self, path: str = ".") -> str:
        """Lists the contents (files and directories) of a directory at the specified path relative to the app's directory."""
        print(f"--- Calling List Directory Tool at path: {path} ---")
        try:
            # Resolve against the script's directory (absolute paths win in join).
            script_dir = os.path.dirname(os.path.realpath(__file__))
            full_path = os.path.join(script_dir, path)  # List relative to script dir
            print(f"Listing directory: {full_path}")
            if not os.path.isdir(full_path):
                return f"Error: '{path}' is not a valid directory relative to the app."
            files = os.listdir(full_path)
            # Newline-joined listing, or an explicit message for an empty dir.
            return "\n".join(files) if files else "Directory is empty."
        except Exception as e:
            return f"Error listing directory {path}: {str(e)}"
    def audio_transcription_tool(self, file_path: str) -> str:
        """
        Transcribes an audio file (like .mp3 or .wav) using Whisper and returns the text content.
        Use this for questions involving audio file analysis.
        """
        print(f"--- Calling Audio Transcription Tool at path: {file_path} ---")
        # Access the pipeline via self; it is None when loading failed in __init__.
        if not self.asr_pipeline:
            return "Error: Audio transcription pipeline is not available."
        try:
            # Try finding the file relative to the app directory first
            # (same resolution order as read_file: script dir, direct, basename).
            script_dir = os.path.dirname(os.path.realpath(__file__))
            full_path = os.path.join(script_dir, file_path)
            print(f"Attempting to transcribe relative path: {full_path}")
            if not os.path.exists(full_path):
                # If not found, try the direct path
                full_path = file_path
                print(f"Attempting to transcribe direct path: {full_path}")
            if not os.path.exists(full_path):
                # Try basename for GAIA questions
                base_path = os.path.basename(file_path)
                print(f"Attempting to transcribe basename path in CWD: {os.path.join(os.getcwd(), base_path)}")
                if os.path.exists(base_path):  # Check relative to CWD
                    full_path = base_path
                else:
                    # Guarded listings so the error report itself cannot raise.
                    try:
                        cwd_files = os.listdir(".")
                    except Exception:
                        cwd_files = ["Error listing CWD"]
                    try:
                        script_dir_files = os.listdir(script_dir)
                    except Exception:
                        script_dir_files = ["Error listing script dir"]
                    return (f"Error: Audio file not found.\n"
                            f"Tried relative path: '{os.path.join(script_dir, file_path)}'\n"
                            f"Tried direct path: '{file_path}'\n"
                            f"Tried basename in CWD: '{base_path}'\n"
                            f"Files in current dir (.): {cwd_files}\n"
                            f"Files in script dir ({script_dir}): {script_dir_files}")
            print(f"Transcribing file: {full_path}")
            # Important: Ensure the pipeline can handle the file path directly
            transcription = self.asr_pipeline(full_path)
            print("--- Transcription Complete ---")
            # The output structure might vary slightly based on pipeline version,
            # hence .get() with a fallback message instead of direct indexing.
            return transcription.get("text", "Error: Transcription failed to produce text.")
        except Exception as e:
            # Print the full traceback to the logs, but hand the agent a string.
            import traceback
            print(f"Error during audio transcription: {e}")
            traceback.print_exc()
            return f"Error during audio transcription: {str(e)}"
    def get_youtube_transcript(self, video_url: str) -> str:
        """
        Fetches the transcript for a given YouTube video URL. Use this for questions about YouTube video content.
        """
        print(f"--- Calling YouTube Transcript Tool for URL: {video_url} ---")
        try:
            # Extract video ID from URL more robustly: handles both the long
            # "watch?v=" form and the short "youtu.be/" form.
            video_id = None
            if "watch?v=" in video_url:
                video_id = video_url.split("v=")[1].split("&")[0]
            elif "youtu.be/" in video_url:
                video_id = video_url.split("youtu.be/")[1].split("?")[0]
            if not video_id:
                return f"Error: Could not extract video ID from URL: {video_url}"
            transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
            # Combine all timed transcript segments into one string
            full_transcript = " ".join([item["text"] for item in transcript_list])
            print("--- Transcript Fetched ---")
            # Return a limited amount to avoid overwhelming the LLM context
            return full_transcript[:8000]
        except Exception as e:
            return f"Error fetching YouTube transcript: {str(e)}"
    def scrape_web_page(self, url: str) -> str:
        """
        Fetches the primary text content of a given web page URL, removing navigation, footer, scripts, and styles.
        Use this when you need the full content of a webpage found via search.
        """
        print(f"--- Calling Web Scraper Tool for URL: {url} ---")
        try:
            # Browser-like User-Agent: some sites reject default HTTP clients.
            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
            response = requests.get(url, headers=headers, timeout=15)  # Increased timeout
            response.raise_for_status()  # Raise an error for bad responses (4xx or 5xx)
            # Check content type to avoid parsing non-HTML
            if 'html' not in response.headers.get('Content-Type', '').lower():
                return f"Error: URL {url} did not return HTML content."
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove common non-content (boilerplate) tags before extracting text
            for tag in soup(["script", "style", "nav", "footer", "aside", "header", "form", "button", "input"]):
                tag.extract()
            # Attempt to find the main content area (heuristics, may not always work)
            main_content = soup.find('main') or soup.find('article') or soup.find('div', role='main') or soup.body
            if not main_content:
                main_content = soup  # Fallback to the whole soup if no main area found
            text = main_content.get_text(separator='\n', strip=True)
            # Clean up excessive whitespace.
            # NOTE(review): splitting on a single space puts every word on its
            # own line — confirm the intended delimiter (the common recipe
            # splits on two spaces).
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
            text = '\n'.join(chunk for chunk in chunks if chunk)
            print("--- Web Page Scraped ---")
            # Limit context size so one page cannot flood the LLM context
            return text[:8000]
        except requests.exceptions.RequestException as e:
            return f"Error fetching web page {url}: {str(e)}"
        except Exception as e:
            return f"Error scraping web page {url}: {str(e)}"
| # --- End of Tool Definitions --- | |
| def __init__(self): | |
| print("BasicAgent (LangGraph) initializing...") | |
| # 1. Initialize ASR Pipeline *inside* init - DELAYED LOADING | |
| self.asr_pipeline = None # Initialize as None first | |
| try: | |
| print("Loading ASR (Whisper) pipeline...") | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| print(f"Using device: {device} for ASR.") | |
| self.asr_pipeline = pipeline( | |
| "automatic-speech-recognition", | |
| model="openai/whisper-base", | |
| torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, | |
| device=device | |
| ) | |
| print("✅ ASR (Whisper) pipeline loaded successfully.") | |
| except Exception as e: | |
| print(f"⚠️ Warning: Could not load ASR pipeline. Audio tool will not work. Error: {e}") | |
| import traceback | |
| traceback.print_exc() # Print full traceback for ASR load error | |
| self.asr_pipeline = None | |
| # ==================================================== | |
| # 2. Get API Token from Space Secrets | |
| HUGGINGFACEHUB_API_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN") | |
| if not HUGGINGFACEHUB_API_TOKEN: | |
| raise ValueError("HUGGINGFACEHUB_API_TOKEN secret is not set! Please add it to your Space secrets.") | |
| # 3. Collect Tool Methods | |
| self.tools = [ | |
| self.search_tool, | |
| self.code_interpreter, | |
| self.read_file, | |
| self.write_file, | |
| self.list_directory, | |
| self.audio_transcription_tool, | |
| self.get_youtube_transcript, | |
| self.scrape_web_page | |
| ] | |
| # 4. Define the Improved System Prompt with Placeholders | |
| tool_descriptions = "\n".join([f"- {tool.name}: {tool.description}" for tool in self.tools]) | |
| self.system_prompt = f"""You are a highly intelligent and meticulous AI assistant built to answer questions from the GAIA benchmark. | |
| Your primary goal is to provide **only the concise, factual, and direct answer** to the user's question, exactly matching the format required by the benchmark (e.g., a name, a number, a specific string format, a comma-separated list). | |
| **CRITICAL INSTRUCTIONS:** | |
| * **DO NOT** include conversational filler (e.g., "Sure, I can help...", "The answer is...", "Here is the information..."). | |
| * **DO NOT** explain your reasoning or the steps you took unless the question *explicitly* asks for it. | |
| * **DO NOT** repeat the question in your final answer. | |
| * **FINAL ANSWER FORMAT:** Your final response must contain *only* the answer itself. | |
| You have access to the following tools to gather information and perform actions: | |
| {tool_descriptions} | |
| **TOOL USAGE PROTOCOL:** | |
| * To use a tool, you MUST respond ONLY with a single JSON object formatted exactly like this: | |
| ```json | |
| {{ | |
| "tool": "tool_name", | |
| "tool_input": {{ "arg_name1": "value1", "arg_name2": "value2", ... }} | |
| }} | |
| ``` | |
| * Replace `tool_name` with the exact name of the tool you want to use. | |
| * Provide the required arguments within the `tool_input` dictionary. Ensure argument names and value types match the tool description precisely. | |
| * Do not add any text before or after the JSON tool call block. | |
| **REASONING PROCESS:** | |
| 1. Carefully analyze the user's question to understand the specific information required and the expected answer format. Check if any files are attached (mentioned like `[Attached File: filename.ext]`). | |
| 2. Break down the problem into logical steps. | |
| 3. Determine if any tools are necessary. Use `read_file` for attached files, `audio_transcription_tool` for audio, `get_youtube_transcript` for YouTube URLs, `search_tool` for web info, `scrape_web_page` to read content from URLs found via search, and `code_interpreter` for calculations or data processing. | |
| 4. If a tool is needed, call it using the specified JSON format. Wait for the tool's output. | |
| 5. Analyze the tool's output. If the answer is found, proceed to step 7. | |
| 6. If more information or steps are needed, use another tool (step 4) or continue reasoning based on the gathered information. Pay close attention to previous tool results. | |
| 7. Once you have derived the final, definitive answer that meets the question's requirements, output **ONLY** that answer and nothing else. Stop the process. | |
| """ | |
| # 5. Initialize the LLM (Using Mistral Instruct) | |
| print("Initializing LLM Endpoint...") | |
| llm = HuggingFaceEndpoint( | |
| repo_id="mistralai/Mistral-7B-Instruct-v0.2", # Switched model | |
| huggingfacehub_api_token=HUGGINGFACEHUB_API_TOKEN, | |
| max_new_tokens=2048, | |
| temperature=0.01, | |
| ) | |
| chat_llm = ChatHuggingFace(llm=llm) | |
| print("✅ LLM Endpoint initialized.") | |
| # 6. Bind tools to the LLM | |
| # We still bind tools, but we'll manually parse if it fails | |
| self.llm_with_tools = chat_llm.bind_tools(self.tools) | |
| print("✅ Tools bound to LLM.") | |
| # 7. Define the Agent Node with Manual Tool Parsing | |
| # ==================== NODE WITH PLACEHOLDER REGEX ==================== | |
def agent_node(state: AgentState):
    """Run one LLM step and normalize tool calls onto the AIMessage.

    Invokes the tool-bound chat model on the accumulated messages. If the
    endpoint populated `tool_calls` natively, those are used. Otherwise a
    manual fallback parses the raw text content for the JSON tool-call
    protocol mandated by the system prompt (a single {"tool": ...,
    "tool_input": {...}} dict, or a list of such dicts), possibly wrapped
    in a markdown ```json fence.

    Args:
        state: Current AgentState; only state["messages"] is read.

    Returns:
        A state update dict: {"messages": [ai_message]}.
    """
    # Local imports: `re` and `uuid` do not appear in this file's
    # module-level import block, so bring them in here.
    import re
    import uuid
    from langchain_core.messages import ToolCall  # structured tool-call records

    print("--- Running Agent Node ---")
    messages_with_prompt = state["messages"]
    # Invoke the LLM (which has tools bound)
    ai_message: AIMessage = self.llm_with_tools.invoke(messages_with_prompt)
    print(f"AI Message Raw Content: {ai_message.content}")
    # --- Manual Tool Call Parsing Logic ---
    tool_calls = []
    # Ideal case: bind_tools already populated tool_calls natively.
    if ai_message.tool_calls:
        print(f"SUCCESS: bind_tools correctly parsed tool_calls: {ai_message.tool_calls}")
        tool_calls = ai_message.tool_calls
    # Fallback: the model emitted the tool call as plain text / markdown JSON.
    elif isinstance(ai_message.content, str):
        print("Attempting manual JSON parsing from content...")
        # FIX: real pattern replaces the tutorial placeholder. Group 1
        # captures JSON inside a ```/```json fenced block; group 2 captures
        # a bare top-level {...} or [...] payload. DOTALL lets the JSON
        # span multiple lines. Matches the group(1)-or-group(2) extraction
        # below.
        json_match = re.search(
            r"```(?:json)?\s*(\{.*?\}|\[.*?\])\s*```|(\{.*\}|\[.*\])",
            ai_message.content,
            re.DOTALL,
        )
        if json_match:
            # Extract the first valid group that contains JSON
            json_str = json_match.group(1) or json_match.group(2)
            if json_str:
                try:
                    # Attempt to strip potential leading/trailing non-JSON chars if regex was too broad
                    json_str_cleaned = json_str.strip()
                    # Basic validation: starts with { or [ ends with } or ]
                    if (json_str_cleaned.startswith('{') and json_str_cleaned.endswith('}')) or \
                       (json_str_cleaned.startswith('[') and json_str_cleaned.endswith(']')):
                        data = json.loads(json_str_cleaned)
                        # Check structure for single tool call (dict)
                        if isinstance(data, dict) and "tool" in data and "tool_input" in data:
                            tool_name = data.get("tool")
                            tool_input = data.get("tool_input")
                            # Basic validation of tool name and input type
                            if isinstance(tool_name, str) and isinstance(tool_input, dict):
                                call_id = f"tool_{uuid.uuid4()}"  # Generate unique ID
                                tool_calls.append(ToolCall(name=tool_name, args=tool_input, id=call_id))
                                print(f"Manually parsed Single Tool Call: ID={call_id}, Name={tool_name}, Args={tool_input}")
                                ai_message.content = ""  # Clear content after successful parse
                            else:
                                print("Parsed JSON dict, but incorrect tool name type or tool_input is not a dict.")
                        # Check structure for multiple tool calls (if model outputs a list)
                        elif isinstance(data, list):
                            print("Attempting to parse list as multiple tool calls...")
                            parsed_list_ok = True
                            temp_tool_calls = []
                            for item in data:
                                if isinstance(item, dict) and "tool" in item and "tool_input" in item:
                                    tool_name = item.get("tool")
                                    tool_input = item.get("tool_input")
                                    if isinstance(tool_name, str) and isinstance(tool_input, dict):
                                        call_id = f"tool_{uuid.uuid4()}"
                                        temp_tool_calls.append(ToolCall(name=tool_name, args=tool_input, id=call_id))
                                        print(f"Manually parsed Multi-Tool Call item: ID={call_id}, Name={tool_name}, Args={tool_input}")
                                    else:
                                        parsed_list_ok = False
                                        print("Parsed JSON list item, but incorrect tool name type or tool_input is not a dict.")
                                        break
                                else:
                                    parsed_list_ok = False
                                    print("Parsed JSON list item, but not a valid tool call structure (missing 'tool' or 'tool_input').")
                                    break
                            # Only accept the list if every item parsed cleanly.
                            if parsed_list_ok and temp_tool_calls:
                                tool_calls.extend(temp_tool_calls)
                                ai_message.content = ""  # Clear content if list successfully parsed
                        else:
                            print("Parsed JSON, but incorrect structure (neither dict with tool/tool_input nor list of such dicts).")
                    else:
                        print(f"Skipping manual parse: Cleaned JSON string ('{json_str_cleaned[:50]}...') does not start/end correctly with braces/brackets.")
                except json.JSONDecodeError as e:
                    print(f"Manual JSON parsing failed: {e}. String was: '{json_str[:500]}...'")  # Log the problematic string
                except Exception as e:
                    print(f"Unexpected error during manual parsing: {e}")
                    import traceback
                    traceback.print_exc()
            else:
                print("Regex matched, but no JSON content found in capture groups.")
        else:
            print("No JSON block found in content for manual parsing.")
    else:
        print("AI Message content is not a string, skipping manual parse.")
    # --- End Manual Parsing ---
    # Attach manually parsed calls (if any) to the message so that
    # tools_condition routes to the tool node correctly.
    if tool_calls and not ai_message.tool_calls:
        ai_message.tool_calls = tool_calls
        # Also clear invalid_tool_calls if we manually succeeded
        ai_message.invalid_tool_calls = []  # Use empty list instead of None
    # Log final interpretation
    if ai_message.tool_calls:
        print(f"AI Message contains tool calls (after manual check): {ai_message.tool_calls}")
    elif ai_message.invalid_tool_calls:
        print(f"AI Message contains INVALID tool calls: {ai_message.invalid_tool_calls}")
    else:
        print(f"AI Message Interpreted Content (no tool calls): {ai_message.pretty_repr()}")
    return {"messages": [ai_message]}
| # ======================================================= | |
| # 8. Define the Tool Node | |
| tool_node = ToolNode(self.tools) | |
| # 9. Create the Graph | |
| print("Building agent graph...") | |
| graph_builder = StateGraph(AgentState) | |
| graph_builder.add_node("agent", agent_node) | |
| graph_builder.add_node("tools", tool_node) | |
| graph_builder.add_edge(START, "agent") | |
| graph_builder.add_conditional_edges( | |
| "agent", | |
| tools_condition, # This condition checks ai_message.tool_calls | |
| { | |
| "tools": "tools", | |
| "__end__": "__end__", | |
| }, | |
| ) | |
| graph_builder.add_edge("tools", "agent") | |
| # 10. Compile the graph and store it | |
| self.graph = graph_builder.compile() | |
| print("✅ Graph compiled successfully.") | |
def __call__(self, question: str) -> str:
    """Run the compiled agent graph on one question and return a cleaned answer.

    Streams the LangGraph execution, keeps the last AIMessage that carries
    no tool calls as the candidate final answer, then strips conversational
    prefixes and markdown fencing to match the benchmark's exact-answer
    format.

    Args:
        question: The benchmark question text (may include an
            "[Attached File: ...]" marker appended by the caller).

    Returns:
        The cleaned answer string, or an "AGENT ..." error string on failure.
    """
    import re  # local import: `re` is not in the module-level imports

    print(f"\n--- Starting Agent Run for Question ---")
    print(f"Agent received question (first 100 chars): {question[:100]}...")
    # Prepare the input for the graph, including the system prompt
    graph_input = {"messages": [
        HumanMessage(content=self.system_prompt + "\n\nUser Question:\n" + question)
    ]}
    final_answer_content = ""
    # Stream the graph's execution
    try:
        # Use stream_mode="values" to get the full state at each step
        for event in self.graph.stream(graph_input, stream_mode="values", config={"recursion_limit": 25}):  # Increased recursion limit
            # The 'event' dictionary holds the entire AgentState ('messages')
            last_message = event["messages"][-1]
            # Keep track of the latest AI response that isn't a tool call
            if isinstance(last_message, AIMessage):
                # Check if it has tool calls or invalid tool calls
                has_calls = bool(last_message.tool_calls or last_message.invalid_tool_calls)
                if not has_calls:  # Only consider it final if no calls were attempted
                    # Ensure content is a string and not empty before assigning
                    if isinstance(last_message.content, str) and last_message.content.strip():
                        print(f"Potential Final AI Response: {last_message.content[:500]}...")
                        final_answer_content = last_message.content
                    # If content is empty after manual parsing cleared it, don't overwrite a previous potential answer
                    elif not isinstance(last_message.content, str) or not last_message.content.strip():
                        print("AI Message has no tool calls and empty/non-string content.")
                    else:
                        print(f"Non-string AI message content without tool calls: {last_message.content}")
            elif isinstance(last_message, ToolMessage):
                print(f"Tool Result ({last_message.tool_call_id}): {last_message.content[:500]}...")
                # After a tool result, the next AI message might be the final one,
                # so don't necessarily clear final_answer_content here. Let the loop find the *last* non-tool-call AI message.
        # --- Cleaning step: normalize the raw answer toward benchmark format ---
        cleaned_answer = final_answer_content.strip()
        # Try to remove common conversational prefixes if they slipped through
        prefixes_to_remove = [
            "The answer is:", "Here is the answer:", "Based on the information:",
            "Final Answer:", "Answer:"
        ]
        original_cleaned = cleaned_answer
        for prefix in prefixes_to_remove:
            if cleaned_answer.lower().startswith(prefix.lower()):
                # Find where the actual answer starts after the prefix
                potential_answer = cleaned_answer[len(prefix):].strip()
                if potential_answer:  # Only strip if there's content after the prefix
                    cleaned_answer = potential_answer
                break  # Stop after removing the first found prefix
        # If nothing was stripped but prefixes exist, log it
        if cleaned_answer == original_cleaned and any(cleaned_answer.lower().startswith(p.lower()) for p in prefixes_to_remove):
            print(f"Warning: Prefix found but not stripped (maybe answer was empty after prefix?): '{original_cleaned[:100]}...'")
        # Remove potential markdown code blocks only if the answer isn't expected to be code.
        # Heuristic check for code-like content (keywords, many lines, JSON-ish braces).
        looks_like_code = any(kw in cleaned_answer for kw in ["def ", "import ", "print(", "for ", "while ", "if ", "class ", "=>", "dict(", "list["]) or cleaned_answer.count('\n') > 3 or (cleaned_answer.startswith('[') and cleaned_answer.endswith(']')) or (cleaned_answer.startswith('{') and cleaned_answer.endswith('}'))
        if not looks_like_code:
            # FIX: real substitution replaces the tutorial placeholder —
            # strip a leading ``` (optionally ```lang) fence and a trailing ``` fence.
            cleaned_answer = re.sub(r"^```(?:\w+)?\s*|\s*```$", "", cleaned_answer).strip()
        # Remove single backticks if they surround the whole answer
        if cleaned_answer.startswith("`") and cleaned_answer.endswith("`"):
            cleaned_answer = cleaned_answer[1:-1].strip()
        print(f"Agent returning final answer (cleaned): '{cleaned_answer}'")  # Add quotes for clarity
        if not cleaned_answer and final_answer_content:
            # If cleaning resulted in empty but original wasn't, return original
            print("Warning: Agent produced an empty final answer after cleaning. Falling back to raw answer.")
            return final_answer_content.strip()  # Fallback if cleaning removed everything
        # Handle case where agent legitimately produces no answer (e.g., error during loop)
        return cleaned_answer if cleaned_answer else "AGENT FAILED TO PRODUCE ANSWER"
    except Exception as e:
        print(f"Error running agent graph: {e}")
        import traceback
        traceback.print_exc()
        return f"AGENT GRAPH ERROR: {e}"
| # --- (Original Template Code Starts Here - NO CHANGES NEEDED BELOW THIS LINE) --- | |
def run_and_submit_all( profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: OAuth profile injected by Gradio's LoginButton; None when
            the user is not logged in (the run is aborted in that case).

    Returns:
        A (status_message, results) tuple for the Gradio outputs. `results`
        is a pandas DataFrame of per-task answers (None when the run aborts
        before any questions are processed).
    """
    # SPACE_ID is used only to build a public link to this agent's code.
    space_id = os.getenv("SPACE_ID")
    # 1. Require a logged-in user; the username is part of the submission payload.
    if profile:
        username= f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    # 2. Instantiate the agent (loads the LLM endpoint, tools, and ASR pipeline).
    print("Instantiating agent...")
    try:
        agent = BasicAgent()
        if agent.asr_pipeline is None:
            print("⚠️ ASR Pipeline failed to load during agent init. Audio questions will likely fail.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        import traceback
        traceback.print_exc() # Print full traceback for init errors
        return f"Error initializing agent: {e}", None
    print("Agent instantiated successfully.")
    # Public link to this Space's source, submitted alongside the answers.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(f"Agent code URL: {agent_code}")
    # 3. Fetch the question set from the scoring server.
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=30)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    results_log = []
    answers_payload = []
    total_questions = len(questions_data)
    print(f"Running agent on {total_questions} questions...")
    # --- Limit for Testing ---
    # question_limit = 5 # Uncomment and set a number (e.g., 5) to test fewer questions
    # questions_to_run = questions_data[:question_limit]
    # print(f"--- RUNNING WITH QUESTION LIMIT: {question_limit} ---")
    questions_to_run = questions_data # Comment this line out if using the limit above
    for i, item in enumerate(questions_to_run):
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item {i+1} with missing task_id or question: {item}")
            continue
        print(f"\n--- Running Task {i+1}/{len(questions_to_run)} (ID: {task_id}) ---")
        try:
            # NOTE(review): assumes the questions payload exposes attachments
            # under a "file_path" key — confirm against the scoring API schema.
            file_path = item.get("file_path")
            if file_path:
                # Check existence relative to script dir first, then CWD
                script_dir = os.path.dirname(os.path.realpath(__file__))
                potential_script_path = os.path.join(script_dir, file_path)
                potential_cwd_path = os.path.join(os.getcwd(), file_path) # Check CWD too
                if os.path.exists(potential_script_path):
                    file_context = f"[Attached File (exists): {file_path}]" # Path relative to script is good enough for agent
                elif os.path.exists(potential_cwd_path):
                    file_context = f"[Attached File (exists in cwd): {file_path}]" # Path relative to cwd
                else:
                    file_context = f"[Attached File (path provided): {file_path}]" # Agent needs to handle finding it
                question_text_with_context = f"{question_text}\n\n{file_context}"
                print(f"Question includes file reference: {file_path}")
            else:
                question_text_with_context = question_text
            submitted_answer = agent(question_text_with_context)
            # Ensure answer is a string, even if agent returns None or other types
            submitted_answer_str = str(submitted_answer) if submitted_answer is not None else ""
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer_str})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer_str})
            print(f"--- Task {task_id} Complete ---")
        except Exception as e:
            # A crash on one task is recorded as its "answer" so the run continues.
            print(f"FATAL ERROR running agent graph on task {task_id}: {e}")
            import traceback
            traceback.print_exc()
            submitted_answer = f"AGENT CRASH ERROR: {e}"
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120) # Increased timeout
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        # Add per-task score details if the server provided them.
        if 'scores' in result_data:
            scores_dict = {item['task_id']: item['score'] for item in result_data['scores']}
            results_df['Correct'] = results_df['Task ID'].map(lambda x: scores_dict.get(x, None))
            results_df['Correct'] = results_df['Correct'].apply(lambda x: 'Yes' if x == 1 else ('No' if x == 0 else 'N/A'))
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        # Surface the server's error detail when the submission is rejected.
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The submission request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error during submission - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission processing: {e}"
        print(status_message)
        import traceback
        traceback.print_exc()
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
# Component creation order determines the on-page layout; do not reorder.
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner (LangGraph + Mistral)") # Updated title
    gr.Markdown(
        """
**Instructions:**
1. Log in to your Hugging Face account using the button below.
2. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the agent, submit answers, and see the score.
---
**Notes:**
* The full evaluation can take **several hours**. Use the logs tab to monitor progress.
* This agent uses `mistralai/Mistral-7B-Instruct-v0.2` and multiple tools.
* Make sure your `HUGGINGFACEHUB_API_TOKEN` secret is set correctly in Settings.
"""
    )
    # Login button that provides the OAuth session used below.
    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions, Agent Answers, and Results", wrap=True)
    # No explicit `inputs`: Gradio injects the gr.OAuthProfile argument
    # automatically based on run_and_submit_all's annotation — presumably;
    # verify against the Gradio OAuth docs for the pinned version.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    # Startup banner and environment diagnostics, then launch the UI.
    banner = " App Starting "
    rule = "-" * 30
    print("\n" + rule + banner + rule)
    # Report Space-related environment variables to help debug deployments.
    host = os.getenv("SPACE_HOST")
    repo = os.getenv("SPACE_ID")
    if host:
        print(f"✅ SPACE_HOST found: {host}")
        print(f" Runtime URL should be: https://{host}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if repo:
        print(f"✅ SPACE_ID found: {repo}")
        print(f" Repo URL: https://huggingface.co/spaces/{repo}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{repo}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    # Path diagnostics for debugging file access by the agent.
    print(f"Script directory (__file__): {os.path.dirname(os.path.realpath(__file__))}")
    print(f"Current working directory (os.getcwd()): {os.getcwd()}")
    try:
        print("Files in current working directory:", os.listdir("."))
    except FileNotFoundError:
        print("Warning: Could not list current working directory.")
    print("-" * (60 + len(banner)) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    # queue() serializes click handlers so concurrent runs don't overlap.
    demo.queue().launch(debug=True, share=False)