// proxy/api/lib/proxy.js
// Last change: commit 3f8984a by OpenCode — "feat: add Cloudflare Workers AI fallback for kimi models"
const https = require("https");
const http = require("http");
const { URL } = require("url");
const { markKeyExhausted, recordKeyError } = require("./stats.js");
// Upstream HTTP statuses that cause proxyToKimchi to move on to the next key.
const RETRYABLE_STATUSES = new Set([402, 429, 500, 502, 503, 504, 524]);

// Retry/backoff tuning. Delays are in milliseconds; see computeDelay for how
// BASE_DELAY_MS, BACKOFF_FACTOR and MAX_DELAY_MS combine.
const DEFAULT_MAX_RETRIES = 55;
const BASE_DELAY_MS = 200;
const MAX_DELAY_MS = 5_000;
const BACKOFF_FACTOR = 1.5;

// Default per-request timeout handed to requestUpstream.
const FETCH_TIMEOUT_MS = 120_000;

// NOTE(review): not referenced by any code in this file as of this revision.
const AUTO_CONTINUE_MAX = 3;

// streamResponse emits an SSE comment once the upstream has been silent this long.
const KEEPALIVE_INTERVAL_MS = 10_000;

// Headers sent on every upstream request (callers' requestHeaders may override them).
// Presumably they mimic the kimchi CLI's OpenAI-SDK client fingerprint — confirm upstream requirements.
const KIMCHI_CLI_HEADERS = {
  "User-Agent": "kimchi/0.1.34",
  Accept: "application/json",
  "X-Stainless-Lang": "js",
  "X-Stainless-Package-Version": "5.20.0",
  "X-Stainless-OS": "linux",
  "X-Stainless-Arch": "x64",
  "X-Stainless-Runtime": "node",
  "X-Stainless-Runtime-Version": "v22.0.0",
};
/**
 * Backoff delay for a 1-based retry attempt, using full jitter.
 *
 * The ceiling grows as BASE_DELAY_MS * BACKOFF_FACTOR^(attempt - 1), capped at
 * MAX_DELAY_MS. The returned delay is a uniform fraction of that ceiling, never
 * shorter than 50 ms.
 *
 * @param {number} attempt - 1 for the first attempt, 2 for the second, ...
 * @param {() => number} [random=Math.random] - Source of jitter in [0, 1); injectable for tests.
 * @returns {number} Delay in milliseconds.
 */
function computeDelay(attempt, random = Math.random) {
  const growth = BACKOFF_FACTOR ** (attempt - 1);
  const ceiling = Math.min(BASE_DELAY_MS * growth, MAX_DELAY_MS);
  // Floor the delay so a near-zero random draw still yields a real pause.
  const floorMs = 50;
  return Math.max(floorMs, ceiling * random());
}
/**
 * Convert a `Retry-After` response header into milliseconds.
 *
 * Accepts either a non-negative number of seconds (e.g. "3") or an HTTP date.
 * A date in the past yields 0.
 *
 * @param {Object<string, string>} headers - Response headers keyed by lower-case name.
 * @returns {number|null} Milliseconds to wait, or null if the header is absent or unparseable.
 */
function parseRetryAfterMs(headers) {
  const raw = headers["retry-after"];
  if (!raw) {
    return null;
  }

  const asSeconds = Number(raw);
  if (Number.isFinite(asSeconds) && asSeconds >= 0) {
    return asSeconds * 1000;
  }

  const asDate = Date.parse(raw);
  return Number.isNaN(asDate) ? null : Math.max(asDate - Date.now(), 0);
}
/**
 * Heuristically decide whether a 402/429 response means the key is out of credit/quota,
 * as opposed to a transient rate limit.
 *
 * The body is checked case-insensitively for "exhausted", "credit" or "quota", so
 * capitalised messages such as "Quota exceeded" or "Insufficient Credits" are detected too.
 *
 * @param {number} status - Upstream HTTP status.
 * @param {string|Buffer|null|undefined} body - Upstream response body.
 * @returns {boolean} true only for status 402/429 whose body contains one of the markers.
 */
