""" FastAPI backend — Onehands AI runtime gateway. Phase 1/2 endpoints (unchanged): GET / → service info GET /health → liveness + provider/key status POST /api/chat → JSON, non-streaming convenience endpoint POST /api/chat/stream → SSE streaming (chat OR execute, auto-routed) POST /api/execute → SSE streaming, always uses sandbox POST /api/intent → JSON, returns intent decision only POST /api/agent/stream → Multi-step planner-driven agent (SSE) GET /api/tasks → list tasks GET /api/tasks/{task_id} → get task GET /api/tasks/{task_id}/events → replay task events DELETE /api/tasks/{task_id} → delete task Phase 3 additions (Supabase + Redis + WebSocket): POST /api/conversations → create conversation GET /api/conversations → list conversations GET /api/conversations/{id}/messages → get messages DELETE /api/conversations/{id} → delete conversation GET /api/chat/sse/{conv_id} → Redis-backed SSE stream WS /ws/{room} → WebSocket room GET /api/models → list available models GET /health/keys → detailed key health """ from __future__ import annotations import asyncio import json import logging import os import time from typing import Any, AsyncGenerator, Dict, List, Optional from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field from . import agent, intent, llm_router, tasks as task_store from . import persistence # ─── Logging ────────────────────────────────────────────────────────────────── logging.basicConfig( level=os.environ.get("LOG_LEVEL", "INFO"), format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("onehands.backend") # ─── Models ─────────────────────────────────────────────────────────────────── class ChatMessage(BaseModel): role: str = Field(..., pattern="^(system|user|assistant)$") content: str class ChatRequest(BaseModel): messages: List[ChatMessage] = Field(default_factory=list) message: Optional[str] = None force_sandbox: Optional[bool] = None sandbox_timeout: int = 300 conversation_id: Optional[str] = None user_id: str = "anonymous" preferred_provider: Optional[str] = None model: Optional[str] = None def to_messages(self) -> List[Dict[str, str]]: msgs = [m.dict() for m in self.messages] if self.message: msgs.append({"role": "user", "content": self.message}) if not msgs: raise ValueError("at least one message is required") return msgs class IntentRequest(BaseModel): message: str class AgentRequest(BaseModel): messages: List[ChatMessage] = Field(default_factory=list) message: Optional[str] = None sandbox_timeout: int = 600 max_retries_per_step: int = 2 enable_browser: bool = True task_id: Optional[str] = None def to_messages(self) -> List[Dict[str, str]]: msgs = [m.dict() for m in self.messages] if self.message: msgs.append({"role": "user", "content": self.message}) if not msgs: raise ValueError("at least one message is required") return msgs class ConversationCreate(BaseModel): user_id: str = "anonymous" title: Optional[str] = None model: str = "gemini-2.0-flash" provider: str = "gemini" # ─── App ────────────────────────────────────────────────────────────────────── app = FastAPI( title="Onehands AI Backend", version="3.0.0", description="OpenHands AI platform — LLM router + E2B + Supabase + Redis + WebSocket.", ) _allowed = os.environ.get("ALLOWED_ORIGINS", "*").strip() allowed_origins = ["*"] if _allowed == "*" else [o.strip() for o in _allowed.split(",") if o.strip()] app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ─── SSE helpers ────────────────────────────────────────────────────────────── def sse_format(event: str, data: Any) -> bytes: payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False) return f"event: {event}\ndata: {payload}\n\n".encode("utf-8") SSE_HEADERS = { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive", "X-Accel-Buffering": "no", } # ─── WebSocket Manager ──────────────────────────────────────────────────────── class WSManager: def __init__(self): self._rooms: dict[str, list[WebSocket]] = {} async def connect(self, ws: WebSocket, room: str): await ws.accept() self._rooms.setdefault(room, []).append(ws) def disconnect(self, ws: WebSocket, room: str): if room in self._rooms: self._rooms[room] = [c for c in self._rooms[room] if c is not ws] async def broadcast(self, room: str, data: dict): conns = list(self._rooms.get(room, [])) dead = [] for ws in conns: try: await ws.send_json(data) except Exception: dead.append(ws) for ws in dead: self.disconnect(ws, room) def room_count(self, room: str) -> int: return len(self._rooms.get(room, [])) ws_manager = WSManager() # ─── Startup / Shutdown ─────────────────────────────────────────────────────── @app.on_event("startup") async def _startup() -> None: # Task DB (sqlite) try: await task_store.init() except Exception as e: logger.warning("task store init failed (non-fatal): %s", e) # Supabase DB await persistence.init_db() # Redis await persistence.init_redis() @app.on_event("shutdown") async def _shutdown() -> None: await persistence.close() # ─── Routes: info + health ──────────────────────────────────────────────────── @app.get("/") async def root() -> Dict[str, Any]: return { "service": "onehands-backend", "version": "3.0.0", "status": "ok", "db": persistence.db_connected(), "redis": persistence.redis_connected(), "endpoints": [ "/health", "/health/keys", "/api/chat", "/api/chat/stream", "/api/execute", "/api/intent", "/api/agent/stream", "/api/tasks", "/api/tasks/{task_id}", "/api/tasks/{task_id}/events", "/api/conversations", "/api/conversations/{id}/messages", "/api/chat/sse/{conv_id}", "/ws/{room}", "/api/models", ], } @app.get("/health") async def health() -> Dict[str, Any]: redis_ok = await persistence.redis_ping() return { "status": "ok", "time": int(time.time()), "providers": llm_router.pool_status(), "e2b_configured": bool(os.environ.get("E2B_API_KEY")), "database": persistence.db_connected(), "redis": redis_ok, } @app.get("/health/keys") async def health_keys() -> Dict[str, Any]: return llm_router.pool_status() # ─── Routes: Phase 1/2 (unchanged) ─────────────────────────────────────────── @app.post("/api/intent") async def api_intent(req: IntentRequest) -> Dict[str, Any]: decision = await intent.detect(req.message) return { "needs_sandbox": decision.needs_sandbox, "reason": decision.reason, "confidence": decision.confidence, } @app.post("/api/chat") async def api_chat(req: ChatRequest) -> Dict[str, Any]: try: messages = req.to_messages() except ValueError as e: raise HTTPException(400, str(e)) if not any(m["role"] == "system" for m in messages): messages = [{"role": "system", "content": agent.CHAT_SYSTEM}, *messages] result = await llm_router.complete( messages, temperature=0.4, max_tokens=1024, preferred_provider=req.preferred_provider, model=req.model, ) # Persist if conv_id provided if req.conversation_id: user_msg = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "") await persistence.save_message(req.conversation_id, "user", user_msg) await persistence.save_message( req.conversation_id, "assistant", result["content"], provider=result.get("provider"), model=result.get("model") ) # Publish to Redis await persistence.publish_event(req.conversation_id, { "type": "message", "role": "assistant", "content": result["content"], }) await ws_manager.broadcast(req.conversation_id, { "type": "message", "role": "assistant", "content": result["content"], }) return { "content": result["content"], "provider": result.get("provider"), "model": result.get("model"), "conv_id": req.conversation_id, } @app.post("/api/chat/stream") async def api_chat_stream(req: ChatRequest): try: messages = req.to_messages() except ValueError as e: raise HTTPException(400, str(e)) last_user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "") async def event_gen(): if req.force_sandbox is True: decision = intent.ExecutionIntent(True, "forced by client", 1.0) elif req.force_sandbox is False: decision = intent.ExecutionIntent(False, "forced by client", 1.0) else: decision = await intent.detect(last_user) yield sse_format("intent", { "needs_sandbox": decision.needs_sandbox, "reason": decision.reason, "confidence": decision.confidence, }) full_reply = [] provider_used = None model_used = None try: if decision.needs_sandbox: async for ev in agent.stream_execute(messages, sandbox_timeout=req.sandbox_timeout): yield sse_format(ev["type"], ev) else: async for ev in llm_router.stream_complete( messages, temperature=0.4, max_tokens=2048, preferred_provider=req.preferred_provider, model=req.model, ): yield sse_format(ev["type"], ev) if ev["type"] == "delta": full_reply.append(ev.get("content", "")) provider_used = ev.get("provider") elif ev["type"] == "done": model_used = ev.get("model") except Exception as e: logger.exception("stream error") yield sse_format("error", {"error": str(e)}) finally: yield sse_format("end", {"done": True}) # Persist if req.conversation_id and full_reply: reply_text = "".join(full_reply) await persistence.save_message(req.conversation_id, "user", last_user) await persistence.save_message( req.conversation_id, "assistant", reply_text, provider=provider_used, model=model_used ) await persistence.publish_event(req.conversation_id, { "type": "message", "role": "assistant", "content": reply_text, }) return StreamingResponse(event_gen(), headers=SSE_HEADERS) @app.post("/api/execute") async def api_execute(req: ChatRequest): try: messages = req.to_messages() except ValueError as e: raise HTTPException(400, str(e)) async def event_gen(): yield sse_format("intent", {"needs_sandbox": True, "reason": "explicit /execute"}) try: async for ev in agent.stream_execute(messages, sandbox_timeout=req.sandbox_timeout): yield sse_format(ev["type"], ev) # Persist execution events to Redis if req.conversation_id and ev["type"] in ("stdout", "stderr", "exec_result"): await persistence.publish_event(req.conversation_id, ev) except Exception as e: logger.exception("execute error") yield sse_format("error", {"error": str(e)}) finally: yield sse_format("end", {"done": True}) return StreamingResponse(event_gen(), headers=SSE_HEADERS) @app.post("/api/agent/stream") async def api_agent_stream(req: AgentRequest): try: messages = req.to_messages() except ValueError as e: raise HTTPException(400, str(e)) async def event_gen(): try: async for ev in agent.stream_agent_plan( messages, sandbox_timeout=req.sandbox_timeout, max_retries_per_step=max(0, min(req.max_retries_per_step, 4)), enable_browser=req.enable_browser, task_id=req.task_id, ): yield sse_format(ev["type"], ev) except Exception as e: logger.exception("agent stream error") yield sse_format("error", {"error": str(e)}) finally: yield sse_format("end", {"done": True}) return StreamingResponse(event_gen(), headers=SSE_HEADERS) @app.get("/api/tasks") async def api_tasks_list(limit: int = 50, state: Optional[str] = None) -> Dict[str, Any]: limit = max(1, min(limit, 200)) rows = await task_store.list_tasks(limit=limit, state=state) return {"tasks": rows, "count": len(rows)} @app.get("/api/tasks/{task_id}") async def api_task_get(task_id: str) -> Dict[str, Any]: t = await task_store.get_task(task_id) if t is None: raise HTTPException(404, "task not found") return t.to_dict() @app.get("/api/tasks/{task_id}/events") async def api_task_events(task_id: str, after: int = 0, limit: int = 500) -> Dict[str, Any]: t = await task_store.get_task(task_id) if t is None: raise HTTPException(404, "task not found") evs = await task_store.get_events(task_id, after_id=after, limit=max(1, min(limit, 2000))) return {"task": t.to_dict(include_steps=True), "events": evs} @app.delete("/api/tasks/{task_id}") async def api_task_delete(task_id: str) -> Dict[str, Any]: ok = await task_store.delete_task(task_id) if not ok: raise HTTPException(404, "task not found") return {"ok": True, "task_id": task_id} # ─── Routes: Phase 3 — Conversations (Supabase) ─────────────────────────────── @app.post("/api/conversations") async def create_conversation(data: ConversationCreate) -> Dict[str, Any]: conv = await persistence.create_conversation( user_id=data.user_id, title=data.title or "New conversation", model=data.model, provider=data.provider, ) if conv is None: raise HTTPException(503, "Database not available") return conv @app.get("/api/conversations") async def list_conversations(user_id: str = "anonymous") -> List[Dict[str, Any]]: return await persistence.list_conversations(user_id=user_id) @app.get("/api/conversations/{conv_id}/messages") async def get_messages(conv_id: str) -> List[Dict[str, Any]]: return await persistence.get_conversation_messages(conv_id) @app.delete("/api/conversations/{conv_id}") async def delete_conversation(conv_id: str) -> Dict[str, Any]: ok = await persistence.delete_conversation(conv_id) return {"deleted": ok, "conv_id": conv_id} # ─── Routes: Phase 3 — Redis SSE stream ────────────────────────────────────── @app.get("/api/chat/sse/{conv_id}") async def sse_conversation(conv_id: str, request: Request): """Redis pub/sub SSE stream for a conversation room.""" async def event_gen() -> AsyncGenerator[bytes, None]: redis = persistence.get_redis() if not redis: yield b"data: {\"error\":\"Redis not available\"}\n\n" return import redis.asyncio as aioredis pubsub = redis.pubsub() await pubsub.subscribe(f"room:{conv_id}") try: while True: if await request.is_disconnected(): break msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) if msg and msg["type"] == "message": yield f"data: {msg['data']}\n\n".encode() else: yield b": keepalive\n\n" await asyncio.sleep(0.05) finally: try: await pubsub.unsubscribe(f"room:{conv_id}") await pubsub.aclose() except Exception: pass return StreamingResponse( event_gen(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", "Connection": "keep-alive", }, ) # ─── Routes: Phase 3 — WebSocket ───────────────────────────────────────────── @app.websocket("/ws/{room}") async def websocket_room(ws: WebSocket, room: str): await ws_manager.connect(ws, room) logger.info("WS joined room=%s total=%d", room, ws_manager.room_count(room)) try: while True: raw = await ws.receive_text() try: msg = json.loads(raw) except Exception: msg = {"text": raw} # Broadcast to all in room await ws_manager.broadcast(room, msg) # Also publish to Redis for SSE subscribers await persistence.publish_event(room, msg) except WebSocketDisconnect: ws_manager.disconnect(ws, room) logger.info("WS left room=%s", room) except Exception as e: logger.error("WS error: %s", e) ws_manager.disconnect(ws, room) # ─── Routes: Phase 3 — Models list ──────────────────────────────────────────── @app.get("/api/models") async def list_models() -> Dict[str, Any]: return { "gemini": [ {"id": "gemini-2.0-flash", "name": "Gemini 2.0 Flash", "context": 1048576}, {"id": "gemini-2.0-flash-thinking-exp", "name": "Gemini 2.0 Flash Thinking","context": 32768}, {"id": "gemini-1.5-pro", "name": "Gemini 1.5 Pro", "context": 2097152}, {"id": "gemini-1.5-flash", "name": "Gemini 1.5 Flash", "context": 1048576}, ], "sambanova": [ {"id": "Meta-Llama-3.3-70B-Instruct", "name": "Llama 3.3 70B", "context": 131072}, {"id": "Meta-Llama-3.1-405B-Instruct", "name": "Llama 3.1 405B", "context": 16384}, {"id": "DeepSeek-R1", "name": "DeepSeek R1", "context": 32768}, {"id": "Qwen2.5-72B-Instruct", "name": "Qwen 2.5 72B", "context": 32768}, ], "github_gpt4o": [ {"id": "gpt-4o", "name": "GPT-4o", "context": 128000}, {"id": "gpt-4o-mini", "name": "GPT-4o Mini", "context": 128000}, {"id": "Meta-Llama-3.1-70B-Instruct", "name": "Llama 3.1 70B", "context": 131072}, {"id": "Mistral-large-2407", "name": "Mistral Large", "context": 131072}, ], } # ─── Entry point ────────────────────────────────────────────────────────────── if __name__ == "__main__": import uvicorn port = int(os.environ.get("PORT", "7860")) uvicorn.run(app, host="0.0.0.0", port=port)