# yushnitp's picture
# Update agent.py
# 159f73b verified
# --- Imports and Environment Setup ---
import asyncio
import os
import sys
import logging
import random
import pandas as pd
import requests
import wikipedia as wiki
from markdownify import markdownify as to_markdown
from typing import Any
from dotenv import load_dotenv
from smolagents import InferenceClientModel, LiteLLMModel, CodeAgent, ToolCallingAgent, Tool, DuckDuckGoSearchTool
# Load environment
load_dotenv()
# Logging
# logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
# logger = logging.getLogger(__name__)
# --- Model Configuration ---
OPENAI_MODEL_NAME = "openai/gpt-4o"
# --- Tool Definitions ---
class MathSolver(Tool):
    """Tool that evaluates basic arithmetic expressions.

    Supported syntax: numeric literals, parentheses, the binary operators
    ``+ - * / // % **`` and unary ``+``/``-``. Anything else (names, calls,
    attribute access, subscripts, comparisons, ...) is rejected.
    """

    name = "math_solver"
    description = "Safely evaluate basic math expressions."
    inputs = {"input": {"type": "string", "description": "Math expression to evaluate."}}
    output_type = "string"

    def forward(self, input: str) -> str:
        """Evaluate ``input`` and return the result as a string.

        Args:
            input: The arithmetic expression to evaluate.

        Returns:
            ``str(result)`` on success, otherwise ``"Math error: <reason>"``.
        """
        # Function-scope import keeps the tool self-contained.
        import ast

        # SECURITY: this deliberately replaces the previous
        # ``eval(input, {"__builtins__": {}})``. Emptying ``__builtins__`` is
        # not a sandbox (attribute access on literals can still reach
        # arbitrary objects), so the expression is parsed and only
        # whitelisted arithmetic nodes are evaluated.
        try:
            # ``eval`` tolerated surrounding whitespace; ``ast.parse`` does not.
            tree = ast.parse(input.strip(), mode="eval")
            return str(self._evaluate(tree.body))
        except Exception as e:
            # Tool boundary: report every failure to the agent as text.
            return f"Math error: {e}"

    def _evaluate(self, node):
        """Recursively evaluate one whitelisted arithmetic AST node.

        Raises:
            ValueError: If the node is not a supported arithmetic construct
                or an exponent is unreasonably large.
        """
        import ast
        import operator

        binary_ops = {
            ast.Add: operator.add,
            ast.Sub: operator.sub,
            ast.Mult: operator.mul,
            ast.Div: operator.truediv,
            ast.FloorDiv: operator.floordiv,
            ast.Mod: operator.mod,
            ast.Pow: operator.pow,
        }
        unary_ops = {ast.UAdd: operator.pos, ast.USub: operator.neg}
        max_exponent = 10_000  # keeps ``**`` from producing huge integers

        if isinstance(node, ast.Constant):
            value = node.value
            # ``bool`` is an ``int`` subclass; reject it along with strings etc.
            if isinstance(value, bool) or not isinstance(value, (int, float, complex)):
                raise ValueError(f"unsupported literal: {value!r}")
            return value

        if isinstance(node, ast.BinOp) and type(node.op) in binary_ops:
            left = self._evaluate(node.left)
            right = self._evaluate(node.right)
            if isinstance(node.op, ast.Pow) and abs(right) > max_exponent:
                raise ValueError("exponent too large")
            return binary_ops[type(node.op)](left, right)

        if isinstance(node, ast.UnaryOp) and type(node.op) in unary_ops:
            return unary_ops[type(node.op)](self._evaluate(node.operand))

        raise ValueError(f"unsupported expression element: {type(node).__name__}")
class RiddleSolver(Tool):
    """Tool that answers riddles by keyword matching.

    Only one pattern is recognised: a prompt that mentions both "forward"
    and "backward" is answered with "A palindrome".
    """

    name = "riddle_solver"
    description = "Solve basic riddles using logic."
    inputs = {"input": {"type": "string", "description": "Riddle prompt."}}
    output_type = "string"

    def forward(self, input: str) -> str:
        """Return the riddle's answer, or a failure message if unrecognised.

        Args:
            input: The riddle text.

        Returns:
            ``"A palindrome"`` when both keywords occur in ``input``
            (case-sensitive substring match), else ``"RiddleSolver failed."``.
        """
        keywords = ("forward", "backward")
        mentions_both = all(keyword in input for keyword in keywords)
        return "A palindrome" if mentions_both else "RiddleSolver failed."
