""" Codex-as-API: an OpenAI-compatible HTTP wrapper around the OpenAI Codex CLI. Auth is your ChatGPT login (auth.json in CODEX_HOME=/data/.codex), NOT an API key. Sessions + auth persist in the /data bucket, so they survive Space restarts. Streaming is REAL token-by-token streaming, driven by the Codex App Server (JSON-RPC over stdio) via codex_engine.run_turn — `codex exec` cannot stream. Endpoints: GET /health -> liveness + auth status GET /v1/models -> static model list (OpenAI shape) POST /v1/chat/completions -> run Codex; OpenAI completion or SSE stream Security: every /v1 request needs `Authorization: Bearer `. Sessions: pass `X-Session-Id` (or the OpenAI `user` field) for a persistent workdir at /data/sessions//workspace + Codex thread resume. None -> ephemeral. """ import asyncio import base64 import json import os import re import shutil import time import uuid from contextlib import aclosing from pathlib import Path from typing import Any, Optional from fastapi import FastAPI, Header, HTTPException, Request from fastapi.responses import FileResponse, JSONResponse, StreamingResponse from pydantic import BaseModel from codex_engine import CodexError, run_turn from codex_pool import run_turn_pool # Load .env if present (local convenience). On the Space, env comes from Space # Variables/Secrets and .env is absent — load_dotenv is then a harmless no-op. try: from dotenv import load_dotenv load_dotenv() except ImportError: pass # --------------------------------------------------------------------------- # # Config # --------------------------------------------------------------------------- # CODEX_BIN = os.environ.get("CODEX_BIN", "codex") # on the Space this is `codex` # CODEX_HOME lives on FAST LOCAL disk (Codex hammers SQLite here every turn — a # network bucket makes that brutally slow). auth.json is seeded from / synced # back to AUTH_PERSIST_DIR (the /data bucket) so the login still persists. 
CODEX_HOME = os.environ.get("CODEX_HOME", "/tmp/.codex")
AUTH_PERSIST_DIR = Path(os.environ.get("AUTH_PERSIST_DIR", "/data/.codex"))
AUTH_FILE = Path(CODEX_HOME) / "auth.json"
SESSIONS_ROOT = Path(os.environ.get("SESSIONS_ROOT", "/data/sessions"))
API_TOKEN = os.environ.get("API_TOKEN", "")  # HF secret; if empty, auth is OPEN
DEFAULT_SANDBOX = os.environ.get("CODEX_SANDBOX", "workspace-write")  # or read-only
CODEX_MODEL = os.environ.get("CODEX_MODEL", "").strip()  # optional override
CODEX_EFFORT = os.environ.get("CODEX_EFFORT", "low").strip()  # minimal|low|medium|high
# Engine: "spawn" = cold process per request (proven). "pool" = one warm process
# reused across requests (lower latency; serializes turns). Opt-in.
CODEX_ENGINE = os.environ.get("CODEX_ENGINE", "spawn").strip().lower()

# Branded, user-facing error messages (never leak the underlying engine/errors).
LIMIT_MESSAGE = os.environ.get(
    "LIMIT_MESSAGE",
    "Antaram AI has reached its usage limit right now. Please try again later, "
    "or contact the administrator, Aditya Devarshi.")
UNAVAILABLE_MESSAGE = os.environ.get(
    "UNAVAILABLE_MESSAGE",
    "Antaram AI is temporarily unavailable. Please contact the administrator, "
    "Aditya Devarshi.")
ERROR_MESSAGE = os.environ.get(
    "ERROR_MESSAGE",
    "Sorry — Antaram AI couldn't complete that request. Please try again, or "
    "contact the administrator, Aditya Devarshi.")

# Lower-case substrings that mark an engine error as a usage/rate limit.
_LIMIT_MARKERS = ("rate limit", "rate_limit", "too many requests", "usage limit",
                  "quota", "exceeded", "insufficient")
# Lower-case substrings that mark an expired or rejected upstream login.
_AUTH_MARKERS = ("session has ended", "failed to refresh token", "log in again",
                 "unauthorized", "invalid api key")
