File size: 3,248 Bytes
a5784e9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import asyncio
import glob
import logging
import os
from typing import List, Optional, Set

from launcher.config import SAVED_AUTH_DIR

logger = logging.getLogger("AuthManager")


class AuthManager:
    """
    Manages authentication profiles for rotation and error recovery.
    """

    def __init__(self) -> None:
        self.failed_profiles: Set[str] = set()
        self.current_profile: Optional[str] = None

        # Initialize with the profile from environment if available
        initial_profile = os.environ.get("ACTIVE_AUTH_JSON_PATH")
        if initial_profile:
            self.current_profile = initial_profile

    async def get_available_profiles(self) -> List[str]:
        """List all .json files in the saved auth directory."""
        if not os.path.exists(SAVED_AUTH_DIR):
            logger.warning(f"Saved auth directory not found: {SAVED_AUTH_DIR}")
            return []

        loop = asyncio.get_running_loop()
        pattern = os.path.join(SAVED_AUTH_DIR, "*.json")
        # Run glob in executor to avoid blocking the event loop
        profiles = await loop.run_in_executor(None, glob.glob, pattern)
        return sorted(profiles)  # Sort for deterministic order

    async def get_next_profile(self) -> str:
        """
        Get the next available profile that hasn't failed yet.
        Raises RuntimeError if no profiles are available.
        """
        profiles = await self.get_available_profiles()

        # Get set of basenames for failed profiles (prevents duplicates due to path differences)
        failed_basenames = {os.path.basename(p) for p in self.failed_profiles}
        current_basename = (
            os.path.basename(self.current_profile) if self.current_profile else None
        )

        # Filter out failed profiles by basename comparison
        available = [
            p
            for p in profiles
            if os.path.basename(p) not in failed_basenames
            and os.path.basename(p)
            != current_basename  # Also exclude the current profile
        ]

        if not available:
            msg = f"All authentication profiles exhausted. Failed: {len(self.failed_profiles)}, Total: {len(profiles)}"
            logger.critical(msg)
            raise RuntimeError(msg)

        # Simple strategy: Pick the first available one.
        next_profile = available[0]
        self.current_profile = next_profile
        logger.info(f"Switched to auth profile: {os.path.basename(next_profile)}")
        return next_profile

    def mark_profile_failed(self, profile_path: Optional[str] = None) -> None:
        """Mark a profile as failed so it won't be used again in this cycle."""
        target = profile_path or self.current_profile
        if target:
            self.failed_profiles.add(target)
            logger.warning(f"Marked auth profile as failed: {os.path.basename(target)}")
        else:
            logger.warning(
                "Attempted to mark profile failed but no profile provided or active."
            )

    def reset_failures(self) -> None:
        """Reset the failure tracking."""
        self.failed_profiles.clear()
        logger.info("Auth profile failure tracking reset.")


# Global instance
auth_manager = AuthManager()