AIstudioProxyAPI / api_utils /auth_manager.py
peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
3.25 kB
import asyncio
import glob
import logging
import os
from typing import List, Optional, Set
from launcher.config import SAVED_AUTH_DIR
# Module-level logger shared by everything in this file; records are emitted
# under the fixed logger name "AuthManager".
logger = logging.getLogger("AuthManager")
class AuthManager:
    """Manage authentication profiles for rotation and error recovery.

    Per-instance state:

    * ``failed_profiles`` -- paths passed to :meth:`mark_profile_failed`
      since the last :meth:`reset_failures`.
    * ``current_profile`` -- path of the most recently selected profile, or
      ``None`` when nothing has been selected yet.

    Profiles are compared by file basename, so the same file reached through
    different directory prefixes is treated as one profile.
    """

    def __init__(self) -> None:
        """Start with no recorded failures.

        ``current_profile`` is seeded from the ``ACTIVE_AUTH_JSON_PATH``
        environment variable when it holds a non-empty value; otherwise it
        stays ``None``.
        """
        self.failed_profiles: Set[str] = set()
        # ``or None`` makes an empty variable equivalent to an unset one.
        self.current_profile: Optional[str] = (
            os.environ.get("ACTIVE_AUTH_JSON_PATH") or None
        )

    @staticmethod
    def _scan_profiles(directory: str) -> Optional[List[str]]:
        """Return the sorted ``*.json`` paths directly inside *directory*.

        Returns ``None`` when *directory* is not an existing directory. This
        helper performs blocking filesystem calls, so the async caller runs
        it in an executor.
        """
        if not os.path.isdir(directory):
            return None
        # Escape the directory part so characters such as ``[``, ``*`` or
        # ``?`` in the path are matched literally rather than as wildcards.
        pattern = os.path.join(glob.escape(os.fspath(directory)), "*.json")
        return sorted(glob.glob(pattern))  # Sort for deterministic order

    async def get_available_profiles(self) -> List[str]:
        """List all .json files in the saved auth directory.

        Returns:
            The matching paths in sorted order, or an empty list (after
            logging a warning) when ``SAVED_AUTH_DIR`` is not a directory.
        """
        loop = asyncio.get_running_loop()
        # Both the directory check and the glob touch the filesystem, so they
        # run together in the default executor to keep the event loop free.
        profiles = await loop.run_in_executor(
            None, self._scan_profiles, SAVED_AUTH_DIR
        )
        if profiles is None:
            logger.warning("Saved auth directory not found: %s", SAVED_AUTH_DIR)
            return []
        return profiles

    async def get_next_profile(self) -> str:
        """Select, record and return the next usable profile.

        A profile is usable when its basename matches neither a failed
        profile nor the current profile. The first usable path in sorted
        order is chosen and stored in ``current_profile``.

        Raises:
            RuntimeError: If no usable profile remains.
        """
        profiles = await self.get_available_profiles()

        # Compare by basename (prevents duplicates due to path differences).
        excluded = {os.path.basename(p) for p in self.failed_profiles}
        if self.current_profile:
            # The current profile is skipped too, so the result is always a
            # different profile from the one in use.
            excluded.add(os.path.basename(self.current_profile))

        available = [p for p in profiles if os.path.basename(p) not in excluded]

        if not available:
            msg = f"All authentication profiles exhausted. Failed: {len(self.failed_profiles)}, Total: {len(profiles)}"
            logger.critical(msg)
            raise RuntimeError(msg)

        # Simple strategy: pick the first available one.
        next_profile = available[0]
        self.current_profile = next_profile
        logger.info(
            "Switched to auth profile: %s", os.path.basename(next_profile)
        )
        return next_profile

    def mark_profile_failed(self, profile_path: Optional[str] = None) -> None:
        """Mark a profile as failed so it is skipped until failures are reset.

        Args:
            profile_path: Path of the profile to mark. When omitted or empty,
                the current profile is marked instead. If neither is
                available, only a warning is logged.
        """
        target = profile_path or self.current_profile
        if not target:
            logger.warning(
                "Attempted to mark profile failed but no profile provided or active."
            )
            return
        self.failed_profiles.add(target)
        logger.warning(
            "Marked auth profile as failed: %s", os.path.basename(target)
        )

    def reset_failures(self) -> None:
        """Forget every recorded failure; ``current_profile`` is untouched."""
        self.failed_profiles.clear()
        logger.info("Auth profile failure tracking reset.")
# Global instance, created when this module is imported. Its constructor reads
# the ACTIVE_AUTH_JSON_PATH environment variable to seed the current profile.
auth_manager = AuthManager()