File size: 12,601 Bytes
d09f6aa
 
9b095d8
 
7e2bb59
9b095d8
 
d09f6aa
9b095d8
 
100024e
d09f6aa
9b095d8
 
 
d09f6aa
9b095d8
 
d09f6aa
9b095d8
d09f6aa
 
 
100024e
d09f6aa
 
100024e
 
d09f6aa
100024e
 
d09f6aa
 
 
 
 
100024e
 
d09f6aa
100024e
d09f6aa
 
 
 
100024e
d09f6aa
 
 
100024e
d09f6aa
100024e
 
d09f6aa
 
100024e
d09f6aa
 
100024e
d09f6aa
 
 
93bd7fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d09f6aa
9b095d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c467eef
9b095d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c467eef
9b095d8
 
c467eef
9b095d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100024e
9b095d8
d09f6aa
9b095d8
d09f6aa
 
9b095d8
 
 
 
d09f6aa
9b095d8
 
 
 
d09f6aa
9b095d8
 
 
 
d09f6aa
9b095d8
 
 
d09f6aa
9b095d8
 
 
 
 
 
d09f6aa
 
9b095d8
 
 
d09f6aa
 
 
 
9b095d8
d09f6aa
 
 
100024e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# Module for OpenAI client management and API call logic

import asyncio
import time
from typing import Optional, TypeVar

from agents import Agent, ModelSettings, Runner, set_default_openai_client
from openai import (
    APIConnectionError,
    APIStatusError,
    AsyncOpenAI,
    OpenAIError,
    RateLimitError,
)
from pydantic import BaseModel

from ankigen_core.logging import logger
from ankigen_core.utils import ResponseCache

T = TypeVar("T", bound=BaseModel)


class OpenAIClientManager:
    """Manages the AsyncOpenAI client instance."""

    def __init__(self):
        self._client: Optional[AsyncOpenAI] = None
        self._api_key: Optional[str] = None

    async def initialize_client(self, api_key: str):
        """Initializes the AsyncOpenAI client with the given API key."""
        if not api_key or not api_key.startswith("sk-"):
            logger.error("Invalid OpenAI API key provided for client initialization.")
            raise ValueError("Invalid OpenAI API key format.")
        self._api_key = api_key
        try:
            self._client = AsyncOpenAI(api_key=self._api_key)
            logger.info("AsyncOpenAI client initialized successfully.")
        except OpenAIError as e:  # Catch specific OpenAI errors
            logger.error(f"Failed to initialize AsyncOpenAI client: {e}", exc_info=True)
            self._client = None  # Ensure client is None on failure
            raise  # Re-raise the OpenAIError to be caught by UI
        except Exception as e:  # Catch any other unexpected errors
            logger.error(
                f"An unexpected error occurred during AsyncOpenAI client initialization: {e}",
                exc_info=True,
            )
            self._client = None
            raise RuntimeError("Unexpected error initializing AsyncOpenAI client.")

    def get_client(self) -> AsyncOpenAI:
        """Returns the initialized AsyncOpenAI client. Raises error if not initialized."""
        if self._client is None:
            logger.error(
                "AsyncOpenAI client accessed before initialization or after a failed initialization."
            )
            raise RuntimeError(
                "AsyncOpenAI client is not initialized. Please provide a valid API key."
            )
        return self._client

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - cleanup resources."""
        self.close()
        return False

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit - cleanup resources."""
        await self.aclose()
        return False

    def close(self) -> None:
        """Close the OpenAI client synchronously."""
        if self._client:
            try:
                # OpenAI client has a close method for cleanup
                if hasattr(self._client, "close"):
                    self._client.close()
                logger.debug("OpenAI client closed")
            except Exception as e:
                logger.warning(f"Error closing OpenAI client: {e}")
            finally:
                self._client = None

    async def aclose(self) -> None:
        """Close the OpenAI client asynchronously."""
        if self._client:
            try:
                # OpenAI async client has an aclose method
                if hasattr(self._client, "aclose"):
                    await self._client.aclose()
                elif hasattr(self._client, "close"):
                    self._client.close()
                logger.debug("OpenAI client closed (async)")
            except Exception as e:
                logger.warning(f"Error closing OpenAI client: {e}")
            finally:
                self._client = None


