doatlas-2 / artifacts /api-server /src /llm /codex-cli.ts
Iostream-Li's picture
Add files using upload-large-folder tool
6d1fe92 verified
/**
* codex-cli adapter — bridge to a user-installed local code helper
* binary (e.g. OpenAI's `codex` CLI or any other shell-invoked code
* assistant) so the "DoAtlas Code" model can stream through the same
* gateway / SSE pipeline as the cloud providers.
*
* Wire protocol (newline-delimited JSON, both directions):
*
* stdin (single JSON object, then EOF):
* {
* "model": "<upstreamModel>",
* "messages": [{ role, content, tool_call_id?, tool_calls? }, ...],
* "tools": [{ name, description, parameters }, ...],
* "temperature"?: number,
* "max_tokens"?: number
* }
*
* stdout (one JSON event per line):
* { "type": "text_delta", "text": "..." }
* { "type": "reasoning_delta", "text": "..." }
* { "type": "tool_call", "id": "...", "name": "...", "arguments": {...} }
* { "type": "usage", "input_tokens": N, "output_tokens": N }
* { "type": "done", "stop_reason": "end_turn" | "tool_use" | ... }
* { "type": "error", "message": "...", "code"?: "..." }
*
* The binary is configured via env:
* CODEX_CLI_BIN — absolute path or PATH-resolvable command (required)
* CODEX_CLI_ARGS — optional whitespace-separated extra args (no quoting support, so a single arg cannot contain whitespace)
* CODEX_CLI_CWD — optional working directory (default: process.cwd())
*
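* Anything the binary writes to stderr is captured for the server log and
* never appears in the streamed answer text. When the helper exits non-zero
* or is terminated by a signal, the tail of stderr is appended to the
* reported error message so the operator can see what went wrong.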
*/
import { spawn } from "node:child_process";
import { logger } from "../lib/logger";
import { withGateway } from "./gateway";
import type { LlmAdapter, LlmStreamOptions, StreamChunk } from "./types";
/** Launch settings for the helper binary, as assembled by readConfig() from the environment. */
interface CodexCliConfig {
// Command to spawn, taken from CODEX_CLI_BIN.
bin: string;
// Extra command-line arguments, taken from CODEX_CLI_ARGS split on whitespace.
args: string[];
// Working directory for the child process: CODEX_CLI_CWD, else process.cwd().
cwd: string;
}
/**
 * Error type raised for every codex-cli failure in this module.
 *
 * Besides the human-readable `message` it carries `providerErrorCode`, a
 * normalised machine-readable code, so both can be forwarded to the SSE
 * stream rather than every failure degrading to a generic `unknown_error`.
 */
class CodexCliError extends Error {
  /** Normalised failure code, e.g. "codex_cli_not_found". */
  public readonly providerErrorCode: string;

  public constructor(message: string, providerErrorCode: string) {
    super(message);
    // Override the inherited "Error" name so logs identify the source.
    this.name = "CodexCliError";
    this.providerErrorCode = providerErrorCode;
  }
}
/**
 * Shape this module assumes for the error delivered to the child process
 * "error" handler; every extra field is optional.
 */
interface NodeSpawnError extends Error {
// Errno-style code; mapSpawnError checks for "ENOENT", "EACCES" and "EPERM".
code?: string;
// Not read by the code visible here — presumably the failing syscall name; confirm before relying on it.
syscall?: string;
// Not read by the code visible here — presumably the path that failed to spawn; confirm before relying on it.
path?: string;
}
/**
 * Translate a process-launch error into a CodexCliError with an
 * operator-friendly message and a normalised provider error code.
 *
 * @param err - Error received from the child process "error" event.
 * @param bin - Configured helper command, quoted in the message.
 * @returns A CodexCliError coded `codex_cli_not_found` (ENOENT),
 *   `codex_cli_permission_denied` (EACCES / EPERM) or
 *   `codex_cli_spawn_failed` (anything else).
 */
function mapSpawnError(err: NodeSpawnError, bin: string): CodexCliError {
  let text: string;
  let providerCode: string;

  switch (err.code) {
    case "ENOENT":
      text =
        `DoAtlas Code helper binary not found at "${bin}". ` +
        `Set CODEX_CLI_BIN to the path of an installed local code helper.`;
      providerCode = "codex_cli_not_found";
      break;
    case "EACCES":
    case "EPERM":
      text =
        `Permission denied executing DoAtlas Code helper at "${bin}". ` +
        `Make sure the file is executable (chmod +x).`;
      providerCode = "codex_cli_permission_denied";
      break;
    default:
      // Unrecognised failure: pass the underlying message through verbatim.
      text = `Failed to launch DoAtlas Code helper at "${bin}": ${err.message}`;
      providerCode = "codex_cli_spawn_failed";
  }

  return new CodexCliError(text, providerCode);
}
/**
 * Read the helper launch configuration from the environment.
 *
 * All three variables are trimmed before use, so stray whitespace or a
 * trailing newline (common when values come from `.env` files) neither
 * corrupts the binary path nor defeats the "not configured" check.
 *
 * - CODEX_CLI_BIN  — required; command or path of the helper binary.
 * - CODEX_CLI_ARGS — optional; split on runs of whitespace (no quoting).
 * - CODEX_CLI_CWD  — optional; defaults to `process.cwd()`.
 *
 * @returns The resolved launch configuration.
 * @throws Error when CODEX_CLI_BIN is unset, empty or whitespace-only.
 */
function readConfig(): CodexCliConfig {
  const bin = (process.env["CODEX_CLI_BIN"] ?? "").trim();
  if (!bin) {
    throw new Error(
      "DoAtlas Code not configured: set CODEX_CLI_BIN to the local code helper binary",
    );
  }

  const rawArgs = (process.env["CODEX_CLI_ARGS"] ?? "").trim();
  const args = rawArgs ? rawArgs.split(/\s+/) : [];

  // A blank CODEX_CLI_CWD means "not set", so fall back to the server's cwd.
  const cwd = (process.env["CODEX_CLI_CWD"] ?? "").trim() || process.cwd();

  return { bin, args, cwd };
}
/**
 * Loose shape of one JSON event line read from the helper's stdout.
 * Every field is optional because the line is parsed without validation;
 * toChunk() decides which fields matter based on `type`.
 */
interface RawEvent {
// Event discriminator: "text_delta", "reasoning_delta", "tool_call", "usage", "done" or "error".
type?: string;
// Text payload of "text_delta" / "reasoning_delta" events.
text?: string;
// Tool call identifier ("tool_call" events).
id?: string;
// Tool name ("tool_call" events).
name?: string;
// Tool call arguments ("tool_call" events).
arguments?: Record<string, unknown>;
// Token counts ("usage" events).
input_tokens?: number;
output_tokens?: number;
// Reason the helper stopped ("done" events).
stop_reason?: string;
// Human-readable failure text ("error" events).
message?: string;
// Optional failure code ("error" events); forwarded as providerErrorCode.
code?: string;
}
/** True for non-null, non-array objects — the only JSON shape a wire event or argument map may take. */
function isJsonObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/**
 * Normalise tool-call arguments to a plain object.
 * Objects pass through untouched; a string holding a JSON object is
 * decoded; anything else becomes `{}`.
 */
function toToolArguments(value: unknown): Record<string, unknown> {
  if (isJsonObject(value)) return value;
  if (typeof value === "string") {
    try {
      const decoded: unknown = JSON.parse(value);
      if (isJsonObject(decoded)) return decoded;
    } catch {
      /* not JSON — fall through to the empty default */
    }
  }
  return {};
}

/** Accept only finite numbers as token counts; anything else counts as 0. */
function toTokenCount(value: unknown): number {
  return typeof value === "number" && Number.isFinite(value) ? value : 0;
}

/**
 * Convert one parsed helper event into a gateway StreamChunk.
 *
 * The event comes straight from `JSON.parse`, so nothing about its shape
 * is guaranteed: a line such as `null` or `42` is valid JSON but not an
 * event. Such values are rejected up front instead of throwing.
 *
 * @param ev - Parsed stdout line (untrusted despite the declared type).
 * @returns The matching chunk, or `null` when the value is not an object,
 *   has an unknown `type`, or is a `tool_call` without `id` or `name`.
 */
function toChunk(ev: RawEvent): StreamChunk | null {
  // Widen to unknown first: the static type is only a hint for parsed JSON.
  const raw: unknown = ev;
  if (!isJsonObject(raw)) return null;

  switch (ev.type) {
    case "text_delta":
      return { type: "text_delta", text: ev.text || "" };
    case "reasoning_delta":
      return { type: "reasoning_delta", text: ev.text || "" };
    case "tool_call":
      // A tool call is unusable without both an id and a name.
      if (!ev.id || !ev.name) return null;
      return {
        type: "tool_call",
        toolCallId: ev.id,
        toolName: ev.name,
        toolArguments: toToolArguments(ev.arguments),
      };
    case "usage":
      return {
        type: "usage",
        inputTokens: toTokenCount(ev.input_tokens),
        outputTokens: toTokenCount(ev.output_tokens),
      };
    case "done":
      return { type: "done", stopReason: ev.stop_reason };
    case "error":
      return {
        type: "error",
        message: ev.message || "codex-cli error",
        providerErrorCode: ev.code,
      };
    default:
      return null;
  }
}
/**
 * Spawn the configured helper, send it one JSON request on stdin and
 * translate its newline-delimited stdout into StreamChunks.
 *
 * Behaviour worth knowing:
 * - Launch failures and non-zero exits are thrown as CodexCliError once
 *   all stdout lines received so far have been yielded.
 * - An abort on `signal` sends SIGTERM to the child; termination by
 *   SIGTERM is not reported as an error.
 * - Lines that are not a JSON object are yielded verbatim as text deltas.
 * - A `usage` and a `done` chunk are synthesised at the end if the helper
 *   never emitted them.
 *
 * @param cfg - Helper command, arguments and working directory.
 * @param opts - Request options; model, messages, tools, temperature and
 *   maxTokens are serialised into the stdin payload.
 * @param signal - Abort signal used to cancel the run.
 * @throws CodexCliError when the helper cannot be launched, exits with a
 *   non-zero code, or is terminated by a signal other than SIGTERM.
 */
async function* runCodexCli(
  cfg: CodexCliConfig,
  opts: LlmStreamOptions,
  signal: AbortSignal,
): AsyncIterable<StreamChunk> {
  const child = spawn(cfg.bin, cfg.args, {
    cwd: cfg.cwd,
    stdio: ["pipe", "pipe", "pipe"],
    env: process.env,
  });

  // Remember whether the process ever started; used to pick the error code
  // when it later closes with a non-zero status.
  let spawned = false;
  child.once("spawn", () => {
    spawned = true;
  });

  // Forward upstream aborts as SIGTERM.
  const onAbort = () => {
    if (!child.killed) child.kill("SIGTERM");
  };
  if (signal.aborted) onAbort();
  else signal.addEventListener("abort", onAbort, { once: true });

  // Stderr is captured for the server log AND mirrored into the
  // user-facing failure message when the helper exits non-zero — that
  // tail is usually the only clue the operator has about what went
  // wrong (missing model, bad config, etc.). Only the last 8 KiB is kept.
  let stderrBuf = "";
  child.stderr.setEncoding("utf8");
  child.stderr.on("data", (d: string) => {
    stderrBuf += d;
    if (stderrBuf.length > 8192) {
      stderrBuf = stderrBuf.slice(-8192);
    }
  });

  // Hand-off queue between the event handlers below and the generator
  // loop: items are delivered straight to a waiting consumer, otherwise
  // buffered until `next()` is called.
  type Item = { line?: string; end?: true; err?: CodexCliError };
  const queue: Item[] = [];
  let waiter: ((it: Item) => void) | null = null;
  const push = (it: Item) => {
    if (waiter) {
      const w = waiter;
      waiter = null;
      w(it);
    } else {
      queue.push(it);
    }
  };
  const next = (): Promise<Item> =>
    queue.length > 0
      ? Promise.resolve(queue.shift()!)
      : new Promise<Item>((r) => {
          waiter = r;
        });

  // The child's stdout `end` and process `close` events fire in
  // unspecified order; we must wait for *both* before declaring the
  // request truly finished, otherwise an early `end` would race past a
  // non-zero exit code and the consumer would never see the error.
  let stdoutEnded = false;
  let childClosed = false;
  let finished = false;
  let pendingErr: CodexCliError | null = null;
  const maybeFinish = () => {
    // `finished` makes this idempotent: after a spawn error the later
    // `end` / `close` events would otherwise enqueue duplicate items.
    if (finished || !stdoutEnded || !childClosed) return;
    finished = true;
    if (pendingErr) push({ err: pendingErr });
    push({ end: true });
  };

  // Split stdout into trimmed, non-empty lines.
  let buf = "";
  child.stdout.setEncoding("utf8");
  child.stdout.on("data", (d: string) => {
    buf += d;
    let idx: number;
    while ((idx = buf.indexOf("\n")) >= 0) {
      const line = buf.slice(0, idx).trim();
      buf = buf.slice(idx + 1);
      if (line) push({ line });
    }
  });
  child.stdout.on("end", () => {
    // Flush a final line that was not newline-terminated.
    const tail = buf.trim();
    buf = "";
    if (tail) push({ line: tail });
    stdoutEnded = true;
    maybeFinish();
  });

  child.on("error", (err) => {
    // Do not wait for `end` / `close` here: report the failure at once.
    pendingErr = mapSpawnError(err as NodeSpawnError, cfg.bin);
    stdoutEnded = true;
    childClosed = true;
    maybeFinish();
  });

  child.on("close", (code, sigSignal) => {
    if (!pendingErr && code != null && code !== 0) {
      const tail = stderrBuf.trim().slice(-500);
      const detail = tail ? `: ${tail}` : "";
      pendingErr = new CodexCliError(
        `DoAtlas Code helper exited with code ${code}${detail}`,
        spawned ? "codex_cli_exited" : "codex_cli_spawn_failed",
      );
    } else if (!pendingErr && code == null && sigSignal && sigSignal !== "SIGTERM") {
      // SIGTERM is what this adapter itself sends on abort / cleanup, so
      // only other signals count as a failure.
      const tail = stderrBuf.trim().slice(-500);
      const detail = tail ? `: ${tail}` : "";
      pendingErr = new CodexCliError(
        `DoAtlas Code helper terminated by signal ${sigSignal}${detail}`,
        "codex_cli_exited",
      );
    }
    childClosed = true;
    maybeFinish();
  });

  // Send the request payload, then close stdin.
  const payload = JSON.stringify({
    model: opts.model,
    messages: opts.messages,
    tools: opts.tools ?? [],
    temperature: opts.temperature,
    max_tokens: opts.maxTokens,
  });

  // Stdin errors are deliberately ignored: launch failures are reported by
  // the `error` handler above and non-zero exits by the `close` handler,
  // and an unhandled stream error would otherwise crash the process.
  child.stdin.on("error", () => {
    /* reported through the child's `error` / `close` handlers */
  });
  try {
    child.stdin.write(payload + "\n");
    child.stdin.end();
  } catch {
    /* swallowed; the child's `error` / `close` handlers report the cause */
  }

  let sawDone = false;
  let sawUsage = false;
  try {
    while (true) {
      const it = await next();
      if (it.err) throw it.err;
      if (it.end) break;
      if (!it.line) continue;

      let parsed: unknown;
      try {
        parsed = JSON.parse(it.line);
      } catch {
        // Treat non-JSON lines as plain text deltas so a misbehaving
        // helper still produces *something* the user can read instead
        // of a silent hang.
        yield { type: "text_delta", text: it.line };
        continue;
      }

      // `null`, numbers, strings and arrays are valid JSON but not events;
      // surface them as text too rather than crashing or dropping them.
      if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
        yield { type: "text_delta", text: it.line };
        continue;
      }

      const chunk = toChunk(parsed as RawEvent);
      if (!chunk) continue;
      if (chunk.type === "usage") sawUsage = true;
      if (chunk.type === "done") sawDone = true;
      yield chunk;
    }
  } finally {
    // Runs on normal completion, on a thrown error and when the consumer
    // stops iterating early.
    signal.removeEventListener("abort", onAbort);
    if (!child.killed) child.kill("SIGTERM");
    // Log stderr here so failed runs reach the server log as well.
    if (stderrBuf.trim()) {
      logger.warn(
        { adapter: "codex-cli", stderr: stderrBuf.trim().slice(0, 1000) },
        "codex-cli stderr",
      );
    }
  }

  // Guarantee the stream always closes with usage and done chunks.
  if (!sawUsage) {
    yield { type: "usage", inputTokens: 0, outputTokens: 0 };
  }
  if (!sawDone) {
    yield { type: "done", stopReason: "end_turn" };
  }
}
/**
 * LLM adapter that streams completions from the locally installed code
 * helper. Configuration is read on every call, so a missing
 * CODEX_CLI_BIN makes `stream()` throw before the gateway is involved.
 */
export const codexCliAdapter: LlmAdapter = {
  name: "codex-cli",

  stream(opts: LlmStreamOptions): AsyncIterable<StreamChunk> {
    const config = readConfig();
    return withGateway(
      {
        name: "codex-cli",
        source: "byok-direct",
        upstream: config.bin,
        // Each attempt spawns a fresh process, so the gateway is told not to retry.
        retries: 0,
      },
      (abortSignal) => runCodexCli(config, opts, abortSignal),
    );
  },
};