# PYAE1994
# 🚀 Phase 3: Supabase + Redis SSE + WebSocket + enhanced smart routing (6 Gemini / 9 SambaNova / 9 GitHub keys)
# 9b18003
"""
FastAPI backend — Onehands AI runtime gateway.

Phase 1/2 endpoints (unchanged):
    GET    /                              → service info
    GET    /health                        → liveness + provider/key status
    POST   /api/chat                      → JSON, non-streaming convenience endpoint
    POST   /api/chat/stream               → SSE streaming (chat OR execute, auto-routed)
    POST   /api/execute                   → SSE streaming, always uses sandbox
    POST   /api/intent                    → JSON, returns intent decision only
    POST   /api/agent/stream              → Multi-step planner-driven agent (SSE)
    GET    /api/tasks                     → list tasks
    GET    /api/tasks/{task_id}           → get task
    GET    /api/tasks/{task_id}/events    → replay task events
    DELETE /api/tasks/{task_id}           → delete task

Phase 3 additions (Supabase + Redis + WebSocket):
    POST   /api/conversations             → create conversation
    GET    /api/conversations             → list conversations
    GET    /api/conversations/{id}/messages → get messages
    DELETE /api/conversations/{id}        → delete conversation
    GET    /api/chat/sse/{conv_id}        → Redis-backed SSE stream
    WS     /ws/{room}                     → WebSocket room
    GET    /api/models                    → list available models
    GET    /health/keys                   → detailed key health
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from typing import Any, AsyncGenerator, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from . import agent, intent, llm_router, tasks as task_store
from . import persistence
# ─── Logging ──────────────────────────────────────────────────────────────────
# LOG_LEVEL is matched case-insensitively ("debug" works). An unknown name
# falls back to INFO: logging.basicConfig raises ValueError for unrecognised
# level strings, which would otherwise crash the module at import time.
_log_level = os.environ.get("LOG_LEVEL", "INFO").strip().upper()
if not isinstance(logging.getLevelName(_log_level), int):
    _log_level = "INFO"
logging.basicConfig(
    level=_log_level,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("onehands.backend")
# ─── Models ───────────────────────────────────────────────────────────────────
class ChatMessage(BaseModel):
    """One chat turn supplied by the client.

    ``role`` is required and constrained by the field ``pattern`` to exactly
    ``system``, ``user`` or ``assistant``; ``content`` is the turn's text.
    """

    role: str = Field(default=..., pattern="^(system|user|assistant)$")
    content: str
class ChatRequest(BaseModel):
    """Request body used by /api/chat, /api/chat/stream and /api/execute.

    The conversation can be sent as a ``messages`` history, as a single
    ``message`` string, or both. When both are given, ``message`` is appended
    as the final user turn.
    """

    messages: List[ChatMessage] = Field(default_factory=list)
    message: Optional[str] = None
    # None lets /api/chat/stream run intent detection; True/False forces the
    # sandbox on/off for that endpoint.
    force_sandbox: Optional[bool] = None
    sandbox_timeout: int = 300
    conversation_id: Optional[str] = None
    user_id: str = "anonymous"
    preferred_provider: Optional[str] = None
    model: Optional[str] = None

    def to_messages(self) -> List[Dict[str, str]]:
        """Return the conversation as plain ``{"role": ..., "content": ...}`` dicts.

        Raises:
            ValueError: if neither ``messages`` nor ``message`` provides a turn.
        """
        # Built explicitly instead of via ``.dict()``, which is deprecated in
        # Pydantic v2 (the ``pattern=`` constraint on ChatMessage targets v2).
        # The resulting dicts are identical.
        msgs = [{"role": m.role, "content": m.content} for m in self.messages]
        if self.message:
            msgs.append({"role": "user", "content": self.message})
        if not msgs:
            raise ValueError("at least one message is required")
        return msgs
class IntentRequest(BaseModel):
    """Request body for /api/intent: the message whose intent is detected."""

    message: str
class AgentRequest(BaseModel):
    """Request body for /api/agent/stream, the multi-step planner agent.

    Like ChatRequest, the conversation may arrive as a ``messages`` history
    and/or a single ``message`` string appended as the final user turn.
    """

    messages: List[ChatMessage] = Field(default_factory=list)
    message: Optional[str] = None
    sandbox_timeout: int = 600
    # /api/agent/stream clamps this to the range 0..4 before use.
    max_retries_per_step: int = 2
    enable_browser: bool = True
    task_id: Optional[str] = None

    def to_messages(self) -> List[Dict[str, str]]:
        """Return the conversation as plain ``{"role": ..., "content": ...}`` dicts.

        Raises:
            ValueError: if neither ``messages`` nor ``message`` provides a turn.
        """
        # Explicit dicts avoid Pydantic v2's deprecated ``.dict()``; the output
        # is the same role/content mapping.
        msgs = [{"role": m.role, "content": m.content} for m in self.messages]
        if self.message:
            msgs.append({"role": "user", "content": self.message})
        if not msgs:
            raise ValueError("at least one message is required")
        return msgs
class ConversationCreate(BaseModel):
    """Request body for POST /api/conversations.

    A missing ``title`` is replaced with "New conversation" by the endpoint.
    """

    user_id: str = "anonymous"
    title: Optional[str] = None
    model: str = "gemini-2.0-flash"
    provider: str = "gemini"
# ─── App ──────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="Onehands AI Backend",
    version="3.0.0",
    description="OpenHands AI platform — LLM router + E2B + Supabase + Redis + WebSocket.",
)

# ALLOWED_ORIGINS is a comma-separated origin list; "*" (the default) allows
# every origin.
_allowed = os.environ.get("ALLOWED_ORIGINS", "*").strip()
allowed_origins = ["*"] if _allowed == "*" else [o.strip() for o in _allowed.split(",") if o.strip()]
# NOTE(review): combining allow_origins=["*"] with allow_credentials=True is
# discouraged by the FastAPI/Starlette CORS docs. Recent Starlette versions
# echo the caller's Origin for requests that carry cookies, which accepts
# credentialed requests from any site. Set ALLOWED_ORIGINS explicitly when
# credentials matter.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─── SSE helpers ──────────────────────────────────────────────────────────────
def sse_format(event: str, data: Any) -> bytes:
    """Encode one Server-Sent Events frame.

    Strings are sent verbatim. Anything else is serialised with
    ``json.dumps(..., ensure_ascii=False)``.

    The SSE format ends a field at a line break. Each line of the payload
    (split on LF, CRLF or CR) is therefore written as its own ``data:``
    field, and the client rejoins them with a newline. Otherwise a
    multi-line string would be truncated or mis-framed.

    Args:
        event: name written to the ``event:`` field.
        data: payload; ``str`` is used as-is, other values are JSON-encoded.

    Returns:
        The UTF-8 encoded frame, terminated by a blank line.
    """
    payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False)
    lines = payload.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    data_fields = "".join(f"data: {line}\n" for line in lines)
    return f"event: {event}\n{data_fields}\n".encode("utf-8")
# Response headers used by the SSE streaming endpoints. "no-transform" and
# "X-Accel-Buffering: no" ask proxies (e.g. nginx) not to buffer or rewrite
# the event stream.
SSE_HEADERS: Dict[str, str] = {
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache, no-transform",
    "Connection": "keep-alive",
    "X-Accel-Buffering": "no",
}
# ─── WebSocket Manager ────────────────────────────────────────────────────────
class WSManager:
    """In-memory registry of WebSocket connections grouped by room name.

    State lives only in this object. If the app runs with several worker
    processes, each process has its own independent registry.
    """

    def __init__(self):
        # room name -> sockets currently joined to that room
        self._rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str):
        """Accept the WebSocket handshake on *ws* and add it to *room*."""
        await ws.accept()
        self._rooms.setdefault(room, []).append(ws)

    def disconnect(self, ws: WebSocket, room: str):
        """Remove *ws* from *room*; does nothing if it is not a member.

        A room left without members is deleted. Room names come from the
        client (the /ws/{room} path), so keeping empty lists around would
        grow ``_rooms`` without bound.
        """
        members = self._rooms.get(room)
        if members is None:
            return
        remaining = [c for c in members if c is not ws]
        if remaining:
            self._rooms[room] = remaining
        else:
            del self._rooms[room]

    async def broadcast(self, room: str, data: dict):
        """Send *data* as JSON to every socket in *room*.

        A socket whose send raises is treated as dead and removed after the
        loop. One failing socket does not stop delivery to the rest.
        """
        conns = list(self._rooms.get(room, []))  # snapshot: disconnect() mutates
        dead = []
        for ws in conns:
            try:
                await ws.send_json(data)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws, room)

    def room_count(self, room: str) -> int:
        """Return how many sockets are currently joined to *room*."""
        return len(self._rooms.get(room, []))
# Module-level room registry shared by the /ws/{room} endpoint and the chat
# endpoints that broadcast assistant replies to a conversation's room.
ws_manager: WSManager = WSManager()
# ─── Startup / Shutdown ───────────────────────────────────────────────────────
@app.on_event("startup")
async def _startup() -> None:
    """Initialise the storage backends when the application starts.

    Each backend (task store, Supabase database, Redis) is initialised
    independently. A failure is logged and does not stop startup, so the
    API still boots in a degraded mode. The endpoints already handle a
    missing DB or Redis (503 responses, ``get_redis()`` returning None).
    """
    # Task DB (sqlite)
    try:
        await task_store.init()
    except Exception as e:
        logger.warning("task store init failed (non-fatal): %s", e)
    # Supabase DB
    try:
        await persistence.init_db()
    except Exception as e:
        logger.warning("database init failed (non-fatal): %s", e)
    # Redis
    try:
        await persistence.init_redis()
    except Exception as e:
        logger.warning("redis init failed (non-fatal): %s", e)
@app.on_event("shutdown")
async def _shutdown() -> None:
    """Release the persistence layer's resources when the application stops."""
    close_persistence = persistence.close
    await close_persistence()
