Spaces:
Sleeping
Sleeping
PYAE1994
π Phase 3: Supabase + Redis SSE + WebSocket + enhanced smart routing (6 Gemini / 9 SambaNova / 9 GitHub keys)
9b18003 | """ | |
FastAPI backend — Onehands AI runtime gateway.

Phase 1/2 endpoints (unchanged):
  GET    /                                  — service info
  GET    /health                            — liveness + provider/key status
  POST   /api/chat                          — JSON, non-streaming convenience endpoint
  POST   /api/chat/stream                   — SSE streaming (chat OR execute, auto-routed)
  POST   /api/execute                       — SSE streaming, always uses sandbox
  POST   /api/intent                        — JSON, returns intent decision only
  POST   /api/agent/stream                  — multi-step planner-driven agent (SSE)
  GET    /api/tasks                         — list tasks
  GET    /api/tasks/{task_id}               — get task
  GET    /api/tasks/{task_id}/events        — replay task events
  DELETE /api/tasks/{task_id}               — delete task

Phase 3 additions (Supabase + Redis + WebSocket):
  POST   /api/conversations                 — create conversation
  GET    /api/conversations                 — list conversations
  GET    /api/conversations/{id}/messages   — get messages
  DELETE /api/conversations/{id}            — delete conversation
  GET    /api/chat/sse/{conv_id}            — Redis-backed SSE stream
  WS     /ws/{room}                         — WebSocket room
  GET    /api/models                        — list available models
  GET    /health/keys                       — detailed key health
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import time | |
| from typing import Any, AsyncGenerator, Dict, List, Optional | |
| from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from pydantic import BaseModel, Field | |
| from . import agent, intent, llm_router, tasks as task_store | |
| from . import persistence | |
# ─── Logging ──────────────────────────────────────────────────────────────────
| logging.basicConfig( | |
| level=os.environ.get("LOG_LEVEL", "INFO"), | |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", | |
| ) | |
| logger = logging.getLogger("onehands.backend") | |
# ─── Models ───────────────────────────────────────────────────────────────────
class ChatMessage(BaseModel):
    # One conversation turn. The regex restricts ``role`` to exactly
    # "system", "user" or "assistant"; ``content`` is free text.
    role: str = Field(default=..., pattern="^(system|user|assistant)$")
    content: str
class ChatRequest(BaseModel):
    # Request body shared by /api/chat, /api/chat/stream and /api/execute.
    # Clients may send a full ``messages`` history, a single ``message``, or both.
    messages: List[ChatMessage] = Field(default_factory=list)
    message: Optional[str] = None
    # None -> intent detection decides; True/False forces the sandbox path on/off.
    force_sandbox: Optional[bool] = None
    # Forwarded as ``sandbox_timeout`` to agent.stream_execute
    # (unit not visible here; presumably seconds — confirm).
    sandbox_timeout: int = 300
    # When set, the chat handlers persist the exchange and publish it for this conversation.
    conversation_id: Optional[str] = None
    # NOTE(review): not read by any handler in this module — confirm where it is used.
    user_id: str = "anonymous"
    # Routing hints forwarded to llm_router by the chat handlers.
    preferred_provider: Optional[str] = None
    model: Optional[str] = None

    def to_messages(self) -> List[Dict[str, str]]:
        """Return the conversation as plain ``{"role": ..., "content": ...}`` dicts.

        ``message`` (if non-empty) is appended after ``messages`` as a user turn.

        Raises:
            ValueError: if the request contains no turns at all.
        """
        # Built explicitly instead of via ``.dict()``, which is deprecated in
        # Pydantic v2 (``Field(pattern=...)`` above is v2 syntax); the result is
        # the same two-key dict on any Pydantic version.
        msgs = [{"role": m.role, "content": m.content} for m in self.messages]
        if self.message:
            msgs.append({"role": "user", "content": self.message})
        if not msgs:
            raise ValueError("at least one message is required")
        return msgs
class IntentRequest(BaseModel):
    # Body for POST /api/intent: the single message whose intent is classified.
    message: str
class AgentRequest(BaseModel):
    # Request body for /api/agent/stream (multi-step planner-driven agent).
    messages: List[ChatMessage] = Field(default_factory=list)
    message: Optional[str] = None
    # Forwarded to agent.stream_agent_plan (unit not visible here; presumably seconds).
    sandbox_timeout: int = 600
    # api_agent_stream clamps this into [0, 4] before use.
    max_retries_per_step: int = 2
    enable_browser: bool = True
    # Optional id passed through unchanged to agent.stream_agent_plan.
    task_id: Optional[str] = None

    def to_messages(self) -> List[Dict[str, str]]:
        """Return the conversation as plain ``{"role": ..., "content": ...}`` dicts.

        ``message`` (if non-empty) is appended after ``messages`` as a user turn.

        Raises:
            ValueError: if the request contains no turns at all.
        """
        # Explicit dicts instead of the Pydantic-v2-deprecated ``.dict()``.
        msgs = [{"role": m.role, "content": m.content} for m in self.messages]
        if self.message:
            msgs.append({"role": "user", "content": self.message})
        if not msgs:
            raise ValueError("at least one message is required")
        return msgs
class ConversationCreate(BaseModel):
    # Body for POST /api/conversations; every field has a default.
    # A missing title is replaced with "New conversation" by the handler.
    user_id: str = "anonymous"
    title: Optional[str] = None
    # Model/provider recorded on the new conversation.
    model: str = "gemini-2.0-flash"
    provider: str = "gemini"
# ─── App ──────────────────────────────────────────────────────────────────────
app = FastAPI(
    title="Onehands AI Backend",
    version="3.0.0",
    description="OpenHands AI platform — LLM router + E2B + Supabase + Redis + WebSocket.",
)

# ALLOWED_ORIGINS is either "*" (the default) or a comma-separated origin list.
_allowed = os.environ.get("ALLOWED_ORIGINS", "*").strip()
allowed_origins = ["*"] if _allowed == "*" else [o.strip() for o in _allowed.split(",") if o.strip()]
# With a wildcard origin and credentials enabled, Starlette's CORSMiddleware
# echoes back whatever Origin the browser sent, letting any site make
# credentialed requests. Allow credentials only for explicitly listed origins.
_allow_credentials = "*" not in allowed_origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=_allow_credentials,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─── SSE helpers ──────────────────────────────────────────────────────────────
def sse_format(event: str, data: Any) -> bytes:
    """Encode one Server-Sent Events frame as UTF-8 bytes.

    Args:
        event: value written to the frame's ``event:`` field.
        data: a ``str`` sent verbatim, or any JSON-serialisable value encoded
            with ``json.dumps(..., ensure_ascii=False)``.

    Returns:
        The frame: an ``event:`` line, one ``data:`` line per line of the
        payload, and a terminating blank line.
    """
    payload = data if isinstance(data, str) else json.dumps(data, ensure_ascii=False)
    # CR, LF and CRLF all end a line in the SSE format, so a raw newline inside
    # a single ``data:`` field would corrupt the frame. Emit one ``data:`` line
    # per payload line instead; clients rejoin them with "\n".
    lines = payload.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    body = "".join(f"data: {line}\n" for line in lines)
    return f"event: {event}\n{body}\n".encode("utf-8")
# Response headers shared by the SSE streaming endpoints.
SSE_HEADERS = dict(
    [
        ("Content-Type", "text/event-stream"),
        # Ask caches and intermediaries not to store or rewrite the stream.
        ("Cache-Control", "no-cache, no-transform"),
        ("Connection", "keep-alive"),
        # Disable response buffering in nginx-style reverse proxies.
        ("X-Accel-Buffering", "no"),
    ]
)
# ─── WebSocket Manager ────────────────────────────────────────────────────────
class WSManager:
    """In-process registry of open WebSocket connections grouped by room name.

    State lives only in this object's ``_rooms`` dict, so rooms are local to the
    current process.
    """

    def __init__(self) -> None:
        # room name -> connections in join order
        self._rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str) -> None:
        """Accept the WebSocket handshake and register *ws* in *room*."""
        await ws.accept()
        self._rooms.setdefault(room, []).append(ws)

    def disconnect(self, ws: WebSocket, room: str) -> None:
        """Remove *ws* from *room*, dropping the room once it is empty.

        Calling it for an unknown room or an already-removed socket is a no-op.
        """
        conns = self._rooms.get(room)
        if conns is None:
            return
        remaining = [c for c in conns if c is not ws]
        if remaining:
            self._rooms[room] = remaining
        else:
            # Without this every room ever joined would stay in memory forever.
            del self._rooms[room]

    async def broadcast(self, room: str, data: dict) -> None:
        """Send *data* as JSON to every connection in *room*.

        A connection whose send raises is treated as dead and removed.
        """
        dead = []
        # Iterate over a snapshot: other coroutines may join/leave while we await.
        for ws in list(self._rooms.get(room, ())):
            try:
                await ws.send_json(data)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws, room)

    def room_count(self, room: str) -> int:
        """Return how many connections are registered in *room*."""
        return len(self._rooms.get(room, ()))


ws_manager = WSManager()
# ─── Startup / Shutdown ───────────────────────────────────────────────────────
async def _startup() -> None:
    """Startup hook: initialise the task store, the Supabase database and Redis.

    Every initialiser runs even if an earlier one fails, and failures are
    logged as warnings instead of aborting startup. This matches how the rest
    of the module degrades when a backend is missing: ``/`` and ``/health``
    report connectivity, and ``create_conversation`` answers 503.
    """
    initialisers = (
        ("task store", task_store.init),  # task DB (sqlite)
        ("database", persistence.init_db),  # Supabase
        ("redis", persistence.init_redis),
    )
    for label, init in initialisers:
        try:
            await init()
        except Exception as e:
            logger.warning("%s init failed (non-fatal): %s", label, e)
async def _shutdown() -> None:
    """Shutdown hook that delegates all teardown to ``persistence.close()``.

    Nothing else is released here; what ``close`` tears down is defined in the
    ``persistence`` module.
    """
    close_persistence = persistence.close
    await close_persistence()
# ─── Routes: info + health ────────────────────────────────────────────────────
async def root() -> Dict[str, Any]:
    """Service banner: name, version, backend connectivity and the known routes."""
    return {
        "service": "onehands-backend",
        # Taken from the FastAPI app so the banner cannot drift from the
        # version advertised in the OpenAPI schema.
        "version": app.version,
        "status": "ok",
        "db": persistence.db_connected(),
        "redis": persistence.redis_connected(),
        "endpoints": [
            "/health", "/health/keys",
            "/api/chat", "/api/chat/stream", "/api/execute", "/api/intent",
            "/api/agent/stream",
            "/api/tasks", "/api/tasks/{task_id}", "/api/tasks/{task_id}/events",
            "/api/conversations", "/api/conversations/{id}/messages",
            "/api/chat/sse/{conv_id}",
            "/ws/{room}",
            "/api/models",
        ],
    }
async def health() -> Dict[str, Any]:
    """Liveness probe plus dependency status.

    ``status`` is always ``"ok"``. The remaining keys report the LLM key pools,
    whether an E2B API key is set, and database / Redis connectivity.
    """
    redis_alive = await persistence.redis_ping()
    report: Dict[str, Any] = {"status": "ok"}
    report["time"] = int(time.time())
    report["providers"] = llm_router.pool_status()
    report["e2b_configured"] = bool(os.environ.get("E2B_API_KEY"))
    report["database"] = persistence.db_connected()
    report["redis"] = redis_alive
    return report
async def health_keys() -> Dict[str, Any]:
    """Detailed per-provider key-pool status, exactly as ``llm_router`` reports it."""
    status = llm_router.pool_status()
    return status
# ─── Routes: Phase 1/2 (unchanged) ───────────────────────────────────────────
async def api_intent(req: IntentRequest) -> Dict[str, Any]:
    """Classify ``req.message`` and report whether it would need the sandbox.

    Only the detector's decision is returned; nothing is executed.
    """
    verdict = await intent.detect(req.message)
    reported = ("needs_sandbox", "reason", "confidence")
    return {field: getattr(verdict, field) for field in reported}
async def api_chat(req: ChatRequest) -> Dict[str, Any]:
    """Answer a chat request with a single JSON response (no streaming).

    ``agent.CHAT_SYSTEM`` is put in front of the history when the client sent
    no system message. When ``conversation_id`` is set, the latest user turn
    and the assistant reply are saved through ``persistence``. The reply is
    then published to Redis and broadcast to the WebSocket room named after
    the conversation.

    Raises:
        HTTPException: 400 if the request carries no messages at all.
    """
    try:
        history = req.to_messages()
    except ValueError as exc:
        raise HTTPException(400, str(exc))

    if all(turn["role"] != "system" for turn in history):
        history = [{"role": "system", "content": agent.CHAT_SYSTEM}] + history

    result = await llm_router.complete(
        history,
        temperature=0.4,
        max_tokens=1024,
        preferred_provider=req.preferred_provider,
        model=req.model,
    )

    conv_id = req.conversation_id
    if conv_id:
        latest_user = ""
        for turn in reversed(history):
            if turn["role"] == "user":
                latest_user = turn["content"]
                break
        await persistence.save_message(conv_id, "user", latest_user)
        await persistence.save_message(
            conv_id,
            "assistant",
            result["content"],
            provider=result.get("provider"),
            model=result.get("model"),
        )
        # Publish to Redis first, then to local WebSocket clients; each gets its own dict.
        await persistence.publish_event(
            conv_id, {"type": "message", "role": "assistant", "content": result["content"]}
        )
        await ws_manager.broadcast(
            conv_id, {"type": "message", "role": "assistant", "content": result["content"]}
        )

    return {
        "content": result["content"],
        "provider": result.get("provider"),
        "model": result.get("model"),
        "conv_id": conv_id,
    }
async def api_chat_stream(req: ChatRequest):
    """SSE chat endpoint that routes each request to plain chat or the sandbox.

    Event order:
      1. one ``intent`` event;
      2. the events of ``agent.stream_execute`` (sandbox) or
         ``llm_router.stream_complete`` (chat);
      3. an ``error`` event if streaming raised;
      4. a final ``end`` event.

    For chat replies with a ``conversation_id``, the user turn and the
    assembled reply are saved, published to Redis and broadcast to the
    conversation's WebSocket room before ``end`` is sent.

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        messages = req.to_messages()
    except ValueError as e:
        raise HTTPException(400, str(e))
    last_user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
    # NOTE(review): unlike api_chat, agent.CHAT_SYSTEM is not prepended for the
    # chat branch here — confirm whether llm_router.stream_complete adds it.

    async def event_gen():
        if req.force_sandbox is None:
            decision = await intent.detect(last_user)
        else:
            decision = intent.ExecutionIntent(req.force_sandbox, "forced by client", 1.0)
        yield sse_format("intent", {
            "needs_sandbox": decision.needs_sandbox,
            "reason": decision.reason,
            "confidence": decision.confidence,
        })

        full_reply: List[str] = []
        provider_used: Optional[str] = None
        model_used: Optional[str] = None
        try:
            if decision.needs_sandbox:
                async for ev in agent.stream_execute(messages, sandbox_timeout=req.sandbox_timeout):
                    yield sse_format(ev["type"], ev)
            else:
                async for ev in llm_router.stream_complete(
                    messages,
                    temperature=0.4,
                    max_tokens=2048,
                    preferred_provider=req.preferred_provider,
                    model=req.model,
                ):
                    yield sse_format(ev["type"], ev)
                    if ev["type"] == "delta":
                        full_reply.append(ev.get("content", ""))
                        # A delta without a provider must not erase one seen earlier.
                        provider_used = ev.get("provider") or provider_used
                    elif ev["type"] == "done":
                        model_used = ev.get("model")
                        provider_used = ev.get("provider") or provider_used
        except Exception as e:
            logger.exception("stream error")
            yield sse_format("error", {"error": str(e)})

        # Persist before ``end``: clients typically hang up on ``end``, which
        # could cancel work scheduled after it. Sandbox runs leave full_reply empty.
        if req.conversation_id and full_reply:
            reply_text = "".join(full_reply)
            event = {"type": "message", "role": "assistant", "content": reply_text}
            try:
                await persistence.save_message(req.conversation_id, "user", last_user)
                await persistence.save_message(
                    req.conversation_id, "assistant", reply_text,
                    provider=provider_used, model=model_used,
                )
                await persistence.publish_event(req.conversation_id, event)
                # Same fan-out as api_chat: also notify local WebSocket clients.
                await ws_manager.broadcast(req.conversation_id, dict(event))
            except Exception:
                # The reply was already streamed; a storage failure must not
                # break the stream for the client.
                logger.exception("failed to persist streamed reply conv=%s", req.conversation_id)

        # Deliberately not in ``finally``: yielding while the generator is being
        # closed (client disconnect) raises "async generator ignored GeneratorExit".
        yield sse_format("end", {"done": True})

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
async def api_execute(req: ChatRequest):
    """SSE endpoint that always runs the request through ``agent.stream_execute``.

    Emits an ``intent`` event (same fields as /api/chat/stream), every agent
    event, an ``error`` event if execution raised, and a final ``end`` event.
    When ``conversation_id`` is set, ``stdout`` / ``stderr`` / ``exec_result``
    events are also published to that conversation through ``persistence``.

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        messages = req.to_messages()
    except ValueError as e:
        raise HTTPException(400, str(e))

    async def event_gen():
        # ``confidence`` added so this event has the same shape as other intent events.
        yield sse_format(
            "intent", {"needs_sandbox": True, "reason": "explicit /execute", "confidence": 1.0}
        )
        try:
            async for ev in agent.stream_execute(messages, sandbox_timeout=req.sandbox_timeout):
                yield sse_format(ev["type"], ev)
                # Mirror execution output to the conversation's Redis channel.
                if req.conversation_id and ev["type"] in ("stdout", "stderr", "exec_result"):
                    await persistence.publish_event(req.conversation_id, ev)
        except Exception as e:
            logger.exception("execute error")
            yield sse_format("error", {"error": str(e)})
        # Deliberately not in ``finally``: yielding while the generator is being
        # closed (client disconnect) raises "async generator ignored GeneratorExit".
        yield sse_format("end", {"done": True})

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
async def api_agent_stream(req: AgentRequest):
    """SSE endpoint for the multi-step, planner-driven agent.

    Every event from ``agent.stream_agent_plan`` is sent as an SSE frame named
    after its ``type``. An ``error`` frame follows if the agent raised, then a
    final ``end`` frame. ``max_retries_per_step`` is clamped into [0, 4].

    Raises:
        HTTPException: 400 when the request contains no messages.
    """
    try:
        messages = req.to_messages()
    except ValueError as e:
        raise HTTPException(400, str(e))
    retries = max(0, min(req.max_retries_per_step, 4))

    async def event_gen():
        try:
            async for ev in agent.stream_agent_plan(
                messages,
                sandbox_timeout=req.sandbox_timeout,
                max_retries_per_step=retries,
                enable_browser=req.enable_browser,
                task_id=req.task_id,
            ):
                yield sse_format(ev["type"], ev)
        except Exception as e:
            logger.exception("agent stream error")
            yield sse_format("error", {"error": str(e)})
        # Deliberately not in ``finally``: yielding while the generator is being
        # closed (client disconnect) raises "async generator ignored GeneratorExit".
        yield sse_format("end", {"done": True})

    return StreamingResponse(event_gen(), headers=SSE_HEADERS)
async def api_tasks_list(limit: int = 50, state: Optional[str] = None) -> Dict[str, Any]:
    """List stored tasks, optionally filtered by ``state``.

    ``limit`` is clamped into [1, 200] before the task store is queried.
    """
    bounded = min(max(limit, 1), 200)
    rows = await task_store.list_tasks(limit=bounded, state=state)
    return dict(tasks=rows, count=len(rows))
async def api_task_get(task_id: str) -> Dict[str, Any]:
    """Return a single task as a dict; 404 when the id is unknown."""
    task = await task_store.get_task(task_id)
    if task is not None:
        return task.to_dict()
    raise HTTPException(404, "task not found")
async def api_task_events(task_id: str, after: int = 0, limit: int = 500) -> Dict[str, Any]:
    """Replay a task's stored events together with the task (including its steps).

    ``after`` is passed to the store as ``after_id``, and ``limit`` is clamped
    into [1, 2000]. Responds 404 when the task does not exist.
    """
    task = await task_store.get_task(task_id)
    if task is None:
        raise HTTPException(404, "task not found")
    page_size = min(max(limit, 1), 2000)
    events = await task_store.get_events(task_id, after_id=after, limit=page_size)
    return {"task": task.to_dict(include_steps=True), "events": events}
async def api_task_delete(task_id: str) -> Dict[str, Any]:
    """Delete a task; 404 when the task store reports nothing was deleted."""
    deleted = await task_store.delete_task(task_id)
    if deleted:
        return {"ok": True, "task_id": task_id}
    raise HTTPException(404, "task not found")
# ─── Routes: Phase 3 — Conversations (Supabase) ──────────────────────────────
async def create_conversation(data: ConversationCreate) -> Dict[str, Any]:
    """Create a conversation record through ``persistence``.

    A missing or empty title becomes "New conversation". Responds 503 when
    ``persistence.create_conversation`` returns None.
    """
    title = data.title or "New conversation"
    created = await persistence.create_conversation(
        user_id=data.user_id,
        title=title,
        model=data.model,
        provider=data.provider,
    )
    if created is None:
        raise HTTPException(503, "Database not available")
    return created
async def list_conversations(user_id: str = "anonymous") -> List[Dict[str, Any]]:
    """Return whatever ``persistence`` stores as conversations for ``user_id``."""
    conversations = await persistence.list_conversations(user_id=user_id)
    return conversations
async def get_messages(conv_id: str) -> List[Dict[str, Any]]:
    """Return the stored messages of conversation ``conv_id`` via ``persistence``."""
    history = await persistence.get_conversation_messages(conv_id)
    return history
async def delete_conversation(conv_id: str) -> Dict[str, Any]:
    """Delete a conversation and echo back whether ``persistence`` reported success."""
    deleted = await persistence.delete_conversation(conv_id)
    return dict(deleted=deleted, conv_id=conv_id)
# ─── Routes: Phase 3 — Redis SSE stream ──────────────────────────────────────
async def sse_conversation(conv_id: str, request: Request):
    """Relay Redis pub/sub messages for a conversation as a Server-Sent Events stream.

    Subscribes to the ``room:{conv_id}`` channel and forwards each published
    message as SSE ``data:`` lines. When no message arrives within about a
    second, an SSE comment (``: keepalive``) is sent instead, presumably so
    proxies do not time out the idle connection. The loop ends when the client
    disconnects. If ``persistence.get_redis()`` returns nothing, one error
    frame is sent and the stream ends.
    """
    channel = f"room:{conv_id}"

    async def close_pubsub(pubsub) -> None:
        # Unsubscribe and close independently, so a failed unsubscribe cannot
        # leak the pub/sub connection. Newer redis-py exposes ``aclose``; older
        # releases only have the awaitable ``close``.
        try:
            await pubsub.unsubscribe(channel)
        except Exception:
            logger.debug("SSE unsubscribe failed channel=%s", channel, exc_info=True)
        closer = getattr(pubsub, "aclose", None) or pubsub.close
        try:
            await closer()
        except Exception:
            logger.debug("SSE pubsub close failed channel=%s", channel, exc_info=True)

    async def event_gen() -> AsyncGenerator[bytes, None]:
        client = persistence.get_redis()
        if not client:
            yield b"data: {\"error\":\"Redis not available\"}\n\n"
            return
        pubsub = client.pubsub()
        try:
            await pubsub.subscribe(channel)
            while True:
                if await request.is_disconnected():
                    break
                msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if msg and msg["type"] == "message":
                    raw = msg["data"]
                    # A client created without decode_responses returns bytes;
                    # interpolating those directly would emit "b'...'".
                    if isinstance(raw, (bytes, bytearray)):
                        text = raw.decode("utf-8", errors="replace")
                    else:
                        text = str(raw)
                    # One ``data:`` line per payload line, as the SSE format requires.
                    lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
                    yield ("".join(f"data: {line}\n" for line in lines) + "\n").encode()
                else:
                    yield b": keepalive\n\n"
                await asyncio.sleep(0.05)
        finally:
            # Shielded so the cleanup still completes if this task is being
            # cancelled because the client went away.
            await asyncio.shield(close_pubsub(pubsub))

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
# ─── Routes: Phase 3 — WebSocket ─────────────────────────────────────────────
async def websocket_room(ws: WebSocket, room: str):
    """WebSocket endpoint that relays every client message to its room.

    Text frames that parse as JSON are relayed as parsed; anything else is
    wrapped as ``{"text": raw}``. Each message is broadcast to every socket in
    ``room`` (the sender included) and also handed to
    ``persistence.publish_event`` under the same room name, for SSE subscribers.
    """
    await ws_manager.connect(ws, room)
    logger.info("WS joined room=%s total=%d", room, ws_manager.room_count(room))
    try:
        while True:
            raw = await ws.receive_text()
            try:
                msg = json.loads(raw)
            except (ValueError, RecursionError):
                msg = {"text": raw}
            await ws_manager.broadcast(room, msg)
            await persistence.publish_event(room, msg)
    except WebSocketDisconnect:
        logger.info("WS left room=%s", room)
    except Exception:
        logger.exception("WS error room=%s", room)
        # Tell the client the server failed (1011) instead of leaving the
        # socket open with nobody reading it. Best effort: it may already be gone.
        try:
            await ws.close(code=1011)
        except Exception:
            logger.debug("WS close after error failed room=%s", room, exc_info=True)
    finally:
        # Always deregister, including when the handler is cancelled.
        ws_manager.disconnect(ws, room)
# ─── Routes: Phase 3 — Models list ───────────────────────────────────────────
async def list_models() -> Dict[str, Any]:
    """Return the hard-coded catalogue of selectable models, grouped by provider.

    Each entry is ``{"id", "name", "context"}``, where ``context`` is the
    advertised context-window size (presumably in tokens — confirm). The list
    is static and is not fetched from the providers.
    """
    def entry(model_id: str, label: str, window: int) -> Dict[str, Any]:
        return {"id": model_id, "name": label, "context": window}

    catalogue: Dict[str, Any] = {}
    catalogue["gemini"] = [
        entry("gemini-2.0-flash", "Gemini 2.0 Flash", 1048576),
        entry("gemini-2.0-flash-thinking-exp", "Gemini 2.0 Flash Thinking", 32768),
        entry("gemini-1.5-pro", "Gemini 1.5 Pro", 2097152),
        entry("gemini-1.5-flash", "Gemini 1.5 Flash", 1048576),
    ]
    catalogue["sambanova"] = [
        entry("Meta-Llama-3.3-70B-Instruct", "Llama 3.3 70B", 131072),
        entry("Meta-Llama-3.1-405B-Instruct", "Llama 3.1 405B", 16384),
        entry("DeepSeek-R1", "DeepSeek R1", 32768),
        entry("Qwen2.5-72B-Instruct", "Qwen 2.5 72B", 32768),
    ]
    catalogue["github_gpt4o"] = [
        entry("gpt-4o", "GPT-4o", 128000),
        entry("gpt-4o-mini", "GPT-4o Mini", 128000),
        entry("Meta-Llama-3.1-70B-Instruct", "Llama 3.1 70B", 131072),
        entry("Mistral-large-2407", "Mistral Large", 131072),
    ]
    return catalogue
# ─── Entry point ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Direct-run entry point: serve the app on all interfaces; PORT defaults to 7860.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "7860")))