Mirrowel committed on
Commit
b079814
·
1 Parent(s): f835300

feat(client): implement robust streaming error handling

Browse files

- Consolidates internal error handling in `_safe_streaming_wrapper` to always raise `StreamedAPIError` for critical `litellm` exceptions and parsed API errors. This ensures the main retry loop consistently handles all API errors, rather than some being yielded directly.
- Implements comprehensive processing for `StreamedAPIError` (and other `litellm` exceptions) within the `_stream_request` loop.
- Introduces distinct logic to detect and handle fatal quota errors from providers (e.g., `resource_exhausted`):
- Parses detailed quota information from provider error messages.
- Yields a specific, client-friendly `proxy_quota_error` message.
- Gracefully terminates the stream with `data: [DONE]\n\n`, preventing further retries on an exhausted key.
- For recoverable errors (e.g., general rate limits), yields a `proxy_key_rotation_error` message to the client and logs key rotation. Applies a global cooldown for IP-based rate limits.
- Adds logging for stream connection establishment for better visibility.

BREAKING CHANGE: The format and behavior of in-stream error messages have changed. Previously, some provider errors were yielded directly within the stream with potentially varied JSON structures. Now, all API errors are processed internally and yielded as a structured `{"error": {...}}` object, consistently typed as `proxy_key_rotation_error` for recoverable issues or `proxy_quota_error` for fatal quota errors. For fatal quota errors, the stream will now explicitly terminate with `data: [DONE]\n\n` instead of attempting further retries on an exhausted key.

Files changed (1) hide show
  1. src/rotator_library/client.py +75 -25
src/rotator_library/client.py CHANGED
@@ -207,6 +207,13 @@ class RotatingClient:
207
  lib_logger.debug(f"Stream ended with incomplete data in buffer: {json_buffer}")
208
  break
209
 
 
 
 
 
 
 
 
210
  except Exception as e:
211
  try:
212
  raw_chunk = ""
@@ -234,15 +241,8 @@ class RotatingClient:
234
  # If parsing succeeds, we have the complete object.
235
  lib_logger.debug(f"Successfully reassembled JSON from stream: {json_buffer}")
236
 
237
- if "error" in parsed_data:
238
- # Yield the error to the client, then raise internally to trigger rotation.
239
- yield f"data: {json.dumps(parsed_data)}\n\n"
240
- raise StreamedAPIError("Provider error received in stream", data=parsed_data)
241
- else:
242
- # This is an unexpected case, but we handle it by yielding the data.
243
- yield f"data: {json.dumps(parsed_data)}\n\n"
244
-
245
- json_buffer = "" # Clear the buffer on success
246
 
247
  except json.JSONDecodeError:
248
  # This is the expected outcome if the JSON in the buffer is not yet complete.
@@ -499,6 +499,8 @@ class RotatingClient:
499
  logger_fn=self._litellm_logger_callback
500
  )
501
 
 
 
502
  key_acquired = False
503
  stream_generator = self._safe_streaming_wrapper(response, current_key, model, request)
504
 
@@ -511,27 +513,75 @@ class RotatingClient:
511
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_headers=dict(request.headers) if request else {})
512
  classified_error = classify_error(e)
513
 
514
- # Inform the client about the temporary failure before rotating.
515
- # This keeps the connection alive.
516
- error_message = f"Provider API key failed with {classified_error.error_type}. Rotating to a new key."
517
- error_data = {
518
- "error": {
519
- "message": error_message,
520
- "type": "proxy_key_rotation_error",
521
- "code": classified_error.status_code
522
- }
523
- }
524
- yield f"data: {json.dumps(error_data)}\n\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
525
 
526
- lib_logger.warning(f"Key ...{current_key[-4:]} hit rate limit during stream for model {model}. Reason: '{str(e).split('\n')[0]}'. Rotating key.")
527
 
 
528
  if classified_error.error_type == 'rate_limit' and classified_error.status_code == 429:
