"""OpenAI-compatible proxy in front of Cloudflare Workers AI.

``/v1/chat/completions`` requests are forwarded to Cloudflare's
``/ai/v1/chat/completions`` endpoint, rotating across the configured
accounts. Accounts are cooled down when they are rate limited, fail
repeatedly, or exceed a daily neuron budget. Accounts, per-day usage,
settings and a short request log are persisted to ``data.json`` and
exposed to a small dashboard.
"""

import os
import json
import time
import datetime
import uuid
import asyncio
import secrets

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse, HTMLResponse
from starlette.requests import ClientDisconnect

app = FastAPI()

# =====================================================
# CONFIG
# =====================================================
# NOTE(review): both defaults are weak; set MASTER_API_KEY and ADMIN_PASSWORD
# explicitly in any real deployment.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin")
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/moonshotai/kimi-k2.6")

# Estimated neurons per one million input / output tokens (see add_usage).
NEURONS_PER_M_INPUT_TOKENS = 86364
NEURONS_PER_M_OUTPUT_TOKENS = 363636
# Accounts at or above this many neurons today are cooled down until the
# next UTC midnight (see get_key).
DAILY_NEURON_LIMIT = 9000
# Number of most recent request log entries kept in storage.
MAX_LOG_ENTRIES = 200


