Mirrowel commited on
Commit
8ed4f52
Β·
1 Parent(s): a68d8d0

fix(oauth): πŸ› prevent deadlock and token desync for rotating refresh tokens

Browse files

This commit fixes critical issues in OAuth providers (Google, iFlow, Qwen) that use rotating refresh tokens, where each token refresh invalidates the previous token.

**Problems Fixed:**

1. **Deadlock Prevention**: Removed inline re-authentication from `refresh_token()` method that was called while holding a lock. When refresh failed with HTTP 400/401/403, the method would call `initialize_token()` directly, which would try to acquire the same lock, causing a deadlock. Now, invalid token errors are caught and queued for background re-authentication via `asyncio.create_task()`.

2. **Token Desync**: Changed `_save_credentials()` to write to disk FIRST, then update cache. Previously, cache was updated first with `buffer_on_failure=True`, which could leave stale tokens on disk if the write failed. For rotating tokens, this caused the old refresh_token on disk to become invalid after a successful API call, requiring re-auth on restart.

3. **Stale Cache Usage**: Modified `refresh_token()` to always read fresh credentials from disk before refreshing, preventing use of stale cached tokens that may have been invalidated by another process.

4. **New Error Type**: Introduced `CredentialNeedsReauthError` exception to signal rotatable authentication failures. This allows the client to rotate to the next credential without logging scary tracebacks, while background re-auth fixes the broken credential.

**Changes:**

- Add `CredentialNeedsReauthError` exception class and classification in error_handler.py
- Catch and wrap `CredentialNeedsReauthError` in client.py retry loop
- Replace inline re-auth with background task queuing in all OAuth providers
- Change `_save_credentials()` to disk-first writes with no buffering for rotating tokens
- Add `force_interactive` parameter to `initialize_token()` for explicit re-auth requests
- Always reload credentials from disk before refresh to prevent stale token usage
- Return boolean from `_save_credentials()` and raise IOError on critical failures
- Update re-auth queue processing to call `initialize_token(force_interactive=True)`

src/rotator_library/client.py CHANGED
@@ -23,6 +23,7 @@ from .usage_manager import UsageManager
23
  from .failure_logger import log_failure, configure_failure_logger