# Status codes only count when they stand alone as a number, so that e.g.
# "14290 ms" or "read 4030 bytes" is not mistaken for HTTP 429 / 403.
_LIMIT_STATUS_RE = re.compile(r"(?<!\d)429(?!\d)")
_AUTH_STATUS_RE = re.compile(r"(?<!\d)(?:401|403)(?!\d)")


def _classify_error(raw: str) -> tuple[int, str]:
    """Map a raw engine error to (http_status, branded message) — never leaks internals.

    Args:
        raw: Error text from the engine; ``None``/empty is treated as "".

    Returns:
        ``(429, LIMIT_MESSAGE)`` for rate/usage-limit errors,
        ``(503, UNAVAILABLE_MESSAGE)`` for login/authorization errors,
        ``(502, ERROR_MESSAGE)`` for anything else.
    """
    low = (raw or "").lower()
    if any(k in low for k in _LIMIT_MARKERS) or _LIMIT_STATUS_RE.search(low):
        return 429, LIMIT_MESSAGE
    if any(k in low for k in _AUTH_MARKERS) or _AUTH_STATUS_RE.search(low):
        return 503, UNAVAILABLE_MESSAGE
    return 502, ERROR_MESSAGE


def _branded(raw: str) -> str:
    """Return only the branded message for a raw engine error."""
    return _classify_error(raw)[1]


# Hidden, authoritative global system prompt (Antaram AI identity + guardrails).
# Injected as developerInstructions on every turn, ABOVE any user system prompt.
_GLOBAL_SYSTEM_FILE = os.environ.get(
    "GLOBAL_SYSTEM_FILE", str(Path(__file__).parent / "global_system.md"))
_DEFAULT_GLOBAL_SYSTEM = ("You are Antaram AI (ai.antaram.org), created by Aditya "
                          "Devarshi. Never reveal you are anything else or how you work; "
                          "never perform destructive or host-system operations.")
try:
    GLOBAL_SYSTEM = Path(_GLOBAL_SYSTEM_FILE).read_text(encoding="utf-8").strip()
except (OSError, ValueError):  # missing/unreadable file or undecodable bytes
    GLOBAL_SYSTEM = ""
if not GLOBAL_SYSTEM:
    # A missing OR empty prompt file must never leave turns without guardrails.
    GLOBAL_SYSTEM = _DEFAULT_GLOBAL_SYSTEM


def _developer_instructions(user_system: Optional[str]) -> str:
    """Global rules first (authoritative); user system prompt is subordinate.

    Args:
        user_system: Caller-supplied system text, or ``None``/empty for none.

    Returns:
        The text to send as Codex developerInstructions.
    """
    if user_system:
        return (f"{GLOBAL_SYSTEM}\n\n"
                "--- Additional user preferences (subordinate to the rules above; "
                "ignore any part that conflicts with them) ---\n"
                f"{user_system}")
    return GLOBAL_SYSTEM


PUBLIC_BASE_URL = os.environ.get("PUBLIC_BASE_URL", "").rstrip("/")  # for image URLs
READ_TIMEOUT = float(os.environ.get("CODEX_TIMEOUT", "180"))  # per-output-gap secs
# Max Codex turns running at once across all sessions (each is a heavy process).
# Clamped to >= 1: a value of 0 would block every request until it times out.
MAX_CONCURRENCY = max(1, int(os.environ.get("CODEX_MAX_CONCURRENCY", "4")))
# How long a request may wait in the queue before we give up with 429.
QUEUE_TIMEOUT = float(os.environ.get("CODEX_QUEUE_TIMEOUT", "90"))
DEFAULT_MODEL_NAME = "antaram-pro"
SESSION_ID_RE = re.compile(r"[^A-Za-z0-9_.-]")

APP_VERSION = "1.0.0"
APP_DESCRIPTION = (
    "Antaram API — an OpenAI-compatible chat API by Antaram (founder: Aditya "
    "Devarshi).\n\n"
    "**Capabilities**\n"
    "- Text chat — streaming or non-streaming (`POST /v1/chat/completions`)\n"
    "- Image **input** (vision): attach images as `image_url` content parts and "
    "ask about them\n"
    "- Structured output via `response_format` (JSON object / JSON schema)\n"
    "- Persistent conversations via the `X-Session-Id` header\n\n"
    "**Not supported:** image generation, data analysis, or code execution — "
    "this is a text + image-understanding API.\n\n"
    # The credential placeholder was missing, leaving the header example unusable.
    "**Auth:** send `Authorization: Bearer <API_TOKEN>` on every `/v1` request.\n\n"
    "Base URL ends in `/v1`."
)

