Mirrowel commited on
Commit
71586c6
·
unverified ·
2 Parent(s): d2adf05a1cc875

Merge pull request #14 from Mirrowel/Error-handling-consistency

Browse files

add structured error accumulator and consistent error handling/reporting

src/rotator_library/client.py CHANGED
@@ -25,6 +25,10 @@ from .error_handler import (
25
  classify_error,
26
  AllProviders,
27
  NoAvailableKeysError,
 
 
 
 
28
  )
29
  from .providers import PROVIDER_PLUGINS
30
  from .providers.openai_compatible_provider import OpenAICompatibleProvider
@@ -67,7 +71,7 @@ class RotatingClient:
67
  ):
68
  """
69
  Initialize the RotatingClient with intelligent credential rotation.
70
-
71
  Args:
72
  api_keys: Dictionary mapping provider names to lists of API keys
73
  oauth_credentials: Dictionary mapping provider names to OAuth credential paths
@@ -136,8 +140,7 @@ class RotatingClient:
136
  self.global_timeout = global_timeout
137
  self.abort_on_callback_error = abort_on_callback_error
138
  self.usage_manager = UsageManager(
139
- file_path=usage_file_path,
140
- rotation_tolerance=rotation_tolerance
141
  )
142
  self._model_list_cache = {}
143
  self._provider_plugins = PROVIDER_PLUGINS
@@ -156,7 +159,9 @@ class RotatingClient:
156
  # Validate all values are >= 1
157
  for provider, max_val in self.max_concurrent_requests_per_key.items():
158
  if max_val < 1:
159
- lib_logger.warning(f"Invalid max_concurrent for '{provider}': {max_val}. Setting to 1.")
 
 
160
  self.max_concurrent_requests_per_key[provider] = 1
161
 
162
  def _is_model_ignored(self, provider: str, model_id: str) -> bool:
@@ -364,7 +369,9 @@ class RotatingClient:
364
 
365
  return kwargs
366
 
367
- def _apply_default_safety_settings(self, litellm_kwargs: Dict[str, Any], provider: str):
 
 
368
  """
369
  Ensure default Gemini safety settings are present when calling the Gemini provider.
370
  This will not override any explicit settings provided by the request. It accepts
@@ -393,22 +400,33 @@ class RotatingClient:
393
  ]
394
 
395
  # If generic form is present, ensure missing generic keys are filled in
396
- if "safety_settings" in litellm_kwargs and isinstance(litellm_kwargs["safety_settings"], dict):
 
 
397
  for k, v in default_generic.items():
398
  if k not in litellm_kwargs["safety_settings"]:
399
  litellm_kwargs["safety_settings"][k] = v
400
  return
401
 
402
  # If Gemini form is present, ensure missing gemini categories are appended
403
- if "safetySettings" in litellm_kwargs and isinstance(litellm_kwargs["safetySettings"], list):
404
- present = {item.get("category") for item in litellm_kwargs["safetySettings"] if isinstance(item, dict)}
 
 
 
 
 
 
405
  for d in default_gemini:
406
  if d["category"] not in present:
407
  litellm_kwargs["safetySettings"].append(d)
408
  return
409
 
410
  # Neither present: set generic defaults so provider conversion will translate them
411
- if "safety_settings" not in litellm_kwargs and "safetySettings" not in litellm_kwargs:
 
 
 
412
  litellm_kwargs["safety_settings"] = default_generic.copy()
413
 
414
  def get_oauth_credentials(self) -> Dict[str, List[str]]:
@@ -426,10 +444,10 @@ class RotatingClient:
426
  """
427
  Lazily initializes and returns a provider instance.
428
  Only initializes providers that have configured credentials.
429
-
430
  Args:
431
  provider_name: The name of the provider to get an instance for.
432
-
433
  Returns:
434
  Provider instance if credentials exist, None otherwise.
435
  """
@@ -439,7 +457,7 @@ class RotatingClient:
439
  f"Skipping provider '{provider_name}' initialization: no credentials configured"
440
  )
441
  return None
442
-
443
  if provider_name not in self._provider_instances:
444
  if provider_name in self._provider_plugins:
445
  self._provider_instances[provider_name] = self._provider_plugins[
@@ -461,46 +479,47 @@ class RotatingClient:
461
  def _resolve_model_id(self, model: str, provider: str) -> str:
462
  """
463
  Resolves the actual model ID to send to the provider.
464
-
465
  For custom models with name/ID mappings, returns the ID.
466
  Otherwise, returns the model name unchanged.
467
-
468
  Args:
469
  model: Full model string with provider (e.g., "iflow/DS-v3.2")
470
  provider: Provider name (e.g., "iflow")
471
-
472
  Returns:
473
  Full model string with ID (e.g., "iflow/deepseek-v3.2")
474
  """
475
  # Extract model name from "provider/model_name" format
476
- model_name = model.split('/')[-1] if '/' in model else model
477
-
478
  # Try to get provider instance to check for model definitions
479
  provider_plugin = self._get_provider_instance(provider)
480
-
481
  # Check if provider has model definitions
482
- if provider_plugin and hasattr(provider_plugin, 'model_definitions'):
483
- model_id = provider_plugin.model_definitions.get_model_id(provider, model_name)
 
 
484
  if model_id and model_id != model_name:
485
  # Return with provider prefix
486
  return f"{provider}/{model_id}"
487
-
488
  # Fallback: use client's own model definitions
489
  model_id = self.model_definitions.get_model_id(provider, model_name)
490
  if model_id and model_id != model_name:
491
  return f"{provider}/{model_id}"
492
-
493
  # No conversion needed, return original
494
  return model
495
 
496
-
497
  async def _safe_streaming_wrapper(
498
  self, stream: Any, key: str, model: str, request: Optional[Any] = None
499
  ) -> AsyncGenerator[Any, None]:
500
  """
501
  A hybrid wrapper for streaming that buffers fragmented JSON, handles client disconnections gracefully,
502
  and distinguishes between content and streamed errors.
503
-
504
  FINISH_REASON HANDLING:
505
  Providers just translate chunks - this wrapper handles ALL finish_reason logic:
506
  1. Strip finish_reason from intermediate chunks (litellm defaults to "stop")
@@ -537,7 +556,7 @@ class RotatingClient:
537
  chunk_dict = chunk.model_dump()
538
  else:
539
  chunk_dict = chunk
540
-
541
  # === FINISH_REASON LOGIC ===
542
  # Providers send raw chunks without finish_reason logic.
543
  # This wrapper determines finish_reason based on accumulated state.
@@ -545,19 +564,19 @@ class RotatingClient:
545
  choice = chunk_dict["choices"][0]
546
  delta = choice.get("delta", {})
547
  usage = chunk_dict.get("usage", {})
548
-
549
  # Track tool_calls across ALL chunks - if we ever see one, finish_reason must be tool_calls
550
  if delta.get("tool_calls"):
551
  has_tool_calls = True
552
  accumulated_finish_reason = "tool_calls"
553
-
554
  # Detect final chunk: has usage with completion_tokens > 0
555
  has_completion_tokens = (
556
- usage and
557
- isinstance(usage, dict) and
558
- usage.get("completion_tokens", 0) > 0
559
  )
560
-
561
  if has_completion_tokens:
562
  # FINAL CHUNK: Determine correct finish_reason
563
  if has_tool_calls:
@@ -573,7 +592,7 @@ class RotatingClient:
573
  # INTERMEDIATE CHUNK: Never emit finish_reason
574
  # (litellm.ModelResponse defaults to "stop" which is wrong)
575
  choice["finish_reason"] = None
576
-
577
  yield f"data: {json.dumps(chunk_dict)}\n\n"
578
 
579
  if hasattr(chunk, "usage") and chunk.usage:
@@ -722,12 +741,13 @@ class RotatingClient:
722
  # multiple keys have the same usage stats.
723
  credentials_for_provider = list(self.all_credentials[provider])
724
  random.shuffle(credentials_for_provider)
725
-
726
  # Filter out credentials that are unavailable (queued for re-auth)
727
  provider_plugin = self._get_provider_instance(provider)
728
- if provider_plugin and hasattr(provider_plugin, 'is_credential_available'):
729
  available_creds = [
730
- cred for cred in credentials_for_provider
 
731
  if provider_plugin.is_credential_available(cred)
732
  ]
733
  if available_creds:
@@ -740,7 +760,7 @@ class RotatingClient:
740
  kwargs = self._convert_model_params(**kwargs)
741
 
742
  # The main rotation loop. It continues as long as there are untried credentials and the global deadline has not been exceeded.
743
-
744
  # Resolve model ID early, before any credential operations
745
  # This ensures consistent model ID usage for acquisition, release, and tracking
746
  resolved_model = self._resolve_model_id(model, provider)
@@ -748,10 +768,10 @@ class RotatingClient:
748
  lib_logger.info(f"Resolved model '{model}' to '{resolved_model}'")
749
  model = resolved_model
750
  kwargs["model"] = model # Ensure kwargs has the resolved model for litellm
751
-
752
  # [NEW] Filter by model tier requirement and build priority map
753
  credential_priorities = None
754
- if provider_plugin and hasattr(provider_plugin, 'get_model_tier_requirement'):
755
  required_tier = provider_plugin.get_model_tier_requirement(model)
756
  if required_tier is not None:
757
  # Filter OUT only credentials we KNOW are too low priority
@@ -759,9 +779,9 @@ class RotatingClient:
759
  incompatible_creds = []
760
  compatible_creds = []
761
  unknown_creds = []
762
-
763
  for cred in credentials_for_provider:
764
- if hasattr(provider_plugin, 'get_credential_priority'):
765
  priority = provider_plugin.get_credential_priority(cred)
766
  if priority is None:
767
  # Unknown priority - keep it, will be discovered on first use
@@ -775,7 +795,7 @@ class RotatingClient:
775
  else:
776
  # Provider doesn't support priorities - keep all
777
  unknown_creds.append(cred)
778
-
779
  # If we have any known-compatible or unknown credentials, use them
780
  tier_compatible_creds = compatible_creds + unknown_creds
781
  if tier_compatible_creds:
@@ -802,20 +822,25 @@ class RotatingClient:
802
  f"but all {len(incompatible_creds)} known credentials have priority > {required_tier}. "
803
  f"Request will likely fail."
804
  )
805
-
806
  # Build priority map for usage_manager
807
- if provider_plugin and hasattr(provider_plugin, 'get_credential_priority'):
808
  credential_priorities = {}
809
  for cred in credentials_for_provider:
810
  priority = provider_plugin.get_credential_priority(cred)
811
  if priority is not None:
812
  credential_priorities[cred] = priority
813
-
814
  if credential_priorities:
815
  lib_logger.debug(
816
- f"Credential priorities for {provider}: {', '.join(f'P{p}={len([c for c in credentials_for_provider if credential_priorities.get(c)==p])}' for p in sorted(set(credential_priorities.values())))}"
817
  )
818
 
 
 
 
 
 
819
  while (
820
  len(tried_creds) < len(credentials_for_provider) and time.time() < deadline
821
  ):
@@ -852,9 +877,11 @@ class RotatingClient:
852
  )
853
  max_concurrent = self.max_concurrent_requests_per_key.get(provider, 1)
854
  current_cred = await self.usage_manager.acquire_key(
855
- available_keys=creds_to_try, model=model, deadline=deadline,
 
 
856
  max_concurrent=max_concurrent,
857
- credential_priorities=credential_priorities
858
  )
859
  key_acquired = True
860
  tried_creds.add(current_cred)
@@ -937,10 +964,14 @@ class RotatingClient:
937
  if provider_instance:
938
  # Ensure default Gemini safety settings are present (without overriding request)
939
  try:
940
- self._apply_default_safety_settings(litellm_kwargs, provider)
 
 
941
  except Exception:
942
  # If anything goes wrong here, avoid breaking the request flow.
943
- lib_logger.debug("Could not apply default safety settings; continuing.")
 
 
944
 
945
  if "safety_settings" in litellm_kwargs:
946
  converted_settings = (
@@ -1023,8 +1054,14 @@ class RotatingClient:
1023
 
1024
  # Extract a clean error message for the user-facing log
1025
  error_message = str(e).split("\n")[0]
 
 
 
 
 
 
1026
  lib_logger.info(
1027
- f"Key ...{current_cred[-6:]} hit rate limit for model {model}. Reason: '{error_message}'. Rotating key."
1028
  )
1029
 
1030
  if classified_error.status_code == 429:
@@ -1032,16 +1069,10 @@ class RotatingClient:
1032
  await self.cooldown_manager.start_cooldown(
1033
  provider, cooldown_duration
1034
  )
1035
- lib_logger.warning(
1036
- f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown."
1037
- )
1038
 
1039
  await self.usage_manager.record_failure(
1040
  current_cred, model, classified_error
1041
  )
1042
- lib_logger.warning(
1043
- f"Key ...{current_cred[-6:]} encountered a rate limit. Trying next key."
1044
- )
1045
  break # Move to the next key
1046
 
1047
  except (
@@ -1060,39 +1091,115 @@ class RotatingClient:
1060
  else {},
1061
  )
1062
  classified_error = classify_error(e)
 
 
1063
  # Provider-level error: don't increment consecutive failures
1064
  await self.usage_manager.record_failure(
1065
- current_cred, model, classified_error,
1066
- increment_consecutive_failures=False
 
 
1067
  )
1068
 
1069
  if attempt >= self.max_retries - 1:
1070
- error_message = str(e).split("\n")[0]
 
 
 
1071
  lib_logger.warning(
1072
- f"Key ...{current_cred[-6:]} failed after max retries for model {model} due to a server error. Reason: '{error_message}'. Rotating key."
1073
  )
1074
  break # Move to the next key
1075
 
1076
  # For temporary errors, wait before retrying with the same key.
1077
  wait_time = classified_error.retry_after or (
1078
- 1 * (2**attempt)
1079
  ) + random.uniform(0, 1)
1080
  remaining_budget = deadline - time.time()
1081
 
1082
  # If the required wait time exceeds the budget, don't wait; rotate to the next key immediately.
1083
  if wait_time > remaining_budget:
 
 
 
1084
  lib_logger.warning(
1085
- f"Required retry wait time ({wait_time:.2f}s) exceeds remaining budget ({remaining_budget:.2f}s). Rotating key early."
1086
  )
1087
  break
1088
 
1089
- error_message = str(e).split("\n")[0]
1090
  lib_logger.warning(
1091
- f"Key ...{current_cred[-6:]} encountered a server error for model {model}. Reason: '{error_message}'. Retrying in {wait_time:.2f}s."
1092
  )
1093
  await asyncio.sleep(wait_time)
1094
  continue # Retry with the same key
1095
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1096
  except Exception as e:
1097
  last_exception = e
1098
  log_failure(
@@ -1107,32 +1214,40 @@ class RotatingClient:
1107
 
1108
  if request and await request.is_disconnected():
1109
  lib_logger.warning(
1110
- f"Client disconnected. Aborting retries for credential ...{current_cred[-6:]}."
1111
  )
1112
  raise last_exception
1113
 
1114
  classified_error = classify_error(e)
1115
  error_message = str(e).split("\n")[0]
 
1116
  lib_logger.warning(
1117
- f"Key ...{current_cred[-6:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key."
1118
  )
1119
- if classified_error.status_code == 429:
 
 
 
 
 
 
1120
  cooldown_duration = classified_error.retry_after or 60
1121
  await self.cooldown_manager.start_cooldown(
1122
  provider, cooldown_duration
1123
  )
1124
- lib_logger.warning(
1125
- f"IP-based rate limit detected for {provider} from generic exception. Starting a {cooldown_duration}-second global cooldown."
1126
- )
1127
 
1128
- if classified_error.error_type in [
1129
- "invalid_request",
1130
- "context_window_exceeded",
1131
- "authentication",
1132
- ]:
1133
- # For these errors, we should not retry with other keys.
1134
  raise last_exception
1135
 
 
 
 
 
 
1136
  await self.usage_manager.record_failure(
1137
  current_cred, model, classified_error
1138
  )
@@ -1141,14 +1256,22 @@ class RotatingClient:
1141
  if key_acquired and current_cred:
1142
  await self.usage_manager.release_key(current_cred, model)
1143
 
1144
- if last_exception:
1145
- # Log the final error but do not raise it, as per the new requirement.
1146
- # The client should not see intermittent failures.
1147
- lib_logger.error(
1148
- f"Request failed after trying all keys or exceeding global timeout. Last error: {last_exception}"
1149
- )
 
 
 
 
1150
 
1151
- # Return None to indicate failure without propagating a disruptive exception.
 
 
 
 
1152
  return None
1153
 
1154
  async def _streaming_acompletion_with_retry(
@@ -1164,12 +1287,13 @@ class RotatingClient:
1164
  # Create a mutable copy of the keys and shuffle it.
1165
  credentials_for_provider = list(self.all_credentials[provider])
1166
  random.shuffle(credentials_for_provider)
1167
-
1168
  # Filter out credentials that are unavailable (queued for re-auth)
1169
  provider_plugin = self._get_provider_instance(provider)
1170
- if provider_plugin and hasattr(provider_plugin, 'is_credential_available'):
1171
  available_creds = [
1172
- cred for cred in credentials_for_provider
 
1173
  if provider_plugin.is_credential_available(cred)
1174
  ]
1175
  if available_creds:
@@ -1191,10 +1315,10 @@ class RotatingClient:
1191
  lib_logger.info(f"Resolved model '{model}' to '{resolved_model}'")
1192
  model = resolved_model
1193
  kwargs["model"] = model # Ensure kwargs has the resolved model for litellm
1194
-
1195
  # [NEW] Filter by model tier requirement and build priority map
1196
  credential_priorities = None
1197
- if provider_plugin and hasattr(provider_plugin, 'get_model_tier_requirement'):
1198
  required_tier = provider_plugin.get_model_tier_requirement(model)
1199
  if required_tier is not None:
1200
  # Filter OUT only credentials we KNOW are too low priority
@@ -1202,9 +1326,9 @@ class RotatingClient:
1202
  incompatible_creds = []
1203
  compatible_creds = []
1204
  unknown_creds = []
1205
-
1206
  for cred in credentials_for_provider:
1207
- if hasattr(provider_plugin, 'get_credential_priority'):
1208
  priority = provider_plugin.get_credential_priority(cred)
1209
  if priority is None:
1210
  # Unknown priority - keep it, will be discovered on first use
@@ -1218,7 +1342,7 @@ class RotatingClient:
1218
  else:
1219
  # Provider doesn't support priorities - keep all
1220
  unknown_creds.append(cred)
1221
-
1222
  # If we have any known-compatible or unknown credentials, use them
1223
  tier_compatible_creds = compatible_creds + unknown_creds
1224
  if tier_compatible_creds:
@@ -1245,20 +1369,25 @@ class RotatingClient:
1245
  f"but all {len(incompatible_creds)} known credentials have priority > {required_tier}. "
1246
  f"Request will likely fail."
1247
  )
1248
-
1249
  # Build priority map for usage_manager
1250
- if provider_plugin and hasattr(provider_plugin, 'get_credential_priority'):
1251
  credential_priorities = {}
1252
  for cred in credentials_for_provider:
1253
  priority = provider_plugin.get_credential_priority(cred)
1254
  if priority is not None:
1255
  credential_priorities[cred] = priority
1256
-
1257
  if credential_priorities:
1258
  lib_logger.debug(
1259
- f"Credential priorities for {provider}: {', '.join(f'P{p}={len([c for c in credentials_for_provider if credential_priorities.get(c)==p])}' for p in sorted(set(credential_priorities.values())))}"
1260
  )
1261
 
 
 
 
 
 
1262
  try:
1263
  while (
1264
  len(tried_creds) < len(credentials_for_provider)
@@ -1294,11 +1423,15 @@ class RotatingClient:
1294
  lib_logger.info(
1295
  f"Acquiring credential for model {model}. Tried credentials: {len(tried_creds)}/{len(credentials_for_provider)}"
1296
  )
1297
- max_concurrent = self.max_concurrent_requests_per_key.get(provider, 1)
 
 
1298
  current_cred = await self.usage_manager.acquire_key(
1299
- available_keys=creds_to_try, model=model, deadline=deadline,
 
 
1300
  max_concurrent=max_concurrent,
1301
- credential_priorities=credential_priorities
1302
  )
1303
  key_acquired = True
1304
  tried_creds.add(current_cred)
@@ -1402,21 +1535,51 @@ class RotatingClient:
1402
  litellm.RateLimitError,
1403
  httpx.HTTPStatusError,
1404
  ) as e:
1405
- if (
1406
- isinstance(e, httpx.HTTPStatusError)
1407
- and e.response.status_code != 429
1408
- ):
1409
- raise e
1410
-
1411
  last_exception = e
1412
  # If the exception is our custom wrapper, unwrap the original error
1413
  original_exc = getattr(e, "data", e)
1414
  classified_error = classify_error(original_exc)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1415
  await self.usage_manager.record_failure(
1416
  current_cred, model, classified_error
1417
  )
1418
  lib_logger.warning(
1419
- f"Credential ...{current_cred[-6:]} encountered a recoverable error ({classified_error.error_type}) during custom provider stream. Rotating key."
1420
  )
1421
  break
1422
 
@@ -1436,31 +1599,40 @@ class RotatingClient:
1436
  else {},
1437
  )
1438
  classified_error = classify_error(e)
 
 
1439
  # Provider-level error: don't increment consecutive failures
1440
  await self.usage_manager.record_failure(
1441
- current_cred, model, classified_error,
1442
- increment_consecutive_failures=False
 
 
1443
  )
1444
 
1445
  if attempt >= self.max_retries - 1:
 
 
 
1446
  lib_logger.warning(
1447
- f"Credential ...{current_cred[-6:]} failed after max retries for model {model} due to a server error. Rotating key."
1448
  )
1449
  break
1450
 
1451
  wait_time = classified_error.retry_after or (
1452
- 1 * (2**attempt)
1453
  ) + random.uniform(0, 1)
1454
  remaining_budget = deadline - time.time()
1455
  if wait_time > remaining_budget:
 
 
 
1456
  lib_logger.warning(
1457
- f"Required retry wait time ({wait_time:.2f}s) exceeds remaining budget ({remaining_budget:.2f}s). Rotating key early."
1458
  )
1459
  break
1460
 
1461
- error_message = str(e).split("\n")[0]
1462
  lib_logger.warning(
1463
- f"Credential ...{current_cred[-6:]} encountered a server error for model {model}. Reason: '{error_message}'. Retrying in {wait_time:.2f}s."
1464
  )
1465
  await asyncio.sleep(wait_time)
1466
  continue
@@ -1477,15 +1649,24 @@ class RotatingClient:
1477
  else {},
1478
  )
1479
  classified_error = classify_error(e)
 
 
 
 
 
 
 
1480
  lib_logger.warning(
1481
- f"Credential ...{current_cred[-6:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {str(e)}. Rotating key."
1482
  )
1483
- if classified_error.error_type in [
1484
- "invalid_request",
1485
- "context_window_exceeded",
1486
- "authentication",
1487
- ]:
 
1488
  raise last_exception
 
1489
  await self.usage_manager.record_failure(
1490
  current_cred, model, classified_error
1491
  )
@@ -1507,9 +1688,13 @@ class RotatingClient:
1507
  if provider_instance:
1508
  # Ensure default Gemini safety settings are present (without overriding request)
1509
  try:
1510
- self._apply_default_safety_settings(litellm_kwargs, provider)
 
 
1511
  except Exception:
1512
- lib_logger.debug("Could not apply default safety settings for streaming path; continuing.")
 
 
1513
 
1514
  if "safety_settings" in litellm_kwargs:
1515
  converted_settings = (
@@ -1590,7 +1775,11 @@ class RotatingClient:
1590
  yield chunk
1591
  return
1592
 
1593
- except (StreamedAPIError, litellm.RateLimitError) as e:
 
 
 
 
1594
  last_exception = e
1595
 
1596
  # This is the final, robust handler for streamed errors.
@@ -1600,24 +1789,26 @@ class RotatingClient:
1600
  original_exc = getattr(e, "data", e)
1601
  classified_error = classify_error(original_exc)
1602
 
 
 
 
 
 
 
 
1603
  try:
1604
  # The full error JSON is in the string representation of the exception.
1605
  json_str_match = re.search(
1606
  r"(\{.*\})", str(original_exc), re.DOTALL
1607
  )
1608
  if json_str_match:
1609
- # The string may contain byte-escaped characters (e.g., \\n).
1610
  cleaned_str = codecs.decode(
1611
  json_str_match.group(1), "unicode_escape"
1612
  )
1613
  error_payload = json.loads(cleaned_str)
1614
  except (json.JSONDecodeError, TypeError):
1615
- lib_logger.warning(
1616
- "Could not parse JSON details from streamed error exception."
1617
- )
1618
  error_payload = {}
1619
 
1620
- # Now, log the failure with the extracted raw response.
1621
  log_failure(
1622
  api_key=current_cred,
1623
  model=model,
@@ -1631,9 +1822,13 @@ class RotatingClient:
1631
 
1632
  error_details = error_payload.get("error", {})
1633
  error_status = error_details.get("status", "")
1634
- # Fallback to the full string if parsing fails.
1635
  error_message_text = error_details.get(
1636
- "message", str(original_exc)
 
 
 
 
 
1637
  )
1638
 
1639
  if (
@@ -1641,9 +1836,6 @@ class RotatingClient:
1641
  or "resource_exhausted" in error_status.lower()
1642
  ):
1643
  consecutive_quota_failures += 1
1644
- lib_logger.warning(
1645
- f"Credential ...{current_cred[-6:]} hit a quota limit. This is consecutive failure #{consecutive_quota_failures} for this request."
1646
- )
1647
 
1648
  quota_value = "N/A"
1649
  quota_id = "N/A"
@@ -1670,48 +1862,39 @@ class RotatingClient:
1670
  )
1671
 
1672
  if consecutive_quota_failures >= 3:
1673
- console_log_message = (
1674
- f"Terminating stream for credential ...{current_cred[-6:]} due to 3rd consecutive quota error. "
1675
- f"This is now considered a fatal input data error. ID: {quota_id}, Limit: {quota_value}."
1676
- )
1677
  client_error_message = (
1678
- "FATAL: Request failed after 3 consecutive quota errors, "
1679
- "indicating the input data is too large for the model's per-request limit. "
1680
- f"Last Error Message: '{error_message_text}'. Limit: {quota_value} (Quota ID: {quota_id})."
 
 
1681
  )
1682
- lib_logger.error(console_log_message)
1683
-
1684
  yield f"data: {json.dumps({'error': {'message': client_error_message, 'type': 'proxy_fatal_quota_error'}})}\n\n"
1685
  yield "data: [DONE]\n\n"
1686
  return
1687
-
1688
  else:
1689
- # [MODIFIED] Do not yield to the client. Just log and break to rotate the key.
1690
  lib_logger.warning(
1691
- f"Quota error on credential ...{current_cred[-6:]} (failure {consecutive_quota_failures}/3). Rotating key silently."
1692
  )
1693
  break
1694
 
1695
  else:
1696
  consecutive_quota_failures = 0
1697
- # [MODIFIED] Do not yield to the client. Just log and break to rotate the key.
1698
  lib_logger.warning(
1699
- f"Credential ...{current_cred[-6:]} encountered a recoverable error ({classified_error.error_type}) during stream. Rotating key silently."
1700
  )
1701
 
1702
- if (
1703
- classified_error.error_type == "rate_limit"
1704
- and classified_error.status_code == 429
1705
- ):
1706
  cooldown_duration = (
1707
  classified_error.retry_after or 60
1708
  )
1709
  await self.cooldown_manager.start_cooldown(
1710
  provider, cooldown_duration
1711
  )
1712
- lib_logger.warning(
1713
- f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown."
1714
- )
1715
 
1716
  await self.usage_manager.record_failure(
1717
  current_cred, model, classified_error
@@ -1735,10 +1918,19 @@ class RotatingClient:
1735
  else {},
1736
  )
1737
  classified_error = classify_error(e)
 
 
 
 
 
 
 
1738
  # Provider-level error: don't increment consecutive failures
1739
  await self.usage_manager.record_failure(
1740
- current_cred, model, classified_error,
1741
- increment_consecutive_failures=False
 
 
1742
  )
1743
 
1744
  if attempt >= self.max_retries - 1:
@@ -1749,7 +1941,7 @@ class RotatingClient:
1749
  break
1750
 
1751
  wait_time = classified_error.retry_after or (
1752
- 1 * (2**attempt)
1753
  ) + random.uniform(0, 1)
1754
  remaining_budget = deadline - time.time()
1755
  if wait_time > remaining_budget:
@@ -1758,9 +1950,8 @@ class RotatingClient:
1758
  )
1759
  break
1760
 
1761
- error_message = str(e).split("\n")[0]
1762
  lib_logger.warning(
1763
- f"Credential ...{current_cred[-6:]} encountered a server error for model {model}. Reason: '{error_message}'. Retrying in {wait_time:.2f}s."
1764
  )
1765
  await asyncio.sleep(wait_time)
1766
  continue
@@ -1778,49 +1969,76 @@ class RotatingClient:
1778
  else {},
1779
  )
1780
  classified_error = classify_error(e)
 
 
 
 
 
 
1781
 
1782
  lib_logger.warning(
1783
- f"Credential ...{current_cred[-6:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {str(e)}. Rotating key."
1784
  )
1785
 
1786
- if classified_error.status_code == 429:
 
 
 
 
 
1787
  cooldown_duration = classified_error.retry_after or 60
1788
  await self.cooldown_manager.start_cooldown(
1789
  provider, cooldown_duration
1790
  )
1791
  lib_logger.warning(
1792
- f"IP-based rate limit detected for {provider} from generic stream exception. Starting a {cooldown_duration}-second global cooldown."
1793
  )
1794
 
1795
- if classified_error.error_type in [
1796
- "invalid_request",
1797
- "context_window_exceeded",
1798
- "authentication",
1799
- ]:
 
1800
  raise last_exception
1801
 
1802
- # [MODIFIED] Do not yield to the client here.
1803
  await self.usage_manager.record_failure(
1804
  current_cred, model, classified_error
1805
  )
 
 
 
1806
  break
1807
 
1808
  finally:
1809
  if key_acquired and current_cred:
1810
  await self.usage_manager.release_key(current_cred, model)
1811
 
1812
- final_error_message = "Failed to complete the streaming request: No available API keys after rotation or global timeout exceeded."
1813
- if last_exception:
1814
- final_error_message = f"Failed to complete the streaming request. Last error: {str(last_exception)}"
1815
- lib_logger.error(
1816
- f"Streaming request failed after trying all keys. Last error: {last_exception}"
1817
- )
 
 
 
 
1818
  else:
 
 
 
 
 
 
 
 
 
 
 
1819
  lib_logger.error(final_error_message)
1820
 
1821
- error_data = {
1822
- "error": {"message": final_error_message, "type": "proxy_error"}
1823
- }
1824
  yield f"data: {json.dumps(error_data)}\n\n"
1825
  yield "data: [DONE]\n\n"
1826
 
@@ -1868,11 +2086,13 @@ class RotatingClient:
1868
  # Handle iflow provider: remove stream_options to avoid HTTP 406
1869
  model = kwargs.get("model", "")
1870
  provider = model.split("/")[0] if "/" in model else ""
1871
-
1872
  if provider == "iflow" and "stream_options" in kwargs:
1873
- lib_logger.debug("Removing stream_options for iflow provider to avoid HTTP 406")
 
 
1874
  kwargs.pop("stream_options", None)
1875
-
1876
  if kwargs.get("stream"):
1877
  # Only add stream_options for providers that support it (excluding iflow)
1878
  if provider != "iflow":
@@ -1880,7 +2100,7 @@ class RotatingClient:
1880
  kwargs["stream_options"] = {}
1881
  if "include_usage" not in kwargs["stream_options"]:
1882
  kwargs["stream_options"]["include_usage"] = True
1883
-
1884
  return self._streaming_acompletion_with_retry(
1885
  request=request, pre_request_callback=pre_request_callback, **kwargs
1886
  )
 
25
  classify_error,
26
  AllProviders,
27
  NoAvailableKeysError,
28
+ should_rotate_on_error,
29
+ should_retry_same_key,
30
+ RequestErrorAccumulator,
31
+ mask_credential,
32
  )
33
  from .providers import PROVIDER_PLUGINS
34
  from .providers.openai_compatible_provider import OpenAICompatibleProvider
 
71
  ):
72
  """
73
  Initialize the RotatingClient with intelligent credential rotation.
74
+
75
  Args:
76
  api_keys: Dictionary mapping provider names to lists of API keys
77
  oauth_credentials: Dictionary mapping provider names to OAuth credential paths
 
140
  self.global_timeout = global_timeout
141
  self.abort_on_callback_error = abort_on_callback_error
142
  self.usage_manager = UsageManager(
143
+ file_path=usage_file_path, rotation_tolerance=rotation_tolerance
 
144
  )
145
  self._model_list_cache = {}
146
  self._provider_plugins = PROVIDER_PLUGINS
 
159
  # Validate all values are >= 1
160
  for provider, max_val in self.max_concurrent_requests_per_key.items():
161
  if max_val < 1:
162
+ lib_logger.warning(
163
+ f"Invalid max_concurrent for '{provider}': {max_val}. Setting to 1."
164
+ )
165
  self.max_concurrent_requests_per_key[provider] = 1
166
 
167
  def _is_model_ignored(self, provider: str, model_id: str) -> bool:
 
369
 
370
  return kwargs
371
 
372
+ def _apply_default_safety_settings(
373
+ self, litellm_kwargs: Dict[str, Any], provider: str
374
+ ):
375
  """
376
  Ensure default Gemini safety settings are present when calling the Gemini provider.
377
  This will not override any explicit settings provided by the request. It accepts
 
400
  ]
401
 
402
  # If generic form is present, ensure missing generic keys are filled in
403
+ if "safety_settings" in litellm_kwargs and isinstance(
404
+ litellm_kwargs["safety_settings"], dict
405
+ ):
406
  for k, v in default_generic.items():
407
  if k not in litellm_kwargs["safety_settings"]:
408
  litellm_kwargs["safety_settings"][k] = v
409
  return
410
 
411
  # If Gemini form is present, ensure missing gemini categories are appended
412
+ if "safetySettings" in litellm_kwargs and isinstance(
413
+ litellm_kwargs["safetySettings"], list
414
+ ):
415
+ present = {
416
+ item.get("category")
417
+ for item in litellm_kwargs["safetySettings"]
418
+ if isinstance(item, dict)
419
+ }
420
  for d in default_gemini:
421
  if d["category"] not in present:
422
  litellm_kwargs["safetySettings"].append(d)
423
  return
424
 
425
  # Neither present: set generic defaults so provider conversion will translate them
426
+ if (
427
+ "safety_settings" not in litellm_kwargs
428
+ and "safetySettings" not in litellm_kwargs
429
+ ):
430
  litellm_kwargs["safety_settings"] = default_generic.copy()
431
 
432
  def get_oauth_credentials(self) -> Dict[str, List[str]]:
 
444
  """
445
  Lazily initializes and returns a provider instance.
446
  Only initializes providers that have configured credentials.
447
+
448
  Args:
449
  provider_name: The name of the provider to get an instance for.
450
+
451
  Returns:
452
  Provider instance if credentials exist, None otherwise.
453
  """
 
457
  f"Skipping provider '{provider_name}' initialization: no credentials configured"
458
  )
459
  return None
460
+
461
  if provider_name not in self._provider_instances:
462
  if provider_name in self._provider_plugins:
463
  self._provider_instances[provider_name] = self._provider_plugins[
 
479
  def _resolve_model_id(self, model: str, provider: str) -> str:
480
  """
481
  Resolves the actual model ID to send to the provider.
482
+
483
  For custom models with name/ID mappings, returns the ID.
484
  Otherwise, returns the model name unchanged.
485
+
486
  Args:
487
  model: Full model string with provider (e.g., "iflow/DS-v3.2")
488
  provider: Provider name (e.g., "iflow")
489
+
490
  Returns:
491
  Full model string with ID (e.g., "iflow/deepseek-v3.2")
492
  """
493
  # Extract model name from "provider/model_name" format
494
+ model_name = model.split("/")[-1] if "/" in model else model
495
+
496
  # Try to get provider instance to check for model definitions
497
  provider_plugin = self._get_provider_instance(provider)
498
+
499
  # Check if provider has model definitions
500
+ if provider_plugin and hasattr(provider_plugin, "model_definitions"):
501
+ model_id = provider_plugin.model_definitions.get_model_id(
502
+ provider, model_name
503
+ )
504
  if model_id and model_id != model_name:
505
  # Return with provider prefix
506
  return f"{provider}/{model_id}"
507
+
508
  # Fallback: use client's own model definitions
509
  model_id = self.model_definitions.get_model_id(provider, model_name)
510
  if model_id and model_id != model_name:
511
  return f"{provider}/{model_id}"
512
+
513
  # No conversion needed, return original
514
  return model
515
 
 
516
  async def _safe_streaming_wrapper(
517
  self, stream: Any, key: str, model: str, request: Optional[Any] = None
518
  ) -> AsyncGenerator[Any, None]:
519
  """
520
  A hybrid wrapper for streaming that buffers fragmented JSON, handles client disconnections gracefully,
521
  and distinguishes between content and streamed errors.
522
+
523
  FINISH_REASON HANDLING:
524
  Providers just translate chunks - this wrapper handles ALL finish_reason logic:
525
  1. Strip finish_reason from intermediate chunks (litellm defaults to "stop")
 
556
  chunk_dict = chunk.model_dump()
557
  else:
558
  chunk_dict = chunk
559
+
560
  # === FINISH_REASON LOGIC ===
561
  # Providers send raw chunks without finish_reason logic.
562
  # This wrapper determines finish_reason based on accumulated state.
 
564
  choice = chunk_dict["choices"][0]
565
  delta = choice.get("delta", {})
566
  usage = chunk_dict.get("usage", {})
567
+
568
  # Track tool_calls across ALL chunks - if we ever see one, finish_reason must be tool_calls
569
  if delta.get("tool_calls"):
570
  has_tool_calls = True
571
  accumulated_finish_reason = "tool_calls"
572
+
573
  # Detect final chunk: has usage with completion_tokens > 0
574
  has_completion_tokens = (
575
+ usage
576
+ and isinstance(usage, dict)
577
+ and usage.get("completion_tokens", 0) > 0
578
  )
579
+
580
  if has_completion_tokens:
581
  # FINAL CHUNK: Determine correct finish_reason
582
  if has_tool_calls:
 
592
  # INTERMEDIATE CHUNK: Never emit finish_reason
593
  # (litellm.ModelResponse defaults to "stop" which is wrong)
594
  choice["finish_reason"] = None
595
+
596
  yield f"data: {json.dumps(chunk_dict)}\n\n"
597
 
598
  if hasattr(chunk, "usage") and chunk.usage:
 
741
  # multiple keys have the same usage stats.
742
  credentials_for_provider = list(self.all_credentials[provider])
743
  random.shuffle(credentials_for_provider)
744
+
745
  # Filter out credentials that are unavailable (queued for re-auth)
746
  provider_plugin = self._get_provider_instance(provider)
747
+ if provider_plugin and hasattr(provider_plugin, "is_credential_available"):
748
  available_creds = [
749
+ cred
750
+ for cred in credentials_for_provider
751
  if provider_plugin.is_credential_available(cred)
752
  ]
753
  if available_creds:
 
760
  kwargs = self._convert_model_params(**kwargs)
761
 
762
  # The main rotation loop. It continues as long as there are untried credentials and the global deadline has not been exceeded.
763
+
764
  # Resolve model ID early, before any credential operations
765
  # This ensures consistent model ID usage for acquisition, release, and tracking
766
  resolved_model = self._resolve_model_id(model, provider)
 
768
  lib_logger.info(f"Resolved model '{model}' to '{resolved_model}'")
769
  model = resolved_model
770
  kwargs["model"] = model # Ensure kwargs has the resolved model for litellm
771
+
772
  # [NEW] Filter by model tier requirement and build priority map
773
  credential_priorities = None
774
+ if provider_plugin and hasattr(provider_plugin, "get_model_tier_requirement"):
775
  required_tier = provider_plugin.get_model_tier_requirement(model)
776
  if required_tier is not None:
777
  # Filter OUT only credentials we KNOW are too low priority
 
779
  incompatible_creds = []
780
  compatible_creds = []
781
  unknown_creds = []
782
+
783
  for cred in credentials_for_provider:
784
+ if hasattr(provider_plugin, "get_credential_priority"):
785
  priority = provider_plugin.get_credential_priority(cred)
786
  if priority is None:
787
  # Unknown priority - keep it, will be discovered on first use
 
795
  else:
796
  # Provider doesn't support priorities - keep all
797
  unknown_creds.append(cred)
798
+
799
  # If we have any known-compatible or unknown credentials, use them
800
  tier_compatible_creds = compatible_creds + unknown_creds
801
  if tier_compatible_creds:
 
822
  f"but all {len(incompatible_creds)} known credentials have priority > {required_tier}. "
823
  f"Request will likely fail."
824
  )
