Spaces:
Sleeping
Sleeping
| """Key Pool Manager – yt-flow (main space). NEVER tests keys.""" | |
| import json, os, time, threading, urllib.request, urllib.error, ssl, random | |
| from pathlib import Path | |
# Local path of the pool file (not referenced in the visible part of this module).
POOL_FILE = Path("/app/verified_keys.json")
# Hub dataset repo that stores the pool; overridable via OPENCLAW_DATASET_REPO.
DATASET_REPO = os.environ.get("OPENCLAW_DATASET_REPO", "arshitmalik/yt-pipeline-data")
# Hub access token; an empty string means no token is configured.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Space that trigger_tester() restarts.
TESTER_SPACE = "arshitmalik/yt-keys"
# File name of the pool inside DATASET_REPO.
POOL_KEY = "verified_keys.json"
# Order in which get_key() tries providers when the caller names none.
PROVIDER_PRIORITY = ["anthropic", "openai", "google", "mistral", "xai", "others"]

# Guards every access to the shared pool state below.
_lock = threading.Lock()
# Provider name -> list of keys, as loaded by download_pool().
pool = {}
# Provider name -> next round-robin position used by get_key().
_usage_idx = {}

# TLS context for Hub requests. Hostname and certificate verification are left
# at the create_default_context() defaults (both enabled): _fetch sends HF_TOKEN
# as a bearer credential, so disabling them would expose the token to anyone
# able to intercept the connection.
ctx = ssl.create_default_context()
def _fetch(url):
    """Download *url* and return the response body decoded as text.

    HF_TOKEN is sent as a bearer credential when one is configured.

    Args:
        url: Address to request.

    Returns:
        The decoded body, or None if building the request, the transfer,
        or the decoding fails.
    """
    # Without a token the original sent a bare "Bearer " header carrying no
    # credential; send no Authorization header at all in that case.
    headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=30, context=ctx) as r:
            return r.read().decode()
    except Exception as e:
        # Still best-effort (callers treat None as "nothing fetched"), but the
        # cause is reported instead of being silently discarded.
        print(f"[keypool] Fetch failed for {url}: {e}")
        return None
def download_pool():
    """Replace the in-memory pool with the copy stored in the dataset repo.

    Fetches POOL_KEY from DATASET_REPO via _fetch(). The payload must be a
    JSON object; entries whose value is not a list are skipped, because the
    rest of this module indexes and mutates each value as a list of keys.
    On any failure the current pool is left untouched.
    """
    global pool
    url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{POOL_KEY}"
    data = _fetch(url)
    if not data:
        return

    try:
        parsed = json.loads(data)
    except (ValueError, RecursionError) as e:
        print(f"[keypool] Pool file is not valid JSON: {e}")
        return

    # Validate BEFORE swapping: assigning a non-dict to `pool` would make every
    # later pool.get()/pool.values() call fail.
    if not isinstance(parsed, dict):
        print(f"[keypool] Pool file has unexpected type {type(parsed).__name__}; ignored")
        return

    fresh = {name: keys for name, keys in parsed.items() if isinstance(keys, list)}
    skipped = len(parsed) - len(fresh)
    if skipped:
        print(f"[keypool] Skipped {skipped} non-list pool entries")

    with _lock:
        pool = fresh
    # Count from the local snapshot so the shared dict is not read outside the lock.
    print(f"[keypool] Downloaded {sum(len(v) for v in fresh.values())} keys")
def trigger_tester():
    """Ask the Hub to restart TESTER_SPACE with factory_reboot=True.

    Best-effort: any error, including huggingface_hub being unavailable,
    is printed rather than raised.
    """
    try:
        # Imported lazily so a missing package is caught by this handler too.
        from huggingface_hub import HfApi

        HfApi(token=HF_TOKEN).restart_space(TESTER_SPACE, factory_reboot=True)
        print("[keypool] Triggered yt-keys factory rebuild")
    except Exception as err:
        print(f"[keypool] Failed to trigger tester: {err}")
def get_key(provider=None):
    """Return the next key in round-robin order, with its provider name.

    Args:
        provider: Provider to draw from. When falsy, the providers in
            PROVIDER_PRIORITY are tried in order and the first one that
            has keys is used.

    Returns:
        A ``(key, provider)`` tuple, or ``(None, None)`` when no candidate
        provider has any key.
    """
    candidates = [provider] if provider else PROVIDER_PRIORITY
    with _lock:
        for name in candidates:
            bucket = pool.get(name, [])
            if not bucket:
                continue
            # Wrap the stored position so it stays valid if the list shrank.
            slot = _usage_idx.get(name, 0) % len(bucket)
            _usage_idx[name] = slot + 1
            return bucket[slot], name
    return None, None
def remove_key(key):
    """Drop *key* from the first provider list that contains it.

    Args:
        key: The key to remove.

    Returns:
        True if the key was found and removed, False otherwise.
    """
    with _lock:
        owner = next((name for name, bucket in pool.items() if key in bucket), None)
        if owner is None:
            return False
        pool[owner].remove(key)
        # Only the first 14 characters of the key are written to the log.
        print(f"[keypool] Removed dead key ({owner}): {key[:14]}...")
        return True
def pool_empty():
    """Return True when the pool holds no keys for any provider."""
    with _lock:
        total = sum(map(len, pool.values()))
    return not total
def start():
    """Load the pool once via download_pool() and print how many keys it holds."""
    download_pool()
    loaded = sum(len(keys) for keys in pool.values())
    print(f"[keypool] Started. {loaded} keys loaded.")