Spaces:
Running
Running
| """ | |
| Codex App Server engine — drives `codex app-server` over JSON-RPC (stdio) to get | |
| TRUE token-by-token streaming (`item/agentMessage/delta`), which `codex exec` | |
| cannot do. | |
| Protocol (verified against `codex app-server generate-ts --experimental`, v0.137.0): | |
| -> initialize {clientInfo, capabilities:{experimentalApi:true}} | |
| <- {result:{userAgent,...}} | |
| -> initialized (notification, no id) | |
| -> thread/start {cwd, approvalPolicy:"never", sandbox} | thread/resume {threadId,...} | |
| <- {result:{thread:{id}}} | |
| -> turn/start {threadId, input:[{type:"text", text, text_elements:[]}]} | |
| <- notifications: item/agentMessage/delta {delta}, item/completed {item}, | |
| thread/tokenUsage/updated {tokenUsage:{last:{inputTokens,...}}}, | |
| turn/completed -> end | |
| One short-lived app-server process per turn. Thread state persists in CODEX_HOME, | |
| so `thread/resume` works across processes (same as exec resume). | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| from pathlib import Path | |
| from typing import AsyncIterator, Optional | |
class CodexError(Exception):
    """Raised when the codex app-server cannot be started or a turn fails."""
| _STREAM_LIMIT = 16 * 1024 * 1024 # allow large JSONL lines (full message items) | |
| async def _send(proc: asyncio.subprocess.Process, obj: dict) -> None: | |
| proc.stdin.write((json.dumps(obj) + "\n").encode("utf-8")) | |
| await proc.stdin.drain() | |
| _AUTH_DEAD = ("session has ended", "log in again", "failed to refresh token") | |
async def run_turn(
    *,
    codex_bin: str,
    codex_home: str,
    prompt: str,
    workspace: Path,
    thread_id: Optional[str],
    sandbox: str,
    model: Optional[str],
    read_timeout: float,
    effort: Optional[str] = None,
    input_items: Optional[list] = None,
    output_schema: Optional[dict] = None,
    developer_instructions: Optional[str] = None,
) -> AsyncIterator[dict]:
    """Drive one turn on a fresh `codex app-server` process and yield its events.

    Yields, in arrival order:
        {"type": "delta", "text": "<chunk>"}       # live answer text
        {"type": "reasoning", "text": "<chunk>"}   # live reasoning text
        {"type": "final", "text": "<full>", "thread_id": "<id>",
         "usage": {...}, "images": [...]}          # always the last event

    Args:
        codex_bin: Executable to run as `<codex_bin> app-server`.
        codex_home: Value exported to the child as `CODEX_HOME`.
        prompt: Text sent as the single input item when `input_items` is empty.
        workspace: Working directory of the process and `cwd` of the thread.
        thread_id: Thread to resume. When falsy, or when the resume request
            does not succeed, a new thread is started instead.
        sandbox: Forwarded as `sandbox` on thread start/resume.
        model: Forwarded as `model` on `thread/start` and `turn/start` when set.
        read_timeout: Maximum seconds to wait for each line of server output.
        effort: Forwarded as `effort` on `turn/start` when set.
        input_items: Pre-built `input` list that replaces the prompt item.
        output_schema: Forwarded as `outputSchema` on `turn/start` when set.
        developer_instructions: Forwarded as `developerInstructions` on thread
            start/resume when set.

    Raises:
        CodexError: The process could not be started, timed out, closed its
            output early, rejected a request, became unwritable, or reported
            an expired login.
    """
    env = {**os.environ, "CODEX_HOME": codex_home}
    try:
        proc = await asyncio.create_subprocess_exec(
            codex_bin, "app-server",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            # stderr is never consumed here; an unread PIPE lets a chatty child
            # fill the buffer and stall, so discard it instead.
            stderr=asyncio.subprocess.DEVNULL,
            cwd=str(workspace),
            env=env,
            limit=_STREAM_LIMIT,
        )
    except OSError as e:  # FileNotFoundError is an OSError
        raise CodexError(
            f"could not start codex app-server ('{codex_bin}'): {e}"
        ) from e

    async def send(obj: dict) -> None:
        """Send one message; report a dead stdin pipe as CodexError."""
        try:
            await _send(proc, obj)
        except OSError as e:  # BrokenPipeError / ConnectionResetError
            raise CodexError(f"app-server is not accepting input: {e}") from e

    async def read_msg() -> Optional[dict]:
        """Read one JSON-RPC message; skip blank/garbage lines; None on EOF."""
        while True:
            try:
                line = await asyncio.wait_for(
                    proc.stdout.readline(), timeout=read_timeout
                )
            except asyncio.TimeoutError:
                raise CodexError("app-server timed out (no output)")
            except ValueError as e:
                # readline() raises ValueError when a line exceeds the limit.
                raise CodexError(f"app-server sent an over-long line: {e}") from e
            if not line:
                return None
            line = line.strip()
            if not line:
                continue
            try:
                msg = json.loads(line)
            except ValueError:
                # JSONDecodeError or UnicodeDecodeError: a non-JSON log line.
                continue
            # A bare JSON scalar/array is not a JSON-RPC message; skip it so
            # callers can rely on dict methods.
            if isinstance(msg, dict):
                return msg

    async def await_response_raw(req_id: int) -> dict:
        """Read until the response for req_id; return the raw message (result or error)."""
        while True:
            msg = await read_msg()
            if msg is None:
                raise CodexError("app-server closed before responding")
            if msg.get("id") == req_id and ("result" in msg or "error" in msg):
                return msg

    async def await_response(req_id: int) -> dict:
        """Like await_response_raw, but raise on an error and return the result."""
        msg = await await_response_raw(req_id)
        if "error" in msg:
            raise CodexError(f"app-server error: {msg['error']}")
        return msg.get("result") or {}

    try:
        # 1) initialize handshake
        await send({
            "method": "initialize",
            "id": 0,
            "params": {
                "clientInfo": {
                    "name": "codex-as-api",
                    "title": "Codex as API",
                    "version": "1.0.0",
                },
                "capabilities": {
                    "experimentalApi": True,
                    "requestAttestation": False,
                },
            },
        })
        await await_response(0)
        await send({"method": "initialized"})

        # 2) resume the thread if we have one; fall back to a fresh thread if the
        #    rollout is gone (e.g. CODEX_HOME is local and the Space restarted).
        tid = None
        if thread_id:
            resume_params = {
                "threadId": thread_id,
                "cwd": str(workspace),
                "approvalPolicy": "never",
                "sandbox": sandbox,
                "excludeTurns": True,
            }
            if developer_instructions:
                resume_params["developerInstructions"] = developer_instructions
            await send({"method": "thread/resume", "id": 1, "params": resume_params})
            resumed = await await_response_raw(1)
            if "result" in resumed:
                tid = ((resumed["result"] or {}).get("thread") or {}).get("id")
        if tid is None:  # no thread_id, or resume failed -> start fresh
            params = {
                "cwd": str(workspace),
                "approvalPolicy": "never",
                "sandbox": sandbox,
            }
            if model:
                params["model"] = model
            if developer_instructions:
                params["developerInstructions"] = developer_instructions
            await send({"method": "thread/start", "id": 2, "params": params})
            start_result = await await_response(2)
            tid = (start_result.get("thread") or {}).get("id")
        if not tid:
            raise CodexError("app-server did not return a thread id")

        # 3) start the turn
        turn_input = input_items or [
            {"type": "text", "text": prompt, "text_elements": []}
        ]
        turn_params = {"threadId": tid, "input": turn_input}
        if model:
            turn_params["model"] = model
        if effort:
            turn_params["effort"] = effort
        if output_schema:
            turn_params["outputSchema"] = output_schema
        await send({"method": "turn/start", "id": 3, "params": turn_params})

        # 4) stream notifications until turn/completed
        delta_parts: list[str] = []
        final_text: Optional[str] = None
        images: list[str] = []  # saved paths of any generated images
        usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
                 "prompt_tokens_details": {"cached_tokens": 0}}
        while True:
            msg = await read_msg()
            if msg is None:
                raise CodexError("app-server closed during the turn")
            method = msg.get("method")
            # Tolerate a missing, null or non-object "params".
            params = msg.get("params")
            if not isinstance(params, dict):
                params = {}

            if method == "item/agentMessage/delta":
                delta = params.get("delta", "")
                if delta:
                    delta_parts.append(delta)
                    yield {"type": "delta", "text": delta}
            elif method in ("item/reasoning/textDelta",
                            "item/reasoning/summaryTextDelta"):
                # Live "thinking" — stream it so there's no dead air before the
                # answer. Surfaced to clients as delta.reasoning_content.
                rd = params.get("delta") or params.get("text", "")
                if rd:
                    yield {"type": "reasoning", "text": rd}
            elif method == "item/completed":
                item = params.get("item") or {}
                itype = item.get("type")
                if itype == "agentMessage" and item.get("text") is not None:
                    # The completed item is authoritative over joined deltas.
                    final_text = item["text"]
                elif itype == "imageGeneration":
                    path = item.get("savedPath") or item.get("result")
                    if path:
                        images.append(path)
            elif method == "thread/tokenUsage/updated":
                last = (params.get("tokenUsage") or {}).get("last") or {}
                usage = {
                    "prompt_tokens": last.get("inputTokens", 0) or 0,
                    "completion_tokens": last.get("outputTokens", 0) or 0,
                    "total_tokens": last.get("totalTokens", 0) or 0,
                    "prompt_tokens_details": {
                        "cached_tokens": last.get("cachedInputTokens", 0) or 0},
                }
            elif method == "error":
                err = params.get("error")
                if not isinstance(err, dict):
                    err = {}
                blob = f"{err.get('message','')} {err.get('additionalDetails') or ''}".lower()
                # Dead login: don't sit through 5x reconnect retries (~60s). Bail now.
                if any(s in blob for s in _AUTH_DEAD):
                    raise CodexError(
                        "Codex login expired (session ended). Re-run `codex login` "
                        "and re-upload a fresh auth.json to /data/.codex/auth.json."
                    )
                # Any other error notification is ignored; keep reading.
            elif method == "turn/completed":
                break
            elif msg.get("id") == 3 and "error" in msg:
                raise CodexError(f"turn error: {msg['error']}")
            elif msg.get("id") is not None and method is not None:
                # Server->client request (e.g. an approval). With approvalPolicy
                # "never" this should not happen; decline so we never hang.
                await send({
                    "id": msg["id"],
                    "error": {"code": -32601, "message": "approvals disabled"},
                })
            # other notifications (thread/started, turn/started, ...) ignored

        text = final_text if final_text is not None else "".join(delta_parts)
        yield {"type": "final", "text": text, "thread_id": tid,
               "usage": usage, "images": images}
    finally:
        # Best-effort teardown: the process may already be gone, so individual
        # failures here are deliberately ignored.
        for action in (proc.stdin.close, proc.terminate):
            try:
                action()
            except Exception:
                pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=5)
        except Exception:
            try:
                proc.kill()
            except Exception:
                pass
            else:
                # Reap the killed process so it does not linger as a zombie.
                try:
                    await asyncio.wait_for(proc.wait(), timeout=5)
                except Exception:
                    pass