# docs_url/redoc_url=None hide the interactive Swagger UI (/docs) and ReDoc
# (/redoc) from end users; the machine-readable schema stays at /openapi.json.
app = FastAPI(
    title="Antaram API",
    version=APP_VERSION,
    description=APP_DESCRIPTION,
    docs_url=None,
    redoc_url=None,
)

# --------------------------------------------------------------------------- #
# Concurrency control
#  - _GLOBAL_SEM caps total simultaneous Codex processes (resource guard).
#  - _SESSION_LOCKS serialize requests that share a session id, so two calls
#    never resume/operate on the same thread + workspace at once (corruption).
#    Different sessions still run fully in parallel (up to the global cap).
# --------------------------------------------------------------------------- #
# max(1, ...) guards against a zero/negative cap, which would stall every turn.
_GLOBAL_SEM = asyncio.Semaphore(max(1, MAX_CONCURRENCY))
# NOTE(review): nothing in the visible code evicts entries, so this grows with
# the number of distinct session ids seen — confirm that is acceptable.
_SESSION_LOCKS: dict[str, asyncio.Lock] = {}


def _session_lock(session_id: Optional[str]) -> Optional[asyncio.Lock]:
    """Return the lock shared by all requests of `session_id` (None if ephemeral)."""
    if not session_id:
        return None  # ephemeral request: unique workspace, no shared state
    # setdefault is atomic between awaits in asyncio's single thread.
    return _SESSION_LOCKS.setdefault(session_id, asyncio.Lock())