# ─── Routes: info + health ────────────────────────────────────────────────────
@app.get("/")
async def root() -> Dict[str, Any]:
    """Describe the service: version, DB/Redis connectivity and main routes."""
    routes = [
        "/health", "/health/keys",
        "/api/chat", "/api/chat/stream", "/api/execute", "/api/intent",
        "/api/agent/stream",
        "/api/tasks", "/api/tasks/{task_id}", "/api/tasks/{task_id}/events",
        "/api/conversations", "/api/conversations/{id}/messages",
        "/api/chat/sse/{conv_id}",
        "/ws/{room}",
        "/api/models",
    ]
    info: Dict[str, Any] = {"service": "onehands-backend", "version": "3.0.0", "status": "ok"}
    info["db"] = persistence.db_connected()
    info["redis"] = persistence.redis_connected()
    info["endpoints"] = routes
    return info
@app.get("/health")
async def health() -> Dict[str, Any]:
    """Liveness probe.

    Reports the LLM key-pool status, whether E2B_API_KEY is set, the
    database connection flag and a live Redis ping result.
    """
    redis_alive = await persistence.redis_ping()
    return dict(
        status="ok",
        time=int(time.time()),
        providers=llm_router.pool_status(),
        e2b_configured=bool(os.environ.get("E2B_API_KEY")),
        database=persistence.db_connected(),
        redis=redis_alive,
    )