529
  cooldown_duration = classified_error.retry_after or 60
530
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
531
  lib_logger.warning(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
532
 
533
  await self.usage_manager.record_failure(current_key, model, classified_error)
534
- break
535
 
536
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
537
  last_exception = e
@@ -650,7 +700,7 @@ class RotatingClient:
650
  """Returns a list of available models for a specific provider, with caching."""
651
  lib_logger.info(f"Getting available models for provider: {provider}")
652
  if provider in self._model_list_cache:
653
- lib_logger.info(f"Returning cached models for provider: {provider}")
654
  return self._model_list_cache[provider]
655
 
656
  keys_for_provider = self.api_keys.get(provider)
@@ -666,14 +716,14 @@ class RotatingClient:
666
  if provider_instance:
667
  for api_key in shuffled_keys:
668
  try:
669
- lib_logger.info(f"Attempting to get models for {provider} with key ...{api_key[-4:]}")
670
  models = await provider_instance.get_models(api_key, self.http_client)
671
  lib_logger.info(f"Got {len(models)} models for provider: {provider}")
672
  self._model_list_cache[provider] = models
673
  return models
674
  except Exception as e:
675
  classified_error = classify_error(e)
676
- lib_logger.warning(f"Failed to get models for provider {provider} with key ...{api_key[-4:]}: {classified_error.error_type}. Trying next key.")
677
  continue # Try the next key
678
 
679
  lib_logger.error(f"Failed to get models for provider {provider} after trying all keys.")
 
207
  lib_logger.debug(f"Stream ended with incomplete data in buffer: {json_buffer}")
208
  break
209
 
210
+ except (litellm.RateLimitError, litellm.ServiceUnavailableError, litellm.InternalServerError, APIConnectionError) as e:
211
+ # This is a critical, typed error from litellm that signals a key failure.
212
+ # We do not try to parse it here. We wrap it and raise it immediately
213
+ # for the outer retry loop to handle.
214
+ lib_logger.warning(f"Caught a critical API error mid-stream: {type(e).__name__}. Signaling for key rotation.")
215
+ raise StreamedAPIError("Provider error received in stream", data=e)
216
+
217
  except Exception as e:
218
  try:
219
  raw_chunk = ""
 
241
  # If parsing succeeds, we have the complete object.
242
  lib_logger.debug(f"Successfully reassembled JSON from stream: {json_buffer}")
243
 
244
+ # Wrap the complete error object and raise it. The outer function will decide how to handle it.
245
+ raise StreamedAPIError("Provider error received in stream", data=parsed_data)
 
 
 
 
 
 
 
246
 
247
  except json.JSONDecodeError:
248
  # This is the expected outcome if the JSON in the buffer is not yet complete.
 
499
  logger_fn=self._litellm_logger_callback
500
  )
501
 
502
+ lib_logger.info(f"Stream connection established for key ...{current_key[-4:]}. Processing response.")
503
+
504
  key_acquired = False
505
  stream_generator = self._safe_streaming_wrapper(response, current_key, model, request)
506
 
 
513
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_headers=dict(request.headers) if request else {})
514
  classified_error = classify_error(e)
515
 
