# proxycf / proxy_cf.py
# Author: arfandi7322
# Commit 6149ef7: fix(proxy): architectural overhaul for timeouts, 9k limits, and infinite hang prevention
# Size: 21.2 kB
import asyncio
import datetime
import hmac
import json
import os
import time
import uuid

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect
# FastAPI application; the routes and the startup hook below register on it.
app = FastAPI()
# =====================================================
# CONFIG
# =====================================================
# Bearer token clients must present to the /v1/* routes (checked by auth_ok).
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
# Password required by the admin routes that modify accounts and settings.
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "admin")
# Model used when a request omits "model"; the only id listed by /v1/models.
DEFAULT_CF_MODEL = os.getenv("DEFAULT_CF_MODEL", "@cf/moonshotai/kimi-k2.6")


def default_credential_warnings(environ=None):
    """Return one warning line per credential that falls back to its built-in default.

    Args:
        environ: mapping of environment variables to inspect; defaults to
            ``os.environ``.

    Returns:
        A list of warning strings (empty when both credentials are set).
    """
    env = os.environ if environ is None else environ
    return [
        f"[WARN] {name} is not set; using the built-in default from source. "
        "Set it before exposing this proxy."
        for name in ("MASTER_API_KEY", "ADMIN_PASSWORD")
        if name not in env
    ]


# Both fallbacks ship with the source code, so a deployment running on them lets
# anyone use the proxy or reconfigure it; say so loudly at import time.
for _warning in default_credential_warnings():
    print(_warning, flush=True)