@app.get("/health/keys")
async def health_keys() -> Dict[str, Any]:
    """Return the router's key-pool status (what /health reports as "providers")."""
    pools = llm_router.pool_status()
    return pools
# ─── Routes: Phase 1/2 (unchanged) ───────────────────────────────────────────
@app.post("/api/intent")
async def api_intent(req: IntentRequest) -> Dict[str, Any]:
    """Run intent detection on one message and return the sandbox decision."""
    verdict = await intent.detect(req.message)
    return {field: getattr(verdict, field) for field in ("needs_sandbox", "reason", "confidence")}
@app.post("/api/chat")
async def api_chat(req: ChatRequest) -> Dict[str, Any]:
    """Non-streaming chat completion.

    Prepends ``agent.CHAT_SYSTEM`` unless the caller already sent a system
    message, then asks the LLM router for a reply. When ``conversation_id``
    is set, the last user turn and the reply are saved, the reply is
    published via ``persistence.publish_event``, and it is broadcast to the
    WebSocket room named after the conversation id.

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        history = req.to_messages()
    except ValueError as exc:
        raise HTTPException(400, str(exc))

    has_system_prompt = any(turn["role"] == "system" for turn in history)
    if not has_system_prompt:
        history = [{"role": "system", "content": agent.CHAT_SYSTEM}, *history]

    result = await llm_router.complete(
        history,
        temperature=0.4,
        max_tokens=1024,
        preferred_provider=req.preferred_provider,
        model=req.model,
    )

    conv_id = req.conversation_id
    if conv_id:
        last_user_text = next(
            (turn["content"] for turn in reversed(history) if turn["role"] == "user"),
            "",
        )
        await persistence.save_message(conv_id, "user", last_user_text)
        reply = result["content"]
        await persistence.save_message(
            conv_id, "assistant", reply,
            provider=result.get("provider"), model=result.get("model"),
        )

        def reply_event() -> Dict[str, Any]:
            # Fresh dict per consumer so neither sink sees the other's mutations.
            return {"type": "message", "role": "assistant", "content": reply}

        await persistence.publish_event(conv_id, reply_event())
        await ws_manager.broadcast(conv_id, reply_event())

    return {
        "content": result["content"],
        "provider": result.get("provider"),
        "model": result.get("model"),
        "conv_id": req.conversation_id,
    }
@app.post("/api/chat/stream")
async def api_chat_stream(req: ChatRequest):
    """Stream a chat reply, or a sandbox execution, as Server-Sent Events.

    Event order:

    1. ``intent``.
    2. Either the events from ``agent.stream_execute`` (sandbox) or those
       from ``llm_router.stream_complete`` (chat).
    3. An ``error`` event if anything raises.
    4. A final ``end`` event.

    ``force_sandbox`` True/False overrides intent detection.

    When ``conversation_id`` is set and the LLM streamed text, the following
    happens before ``end`` is sent: the last user turn and the reply are
    saved, the reply is published via ``persistence.publish_event``, and it
    is broadcast to the WebSocket room named after the conversation id (as
    /api/chat does).

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        messages = req.to_messages()
    except ValueError as e:
        raise HTTPException(400, str(e))
    last_user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")

    async def event_gen():
        full_reply: List[str] = []
        provider_used = None
        model_used = None
        try:
            # Intent detection sits inside the try so a failure still yields
            # "error" and "end" instead of silently aborting the stream.
            if req.force_sandbox is True:
                decision = intent.ExecutionIntent(True, "forced by client", 1.0)
            elif req.force_sandbox is False:
                decision = intent.ExecutionIntent(False, "forced by client", 1.0)
            else:
                decision = await intent.detect(last_user)
            yield sse_format("intent", {
                "needs_sandbox": decision.needs_sandbox,
                "reason": decision.reason,
                "confidence": decision.confidence,
            })
            if decision.needs_sandbox:
                async for ev in agent.stream_execute(messages, sandbox_timeout=req.sandbox_timeout):
                    yield sse_format(ev["type"], ev)
            else:
                async for ev in llm_router.stream_complete(
                    messages,
                    temperature=0.4,
                    max_tokens=2048,
                    preferred_provider=req.preferred_provider,
                    model=req.model,
                ):
                    yield sse_format(ev["type"], ev)
                    if ev["type"] == "delta":
                        full_reply.append(ev.get("content", ""))
                        provider_used = ev.get("provider")
                    elif ev["type"] == "done":
                        model_used = ev.get("model")
        except Exception as e:
            logger.exception("stream error")
            yield sse_format("error", {"error": str(e)})

        # Persist before "end". Work scheduled after the final yield is
        # cancelled along with the response if the client disconnects as
        # soon as it sees "end". The response is already committed (200), so
        # a persistence failure is logged rather than raised.
        if req.conversation_id and full_reply:
            reply_text = "".join(full_reply)
            try:
                await persistence.save_message(req.conversation_id, "user", last_user)
                await persistence.save_message(
                    req.conversation_id, "assistant", reply_text,
                    provider=provider_used, model=model_used,
                )
                await persistence.publish_event(req.conversation_id, {
                    "type": "message",
                    "role": "assistant",
                    "content": reply_text,
                })
                await ws_manager.broadcast(req.conversation_id, {
                    "type": "message",
                    "role": "assistant",
                    "content": reply_text,
                })
            except Exception:
                logger.exception("failed to persist streamed reply conv=%s", req.conversation_id)

        # Deliberately not in a ``finally``: yielding while the generator is
        # being closed (client disconnect) raises RuntimeError.
        yield sse_format("end", {"done": True})

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
@app.post("/api/execute")
async def api_execute(req: ChatRequest):
    """Always run the request in the sandbox and stream its events as SSE.

    Emits an ``intent`` event, every event from ``agent.stream_execute``, an
    ``error`` event if execution raises, and a final ``end`` event. When
    ``conversation_id`` is set, ``stdout``/``stderr``/``exec_result`` events
    are also published via ``persistence.publish_event``.

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        messages = req.to_messages()
    except ValueError as e:
        raise HTTPException(400, str(e))

    async def event_gen():
        yield sse_format("intent", {"needs_sandbox": True, "reason": "explicit /execute"})
        try:
            async for ev in agent.stream_execute(messages, sandbox_timeout=req.sandbox_timeout):
                yield sse_format(ev["type"], ev)
                # Mirror execution output to Redis subscribers of the conversation.
                if req.conversation_id and ev["type"] in ("stdout", "stderr", "exec_result"):
                    await persistence.publish_event(req.conversation_id, ev)
        except Exception as e:
            logger.exception("execute error")
            yield sse_format("error", {"error": str(e)})
        # Not in a ``finally``: yielding while the generator is being closed
        # (client disconnect) raises RuntimeError.
        yield sse_format("end", {"done": True})

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
@app.post("/api/agent/stream")
async def api_agent_stream(req: AgentRequest):
    """Stream the multi-step planner agent's events as SSE.

    ``max_retries_per_step`` is clamped to 0..4 before it reaches the agent.
    The stream ends with an ``error`` event if the agent raises, and always
    with a final ``end`` event.

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        messages = req.to_messages()
    except ValueError as e:
        raise HTTPException(400, str(e))

    async def event_gen():
        try:
            async for ev in agent.stream_agent_plan(
                messages,
                sandbox_timeout=req.sandbox_timeout,
                max_retries_per_step=max(0, min(req.max_retries_per_step, 4)),
                enable_browser=req.enable_browser,
                task_id=req.task_id,
            ):
                yield sse_format(ev["type"], ev)
        except Exception as e:
            logger.exception("agent stream error")
            yield sse_format("error", {"error": str(e)})
        # Not in a ``finally``: yielding while the generator is being closed
        # (client disconnect) raises RuntimeError.
        yield sse_format("end", {"done": True})

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
@app.get("/api/tasks")
async def api_tasks_list(limit: int = 50, state: Optional[str] = None) -> Dict[str, Any]:
    """List stored tasks, optionally filtered by ``state``.

    ``limit`` is clamped to 1..200.
    """
    bounded_limit = min(max(limit, 1), 200)
    rows = await task_store.list_tasks(limit=bounded_limit, state=state)
    return {"tasks": rows, "count": len(rows)}