516
+ # This is the final, robust handler for streamed errors.
517
+ error_payload = {}
518
+ # The actual exception might be wrapped in our StreamedAPIError.
519
+ original_exc = getattr(e, 'data', e)
520
+
521
+ try:
522
+ # The full error JSON is in the string representation of the exception.
523
+ json_str_match = re.search(r'(\{.*\})', str(original_exc), re.DOTALL)
524
+ if json_str_match:
525
+ # The string may contain byte-escaped characters (e.g., \\n).
526
+ cleaned_str = codecs.decode(json_str_match.group(1), 'unicode_escape')
527
+ error_payload = json.loads(cleaned_str)
528
+ except (json.JSONDecodeError, TypeError):
529
+ lib_logger.warning("Could not parse JSON details from streamed error exception.")
530
+ error_payload = {}
531
+
532
+ error_details = error_payload.get("error", {})
533
+ error_status = error_details.get("status", "")
534
+ # Fallback to the full string if parsing fails.
535
+ error_message_text = error_details.get("message", str(original_exc))
536
+
537
+ if "quota" in error_message_text.lower() or "resource_exhausted" in error_status.lower():
538
+ # This is a fatal quota error. Terminate the stream with a clear message.
539
+ quota_value = "N/A"
540
+ quota_id = "N/A"
541
+ if "details" in error_details and isinstance(error_details.get("details"), list):
542
+ for detail in error_details["details"]:
543
+ if isinstance(detail.get("violations"), list):
544
+ for violation in detail["violations"]:
545
+ if "quotaValue" in violation:
546
+ quota_value = violation["quotaValue"]
547
+ if "quotaId" in violation:
548
+ quota_id = violation["quotaId"]
549
+ if quota_value != "N/A" and quota_id != "N/A":
550
+ break
551
+
552
+ # 1. Detailed message for the end client
553
+ client_error_message = (
554
+ f"FATAL: You have exceeded your API quota. "
555
+ f"Message: '{error_message_text}'. "
556
+ f"Limit: {quota_value} (Quota ID: {quota_id})."
557
+ )
558
+
559
+ # 2. Concise message for the console log
560
+ console_log_message = (
561
+ f"Terminating stream for key ...{current_key[-4:]} due to fatal quota error. "
562
+ f"ID: {quota_id}, Limit: {quota_value}."
563
+ )
564
+ lib_logger.warning(console_log_message)
565
+
566
+ # 3. Yield the detailed message to the client and terminate
567
+ yield f"data: {json.dumps({'error': {'message': client_error_message, 'type': 'proxy_quota_error'}})}\n\n"
568
+ yield "data: [DONE]\n\n"
569
+ return # Exit the generator completely.
570
+
571
+ # --- NON-QUOTA ERROR: Fallback to key rotation ---
572
+ rotation_error_message = f"Provider API key failed with {classified_error.error_type}. Rotating to a new key."
573
+ yield f"data: {json.dumps({'error': {'message': rotation_error_message, 'type': 'proxy_key_rotation_error', 'code': classified_error.status_code}})}\n\n"
574
 
575
+ lib_logger.warning(f"Key ...{current_key[-4:]} encountered a recoverable error during stream for model {model}. Rotating key.")
576
 
577
+ # Only apply global cooldown for non-quota 429s.
578
  if classified_error.error_type == 'rate_limit' and classified_error.status_code == 429:
579
  cooldown_duration = classified_error.retry_after or 60
580
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
581
  lib_logger.warning(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
582
 
583
  await self.usage_manager.record_failure(current_key, model, classified_error)
584
+ break # Break to try the next key
585
 
586
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
587
  last_exception = e
 
700
  """Returns a list of available models for a specific provider, with caching."""
701
  lib_logger.info(f"Getting available models for provider: {provider}")
702
  if provider in self._model_list_cache:
703
+ lib_logger.debug(f"Returning cached models for provider: {provider}")
704
  return self._model_list_cache[provider]
705
 
706
  keys_for_provider = self.api_keys.get(provider)
 
716
  if provider_instance:
717
  for api_key in shuffled_keys:
718
  try:
719
+ lib_logger.debug(f"Attempting to get models for {provider} with key ...{api_key[-4:]}")
720
  models = await provider_instance.get_models(api_key, self.http_client)
721
  lib_logger.info(f"Got {len(models)} models for provider: {provider}")
722
  self._model_list_cache[provider] = models
723
  return models
724
  except Exception as e:
725
  classified_error = classify_error(e)
726
+ lib_logger.debug(f"Failed to get models for provider {provider} with key ...{api_key[-4:]}: {classified_error.error_type}. Trying next key.")
727
  continue # Try the next key
728
 
729
  lib_logger.error(f"Failed to get models for provider {provider} after trying all keys.")