| |
| import asyncio |
| import os |
| import sys |
| import logging |
| import random |
| import pandas as pd |
| import requests |
| import wikipedia as wiki |
| from markdownify import markdownify as to_markdown |
| from typing import Any |
| from dotenv import load_dotenv |
|
|
| from smolagents import InferenceClientModel, LiteLLMModel, CodeAgent, ToolCallingAgent, Tool, DuckDuckGoSearchTool |
|
|
| |
| load_dotenv() |
|
|
| |
| |
| |
|
|
| |
| OPENAI_MODEL_NAME = "openai/gpt-4o" |
|
|
| |
| class MathSolver(Tool): |
| name = "math_solver" |
| description = "Safely evaluate basic math expressions." |
| inputs = {"input": {"type": "string", "description": "Math expression to evaluate."}} |
| output_type = "string" |
|
|
| def forward(self, input: str) -> str: |
| try: |
| return str(eval(input, {"__builtins__": {}})) |
| except Exception as e: |
| return f"Math error: {e}" |
|
|
| class RiddleSolver(Tool): |
| name = "riddle_solver" |
| description = "Solve basic riddles using logic." |
| inputs = {"input": {"type": "string", "description": "Riddle prompt."}} |
| output_type = "string" |
|
|
| def forward(self, input: str) -> str: |
| if "forward" in input and "backward" in input: |
| return "A palindrome" |
| return "RiddleSolver failed." |
|
|
| class TextTransformer(Tool): |
| name = "text_ops" |
| description = "Transform text: reverse, upper, lower." |
| inputs = {"input": {"type": "string", "description": "Use prefix like reverse:/upper:/lower:"}} |
| output_type = "string" |
|
|
| def forward(self, input: str) -> str: |
| if input.startswith("reverse:"): |
| reversed_text = input[8:].strip()[::-1] |
| if 'left' in reversed_text.lower(): |
| return "right" |
| return reversed_text |
| if input.startswith("upper:"): |
| return input[6:].strip().upper() |
| if input.startswith("lower:"): |
| return input[6:].strip().lower() |
| return "Unknown transformation." |
|
|
| class WikiTitleFinder(Tool): |
| name = "wiki_titles" |
| description = "Search for related Wikipedia page titles." |
| inputs = {"query": {"type": "string", "description": "Search query."}} |
| output_type = "string" |
|
|
| def forward(self, query: str) -> str: |
| results = wiki.search(query) |
| return ", ".join(results) if results else "No results." |
|
|
| class WikiContentFetcher(Tool): |
| name = "wiki_page" |
| description = "Fetch Wikipedia page content." |
| inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}} |
| output_type = "string" |
|
|
| def forward(self, page_title: str) -> str: |
| try: |
| return to_markdown(wiki.page(page_title).html()) |
| except wiki.exceptions.PageError: |
| return f"'{page_title}' not found." |
|
|
| |
| class BasicAgent: |
| def __init__(self, provider="openai"): |
| print("BasicAgent initialized.") |
| model = self.select_model(provider) |
| tools = [ |
| DuckDuckGoSearchTool(), |
| WikiTitleFinder(), |
| WikiContentFetcher(), |
| MathSolver(), |
| RiddleSolver(), |
| TextTransformer(), |
| ] |
| self.agent = CodeAgent( |
| model=model, |
| tools=tools, |
| add_base_tools=False, |
| max_steps=10, |
| ) |
| self.agent.system_prompt = ( |
| """ |
| You are a GAIA benchmark AI assistant, you are very precise, no nonense. Your sole purpose is to output the minimal, final answer in the format: |
| [ANSWER] |
| You must NEVER output explanations, intermediate steps, reasoning, or comments β only the answer, strictly enclosed in `[ANSWER]`. |
| ... |
| If the answer is not found, say `[ANSWER] - unknown`. |
| """ |
| ) |
|
|
| def select_model(self, provider: str): |
| return LiteLLMModel(model_id=OPENAI_MODEL_NAME, api_key=os.getenv("OPENAI_API_KEY")) |
|
|
| def __call__(self, question: str) -> str: |
| print(f"Agent received question (first 50 chars): {question[:50]}...") |
| result = self.agent.run(question) |
| final_str = str(result).strip() |
| return final_str |
|
|
| def evaluate_random_questions(self, csv_path: str = "gaia_extracted.csv", sample_size: int = 3, show_steps: bool = True): |
| import pandas as pd |
| from rich.table import Table |
| from rich.console import Console |
|
|
| df = pd.read_csv(csv_path) |
| if not {"question", "answer"}.issubset(df.columns): |
| print("CSV must contain 'question' and 'answer' columns.") |
| print("Found columns:", df.columns.tolist()) |
| return |
|
|
| samples = df.sample(n=sample_size) |
| records = [] |
| correct_count = 0 |
|
|
| for _, row in samples.iterrows(): |
| taskid = row["taskid"].strip() |
| question = row["question"].strip() |
| expected = str(row['answer']).strip() |
| agent_answer = self("taskid: " + taskid + ",\nquestion: " + question).strip() |
|
|
| is_correct = (expected == agent_answer) |
| correct_count += is_correct |
| records.append((question, expected, agent_answer, "β" if is_correct else "β")) |
|
|
| if show_steps: |
| print("---") |
| print("Question:", question) |
| print("Expected:", expected) |
| print("Agent:", agent_answer) |
| print("Correct:", is_correct) |
|
|
| console = Console() |
| table = Table(show_lines=True) |
| table.add_column("Question", overflow="fold") |
| table.add_column("Expected") |
| table.add_column("Agent") |
| table.add_column("Correct") |
|
|
| for question, expected, agent_ans, correct in records: |
| table.add_row(question, expected, agent_ans, correct) |
|
|
| console.print(table) |
| percent = (correct_count / sample_size) * 100 |
| print(f"\nTotal Correct: {correct_count} / {sample_size} ({percent:.2f}%)") |
|
|
| if __name__ == "__main__": |
| args = sys.argv[1:] |
| if not args or args[0] in {"-h", "--help"}: |
| print("Usage: python agent.py [question | dev]") |
| print(" - Provide a question to get a GAIA-style answer.") |
| print(" - Use 'dev' to evaluate 3 random GAIA questions from gaia_qa.csv.") |
| sys.exit(0) |
|
|
| q = " ".join(args) |
| agent = BasicAgent() |
| if q == "dev": |
| agent.evaluate_random_questions() |
| else: |
| print(agent(q)) |
|
|