Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import time | |
| import datetime | |
| import uuid | |
| import asyncio | |
| import httpx | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse, Response, StreamingResponse, HTMLResponse | |
| from starlette.requests import ClientDisconnect | |
# ASGI application object; the request handlers defined in this module belong to it.
app = FastAPI()
| # ===================================================== | |
| # CONFIG | |
| # ===================================================== | |
# Bearer token that clients must present on the /v1 endpoints (see auth_ok).
# NOTE(review): the fallback value is a well-known default; set the env var in production.
MASTER_API_KEY = os.environ.get("MASTER_API_KEY", "olla")
# Password required by the account/settings management handlers.
# NOTE(review): the fallback value is a well-known default; set the env var in production.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "admin")
# Model id used when a chat request does not name one.
DEFAULT_CF_MODEL = os.environ.get("DEFAULT_CF_MODEL", "@cf/moonshotai/kimi-k2.6")
| # ===================================================== | |
| # STORAGE MANAGER | |
| # ===================================================== | |
class StorageManager:
    """JSON-file-backed store for accounts, daily usage, settings and request logs.

    All state lives in ``self.data`` (a plain dict) and is persisted to
    ``self.filepath``. Mutating coroutines take ``self.lock`` while they touch
    ``self.data``; ``save`` takes the same lock, so callers must NOT call
    ``save`` while already holding ``self.lock`` (``asyncio.Lock`` is not
    re-entrant).
    """

    # Only this many of the most recent log entries are kept.
    MAX_LOGS = 200

    def __init__(self, filepath="data.json"):
        """Create the store and eagerly load ``filepath`` if it exists."""
        self.filepath = filepath
        self.lock = asyncio.Lock()
        self.data = self._default_data()
        # Strong references to in-flight background saves: the event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._save_tasks = set()
        self._load_sync()

    @staticmethod
    def _default_data():
        """Return a fresh copy of the empty data layout."""
        return {
            "accounts": [],
            "usage": {},  # "YYYY-MM-DD": {"account_id": {"neurons": 0, "input": 0, "output": 0}}
            "settings": {"thinking_enabled": True},
            "logs": [],
        }

    def _load_sync(self):
        """Load ``self.filepath`` synchronously, keeping defaults on any failure.

        A file that is unreadable, not valid JSON, or whose top level is not a
        JSON object is ignored with a warning. Missing top-level sections are
        filled in with their defaults.
        """
        if os.path.exists(self.filepath):
            try:
                with open(self.filepath, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except Exception as e:
                print(f"[WARN] Failed to load {self.filepath}: {e}")
            else:
                if isinstance(loaded, dict):
                    self.data = loaded
                else:
                    # e.g. the file contains `[]`; indexing it by section name would crash.
                    print(
                        f"[WARN] Failed to load {self.filepath}: "
                        f"expected a JSON object, got {type(loaded).__name__}"
                    )
        for section, default in self._default_data().items():
            self.data.setdefault(section, default)

    async def save(self):
        """Persist ``self.data`` to disk; failures are logged, never raised.

        The JSON is written to a sibling temporary file and then moved over the
        real file with ``os.replace``, so a failed or interrupted write cannot
        leave a truncated data file behind.
        """
        async with self.lock:
            tmp_path = f"{self.filepath}.tmp"
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(self.data, f, indent=2)
                os.replace(tmp_path, self.filepath)
            except Exception as e:
                print(f"[WARN] Failed to save {self.filepath}: {e}")
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass  # nothing to clean up, or cleanup itself is impossible

    def _schedule_save(self):
        """Start a background ``save`` and keep the task alive until it is done."""
        task = asyncio.create_task(self.save())
        self._save_tasks.add(task)
        task.add_done_callback(self._save_tasks.discard)

    def get_today_utc(self):
        """Return the current UTC date as ``YYYY-MM-DD`` (the usage bucket key)."""
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    async def add_usage(self, account_id: str, input_tokens: int, output_tokens: int):
        """Add token counts to today's bucket for ``account_id``.

        Neurons are computed as 86364 per million input tokens plus 363636 per
        million output tokens. A background save is scheduled afterwards.

        Returns:
            The neuron cost (float) of this single call.
        """
        today = self.get_today_utc()
        neurons_used = (input_tokens / 1_000_000.0 * 86364) + (output_tokens / 1_000_000.0 * 363636)
        async with self.lock:
            day_bucket = self.data["usage"].setdefault(today, {})
            counters = day_bucket.setdefault(account_id, {"neurons": 0, "input": 0, "output": 0})
            counters["neurons"] += neurons_used
            counters["input"] += input_tokens
            counters["output"] += output_tokens
        self._schedule_save()
        return neurons_used

    async def add_log(self, account_id: str, model: str, status: int, neurons: float):
        """Append a request log entry, keep only the newest ``MAX_LOGS``, and schedule a save."""
        log_entry = {
            "timestamp": int(time.time()),
            "account_id": account_id,
            "model": model,
            "status": status,
            "neurons": neurons,
        }
        async with self.lock:
            self.data["logs"].append(log_entry)
            if len(self.data["logs"]) > self.MAX_LOGS:
                self.data["logs"] = self.data["logs"][-self.MAX_LOGS:]
        self._schedule_save()


# Process-wide store shared by every handler in this module.
storage = StorageManager()
| # ===================================================== | |
| # LOAD CF CREDENTIALS | |
| # ===================================================== | |
# Highest N for which the CF_N environment variable is consulted.
MAX_ENV_CF_ACCOUNTS = 100


def parse_cf_credential(raw):
    """Parse one ``"<account_id>,<api_key>"`` string.

    Only the first comma separates the two fields, and both are stripped of
    surrounding whitespace.

    Returns:
        ``{"account_id": ..., "api_key": ...}``, or ``None`` when ``raw`` is
        empty/``None``, has no comma, or either field is blank.
    """
    if not raw:
        return None
    account_id, sep, api_key = raw.partition(",")
    if not sep:
        return None
    account_id, api_key = account_id.strip(), api_key.strip()
    if not account_id or not api_key:
        return None
    return {"account_id": account_id, "api_key": api_key}


def load_env_cf_accounts(environ=None):
    """Collect credentials from ``CF_1`` .. ``CF_100`` in ascending order.

    Args:
        environ: Mapping to read from; defaults to ``os.environ``.

    Returns:
        A list of credential dicts; unset or malformed variables are skipped.
    """
    env = os.environ if environ is None else environ
    candidates = (
        parse_cf_credential(env.get(f"CF_{i}"))
        for i in range(1, MAX_ENV_CF_ACCOUNTS + 1)
    )
    return [cred for cred in candidates if cred is not None]


# Built through helpers so the loop temporaries do not leak into the module namespace.
ENV_CF_ACCOUNTS = load_env_cf_accounts()  # list of {"account_id": ..., "api_key": ...}
| # ===================================================== | |
| # KEY STATUS | |
| # ===================================================== | |
# account_id -> runtime state dict with the keys
# "index", "healthy", "busy", "success", "fail", "cooldown_until" (see sync_key_status).
key_status = dict()
# Guards every read/modify of key_status.
_key_lock = asyncio.Lock()
def get_all_accounts():
    """Return every known account as ``{"account_id", "api_key", "is_env"}`` dicts.

    Environment accounts come first; an account stored in ``storage`` with the
    same id replaces the environment entry (keeping its position) and is marked
    ``is_env=False``. When no account exists at all, a single placeholder
    ``"dummy"`` account is returned so callers never get an empty list.
    """
    merged = {
        entry["account_id"]: {
            "account_id": entry["account_id"],
            "api_key": entry["api_key"],
            "is_env": True,
        }
        for entry in ENV_CF_ACCOUNTS
    }
    for entry in storage.data.get("accounts", []):
        merged[entry["account_id"]] = {
            "account_id": entry["account_id"],
            "api_key": entry["api_key"],
            "is_env": False,
        }
    if merged:
        return list(merged.values())
    return [{"account_id": "dummy", "api_key": "dummy", "is_env": True}]
async def sync_key_status():
    """Ensure every current account has an entry in ``key_status``.

    Existing entries are left untouched; new ones start healthy and idle, with
    ``index`` set to the account's 1-based position at the time it is first seen.
    """
    accounts = get_all_accounts()
    async with _key_lock:
        for position, account in enumerate(accounts, start=1):
            key_status.setdefault(
                account["account_id"],
                {
                    "index": position,
                    "healthy": True,
                    "busy": False,
                    "success": 0,
                    "fail": 0,
                    "cooldown_until": 0,
                },
            )
async def startup_event():
    """Populate ``key_status`` for all configured accounts.

    NOTE(review): presumably registered as an application startup hook; the
    registration is not visible in this excerpt.
    """
    await sync_key_status()
| # ===================================================== | |
| # HELPERS | |
| # ===================================================== | |
def log(x):
    """Print ``x`` to stdout prefixed with the local time as ``[HH:MM:SS]``, flushing immediately."""
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {x}", flush=True)
def sse(obj):
    """Serialize ``obj`` as one server-sent-events ``data:`` frame (non-ASCII kept verbatim)."""
    payload = json.dumps(obj, ensure_ascii=False)
    return f"data: {payload}\n\n"
def auth_ok(req: Request):
    """Return True when the request's ``Authorization`` header carries the master key.

    Accepts ``Bearer <key>`` or the bare key. Only a leading ``"Bearer "``
    scheme is removed; the previous ``str.replace`` stripped that text from
    anywhere in the header, so a value such as ``"olBearer la"`` collapsed to
    a different token. The comparison is constant-time to avoid leaking the
    key through response timing.
    """
    import hmac  # local import: keeps this block self-contained

    scheme = "Bearer "
    header = req.headers.get("Authorization", "")
    token = header[len(scheme):] if header.startswith(scheme) else header
    # Compare as bytes: hmac.compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(token.encode("utf-8"), MASTER_API_KEY.encode("utf-8"))
# URL template for an account's AI endpoint; `{account_id}` is filled in by cf_base().
CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"


def cf_base(account_id: str) -> str:
    """Return the base URL of the Cloudflare AI endpoint for ``account_id``."""
    template = CF_AI_BASE
    return template.format(account_id=account_id)
async def get_key(exclude=None):
    """Reserve and return the first usable account, or ``None`` if there is none.

    Accounts are scanned in ``get_all_accounts()`` order. For each one:
    an expired cooldown is cleared (healthy again, fail counter reset); an
    account whose recorded usage today is at least 9000 neurons and that has no
    cooldown is put on cooldown until the next UTC midnight. The first account
    that is healthy, not busy and not in ``exclude`` is marked busy and
    returned; the caller must hand it back with ``release_key``.

    Args:
        exclude: Optional collection of account ids to skip.
    """
    skipped = set() if exclude is None else exclude
    await sync_key_status()
    candidates = get_all_accounts()
    today = storage.get_today_utc()
    now = time.time()
    async with _key_lock:
        # Sequential: always scan from the first account and take the first available one.
        for account in candidates:
            kid = account["account_id"]
            state = key_status.get(kid)
            if not state:
                continue

            # Cooldown expired -> give the account a clean slate.
            if 0 < state["cooldown_until"] < now:
                state.update(healthy=True, cooldown_until=0, fail=0)

            neurons_today = storage.data["usage"].get(today, {}).get(kid, {}).get("neurons", 0)
            if neurons_today >= 9000 and state["cooldown_until"] == 0:
                utc_now = datetime.datetime.now(datetime.timezone.utc)
                next_midnight = (utc_now + datetime.timedelta(days=1)).replace(
                    hour=0, minute=0, second=0, microsecond=0
                )
                state["cooldown_until"] = next_midnight.timestamp()
                state["healthy"] = False
                log(f"[INFO] Account {kid[:8]} reached 9000 neurons, explicitly cooled down until midnight.")

            if kid in skipped or state["busy"] or not state["healthy"]:
                continue
            state["busy"] = True
            return account
    return None
async def release_key(acc):
    """Clear the busy flag of ``acc`` so ``get_key`` can hand it out again."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["busy"] = False
async def mark_fail(acc, rate_limited=False):
    """Record a failure for ``acc`` and put it on cooldown when warranted.

    The fail counter is always incremented. The account becomes unhealthy when
    ``rate_limited`` is true or the counter has reached 3. A rate-limited
    failure during the first 15 minutes after UTC midnight only cools the
    account down for 5 minutes; every other case cools it down until the next
    UTC midnight.
    """
    async with _key_lock:
        kid = acc["account_id"]
        state = key_status.get(kid)
        if state is None:
            return
        state["fail"] += 1
        if not rate_limited and state["fail"] < 3:
            return

        state["healthy"] = False
        utc_now = datetime.datetime.now(datetime.timezone.utc)
        # Grace period: Cloudflare may be late resetting the quota right after midnight UTC.
        in_grace_window = rate_limited and utc_now.hour == 0 and utc_now.minute < 15
        if in_grace_window:
            state["cooldown_until"] = time.time() + 300
            log(f"[INFO] Account {kid[:8]} cooled down for 5 mins (Midnight grace period).")
            return

        next_midnight = (utc_now + datetime.timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        state["cooldown_until"] = next_midnight.timestamp()
        log(f"[INFO] Account {kid[:8]} cooled down until tomorrow (UTC midnight).")
async def mark_ok(acc):
    """Record a success for ``acc``: bump the counter and clear failure/cooldown state."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is None:
            return
        state["success"] += 1
        state.update(fail=0, healthy=True, cooldown_until=0)
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.05):
    """Poll ``get_key`` until it yields an account or the wait budget is used up.

    Args:
        exclude: Passed through to ``get_key``.
        max_wait: Total sleep budget in seconds (time spent inside ``get_key``
            itself is not counted).
        interval: Seconds to sleep between attempts.

    Returns:
        The reserved account dict, or ``None`` when none became available.
    """
    waited = 0.0
    while waited < max_wait:
        account = await get_key(exclude)
        if account:
            return account
        await asyncio.sleep(interval)
        waited += interval
    return None