class TextTransformer(Tool):
    """Tool that applies a prefixed text operation.

    The input has the form ``<operation>:<text>`` where the operation is one
    of ``reverse``, ``upper`` or ``lower``. The text is stripped of
    surrounding whitespace before the operation is applied.
    """

    name = "text_ops"
    description = "Transform text: reverse, upper, lower."
    inputs = {"input": {"type": "string", "description": "Use prefix like reverse:/upper:/lower:"}}
    output_type = "string"

    def forward(self, input: str) -> str:
        """Apply the operation named by the prefix of ``input``.

        Args:
            input: ``"reverse:<text>"``, ``"upper:<text>"`` or
                ``"lower:<text>"``.

        Returns:
            The transformed text, or ``"Unknown transformation."`` when the
            prefix is not recognised.
        """
        operation, separator, payload = input.partition(":")
        if not separator:
            return "Unknown transformation."

        text = payload.strip()
        if operation == "reverse":
            flipped = text[::-1]
            # Special case: if the reversed text contains "left"
            # (case-insensitive), the literal "right" is returned instead of
            # the reversed text.
            # NOTE(review): presumably targets a specific benchmark question;
            # confirm this shortcut is still wanted.
            return "right" if "left" in flipped.lower() else flipped
        if operation == "upper":
            return text.upper()
        if operation == "lower":
            return text.lower()
        return "Unknown transformation."
class WikiTitleFinder(Tool):
    """Tool that lists Wikipedia page titles matching a search query."""

    name = "wiki_titles"
    description = "Search for related Wikipedia page titles."
    inputs = {"query": {"type": "string", "description": "Search query."}}
    output_type = "string"

    def forward(self, query: str) -> str:
        """Search Wikipedia and return the matching titles.

        Args:
            query: Free-text search query.

        Returns:
            The titles joined with ``", "``, or ``"No results."`` when the
            search returns nothing.
        """
        titles = wiki.search(query)
        if not titles:
            return "No results."
        return ", ".join(titles)
class WikiContentFetcher(Tool):
    """Tool that fetches a Wikipedia page and returns it as Markdown."""

    name = "wiki_page"
    description = "Fetch Wikipedia page content."
    inputs = {"page_title": {"type": "string", "description": "Wikipedia page title."}}
    output_type = "string"

    def forward(self, page_title: str) -> str:
        """Return the page's HTML converted to Markdown.

        Args:
            page_title: Title of the Wikipedia page to fetch.

        Returns:
            The Markdown rendering of the page; a "not found" message when
            the page does not exist; or a message listing the candidate
            titles when ``page_title`` is ambiguous.
        """
        try:
            return to_markdown(wiki.page(page_title).html())
        except wiki.exceptions.PageError:
            return f"'{page_title}' not found."
        except wiki.exceptions.DisambiguationError as e:
            # An ambiguous title is an expected outcome, not a crash: hand the
            # candidates back so the caller can retry with a precise title.
            candidates = ", ".join(e.options)
            return f"'{page_title}' is ambiguous. Possible titles: {candidates}"