825
+
826
  # Build priority map for usage_manager
827
+ if provider_plugin and hasattr(provider_plugin, "get_credential_priority"):
828
  credential_priorities = {}
829
  for cred in credentials_for_provider:
830
  priority = provider_plugin.get_credential_priority(cred)
831
  if priority is not None:
832
  credential_priorities[cred] = priority
833
+
834
  if credential_priorities:
835
  lib_logger.debug(
836
+ f"Credential priorities for {provider}: {', '.join(f'P{p}={len([c for c in credentials_for_provider if credential_priorities.get(c) == p])}' for p in sorted(set(credential_priorities.values())))}"
837
  )
838
 
839
+ # Initialize error accumulator for tracking errors across credential rotation
840
+ error_accumulator = RequestErrorAccumulator()
841
+ error_accumulator.model = model
842
+ error_accumulator.provider = provider
843
+
844
  while (
845
  len(tried_creds) < len(credentials_for_provider) and time.time() < deadline
846
  ):
 
877
  )
878
  max_concurrent = self.max_concurrent_requests_per_key.get(provider, 1)
879
  current_cred = await self.usage_manager.acquire_key(
880
+ available_keys=creds_to_try,
881
+ model=model,
882
+ deadline=deadline,
883
  max_concurrent=max_concurrent,
884
+ credential_priorities=credential_priorities,
885
  )