24
  from .error_handler import (
25
  PreRequestCallbackError,
 
26
  classify_error,
27
  AllProviders,
28
  NoAvailableKeysError,
@@ -755,6 +756,12 @@ class RotatingClient:
755
  await self.usage_manager.record_success(key, model)
756
  break
757
 
 
 
 
 
 
 
758
  except (
759
  litellm.RateLimitError,
760
  litellm.ServiceUnavailableError,
 
23
  from .failure_logger import log_failure, configure_failure_logger
24
  from .error_handler import (
25
  PreRequestCallbackError,
26
+ CredentialNeedsReauthError,
27
  classify_error,
28
  AllProviders,
29
  NoAvailableKeysError,
 
756
  await self.usage_manager.record_success(key, model)
757
  break
758
 
759
+ except CredentialNeedsReauthError as e:
760
+ # This credential needs re-authentication but re-auth is already queued.
761
+ # Wrap it so the outer retry loop can rotate to the next credential.
762
+ # No scary traceback needed - this is an expected recovery scenario.
763
+ raise StreamedAPIError("Credential needs re-authentication", data=e)
764
+
765
  except (
766
  litellm.RateLimitError,
767
  litellm.ServiceUnavailableError,
src/rotator_library/error_handler.py CHANGED
@@ -117,6 +117,31 @@ class PreRequestCallbackError(Exception):
117
  pass
118
 
119
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  # =============================================================================
121
  # ERROR TRACKING FOR CLIENT REPORTING
122
  # =============================================================================
@@ -698,6 +723,14 @@ def classify_error(e: Exception, provider: Optional[str] = None) -> ClassifiedEr
698
  status_code=400, # Treat as a bad request
699
  )
700
 
 
 
 
 
 
 
 
 
701
  if isinstance(e, RateLimitError):
702
  retry_after = get_retry_after(e)
703
  # Check if this is a quota error vs rate limit
@@ -789,6 +822,7 @@ def should_rotate_on_error(classified_error: ClassifiedError) -> bool:
789
  - quota_exceeded: Current key/account exhausted
790
  - forbidden: Current credential denied access
791
  - authentication: Current credential invalid
 
792
  - server_error: Provider having issues (might work with different endpoint/key)
793
  - api_connection: Network issues (might be transient)
794
  - unknown: Safer to try another key
 
117
  pass
118
 
119
 
120
+ class CredentialNeedsReauthError(Exception):
121
+ """
122
+ Raised when a credential's refresh token is invalid and re-authentication is required.
123
+
124
+ This is a rotatable error - the request should try the next credential while
125
+ the broken credential is queued for re-authentication in the background.
126
+
127
+ Unlike generic HTTPStatusError, this exception signals:
128
+ - The credential is temporarily unavailable (needs user action)
129
+ - Re-auth has already been queued
130
+ - The request should rotate to the next credential without logging scary tracebacks
131
+
132
+ Attributes:
133
+ credential_path: Path to the credential file that needs re-auth
134
+ message: Human-readable message about the error
135
+ """
136
+
137
+ def __init__(self, credential_path: str, message: str = ""):
138
+ self.credential_path = credential_path
139
+ self.message = (
140
+ message or f"Credential '{credential_path}' requires re-authentication"
141
+ )
142
+ super().__init__(self.message)
143
+
144
+
145
  # =============================================================================
146
  # ERROR TRACKING FOR CLIENT REPORTING
147
  # =============================================================================
 
723
  status_code=400, # Treat as a bad request
724
  )
725
 
726
+ if isinstance(e, CredentialNeedsReauthError):
727
+ # This is a rotatable error - credential is broken but re-auth is queued
728
+ return ClassifiedError(
729
+ error_type="credential_reauth_needed",
730
+ original_exception=e,
731
+ status_code=401, # Treat as auth error for reporting purposes
732
+ )
733
+
734
  if isinstance(e, RateLimitError):
735
  retry_after = get_retry_after(e)
736
  # Check if this is a quota error vs rate limit
 
822
  - quota_exceeded: Current key/account exhausted
823
  - forbidden: Current credential denied access
824
  - authentication: Current credential invalid
825
+ - credential_reauth_needed: Credential needs interactive re-auth (queued)
826
  - server_error: Provider having issues (might work with different endpoint/key)
827
  - api_connection: Network issues (might be transient)
828
  - unknown: Safer to try another key
src/rotator_library/providers/google_oauth_base.py CHANGED
@@ -22,6 +22,7 @@ from rich.markup import escape as rich_escape
22
  from ..utils.headless_detection import is_headless_environment
23
  from ..utils.reauth_coordinator import get_reauth_coordinator
24
  from ..utils.resilient_io import safe_write_json
 
25
 
26
  lib_logger = logging.getLogger("rotator_library")
27
 
@@ -366,7 +367,6 @@ class GoogleOAuthBase:
366
  max_retries = 3
367
  new_token_data = None
368
  last_error = None
369
- needs_reauth = False
370
 
371
  async with httpx.AsyncClient() as client:
372
  for attempt in range(max_retries):
@@ -390,15 +390,42 @@ class GoogleOAuthBase:
390
  except httpx.HTTPStatusError as e:
391
  last_error = e
392
  status_code = e.response.status_code
393
-
394
- # [INVALID GRANT HANDLING] Handle 401/403 by triggering re-authentication
395
- if status_code == 401 or status_code == 403:
396
- lib_logger.warning(
397
- f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
398
- f"Token may have been revoked or expired. Starting re-authentication..."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
399
  )
400
- needs_reauth = True
401
- break # Exit retry loop to trigger re-auth
402
 
403
  elif status_code == 429:
404
  # Rate limit - honor Retry-After header if present
@@ -438,23 +465,6 @@ class GoogleOAuthBase:
438
  continue
439
  raise
440
 
441
- # [INVALID GRANT RE-AUTH] Trigger OAuth flow if refresh token is invalid
442
- if needs_reauth:
443
- lib_logger.info(
444
- f"Starting re-authentication for '{Path(path).name}'..."
445
- )
446
- try:
447
- # Call initialize_token to trigger OAuth flow
448
- new_creds = await self.initialize_token(path)
449
- return new_creds
450
- except Exception as reauth_error:
451
- lib_logger.error(
452
- f"Re-authentication failed for '{Path(path).name}': {reauth_error}"
453
- )
454
- raise ValueError(
455
- f"Refresh token invalid and re-authentication failed: {reauth_error}"
456
- )
457
-
458
  # If we exhausted retries without success
459
  if new_token_data is None:
460
  raise last_error or Exception("Token refresh failed after all retries")
@@ -832,7 +842,7 @@ class GoogleOAuthBase:
832
 
833
  try:
834
  lib_logger.info(f"Starting re-auth for '{Path(path).name}'...")
835
- await self.initialize_token(path)
836
  lib_logger.info(f"Re-auth SUCCESS for '{Path(path).name}'")
837
 
838
  except Exception as e:
@@ -1058,7 +1068,9 @@ class GoogleOAuthBase:
1058
  return new_creds
1059
 
1060
  async def initialize_token(
1061
- self, creds_or_path: Union[Dict[str, Any], str]
 
 
1062
  ) -> Dict[str, Any]:
1063
  """
1064
  Initialize OAuth token, triggering interactive OAuth flow if needed.
@@ -1066,6 +1078,12 @@ class GoogleOAuthBase:
1066
  If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1067
  the flow is coordinated globally via ReauthCoordinator to ensure only one
1068
  interactive OAuth flow runs at a time across all providers.
 
 
 
 
 
 
1069
  """
1070
  path = creds_or_path if isinstance(creds_or_path, str) else None
1071
 
@@ -1085,7 +1103,11 @@ class GoogleOAuthBase:
1085
  await self._load_credentials(creds_or_path) if path else creds_or_path
1086
  )
1087
  reason = ""
1088
- if not creds.get("refresh_token"):
 
 
 
 
1089
  reason = "refresh token is missing"
1090
  elif self._is_token_expired(creds):
1091
  reason = "token is expired"
 
22
  from ..utils.headless_detection import is_headless_environment
23
  from ..utils.reauth_coordinator import get_reauth_coordinator
24
  from ..utils.resilient_io import safe_write_json
25
+ from ..error_handler import CredentialNeedsReauthError
26
 
27
  lib_logger = logging.getLogger("rotator_library")
28
 
 
367
  max_retries = 3
368
  new_token_data = None
369
  last_error = None
 
370
 
371
  async with httpx.AsyncClient() as client:
372
  for attempt in range(max_retries):
 
390
  except httpx.HTTPStatusError as e:
391
  last_error = e
392
  status_code = e.response.status_code
393
+ error_body = e.response.text
394
+
395
+ # [INVALID GRANT HANDLING] Handle 400/401/403 by queuing for re-auth
396
+ # We must NOT call initialize_token from here as we hold a lock (would deadlock)
397
+ if status_code == 400:
398
+ # Check if this is an invalid_grant error
399
+ if "invalid_grant" in error_body.lower():
400
+ lib_logger.info(
401
+ f"Credential '{Path(path).name}' needs re-auth (HTTP 400: invalid_grant). "
402
+ f"Queued for re-authentication, rotating to next credential."
403
+ )
404
+ asyncio.create_task(
405
+ self._queue_refresh(
406
+ path, force=True, needs_reauth=True
407
+ )
408
+ )
409
+ raise CredentialNeedsReauthError(
410
+ credential_path=path,
411
+ message=f"Refresh token invalid for '{Path(path).name}'. Re-auth queued.",
412
+ )
413
+ else:
414
+ # Other 400 error - raise it
415
+ raise
416
+
417
+ elif status_code in (401, 403):
418
+ lib_logger.info(
419
+ f"Credential '{Path(path).name}' needs re-auth (HTTP {status_code}). "
420
+ f"Queued for re-authentication, rotating to next credential."
421
+ )
422
+ asyncio.create_task(
423
+ self._queue_refresh(path, force=True, needs_reauth=True)
424
+ )
425
+ raise CredentialNeedsReauthError(
426
+ credential_path=path,
427
+ message=f"Token invalid for '{Path(path).name}' (HTTP {status_code}). Re-auth queued.",
428
  )
 
 
429
 
430
  elif status_code == 429:
431
  # Rate limit - honor Retry-After header if present
 
465
  continue
466
  raise
467
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
468
  # If we exhausted retries without success
469
  if new_token_data is None:
470
  raise last_error or Exception("Token refresh failed after all retries")
 
842
 
843
  try:
844
  lib_logger.info(f"Starting re-auth for '{Path(path).name}'...")
845
+ await self.initialize_token(path, force_interactive=True)
846
  lib_logger.info(f"Re-auth SUCCESS for '{Path(path).name}'")
847
 
848
  except Exception as e:
 
1068
  return new_creds
1069
 
1070
  async def initialize_token(
1071
+ self,
1072
+ creds_or_path: Union[Dict[str, Any], str],
1073
+ force_interactive: bool = False,
1074
  ) -> Dict[str, Any]:
1075
  """
1076
  Initialize OAuth token, triggering interactive OAuth flow if needed.
 
1078
  If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1079
  the flow is coordinated globally via ReauthCoordinator to ensure only one
1080
  interactive OAuth flow runs at a time across all providers.
1081
+
1082
+ Args:
1083
+ creds_or_path: Either a credentials dict or path to credentials file.
1084
+ force_interactive: If True, skip expiry checks and force interactive OAuth.
1085
+ Use this when the refresh token is known to be invalid
1086
+ (e.g., after HTTP 400 from token endpoint).
1087
  """
1088
  path = creds_or_path if isinstance(creds_or_path, str) else None
1089
 
 
1103
  await self._load_credentials(creds_or_path) if path else creds_or_path
1104
  )