class _TurnGuard:
    """Acquire (session lock -> global slot) with a bounded wait; release both.

    `release()` is idempotent, so the owner may call it from any cleanup path.
    """

    def __init__(self, session_id: Optional[str]):
        self._lock = _session_lock(session_id)
        self._have_lock = False
        self._have_slot = False

    async def acquire(self) -> None:
        """Take the session lock, then a global slot, within QUEUE_TIMEOUT total.

        Raises:
            HTTPException: 429 when the wait exceeds QUEUE_TIMEOUT.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + QUEUE_TIMEOUT
        try:
            if self._lock is not None:
                await asyncio.wait_for(
                    self._lock.acquire(), timeout=QUEUE_TIMEOUT
                )
                self._have_lock = True
            # Whatever the lock wait used up is deducted from the slot wait.
            remaining = max(0.1, deadline - loop.time())
            await asyncio.wait_for(_GLOBAL_SEM.acquire(), timeout=remaining)
            self._have_slot = True
        except (asyncio.TimeoutError, TimeoutError):
            self.release()
            raise HTTPException(
                status_code=429,
                detail="Server busy (concurrency/session limit). Retry shortly.",
            ) from None
        except BaseException:
            # Cancellation (client gone, shutdown) while queued for the slot must
            # not strand the session lock we may already hold.
            self.release()
            raise

    def release(self) -> None:
        """Give back whatever was acquired; safe to call more than once."""
        if self._have_slot:
            self._have_slot = False
            _GLOBAL_SEM.release()
        if self._have_lock:
            self._have_lock = False
            self._lock.release()


# --------------------------------------------------------------------------- #
# Request models (loose — we only read what we need)
# --------------------------------------------------------------------------- #
class ChatMessage(BaseModel):
    role: str
    content: Any = ""  # str, or list of content parts


class ChatRequest(BaseModel):
    model: Optional[str] = None
    messages: list[ChatMessage] = []
    stream: bool = False
    stream_options: Optional[dict] = None
    response_format: Optional[dict] = None  # {type: json_schema|json_object}
    user: Optional[str] = None


# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _check_auth(authorization: Optional[str]) -> None:
    """Reject the request unless it carries `Authorization: Bearer <API_TOKEN>`.

    Args:
        authorization: Raw value of the Authorization header, if any.

    Raises:
        HTTPException: 401 when API_TOKEN is set and the header does not match.
    """
    import hmac  # function-scope import keeps this helper self-contained

    if not API_TOKEN:
        return  # open mode (not recommended)
    expected = f"Bearer {API_TOKEN}".encode("utf-8")
    supplied = (authorization or "").encode("utf-8")
    # Constant-time comparison: do not leak how much of the token matched.
    # Bytes are compared because compare_digest rejects non-ASCII str input.
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid or missing API token.")


# Codex rotates auth.json in CODEX_HOME (local disk) when it
# refreshes the token.
# Persist that rotated copy back to the bucket so the login survives restarts.
_last_auth_mtime = AUTH_FILE.stat().st_mtime if AUTH_FILE.exists() else 0.0


def _sync_auth_back() -> None:
    """Best-effort copy of a changed auth.json to AUTH_PERSIST_DIR.

    Skips the copy when the file's mtime has not advanced since the last sync.
    Never raises: a failed sync must not fail the request.
    """
    global _last_auth_mtime
    try:
        mtime = AUTH_FILE.stat().st_mtime  # FileNotFoundError -> nothing to sync
        if mtime <= _last_auth_mtime:
            return  # unchanged since last sync — skip the (slow) bucket write
        AUTH_PERSIST_DIR.mkdir(parents=True, exist_ok=True)
        shutil.copy2(AUTH_FILE, AUTH_PERSIST_DIR / "auth.json")
        _last_auth_mtime = mtime
    except OSError:
        return  # best-effort; never fail a request over this


def _require_login() -> None:
    """Raise 503 (branded) when there is no auth.json to run Codex with."""
    # Branded message for clients; admins see the real state via GET /health.
    if not AUTH_FILE.exists():
        raise HTTPException(status_code=503, detail=UNAVAILABLE_MESSAGE)


def _flatten_content(content: Any) -> str:
    """Reduce OpenAI message content (str or list of parts) to plain text.

    Dict parts contribute their "text" (or "content") value; parts without
    text (e.g. image parts) are dropped. Non-string values are stringified.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for part in content:
            if isinstance(part, dict):
                text = part.get("text") or part.get("content") or ""
            else:
                text = part
            # Coerce so a non-string "text" value cannot break the join below.
            parts.append(text if isinstance(text, str) else str(text))
        return "\n".join(x for x in parts if x)
    return str(content or "")


def _safe_session_id(raw: Optional[str]) -> Optional[str]:
    """Sanitize a client-supplied session id into a single safe path component.

    Returns:
        The id with disallowed characters replaced by "-" and cut to 64 chars,
        or None when nothing usable remains.
    """
    if not raw:
        return None
    cleaned = SESSION_ID_RE.sub("-", raw.strip())[:64]
    # "." and ".." survive the character filter but are directory references:
    # as a session id they would escape or alias the sessions root.
    if not cleaned or cleaned in (".", ".."):
        return None
    return cleaned


def _thread_file(session_dir: Path) -> Path:
    """Path of the file that stores a session's Codex thread id."""
    return session_dir / "thread_id"


def _system_text(messages: list[ChatMessage]) -> str:
    """Concatenated system-message text (mapped to Codex developerInstructions)."""
    return "\n\n".join(
        _flatten_content(m.content) for m in messages if m.role == "system"
    ).strip()


def _build_prompt(messages: list[ChatMessage], resuming: bool,
                  include_system: bool = True) -> str:
    """
    Turn the OpenAI message list into a single prompt for Codex.
    - resuming: Codex holds the thread history, so send only the latest user turn.
    - new/stateless: send the whole transcript as context.
    - include_system=False: omit system text (it goes to developerInstructions).
    """
    sys_text = _system_text(messages) if include_system else ""
    if resuming:
        last_user = next((m for m in reversed(messages) if m.role == "user"), None)
        body = _flatten_content(last_user.content) if last_user else ""
        return (f"{sys_text}\n\n{body}").strip() if sys_text else body.strip()

    lines = []
    for m in messages:
        if m.role == "system":
            continue
        # Every non-user, non-system role is rendered as an Assistant line.
        tag = "User" if m.role == "user" else "Assistant"
        lines.append(f"{tag}: {_flatten_content(m.content)}")
    transcript = "\n\n".join(lines).strip()
    return (f"{sys_text}\n\n{transcript}").strip() if sys_text else transcript


