"""Rate-limit-aware Gemini model selection with automatic fallback."""

import threading
import time
from collections import deque
from typing import Optional

import google.generativeai as genai

# ─── Model Pool (only models with actual quota) ───────────────────────────────
# Ordered by preference: most quota first.
# "rpm" / "rpd" are enforced by ModelManager below; "tpm" is carried along as
# metadata only (nothing in this module reads it).
MODEL_POOL = [
    {
        "key": "gemini-3.1-flash-lite",
        "name": "Gemini 3.1 Flash Lite",
        "rpm": 15,
        "rpd": 500,
        "tpm": 250_000,
    },
    {
        "key": "gemini-2.5-flash-lite",  # gemini-2.5-flash-lite-preview-06-17 if needed
        "name": "Gemini 2.5 Flash Lite",
        "rpm": 10,
        "rpd": 20,
        "tpm": 250_000,
    },
    {
        "key": "gemini-2.5-flash",
        "name": "Gemini 2.5 Flash",
        "rpm": 5,
        "rpd": 20,
        "tpm": 250_000,
    },
    {
        "key": "gemini-2.0-flash",  # "Gemini 3 Flash" in the UI
        "name": "Gemini 3 Flash",
        "rpm": 5,
        "rpd": 20,
        "tpm": 250_000,
    },
]


class ModelManager:
    """
    Tracks per-model rate limits (RPM + RPD) and automatically falls through
    to the next available model when a limit is reached.

    Usage is stored as per-model deques of call timestamps that are pruned
    lazily, so both the minute and the day limit behave as sliding windows.

    One lock guards all bookkeeping. The API request itself runs outside the
    lock; a quota slot is reserved *before* the request starts so concurrent
    callers cannot oversubscribe the same model.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # For each model key: deque of time.time() stamps for recent calls,
        # oldest first.
        self._minute_calls: dict[str, deque] = {m["key"]: deque() for m in MODEL_POOL}
        self._day_calls: dict[str, deque] = {m["key"]: deque() for m in MODEL_POOL}
        # Track which models are in a cooldown (rate-limited by the API itself).
        # Value is the time.time() instant at which the cooldown ends.
        self._cooldown_until: dict[str, float] = {m["key"]: 0.0 for m in MODEL_POOL}

    # ── Internal bookkeeping (callers hold self._lock) ───────────────────────

    def _prune(self, dq: deque, window_seconds: int) -> None:
        """Remove timestamps outside the rolling window."""
        cutoff = time.time() - window_seconds
        while dq and dq[0] < cutoff:
            dq.popleft()

    def _is_available(self, model: dict) -> bool:
        """Return True if *model* is not cooling down and is under RPM and RPD."""
        key = model["key"]
        now = time.time()

        # Hard cooldown (e.g. after a 429)
        if now < self._cooldown_until[key]:
            return False

        self._prune(self._minute_calls[key], 60)
        self._prune(self._day_calls[key], 86_400)

        rpm_ok = len(self._minute_calls[key]) < model["rpm"]
        rpd_ok = len(self._day_calls[key]) < model["rpd"]
        return rpm_ok and rpd_ok

    def _record_call(self, key: str) -> None:
        """Append the current time to both usage windows of model *key*."""
        now = time.time()
        self._minute_calls[key].append(now)
        self._day_calls[key].append(now)

    def _set_cooldown(self, key: str, seconds: int = 65) -> None:
        """Call this after receiving a 429 to pause that model."""
        self._cooldown_until[key] = time.time() + seconds
        print(f"[ModelManager] {key} in cooldown for {seconds}s")

    # ── Slot reservation (these acquire self._lock themselves) ───────────────

    def _reserve_slot(self, model: dict) -> Optional[float]:
        """
        Atomically claim one quota slot for *model*.

        Returns the timestamp that was recorded, or None if the model has no
        quota left (or is cooling down). Must NOT be called with the lock
        held: threading.Lock is not reentrant.
        """
        key = model["key"]
        with self._lock:
            if not self._is_available(model):
                return None
            self._record_call(key)
            # _record_call appends the same stamp to both deques; read it back
            # so the reservation can be undone later.
            return self._day_calls[key][-1]

    def _release_slot(self, key: str, stamp: float) -> None:
        """Undo a reservation made by _reserve_slot (the request never completed)."""
        with self._lock:
            for window in (self._minute_calls[key], self._day_calls[key]):
                try:
                    window.remove(stamp)
                except ValueError:
                    # Already pruned out of this window (slow request); nothing to undo.
                    pass

    # ── Public API ───────────────────────────────────────────────────────────

    def get_available_model(self) -> Optional[dict]:
        """Return the first model that has remaining quota, or None."""
        with self._lock:
            return next((m for m in MODEL_POOL if self._is_available(m)), None)

    def call_with_fallback(self, system_prompt: str) -> tuple[str, str]:
        """
        Try each model in order.
        On success return (response_text, model_key).
        On 429 / quota error, mark the model as cooled down and try the next.
        Any other error skips that model and tries the next one.

        Only requests that actually completed count against the local
        RPM/RPD windows; a failed request gives its reserved slot back.

        Raises RuntimeError if all models are exhausted. When at least one
        attempt was made, the last underlying error is attached as
        ``__cause__``.
        """
        import google.api_core.exceptions as gex

        with self._lock:
            candidates = [m for m in MODEL_POOL if self._is_available(m)]

        if not candidates:
            raise RuntimeError("All models are rate-limited. Try again later.")

        last_error: Optional[Exception] = None

        for model_info in candidates:
            key = model_info["key"]

            # Re-check and claim the slot in one locked step: the snapshot
            # above may be stale, and other threads may be racing for the
            # same remaining quota.
            stamp = self._reserve_slot(model_info)
            if stamp is None:
                continue

            completed = False
            try:
                genai_model = genai.GenerativeModel(
                    key,
                    generation_config={"response_mime_type": "application/json"},
                )
                response = genai_model.generate_content(system_prompt)
                completed = True  # the request went through; the slot stays used
                print(f"[ModelManager] Used: {key}")
                return response.text, key

            except gex.ResourceExhausted as e:
                last_error = e
                print(f"[ModelManager] 429 on {key}: {e}")
                with self._lock:
                    self._set_cooldown(key, seconds=65)
                # fall through: try next model

            except Exception as e:
                # Deliberate best-effort: skip a broken model, try the next.
                last_error = e
                print(f"[ModelManager] Error on {key}: {e}")

            if not completed:
                self._release_slot(key, stamp)

        raise RuntimeError("All models failed or are rate-limited.") from last_error

    def status(self) -> list[dict]:
        """Return current usage snapshot for all models (useful for /api/models endpoint)."""
        result = []
        with self._lock:
            now = time.time()
            for m in MODEL_POOL:
                key = m["key"]
                # Prune explicitly: _is_available returns early (without
                # pruning) while a model is cooling down.
                self._prune(self._minute_calls[key], 60)
                self._prune(self._day_calls[key], 86_400)
                cooldown_remaining = max(0, self._cooldown_until[key] - now)
                result.append({
                    "key": key,
                    "name": m["name"],
                    "rpm_limit": m["rpm"],
                    "rpd_limit": m["rpd"],
                    "rpm_used": len(self._minute_calls[key]),
                    "rpd_used": len(self._day_calls[key]),
                    "available": self._is_available(m),
                    "cooldown_seconds": round(cooldown_remaining),
                })
        return result


# Singleton — import this in main.py
model_manager = ModelManager()