1105
  reason = ""
1106
+ if force_interactive:
1107
+ reason = (
1108
+ "re-authentication was explicitly requested (refresh token invalid)"
1109
+ )
1110
+ elif not creds.get("refresh_token"):
1111
  reason = "refresh token is missing"
1112
  elif self._is_token_expired(creds):
1113
  reason = "token is expired"
src/rotator_library/providers/iflow_auth_base.py CHANGED
@@ -26,6 +26,7 @@ from rich.markup import escape as rich_escape
26
  from ..utils.headless_detection import is_headless_environment
27
  from ..utils.reauth_coordinator import get_reauth_coordinator
28
  from ..utils.resilient_io import safe_write_json
 
29
 
30
  lib_logger = logging.getLogger("rotator_library")
31
 
@@ -371,26 +372,42 @@ class IFlowAuthBase:
371
  return env_creds
372
  raise # Re-raise the original file not found error
373
 
374
- async def _save_credentials(self, path: str, creds: Dict[str, Any]):
375
- """Save credentials with in-memory fallback if disk unavailable."""
376
- # Always update cache first (memory is reliable)
377
- self._credentials_cache[path] = creds
 
 
 
378
 
 
 
 
 
 
379
  # Don't save to file if credentials were loaded from environment
380
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
 
381
  lib_logger.debug("Credentials loaded from env, skipping file save")
382
- return
383
 
384
- # Attempt disk write - if it fails, we still have the cache
385
- # buffer_on_failure ensures data is retried periodically and saved on shutdown
386
- if safe_write_json(
387
- path, creds, lib_logger, secure_permissions=True, buffer_on_failure=True
388
  ):
389
- lib_logger.debug(f"Saved updated iFlow OAuth credentials to '{path}'.")
390
- else:
391
- lib_logger.warning(
392
- "iFlow credentials cached in memory only (buffered for retry)."
393
  )
 
 
 
 
 
 
 
 
394
 
395
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
396
  """Checks if the token is expired (with buffer for proactive refresh)."""
@@ -550,10 +567,11 @@ class IFlowAuthBase:
550
  if not force and cached_creds and not self._is_token_expired(cached_creds):
551
  return cached_creds
552
 
553
- # If cache is empty, read from file
554
- if path not in self._credentials_cache:
555
- await self._read_creds_from_file(path)
556
-
 
557
  creds_from_file = self._credentials_cache[path]
558
 
559
  lib_logger.debug(f"Refreshing iFlow OAuth token for '{Path(path).name}'...")
@@ -565,7 +583,6 @@ class IFlowAuthBase:
565
  max_retries = 3
566
  new_token_data = None
567
  last_error = None
568
- needs_reauth = False
569
 
570
  # Create Basic Auth header
571
  auth_string = f"{IFLOW_CLIENT_ID}:{IFLOW_CLIENT_SECRET}"
@@ -624,14 +641,58 @@ class IFlowAuthBase:
624
  )
625
 
626
  # [STATUS CODE HANDLING]
627
- # [INVALID GRANT HANDLING] Handle 401/403 by triggering re-authentication
628
- if status_code in (401, 403):
629
- lib_logger.warning(
630
- f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
631
- f"Token may have been revoked or expired. Starting re-authentication..."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
632
  )
633
- needs_reauth = True
634
- break # Exit retry loop to trigger re-auth
635
 
636
  elif status_code == 429:
637
  retry_after = int(e.response.headers.get("Retry-After", 60))
@@ -667,37 +728,6 @@ class IFlowAuthBase:
667
  continue
668
  raise
669
 
670
- # [INVALID GRANT RE-AUTH] Trigger OAuth flow if refresh token is invalid
671
- if needs_reauth:
672
- lib_logger.info(
673
- f"Starting re-authentication for '{Path(path).name}'..."
674
- )
675
- try:
676
- # Call initialize_token to trigger OAuth flow
677
- new_creds = await self.initialize_token(path)
678
- # Clear backoff on successful re-auth
679
- self._refresh_failures.pop(path, None)
680
- self._next_refresh_after.pop(path, None)
681
- return new_creds
682
- except Exception as reauth_error:
683
- lib_logger.error(
684
- f"Re-authentication failed for '{Path(path).name}': {reauth_error}"
685
- )
686
- # [BACKOFF TRACKING] Increment failure count and set backoff timer
687
- self._refresh_failures[path] = (
688
- self._refresh_failures.get(path, 0) + 1
689
- )
690
- backoff_seconds = min(
691
- 300, 30 * (2 ** self._refresh_failures[path])
692
- ) # Max 5 min backoff
693
- self._next_refresh_after[path] = time.time() + backoff_seconds
694
- lib_logger.debug(
695
- f"Setting backoff for '{Path(path).name}': {backoff_seconds}s"
696
- )
697
- raise ValueError(
698
- f"Refresh token invalid and re-authentication failed: {reauth_error}"
699
- )
700
-
701
  if new_token_data is None:
702
  # [BACKOFF TRACKING] Increment failure count and set backoff timer
703
  self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
@@ -775,11 +805,19 @@ class IFlowAuthBase:
775
  self._refresh_failures.pop(path, None)
776
  self._next_refresh_after.pop(path, None)
777
 
778
- await self._save_credentials(path, creds_from_file)
 
 
 
 
 
 
 
 
779
  lib_logger.debug(
780
  f"Successfully refreshed iFlow OAuth token for '{Path(path).name}'."
781
  )
782
- return creds_from_file
783
 
784
  async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]:
785
  """
@@ -1026,12 +1064,39 @@ class IFlowAuthBase:
1026
 
1027
  except httpx.HTTPStatusError as e:
1028
  status_code = e.response.status_code
1029
- if status_code in (401, 403):
1030
- # Invalid refresh token - route to re-auth queue
1031
- lib_logger.warning(
1032
- f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1033
  f"Routing to re-auth queue."
1034
  )
 
 
1035
  self._queue_retry_count.pop(path, None) # Clear retry count
1036
  async with self._queue_tracking_lock:
1037
  self._queued_credentials.discard(
@@ -1126,7 +1191,7 @@ class IFlowAuthBase:
1126
 
1127
  try:
1128
  lib_logger.info(f"Starting re-auth for '{Path(path).name}'...")
1129
- await self.initialize_token(path)
1130
  lib_logger.info(f"Re-auth SUCCESS for '{Path(path).name}'")
1131
 
1132
  except Exception as e:
@@ -1270,7 +1335,11 @@ class IFlowAuthBase:
1270
  }
1271
 
1272
  if path:
1273
- await self._save_credentials(path, creds)
 
 
 
 
1274
 
1275
  lib_logger.info(
1276
  f"iFlow OAuth initialized successfully for '{display_name}'."
@@ -1281,7 +1350,9 @@ class IFlowAuthBase:
1281
  await callback_server.stop()
1282
 
1283
  async def initialize_token(
1284
- self, creds_or_path: Union[Dict[str, Any], str]
 
 
1285
  ) -> Dict[str, Any]:
1286
  """
1287
  Initialize OAuth token, triggering interactive authorization flow if needed.
@@ -1289,6 +1360,12 @@ class IFlowAuthBase:
1289
  If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1290
  the flow is coordinated globally via ReauthCoordinator to ensure only one
1291
  interactive OAuth flow runs at a time across all providers.
 
 
 
 
 
 
1292
  """
1293
  path = creds_or_path if isinstance(creds_or_path, str) else None
1294
 
@@ -1308,7 +1385,11 @@ class IFlowAuthBase:
1308
  )
1309
 
1310
  reason = ""
1311
- if not creds.get("refresh_token"):
 
 
 
 
1312
  reason = "refresh token is missing"
1313
  elif self._is_token_expired(creds):
1314
  reason = "token is expired"
@@ -1389,10 +1470,15 @@ class IFlowAuthBase:
1389
  f"No email found in iFlow credentials for '{path or 'in-memory object'}'."
1390
  )
1391
 
1392
- # Update timestamp on check
 
 
1393
  if path and "_proxy_metadata" in creds:
1394
  creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
1395
- await self._save_credentials(path, creds)
 
 
 
1396
 
1397
  return {"email": email}
1398
  except Exception as e:
@@ -1513,7 +1599,11 @@ class IFlowAuthBase:
1513
  )
1514
 
1515
  # Step 4: Save credentials to file
1516
- await self._save_credentials(str(file_path), new_creds)
 
 
 
 
1517
 
1518
  return IFlowCredentialSetupResult(
1519
  success=True,
 
26
  from ..utils.headless_detection import is_headless_environment
27
  from ..utils.reauth_coordinator import get_reauth_coordinator
28
  from ..utils.resilient_io import safe_write_json
29
+ from ..error_handler import CredentialNeedsReauthError
30
 
31
  lib_logger = logging.getLogger("rotator_library")
32
 
 
372
  return env_creds
373
  raise # Re-raise the original file not found error
374
 
375
+ async def _save_credentials(self, path: str, creds: Dict[str, Any]) -> bool:
376
+ """Save credentials to disk, then update cache. Returns True only if disk write succeeded.
377
+
378
+ For providers with rotating refresh tokens, disk persistence is CRITICAL.
379
+ If we update the cache but fail to write to disk:
380
+ - The old refresh_token on disk may become invalid (consumed by API)
381
+ - On restart, we'd load the invalid token and require re-auth
382
 
383
+ By writing to disk FIRST, we ensure:
384
+ - Cache only updated after disk succeeds (guaranteed parity)
385
+ - If disk fails, cache keeps old tokens, refresh is retried
386
+ - No desync between cache and disk is possible
387
+ """
388
  # Don't save to file if credentials were loaded from environment
389
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
390
+ self._credentials_cache[path] = creds
391
  lib_logger.debug("Credentials loaded from env, skipping file save")
392
+ return True
393
 
394
+ # Write to disk FIRST - do NOT buffer on failure for rotating tokens
395
+ # Buffering is dangerous because the refresh_token may be stale by retry time
396
+ if not safe_write_json(
397
+ path, creds, lib_logger, secure_permissions=True, buffer_on_failure=False
398
  ):
399
+ lib_logger.error(
400
+ f"Failed to write iFlow credentials to disk for '{Path(path).name}'. "
401
+ f"Cache NOT updated to maintain parity with disk."
 
402
  )
403
+ return False
404
+
405
+ # Disk write succeeded - now update cache (guaranteed parity)
406
+ self._credentials_cache[path] = creds
407
+ lib_logger.debug(
408
+ f"Saved updated iFlow OAuth credentials to '{Path(path).name}'."
409
+ )
410
+ return True
411
 
412
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
413
  """Checks if the token is expired (with buffer for proactive refresh)."""
 
567
  if not force and cached_creds and not self._is_token_expired(cached_creds):
568
  return cached_creds
569
 
570
+ # [ROTATING TOKEN FIX] Always read fresh from disk before refresh.
571
+ # iFlow may use rotating refresh tokens - each refresh could invalidate the previous token.
572
+ # If we use a stale cached token, refresh will fail.
573
+ # Reading fresh from disk ensures we have the latest token.
574
+ await self._read_creds_from_file(path)
575
  creds_from_file = self._credentials_cache[path]
576
 
577
  lib_logger.debug(f"Refreshing iFlow OAuth token for '{Path(path).name}'...")
 
583
  max_retries = 3
584
  new_token_data = None
585
  last_error = None
 
586
 
587
  # Create Basic Auth header
588
  auth_string = f"{IFLOW_CLIENT_ID}:{IFLOW_CLIENT_SECRET}"
 
641
  )
642
 
643
  # [STATUS CODE HANDLING]
644
+ # [INVALID GRANT HANDLING] Handle 400/401/403 by raising
645
+ # Queue for re-auth in background so credential gets fixed automatically
646
+ if status_code == 400:
647
+ # Check if this is an invalid refresh token error
648
+ try:
649
+ error_data = e.response.json()
650
+ error_type = error_data.get("error", "")
651
+ error_desc = error_data.get("error_description", "")
652
+ if not error_desc:
653
+ error_desc = error_data.get("message", error_body)
654
+ except Exception:
655
+ error_type = ""
656
+ error_desc = error_body
657
+
658
+ if (
659
+ "invalid" in error_desc.lower()
660
+ or error_type == "invalid_request"
661
+ ):
662
+ lib_logger.info(
663
+ f"Credential '{Path(path).name}' needs re-auth (HTTP 400: {error_desc}). "
664
+ f"Queued for re-authentication, rotating to next credential."
665
+ )
666
+ # Queue for re-auth in background (non-blocking, fire-and-forget)
667
+ # This ensures credential gets fixed even if caller doesn't handle it
668
+ asyncio.create_task(
669
+ self._queue_refresh(
670
+ path, force=True, needs_reauth=True
671
+ )
672
+ )
673
+ # Raise rotatable error instead of raw HTTPStatusError
674
+ raise CredentialNeedsReauthError(
675
+ credential_path=path,
676
+ message=f"Refresh token invalid for '{Path(path).name}'. Re-auth queued.",
677
+ )
678
+ else:
679
+ # Other 400 error - raise it
680
+ raise
681
+
682
+ elif status_code in (401, 403):
683
+ lib_logger.info(
684
+ f"Credential '{Path(path).name}' needs re-auth (HTTP {status_code}). "
685
+ f"Queued for re-authentication, rotating to next credential."
686
+ )
687
+ # Queue for re-auth in background (non-blocking, fire-and-forget)
688
+ asyncio.create_task(
689
+ self._queue_refresh(path, force=True, needs_reauth=True)
690
+ )
691
+ # Raise rotatable error instead of raw HTTPStatusError
692
+ raise CredentialNeedsReauthError(
693
+ credential_path=path,
694
+ message=f"Token invalid for '{Path(path).name}' (HTTP {status_code}). Re-auth queued.",
695
  )
 
 
696
 
697
  elif status_code == 429:
698
  retry_after = int(e.response.headers.get("Retry-After", 60))
 
728
  continue
729
  raise
730
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
731
  if new_token_data is None:
732
  # [BACKOFF TRACKING] Increment failure count and set backoff timer
733
  self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
 
805
  self._refresh_failures.pop(path, None)
806
  self._next_refresh_after.pop(path, None)
807
 
808
+ # Save credentials - MUST succeed for rotating token providers
809
+ if not await self._save_credentials(path, creds_from_file):
810
+ # CRITICAL: If we can't persist the new token, the old token may be
811
+ # invalidated. This is a critical failure - raise so retry logic kicks in.
812
+ raise IOError(
813
+ f"Failed to persist refreshed credentials for '{Path(path).name}'. "
814
+ f"Disk write failed - refresh will be retried."
815
+ )
816
+
817
  lib_logger.debug(
818
  f"Successfully refreshed iFlow OAuth token for '{Path(path).name}'."
819
  )
820
+ return self._credentials_cache[path] # Return from cache (synced with disk)
821
 
822
  async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]:
823
  """
 
