Final_Assignment_Template / agent_runner.py
hemantvirmani's picture
adding support for langfuse tracking]
a1e2111
"""Agent execution functionality for running questions through the GAIA agent."""
from typing import Optional, Tuple, List, Dict
from colorama import Fore, Style
from agents import MyGAIAAgents
import config
from langfuse_tracking import track_question_processing
class AgentRunner:
"""Handles agent execution and question processing.
"""
def __init__(self, active_agent: str = None):
"""Initialize the AgentRunner.
Args:
active_agent: The agent type to use. If None, uses config.ACTIVE_AGENT.
"""
self.agent = None
self.active_agent = active_agent
def _initialize_agent(self) -> bool:
"""Initialize the agent. Returns True if successful."""
try:
self.agent = MyGAIAAgents(active_agent=self.active_agent)
return True
except Exception as e:
print(f"{Fore.RED}Error instantiating agent: {e}{Style.RESET_ALL}")
return False
def run_on_questions(self, questions_data: List[Dict]) -> Optional[List[Tuple]]:
"""Run agent on a list of questions and return results."""
if not self._initialize_agent():
return None
results = []
total = len(questions_data)
print(f"{Fore.CYAN}Running agent on {total} questions...{Style.RESET_ALL}")
for idx, item in enumerate(questions_data, 1):
task_id = item.get("task_id")
question_text = item.get("question")
file_name = item.get("file_name")
if not task_id or question_text is None:
print(f"\n{Fore.YELLOW}Skipping item with missing task_id or question: {item}{Style.RESET_ALL}\n")
continue
print(f"\n{'#' * config.SEPARATOR_WIDTH}")
print(f"{Fore.CYAN}Processing Question {idx}/{total} - Task ID: {task_id}{Style.RESET_ALL}")
print(f"{'#' * config.SEPARATOR_WIDTH}")
try:
# Track individual question processing with Langfuse
with track_question_processing(task_id, question_text) as span:
answer = self.agent(question_text, file_name=file_name)
if span:
span.update(output={"answer": str(answer)[:300]})
print(f"\n{Fore.GREEN}[RESULT] Task ID: {task_id}{Style.RESET_ALL}")
print(f"Question: {question_text[:config.QUESTION_PREVIEW_LENGTH]}{'...' if len(question_text) > config.QUESTION_PREVIEW_LENGTH else ''}")
print(f"Answer: {answer}")
results.append((task_id, question_text, answer))
except Exception as e:
print(f"{Fore.RED}[ERROR] Exception running agent on task {task_id}: {e}{Style.RESET_ALL}")
error_msg = f"AGENT ERROR: {str(e)[:config.ERROR_MESSAGE_LENGTH]}"
results.append((task_id, question_text, error_msg))
return results