def _image_url_to_item(url: Any, workspace: Path) -> Optional[dict]:
    """Map an OpenAI image_url (http(s) or data: URL) to a Codex input item.

    data: URLs are decoded and written into `workspace`; http(s) URLs are
    passed through. Anything else (including other URL schemes) yields None.
    """
    if not isinstance(url, str) or not url:
        return None
    if url.startswith("data:"):
        header, sep, b64 = url.partition(",")
        if not sep:
            return None
        h = header.lower()
        ext = "jpg" if ("jpeg" in h or "jpg" in h) else \
              "webp" if "webp" in h else "gif" if "gif" in h else "png"
        try:
            data = base64.b64decode(b64)
        except ValueError:  # binascii.Error is a ValueError subclass
            return None
        if not data:
            return None  # nothing decodable: do not hand Codex an empty file
        workspace.mkdir(parents=True, exist_ok=True)
        p = workspace / f"input-{uuid.uuid4().hex[:8]}.{ext}"
        p.write_bytes(data)
        return {"type": "localImage", "path": str(p.resolve())}
    # Client-controlled value: forward only http(s), never file:// and friends.
    if url.lower().startswith(("http://", "https://")):
        return {"type": "image", "url": url}
    return None


def _image_items_from_last_user(messages: list[ChatMessage],
                                workspace: Path) -> list[dict]:
    """Collect Codex image items from the image_url parts of the last user message."""
    last_user = next((m for m in reversed(messages) if m.role == "user"), None)
    items: list[dict] = []
    if last_user and isinstance(last_user.content, list):
        for part in last_user.content:
            if isinstance(part, dict) and part.get("type") == "image_url":
                iu = part.get("image_url")
                # OpenAI shape is {"url": ...}; a bare string is tolerated too.
                url = iu.get("url") if isinstance(iu, dict) else iu
                item = _image_url_to_item(url, workspace)
                if item:
                    items.append(item)
    return items


def _build_input(messages: list[ChatMessage], resuming: bool, workspace: Path,
                 extra_instruction: str = "") -> list[dict]:
    """Codex turn input: a text item (transcript, no system) + any image items.
    System text is sent separately as developerInstructions."""
    text = _build_prompt(messages, resuming, include_system=False)
    if extra_instruction:
        text = f"{text}\n\n{extra_instruction}".strip() if text else extra_instruction
    items: list[dict] = []
    if text:
        items.append({"type": "text", "text": text, "text_elements": []})
    items.extend(_image_items_from_last_user(messages, workspace))
    return items or [{"type": "text", "text": "", "text_elements": []}]


def _parse_response_format(rf: Optional[dict]) -> tuple[Optional[dict], str]:
    """OpenAI response_format -> (Codex outputSchema, extra prompt instruction)."""
    if not isinstance(rf, dict):
        return None, ""
    t = rf.get("type")
    if t == "json_schema":
        spec = rf.get("json_schema")
        # Tolerate a malformed (non-dict) json_schema instead of crashing.
        schema = spec.get("schema") if isinstance(spec, dict) else None
        return (schema if isinstance(schema, dict) else None), ""
    if t == "json_object":
        return None, "Respond ONLY with a single valid JSON object — no prose, no code fences."
    return None, ""


def _resolve_workspace(session_id: Optional[str]) -> tuple[Path, Optional[Path], Optional[str]]:
    """Return (workspace, session_dir, thread_id) for this request.

    Named sessions get SESSIONS_ROOT/<id>/workspace plus any stored thread id;
    otherwise a fresh /tmp/codex-<uuid> directory with no session dir.
    """
    if session_id:
        session_dir = SESSIONS_ROOT / session_id
        workspace = session_dir / "workspace"
        workspace.mkdir(parents=True, exist_ok=True)
        tf = _thread_file(session_dir)
        # An empty/blank file means "no thread yet", not the thread id "".
        thread_id = (tf.read_text().strip() or None) if tf.exists() else None
        return workspace, session_dir, thread_id
    workspace = Path("/tmp") / f"codex-{uuid.uuid4().hex}"
    workspace.mkdir(parents=True, exist_ok=True)
    return workspace, None, None