@app.get("/api/tasks/{task_id}")
async def api_task_get(task_id: str) -> Dict[str, Any]:
    """Return a single task as a dict, or 404 if it does not exist."""
    task = await task_store.get_task(task_id)
    if task is not None:
        return task.to_dict()
    raise HTTPException(404, "task not found")
@app.get("/api/tasks/{task_id}/events")
async def api_task_events(task_id: str, after: int = 0, limit: int = 500) -> Dict[str, Any]:
    """Replay a task's stored events after event id ``after``.

    Returns the task (including its steps) alongside the events. ``limit``
    is clamped to 1..2000. Unknown tasks get a 404.
    """
    task = await task_store.get_task(task_id)
    if task is None:
        raise HTTPException(404, "task not found")
    page_size = min(max(limit, 1), 2000)
    events = await task_store.get_events(task_id, after_id=after, limit=page_size)
    return {"task": task.to_dict(include_steps=True), "events": events}
@app.delete("/api/tasks/{task_id}")
async def api_task_delete(task_id: str) -> Dict[str, Any]:
    """Delete a task; 404 when the store reports nothing was deleted."""
    deleted = await task_store.delete_task(task_id)
    if deleted:
        return {"ok": True, "task_id": task_id}
    raise HTTPException(404, "task not found")
