| import time |
| import threading |
| from typing import Dict, List, Tuple, Optional, Iterator, Union |
| from app.config import get_settings |
| from loguru import logger |
|
|
class RequestLimitManager:
    """Track rate-limit status per (API key, model) pair and hand out a usable pair.

    One instance is kept per provider name (see ``__new__``). Every pair has a
    status entry ``{"status": "active" | "blocked", "timestamp": float}``:
    for an active pair ``timestamp`` is when it was last marked active, for a
    blocked pair it is the epoch time until which the pair stays blocked.
    Public methods serialize access to that state through ``self.lock``.
    """

    # Seconds to block a pair when a failure is reported without a retry_delay
    # (or with retry_delay == 0, which is treated the same way).
    DEFAULT_RETRY_DELAY = 60

    # One shared instance per provider name.
    _instances: Dict[str, "RequestLimitManager"] = {}
    # Guards creation and first-time initialization of those instances.
    _lock = threading.Lock()

    def __new__(cls, provider: str):
        """Return the shared instance for *provider*, creating it on first use."""
        instance = cls._instances.get(provider)
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created it.
                instance = cls._instances.get(provider)
                if instance is None:
                    instance = super().__new__(cls)
                    cls._instances[provider] = instance
        return instance

    def __init__(self, provider: str):
        """Initialize the shared instance once; later constructions are no-ops.

        Python calls ``__init__`` on every ``RequestLimitManager(provider)``
        call, so the ``initialized`` flag prevents re-loading settings and
        wiping the tracked status. The check is repeated under the class lock
        so two threads cannot both initialize (the second would replace
        ``self.lock`` and reset ``self.status`` while the first is using them).
        """
        if getattr(self, "initialized", False):
            return
        with self._lock:
            if getattr(self, "initialized", False):
                return
            self.provider = provider
            self.lock = threading.Lock()
            self._init_keys_models()
            self.initialized = True

    @staticmethod
    def _mask_key(key: Optional[str]) -> str:
        """Return a log-safe form of *key*.

        Shows the first and last 5 characters. Keys of 10 characters or fewer
        are fully masked, because 5 + 5 characters would reveal the whole key.
        """
        if not key:
            return "None"
        if len(key) <= 10:
            return "***"
        return f"{key[:5]}...{key[-5:]}"

    @staticmethod
    def _current_first(items: List[str], current: Optional[str]) -> List[str]:
        """Return a copy of *items* with *current* moved to the front, if present."""
        if current and current in items:
            return [current] + [item for item in items if item != current]
        return list(items)

    def _init_keys_models(self):
        """Load keys/models for ``self.provider`` from settings and mark every pair active.

        Only the "gemini" provider is read from settings. Any other provider
        gets empty key/model lists, so ``get_current_key_model`` raises
        RuntimeError instead of failing with AttributeError.
        """
        settings = get_settings()
        self.api_keys: List[str] = []
        self.models: List[str] = []
        if self.provider == "gemini":
            # `or []` also covers settings attributes that exist but are None.
            self.api_keys = list(getattr(settings, 'gemini_api_keys_list', None) or [])
            self.models = list(getattr(settings, 'gemini_models_list', None) or [])
        else:
            logger.warning(f"[LIMIT] Unsupported provider {self.provider}: no keys/models configured")

        now = time.time()
        self.status: Dict[str, Dict[str, Dict[str, Union[str, float]]]] = {
            key: {model: {"status": "active", "timestamp": now} for model in self.models}
            for key in self.api_keys
        }
        self.current_key: Optional[str] = self.api_keys[0] if self.api_keys else None
        self.current_model: Optional[str] = self.models[0] if self.models else None
        logger.info(f"[LIMIT] Initialized with current key={self._mask_key(self.current_key)} model={self.current_model}")

    def _is_available(self, key: str, model: str, now: float) -> bool:
        """Return True if the pair is active, or blocked with an expired block time.

        Pairs with no recorded status count as active.
        """
        info = self.status.get(key, {}).get(model, {})
        status = info.get("status", "active")
        blocked_until = float(info.get("timestamp", 0.0))
        return status == "active" or (status == "blocked" and now > blocked_until)

    def get_current_key_model(self) -> Tuple[str, str]:
        """Return the (key, model) pair to use for the next request.

        The current pair is kept while it is available. A scan for a new pair
        only happens when the current pair is blocked (or unset); the pair
        found becomes the new current pair.

        Raises:
            RuntimeError: if no key/model pair is available for this provider.
        """
        with self.lock:
            now = time.time()
            key, model = self.current_key, self.current_model
            if key and model and self._is_available(key, model, now):
                logger.info(f"[LIMIT] Using current key={self._mask_key(key)} model={model}")
                return key, model

            logger.warning("[LIMIT] Current pair not available, scanning for new key/model...")
            new_key, new_model = self._find_available_key_model()
            if not (new_key and new_model):
                logger.error(f"[LIMIT] No available key/model found for provider {self.provider}")
                raise RuntimeError(f"No available key/model for provider {self.provider}")

            self.current_key, self.current_model = new_key, new_model
            logger.info(f"[LIMIT] Switched to new key={self._mask_key(new_key)} model={new_model}")
            return new_key, new_model

    def _find_available_key_model(self) -> Tuple[Optional[str], Optional[str]]:
        """Find the first available (key, model) pair, preferring the current ones.

        Keys are scanned in order with the current key first. For each key,
        models are scanned with the current model first. Callers are expected
        to hold ``self.lock`` (``get_current_key_model`` does).

        Returns:
            The first available pair, or ``(None, None)`` if every pair is blocked.
        """
        now = time.time()
        keys = self._current_first(self.api_keys, self.current_key)
        models = self._current_first(self.models, self.current_model)

        for key in keys:
            for model in models:
                if self._is_available(key, model, now):
                    logger.info(f"[LIMIT] Found available key={self._mask_key(key)} model={model}")
                    return key, model
        return None, None

    def log_request(self, key: str, model: str, success: bool, retry_delay: Optional[int] = None):
        """Record the outcome of a request made with (*key*, *model*).

        A success marks the pair active. Any failure, of any kind, blocks the
        pair for *retry_delay* seconds, or for ``DEFAULT_RETRY_DELAY`` when
        *retry_delay* is None or 0. No scan happens here: if the current pair
        gets blocked, the next ``get_current_key_model`` call picks a new one.
        Pairs not seen before are added to the status table.
        """
        with self.lock:
            now = time.time()
            entry = self.status.setdefault(key, {}).setdefault(
                model, {"status": "active", "timestamp": now}
            )

            if success:
                logger.info(f"[LIMIT] Mark key={self._mask_key(key)} - model={model} as active at {now}")
                entry["status"] = "active"
                entry["timestamp"] = now
                return

            blocked_until = now + (retry_delay or self.DEFAULT_RETRY_DELAY)
            logger.warning(f"[LIMIT] Mark key={self._mask_key(key)} - model={model} as blocked until {blocked_until} (retry_delay={retry_delay})")
            entry["status"] = "blocked"
            entry["timestamp"] = blocked_until

            if key == self.current_key and model == self.current_model:
                logger.warning("[LIMIT] Current pair blocked, will scan for new pair on next request")

    def iterate_key_model(self) -> Iterator[Tuple[str, str]]:
        """Legacy API kept for older callers: yield only the current (key, model) pair.

        Raises:
            RuntimeError: propagated from ``get_current_key_model`` when no pair is available.
        """
        key, model = self.get_current_key_model()
        yield key, model