Mirrowel commited on
Commit
372b6c5
·
1 Parent(s): 5f94b93

feat(auth): ✨ add sequential credential refresh queue and availability checks

Browse files

Introduce a per-provider refresh queue and background processor to serialize token refreshes and interactive re-auth flows. Changes include:
- add _refresh_queue, _queued_credentials, _unavailable_credentials, queue tracking lock and background _process_refresh_queue to gemini/iflow/qwen auth base classes
- add _queue_refresh, _ensure_queue_processor_running and is_credential_available helpers
- change proactive refresh paths to enqueue automated refreshes (preserving exponential backoff) and allow re-auth to bypass backoff when needed
- update RotatingClient to filter out credentials that are currently marked unavailable (queued for re-auth), preferring available creds but falling back if all are unavailable

This reduces concurrent refresh attempts, prevents selecting keys undergoing re-auth, and centralizes refresh/backoff behavior without changing external APIs.

src/rotator_library/client.py CHANGED
@@ -630,6 +630,18 @@ class RotatingClient:
630
  # multiple keys have the same usage stats.
631
  credentials_for_provider = list(self.all_credentials[provider])
632
  random.shuffle(credentials_for_provider)
 
 
 
 
 
 
 
 
 
 
 
 
633
 
634
  tried_creds = set()
635
  last_exception = None
@@ -992,6 +1004,18 @@ class RotatingClient:
992
  # Create a mutable copy of the keys and shuffle it.
993
  credentials_for_provider = list(self.all_credentials[provider])
994
  random.shuffle(credentials_for_provider)
 
 
 
 
 
 
 
 
 
 
 
 
995
 
996
  deadline = time.time() + self.global_timeout
997
  tried_creds = set()
 
630
  # multiple keys have the same usage stats.
631
  credentials_for_provider = list(self.all_credentials[provider])
632
  random.shuffle(credentials_for_provider)
633
+
634
+ # Filter out credentials that are unavailable (queued for re-auth)
635
+ provider_plugin = self._get_provider_instance(provider)
636
+ if provider_plugin and hasattr(provider_plugin, 'is_credential_available'):
637
+ available_creds = [
638
+ cred for cred in credentials_for_provider
639
+ if provider_plugin.is_credential_available(cred)
640
+ ]
641
+ if available_creds:
642
+ credentials_for_provider = available_creds
643
+ # If all credentials are unavailable, keep the original list
644
+ # (better to try unavailable creds than fail immediately)
645
 
646
  tried_creds = set()
647
  last_exception = None
 
1004
  # Create a mutable copy of the keys and shuffle it.
1005
  credentials_for_provider = list(self.all_credentials[provider])
1006
  random.shuffle(credentials_for_provider)
1007
+
1008
+ # Filter out credentials that are unavailable (queued for re-auth)
1009
+ provider_plugin = self._get_provider_instance(provider)
1010
+ if provider_plugin and hasattr(provider_plugin, 'is_credential_available'):
1011
+ available_creds = [
1012
+ cred for cred in credentials_for_provider
1013
+ if provider_plugin.is_credential_available(cred)
1014
+ ]
1015
+ if available_creds:
1016
+ credentials_for_provider = available_creds
1017
+ # If all credentials are unavailable, keep the original list
1018
+ # (better to try unavailable creds than fail immediately)
1019
 
1020
  deadline = time.time() + self.global_timeout
1021
  tried_creds = set()
src/rotator_library/providers/gemini_auth_base.py CHANGED
@@ -37,6 +37,13 @@ class GeminiAuthBase:
37
  # [BACKOFF TRACKING] Track consecutive failures per credential
38
  self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
39
  self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
 
 
 
 
 
 
 
40
 
41
  def _load_from_env(self) -> Optional[Dict[str, Any]]:
42
  """
@@ -342,37 +349,11 @@ class GeminiAuthBase:
342
  return creds
343
 
344
  async def proactively_refresh(self, credential_path: str):
345
- # [BACKOFF] Check if refresh is in backoff period (matches Go's refreshFailureBackoff)
346
- now = time.time()
347
- if credential_path in self._next_refresh_after:
348
- backoff_until = self._next_refresh_after[credential_path]
349
- if now < backoff_until:
350
- remaining = int(backoff_until - now)
351
- lib_logger.debug(f"Skipping refresh for '{Path(credential_path).name}' (in backoff for {remaining}s)")
352
- return
353
-
354
  creds = await self._load_credentials(credential_path)
355
  if self._is_token_expired(creds):
356
- try:
357
- await self._refresh_token(credential_path, creds)
358
- # [SUCCESS] Clear failure tracking on successful refresh
359
- self._refresh_failures.pop(credential_path, None)
360
- self._next_refresh_after.pop(credential_path, None)
361
- lib_logger.debug(f"Successfully refreshed '{Path(credential_path).name}', cleared failure tracking")
362
- except Exception as e:
363
- # [FAILURE] Increment failure count and set exponential backoff
364
- failures = self._refresh_failures.get(credential_path, 0) + 1
365
- self._refresh_failures[credential_path] = failures
366
-
367
- # Exponential backoff: 5min → 10min → 20min → 40min → max 1 hour
368
- backoff_seconds = min(300 * (2 ** (failures - 1)), 3600)
369
- self._next_refresh_after[credential_path] = now + backoff_seconds
370
-
371
- lib_logger.error(
372
- f"Refresh failed for '{Path(credential_path).name}' "
373
- f"(attempt {failures}). Next retry in {backoff_seconds}s. Error: {e}"
374
- )
375
- # Don't re-raise - let background refresher continue with other credentials
376
 
377
  async def _get_lock(self, path: str) -> asyncio.Lock:
378
  # [FIX RACE CONDITION] Protect lock creation with a master lock
@@ -382,6 +363,92 @@ class GeminiAuthBase:
382
  self._refresh_locks[path] = asyncio.Lock()
383
  return self._refresh_locks[path]
384
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
385
  async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
386
  path = creds_or_path if isinstance(creds_or_path, str) else None
387
 
 
37
  # [BACKOFF TRACKING] Track consecutive failures per credential
38
  self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
39
  self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
40
+
41
+ # [QUEUE SYSTEM] Sequential refresh processing
42
+ self._refresh_queue: asyncio.Queue = asyncio.Queue()
43
+ self._queued_credentials: set = set() # Track credentials already in queue
44
+ self._unavailable_credentials: set = set() # Mark credentials unavailable during re-auth
45
+ self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
46
+ self._queue_processor_task: Optional[asyncio.Task] = None # Background worker task
47
 
48
  def _load_from_env(self) -> Optional[Dict[str, Any]]:
49
  """
 
349
  return creds
350
 
351
  async def proactively_refresh(self, credential_path: str):
352
+ """Proactively refresh a credential by queueing it for refresh."""
 
 
 
 
 
 
 
 
353
  creds = await self._load_credentials(credential_path)
354
  if self._is_token_expired(creds):
