Chatopus / app /request_limit_manager.py
VietCat's picture
add quota manager
8b81b1d
raw
history blame
3.61 kB
import time
import threading
from typing import Dict, List, Tuple, Optional, Iterator, Union
from app.config import get_settings
from loguru import logger
class RequestLimitManager:
def __init__(self, provider: str):
self.provider = provider
self.lock = threading.Lock()
self._init_keys_models()
def _init_keys_models(self):
settings = get_settings()
if self.provider == "gemini":
self.api_keys: List[str] = getattr(settings, 'gemini_api_keys_list', [])
self.models: List[str] = getattr(settings, 'gemini_models_list', [])
# Có thể mở rộng cho provider khác ở đây
self.status: Dict[str, Dict[str, Dict[str, Union[str, float]]]] = {}
now = time.time()
for key in self.api_keys:
self.status[key] = {}
for model in self.models:
self.status[key][model] = {"status": "active", "timestamp": now}
self.default_key: Optional[str] = self.api_keys[0] if self.api_keys else None
self.default_model: Optional[str] = self.models[0] if self.models else None
def log_request(self, key: str, model: str, success: bool, retry_delay: Optional[int] = None):
with self.lock:
now = time.time()
if key not in self.status:
self.status[key] = {}
if model not in self.status[key]:
self.status[key][model] = {"status": "active", "timestamp": now}
if success:
logger.info(f"[LIMIT] Mark key={key[:5]}...{key[-5:]} - model={model} as active at {now}")
self.status[key][model]["status"] = "active"
self.status[key][model]["timestamp"] = now
else:
logger.warning(f"[LIMIT] Mark key={key[:5]}...{key[-5:]} - model={model} as blocked until {now + (retry_delay or 60)} (retry_delay={retry_delay})")
self.status[key][model]["status"] = "blocked"
self.status[key][model]["timestamp"] = now + (retry_delay or 60)
def iterate_key_model(self) -> Iterator[Tuple[str, str]]:
now = time.time()
keys = self.api_keys[:]
models = self.models[:]
# Ưu tiên default key/model nếu có
if self.default_key and self.default_key in keys:
keys.remove(self.default_key)
keys = [self.default_key] + keys
if self.default_model and self.default_model in models:
models.remove(self.default_model)
models = [self.default_model] + models
logger.info(f"[LIMIT] Trying key/model candidates: {[(k[:6]+'...', m) for k in keys for m in models]}")
found = False
for key in keys:
for model in models:
info = self.status.get(key, {}).get(model, {"status": "active", "timestamp": 0.0})
status = info.get("status", "active")
ts = float(info.get("timestamp", 0.0))
if status == "active":
logger.info(f"[LIMIT] Use key={key[:5]}...{key[-5:]} - model={model} (active)")
found = True
yield key, model
elif status == "blocked" and now > ts:
logger.info(f"[LIMIT] Use key={key[:5]}...{key[-5:]} - model={model} (was blocked, now retry)")
found = True
yield key, model
if not found:
logger.warning(f"[LIMIT] No available key/model for provider {self.provider}")
pass
# Nếu không có key/model nào hợp lệ, không yield gì