Mirrowel committed on
Commit
c5dfecd
·
1 Parent(s): 8ed4f52

feat(antigravity): ✨ add automatic retry logic for empty API responses

Browse files

Implements robust handling for cases where the Antigravity provider returns empty responses (no content, no tool calls) due to transient server-side issues.

- Introduces new `EmptyResponseError` exception class to represent empty response failures
- Classifies empty responses as rotatable server errors (HTTP 503 equivalent) to trigger credential rotation
- Adds configurable retry mechanism with `ANTIGRAVITY_EMPTY_RESPONSE_RETRIES` (default: 3) and `ANTIGRAVITY_EMPTY_RESPONSE_RETRY_DELAY` (default: 2s)
- Implements separate retry logic for streaming and non-streaming requests to preserve true streaming behavior
- Refactors request handling to use a URL fallback loop with nested empty response retry logic
- Tracks whether any chunks were yielded in streaming mode to avoid emitting synthetic final chunks for empty responses
- Moves helper functions (`_env_bool`, `_env_int`) earlier in the file for use in configuration constants
- Stores last received usage metadata for proper final chunk construction in streaming mode

The retry mechanism operates independently from URL fallback logic, ensuring empty responses are retried on the same URL before attempting credential rotation, while HTTP errors (except 429) trigger URL fallback attempts.

src/rotator_library/error_handler.py CHANGED
@@ -142,6 +142,29 @@ class CredentialNeedsReauthError(Exception):
142
  super().__init__(self.message)
143
 
144
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
145
  # =============================================================================
146
  # ERROR TRACKING FOR CLIENT REPORTING
147
  # =============================================================================
@@ -731,6 +754,15 @@ def classify_error(e: Exception, provider: Optional[str] = None) -> ClassifiedEr
731
  status_code=401, # Treat as auth error for reporting purposes
732
  )
733
 
 
 
 
 
 
 
 
 
 
734
  if isinstance(e, RateLimitError):
735
  retry_after = get_retry_after(e)
736
  # Check if this is a quota error vs rate limit
 
142
  super().__init__(self.message)
143
 
144
 
145
+ class EmptyResponseError(Exception):
146
+ """
147
+ Raised when a provider returns an empty response after multiple retry attempts.
148
+
149
+ This is a rotatable error - the request should try the next credential.
150
+ Treated as a transient server-side issue (503 equivalent).
151
+
152
+ Attributes:
153
+ provider: The provider name (e.g., "antigravity")
154
+ model: The model that was requested
155
+ message: Human-readable message about the error
156
+ """
157
+
158
+ def __init__(self, provider: str, model: str, message: str = ""):
159
+ self.provider = provider
160
+ self.model = model
161
+ self.message = (
162
+ message
163
+ or f"Empty response from {provider}/{model} after multiple retry attempts"
164
+ )
165
+ super().__init__(self.message)
166
+
167
+
168
  # =============================================================================
169
  # ERROR TRACKING FOR CLIENT REPORTING
170
  # =============================================================================
 
754
  status_code=401, # Treat as auth error for reporting purposes
755
  )
756
 
757
+ if isinstance(e, EmptyResponseError):
758
+ # Transient server-side issue - provider returned empty response
759
+ # This is rotatable - try next credential
760
+ return ClassifiedError(
761
+ error_type="server_error",
762
+ original_exception=e,
763
+ status_code=503,
764
+ )
765
+
766
  if isinstance(e, RateLimitError):
767
  retry_after = get_retry_after(e)
768
  # Check if this is a quota error vs rate limit
src/rotator_library/providers/antigravity_provider.py CHANGED
@@ -39,6 +39,7 @@ from .antigravity_auth_base import AntigravityAuthBase
39
  from .provider_cache import ProviderCache
40
  from ..model_definitions import ModelDefinitions
41
  from ..timeout_config import TimeoutConfig
 
42
  from ..utils.paths import get_logs_dir, get_cache_dir
43
 
44
 
@@ -46,6 +47,17 @@ from ..utils.paths import get_logs_dir, get_cache_dir
46
  # CONFIGURATION CONSTANTS
47
  # =============================================================================
48
 
 
 
 
 
 
 
 
 
 
 
 
49
  lib_logger = logging.getLogger("rotator_library")
50
 
51
  # Antigravity base URLs with fallback order