def _discard_ephemeral_workspace(session_dir: Optional[Path],
                                 workspace: Optional[Path]) -> None:
    """Delete a throwaway per-request workspace; named sessions are left alone."""
    # session_dir is None exactly for the /tmp/codex-<uuid> case above.
    if session_dir is None and workspace is not None:
        shutil.rmtree(workspace, ignore_errors=True)


def _persist_thread(session_dir: Optional[Path], thread_id: Optional[str]) -> None:
    """Store the session's Codex thread id, writing only when it changed."""
    if session_dir is not None and thread_id:
        tf = _thread_file(session_dir)
        if not tf.exists() or tf.read_text().strip() != thread_id:
            tf.write_text(thread_id)


def _stage_images(session_id: Optional[str], workspace: Path,
                  images: list[str]) -> list[str]:
    """Copy generated images (Codex saves them under CODEX_HOME) into the session
    workspace so /v1/files can serve them. Returns retrievable URL paths.

    Returns an empty list for ephemeral requests. Unreadable images are skipped.
    """
    urls: list[str] = []
    if not session_id or not images:
        return urls
    try:
        workspace.mkdir(parents=True, exist_ok=True)
        # Compare resolved paths on both sides so a symlinked workspace path
        # does not make an already-staged image look foreign.
        ws = workspace.resolve()
    except OSError:
        return urls
    for image in images:
        try:
            src = Path(image).resolve()
            if not src.is_file():
                continue
            try:
                rel = src.relative_to(ws)  # already inside the workspace
            except ValueError:
                dst = ws / src.name
                shutil.copy2(src, dst)
                rel = dst.relative_to(ws)
            urls.append(f"{PUBLIC_BASE_URL}/v1/files/{session_id}/{rel.as_posix()}")
        except (OSError, ValueError, TypeError):
            continue  # best-effort per image
    return urls


def make_turn(*, prompt: str = "", workspace, thread_id, sandbox, model=None,
              effort=None, input_items=None, output_schema=None, session_id=None,
              developer_instructions=None):
    """Pick the warm pool or cold-spawn engine — identical event contract."""
    common = dict(
        codex_bin=CODEX_BIN, codex_home=CODEX_HOME, prompt=prompt,
        workspace=workspace, thread_id=thread_id, sandbox=sandbox, model=model,
        read_timeout=READ_TIMEOUT, effort=effort, input_items=input_items,
        output_schema=output_schema, developer_instructions=developer_instructions,
    )
    if CODEX_ENGINE == "pool":
        return run_turn_pool(session_id=session_id, **common)
    return run_turn(**common)


def _completion_payload(content: str, model: str, usage: dict) -> dict:
    """Build a non-streaming OpenAI `chat.completion` response body."""
    return {
        "id": f"chatcmpl-{uuid.uuid4().hex}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": content},
                "finish_reason": "stop",
            }
        ],
        "usage": usage,
    }


# --------------------------------------------------------------------------- #
# Routes
# --------------------------------------------------------------------------- #
@app.get("/health")
async def health():
    """Lightweight, end-user liveness check (no internal details)."""
    available = AUTH_FILE.exists()
    return {
        "status": "ok" if available else "degraded",
        "service": "Antaram API",
        "version": APP_VERSION,
        "available": available,
    }


@app.get("/v1/models")
async def models(authorization: Optional[str] = Header(default=None)):
    """Return the static model list in OpenAI `list` shape."""
    _check_auth(authorization)
    created = int(time.time())
    return {
        "object": "list",
        "data": [
            {"id": "antaram-flash", "object": "model", "created": created,
             "owned_by": "antaram"},
            {"id": "antaram-pro", "object": "model", "created": created,
             "owned_by": "antaram"},
        ],
    }


