Spaces:
Running
Running
| # ============================================================================ | |
| # Codex-as-API · CLI chat for Google Colab (async streaming + commands) | |
| # USE: open https://colab.research.google.com → new notebook → paste this | |
| # WHOLE file into ONE cell → Run. Type at the "you ▸" prompt. | |
| # | |
| # Commands: | |
| # /help show commands | |
| # /image <prompt> generate an image and show it inline | |
| # /files list files this session created (downloadable) | |
| # /reset start a fresh conversation (new memory) | |
| # /session <id> switch to a named session | |
| # /exit quit | |
| # ============================================================================ | |
| !pip -q install openai nest_asyncio requests | |
| import asyncio, time, uuid, base64, re, requests, nest_asyncio | |
| from urllib.parse import urlparse | |
| from openai import AsyncOpenAI, OpenAI | |
| from IPython.display import Image as IPyImage, display | |
| nest_asyncio.apply() | |
| # ── YOUR API ──────────────────────────────────────────────────────────────── | |
| BASE_URL = "https://sarveshpatel-codex.hf.space/v1" | |
| API_KEY = "CURSEOFWITCHER" | |
| MODEL = "codex" | |
| SESSION = "colab-chat" # same string = remembers context | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # ANSI colors (render fine in Colab output) | |
| YOU, AI, DIM, ERR, OK, BOLD, R = ( | |
| "\033[96m", "\033[92m", "\033[2m", "\033[91m", "\033[93m", "\033[1m", "\033[0m") | |
| aclient = AsyncOpenAI(base_url=BASE_URL, api_key=API_KEY, timeout=600) | |
| sclient = OpenAI(base_url=BASE_URL, api_key=API_KEY, timeout=600) | |
| def _hdr(): | |
| return {"X-Session-Id": SESSION} | |
| def banner(): | |
| print(f"{BOLD}╭─────────────────────────────────────────────╮{R}") | |
| print(f"{BOLD}│ Codex-as-API · CLI chat │{R}") | |
| print(f"{BOLD}╰─────────────────────────────────────────────╯{R}") | |
| print(f"{DIM}session: {SESSION} · /help for commands · /exit to quit{R}\n") | |
| def _reasoning_of(delta): | |
| """Pull reasoning_content from a streamed delta (non-standard field).""" | |
| extra = getattr(delta, "model_extra", None) or {} | |
| return extra.get("reasoning_content") or getattr(delta, "reasoning_content", None) | |
| async def stream_reply(text: str): | |
| """Stream live: a thinking timer → reasoning (dim) → answer, then latency.""" | |
| start = time.perf_counter() | |
| got_any = asyncio.Event() | |
| async def ticker(): | |
| while not got_any.is_set(): | |
| print(f"\r{DIM} …thinking {time.perf_counter()-start:4.0f}s{R}", | |
| end="", flush=True) | |
| await asyncio.sleep(0.4) | |
| tick = asyncio.create_task(ticker()) | |
| parts, first_at, mode = [], None, None # mode: None | "reason" | "answer" | |
| def _start_output(): | |
| nonlocal first_at | |
| if not got_any.is_set(): | |
| got_any.set(); tick.cancel() | |
| first_at = time.perf_counter() - start | |
| print("\r" + " " * 32 + "\r", end="") # wipe the timer line | |
| try: | |
| stream = await aclient.chat.completions.create( | |
| model=MODEL, messages=[{"role": "user", "content": text}], | |
| stream=True, extra_headers=_hdr()) | |
| async for chunk in stream: | |
| if not chunk.choices: | |
| continue | |
| d = chunk.choices[0].delta | |
| rc = _reasoning_of(d) | |
| if rc: | |
| _start_output() | |
| if mode != "reason": | |
| mode = "reason"; print(f"{DIM}🤔 ", end="", flush=True) | |
| print(rc, end="", flush=True) | |
| continue | |
| if d.content: | |
| _start_output() | |
| if mode != "answer": | |
| if mode == "reason": | |
| print(f"{R}\n") # close the thinking block | |
| mode = "answer"; print(f"{AI}ai ▸{R} ", end="", flush=True) | |
| parts.append(d.content); print(d.content, end="", flush=True) | |
| except Exception as e: | |
| got_any.set(); tick.cancel() | |
| print(f"\r{ERR} error: {e}{R}") | |
| return "" | |
| finally: | |
| got_any.set() | |
| if not tick.done(): | |
| tick.cancel() | |
| if mode == "reason": | |
| print(R) | |
| total = time.perf_counter() - start | |
| meta = f"{first_at:.1f}s→1st · {total:.1f}s" if first_at else f"{total:.1f}s" | |
| print(f"\n{DIM} ({meta}){R}\n") | |
| return "".join(parts) | |
| _IMG_RE = re.compile(r"/v1/files/\S+?\.(?:png|jpe?g|webp|gif)", re.I) | |
| def preview_images(text: str): | |
| """If a reply references generated image URLs, fetch + show them inline.""" | |
| if not text: | |
| return | |
| p = urlparse(BASE_URL) | |
| host = f"{p.scheme}://{p.netloc}" | |
| seen = set() | |
| for path in _IMG_RE.findall(text): | |
| if path in seen: | |
| continue | |
| seen.add(path) | |
| url = path if path.startswith("http") else host + path | |
| try: | |
| r = requests.get(url, headers={"Authorization": f"Bearer {API_KEY}"}, | |
| timeout=180) | |
| if r.ok and r.content: | |
| display(IPyImage(data=r.content)); print() | |
| except Exception as e: | |
| print(f"{ERR} (couldn't load image: {e}){R}") | |
| def do_image(prompt: str): | |
| if not prompt: | |
| print(f"{ERR} usage: /image <description>{R}\n"); return | |
| print(f"{DIM} generating image (this is slow, ~1-2 min)…{R}", flush=True) | |
| t = time.perf_counter() | |
| try: | |
| r = sclient.images.generate(model=MODEL, prompt=prompt, extra_headers=_hdr()) | |
| raw = base64.b64decode(r.data[0].b64_json) | |
| print(f"{OK} ✓ image ready ({len(raw)//1024} KB, {time.perf_counter()-t:.0f}s){R}\n") | |
| display(IPyImage(data=raw)) | |
| print() | |
| except Exception as e: | |
| print(f"{ERR} image error: {e}{R}\n") | |
| def do_files(): | |
| try: | |
| resp = requests.get(f"{BASE_URL}/files/{SESSION}", | |
| headers={"Authorization": f"Bearer {API_KEY}"}, timeout=60) | |
| data = resp.json().get("data", []) | |
| if not data: | |
| print(f"{DIM} (no files in this session yet){R}\n"); return | |
| print(f"{BOLD} files in '{SESSION}':{R}") | |
| for f in data: | |
| print(f" {f['name']} {DIM}({f['bytes']//1024} KB){R}") | |
| print(f" {DIM}{BASE_URL}/files/{SESSION}/{f['name']}{R}") | |
| print() | |
| except Exception as e: | |
| print(f"{ERR} files error: {e}{R}\n") | |
| def helptext(): | |
| print(f"""{BOLD} commands:{R} | |
| /image <prompt> generate an image, show it inline | |
| /files list files this session created | |
| /reset new conversation (fresh memory) | |
| /session <id> switch session | |
| /help this help | |
| /exit quit | |
| """) | |
| def chat(): | |
| global SESSION | |
| banner() | |
| while True: | |
| try: | |
| user = input(f"{YOU}you ▸ {R}").strip() | |
| except (EOFError, KeyboardInterrupt): | |
| print(f"\n{DIM}[ended]{R}"); break | |
| if not user: | |
| continue | |
| if user in ("/exit", "/quit"): | |
| print(f"{DIM}[ended]{R}"); break | |
| if user == "/help": | |
| helptext(); continue | |
| if user == "/files": | |
| do_files(); continue | |
| if user == "/reset": | |
| SESSION = f"colab-{uuid.uuid4().hex[:8]}" | |
| print(f"{OK} ✓ new session: {SESSION}{R}\n"); continue | |
| if user.startswith("/session"): | |
| parts = user.split(maxsplit=1) | |
| if len(parts) == 2: | |
| SESSION = parts[1].strip() | |
| print(f"{OK} ✓ session: {SESSION}{R}\n") | |
| else: | |
| print(f"{DIM} current session: {SESSION}{R}\n") | |
| continue | |
| if user.startswith("/image"): | |
| do_image(user[len("/image"):].strip()); continue | |
| if user.startswith("/"): | |
| print(f"{ERR} unknown command. /help{R}\n"); continue | |
| reply = asyncio.run(stream_reply(user)) | |
| preview_images(reply) # auto-show any image the reply generated | |
| chat() | |