# Module for OpenAI client management and API call logic
import asyncio
import time
from typing import Optional, TypeVar
from agents import Agent, ModelSettings, Runner, set_default_openai_client
from openai import (
APIConnectionError,
APIStatusError,
AsyncOpenAI,
OpenAIError,
RateLimitError,
)
from pydantic import BaseModel
from ankigen_core.logging import logger
from ankigen_core.utils import ResponseCache
# Type variable for the Pydantic model class a caller requests as structured
# output; the bound restricts it to BaseModel subclasses.
T = TypeVar("T", bound=BaseModel)
class OpenAIClientManager:
    """Own the lifecycle of a single ``AsyncOpenAI`` client instance.

    The client is created by :meth:`initialize_client`, handed out by
    :meth:`get_client`, and released by :meth:`aclose` / :meth:`close` (or by
    leaving an ``async with`` / ``with`` block opened on the manager).
    """

    def __init__(self):
        self._client: Optional[AsyncOpenAI] = None
        self._api_key: Optional[str] = None
        # Close task scheduled by the synchronous ``close()`` while an event
        # loop is running. Kept here because the loop itself only holds weak
        # references to tasks.
        self._pending_close: Optional[asyncio.Task] = None

    async def initialize_client(self, api_key: str):
        """Create the AsyncOpenAI client for ``api_key``.

        Args:
            api_key: OpenAI API key; must be non-empty and start with ``sk-``.

        Raises:
            ValueError: If ``api_key`` is empty or does not start with ``sk-``.
            OpenAIError: Re-raised unchanged if the SDK rejects construction.
            RuntimeError: For any other failure while constructing the client
                (the original exception is attached as ``__cause__``).
        """
        if not api_key or not api_key.startswith("sk-"):
            logger.error("Invalid OpenAI API key provided for client initialization.")
            raise ValueError("Invalid OpenAI API key format.")

        self._api_key = api_key
        try:
            self._client = AsyncOpenAI(api_key=self._api_key)
            logger.info("AsyncOpenAI client initialized successfully.")
        except OpenAIError as e:  # Catch specific OpenAI errors
            logger.error(f"Failed to initialize AsyncOpenAI client: {e}", exc_info=True)
            self._client = None  # Ensure client is None on failure
            raise  # Re-raise the OpenAIError to be caught by UI
        except Exception as e:  # Catch any other unexpected errors
            logger.error(
                f"An unexpected error occurred during AsyncOpenAI client initialization: {e}",
                exc_info=True,
            )
            self._client = None
            raise RuntimeError(
                "Unexpected error initializing AsyncOpenAI client."
            ) from e

    def get_client(self) -> AsyncOpenAI:
        """Return the initialized client.

        Raises:
            RuntimeError: If no client is available, either because
                ``initialize_client`` was never called, it failed, or the
                manager has been closed.
        """
        if self._client is None:
            logger.error(
                "AsyncOpenAI client accessed before initialization or after a failed initialization."
            )
            raise RuntimeError(
                "AsyncOpenAI client is not initialized. Please provide a valid API key."
            )
        return self._client

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - cleanup resources."""
        self.close()
        return False

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit - cleanup resources."""
        await self.aclose()
        return False

    def close(self) -> None:
        """Close the OpenAI client from synchronous code (best effort).

        On the async SDK client ``close()`` is a coroutine function, so simply
        calling it only creates a coroutine object and closes nothing. The
        coroutine is therefore driven to completion here when no event loop is
        running in this thread, or scheduled on the running loop otherwise.
        Prefer :meth:`aclose` whenever the caller is already async.

        Errors are logged as warnings and never raised; the manager always
        ends up without a client.
        """
        client, self._client = self._client, None
        if client is None:
            return
        try:
            close_fn = getattr(client, "close", None)
            outcome = close_fn() if close_fn is not None else None
            if asyncio.iscoroutine(outcome):
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    loop = None
                if loop is None:
                    asyncio.run(outcome)
                else:
                    # Cannot block a running loop from sync code: schedule the
                    # close and keep a reference until it finishes.
                    self._pending_close = loop.create_task(outcome)
                    self._pending_close.add_done_callback(
                        self._on_scheduled_close_done
                    )
            logger.debug("OpenAI client closed")
        except Exception as e:
            logger.warning(f"Error closing OpenAI client: {e}")

    def _on_scheduled_close_done(self, task) -> None:
        """Log a failure of a close scheduled by ``close()`` and drop the reference."""
        if self._pending_close is task:
            self._pending_close = None
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.warning(f"Error closing OpenAI client: {error}")

    async def aclose(self) -> None:
        """Close the OpenAI client asynchronously.

        Uses the client's ``aclose`` when it has one and falls back to
        ``close``; whichever is called, a coroutine result is awaited so the
        underlying connections are actually released. Errors are logged as
        warnings and never raised; the manager always ends up without a client.
        """
        client, self._client = self._client, None
        if client is None:
            return
        try:
            close_fn = getattr(client, "aclose", None) or getattr(
                client, "close", None
            )
            outcome = close_fn() if close_fn is not None else None
            if asyncio.iscoroutine(outcome):
                await outcome
            logger.debug("OpenAI client closed (async)")
        except Exception as e:
            logger.warning(f"Error closing OpenAI client: {e}")