# =====================================================
# STORAGE MANAGER
# =====================================================
class StorageManager:
    """JSON-file backed store for accounts, daily usage, settings and logs.

    Coroutines mutate ``self.data`` while holding ``self.lock``. ``save()``
    acquires that lock itself, and ``asyncio.Lock`` is not reentrant, so
    ``save()`` must never be awaited while the lock is already held.
    """

    def __init__(self, filepath="data.json"):
        self.filepath = filepath
        self.lock = asyncio.Lock()
        # Strong references to pending fire-and-forget save tasks: the event
        # loop only keeps weak references, so unreferenced tasks can vanish.
        self._background_tasks = set()
        self.data = {
            "accounts": [],
            "usage": {},  # "YYYY-MM-DD": {"account_id": {"neurons": 0, "input": 0, "output": 0}}
            "settings": {"thinking_enabled": True},
            "logs": [],
        }
        self._load_sync()

    def _load_sync(self):
        """Load ``self.filepath`` if it exists, then fill in any missing top-level keys."""
        if os.path.exists(self.filepath):
            try:
                with open(self.filepath, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except Exception as e:
                print(f"[WARN] Failed to load {self.filepath}: {e}")
            else:
                if isinstance(loaded, dict):
                    self.data = loaded
                else:
                    print(f"[WARN] Failed to load {self.filepath}: top-level JSON value is not an object")
        self.data.setdefault("accounts", [])
        self.data.setdefault("usage", {})
        self.data.setdefault("settings", {"thinking_enabled": True})
        self.data.setdefault("logs", [])

    async def save(self):
        """Persist ``self.data`` to disk.

        The data is serialised first and written to a temporary file that then
        replaces the target, so a failure never leaves a truncated ``data.json``.
        Errors are logged, not raised.
        """
        async with self.lock:
            try:
                payload = json.dumps(self.data, indent=2)
                tmp_path = f"{self.filepath}.tmp"
                with open(tmp_path, "w", encoding="utf-8") as f:
                    f.write(payload)
                os.replace(tmp_path, self.filepath)
            except Exception as e:
                print(f"[WARN] Failed to save {self.filepath}: {e}")

    def _schedule_save(self):
        """Start a background ``save()`` and keep a reference until it finishes."""
        task = asyncio.create_task(self.save())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def get_today_utc(self):
        """Return today's UTC date as ``YYYY-MM-DD`` (the key used in ``data["usage"]``)."""
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    async def add_usage(self, account_id: str, input_tokens: int, output_tokens: int):
        """Add token usage for *account_id* to today's totals and schedule a save.

        ``None`` token counts are treated as 0. Returns the estimated neurons
        consumed by this call.
        """
        input_tokens = input_tokens or 0
        output_tokens = output_tokens or 0
        today = self.get_today_utc()
        neurons_used = (
            input_tokens / 1_000_000.0 * NEURONS_PER_M_INPUT_TOKENS
            + output_tokens / 1_000_000.0 * NEURONS_PER_M_OUTPUT_TOKENS
        )
        async with self.lock:
            day = self.data["usage"].setdefault(today, {})
            entry = day.setdefault(account_id, {"neurons": 0, "input": 0, "output": 0})
            entry["neurons"] += neurons_used
            entry["input"] += input_tokens
            entry["output"] += output_tokens
        self._schedule_save()
        return neurons_used

    async def add_log(self, account_id: str, model: str, status: int, neurons: float):
        """Append a request log entry (keeping the newest MAX_LOG_ENTRIES) and schedule a save."""
        log_entry = {
            "timestamp": int(time.time()),
            "account_id": account_id,
            "model": model,
            "status": status,
            "neurons": neurons,
        }
        async with self.lock:
            self.data["logs"].append(log_entry)
            if len(self.data["logs"]) > MAX_LOG_ENTRIES:
                self.data["logs"] = self.data["logs"][-MAX_LOG_ENTRIES:]
        self._schedule_save()


storage = StorageManager()


# =====================================================
# LOAD CF CREDENTIALS
# =====================================================
def _load_env_accounts():
    """Read Cloudflare credentials from env vars ``CF_1`` .. ``CF_100``.

    Each value has the form ``"<account_id>,<api_key>"``; empty or malformed
    entries are skipped.
    """
    accounts = []
    for i in range(1, 101):
        raw = os.getenv(f"CF_{i}")
        if not raw:
            continue
        account_id, sep, api_key = raw.partition(",")
        if not sep:
            continue
        account_id, api_key = account_id.strip(), api_key.strip()
        if account_id and api_key:
            accounts.append({"account_id": account_id, "api_key": api_key})
    return accounts


ENV_CF_ACCOUNTS = _load_env_accounts()  # list of {"account_id": ..., "api_key": ...}


# =====================================================
# KEY STATUS
# =====================================================
# account_id -> {"index", "healthy", "busy", "success", "fail", "cooldown_until"}
key_status = {}
_key_lock = asyncio.Lock()


def get_all_accounts():
    """Return env accounts plus stored accounts, de-duplicated by ``account_id``.

    A stored account with the same id as an env account replaces its entry.
    When nothing is configured, a single placeholder ``"dummy"`` account is
    returned.
    """
    accounts = {}
    for acc in ENV_CF_ACCOUNTS:
        accounts[acc["account_id"]] = {"account_id": acc["account_id"], "api_key": acc["api_key"], "is_env": True}
    for acc in storage.data.get("accounts", []):
        accounts[acc["account_id"]] = {"account_id": acc["account_id"], "api_key": acc["api_key"], "is_env": False}
    if not accounts:
        return [{"account_id": "dummy", "api_key": "dummy", "is_env": True}]
    return list(accounts.values())


async def sync_key_status():
    """Create a ``key_status`` entry for every account that does not have one yet."""
    all_accs = get_all_accounts()
    async with _key_lock:
        for idx, acc in enumerate(all_accs, 1):
            kid = acc["account_id"]
            if kid not in key_status:
                key_status[kid] = {
                    "index": idx,
                    "healthy": True,
                    "busy": False,
                    "success": 0,
                    "fail": 0,
                    "cooldown_until": 0,
                }


@app.on_event("startup")
async def startup_event():
    await sync_key_status()


# =====================================================
# HELPERS
# =====================================================
def log(x):
    print(f"[{time.strftime('%H:%M:%S')}] {x}", flush=True)


def sse(obj):
    """Format *obj* as a single server-sent-events ``data:`` message."""
    return "data: " + json.dumps(obj, ensure_ascii=False) + "\n\n"


def _secure_equals(a: str, b: str) -> bool:
    """Constant-time string comparison (avoids leaking secrets via timing)."""
    return secrets.compare_digest(a.encode("utf-8"), b.encode("utf-8"))


def _admin_ok(password) -> bool:
    """Return True when *password* is a string equal to ADMIN_PASSWORD."""
    return isinstance(password, str) and _secure_equals(password, ADMIN_PASSWORD)


def _mask_secret(value) -> str:
    """Return a display-safe form of a secret, keeping at most 4 chars at each end."""
    value = str(value)
    if len(value) <= 8:
        return "****"
    return value[:4] + "****" + value[-4:]


def auth_ok(req: Request):
    """Check the request's ``Authorization`` header against MASTER_API_KEY.

    Accepts either ``Bearer <key>`` or the bare key.
    """
    header = req.headers.get("Authorization", "")
    token = header[len("Bearer "):] if header.startswith("Bearer ") else header
    return _secure_equals(token, MASTER_API_KEY)


CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"


def cf_base(account_id: str) -> str:
    return CF_AI_BASE.format(account_id=account_id)


def _next_utc_midnight_ts() -> float:
    """Return the POSIX timestamp of the next 00:00 UTC."""
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    tomorrow_utc = (now_utc + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
    return tomorrow_utc.timestamp()


async def get_key(exclude=None):
    """Reserve and return the first available account, or ``None``.

    Accounts are scanned in order, always starting from the first. While
    scanning, expired cooldowns are cleared, and any account at or above
    DAILY_NEURON_LIMIT neurons today is cooled down until the next UTC
    midnight. The returned account is marked busy; callers must hand it to
    ``release_key`` when done.
    """
    if exclude is None:
        exclude = set()
    await sync_key_status()
    all_accs = get_all_accounts()
    today = storage.get_today_utc()
    now = time.time()
    async with _key_lock:
        usage_by_acc = storage.data["usage"].get(today, {})
        for acc in all_accs:
            kid = acc["account_id"]
            st = key_status.get(kid)
            if not st:
                continue

            # Reset cooldown if expired
            if st["cooldown_until"] > 0 and now > st["cooldown_until"]:
                st["healthy"] = True
                st["cooldown_until"] = 0
                st["fail"] = 0

            usage_today = usage_by_acc.get(kid, {}).get("neurons", 0)
            if usage_today >= DAILY_NEURON_LIMIT and st["cooldown_until"] == 0:
                st["cooldown_until"] = _next_utc_midnight_ts()
                st["healthy"] = False
                log(f"[INFO] Account {kid[:8]} reached {DAILY_NEURON_LIMIT} neurons, explicitly cooled down until midnight.")

            if st["healthy"] and not st["busy"] and kid not in exclude:
                st["busy"] = True
                return acc
    return None


async def release_key(acc):
    """Clear the busy flag set by ``get_key``."""
    async with _key_lock:
        st = key_status.get(acc["account_id"])
        if st is not None:
            st["busy"] = False


async def mark_fail(acc, rate_limited=False):
    """Record a failure for *acc*.

    The account is disabled when *rate_limited* is true or after 3 failures
    without an intervening success. A disabled account is cooled down until the
    next UTC midnight. The exception is a rate limit seen between 00:00 and
    00:14 UTC, which only cools it down for 5 minutes, in case the upstream
    quota reset lags behind midnight.
    """
    async with _key_lock:
        kid = acc["account_id"]
        st = key_status.get(kid)
        if st is None:
            return
        st["fail"] += 1
        if not (rate_limited or st["fail"] >= 3):
            return
        st["healthy"] = False
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        if rate_limited and now_utc.hour == 0 and now_utc.minute < 15:
            st["cooldown_until"] = time.time() + 300
            log(f"[INFO] Account {kid[:8]} cooled down for 5 mins (Midnight grace period).")
        else:
            st["cooldown_until"] = _next_utc_midnight_ts()
            log(f"[INFO] Account {kid[:8]} cooled down until tomorrow (UTC midnight).")


async def mark_ok(acc):
    """Record a success for *acc*: reset its failure count and any cooldown."""
    async with _key_lock:
        st = key_status.get(acc["account_id"])
        if st is not None:
            st["success"] += 1
            st["fail"] = 0
            st["healthy"] = True
            st["cooldown_until"] = 0


async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.05):
    """Poll ``get_key`` every *interval* seconds for up to *max_wait* seconds."""
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        acc = await get_key(exclude)
        if acc:
            return acc
        await asyncio.sleep(interval)
    return None


def is_rate_limited_status(status_code: int) -> bool:
    return status_code == 429


def is_rate_limited_error_body(text: str) -> bool:
    t = text.lower()
    return "rate limit" in t or "too many requests" in t or "usage limit" in t


# =====================================================
# API / UI ROUTES
# =====================================================
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard():
    """Serve ``dashboard.html`` from this file's directory."""
    try:
        base_dir = os.path.dirname(os.path.abspath(__file__))
        file_path = os.path.join(base_dir, "dashboard.html")
        with open(file_path, "r", encoding="utf-8") as f:
            html = f.read()
        return HTMLResponse(content=html)
    except Exception as e:
        return HTMLResponse(content=f"Error loading dashboard: {e}", status_code=500)


@app.get("/api/dashboard_data")
async def get_dashboard_data():
    """Return stored data for the dashboard.

    This endpoint is unauthenticated, so stored API keys are masked rather
    than returned verbatim.
    """
    data = dict(storage.data)
    data["accounts"] = [
        {**acc, "api_key": _mask_secret(acc.get("api_key", ""))}
        for acc in storage.data.get("accounts", [])
    ]
    return JSONResponse(data)


@app.get("/api/status")
async def get_status():
    """Return per-account health counters with masked account ids."""
    await sync_key_status()
    async with _key_lock:
        safe = {}
        for kid, v in key_status.items():
            masked = kid[:6] + "****" + kid[-4:]
            safe[masked] = {
                "index": v.get("index", 0),
                "healthy": v.get("healthy", True),
                "busy": v.get("busy", False),
                "success": v.get("success", 0),
                "fail": v.get("fail", 0),
            }
    return {
        "status": "ok",
        "accounts": len(get_all_accounts()),
        "env_accounts": [acc["account_id"] for acc in ENV_CF_ACCOUNTS],
        "default_model": DEFAULT_CF_MODEL,
        "detail": safe,
    }


@app.post("/api/accounts")
async def add_account(req: Request):
    """Add a stored account (admin password required)."""
    try:
        data = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not _admin_ok(data.get("password")):
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)

    acc_id = data.get("account_id")
    api_key = data.get("api_key")
    # Strip before validating/comparing so " abc" and "abc" are the same account.
    acc_id = acc_id.strip() if isinstance(acc_id, str) else ""
    api_key = api_key.strip() if isinstance(api_key, str) else ""
    if not acc_id or not api_key:
        return JSONResponse({"error": "Missing account_id or api_key"}, status_code=400)

    async with storage.lock:
        if any(acc["account_id"] == acc_id for acc in storage.data["accounts"]):
            return JSONResponse({"error": "Account already exists"}, status_code=400)
        storage.data["accounts"].append({"account_id": acc_id, "api_key": api_key})
    # save() takes storage.lock itself; awaiting it inside the block above would deadlock.
    await storage.save()
    await sync_key_status()
    return JSONResponse({"status": "success"})


