import threading
import time
from typing import Dict, Iterator, List, Optional, Tuple, Union

from loguru import logger

from app.config import get_settings


class RequestLimitManager:
    """Track which (API key, model) pairs of a provider are currently usable.

    Every pair has an entry in ``self.status`` of the form
    ``{"status": "active" | "blocked", "timestamp": float}``. For an active
    pair the timestamp is the time it was last marked; for a blocked pair it
    is the time after which the pair may be tried again.
    """

    # Seconds a pair stays blocked when the caller gives no usable retry delay.
    DEFAULT_RETRY_DELAY = 60

    def __init__(self, provider: str):
        """Create a manager for *provider* and load its keys and models."""
        self.provider = provider
        self.lock = threading.Lock()
        self._init_keys_models()

    def _init_keys_models(self):
        """Load keys/models from settings and mark every pair as active."""
        settings = get_settings()

        # Start from empty lists so providers without a configuration branch
        # get a usable (empty) manager instead of missing attributes.
        self.api_keys: List[str] = []
        self.models: List[str] = []
        if self.provider == "gemini":
            self.api_keys = list(getattr(settings, 'gemini_api_keys_list', None) or [])
            self.models = list(getattr(settings, 'gemini_models_list', None) or [])
        # Other providers can be added here.

        now = time.time()
        self.status: Dict[str, Dict[str, Dict[str, Union[str, float]]]] = {
            key: {
                model: {"status": "active", "timestamp": now}
                for model in self.models
            }
            for key in self.api_keys
        }

        # The first configured key/model are tried first by iterate_key_model.
        self.default_key: Optional[str] = self.api_keys[0] if self.api_keys else None
        self.default_model: Optional[str] = self.models[0] if self.models else None

    @staticmethod
    def _mask(key: str) -> str:
        """Return the abbreviated form of *key* used in log messages."""
        return f"{key[:5]}...{key[-5:]}"

    @staticmethod
    def _prioritized(items: List[str], preferred: Optional[str]) -> List[str]:
        """Return a copy of *items* with *preferred* moved to the front."""
        ordered = list(items)
        if preferred and preferred in ordered:
            ordered.remove(preferred)
            ordered.insert(0, preferred)
        return ordered

    def log_request(self, key: str, model: str, success: bool, retry_delay: Optional[int] = None):
        """Record the outcome of a request made with *key* and *model*.

        Args:
            key: API key that was used; unknown keys are registered on the fly.
            model: Model that was used; unknown models are registered on the fly.
            success: True marks the pair active; False marks it blocked.
            retry_delay: Seconds to keep the pair blocked. A falsy value
                (``None`` or ``0``) falls back to ``DEFAULT_RETRY_DELAY``.
        """
        with self.lock:
            now = time.time()
            entry = self.status.setdefault(key, {}).setdefault(
                model, {"status": "active", "timestamp": now}
            )
            if success:
                logger.info(f"[LIMIT] Mark key={self._mask(key)} - model={model} as active at {now}")
                entry["status"] = "active"
                entry["timestamp"] = now
            else:
                blocked_until = now + (retry_delay or self.DEFAULT_RETRY_DELAY)
                logger.warning(f"[LIMIT] Mark key={self._mask(key)} - model={model} as blocked until {blocked_until} (retry_delay={retry_delay})")
                entry["status"] = "blocked"
                entry["timestamp"] = blocked_until

    def iterate_key_model(self) -> Iterator[Tuple[str, str]]:
        """Lazily yield the (key, model) pairs that may be used right now.

        The default key and default model are tried first. A pair is yielded
        when it is active, or when it is blocked and its retry time has
        passed. Status is looked up at the moment each candidate is reached,
        so updates made through ``log_request`` while the caller is iterating
        are taken into account. If the generator is exhausted without
        yielding anything, a warning is logged.
        """
        keys = self._prioritized(self.api_keys, self.default_key)
        models = self._prioritized(self.models, self.default_model)
        candidates = [(k[:6] + '...', m) for k in keys for m in models]
        logger.info(f"[LIMIT] Trying key/model candidates: {candidates}")

        found = False
        for key in keys:
            for model in models:
                # Snapshot under the lock so a concurrent log_request cannot
                # be observed between its status and timestamp writes. The
                # lock is released before yielding, so the consumer can call
                # log_request without deadlocking.
                with self.lock:
                    info = dict(self.status.get(key, {}).get(model, {}))
                status = info.get("status", "active")
                retry_at = float(info.get("timestamp", 0.0))

                if status == "active":
                    logger.info(f"[LIMIT] Use key={self._mask(key)} - model={model} (active)")
                    found = True
                    yield key, model
                # Read the clock per candidate: the consumer may spend a long
                # time between yields, so a start-of-iteration timestamp
                # would miss blocks that expire in the meantime.
                elif status == "blocked" and time.time() > retry_at:
                    logger.info(f"[LIMIT] Use key={self._mask(key)} - model={model} (was blocked, now retry)")
                    found = True
                    yield key, model

        if not found:
            # No usable key/model: nothing is yielded.
            logger.warning(f"[LIMIT] No available key/model for provider {self.provider}")