/**
 * codex-cli adapter — bridge to a user-installed local code helper
 * binary (e.g. OpenAI's `codex` CLI or any other shell-invoked code
 * assistant) so the "DoAtlas Code" model can stream through the same
 * gateway / SSE pipeline as the cloud providers.
 *
 * Wire protocol (newline-delimited JSON, both directions):
 *
 *   stdin (one JSON object followed by a newline, then EOF):
 *     {
 *       "model": "<model id>",
 *       "messages": [{ role, content, tool_call_id?, tool_calls? }, ...],
 *       "tools": [{ name, description, parameters }, ...],
 *       "temperature"?: number,
 *       "max_tokens"?: number
 *     }
 *
 *   stdout (one JSON event per line):
 *     { "type": "text_delta", "text": "..." }
 *     { "type": "reasoning_delta", "text": "..." }
 *     { "type": "tool_call", "id": "...", "name": "...", "arguments": {...} }
 *     { "type": "usage", "input_tokens": N, "output_tokens": N }
 *     { "type": "done", "stop_reason": "end_turn" | "tool_use" | ... }
 *     { "type": "error", "message": "...", "code"?: "..." }
 *
 *   Lines that are not valid JSON are passed through to the user as plain
 *   text deltas; JSON objects with an unrecognised "type" are ignored. If
 *   the helper never emits "usage" / "done", the adapter synthesises zero
 *   usage and a "done" with stop reason "end_turn".
 *
 * The binary is configured via env:
 *   CODEX_CLI_BIN  — absolute path or PATH-resolvable command (required)
 *   CODEX_CLI_ARGS — optional extra args, split on whitespace (no quoting,
 *                    so a single argument cannot contain spaces)
 *   CODEX_CLI_CWD  — optional working directory (default: process.cwd())
 *
 * stderr is never streamed as answer text. It is captured for the server
 * log, and when the helper exits with a non-zero code or dies from an
 * unexpected signal, the tail of stderr is appended to the error message
 * that is surfaced to the caller.
 */
import { spawn } from "node:child_process";
import { logger } from "../lib/logger";
import { withGateway } from "./gateway";
import type { LlmAdapter, LlmStreamOptions, StreamChunk } from "./types";

/** Launch settings for the helper process, read from the environment. */
interface CodexCliConfig {
  /** Command passed to spawn() (from CODEX_CLI_BIN). */
  bin: string;
  /** Extra argv entries (CODEX_CLI_ARGS split on whitespace). */
  args: string[];
  /** Working directory for the helper (CODEX_CLI_CWD, else process.cwd()). */
  cwd: string;
}

/**
 * Carries a normalised provider error code alongside the human-readable
 * message so the gateway can forward both to the SSE stream instead of
 * collapsing every codex-cli failure into a generic `unknown_error`.
*/
class CodexCliError extends Error {
  /** Machine-readable code forwarded alongside the message (e.g. `codex_cli_not_found`). */
  readonly providerErrorCode: string;

  constructor(message: string, providerErrorCode: string) {
    super(message);
    this.name = "CodexCliError";
    this.providerErrorCode = providerErrorCode;
  }
}

/** The errno-style fields Node attaches to spawn / child-process errors. */
interface NodeSpawnError extends Error {
  code?: string;
  syscall?: string;
  path?: string;
}

/** Maximum stderr retained in memory; older output is discarded first. */
const STDERR_RETAIN_CHARS = 8192;
/** Length of the stderr tail appended to exit-status error messages. */
const STDERR_ERROR_TAIL_CHARS = 500;
/** Length of the retained stderr written to the server log. */
const STDERR_LOG_CHARS = 1000;

/**
 * Translate a Node process-launch failure into a CodexCliError with an
 * actionable message.
 *
 * NOTE(review): Node may also report ENOENT when the configured working
 * directory is missing, in which case the "binary not found" wording is
 * misleading — check CODEX_CLI_CWD too.
 *
 * @param err - error thrown by `spawn()` or emitted as the child's "error" event.
 * @param bin - configured helper command, echoed back in the message.
 * @returns error coded `codex_cli_not_found` (ENOENT),
 *   `codex_cli_permission_denied` (EACCES / EPERM) or
 *   `codex_cli_spawn_failed` (anything else).
 */