# ─── Routes: Phase 3 — Conversations (Supabase) ───────────────────────────────
@app.post("/api/conversations")
async def create_conversation(data: ConversationCreate) -> Dict[str, Any]:
    """Create a conversation record.

    Responds 503 when the persistence layer returns None, i.e. the database
    is unavailable.
    """
    title = data.title or "New conversation"
    conv = await persistence.create_conversation(
        user_id=data.user_id,
        title=title,
        model=data.model,
        provider=data.provider,
    )
    if conv is not None:
        return conv
    raise HTTPException(503, "Database not available")
@app.get("/api/conversations")
async def list_conversations(user_id: str = "anonymous") -> List[Dict[str, Any]]:
    """Return the conversations the persistence layer holds for ``user_id``."""
    conversations = await persistence.list_conversations(user_id=user_id)
    return conversations
@app.get("/api/conversations/{conv_id}/messages")
async def get_messages(conv_id: str) -> List[Dict[str, Any]]:
    """Return the stored messages of conversation ``conv_id``."""
    history = await persistence.get_conversation_messages(conv_id)
    return history
@app.delete("/api/conversations/{conv_id}")
async def delete_conversation(conv_id: str) -> Dict[str, Any]:
    """Delete a conversation and report the persistence layer's result.

    Unlike task deletion, this never answers 404; ``deleted`` carries the
    outcome.
    """
    deleted = await persistence.delete_conversation(conv_id)
    return dict(deleted=deleted, conv_id=conv_id)