1064
 
1065
  except httpx.HTTPStatusError as e:
1066
  status_code = e.response.status_code
1067
+ # Check for invalid refresh token errors (400/401/403)
1068
+ # These need to be routed to re-auth queue for interactive OAuth
1069
+ needs_reauth = False
1070
+
1071
+ if status_code == 400:
1072
+ # Check if this is an invalid refresh token error
1073
+ try:
1074
+ error_data = e.response.json()
1075
+ error_type = error_data.get("error", "")
1076
+ error_desc = error_data.get("error_description", "")
1077
+ if not error_desc:
1078
+ error_desc = error_data.get("message", str(e))
1079
+ except Exception:
1080
+ error_type = ""
1081
+ error_desc = str(e)
1082
+
1083
+ if (
1084
+ "invalid" in error_desc.lower()
1085
+ or error_type == "invalid_request"
1086
+ ):
1087
+ needs_reauth = True
1088
+ lib_logger.info(
1089
+ f"Credential '{Path(path).name}' needs re-auth (HTTP 400: {error_desc}). "
1090
+ f"Routing to re-auth queue."
1091
+ )
1092
+ elif status_code in (401, 403):
1093
+ needs_reauth = True
1094
+ lib_logger.info(
1095
+ f"Credential '{Path(path).name}' needs re-auth (HTTP {status_code}). "
1096
  f"Routing to re-auth queue."
1097
  )
1098
+
1099
+ if needs_reauth:
1100
  self._queue_retry_count.pop(path, None) # Clear retry count
1101
  async with self._queue_tracking_lock:
1102
  self._queued_credentials.discard(
 
1191
 
1192
  try:
1193
  lib_logger.info(f"Starting re-auth for '{Path(path).name}'...")
1194
+ await self.initialize_token(path, force_interactive=True)
1195
  lib_logger.info(f"Re-auth SUCCESS for '{Path(path).name}'")
1196
 
1197
  except Exception as e:
 
1335
  }
1336
 
1337
  if path:
1338
+ if not await self._save_credentials(path, creds):
1339
+ raise IOError(
1340
+ f"Failed to save OAuth credentials to disk for '{display_name}'. "
1341
+ f"Please retry authentication."
1342
+ )
1343
 
1344
  lib_logger.info(
1345
  f"iFlow OAuth initialized successfully for '{display_name}'."
 
1350
  await callback_server.stop()
1351
 
1352
  async def initialize_token(
1353
+ self,
1354
+ creds_or_path: Union[Dict[str, Any], str],
1355
+ force_interactive: bool = False,
1356
  ) -> Dict[str, Any]:
1357
  """
1358
  Initialize OAuth token, triggering interactive authorization flow if needed.
 
1360
  If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1361
  the flow is coordinated globally via ReauthCoordinator to ensure only one
1362
  interactive OAuth flow runs at a time across all providers.
1363
+
1364
+ Args:
1365
+ creds_or_path: Either a credentials dict or path to credentials file.
1366
+ force_interactive: If True, skip expiry checks and force interactive OAuth.
1367
+ Use this when the refresh token is known to be invalid
1368
+ (e.g., after HTTP 400 from token endpoint).
1369
  """
1370
  path = creds_or_path if isinstance(creds_or_path, str) else None
1371
 
 
1385
  )
1386
 
1387
  reason = ""
1388
+ if force_interactive:
1389
+ reason = (
1390
+ "re-authentication was explicitly requested (refresh token invalid)"
1391
+ )
1392
+ elif not creds.get("refresh_token"):
1393
  reason = "refresh token is missing"
1394
  elif self._is_token_expired(creds):
1395
  reason = "token is expired"
 
1470
  f"No email found in iFlow credentials for '{path or 'in-memory object'}'."
1471
  )
1472
 
1473
+ # Update timestamp in cache only (not disk) to avoid overwriting
1474
+ # potentially newer tokens that were saved by another process/refresh.
1475
+ # The timestamp is non-critical metadata - losing it on restart is fine.
1476
  if path and "_proxy_metadata" in creds:
1477
  creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