# --- Agents SDK Utility ---
async def structured_agent_call(
    openai_client: AsyncOpenAI,
    model: str,
    instructions: str,
    user_input: str,
    output_type: type[T],
    cache: Optional[ResponseCache] = None,
    cache_key: Optional[str] = None,
    temperature: float = 0.7,
    timeout: float = 120.0,
    retry_attempts: int = 3,
) -> T:
    """
    Make a single-turn structured output call using the agents SDK.

    This is a lightweight wrapper for simple structured output calls,
    not intended for complex multi-agent workflows.

    Args:
        openai_client: AsyncOpenAI client instance
        model: Model name (e.g., "gpt-5.2", "gpt-5.2-chat-latest")
        instructions: System instructions for the agent
        user_input: User prompt/input
        output_type: Pydantic model class for structured output
        cache: Optional ResponseCache instance
        cache_key: Cache key (required if cache is provided; caching is
            skipped when it is missing or empty)
        temperature: Model temperature (default 0.7)
        timeout: Per-attempt timeout in seconds (default 120)
        retry_attempts: Number of attempts (default 3)

    Returns:
        Instance of output_type with the structured response

    Raises:
        asyncio.TimeoutError: If the final attempt timed out.
        Exception: Whatever the final attempt raised, re-raised unchanged.
        RuntimeError: If ``retry_attempts`` is less than 1, so no attempt ran.
    """
    # Compare against None explicitly: a cache object that happens to evaluate
    # falsy (e.g. an empty container-like cache) must still be used.
    use_cache = cache is not None and bool(cache_key)

    # 1. Check cache first
    if use_cache:
        cached = cache.get(cache_key, model)
        if cached is not None:
            logger.info(f"Using cached response for model {model}")
            # Reconstruct Pydantic model from cached dict
            if isinstance(cached, dict):
                return output_type.model_validate(cached)
            return cached

    # 2. Set up the OpenAI client for agents SDK
    set_default_openai_client(openai_client, use_for_tracing=False)

    # 3. Build model settings with GPT-5.x reasoning support
    model_settings_kwargs: dict = {"temperature": temperature}
    # GPT-5.x (not chat-latest) supports reasoning_effort
    if model.startswith("gpt-5") and "chat-latest" not in model:
        from openai.types.shared import Reasoning

        model_settings_kwargs["reasoning"] = Reasoning(effort="none")
    model_settings = ModelSettings(**model_settings_kwargs)

    # 4. Create agent with structured output
    agent = Agent(
        name="structured_output_agent",
        instructions=instructions,
        model=model,
        model_settings=model_settings,
        output_type=output_type,
    )

    # 5. Execute with retry and timeout
    last_error: Optional[Exception] = None
    for attempt in range(retry_attempts):
        try:
            result = await asyncio.wait_for(
                Runner.run(agent, user_input),
                timeout=timeout,
            )
        except Exception as e:
            last_error = e
            timed_out = isinstance(e, asyncio.TimeoutError)
            if attempt >= retry_attempts - 1:
                if timed_out:
                    logger.error(f"Agent timed out after {retry_attempts} attempts")
                else:
                    logger.error(f"Agent failed after {retry_attempts} attempts: {e}")
                raise
            wait_time = 4 * (2**attempt)  # Exponential backoff
            if timed_out:
                logger.warning(
                    f"Agent timed out (attempt {attempt + 1}/{retry_attempts}), "
                    f"retrying in {wait_time}s..."
                )
            else:
                logger.warning(
                    f"Agent failed (attempt {attempt + 1}/{retry_attempts}): {e}, "
                    f"retrying in {wait_time}s..."
                )
            await asyncio.sleep(wait_time)
            continue

        # 6. Extract structured output
        output = result.final_output

        # 7. Cache successful result (as dict for serialization). This runs
        # outside the retry block on purpose: a cache write failure must not
        # repeat the model call or discard a response we already have.
        if use_cache and output is not None:
            try:
                payload = (
                    output.model_dump() if isinstance(output, BaseModel) else output
                )
                cache.set(cache_key, model, payload)
            except Exception as cache_error:
                logger.warning(
                    f"Failed to cache response for model {model}: {cache_error}"
                )

        logger.debug(f"Successfully received response from model {model}")
        return output

    # Only reachable when the loop body never ran (retry_attempts < 1).
    raise RuntimeError(f"Retry loop exited without result: {last_error}")
# Generic schema for arbitrary JSON structured outputs
class GenericJsonOutput(BaseModel):
    """Schema-less output container: declares no fields of its own and accepts any."""

    # "extra": "allow" tells Pydantic to accept fields not declared on the model.
    model_config = {"extra": "allow"}
