"""Key Pool Manager – yt-flow (main space). NEVER tests keys.""" import json, os, time, threading, urllib.request, urllib.error, ssl, random from pathlib import Path POOL_FILE = Path("/app/verified_keys.json") DATASET_REPO = os.environ.get("OPENCLAW_DATASET_REPO", "arshitmalik/yt-pipeline-data") HF_TOKEN = os.environ.get("HF_TOKEN", "") TESTER_SPACE = "arshitmalik/yt-keys" POOL_KEY = "verified_keys.json" PROVIDER_PRIORITY = ["anthropic", "openai", "google", "mistral", "xai", "others"] _lock = threading.Lock() pool = {} _usage_idx = {} ctx = ssl.create_default_context() ctx.check_hostname = False; ctx.verify_mode = ssl.CERT_NONE def _fetch(url): try: req = urllib.request.Request(url, headers={"Authorization": f"Bearer {HF_TOKEN}"}) with urllib.request.urlopen(req, timeout=30, context=ctx) as r: return r.read().decode() except Exception: return None def download_pool(): global pool url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{POOL_KEY}" data = _fetch(url) if data: try: with _lock: pool = json.loads(data) print(f"[keypool] Downloaded {sum(len(v) for v in pool.values())} keys") except Exception: pass def trigger_tester(): try: from huggingface_hub import HfApi api = HfApi(token=HF_TOKEN) api.restart_space(TESTER_SPACE, factory_reboot=True) print("[keypool] Triggered yt-keys factory rebuild") except Exception as e: print(f"[keypool] Failed to trigger tester: {e}") def get_key(provider=None): with _lock: providers = [provider] if provider else PROVIDER_PRIORITY for p in providers: keys = pool.get(p, []) if keys: idx = _usage_idx.get(p, 0) % len(keys) key = keys[idx] _usage_idx[p] = idx + 1 return key, p return None, None def remove_key(key): with _lock: for p in pool: if key in pool[p]: pool[p].remove(key) print(f"[keypool] Removed dead key ({p}): {key[:14]}...") return True return False def pool_empty(): with _lock: return sum(len(v) for v in pool.values()) == 0 def start(): download_pool() print(f"[keypool] Started. {sum(len(v) for v in pool.values())} keys loaded.")