Mirrowel commited on
Commit
0c82aac
·
unverified ·
2 Parent(s): 73a2395 b7aa5d6

Merge pull request #34 from MasuRii/fix/antigravity-credential-stuck-unavailable

Browse files
src/proxy_app/main.py CHANGED
@@ -570,23 +570,11 @@ async def lifespan(app: FastAPI):
570
  )
571
 
572
  # Log loaded credentials summary (compact, always visible for deployment verification)
573
- _api_summary = (
574
- ", ".join([f"{p}:{len(c)}" for p, c in api_keys.items()])
575
- if api_keys
576
- else "none"
577
- )
578
- _oauth_summary = (
579
- ", ".join([f"{p}:{len(c)}" for p, c in oauth_credentials.items()])
580
- if oauth_credentials
581
- else "none"
582
- )
583
- _total_summary = ", ".join(
584
- [f"{p}:{len(c)}" for p, c in client.all_credentials.items()]
585
- )
586
- print(
587
- f"🔑 Credentials loaded: {_total_summary} (API: {_api_summary} | OAuth: {_oauth_summary})"
588
- )
589
- client.background_refresher.start() # Start the background task
590
  app.state.rotating_client = client
591
 
592
  # Warn if no provider credentials are configured
 
570
  )
571
 
572
  # Log loaded credentials summary (compact, always visible for deployment verification)
573
+ #_api_summary = ', '.join([f"{p}:{len(c)}" for p, c in api_keys.items()]) if api_keys else "none"
574
+ #_oauth_summary = ', '.join([f"{p}:{len(c)}" for p, c in oauth_credentials.items()]) if oauth_credentials else "none"
575
+ #_total_summary = ', '.join([f"{p}:{len(c)}" for p, c in client.all_credentials.items()])
576
+ #print(f"🔑 Credentials loaded: {_total_summary} (API: {_api_summary} | OAuth: {_oauth_summary})")
577
+ client.background_refresher.start() # Start the background task
 
 
 
 
 
 
 
 
 
 
 
 
578
  app.state.rotating_client = client
579
 
580
  # Warn if no provider credentials are configured
src/rotator_library/background_refresher.py CHANGED
@@ -18,6 +18,9 @@ class BackgroundRefresher:
18
  """
19
 
20
  def __init__(self, client: "RotatingClient"):
 
 
 
21
  try:
22
  interval_str = os.getenv("OAUTH_REFRESH_INTERVAL", "600")
23
  self._interval = int(interval_str)
@@ -26,9 +29,6 @@ class BackgroundRefresher:
26
  f"Invalid OAUTH_REFRESH_INTERVAL '{interval_str}'. Falling back to 600s."
27
  )
28
  self._interval = 600
29
- self._client = client
30
- self._task: Optional[asyncio.Task] = None
31
- self._initialized = False
32
 
33
  def start(self):
34
  """Starts the background refresh task."""
 
18
  """
19
 
20
  def __init__(self, client: "RotatingClient"):
21
+ self._client = client
22
+ self._task: Optional[asyncio.Task] = None
23
+ self._initialized = False
24
  try:
25
  interval_str = os.getenv("OAUTH_REFRESH_INTERVAL", "600")
26
  self._interval = int(interval_str)
 
29
  f"Invalid OAUTH_REFRESH_INTERVAL '{interval_str}'. Falling back to 600s."
30
  )
31
  self._interval = 600
 
 
 
32
 
33
  def start(self):
34
  """Starts the background refresh task."""
src/rotator_library/providers/google_oauth_base.py CHANGED
@@ -19,6 +19,7 @@ from rich.text import Text
19
  from rich.markup import escape as rich_escape
20
 
21
  from ..utils.headless_detection import is_headless_environment
 
22
 
23
  lib_logger = logging.getLogger("rotator_library")
24
 
@@ -85,9 +86,12 @@ class GoogleOAuthBase:
85
  # [QUEUE SYSTEM] Sequential refresh processing
86
  self._refresh_queue: asyncio.Queue = asyncio.Queue()
87
  self._queued_credentials: set = set() # Track credentials already in queue
88
- self._unavailable_credentials: set = (
89
- set()
90
- ) # Mark credentials unavailable during re-auth
 
 
 
91
  self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
92
  self._queue_processor_task: Optional[asyncio.Task] = (
93
  None # Background worker task
@@ -526,8 +530,33 @@ class GoogleOAuthBase:
526
  return self._refresh_locks[path]
527
 
528
  def is_credential_available(self, path: str) -> bool:
529
- """Check if a credential is available for rotation (not queued/refreshing)."""
530
- return path not in self._unavailable_credentials
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
531
 
532
  async def _ensure_queue_processor_running(self):
533
  """Lazily starts the queue processor if not already running."""
@@ -563,7 +592,12 @@ class GoogleOAuthBase:
563
  async with self._queue_tracking_lock:
564
  if path not in self._queued_credentials:
565
  self._queued_credentials.add(path)
566
- self._unavailable_credentials.add(path) # Mark as unavailable
 
 
 
 
 
567
  await self._refresh_queue.put((path, force, needs_reauth))
568
  await self._ensure_queue_processor_running()
569
 
@@ -578,7 +612,16 @@ class GoogleOAuthBase:
578
  self._refresh_queue.get(), timeout=60.0
579
  )
580
  except asyncio.TimeoutError:
581
- # No items for 60s, exit to save resources
 
 
 
 
 
 
 
 
 
582
  self._queue_processor_task = None
583
  return
584
 
@@ -590,7 +633,11 @@ class GoogleOAuthBase:
590
  if creds and not self._is_token_expired(creds):
591
  # No longer expired, mark as available
592
  async with self._queue_tracking_lock:
593
- self._unavailable_credentials.discard(path)
 
 
 
 
594
  continue
595
 
596
  # Perform refresh
@@ -600,25 +647,235 @@ class GoogleOAuthBase:
600
 
601
  # SUCCESS: Mark as available again
602
  async with self._queue_tracking_lock:
603
- self._unavailable_credentials.discard(path)
 
 
 
 
604
 
605
  finally:
606
- # Remove from queued set
 
607
  async with self._queue_tracking_lock:
608
  self._queued_credentials.discard(path)
 
 
 
 
 
 
609
  self._refresh_queue.task_done()
610
  except asyncio.CancelledError:
 
 
 
 
 
 
 
 
611
  break
612
  except Exception as e:
613
  lib_logger.error(f"Error in queue processor: {e}")
614
  # Even on error, mark as available (backoff will prevent immediate retry)
615
  if path:
616
  async with self._queue_tracking_lock:
617
- self._unavailable_credentials.discard(path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
618
 
619
  async def initialize_token(
620
  self, creds_or_path: Union[Dict[str, Any], str]
621
  ) -> Dict[str, Any]:
 
 
 
 
 
 
 
622
  path = creds_or_path if isinstance(creds_or_path, str) else None
623
 
624
  # Get display name from metadata if available, otherwise derive from path
@@ -655,181 +912,23 @@ class GoogleOAuthBase:
655
  f"{self.ENV_PREFIX} OAuth token for '{display_name}' needs setup: {reason}."
656
  )
657
 
658
- # [HEADLESS DETECTION] Check if running in headless environment
659
- is_headless = is_headless_environment()
660
-
661
- auth_code_future = asyncio.get_event_loop().create_future()
662
- server = None
663
 
664
- async def handle_callback(reader, writer):
665
- try:
666
- request_line_bytes = await reader.readline()
667
- if not request_line_bytes:
668
- return
669
- path_str = (
670
- request_line_bytes.decode("utf-8").strip().split(" ")[1]
671
- )
672
- while await reader.readline() != b"\r\n":
673
- pass
674
- from urllib.parse import urlparse, parse_qs
675
-
676
- query_params = parse_qs(urlparse(path_str).query)
677
- writer.write(
678
- b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n"
679
- )
680
- if "code" in query_params:
681
- if not auth_code_future.done():
682
- auth_code_future.set_result(query_params["code"][0])
683
- writer.write(
684
- b"<html><body><h1>Authentication successful!</h1><p>You can close this window.</p></body></html>"
685
- )
686
- else:
687
- error = query_params.get("error", ["Unknown error"])[0]
688
- if not auth_code_future.done():
689
- auth_code_future.set_exception(
690
- Exception(f"OAuth failed: {error}")
691
- )
692
- writer.write(
693
- f"<html><body><h1>Authentication Failed</h1><p>Error: {error}. Please try again.</p></body></html>".encode()
694
- )
695
- await writer.drain()
696
- except Exception as e:
697
- lib_logger.error(f"Error in OAuth callback handler: {e}")
698
- finally:
699
- writer.close()
700
-
701
- try:
702
- server = await asyncio.start_server(
703
- handle_callback, "127.0.0.1", self.CALLBACK_PORT
704
- )
705
- from urllib.parse import urlencode
706
-
707
- auth_url = (
708
- "https://accounts.google.com/o/oauth2/v2/auth?"
709
- + urlencode(
710
- {
711
- "client_id": self.CLIENT_ID,
712
- "redirect_uri": f"http://localhost:{self.CALLBACK_PORT}{self.CALLBACK_PATH}",
713
- "scope": " ".join(self.OAUTH_SCOPES),
714
- "access_type": "offline",
715
- "response_type": "code",
716
- "prompt": "consent",
717
- }
718
- )
719
  )
720
 
721
- # [HEADLESS SUPPORT] Display appropriate instructions
722
- if is_headless:
723
- auth_panel_text = Text.from_markup(
724
- "Running in headless environment (no GUI detected).\n"
725
- "Please open the URL below in a browser on another machine to authorize:\n"
726
- )
727
- else:
728
- auth_panel_text = Text.from_markup(
729
- "1. Your browser will now open to log in and authorize the application.\n"
730
- "2. If it doesn't open automatically, please open the URL below manually."
731
- )
732
-
733
- console.print(
734
- Panel(
735
- auth_panel_text,
736
- title=f"{self.ENV_PREFIX} OAuth Setup for [bold yellow]{display_name}[/bold yellow]",
737
- style="bold blue",
738
- )
739
- )
740
- # [URL DISPLAY] Print URL with proper escaping to prevent Rich markup issues.
741
- # IMPORTANT: OAuth URLs contain special characters (=, &, etc.) that Rich might
742
- # interpret as markup in some terminal configurations. We escape the URL to
743
- # ensure it displays correctly.
744
- #
745
- # KNOWN ISSUE: If Rich rendering fails entirely (e.g., terminal doesn't support
746
- # ANSI codes, or output is piped), the escaped URL should still be valid.
747
- # However, if the terminal strips or mangles the output, users should copy
748
- # the URL directly from logs or use --verbose to see the raw URL.
749
- #
750
- # The [link=...] markup creates a clickable hyperlink in supported terminals
751
- # (iTerm2, Windows Terminal, etc.), but the displayed text is the escaped URL
752
- # which can be safely copied even if the hyperlink doesn't work.
753
- escaped_url = rich_escape(auth_url)
754
- console.print(
755
- f"[bold]URL:[/bold] [link={auth_url}]{escaped_url}[/link]\n"
756
- )
757
-
758
- # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
759
- if not is_headless:
760
- try:
761
- webbrowser.open(auth_url)
762
- lib_logger.info(
763
- "Browser opened successfully for OAuth flow"
764
- )
765
- except Exception as e:
766
- lib_logger.warning(
767
- f"Failed to open browser automatically: {e}. Please open the URL manually."
768
- )
769
-
770
- with console.status(
771
- f"[bold green]Waiting for you to complete authentication in the browser...[/bold green]",
772
- spinner="dots",
773
- ):
774
- auth_code = await asyncio.wait_for(
775
- auth_code_future, timeout=300
776
- )
777
- except asyncio.TimeoutError:
778
- raise Exception("OAuth flow timed out. Please try again.")
779
- finally:
780
- if server:
781
- server.close()
782
- await server.wait_closed()
783
-
784
- lib_logger.info(
785
- f"Attempting to exchange authorization code for tokens..."
786
  )