@app.delete("/api/accounts")
async def delete_account(account_id: str, password: str):
    """Remove a stored account (admin password required) and drop its status entry."""
    if not _admin_ok(password):
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)
    async with storage.lock:
        original_len = len(storage.data["accounts"])
        storage.data["accounts"] = [a for a in storage.data["accounts"] if a["account_id"] != account_id]
        removed = len(storage.data["accounts"]) != original_len
    if not removed:
        return JSONResponse({"error": "Account not found in JSON"}, status_code=404)
    # save() takes storage.lock itself; must run after the lock is released.
    await storage.save()
    async with _key_lock:
        key_status.pop(account_id, None)
    return JSONResponse({"status": "success"})


@app.post("/api/settings")
async def update_settings(req: Request):
    """Merge ``settings`` from the request body into stored settings (admin password required)."""
    try:
        data = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not _admin_ok(data.get("password")):
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)
    new_settings = data.get("settings", {})
    if not isinstance(new_settings, dict):
        return JSONResponse({"error": "settings must be an object"}, status_code=400)
    async with storage.lock:
        storage.data["settings"].update(new_settings)
    # save() takes storage.lock itself; must run after the lock is released.
    await storage.save()
    return JSONResponse({"status": "success"})


@app.get("/v1/models")
async def models(req: Request):
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    return JSONResponse({
        "object": "list",
        "data": [{"id": DEFAULT_CF_MODEL, "object": "model", "created": int(time.time()), "owned_by": "cloudflare"}],
    })