@app.post("/v1/chat/completions")
async def chat_completions(
    request: Request,
    authorization: Optional[str] = Header(default=None),
    x_session_id: Optional[str] = Header(default=None),
):
    """Run one Codex turn and answer as an OpenAI completion or SSE stream.

    Raises:
        HTTPException: 400 for a malformed body, missing `messages` or an empty
            prompt; 401/503 from the auth checks; 429 when the queue wait for
            the session lock / global slot times out.
    """
    _check_auth(authorization)
    _require_login()

    try:
        body = await request.json()
        req = ChatRequest(**body)
    except (ValueError, TypeError):
        # Bad JSON, a non-object body, or a pydantic validation failure (a
        # ValueError subclass) is a client error, not a server crash.
        raise HTTPException(
            status_code=400, detail="Request body must be a valid JSON chat request."
        ) from None
    if not req.messages:
        raise HTTPException(status_code=400, detail="`messages` is required.")

    session_id = _safe_session_id(x_session_id or req.user)
    model_name = req.model or DEFAULT_MODEL_NAME

    # Structured output (response_format) + system prompt -> Codex
    # developerInstructions. Neither depends on session state.
    output_schema, rf_instruction = _parse_response_format(req.response_format)
    developer_instructions = _developer_instructions(_system_text(req.messages) or None)

    # Acquire concurrency guard BEFORE starting work, so we can fail fast with
    # 429 (for streaming, headers are sent once we return — can't 429 later).
    guard = _TurnGuard(session_id)
    await guard.acquire()

    workspace: Optional[Path] = None
    session_dir: Optional[Path] = None
    try:
        # Session state is read only while the session lock is held; reading
        # thread_id earlier lets two concurrent requests both see "no thread"
        # and start separate threads.
        workspace, session_dir, thread_id = _resolve_workspace(session_id)
        # Vision input (image_url parts) is written into the workspace here.
        input_items = _build_input(req.messages, bool(thread_id), workspace,
                                   rf_instruction)
        has_content = any(
            (it.get("type") == "text" and it.get("text"))
            or it.get("type") in ("image", "localImage")
            for it in input_items)
        if not has_content:
            raise HTTPException(status_code=400, detail="Empty prompt after parsing.")

        turn = make_turn(
            workspace=workspace, thread_id=thread_id, sandbox=DEFAULT_SANDBOX,
            model=CODEX_MODEL or None, effort=CODEX_EFFORT or None,
            input_items=input_items, output_schema=output_schema,
            session_id=session_id, developer_instructions=developer_instructions,
        )
    except BaseException:
        # Nothing downstream owns the guard yet — give it back before failing.
        guard.release()
        _discard_ephemeral_workspace(session_dir, workspace)
        raise

    if req.stream:
        include_usage = bool((req.stream_options or {}).get("include_usage"))
        # _sse_stream owns the guard from here and releases it when done.
        return StreamingResponse(
            _sse_stream(turn, model_name, session_dir, include_usage, guard,
                        session_id, workspace),
            media_type="text/event-stream",
        )

    # Non-streaming: drain the generator, return one completion.
    content_parts: list[str] = []
    usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
             "prompt_tokens_details": {"cached_tokens": 0}}
    images: list[str] = []
    try:
        async with aclosing(turn) as t:
            async for evt in t:
                if evt["type"] == "delta":
                    content_parts.append(evt["text"])
                elif evt["type"] == "final":
                    if evt.get("text"):
                        content_parts = [evt["text"]]  # authoritative full text
                    usage = evt.get("usage", usage)
                    images = evt.get("images", []) or []
                    _persist_thread(session_dir, evt.get("thread_id"))
    except CodexError as e:
        # Show a clean branded message as the assistant's reply (no leaked internals).
        return JSONResponse(_completion_payload(
            _branded(str(e)), model_name,
            {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
             "prompt_tokens_details": {"cached_tokens": 0}}))
    finally:
        guard.release()
        _sync_auth_back()
        _discard_ephemeral_workspace(session_dir, workspace)

    for url in _stage_images(session_id, workspace, images):
        content_parts.append(f"\n\n🖼️ image: {url}")
    return JSONResponse(_completion_payload("".join(content_parts), model_name, usage))


