Mirrowel committed on
Commit
dbb8b44
·
1 Parent(s): d96253d

feat: Handle API errors within streams and record usage

Browse files

This commit introduces robust error handling for streaming API responses, allowing the client to detect, record usage for, and retry on failures that occur mid-stream.

Previously, if a provider sent a JSON error object after a stream had started (e.g., for a content filter violation), the client would not recognize it as a retryable failure. This resulted in failed requests not being retried and token usage for the attempt not being recorded.

Changes:
- A new `StreamedAPIError` exception is introduced to signal an API error discovered within a stream.
- The `_safe_streaming_wrapper` in `client.py` now inspects reassembled JSON chunks. If an error object is found, it raises `StreamedAPIError`, propagating the failure to the main retry logic.
- `UsageManager.record_usage` is updated to accept an optional `error_data` payload. This allows for accurate token counting using data from the provider's error response, even on failed requests.

This ensures that the rotator can correctly retry with a different key and maintain accurate usage statistics when encountering mid-stream API errors.

src/rotator_library/client.py CHANGED
@@ -4,6 +4,7 @@ import os
4
  import random
5
  import httpx
6
  import litellm
 
7
  from litellm.litellm_core_utils.token_counter import token_counter
8
  import logging
9
  from typing import List, Dict, Any, AsyncGenerator, Optional, Union
@@ -20,6 +21,12 @@ from .error_handler import classify_error, AllProviders
20
  from .providers import PROVIDER_PLUGINS
21
  from .request_sanitizer import sanitize_request_payload
22
 
 
 
 
 
 
 
23
  class RotatingClient:
24
  """
25
  A client that intelligently rotates and retries API keys using LiteLLM,
@@ -63,9 +70,9 @@ class RotatingClient:
63
  async def _safe_streaming_wrapper(self, stream: Any, key: str, model: str) -> AsyncGenerator[Any, None]:
64
  """
65
  A definitive hybrid wrapper for streaming responses that ensures usage is recorded
66
- and the key lock is released only after the stream is fully consumed.
67
- It gracefully handles JSON decoding errors by buffering and attempting to
68
- reassemble fragmented JSON objects only when an error is detected.
69
  """
70
  usage_recorded = False
71
  stream_completed = False
@@ -104,7 +111,6 @@ class RotatingClient:
104
 
105
  except Exception as e:
106
  # 6. An exception occurred, indicating a potentially malformed or fragmented chunk.
107
- # This is where we enter our robust buffering and reassembly logic.
108
  lib_logger.info(f"Malformed chunk detected for key ...{key[-4:]}. Attempting to buffer and reassemble.")
109
 
110
  try:
@@ -116,32 +122,40 @@ class RotatingClient:
116
  # 6b. Try to parse the entire buffer.
117
  try:
118
  parsed_data = json.loads(json_buffer)
119
- # Success! The buffer now contains a complete JSON object.
120
  lib_logger.info(f"Successfully reassembled JSON from buffer: {json_buffer}")
121
- yield f"data: {json.dumps(parsed_data)}\n\n"
122
- json_buffer = "" # Clear the buffer to start fresh.
 
 
 
 
 
 
 
 
 
 
123
  except json.JSONDecodeError:
124
  # The buffer is still not a complete JSON object.
125
  # We'll continue to the next loop iteration to get more chunks.
126
  lib_logger.info(f"Buffer is still not a complete JSON object. Waiting for more chunks.")
127
  continue
 
 
128
  except Exception as buffer_exc:
129
  # If our own buffering logic fails, log it and reset to prevent getting stuck.
130
  lib_logger.error(f"Error during stream buffering logic: {buffer_exc}. Discarding buffer.")
131
- json_buffer = ""
132
  continue
133
  finally:
134
- # 7. This block ensures that usage is recorded and the key is released,
135
- # no matter how the stream terminates.
136
  if not usage_recorded:
137
  # If usage wasn't found in any chunk, try to get it from the final stream object.
138
  await self.usage_manager.record_success(key, model, stream)
139
- lib_logger.info(f"Recorded usage from final stream object for key ...{key[-4:]}")
140
-
141
  # 8. Release the key so it can be used by other requests.
142
  await self.usage_manager.release_key(key, model)
143
  lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
144
-
145
  # 9. Only send the [DONE] message if the stream completed without being aborted.
146
  if stream_completed:
147
  yield "data: [DONE]\n\n"
@@ -231,9 +245,18 @@ class RotatingClient:
231
  response = await litellm.acompletion(api_key=current_key, **litellm_kwargs)
232
 
233
  if is_streaming:
234
- # The wrapper is now responsible for releasing the key.
235
- key_acquired = False # Transfer responsibility to wrapper
236
- return self._safe_streaming_wrapper(response, current_key, model)
 
 
 
 
 
 
 
 
 
237
  else:
238
  # For non-streaming, record and release here.
239
  await self.usage_manager.record_success(current_key, model, response)
 
4
  import random
5
  import httpx
6
  import litellm
7
+ from litellm.exceptions import APIConnectionError
8
  from litellm.litellm_core_utils.token_counter import token_counter
9
  import logging
10
  from typing import List, Dict, Any, AsyncGenerator, Optional, Union
 
21
  from .providers import PROVIDER_PLUGINS
22
  from .request_sanitizer import sanitize_request_payload
23
 
24
+ class StreamedAPIError(Exception):
25
+ """Custom exception to signal an API error received over a stream."""
26
+ def __init__(self, message, data=None):
27
+ super().__init__(message)
28
+ self.data = data
29
+
30
  class RotatingClient:
31
  """