# =====================================================
# /v1/chat/completions
# =====================================================
def _strip_reasoning(choices, container_key):
    """Remove ``reasoning_content`` from ``choice[container_key]`` for each choice, in place."""
    for choice in choices or []:
        if isinstance(choice, dict):
            container = choice.get(container_key)
            if isinstance(container, dict):
                container.pop("reasoning_content", None)


def _upstream_error_event(err_bytes: bytes) -> str:
    """Turn an upstream error body into an SSE error event in OpenAI ``{"error": ...}`` shape."""
    try:
        err_json = json.loads(err_bytes)
    except ValueError:
        return sse({"error": {"message": err_bytes.decode("utf-8", "ignore"), "type": "api_error"}})
    if not isinstance(err_json, dict) or "error" not in err_json:
        err_json = {"error": {"message": str(err_json), "type": "api_error"}}
    return sse(err_json)


def _transform_stream_line(line: str, thinking_enabled: bool, total_usage: dict) -> str:
    """Re-emit one upstream SSE line, recording usage and optionally dropping reasoning.

    Lines that are not a JSON object (including ``data: [DONE]``) are passed
    through unchanged.
    """
    if line.strip() == "data: [DONE]":
        return line + "\n\n"
    raw = line[6:] if line.startswith("data: ") else line
    try:
        chunk = json.loads(raw)
    except ValueError:
        return line + "\n\n"
    if not isinstance(chunk, dict):
        return line + "\n\n"

    usage = chunk.get("usage")
    if usage and isinstance(usage, dict):
        total_usage["prompt_tokens"] = usage.get("prompt_tokens") or 0
        total_usage["completion_tokens"] = usage.get("completion_tokens") or 0
    if not thinking_enabled:
        _strip_reasoning(chunk.get("choices"), "delta")
    return sse(chunk)