1478
+ # Note: We intentionally don't save to disk here because:
1479
+ # 1. The cache may have older tokens than disk (if external refresh occurred)
1480
+ # 2. Saving would overwrite the newer disk tokens with stale cached ones
1481
+ # 3. The timestamp is non-critical and will be updated on next refresh
1482
 
1483
  return {"email": email}
1484
  except Exception as e:
 
1599
  )
1600
 
1601
  # Step 4: Save credentials to file
1602
+ if not await self._save_credentials(str(file_path), new_creds):
1603
+ return IFlowCredentialSetupResult(
1604
+ success=False,
1605
+ error=f"Failed to save credentials to disk at {file_path.name}",
1606
+ )
1607
 
1608
  return IFlowCredentialSetupResult(
1609
  success=True,
src/rotator_library/providers/qwen_auth_base.py CHANGED
@@ -25,6 +25,7 @@ from rich.markup import escape as rich_escape
25
  from ..utils.headless_detection import is_headless_environment
26
  from ..utils.reauth_coordinator import get_reauth_coordinator
27
  from ..utils.resilient_io import safe_write_json
 
28
 
29
  lib_logger = logging.getLogger("rotator_library")
30
 
@@ -235,26 +236,42 @@ class QwenAuthBase:
235
  return env_creds
236
  raise # Re-raise the original file not found error
237
 
238
- async def _save_credentials(self, path: str, creds: Dict[str, Any]):
239
- """Save credentials with in-memory fallback if disk unavailable."""
240
- # Always update cache first (memory is reliable)
241
- self._credentials_cache[path] = creds
 
 
 
242
 
 
 
 
 
 
243
  # Don't save to file if credentials were loaded from environment
244
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
 
245
  lib_logger.debug("Credentials loaded from env, skipping file save")
246
- return
247
 
248
- # Attempt disk write - if it fails, we still have the cache
249
- # buffer_on_failure ensures data is retried periodically and saved on shutdown
250
- if safe_write_json(
251
- path, creds, lib_logger, secure_permissions=True, buffer_on_failure=True
252
  ):
253
- lib_logger.debug(f"Saved updated Qwen OAuth credentials to '{path}'.")
254
- else:
255
- lib_logger.warning(
256
- "Qwen credentials cached in memory only (buffered for retry)."
257
  )
 
 
 
 
 
 
 
 
258
 
259
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
260
  expiry_timestamp = creds.get("expiry_date", 0) / 1000
@@ -275,10 +292,11 @@ class QwenAuthBase:
275
  if not force and cached_creds and not self._is_token_expired(cached_creds):
276
  return cached_creds
277
 
278
- # If cache is empty, read from file. This is safe because we hold the lock.
279
- if path not in self._credentials_cache:
280
- await self._read_creds_from_file(path)
281
-
 
282
  creds_from_file = self._credentials_cache[path]
283
 
284
  lib_logger.debug(f"Refreshing Qwen OAuth token for '{Path(path).name}'...")
@@ -291,7 +309,6 @@ class QwenAuthBase:
291
  max_retries = 3
292
  new_token_data = None
293
  last_error = None
294
- needs_reauth = False
295
 
296
  headers = {
297
  "Content-Type": "application/x-www-form-urlencoded",
@@ -324,14 +341,57 @@ class QwenAuthBase:
324
  f"HTTP {status_code} for '{Path(path).name}': {error_body}"
325
  )
326
 
327
- # [INVALID GRANT HANDLING] Handle 401/403 by triggering re-authentication
328
- if status_code in (401, 403):
329
- lib_logger.warning(
330
- f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
331
- f"Token may have been revoked or expired. Starting re-authentication..."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
332
  )
333
- needs_reauth = True
334
- break # Exit retry loop to trigger re-auth
335
 
336
  elif status_code == 429:
337
  retry_after = int(e.response.headers.get("Retry-After", 60))
@@ -367,37 +427,6 @@ class QwenAuthBase:
367
  continue
368
  raise
369
 
370
- # [INVALID GRANT RE-AUTH] Trigger OAuth flow if refresh token is invalid
371
- if needs_reauth:
372
- lib_logger.info(
373
- f"Starting re-authentication for '{Path(path).name}'..."
374
- )
375
- try:
376
- # Call initialize_token to trigger OAuth flow
377
- new_creds = await self.initialize_token(path)
378
- # Clear backoff on successful re-auth
379
- self._refresh_failures.pop(path, None)
380
- self._next_refresh_after.pop(path, None)
381
- return new_creds
382
- except Exception as reauth_error:
383
- lib_logger.error(
384
- f"Re-authentication failed for '{Path(path).name}': {reauth_error}"
385
- )
386
- # [BACKOFF TRACKING] Increment failure count and set backoff timer
387
- self._refresh_failures[path] = (
388
- self._refresh_failures.get(path, 0) + 1
389
- )
390
- backoff_seconds = min(
391
- 300, 30 * (2 ** self._refresh_failures[path])
392
- ) # Max 5 min backoff
393
- self._next_refresh_after[path] = time.time() + backoff_seconds
394
- lib_logger.debug(
395
- f"Setting backoff for '{Path(path).name}': {backoff_seconds}s"
396
- )
397
- raise ValueError(
398
- f"Refresh token invalid and re-authentication failed: {reauth_error}"
399
- )
400
-
401
  if new_token_data is None:
402
  # [BACKOFF TRACKING] Increment failure count and set backoff timer
403
  self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
@@ -440,11 +469,20 @@ class QwenAuthBase:
440
  self._refresh_failures.pop(path, None)
441
  self._next_refresh_after.pop(path, None)
442
 
443
- await self._save_credentials(path, creds_from_file)
 
 
 
 
 
 
 
 
 
444
  lib_logger.debug(
445
  f"Successfully refreshed Qwen OAuth token for '{Path(path).name}'."
446
  )
447
- return creds_from_file
448
 
449
  async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]:
450
  """
@@ -689,12 +727,37 @@ class QwenAuthBase:
689
 
690
  except httpx.HTTPStatusError as e:
691
  status_code = e.response.status_code
692
- if status_code in (401, 403):
693
- # Invalid refresh token - route to re-auth queue
694
- lib_logger.warning(
695
- f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
696
  f"Routing to re-auth queue."
697
  )
 
 
698
  self._queue_retry_count.pop(path, None) # Clear retry count
699
  async with self._queue_tracking_lock:
700
  self._queued_credentials.discard(
@@ -789,7 +852,7 @@ class QwenAuthBase:
789
 
790
  try:
791
  lib_logger.info(f"Starting re-auth for '{Path(path).name}'...")
792
- await self.initialize_token(path)
793
  lib_logger.info(f"Re-auth SUCCESS for '{Path(path).name}'")
794
 
795
  except Exception as e:
@@ -1001,14 +1064,20 @@ class QwenAuthBase:
1001
  }
1002
 
1003
  if path:
1004
- await self._save_credentials(path, creds)
 
 
 
 
1005
  lib_logger.info(
1006
  f"Qwen OAuth initialized successfully for '{display_name}'."
1007
  )
1008
  return creds
1009
 
1010
  async def initialize_token(
1011
- self, creds_or_path: Union[Dict[str, Any], str]
 
 
1012
  ) -> Dict[str, Any]:
1013
  """
