God Agent OS CI
πŸš€ Deploy God Agent OS v11 - 2026-05-17 07:49
02117ee
"""
KeyPool β€” Multi-API Key Manager with Failover & Cooldown
God Agent OS v8 β€” Supports Gemini, SambaNova, GitHub, OpenAI, Groq, etc.
Each provider can have multiple comma-separated keys.
"""
import asyncio
import time
from collections import defaultdict
from typing import Dict, List, Optional
import structlog
log = structlog.get_logger()
COOLDOWN_SECONDS = 60 # Key cooling time after max failures
MAX_FAILURES = 3 # Max fails before cooldown
class KeyEntry:
def __init__(self, key: str):
self.key = key
self.failures = 0
self.cooldown_until = 0.0
self.calls = 0
self.last_used = 0.0
def is_available(self) -> bool:
if self.cooldown_until > time.time():
return False
return True
def mark_fail(self):
self.failures += 1
if self.failures >= MAX_FAILURES:
self.cooldown_until = time.time() + COOLDOWN_SECONDS
log.warning("Key cooldown activated", failures=self.failures)
def mark_success(self):
self.failures = max(0, self.failures - 1)
self.cooldown_until = 0.0
self.calls += 1
self.last_used = time.time()
def status(self) -> dict:
remaining = max(0, self.cooldown_until - time.time())
return {
"key_preview": self.key[:8] + "..." + self.key[-4:] if len(self.key) > 12 else "***",
"available": self.is_available(),
"failures": self.failures,
"calls": self.calls,
"cooldown_remaining_s": round(remaining, 1),
}
class KeyPool:
"""
Pool of API keys for a single provider.
Round-robins through available keys with failure tracking.
"""
def __init__(self, provider: str, keys: List[str]):
self.provider = provider
self._keys: List[KeyEntry] = [KeyEntry(k.strip()) for k in keys if k.strip()]
self._index = 0
def __len__(self) -> int:
return len(self._keys)
def pick(self) -> Optional[str]:
"""Pick the next available key (round-robin, skip cooling down keys)."""
if not self._keys:
return None
n = len(self._keys)
for _ in range(n):
entry = self._keys[self._index % n]
self._index = (self._index + 1) % n
if entry.is_available():
return entry.key
# All keys in cooldown β€” try the one with shortest cooldown
soonest = min(self._keys, key=lambda e: e.cooldown_until)
log.warning(
"All keys in cooldown, using soonest",
provider=self.provider,
cooldown_remaining=round(soonest.cooldown_until - time.time(), 1),
)
return soonest.key
def mark_fail(self, key: str):
for e in self._keys:
if e.key == key:
e.mark_fail()
return
def mark_success(self, key: str):
for e in self._keys:
if e.key == key:
e.mark_success()
return
def available_count(self) -> int:
return sum(1 for e in self._keys if e.is_available())
def status(self) -> dict:
return {
"provider": self.provider,
"total_keys": len(self._keys),
"available_keys": self.available_count(),
"keys": [e.status() for e in self._keys],
}
# ─── Global Key Pool Registry ─────────────────────────────────────────────────
class KeyPoolRegistry:
"""Central registry for all provider key pools."""
def __init__(self):
self._pools: Dict[str, KeyPool] = {}
def register(self, provider: str, keys_csv: str):
"""Register comma-separated keys for a provider."""
keys = [k.strip() for k in keys_csv.split(",") if k.strip()]
if keys:
self._pools[provider] = KeyPool(provider, keys)
log.info("KeyPool registered", provider=provider, count=len(keys))
def get(self, provider: str) -> Optional[KeyPool]:
return self._pools.get(provider)
def all_status(self) -> dict:
return {name: pool.status() for name, pool in self._pools.items()}