async def _chat_non_stream(cf_body, model, thinking_enabled, attempts):
    """Forward a non-streaming request, trying each account at most once (up to *attempts*)."""
    tried = set()
    for _ in range(attempts):
        acc = await wait_for_free_key(exclude=tried)
        if not acc:
            break
        tried.add(acc["account_id"])
        try:
            async with httpx.AsyncClient(timeout=180) as client:
                r = await client.post(
                    f"{cf_base(acc['account_id'])}/chat/completions",
                    json=cf_body,
                    headers={"Authorization": f"Bearer {acc['api_key']}"},
                )
                if is_rate_limited_status(r.status_code) or (
                    r.status_code != 200 and is_rate_limited_error_body(r.text)
                ):
                    log(f"Account {acc['account_id'][:8]}... rate limited (non-stream)")
                    await mark_fail(acc, rate_limited=True)
                    continue
                if r.status_code != 200:
                    if 400 <= r.status_code < 500:
                        log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} client error")
                        return Response(content=r.content, media_type="application/json", status_code=r.status_code)
                    await mark_fail(acc)
                    continue

                resp_json = r.json()
                await mark_ok(acc)
                # "usage" may be present but null; don't let that discard a good response.
                usage_info = resp_json.get("usage") or {}
                neurons = await storage.add_usage(
                    acc["account_id"],
                    usage_info.get("prompt_tokens") or 0,
                    usage_info.get("completion_tokens") or 0,
                )
                await storage.add_log(acc["account_id"], model, r.status_code, neurons)
                if not thinking_enabled:
                    _strip_reasoning(resp_json.get("choices"), "message")
                return JSONResponse(resp_json)
        except Exception as e:
            log(f"Account {acc['account_id'][:8]}... exception: {e}")
            await mark_fail(acc)
        finally:
            await release_key(acc)
    return JSONResponse({"error": {"message": "All accounts failed", "type": "api_error"}}, status_code=500)