async def structured_output_completion(
    openai_client: AsyncOpenAI,
    model: str,
    response_format: dict,  # Legacy parameter - kept for API compatibility
    system_prompt: str,
    user_prompt: str,
    cache: ResponseCache,
) -> Optional[dict]:
    """
    Run a structured-output request through the agents SDK and return a dict.

    ``response_format`` is accepted only for backward compatibility and is not
    used. The call is delegated to ``structured_agent_call`` with
    ``GenericJsonOutput`` as the output type; callers that want a typed result
    should call ``structured_agent_call`` directly.

    The cache key is built from the prompts exactly as passed in, while the
    prompts sent to the model are stripped of surrounding whitespace. Any
    exception is logged with its traceback and re-raised.
    """
    # Append the JSON instruction unless the caller's prompt already carries it.
    if "JSON object matching the specified schema" in system_prompt:
        instructions = system_prompt
    else:
        instructions = f"{system_prompt}\nProvide your response as a JSON object matching the specified schema."

    lookup_key = f"{system_prompt}:{user_prompt}"

    try:
        outcome = await structured_agent_call(
            openai_client=openai_client,
            model=model,
            instructions=instructions.strip(),
            user_input=user_prompt.strip(),
            output_type=GenericJsonOutput,
            cache=cache,
            cache_key=lookup_key,
            temperature=0.7,
        )
        # Callers expect a plain dict; anything that is not a Pydantic model
        # is passed through untouched.
        return outcome.model_dump() if isinstance(outcome, BaseModel) else outcome
    except Exception as e:
        logger.error(
            f"structured_output_completion failed for model {model}: {e}",
            exc_info=True,
        )
        raise  # Re-raise unexpected errors
# Specific OpenAI exceptions to retry on.
# NOTE(review): nothing in the visible part of this module references this
# tuple; the retry loop in structured_agent_call retries on any Exception.
# Confirm whether it is used elsewhere.
RETRYABLE_OPENAI_ERRORS = (
    APIConnectionError,
    RateLimitError,
    # NOTE(review): originally annotated "typically for 5xx server errors", but
    # in the openai SDK APIStatusError appears to be the base class for every
    # non-2xx response (4xx included) - confirm that retrying 4xx is intended.
    APIStatusError,
)
# --- New OpenAIRateLimiter Class (Subtask 9.2) ---
class OpenAIRateLimiter:
    """Manages token usage to proactively stay within (estimated) OpenAI rate limits.

    Usage is tracked in fixed windows of ``WINDOW_SECONDS`` measured with
    ``time.monotonic()``. Callers reserve an estimated token count through
    :meth:`wait_if_needed`, which sleeps until the next window when the
    current one cannot fit the request.
    """

    # Length of one accounting window, in seconds.
    WINDOW_SECONDS: float = 60.0

    def __init__(self, tokens_per_minute: int = 60000):  # Default, can be configured
        self.tokens_per_minute_limit: int = tokens_per_minute
        self.tokens_used_current_window: int = 0
        self.current_window_start_time: float = time.monotonic()

    async def wait_if_needed(self, estimated_tokens_for_request: int):
        """Reserve ``estimated_tokens_for_request`` tokens, sleeping first if required.

        The budget is re-evaluated after every sleep instead of being reset
        unconditionally. Several coroutines may be waiting on the same limiter
        at once; whichever wakes first opens the new window, and the others
        then see its reservation rather than wiping it out.

        A request larger than the whole per-window budget can never fit, so it
        is let through as soon as the window is empty rather than waiting
        forever.

        Args:
            estimated_tokens_for_request: Estimated token cost of the request
                about to be sent.
        """
        while True:
            now = time.monotonic()

            # Roll over to a fresh window once the current one has elapsed.
            if now - self.current_window_start_time >= self.WINDOW_SECONDS:
                self.current_window_start_time = now
                self.tokens_used_current_window = 0
                logger.debug("OpenAIRateLimiter: Window reset.")

            projected = self.tokens_used_current_window + estimated_tokens_for_request
            if projected <= self.tokens_per_minute_limit:
                break
            if self.tokens_used_current_window == 0:
                # Nothing else is booked in this window; waiting cannot help.
                logger.warning(
                    "OpenAIRateLimiter: Request estimate "
                    f"({estimated_tokens_for_request}) exceeds the per-window limit "
                    f"({self.tokens_per_minute_limit}); proceeding without waiting."
                )
                break

            time_to_wait = max(
                self.current_window_start_time + self.WINDOW_SECONDS - now, 0.0
            )
            logger.info(
                f"OpenAIRateLimiter: Approaching token limit. Waiting for {time_to_wait:.2f} seconds to reset window."
            )
            await asyncio.sleep(time_to_wait)

        # No await between the check above and this update, so the reservation
        # cannot interleave with another coroutine on the same event loop.
        self.tokens_used_current_window += estimated_tokens_for_request
        logger.debug(
            f"OpenAIRateLimiter: Tokens used in current window: {self.tokens_used_current_window}/{self.tokens_per_minute_limit}"
        )
# Global instance of the rate limiter, created when this module is imported.
# This assumes a single rate limit bucket for all calls from this application instance.
# More sophisticated scenarios might need per-model or per-key limiters.
openai_rate_limiter = OpenAIRateLimiter()  # Using default 60k TPM for now
|