function isCreditExhausted(status, body) {
  if (status !== 402 && status !== 429) return false;
  // Explicit guard instead of relying on a try/catch around `body.includes`.
  if (body == null) return false;
  const text = String(body).toLowerCase();
  return ["exhausted", "credit", "quota"].some((marker) => text.includes(marker));
}
/**
 * Send one HTTP(S) request and buffer the entire response body as a UTF-8 string.
 *
 * Resolves for every HTTP status; rejects only on transport failure, on socket
 * timeout ("Request timeout"), on abort ("Request aborted"), or if the response
 * closes before completing.
 *
 * @param {object} options
 * @param {string} options.url - Absolute upstream URL; `https:` selects the https module, anything else http.
 * @param {string} options.method - HTTP method.
 * @param {object} options.headers - Request headers; Content-Length is set here from `body`.
 * @param {string} [options.body] - Request body, written as-is when non-empty.
 * @param {number} [options.timeoutMs] - Passed as the socket `timeout` option of http.request.
 * @param {AbortSignal} [options.signal] - Aborting destroys the request and rejects.
 * @returns {Promise<{status: number, headers: object, body: string}>}
 */
function requestUpstream(options) {
  return new Promise((resolve, reject) => {
    const { url, method, headers, body, timeoutMs, signal } = options;
    // Don't open a socket at all if the caller has already given up.
    if (signal?.aborted) {
      reject(new Error("Request aborted"));
      return;
    }
    const parsed = new URL(url);
    const lib = parsed.protocol === "https:" ? https : http;
    const reqHeaders = { ...headers, "Content-Length": Buffer.byteLength(body || "") };

    let onAbort = null;
    // Detach the abort listener once the request settles. Otherwise a signal reused
    // across many retries keeps one listener (and one retained `req`) per attempt.
    const detachAbort = () => {
      if (onAbort) {
        signal.removeEventListener("abort", onAbort);
        onAbort = null;
      }
    };
    const fail = (err) => {
      detachAbort();
      reject(err);
    };

    const req = lib.request(
      {
        hostname: parsed.hostname,
        port: parsed.port || (parsed.protocol === "https:" ? 443 : 80),
        path: parsed.pathname + parsed.search,
        method,
        headers: reqHeaders,
        timeout: timeoutMs,
      },
      (res) => {
        const chunks = [];
        res.on("data", (chunk) => chunks.push(chunk));
        res.on("end", () => {
          detachAbort();
          resolve({
            status: res.statusCode,
            headers: res.headers,
            body: Buffer.concat(chunks).toString("utf-8"),
          });
        });
        res.on("error", fail);
        // Safety net: if the response closes without 'end' or 'error', still settle
        // instead of leaving the promise pending forever.
        res.on("close", () => {
          if (!res.complete) fail(new Error("Response closed before completion"));
        });
      },
    );
    req.on("error", fail);
    req.on("timeout", () => {
      req.destroy();
      fail(new Error("Request timeout"));
    });
    if (signal) {
      onAbort = () => {
        req.destroy();
        fail(new Error("Request aborted"));
      };
      signal.addEventListener("abort", onAbort, { once: true });
    }
    if (body) req.write(body);
    req.end();
  });
}
/**
 * Open an HTTP(S) request and resolve as soon as response headers arrive,
 * handing back the live response stream.
 *
 * While the body streams, the abort signal stays attached: aborting destroys the
 * request and therefore the stream. The listener is removed once the response
 * closes, so a reused signal does not accumulate listeners.
 *
 * @param {object} options
 * @param {string} options.url - Absolute upstream URL; `https:` selects the https module, anything else http.
 * @param {string} options.method - HTTP method.
 * @param {object} options.headers - Request headers; Content-Length is set here from `body`.
 * @param {string} [options.body] - Request body, written as-is when non-empty.
 * @param {AbortSignal} [options.signal] - Aborting destroys the request; rejects if headers have not arrived yet.
 * @param {number} [options.timeoutMs] - Optional. If > 0, rejects with "Request timeout" when response
 *   headers have not arrived within this many ms. It does not limit how long the body may stream.
 * @returns {Promise<{status: number, headers: object, stream: import("http").IncomingMessage}>}
 */
