codex / app.py
sarveshpatel's picture
Upload 2 files
61f7df0 verified
Raw
History Blame Contribute Delete
27.4 kB
"""
Codex-as-API: an OpenAI-compatible HTTP wrapper around the OpenAI Codex CLI.
Auth is your ChatGPT login (auth.json in CODEX_HOME=/data/.codex), NOT an API key.
Sessions + auth persist in the /data bucket, so they survive Space restarts.
Streaming is REAL token-by-token streaming, driven by the Codex App Server
(JSON-RPC over stdio) via codex_engine.run_turn — `codex exec` cannot stream.
Endpoints:
GET /health -> liveness + auth status
GET /v1/models -> static model list (OpenAI shape)
POST /v1/chat/completions -> run Codex; OpenAI completion or SSE stream
Security: every /v1 request needs `Authorization: Bearer <API_TOKEN>`.
Sessions: pass `X-Session-Id` (or the OpenAI `user` field) for a persistent
workdir at /data/sessions/<id>/workspace + Codex thread resume. None -> ephemeral.
"""
import asyncio
import base64
import json
import os
import re
import shutil
import time
import uuid
from contextlib import aclosing
from pathlib import Path
from typing import Any, Optional
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel
from codex_engine import CodexError, run_turn
from codex_pool import run_turn_pool
# Load .env if present (local convenience). On the Space, env comes from Space
# Variables/Secrets and .env is absent — load_dotenv is then a harmless no-op.
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# --------------------------------------------------------------------------- #
# Config
# --------------------------------------------------------------------------- #
CODEX_BIN = os.environ.get("CODEX_BIN", "codex") # on the Space this is `codex`
# CODEX_HOME lives on FAST LOCAL disk (Codex hammers SQLite here every turn — a
# network bucket makes that brutally slow). auth.json is seeded from / synced
# back to AUTH_PERSIST_DIR (the /data bucket) so the login still persists.
CODEX_HOME = os.environ.get("CODEX_HOME", "/tmp/.codex")
AUTH_PERSIST_DIR = Path(os.environ.get("AUTH_PERSIST_DIR", "/data/.codex"))
AUTH_FILE = Path(CODEX_HOME) / "auth.json"
SESSIONS_ROOT = Path(os.environ.get("SESSIONS_ROOT", "/data/sessions"))
API_TOKEN = os.environ.get("API_TOKEN", "") # HF secret; if empty, auth is OPEN
DEFAULT_SANDBOX = os.environ.get("CODEX_SANDBOX", "workspace-write") # or read-only
CODEX_MODEL = os.environ.get("CODEX_MODEL", "").strip() # optional override
CODEX_EFFORT = os.environ.get("CODEX_EFFORT", "low").strip() # minimal|low|medium|high
# Engine: "spawn" = cold process per request (proven). "pool" = one warm process
# reused across requests (lower latency; serializes turns). Opt-in.
CODEX_ENGINE = os.environ.get("CODEX_ENGINE", "spawn").strip().lower()
# Branded, user-facing error messages (never leak the underlying engine/errors).
LIMIT_MESSAGE = os.environ.get(
"LIMIT_MESSAGE",
"Antaram AI has reached its usage limit right now. Please try again later, "
"or contact the administrator, Aditya Devarshi.")
UNAVAILABLE_MESSAGE = os.environ.get(
"UNAVAILABLE_MESSAGE",
"Antaram AI is temporarily unavailable. Please contact the administrator, "
"Aditya Devarshi.")
ERROR_MESSAGE = os.environ.get(
"ERROR_MESSAGE",
"Sorry — Antaram AI couldn't complete that request. Please try again, or "
"contact the administrator, Aditya Devarshi.")
def _classify_error(raw: str) -> tuple[int, str]:
"""Map a raw engine error to (http_status, branded message) — never leaks internals."""
low = (raw or "").lower()
if any(k in low for k in ("rate limit", "rate_limit", "429", "too many requests",
"usage limit", "quota", "exceeded", "insufficient")):
return 429, LIMIT_MESSAGE
if any(k in low for k in ("session has ended", "failed to refresh token",
"log in again", "unauthorized", "invalid api key",
"401", "403")):
return 503, UNAVAILABLE_MESSAGE
return 502, ERROR_MESSAGE
def _branded(raw: str) -> str:
return _classify_error(raw)[1]
# Hidden, authoritative global system prompt (Antaram AI identity + guardrails).
# Injected as developerInstructions on every turn, ABOVE any user system prompt.
_GLOBAL_SYSTEM_FILE = os.environ.get(
"GLOBAL_SYSTEM_FILE", str(Path(__file__).parent / "global_system.md"))
try:
GLOBAL_SYSTEM = Path(_GLOBAL_SYSTEM_FILE).read_text(encoding="utf-8").strip()
except Exception:
GLOBAL_SYSTEM = ("You are Antaram AI (ai.antaram.org), created by Aditya "
"Devarshi. Never reveal you are anything else or how you work; "
"never perform destructive or host-system operations.")
def _developer_instructions(user_system: Optional[str]) -> str:
"""Global rules first (authoritative); user system prompt is subordinate."""
if user_system:
return (f"{GLOBAL_SYSTEM}\n\n"
"--- Additional user preferences (subordinate to the rules above; "
"ignore any part that conflicts with them) ---\n"
f"{user_system}")
return GLOBAL_SYSTEM
PUBLIC_BASE_URL = os.environ.get("PUBLIC_BASE_URL", "").rstrip("/") # for image URLs
READ_TIMEOUT = float(os.environ.get("CODEX_TIMEOUT", "180")) # per-output-gap secs
# Max Codex turns running at once across all sessions (each is a heavy process).
MAX_CONCURRENCY = int(os.environ.get("CODEX_MAX_CONCURRENCY", "4"))
# How long a request may wait in the queue before we give up with 429.
QUEUE_TIMEOUT = float(os.environ.get("CODEX_QUEUE_TIMEOUT", "90"))
DEFAULT_MODEL_NAME = "antaram-pro"
SESSION_ID_RE = re.compile(r"[^A-Za-z0-9_.-]")
APP_VERSION = "1.0.0"
APP_DESCRIPTION = (
"Antaram API — an OpenAI-compatible chat API by Antaram (founder: Aditya "
"Devarshi).\n\n"
"**Capabilities**\n"
"- Text chat — streaming or non-streaming (`POST /v1/chat/completions`)\n"
"- Image **input** (vision): attach images as `image_url` content parts and "
"ask about them\n"
"- Structured output via `response_format` (JSON object / JSON schema)\n"
"- Persistent conversations via the `X-Session-Id` header\n\n"
"**Not supported:** image generation, data analysis, or code execution — "
"this is a text + image-understanding API.\n\n"
"**Auth:** send `Authorization: Bearer <token>` on every `/v1` request.\n\n"
"Base URL ends in `/v1`."
)
# docs_url/redoc_url=None hide the interactive Swagger UI (/docs) and ReDoc
# (/redoc) from end users; the machine-readable schema stays at /openapi.json.
app = FastAPI(
title="Antaram API",
version=APP_VERSION,
description=APP_DESCRIPTION,
docs_url=None,
redoc_url=None,
)
# --------------------------------------------------------------------------- #
# Concurrency control
# - _GLOBAL_SEM caps total simultaneous Codex processes (resource guard).
# - _SESSION_LOCKS serialize requests that share a session id, so two calls
# never resume/operate on the same thread + workspace at once (corruption).
# Different sessions still run fully in parallel (up to the global cap).
# --------------------------------------------------------------------------- #
_GLOBAL_SEM = asyncio.Semaphore(MAX_CONCURRENCY)
_SESSION_LOCKS: dict[str, asyncio.Lock] = {}
def _session_lock(session_id: Optional[str]) -> Optional[asyncio.Lock]:
if not session_id:
return None # ephemeral request: unique workspace, no shared state
# setdefault is atomic between awaits in asyncio's single thread.
return _SESSION_LOCKS.setdefault(session_id, asyncio.Lock())
class _TurnGuard:
"""Acquire (session lock -> global slot) with a bounded wait; release both."""
def __init__(self, session_id: Optional[str]):
self._lock = _session_lock(session_id)
self._have_lock = False
self._have_slot = False
async def acquire(self) -> None:
loop = asyncio.get_event_loop()
deadline = loop.time() + QUEUE_TIMEOUT
try:
if self._lock is not None:
await asyncio.wait_for(
self._lock.acquire(), timeout=QUEUE_TIMEOUT
)
self._have_lock = True
remaining = max(0.1, deadline - loop.time())
await asyncio.wait_for(_GLOBAL_SEM.acquire(), timeout=remaining)
self._have_slot = True
except (asyncio.TimeoutError, TimeoutError):
self.release()
raise HTTPException(
status_code=429,
detail="Server busy (concurrency/session limit). Retry shortly.",
)
def release(self) -> None:
if self._have_slot:
self._have_slot = False
_GLOBAL_SEM.release()
if self._have_lock:
self._have_lock = False
self._lock.release()
# --------------------------------------------------------------------------- #
# Request models (loose — we only read what we need)
# --------------------------------------------------------------------------- #
class ChatMessage(BaseModel):
role: str
content: Any = "" # str, or list of content parts
class ChatRequest(BaseModel):
model: Optional[str] = None
messages: list[ChatMessage] = []
stream: bool = False
stream_options: Optional[dict] = None
response_format: Optional[dict] = None # {type: json_schema|json_object}
user: Optional[str] = None
# --------------------------------------------------------------------------- #
# Helpers
# --------------------------------------------------------------------------- #
def _check_auth(authorization: Optional[str]) -> None:
if not API_TOKEN:
return # open mode (not recommended)
if authorization != f"Bearer {API_TOKEN}":
raise HTTPException(status_code=401, detail="Invalid or missing API token.")
# Codex rotates auth.json in CODEX_HOME (local disk) when it refreshes the token.
# Persist that rotated copy back to the bucket so the login survives restarts.
_last_auth_mtime = AUTH_FILE.stat().st_mtime if AUTH_FILE.exists() else 0.0
def _sync_auth_back() -> None:
try:
if not AUTH_FILE.exists():
return
global _last_auth_mtime
m = AUTH_FILE.stat().st_mtime
if m <= _last_auth_mtime:
return # unchanged since last sync — skip the (slow) bucket write
AUTH_PERSIST_DIR.mkdir(parents=True, exist_ok=True)
shutil.copy2(AUTH_FILE, AUTH_PERSIST_DIR / "auth.json")
_last_auth_mtime = m
except Exception:
pass # best-effort; never fail a request over this
def _require_login() -> None:
# Branded message for clients; admins see the real state via GET /health.
if not AUTH_FILE.exists():
raise HTTPException(status_code=503, detail=UNAVAILABLE_MESSAGE)
def _flatten_content(content: Any) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, dict):
parts.append(p.get("text") or p.get("content") or "")
else:
parts.append(str(p))
return "\n".join(x for x in parts if x)
return str(content or "")
def _safe_session_id(raw: Optional[str]) -> Optional[str]:
if not raw:
return None
cleaned = SESSION_ID_RE.sub("-", raw.strip())[:64]
return cleaned or None
def _thread_file(session_dir: Path) -> Path:
return session_dir / "thread_id"
def _system_text(messages: list[ChatMessage]) -> str:
"""Concatenated system-message text (mapped to Codex developerInstructions)."""
return "\n\n".join(
_flatten_content(m.content) for m in messages if m.role == "system"
).strip()
def _build_prompt(messages: list[ChatMessage], resuming: bool,
include_system: bool = True) -> str:
"""
Turn the OpenAI message list into a single prompt for Codex.
- resuming: Codex holds the thread history, so send only the latest user turn.
- new/stateless: send the whole transcript as context.
- include_system=False: omit system text (it goes to developerInstructions).
"""
sys_text = _system_text(messages) if include_system else ""
if resuming:
last_user = next((m for m in reversed(messages) if m.role == "user"), None)
body = _flatten_content(last_user.content) if last_user else ""
return (f"{sys_text}\n\n{body}").strip() if sys_text else body.strip()
lines = []
for m in messages:
if m.role == "system":
continue
tag = "User" if m.role == "user" else "Assistant"
lines.append(f"{tag}: {_flatten_content(m.content)}")
transcript = "\n\n".join(lines).strip()
return (f"{sys_text}\n\n{transcript}").strip() if sys_text else transcript
def _image_url_to_item(url: Any, workspace: Path) -> Optional[dict]:
"""Map an OpenAI image_url (http(s) or data: URL) to a Codex input item."""
if not isinstance(url, str) or not url:
return None
if url.startswith("data:"):
try:
header, b64 = url.split(",", 1)
except ValueError:
return None
h = header.lower()
ext = "jpg" if ("jpeg" in h or "jpg" in h) else \
"webp" if "webp" in h else "gif" if "gif" in h else "png"
try:
data = base64.b64decode(b64)
except Exception:
return None
workspace.mkdir(parents=True, exist_ok=True)
p = workspace / f"input-{uuid.uuid4().hex[:8]}.{ext}"
p.write_bytes(data)
return {"type": "localImage", "path": str(p.resolve())}
return {"type": "image", "url": url}
def _image_items_from_last_user(messages: list[ChatMessage], workspace: Path) -> list[dict]:
last_user = next((m for m in reversed(messages) if m.role == "user"), None)
items: list[dict] = []
if last_user and isinstance(last_user.content, list):
for part in last_user.content:
if isinstance(part, dict) and part.get("type") == "image_url":
iu = part.get("image_url")
url = iu.get("url") if isinstance(iu, dict) else iu
item = _image_url_to_item(url, workspace)
if item:
items.append(item)
return items
def _build_input(messages: list[ChatMessage], resuming: bool, workspace: Path,
extra_instruction: str = "") -> list[dict]:
"""Codex turn input: a text item (transcript, no system) + any image items.
System text is sent separately as developerInstructions."""
text = _build_prompt(messages, resuming, include_system=False)
if extra_instruction:
text = f"{text}\n\n{extra_instruction}".strip() if text else extra_instruction
items: list[dict] = []
if text:
items.append({"type": "text", "text": text, "text_elements": []})
items.extend(_image_items_from_last_user(messages, workspace))
return items or [{"type": "text", "text": "", "text_elements": []}]
def _parse_response_format(rf: Optional[dict]) -> tuple[Optional[dict], str]:
"""OpenAI response_format -> (Codex outputSchema, extra prompt instruction)."""
if not isinstance(rf, dict):
return None, ""
t = rf.get("type")
if t == "json_schema":
schema = (rf.get("json_schema") or {}).get("schema")
return (schema if isinstance(schema, dict) else None), ""
if t == "json_object":
return None, "Respond ONLY with a single valid JSON object — no prose, no code fences."
return None, ""
def _resolve_workspace(session_id: Optional[str]) -> tuple[Path, Optional[Path], Optional[str]]:
"""Return (workspace, session_dir, thread_id) for this request."""
if session_id:
session_dir = SESSIONS_ROOT / session_id
workspace = session_dir / "workspace"
workspace.mkdir(parents=True, exist_ok=True)
tf = _thread_file(session_dir)
thread_id = tf.read_text().strip() if tf.exists() else None
return workspace, session_dir, thread_id
workspace = Path("/tmp") / f"codex-{uuid.uuid4().hex}"
workspace.mkdir(parents=True, exist_ok=True)
return workspace, None, None
def _persist_thread(session_dir: Optional[Path], thread_id: Optional[str]) -> None:
if session_dir is not None and thread_id:
tf = _thread_file(session_dir)
if not tf.exists() or tf.read_text().strip() != thread_id:
tf.write_text(thread_id)
def _stage_images(session_id: Optional[str], workspace: Path,
images: list[str]) -> list[str]:
"""Copy generated images (Codex saves them under CODEX_HOME) into the session
workspace so /v1/files can serve them. Returns retrievable URL paths."""
urls: list[str] = []
if not session_id or not images:
return urls
try:
workspace.mkdir(parents=True, exist_ok=True)
except Exception:
return urls
for p in images:
try:
src = Path(p)
if not src.is_file():
continue
try:
src.relative_to(workspace.resolve())
dst = src
except ValueError:
dst = workspace / src.name
shutil.copy2(src, dst)
name = dst.relative_to(workspace).as_posix()
urls.append(f"{PUBLIC_BASE_URL}/v1/files/{session_id}/{name}")
except Exception:
continue
return urls
def make_turn(*, prompt: str = "", workspace, thread_id, sandbox,
model=None, effort=None, input_items=None, output_schema=None,
session_id=None, developer_instructions=None):
"""Pick the warm pool or cold-spawn engine — identical event contract."""
common = dict(
codex_bin=CODEX_BIN, codex_home=CODEX_HOME, prompt=prompt,
workspace=workspace, thread_id=thread_id, sandbox=sandbox, model=model,
read_timeout=READ_TIMEOUT, effort=effort, input_items=input_items,
output_schema=output_schema, developer_instructions=developer_instructions,
)
if CODEX_ENGINE == "pool":
return run_turn_pool(session_id=session_id, **common)
return run_turn(**common)
def _completion_payload(content: str, model: str, usage: dict) -> dict:
return {
"id": f"chatcmpl-{uuid.uuid4().hex}",
"object": "chat.completion",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": content},
"finish_reason": "stop",
}
],
"usage": usage,
}
# --------------------------------------------------------------------------- #
# Routes
# --------------------------------------------------------------------------- #
@app.get("/health")
async def health():
"""Lightweight, end-user liveness check (no internal details)."""
available = AUTH_FILE.exists()
return {
"status": "ok" if available else "degraded",
"service": "Antaram API",
"version": APP_VERSION,
"available": available,
}
@app.get("/v1/models")
async def models(authorization: Optional[str] = Header(default=None)):
_check_auth(authorization)
created = int(time.time())
return {
"object": "list",
"data": [
{"id": "antaram-flash", "object": "model", "created": created,
"owned_by": "antaram"},
{"id": "antaram-pro", "object": "model", "created": created,
"owned_by": "antaram"},
],
}
@app.post("/v1/chat/completions")
async def chat_completions(
request: Request,
authorization: Optional[str] = Header(default=None),
x_session_id: Optional[str] = Header(default=None),
):
_check_auth(authorization)
_require_login()
body = await request.json()
req = ChatRequest(**body)
if not req.messages:
raise HTTPException(status_code=400, detail="`messages` is required.")
session_id = _safe_session_id(x_session_id or req.user)
model_name = req.model or DEFAULT_MODEL_NAME
workspace, session_dir, thread_id = _resolve_workspace(session_id)
# Structured output (response_format) + vision input (image_url parts) +
# system prompt -> Codex developerInstructions.
output_schema, rf_instruction = _parse_response_format(req.response_format)
input_items = _build_input(req.messages, bool(thread_id), workspace, rf_instruction)
developer_instructions = _developer_instructions(_system_text(req.messages) or None)
has_content = any(
(it.get("type") == "text" and it.get("text"))
or it.get("type") in ("image", "localImage")
for it in input_items)
if not has_content:
raise HTTPException(status_code=400, detail="Empty prompt after parsing.")
# Acquire concurrency guard BEFORE starting work, so we can fail fast with
# 429 (for streaming, headers are sent once we return — can't 429 later).
guard = _TurnGuard(session_id)
await guard.acquire()
turn = make_turn(
workspace=workspace,
thread_id=thread_id,
sandbox=DEFAULT_SANDBOX,
model=CODEX_MODEL or None,
effort=CODEX_EFFORT or None,
input_items=input_items,
output_schema=output_schema,
session_id=session_id,
developer_instructions=developer_instructions,
)
if req.stream:
include_usage = bool((req.stream_options or {}).get("include_usage"))
# _sse_stream owns the guard from here and releases it when done.
return StreamingResponse(
_sse_stream(turn, model_name, session_dir, include_usage, guard,
session_id, workspace),
media_type="text/event-stream",
)
# Non-streaming: drain the generator, return one completion.
content_parts: list[str] = []
usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
"prompt_tokens_details": {"cached_tokens": 0}}
images: list[str] = []
try:
async with aclosing(turn) as t:
async for evt in t:
if evt["type"] == "delta":
content_parts.append(evt["text"])
elif evt["type"] == "final":
if evt.get("text"):
content_parts = [evt["text"]] # authoritative full text
usage = evt.get("usage", usage)
images = evt.get("images", []) or []
_persist_thread(session_dir, evt.get("thread_id"))
except CodexError as e:
# Show a clean branded message as the assistant's reply (no leaked internals).
return JSONResponse(_completion_payload(
_branded(str(e)), model_name,
{"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
"prompt_tokens_details": {"cached_tokens": 0}}))
finally:
guard.release()
_sync_auth_back()
for url in _stage_images(session_id, workspace, images):
content_parts.append(f"\n\n🖼️ image: {url}")
return JSONResponse(_completion_payload("".join(content_parts), model_name, usage))
async def _sse_stream(turn, model: str, session_dir, include_usage: bool, guard,
session_id, workspace):
"""OpenAI-compatible SSE: role chunk, live content deltas, finish, [DONE].
Owns `guard`: releases the concurrency slot/session lock when the turn ends
OR the client disconnects (aclosing -> run_turn finally kills the process).
"""
cid = f"chatcmpl-{uuid.uuid4().hex}"
created = int(time.time())
def chunk(delta: dict, finish: Optional[str] = None, usage: Optional[dict] = None) -> str:
payload = {
"id": cid,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{"index": 0, "delta": delta, "finish_reason": finish}],
}
if usage is not None:
payload["usage"] = usage
return f"data: {json.dumps(payload)}\n\n"
yield chunk({"role": "assistant"})
final_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0,
"prompt_tokens_details": {"cached_tokens": 0}}
images: list[str] = []
try:
async with aclosing(turn) as t:
async for evt in t:
if evt["type"] == "delta":
yield chunk({"content": evt["text"]})
elif evt["type"] == "reasoning":
yield chunk({"reasoning_content": evt["text"]})
elif evt["type"] == "final":
final_usage = evt.get("usage", final_usage)
images = evt.get("images", []) or []
_persist_thread(session_dir, evt.get("thread_id"))
except CodexError as e:
# Branded message inside the stream (never leak the underlying engine).
yield chunk({"content": _branded(str(e))})
finally:
guard.release()
_sync_auth_back()
# Surface any generated image(s) as a trailing content chunk.
for url in _stage_images(session_id, workspace, images):
yield chunk({"content": f"\n\n🖼️ image: {url}"})
yield chunk({}, finish="stop")
if include_usage:
yield chunk({}, usage=final_usage)
yield "data: [DONE]\n\n"
# --------------------------------------------------------------------------- #
# Files — retrieve artifacts the agent created (images, code, ...) in a session.
# Only works for named sessions (X-Session-Id), whose workspace persists in /data.
# --------------------------------------------------------------------------- #
def _session_workspace(session_id: str) -> Path:
return SESSIONS_ROOT / session_id / "workspace"
@app.get("/v1/files/{session_id}")
async def list_files(session_id: str, authorization: Optional[str] = Header(default=None)):
_check_auth(authorization)
sid = _safe_session_id(session_id)
if not sid:
raise HTTPException(status_code=400, detail="Invalid session id.")
ws = _session_workspace(sid)
files = []
if ws.exists():
for p in sorted(ws.rglob("*")):
if p.is_file():
rel = p.relative_to(ws).as_posix()
files.append({"name": rel, "bytes": p.stat().st_size,
"url": f"/v1/files/{sid}/{rel}"})
return {"object": "list", "session": sid, "data": files}
@app.get("/v1/files/{session_id}/{file_path:path}")
async def get_file(session_id: str, file_path: str,
authorization: Optional[str] = Header(default=None)):
_check_auth(authorization)
sid = _safe_session_id(session_id)
if not sid:
raise HTTPException(status_code=400, detail="Invalid session id.")
ws = _session_workspace(sid).resolve()
target = (ws / file_path).resolve()
try:
target.relative_to(ws) # block path traversal (../)
except ValueError:
raise HTTPException(status_code=403, detail="Path escapes session workspace.")
if not target.is_file():
raise HTTPException(status_code=404, detail="File not found.")
return FileResponse(target)