787
- async with httpx.AsyncClient() as client:
788
- response = await client.post(
789
- self.TOKEN_URI,
790
- data={
791
- "code": auth_code.strip(),
792
- "client_id": self.CLIENT_ID,
793
- "client_secret": self.CLIENT_SECRET,
794
- "redirect_uri": f"http://localhost:{self.CALLBACK_PORT}{self.CALLBACK_PATH}",
795
- "grant_type": "authorization_code",
796
- },
797
- )
798
- response.raise_for_status()
799
- token_data = response.json()
800
- # Start with the full token data from the exchange
801
- creds = token_data.copy()
802
-
803
- # Convert 'expires_in' to 'expiry_date' in milliseconds
804
- creds["expiry_date"] = (
805
- time.time() + creds.pop("expires_in")
806
- ) * 1000
807
-
808
- # Ensure client_id and client_secret are present
809
- creds["client_id"] = self.CLIENT_ID
810
- creds["client_secret"] = self.CLIENT_SECRET
811
-
812
- creds["token_uri"] = self.TOKEN_URI
813
- creds["universe_domain"] = "googleapis.com"
814
-
815
- # Fetch user info and add metadata
816
- user_info_response = await client.get(
817
- self.USER_INFO_URI,
818
- headers={"Authorization": f"Bearer {creds['access_token']}"},
819
- )
820
- user_info_response.raise_for_status()
821
- user_info = user_info_response.json()
822
- creds["_proxy_metadata"] = {
823
- "email": user_info.get("email"),
824
- "last_check_timestamp": time.time(),
825
- }
826
-
827
- if path:
828
- await self._save_credentials(path, creds)
829
- lib_logger.info(
830
- f"{self.ENV_PREFIX} OAuth initialized successfully for '{display_name}'."
831
- )
832
- return creds
833
 
834
  lib_logger.info(
835
  f"{self.ENV_PREFIX} OAuth token at '{display_name}' is valid."
 
19
  from rich.markup import escape as rich_escape
20
 
21
  from ..utils.headless_detection import is_headless_environment
22
+ from ..utils.reauth_coordinator import get_reauth_coordinator
23
 
24
  lib_logger = logging.getLogger("rotator_library")
25
 
 
86
  # [QUEUE SYSTEM] Sequential refresh processing
87
  self._refresh_queue: asyncio.Queue = asyncio.Queue()
88
  self._queued_credentials: set = set() # Track credentials already in queue
89
+ # [FIX PR#34] Changed from set to dict mapping credential path to timestamp
90
+ # This enables TTL-based stale entry cleanup as defense in depth
91
+ self._unavailable_credentials: Dict[
92
+ str, float
93
+ ] = {} # Maps credential path -> timestamp when marked unavailable
94
+ self._unavailable_ttl_seconds: int = 300 # 5 minutes TTL for stale entries
95
  self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
96
  self._queue_processor_task: Optional[asyncio.Task] = (
97
  None # Background worker task
 
530
  return self._refresh_locks[path]
531
 
532
  def is_credential_available(self, path: str) -> bool:
533
+ """Check if a credential is available for rotation (not queued/refreshing).
534
+
535
+ [FIX PR#34] Now includes TTL-based stale entry cleanup as defense in depth.
536
+ If a credential has been unavailable for longer than _unavailable_ttl_seconds,
537
+ it is automatically cleaned up and considered available.
538
+ """
539
+ if path not in self._unavailable_credentials:
540
+ return True
541
+
542
+ # [FIX PR#34] Check if the entry is stale (TTL expired)
543
+ marked_time = self._unavailable_credentials.get(path)
544
+ if marked_time is not None:
545
+ now = time.time()
546
+ if now - marked_time > self._unavailable_ttl_seconds:
547
+ # Entry is stale - clean it up and return available
548
+ lib_logger.warning(
549
+ f"Credential '{Path(path).name}' was stuck in unavailable state for "
550
+ f"{int(now - marked_time)}s (TTL: {self._unavailable_ttl_seconds}s). "
551
+ f"Auto-cleaning stale entry."
552
+ )
553
+ # Note: This is a sync method, so we can't use async lock here.
554
+ # However, pop from dict is thread-safe for single operations.
555
+ # The _queue_tracking_lock protects concurrent modifications in async context.
556
+ self._unavailable_credentials.pop(path, None)
557
+ return True
558
+
559
+ return False
560
 
561
  async def _ensure_queue_processor_running(self):
562
  """Lazily starts the queue processor if not already running."""
 
592
  async with self._queue_tracking_lock:
593
  if path not in self._queued_credentials:
594
  self._queued_credentials.add(path)
595
+ # [FIX PR#34] Store timestamp when marking unavailable (for TTL cleanup)
596
+ self._unavailable_credentials[path] = time.time()
597
+ lib_logger.debug(
598
+ f"Marked '{Path(path).name}' as unavailable. "
599
+ f"Total unavailable: {len(self._unavailable_credentials)}"
600
+ )
601
  await self._refresh_queue.put((path, force, needs_reauth))
602
  await self._ensure_queue_processor_running()
603
 
 
612
  self._refresh_queue.get(), timeout=60.0
613
  )
614
  except asyncio.TimeoutError:
615
+ # [FIX PR#34] Clean up any stale unavailable entries before exiting
616
+ # If we're idle for 60s, no refreshes are in progress
617
+ async with self._queue_tracking_lock:
618
+ if self._unavailable_credentials:
619
+ stale_count = len(self._unavailable_credentials)
620
+ lib_logger.warning(
621
+ f"Queue processor idle timeout. Cleaning {stale_count} "
622
+ f"stale unavailable credentials: {list(self._unavailable_credentials.keys())}"
623
+ )
624
+ self._unavailable_credentials.clear()
625
  self._queue_processor_task = None
626
  return
627
 
 
633
  if creds and not self._is_token_expired(creds):
634
  # No longer expired, mark as available
635
  async with self._queue_tracking_lock:
636
+ self._unavailable_credentials.pop(path, None)
637
+ lib_logger.debug(
638
+ f"Credential '{Path(path).name}' no longer expired, marked available. "
639
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
640
+ )
641
  continue
642
 
643
  # Perform refresh
 
647
 
648
  # SUCCESS: Mark as available again
649
  async with self._queue_tracking_lock:
650
+ self._unavailable_credentials.pop(path, None)
651
+ lib_logger.debug(
652
+ f"Refresh SUCCESS for '{Path(path).name}', marked available. "
653
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
654
+ )
655
 
656
  finally:
657
+ # [FIX PR#34] Remove from BOTH queued set AND unavailable credentials
658
+ # This ensures cleanup happens in ALL exit paths (success, exception, etc.)
659
  async with self._queue_tracking_lock:
660
  self._queued_credentials.discard(path)
661
+ # [FIX PR#34] Always clean up unavailable credentials in finally block
662
+ self._unavailable_credentials.pop(path, None)
663
+ lib_logger.debug(
664
+ f"Finally cleanup for '{Path(path).name}'. "
665
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
666
+ )
667
  self._refresh_queue.task_done()
668
  except asyncio.CancelledError:
669
+ # [FIX PR#34] Clean up the current credential before breaking
670
+ if path:
671
+ async with self._queue_tracking_lock:
672
+ self._unavailable_credentials.pop(path, None)
673
+ lib_logger.debug(
674
+ f"CancelledError cleanup for '{Path(path).name}'. "
675
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
676
+ )
677
  break
678
  except Exception as e:
679
  lib_logger.error(f"Error in queue processor: {e}")
680
  # Even on error, mark as available (backoff will prevent immediate retry)
681
  if path:
682
  async with self._queue_tracking_lock:
683
+ self._unavailable_credentials.pop(path, None)
684
+ lib_logger.debug(
685
+ f"Error cleanup for '{Path(path).name}': {e}. "
686
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
687
+ )
688
+
689
+ async def _perform_interactive_oauth(
690
+ self, path: str, creds: Dict[str, Any], display_name: str
691
+ ) -> Dict[str, Any]:
692
+ """
693
+ Perform interactive OAuth flow (browser-based authentication).
694
+
695
+ This method is called via the global ReauthCoordinator to ensure
696
+ only one interactive OAuth flow runs at a time across all providers.
697
+
698
+ Args:
699
+ path: Credential file path
700
+ creds: Current credentials dict (will be updated)
701
+ display_name: Display name for logging/UI
702
+
703
+ Returns:
704
+ Updated credentials dict with new tokens
705
+ """
706
+ # [HEADLESS DETECTION] Check if running in headless environment
707
+ is_headless = is_headless_environment()
708
+
709
+ auth_code_future = asyncio.get_event_loop().create_future()
710
+ server = None
711
+
712
+ async def handle_callback(reader, writer):
713
+ try:
714
+ request_line_bytes = await reader.readline()
715
+ if not request_line_bytes:
716
+ return
717
+ path_str = request_line_bytes.decode("utf-8").strip().split(" ")[1]
718
+ while await reader.readline() != b"\r\n":
719
+ pass
720
+ from urllib.parse import urlparse, parse_qs
721
+
722
+ query_params = parse_qs(urlparse(path_str).query)
723
+ writer.write(b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n")
724
+ if "code" in query_params:
725
+ if not auth_code_future.done():
726
+ auth_code_future.set_result(query_params["code"][0])
727
+ writer.write(
728
+ b"<html><body><h1>Authentication successful!</h1><p>You can close this window.</p></body></html>"
729
+ )
730
+ else:
731
+ error = query_params.get("error", ["Unknown error"])[0]
732
+ if not auth_code_future.done():
733
+ auth_code_future.set_exception(
734
+ Exception(f"OAuth failed: {error}")
735
+ )
736
+ writer.write(
737
+ f"<html><body><h1>Authentication Failed</h1><p>Error: {error}. Please try again.</p></body></html>".encode()
738
+ )
739
+ await writer.drain()
740
+ except Exception as e:
741
+ lib_logger.error(f"Error in OAuth callback handler: {e}")
742
+ finally:
743
+ writer.close()
744
+
745
+ try:
746
+ server = await asyncio.start_server(
747
+ handle_callback, "127.0.0.1", self.CALLBACK_PORT
748
+ )
749
+ from urllib.parse import urlencode
750
+
751
+ auth_url = "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode(
752
+ {
753
+ "client_id": self.CLIENT_ID,
754
+ "redirect_uri": f"http://localhost:{self.CALLBACK_PORT}{self.CALLBACK_PATH}",
755
+ "scope": " ".join(self.OAUTH_SCOPES),
756
+ "access_type": "offline",
757
+ "response_type": "code",
758
+ "prompt": "consent",
759
+ }
760
+ )
761
+
762
+ # [HEADLESS SUPPORT] Display appropriate instructions
763
+ if is_headless:
764
+ auth_panel_text = Text.from_markup(
765
+ "Running in headless environment (no GUI detected).\n"
766
+ "Please open the URL below in a browser on another machine to authorize:\n"
767
+ )
768
+ else:
769
+ auth_panel_text = Text.from_markup(
770
+ "1. Your browser will now open to log in and authorize the application.\n"
771
+ "2. If it doesn't open automatically, please open the URL below manually."
772
+ )
773
+
774
+ console.print(
775
+ Panel(
776
+ auth_panel_text,
777
+ title=f"{self.ENV_PREFIX} OAuth Setup for [bold yellow]{display_name}[/bold yellow]",
778
+ style="bold blue",
779
+ )
780
+ )
781
+ # [URL DISPLAY] Print URL with proper escaping to prevent Rich markup issues.
782
+ # IMPORTANT: OAuth URLs contain special characters (=, &, etc.) that Rich might
783
+ # interpret as markup in some terminal configurations. We escape the URL to
784
+ # ensure it displays correctly.
785
+ #
786
+ # KNOWN ISSUE: If Rich rendering fails entirely (e.g., terminal doesn't support
787
+ # ANSI codes, or output is piped), the escaped URL should still be valid.
788
+ # However, if the terminal strips or mangles the output, users should copy
789
+ # the URL directly from logs or use --verbose to see the raw URL.
790
+ #
791
+ # The [link=...] markup creates a clickable hyperlink in supported terminals
792
+ # (iTerm2, Windows Terminal, etc.), but the displayed text is the escaped URL
793
+ # which can be safely copied even if the hyperlink doesn't work.
794
+ escaped_url = rich_escape(auth_url)
795
+ console.print(f"[bold]URL:[/bold] [link={auth_url}]{escaped_url}[/link]\n")
796
+
797
+ # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
798
+ if not is_headless:
799
+ try:
800
+ webbrowser.open(auth_url)
801
+ lib_logger.info("Browser opened successfully for OAuth flow")
802
+ except Exception as e:
803
+ lib_logger.warning(
804
+ f"Failed to open browser automatically: {e}. Please open the URL manually."
805
+ )
806
+
807
+ with console.status(
808
+ f"[bold green]Waiting for you to complete authentication in the browser...[/bold green]",
809
+ spinner="dots",
810
+ ):
811
+ # Note: The 300s timeout here is handled by the ReauthCoordinator
812
+ # We use a slightly longer internal timeout to let the coordinator handle it
813
+ auth_code = await asyncio.wait_for(auth_code_future, timeout=310)
814
+ except asyncio.TimeoutError:
815
+ raise Exception("OAuth flow timed out. Please try again.")
816
+ finally:
817
+ if server:
818
+ server.close()
819
+ await server.wait_closed()
820
+
821
+ lib_logger.info(f"Attempting to exchange authorization code for tokens...")
822
+ async with httpx.AsyncClient() as client:
823
+ response = await client.post(
824
+ self.TOKEN_URI,
825
+ data={
826
+ "code": auth_code.strip(),
827
+ "client_id": self.CLIENT_ID,
828
+ "client_secret": self.CLIENT_SECRET,
829
+ "redirect_uri": f"http://localhost:{self.CALLBACK_PORT}{self.CALLBACK_PATH}",
830
+ "grant_type": "authorization_code",
831
+ },
832
+ )
833
+ response.raise_for_status()
834
+ token_data = response.json()
835
+ # Start with the full token data from the exchange
836
+ new_creds = token_data.copy()
837
+
838
+ # Convert 'expires_in' to 'expiry_date' in milliseconds
839
+ new_creds["expiry_date"] = (
840
+ time.time() + new_creds.pop("expires_in")
841
+ ) * 1000
842
+
843
+ # Ensure client_id and client_secret are present
844
+ new_creds["client_id"] = self.CLIENT_ID
845
+ new_creds["client_secret"] = self.CLIENT_SECRET
846
+
847
+ new_creds["token_uri"] = self.TOKEN_URI
848
+ new_creds["universe_domain"] = "googleapis.com"
849
+
850
+ # Fetch user info and add metadata
851
+ user_info_response = await client.get(
852
+ self.USER_INFO_URI,
853
+ headers={"Authorization": f"Bearer {new_creds['access_token']}"},
854
+ )
855
+ user_info_response.raise_for_status()
856
+ user_info = user_info_response.json()
857
+ new_creds["_proxy_metadata"] = {
858
+ "email": user_info.get("email"),
859
+ "last_check_timestamp": time.time(),
860
+ }
861
+
862
+ if path:
863
+ await self._save_credentials(path, new_creds)
864
+ lib_logger.info(
865
+ f"{self.ENV_PREFIX} OAuth initialized successfully for '{display_name}'."
866
+ )
867
+ return new_creds
868
 
869
  async def initialize_token(
870
  self, creds_or_path: Union[Dict[str, Any], str]
871
  ) -> Dict[str, Any]:
872
+ """
873
+ Initialize OAuth token, triggering interactive OAuth flow if needed.
874
+
875
+ If interactive OAuth is required (expired refresh token, missing credentials, etc.),
876
+ the flow is coordinated globally via ReauthCoordinator to ensure only one
877
+ interactive OAuth flow runs at a time across all providers.
878
+ """
879
  path = creds_or_path if isinstance(creds_or_path, str) else None
880
 
881
  # Get display name from metadata if available, otherwise derive from path
 
912
  f"{self.ENV_PREFIX} OAuth token for '{display_name}' needs setup: {reason}."
913
  )