function requestUpstreamStreaming(options) {
  return new Promise((resolve, reject) => {
    const { url, method, headers, body, signal, timeoutMs } = options;
    if (signal?.aborted) {
      reject(new Error("Request aborted"));
      return;
    }
    const parsed = new URL(url);
    const lib = parsed.protocol === "https:" ? https : http;
    const reqHeaders = { ...headers, "Content-Length": Buffer.byteLength(body || "") };

    let onAbort = null;
    let headerTimer = null;
    const detachAbort = () => {
      if (onAbort) {
        signal.removeEventListener("abort", onAbort);
        onAbort = null;
      }
    };
    const clearHeaderTimer = () => {
      if (headerTimer) {
        clearTimeout(headerTimer);
        headerTimer = null;
      }
    };

    const req = lib.request(
      {
        hostname: parsed.hostname,
        port: parsed.port || (parsed.protocol === "https:" ? 443 : 80),
        path: parsed.pathname + parsed.search,
        method,
        headers: reqHeaders,
      },
      (res) => {
        clearHeaderTimer();
        // Keep abort wired up for the stream's lifetime; release it once the stream is closed.
        res.once("close", detachAbort);
        resolve({
          status: res.statusCode,
          headers: res.headers,
          stream: res,
        });
      },
    );
    req.on("error", (err) => {
      clearHeaderTimer();
      detachAbort();
      reject(err);
    });
    if (timeoutMs > 0) {
      headerTimer = setTimeout(() => {
        headerTimer = null;
        req.destroy();
        detachAbort();
        reject(new Error("Request timeout"));
      }, timeoutMs);
    }
    if (signal) {
      onAbort = () => {
        clearHeaderTimer();
        req.destroy();
        reject(new Error("Request aborted"));
      };
      signal.addEventListener("abort", onAbort, { once: true });
    }
    if (body) req.write(body);
    req.end();
  });
}
/**
 * Sleep between retry attempts, waking early if `signal` aborts.
 * Never rejects: the retry loop re-checks `signal.aborted` itself.
 *
 * @param {number} ms - Delay in milliseconds.
 * @param {AbortSignal} [signal] - Optional cancellation signal.
 * @returns {Promise<void>}
 */