def is_rate_limited_status(status_code: int) -> bool:
    """Return True when ``status_code`` is HTTP 429."""
    too_many_requests = 429
    return status_code == too_many_requests
def is_rate_limited_error_body(text: str) -> bool:
    """Return True when ``text`` contains a rate/usage-limit phrase (case-insensitive)."""
    lowered = text.lower()
    markers = ("rate limit", "too many requests", "usage limit")
    return any(marker in lowered for marker in markers)
| # ===================================================== | |
| # API / UI ROUTES | |
| # ===================================================== | |
async def serve_dashboard():
    """Return ``dashboard.html`` (located next to this module) as an HTML response.

    Any failure while locating or reading the file yields a 500 response whose
    body contains the error text.
    """
    try:
        module_dir = os.path.dirname(os.path.abspath(__file__))
        dashboard_path = os.path.join(module_dir, "dashboard.html")
        with open(dashboard_path, "r", encoding="utf-8") as handle:
            page = handle.read()
        return HTMLResponse(content=page)
    except Exception as e:
        return HTMLResponse(content=f"Error loading dashboard: {e}", status_code=500)
async def get_dashboard_data():
    """Return the stored data (accounts, usage, settings, logs) as JSON.

    This handler performs no authentication, so stored Cloudflare API keys are
    masked before being sent: only the last four characters of keys longer
    than eight characters remain visible. The in-memory data is not modified.
    """

    def mask(secret):
        # Short or non-string secrets are hidden completely.
        if isinstance(secret, str) and len(secret) > 8:
            return "****" + secret[-4:]
        return "****"

    payload = dict(storage.data)  # shallow copy: only "accounts" is replaced
    payload["accounts"] = [
        {**account, "api_key": mask(account.get("api_key"))}
        for account in storage.data.get("accounts", [])
    ]
    return JSONResponse(payload)