914
 
915
+ # [GLOBAL REAUTH COORDINATION] Use the global coordinator to ensure
916
+ # only one interactive OAuth flow runs at a time across all providers
917
+ coordinator = get_reauth_coordinator()
 
 
918
 
919
+ # Define the interactive OAuth function to be executed by coordinator
920
+ async def _do_interactive_oauth():
921
+ return await self._perform_interactive_oauth(
922
+ path, creds, display_name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
923
  )
924
 
925
+ # Execute via global coordinator (ensures only one at a time)
926
+ return await coordinator.execute_reauth(
927
+ credential_path=path or display_name,
928
+ provider_name=self.ENV_PREFIX,
929
+ reauth_func=_do_interactive_oauth,
930
+ timeout=300.0, # 5 minute timeout for user to complete OAuth
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
931
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
932
 
933
  lib_logger.info(
934
  f"{self.ENV_PREFIX} OAuth token at '{display_name}' is valid."
src/rotator_library/providers/iflow_auth_base.py CHANGED
@@ -23,6 +23,7 @@ from rich.prompt import Prompt
23
  from rich.text import Text
24
  from rich.markup import escape as rich_escape
25
  from ..utils.headless_detection import is_headless_environment
 
26
 
27
  lib_logger = logging.getLogger("rotator_library")
28
 
@@ -173,9 +174,12 @@ class IFlowAuthBase:
173
  # [QUEUE SYSTEM] Sequential refresh processing
174
  self._refresh_queue: asyncio.Queue = asyncio.Queue()
175
  self._queued_credentials: set = set() # Track credentials already in queue
176
- self._unavailable_credentials: set = (
177
- set()
178
- ) # Mark credentials unavailable during re-auth
 
 
 
179
  self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
180
  self._queue_processor_task: Optional[asyncio.Task] = (
181
  None # Background worker task
@@ -745,15 +749,28 @@ class IFlowAuthBase:
745
  Proactively refreshes tokens if they're close to expiry.
746
  Only applies to OAuth credentials (file paths or env:// paths). Direct API keys are skipped.
747
  """
748
- # Check if it's an env:// virtual path (OAuth credentials from environment)
749
- is_env_path = credential_identifier.startswith("env://")
750
 
751
- # Only refresh if it's an OAuth credential (file path or env:// path)
752
- if not is_env_path and not os.path.isfile(credential_identifier):
753
- return # Direct API key, no refresh needed
 
 
 
 
 
 
 
754
 
755
- creds = await self._load_credentials(credential_identifier)
756
- if self._is_token_expired(creds):
 
 
 
 
 
 
 
757
  # Queue for refresh with needs_reauth=False (automated refresh)
758
  await self._queue_refresh(
759
  credential_identifier, force=False, needs_reauth=False
@@ -768,8 +785,30 @@ class IFlowAuthBase:
768
  return self._refresh_locks[path]
769
 
770
  def is_credential_available(self, path: str) -> bool:
771
- """Check if a credential is available for rotation (not queued/refreshing)."""
772
- return path not in self._unavailable_credentials
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
773
 
774
  async def _ensure_queue_processor_running(self):
775
  """Lazily starts the queue processor if not already running."""
@@ -805,7 +844,12 @@ class IFlowAuthBase:
805
  async with self._queue_tracking_lock:
806
  if path not in self._queued_credentials:
807
  self._queued_credentials.add(path)
808
- self._unavailable_credentials.add(path) # Mark as unavailable
 
 
 
 
 
809
  await self._refresh_queue.put((path, force, needs_reauth))
810
  await self._ensure_queue_processor_running()
811
 
@@ -820,7 +864,22 @@ class IFlowAuthBase:
820
  self._refresh_queue.get(), timeout=60.0
821
  )
822
  except asyncio.TimeoutError:
823
- # No items for 60s, exit to save resources
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
824
  self._queue_processor_task = None
825
  return
826
 
@@ -832,7 +891,11 @@ class IFlowAuthBase:
832
  if creds and not self._is_token_expired(creds):
833
  # No longer expired, mark as available
834
  async with self._queue_tracking_lock:
835
- self._unavailable_credentials.discard(path)
 
 
 
 
836
  continue
837
 
838
  # Perform refresh
@@ -842,28 +905,174 @@ class IFlowAuthBase:
842
 
843
  # SUCCESS: Mark as available again
844
  async with self._queue_tracking_lock:
845
- self._unavailable_credentials.discard(path)
 
 
 
 
846
 
847
  finally:
848
- # Remove from queued set
 
849
  async with self._queue_tracking_lock:
850
  self._queued_credentials.discard(path)
 
 
 
 
 
 
851
  self._refresh_queue.task_done()
852
  except asyncio.CancelledError:
 
 
 
 
 
 
 
 
853
  break
854
  except Exception as e:
855
  lib_logger.error(f"Error in queue processor: {e}")
856
  # Even on error, mark as available (backoff will prevent immediate retry)
857
  if path:
858
  async with self._queue_tracking_lock:
859
- self._unavailable_credentials.discard(path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
860
 
861
  async def initialize_token(
862
  self, creds_or_path: Union[Dict[str, Any], str]
863
  ) -> Dict[str, Any]:
864
  """
865
- Initiates OAuth authorization code flow if tokens are missing or invalid.
866
- Uses local callback server to receive authorization code.
 
 
 
867
  """
868
  path = creds_or_path if isinstance(creds_or_path, str) else None
869
 
@@ -903,127 +1112,23 @@ class IFlowAuthBase:
903
  f"iFlow OAuth token for '{display_name}' needs setup: {reason}."
904
  )
905
 
906
- # [HEADLESS DETECTION] Check if running in headless environment
907
- is_headless = is_headless_environment()
908
-
909
- # Generate random state for CSRF protection
910
- state = secrets.token_urlsafe(32)
911
-
912
- # Build authorization URL
913
- redirect_uri = f"http://localhost:{CALLBACK_PORT}/oauth2callback"
914
- auth_params = {
915
- "loginMethod": "phone",
916
- "type": "phone",
917
- "redirect": redirect_uri,
918
- "state": state,
919
- "client_id": IFLOW_CLIENT_ID,
920
- }
921
- auth_url = f"{IFLOW_OAUTH_AUTHORIZE_ENDPOINT}?{urlencode(auth_params)}"
922
-
923
- # Start OAuth callback server
924
- callback_server = OAuthCallbackServer(port=CALLBACK_PORT)
925
- try:
926
- await callback_server.start(expected_state=state)
927
-
928
- # [HEADLESS SUPPORT] Display appropriate instructions
929
- if is_headless:
930
- auth_panel_text = Text.from_markup(
931
- "Running in headless environment (no GUI detected).\n"
932
- "Please open the URL below in a browser on another machine to authorize:\n"
933
- "1. Visit the URL below to sign in with your phone number.\n"
934
- "2. [bold]Authorize the application[/bold] to access your account.\n"
935
- "3. You will be automatically redirected after authorization."
936
- )
937
- else:
938
- auth_panel_text = Text.from_markup(
939
- "1. Visit the URL below to sign in with your phone number.\n"
940
- "2. [bold]Authorize the application[/bold] to access your account.\n"
941
- "3. You will be automatically redirected after authorization."
942
- )
943
-
944
- console.print(
945
- Panel(
946
- auth_panel_text,
947
- title=f"iFlow OAuth Setup for [bold yellow]{display_name}[/bold yellow]",
948
- style="bold blue",
949
- )
950
- )
951
- # [URL DISPLAY] Print URL with proper escaping to prevent Rich markup issues.
952
- # IMPORTANT: OAuth URLs contain special characters (=, &, etc.) that Rich might
953
- # interpret as markup in some terminal configurations. We escape the URL to
954
- # ensure it displays correctly.
955
- #
956
- # KNOWN ISSUE: If Rich rendering fails entirely (e.g., terminal doesn't support
957
- # ANSI codes, or output is piped), the escaped URL should still be valid.
958
- # However, if the terminal strips or mangles the output, users should copy
959
- # the URL directly from logs or use --verbose to see the raw URL.
960
- #
961
- # The [link=...] markup creates a clickable hyperlink in supported terminals
962
- # (iTerm2, Windows Terminal, etc.), but the displayed text is the escaped URL
963
- # which can be safely copied even if the hyperlink doesn't work.
964
- escaped_url = rich_escape(auth_url)
965
- console.print(
966
- f"[bold]URL:[/bold] [link={auth_url}]{escaped_url}[/link]\n"
967
- )
968
-
969
- # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
970
- if not is_headless:
971
- try:
972
- webbrowser.open(auth_url)
973
- lib_logger.info(
974
- "Browser opened successfully for iFlow OAuth flow"
975
- )
976
- except Exception as e:
977
- lib_logger.warning(
978
- f"Failed to open browser automatically: {e}. Please open the URL manually."
979
- )
980
-
981
- # Wait for callback
982
- with console.status(
983
- "[bold green]Waiting for authorization in the browser...[/bold green]",
984
- spinner="dots",
985
- ):
986
- code = await callback_server.wait_for_callback(timeout=300.0)
987
 
988
- lib_logger.info(
989
- "Received authorization code, exchanging for tokens..."
990
- )
991
-
992
- # Exchange code for tokens and API key
993
- token_data = await self._exchange_code_for_tokens(
994
- code, redirect_uri
995
- )
996
-
997
- # Update credentials
998
- creds.update(
999
- {
1000
- "access_token": token_data["access_token"],
1001
- "refresh_token": token_data["refresh_token"],
1002
- "api_key": token_data["api_key"],
1003
- "email": token_data["email"],
1004
- "expiry_date": token_data["expiry_date"],
1005
- "token_type": token_data["token_type"],
1006
- "scope": token_data["scope"],
1007
- }
1008
- )
1009
-
1010
- # Create metadata object
1011
- if not creds.get("_proxy_metadata"):
1012
- creds["_proxy_metadata"] = {
1013
- "email": token_data["email"],
1014
- "last_check_timestamp": time.time(),
1015
- }
1016
-
1017
- if path:
1018
- await self._save_credentials(path, creds)
1019
-
1020
- lib_logger.info(
1021
- f"iFlow OAuth initialized successfully for '{display_name}'."
1022
  )
1023
- return creds
1024
 
1025
- finally:
1026
- await callback_server.stop()
 
 
 
 
 
1027
 
1028
  lib_logger.info(f"iFlow OAuth token at '{display_name}' is valid.")
1029
  return creds
 
23
  from rich.text import Text
24
  from rich.markup import escape as rich_escape
25
  from ..utils.headless_detection import is_headless_environment
26
+ from ..utils.reauth_coordinator import get_reauth_coordinator
27
 
28
  lib_logger = logging.getLogger("rotator_library")
29
 
 
174
  # [QUEUE SYSTEM] Sequential refresh processing
175
  self._refresh_queue: asyncio.Queue = asyncio.Queue()
176
  self._queued_credentials: set = set() # Track credentials already in queue
177
+ # [FIX PR#34] Changed from set to dict mapping credential path to timestamp
178
+ # This enables TTL-based stale entry cleanup as defense in depth
179
+ self._unavailable_credentials: Dict[
180
+ str, float
181
+ ] = {} # Maps credential path -> timestamp when marked unavailable
182
+ self._unavailable_ttl_seconds: int = 300 # 5 minutes TTL for stale entries
183
  self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
184
  self._queue_processor_task: Optional[asyncio.Task] = (
185
  None # Background worker task
 
749
  Proactively refreshes tokens if they're close to expiry.
750
  Only applies to OAuth credentials (file paths or env:// paths). Direct API keys are skipped.
751
  """
752
+ lib_logger.debug(f"proactively_refresh called for: {credential_identifier}")
 
753
 
754
+ # Try to load credentials - this will fail for direct API keys
755
+ # and succeed for OAuth credentials (file paths or env:// paths)
756
+ try:
757
+ creds = await self._load_credentials(credential_identifier)
758
+ except IOError as e:
759
+ # Not a valid credential path (likely a direct API key string)
760
+ lib_logger.debug(
761
+ f"Skipping refresh for '{credential_identifier}' - not an OAuth credential: {e}"
762
+ )
763
+ return
764
 
765
+ is_expired = self._is_token_expired(creds)
766
+ lib_logger.debug(
767
+ f"Token expired check for '{Path(credential_identifier).name}': {is_expired}"
768
+ )
769
+
770
+ if is_expired:
771
+ lib_logger.debug(
772
+ f"Queueing refresh for '{Path(credential_identifier).name}'"
773
+ )
774
  # Queue for refresh with needs_reauth=False (automated refresh)
775
  await self._queue_refresh(
776
  credential_identifier, force=False, needs_reauth=False
 
785
  return self._refresh_locks[path]
786
 
787
  def is_credential_available(self, path: str) -> bool:
788
+ """Check if a credential is available for rotation (not queued/refreshing).
789
+
790
+ [FIX PR#34] Now includes TTL-based stale entry cleanup as defense in depth.
791
+ If a credential has been unavailable for longer than _unavailable_ttl_seconds,
792
+ it is automatically cleaned up and considered available.
793
+ """
794
+ if path not in self._unavailable_credentials:
795
+ return True
796
+
797
+ # [FIX PR#34] Check if the entry is stale (TTL expired)
798
+ marked_time = self._unavailable_credentials.get(path)
799
+ if marked_time is not None:
800
+ now = time.time()
801
+ if now - marked_time > self._unavailable_ttl_seconds:
802
+ # Entry is stale - clean it up and return available
803
+ lib_logger.warning(
804
+ f"Credential '{Path(path).name}' was stuck in unavailable state for "
805
+ f"{int(now - marked_time)}s (TTL: {self._unavailable_ttl_seconds}s). "
806
+ f"Auto-cleaning stale entry."
807
+ )
808
+ self._unavailable_credentials.pop(path, None)
809
+ return True
810
+
811
+ return False
812
 
813
  async def _ensure_queue_processor_running(self):
814
  """Lazily starts the queue processor if not already running."""
 
844
  async with self._queue_tracking_lock:
845
  if path not in self._queued_credentials:
846
  self._queued_credentials.add(path)
847
+ # [FIX PR#34] Store timestamp when marking unavailable (for TTL cleanup)
848
+ self._unavailable_credentials[path] = time.time()
849
+ lib_logger.debug(
850
+ f"Marked '{Path(path).name}' as unavailable. "
851
+ f"Total unavailable: {len(self._unavailable_credentials)}"
852
+ )
853
  await self._refresh_queue.put((path, force, needs_reauth))
854
  await self._ensure_queue_processor_running()
855
 
 
864
  self._refresh_queue.get(), timeout=60.0
865
  )
866
  except asyncio.TimeoutError:
867
+ # [FIX PR#34] Clean up any stale unavailable entries before exiting
868
+ # If we're idle for 60s, no refreshes are in progress
869
+ async with self._queue_tracking_lock:
870
+ if self._unavailable_credentials:
871
+ stale_count = len(self._unavailable_credentials)
872
+ lib_logger.warning(
873
+ f"Queue processor idle timeout. Cleaning {stale_count} "
874
+ f"stale unavailable credentials: {list(self._unavailable_credentials.keys())}"
875
+ )
876
+ self._unavailable_credentials.clear()
877
+ # [FIX BUG#6] Also clear queued credentials to prevent stuck state
878
+ if self._queued_credentials:
879
+ lib_logger.debug(
880
+ f"Clearing {len(self._queued_credentials)} queued credentials on timeout"
881
+ )
882
+ self._queued_credentials.clear()
883
  self._queue_processor_task = None
884
  return
885
 
 
891
  if creds and not self._is_token_expired(creds):
892
  # No longer expired, mark as available
893
  async with self._queue_tracking_lock:
894
+ self._unavailable_credentials.pop(path, None)
895
+ lib_logger.debug(
896
+ f"Credential '{Path(path).name}' no longer expired, marked available. "
897
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
898
+ )
899
  continue
900
 
901
  # Perform refresh
 
905
 
906
  # SUCCESS: Mark as available again
907
  async with self._queue_tracking_lock:
908
+ self._unavailable_credentials.pop(path, None)
909
+ lib_logger.debug(
910
+ f"Refresh SUCCESS for '{Path(path).name}', marked available. "
911
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
912
+ )
913
 
914
  finally:
915
+ # [FIX PR#34] Remove from BOTH queued set AND unavailable credentials
916
+ # This ensures cleanup happens in ALL exit paths (success, exception, etc.)
917
  async with self._queue_tracking_lock:
918
  self._queued_credentials.discard(path)
919
+ # [FIX PR#34] Always clean up unavailable credentials in finally block
920
+ self._unavailable_credentials.pop(path, None)
921
+ lib_logger.debug(
922
+ f"Finally cleanup for '{Path(path).name}'. "
923
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
924
+ )
925
  self._refresh_queue.task_done()
926
  except asyncio.CancelledError:
927
+ # [FIX PR#34] Clean up the current credential before breaking
928
+ if path:
929
+ async with self._queue_tracking_lock:
930
+ self._unavailable_credentials.pop(path, None)
931
+ lib_logger.debug(
932
+ f"CancelledError cleanup for '{Path(path).name}'. "
933
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
934
+ )
935
  break
936
  except Exception as e:
937
  lib_logger.error(f"Error in queue processor: {e}")
938
  # Even on error, mark as available (backoff will prevent immediate retry)
939
  if path:
940
  async with self._queue_tracking_lock:
941
+ self._unavailable_credentials.pop(path, None)
942
+ lib_logger.debug(
943
+ f"Error cleanup for '{Path(path).name}': {e}. "
944
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
945
+ )
946
+
947
+ async def _perform_interactive_oauth(
948
+ self, path: str, creds: Dict[str, Any], display_name: str
949
+ ) -> Dict[str, Any]:
950
+ """
951
+ Perform interactive OAuth authorization code flow (browser-based authentication).
952
+
953
+ This method is called via the global ReauthCoordinator to ensure
954
+ only one interactive OAuth flow runs at a time across all providers.
955
+
956
+ Args:
957
+ path: Credential file path
958
+ creds: Current credentials dict (will be updated)
959
+ display_name: Display name for logging/UI
960
+
961
+ Returns:
962
+ Updated credentials dict with new tokens
963
+ """
964
+ # [HEADLESS DETECTION] Check if running in headless environment
965
+ is_headless = is_headless_environment()
966
+
967
+ # Generate random state for CSRF protection
968
+ state = secrets.token_urlsafe(32)
969
+
970
+ # Build authorization URL
971
+ redirect_uri = f"http://localhost:{CALLBACK_PORT}/oauth2callback"
972
+ auth_params = {
973
+ "loginMethod": "phone",
974
+ "type": "phone",
975
+ "redirect": redirect_uri,
976
+ "state": state,
977
+ "client_id": IFLOW_CLIENT_ID,
978
+ }
979
+ auth_url = f"{IFLOW_OAUTH_AUTHORIZE_ENDPOINT}?{urlencode(auth_params)}"
980
+
981
+ # Start OAuth callback server
982
+ callback_server = OAuthCallbackServer(port=CALLBACK_PORT)
983
+ try:
984
+ await callback_server.start(expected_state=state)
985
+
986
+ # [HEADLESS SUPPORT] Display appropriate instructions
987
+ if is_headless:
988
+ auth_panel_text = Text.from_markup(
989
+ "Running in headless environment (no GUI detected).\n"
990
+ "Please open the URL below in a browser on another machine to authorize:\n"
991
+ "1. Visit the URL below to sign in with your phone number.\n"
992
+ "2. [bold]Authorize the application[/bold] to access your account.\n"
993
+ "3. You will be automatically redirected after authorization."
994
+ )
995
+ else:
996
+ auth_panel_text = Text.from_markup(
997
+ "1. Visit the URL below to sign in with your phone number.\n"
998
+ "2. [bold]Authorize the application[/bold] to access your account.\n"
999
+ "3. You will be automatically redirected after authorization."
1000
+ )
1001
+
1002
+ console.print(
1003
+ Panel(
1004
+ auth_panel_text,
1005
+ title=f"iFlow OAuth Setup for [bold yellow]{display_name}[/bold yellow]",
1006
+ style="bold blue",
1007
+ )
1008
+ )
1009
+ escaped_url = rich_escape(auth_url)
1010
+ console.print(f"[bold]URL:[/bold] [link={auth_url}]{escaped_url}[/link]\n")
1011
+
1012
+ # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
1013
+ if not is_headless:
1014
+ try:
1015
+ webbrowser.open(auth_url)
1016
+ lib_logger.info("Browser opened successfully for iFlow OAuth flow")
1017
+ except Exception as e:
1018
+ lib_logger.warning(
1019
+ f"Failed to open browser automatically: {e}. Please open the URL manually."
1020
+ )
1021
+
1022
+ # Wait for callback
1023
+ with console.status(
1024
+ "[bold green]Waiting for authorization in the browser...[/bold green]",
1025
+ spinner="dots",
1026
+ ):
1027
+ # Note: The 300s timeout here is handled by the ReauthCoordinator
1028
+ # We use a slightly longer internal timeout to let the coordinator handle it
1029
+ code = await callback_server.wait_for_callback(timeout=310.0)
1030
+
1031
+ lib_logger.info("Received authorization code, exchanging for tokens...")
1032
+
1033
+ # Exchange code for tokens and API key
1034
+ token_data = await self._exchange_code_for_tokens(code, redirect_uri)
1035
+
1036
+ # Update credentials
1037
+ creds.update(
1038
+ {
1039
+ "access_token": token_data["access_token"],
1040
+ "refresh_token": token_data["refresh_token"],
1041
+ "api_key": token_data["api_key"],
1042
+ "email": token_data["email"],
1043
+ "expiry_date": token_data["expiry_date"],
1044
+ "token_type": token_data["token_type"],
1045
+ "scope": token_data["scope"],
1046
+ }
1047
+ )
1048
+
1049
+ # Create metadata object
1050
+ if not creds.get("_proxy_metadata"):
1051
+ creds["_proxy_metadata"] = {
1052
+ "email": token_data["email"],
1053
+ "last_check_timestamp": time.time(),
1054
+ }
1055
+
1056
+ if path:
1057
+ await self._save_credentials(path, creds)
1058
+
1059
+ lib_logger.info(
1060
+ f"iFlow OAuth initialized successfully for '{display_name}'."
1061
+ )
1062
+ return creds
1063
+
1064
+ finally:
1065
+ await callback_server.stop()
1066
 
1067
  async def initialize_token(
1068
  self, creds_or_path: Union[Dict[str, Any], str]
1069
  ) -> Dict[str, Any]:
1070
  """
1071
+ Initialize OAuth token, triggering interactive authorization flow if needed.
1072
+
1073
+ If interactive OAuth is required (expired refresh token, missing credentials, etc.),
1074
+ the flow is coordinated globally via ReauthCoordinator to ensure only one
1075
+ interactive OAuth flow runs at a time across all providers.
1076
  """
1077
  path = creds_or_path if isinstance(creds_or_path, str) else None
1078
 
 
1112
  f"iFlow OAuth token for '{display_name}' needs setup: {reason}."
1113
  )
1114
 
1115
+ # [GLOBAL REAUTH COORDINATION] Use the global coordinator to ensure
1116
+ # only one interactive OAuth flow runs at a time across all providers
1117
+ coordinator = get_reauth_coordinator()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1118
 
1119
+ # Define the interactive OAuth function to be executed by coordinator
1120
+ async def _do_interactive_oauth():
1121
+ return await self._perform_interactive_oauth(
1122
+ path, creds, display_name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1123
  )
 
1124
 
1125
+ # Execute via global coordinator (ensures only one at a time)
1126
+ return await coordinator.execute_reauth(
1127
+ credential_path=path or display_name,
1128
+ provider_name="IFLOW",
1129
+ reauth_func=_do_interactive_oauth,
1130
+ timeout=300.0, # 5 minute timeout for user to complete OAuth
1131
+ )
1132
 
1133
  lib_logger.info(f"iFlow OAuth token at '{display_name}' is valid.")
1134
  return creds
src/rotator_library/providers/qwen_auth_base.py CHANGED
@@ -22,6 +22,7 @@ from rich.text import Text
22
  from rich.markup import escape as rich_escape
23
 
24
  from ..utils.headless_detection import is_headless_environment
 
25
 
26
  lib_logger = logging.getLogger("rotator_library")
27
 
@@ -53,9 +54,12 @@ class QwenAuthBase:
53
  # [QUEUE SYSTEM] Sequential refresh processing
54
  self._refresh_queue: asyncio.Queue = asyncio.Queue()
55
  self._queued_credentials: set = set() # Track credentials already in queue
56
- self._unavailable_credentials: set = (
57
- set()
58
- ) # Mark credentials unavailable during re-auth
 
 
 
59
  self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
60
  self._queue_processor_task: Optional[asyncio.Task] = (
61
  None # Background worker task
@@ -472,15 +476,28 @@ class QwenAuthBase:
472
  Proactively refreshes tokens if they're close to expiry.
473
  Only applies to OAuth credentials (file paths or env:// paths). Direct API keys are skipped.
474
  """
475
- # Check if it's an env:// virtual path (OAuth credentials from environment)
476
- is_env_path = credential_identifier.startswith("env://")
477
 
478
- # Only refresh if it's an OAuth credential (file path or env:// path)
479
- if not is_env_path and not os.path.isfile(credential_identifier):
480
- return # Direct API key, no refresh needed
 
 
 
 
 
 
 
481
 
482
- creds = await self._load_credentials(credential_identifier)
483
- if self._is_token_expired(creds):
 
 
 
 
 
 
 
484
  # Queue for refresh with needs_reauth=False (automated refresh)
485
  await self._queue_refresh(
486
  credential_identifier, force=False, needs_reauth=False
@@ -494,8 +511,30 @@ class QwenAuthBase:
494
  return self._refresh_locks[path]
495
 
496
  def is_credential_available(self, path: str) -> bool:
497
- """Check if a credential is available for rotation (not queued/refreshing)."""
498
- return path not in self._unavailable_credentials
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
499
 
500
  async def _ensure_queue_processor_running(self):
501
  """Lazily starts the queue processor if not already running."""
@@ -531,7 +570,12 @@ class QwenAuthBase:
531
  async with self._queue_tracking_lock:
532
  if path not in self._queued_credentials:
533
  self._queued_credentials.add(path)
534
- self._unavailable_credentials.add(path) # Mark as unavailable
 
 
 
 
 
535
  await self._refresh_queue.put((path, force, needs_reauth))
536
  await self._ensure_queue_processor_running()
537
 
@@ -546,7 +590,22 @@ class QwenAuthBase:
546
  self._refresh_queue.get(), timeout=60.0
547
  )
548
  except asyncio.TimeoutError:
549
- # No items for 60s, exit to save resources
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
550
  self._queue_processor_task = None
551
  return
552
 
@@ -558,7 +617,11 @@ class QwenAuthBase:
558
  if creds and not self._is_token_expired(creds):
559
  # No longer expired, mark as available
560
  async with self._queue_tracking_lock:
561
- self._unavailable_credentials.discard(path)
 
 
 
 
562
  continue
563
 
564
  # Perform refresh
@@ -568,26 +631,240 @@ class QwenAuthBase:
568
 
569
  # SUCCESS: Mark as available again
570
  async with self._queue_tracking_lock:
571
- self._unavailable_credentials.discard(path)
 
 
 
 
572
 
573
  finally:
574
- # Remove from queued set
 
575
  async with self._queue_tracking_lock:
576
  self._queued_credentials.discard(path)
 
 
 
 
 
 
577
  self._refresh_queue.task_done()
578
  except asyncio.CancelledError:
 
 
 
 
 
 
 
 
579
  break
580
  except Exception as e:
581
  lib_logger.error(f"Error in queue processor: {e}")
582
  # Even on error, mark as available (backoff will prevent immediate retry)
583
  if path:
584
  async with self._queue_tracking_lock:
585
- self._unavailable_credentials.discard(path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
586
 
587
  async def initialize_token(
588
  self, creds_or_path: Union[Dict[str, Any], str]
589
  ) -> Dict[str, Any]:
590
- """Initiates device flow if tokens are missing or invalid."""
 
 
 
 
 
 
591
  path = creds_or_path if isinstance(creds_or_path, str) else None
592
 
593
  # Get display name from metadata if available, otherwise derive from path
@@ -623,189 +900,23 @@ class QwenAuthBase:
623
  f"Qwen OAuth token for '{display_name}' needs setup: {reason}."
624
  )
625
 
626
- # [HEADLESS DETECTION] Check if running in headless environment
627
- is_headless = is_headless_environment()
628
-
629
- code_verifier = (
630
- base64.urlsafe_b64encode(secrets.token_bytes(32))
631
- .decode("utf-8")
632
- .rstrip("=")
633
- )
634
- code_challenge = (
635
- base64.urlsafe_b64encode(
636
- hashlib.sha256(code_verifier.encode("utf-8")).digest()
637
- )
638
- .decode("utf-8")
639
- .rstrip("=")
640
- )
641
-
642
- headers = {
643
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
644
- "Content-Type": "application/x-www-form-urlencoded",
645
- "Accept": "application/json",
646
- }
647
- async with httpx.AsyncClient() as client:
648
- request_data = {
649
- "client_id": CLIENT_ID,
650
- "scope": SCOPE,
651
- "code_challenge": code_challenge,
652
- "code_challenge_method": "S256",
653
- }
654
- lib_logger.debug(f"Qwen device code request data: {request_data}")
655
- try:
656
- dev_response = await client.post(
657
- "https://chat.qwen.ai/api/v1/oauth2/device/code",
658
- headers=headers,
659
- data=request_data,
660
- )
661
- dev_response.raise_for_status()
662
- dev_data = dev_response.json()
663
- lib_logger.debug(f"Qwen device auth response: {dev_data}")
664
- except httpx.HTTPStatusError as e:
665
- lib_logger.error(
666
- f"Qwen device code request failed with status {e.response.status_code}: {e.response.text}"
667
- )
668
- raise e
669
-
670
- # [HEADLESS SUPPORT] Display appropriate instructions
671
- if is_headless:
672
- auth_panel_text = Text.from_markup(
673
- "Running in headless environment (no GUI detected).\n"
674
- "Please open the URL below in a browser on another machine to authorize:\n"
675
- "1. Visit the URL below to sign in.\n"
676
- "2. [bold]Copy your email[/bold] or another unique identifier and authorize the application.\n"
677
- "3. You will be prompted to enter your identifier after authorization."
678
- )
679
- else:
680
- auth_panel_text = Text.from_markup(
681
- "1. Visit the URL below to sign in.\n"
682
- "2. [bold]Copy your email[/bold] or another unique identifier and authorize the application.\n"
683
- "3. You will be prompted to enter your identifier after authorization."
684
- )
685
-
686
- console.print(
687
- Panel(
688
- auth_panel_text,
689
- title=f"Qwen OAuth Setup for [bold yellow]{display_name}[/bold yellow]",
690
- style="bold blue",
691
- )
692
- )
693
- # [URL DISPLAY] Print URL with proper escaping to prevent Rich markup issues.
694
- # IMPORTANT: OAuth URLs contain special characters (=, &, etc.) that Rich might
695
- # interpret as markup in some terminal configurations. We escape the URL to
696
- # ensure it displays correctly.
697
- #
698
- # KNOWN ISSUE: If Rich rendering fails entirely (e.g., terminal doesn't support
699
- # ANSI codes, or output is piped), the escaped URL should still be valid.
700
- # However, if the terminal strips or mangles the output, users should copy
701
- # the URL directly from logs or use --verbose to see the raw URL.
702
- #
703
- # The [link=...] markup creates a clickable hyperlink in supported terminals
704
- # (iTerm2, Windows Terminal, etc.), but the displayed text is the escaped URL
705
- # which can be safely copied even if the hyperlink doesn't work.
706
- verification_url = dev_data["verification_uri_complete"]
707
- escaped_url = rich_escape(verification_url)
708
- console.print(
709
- f"[bold]URL:[/bold] [link={verification_url}]{escaped_url}[/link]\n"
710
- )
711
-
712
- # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
713
- if not is_headless:
714
- try:
715
- webbrowser.open(dev_data["verification_uri_complete"])
716
- lib_logger.info(
717
- "Browser opened successfully for Qwen OAuth flow"
718
- )
719
- except Exception as e:
720
- lib_logger.warning(
721
- f"Failed to open browser automatically: {e}. Please open the URL manually."
722
- )
723
 
724
- token_data = None
725
- start_time = time.time()
726
- interval = dev_data.get("interval", 5)
727
-
728
- with console.status(
729
- "[bold green]Polling for token, please complete authentication in the browser...[/bold green]",
730
- spinner="dots",
731
- ) as status:
732
- while time.time() - start_time < dev_data["expires_in"]:
733
- poll_response = await client.post(
734
- TOKEN_ENDPOINT,
735
- headers=headers,
736
- data={
737
- "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
738
- "device_code": dev_data["device_code"],
739
- "client_id": CLIENT_ID,
740
- "code_verifier": code_verifier,
741
- },
742
- )
743
- if poll_response.status_code == 200:
744
- token_data = poll_response.json()
745
- lib_logger.info("Successfully received token.")
746
- break
747
- elif poll_response.status_code == 400:
748
- poll_data = poll_response.json()
749
- error_type = poll_data.get("error")
750
- if error_type == "authorization_pending":
751
- lib_logger.debug(
752
- f"Polling status: {error_type}, waiting {interval}s"
753
- )
754
- elif error_type == "slow_down":
755
- interval = int(interval * 1.5)
756
- if interval > 10:
757
- interval = 10
758
- lib_logger.debug(
759
- f"Polling status: {error_type}, waiting {interval}s"
760
- )
761
- else:
762
- raise ValueError(
763
- f"Token polling failed: {poll_data.get('error_description', error_type)}"
764
- )
765
- else:
766
- poll_response.raise_for_status()
767
-
768
- await asyncio.sleep(interval)
769
-
770
- if not token_data:
771
- raise TimeoutError("Qwen device flow timed out.")
772
-
773
- creds.update(
774
- {
775
- "access_token": token_data["access_token"],
776
- "refresh_token": token_data.get("refresh_token"),
777
- "expiry_date": (time.time() + token_data["expires_in"])
778
- * 1000,
779
- "resource_url": token_data.get("resource_url"),
780
- }
781
  )
782
 
783
- # Prompt for user identifier and create metadata object if needed
784
- if not creds.get("_proxy_metadata", {}).get("email"):
785
- try:
786
- prompt_text = Text.from_markup(
787
- f"\\n[bold]Please enter your email or a unique identifier for [yellow]'{display_name}'[/yellow][/bold]"
788
- )
789
- email = Prompt.ask(prompt_text)
790
- creds["_proxy_metadata"] = {
791
- "email": email.strip(),
792
- "last_check_timestamp": time.time(),
793
- }
794
- except (EOFError, KeyboardInterrupt):
795
- console.print(
796
- "\\n[bold yellow]No identifier provided. Deduplication will not be possible.[/bold yellow]"
797
- )
798
- creds["_proxy_metadata"] = {
799
- "email": None,
800
- "last_check_timestamp": time.time(),
801
- }
802
-
803
- if path:
804
- await self._save_credentials(path, creds)
805
- lib_logger.info(
806
- f"Qwen OAuth initialized successfully for '{display_name}'."
807
- )
808
- return creds
809
 
810
  lib_logger.info(f"Qwen OAuth token at '{display_name}' is valid.")
811
  return creds
 
22
  from rich.markup import escape as rich_escape
23
 
24
  from ..utils.headless_detection import is_headless_environment
25
+ from ..utils.reauth_coordinator import get_reauth_coordinator
26
 
27
  lib_logger = logging.getLogger("rotator_library")
28
 
 
54
  # [QUEUE SYSTEM] Sequential refresh processing
55
  self._refresh_queue: asyncio.Queue = asyncio.Queue()
56
  self._queued_credentials: set = set() # Track credentials already in queue
57
+ # [FIX PR#34] Changed from set to dict mapping credential path to timestamp
58
+ # This enables TTL-based stale entry cleanup as defense in depth
59
+ self._unavailable_credentials: Dict[
60
+ str, float
61
+ ] = {} # Maps credential path -> timestamp when marked unavailable
62
+ self._unavailable_ttl_seconds: int = 300 # 5 minutes TTL for stale entries
63
  self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
64
  self._queue_processor_task: Optional[asyncio.Task] = (
65
  None # Background worker task
 
476
  Proactively refreshes tokens if they're close to expiry.
477
  Only applies to OAuth credentials (file paths or env:// paths). Direct API keys are skipped.
478
  """
479
+ lib_logger.debug(f"proactively_refresh called for: {credential_identifier}")
 
480
 
481
+ # Try to load credentials - this will fail for direct API keys
482
+ # and succeed for OAuth credentials (file paths or env:// paths)
483
+ try:
484
+ creds = await self._load_credentials(credential_identifier)
485
+ except IOError as e:
486
+ # Not a valid credential path (likely a direct API key string)
487
+ lib_logger.debug(
488
+ f"Skipping refresh for '{credential_identifier}' - not an OAuth credential: {e}"
489
+ )
490
+ return
491
 
492
+ is_expired = self._is_token_expired(creds)
493
+ lib_logger.debug(
494
+ f"Token expired check for '{Path(credential_identifier).name}': {is_expired}"
495
+ )
496
+
497
+ if is_expired:
498
+ lib_logger.debug(
499
+ f"Queueing refresh for '{Path(credential_identifier).name}'"
500
+ )
501
  # Queue for refresh with needs_reauth=False (automated refresh)
502
  await self._queue_refresh(
503
  credential_identifier, force=False, needs_reauth=False
 
511
  return self._refresh_locks[path]
512
 
513
  def is_credential_available(self, path: str) -> bool:
514
+ """Check if a credential is available for rotation (not queued/refreshing).
515
+
516
+ [FIX PR#34] Now includes TTL-based stale entry cleanup as defense in depth.
517
+ If a credential has been unavailable for longer than _unavailable_ttl_seconds,
518
+ it is automatically cleaned up and considered available.
519
+ """
520
+ if path not in self._unavailable_credentials:
521
+ return True
522
+
523
+ # [FIX PR#34] Check if the entry is stale (TTL expired)
524
+ marked_time = self._unavailable_credentials.get(path)
525
+ if marked_time is not None:
526
+ now = time.time()
527
+ if now - marked_time > self._unavailable_ttl_seconds:
528
+ # Entry is stale - clean it up and return available
529
+ lib_logger.warning(
530
+ f"Credential '{Path(path).name}' was stuck in unavailable state for "
531
+ f"{int(now - marked_time)}s (TTL: {self._unavailable_ttl_seconds}s). "
532
+ f"Auto-cleaning stale entry."
533
+ )
534
+ self._unavailable_credentials.pop(path, None)
535
+ return True
536
+
537
+ return False
538
 
539
  async def _ensure_queue_processor_running(self):
540
  """Lazily starts the queue processor if not already running."""
 
570
  async with self._queue_tracking_lock:
571
  if path not in self._queued_credentials:
572
  self._queued_credentials.add(path)
573
+ # [FIX PR#34] Store timestamp when marking unavailable (for TTL cleanup)
574
+ self._unavailable_credentials[path] = time.time()
575
+ lib_logger.debug(
576
+ f"Marked '{Path(path).name}' as unavailable. "
577
+ f"Total unavailable: {len(self._unavailable_credentials)}"
578
+ )
579
  await self._refresh_queue.put((path, force, needs_reauth))
580
  await self._ensure_queue_processor_running()
581
 
 
590
  self._refresh_queue.get(), timeout=60.0
591
  )
592
  except asyncio.TimeoutError:
593
+ # [FIX PR#34] Clean up any stale unavailable entries before exiting
594
+ # If we're idle for 60s, no refreshes are in progress
595
+ async with self._queue_tracking_lock:
596
+ if self._unavailable_credentials:
597
+ stale_count = len(self._unavailable_credentials)
598
+ lib_logger.warning(
599
+ f"Queue processor idle timeout. Cleaning {stale_count} "
600
+ f"stale unavailable credentials: {list(self._unavailable_credentials.keys())}"
601
+ )
602
+ self._unavailable_credentials.clear()
603
+ # [FIX BUG#6] Also clear queued credentials to prevent stuck state
604
+ if self._queued_credentials:
605
+ lib_logger.debug(
606
+ f"Clearing {len(self._queued_credentials)} queued credentials on timeout"
607
+ )
608
+ self._queued_credentials.clear()
609
  self._queue_processor_task = None
610
  return
611
 
 
617
  if creds and not self._is_token_expired(creds):
618
  # No longer expired, mark as available
619
  async with self._queue_tracking_lock:
620
+ self._unavailable_credentials.pop(path, None)
621
+ lib_logger.debug(
622
+ f"Credential '{Path(path).name}' no longer expired, marked available. "
623
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
624
+ )
625
  continue
626
 
627
  # Perform refresh
 
631
 
632
  # SUCCESS: Mark as available again
633
  async with self._queue_tracking_lock:
634
+ self._unavailable_credentials.pop(path, None)
635
+ lib_logger.debug(
636
+ f"Refresh SUCCESS for '{Path(path).name}', marked available. "
637
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
638
+ )
639
 
640
  finally:
641
+ # [FIX PR#34] Remove from BOTH queued set AND unavailable credentials
642
+ # This ensures cleanup happens in ALL exit paths (success, exception, etc.)
643
  async with self._queue_tracking_lock:
644
  self._queued_credentials.discard(path)
645
+ # [FIX PR#34] Always clean up unavailable credentials in finally block
646
+ self._unavailable_credentials.pop(path, None)
647
+ lib_logger.debug(
648
+ f"Finally cleanup for '{Path(path).name}'. "
649
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
650
+ )
651
  self._refresh_queue.task_done()
652
  except asyncio.CancelledError:
653
+ # [FIX PR#34] Clean up the current credential before breaking
654
+ if path:
655
+ async with self._queue_tracking_lock:
656
+ self._unavailable_credentials.pop(path, None)
657
+ lib_logger.debug(
658
+ f"CancelledError cleanup for '{Path(path).name}'. "
659
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
660
+ )
661
  break
662
  except Exception as e:
663
  lib_logger.error(f"Error in queue processor: {e}")
664
  # Even on error, mark as available (backoff will prevent immediate retry)
665
  if path:
666
  async with self._queue_tracking_lock:
667
+ self._unavailable_credentials.pop(path, None)
668
+ lib_logger.debug(
669
+ f"Error cleanup for '{Path(path).name}': {e}. "
670
+ f"Remaining unavailable: {len(self._unavailable_credentials)}"
671
+ )
672
+
673
+ async def _perform_interactive_oauth(
674
+ self, path: str, creds: Dict[str, Any], display_name: str
675
+ ) -> Dict[str, Any]:
676
+ """
677
+ Perform interactive OAuth device flow (browser-based authentication).
678
+
679
+ This method is called via the global ReauthCoordinator to ensure
680
+ only one interactive OAuth flow runs at a time across all providers.
681
+
682
+ Args:
683
+ path: Credential file path
684
+ creds: Current credentials dict (will be updated)
685
+ display_name: Display name for logging/UI
686
+
687
+ Returns:
688
+ Updated credentials dict with new tokens
689
+ """
690
+ # [HEADLESS DETECTION] Check if running in headless environment
691
+ is_headless = is_headless_environment()
692
+
693
+ code_verifier = (
694
+ base64.urlsafe_b64encode(secrets.token_bytes(32))
695
+ .decode("utf-8")
696
+ .rstrip("=")
697
+ )
698
+ code_challenge = (
699
+ base64.urlsafe_b64encode(
700
+ hashlib.sha256(code_verifier.encode("utf-8")).digest()
701
+ )
702
+ .decode("utf-8")
703
+ .rstrip("=")
704
+ )
705
+
706
+ headers = {
707
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
708
+ "Content-Type": "application/x-www-form-urlencoded",
709
+ "Accept": "application/json",
710
+ }
711
+ async with httpx.AsyncClient() as client:
712
+ request_data = {
713
+ "client_id": CLIENT_ID,
714
+ "scope": SCOPE,
715
+ "code_challenge": code_challenge,
716
+ "code_challenge_method": "S256",
717
+ }
718
+ lib_logger.debug(f"Qwen device code request data: {request_data}")
719
+ try:
720
+ dev_response = await client.post(
721
+ "https://chat.qwen.ai/api/v1/oauth2/device/code",
722
+ headers=headers,
723
+ data=request_data,
724
+ )
725
+ dev_response.raise_for_status()
726
+ dev_data = dev_response.json()
727
+ lib_logger.debug(f"Qwen device auth response: {dev_data}")
728
+ except httpx.HTTPStatusError as e:
729
+ lib_logger.error(
730
+ f"Qwen device code request failed with status {e.response.status_code}: {e.response.text}"
731
+ )
732
+ raise e
733
+
734
+ # [HEADLESS SUPPORT] Display appropriate instructions
735
+ if is_headless:
736
+ auth_panel_text = Text.from_markup(
737
+ "Running in headless environment (no GUI detected).\n"
738
+ "Please open the URL below in a browser on another machine to authorize:\n"
739
+ "1. Visit the URL below to sign in.\n"
740
+ "2. [bold]Copy your email[/bold] or another unique identifier and authorize the application.\n"
741
+ "3. You will be prompted to enter your identifier after authorization."
742
+ )
743
+ else:
744
+ auth_panel_text = Text.from_markup(
745
+ "1. Visit the URL below to sign in.\n"
746
+ "2. [bold]Copy your email[/bold] or another unique identifier and authorize the application.\n"
747
+ "3. You will be prompted to enter your identifier after authorization."
748
+ )
749
+
750
+ console.print(
751
+ Panel(
752
+ auth_panel_text,
753
+ title=f"Qwen OAuth Setup for [bold yellow]{display_name}[/bold yellow]",
754
+ style="bold blue",
755
+ )
756
+ )
757
+ verification_url = dev_data["verification_uri_complete"]
758
+ escaped_url = rich_escape(verification_url)
759
+ console.print(
760
+ f"[bold]URL:[/bold] [link={verification_url}]{escaped_url}[/link]\n"
761
+ )
762
+
763
+ # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
764
+ if not is_headless:
765
+ try:
766
+ webbrowser.open(dev_data["verification_uri_complete"])
767
+ lib_logger.info("Browser opened successfully for Qwen OAuth flow")
768
+ except Exception as e:
769
+ lib_logger.warning(
770
+ f"Failed to open browser automatically: {e}. Please open the URL manually."
771
+ )
772
+
773
+ token_data = None
774
+ start_time = time.time()
775
+ interval = dev_data.get("interval", 5)
776
+
777
+ with console.status(
778
+ "[bold green]Polling for token, please complete authentication in the browser...[/bold green]",
779
+ spinner="dots",
780
+ ) as status:
781
+ while time.time() - start_time < dev_data["expires_in"]:
782
+ poll_response = await client.post(
783
+ TOKEN_ENDPOINT,
784
+ headers=headers,
785
+ data={
786
+ "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
787
+ "device_code": dev_data["device_code"],
788
+ "client_id": CLIENT_ID,
789
+ "code_verifier": code_verifier,
790
+ },
791
+ )
792
+ if poll_response.status_code == 200:
793
+ token_data = poll_response.json()
794
+ lib_logger.info("Successfully received token.")
795
+ break
796
+ elif poll_response.status_code == 400:
797
+ poll_data = poll_response.json()
798
+ error_type = poll_data.get("error")
799
+ if error_type == "authorization_pending":
800
+ lib_logger.debug(
801
+ f"Polling status: {error_type}, waiting {interval}s"
802
+ )
803
+ elif error_type == "slow_down":
804
+ interval = int(interval * 1.5)
805
+ if interval > 10:
806
+ interval = 10
807
+ lib_logger.debug(
808
+ f"Polling status: {error_type}, waiting {interval}s"
809
+ )
810
+ else:
811
+ raise ValueError(
812
+ f"Token polling failed: {poll_data.get('error_description', error_type)}"
813
+ )
814
+ else:
815
+ poll_response.raise_for_status()
816
+
817
+ await asyncio.sleep(interval)
818
+
819
+ if not token_data:
820
+ raise TimeoutError("Qwen device flow timed out.")
821
+
822
+ creds.update(
823
+ {
824
+ "access_token": token_data["access_token"],
825
+ "refresh_token": token_data.get("refresh_token"),
826
+ "expiry_date": (time.time() + token_data["expires_in"]) * 1000,
827
+ "resource_url": token_data.get("resource_url"),
828
+ }
829
+ )
830
+
831
+ # Prompt for user identifier and create metadata object if needed
832
+ if not creds.get("_proxy_metadata", {}).get("email"):
833
+ try:
834
+ prompt_text = Text.from_markup(
835
+ f"\\n[bold]Please enter your email or a unique identifier for [yellow]'{display_name}'[/yellow][/bold]"
836
+ )
837
+ email = Prompt.ask(prompt_text)
838
+ creds["_proxy_metadata"] = {
839
+ "email": email.strip(),
840
+ "last_check_timestamp": time.time(),
841
+ }
842
+ except (EOFError, KeyboardInterrupt):
843
+ console.print(
844
+ "\\n[bold yellow]No identifier provided. Deduplication will not be possible.[/bold yellow]"
845
+ )
846
+ creds["_proxy_metadata"] = {
847
+ "email": None,
848
+ "last_check_timestamp": time.time(),
849
+ }
850
+
851
+ if path:
852
+ await self._save_credentials(path, creds)
853
+ lib_logger.info(
854
+ f"Qwen OAuth initialized successfully for '{display_name}'."
855
+ )
856
+ return creds
857
 
858
  async def initialize_token(
859
  self, creds_or_path: Union[Dict[str, Any], str]
860
  ) -> Dict[str, Any]:
861
+ """
862
+ Initialize OAuth token, triggering interactive device flow if needed.
863
+
864
+ If interactive OAuth is required (expired refresh token, missing credentials, etc.),
865
+ the flow is coordinated globally via ReauthCoordinator to ensure only one
866
+ interactive OAuth flow runs at a time across all providers.
867
+ """
868
  path = creds_or_path if isinstance(creds_or_path, str) else None
869
 
870
  # Get display name from metadata if available, otherwise derive from path
 
900
  f"Qwen OAuth token for '{display_name}' needs setup: {reason}."
901
  )
902
 
903
+ # [GLOBAL REAUTH COORDINATION] Use the global coordinator to ensure
904
+ # only one interactive OAuth flow runs at a time across all providers
905
+ coordinator = get_reauth_coordinator()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
906
 
907
+ # Define the interactive OAuth function to be executed by coordinator
908
+ async def _do_interactive_oauth():
909
+ return await self._perform_interactive_oauth(
910
+ path, creds, display_name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
911
  )
912
 
913
+ # Execute via global coordinator (ensures only one at a time)
914
+ return await coordinator.execute_reauth(
915
+ credential_path=path or display_name,
916
+ provider_name="QWEN_CODE",
917
+ reauth_func=_do_interactive_oauth,
918
+ timeout=300.0, # 5 minute timeout for user to complete OAuth
919
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
920
 
921
  lib_logger.info(f"Qwen OAuth token at '{display_name}' is valid.")
922
  return creds
src/rotator_library/utils/__init__.py CHANGED
@@ -1,5 +1,6 @@
1
  # src/rotator_library/utils/__init__.py
2
 
3
  from .headless_detection import is_headless_environment
 
4
 
5
- __all__ = ['is_headless_environment']
 
1
  # src/rotator_library/utils/__init__.py
2
 
3
  from .headless_detection import is_headless_environment
4
+ from .reauth_coordinator import get_reauth_coordinator, ReauthCoordinator
5
 
6
+ __all__ = ["is_headless_environment", "get_reauth_coordinator", "ReauthCoordinator"]
src/rotator_library/utils/reauth_coordinator.py ADDED
@@ -0,0 +1,236 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # src/rotator_library/utils/reauth_coordinator.py
2
+
3
+ """
4
+ Global Re-authentication Coordinator
5
+
6
+ Ensures only ONE interactive OAuth flow runs at a time across ALL providers.
7
+ This prevents port conflicts and user confusion when multiple credentials
8
+ need re-authentication simultaneously.
9
+
10
+ When a credential needs interactive re-auth (expired refresh token, revoked, etc.),
11
+ it queues a request here. The coordinator ensures only one re-auth happens at a time,
12
+ regardless of which provider the credential belongs to.
13
+ """
14
+
15
+ import asyncio
16
+ import logging
17
+ import time
18
+ from typing import Callable, Optional, Dict, Any, Awaitable
19
+ from pathlib import Path
20
+
21
+ lib_logger = logging.getLogger("rotator_library")
22
+
23
+
24
+ class ReauthCoordinator:
25
+ """
26
+ Singleton coordinator for global re-authentication serialization.
27
+
28
+ When a credential needs interactive re-auth (expired refresh token, revoked, etc.),
29
+ it queues a request here. The coordinator ensures only one re-auth happens at a time.
30
+
31
+ This is critical because:
32
+ 1. Different providers may use the same callback ports
33
+ 2. User can only complete one OAuth flow at a time
34
+ 3. Prevents race conditions in credential state management
35
+ """
36
+
37
+ _instance: Optional["ReauthCoordinator"] = None
38
+ _initialized: bool = False # Class-level declaration for Pylint
39
+
40
+ def __new__(cls):
41
+ # Singleton pattern - only one coordinator exists
42
+ if cls._instance is None:
43
+ cls._instance = super().__new__(cls)
44
+ cls._instance._initialized = False
45
+ return cls._instance
46
+
47
+ def __init__(self):
48
+ if self._initialized:
49
+ return
50
+
51
+ # Global semaphore - only 1 re-auth at a time
52
+ self._reauth_semaphore: asyncio.Semaphore = asyncio.Semaphore(1)
53
+
54
+ # Tracking for observability
55
+ self._pending_reauths: Dict[str, float] = {} # credential -> queue_time
56
+ self._current_reauth: Optional[str] = None
57
+ self._current_provider: Optional[str] = None
58
+ self._reauth_start_time: Optional[float] = None
59
+
60
+ # Lock for tracking dict modifications
61
+ self._tracking_lock: asyncio.Lock = asyncio.Lock()
62
+
63
+ # Statistics
64
+ self._total_reauths: int = 0
65
+ self._successful_reauths: int = 0
66
+ self._failed_reauths: int = 0
67
+ self._timeout_reauths: int = 0
68
+
69
+ self._initialized = True
70
+ lib_logger.info("Global ReauthCoordinator initialized")
71
+
72
+ def _get_display_name(self, credential_path: str) -> str:
73
+ """Get a display-friendly name for a credential path."""
74
+ if credential_path.startswith("env://"):
75
+ return credential_path
76
+ return Path(credential_path).name
77
+
78
+ async def execute_reauth(
79
+ self,
80
+ credential_path: str,
81
+ provider_name: str,
82
+ reauth_func: Callable[[], Awaitable[Dict[str, Any]]],
83
+ timeout: float = 300.0, # 5 minutes default timeout
84
+ ) -> Dict[str, Any]:
85
+ """
86
+ Execute a re-authentication function with global serialization.
87
+
88
+ Only one re-auth can run at a time across all providers.
89
+ Other requests wait in queue.
90
+
91
+ Args:
92
+ credential_path: Path/identifier of the credential needing re-auth
93
+ provider_name: Name of the provider (for logging)
94
+ reauth_func: Async function that performs the actual re-auth
95
+ timeout: Maximum time to wait for re-auth to complete
96
+
97
+ Returns:
98
+ The result from reauth_func (new credentials dict)
99
+
100
+ Raises:
101
+ TimeoutError: If re-auth doesn't complete within timeout
102
+ Exception: Any exception from reauth_func is re-raised
103
+ """
104
+ display_name = self._get_display_name(credential_path)
105
+
106
+ # Track that this credential is waiting
107
+ async with self._tracking_lock:
108
+ self._pending_reauths[credential_path] = time.time()
109
+ pending_count = len(self._pending_reauths)
110
+
111
+ # Log queue status
112
+ if self._current_reauth:
113
+ current_display = self._get_display_name(self._current_reauth)
114
+ lib_logger.info(
115
+ f"[ReauthCoordinator] Credential '{display_name}' ({provider_name}) queued for re-auth. "
116
+ f"Position in queue: {pending_count}. "
117
+ f"Currently processing: '{current_display}' ({self._current_provider})"
118
+ )
119
+ else:
120
+ lib_logger.info(
121
+ f"[ReauthCoordinator] Credential '{display_name}' ({provider_name}) requesting re-auth."
122
+ )
123
+
124
+ try:
125
+ # Acquire global semaphore - blocks until our turn
126
+ async with self._reauth_semaphore:
127
+ # Calculate how long we waited in queue
128
+ async with self._tracking_lock:
129
+ queue_time = self._pending_reauths.pop(credential_path, time.time())
130
+ wait_duration = time.time() - queue_time
131
+ self._current_reauth = credential_path
132
+ self._current_provider = provider_name
133
+ self._reauth_start_time = time.time()
134
+ self._total_reauths += 1
135
+
136
+ if wait_duration > 1.0:
137
+ lib_logger.info(
138
+ f"[ReauthCoordinator] Starting re-auth for '{display_name}' ({provider_name}) "
139
+ f"after waiting {wait_duration:.1f}s in queue"
140
+ )
141
+ else:
142
+ lib_logger.info(
143
+ f"[ReauthCoordinator] Starting re-auth for '{display_name}' ({provider_name})"
144
+ )
145
+
146
+ try:
147
+ # Execute the actual re-auth with timeout
148
+ result = await asyncio.wait_for(reauth_func(), timeout=timeout)
149
+
150
+ async with self._tracking_lock:
151
+ self._successful_reauths += 1
152
+ duration = time.time() - self._reauth_start_time
153
+
154
+ lib_logger.info(
155
+ f"[ReauthCoordinator] Re-auth SUCCESS for '{display_name}' ({provider_name}) "
156
+ f"in {duration:.1f}s"
157
+ )
158
+ return result
159
+
160
+ except asyncio.TimeoutError:
161
+ async with self._tracking_lock:
162
+ self._failed_reauths += 1
163
+ self._timeout_reauths += 1
164
+ lib_logger.error(
165
+ f"[ReauthCoordinator] Re-auth TIMEOUT for '{display_name}' ({provider_name}) "
166
+ f"after {timeout}s. User did not complete OAuth flow in time."
167
+ )
168
+ raise TimeoutError(
169
+ f"Re-authentication timed out after {timeout}s. "
170
+ f"Please try again and complete the OAuth flow within the time limit."
171
+ )
172
+
173
+ except Exception as e:
174
+ async with self._tracking_lock:
175
+ self._failed_reauths += 1
176
+ lib_logger.error(
177
+ f"[ReauthCoordinator] Re-auth FAILED for '{display_name}' ({provider_name}): {e}"
178
+ )
179
+ raise
180
+
181
+ finally:
182
+ async with self._tracking_lock:
183
+ self._current_reauth = None
184
+ self._current_provider = None
185
+ self._reauth_start_time = None
186
+
187
+ # Log if there are still pending reauths
188
+ if self._pending_reauths:
189
+ lib_logger.info(
190
+ f"[ReauthCoordinator] {len(self._pending_reauths)} credential(s) "
191
+ f"still waiting for re-auth"
192
+ )
193
+
194
+ finally:
195
+ # Ensure we're removed from pending even if something goes wrong
196
+ async with self._tracking_lock:
197
+ self._pending_reauths.pop(credential_path, None)
198
+
199
+ def is_reauth_in_progress(self) -> bool:
200
+ """Check if a re-auth is currently in progress."""
201
+ return self._current_reauth is not None
202
+
203
+ def get_pending_count(self) -> int:
204
+ """Get number of credentials waiting for re-auth."""
205
+ return len(self._pending_reauths)
206
+
207
+ def get_status(self) -> Dict[str, Any]:
208
+ """Get current coordinator status for debugging/monitoring."""
209
+ return {
210
+ "current_reauth": self._current_reauth,
211
+ "current_provider": self._current_provider,
212
+ "reauth_in_progress": self._current_reauth is not None,
213
+ "reauth_duration": (time.time() - self._reauth_start_time)
214
+ if self._reauth_start_time
215
+ else None,
216
+ "pending_count": len(self._pending_reauths),
217
+ "pending_credentials": list(self._pending_reauths.keys()),
218
+ "stats": {
219
+ "total": self._total_reauths,
220
+ "successful": self._successful_reauths,
221
+ "failed": self._failed_reauths,
222
+ "timeouts": self._timeout_reauths,
223
+ },
224
+ }
225
+
226
+
227
+ # Global singleton instance
228
+ _coordinator: Optional[ReauthCoordinator] = None
229
+
230
+
231
+ def get_reauth_coordinator() -> ReauthCoordinator:
232
+ """Get the global ReauthCoordinator instance."""
233
+ global _coordinator
234
+ if _coordinator is None:
235
+ _coordinator = ReauthCoordinator()
236
+ return _coordinator