886
  key_acquired = True
887
  tried_creds.add(current_cred)
 
964
  if provider_instance:
965
  # Ensure default Gemini safety settings are present (without overriding request)
966
  try:
967
+ self._apply_default_safety_settings(
968
+ litellm_kwargs, provider
969
+ )
970
  except Exception:
971
  # If anything goes wrong here, avoid breaking the request flow.
972
+ lib_logger.debug(
973
+ "Could not apply default safety settings; continuing."
974
+ )
975
 
976
  if "safety_settings" in litellm_kwargs:
977
  converted_settings = (
 
1054
 
1055
  # Extract a clean error message for the user-facing log
1056
  error_message = str(e).split("\n")[0]
1057
+
1058
+ # Record in accumulator for client reporting
1059
+ error_accumulator.record_error(
1060
+ current_cred, classified_error, error_message
1061
+ )
1062
+
1063
  lib_logger.info(
1064
+ f"Key {mask_credential(current_cred)} hit rate limit for {model}. Rotating key."
1065
  )
1066
 
1067
  if classified_error.status_code == 429:
 
1069
  await self.cooldown_manager.start_cooldown(
1070
  provider, cooldown_duration
1071
  )
 
 
 
1072
 
1073
  await self.usage_manager.record_failure(
1074
  current_cred, model, classified_error
1075
  )
 
 
 
1076
  break # Move to the next key
1077
 
1078
  except (
 
1091
  else {},
1092
  )
1093
  classified_error = classify_error(e)
1094
+ error_message = str(e).split("\n")[0]
1095
+
1096
  # Provider-level error: don't increment consecutive failures
1097
  await self.usage_manager.record_failure(
1098
+ current_cred,
1099
+ model,
1100
+ classified_error,
1101
+ increment_consecutive_failures=False,
1102
  )
1103
 
1104
  if attempt >= self.max_retries - 1:
1105
+ # Record in accumulator only on final failure for this key
1106
+ error_accumulator.record_error(
1107
+ current_cred, classified_error, error_message
1108
+ )
1109
  lib_logger.warning(
1110
+ f"Key {mask_credential(current_cred)} failed after max retries due to server error. Rotating."
1111
  )
1112
  break # Move to the next key
1113
 
1114
  # For temporary errors, wait before retrying with the same key.
1115
  wait_time = classified_error.retry_after or (
1116
+ 2**attempt
1117
  ) + random.uniform(0, 1)
1118
  remaining_budget = deadline - time.time()
1119
 
1120
  # If the required wait time exceeds the budget, don't wait; rotate to the next key immediately.
1121
  if wait_time > remaining_budget:
1122
+ error_accumulator.record_error(
1123
+ current_cred, classified_error, error_message
1124
+ )
1125
  lib_logger.warning(
1126
+ f"Retry wait ({wait_time:.2f}s) exceeds budget ({remaining_budget:.2f}s). Rotating key."
1127
  )
1128
  break
1129
 
 
1130
  lib_logger.warning(
1131
+ f"Key {mask_credential(current_cred)} server error. Retrying in {wait_time:.2f}s."
1132
  )
1133
  await asyncio.sleep(wait_time)
1134
  continue # Retry with the same key
1135
 
1136
+ except httpx.HTTPStatusError as e:
1137
+ # Handle HTTP errors from httpx (e.g., from custom providers like Antigravity)
1138
+ last_exception = e
1139
+ log_failure(
1140
+ api_key=current_cred,
1141
+ model=model,
1142
+ attempt=attempt + 1,
1143
+ error=e,
1144
+ request_headers=dict(request.headers)
1145
+ if request
1146
+ else {},
1147
+ )
1148
+
1149
+ classified_error = classify_error(e)
1150
+ error_message = str(e).split("\n")[0]
1151
+
1152
+ lib_logger.warning(
1153
+ f"Key {mask_credential(current_cred)} HTTP {e.response.status_code} ({classified_error.error_type})."
1154
+ )
1155
+
1156
+ # Check if this error should trigger rotation
1157
+ if not should_rotate_on_error(classified_error):
1158
+ lib_logger.error(
1159
+ f"Non-recoverable error ({classified_error.error_type}). Failing request."
1160
+ )
1161
+ raise last_exception
1162
+
1163
+ # Record in accumulator after confirming it's a rotatable error
1164
+ error_accumulator.record_error(
1165
+ current_cred, classified_error, error_message
1166
+ )
1167
+
1168
+ # Handle rate limits with cooldown
1169
+ if classified_error.error_type in [
1170
+ "rate_limit",
1171
+ "quota_exceeded",
1172
+ ]:
1173
+ cooldown_duration = classified_error.retry_after or 60
1174
+ await self.cooldown_manager.start_cooldown(
1175
+ provider, cooldown_duration
1176
+ )
1177
+
1178
+ # Check if we should retry same key (server errors with retries left)
1179
+ if (
1180
+ should_retry_same_key(classified_error)
1181
+ and attempt < self.max_retries - 1
1182
+ ):
1183
+ wait_time = classified_error.retry_after or (
1184
+ 2**attempt
1185
+ ) + random.uniform(0, 1)
1186
+ remaining_budget = deadline - time.time()
1187
+ if wait_time <= remaining_budget:
1188
+ lib_logger.warning(
1189
+ f"Server error, retrying same key in {wait_time:.2f}s."
1190
+ )
1191
+ await asyncio.sleep(wait_time)
1192
+ continue
1193
+
1194
+ # Record failure and rotate to next key
1195
+ await self.usage_manager.record_failure(
1196
+ current_cred, model, classified_error
1197
+ )
1198
+ lib_logger.info(
1199
+ f"Rotating to next key after {classified_error.error_type} error."
1200
+ )
1201
+ break
1202
+
1203
  except Exception as e:
1204
  last_exception = e
1205
  log_failure(
 
1214
 
1215
  if request and await request.is_disconnected():
1216
  lib_logger.warning(
1217
+ f"Client disconnected. Aborting retries for {mask_credential(current_cred)}."
1218
  )
1219
  raise last_exception
1220
 
1221
  classified_error = classify_error(e)
1222
  error_message = str(e).split("\n")[0]
1223
+
1224
  lib_logger.warning(
1225
+ f"Key {mask_credential(current_cred)} {classified_error.error_type} (HTTP {classified_error.status_code})."
1226
  )
1227
+
1228
+ # Handle rate limits with cooldown
1229
+ if (
1230
+ classified_error.status_code == 429
1231
+ or classified_error.error_type
1232
+ in ["rate_limit", "quota_exceeded"]
1233
+ ):
1234
  cooldown_duration = classified_error.retry_after or 60
1235
  await self.cooldown_manager.start_cooldown(
1236
  provider, cooldown_duration
1237
  )
 
 
 
1238
 
1239
+ # Check if this error should trigger rotation
1240
+ if not should_rotate_on_error(classified_error):
1241
+ lib_logger.error(
1242
+ f"Non-recoverable error ({classified_error.error_type}). Failing request."
1243
+ )
 
1244
  raise last_exception
1245
 
1246
+ # Record in accumulator after confirming it's a rotatable error
1247
+ error_accumulator.record_error(
1248
+ current_cred, classified_error, error_message
1249
+ )
1250
+
1251
  await self.usage_manager.record_failure(
1252
  current_cred, model, classified_error
1253
  )
 
1256
  if key_acquired and current_cred:
1257
  await self.usage_manager.release_key(current_cred, model)
1258
 
1259
+ # Check if we exhausted all credentials or timed out
1260
+ if time.time() >= deadline:
1261
+ error_accumulator.timeout_occurred = True
1262
+
1263
+ if error_accumulator.has_errors():
1264
+ # Log concise summary for server logs
1265
+ lib_logger.error(error_accumulator.build_log_message())
1266
+
1267
+ # Return the structured error response for the client
1268
+ return error_accumulator.build_client_error_response()
1269
 
1270
+ # Return None to indicate failure without error details (shouldn't normally happen)
1271
+ lib_logger.warning(
1272
+ "Unexpected state: request failed with no recorded errors. "
1273
+ "This may indicate a logic error in error tracking."
1274
+ )
1275
  return None
1276
 
1277
  async def _streaming_acompletion_with_retry(
 
1287
  # Create a mutable copy of the keys and shuffle it.
1288
  credentials_for_provider = list(self.all_credentials[provider])
1289
  random.shuffle(credentials_for_provider)
1290
+
1291
  # Filter out credentials that are unavailable (queued for re-auth)
1292
  provider_plugin = self._get_provider_instance(provider)
1293
+ if provider_plugin and hasattr(provider_plugin, "is_credential_available"):
1294
  available_creds = [
1295
+ cred
1296
+ for cred in credentials_for_provider
1297
  if provider_plugin.is_credential_available(cred)
1298
  ]
1299
  if available_creds:
 
1315
  lib_logger.info(f"Resolved model '{model}' to '{resolved_model}'")
1316
  model = resolved_model
1317
  kwargs["model"] = model # Ensure kwargs has the resolved model for litellm
1318
+
1319
  # [NEW] Filter by model tier requirement and build priority map
1320
  credential_priorities = None
1321
+ if provider_plugin and hasattr(provider_plugin, "get_model_tier_requirement"):
1322
  required_tier = provider_plugin.get_model_tier_requirement(model)
1323
  if required_tier is not None:
1324
  # Filter OUT only credentials we KNOW are too low priority
 
1326
  incompatible_creds = []
1327
  compatible_creds = []
1328
  unknown_creds = []
1329
+
1330
  for cred in credentials_for_provider:
1331
+ if hasattr(provider_plugin, "get_credential_priority"):
1332
  priority = provider_plugin.get_credential_priority(cred)
1333
  if priority is None:
1334
  # Unknown priority - keep it, will be discovered on first use
 
1342
  else:
1343
  # Provider doesn't support priorities - keep all
1344
  unknown_creds.append(cred)
1345
+
1346
  # If we have any known-compatible or unknown credentials, use them
1347
  tier_compatible_creds = compatible_creds + unknown_creds
1348
  if tier_compatible_creds:
 
1369
  f"but all {len(incompatible_creds)} known credentials have priority > {required_tier}. "
1370
  f"Request will likely fail."
1371
  )
1372
+
1373
  # Build priority map for usage_manager
1374
+ if provider_plugin and hasattr(provider_plugin, "get_credential_priority"):
1375
  credential_priorities = {}
1376
  for cred in credentials_for_provider:
1377
  priority = provider_plugin.get_credential_priority(cred)
1378
  if priority is not None:
1379
  credential_priorities[cred] = priority
1380
+
1381
  if credential_priorities:
1382
  lib_logger.debug(
1383
+ f"Credential priorities for {provider}: {', '.join(f'P{p}={len([c for c in credentials_for_provider if credential_priorities.get(c) == p])}' for p in sorted(set(credential_priorities.values())))}"
1384
  )
1385
 
1386
+ # Initialize error accumulator for tracking errors across credential rotation
1387
+ error_accumulator = RequestErrorAccumulator()
1388
+ error_accumulator.model = model
1389
+ error_accumulator.provider = provider
1390
+
1391
  try:
1392
  while (
1393
  len(tried_creds) < len(credentials_for_provider)
 
1423
  lib_logger.info(
1424
  f"Acquiring credential for model {model}. Tried credentials: {len(tried_creds)}/{len(credentials_for_provider)}"
1425
  )
1426
+ max_concurrent = self.max_concurrent_requests_per_key.get(
1427
+ provider, 1
1428
+ )
1429
  current_cred = await self.usage_manager.acquire_key(
1430
+ available_keys=creds_to_try,
1431
+ model=model,
1432
+ deadline=deadline,
1433
  max_concurrent=max_concurrent,
1434
+ credential_priorities=credential_priorities,
1435
  )
1436
  key_acquired = True
1437
  tried_creds.add(current_cred)
 
1535
  litellm.RateLimitError,
1536
  httpx.HTTPStatusError,
1537
  ) as e:
 
 
 
 
 
 
1538
  last_exception = e
1539
  # If the exception is our custom wrapper, unwrap the original error
1540
  original_exc = getattr(e, "data", e)
1541
  classified_error = classify_error(original_exc)
1542
+ error_message = str(original_exc).split("\n")[0]
1543
+
1544
+ log_failure(
1545
+ api_key=current_cred,
1546
+ model=model,
1547
+ attempt=attempt + 1,
1548
+ error=e,
1549
+ request_headers=dict(request.headers)
1550
+ if request
1551
+ else {},
1552
+ )
1553
+
1554
+ # Record in accumulator for client reporting
1555
+ error_accumulator.record_error(
1556
+ current_cred, classified_error, error_message
1557
+ )
1558
+
1559
+ # Check if this error should trigger rotation
1560
+ if not should_rotate_on_error(classified_error):
1561
+ lib_logger.error(
1562
+ f"Non-recoverable error ({classified_error.error_type}) during custom stream. Failing."
1563
+ )
1564
+ raise last_exception
1565
+
1566
+ # Handle rate limits with cooldown
1567
+ if classified_error.error_type in [
1568
+ "rate_limit",
1569
+ "quota_exceeded",
1570
+ ]:
1571
+ cooldown_duration = (
1572
+ classified_error.retry_after or 60
1573
+ )
1574
+ await self.cooldown_manager.start_cooldown(
1575
+ provider, cooldown_duration
1576
+ )
1577
+
1578
  await self.usage_manager.record_failure(
1579
  current_cred, model, classified_error
1580
  )
1581
  lib_logger.warning(
1582
+ f"Cred {mask_credential(current_cred)} {classified_error.error_type} (HTTP {classified_error.status_code}). Rotating."
1583
  )
1584
  break
1585
 
 
1599
  else {},
1600
  )
1601
  classified_error = classify_error(e)
1602
+ error_message = str(e).split("\n")[0]
1603
+
1604
  # Provider-level error: don't increment consecutive failures
1605
  await self.usage_manager.record_failure(
1606
+ current_cred,
1607
+ model,
1608
+ classified_error,
1609
+ increment_consecutive_failures=False,
1610
  )
1611
 
1612
  if attempt >= self.max_retries - 1:
1613
+ error_accumulator.record_error(
1614
+ current_cred, classified_error, error_message
1615
+ )
1616
  lib_logger.warning(
1617
+ f"Cred {mask_credential(current_cred)} failed after max retries. Rotating."
1618
  )
1619
  break
1620
 
1621
  wait_time = classified_error.retry_after or (
1622
+ 2**attempt
1623
  ) + random.uniform(0, 1)
1624
  remaining_budget = deadline - time.time()
1625
  if wait_time > remaining_budget:
1626
+ error_accumulator.record_error(
1627
+ current_cred, classified_error, error_message
1628
+ )
1629
  lib_logger.warning(
1630
+ f"Retry wait ({wait_time:.2f}s) exceeds budget. Rotating."
1631
  )
1632
  break
1633
 
 
1634
  lib_logger.warning(
1635
+ f"Cred {mask_credential(current_cred)} server error. Retrying in {wait_time:.2f}s."
1636
  )
1637
  await asyncio.sleep(wait_time)
1638
  continue
 
1649
  else {},
1650
  )