# --- Agents SDK Utility ---


async def structured_agent_call(
    openai_client: AsyncOpenAI,
    model: str,
    instructions: str,
    user_input: str,
    output_type: type[T],
    cache: Optional[ResponseCache] = None,
    cache_key: Optional[str] = None,
    temperature: float = 0.7,
    timeout: float = 120.0,
    retry_attempts: int = 3,
) -> T:
    """
    Make a single-turn structured output call using the agents SDK.

    This is a lightweight wrapper for simple structured output calls,
    not intended for complex multi-agent workflows.

    Args:
        openai_client: AsyncOpenAI client instance
        model: Model name (e.g., "gpt-5.2", "gpt-5.2-chat-latest")
        instructions: System instructions for the agent
        user_input: User prompt/input
        output_type: Pydantic model class for structured output
        cache: Optional ResponseCache instance
        cache_key: Cache key (required if cache is provided)
        temperature: Model temperature (default 0.7)
        timeout: Request timeout in seconds (default 120)
        retry_attempts: Number of retry attempts (default 3)

    Returns:
        Instance of output_type with the structured response
    """
    # 1. Check cache first
    if cache and cache_key:
        cached = cache.get(cache_key, model)
        if cached is not None:
            logger.info(f"Using cached response for model {model}")
            # Reconstruct Pydantic model from cached dict
            if isinstance(cached, dict):
                return output_type.model_validate(cached)
            return cached

    # 2. Set up the OpenAI client for agents SDK
    set_default_openai_client(openai_client, use_for_tracing=False)

    # 3. Build model settings with GPT-5.x reasoning support
    model_settings_kwargs: dict = {"temperature": temperature}

    # GPT-5.x (not chat-latest) supports reasoning_effort
    if model.startswith("gpt-5") and "chat-latest" not in model:
        from openai.types.shared import Reasoning

        model_settings_kwargs["reasoning"] = Reasoning(effort="none")

    model_settings = ModelSettings(**model_settings_kwargs)

    # 4. Create agent with structured output
    agent = Agent(
        name="structured_output_agent",
        instructions=instructions,
        model=model,
        model_settings=model_settings,
        output_type=output_type,
    )

    # 5. Execute with retry and timeout
    last_error: Optional[Exception] = None
    for attempt in range(retry_attempts):
        try:
            result = await asyncio.wait_for(
                Runner.run(agent, user_input),
                timeout=timeout,
            )

            # 6. Extract structured output
            output = result.final_output

            # 7. Cache successful result (as dict for serialization)
            if cache and cache_key and output is not None:
                if isinstance(output, BaseModel):
                    cache.set(cache_key, model, output.model_dump())
                else:
                    cache.set(cache_key, model, output)

            logger.debug(f"Successfully received response from model {model}")
            return output

        except asyncio.TimeoutError as e:
            last_error = e
            if attempt < retry_attempts - 1:
                wait_time = 4 * (2**attempt)  # Exponential backoff
                logger.warning(
                    f"Agent timed out (attempt {attempt + 1}/{retry_attempts}), "
                    f"retrying in {wait_time}s..."
                )
                await asyncio.sleep(wait_time)
                continue
            logger.error(f"Agent timed out after {retry_attempts} attempts")
            raise
        except Exception as e:
            last_error = e
            if attempt < retry_attempts - 1:
                wait_time = 4 * (2**attempt)
                logger.warning(
                    f"Agent failed (attempt {attempt + 1}/{retry_attempts}): {e}, "
                    f"retrying in {wait_time}s..."
                )
                await asyncio.sleep(wait_time)
                continue
            logger.error(f"Agent failed after {retry_attempts} attempts: {e}")
            raise

    raise RuntimeError(f"Retry loop exited without result: {last_error}")