async def get_status():
    """Return a summary of the account pool.

    The ``detail`` mapping is keyed by a partially masked account id (first six
    and last four characters kept) and carries each account's index, health,
    busy flag and success/fail counters. ``env_accounts`` lists the ids of the
    environment-configured accounts unmasked.
    """
    await sync_key_status()
    async with _key_lock:
        detail = {
            f"{kid[:6]}****{kid[-4:]}": {
                "index": state.get("index", 0),
                "healthy": state.get("healthy", True),
                "busy": state.get("busy", False),
                "success": state.get("success", 0),
                "fail": state.get("fail", 0),
            }
            for kid, state in key_status.items()
        }
    env_ids = [entry["account_id"] for entry in ENV_CF_ACCOUNTS]
    return {
        "status": "ok",
        "accounts": len(get_all_accounts()),
        "env_accounts": env_ids,
        "default_model": DEFAULT_CF_MODEL,
        "detail": detail,
    }
async def add_account(req: Request):
    """Add a Cloudflare account to persistent storage.

    Expects a JSON object with ``password``, ``account_id`` and ``api_key``.

    Responses:
        400 - body is not a JSON object, a field is missing/blank/not a
              string, or the account id is already stored.
        401 - wrong admin password.
        200 - ``{"status": "success"}``; the account is saved and registered
              in ``key_status``.
    """
    try:
        data = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if data.get("password") != ADMIN_PASSWORD:
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)

    acc_id = data.get("account_id")
    api_key = data.get("api_key")
    if not isinstance(acc_id, str) or not isinstance(api_key, str):
        return JSONResponse({"error": "Missing account_id or api_key"}, status_code=400)
    # Normalize BEFORE validating and checking duplicates: ids are stored
    # stripped, so comparing the raw value would let " abc " duplicate "abc".
    acc_id, api_key = acc_id.strip(), api_key.strip()
    if not acc_id or not api_key:
        return JSONResponse({"error": "Missing account_id or api_key"}, status_code=400)

    async with storage.lock:
        if any(acc["account_id"] == acc_id for acc in storage.data["accounts"]):
            return JSONResponse({"error": "Account already exists"}, status_code=400)
        storage.data["accounts"].append({"account_id": acc_id, "api_key": api_key})
    # save() takes storage.lock itself, so it must run after the lock is released.
    await storage.save()
    await sync_key_status()
    return JSONResponse({"status": "success"})
