Mirrowel commited on
Commit
856f4f3
·
1 Parent(s): aaab6f8

feat(client): enhance streaming error handling and log clarity

Browse files

- Introduce a mechanism to track consecutive quota failures within the streaming client. This allows for distinguishing between transient quota hits and persistent, input-related quota exhaustion.
- Terminate the stream with a `proxy_fatal_quota_error` message to the client only after 3 consecutive quota failures, signaling a persistent issue (e.g., input data too large).
- Refactor key rotation behavior to be largely silent for transient or recoverable errors (including single quota errors or server errors). Intermediate error messages are no longer sent to the client on each key switch, improving client experience by reducing noise.
- Integrate `colorlog` into the proxy application's console handler for improved readability and visual distinction of log levels.
- Update `rotating-api-key-client` version to `0.8` reflecting these significant updates.

BREAKING CHANGE: The streaming client's error handling and client-facing error messages have changed. Intermediate `proxy_key_rotation_error` messages previously yielded during key rotation for transient failures are no longer sent. The `proxy_quota_error` type has been replaced by `proxy_fatal_quota_error`, which is now only emitted after 3 consecutive quota failures instead of immediately. Clients consuming the streaming API should adapt to these changes in error message types and timing.

requirements.txt CHANGED
@@ -14,3 +14,5 @@ litellm
14
  filelock
15
  httpx
16
  aiofiles
 
 
 
14
  filelock
15
  httpx
16
  aiofiles
17
+
18
+ colorlog
src/proxy_app/main.py CHANGED
@@ -8,6 +8,7 @@ from fastapi.responses import StreamingResponse
8
  from fastapi.security import APIKeyHeader
9
  from dotenv import load_dotenv
10
  import logging
 
11
  from pathlib import Path
12
  import sys
13
  import json
@@ -60,10 +61,20 @@ class RotatorDebugFilter(logging.Filter):
60
  return record.levelno == logging.DEBUG and record.name.startswith('rotator_library')
61
  debug_file_handler.addFilter(RotatorDebugFilter())
62
 
63
- # Configure a console handler for concise, high-level info
64
- console_handler = logging.StreamHandler(sys.stdout)
65
  console_handler.setLevel(logging.INFO)
66
- console_handler.setFormatter(logging.Formatter('%(message)s'))
 
 
 
 
 
 
 
 
 
 
67
 
68
  # Add a filter to prevent any LiteLLM logs from cluttering the console
69
  class NoLiteLLMLogFilter(logging.Filter):
 
8
  from fastapi.security import APIKeyHeader
9
  from dotenv import load_dotenv
10
  import logging
11
+ import colorlog
12
  from pathlib import Path
13
  import sys
14
  import json
 
61
  return record.levelno == logging.DEBUG and record.name.startswith('rotator_library')
62
  debug_file_handler.addFilter(RotatorDebugFilter())
63
 
64
+ # Configure a console handler with color
65
+ console_handler = colorlog.StreamHandler(sys.stdout)
66
  console_handler.setLevel(logging.INFO)
67
+ formatter = colorlog.ColoredFormatter(
68
+ '%(log_color)s%(message)s',
69
+ log_colors={
70
+ 'DEBUG': 'cyan',
71
+ 'INFO': 'green',
72
+ 'WARNING': 'yellow',
73
+ 'ERROR': 'red',
74
+ 'CRITICAL': 'red,bg_white',
75
+ }
76
+ )
77
+ console_handler.setFormatter(formatter)
78
 
79
  # Add a filter to prevent any LiteLLM logs from cluttering the console
80
  class NoLiteLLMLogFilter(logging.Filter):
src/rotator_library/client.py CHANGED
@@ -454,6 +454,9 @@ class RotatingClient:
454
  tried_keys = set()
455
  last_exception = None
456
  kwargs = self._convert_model_params(**kwargs)
 
 
 
457
  try:
458
  while len(tried_keys) < len(keys_for_provider) and time.time() < deadline:
459
  current_key = None
@@ -545,7 +548,9 @@ class RotatingClient:
545
  error_message_text = error_details.get("message", str(original_exc))
546
 
547
  if "quota" in error_message_text.lower() or "resource_exhausted" in error_status.lower():
548
- # This is a fatal quota error. Terminate the stream with a clear message.
 
 
549
  quota_value = "N/A"
550
  quota_id = "N/A"
551
  if "details" in error_details and isinstance(error_details.get("details"), list):