function waitForRetry(ms, signal) {
  return new Promise((resolve) => {
    if (signal?.aborted) {
      resolve();
      return;
    }
    let timer = null;
    const onAbort = () => {
      clearTimeout(timer);
      resolve();
    };
    timer = setTimeout(() => {
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}

/**
 * Proxy one buffered (non-streaming) request upstream, rotating keys via `getNextKey`
 * and retrying.
 *
 * Retry behaviour:
 * - Any status outside RETRYABLE_STATUSES is returned to the caller immediately.
 * - A 402/429 whose body looks like exhausted credit marks the key exhausted and
 *   retries at once.
 * - A 429 otherwise waits max(backoff, Retry-After) before the next attempt, but
 *   not after the final attempt.
 * - Other retryable statuses retry immediately with the next key.
 * - Transport errors wait `computeDelay(attempt)` before retrying.
 *
 * @param {object} options
 * @param {string} options.upstreamUrl - Upstream endpoint.
 * @param {() => {key: string, index: number}} options.getNextKey - Key rotation callback.
 * @param {object} [options.requestHeaders={}] - Extra headers; may override KIMCHI_CLI_HEADERS.
 * @param {object} [options.requestBody] - JSON-serialised once and sent on every attempt.
 * @param {string} [options.method="POST"]
 * @param {number} [options.maxRetries] - Takes precedence over `retry.maxRetries`; defaults to DEFAULT_MAX_RETRIES.
 * @param {{maxRetries?: number}} [options.retry={}]
 * @param {number} [options.timeoutMs=FETCH_TIMEOUT_MS]
 * @param {AbortSignal} [options.signal]
 * @returns {Promise<{status: number, headers: object, body: string, keyIndex: number, attempts: number}>}
 * @throws {Error} "Proxy aborted after N attempts" when `signal` aborts;
 *   "Proxy failed after N attempts: …" (last three errors) when retries run out;
 *   "Proxy retry loop exhausted" only if no attempt was made (maxRetries < 1).
 */
async function proxyToKimchi(options) {
  const {
    upstreamUrl,
    getNextKey,
    requestHeaders = {},
    requestBody,
    method = "POST",
    maxRetries: maxRetriesParam,
    retry = {},
    timeoutMs = FETCH_TIMEOUT_MS,
    signal,
  } = options;
  const maxRetries = maxRetriesParam ?? retry.maxRetries ?? DEFAULT_MAX_RETRIES;
  const errors = [];
  const summarizeFailure = () =>
    `Proxy failed after ${maxRetries} attempts: ${errors.slice(-3).map((e) => e.message).join("; ")}`;
  // The payload is identical on every attempt, so serialise it once.
  const body = requestBody ? JSON.stringify(requestBody) : undefined;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    if (signal?.aborted) {
      throw new Error(`Proxy aborted after ${attempt - 1} attempts`);
    }
    const { key, index } = getNextKey();
    const isLastAttempt = attempt === maxRetries;

    let result;
    try {
      result = await requestUpstream({
        url: upstreamUrl,
        method,
        headers: {
          "Content-Type": "application/json",
          Authorization: `Bearer ${key}`,
          ...KIMCHI_CLI_HEADERS,
          ...requestHeaders,
          "X-Proxy-Key-Index": String(index),
        },
        body,
        timeoutMs,
        signal,
      });
    } catch (error) {
      // The caller went away mid-request: not the key's fault, and there is nothing to retry for.
      if (signal?.aborted) {
        throw new Error(`Proxy aborted after ${attempt} attempts`, { cause: error });
      }
      const err = error instanceof Error ? error : new Error(String(error));
      recordKeyError(index, err.message);
      errors.push(err);
      if (isLastAttempt) {
        throw new Error(summarizeFailure());
      }
      await waitForRetry(computeDelay(attempt), signal);
      continue;
    }

    if (!RETRYABLE_STATUSES.has(result.status)) {
      return {
        status: result.status,
        headers: result.headers,
        body: result.body,
        keyIndex: index,
        attempts: attempt,
      };
    }
    if (isCreditExhausted(result.status, result.body)) {
      markKeyExhausted(index);
      recordKeyError(index, `HTTP ${result.status}: credits exhausted`);
      errors.push(new Error(`HTTP ${result.status}: credits exhausted (key ${index})`));
      continue;
    }
    recordKeyError(index, `HTTP ${result.status}`);
    errors.push(new Error(`HTTP ${result.status} (key ${index})`));
    // Honour rate limiting, but don't sleep when no further attempt will follow.
    if (result.status === 429 && !isLastAttempt) {
      const backoff = computeDelay(attempt);
      const retryAfterMs = parseRetryAfterMs(result.headers);
      await waitForRetry(retryAfterMs === null ? backoff : Math.max(backoff, retryAfterMs), signal);
    }
  }
  // Every attempt ended in a retryable HTTP status: report them like transport failures.
  if (errors.length > 0) {
    throw new Error(summarizeFailure());
  }
  throw new Error("Proxy retry loop exhausted");
}
/**
 * Open a streaming request upstream, retrying with the next key only when the
 * connection itself fails (the request promise rejects).
 *
 * NOTE: the HTTP status is not inspected. A 4xx/5xx response is returned to the
 * caller like any other, unlike proxyToKimchi which rotates keys on retryable statuses.
 *
 * @param {object} options
 * @param {string} options.upstreamUrl - Upstream endpoint.
 * @param {() => {key: string, index: number}} options.getNextKey - Key rotation callback.
 * @param {object} [options.requestHeaders={}] - Extra headers; may override KIMCHI_CLI_HEADERS.
 * @param {object} [options.requestBody] - JSON-serialised once and sent on every attempt.
 * @param {string} [options.method="POST"]
 * @param {number} [options.timeoutMs=FETCH_TIMEOUT_MS] - Forwarded to requestUpstreamStreaming.
 * @param {AbortSignal} [options.signal] - When aborted, stops retrying immediately.
 * @param {number} [options.maxRetries=3] - Maximum connection attempts.
 * @param {number} [options.retryDelayMs=200] - Pause between failed attempts.
 * @returns {Promise<{status: number, headers: object, stream: object, keyIndex: number, attempts: number}>}
 * @throws {Error} "Streaming proxy aborted after N attempts" when `signal` aborts;
 *   otherwise "Streaming proxy failed: …" listing every attempt's error.
 */
async function proxyToKimchiStreaming(options) {
  const {
    upstreamUrl,
    getNextKey,
    requestHeaders = {},
    requestBody,
    method = "POST",
    timeoutMs = FETCH_TIMEOUT_MS,
    signal,
    maxRetries = 3,
    retryDelayMs = 200,
  } = options;
  const errors = [];
  const body = requestBody ? JSON.stringify(requestBody) : undefined;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    if (signal?.aborted) {
      throw new Error(`Streaming proxy aborted after ${attempt - 1} attempts`);
    }
    const { key, index } = getNextKey();
    try {
      const result = await requestUpstreamStreaming({
        url: upstreamUrl,
        method,
        headers: {
          "Content-Type": "application/json",
          Authorization: `Bearer ${key}`,
          ...KIMCHI_CLI_HEADERS,
          ...requestHeaders,
          "X-Proxy-Key-Index": String(index),
        },
        body,
        timeoutMs,
        signal,
      });
      return {
        status: result.status,
        headers: result.headers,
        stream: result.stream,
        keyIndex: index,
        attempts: attempt,
      };
    } catch (error) {
      // Retrying after the caller gave up only wastes time and upstream capacity.
      if (signal?.aborted) {
        throw new Error(`Streaming proxy aborted after ${attempt} attempts`, { cause: error });
      }
      const err = error instanceof Error ? error : new Error(String(error));
      // Keep key stats consistent with the non-streaming path.
      recordKeyError(index, err.message);
      errors.push(err);
      if (attempt < maxRetries) {
        await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
      }
    }
  }
  throw new Error(`Streaming proxy failed: ${errors.map((e) => e.message).join("; ")}`);
}
/**
 * Send a fully buffered upstream response to an Express-style client response.
 *
 * Upstream headers are copied except transfer-encoding, connection and
 * content-length. Content-Length is recomputed from the body actually sent.
 *
 * @param {object} clientRes - Response exposing `setHeader`, `status` and `end` (e.g. Express `res`).
 * @param {{status: number, headers?: object, body?: string}} result - Output of proxyToKimchi.
 */
function writeResponse(clientRes, result) {
  const skipHeaders = new Set(["transfer-encoding", "connection", "content-length"]);
  for (const [name, value] of Object.entries(result.headers ?? {})) {
    // setHeader throws on undefined values, so skip them.
    if (value === undefined || skipHeaders.has(name.toLowerCase())) continue;
    // Multi-valued headers (e.g. set-cookie) arrive as arrays. setHeader accepts arrays and
    // emits one header line per entry, whereas String(array) would comma-join them.
    clientRes.setHeader(name, Array.isArray(value) ? value : String(value));
  }
  const body = result.body || "";
  clientRes.setHeader("Content-Length", Buffer.byteLength(body));
  clientRes.status(result.status);
  clientRes.end(body);
}
/**
 * Relay an upstream SSE stream to an Express-style client response.
 *
 * Each upstream `data:` line is re-emitted as its own `data: …\n\n` event. `event:`,
 * `id:`, `retry:` fields and SSE comments from upstream are dropped. A `: keepalive`
 * comment is written whenever the upstream has been silent longer than
 * KEEPALIVE_INTERVAL_MS (checked every 5 s), until `data: [DONE]` is seen. If the client
 * disconnects first, the upstream stream is destroyed so it stops producing output.
 *
 * @param {object} clientRes - Response exposing `setHeader`, `status`, `write`, `end`,
 *   `writableEnded`, `on` and `removeListener` (e.g. Express `res`).
 * @param {{status: number, headers?: object, stream: import("stream").Readable}} result
 *   - Output of proxyToKimchiStreaming.
 * @param {object} [options] - Currently unused; kept for interface compatibility.
 */
function streamResponse(clientRes, result, options = {}) {
  const skipHeaders = new Set(["transfer-encoding", "connection", "content-length"]);
  for (const [name, value] of Object.entries(result.headers ?? {})) {
    if (value === undefined || skipHeaders.has(name.toLowerCase())) continue;
    // Preserve multi-valued headers (e.g. set-cookie) instead of comma-joining them.
    clientRes.setHeader(name, Array.isArray(value) ? value : String(value));
  }
  clientRes.setHeader("Content-Type", "text/event-stream");
  clientRes.setHeader("Cache-Control", "no-cache");
  // "keep-alive" is the standard Connection token ("keepalive" is not a valid value).
  clientRes.setHeader("Connection", "keep-alive");
  clientRes.setHeader("X-Accel-Buffering", "no");
  clientRes.status(result.status);

  const stream = result.stream;
  // One streaming decoder keeps multi-byte UTF-8 characters intact when they are
  // split across network chunks; per-chunk toString() would emit U+FFFD.
  const decoder = new TextDecoder("utf-8");
  let sawDone = false;
  let finished = false;
  let lastDataTime = Date.now();
  let buffer = "";

  const safeWrite = (text) => {
    try {
      clientRes.write(text);
    } catch {
      // Best effort: the client socket may already be gone; the 'close' handling below cleans up.
    }
  };

  const forwardLine = (rawLine) => {
    // SSE permits CRLF line endings; drop the CR so "[DONE]" and JSON payloads match exactly.
    const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
    if (!line.startsWith("data:")) return;
    // Per the SSE format, one space after the colon is not part of the value.
    const data = line.startsWith("data: ") ? line.slice(6) : line.slice(5);
    if (data === "[DONE]") sawDone = true;
    safeWrite(`data: ${data}\n\n`);
  };

  const keepalive = setInterval(() => {
    if (!sawDone && Date.now() - lastDataTime > KEEPALIVE_INTERVAL_MS) {
      safeWrite(": keepalive\n\n");
      lastDataTime = Date.now();
    }
  }, 5000);

  const onClientClose = () => {
    // Client disconnected before upstream finished: stop pulling tokens from upstream.
    if (!finished) stream.destroy();
  };

  // Runs once for whichever of end/error/close arrives first. Upstream errors are
  // deliberately not surfaced: the client response is simply ended.
  const finish = () => {
    if (finished) return;
    finished = true;
    clearInterval(keepalive);
    clientRes.removeListener("close", onClientClose);
    if (!clientRes.writableEnded) {
      try {
        clientRes.end();
      } catch {
        // Client already gone; nothing left to do.
      }
    }
  };

  clientRes.on("close", onClientClose);
  stream.on("data", (chunk) => {
    lastDataTime = Date.now();
    buffer += typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
    const lines = buffer.split("\n");
    // The last piece may be an incomplete line; keep it for the next chunk.
    buffer = lines.pop() ?? "";
    for (const line of lines) forwardLine(line);
  });
  stream.on("end", finish);
  stream.on("error", finish);
  stream.on("close", finish);
}
module.exports = { proxyToKimchi, proxyToKimchiStreaming, requestUpstream, requestUpstreamStreaming, writeResponse, streamResponse, KIMCHI_CLI_HEADERS, FETCH_TIMEOUT_MS };