1014
  Initialize OAuth token, triggering interactive device flow if needed.
@@ -1016,6 +1085,12 @@ class QwenAuthBase:
1016
  If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1017
  the flow is coordinated globally via ReauthCoordinator to ensure only one
1018
  interactive OAuth flow runs at a time across all providers.
 
 
 
 
 
 
1019
  """
1020
  path = creds_or_path if isinstance(creds_or_path, str) else None
1021
 
@@ -1034,7 +1109,11 @@ class QwenAuthBase:
1034
  )
1035
 
1036
  reason = ""
1037
- if not creds.get("refresh_token"):
 
 
 
 
1038
  reason = "refresh token is missing"
1039
  elif self._is_token_expired(creds):
1040
  reason = "token is expired"
@@ -1108,10 +1187,15 @@ class QwenAuthBase:
1108
  f"No email found in _proxy_metadata for '{path or 'in-memory object'}'."
1109
  )
1110
 
1111
- # Update timestamp on check and save if it's a file-based credential
 
 
1112
  if path and "_proxy_metadata" in creds:
1113
  creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
1114
- await self._save_credentials(path, creds)
 
 
 
1115
 
1116
  return {"email": email}
1117
  except Exception as e:
@@ -1230,7 +1314,11 @@ class QwenAuthBase:
1230
  )
1231
 
1232
  # Step 4: Save credentials to file
1233
- await self._save_credentials(str(file_path), new_creds)
 
 
 
 
1234
 
1235
  return QwenCredentialSetupResult(
1236
  success=True,
 
25
  from ..utils.headless_detection import is_headless_environment
26
  from ..utils.reauth_coordinator import get_reauth_coordinator
27
  from ..utils.resilient_io import safe_write_json
28
+ from ..error_handler import CredentialNeedsReauthError
29
 
30
  lib_logger = logging.getLogger("rotator_library")
31
 
 
236
  return env_creds
237
  raise # Re-raise the original file not found error
238
 
239
+ async def _save_credentials(self, path: str, creds: Dict[str, Any]) -> bool:
240
+ """Save credentials to disk, then update cache. Returns True only if disk write succeeded.
241
+
242
+ For providers with rotating refresh tokens (like Qwen), disk persistence is CRITICAL.
243
+ If we update the cache but fail to write to disk:
244
+ - The old refresh_token on disk is now invalid (consumed by API)
245
+ - On restart, we'd load the invalid token and require re-auth
246
 
247
+ By writing to disk FIRST, we ensure:
248
+ - Cache only updated after disk succeeds (guaranteed parity)
249
+ - If disk fails, cache keeps old tokens, refresh is retried
250
+ - No desync between cache and disk is possible
251
+ """
252
  # Don't save to file if credentials were loaded from environment
253
  if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
254
+ self._credentials_cache[path] = creds
255
  lib_logger.debug("Credentials loaded from env, skipping file save")
256
+ return True
257
 
258
+ # Write to disk FIRST - do NOT buffer on failure for rotating tokens
259
+ # Buffering is dangerous because the refresh_token may be stale by retry time
260
+ if not safe_write_json(
261
+ path, creds, lib_logger, secure_permissions=True, buffer_on_failure=False
262
  ):
263
+ lib_logger.error(
264
+ f"Failed to write Qwen credentials to disk for '{Path(path).name}'. "
265
+ f"Cache NOT updated to maintain parity with disk."
 
266
  )
267
+ return False
268
+
269
+ # Disk write succeeded - now update cache (guaranteed parity)
270
+ self._credentials_cache[path] = creds
271
+ lib_logger.debug(
272
+ f"Saved updated Qwen OAuth credentials to '{Path(path).name}'."
273
+ )
274
+ return True
275
 
276
  def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
277
  expiry_timestamp = creds.get("expiry_date", 0) / 1000
 
292
  if not force and cached_creds and not self._is_token_expired(cached_creds):
293
  return cached_creds
294
 
295
+ # [ROTATING TOKEN FIX] Always read fresh from disk before refresh.
296
+ # Qwen uses rotating refresh tokens - each refresh invalidates the previous token.
297
+ # If we use a stale cached token, refresh will fail with HTTP 400.
298
+ # Reading fresh from disk ensures we have the latest token.
299
+ await self._read_creds_from_file(path)
300
  creds_from_file = self._credentials_cache[path]
301
 
302
  lib_logger.debug(f"Refreshing Qwen OAuth token for '{Path(path).name}'...")
 
309
  max_retries = 3
310
  new_token_data = None
311
  last_error = None
 
312
 
313
  headers = {
314
  "Content-Type": "application/x-www-form-urlencoded",
 
341
  f"HTTP {status_code} for '{Path(path).name}': {error_body}"
342
  )
343
 
344
+ # [INVALID GRANT HANDLING] Handle 400/401/403 by raising
345
+ # The caller (_process_refresh_queue or initialize_token) will handle re-auth
346
+ # We must NOT call initialize_token from here as we hold a lock (would deadlock)
347
+ if status_code == 400:
348
+ # Check if this is an invalid refresh token error
349
+ try:
350
+ error_data = e.response.json()
351
+ error_type = error_data.get("error", "")
352
+ error_desc = error_data.get("error_description", "")
353
+ except Exception:
354
+ error_type = ""
355
+ error_desc = error_body
356
+
357
+ if (
358
+ "invalid" in error_desc.lower()
359
+ or error_type == "invalid_request"
360
+ ):
361
+ lib_logger.info(
362
+ f"Credential '{Path(path).name}' needs re-auth (HTTP 400: {error_desc}). "
363
+ f"Queued for re-authentication, rotating to next credential."
364
+ )
365
+ # Queue for re-auth in background (non-blocking, fire-and-forget)
366
+ # This ensures credential gets fixed even if caller doesn't handle it
367
+ asyncio.create_task(
368
+ self._queue_refresh(
369
+ path, force=True, needs_reauth=True
370
+ )
371
+ )
372
+ # Raise rotatable error instead of raw HTTPStatusError
373
+ raise CredentialNeedsReauthError(
374
+ credential_path=path,
375
+ message=f"Refresh token invalid for '{Path(path).name}'. Re-auth queued.",
376
+ )
377
+ else:
378
+ # Other 400 error - raise it
379
+ raise
380
+
381
+ elif status_code in (401, 403):
382
+ lib_logger.info(
383
+ f"Credential '{Path(path).name}' needs re-auth (HTTP {status_code}). "
384
+ f"Queued for re-authentication, rotating to next credential."
385
+ )
386
+ # Queue for re-auth in background (non-blocking, fire-and-forget)
387
+ asyncio.create_task(
388
+ self._queue_refresh(path, force=True, needs_reauth=True)
389
+ )
390
+ # Raise rotatable error instead of raw HTTPStatusError
391
+ raise CredentialNeedsReauthError(
392
+ credential_path=path,
393
+ message=f"Token invalid for '{Path(path).name}' (HTTP {status_code}). Re-auth queued.",
394
  )
 
 
395
 
396
  elif status_code == 429:
397
  retry_after = int(e.response.headers.get("Retry-After", 60))
 
427
  continue
428
  raise
429
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
430
  if new_token_data is None:
431
  # [BACKOFF TRACKING] Increment failure count and set backoff timer
432
  self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
 
469
  self._refresh_failures.pop(path, None)
470
  self._next_refresh_after.pop(path, None)
471
 
472
+ # Save credentials - MUST succeed for rotating token providers
473
+ if not await self._save_credentials(path, creds_from_file):
474
+ # CRITICAL: For rotating tokens, if we can't persist the new token,
475
+ # the old token is already invalidated by Qwen. This is a critical failure.
476
+ # Raise an error so retry logic kicks in.
477
+ raise IOError(
478
+ f"Failed to persist refreshed credentials for '{Path(path).name}'. "
479
+ f"Disk write failed - refresh will be retried."
480
+ )
481
+
482
  lib_logger.debug(
483
  f"Successfully refreshed Qwen OAuth token for '{Path(path).name}'."
484
  )
485
+ return self._credentials_cache[path] # Return from cache (synced with disk)
486
 
487
  async def get_api_details(self, credential_identifier: str) -> Tuple[str, str]:
488
  """
 