355
+ # Queue for refresh with needs_reauth=False (automated refresh)
356
+ await self._queue_refresh(credential_path, force=False, needs_reauth=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
357
 
358
  async def _get_lock(self, path: str) -> asyncio.Lock:
359
  # [FIX RACE CONDITION] Protect lock creation with a master lock
 
363
  self._refresh_locks[path] = asyncio.Lock()
364
  return self._refresh_locks[path]
365
 
366
+ def is_credential_available(self, path: str) -> bool:
367
+ """Check if a credential is available for rotation (not queued/refreshing)."""
368
+ return path not in self._unavailable_credentials
369
+
370
+ async def _ensure_queue_processor_running(self):
371
+ """Lazily starts the queue processor if not already running."""
372
+ if self._queue_processor_task is None or self._queue_processor_task.done():
373
+ self._queue_processor_task = asyncio.create_task(self._process_refresh_queue())
374
+
375
+ async def _queue_refresh(self, path: str, force: bool = False, needs_reauth: bool = False):
376
+ """Add a credential to the refresh queue if not already queued.
377
+
378
+ Args:
379
+ path: Credential file path
380
+ force: Force refresh even if not expired
381
+ needs_reauth: True if full re-authentication needed (bypasses backoff)
382
+ """
383
+ # IMPORTANT: Only check backoff for simple automated refreshes
384
+ # Re-authentication (interactive OAuth) should BYPASS backoff since it needs user input
385
+ if not needs_reauth:
386
+ now = time.time()
387
+ if path in self._next_refresh_after:
388
+ backoff_until = self._next_refresh_after[path]
389
+ if now < backoff_until:
390
+ # Credential is in backoff for automated refresh, do not queue
391
+ remaining = int(backoff_until - now)
392
+ lib_logger.debug(f"Skipping automated refresh for '{Path(path).name}' (in backoff for {remaining}s)")
393
+ return
394
+
395
+ async with self._queue_tracking_lock:
396
+ if path not in self._queued_credentials:
397
+ self._queued_credentials.add(path)
398
+ self._unavailable_credentials.add(path) # Mark as unavailable
399
+ await self._refresh_queue.put((path, force, needs_reauth))
400
+ await self._ensure_queue_processor_running()
401
+
402
+ async def _process_refresh_queue(self):
403
+ """Background worker that processes refresh requests sequentially."""
404
+ while True:
405
+ path = None
406
+ try:
407
+ # Wait for an item with timeout to allow graceful shutdown
408
+ try:
409
+ path, force, needs_reauth = await asyncio.wait_for(
410
+ self._refresh_queue.get(),
411
+ timeout=60.0
412
+ )
413
+ except asyncio.TimeoutError:
414
+ # No items for 60s, exit to save resources
415
+ self._queue_processor_task = None
416
+ return
417
+
418
+ try:
419
+ # Perform the actual refresh (still using per-credential lock)
420
+ async with await self._get_lock(path):
421
+ # Re-check if still expired (may have changed since queueing)
422
+ creds = self._credentials_cache.get(path)
423
+ if creds and not self._is_token_expired(creds):
424
+ # No longer expired, mark as available
425
+ async with self._queue_tracking_lock:
426
+ self._unavailable_credentials.discard(path)
427
+ continue
428
+
429
+ # Perform refresh
430
+ if not creds:
431
+ creds = await self._load_credentials(path)
432
+ await self._refresh_token(path, creds, force=force)
433
+
434
+ # SUCCESS: Mark as available again
435
+ async with self._queue_tracking_lock:
436
+ self._unavailable_credentials.discard(path)
437
+
438
+ finally:
439
+ # Remove from queued set
440
+ async with self._queue_tracking_lock:
441
+ self._queued_credentials.discard(path)
442
+ self._refresh_queue.task_done()
443
+ except asyncio.CancelledError:
444
+ break
445
+ except Exception as e:
446
+ lib_logger.error(f"Error in queue processor: {e}")
447
+ # Even on error, mark as available (backoff will prevent immediate retry)
448
+ if path:
449
+ async with self._queue_tracking_lock:
450
+ self._unavailable_credentials.discard(path)
451
+
452
  async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
453
  path = creds_or_path if isinstance(creds_or_path, str) else None
454
 
src/rotator_library/providers/iflow_auth_base.py CHANGED
@@ -150,6 +150,13 @@ class IFlowAuthBase:
150
  # [BACKOFF TRACKING] Track consecutive failures per credential
151
  self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
152
  self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
 
 
 
 
 
 
 
153
 
154
  def _load_from_env(self) -> Optional[Dict[str, Any]]:
155
  """
@@ -583,36 +590,10 @@ class IFlowAuthBase:
583
  if not os.path.isfile(credential_identifier):
584
  return # Direct API key, no refresh needed
585
 
586
- # [BACKOFF] Check if refresh is in backoff period
587
- now = time.time()
588
- if credential_identifier in self._next_refresh_after:
589
- backoff_until = self._next_refresh_after[credential_identifier]
590
- if now < backoff_until:
591
- remaining = int(backoff_until - now)
592
- lib_logger.debug(f"Skipping refresh for '{Path(credential_identifier).name}' (in backoff for {remaining}s)")
593
- return
594
-
595
  creds = await self._load_credentials(credential_identifier)
596
  if self._is_token_expired(creds):
597
- try:
598
- await self._refresh_token(credential_identifier)
599
- # [SUCCESS] Clear failure tracking
600
- self._refresh_failures.pop(credential_identifier, None)
601
- self._next_refresh_after.pop(credential_identifier, None)
602
- lib_logger.debug(f"Successfully refreshed '{Path(credential_identifier).name}', cleared failure tracking")
603
- except Exception as e:
604
- # [FAILURE] Increment failure count and set exponential backoff
605
- failures = self._refresh_failures.get(credential_identifier, 0) + 1
606
- self._refresh_failures[credential_identifier] = failures
607
-
608
- # Exponential backoff: 5min → 10min → 20min → max 1 hour
609
- backoff_seconds = min(300 * (2 ** (failures - 1)), 3600)
610
- self._next_refresh_after[credential_identifier] = now + backoff_seconds
611
-
612
- lib_logger.error(
613
- f"Refresh failed for '{Path(credential_identifier).name}' "
614
- f"(attempt {failures}). Next retry in {backoff_seconds}s. Error: {e}"
615
- )
616
 
617
  async def _get_lock(self, path: str) -> asyncio.Lock:
618
  """Gets or creates a lock for the given credential path."""
@@ -622,6 +603,92 @@ class IFlowAuthBase:
622
  self._refresh_locks[path] = asyncio.Lock()
623
  return self._refresh_locks[path]
624
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
625
  async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
626
  """
627
  Initiates OAuth authorization code flow if tokens are missing or invalid.
 
150
  # [BACKOFF TRACKING] Track consecutive failures per credential
151
  self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
152
  self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
153
+
154
+ # [QUEUE SYSTEM] Sequential refresh processing
155
+ self._refresh_queue: asyncio.Queue = asyncio.Queue()
156
+ self._queued_credentials: set = set() # Track credentials already in queue
157
+ self._unavailable_credentials: set = set() # Mark credentials unavailable during re-auth
158
+ self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
159
+ self._queue_processor_task: Optional[asyncio.Task] = None # Background worker task
160
 
161
  def _load_from_env(self) -> Optional[Dict[str, Any]]:
162
  """
 
590
  if not os.path.isfile(credential_identifier):
591
  return # Direct API key, no refresh needed
592
 
 
 
 
 
 
 
 
 
 
593
  creds = await self._load_credentials(credential_identifier)
594
  if self._is_token_expired(creds):
595
+ # Queue for refresh with needs_reauth=False (automated refresh)
596
+ await self._queue_refresh(credential_identifier, force=False, needs_reauth=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
597
 
598
  async def _get_lock(self, path: str) -> asyncio.Lock:
599
  """Gets or creates a lock for the given credential path."""
 
603
  self._refresh_locks[path] = asyncio.Lock()
604
  return self._refresh_locks[path]
605
 
606
+ def is_credential_available(self, path: str) -> bool:
607
+ """Check if a credential is available for rotation (not queued/refreshing)."""
608
+ return path not in self._unavailable_credentials
609
+
610
+ async def _ensure_queue_processor_running(self):
611
+ """Lazily starts the queue processor if not already running."""
612
+ if self._queue_processor_task is None or self._queue_processor_task.done():
613
+ self._queue_processor_task = asyncio.create_task(self._process_refresh_queue())
614
+
615
+ async def _queue_refresh(self, path: str, force: bool = False, needs_reauth: bool = False):
616
+ """Add a credential to the refresh queue if not already queued.
617
+
618
+ Args:
619
+ path: Credential file path
620
+ force: Force refresh even if not expired
621
+ needs_reauth: True if full re-authentication needed (bypasses backoff)
622
+ """
623
+ # IMPORTANT: Only check backoff for simple automated refreshes
624
+ # Re-authentication (interactive OAuth) should BYPASS backoff since it needs user input
625
+ if not needs_reauth:
626
+ now = time.time()
627
+ if path in self._next_refresh_after:
628
+ backoff_until = self._next_refresh_after[path]
629
+ if now < backoff_until:
630
+ # Credential is in backoff for automated refresh, do not queue
631
+ remaining = int(backoff_until - now)
632
+ lib_logger.debug(f"Skipping automated refresh for '{Path(path).name}' (in backoff for {remaining}s)")
633
+ return
634
+
635
+ async with self._queue_tracking_lock:
636
+ if path not in self._queued_credentials:
637
+ self._queued_credentials.add(path)
638
+ self._unavailable_credentials.add(path) # Mark as unavailable
639
+ await self._refresh_queue.put((path, force, needs_reauth))
640
+ await self._ensure_queue_processor_running()
641
+
642
+ async def _process_refresh_queue(self):
643
+ """Background worker that processes refresh requests sequentially."""
644
+ while True:
645
+ path = None
646
+ try:
647
+ # Wait for an item with timeout to allow graceful shutdown
648
+ try:
649
+ path, force, needs_reauth = await asyncio.wait_for(
650
+ self._refresh_queue.get(),
651
+ timeout=60.0
652
+ )
653
+ except asyncio.TimeoutError:
654
+ # No items for 60s, exit to save resources
655
+ self._queue_processor_task = None
656
+ return
657
+
658
+ try:
659
+ # Perform the actual refresh (still using per-credential lock)
660
+ async with await self._get_lock(path):
661
+ # Re-check if still expired (may have changed since queueing)
662
+ creds = self._credentials_cache.get(path)
663
+ if creds and not self._is_token_expired(creds):
664
+ # No longer expired, mark as available
665
+ async with self._queue_tracking_lock:
666
+ self._unavailable_credentials.discard(path)
667
+ continue
668
+
669
+ # Perform refresh
670
+ if not creds:
671
+ creds = await self._load_credentials(path)
672
+ await self._refresh_token(path, force=force)
673
+
674
+ # SUCCESS: Mark as available again
675
+ async with self._queue_tracking_lock:
676
+ self._unavailable_credentials.discard(path)
677
+
678
+ finally:
679
+ # Remove from queued set
680
+ async with self._queue_tracking_lock:
681
+ self._queued_credentials.discard(path)
682
+ self._refresh_queue.task_done()
683
+ except asyncio.CancelledError:
684
+ break
685
+ except Exception as e:
686
+ lib_logger.error(f"Error in queue processor: {e}")
687
+ # Even on error, mark as available (backoff will prevent immediate retry)
688
+ if path:
689
+ async with self._queue_tracking_lock:
690
+ self._unavailable_credentials.discard(path)
691
+
692
  async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
693
  """
694
  Initiates OAuth authorization code flow if tokens are missing or invalid.
src/rotator_library/providers/qwen_auth_base.py CHANGED
@@ -39,6 +39,13 @@ class QwenAuthBase:
39
  # [BACKOFF TRACKING] Track consecutive failures per credential
40
  self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
41
  self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
 
 
 
 
 
 
 
42
 
43
  def _load_from_env(self) -> Optional[Dict[str, Any]]:
44
  """
@@ -327,36 +334,10 @@ class QwenAuthBase:
327
  if not os.path.isfile(credential_identifier):
328
  return # Direct API key, no refresh needed
329
 
330
- # [BACKOFF] Check if refresh is in backoff period
331
- now = time.time()
332
- if credential_identifier in self._next_refresh_after:
333
- backoff_until = self._next_refresh_after[credential_identifier]
334
- if now < backoff_until:
335
- remaining = int(backoff_until - now)
336
- lib_logger.debug(f"Skipping refresh for '{Path(credential_identifier).name}' (in backoff for {remaining}s)")
337
- return
338
-
339
  creds = await self._load_credentials(credential_identifier)
340
  if self._is_token_expired(creds):
341
- try:
342
- await self._refresh_token(credential_identifier)
343
- # [SUCCESS] Clear failure tracking
344
- self._refresh_failures.pop(credential_identifier, None)
345
- self._next_refresh_after.pop(credential_identifier, None)
346
- lib_logger.debug(f"Successfully refreshed '{Path(credential_identifier).name}', cleared failure tracking")
347
- except Exception as e:
348
- # [FAILURE] Increment failure count and set exponential backoff
349
- failures = self._refresh_failures.get(credential_identifier, 0) + 1
350
- self._refresh_failures[credential_identifier] = failures
351
-
352
- # Exponential backoff: 5min → 10min → 20min → max 1 hour
353
- backoff_seconds = min(300 * (2 ** (failures - 1)), 3600)
354
- self._next_refresh_after[credential_identifier] = now + backoff_seconds
355
-
356
- lib_logger.error(
357
- f"Refresh failed for '{Path(credential_identifier).name}' "
358
- f"(attempt {failures}). Next retry in {backoff_seconds}s. Error: {e}"
359
- )
360
 
361
  async def _get_lock(self, path: str) -> asyncio.Lock:
362
  # [FIX RACE CONDITION] Protect lock creation with a master lock
@@ -365,6 +346,92 @@ class QwenAuthBase:
365
  self._refresh_locks[path] = asyncio.Lock()
366
  return self._refresh_locks[path]
367
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
368
  async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
369
  """Initiates device flow if tokens are missing or invalid."""
370
  path = creds_or_path if isinstance(creds_or_path, str) else None
 
39
  # [BACKOFF TRACKING] Track consecutive failures per credential
40
  self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
41
  self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
42
+
43
+ # [QUEUE SYSTEM] Sequential refresh processing
44
+ self._refresh_queue: asyncio.Queue = asyncio.Queue()
45
+ self._queued_credentials: set = set() # Track credentials already in queue
46
+ self._unavailable_credentials: set = set() # Mark credentials unavailable during re-auth
47
+ self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
48
+ self._queue_processor_task: Optional[asyncio.Task] = None # Background worker task
49
 
50
  def _load_from_env(self) -> Optional[Dict[str, Any]]:
51
  """
 
334
  if not os.path.isfile(credential_identifier):
335
  return # Direct API key, no refresh needed
336
 
 
 
 
 
 
 
 
 
 
337
  creds = await self._load_credentials(credential_identifier)
338
  if self._is_token_expired(creds):
339
+ # Queue for refresh with needs_reauth=False (automated refresh)
340
+ await self._queue_refresh(credential_identifier, force=False, needs_reauth=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
341
 
342
  async def _get_lock(self, path: str) -> asyncio.Lock:
343
  # [FIX RACE CONDITION] Protect lock creation with a master lock
 
346
  self._refresh_locks[path] = asyncio.Lock()
347
  return self._refresh_locks[path]
348
 
349
+ def is_credential_available(self, path: str) -> bool:
350
+ """Check if a credential is available for rotation (not queued/refreshing)."""
351
+ return path not in self._unavailable_credentials
352
+
353
+ async def _ensure_queue_processor_running(self):
354
+ """Lazily starts the queue processor if not already running."""
355
+ if self._queue_processor_task is None or self._queue_processor_task.done():
356
+ self._queue_processor_task = asyncio.create_task(self._process_refresh_queue())
357
+
358
+ async def _queue_refresh(self, path: str, force: bool = False, needs_reauth: bool = False):
359
+ """Add a credential to the refresh queue if not already queued.
360
+
361
+ Args:
362
+ path: Credential file path
363
+ force: Force refresh even if not expired
364
+ needs_reauth: True if full re-authentication needed (bypasses backoff)
365
+ """
366
+ # IMPORTANT: Only check backoff for simple automated refreshes
367
+ # Re-authentication (interactive OAuth) should BYPASS backoff since it needs user input
368
+ if not needs_reauth:
369
+ now = time.time()
370
+ if path in self._next_refresh_after:
371
+ backoff_until = self._next_refresh_after[path]
372
+ if now < backoff_until:
373
+ # Credential is in backoff for automated refresh, do not queue
374
+ remaining = int(backoff_until - now)
375
+ lib_logger.debug(f"Skipping automated refresh for '{Path(path).name}' (in backoff for {remaining}s)")
376
+ return
377
+
378
+ async with self._queue_tracking_lock:
379
+ if path not in self._queued_credentials:
380
+ self._queued_credentials.add(path)
381
+ self._unavailable_credentials.add(path) # Mark as unavailable
382
+ await self._refresh_queue.put((path, force, needs_reauth))
383
+ await self._ensure_queue_processor_running()
384
+
385
+ async def _process_refresh_queue(self):
386
+ """Background worker that processes refresh requests sequentially."""
387
+ while True:
388
+ path = None
389
+ try:
390
+ # Wait for an item with timeout to allow graceful shutdown
391
+ try:
392
+ path, force, needs_reauth = await asyncio.wait_for(
393
+ self._refresh_queue.get(),
394
+ timeout=60.0
395
+ )
396
+ except asyncio.TimeoutError:
397
+ # No items for 60s, exit to save resources
398
+ self._queue_processor_task = None
399
+ return
400
+
401
+ try:
402
+ # Perform the actual refresh (still using per-credential lock)
403
+ async with await self._get_lock(path):
404
+ # Re-check if still expired (may have changed since queueing)
405
+ creds = self._credentials_cache.get(path)
406
+ if creds and not self._is_token_expired(creds):
407
+ # No longer expired, mark as available
408
+ async with self._queue_tracking_lock:
409
+ self._unavailable_credentials.discard(path)
410
+ continue
411
+
412
+ # Perform refresh
413
+ if not creds:
414
+ creds = await self._load_credentials(path)
415
+ await self._refresh_token(path, force=force)
416
+
417
+ # SUCCESS: Mark as available again
418
+ async with self._queue_tracking_lock:
419
+ self._unavailable_credentials.discard(path)
420
+
421
+ finally:
422
+ # Remove from queued set
423
+ async with self._queue_tracking_lock:
424
+ self._queued_credentials.discard(path)
425
+ self._refresh_queue.task_done()
426
+ except asyncio.CancelledError:
427
+ break
428
+ except Exception as e:
429
+ lib_logger.error(f"Error in queue processor: {e}")
430
+ # Even on error, mark as available (backoff will prevent immediate retry)
431
+ if path:
432
+ async with self._queue_tracking_lock:
433
+ self._unavailable_credentials.discard(path)
434
+
435
  async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
436
  """Initiates device flow if tokens are missing or invalid."""
437
  path = creds_or_path if isinstance(creds_or_path, str) else None