# ─── Routes: Phase 3 — Redis SSE stream ──────────────────────────────────────
@app.get("/api/chat/sse/{conv_id}")
async def sse_conversation(conv_id: str, request: Request):
    """Redis pub/sub SSE stream for a conversation room.

    Subscribes to the ``room:{conv_id}`` channel and forwards each published
    message as an unnamed SSE ``data`` event until the client disconnects.
    When no message arrives within the 1-second poll, an SSE comment
    (``: keepalive``) is sent so proxies keep the connection open. If Redis
    is unavailable, a single error event is sent and the stream ends.
    """
    channel = f"room:{conv_id}"

    async def event_gen() -> AsyncGenerator[bytes, None]:
        redis = persistence.get_redis()
        if not redis:
            yield b"data: {\"error\":\"Redis not available\"}\n\n"
            return
        pubsub = redis.pubsub()
        await pubsub.subscribe(channel)
        try:
            while True:
                if await request.is_disconnected():
                    break
                msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if msg and msg["type"] == "message":
                    payload = msg["data"]
                    # Without decode_responses=True redis-py delivers bytes;
                    # formatting those directly would emit "b'...'".
                    if isinstance(payload, (bytes, bytearray)):
                        payload = payload.decode("utf-8", errors="replace")
                    else:
                        payload = str(payload)
                    # SSE needs a separate "data:" field per payload line.
                    lines = payload.replace("\r\n", "\n").replace("\r", "\n").split("\n")
                    yield ("".join(f"data: {line}\n" for line in lines) + "\n").encode("utf-8")
                else:
                    yield b": keepalive\n\n"
                await asyncio.sleep(0.05)
        finally:
            try:
                await pubsub.unsubscribe(channel)
                # aclose() exists on newer redis-py; older releases only have close().
                close = getattr(pubsub, "aclose", None) or pubsub.close
                await close()
            except Exception:
                logger.warning("failed to close pubsub for %s", channel, exc_info=True)

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
# ─── Routes: Phase 3 — WebSocket ─────────────────────────────────────────────
@app.websocket("/ws/{room}")
async def websocket_room(ws: WebSocket, room: str):
    """Join WebSocket room *room* and relay every text frame to it.

    Each frame is parsed as JSON, or wrapped as ``{"text": raw}`` when it is
    not valid JSON. The message is broadcast to every socket in the room
    (including the sender) and published to Redis via
    ``persistence.publish_event`` for SSE subscribers. The socket is always
    removed from the room when the loop ends, however it ends.
    """
    await ws_manager.connect(ws, room)
    logger.info("WS joined room=%s total=%d", room, ws_manager.room_count(room))
    try:
        while True:
            raw = await ws.receive_text()
            try:
                msg = json.loads(raw)
            except (ValueError, RecursionError):
                # Not JSON (or nested too deeply to parse): forward as plain text.
                msg = {"text": raw}
            # Broadcast to all in room
            await ws_manager.broadcast(room, msg)
            # Also publish to Redis for SSE subscribers
            await persistence.publish_event(room, msg)
    except WebSocketDisconnect:
        logger.info("WS left room=%s", room)
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("WS error: %s", e)
    finally:
        ws_manager.disconnect(ws, room)