1651
  classified_error = classify_error(e)
1652
+ error_message = str(e).split("\n")[0]
1653
+
1654
+ # Record in accumulator
1655
+ error_accumulator.record_error(
1656
+ current_cred, classified_error, error_message
1657
+ )
1658
+
1659
  lib_logger.warning(
1660
+ f"Cred {mask_credential(current_cred)} {classified_error.error_type} (HTTP {classified_error.status_code})."
1661
  )
1662
+
1663
+ # Check if this error should trigger rotation
1664
+ if not should_rotate_on_error(classified_error):
1665
+ lib_logger.error(
1666
+ f"Non-recoverable error ({classified_error.error_type}). Failing."
1667
+ )
1668
  raise last_exception
1669
+
1670
  await self.usage_manager.record_failure(
1671
  current_cred, model, classified_error
1672
  )
 
1688
  if provider_instance:
1689
  # Ensure default Gemini safety settings are present (without overriding request)
1690
  try:
1691
+ self._apply_default_safety_settings(
1692
+ litellm_kwargs, provider
1693
+ )
1694
  except Exception:
1695
+ lib_logger.debug(
1696
+ "Could not apply default safety settings for streaming path; continuing."
1697
+ )
1698
 
1699
  if "safety_settings" in litellm_kwargs:
1700
  converted_settings = (
 
1775
  yield chunk
1776
  return
1777
 
1778
+ except (
1779
+ StreamedAPIError,
1780
+ litellm.RateLimitError,
1781
+ httpx.HTTPStatusError,
1782
+ ) as e:
1783
  last_exception = e
1784
 
1785
  # This is the final, robust handler for streamed errors.
 
1789
  original_exc = getattr(e, "data", e)
1790
  classified_error = classify_error(original_exc)
1791
 
1792
+ # Check if this error should trigger rotation
1793
+ if not should_rotate_on_error(classified_error):
1794
+ lib_logger.error(
1795
+ f"Non-recoverable error ({classified_error.error_type}) during litellm stream. Failing."
1796
+ )
1797
+ raise last_exception
1798
+
1799
  try:
1800
  # The full error JSON is in the string representation of the exception.
1801
  json_str_match = re.search(
1802
  r"(\{.*\})", str(original_exc), re.DOTALL
1803
  )
1804
  if json_str_match:
 
1805
  cleaned_str = codecs.decode(
1806
  json_str_match.group(1), "unicode_escape"
1807
  )
1808
  error_payload = json.loads(cleaned_str)
1809
  except (json.JSONDecodeError, TypeError):
 
 
 
1810
  error_payload = {}
1811
 
 
1812
  log_failure(
1813
  api_key=current_cred,
1814
  model=model,
 
1822
 
1823
  error_details = error_payload.get("error", {})
1824
  error_status = error_details.get("status", "")
 
1825
  error_message_text = error_details.get(
1826
+ "message", str(original_exc).split("\n")[0]
1827
+ )
1828
+
1829
+ # Record in accumulator for client reporting
1830
+ error_accumulator.record_error(
1831
+ current_cred, classified_error, error_message_text
1832
  )
1833
 
1834
  if (
 
1836
  or "resource_exhausted" in error_status.lower()
1837
  ):
1838
  consecutive_quota_failures += 1
 
 
 
1839
 
1840
  quota_value = "N/A"
1841
  quota_id = "N/A"
 
1862
  )
1863
 
1864
  if consecutive_quota_failures >= 3:
1865
+ # Fatal: likely input data too large
 
 
 
1866
  client_error_message = (
1867
+ f"Request failed after 3 consecutive quota errors (input may be too large). "
1868
+ f"Limit: {quota_value} (Quota ID: {quota_id})"
1869
+ )
1870
+ lib_logger.error(
1871
+ f"Fatal quota error for {mask_credential(current_cred)}. ID: {quota_id}, Limit: {quota_value}"
1872
  )
 
 
1873
  yield f"data: {json.dumps({'error': {'message': client_error_message, 'type': 'proxy_fatal_quota_error'}})}\n\n"
1874
  yield "data: [DONE]\n\n"
1875
  return
 
1876
  else:
 
1877
  lib_logger.warning(
1878
+ f"Cred {mask_credential(current_cred)} quota error ({consecutive_quota_failures}/3). Rotating."
1879
  )
1880
  break
1881
 
1882
  else:
1883
  consecutive_quota_failures = 0
 
1884
  lib_logger.warning(
1885
+ f"Cred {mask_credential(current_cred)} {classified_error.error_type}. Rotating."
1886
  )
1887
 
1888
+ if classified_error.error_type in [
1889
+ "rate_limit",
1890
+ "quota_exceeded",
1891
+ ]:
1892
  cooldown_duration = (
1893
  classified_error.retry_after or 60
1894
  )
1895
  await self.cooldown_manager.start_cooldown(
1896
  provider, cooldown_duration
1897
  )
 
 
 
1898
 
1899
  await self.usage_manager.record_failure(
1900
  current_cred, model, classified_error
 
1918
  else {},
1919
  )
1920
  classified_error = classify_error(e)
1921
+ error_message_text = str(e).split("\n")[0]
1922
+
1923
+ # Record error in accumulator (server errors are transient, not abnormal)
1924
+ error_accumulator.record_error(
1925
+ current_cred, classified_error, error_message_text
1926
+ )
1927
+
1928
  # Provider-level error: don't increment consecutive failures
1929
  await self.usage_manager.record_failure(
1930
+ current_cred,
1931
+ model,
1932
+ classified_error,
1933
+ increment_consecutive_failures=False,
1934
  )
1935
 
1936
  if attempt >= self.max_retries - 1:
 
1941
  break
1942
 
1943
  wait_time = classified_error.retry_after or (
1944
+ 2**attempt
1945
  ) + random.uniform(0, 1)
1946
  remaining_budget = deadline - time.time()
1947
  if wait_time > remaining_budget:
 
1950
  )
1951
  break
1952
 
 
1953
  lib_logger.warning(
1954
+ f"Credential ...{current_cred[-6:]} encountered a server error for model {model}. Reason: '{error_message_text}'. Retrying in {wait_time:.2f}s."
1955
  )
1956
  await asyncio.sleep(wait_time)
1957
  continue
 
1969
  else {},
1970
  )
1971
  classified_error = classify_error(e)
1972
+ error_message_text = str(e).split("\n")[0]
1973
+
1974
+ # Record error in accumulator
1975
+ error_accumulator.record_error(
1976
+ current_cred, classified_error, error_message_text
1977
+ )
1978
 
1979
  lib_logger.warning(
1980
+ f"Credential ...{current_cred[-6:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message_text}."
1981
  )
1982
 
1983
+ # Handle rate limits with cooldown
1984
+ if (
1985
+ classified_error.status_code == 429
1986
+ or classified_error.error_type
1987
+ in ["rate_limit", "quota_exceeded"]
1988
+ ):
1989
  cooldown_duration = classified_error.retry_after or 60
1990
  await self.cooldown_manager.start_cooldown(
1991
  provider, cooldown_duration
1992
  )
1993
  lib_logger.warning(
1994
+ f"Rate limit detected for {provider}. Starting {cooldown_duration}s cooldown."
1995
  )
1996
 
1997
+ # Check if this error should trigger rotation
1998
+ if not should_rotate_on_error(classified_error):
1999
+ # Non-rotatable errors - fail immediately
2000
+ lib_logger.error(
2001
+ f"Non-recoverable error ({classified_error.error_type}). Failing request."
2002
+ )
2003
  raise last_exception
2004
 
2005
+ # Record failure and rotate to next key
2006
  await self.usage_manager.record_failure(
2007
  current_cred, model, classified_error
2008
  )
2009
+ lib_logger.info(
2010
+ f"Rotating to next key after {classified_error.error_type} error."
2011
+ )
2012
  break
2013
 
2014
  finally:
2015
  if key_acquired and current_cred:
2016
  await self.usage_manager.release_key(current_cred, model)
2017
 
2018
+ # Build detailed error response using error accumulator
2019
+ error_accumulator.timeout_occurred = time.time() >= deadline
2020
+
2021
+ if error_accumulator.has_errors():
2022
+ # Log concise summary for server logs
2023
+ lib_logger.error(error_accumulator.build_log_message())
2024
+
2025
+ # Build structured error response for client
2026
+ error_response = error_accumulator.build_client_error_response()
2027
+ error_data = error_response
2028
  else:
2029
+ # Fallback if no errors were recorded (shouldn't happen)
2030
+ final_error_message = (
2031
+ "Request failed: No available API keys after rotation or timeout."
2032
+ )
2033
+ if last_exception:
2034
+ final_error_message = (
2035
+ f"Request failed. Last error: {str(last_exception)}"
2036
+ )
2037
+ error_data = {
2038
+ "error": {"message": final_error_message, "type": "proxy_error"}
2039
+ }
2040
  lib_logger.error(final_error_message)
2041
 
 
 
 
2042
  yield f"data: {json.dumps(error_data)}\n\n"
2043
  yield "data: [DONE]\n\n"
2044
 
 
2086
  # Handle iflow provider: remove stream_options to avoid HTTP 406
2087
  model = kwargs.get("model", "")
2088
  provider = model.split("/")[0] if "/" in model else ""
2089
+
2090
  if provider == "iflow" and "stream_options" in kwargs:
2091
+ lib_logger.debug(
2092
+ "Removing stream_options for iflow provider to avoid HTTP 406"
2093
+ )
2094
  kwargs.pop("stream_options", None)
2095
+
2096
  if kwargs.get("stream"):
2097
  # Only add stream_options for providers that support it (excluding iflow)
2098
  if provider != "iflow":
 
2100
  kwargs["stream_options"] = {}
2101
  if "include_usage" not in kwargs["stream_options"]:
2102
  kwargs["stream_options"]["include_usage"] = True
2103
+
2104
  return self._streaming_acompletion_with_retry(
2105
  request=request, pre_request_callback=pre_request_callback, **kwargs
2106
  )
src/rotator_library/error_handler.py CHANGED
@@ -1,5 +1,6 @@
1
  import re
2
  import json
 
3
  from typing import Optional, Dict, Any
4
  import httpx
5
 