@@ -71,6 +83,12 @@ AVAILABLE_MODELS = [
71
  # Default max output tokens (including thinking) - can be overridden per request
72
  DEFAULT_MAX_OUTPUT_TOKENS = 64000
73
 
 
 
 
 
 
 
74
  # Model alias mappings (internal ↔ public)
75
  MODEL_ALIAS_MAP = {
76
  "rev19-uic3-1p": "gemini-2.5-computer-use-preview-10-2025",
@@ -201,16 +219,6 @@ If you are unsure about a tool's parameters, YOU MUST read the schema definition
201
  # =============================================================================
202
 
203
 
204
- def _env_bool(key: str, default: bool = False) -> bool:
205
- """Get boolean from environment variable."""
206
- return os.getenv(key, str(default).lower()).lower() in ("true", "1", "yes")
207
-
208
-
209
- def _env_int(key: str, default: int) -> int:
210
- """Get integer from environment variable."""
211
- return int(os.getenv(key, str(default)))
212
-
213
-
214
  def _generate_request_id() -> str:
215
  """Generate Antigravity request ID: agent-{uuid}"""
216
  return f"agent-{uuid.uuid4()}"
@@ -2799,6 +2807,10 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
2799
  # Build usage if present
2800
  usage = self._build_usage(chunk.get("usageMetadata", {}))
2801
 
 
 
 
 
2802
  # Mark completion when we see usageMetadata
2803
  if chunk.get("usageMetadata") and accumulator is not None:
2804
  accumulator["is_complete"] = True
@@ -3229,49 +3241,98 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
3229
  "Accept": "text/event-stream" if stream else "application/json",
3230
  }
3231
 
3232
- try:
3233
- if stream:
3234
- return self._handle_streaming(
3235
- client, url, headers, payload, model, file_logger
3236
- )
3237
- else:
3238
- return await self._handle_non_streaming(
3239
- client, url, headers, payload, model, file_logger
3240
- )
3241
- except httpx.HTTPStatusError as e:
3242
- # 429 = Rate limit/quota exhausted - tied to credential, not URL
3243
- # Do NOT retry on different URL, just raise immediately
3244
- if e.response.status_code == 429:
3245
- lib_logger.debug(f"429 quota error - not retrying on fallback URL: {e}")
3246
- raise
3247
-
3248
- # For other HTTP errors (403, 500, etc.), try fallback URL
3249
- if self._try_next_base_url():
3250
- lib_logger.warning(f"Retrying with fallback URL: {e}")
3251
- url = f"{self._get_base_url()}{endpoint}"
3252
  if stream:
3253
- return self._handle_streaming(
 
3254
  client, url, headers, payload, model, file_logger
3255
  )
3256
  else:
3257
- return await self._handle_non_streaming(
3258
- client, url, headers, payload, model, file_logger
 
 
3259
  )
3260
- raise
3261
- except Exception as e:
3262
- # Non-HTTP errors (network issues, timeouts, etc.) - try fallback URL
3263
- if self._try_next_base_url():
3264
- lib_logger.warning(f"Retrying with fallback URL: {e}")
3265
- url = f"{self._get_base_url()}{endpoint}"
3266
- if stream:
3267
- return self._handle_streaming(
3268
- client, url, headers, payload, model, file_logger
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3269
  )
3270
- else:
3271
- return await self._handle_non_streaming(
3272
- client, url, headers, payload, model, file_logger
 
3273
  )
3274
- raise
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3275
 
3276
  def _inject_tool_hardening_instruction(
3277
  self, payload: Dict[str, Any], instruction_text: str
@@ -3342,6 +3403,8 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
3342
  "tool_calls": [],
3343
  "tool_idx": 0, # Track tool call index across chunks
3344
  "is_complete": False, # Track if we received usageMetadata
 
 
3345
  }
3346
 
3347
  async with client.stream(
@@ -3381,42 +3444,115 @@ class AntigravityProvider(AntigravityAuthBase, ProviderInterface):
3381
  )
3382
 
3383
  yield litellm.ModelResponse(**openai_chunk)
 
3384
  except json.JSONDecodeError:
3385
  if file_logger:
3386
  file_logger.log_error(f"Parse error: {data_str[:100]}")
3387
  continue
3388
 
3389
- # If stream ended without usageMetadata chunk, emit a final chunk with finish_reason
3390
- # Emit final chunk if stream ended without usageMetadata
3391
- # Client will determine the correct finish_reason based on accumulated state
3392
- if not accumulator.get("is_complete"):
3393
- final_chunk = {
3394
- "id": f"chatcmpl-{uuid.uuid4().hex[:24]}",
3395
- "object": "chat.completion.chunk",
3396
- "created": int(time.time()),
3397
- "model": model,
3398
- "choices": [{"index": 0, "delta": {}, "finish_reason": None}],
3399
- # Include minimal usage to signal this is the final chunk
3400
- "usage": {
3401
- "prompt_tokens": 0,
3402
- "completion_tokens": 1,
3403
- "total_tokens": 1,
3404
- },
3405
- }
3406
- yield litellm.ModelResponse(**final_chunk)
 
 
 
 
 
 
 
 
 
 
 
3407
 
3408
- # Cache Claude thinking after stream completes
3409
- if (
3410
- self._is_claude(model)
3411
- and self._enable_signature_cache
3412
- and accumulator.get("reasoning_content")
3413
- ):
3414
- self._cache_thinking(
3415
- accumulator["reasoning_content"],
3416
- accumulator["thought_signature"],
3417
- accumulator["text_content"],
3418
- accumulator["tool_calls"],
3419
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3420
 
3421
  async def count_tokens(
3422
  self,
 
39
  from .provider_cache import ProviderCache
40
  from ..model_definitions import ModelDefinitions
41
  from ..timeout_config import TimeoutConfig
42
+ from ..error_handler import EmptyResponseError
43
  from ..utils.paths import get_logs_dir, get_cache_dir
44
 
45
 
 
47
  # CONFIGURATION CONSTANTS
48
  # =============================================================================
49
 
50
+
51
+ def _env_bool(key: str, default: bool = False) -> bool:
52
+ """Get boolean from environment variable."""
53
+ return os.getenv(key, str(default).lower()).lower() in ("true", "1", "yes")
54
+
55
+
56
+ def _env_int(key: str, default: int) -> int:
57
+ """Get integer from environment variable."""
58
+ return int(os.getenv(key, str(default)))
59
+
60
+
61
  lib_logger = logging.getLogger("rotator_library")
62
 
63
  # Antigravity base URLs with fallback order
 
83
  # Default max output tokens (including thinking) - can be overridden per request
84
  DEFAULT_MAX_OUTPUT_TOKENS = 64000
85
 
86
+ # Empty response retry configuration
87
+ # When Antigravity returns an empty response (no content, no tool calls),
88
+ # automatically retry up to this many times before giving up
89
+ EMPTY_RESPONSE_MAX_RETRIES = _env_int("ANTIGRAVITY_EMPTY_RESPONSE_RETRIES", 3)
90
+ EMPTY_RESPONSE_RETRY_DELAY = _env_int("ANTIGRAVITY_EMPTY_RESPONSE_RETRY_DELAY", 2)
91
+
92
  # Model alias mappings (internal ↔ public)
93
  MODEL_ALIAS_MAP = {
94
  "rev19-uic3-1p": "gemini-2.5-computer-use-preview-10-2025",
 
219
  # =============================================================================
220
 
221
 
 
 
 
 
 
 
 
 
 
 
222
  def _generate_request_id() -> str:
223
  """Generate Antigravity request ID: agent-{uuid}"""
224
  return f"agent-{uuid.uuid4()}"
 
2807
  # Build usage if present
2808
  usage = self._build_usage(chunk.get("usageMetadata", {}))
2809
 
2810
+ # Store last received usage for final chunk
2811
+ if usage and accumulator is not None:
2812
+ accumulator["last_usage"] = usage
2813
+
2814
  # Mark completion when we see usageMetadata
2815
  if chunk.get("usageMetadata") and accumulator is not None:
2816
  accumulator["is_complete"] = True
 
3241
  "Accept": "text/event-stream" if stream else "application/json",
3242
  }
3243
 
3244
+ # URL fallback loop - handles HTTP errors (except 429) and network errors
3245
+ # by switching to fallback URLs. Empty response retry is handled separately
3246
+ # inside _streaming_with_retry (streaming) or the inner loop (non-streaming).
3247
+ while True:
3248
+ try:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3249
  if stream:
3250
+ # Streaming: _streaming_with_retry handles empty response retries internally
3251
+ return self._streaming_with_retry(
3252
  client, url, headers, payload, model, file_logger
3253
  )
3254
  else:
3255
+ # Non-streaming: empty response retry loop
3256
+ error_msg = (
3257
+ "The model returned an empty response after multiple attempts. "
3258
+ "This may indicate a temporary service issue. Please try again."
3259
  )
3260
+
3261
+ for attempt in range(EMPTY_RESPONSE_MAX_RETRIES + 1):
3262
+ result = await self._handle_non_streaming(
3263
+ client, url, headers, payload, model, file_logger
3264
+ )
3265
+
3266
+ # Check if we got anything - empty dict means no candidates
3267
+ result_dict = (
3268
+ result.model_dump()
3269
+ if hasattr(result, "model_dump")
3270
+ else dict(result)
3271
+ )
3272
+ got_response = bool(result_dict.get("choices"))
3273
+
3274
+ if not got_response:
3275
+ if attempt < EMPTY_RESPONSE_MAX_RETRIES:
3276
+ lib_logger.warning(
3277
+ f"[Antigravity] Empty response from {model}, "
3278
+ f"attempt {attempt + 1}/{EMPTY_RESPONSE_MAX_RETRIES + 1}. Retrying..."
3279
+ )
3280
+ await asyncio.sleep(EMPTY_RESPONSE_RETRY_DELAY)
3281
+ continue
3282
+ else:
3283
+ lib_logger.error(
3284
+ f"[Antigravity] Empty response from {model} after "
3285
+ f"{EMPTY_RESPONSE_MAX_RETRIES + 1} attempts. Giving up."
3286
+ )
3287
+ raise EmptyResponseError(
3288
+ provider="antigravity",
3289
+ model=model,
3290
+ message=error_msg,
3291
+ )
3292
+
3293
+ return result
3294
+
3295
+ # Should not reach here, but just in case
3296
+ lib_logger.error(
3297
+ f"[Antigravity] Unexpected exit from retry loop for {model}"
3298
  )
3299
+ raise EmptyResponseError(
3300
+ provider="antigravity",
3301
+ model=model,
3302
+ message=error_msg,
3303
  )
3304
+
3305
+ except httpx.HTTPStatusError as e:
3306
+ # 429 = Rate limit/quota exhausted - tied to credential, not URL
3307
+ # Do NOT retry on different URL, just raise immediately
3308
+ if e.response.status_code == 429:
3309
+ lib_logger.debug(
3310
+ f"429 quota error - not retrying on fallback URL: {e}"
3311
+ )
3312
+ raise
3313
+
3314
+ # Other HTTP errors (403, 500, etc.) - try fallback URL
3315
+ if self._try_next_base_url():
3316
+ lib_logger.warning(f"Retrying with fallback URL: {e}")
3317
+ url = f"{self._get_base_url()}{endpoint}"
3318
+ if stream:
3319
+ url = f"{url}?alt=sse"
3320
+ continue # Retry with new URL
3321
+ raise # No more fallback URLs
3322
+
3323
+ except EmptyResponseError:
3324
+ # Empty response already retried internally - don't catch, propagate
3325
+ raise
3326
+
3327
+ except Exception as e:
3328
+ # Non-HTTP errors (network issues, timeouts, etc.) - try fallback URL
3329
+ if self._try_next_base_url():
3330
+ lib_logger.warning(f"Retrying with fallback URL: {e}")
3331
+ url = f"{self._get_base_url()}{endpoint}"
3332
+ if stream:
3333
+ url = f"{url}?alt=sse"
3334
+ continue # Retry with new URL
3335
+ raise # No more fallback URLs
3336
 
3337
  def _inject_tool_hardening_instruction(
3338
  self, payload: Dict[str, Any], instruction_text: str
 
3403
  "tool_calls": [],
3404
  "tool_idx": 0, # Track tool call index across chunks
3405
  "is_complete": False, # Track if we received usageMetadata
3406
+ "last_usage": None, # Track last received usage for final chunk
3407
+ "yielded_any": False, # Track if we yielded any real chunks
3408
  }
3409
 
3410
  async with client.stream(
 
3444
  )
3445
 
3446
  yield litellm.ModelResponse(**openai_chunk)
3447
+ accumulator["yielded_any"] = True
3448
  except json.JSONDecodeError:
3449
  if file_logger:
3450
  file_logger.log_error(f"Parse error: {data_str[:100]}")
3451
  continue
3452
 
3453
+ # Only emit synthetic final chunk if we actually received real data
3454
+ # If no data was received, the caller will detect zero chunks and retry
3455
+ if accumulator.get("yielded_any"):
3456
+ # If stream ended without usageMetadata chunk, emit a final chunk
3457
+ if not accumulator.get("is_complete"):
3458
+ final_chunk = {
3459
+ "id": f"chatcmpl-{uuid.uuid4().hex[:24]}",
3460
+ "object": "chat.completion.chunk",
3461
+ "created": int(time.time()),
3462
+ "model": model,
3463
+ "choices": [{"index": 0, "delta": {}, "finish_reason": None}],
3464
+ }
3465
+ # Only include usage if we received real data during streaming
3466
+ if accumulator.get("last_usage"):
3467
+ final_chunk["usage"] = accumulator["last_usage"]
3468
+ yield litellm.ModelResponse(**final_chunk)
3469
+
3470
+ # Cache Claude thinking after stream completes
3471
+ if (
3472
+ self._is_claude(model)
3473
+ and self._enable_signature_cache
3474
+ and accumulator.get("reasoning_content")
3475
+ ):
3476
+ self._cache_thinking(
3477
+ accumulator["reasoning_content"],
3478
+ accumulator["thought_signature"],
3479
+ accumulator["text_content"],
3480
+ accumulator["tool_calls"],
3481
+ )
3482
 
3483
+ async def _streaming_with_retry(
3484
+ self,
3485
+ client: httpx.AsyncClient,
3486
+ url: str,
3487
+ headers: Dict[str, str],
3488
+ payload: Dict[str, Any],
3489
+ model: str,
3490
+ file_logger: Optional[AntigravityFileLogger] = None,
3491
+ ) -> AsyncGenerator[litellm.ModelResponse, None]:
3492
+ """
3493
+ Wrapper around _handle_streaming that retries on empty responses.
3494
+
3495
+ If the stream yields zero chunks (Antigravity returned nothing),
3496
+ retry up to EMPTY_RESPONSE_MAX_RETRIES times before giving up.
3497
+ """
3498
+ error_msg = (
3499
+ "The model returned an empty response after multiple attempts. "
3500
+ "This may indicate a temporary service issue. Please try again."
3501
+ )
3502
+
3503
+ for attempt in range(EMPTY_RESPONSE_MAX_RETRIES + 1):
3504
+ chunk_count = 0
3505
+
3506
+ try:
3507
+ async for chunk in self._handle_streaming(
3508
+ client, url, headers, payload, model, file_logger
3509
+ ):
3510
+ chunk_count += 1
3511
+ yield chunk # Stream immediately - true streaming preserved
3512
+
3513
+ if chunk_count > 0:
3514
+ return # Success - we got data
3515
+
3516
+ # Zero chunks - empty response
3517
+ if attempt < EMPTY_RESPONSE_MAX_RETRIES:
3518
+ lib_logger.warning(
3519
+ f"[Antigravity] Empty stream from {model}, "
3520
+ f"attempt {attempt + 1}/{EMPTY_RESPONSE_MAX_RETRIES + 1}. Retrying..."
3521
+ )
3522
+ await asyncio.sleep(EMPTY_RESPONSE_RETRY_DELAY)
3523
+ continue
3524
+ else:
3525
+ lib_logger.error(
3526
+ f"[Antigravity] Empty stream from {model} after "
3527
+ f"{EMPTY_RESPONSE_MAX_RETRIES + 1} attempts. Giving up."
3528
+ )
3529
+ raise EmptyResponseError(
3530
+ provider="antigravity",
3531
+ model=model,
3532
+ message=error_msg,
3533
+ )
3534
+
3535
+ except httpx.HTTPStatusError as e:
3536
+ # 429 = Rate limit/quota exhausted - don't retry
3537
+ if e.response.status_code == 429:
3538
+ lib_logger.debug(f"429 quota error - not retrying: {e}")
3539
+ raise
3540
+ # Other HTTP errors - raise immediately (let caller handle)
3541
+ raise
3542
+
3543
+ except Exception:
3544
+ # Non-HTTP errors - raise immediately
3545
+ raise
3546
+
3547
+ # Should not reach here, but just in case
3548
+ lib_logger.error(
3549
+ f"[Antigravity] Unexpected exit from streaming retry loop for {model}"
3550
+ )
3551
+ raise EmptyResponseError(
3552
+ provider="antigravity",
3553
+ model=model,
3554
+ message=error_msg,
3555
+ )
3556
 
3557
  async def count_tokens(
3558
  self,