# --- Basic Agent Definition ---
class BasicAgent:
    """Question-answering agent built on a smolagents ``CodeAgent``.

    The agent is given web-search, Wikipedia, math, riddle and text tools and
    a system prompt that asks for a bare final answer.
    """

    def __init__(self, provider="openai"):
        """Create the model, register the tools and set the system prompt.

        Args:
            provider: Forwarded to :meth:`select_model`.
        """
        print("BasicAgent initialized.")
        model = self.select_model(provider)
        tools = [
            DuckDuckGoSearchTool(),
            WikiTitleFinder(),
            WikiContentFetcher(),
            MathSolver(),
            RiddleSolver(),
            TextTransformer(),
        ]
        self.agent = CodeAgent(
            model=model,
            tools=tools,
            add_base_tools=False,
            max_steps=10,
        )
        # NOTE(review): the prompt is installed by direct attribute
        # assignment; confirm the installed smolagents version honours this
        # (it may expect ``prompt_templates["system_prompt"]`` instead).
        self.agent.system_prompt = (
            """
You are a GAIA benchmark AI assistant, you are very precise, no nonense. Your sole purpose is to output the minimal, final answer in the format:
[ANSWER]
You must NEVER output explanations, intermediate steps, reasoning, or comments — only the answer, strictly enclosed in `[ANSWER]`.
...
If the answer is not found, say `[ANSWER] - unknown`.
"""
        )

    def select_model(self, provider: str):
        """Return the LLM backend used by the agent.

        Args:
            provider: Currently ignored; the OpenAI model named by
                ``OPENAI_MODEL_NAME`` is always used.

        Returns:
            A ``LiteLLMModel`` authenticated with the ``OPENAI_API_KEY``
            environment variable.
        """
        return LiteLLMModel(model_id=OPENAI_MODEL_NAME, api_key=os.getenv("OPENAI_API_KEY"))

    def __call__(self, question: str) -> str:
        """Run the agent on ``question`` and return its stripped answer."""
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        result = self.agent.run(question)
        return str(result).strip()

    def evaluate_random_questions(self, csv_path: str = "gaia_extracted.csv", sample_size: int = 3, show_steps: bool = True) -> None:
        """Score the agent on a random sample of questions from a CSV file.

        The CSV must have ``question`` and ``answer`` columns. A ``taskid``
        column is optional; when present it is prepended to the prompt.
        An answer counts as correct only on an exact string match after
        stripping whitespace.

        Args:
            csv_path: Path of the CSV file to sample from.
            sample_size: Number of questions to draw; capped at the number
                of rows available.
            show_steps: When true, print each question's outcome as it is
                evaluated.
        """
        df = pd.read_csv(csv_path)
        if not {"question", "answer"}.issubset(df.columns):
            print("CSV must contain 'question' and 'answer' columns.")
            print("Found columns:", df.columns.tolist())
            return

        # ``DataFrame.sample`` raises when asked for more rows than exist, and
        # an empty sample would divide by zero in the summary below.
        evaluated = min(sample_size, len(df))
        if evaluated <= 0:
            print("No questions to evaluate.")
            return

        # Imported here so the early exits above do not require ``rich``, but
        # still before any (slow, billable) agent call is made.
        from rich.console import Console
        from rich.table import Table

        has_task_id = "taskid" in df.columns
        records = []
        correct_count = 0
        for _, row in df.sample(n=evaluated).iterrows():
            # ``str()`` guards against non-string cells (numeric ids, NaN).
            question = str(row["question"]).strip()
            expected = str(row["answer"]).strip()
            if has_task_id:
                taskid = str(row["taskid"]).strip()
                prompt = "taskid: " + taskid + ",\nquestion: " + question
            else:
                prompt = question
            agent_answer = self(prompt).strip()
            is_correct = expected == agent_answer
            correct_count += int(is_correct)
            records.append((question, expected, agent_answer, "✓" if is_correct else "✗"))
            if show_steps:
                print("---")
                print("Question:", question)
                print("Expected:", expected)
                print("Agent:", agent_answer)
                print("Correct:", is_correct)

        console = Console()
        table = Table(show_lines=True)
        table.add_column("Question", overflow="fold")
        table.add_column("Expected")
        table.add_column("Agent")
        table.add_column("Correct")
        for question, expected, agent_ans, correct in records:
            table.add_row(question, expected, agent_ans, correct)
        console.print(table)

        # Report against the number actually evaluated, not the number requested.
        percent = (correct_count / evaluated) * 100
        print(f"\nTotal Correct: {correct_count} / {evaluated} ({percent:.2f}%)")
if __name__ == "__main__":
    # Command-line entry point: answer a single question, or run the small
    # random evaluation when the only argument is "dev".
    args = sys.argv[1:]
    if not args or args[0] in {"-h", "--help"}:
        print("Usage: python agent.py [question | dev]")
        print(" - Provide a question to get a GAIA-style answer.")
        # The file name matches the default ``csv_path`` of
        # ``BasicAgent.evaluate_random_questions``.
        print(" - Use 'dev' to evaluate 3 random GAIA questions from gaia_extracted.csv.")
        sys.exit(0)

    # All arguments are joined so an unquoted multi-word question still works.
    q = " ".join(args)
    agent = BasicAgent()
    if q == "dev":
        agent.evaluate_random_questions()
    else:
        print(agent(q))