# codex / codex_engine.py
# sarveshpatel's picture
# Upload 10 files
# 19491c5 verified
# Raw
# History Blame Contribute Delete
# 10.7 kB
"""
Codex App Server engine — drives `codex app-server` over JSON-RPC (stdio) to get
TRUE token-by-token streaming (`item/agentMessage/delta`), which `codex exec`
cannot do.
Protocol (verified against `codex app-server generate-ts --experimental`, v0.137.0):

    -> initialize    {clientInfo, capabilities:{experimentalApi:true}}
    <- {result:{userAgent,...}}
    -> initialized   (notification, no id)
    -> thread/start  {cwd, approvalPolicy:"never", sandbox} | thread/resume {threadId,...}
    <- {result:{thread:{id}}}
    -> turn/start    {threadId, input:[{type:"text", text, text_elements:[]}]}
    <- notifications: item/agentMessage/delta {delta}, item/completed {item},
       thread/tokenUsage/updated {tokenUsage:{last:{inputTokens,...}}},
       turn/completed -> end
One short-lived app-server process per turn. Thread state persists in CODEX_HOME,
so `thread/resume` works across processes (same as exec resume).
"""
import asyncio
import json
import os
from pathlib import Path
from typing import AsyncIterator, Optional
class CodexError(Exception):
    """Error raised by this module when the app-server process or protocol fails."""
# 16 MiB. Passed as `limit=` when spawning the app-server so that a single very
# long JSONL line (e.g. a full message item) still fits in the stream buffer.
_STREAM_LIMIT = 16 << 20
async def _send(proc: asyncio.subprocess.Process, obj: dict) -> None:
    """Serialize ``obj`` as one newline-terminated JSON line on the child's stdin.

    The line is UTF-8 encoded, written to ``proc.stdin`` and then drained so the
    message has been handed to the pipe before the coroutine returns.
    """
    wire = f"{json.dumps(obj)}\n".encode("utf-8")
    stdin = proc.stdin
    stdin.write(wire)
    await stdin.drain()