@@ -20,20 +21,20 @@ from litellm.exceptions import (
20
  def extract_retry_after_from_body(error_body: Optional[str]) -> Optional[int]:
21
  """
22
  Extract the retry-after time from an API error response body.
23
-
24
  Handles various error formats including:
25
  - Gemini CLI: "Your quota will reset after 39s."
26
  - Generic: "quota will reset after 120s", "retry after 60s"
27
-
28
  Args:
29
  error_body: The raw error response body
30
-
31
  Returns:
32
  The retry time in seconds, or None if not found
33
  """
34
  if not error_body:
35
  return None
36
-
37
  # Pattern to match various "reset after Xs" or "retry after Xs" formats
38
  patterns = [
39
  r"quota will reset after\s*(\d+)s",
@@ -41,7 +42,7 @@ def extract_retry_after_from_body(error_body: Optional[str]) -> Optional[int]:
41
  r"retry after\s*(\d+)s",
42
  r"try again in\s*(\d+)\s*seconds?",
43
  ]
44
-
45
  for pattern in patterns:
46
  match = re.search(pattern, error_body, re.IGNORECASE)
47
  if match:
@@ -49,7 +50,7 @@ def extract_retry_after_from_body(error_body: Optional[str]) -> Optional[int]:
49
  return int(match.group(1))
50
  except (ValueError, IndexError):
51
  continue
52
-
53
  return None
54
 
55
 
@@ -65,6 +66,227 @@ class PreRequestCallbackError(Exception):
65
  pass
66
 
67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  class ClassifiedError:
69
  """A structured representation of a classified error."""
70
 
@@ -94,7 +316,7 @@ def get_retry_after(error: Exception) -> Optional[int]:
94
  if isinstance(error, httpx.HTTPStatusError):
95
  headers = error.response.headers
96
  # Check standard Retry-After header (case-insensitive)
97
- retry_header = headers.get('retry-after') or headers.get('Retry-After')
98
  if retry_header:
99
  try:
100
  return int(retry_header) # Assumes seconds format
@@ -102,10 +324,13 @@ def get_retry_after(error: Exception) -> Optional[int]:
102
  pass # Might be HTTP date format, skip for now
103
 
104
  # Check X-RateLimit-Reset header (Unix timestamp)
105
- reset_header = headers.get('x-ratelimit-reset') or headers.get('X-RateLimit-Reset')
 
 
106
  if reset_header:
107
  try:
108
  import time
 
109
  reset_timestamp = int(reset_header)
110
  current_time = int(time.time())
111
  wait_seconds = reset_timestamp - current_time
@@ -155,16 +380,16 @@ def get_retry_after(error: Exception) -> Optional[int]:
155
  continue
156
 
157
  # 3. Handle duration formats like "60s", "2m", "1h"
158
- duration_match = re.search(r'(\d+)\s*([smh])', error_str)
159
  if duration_match:
160
  try:
161
  value = int(duration_match.group(1))
162
  unit = duration_match.group(2)
163
- if unit == 's':
164
  return value
165
- elif unit == 'm':
166
  return value * 60
167
- elif unit == 'h':
168
  return value * 3600
169
  except (ValueError, IndexError):
170
  pass
@@ -179,15 +404,15 @@ def get_retry_after(error: Exception) -> Optional[int]:
179
  if value.isdigit():
180
  return int(value)
181
  # Handle "60s", "2m" format in attribute
182
- duration_match = re.search(r'(\d+)\s*([smh])', value.lower())
183
  if duration_match:
184
  val = int(duration_match.group(1))
185
  unit = duration_match.group(2)
186
- if unit == 's':
187
  return val
188
- elif unit == 'm':
189
  return val * 60
190
- elif unit == 'h':
191
  return val * 3600
192
 
193
  return None
@@ -197,25 +422,89 @@ def classify_error(e: Exception) -> ClassifiedError:
197
  """
198
  Classifies an exception into a structured ClassifiedError object.
199
  Now handles both litellm and httpx exceptions.
 
 
 
 
 
 
 
 
 
 
 
200
  """
201
  status_code = getattr(e, "status_code", None)
 
202
  if isinstance(e, httpx.HTTPStatusError): # [NEW] Handle httpx errors first
203
  status_code = e.response.status_code
 
 
 
 
 
 
 
204
  if status_code == 401:
205
  return ClassifiedError(
206
  error_type="authentication",
207
  original_exception=e,
208
  status_code=status_code,
209
  )
 
 
 
 
 
 
 
 
210
  if status_code == 429:
211
  retry_after = get_retry_after(e)
 
 
 
 
 
 
 
 
212
  return ClassifiedError(
213
  error_type="rate_limit",
214
  original_exception=e,
215
  status_code=status_code,
216
  retry_after=retry_after,
217
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
218
  if 400 <= status_code < 500:
 
219
  return ClassifiedError(
220
  error_type="invalid_request",
221
  original_exception=e,
@@ -313,6 +602,52 @@ def is_unrecoverable_error(e: Exception) -> bool:
313
  return isinstance(e, (InvalidRequestError, AuthenticationError, BadRequestError))
314
 
315
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
316
  class AllProviders:
317
  """
318
  A class to handle provider-specific settings, such as custom API bases.
 
1
  import re
2
  import json
3
+ import os
4
  from typing import Optional, Dict, Any
5
  import httpx
6
 
 
21
  def extract_retry_after_from_body(error_body: Optional[str]) -> Optional[int]:
22
  """
23
  Extract the retry-after time from an API error response body.
24
+
25
  Handles various error formats including:
26
  - Gemini CLI: "Your quota will reset after 39s."
27
  - Generic: "quota will reset after 120s", "retry after 60s"
28
+
29
  Args:
30
  error_body: The raw error response body
31
+
32
  Returns:
33
  The retry time in seconds, or None if not found
34
  """
35
  if not error_body:
36
  return None
37
+
38
  # Pattern to match various "reset after Xs" or "retry after Xs" formats
39
  patterns = [
40
  r"quota will reset after\s*(\d+)s",
 
42
  r"retry after\s*(\d+)s",
43
  r"try again in\s*(\d+)\s*seconds?",
44
  ]
45
+
46
  for pattern in patterns:
47
  match = re.search(pattern, error_body, re.IGNORECASE)
48
  if match:
 
50
  return int(match.group(1))
51
  except (ValueError, IndexError):
52
  continue
53
+
54
  return None
55
 
56
 
 
66
  pass
67
 
68
 
69
# =============================================================================
# ERROR TRACKING FOR CLIENT REPORTING
# =============================================================================

# Error types that signal a credential-level problem (revoked key, denied
# access, internal callback failure). These are always itemized for the client.
ABNORMAL_ERROR_TYPES = frozenset(
    (
        "forbidden",  # 403 - credential access issue
        "authentication",  # 401 - credential invalid/revoked
        "pre_request_callback_error",  # Internal proxy error
    )
)

# Error types considered routine while rotating credentials; they are only
# summarized for the client once every credential has failed.
NORMAL_ERROR_TYPES = frozenset(
    (
        "rate_limit",  # 429 - expected during high load
        "quota_exceeded",  # Expected when quota runs out
        "server_error",  # 5xx - transient provider issues
        "api_connection",  # Network issues - transient
    )
)


def is_abnormal_error(classified_error: "ClassifiedError") -> bool:
    """
    Return True when the classified error indicates a credential problem that
    deserves operator attention (401 invalid/revoked, 403 access denied,
    internal callback failure), as opposed to routine failures such as rate
    limits or transient 5xx responses.
    """
    error_kind = classified_error.error_type
    return error_kind in ABNORMAL_ERROR_TYPES
106
+
107
+
108
def mask_credential(credential: str) -> str:
    """
    Return a redacted form of a credential that is safe to log or show.

    - OAuth credentials stored as files on disk are reduced to the bare
      filename (e.g. "antigravity_oauth_1.json").
    - API keys keep only their last 6 characters (e.g. "...xyz123"); keys of
      6 characters or fewer are fully hidden as "***".
    """
    # A credential that resolves to an existing file is an OAuth file path.
    if os.path.isfile(credential):
        return os.path.basename(credential)
    # Plain API key: reveal a short suffix, or nothing for very short keys.
    return f"...{credential[-6:]}" if len(credential) > 6 else "***"
121
+
122
+
123
class RequestErrorAccumulator:
    """
    Collects every error seen while rotating credentials for a single request.

    When the rotation cycle ends without success, the accumulated records are
    turned into a structured client-facing error payload and a compact server
    log line. Credential-level problems ("abnormal") are itemized, while
    routine failures ("normal", e.g. rate limits) are only summarized.
    """

    def __init__(self):
        # Itemized records for credential issues (401/403/...) - always shown.
        self.abnormal_errors: list = []
        # Records for expected failures (429/5xx/...) - summarized only.
        self.normal_errors: list = []
        # Unique credentials attempted during this request.
        self._tried_credentials: set = set()
        self.timeout_occurred: bool = False
        self.model: str = ""
        self.provider: str = ""

    def record_error(
        self, credential: str, classified_error: "ClassifiedError", error_message: str
    ):
        """Record an error for a credential."""
        self._tried_credentials.add(credential)
        record = {
            "credential": mask_credential(credential),
            "error_type": classified_error.error_type,
            "status_code": classified_error.status_code,
            "message": self._truncate_message(error_message, 150),
        }
        # Route the record into the itemized or the summarized bucket.
        if is_abnormal_error(classified_error):
            self.abnormal_errors.append(record)
        else:
            self.normal_errors.append(record)

    @property
    def total_credentials_tried(self) -> int:
        """Number of distinct credentials attempted so far."""
        return len(self._tried_credentials)

    def _truncate_message(self, message: str, max_length: int = 150) -> str:
        """Reduce an error message to a single readable line of bounded length."""
        head = message.partition("\n")[0]
        return head if len(head) <= max_length else head[:max_length] + "..."

    def has_errors(self) -> bool:
        """True when at least one error of any kind was recorded."""
        return bool(self.abnormal_errors) or bool(self.normal_errors)

    def has_abnormal_errors(self) -> bool:
        """True when at least one credential-level error was recorded."""
        return bool(self.abnormal_errors)

    def get_normal_error_summary(self) -> str:
        """Summarize normal errors by type, e.g. "3 rate_limit, 1 server_error"."""
        if not self.normal_errors:
            return ""
        # Tally occurrences per error type, preserving first-seen order.
        tallies: dict = {}
        for record in self.normal_errors:
            kind = record["error_type"]
            tallies[kind] = tallies.get(kind, 0) + 1
        return ", ".join(f"{count} {kind}" for kind, count in tallies.items())

    def build_client_error_response(self) -> dict:
        """
        Build the structured, JSON-serializable error payload returned to the
        client when the request ultimately fails.
        """
        tried = self.total_credentials_tried
        # Primary failure reason: global deadline vs. exhausted rotation.
        if self.timeout_occurred:
            error_type = "proxy_timeout"
            message_parts = [
                f"Request timed out after trying {tried} credential(s)"
            ]
        else:
            error_type = "proxy_all_credentials_exhausted"
            message_parts = [
                f"All {tried} credential(s) exhausted for {self.provider}"
            ]

        # Itemize credential issues - these need human attention.
        if self.abnormal_errors:
            message_parts.append("\n\nCredential issues (require attention):")
            for record in self.abnormal_errors:
                if record["status_code"] is not None:
                    status = f"HTTP {record['status_code']}"
                else:
                    status = record["error_type"]
                message_parts.append(
                    f"\n • {record['credential']}: {status} - {record['message']}"
                )

        normal_summary = self.get_normal_error_summary()
        if normal_summary:
            if self.abnormal_errors:
                message_parts.append(
                    f"\n\nAdditionally: {normal_summary} (expected during normal operation)"
                )
            else:
                message_parts.append(f"\n\nAll failures were: {normal_summary}")
                message_parts.append(
                    "\nThis is normal during high load - retry later or add more credentials."
                )

        details = {
            "model": self.model,
            "provider": self.provider,
            "credentials_tried": tried,
            "timeout": self.timeout_occurred,
        }
        # Only abnormal errors are included verbatim; normal ones stay summarized.
        if self.abnormal_errors:
            details["abnormal_errors"] = self.abnormal_errors
        if normal_summary:
            details["normal_error_summary"] = normal_summary

        return {
            "error": {
                "message": "".join(message_parts),
                "type": error_type,
                "details": details,
            }
        }

    def build_log_message(self) -> str:
        """
        Produce a compact one-line summary suitable for server-side logs,
        much shorter than the client-facing message.
        """
        if self.timeout_occurred:
            segments = [
                f"TIMEOUT: {self.total_credentials_tried} creds tried for {self.model}"
            ]
        else:
            segments = [
                f"ALL CREDS EXHAUSTED: {self.total_credentials_tried} tried for {self.model}"
            ]

        if self.abnormal_errors:
            issue_list = ", ".join(
                f"{e['credential']}={e['status_code'] or e['error_type']}"
                for e in self.abnormal_errors
            )
            segments.append(f"ISSUES: {issue_list}")

        normal_summary = self.get_normal_error_summary()
        if normal_summary:
            segments.append(f"Normal: {normal_summary}")

        return " | ".join(segments)
288
+
289
+
290
  class ClassifiedError:
291
  """A structured representation of a classified error."""
292
 
 
316
  if isinstance(error, httpx.HTTPStatusError):
317
  headers = error.response.headers
318
  # Check standard Retry-After header (case-insensitive)
319
+ retry_header = headers.get("retry-after") or headers.get("Retry-After")
320
  if retry_header:
321
  try:
322
  return int(retry_header) # Assumes seconds format
 
324
  pass # Might be HTTP date format, skip for now
325
 
326
  # Check X-RateLimit-Reset header (Unix timestamp)
327
+ reset_header = headers.get("x-ratelimit-reset") or headers.get(
328
+ "X-RateLimit-Reset"
329
+ )
330
  if reset_header:
331
  try:
332
  import time
333
+
334
  reset_timestamp = int(reset_header)
335
  current_time = int(time.time())
336
  wait_seconds = reset_timestamp - current_time
 
380
  continue
381
 
382
  # 3. Handle duration formats like "60s", "2m", "1h"
383
+ duration_match = re.search(r"(\d+)\s*([smh])", error_str)
384
  if duration_match:
385
  try:
386
  value = int(duration_match.group(1))
387
  unit = duration_match.group(2)
388
+ if unit == "s":
389
  return value
390
+ elif unit == "m":
391
  return value * 60
392
+ elif unit == "h":
393
  return value * 3600
394
  except (ValueError, IndexError):
395
  pass
 
404
  if value.isdigit():
405
  return int(value)
406
  # Handle "60s", "2m" format in attribute
407
+ duration_match = re.search(r"(\d+)\s*([smh])", value.lower())
408
  if duration_match:
409
  val = int(duration_match.group(1))
410
  unit = duration_match.group(2)
411
+ if unit == "s":
412
  return val
413
+ elif unit == "m":
414
  return val * 60
415
+ elif unit == "h":
416
  return val * 3600
417
 
418
  return None
 
422
  """
423
  Classifies an exception into a structured ClassifiedError object.
424
  Now handles both litellm and httpx exceptions.
425
+
426
+ Error types and their typical handling:
427
+ - rate_limit (429): Rotate key, may retry with backoff
428
+ - server_error (5xx): Retry with backoff, then rotate
429
+ - forbidden (403): Rotate key immediately (access denied for this credential)
430
+ - authentication (401): Rotate key, trigger re-auth if OAuth
431
+ - quota_exceeded: Rotate key (credential quota exhausted)
432
+ - invalid_request (400): Don't retry - client error in request
433
+ - context_window_exceeded: Don't retry - request too large
434
+ - api_connection: Retry with backoff, then rotate
435
+ - unknown: Rotate key (safer to try another)
436
  """
437
  status_code = getattr(e, "status_code", None)
438
+
439
  if isinstance(e, httpx.HTTPStatusError): # [NEW] Handle httpx errors first
440
  status_code = e.response.status_code
441
+
442
+ # Try to get error body for better classification
443
+ try:
444
+ error_body = e.response.text.lower() if hasattr(e.response, "text") else ""
445
+ except Exception:
446
+ error_body = ""
447
+
448
  if status_code == 401:
449
  return ClassifiedError(
450
  error_type="authentication",
451
  original_exception=e,
452
  status_code=status_code,
453
  )
454
+ if status_code == 403:
455
+ # 403 Forbidden - credential doesn't have access, should rotate
456
+ # Could be: IP restriction, account disabled, permission denied, etc.
457
+ return ClassifiedError(
458
+ error_type="forbidden",
459
+ original_exception=e,
460
+ status_code=status_code,
461
+ )
462
  if status_code == 429:
463
  retry_after = get_retry_after(e)
464
+ # Check if this is a quota error vs rate limit
465
+ if "quota" in error_body or "resource_exhausted" in error_body:
466
+ return ClassifiedError(
467
+ error_type="quota_exceeded",
468
+ original_exception=e,
469
+ status_code=status_code,
470
+ retry_after=retry_after,
471
+ )
472
  return ClassifiedError(
473
  error_type="rate_limit",
474
  original_exception=e,
475
  status_code=status_code,
476
  retry_after=retry_after,
477
  )
478
+ if status_code == 400:
479
+ # Check for context window / token limit errors with more specific patterns
480
+ if any(
481
+ pattern in error_body
482
+ for pattern in [
483
+ "context_length",
484
+ "max_tokens",
485
+ "token limit",
486
+ "context window",
487
+ "too many tokens",
488
+ "too long",
489
+ ]
490
+ ):
491
+ return ClassifiedError(
492
+ error_type="context_window_exceeded",
493
+ original_exception=e,
494
+ status_code=status_code,
495
+ )
496
+ return ClassifiedError(
497
+ error_type="invalid_request",
498
+ original_exception=e,
499
+ status_code=status_code,
500
+ )
501
+ return ClassifiedError(
502
+ error_type="invalid_request",
503
+ original_exception=e,
504
+ status_code=status_code,
505
+ )
506
  if 400 <= status_code < 500:
507
+ # Other 4xx errors - generally client errors
508
  return ClassifiedError(
509
  error_type="invalid_request",
510
  original_exception=e,
 
602
  return isinstance(e, (InvalidRequestError, AuthenticationError, BadRequestError))
603
 
604
 
605
def should_rotate_on_error(classified_error: ClassifiedError) -> bool:
    """
    Decide whether an error warrants moving on to the next key.

    Rotation makes sense for anything that might succeed with a different
    credential: rate_limit, quota_exceeded, forbidden, authentication,
    server_error, api_connection, and unknown errors.

    Rotation is pointless - and the request should fail immediately - when
    the request itself is at fault or the proxy's own hook failed:
    invalid_request, context_window_exceeded, pre_request_callback_error.

    Returns:
        True if should rotate to next key, False if should fail immediately
    """
    return classified_error.error_type not in {
        "invalid_request",
        "context_window_exceeded",
        "pre_request_callback_error",
    }
632
+
633
+
634
def should_retry_same_key(classified_error: ClassifiedError) -> bool:
    """
    Decide whether an error is worth retrying on the same key (with backoff).

    Only transient conditions qualify: provider-side server errors and
    network/connection failures. Everything else should rotate immediately.

    Returns:
        True if should retry same key, False if should rotate immediately
    """
    return classified_error.error_type in {"server_error", "api_connection"}
649
+
650
+
651
  class AllProviders:
652
  """
653
  A class to handle provider-specific settings, such as custom API bases.
src/rotator_library/failure_logger.py CHANGED
@@ -4,6 +4,7 @@ from logging.handlers import RotatingFileHandler
4
  import os
5
  from datetime import datetime
6
 
 
7
  def setup_failure_logger():
8
  """Sets up a dedicated JSON logger for writing detailed failure logs to a file."""
9
  log_dir = "logs"
@@ -12,15 +13,15 @@ def setup_failure_logger():
12
 
13
  # Create a logger specifically for failures.
14
  # This logger will NOT propagate to the root logger.
15
- logger = logging.getLogger('failure_logger')
16
  logger.setLevel(logging.INFO)
17
  logger.propagate = False
18
 
19
  # Use a rotating file handler
20
  handler = RotatingFileHandler(
21
- os.path.join(log_dir, 'failures.log'),
22
- maxBytes=5*1024*1024, # 5 MB
23
- backupCount=2
24
  )
25
 
26
  # Custom JSON formatter for structured logs
@@ -30,45 +31,124 @@ def setup_failure_logger():
30
  return json.dumps(record.msg)
31
 
32
  handler.setFormatter(JsonFormatter())
33
-
34
  # Add handler only if it hasn't been added before
35
  if not logger.handlers:
36
  logger.addHandler(handler)
37
 
38
  return logger
39
 
 
40
  # Initialize the dedicated logger for detailed failure logs
41
  failure_logger = setup_failure_logger()
42
 
43
  # Get the main library logger for concise, propagated messages
44
- main_lib_logger = logging.getLogger('rotator_library')
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
 
46
- def log_failure(api_key: str, model: str, attempt: int, error: Exception, request_headers: dict, raw_response_text: str = None):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  """
48
  Logs a detailed failure message to a file and a concise summary to the main logger.
 
 
 
 
 
 
 
 
49
  """
50
  # 1. Log the full, detailed error to the dedicated failures.log file
51
  # Prioritize the explicitly passed raw response text, as it may contain
52
  # reassembled data from a stream that is not available on the exception object.
53
  raw_response = raw_response_text
54
- if not raw_response and hasattr(error, 'response') and hasattr(error.response, 'text'):
55
- raw_response = error.response.text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
 
57
  detailed_log_data = {
58
  "timestamp": datetime.utcnow().isoformat(),
59
- "api_key_ending": api_key[-4:],
60
  "model": model,
61
  "attempt_number": attempt,
62
  "error_type": type(error).__name__,
63
- "error_message": str(error),
64
- "raw_response": raw_response,
 
 
65
  "request_headers": request_headers,
 
66
  }
67
  failure_logger.error(detailed_log_data)
68
 
69
  # 2. Log a concise summary to the main library logger, which will propagate
70
  summary_message = (
71
- f"API call failed for model {model} with key ...{api_key[-4:]}. "
72
  f"Error: {type(error).__name__}. See failures.log for details."
73
  )
74
  main_lib_logger.error(summary_message)
 
4
  import os
5
  from datetime import datetime
6
 
7
+
8
  def setup_failure_logger():
9
  """Sets up a dedicated JSON logger for writing detailed failure logs to a file."""
10
  log_dir = "logs"
 
13
 
14
  # Create a logger specifically for failures.
15
  # This logger will NOT propagate to the root logger.
16
+ logger = logging.getLogger("failure_logger")
17
  logger.setLevel(logging.INFO)
18
  logger.propagate = False
19
 
20
  # Use a rotating file handler
21
  handler = RotatingFileHandler(
22
+ os.path.join(log_dir, "failures.log"),
23
+ maxBytes=5 * 1024 * 1024, # 5 MB
24
+ backupCount=2,
25
  )
26
 
27
  # Custom JSON formatter for structured logs
 
31
  return json.dumps(record.msg)
32
 
33
  handler.setFormatter(JsonFormatter())
34
+
35
  # Add handler only if it hasn't been added before
36
  if not logger.handlers:
37
  logger.addHandler(handler)
38
 
39
  return logger
40
 
41
+
42
  # Initialize the dedicated logger for detailed failure logs
43
  failure_logger = setup_failure_logger()
44
 
45
  # Get the main library logger for concise, propagated messages
46
+ main_lib_logger = logging.getLogger("rotator_library")
47
+
48
+
49
+ def _extract_response_body(error: Exception) -> str:
50
+ """
51
+ Extract the full response body from various error types.
52
+
53
+ Handles:
54
+ - httpx.HTTPStatusError: response.text or response.content
55
+ - litellm exceptions: various response attributes
56
+ - Other exceptions: str(error)
57
+ """
58
+ # Try to get response body from httpx errors
59
+ if hasattr(error, "response") and error.response is not None:
60
+ response = error.response
61
+ # Try .text first (decoded)
62
+ if hasattr(response, "text") and response.text:
63
+ return response.text
64
+ # Try .content (bytes)
65
+ if hasattr(response, "content") and response.content:
66
+ try:
67
+ return response.content.decode("utf-8", errors="replace")
68
+ except Exception:
69
+ return str(response.content)
70
 
71
+ # Check for litellm's body attribute
72
+ if hasattr(error, "body") and error.body:
73
+ return str(error.body)
74
+
75
+ # Check for message attribute that might contain response
76
+ if hasattr(error, "message") and error.message:
77
+ return str(error.message)
78
+
79
+ return None
80
+
81
+
82
+ def log_failure(
83
+ api_key: str,
84
+ model: str,
85
+ attempt: int,
86
+ error: Exception,
87
+ request_headers: dict,
88
+ raw_response_text: str = None,
89
+ ):
90
  """
91
  Logs a detailed failure message to a file and a concise summary to the main logger.
92
+
93
+ Args:
94
+ api_key: The API key or credential path that was used
95
+ model: The model that was requested
96
+ attempt: The attempt number (1-based)
97
+ error: The exception that occurred
98
+ request_headers: Headers from the original request
99
+ raw_response_text: Optional pre-extracted response body (e.g., from streaming)
100
  """
101
  # 1. Log the full, detailed error to the dedicated failures.log file
102
  # Prioritize the explicitly passed raw response text, as it may contain
103
  # reassembled data from a stream that is not available on the exception object.
104
  raw_response = raw_response_text
105
+ if not raw_response:
106
+ raw_response = _extract_response_body(error)
107
+
108
+ # Get full error message (not truncated)
109
+ full_error_message = str(error)
110
+
111
+ # Also capture any nested/wrapped exception info
112
+ error_chain = []
113
+ visited = set() # Track visited exceptions to detect circular references
114
+ current_error = error
115
+ while current_error:
116
+ # Check for circular references
117
+ error_id = id(current_error)
118
+ if error_id in visited:
119
+ break
120
+ visited.add(error_id)
121
+
122
+ error_chain.append(
123
+ {
124
+ "type": type(current_error).__name__,
125
+ "message": str(current_error)[:2000], # Limit per-error message size
126
+ }
127
+ )
128
+ current_error = getattr(current_error, "__cause__", None) or getattr(
129
+ current_error, "__context__", None
130
+ )
131
+ if len(error_chain) > 5: # Prevent excessive chain length
132
+ break
133
 
134
  detailed_log_data = {
135
  "timestamp": datetime.utcnow().isoformat(),
136
+ "api_key_ending": api_key[-4:] if len(api_key) >= 4 else "****",
137
  "model": model,
138
  "attempt_number": attempt,
139
  "error_type": type(error).__name__,
140
+ "error_message": full_error_message[:5000], # Limit total size
141
+ "raw_response": raw_response[:10000]
142
+ if raw_response
143
+ else None, # Limit response size
144
  "request_headers": request_headers,
145
+ "error_chain": error_chain if len(error_chain) > 1 else None,
146
  }
147
  failure_logger.error(detailed_log_data)
148
 
149
  # 2. Log a concise summary to the main library logger, which will propagate
150
  summary_message = (
151
+ f"API call failed for model {model} with key ...{api_key[-4:] if len(api_key) >= 4 else '****'}. "
152
  f"Error: {type(error).__name__}. See failures.log for details."
153
  )
154
  main_lib_logger.error(summary_message)