async def delete_account(account_id: str, password: str):
    """Remove a stored account and drop its runtime state.

    Responses:
        401 - wrong admin password.
        404 - no stored account has this id.
        200 - ``{"status": "success"}`` after saving and removing the
              account's ``key_status`` entry.
    """
    if password != ADMIN_PASSWORD:
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)

    async with storage.lock:
        before = len(storage.data["accounts"])
        remaining = [entry for entry in storage.data["accounts"] if entry["account_id"] != account_id]
        storage.data["accounts"] = remaining
        if len(remaining) == before:
            return JSONResponse({"error": "Account not found in JSON"}, status_code=404)

    # save() acquires storage.lock itself, so it runs after the lock is released.
    await storage.save()
    async with _key_lock:
        key_status.pop(account_id, None)
    return JSONResponse({"status": "success"})
async def update_settings(req: Request):
    """Merge the ``settings`` object of the request body into the stored settings.

    Expects a JSON object with ``password`` and an optional ``settings``
    object.

    Responses:
        400 - body is not a JSON object, or ``settings`` is not an object.
        401 - wrong admin password.
        200 - ``{"status": "success"}`` after the merged settings are saved.
    """
    try:
        data = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if data.get("password") != ADMIN_PASSWORD:
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)

    new_settings = data.get("settings", {})
    if not isinstance(new_settings, dict):
        # dict.update() would raise (or silently misinterpret a list of pairs).
        return JSONResponse({"error": "settings must be a JSON object"}, status_code=400)

    async with storage.lock:
        storage.data["settings"].update(new_settings)
    # save() takes storage.lock itself, so it must run after the lock is released.
    await storage.save()
    return JSONResponse({"status": "success"})