# =====================================================
# STORAGE MANAGER
# =====================================================
class StorageManager:
    """JSON-file backed store for accounts, daily usage, settings and logs.

    The whole document lives in ``self.data`` and is rewritten to
    ``self.filepath`` on every save. ``self.lock`` serializes mutations and
    saves between coroutines. It is an ``asyncio.Lock``, which is not
    reentrant, so ``save()`` must never be awaited by a caller that already
    holds ``self.lock``.
    """

    # Number of most recent request log entries retained in data["logs"].
    MAX_LOGS = 200

    def __init__(self, filepath="data.json"):
        self.filepath = filepath
        self.lock = asyncio.Lock()
        # Strong references to fire-and-forget save tasks: the event loop keeps
        # only weak references, so an unreferenced task can vanish mid-flight.
        self._pending_saves = set()
        self.data = {
            "accounts": [],
            "usage": {},  # "YYYY-MM-DD": {"account_id": {"neurons": 0, "input": 0, "output": 0}}
            "settings": {"thinking_enabled": True},
            "logs": [],
        }
        self._load_sync()

    def _load_sync(self):
        """Load ``self.filepath`` into ``self.data`` and fill in missing sections.

        A missing file keeps the defaults. An unreadable file, or one whose
        top-level JSON value is not an object, is reported and ignored instead
        of crashing start-up.
        """
        if os.path.exists(self.filepath):
            try:
                with open(self.filepath, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
            except Exception as e:
                print(f"[WARN] Failed to load {self.filepath}: {e}")
            else:
                if isinstance(loaded, dict):
                    self.data = loaded
                else:
                    print(f"[WARN] Failed to load {self.filepath}: top-level JSON value is not an object")
        self.data.setdefault("accounts", [])
        self.data.setdefault("usage", {})
        self.data.setdefault("settings", {"thinking_enabled": True})
        self.data.setdefault("logs", [])

    async def save(self):
        """Write ``self.data`` to disk atomically; errors are printed, not raised.

        The JSON goes to a sibling ``.tmp`` file that is then moved over the
        real file with ``os.replace``. A crash mid-write therefore leaves the
        previous file intact instead of a truncated one. Acquires ``self.lock``.
        """
        async with self.lock:
            tmp_path = f"{self.filepath}.tmp"
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(self.data, f, indent=2)
                os.replace(tmp_path, self.filepath)
            except Exception as e:
                print(f"[WARN] Failed to save {self.filepath}: {e}")

    def _schedule_save(self):
        """Start a background save() and hold a reference until it finishes."""
        task = asyncio.create_task(self.save())
        self._pending_saves.add(task)
        task.add_done_callback(self._pending_saves.discard)

    def get_today_utc(self):
        """Return the current UTC date as "YYYY-MM-DD" (the key format of data["usage"])."""
        return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")

    async def add_usage(self, account_id: str, input_tokens: int, output_tokens: int):
        """Add token usage for *account_id* to today's (UTC) totals and schedule a save.

        Neurons are estimated as 86,364 per million input tokens plus 363,636
        per million output tokens. ``None`` token counts (e.g. an upstream
        usage object with null fields) are treated as 0.

        Returns:
            The neurons attributed to this call.
        """
        input_tokens = int(input_tokens or 0)
        output_tokens = int(output_tokens or 0)
        today = self.get_today_utc()
        neurons_used = (input_tokens / 1_000_000.0 * 86364) + (output_tokens / 1_000_000.0 * 363636)
        async with self.lock:
            day = self.data["usage"].setdefault(today, {})
            entry = day.setdefault(account_id, {"neurons": 0, "input": 0, "output": 0})
            entry["neurons"] += neurons_used
            entry["input"] += input_tokens
            entry["output"] += output_tokens
        self._schedule_save()
        return neurons_used

    async def add_log(self, account_id: str, model: str, status: int, neurons: float):
        """Append one request log entry and schedule a save.

        Only the newest ``MAX_LOGS`` entries are kept.
        """
        log_entry = {
            "timestamp": int(time.time()),
            "account_id": account_id,
            "model": model,
            "status": status,
            "neurons": neurons,
        }
        async with self.lock:
            self.data["logs"].append(log_entry)
            if len(self.data["logs"]) > self.MAX_LOGS:
                self.data["logs"] = self.data["logs"][-self.MAX_LOGS:]
        self._schedule_save()
# Process-wide persistent store. It uses StorageManager's default path
# "data.json" (relative to the working directory) and is loaded synchronously
# at import time by StorageManager.__init__.
storage = StorageManager()
# =====================================================
# LOAD CF CREDENTIALS
# =====================================================
def load_env_accounts(max_slots=100):
    """Read Cloudflare credentials from the env vars CF_1 .. CF_<max_slots>.

    Each variable holds "<account_id>,<api_key>". The value is split on the
    first comma and both halves are stripped. Unset, comma-less or half-empty
    entries are skipped. Working inside a function keeps the loop variables
    out of the module namespace.

    Returns:
        A list of {"account_id": str, "api_key": str} dicts in slot order.
    """
    accounts = []
    for slot in range(1, max_slots + 1):
        raw = os.getenv(f"CF_{slot}")
        if not raw:
            continue
        account_id, sep, api_key = raw.partition(",")
        if not sep:
            continue
        account_id, api_key = account_id.strip(), api_key.strip()
        if account_id and api_key:
            accounts.append({"account_id": account_id, "api_key": api_key})
    return accounts


# Accounts configured through the environment: list of {"account_id": ..., "api_key": ...}.
ENV_CF_ACCOUNTS = load_env_accounts()
# =====================================================
# KEY STATUS
# =====================================================
# In-memory per-account runtime state keyed by account_id. Each value holds
# "index", "healthy", "busy", "success", "fail" and "cooldown_until" (created
# in sync_key_status). It is never written to data.json.
key_status = {}
# Serializes reads and updates of key_status across coroutines.
_key_lock = asyncio.Lock()
def get_all_accounts():
    """Return every known Cloudflare account as a list of dicts.

    Environment accounts come first, followed by accounts stored in data.json.
    An id present in both keeps its first position but takes the stored
    api_key and is_env=False. Each dict has "account_id", "api_key" and
    "is_env". With no accounts at all, a single placeholder "dummy" account
    is returned.
    """
    by_id = {}
    sources = (
        (ENV_CF_ACCOUNTS, True),
        (storage.data.get("accounts", []), False),
    )
    for entries, from_env in sources:
        for entry in entries:
            kid = entry["account_id"]
            by_id[kid] = {"account_id": kid, "api_key": entry["api_key"], "is_env": from_env}
    if by_id:
        return list(by_id.values())
    return [{"account_id": "dummy", "api_key": "dummy", "is_env": True}]
async def sync_key_status():
    """Bring key_status in line with the current account list.

    New accounts get a fresh record: healthy, idle, zeroed counters, no
    cooldown. Existing records keep their counters and cooldown, but "index"
    is refreshed to the account's current 1-based position so it cannot go
    stale or collide after accounts are added or deleted. Records for ids no
    longer returned by get_all_accounts() (for example the "dummy" placeholder
    once a real account exists) are dropped.
    """
    all_accs = get_all_accounts()
    async with _key_lock:
        current = set()
        for idx, acc in enumerate(all_accs, 1):
            kid = acc["account_id"]
            current.add(kid)
            st = key_status.get(kid)
            if st is None:
                key_status[kid] = {
                    "index": idx,
                    "healthy": True,
                    "busy": False,
                    "success": 0,
                    "fail": 0,
                    "cooldown_until": 0,
                }
            else:
                st["index"] = idx
        # release_key/mark_ok/mark_fail ignore unknown ids, so pruning is safe
        # even if a dropped account still has a request in flight.
        for stale in [kid for kid in key_status if kid not in current]:
            del key_status[stale]
@app.on_event("startup")
async def startup_event():
    """Create key_status entries for every known account when the app starts."""
    # NOTE(review): recent FastAPI releases deprecate on_event in favour of
    # lifespan handlers; confirm the installed version before migrating.
    await sync_key_status()
# =====================================================
# HELPERS
# =====================================================
def log(x):
    """Print *x* to stdout, prefixed with the local time as [HH:MM:SS], and flush."""
    stamp = time.strftime("%H:%M:%S")
    print(f"[{stamp}] {x}", flush=True)
def sse(obj):
    """Serialize *obj* as one Server-Sent Events "data:" frame (non-ASCII kept as-is)."""
    payload = json.dumps(obj, ensure_ascii=False)
    return f"data: {payload}\n\n"
def auth_ok(req: Request):
    """Return True when the request's Authorization header carries MASTER_API_KEY.

    Accepts "Bearer <key>" as well as a bare "<key>". Only a leading "Bearer "
    is removed. A global str.replace would also strip the text from the middle
    of the header, so "olBearer la" would match "olla". The comparison is
    constant-time to avoid leaking the key through response timing.
    """
    header = req.headers.get("Authorization", "")
    prefix = "Bearer "
    token = header[len(prefix):] if header.startswith(prefix) else header
    return hmac.compare_digest(token.encode("utf-8"), MASTER_API_KEY.encode("utf-8"))
# OpenAI-compatible Workers AI endpoint root; "{account_id}" is filled in per request.
CF_AI_BASE = "https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/v1"


def cf_base(account_id: str) -> str:
    """Return the Workers AI v1 base URL for *account_id*."""
    return CF_AI_BASE.format_map({"account_id": account_id})
async def get_key(exclude=None):
    """Reserve and return the first available account, or None.

    Accounts are scanned in get_all_accounts() order, so the first listed
    account is always preferred. For each account examined:
    - an expired cooldown is cleared (healthy again, fail count reset);
    - an account whose usage recorded for the current UTC day has reached
      9000 neurons is put into cooldown until the next UTC midnight.
    The first account that is healthy, not busy and not in *exclude* is
    marked busy and returned; accounts after it are not examined.
    """
    skip = set() if exclude is None else exclude
    await sync_key_status()
    candidates = get_all_accounts()
    day = storage.get_today_utc()
    now = time.time()
    async with _key_lock:
        for candidate in candidates:
            kid = candidate["account_id"]
            state = key_status.get(kid)
            if not state:
                continue

            # An elapsed cooldown restores the account with a clean failure count.
            if state["cooldown_until"] > 0 and now > state["cooldown_until"]:
                state["healthy"] = True
                state["cooldown_until"] = 0
                state["fail"] = 0

            day_usage = storage.data["usage"].get(day, {})
            spent = day_usage[kid].get("neurons", 0) if kid in day_usage else 0
            # Bench the account proactively before it hits the upstream daily cap.
            if spent >= 9000 and state["cooldown_until"] == 0:
                utc_now = datetime.datetime.now(datetime.timezone.utc)
                next_midnight = (utc_now + datetime.timedelta(days=1)).replace(
                    hour=0, minute=0, second=0, microsecond=0
                )
                state["cooldown_until"] = next_midnight.timestamp()
                state["healthy"] = False
                log(f"[INFO] Account {kid[:8]} reached 9000 neurons, explicitly cooled down until midnight.")

            if kid in skip or state["busy"] or not state["healthy"]:
                continue
            state["busy"] = True
            return candidate
    return None
async def release_key(acc):
    """Clear the "busy" flag that get_key set; unknown accounts are ignored."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["busy"] = False
async def mark_fail(acc, rate_limited=False):
    """Record a failed request for *acc* and bench it when warranted.

    Every call increments "fail". A rate-limited failure, or a third
    consecutive failure, marks the account unhealthy and sets a cooldown.
    The cooldown is 5 minutes for a rate limit seen during 00:00-00:14 UTC
    (in case the upstream quota reset is late) and otherwise runs until the
    next UTC midnight. Unknown accounts are ignored.
    """
    async with _key_lock:
        kid = acc["account_id"]
        state = key_status.get(kid)
        if state is None:
            return
        state["fail"] += 1
        if not (rate_limited or state["fail"] >= 3):
            return

        state["healthy"] = False
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        in_grace_window = rate_limited and now_utc.hour == 0 and now_utc.minute < 15
        if in_grace_window:
            state["cooldown_until"] = time.time() + 300
            log(f"[INFO] Account {kid[:8]} cooled down for 5 mins (Midnight grace period).")
            return

        next_midnight = (now_utc + datetime.timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        state["cooldown_until"] = next_midnight.timestamp()
        log(f"[INFO] Account {kid[:8]} cooled down until tomorrow (UTC midnight).")
async def mark_ok(acc):
    """Record a successful request: bump "success", clear failures and any cooldown."""
    async with _key_lock:
        state = key_status.get(acc["account_id"])
        if state is not None:
            state["success"] += 1
            state.update(fail=0, healthy=True, cooldown_until=0)
async def wait_for_free_key(exclude=None, max_wait=30.0, interval=0.05):
    """Poll get_key until an account is reserved or *max_wait* seconds pass.

    The budget is measured with a monotonic clock. Summing *interval* alone
    ignores the time spent inside get_key (including waits on the key lock),
    which lets real waits run well past *max_wait* under contention.

    Args:
        exclude: account ids that must not be returned (passed to get_key).
        max_wait: total time budget in seconds.
        interval: pause between polls in seconds.

    Returns:
        The reserved account dict (already marked busy), or None on timeout.
    """
    deadline = time.monotonic() + max_wait
    while True:
        acc = await get_key(exclude)
        if acc:
            return acc
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        await asyncio.sleep(min(interval, remaining))
def is_rate_limited_status(status_code: int) -> bool:
    """Return True when *status_code* is HTTP 429 (Too Many Requests)."""
    too_many_requests = 429
    return too_many_requests == status_code
def is_rate_limited_error_body(text: str) -> bool:
    """Return True when an error body mentions a rate or usage limit (case-insensitive)."""
    lowered = text.lower()
    markers = ("rate limit", "too many requests", "usage limit")
    return any(marker in lowered for marker in markers)
# =====================================================
# API / UI ROUTES
# =====================================================
@app.get("/", response_class=HTMLResponse)
async def serve_dashboard():
    """Serve dashboard.html from this module's directory.

    Any failure (missing file, decode error, ...) is turned into a 500 page
    that includes the exception text.
    """
    try:
        here = os.path.dirname(os.path.abspath(__file__))
        with open(os.path.join(here, "dashboard.html"), "r", encoding="utf-8") as page:
            return HTMLResponse(content=page.read())
    except Exception as e:
        return HTMLResponse(content=f"Error loading dashboard: {e}", status_code=500)
def _redact_api_key(api_key):
    """Mask an API key for display, revealing at most its last 4 characters."""
    key = str(api_key or "")
    return ("****" + key[-4:]) if len(key) > 8 else "****"


@app.get("/api/dashboard_data")
async def get_dashboard_data():
    """Return the stored dashboard data (accounts, usage, settings, logs).

    This route performs no authentication, so each account's "api_key" is
    masked before the data leaves the process. Previously every stored
    Cloudflare token was served in clear text. All other sections are
    returned as stored.
    """
    payload = dict(storage.data)
    payload["accounts"] = [
        {**acc, "api_key": _redact_api_key(acc.get("api_key"))}
        for acc in storage.data.get("accounts", [])
    ]
    return JSONResponse(payload)
@app.get("/api/status")
async def get_status():
    """Report per-account health (unauthenticated).

    "detail" is keyed by a masked account id: the first 6 characters, "****",
    then the last 4 characters. "env_accounts" lists the ids configured
    through CF_<n> environment variables.
    """
    await sync_key_status()
    async with _key_lock:
        detail = {
            f"{kid[:6]}****{kid[-4:]}": {
                "index": st.get("index", 0),
                "healthy": st.get("healthy", True),
                "busy": st.get("busy", False),
                "success": st.get("success", 0),
                "fail": st.get("fail", 0),
            }
            for kid, st in key_status.items()
        }
    env_ids = [entry["account_id"] for entry in ENV_CF_ACCOUNTS]
    return {
        "status": "ok",
        "accounts": len(get_all_accounts()),
        "env_accounts": env_ids,
        "default_model": DEFAULT_CF_MODEL,
        "detail": detail,
    }
@app.post("/api/accounts")
async def add_account(req: Request):
    """Register a Cloudflare account in data.json.

    Body: {"password": ..., "account_id": ..., "api_key": ...}.
    Returns 401 on a wrong admin password, 400 on a malformed body, missing
    fields or a duplicate account_id, and {"status": "success"} otherwise.
    """
    try:
        data = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if data.get("password") != ADMIN_PASSWORD:
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)

    # Normalize before validating: whitespace-only or non-string values are
    # rejected (instead of a 500 from .strip()), and the duplicate check then
    # compares the same stripped form that is stored.
    acc_id = data.get("account_id")
    api_key = data.get("api_key")
    acc_id = acc_id.strip() if isinstance(acc_id, str) else ""
    api_key = api_key.strip() if isinstance(api_key, str) else ""
    if not acc_id or not api_key:
        return JSONResponse({"error": "Missing account_id or api_key"}, status_code=400)

    async with storage.lock:
        if any(acc["account_id"] == acc_id for acc in storage.data["accounts"]):
            return JSONResponse({"error": "Account already exists"}, status_code=400)
        storage.data["accounts"].append({"account_id": acc_id, "api_key": api_key})
    # storage.save() acquires storage.lock itself and asyncio.Lock is not
    # reentrant, so it is awaited only after the lock above has been released.
    await storage.save()
    await sync_key_status()
    return JSONResponse({"status": "success"})
@app.delete("/api/accounts")
async def delete_account(account_id: str, password: str):
    """Remove a data.json account and forget its runtime key status.

    Both values arrive as query parameters. Returns 401 on a wrong admin
    password, 404 when the id is not stored in data.json (environment accounts
    cannot be deleted here), and {"status": "success"} otherwise.
    """
    # NOTE(review): a password in the query string can end up in access logs.
    if password != ADMIN_PASSWORD:
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)
    async with storage.lock:
        remaining = [a for a in storage.data["accounts"] if a["account_id"] != account_id]
        if len(remaining) == len(storage.data["accounts"]):
            return JSONResponse({"error": "Account not found in JSON"}, status_code=404)
        storage.data["accounts"] = remaining
    # storage.save() acquires storage.lock itself and asyncio.Lock is not
    # reentrant, so it is awaited only after the lock above has been released.
    await storage.save()
    async with _key_lock:
        key_status.pop(account_id, None)
    return JSONResponse({"status": "success"})
@app.post("/api/settings")
async def update_settings(req: Request):
    """Merge the posted "settings" object into the stored settings.

    Body: {"password": ..., "settings": {...}}. A missing or null "settings"
    value is a no-op. Returns 401 on a wrong admin password, 400 on a
    malformed body, and {"status": "success"} otherwise.
    """
    try:
        data = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if data.get("password") != ADMIN_PASSWORD:
        return JSONResponse({"error": "Unauthorized: Invalid password"}, status_code=401)
    new_settings = data.get("settings")
    if new_settings is None:
        new_settings = {}
    if not isinstance(new_settings, dict):
        return JSONResponse({"error": "settings must be a JSON object"}, status_code=400)
    async with storage.lock:
        storage.data["settings"].update(new_settings)
    # storage.save() acquires storage.lock itself and asyncio.Lock is not
    # reentrant, so it is awaited only after the lock above has been released.
    await storage.save()
    return JSONResponse({"status": "success"})
@app.get("/v1/models")
async def models(req: Request):
    """OpenAI-style model listing that advertises only DEFAULT_CF_MODEL."""
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    entry = {
        "id": DEFAULT_CF_MODEL,
        "object": "model",
        "created": int(time.time()),
        "owned_by": "cloudflare",
    }
    return JSONResponse({"object": "list", "data": [entry]})
# =====================================================
# /v1/chat/completions
# =====================================================
async def _record_usage(acc, model, status, usage):
    """Best-effort usage accounting for a completed request.

    *usage* is the upstream usage mapping (possibly empty). Missing or null
    token counts are treated as 0. Failures are logged, never raised, so
    bookkeeping cannot turn an answer that was already delivered into an
    error or a retry.
    """
    try:
        neurons = await storage.add_usage(
            acc["account_id"],
            usage.get("prompt_tokens") or 0,
            usage.get("completion_tokens") or 0,
        )
        await storage.add_log(acc["account_id"], model, status, neurons)
    except Exception as e:
        log(f"Usage accounting failed for {acc['account_id'][:8]}...: {e}")


def _strip_reasoning(payload, part):
    """Delete "reasoning_content" from payload["choices"][*][part] in place.

    *part* is "message" for buffered responses and "delta" for stream chunks.
    Unexpected shapes are left untouched.
    """
    choices = payload.get("choices") if isinstance(payload, dict) else None
    if not isinstance(choices, list):
        return
    for choice in choices:
        inner = choice.get(part) if isinstance(choice, dict) else None
        if isinstance(inner, dict):
            inner.pop("reasoning_content", None)


def _stream_error_event(err_body):
    """Render an upstream error body (bytes) as one OpenAI-style SSE error event."""
    try:
        err_json = json.loads(err_body)
    except ValueError:
        return sse({"error": {"message": err_body.decode("utf-8", "ignore"), "type": "api_error"}})
    if not isinstance(err_json, dict) or "error" not in err_json:
        err_json = {"error": {"message": str(err_json), "type": "api_error"}}
    return sse(err_json)


def _rewrite_stream_line(line, thinking_enabled, total_usage):
    """Translate one upstream SSE line into the text forwarded to the client.

    Returns None for blank lines. JSON "data:" payloads are re-serialized
    after recording any usage into *total_usage* and, when thinking is
    disabled, dropping reasoning deltas. Anything else (including the
    "data: [DONE]" marker) is passed through verbatim.
    """
    if not line:
        return None
    if line.strip() == "data: [DONE]":
        return line + "\n\n"
    raw = line[6:] if line.startswith("data: ") else line
    try:
        chunk = json.loads(raw)
    except ValueError:
        return line + "\n\n"
    if not isinstance(chunk, dict):
        return line + "\n\n"
    usage = chunk.get("usage")
    if isinstance(usage, dict) and usage:
        total_usage["prompt_tokens"] = usage.get("prompt_tokens", 0)
        total_usage["completion_tokens"] = usage.get("completion_tokens", 0)
    if not thinking_enabled:
        _strip_reasoning(chunk, "delta")
    return sse(chunk)


async def _chat_buffered(cf_body, model, thinking_enabled, attempts):
    """Run a non-streaming completion, trying each account at most once.

    Rate limits, 5xx responses, transport errors and undecodable bodies move
    on to the next account. Any other 4xx is returned to the client verbatim.
    """
    tried = set()
    for _ in range(attempts):
        acc = await wait_for_free_key(exclude=tried)
        if not acc:
            break
        tried.add(acc["account_id"])
        try:
            async with httpx.AsyncClient(timeout=180) as client:
                r = await client.post(
                    f"{cf_base(acc['account_id'])}/chat/completions",
                    json=cf_body,
                    headers={"Authorization": f"Bearer {acc['api_key']}"},
                )
            if is_rate_limited_status(r.status_code) or (
                r.status_code != 200 and is_rate_limited_error_body(r.text)
            ):
                log(f"Account {acc['account_id'][:8]}... rate limited (non-stream)")
                await mark_fail(acc, rate_limited=True)
                continue
            if r.status_code != 200:
                if 400 <= r.status_code < 500:
                    log(f"Account {acc['account_id'][:8]}... HTTP {r.status_code} client error")
                    return Response(content=r.content, media_type="application/json", status_code=r.status_code)
                await mark_fail(acc)
                continue
            resp_json = r.json()
        except Exception as e:
            log(f"Account {acc['account_id'][:8]}... exception: {e}")
            await mark_fail(acc)
            continue
        finally:
            await release_key(acc)

        # Success. Bookkeeping stays outside the retry try-block so a local
        # error here can never trigger a duplicate upstream request.
        await mark_ok(acc)
        usage = resp_json.get("usage") if isinstance(resp_json, dict) else None
        await _record_usage(acc, model, 200, usage if isinstance(usage, dict) else {})
        if not thinking_enabled:
            _strip_reasoning(resp_json, "message")
        return JSONResponse(resp_json)
    return JSONResponse({"error": {"message": "All accounts failed", "type": "api_error"}}, status_code=500)


async def _chat_stream(cf_body, model, thinking_enabled, account_count):
    """Yield SSE text for a streaming completion, with account failover.

    Another account is tried only while nothing has been forwarded to the
    client. Once output has started, a failure ends the stream with an error
    event, because replaying the request elsewhere would append a second,
    complete answer after the partial one the client already received.
    """
    done = "data: [DONE]\n\n"
    consecutive_failures = 0
    max_failures = max(10, account_count * 2)
    while consecutive_failures < max_failures:
        acc = await wait_for_free_key()
        if not acc:
            yield sse({"error": {"message": "All accounts are busy, rate limited, or have reached their daily limit. Please try again later.", "type": "api_error"}})
            yield done
            return
        total_usage = {"prompt_tokens": 0, "completion_tokens": 0}
        forwarded = False
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                async with client.stream(
                    "POST",
                    f"{cf_base(acc['account_id'])}/chat/completions",
                    json=cf_body,
                    headers={"Authorization": f"Bearer {acc['api_key']}"},
                ) as r:
                    if r.status_code != 200:
                        err_body = await r.aread()
                        # Same rate-limit detection as the buffered path: status or body text.
                        if is_rate_limited_status(r.status_code) or is_rate_limited_error_body(
                            err_body.decode("utf-8", "ignore")
                        ):
                            log(f"Account {acc['account_id'][:8]}... rate limited (stream)")
                            await mark_fail(acc, rate_limited=True)
                            consecutive_failures += 1
                            continue
                        if 400 <= r.status_code < 500:
                            yield _stream_error_event(err_body)
                            yield done
                            return
                        await mark_fail(acc)
                        consecutive_failures += 1
                        continue
                    async for line in r.aiter_lines():
                        out = _rewrite_stream_line(line, thinking_enabled, total_usage)
                        if out is not None:
                            forwarded = True
                            yield out
            await mark_ok(acc)
            await _record_usage(acc, model, 200, total_usage)
            return
        except Exception as e:
            # GeneratorExit / CancelledError (client went away) are BaseExceptions
            # and deliberately pass through to the finally clause.
            if isinstance(e, (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError)):
                log(f"STREAM DROP: {acc['account_id'][:8]}... {e}")
            else:
                log(f"Account {acc['account_id'][:8]}... stream exception: {e}")
            # A dropped connection is not quota exhaustion: count it as an
            # ordinary failure (three in a row still bench the account) rather
            # than disabling the account until UTC midnight.
            await mark_fail(acc)
            if forwarded:
                yield sse({"error": {"message": "Upstream stream interrupted", "type": "api_error"}})
                yield done
                return
            consecutive_failures += 1
        finally:
            await release_key(acc)
    yield sse({"error": {"message": "All accounts failed during stream", "type": "api_error"}})
    yield done


@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat completions proxied to Cloudflare Workers AI.

    Checks the master key, normalizes the request body (default model; for
    streams, ask upstream for a final usage chunk), then either performs a
    buffered request with per-account failover or returns an SSE stream.
    """
    if not auth_ok(req):
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Bad JSON"}, status_code=400)
    if not isinstance(body, dict):
        # A JSON array/scalar would otherwise fail on body.get(...) with a 500.
        return JSONResponse({"error": "Bad JSON"}, status_code=400)

    is_stream = bool(body.get("stream", False))
    # An explicit null/empty model falls back to the default like a missing one.
    model = body.get("model") or DEFAULT_CF_MODEL
    cf_body = {**body, "model": model}
    if is_stream and "stream_options" not in cf_body:
        cf_body["stream_options"] = {"include_usage": True}
    thinking_enabled = storage.data.get("settings", {}).get("thinking_enabled", True)
    account_count = len(get_all_accounts())

    if not is_stream:
        return await _chat_buffered(cf_body, model, thinking_enabled, account_count)
    return StreamingResponse(
        _chat_stream(cf_body, model, thinking_enabled, account_count),
        media_type="text/event-stream",
    )
# =====================================================
# /v1/messages
# =====================================================
@app.post("/v1/messages")
async def anthropic(req: Request):
    """Placeholder for the Anthropic Messages API; always answers 400.

    *req* is unused but kept so the route signature stays stable. (The
    previous body had an unbalanced closing parenthesis, a SyntaxError that
    prevented the module from importing.)
    """
    return JSONResponse(
        {"error": {"message": "Anthropic endpoint is currently disabled. Please use the OpenAI /v1/chat/completions endpoint for native Kimi tool support.", "type": "api_error"}},
        status_code=400,
    )