727
 
728
  except httpx.HTTPStatusError as e:
729
  status_code = e.response.status_code
730
+ # Check for invalid refresh token errors (400/401/403)
731
+ # These need to be routed to re-auth queue for interactive OAuth
732
+ needs_reauth = False
733
+
734
+ if status_code == 400:
735
+ # Check if this is an invalid refresh token error
736
+ try:
737
+ error_data = e.response.json()
738
+ error_type = error_data.get("error", "")
739
+ error_desc = error_data.get("error_description", "")
740
+ except Exception:
741
+ error_type = ""
742
+ error_desc = str(e)
743
+
744
+ if (
745
+ "invalid" in error_desc.lower()
746
+ or error_type == "invalid_request"
747
+ ):
748
+ needs_reauth = True
749
+ lib_logger.info(
750
+ f"Credential '{Path(path).name}' needs re-auth (HTTP 400: {error_desc}). "
751
+ f"Routing to re-auth queue."
752
+ )
753
+ elif status_code in (401, 403):
754
+ needs_reauth = True
755
+ lib_logger.info(
756
+ f"Credential '{Path(path).name}' needs re-auth (HTTP {status_code}). "
757
  f"Routing to re-auth queue."
758
  )
759
+
760
+ if needs_reauth:
761
  self._queue_retry_count.pop(path, None) # Clear retry count
762
  async with self._queue_tracking_lock:
763
  self._queued_credentials.discard(
 
852
 
853
  try:
854
  lib_logger.info(f"Starting re-auth for '{Path(path).name}'...")
855
+ await self.initialize_token(path, force_interactive=True)
856
  lib_logger.info(f"Re-auth SUCCESS for '{Path(path).name}'")
857
 
858
  except Exception as e:
 
1064
  }
1065
 
1066
  if path:
1067
+ if not await self._save_credentials(path, creds):
1068
+ raise IOError(
1069
+ f"Failed to save OAuth credentials to disk for '{display_name}'. "
1070
+ f"Please retry authentication."
1071
+ )
1072
  lib_logger.info(
1073
  f"Qwen OAuth initialized successfully for '{display_name}'."
1074
  )
1075
  return creds
1076
 
1077
  async def initialize_token(
1078
+ self,
1079
+ creds_or_path: Union[Dict[str, Any], str],
1080
+ force_interactive: bool = False,
1081
  ) -> Dict[str, Any]:
1082
  """
1083
  Initialize OAuth token, triggering interactive device flow if needed.
 
1085
  If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1086
  the flow is coordinated globally via ReauthCoordinator to ensure only one
1087
  interactive OAuth flow runs at a time across all providers.
1088
+
1089
+ Args:
1090
+ creds_or_path: Either a credentials dict or path to credentials file.
1091
+ force_interactive: If True, skip expiry checks and force interactive OAuth.
1092
+ Use this when the refresh token is known to be invalid
1093
+ (e.g., after HTTP 400 from token endpoint).
1094
  """
1095
  path = creds_or_path if isinstance(creds_or_path, str) else None
1096
 
 
1109
  )
1110
 
1111
  reason = ""
1112
+ if force_interactive:
1113
+ reason = (
1114
+ "re-authentication was explicitly requested (refresh token invalid)"
1115
+ )
1116
+ elif not creds.get("refresh_token"):
1117
  reason = "refresh token is missing"
1118
  elif self._is_token_expired(creds):
1119
  reason = "token is expired"
 
1187
  f"No email found in _proxy_metadata for '{path or 'in-memory object'}'."
1188
  )
1189
 
1190
+ # Update timestamp in cache only (not disk) to avoid overwriting
1191
+ # potentially newer tokens that were saved by another process/refresh.
1192
+ # The timestamp is non-critical metadata - losing it on restart is fine.
1193
  if path and "_proxy_metadata" in creds:
1194
  creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
1195
+ # Note: We intentionally don't save to disk here because:
1196
+ # 1. The cache may have older tokens than disk (if external refresh occurred)
1197
+ # 2. Saving would overwrite the newer disk tokens with stale cached ones
1198
+ # 3. The timestamp is non-critical and will be updated on next refresh
1199
 
1200
  return {"email": email}
1201
  except Exception as e:
 
1314
  )
1315
 
1316
  # Step 4: Save credentials to file
1317
+ if not await self._save_credentials(str(file_path), new_creds):
1318
+ return QwenCredentialSetupResult(
1319
+ success=False,
1320
+ error=f"Failed to save credentials to disk at {file_path.name}",
1321
+ )
1322
 
1323
  return QwenCredentialSetupResult(
1324
  success=True,