async def models(req: Request):
    """Return a model list containing only ``DEFAULT_CF_MODEL`` (401 without the master key)."""
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    default_entry = {
        "id": DEFAULT_CF_MODEL,
        "object": "model",
        "created": int(time.time()),
        "owned_by": "cloudflare",
    }
    return JSONResponse({"object": "list", "data": [default_entry]})
| # ===================================================== | |
| # /v1/chat/completions | |
| # ===================================================== | |
async def chat(req: Request):
    """Proxy a chat-completions request to Cloudflare, rotating through accounts.

    The request body is forwarded as-is, with ``model`` defaulting to
    ``DEFAULT_CF_MODEL`` and, for streams, ``stream_options.include_usage``
    enabled so token usage can be recorded. When the ``thinking_enabled``
    setting is off, ``reasoning_content`` is removed from responses.

    Non-stream: each account is tried at most once; rate-limited and 5xx
    responses move on to the next account, other 4xx responses are returned to
    the client unchanged. 500 is returned when every account failed.

    Stream: accounts are retried until ``max(10, 2 * account_count)`` failures
    have accumulated; errors are delivered as SSE ``error`` frames followed by
    ``data: [DONE]``.

    Responses: 401 without the master key, 400 when the body is not a JSON
    object.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(body, dict):
        # A JSON array/scalar has no .get(); reject it instead of failing with a 500.
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = body.get("stream", False)
    model = body.get("model", DEFAULT_CF_MODEL)
    cf_body = {**body, "model": model}
    if is_stream and "stream_options" not in cf_body:
        cf_body["stream_options"] = {"include_usage": True}
    thinking_enabled = storage.data.get("settings", {}).get("thinking_enabled", True)
    all_accs = get_all_accounts()

    def strip_reasoning(choices, field):
        """Delete ``reasoning_content`` from ``choice[field]`` of every choice."""
        for choice in choices:
            if field in choice and "reasoning_content" in choice[field]:
                del choice[field]["reasoning_content"]

    if not is_stream:
        tried = set()
        for _ in range(len(all_accs)):
            acc = await wait_for_free_key(exclude=tried)
            if not acc:
                break
            tried.add(acc["account_id"])
            try:
                async with httpx.AsyncClient(timeout=180) as client:
                    r = await client.post(
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={"Authorization": f"Bearer {acc['api_key']}"},
                    )
                    if is_rate_limited_status(r.status_code) or (
                        r.status_code != 200 and is_rate_limited_error_body(r.text)
                    ):
                        log(f"Account {acc['account_id'][:8]}... rate limited (non-stream)")
                        await mark_fail(acc, rate_limited=True)
                        continue
                    if r.status_code != 200:
                        if 400 <= r.status_code < 500:
                            # Client error: the request itself is at fault, so retrying elsewhere is pointless.
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} client error")
                            return Response(content=r.content, media_type="application/json", status_code=r.status_code)
                        await mark_fail(acc)
                        continue
                    await mark_ok(acc)
                    resp_json = r.json()
                    # `usage` and its counters may be absent or null; treat both as zero.
                    usage_info = resp_json.get("usage") or {}
                    neurons = await storage.add_usage(
                        acc["account_id"],
                        usage_info.get("prompt_tokens") or 0,
                        usage_info.get("completion_tokens") or 0,
                    )
                    await storage.add_log(acc["account_id"], model, r.status_code, neurons)
                    if not thinking_enabled and "choices" in resp_json:
                        strip_reasoning(resp_json["choices"], "message")
                    return JSONResponse(resp_json)
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... exception: {e}")
                await mark_fail(acc)
            finally:
                await release_key(acc)
        return JSONResponse({"error": {"message": "All accounts failed", "type": "api_error"}}, status_code=500)

    # STREAM
    async def gen():
        consecutive_failures = 0
        max_failures = max(10, len(all_accs) * 2)
        total_usage = {"prompt_tokens": 0, "completion_tokens": 0}
        while consecutive_failures < max_failures:
            acc = await wait_for_free_key()
            if not acc:
                yield sse({"error": {"message": "All accounts are busy, rate limited, or have reached their daily limit. Please try again later.", "type": "api_error"}})
                yield "data: [DONE]\n\n"
                return
            try:
                async with httpx.AsyncClient(timeout=15.0) as client:
                    async with client.stream(
                        "POST",
                        f"{cf_base(acc['account_id'])}/chat/completions",
                        json=cf_body,
                        headers={"Authorization": f"Bearer {acc['api_key']}"},
                    ) as r:
                        if is_rate_limited_status(r.status_code):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream)")
                            await mark_fail(acc, rate_limited=True)
                            consecutive_failures += 1
                            continue
                        if r.status_code != 200:
                            err_text = (await r.aread()).decode("utf-8", "ignore")
                            # Same body-based rate-limit detection as the non-stream path.
                            if is_rate_limited_error_body(err_text):
                                log(f"Account {acc['account_id'][:8]}... rate limited (stream)")
                                await mark_fail(acc, rate_limited=True)
                                consecutive_failures += 1
                                continue
                            if 400 <= r.status_code < 500:
                                # Build the frame first and yield OUTSIDE the try: a
                                # yield inside a catch-all handler would swallow the
                                # GeneratorExit raised when the client disconnects.
                                try:
                                    err_json = json.loads(err_text)
                                    if not isinstance(err_json, dict) or "error" not in err_json:
                                        err_json = {"error": {"message": str(err_json), "type": "api_error"}}
                                    frame = sse(err_json)
                                except Exception:
                                    frame = sse({"error": {"message": err_text, "type": "api_error"}})
                                yield frame
                                yield "data: [DONE]\n\n"
                                return
                            await mark_fail(acc)
                            consecutive_failures += 1
                            continue
                        # Pass-through streaming
                        # NOTE(review): if the upstream drops after frames were already
                        # forwarded, the retry below restarts the completion and the
                        # client receives the beginning again - confirm this is intended.
                        try:
                            async for line in r.aiter_lines():
                                if not line:
                                    continue
                                if line.strip() == "data: [DONE]":
                                    yield line + "\n\n"
                                    continue
                                raw = line[6:] if line.startswith("data: ") else line
                                try:
                                    j = json.loads(raw)
                                    # Usage
                                    if "usage" in j and j["usage"]:
                                        total_usage["prompt_tokens"] = j["usage"].get("prompt_tokens") or 0
                                        total_usage["completion_tokens"] = j["usage"].get("completion_tokens") or 0
                                    if not thinking_enabled and "choices" in j:
                                        strip_reasoning(j["choices"], "delta")
                                    frame = sse(j)
                                except Exception:
                                    # Not JSON (or an unexpected shape): forward the line untouched.
                                    frame = line + "\n\n"
                                yield frame
                            await mark_ok(acc)
                            neurons = await storage.add_usage(acc["account_id"], total_usage["prompt_tokens"], total_usage["completion_tokens"])
                            await storage.add_log(acc["account_id"], model, 200, neurons)
                            return
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
                            log(f"STREAM DROP: {acc['account_id'][:8]}... {e}")
                            await mark_fail(acc, rate_limited=True)
                            consecutive_failures += 1
                            continue
            except Exception as e:
                log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
                await mark_fail(acc)
                consecutive_failures += 1
            finally:
                await release_key(acc)
        yield sse({"error": {"message": "All accounts failed during stream", "type": "api_error"}})
        yield "data: [DONE]\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")
| # ===================================================== | |
| # /v1/messages | |
| # ===================================================== | |
async def anthropic(req: Request):
    """Reject Anthropic-style requests: always returns a 400 pointing to /v1/chat/completions."""
    # The return statement previously ended with an unbalanced extra ")".
    return JSONResponse(
        {
            "error": {
                "message": "Anthropic endpoint is currently disabled. Please use the OpenAI /v1/chat/completions endpoint for native Kimi tool support.",
                "type": "api_error",
            }
        },
        status_code=400,
    )