32
  A client that intelligently rotates and retries API keys using LiteLLM,
 
70
  async def _safe_streaming_wrapper(self, stream: Any, key: str, model: str) -> AsyncGenerator[Any, None]:
71
  """
72
  A definitive hybrid wrapper for streaming responses that ensures usage is recorded
73
+ and the key lock is released only after the stream is fully consumed. It handles
74
+ fragmented JSON by buffering and intelligently distinguishing between content and
75
+ errors, feeding actual errors back into the main retry logic.
76
  """
77
  usage_recorded = False
78
  stream_completed = False
 
111
 
112
  except Exception as e:
113
  # 6. An exception occurred, indicating a potentially malformed or fragmented chunk.
 
114
  lib_logger.info(f"Malformed chunk detected for key ...{key[-4:]}. Attempting to buffer and reassemble.")
115
 
116
  try:
 
122
  # 6b. Try to parse the entire buffer.
123
  try:
124
  parsed_data = json.loads(json_buffer)
125
+ # If successful, we have a complete JSON object.
126
  lib_logger.info(f"Successfully reassembled JSON from buffer: {json_buffer}")
127
+
128
+ # 6c. Inspect the reassembled object to distinguish content from errors.
129
+ if "error" in parsed_data:
130
+ # This is a provider error. Raise it to trigger the retry logic.
131
+ lib_logger.warning(f"Reassembled object is an API error.")
132
+ raise StreamedAPIError("API error received in stream", data=parsed_data)
133
+ else:
134
+ # This is a valid content chunk that was fragmented.
135
+ lib_logger.info("Reassembled object is a valid content chunk.")
136
+ yield f"data: {json.dumps(parsed_data)}\n\n"
137
+
138
+ json_buffer = "" # Clear buffer after successful processing.
139
  except json.JSONDecodeError:
140
  # The buffer is still not a complete JSON object.
141
  # We'll continue to the next loop iteration to get more chunks.
142
  lib_logger.info(f"Buffer is still not a complete JSON object. Waiting for more chunks.")
143
  continue
144
+ except StreamedAPIError:
145
+ raise # Re-raise the error to be caught by acompletion
146
  except Exception as buffer_exc:
147
  # If our own buffering logic fails, log it and reset to prevent getting stuck.
148
  lib_logger.error(f"Error during stream buffering logic: {buffer_exc}. Discarding buffer.")
149
+ json_buffer = "" # Reset buffer on error
150
  continue
151
  finally:
152
+ # 7. This finally block runs on every exit path — including when StreamedAPIError propagates — ensuring usage is recorded and the key lock released.
 
153
  if not usage_recorded:
154
  # If usage wasn't found in any chunk, try to get it from the final stream object.
155
  await self.usage_manager.record_success(key, model, stream)
 
 
156
  # 8. Release the key so it can be used by other requests.
157
  await self.usage_manager.release_key(key, model)
158
  lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
 
159
  # 9. Only send the [DONE] message if the stream completed without being aborted.
160
  if stream_completed:
161
  yield "data: [DONE]\n\n"
 
245
  response = await litellm.acompletion(api_key=current_key, **litellm_kwargs)
246
 
247
  if is_streaming:
248
+ # For streaming, the wrapper takes responsibility for the key.
249
+ key_acquired = False
250
+ try:
251
+ # The wrapper will yield chunks and handle reassembly.
252
+ # It raises StreamedAPIError if it reassembles a provider error.
253
+ return self._safe_streaming_wrapper(response, current_key, model)
254
+ except StreamedAPIError as e:
255
+ # The wrapper has detected a streamed error.
256
+ # We now translate it into a litellm exception that the main
257
+ # retry loop can classify and handle correctly.
258
+ lib_logger.error(f"Streamed error caught, triggering retry logic: {e.data}")
259
+ raise APIConnectionError(f"Streamed API Error: {e.data.get('error', {}).get('message', 'Unknown')}")
260
  else:
261
  # For non-streaming, record and release here.
262
  await self.usage_manager.record_success(current_key, model, response)
src/rotator_library/usage_manager.py CHANGED
@@ -245,7 +245,7 @@ class UsageManager:
245
  usage = completion_response.usage
246
  daily_model_data["prompt_tokens"] += usage.prompt_tokens
247
  daily_model_data["completion_tokens"] += usage.completion_tokens
248
-
249
  try:
250
  cost = litellm.completion_cost(completion_response=completion_response)
251
  daily_model_data["approx_cost"] += cost
 
245
  usage = completion_response.usage
246
  daily_model_data["prompt_tokens"] += usage.prompt_tokens
247
  daily_model_data["completion_tokens"] += usage.completion_tokens
248
+ lib_logger.info(f"Recorded usage from final stream object for key ...{key[-4:]}")
249
  try:
250
  cost = litellm.completion_cost(completion_response=completion_response)
251
  daily_model_data["approx_cost"] += cost