function mapSpawnError(err: NodeSpawnError, bin: string): CodexCliError {
  switch (err.code) {
    case "ENOENT":
      return new CodexCliError(
        `DoAtlas Code helper binary not found at "${bin}". ` +
          `Set CODEX_CLI_BIN to the path of an installed local code helper.`,
        "codex_cli_not_found",
      );
    case "EACCES":
    case "EPERM":
      return new CodexCliError(
        `Permission denied executing DoAtlas Code helper at "${bin}". ` +
          `Make sure the file is executable (chmod +x).`,
        "codex_cli_permission_denied",
      );
    default:
      return new CodexCliError(
        `Failed to launch DoAtlas Code helper at "${bin}": ${err.message}`,
        "codex_cli_spawn_failed",
      );
  }
}

/**
 * Read the helper configuration from the environment.
 *
 * CODEX_CLI_ARGS is split on whitespace only — quoting is not supported.
 * An empty CODEX_CLI_CWD falls back to process.cwd().
 *
 * @throws CodexCliError (`codex_cli_not_configured`) when CODEX_CLI_BIN is
 *   unset or empty.
 */
function readConfig(): CodexCliConfig {
  const bin = process.env["CODEX_CLI_BIN"];
  if (!bin) {
    throw new CodexCliError(
      "DoAtlas Code not configured: set CODEX_CLI_BIN to the local code helper binary",
      "codex_cli_not_configured",
    );
  }
  const rawArgs = (process.env["CODEX_CLI_ARGS"] || "").trim();
  const args = rawArgs ? rawArgs.split(/\s+/) : [];
  const cwd = process.env["CODEX_CLI_CWD"] || process.cwd();
  return { bin, args, cwd };
}

/**
 * One decoded stdout event. Every field is optional because nothing
 * validates the helper's output beyond JSON.parse.
 */
interface RawEvent {
  type?: string;
  text?: string;
  id?: string;
  name?: string;
  arguments?: Record<string, unknown>;
  input_tokens?: number;
  output_tokens?: number;
  stop_reason?: string;
  message?: string;
  code?: string;
}

/**
 * Map one helper event onto the gateway's StreamChunk shape.
 *
 * @returns the chunk, or null for unknown event types and for `tool_call`
 *   events missing an `id` or `name` (the caller drops those).
 */
function toChunk(ev: RawEvent): StreamChunk | null {
  switch (ev.type) {
    case "text_delta":
      return { type: "text_delta", text: ev.text || "" };
    case "reasoning_delta":
      return { type: "reasoning_delta", text: ev.text || "" };
    case "tool_call":
      if (!ev.id || !ev.name) return null;
      return {
        type: "tool_call",
        toolCallId: ev.id,
        toolName: ev.name,
        toolArguments: ev.arguments ?? {},
      };
    case "usage":
      return {
        type: "usage",
        inputTokens: ev.input_tokens ?? 0,
        outputTokens: ev.output_tokens ?? 0,
      };
    case "done":
      return { type: "done", stopReason: ev.stop_reason };
    case "error":
      return {
        type: "error",
        message: ev.message || "codex-cli error",
        providerErrorCode: ev.code,
      };
    default:
      return null;
  }
}

/**
 * Start the helper process. A synchronous throw from `spawn()` (e.g. an
 * invalid argument) is converted into the same CodexCliError family used
 * for asynchronous launch failures.
 */
function launchHelper(cfg: CodexCliConfig) {
  try {
    return spawn(cfg.bin, cfg.args, {
      cwd: cfg.cwd,
      stdio: ["pipe", "pipe", "pipe"],
      // NOTE(review): the helper inherits the full server environment,
      // including any credentials it holds — confirm this is intended.
      env: process.env,
    });
  } catch (err) {
    throw mapSpawnError(err as NodeSpawnError, cfg.bin);
  }
}

