"""Agent execution functionality for running questions through the GAIA agent."""
from typing import Optional, Tuple, List, Dict
from colorama import Fore, Style
from agents import MyGAIAAgents
import config
from langfuse_tracking import track_question_processing
class AgentRunner:
    """Handles agent execution and question processing.

    A fresh ``MyGAIAAgents`` instance is created at the start of every
    ``run_on_questions`` call. Progress, results and errors are reported by
    printing colour-coded messages to stdout.
    """

    def __init__(self, active_agent: Optional[str] = None):
        """Initialize the AgentRunner.

        Args:
            active_agent: The agent type to use. The value is passed
                unchanged to ``MyGAIAAgents``; this class applies no
                fallback of its own when it is None (presumably
                ``MyGAIAAgents`` then uses ``config.ACTIVE_AGENT`` --
                TODO confirm against that class).
        """
        self.agent = None  # set by _initialize_agent()
        self.active_agent = active_agent

    def _initialize_agent(self) -> bool:
        """Instantiate the agent and store it on ``self.agent``.

        Returns:
            True if the agent was created, False if construction raised
            (the error is printed rather than propagated).
        """
        try:
            self.agent = MyGAIAAgents(active_agent=self.active_agent)
            return True
        except Exception as e:
            print(f"{Fore.RED}Error instantiating agent: {e}{Style.RESET_ALL}")
            return False

    def run_on_questions(self, questions_data: List[Dict]) -> Optional[List[Tuple]]:
        """Run agent on a list of questions and return results.

        Args:
            questions_data: Items read via the keys ``"task_id"``,
                ``"question"`` and ``"file_name"``. Items with a falsy
                ``task_id`` or a ``question`` of None are skipped.

        Returns:
            None if the agent could not be instantiated. Otherwise a list
            with one ``(task_id, question_text, answer)`` tuple per
            processed item; when processing an item raised, the third
            element is an ``"AGENT ERROR: ..."`` string instead.
        """
        if not self._initialize_agent():
            return None

        results = []
        total = len(questions_data)
        print(f"{Fore.CYAN}Running agent on {total} questions...{Style.RESET_ALL}")

        # idx is the 1-based position in the input, so skipped items still
        # consume a number in the "Question idx/total" banner.
        for idx, item in enumerate(questions_data, 1):
            task_id = item.get("task_id")
            question_text = item.get("question")
            file_name = item.get("file_name")

            if not task_id or question_text is None:
                print(f"\n{Fore.YELLOW}Skipping item with missing task_id or question: {item}{Style.RESET_ALL}\n")
                continue

            print(f"\n{'#' * config.SEPARATOR_WIDTH}")
            print(f"{Fore.CYAN}Processing Question {idx}/{total} - Task ID: {task_id}{Style.RESET_ALL}")
            print(f"{'#' * config.SEPARATOR_WIDTH}")

            try:
                # Track individual question processing with Langfuse
                with track_question_processing(task_id, question_text) as span:
                    answer = self.agent(question_text, file_name=file_name)
                    # The span may be falsy; only then is the update skipped.
                    if span:
                        # Only the first 300 characters of the answer are recorded.
                        span.update(output={"answer": str(answer)[:300]})

                print(f"\n{Fore.GREEN}[RESULT] Task ID: {task_id}{Style.RESET_ALL}")
                print(f"Question: {question_text[:config.QUESTION_PREVIEW_LENGTH]}{'...' if len(question_text) > config.QUESTION_PREVIEW_LENGTH else ''}")
                print(f"Answer: {answer}")
                results.append((task_id, question_text, answer))
            except Exception as e:
                # One failing question must not abort the batch: record a
                # truncated error message as that question's answer.
                print(f"{Fore.RED}[ERROR] Exception running agent on task {task_id}: {e}{Style.RESET_ALL}")
                error_msg = f"AGENT ERROR: {str(e)[:config.ERROR_MESSAGE_LENGTH]}"
                results.append((task_id, question_text, error_msg))

        return results
|