@@ -559,58 +564,52 @@ class RotatingClient:
559
  if quota_value != "N/A" and quota_id != "N/A":
560
  break
561
 
562
- # 1. Detailed message for the end client
563
- client_error_message = (
564
- f"FATAL: You have exceeded your API quota. "
565
- f"Message: '{error_message_text}'. "
566
- f"Limit: {quota_value} (Quota ID: {quota_id})."
567
- )
568
-
569
- # 2. Concise message for the console log
570
- console_log_message = (
571
- f"Terminating stream for key ...{current_key[-4:]} due to fatal quota error. "
572
- f"ID: {quota_id}, Limit: {quota_value}."
573
- )
574
- lib_logger.warning(console_log_message)
575
-
576
- # 3. Yield the detailed message to the client and terminate
577
- yield f"data: {json.dumps({'error': {'message': client_error_message, 'type': 'proxy_quota_error'}})}\n\n"
578
- yield "data: [DONE]\n\n"
579
- return # Exit the generator completely.
580
-
581
- # --- NON-QUOTA ERROR: Fallback to key rotation ---
582
- rotation_error_message = f"Provider API key failed with {classified_error.error_type}. Rotating to a new key."
583
- yield f"data: {json.dumps({'error': {'message': rotation_error_message, 'type': 'proxy_key_rotation_error', 'code': classified_error.status_code}})}\n\n"
584
-
585
- lib_logger.warning(f"Key ...{current_key[-4:]} encountered a recoverable error during stream for model {model}. Rotating key.")
586
 
587
- # Only apply global cooldown for non-quota 429s.
588
- if classified_error.error_type == 'rate_limit' and classified_error.status_code == 429:
589
- cooldown_duration = classified_error.retry_after or 60
590
- await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
591
- lib_logger.warning(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
 
 
 
 
592
 
593
- await self.usage_manager.record_failure(current_key, model, classified_error)
594
- break # Break to try the next key
595
 
596
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
 
597
  last_exception = e
598
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_headers=dict(request.headers) if request else {})
599
  classified_error = classify_error(e)
600
  await self.usage_manager.record_failure(current_key, model, classified_error)
601
 
602
  if attempt >= self.max_retries - 1:
603
- lib_logger.warning(f"Key ...{current_key[-4:]} failed after max retries for model {model} due to a server error. Rotating key.")
604
- # Inform the client about the temporary failure before rotating.
605
- error_message = f"Key ...{current_key[-4:]} failed after multiple retries. Rotating to a new key."
606
- error_data = {
607
- "error": {
608
- "message": error_message,
609
- "type": "proxy_key_rotation_error",
610
- "code": classified_error.status_code
611
- }
612
- }
613
- yield f"data: {json.dumps(error_data)}\n\n"
614
  break
615
 
616
  wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
@@ -625,22 +624,11 @@ class RotatingClient:
625
  continue
626
 
627
  except Exception as e:
 
628
  last_exception = e
629
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_headers=dict(request.headers) if request else {})
630
  classified_error = classify_error(e)
631
 
632
- # For most exceptions, we notify the client and rotate the key.
633
- if classified_error.error_type not in ['invalid_request', 'context_window_exceeded', 'authentication']:
634
- error_message = f"An unexpected error occurred with key ...{current_key[-4:]}. Rotating to a new key."
635
- error_data = {
636
- "error": {
637
- "message": error_message,
638
- "type": "proxy_key_rotation_error",
639
- "code": classified_error.status_code
640
- }
641
- }
642
- yield f"data: {json.dumps(error_data)}\n\n"
643
-
644
  lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {str(e)}. Rotating key.")
645
 
646
  if classified_error.status_code == 429:
@@ -651,6 +639,7 @@ class RotatingClient:
651
  if classified_error.error_type in ['invalid_request', 'context_window_exceeded', 'authentication']:
652
  raise last_exception
653
 
 
654
  await self.usage_manager.record_failure(current_key, model, classified_error)
655
  break
656
 
@@ -761,4 +750,4 @@ class RotatingClient:
761
  for provider, models in all_provider_models.items():
762
  for model in models:
763
  flat_models.append(f"{provider}/{model}")
764
- return flat_models
 
454
  tried_keys = set()
455
  last_exception = None
456
  kwargs = self._convert_model_params(**kwargs)
457
+
458
+ consecutive_quota_failures = 0
459
+
460
  try:
461
  while len(tried_keys) < len(keys_for_provider) and time.time() < deadline:
462
  current_key = None
 
548
  error_message_text = error_details.get("message", str(original_exc))
549
 
550
  if "quota" in error_message_text.lower() or "resource_exhausted" in error_status.lower():
551
+ consecutive_quota_failures += 1
552
+ lib_logger.warning(f"Key ...{current_key[-4:]} hit a quota limit. This is consecutive failure #{consecutive_quota_failures} for this request.")
553
+
554
  quota_value = "N/A"
555
  quota_id = "N/A"
556
  if "details" in error_details and isinstance(error_details.get("details"), list):
 
564
  if quota_value != "N/A" and quota_id != "N/A":
565
  break
566
 
567
+ await self.usage_manager.record_failure(current_key, model, classified_error)
568
+
569
+ if consecutive_quota_failures >= 3:
570
+ console_log_message = (
571
+ f"Terminating stream for key ...{current_key[-4:]} due to 3rd consecutive quota error. "
572
+ f"This is now considered a fatal input data error. ID: {quota_id}, Limit: {quota_value}."
573
+ )
574
+ client_error_message = (
575
+ "FATAL: Request failed after 3 consecutive quota errors, "
576
+ "indicating the input data is too large for the model's per-request limit. "
577
+ f"Last Error Message: '{error_message_text}'. Limit: {quota_value} (Quota ID: {quota_id})."
578
+ )
579
+ lib_logger.error(console_log_message)
580
+
581
+ yield f"data: {json.dumps({'error': {'message': client_error_message, 'type': 'proxy_fatal_quota_error'}})}\n\n"
582
+ yield "data: [DONE]\n\n"
583
+ return
584
+
585
+ else:
586
+ # [MODIFIED] Do not yield to the client. Just log and break to rotate the key.
587
+ lib_logger.warning(f"Quota error on key ...{current_key[-4:]} (failure {consecutive_quota_failures}/3). Rotating key silently.")
588
+ break
 
 
589
 
590
+ else:
591
+ consecutive_quota_failures = 0
592
+ # [MODIFIED] Do not yield to the client. Just log and break to rotate the key.
593
+ lib_logger.warning(f"Key ...{current_key[-4:]} encountered a recoverable error ({classified_error.error_type}) during stream. Rotating key silently.")
594
+
595
+ if classified_error.error_type == 'rate_limit' and classified_error.status_code == 429:
596
+ cooldown_duration = classified_error.retry_after or 60
597
+ await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
598
+ lib_logger.warning(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
599
 
600
+ await self.usage_manager.record_failure(current_key, model, classified_error)
601
+ break
602
 
603
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
604
+ consecutive_quota_failures = 0
605
  last_exception = e
606
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_headers=dict(request.headers) if request else {})
607
  classified_error = classify_error(e)
608
  await self.usage_manager.record_failure(current_key, model, classified_error)
609
 
610
  if attempt >= self.max_retries - 1:
611
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed after max retries for model {model} due to a server error. Rotating key silently.")
612
+ # [MODIFIED] Do not yield to the client here.
 
 
 
 
 
 
 
 
 
613
  break
614
 
615
  wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
 
624
  continue
625
 
626
  except Exception as e:
627
+ consecutive_quota_failures = 0
628
  last_exception = e
629
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_headers=dict(request.headers) if request else {})
630
  classified_error = classify_error(e)
631
 
 
 
 
 
 
 
 
 
 
 
 
 
632
  lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {str(e)}. Rotating key.")
633
 
634
  if classified_error.status_code == 429:
 
639
  if classified_error.error_type in ['invalid_request', 'context_window_exceeded', 'authentication']:
640
  raise last_exception
641
 
642
+ # [MODIFIED] Do not yield to the client here.
643
  await self.usage_manager.record_failure(current_key, model, classified_error)
644
  break
645
 
 
750
  for provider, models in all_provider_models.items():
751
  for model in models:
752
  flat_models.append(f"{provider}/{model}")
753
+ return flat_models
src/rotator_library/pyproject.toml CHANGED
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
4
 
5
  [project]
6
  name = "rotating-api-key-client"
7
- version = "0.6.7"
8
  authors = [
9
  { name="Mirrowel", email="nuh@uh.com" },
10
  ]
 
4
 
5
  [project]
6
  name = "rotating-api-key-client"
7
+ version = "0.8"
8
  authors = [
9
  { name="Mirrowel", email="nuh@uh.com" },
10
  ]