|
|
import os |
|
|
import gradio as gr |
|
|
import requests |
|
|
import pandas as pd |
|
|
import re |
|
|
import io |
|
|
import contextlib |
|
|
from huggingface_hub import InferenceClient |
|
|
from langchain_community.tools import DuckDuckGoSearchRun |
|
|
from PyPDF2 import PdfReader |
|
|
from docx import Document |
|
|
import json |
|
|
|
|
|
|
|
|
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
|
|
MODEL_ID = "NousResearch/Hermes-2-Pro-Mistral-7B" |
|
|
|
|
|
PROMPT_TEMPLATE = """<|im_start|>system |
|
|
You are a helpful assistant designed to answer questions accurately. You have access to the following tools: |
|
|
|
|
|
{tools_description} |
|
|
|
|
|
To answer the question, you must follow this format, thinking step by step. |
|
|
|
|
|
Thought: Your reasoning and plan for the next step. You can also write down observations here. |
|
|
Action: The tool to use, in the format `tool_name(arg_name="value")`. The available tools are: {tool_names}. |
|
|
Observation: The result from the tool. |
|
|
... (this Thought/Action/Observation can repeat N times) |
|
|
|
|
|
When you have the final answer, respond with: |
|
|
Thought: I have now found the final answer. |
|
|
Final Answer: The final answer. |
|
|
|
|
|
Do not use a tool if you are not sure about the parameters. Do not make up file names. |
|
|
Question: {question}<|im_end|> |
|
|
<|im_start|>assistant |
|
|
{scratchpad}""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WebSearchTool: |
|
|
"""A tool to search the web for information.""" |
|
|
def __init__(self): |
|
|
self.search = DuckDuckGoSearchRun() |
|
|
|
|
|
def __call__(self, query: str): |
|
|
""" |
|
|
Searches the web for the given query. |
|
|
Args: |
|
|
query (str): The search query. |
|
|
Returns: |
|
|
str: The search results. |
|
|
""" |
|
|
print(f"--- Calling WebSearchTool with query: {query} ---") |
|
|
try: |
|
|
return self.search.run(query) |
|
|
except Exception as e: |
|
|
return f"Error during web search: {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'web_search(query: str) -> str - A tool to search the web for information. Use it to find up-to-date information or facts.' |
|
|
|
|
|
class PythonREPLTool: |
|
|
"""A tool to execute Python code.""" |
|
|
def __call__(self, code: str): |
|
|
""" |
|
|
Executes Python code and returns the output. |
|
|
Args: |
|
|
code (str): The Python code to execute. |
|
|
Returns: |
|
|
str: The output of the executed code. |
|
|
""" |
|
|
print(f"--- Calling PythonREPLTool with code: {code} ---") |
|
|
if "os" in code or "sys" in code or "subprocess" in code: |
|
|
return "Error: Use of os, sys, or subprocess is not allowed." |
|
|
|
|
|
local_vars = {} |
|
|
string_io = io.StringIO() |
|
|
try: |
|
|
with contextlib.redirect_stdout(string_io): |
|
|
exec(code, {}, local_vars) |
|
|
output = string_io.getvalue() |
|
|
if not output and local_vars: |
|
|
|
|
|
output = str(list(local_vars.values())[-1]) |
|
|
return output if output else "Code executed with no output." |
|
|
except Exception as e: |
|
|
return f"Error executing code: {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'python_repl(code: str) -> str - A Python REPL. Use it to perform calculations, data manipulation, etc. The result of the last line is returned.' |
|
|
|
|
|
class FileReaderTool: |
|
|
"""A tool to read the content of a file associated with a task.""" |
|
|
def __init__(self, api_url: str): |
|
|
self.api_url = api_url |
|
|
|
|
|
def __call__(self, task_id: str, file_name: str): |
|
|
""" |
|
|
Reads the content of a file. |
|
|
Args: |
|
|
task_id (str): The ID of the task the file is associated with. |
|
|
file_name (str): The name of the file to read. The LLM must infer this from the question. |
|
|
Returns: |
|
|
str: The content of the file. |
|
|
""" |
|
|
print(f"--- Calling FileReaderTool for task_id: {task_id}, file_name: {file_name} ---") |
|
|
file_url = f"{self.api_url}/files/{task_id}" |
|
|
|
|
|
try: |
|
|
response = requests.get(file_url, timeout=20) |
|
|
response.raise_for_status() |
|
|
|
|
|
content = "" |
|
|
file_content = io.BytesIO(response.content) |
|
|
|
|
|
if file_name.endswith('.pdf'): |
|
|
pdf = PdfReader(file_content) |
|
|
for page in pdf.pages: |
|
|
content += page.extract_text() if page.extract_text() else "" |
|
|
elif file_name.endswith('.docx'): |
|
|
doc = Document(file_content) |
|
|
for para in doc.paragraphs: |
|
|
content += para.text + '\n' |
|
|
elif file_name.endswith('.csv'): |
|
|
df = pd.read_csv(file_content) |
|
|
content = df.to_string() |
|
|
elif file_name.endswith('.json'): |
|
|
data = json.load(file_content) |
|
|
content = json.dumps(data, indent=2) |
|
|
elif file_name.endswith('.txt'): |
|
|
content = file_content.read().decode('utf-8') |
|
|
else: |
|
|
return f"Error: Unsupported file type for '{file_name}'. Supported types: .pdf, .docx, .csv, .json, .txt." |
|
|
|
|
|
return content if content else "File is empty." |
|
|
|
|
|
except requests.exceptions.RequestException as e: |
|
|
return f"Error downloading file: {e}" |
|
|
except Exception as e: |
|
|
return f"Error reading file '{file_name}': {e}" |
|
|
|
|
|
@property |
|
|
def description(self): |
|
|
return 'file_reader(task_id: str, file_name: str) -> str - Reads the content of a file associated with the current task. Use the file name mentioned in the question.' |
|
|
|
|
|
|
|
|
|
|
|
class GaiaAgent: |
|
|
def __init__(self, hf_token: str, api_url: str, max_turns: int = 8): |
|
|
print("GaiaAgent initializing...") |
|
|
if not hf_token: |
|
|
raise ValueError("Hugging Face token is required for the Inference API.") |
|
|
|
|
|
self.llm_client = InferenceClient(model=MODEL_ID, token=hf_token) |
|
|
self.max_turns = max_turns |
|
|
|
|
|
|
|
|
self.tools = { |
|
|
"web_search": WebSearchTool(), |
|
|
"python_repl": PythonREPLTool(), |
|
|
"file_reader": FileReaderTool(api_url=api_url), |
|
|
} |
|
|
self.tools_description = "\n".join([f"- `{tool.description}`" for tool in self.tools.values()]) |
|
|
self.tool_names = ", ".join(self.tools.keys()) |
|
|
print("GaiaAgent initialized successfully.") |
|
|
|
|
|
def __call__(self, question: str, task_id: str) -> str: |
|
|
print(f"\n--- Running agent on task {task_id} ---") |
|
|
print(f"Question: {question[:100]}...") |
|
|
|
|
|
scratchpad = "" |
|
|
|
|
|
for turn in range(self.max_turns): |
|
|
print(f"Turn {turn + 1}/{self.max_turns}") |
|
|
|
|
|
|
|
|
prompt = PROMPT_TEMPLATE.format( |
|
|
tools_description=self.tools_description, |
|
|
tool_names=self.tool_names, |
|
|
question=question, |
|
|
scratchpad=scratchpad, |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
llm_output = self.llm_client.text_generation( |
|
|
prompt, max_new_tokens=1024, stop_sequences=["<|im_end|>", "Observation:"], temperature=0.1 |
|
|
).strip() |
|
|
except Exception as e: |
|
|
print(f"LLM API call failed: {e}") |
|
|
return f"Error: LLM call failed. {e}" |
|
|
|
|
|
print(f"LLM Output:\n{llm_output}") |
|
|
scratchpad += llm_output |
|
|
|
|
|
|
|
|
final_answer_match = re.search(r"Final Answer:\s*(.*)", scratchpad, re.DOTALL) |
|
|
action_match = re.search(r"Action:\s*([a-zA-Z0-9_]+)\((.*)\)", llm_output) |
|
|
|
|
|
if final_answer_match: |
|
|
answer = final_answer_match.group(1).strip() |
|
|
print(f"Final Answer Found: {answer}") |
|
|
return answer |
|
|
|
|
|
elif action_match: |
|
|
tool_name = action_match.group(1).strip() |
|
|
tool_args_str = action_match.group(2).strip() |
|
|
|
|
|
if tool_name not in self.tools: |
|
|
observation = f"Error: Unknown tool '{tool_name}'. Available tools: {self.tool_names}" |
|
|
else: |
|
|
try: |
|
|
|
|
|
args_dict = eval(f"dict({tool_args_str})", {"__builtins__": None}, {}) |
|
|
|
|
|
if tool_name == 'file_reader': |
|
|
args_dict['task_id'] = task_id |
|
|
|
|
|
tool = self.tools[tool_name] |
|
|
observation = tool(**args_dict) |
|
|
except Exception as e: |
|
|
observation = f"Error executing tool '{tool_name}': {e}" |
|
|
|
|
|
print(f"Observation: {str(observation)[:200]}...") |
|
|
scratchpad += f"\nObservation: {str(observation)}\n" |
|
|
else: |
|
|
print("No valid action or final answer found in LLM output. Continuing thought process.") |
|
|
scratchpad += "\nObservation: No valid action taken. Please either use a tool with the correct format `Action: tool_name(arg_name=\"value\")` or provide the final answer in the format `Final Answer: your_answer`." |
|
|
|
|
|
print("Agent reached max turns.") |
|
|
return "Agent stopped after reaching maximum turns." |
|
|
|
|
|
|
|
|
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None): |
|
|
hf_token = os.getenv("HF_TOKEN") |
|
|
if not hf_token: |
|
|
return "Error: `HF_TOKEN` environment variable not set. Please add it to your Space secrets.", None |
|
|
|
|
|
space_id = os.getenv("SPACE_ID") |
|
|
if not space_id: |
|
|
return "Error: `SPACE_ID` environment variable not found. Are you running in a Hugging Face Space?", None |
|
|
|
|
|
if not profile: |
|
|
return "Please Login to Hugging Face with the button to submit.", None |
|
|
|
|
|
username = profile.username |
|
|
print(f"User logged in: {username}") |
|
|
|
|
|
api_url = DEFAULT_API_URL |
|
|
questions_url = f"{api_url}/questions" |
|
|
submit_url = f"{api_url}/submit" |
|
|
|
|
|
|
|
|
try: |
|
|
agent = GaiaAgent(hf_token=hf_token, api_url=api_url) |
|
|
except Exception as e: |
|
|
print(f"Error instantiating agent: {e}") |
|
|
return f"Error initializing agent: {e}", None |
|
|
|
|
|
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" |
|
|
print(f"Code link: {agent_code}") |
|
|
|
|
|
|
|
|
try: |
|
|
response = requests.get(questions_url, timeout=15) |
|
|
response.raise_for_status() |
|
|
questions_data = response.json() |
|
|
if not questions_data: |
|
|
return "Fetched questions list is empty or invalid format.", None |
|
|
print(f"Fetched {len(questions_data)} questions.") |
|
|
except Exception as e: |
|
|
return f"Error fetching questions: {e}", None |
|
|
|
|
|
|
|
|
results_log = [] |
|
|
answers_payload = [] |
|
|
print(f"Running agent on {len(questions_data)} questions...") |
|
|
for item in questions_data: |
|
|
task_id = item.get("task_id") |
|
|
question_text = item.get("question") |
|
|
if not task_id or question_text is None: |
|
|
continue |
|
|
try: |
|
|
submitted_answer = agent(question_text, task_id) |
|
|
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
|
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) |
|
|
except Exception as e: |
|
|
print(f"Error running agent on task {task_id}: {e}") |
|
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
|
|
|
|
if not answers_payload: |
|
|
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) |
|
|
|
|
|
|
|
|
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} |
|
|
print(f"Submitting {len(answers_payload)} answers for user '{username}'...") |
|
|
|
|
|
try: |
|
|
response = requests.post(submit_url, json=submission_data, timeout=120) |
|
|
response.raise_for_status() |
|
|
result_data = response.json() |
|
|
final_status = ( |
|
|
f"Submission Successful!\n" |
|
|
f"User: {result_data.get('username')}\n" |
|
|
f"Overall Score: {result_data.get('score', 'N/A')}% " |
|
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" |
|
|
f"Message: {result_data.get('message', 'No message received.')}" |
|
|
) |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return final_status, results_df |
|
|
except requests.exceptions.RequestException as e: |
|
|
error_detail = "Network error or server responded with an error." |
|
|
if e.response is not None: |
|
|
error_detail = f"Server responded with status {e.response.status_code}. Response: {e.response.text[:500]}" |
|
|
status_message = f"Submission Failed: {error_detail}" |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return status_message, results_df |
|
|
except Exception as e: |
|
|
status_message = f"An unexpected error occurred during submission: {e}" |
|
|
results_df = pd.DataFrame(results_log) |
|
|
return status_message, results_df |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks() as demo: |
|
|
gr.Markdown("# GAIA Agent Evaluation Runner") |
|
|
gr.Markdown( |
|
|
""" |
|
|
**Instructions:** |
|
|
|
|
|
1. **Add your HF Token**: Go to the 'Settings' tab of this Space and add a secret named `HF_TOKEN` with your Hugging Face read token. |
|
|
2. **Login**: Log in to your Hugging Face account using the button below. This is required for submission. |
|
|
3. **Run**: Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score. |
|
|
--- |
|
|
**Disclaimer:** |
|
|
This process can take several minutes as the agent processes each question. Please be patient. |
|
|
""" |
|
|
) |
|
|
|
|
|
with gr.Row(): |
|
|
gr.LoginButton() |
|
|
run_button = gr.Button("Run Evaluation & Submit All Answers", variant="primary") |
|
|
|
|
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
|
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
|
|
|
|
run_button.click( |
|
|
fn=run_and_submit_all, |
|
|
outputs=[status_output, results_table] |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
print("\n" + "-"*30 + " App Starting " + "-"*30) |
|
|
if not os.getenv("HF_TOKEN"): |
|
|
print("⚠️ WARNING: `HF_TOKEN` secret not found. The agent will not be able to run.") |
|
|
else: |
|
|
print("✅ `HF_TOKEN` secret found.") |
|
|
|
|
|
space_id_startup = os.getenv("SPACE_ID") |
|
|
if space_id_startup: |
|
|
print(f"✅ SPACE_ID found: {space_id_startup}") |
|
|
else: |
|
|
print("ℹ️ SPACE_ID environment variable not found (running locally?).") |
|
|
|
|
|
print("-"*(60 + len(" App Starting ")) + "\n") |
|
|
print("Launching Gradio Interface for GAIA Agent Evaluation...") |
|
|
demo.launch() |