import asyncio
import glob
import logging
import os
from typing import List, Optional, Set

from launcher.config import SAVED_AUTH_DIR

logger = logging.getLogger("AuthManager")


class AuthManager:
    """Keep track of saved auth profiles for rotation and error recovery.

    The manager remembers which profile is currently active and which
    profiles have been reported as failed, and hands out the next usable
    profile path on request.
    """

    def __init__(self) -> None:
        # Paths of profiles reported as failed since the last reset.
        self.failed_profiles: Set[str] = set()
        # Seed the active profile from the environment; an unset or empty
        # variable leaves it as None.
        self.current_profile: Optional[str] = (
            os.environ.get("ACTIVE_AUTH_JSON_PATH") or None
        )

    async def get_available_profiles(self) -> List[str]:
        """Return the sorted paths of every ``*.json`` file in SAVED_AUTH_DIR.

        Returns an empty list (and logs a warning) when the directory does
        not exist.
        """
        if not os.path.exists(SAVED_AUTH_DIR):
            logger.warning(f"Saved auth directory not found: {SAVED_AUTH_DIR}")
            return []

        pattern = os.path.join(SAVED_AUTH_DIR, "*.json")
        # The directory scan is handed to the default executor so the event
        # loop is not blocked while it runs.
        profiles = await asyncio.get_running_loop().run_in_executor(
            None, glob.glob, pattern
        )
        # Sorting keeps the rotation order deterministic between calls.
        profiles.sort()
        return profiles

    async def get_next_profile(self) -> str:
        """Switch to the first profile that is neither failed nor current.

        Profiles are matched by file name rather than full path, so the same
        file reached through different paths is treated as one profile. The
        selected path is stored as the current profile and returned.

        Raises:
            RuntimeError: if every available profile is failed or current.
        """
        profiles = await self.get_available_profiles()

        # Basenames that must be skipped: every failed profile ...
        excluded = {os.path.basename(p) for p in self.failed_profiles}
        # ... plus the profile that is active right now, if there is one.
        if self.current_profile:
            excluded.add(os.path.basename(self.current_profile))

        # Simple strategy: take the first candidate in sorted order.
        next_profile = next(
            (p for p in profiles if os.path.basename(p) not in excluded),
            None,
        )
        if next_profile is None:
            msg = f"All authentication profiles exhausted. Failed: {len(self.failed_profiles)}, Total: {len(profiles)}"
            logger.critical(msg)
            raise RuntimeError(msg)

        self.current_profile = next_profile
        logger.info(f"Switched to auth profile: {os.path.basename(next_profile)}")
        return next_profile

    def mark_profile_failed(self, profile_path: Optional[str] = None) -> None:
        """Record a profile as failed so rotation skips it until reset.

        Args:
            profile_path: Path of the profile to mark. When omitted (or
                empty), the current profile is marked instead.
        """
        target = profile_path or self.current_profile
        if not target:
            # Nothing to mark: no explicit path and no active profile.
            logger.warning(
                "Attempted to mark profile failed but no profile provided or active."
            )
            return

        self.failed_profiles.add(target)
        logger.warning(f"Marked auth profile as failed: {os.path.basename(target)}")

    def reset_failures(self) -> None:
        """Forget every recorded failure so all profiles are eligible again."""
        self.failed_profiles.clear()
        logger.info("Auth profile failure tracking reset.")


# Global instance
auth_manager = AuthManager()