""" Codex App Server engine — drives `codex app-server` over JSON-RPC (stdio) to get TRUE token-by-token streaming (`item/agentMessage/delta`), which `codex exec` cannot do. Protocol (verified against `codex app-server generate-ts --experimental`, v0.137.0): -> initialize {clientInfo, capabilities:{experimentalApi:true}} <- {result:{userAgent,...}} -> initialized (notification, no id) -> thread/start {cwd, approvalPolicy:"never", sandbox} | thread/resume {threadId,...} <- {result:{thread:{id}}} -> turn/start {threadId, input:[{type:"text", text, text_elements:[]}]} <- notifications: item/agentMessage/delta {delta}, item/completed {item}, thread/tokenUsage/updated {tokenUsage:{last:{inputTokens,...}}}, turn/completed -> end One short-lived app-server process per turn. Thread state persists in CODEX_HOME, so `thread/resume` works across processes (same as exec resume). """ import asyncio import json import os from pathlib import Path from typing import AsyncIterator, Optional class CodexError(Exception): pass _STREAM_LIMIT = 16 * 1024 * 1024 # allow large JSONL lines (full message items) async def _send(proc: asyncio.subprocess.Process, obj: dict) -> None: proc.stdin.write((json.dumps(obj) + "\n").encode("utf-8")) await proc.stdin.drain() _AUTH_DEAD = ("session has ended", "log in again", "failed to refresh token") async def run_turn( *, codex_bin: str, codex_home: str, prompt: str, workspace: Path, thread_id: Optional[str], sandbox: str, model: Optional[str], read_timeout: float, effort: Optional[str] = None, input_items: Optional[list] = None, output_schema: Optional[dict] = None, developer_instructions: Optional[str] = None, ) -> AsyncIterator[dict]: """ Async generator that drives one turn and yields events: {"type": "delta", "text": ""} # live token text {"type": "final", "text": "", "thread_id": "", "usage": {...}} Raises CodexError on protocol/process failure. """ env = {**os.environ, "CODEX_HOME": codex_home} try: proc = await asyncio.create_subprocess_exec( codex_bin, "app-server", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(workspace), env=env, limit=_STREAM_LIMIT, ) except (FileNotFoundError, OSError) as e: raise CodexError(f"could not start codex app-server ('{codex_bin}'): {e}") async def read_msg() -> Optional[dict]: """Read one JSON-RPC message; skip blank/garbage lines; None on EOF.""" while True: try: line = await asyncio.wait_for( proc.stdout.readline(), timeout=read_timeout ) except asyncio.TimeoutError: raise CodexError("app-server timed out (no output)") if not line: return None line = line.strip() if not line: continue try: return json.loads(line) except json.JSONDecodeError: continue # non-JSON log line on stdout — ignore async def await_response_raw(req_id: int) -> dict: """Read until the response for req_id; return the raw message (result or error).""" while True: msg = await read_msg() if msg is None: raise CodexError("app-server closed before responding") if msg.get("id") == req_id and ("result" in msg or "error" in msg): return msg async def await_response(req_id: int) -> dict: msg = await await_response_raw(req_id) if "error" in msg: raise CodexError(f"app-server error: {msg['error']}") return msg.get("result", {}) try: # 1) initialize handshake await _send(proc, { "method": "initialize", "id": 0, "params": { "clientInfo": { "name": "codex-as-api", "title": "Codex as API", "version": "1.0.0", }, "capabilities": { "experimentalApi": True, "requestAttestation": False, }, }, }) await await_response(0) await _send(proc, {"method": "initialized"}) # 2) resume the thread if we have one; fall back to a fresh thread if the # rollout is gone (e.g. CODEX_HOME is local and the Space restarted). tid = None if thread_id: resume_params = { "threadId": thread_id, "cwd": str(workspace), "approvalPolicy": "never", "sandbox": sandbox, "excludeTurns": True, } if developer_instructions: resume_params["developerInstructions"] = developer_instructions await _send(proc, {"method": "thread/resume", "id": 1, "params": resume_params}) resumed = await await_response_raw(1) if "result" in resumed: tid = (resumed["result"].get("thread") or {}).get("id") if tid is None: # no thread_id, or resume failed -> start fresh params = { "cwd": str(workspace), "approvalPolicy": "never", "sandbox": sandbox, } if model: params["model"] = model if developer_instructions: params["developerInstructions"] = developer_instructions await _send(proc, {"method": "thread/start", "id": 2, "params": params}) start_result = await await_response(2) tid = (start_result.get("thread") or {}).get("id") if not tid: raise CodexError("app-server did not return a thread id") # 3) start the turn turn_input = input_items or [ {"type": "text", "text": prompt, "text_elements": []} ] turn_params = {"threadId": tid, "input": turn_input} if model: turn_params["model"] = model if effort: turn_params["effort"] = effort if output_schema: turn_params["outputSchema"] = output_schema await _send(proc, {"method": "turn/start", "id": 3, "params": turn_params}) # 4) stream notifications until turn/completed delta_parts: list[str] = [] final_text: Optional[str] = None images: list[str] = [] # saved paths of any generated images usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0, "prompt_tokens_details": {"cached_tokens": 0}} while True: msg = await read_msg() if msg is None: raise CodexError("app-server closed during the turn") method = msg.get("method") if method == "item/agentMessage/delta": delta = (msg.get("params") or {}).get("delta", "") if delta: delta_parts.append(delta) yield {"type": "delta", "text": delta} elif method in ("item/reasoning/textDelta", "item/reasoning/summaryTextDelta"): # Live "thinking" — stream it so there's no dead air before the # answer. Surfaced to clients as delta.reasoning_content. rd = (msg.get("params") or {}).get("delta") \ or (msg.get("params") or {}).get("text", "") if rd: yield {"type": "reasoning", "text": rd} elif method == "item/completed": item = (msg.get("params") or {}).get("item", {}) itype = item.get("type") if itype == "agentMessage" and item.get("text") is not None: final_text = item["text"] elif itype == "imageGeneration": path = item.get("savedPath") or item.get("result") if path: images.append(path) elif method == "thread/tokenUsage/updated": last = ((msg.get("params") or {}).get("tokenUsage") or {}).get("last", {}) usage = { "prompt_tokens": last.get("inputTokens", 0) or 0, "completion_tokens": last.get("outputTokens", 0) or 0, "total_tokens": last.get("totalTokens", 0) or 0, "prompt_tokens_details": { "cached_tokens": last.get("cachedInputTokens", 0) or 0}, } elif method == "error": err = (msg.get("params") or {}).get("error", {}) or {} blob = f"{err.get('message','')} {err.get('additionalDetails') or ''}".lower() # Dead login: don't sit through 5x reconnect retries (~60s). Bail now. if any(s in blob for s in _AUTH_DEAD): raise CodexError( "Codex login expired (session ended). Re-run `codex login` " "and re-upload a fresh auth.json to /data/.codex/auth.json." ) elif method == "turn/completed": break elif msg.get("id") == 3 and "error" in msg: raise CodexError(f"turn error: {msg['error']}") elif msg.get("id") is not None and method is not None: # Server->client request (e.g. an approval). With approvalPolicy # "never" this should not happen; decline so we never hang. await _send(proc, { "id": msg["id"], "error": {"code": -32601, "message": "approvals disabled"}, }) # other notifications (thread/started, turn/started, reasoning, ...) ignored text = final_text if final_text is not None else "".join(delta_parts) yield {"type": "final", "text": text, "thread_id": tid, "usage": usage, "images": images} finally: for action in ( lambda: proc.stdin.close(), proc.terminate, ): try: action() except Exception: pass try: await asyncio.wait_for(proc.wait(), timeout=5) except Exception: try: proc.kill() except Exception: pass