# ─── Routes: Phase 3 — Models list ────────────────────────────────────────────
@app.get("/api/models")
async def list_models() -> Dict[str, Any]:
    """Return the static model catalogue, grouped by provider.

    Each entry has the shape ``{"id", "name", "context"}``. ``context`` is
    the listed context-window size (presumably in tokens — confirm).
    """
    catalogue = {
        "gemini": (
            ("gemini-2.0-flash", "Gemini 2.0 Flash", 1048576),
            ("gemini-2.0-flash-thinking-exp", "Gemini 2.0 Flash Thinking", 32768),
            ("gemini-1.5-pro", "Gemini 1.5 Pro", 2097152),
            ("gemini-1.5-flash", "Gemini 1.5 Flash", 1048576),
        ),
        "sambanova": (
            ("Meta-Llama-3.3-70B-Instruct", "Llama 3.3 70B", 131072),
            ("Meta-Llama-3.1-405B-Instruct", "Llama 3.1 405B", 16384),
            ("DeepSeek-R1", "DeepSeek R1", 32768),
            ("Qwen2.5-72B-Instruct", "Qwen 2.5 72B", 32768),
        ),
        "github_gpt4o": (
            ("gpt-4o", "GPT-4o", 128000),
            ("gpt-4o-mini", "GPT-4o Mini", 128000),
            ("Meta-Llama-3.1-70B-Instruct", "Llama 3.1 70B", 131072),
            ("Mistral-large-2407", "Mistral Large", 131072),
        ),
    }
    return {
        provider: [
            {"id": model_id, "name": label, "context": window}
            for model_id, label, window in entries
        ]
        for provider, entries in catalogue.items()
    }
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Direct-run entry point: serve the app on all interfaces; PORT defaults to 7860.
    import uvicorn

    listen_port = int(os.environ.get("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)