async def _chat_stream(cf_body, model, thinking_enabled, num_accounts):
    """Yield SSE messages for a streaming request, failing over between accounts.

    Failover only happens before any output has reached the client. Once data
    has been sent, a failure ends the stream with an error event, because
    replaying the request elsewhere would splice a second completion onto the
    partial one.
    """
    consecutive_failures = 0
    max_failures = max(10, num_accounts * 2)
    while consecutive_failures < max_failures:
        acc = await wait_for_free_key()
        if not acc:
            yield sse({"error": {"message": "All accounts are busy, rate limited, or have reached their daily limit. Please try again later.", "type": "api_error"}})
            yield "data: [DONE]\n\n"
            return

        total_usage = {"prompt_tokens": 0, "completion_tokens": 0}
        sent_any = False
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                async with client.stream(
                    "POST",
                    f"{cf_base(acc['account_id'])}/chat/completions",
                    json=cf_body,
                    headers={"Authorization": f"Bearer {acc['api_key']}"},
                ) as r:
                    if is_rate_limited_status(r.status_code):
                        log(f"Account {acc['account_id'][:8]}... rate limited (stream)")
                        await mark_fail(acc, rate_limited=True)
                        consecutive_failures += 1
                        continue
                    if r.status_code != 200:
                        err_bytes = await r.aread()
                        # Same body-text rate-limit detection as the non-stream path.
                        if is_rate_limited_error_body(err_bytes.decode("utf-8", "ignore")):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream)")
                            await mark_fail(acc, rate_limited=True)
                            consecutive_failures += 1
                            continue
                        if 400 <= r.status_code < 500:
                            log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} client error")
                            yield _upstream_error_event(err_bytes)
                            yield "data: [DONE]\n\n"
                            return
                        await mark_fail(acc)
                        consecutive_failures += 1
                        continue

                    # Pass-through streaming
                    async for line in r.aiter_lines():
                        if not line:
                            continue
                        sent_any = True
                        yield _transform_stream_line(line, thinking_enabled, total_usage)

            await mark_ok(acc)
            neurons = await storage.add_usage(acc["account_id"], total_usage["prompt_tokens"], total_usage["completion_tokens"])
            await storage.add_log(acc["account_id"], model, 200, neurons)
            return
        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
            log(f"STREAM DROP: {acc['account_id'][:8]}... {e}")
            await mark_fail(acc, rate_limited=True)
            consecutive_failures += 1
        except Exception as e:
            log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
            await mark_fail(acc)
            consecutive_failures += 1
        finally:
            await release_key(acc)

        if sent_any:
            yield sse({"error": {"message": "Upstream stream was interrupted after partial output", "type": "api_error"}})
            yield "data: [DONE]\n\n"
            return

    yield sse({"error": {"message": "All accounts failed during stream", "type": "api_error"}})
    yield "data: [DONE]\n\n"


@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat completions, proxied to Cloudflare with account failover."""
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(body, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = body.get("stream", False)
    model = body.get("model") or DEFAULT_CF_MODEL
    cf_body = {**body, "model": model}
    if is_stream and "stream_options" not in cf_body:
        # Ask upstream for a final usage chunk so neurons can be accounted.
        cf_body["stream_options"] = {"include_usage": True}
    thinking_enabled = storage.data.get("settings", {}).get("thinking_enabled", True)
    num_accounts = len(get_all_accounts())

    if not is_stream:
        return await _chat_non_stream(cf_body, model, thinking_enabled, num_accounts)
    return StreamingResponse(
        _chat_stream(cf_body, model, thinking_enabled, num_accounts),
        media_type="text/event-stream",
    )


# =====================================================
# /v1/messages
# =====================================================
@app.post("/v1/messages")
async def anthropic(req: Request):
    return JSONResponse({"error": {"message": "Anthropic endpoint is currently disabled. Please use the OpenAI /v1/chat/completions endpoint for native Kimi tool support.", "type": "api_error"}}, status_code=400)