/**
 * Run one request through the helper process and stream its events.
 *
 * The request is written to the helper's stdin as a single JSON line, then
 * stdin is closed. Each non-empty stdout line is decoded:
 * - JSON objects are mapped through `toChunk`; unknown types are dropped.
 * - Other valid JSON values are ignored.
 * - Lines that are not JSON are yielded verbatim as `text_delta`.
 * If the helper never reports `usage` / `done`, zero usage and
 * `stopReason: "end_turn"` are yielded at the end.
 *
 * @param cfg - helper command, args and working directory.
 * @param opts - request options serialised into the stdin payload.
 * @param signal - aborting sends SIGTERM to the helper; the stream then ends
 *   without an error once the helper exits.
 * @throws CodexCliError if the helper cannot be launched, exits with a
 *   non-zero code, or is killed by a signal this adapter did not send.
 */
async function* runCodexCli(
  cfg: CodexCliConfig,
  opts: LlmStreamOptions,
  signal: AbortSignal,
): AsyncIterable<StreamChunk> {
  // Serialise before spawning: if this throws, no helper is left blocked
  // waiting for stdin.
  const payload = JSON.stringify({
    model: opts.model,
    messages: opts.messages,
    tools: opts.tools ?? [],
    temperature: opts.temperature,
    max_tokens: opts.maxTokens,
  });

  const child = launchHelper(cfg);

  // Pull-based bridge from stream events to this generator. The queue is
  // unbounded: stdout is never paused, so a slow consumer buffers in memory.
  type Item = { line?: string; end?: true; err?: CodexCliError };
  const queue: Item[] = [];
  let waiter: ((it: Item) => void) | null = null;
  const push = (it: Item): void => {
    if (waiter) {
      const wake = waiter;
      waiter = null;
      wake(it);
    } else {
      queue.push(it);
    }
  };
  const next = (): Promise<Item> => {
    const queued = queue.shift();
    return queued
      ? Promise.resolve(queued)
      : new Promise<Item>((resolve) => {
          waiter = resolve;
        });
  };

  let spawned = false;
  let stdoutEnded = false;
  let childClosed = false;
  let finished = false;
  let terminatedByUs = false;
  let pendingErr: CodexCliError | null = null;
  let stderrBuf = "";

  // stdout "end" and process "close" fire in unspecified order; the exit
  // status is only known once both have happened. The terminal items are
  // pushed exactly once — Node can still emit "close"/"end" after a launch
  // "error" has already finished the stream.
  const maybeFinish = (): void => {
    if (finished || !stdoutEnded || !childClosed) return;
    finished = true;
    if (pendingErr) push({ err: pendingErr });
    push({ end: true });
  };

  /** SIGTERM the helper unless it has already exited or been signalled. */
  const terminate = (): void => {
    if (child.killed || child.exitCode !== null || child.signalCode !== null) return;
    terminatedByUs = true;
    child.kill("SIGTERM");
  };

  child.once("spawn", () => {
    spawned = true;
  });

  // Registered before anything below can throw, so a launch failure never
  // reaches Node as an unhandled "error" event.
  child.on("error", (err: Error) => {
    if (spawned) {
      // Post-launch errors (e.g. a failed kill) are not launch failures;
      // the helper's exit status still arrives via "close".
      logger.warn({ adapter: "codex-cli", error: err.message }, "codex-cli process error");
      return;
    }
    pendingErr = mapSpawnError(err as NodeSpawnError, cfg.bin);
    // A process that never started produces no further output.
    stdoutEnded = true;
    childClosed = true;
    maybeFinish();
  });

  child.on("close", (code, sigSignal) => {
    // An exit we caused (abort / abandoned stream) is expected, whatever
    // status the helper reports for it.
    if (!pendingErr && !terminatedByUs) {
      // The stderr tail is usually the only clue the operator gets about
      // what went wrong (missing model, bad config, ...).
      const tail = stderrBuf.trim().slice(-STDERR_ERROR_TAIL_CHARS);
      const detail = tail ? `: ${tail}` : "";
      if (code != null && code !== 0) {
        pendingErr = new CodexCliError(
          `DoAtlas Code helper exited with code ${code}${detail}`,
          spawned ? "codex_cli_exited" : "codex_cli_spawn_failed",
        );
      } else if (code == null && sigSignal) {
        pendingErr = new CodexCliError(
          `DoAtlas Code helper terminated by signal ${sigSignal}${detail}`,
          "codex_cli_exited",
        );
      }
    }
    childClosed = true;
    maybeFinish();
  });

  const { stdin, stdout, stderr } = child;
  if (!stdin || !stdout || !stderr) {
    // Node can leave the pipes null when it fails to create them (e.g. spawn
    // failing with EMFILE/ENFILE); the cause is still emitted as "error",
    // which the listener above absorbs.
    throw new CodexCliError(
      `Failed to launch DoAtlas Code helper at "${cfg.bin}": stdio pipes were not created`,
      "codex_cli_spawn_failed",
    );
  }

  // stderr goes to the server log and exit errors, never to the answer text.
  stderr.setEncoding("utf8");
  stderr.on("data", (d: string) => {
    stderrBuf += d;
    if (stderrBuf.length > STDERR_RETAIN_CHARS) {
      stderrBuf = stderrBuf.slice(-STDERR_RETAIN_CHARS);
    }
  });

  let partial = "";
  stdout.setEncoding("utf8");
  stdout.on("data", (d: string) => {
    const pieces = (partial + d).split("\n");
    // The last piece has no terminating newline yet; keep it for later.
    partial = pieces.pop() ?? "";
    for (const piece of pieces) {
      const line = piece.trim();
      if (line) push({ line });
    }
  });
  stdout.on("end", () => {
    const tail = partial.trim();
    partial = "";
    if (tail) push({ line: tail });
    stdoutEnded = true;
    maybeFinish();
  });

  // Forward upstream aborts as SIGTERM.
  if (signal.aborted) terminate();
  else signal.addEventListener("abort", terminate, { once: true });

  // A helper that exits without reading stdin (or a pipe torn down by a
  // failed launch) raises EPIPE-style errors here; the process "close" /
  // "error" handlers are what report the outcome.
  stdin.on("error", () => {
    /* reported via the process "close" / "error" handlers */
  });
  try {
    stdin.write(`${payload}\n`);
    stdin.end();
  } catch {
    // Best effort, as above: the process handlers surface the real cause.
  }

  let sawDone = false;
  let sawUsage = false;
  try {
    for (;;) {
      const item = await next();
      if (item.err) throw item.err;
      if (item.end) break;
      if (!item.line) continue;

      let parsed: unknown;
      try {
        parsed = JSON.parse(item.line);
      } catch {
        // Non-JSON lines become plain text deltas so a misbehaving helper
        // still produces something readable instead of a silent hang.
        yield { type: "text_delta", text: item.line };
        continue;
      }
      // JSON.parse also accepts bare scalars such as `null`; only objects can
      // be protocol events (reading `.type` off `null` would throw).
      if (typeof parsed !== "object" || parsed === null) continue;

      const chunk = toChunk(parsed as RawEvent);
      if (!chunk) continue;
      if (chunk.type === "usage") sawUsage = true;
      if (chunk.type === "done") sawDone = true;
      yield chunk;
    }
  } finally {
    signal.removeEventListener("abort", terminate);
    // No-op once the helper has exited; otherwise stops a helper whose
    // stream was abandoned or failed mid-way.
    terminate();
    // Logged on every exit path, including failures.
    const stderrText = stderrBuf.trim();
    if (stderrText) {
      logger.warn(
        { adapter: "codex-cli", stderr: stderrText.slice(0, STDERR_LOG_CHARS) },
        "codex-cli stderr",
      );
    }
  }

  if (!sawUsage) {
    yield { type: "usage", inputTokens: 0, outputTokens: 0 };
  }
  if (!sawDone) {
    yield { type: "done", stopReason: "end_turn" };
  }
}

/**
 * LLM adapter for the local "DoAtlas Code" helper.
 *
 * `readConfig` runs synchronously, so a missing CODEX_CLI_BIN throws from
 * `stream()` itself rather than surfacing through the gateway. The gateway
 * is configured with `retries: 0`, so a failed helper run is not re-run.
 */
export const codexCliAdapter: LlmAdapter = {
  name: "codex-cli",
  stream(opts: LlmStreamOptions): AsyncIterable<StreamChunk> {
    const cfg = readConfig();
    return withGateway(
      { name: "codex-cli", source: "byok-direct", upstream: cfg.bin, retries: 0 },
      (signal) => runCodexCli(cfg, opts, signal),
    );
  },
};