import asyncio import os import time import orjson from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse # ---------------------------- # App # ---------------------------- app = FastAPI(title="Dummy Python AI", version="1.0.0") START_TS = time.time() def j(obj) -> str: # Fast JSON dumps return orjson.dumps(obj).decode("utf-8") @app.get("/health") def health(): return JSONResponse({ "ok": True, "service": "dummy-ai", "uptime_sec": round(time.time() - START_TS, 2) }) # ---------------------------- # WebSocket Protocol # Client -> Server: # {"type":"task", "text":"..."} # start a task # {"type":"telemetry", "cpu":.., "mem":..} # periodic telemetry # {"type":"cancel"} # cancel current stream # # Server -> Client: # {"type":"ready"} # once on connect # {"type":"log","msg":"..."} # log line # {"type":"token","text":"..." } # streaming token # {"type":"say","text":"..."} # client should speak this ASAP # {"type":"done","result":"..."} # task completed # {"type":"error","msg":"..."} # error # ---------------------------- @app.websocket("/ws/ai") async def ws_ai(ws: WebSocket): await ws.accept() await ws.send_text(j({"type": "ready", "msg": "Dummy AI online"})) current_task = None current_cancel = asyncio.Event() async def stream_dummy_answer(prompt: str): """Stream a staged, convincing dummy answer with tokens and say-cues.""" try: # 1) acknowledge await ws.send_text(j({"type":"log","msg":f"Received task: {prompt[:120]}"})) await asyncio.sleep(0.2) # 2) "thinking…" (simulate tool use / chain-of-thought without revealing it) phases = [ "Analyzing your request", "Planning steps", "Executing subtask 1", "Executing subtask 2", "Compiling results" ] for ph in phases: if current_cancel.is_set(): return await ws.send_text(j({"type":"log","msg":ph})) await asyncio.sleep(0.35) # 3) start streaming an answer token-by-token answer = ( "Sure — here’s a dummy streamed response to verify your end-to-end pipeline. " "I’m emitting short tokens so your client UI can show them live, " "and your TTS can speak them as they arrive." ) # also ask client to speak a "lead in" immediately await ws.send_text(j({"type":"say","text":"Starting response."})) for token in answer.split(" "): if current_cancel.is_set(): return await ws.send_text(j({"type":"token","text":token + " "})) await asyncio.sleep(0.06) # controls stream cadence if current_cancel.is_set(): return await asyncio.sleep(0.15) await ws.send_text(j({"type":"say","text":"Response complete."})) await ws.send_text(j({"type":"done","result":"OK"})) except Exception as e: await ws.send_text(j({"type":"error","msg":str(e)})) try: while True: raw = await ws.receive_text() try: msg = orjson.loads(raw) except Exception: await ws.send_text(j({"type":"error","msg":"Invalid JSON"})) continue mtype = msg.get("type") if mtype == "telemetry": # best-effort log await ws.send_text(j({ "type":"log", "msg": f"Telemetry cpu={msg.get('cpu')} mem={msg.get('mem')} active={msg.get('active_window')}" })) continue if mtype == "cancel": current_cancel.set() await ws.send_text(j({"type":"log","msg":"Cancel requested"})) continue if mtype == "task": # cancel any ongoing stream if current_task and not current_task.done(): current_cancel.set() with contextlib.suppress(asyncio.CancelledError): current_task.cancel() current_task = None await asyncio.sleep(0.05) # reset cancel flag current_cancel = asyncio.Event() prompt = str(msg.get("text", "")).strip() or "(empty)" current_task = asyncio.create_task(stream_dummy_answer(prompt)) continue await ws.send_text(j({"type":"error","msg":f"Unknown message type '{mtype}'"})) except WebSocketDisconnect: # client left return except Exception as e: try: await ws.send_text(j({"type":"error","msg":str(e)})) finally: return # ------------- Local run ------------- if _name_ == "_main_": import uvicorn, contextlib uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))