import os import gradio as gr import requests import pandas as pd import re import io import contextlib import json from huggingface_hub import InferenceClient from langchain_community.tools import DuckDuckGoSearchRun from PyPDF2 import PdfReader from docx import Document from youtube_transcript_api import YouTubeTranscriptApi # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.2" PROMPT_TEMPLATE = """[INST]You are a helpful assistant designed to answer questions accurately. You have access to the following tools: {tools_description} To answer the question, you must follow this format, thinking step by step. Thought: Your reasoning and plan for the next step. You can also write down observations here. Action: The tool to use, in the format `tool_name(arg_name="value")`. The available tools are: {tool_names}. Observation: The result from the tool. ... (this Thought/Action/Observation can repeat N times) When you have the final answer, respond with: Thought: I have now found the final answer. Final Answer: The final answer. Important: - Do not use a tool if you are not sure about the parameters. - Do not make up file names. - If a tool is not available for a task (e.g., image analysis), state that you cannot answer. - If a tool returns an error, note it and try an alternative approach if possible. Question: {question} [/INST]{scratchpad}""" # --- Tool Definitions --- class WebSearchTool: """A tool to search the web for information.""" def __init__(self): self.search = DuckDuckGoSearchRun() def __call__(self, query: str): print(f"--- Calling WebSearchTool with query: {query} ---") try: return self.search.run(query) except Exception as e: return f"Error during web search: {e}" @property def description(self): return 'web_search(query: str) -> str - A tool to search the web for information. Use it to find up-to-date information or facts.' class PythonREPLTool: """A tool to execute Python code.""" def __call__(self, code: str): print(f"--- Calling PythonREPLTool with code: {code} ---") if any(keyword in code for keyword in ["os", "sys", "subprocess", "eval", "exec"]): return "Error: Use of os, sys, subprocess, eval, or exec is not allowed for security reasons." local_vars = {} string_io = io.StringIO() try: with contextlib.redirect_stdout(string_io): exec(code, {}, local_vars) output = string_io.getvalue() if not output and local_vars: output = str(list(local_vars.values())[-1]) return output if output else "Code executed with no output." except Exception as e: return f"Error executing code: {e}" @property def description(self): return 'python_repl(code: str) -> str - A Python REPL. Use it to perform calculations, data manipulation, etc. The result of the last line is returned.' class FileReaderTool: """A tool to read the content of a file associated with a task.""" def __init__(self, api_url: str): self.api_url = api_url def __call__(self, task_id: str, file_name: str): print(f"--- Calling FileReaderTool for task_id: {task_id}, file_name: {file_name} ---") if file_name.endswith(('.mp3', '.wav', '.flac')): return "Error: This tool cannot read audio files. Use the `audio_transcriber` tool instead." file_url = f"{self.api_url}/files/{task_id}" try: response = requests.get(file_url, timeout=20) response.raise_for_status() file_content = io.BytesIO(response.content) content = "" if file_name.endswith('.pdf'): pdf = PdfReader(file_content) content = "".join(page.extract_text() for page in pdf.pages if page.extract_text()) elif file_name.endswith('.docx'): doc = Document(file_content) content = "\n".join(para.text for para in doc.paragraphs) elif file_name.endswith('.csv'): df = pd.read_csv(file_content) content = df.to_json(orient='records') # Return JSON for easier processing elif file_name.endswith('.json'): data = json.load(file_content) content = json.dumps(data, indent=2) elif file_name.endswith('.txt'): content = file_content.read().decode('utf-8') elif file_name.endswith('.xlsx'): df = pd.read_excel(file_content, engine='openpyxl') content = df.to_json(orient='records') # Return JSON for easier processing else: return f"Error: Unsupported file type for '{file_name}'. Supported types: .pdf, .docx, .csv, .json, .txt, .xlsx." return content if content else "File is empty." except Exception as e: return f"Error reading file '{file_name}': {e}" @property def description(self): return 'file_reader(task_id: str, file_name: str) -> str - Reads content of text-based files (.pdf, .docx, .csv, .json, .txt, .xlsx). For audio, use audio_transcriber.' class AudioTranscriptionTool: """A tool to transcribe audio files using the Hugging Face Inference API.""" def __init__(self, api_url: str, client: InferenceClient): self.api_url = api_url self.client = client def __call__(self, task_id: str, file_name: str): print(f"--- Calling AudioTranscriptionTool for task: {task_id}, file: {file_name} ---") file_url = f"{self.api_url}/files/{task_id}" try: response = requests.get(file_url, timeout=30) response.raise_for_status() audio_data = response.content # Specify Whisper-large-v2 for accurate transcription transcription = self.client.automatic_speech_recognition(audio_data, model="openai/whisper-large-v2") if transcription and 'text' in transcription: return transcription['text'] else: return "Could not transcribe audio." except Exception as e: return f"Error during audio transcription: {e}" @property def description(self): return 'audio_transcriber(task_id: str, file_name: str) -> str - Transcribes an audio file (.mp3, .wav) associated with the current task.' class YouTubeTranscriptTool: """A tool to fetch the transcript of a YouTube video.""" def __call__(self, video_url: str): print(f"--- Calling YouTubeTranscriptTool for URL: {video_url} ---") match = re.search(r"v=([a-zA-Z0-9_-]+)", video_url) if not match: return "Error: Invalid YouTube URL. Could not extract video ID." video_id = match.group(1) try: transcript_list = YouTubeTranscriptApi.get_transcript(video_id) return " ".join([d['text'] for d in transcript_list]) except Exception as e: return f"Error fetching transcript for video {video_id}: {e}. The video might not have a transcript." @property def description(self): return 'youtube_transcript_fetcher(video_url: str) -> str - Fetches the transcript of a YouTube video. Use for questions about video content.' # --- GAIA Agent Definition --- class GaiaAgent: def __init__(self, hf_token: str, api_url: str, max_turns: int = 8): if not hf_token: raise ValueError("Hugging Face token is required for the Inference API.") self.llm_client = InferenceClient(model=MODEL_ID, token=hf_token) self.max_turns = max_turns self.tools = { "web_search": WebSearchTool(), "python_repl": PythonREPLTool(), "file_reader": FileReaderTool(api_url=api_url), "youtube_transcript_fetcher": YouTubeTranscriptTool(), "audio_transcriber": AudioTranscriptionTool(api_url=api_url, client=self.llm_client), } self.tools_description = "\n".join([f"- `{tool.description}`" for tool in self.tools.values()]) self.tool_names = ", ".join(self.tools.keys()) print("GaiaAgent initialized successfully with tools:", self.tool_names) def __call__(self, question: str, task_id: str) -> str: print(f"\n--- Running agent on task {task_id} ---") print(f"Question: {question[:100]}...") scratchpad = "" for turn in range(self.max_turns): print(f"Turn {turn + 1}/{self.max_turns}") prompt = PROMPT_TEMPLATE.format( tools_description=self.tools_description, tool_names=self.tool_names, question=question, scratchpad=scratchpad, ) try: llm_output = self.llm_client.text_generation( prompt, max_new_tokens=1024, stop_sequences=["Observation:", "[/INST]"], temperature=0.1 ).strip() except Exception as e: return f"Error: LLM call failed. {e}" print(f"LLM Output:\n{llm_output}") scratchpad += llm_output final_answer_match = re.search(r"Final Answer:\s*(.*)", scratchpad, re.DOTALL) action_match = re.search(r"Action:\s*([a-zA-Z0-9_]+)\((.*)\)", llm_output, re.DOTALL) if final_answer_match: return final_answer_match.group(1).strip() elif action_match: tool_name = action_match.group(1).strip() tool_args_str = action_match.group(2).strip() if tool_name not in self.tools: observation = f"Error: Unknown tool '{tool_name}'." else: try: args_dict = eval(f"dict({tool_args_str})", {"__builtins__": None}, {}) if tool_name in ['file_reader', 'audio_transcriber']: args_dict['task_id'] = task_id tool = self.tools[tool_name] observation = tool(**args_dict) except Exception as e: observation = f"Error executing tool '{tool_name}': {e}" print(f"Observation: {str(observation)[:200]}...") scratchpad += f"\nObservation: {str(observation)}\n" else: scratchpad += "\nObservation: No valid action or final answer found. Please format your response as either 'Action: tool_name(args)' or 'Final Answer: your_answer'." return "Agent stopped after reaching maximum turns." # --- Main Submission Logic --- def run_and_submit_all(profile: gr.OAuthProfile | None): hf_token = os.getenv("HF_TOKEN") if not hf_token: return "Error: `HF_TOKEN` secret not set. Please add it to your Space secrets.", None space_id = "ZeroTimo/RobotPai" if not space_id: return "Error: `SPACE_ID` not found. Are you in a Hugging Face Space?", None if not profile: return "Please Login to Hugging Face with the button to submit.", None username = profile.username api_url = DEFAULT_API_URL agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" try: agent = GaiaAgent(hf_token=hf_token, api_url=api_url) except Exception as e: return f"Error initializing agent: {e}", None try: response = requests.get(f"{api_url}/questions", timeout=15) response.raise_for_status() questions_data = response.json() except Exception as e: return f"Error fetching questions: {e}", None results_log, answers_payload = [], [] for item in questions_data: task_id, question_text = item.get("task_id"), item.get("question") if not all([task_id, question_text]): continue try: answer = agent(question_text, task_id) answers_payload.append({"task_id": task_id, "submitted_answer": answer}) results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": answer}) except Exception as e: results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) if not answers_payload: return "Agent did not produce any answers.", pd.DataFrame(results_log) submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} try: response = requests.post(f"{api_url}/submit", json=submission_data, timeout=120) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful! Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')})" ) return final_status, pd.DataFrame(results_log) except requests.exceptions.RequestException as e: error_detail = f"Server responded with status {e.response.status_code}. Response: {e.response.text[:500]}" if e.response else str(e) return f"Submission Failed: {error_detail}", pd.DataFrame(results_log) # --- Gradio Interface --- with gr.Blocks() as demo: gr.Markdown("# GAIA Agent Evaluation Runner") gr.Markdown( """ **Instructions:** 1. **Add your HF Token**: Go to your Space's **Settings** and add a secret named `HF_TOKEN` with your Hugging Face `read` token. 2. **Login**: Use the button below to login with your Hugging Face account. 3. **Run**: Click 'Run Evaluation & Submit' to start the agent. This may take several minutes. """ ) with gr.Row(): gr.LoginButton() run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary") status_output = gr.Textbox(label="Run Status / Submission Result", lines=4, interactive=False) results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table]) if __name__ == "__main__": if not os.getenv("HF_TOKEN"): print("⚠️ WARNING: `HF_TOKEN` secret not found. The agent will not run.") demo.launch()