# Lower-case substrings; run_turn matches them against the lower-cased text of
# an `error` notification to detect an expired login.
_AUTH_DEAD = (
    "session has ended",
    "log in again",
    "failed to refresh token",
)
async def run_turn(
    *,
    codex_bin: str,
    codex_home: str,
    prompt: str,
    workspace: Path,
    thread_id: Optional[str],
    sandbox: str,
    model: Optional[str],
    read_timeout: float,
    effort: Optional[str] = None,
    input_items: Optional[list] = None,
    output_schema: Optional[dict] = None,
    developer_instructions: Optional[str] = None,
) -> AsyncIterator[dict]:
    """
    Drive one turn on a short-lived `codex app-server` process.

    Async generator that yields events as they arrive:
      {"type": "delta", "text": "<chunk>"}       # live answer text
      {"type": "reasoning", "text": "<chunk>"}   # live reasoning text
      {"type": "final", "text": "<full>", "thread_id": "<id>",
       "usage": {...}, "images": [...]}          # always the last event

    Args:
      codex_bin: executable to launch with the `app-server` argument.
      codex_home: exported to the child as CODEX_HOME.
      prompt: text of the turn; used only when `input_items` is empty/None.
      workspace: working directory of the child and `cwd` of the thread.
      thread_id: thread to resume; when None, or when the resume is rejected,
        a fresh thread is started instead.
      sandbox: passed through as the thread's `sandbox` value.
      model: sent with thread/start and turn/start when set.
      read_timeout: seconds to wait for each line of app-server output.
      effort: sent as the turn's `effort` when set.
      input_items: replaces the default single text item built from `prompt`.
      output_schema: sent as the turn's `outputSchema` when set.
      developer_instructions: sent as `developerInstructions` on
        thread/start and thread/resume when set.

    Raises:
      CodexError: the process could not be started, stopped answering, closed
        a pipe, sent an oversized line, returned a JSON-RPC error, or reported
        an expired login.
    """
    env = {**os.environ, "CODEX_HOME": codex_home}
    try:
        proc = await asyncio.create_subprocess_exec(
            codex_bin, "app-server",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=str(workspace),
            env=env,
            limit=_STREAM_LIMIT,
        )
    except OSError as e:  # FileNotFoundError is an OSError subclass
        raise CodexError(
            f"could not start codex app-server ('{codex_bin}'): {e}"
        ) from e

    stderr_keep = 4096  # bytes of stderr retained for error messages
    stderr_tail = bytearray()

    async def drain_stderr() -> None:
        """Consume stderr continuously so the child never blocks on a full pipe."""
        try:
            while True:
                chunk = await proc.stderr.read(65536)
                if not chunk:
                    return
                stderr_tail.extend(chunk)
                del stderr_tail[:-stderr_keep]  # keep only the newest bytes
        except OSError:
            return

    stderr_task = asyncio.create_task(drain_stderr())

    async def eof_error(message: str) -> CodexError:
        """Build the error for a closed pipe, appending any captured stderr."""
        if not stderr_task.done():
            try:
                # Give the drain task a moment to collect the child's last words.
                await asyncio.wait_for(stderr_task, timeout=1)
            except asyncio.TimeoutError:
                pass
        tail = stderr_tail.decode("utf-8", errors="replace").strip()
        return CodexError(f"{message}; stderr: {tail}" if tail else message)

    async def read_msg() -> Optional[dict]:
        """Read one JSON-RPC message; skip blank/garbage lines; None on EOF."""
        while True:
            try:
                line = await asyncio.wait_for(
                    proc.stdout.readline(), timeout=read_timeout
                )
            except asyncio.TimeoutError:
                raise CodexError("app-server timed out (no output)") from None
            except ValueError as e:
                # readline() raises ValueError when a line exceeds the stream limit.
                raise CodexError(f"app-server sent an oversized line: {e}") from e
            if not line:
                return None
            line = line.strip()
            if not line:
                continue
            try:
                msg = json.loads(line)
            except ValueError:
                # JSONDecodeError or UnicodeDecodeError: non-JSON log line on stdout.
                continue
            if isinstance(msg, dict):
                return msg
            # Valid JSON that is not an object (e.g. `null`, a number) is not a
            # JSON-RPC message; in particular `null` must not be mistaken for EOF.

    async def await_response_raw(req_id: int) -> dict:
        """Read until the response for req_id; return the raw message (result or error)."""
        while True:
            msg = await read_msg()
            if msg is None:
                raise await eof_error("app-server closed before responding")
            if msg.get("id") == req_id and ("result" in msg or "error" in msg):
                return msg

    async def await_response(req_id: int) -> dict:
        """Like await_response_raw, but raise on an error response and return the result."""
        msg = await await_response_raw(req_id)
        if "error" in msg:
            raise CodexError(f"app-server error: {msg['error']}")
        return msg.get("result") or {}

    try:
        # 1) initialize handshake
        await _send(proc, {
            "method": "initialize",
            "id": 0,
            "params": {
                "clientInfo": {
                    "name": "codex-as-api",
                    "title": "Codex as API",
                    "version": "1.0.0",
                },
                "capabilities": {
                    "experimentalApi": True,
                    "requestAttestation": False,
                },
            },
        })
        await await_response(0)
        await _send(proc, {"method": "initialized"})

        # 2) resume the thread if we have one; fall back to a fresh thread if the
        #    rollout is gone (e.g. CODEX_HOME is local and the Space restarted).
        tid = None
        if thread_id:
            resume_params = {
                "threadId": thread_id,
                "cwd": str(workspace),
                "approvalPolicy": "never",
                "sandbox": sandbox,
                "excludeTurns": True,
            }
            if developer_instructions:
                resume_params["developerInstructions"] = developer_instructions
            await _send(proc, {"method": "thread/resume", "id": 1, "params": resume_params})
            resumed = await await_response_raw(1)
            # An error response (or a null result) leaves tid as None.
            tid = ((resumed.get("result") or {}).get("thread") or {}).get("id")
        if tid is None:  # no thread_id, or resume failed -> start fresh
            params = {
                "cwd": str(workspace),
                "approvalPolicy": "never",
                "sandbox": sandbox,
            }
            if model:
                params["model"] = model
            if developer_instructions:
                params["developerInstructions"] = developer_instructions
            await _send(proc, {"method": "thread/start", "id": 2, "params": params})
            start_result = await await_response(2)
            tid = (start_result.get("thread") or {}).get("id")
        if not tid:
            raise CodexError("app-server did not return a thread id")

        # 3) start the turn
        turn_input = input_items or [
            {"type": "text", "text": prompt, "text_elements": []}
        ]
        turn_params = {"threadId": tid, "input": turn_input}
        if model:
            turn_params["model"] = model
        if effort:
            turn_params["effort"] = effort
        if output_schema:
            turn_params["outputSchema"] = output_schema
        await _send(proc, {"method": "turn/start", "id": 3, "params": turn_params})

        # 4) stream notifications until turn/completed
        delta_parts: list[str] = []
        final_text: Optional[str] = None
        images: list[str] = []  # saved paths of any generated images
        usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
                 "prompt_tokens_details": {"cached_tokens": 0}}
        while True:
            msg = await read_msg()
            if msg is None:
                raise await eof_error("app-server closed during the turn")
            method = msg.get("method")
            params = msg.get("params") or {}
            if method == "item/agentMessage/delta":
                delta = params.get("delta", "")
                if delta:
                    delta_parts.append(delta)
                    yield {"type": "delta", "text": delta}
            elif method in ("item/reasoning/textDelta",
                            "item/reasoning/summaryTextDelta"):
                # Live "thinking" — stream it so there's no dead air before the
                # answer. Surfaced to clients as delta.reasoning_content.
                reasoning = params.get("delta") or params.get("text", "")
                if reasoning:
                    yield {"type": "reasoning", "text": reasoning}
            elif method == "item/completed":
                item = params.get("item") or {}
                itype = item.get("type")
                if itype == "agentMessage" and item.get("text") is not None:
                    # The completed item carries the authoritative full text.
                    final_text = item["text"]
                elif itype == "imageGeneration":
                    path = item.get("savedPath") or item.get("result")
                    if path:
                        images.append(path)
            elif method == "thread/tokenUsage/updated":
                last = (params.get("tokenUsage") or {}).get("last") or {}
                usage = {
                    "prompt_tokens": last.get("inputTokens", 0) or 0,
                    "completion_tokens": last.get("outputTokens", 0) or 0,
                    "total_tokens": last.get("totalTokens", 0) or 0,
                    "prompt_tokens_details": {
                        "cached_tokens": last.get("cachedInputTokens", 0) or 0},
                }
            elif method == "error":
                err = params.get("error") or {}
                blob = f"{err.get('message','')} {err.get('additionalDetails') or ''}".lower()
                # Dead login: don't sit through 5x reconnect retries (~60s). Bail now.
                if any(s in blob for s in _AUTH_DEAD):
                    raise CodexError(
                        "Codex login expired (session ended). Re-run `codex login` "
                        "and re-upload a fresh auth.json to /data/.codex/auth.json."
                    )
                # Any other error notification is ignored here.
            elif method == "turn/completed":
                # NOTE(review): the turn's status is not inspected, so a failed
                # turn ends with whatever text was collected — confirm intended.
                break
            elif msg.get("id") == 3 and "error" in msg:
                raise CodexError(f"turn error: {msg['error']}")
            elif msg.get("id") is not None and method is not None:
                # Server->client request (e.g. an approval). With approvalPolicy
                # "never" this should not happen; decline so we never hang.
                await _send(proc, {
                    "id": msg["id"],
                    "error": {"code": -32601, "message": "approvals disabled"},
                })
            # every other notification (thread/started, turn/started, ...) is ignored

        text = final_text if final_text is not None else "".join(delta_parts)
        yield {"type": "final", "text": text, "thread_id": tid,
               "usage": usage, "images": images}
    except (BrokenPipeError, ConnectionResetError) as e:
        # Writing to a dead child must surface as CodexError like other failures.
        pipe_error = await eof_error("app-server closed its input pipe")
        raise pipe_error from e
    finally:
        # Deliberately best-effort teardown: each step can fail when the child
        # has already exited, and cleanup must not mask the original exception.
        stderr_task.cancel()
        try:
            proc.stdin.close()
        except Exception:
            pass
        try:
            proc.terminate()
        except Exception:
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=5)
        except Exception:
            try:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                await asyncio.wait_for(proc.wait(), timeout=5)
            except Exception:
                pass
        # Collect the drain task so it is never left pending or unretrieved.
        await asyncio.gather(stderr_task, return_exceptions=True)