Spaces:
Paused
Paused
File size: 3,248 Bytes
a5784e9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | import asyncio
import glob
import logging
import os
from typing import List, Optional, Set
from launcher.config import SAVED_AUTH_DIR
# Module-level logger. The name is the fixed string "AuthManager" (not
# derived from __name__), so records from this module appear under that name.
logger = logging.getLogger("AuthManager")
class AuthManager:
    """
    Manages authentication profiles for rotation and error recovery.

    Profiles are the ``*.json`` files located directly inside
    ``SAVED_AUTH_DIR``. The manager tracks which profile is currently active
    and which ones have been marked as failed, and hands out the next profile
    that is neither.

    Attributes:
        failed_profiles: Paths of the profiles marked as failed since the
            last call to ``reset_failures``.
        current_profile: Path of the active profile, or ``None`` when no
            profile has been selected yet.
    """

    def __init__(self) -> None:
        """Start with no recorded failures and seed the active profile.

        The active profile is taken from the ``ACTIVE_AUTH_JSON_PATH``
        environment variable; an unset or empty variable leaves it ``None``.
        """
        self.failed_profiles: Set[str] = set()
        # `or None` maps an empty string to None, same as an unset variable.
        self.current_profile: Optional[str] = (
            os.environ.get("ACTIVE_AUTH_JSON_PATH") or None
        )

    async def get_available_profiles(self) -> List[str]:
        """List all .json files in the saved auth directory.

        Returns:
            The paths matching ``*.json`` directly inside ``SAVED_AUTH_DIR``,
            sorted for a deterministic order. An empty list is returned
            (after logging a warning) when that directory does not exist.
        """
        if not os.path.isdir(SAVED_AUTH_DIR):
            logger.warning("Saved auth directory not found: %s", SAVED_AUTH_DIR)
            return []

        # Escape the directory part so that glob metacharacters in the path
        # itself (e.g. "[", "*", "?") are matched literally; only "*.json"
        # is meant to act as a wildcard.
        pattern = os.path.join(glob.escape(os.fspath(SAVED_AUTH_DIR)), "*.json")

        # Run glob in the default executor to avoid blocking the event loop.
        loop = asyncio.get_running_loop()
        profiles = await loop.run_in_executor(None, glob.glob, pattern)
        return sorted(profiles)

    async def get_next_profile(self) -> str:
        """
        Get the next available profile that hasn't failed yet.

        The currently active profile is skipped as well, so the returned
        profile always differs from the one in use. On success the returned
        path also becomes ``current_profile``.

        Returns:
            Path of the first eligible profile in sorted order.

        Raises:
            RuntimeError: If every profile is either failed or current, or
                if no profiles exist at all.
        """
        profiles = await self.get_available_profiles()

        # Compare by basename so that the same file reached through different
        # directory spellings is still recognised as failed / current.
        excluded = {os.path.basename(path) for path in self.failed_profiles}
        if self.current_profile:
            excluded.add(os.path.basename(self.current_profile))

        available = [
            path for path in profiles if os.path.basename(path) not in excluded
        ]

        if not available:
            msg = f"All authentication profiles exhausted. Failed: {len(self.failed_profiles)}, Total: {len(profiles)}"
            logger.critical(msg)
            raise RuntimeError(msg)

        # Simple strategy: pick the first available one.
        next_profile = available[0]
        self.current_profile = next_profile
        logger.info(
            "Switched to auth profile: %s", os.path.basename(next_profile)
        )
        return next_profile

    def mark_profile_failed(self, profile_path: Optional[str] = None) -> None:
        """Mark a profile as failed so it won't be used again in this cycle.

        Args:
            profile_path: Path of the profile to mark. When omitted (or
                empty), the current profile is marked instead. If there is
                no current profile either, only a warning is logged.
        """
        target = profile_path or self.current_profile
        if not target:
            logger.warning(
                "Attempted to mark profile failed but no profile provided or active."
            )
            return

        self.failed_profiles.add(target)
        logger.warning(
            "Marked auth profile as failed: %s", os.path.basename(target)
        )

    def reset_failures(self) -> None:
        """Reset the failure tracking.

        Clears ``failed_profiles``; ``current_profile`` is left unchanged.
        """
        self.failed_profiles.clear()
        logger.info("Auth profile failure tracking reset.")
# Global instance: a module-level AuthManager created when this module is
# imported (its constructor reads ACTIVE_AUTH_JSON_PATH at that point).
auth_manager = AuthManager()
|