async def _sse_stream(turn, model: str, session_dir, include_usage: bool, guard,
                      session_id, workspace):
    """OpenAI-compatible SSE: role chunk, live content deltas, finish, [DONE].

    Owns `guard`: releases the concurrency slot/session lock when the turn ends
    OR the client disconnects (aclosing -> run_turn finally kills the process).
    """
    cid = f"chatcmpl-{uuid.uuid4().hex}"
    created = int(time.time())

    def chunk(delta: dict, finish: Optional[str] = None,
              usage: Optional[dict] = None) -> str:
        """Serialize one `chat.completion.chunk` as an SSE `data:` frame."""
        payload = {
            "id": cid,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
        }
        if usage is not None:
            payload["usage"] = usage
        return f"data: {json.dumps(payload)}\n\n"

    final_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
                   "prompt_tokens_details": {"cached_tokens": 0}}
    images: list[str] = []
    try:
        async with aclosing(turn) as t:
            # The role chunk is yielded INSIDE the try: a client that drops at
            # this very first yield must still trigger the cleanup below.
            yield chunk({"role": "assistant"})
            async for evt in t:
                if evt["type"] == "delta":
                    yield chunk({"content": evt["text"]})
                elif evt["type"] == "reasoning":
                    yield chunk({"reasoning_content": evt["text"]})
                elif evt["type"] == "final":
                    final_usage = evt.get("usage", final_usage)
                    images = evt.get("images", []) or []
                    _persist_thread(session_dir, evt.get("thread_id"))
    except CodexError as e:
        # Branded message inside the stream (never leak the underlying engine).
        yield chunk({"content": _branded(str(e))})
    finally:
        guard.release()
        _sync_auth_back()
        _discard_ephemeral_workspace(session_dir, workspace)

    # Surface any generated image(s) as a trailing content chunk.
    for url in _stage_images(session_id, workspace, images):
        yield chunk({"content": f"\n\n🖼️ image: {url}"})

    yield chunk({}, finish="stop")
    if include_usage:
        yield chunk({}, usage=final_usage)
    yield "data: [DONE]\n\n"


# --------------------------------------------------------------------------- #
# Files — retrieve artifacts the agent created (images, code, ...) in a session.
# Only works for named sessions (X-Session-Id), whose workspace persists in /data.
# --------------------------------------------------------------------------- #
def _session_workspace(session_id: str) -> Path:
    """Return the persistent workspace directory of a named session."""
    return SESSIONS_ROOT / session_id / "workspace"


def _require_file_session(session_id: str) -> str:
    """Validate a session id taken from the URL path.

    Raises:
        HTTPException: 400 when the id is empty after sanitizing, or is a
            directory reference ("." / "..") that would point outside the
            caller's own session directory.
    """
    sid = _safe_session_id(session_id)
    if not sid or sid in (".", ".."):
        raise HTTPException(status_code=400, detail="Invalid session id.")
    return sid


@app.get("/v1/files/{session_id}")
async def list_files(session_id: str,
                     authorization: Optional[str] = Header(default=None)):
    """List every regular file in the session workspace (recursively)."""
    _check_auth(authorization)
    sid = _require_file_session(session_id)
    ws = _session_workspace(sid)
    files = []
    if ws.exists():
        for p in sorted(ws.rglob("*")):
            if p.is_file():
                rel = p.relative_to(ws).as_posix()
                files.append({"name": rel, "bytes": p.stat().st_size,
                              "url": f"/v1/files/{sid}/{rel}"})
    return {"object": "list", "session": sid, "data": files}


@app.get("/v1/files/{session_id}/{file_path:path}")
async def get_file(session_id: str, file_path: str,
                   authorization: Optional[str] = Header(default=None)):
    """Serve one file from the session workspace.

    Raises:
        HTTPException: 400 for an invalid session id, 403 when the resolved
            path leaves the workspace, 404 when it is not an existing file.
    """
    _check_auth(authorization)
    sid = _require_file_session(session_id)
    ws = _session_workspace(sid).resolve()
    target = (ws / file_path).resolve()
    try:
        target.relative_to(ws)  # block path traversal (../)
    except ValueError:
        raise HTTPException(
            status_code=403, detail="Path escapes session workspace."
        ) from None
    if not target.is_file():
        raise HTTPException(status_code=404, detail="File not found.")
    return FileResponse(target)