# Generic schema for arbitrary JSON structured outputs
class GenericJsonOutput(BaseModel):
    """Generic container for JSON output - allows any structure."""

    model_config = {"extra": "allow"}  # Allow arbitrary fields


async def structured_output_completion(
    openai_client: AsyncOpenAI,
    model: str,
    response_format: dict,  # Legacy parameter - kept for API compatibility
    system_prompt: str,
    user_prompt: str,
    cache: ResponseCache,
) -> Optional[dict]:
    """
    Makes an API call with structured output using agents SDK.

    Note: response_format parameter is ignored - the agents SDK handles
    JSON parsing automatically. For typed outputs, use structured_agent_call() directly.
    """
    cache_key = f"{system_prompt}:{user_prompt}"

    # Ensure system_prompt includes JSON instruction
    effective_system_prompt = system_prompt
    if "JSON object matching the specified schema" not in system_prompt:
        effective_system_prompt = f"{system_prompt}\nProvide your response as a JSON object matching the specified schema."

    try:
        result = await structured_agent_call(
            openai_client=openai_client,
            model=model,
            instructions=effective_system_prompt.strip(),
            user_input=user_prompt.strip(),
            output_type=GenericJsonOutput,
            cache=cache,
            cache_key=cache_key,
            temperature=0.7,
        )

        # Convert Pydantic model back to dict for backward compatibility
        if isinstance(result, BaseModel):
            return result.model_dump()
        return result

    except Exception as e:
        logger.error(
            f"structured_output_completion failed for model {model}: {e}",
            exc_info=True,
        )
        raise  # Re-raise unexpected errors


# Specific OpenAI exceptions to retry on
RETRYABLE_OPENAI_ERRORS = (
    APIConnectionError,
    RateLimitError,
    APIStatusError,  # Typically for 5xx server errors
)

# --- New OpenAIRateLimiter Class (Subtask 9.2) ---


class OpenAIRateLimiter:
    """Manages token usage to proactively stay within (estimated) OpenAI rate limits."""

    def __init__(self, tokens_per_minute: int = 60000):  # Default, can be configured
        self.tokens_per_minute_limit: int = tokens_per_minute
        self.tokens_used_current_window: int = 0
        self.current_window_start_time: float = time.monotonic()

    async def wait_if_needed(self, estimated_tokens_for_request: int):
        """Waits if adding the estimated tokens would exceed the rate limit for the current window."""
        current_time = time.monotonic()

        # Check if the 60-second window has passed
        if current_time - self.current_window_start_time >= 60.0:
            # Reset window and token count
            self.current_window_start_time = current_time
            self.tokens_used_current_window = 0
            logger.debug("OpenAIRateLimiter: Window reset.")

        # Check if the request would exceed the limit in the current window
        if (
            self.tokens_used_current_window + estimated_tokens_for_request
            > self.tokens_per_minute_limit
        ):
            time_to_wait = (self.current_window_start_time + 60.0) - current_time
            if time_to_wait > 0:
                logger.info(
                    f"OpenAIRateLimiter: Approaching token limit. Waiting for {time_to_wait:.2f} seconds to reset window."
                )
                await asyncio.sleep(time_to_wait)
            # After waiting for the window to reset, reset counters
            self.current_window_start_time = time.monotonic()  # New window starts now
            self.tokens_used_current_window = 0
            logger.debug("OpenAIRateLimiter: Window reset after waiting.")

        # If we are here, it's safe to proceed (or we've waited and reset)
        # Add tokens for the current request
        self.tokens_used_current_window += estimated_tokens_for_request
        logger.debug(
            f"OpenAIRateLimiter: Tokens used in current window: {self.tokens_used_current_window}/{self.tokens_per_minute_limit}"
        )


# Global instance of the rate limiter
# This assumes a single rate limit bucket for all calls from this application instance.
# More sophisticated scenarios might need per-model or per-key limiters.
openai_rate_limiter = OpenAIRateLimiter()  # Using default 60k TPM for now