# Base agent wrapper and configuration classes from typing import Dict, Any, Optional, List from dataclasses import dataclass from pydantic import BaseModel import asyncio import json from openai import AsyncOpenAI from agents import Agent, Runner, ModelSettings from ankigen_core.logging import logger from .token_tracker import track_usage_from_agents_sdk def parse_agent_json_response(response: Any) -> Dict[str, Any]: """Parse agent response, handling markdown code blocks if present""" if isinstance(response, str): # Strip markdown code blocks response = response.strip() if response.startswith("```json"): response = response[7:] # Remove ```json if response.startswith("```"): response = response[3:] # Remove ``` if response.endswith("```"): response = response[:-3] # Remove trailing ``` response = response.strip() return json.loads(response) else: return response @dataclass class AgentConfig: """Configuration for individual agents""" name: str instructions: str model: str = "gpt-5.2" reasoning_effort: Optional[str] = None temperature: float = 0.7 max_tokens: Optional[int] = None timeout: float = 30.0 retry_attempts: int = 3 enable_tracing: bool = True custom_prompts: Optional[Dict[str, str]] = None output_type: Optional[type] = None # For structured outputs def __post_init__(self): if self.custom_prompts is None: self.custom_prompts = {} class BaseAgentWrapper: """Base wrapper for OpenAI Agents SDK integration""" def __init__(self, config: AgentConfig, openai_client: AsyncOpenAI): self.config = config self.openai_client = openai_client self.agent = None self.runner = None async def initialize(self): """Initialize the OpenAI agent with structured output support""" try: # Set the default OpenAI client for the agents SDK from agents import set_default_openai_client set_default_openai_client(self.openai_client, use_for_tracing=False) # Create model settings with temperature and optional reasoning effort model_settings_kwargs = {"temperature": self.config.temperature} effort = self.config.reasoning_effort if effort in ("auto", "", None): effort = None # GPT-5.x (not chat-latest) supports reasoning_effort if ( effort and self.config.model.startswith("gpt-5") and "chat-latest" not in self.config.model ): from openai.types.shared import Reasoning model_settings_kwargs["reasoning"] = Reasoning(effort=effort) model_settings = ModelSettings(**model_settings_kwargs) # Use clean instructions without JSON formatting hacks clean_instructions = self.config.instructions # Create agent with structured output if output_type is provided if self.config.output_type: self.agent = Agent( name=self.config.name, instructions=clean_instructions, model=self.config.model, model_settings=model_settings, output_type=self.config.output_type, ) logger.info( f"Initialized agent with structured output: {self.config.name} -> {self.config.output_type}" ) else: self.agent = Agent( name=self.config.name, instructions=clean_instructions, model=self.config.model, model_settings=model_settings, ) logger.info( f"Initialized agent (no structured output): {self.config.name}" ) except Exception as e: logger.error(f"Failed to initialize agent {self.config.name}: {e}") raise def _enhance_input_with_context( self, user_input: str, context: Optional[Dict[str, Any]] ) -> str: """Add context to user input if provided.""" if context is None: return user_input context_str = "\n".join([f"{k}: {v}" for k, v in context.items()]) return f"{user_input}\n\nContext:\n{context_str}" async def _execute_with_retry(self, enhanced_input: str) -> Any: """Execute agent with retry logic on timeout.""" for attempt in range(self.config.retry_attempts): try: result = await asyncio.wait_for( Runner.run( starting_agent=self.agent, input=enhanced_input, ), timeout=self.config.timeout, ) return result except asyncio.TimeoutError: if attempt < self.config.retry_attempts - 1: logger.warning( f"Agent {self.config.name} timed out " f"(attempt {attempt + 1}/{self.config.retry_attempts}), retrying..." ) continue logger.error( f"Agent {self.config.name} timed out after {self.config.retry_attempts} attempts" ) raise raise RuntimeError("Retry loop exited without result") def _extract_and_track_usage(self, result: Any) -> Dict[str, Any]: """Extract usage info from result and track it.""" total_usage = { "input_tokens": 0, "output_tokens": 0, "total_tokens": 0, "requests": 0, } if hasattr(result, "raw_responses") and result.raw_responses: for response in result.raw_responses: if hasattr(response, "usage") and response.usage: total_usage["input_tokens"] += response.usage.input_tokens total_usage["output_tokens"] += response.usage.output_tokens total_usage["total_tokens"] += response.usage.total_tokens total_usage["requests"] += response.usage.requests track_usage_from_agents_sdk(total_usage, self.config.model) logger.info(f"Agent usage: {total_usage}") return total_usage def _extract_output(self, result: Any) -> Any: """Extract final output from agent result.""" if not (hasattr(result, "new_items") and result.new_items): return str(result) from agents.items import ItemHelpers text_output = ItemHelpers.text_message_outputs(result.new_items) if self.config.output_type and self.config.output_type is not str: logger.info( f"Structured output: {type(text_output)} -> {self.config.output_type}" ) return text_output async def execute( self, user_input: str, context: Optional[Dict[str, Any]] = None ) -> tuple[Any, Dict[str, Any]]: """Execute the agent with user input and optional context.""" if not self.agent: await self.initialize() if self.agent is None: raise ValueError("Agent not initialized") enhanced_input = self._enhance_input_with_context(user_input, context) logger.info(f"Executing agent: {self.config.name}") logger.info(f"Input: {enhanced_input[:200]}...") import time start_time = time.time() try: result = await self._execute_with_retry(enhanced_input) execution_time = time.time() - start_time logger.info(f"Agent {self.config.name} executed in {execution_time:.2f}s") total_usage = self._extract_and_track_usage(result) output = self._extract_output(result) return output, total_usage except asyncio.TimeoutError: logger.error( f"Agent {self.config.name} timed out after {self.config.timeout}s" ) raise except Exception as e: logger.error(f"Agent {self.config.name} execution failed: {e}") raise async def handoff_to( self, target_agent: "BaseAgentWrapper", context: Dict[str, Any] ) -> Any: """Hand off execution to another agent with context""" logger.info( f"Handing off from {self.config.name} to {target_agent.config.name}" ) # Prepare handoff context handoff_context = { "from_agent": self.config.name, "handoff_reason": context.get("reason", "Standard workflow handoff"), **context, } # Execute the target agent return await target_agent.execute( context.get("user_input", "Continue processing"), handoff_context ) class AgentResponse(BaseModel): """Standard response format for agents""" success: bool data: Any agent_name: str metadata: Dict[str, Any] = {} errors: List[str] = []