Spaces:
Sleeping
Sleeping
| """ | |
| Onehands AI Backend — Hugging Face Space | |
| ========================================== | |
| Full-featured FastAPI backend for the Onehands autonomous AI platform. | |
| Stack: | |
| • FastAPI + uvicorn | |
| • Supabase (PostgreSQL via asyncpg) — conversations, messages, executions | |
| • Upstash Redis — pub/sub, SSE bridge, caching | |
| • Smart API Router — Gemini / SambaNova / GitHub LLM with auto-heal cooldowns | |
| • E2B — secure sandboxed code execution | |
| • WebSocket + SSE — realtime event streaming | |
| • Agent Loop — autonomous task planning & multi-step execution | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import time | |
| import uuid | |
| from contextlib import asynccontextmanager | |
| from typing import Any, AsyncGenerator, Optional | |
| import httpx | |
| from fastapi import ( | |
| Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect, | |
| ) | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from pydantic import BaseModel, Field | |
| import persistence as db | |
| from smart_router import Provider, _to_gemini_format, router as smart_router | |
| # βββ Logging ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Configure root logging once at import time: INFO level, with a
# timestamp / level / logger-name prefix on every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

# Module-wide logger used by the handlers in this file.
logger = logging.getLogger("onehands")
| # βββ Config βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# API key for the E2B sandbox, read once at import time. When it is empty,
# `_e2b_run` returns an "E2B_API_KEY not configured" error result instead of
# contacting the service.
E2B_API_KEY = os.environ.get("E2B_API_KEY", "")

# Origins passed to CORSMiddleware as `allow_origins`.
# NOTE(review): the trailing "*" entry makes the explicit origins above it
# redundant, and this list is used together with `allow_credentials=True`.
# Starlette's CORSMiddleware appears to treat a "*" entry as "allow every
# origin" - confirm whether credentialed requests from arbitrary origins are
# intended before shipping; if not, drop the "*" entry.
ALLOWED_ORIGINS: list[str] = [
    "https://onehands-development.vercel.app",
    "https://onehands.vercel.app",
    "https://pyaesonegtckglay-dotcom-onehands-development.vercel.app",
    "http://localhost:3000",
    "http://localhost:3001",
    "http://localhost:5173",
    "*",
]
| # βββ Lifespan βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: open persistence on startup, close it on shutdown.

    Startup initialises the database and Redis through the `persistence`
    module (`db`). Control is then yielded to the running application.
    Shutdown calls `db.close()`.

    The function is wrapped in `asynccontextmanager` because it is passed to
    `FastAPI(lifespan=...)`, which expects an async context manager factory.
    The `try/finally` makes sure `db.close()` runs even when the application
    phase ends with an exception.

    NOTE(review): the leading character of both log messages looks like a
    mis-encoded emoji; it is reproduced unchanged because the intended glyph
    cannot be determined from this file.
    """
    await db.init_db()
    await db.init_redis()
    logger.info("π Onehands backend ready")
    try:
        yield
    finally:
        await db.close()
        logger.info("π Onehands backend shutdown")
| # βββ App ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# The FastAPI application. `lifespan` handles startup/shutdown of the
# persistence layer (database + Redis).
app = FastAPI(
    title="Onehands AI Backend",
    description=(
        "Autonomous AI platform backend: multi-provider LLM routing, "
        "code execution, persistent conversations, realtime streaming."
    ),
    version="2.0.0",
    lifespan=lifespan,
)

# CORS: every method and header is allowed for the origins listed in
# ALLOWED_ORIGINS, with credentials enabled.
# NOTE(review): ALLOWED_ORIGINS currently contains "*"; combined with
# allow_credentials=True this deserves a security review.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # βββ WebSocket manager ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class WSManager:
    """In-process registry of WebSocket connections grouped by room name.

    Rooms are created on first `connect` and removed again when their last
    connection leaves, so the registry does not grow with every room name
    ever seen.
    """

    def __init__(self):
        # room name -> sockets currently joined to that room
        self._rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, ws: "WebSocket", room: str):
        """Accept the WebSocket handshake and add `ws` to `room`."""
        await ws.accept()
        members = self._rooms.setdefault(room, [])
        members.append(ws)
        logger.info("WS+ room=%s total=%d", room, len(members))

    def disconnect(self, ws: "WebSocket", room: str):
        """Remove `ws` from `room`; unknown rooms and sockets are ignored.

        The room entry is deleted once it is empty (the previous behaviour
        kept an empty list forever and even created one for unknown rooms).
        """
        remaining = [c for c in self._rooms.get(room, []) if c is not ws]
        if remaining:
            self._rooms[room] = remaining
        else:
            self._rooms.pop(room, None)

    async def broadcast(self, room: str, data: dict):
        """Send `data` as JSON to every socket in `room`.

        Sockets whose send raises are treated as dead and removed from the
        room afterwards; the error itself is not propagated.
        """
        dead: list[WebSocket] = []
        # Iterate over a snapshot: the room list may change while awaiting.
        for ws in list(self._rooms.get(room, [])):
            try:
                await ws.send_json(data)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws, room)


# Shared manager instance used by the WebSocket endpoint and `_emit`.
ws_mgr = WSManager()
| # βββ Schemas ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ChatRequest(BaseModel):
    """Request body consumed by the non-streaming `chat` handler."""

    # Existing conversation to continue; a new one is created when omitted.
    conversation_id: Optional[str] = None
    # The user's message text.
    message: str
    # Preferred model and provider for the LLM call.
    model: str = "gemini-2.0-flash"
    provider: str = "gemini"
    # Sampling parameters forwarded to the LLM call.
    temperature: float = 0.7
    max_tokens: int = 4096
    # Optional system message placed first in the prompt.
    system_prompt: Optional[str] = None
    # NOTE(review): not read by the `chat` handler visible in this file.
    stream: bool = False
    # True: let the smart router pick/fall back across providers.
    # False: call exactly the requested provider.
    auto_fallback: bool = True
class StreamChatRequest(BaseModel):
    """Request body consumed by the SSE `chat_stream` handler."""

    # Existing conversation to continue; a new one is created when omitted.
    conversation_id: Optional[str] = None
    # The user's message text.
    message: str
    # Model and provider used for the streaming call.
    model: str = "gemini-2.0-flash"
    provider: str = "gemini"
    # Sampling parameters forwarded to the provider.
    temperature: float = 0.7
    max_tokens: int = 4096
    # Optional system message placed first in the prompt.
    system_prompt: Optional[str] = None
class ExecuteRequest(BaseModel):
    """Request body consumed by the `execute_code` handler."""

    # When set, the execution is persisted and broadcast for this conversation.
    conversation_id: Optional[str] = None
    # Source code to run in the sandbox.
    code: str
    # Language label; stored with the execution record.
    language: str = "python"
    # Timeout value handed to the sandbox helper.
    timeout: int = 30
class ConversationCreate(BaseModel):
    """Request body consumed by the `create_conversation` handler."""

    # Owner of the conversation.
    user_id: str = "anonymous"
    # Optional title; the handler substitutes "New conversation" when empty.
    title: Optional[str] = None
    # Model and provider recorded on the conversation.
    model: str = "gemini-2.0-flash"
    provider: str = "gemini"
class AgentTaskRequest(BaseModel):
    """Request body consumed by the `agent_task` handler."""

    # Natural-language description of the task to carry out.
    task: str
    # Existing conversation to attach to; a new one is created when omitted.
    conversation_id: Optional[str] = None
    # Preferred model and provider for the agent's LLM calls.
    model: str = "gemini-2.0-flash"
    provider: str = "gemini"
    # Upper bound on agent loop iterations (validated to 1..20).
    max_steps: int = Field(default=8, ge=1, le=20)
    # When False, code blocks in LLM replies are not executed.
    execute_code: bool = True
    # Owner recorded on a newly created conversation.
    user_id: str = "anonymous"
| # βββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _build_oai_messages(system: Optional[str], history: list, user_msg: str) -> list: | |
| msgs: list[dict] = [] | |
| if system: | |
| msgs.append({"role": "system", "content": system}) | |
| for row in history: | |
| msgs.append({"role": row["role"], "content": row["content"]}) | |
| msgs.append({"role": "user", "content": user_msg}) | |
| return msgs | |
async def _emit(conv_id: str, event: dict):
    """Fan an event out to both realtime channels for a conversation.

    The event is first published through the persistence layer
    (`db.publish_event`) and then broadcast to the WebSocket room named
    after the conversation id.
    """
    await db.publish_event(conv_id, event)
    await ws_mgr.broadcast(conv_id, event)
async def _llm_call(
    provider: str,
    model: str,
    messages: list,
    temperature: float,
    max_tokens: int,
    auto_fallback: bool = True,
) -> tuple[str, str, str]:
    """Run one chat completion and return (content, used_provider, used_model).

    Args:
        provider: Requested provider ("gemini", "sambanova" or "github_llm").
        model: Requested model id.
        messages: OpenAI-style message list.
        temperature: Sampling temperature forwarded to the provider.
        max_tokens: Completion token limit forwarded to the provider.
        auto_fallback: When True, delegate to `smart_router.auto_chat`, which
            receives the provider/model as preferences and reports which pair
            it actually used. When False, call exactly the requested provider.

    Raises:
        HTTPException: 400 when `auto_fallback` is False and `provider` is
            not one of the known names. Errors from the router and from
            indexing into an unexpected response shape propagate unchanged.
    """
    if auto_fallback:
        result = await smart_router.auto_chat(
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            preferred_provider=provider,
            preferred_model=model,
        )
        return result["content"], result["provider"], result["model"]

    # Strict mode: no fallback, the caller gets exactly what was requested.
    if provider == "gemini":
        # Gemini takes its own message format and has its own response shape.
        r = await smart_router.call_gemini(
            model, _to_gemini_format(messages), temperature, max_tokens
        )
        content = r["candidates"][0]["content"]["parts"][0]["text"]
    elif provider in ("sambanova", "github_llm"):
        # Both providers are read with the same OpenAI-style response shape.
        call = (
            smart_router.call_sambanova
            if provider == "sambanova"
            else smart_router.call_github_llm
        )
        r = await call(model, messages, temperature, max_tokens)
        content = r["choices"][0]["message"]["content"]
    else:
        raise HTTPException(status_code=400, detail=f"Unknown provider: {provider}")
    return content, provider, model
| # βββ Routes: root / health ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def root():
    """Return static service metadata and the list of advertised endpoints."""
    return {
        "service": "Onehands AI Backend",
        "version": "2.0.0",
        "status": "running",
        "docs": "/docs",
        "endpoints": [
            "/chat", "/chat/stream", "/execute",
            "/agent/task", "/conversations",
            "/health", "/health/keys",
            "/ws/{room}", "/models",
        ],
    }
async def health():
    """Report database, Redis and smart-router health.

    `status` is "ok" only when both the database and Redis are reachable,
    otherwise "degraded". The database state is sampled once so that the
    `status` and `database` fields cannot disagree within one response.
    """
    redis_ok = await db.redis_ping()
    db_ok = db.db_connected()
    return {
        "status": "ok" if (db_ok and redis_ok) else "degraded",
        "database": "connected" if db_ok else "disconnected",
        "redis": "connected" if redis_ok else "disconnected",
        "smart_router": smart_router.health(),
        "timestamp": time.time(),
    }
async def health_keys():
    """Return the smart router's health report as-is."""
    return smart_router.health()
async def reload_keys():
    """Ask the smart router to reload its keys and return the fresh health report.

    NOTE(review): this calls the router's private `_reload_keys`, and no
    authentication is visible on this handler - confirm it is protected
    wherever it is registered.
    """
    smart_router._reload_keys()
    return {"status": "reloaded", "health": smart_router.health()}
| # βββ Conversations ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def create_conversation(data: ConversationCreate):
    """Create a conversation record and return it.

    A missing or empty title is stored as "New conversation".

    Raises:
        HTTPException: 503 when the persistence layer returns no row.
    """
    row = await db.create_conversation(
        user_id=data.user_id,
        title=data.title or "New conversation",
        model=data.model,
        provider=data.provider,
    )
    if not row:
        raise HTTPException(status_code=503, detail="Database unavailable")
    return row
async def list_conversations(user_id: str = "anonymous"):
    """Return the conversations stored for `user_id` (default "anonymous")."""
    return await db.list_conversations(user_id)
async def get_messages(conv_id: str):
    """Return the stored messages of conversation `conv_id`."""
    return await db.get_conversation_messages(conv_id)
async def delete_conversation(conv_id: str):
    """Delete conversation `conv_id` and report the persistence layer's result."""
    ok = await db.delete_conversation(conv_id)
    return {"deleted": ok}
| # βββ Chat βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def chat(req: ChatRequest):
    """Handle one non-streaming chat turn.

    Steps: ensure a conversation exists, load its history, store the user
    message, call the LLM, store the assistant reply, emit a realtime
    "message" event, and return the reply.

    Raises:
        HTTPException: 400 for an unknown provider in strict mode (passed
            through from `_llm_call`); 502 for any other LLM failure.
    """
    # Ensure conversation. When the row cannot be created, a random id is
    # used so the request can still be served.
    conv_id = req.conversation_id
    if not conv_id:
        row = await db.create_conversation(
            title=req.message[:60],
            model=req.model,
            provider=req.provider,
        )
        conv_id = str(row["id"]) if row else str(uuid.uuid4())

    # History is read before the new user message is saved, so the message
    # is not duplicated in the prompt.
    history = await db.get_conversation_messages(conv_id)
    messages = _build_oai_messages(req.system_prompt, history, req.message)
    await db.save_message(conv_id, "user", req.message)

    try:
        content, used_provider, used_model = await _llm_call(
            provider=req.provider,
            model=req.model,
            messages=messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            auto_fallback=req.auto_fallback,
        )
    except HTTPException:
        # Client errors (e.g. 400 unknown provider) must keep their status
        # code instead of being rewritten to 502 below.
        raise
    except Exception as e:
        logger.error("chat error: %s", e)
        raise HTTPException(status_code=502, detail=str(e)) from e

    # Save & emit
    await db.save_message(conv_id, "assistant", content, used_provider, used_model)
    await _emit(conv_id, {"type": "message", "role": "assistant", "content": content})
    return {
        "conv_id": conv_id,
        "role": "assistant",
        "content": content,
        "model": used_model,
        "provider": used_provider,
    }
| # βββ Streaming chat βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def chat_stream(req: StreamChatRequest):
    """SSE streaming chat endpoint.

    Emits `data: {...}` frames of type "chunk" while the provider streams,
    then a final "done" frame. If streaming raises, one non-streaming call
    with automatic provider fallback is attempted and its full reply is sent
    as a single chunk. The assistant message is stored with the provider and
    model that actually produced it.
    """
    conv_id = req.conversation_id
    if not conv_id:
        row = await db.create_conversation(
            title=req.message[:60], model=req.model, provider=req.provider
        )
        conv_id = str(row["id"]) if row else str(uuid.uuid4())

    history = await db.get_conversation_messages(conv_id)
    messages = _build_oai_messages(req.system_prompt, history, req.message)
    await db.save_message(conv_id, "user", req.message)

    def sse(payload: dict) -> str:
        """Format one payload as a server-sent-event data frame."""
        return f"data: {json.dumps(payload)}\n\n"

    async def event_gen() -> AsyncGenerator[str, None]:
        pieces: list[str] = []
        # Updated if the non-streaming fallback answers with another pair.
        used_provider, used_model = req.provider, req.model
        try:
            if req.provider == "gemini":
                gen = smart_router.stream_gemini(
                    req.model, _to_gemini_format(messages),
                    req.temperature, req.max_tokens,
                )
            elif req.provider == "sambanova":
                gen = smart_router.stream_sambanova(
                    req.model, messages, req.temperature, req.max_tokens
                )
            elif req.provider == "github_llm":
                gen = smart_router.stream_github_llm(
                    req.model, messages, req.temperature, req.max_tokens
                )
            else:
                # Tagged with type "error" like the other error frame below.
                yield sse({"type": "error", "error": f"Unknown provider: {req.provider}"})
                return

            async for chunk in gen:
                pieces.append(chunk)
                yield sse({"type": "chunk", "content": chunk, "conv_id": conv_id})
            full_text = "".join(pieces)
        except Exception as e:
            logger.error("stream error: %s", e)
            # Fallback to non-streaming.
            # NOTE(review): if some chunks were already sent before the
            # failure, the client receives them again as part of the full
            # fallback reply.
            try:
                content, used_provider, used_model = await _llm_call(
                    provider=req.provider, model=req.model,
                    messages=messages, temperature=req.temperature,
                    max_tokens=req.max_tokens, auto_fallback=True,
                )
            except Exception as e2:
                yield sse({"type": "error", "error": str(e2)})
                return
            full_text = content
            yield sse({"type": "chunk", "content": content, "conv_id": conv_id})

        await db.save_message(conv_id, "assistant", full_text, used_provider, used_model)
        await _emit(conv_id, {"type": "done", "conv_id": conv_id, "content": full_text})
        yield sse({"type": "done", "conv_id": conv_id})

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        # Disable caching and proxy buffering so frames reach the client promptly.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
| # βββ SSE subscribe ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def sse_subscribe(conv_id: str, request: Request):
    """Subscribe to events for a conversation room via SSE.

    Relays messages published on the Redis channel `room:<conv_id>` as SSE
    data frames, and sends a comment-style keepalive whenever a poll returns
    no message. The loop ends when the client disconnects; the subscription
    is always released afterwards.
    """
    channel = f"room:{conv_id}"

    async def gen() -> AsyncGenerator[str, None]:
        redis = db.get_redis()
        if not redis:
            yield 'data: {"error":"Redis not available"}\n\n'
            return

        pubsub = redis.pubsub()
        await pubsub.subscribe(channel)
        try:
            while not await request.is_disconnected():
                msg = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                if msg and msg["type"] == "message":
                    data = msg["data"]
                    # A client without response decoding hands back bytes;
                    # formatting those directly would emit "b'...'" text.
                    if isinstance(data, (bytes, bytearray)):
                        data = data.decode("utf-8", errors="replace")
                    yield f"data: {data}\n\n"
                else:
                    yield ": keepalive\n\n"
                await asyncio.sleep(0.05)
        finally:
            await pubsub.unsubscribe(channel)
            await pubsub.aclose()

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
| # βββ WebSocket ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def websocket_ep(ws: WebSocket, room: str):
    """WebSocket endpoint: join `room` and relay every received message to it.

    Each text frame is parsed as JSON; frames that are not valid JSON are
    wrapped as {"raw": <text>}. The result is broadcast to every socket in
    the room, including the sender. The socket is removed from the room
    however the loop ends (disconnect, error or task cancellation).
    """
    await ws_mgr.connect(ws, room)
    try:
        while True:
            data = await ws.receive_text()
            try:
                msg = json.loads(data)
            except (ValueError, RecursionError):
                # Not JSON (or nested too deeply to parse): forward verbatim.
                msg = {"raw": data}
            await ws_mgr.broadcast(room, msg)
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in `finally`.
        pass
    except Exception as e:
        logger.error("WS error room=%s: %s", room, e)
    finally:
        ws_mgr.disconnect(ws, room)
| # βββ Code Execution (E2B) βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def _e2b_run(code: str, language: str, timeout: int) -> dict:
    """Run `code` in an E2B sandbox and return a result dict.

    Args:
        code: Source code to execute.
        language: Accepted for interface compatibility; not forwarded to the
            sandbox by this helper.
        timeout: Passed as `timeout` when the sandbox is created.

    Returns:
        {"output": stdout text, "error": stderr/error text,
         "exit_code": 0 or 1, "duration_ms": int}. Failures (missing API
        key, SDK errors) are reported through the same shape with
        exit_code 1 rather than raised.
    """
    if not E2B_API_KEY:
        return {"output": "", "error": "E2B_API_KEY not configured", "exit_code": 1, "duration_ms": 0}
    try:
        # Imported lazily so the module loads without the SDK installed.
        import e2b_code_interpreter as e2b

        start = time.time()
        # The SDK calls are blocking, so they run in worker threads.
        sandbox = await asyncio.to_thread(e2b.Sandbox, api_key=E2B_API_KEY, timeout=timeout)
        try:
            execution = await asyncio.to_thread(sandbox.run_code, code)
            duration = int((time.time() - start) * 1000)
        finally:
            # Always release the sandbox, even when run_code raised. A failed
            # kill must not discard a successful execution result.
            try:
                await asyncio.to_thread(sandbox.kill)
            except Exception as kill_err:
                logger.warning("E2B sandbox kill failed: %s", kill_err)

        stdout = "\n".join(execution.logs.stdout or [])
        stderr = "\n".join(execution.logs.stderr or [])

        # Exceptions raised by the executed code are reported on
        # `execution.error`, not on stderr; include them so that failing
        # code is not returned as a success.
        exec_error = getattr(execution, "error", None)
        if exec_error:
            detail = getattr(exec_error, "traceback", None) or (
                f"{getattr(exec_error, 'name', 'Error')}: {getattr(exec_error, 'value', '')}"
            )
            stderr = f"{stderr}\n{detail}" if stderr else str(detail)

        return {
            "output": stdout,
            "error": stderr,
            "exit_code": 1 if stderr else 0,
            "duration_ms": duration,
        }
    except Exception as e:
        return {"output": "", "error": str(e), "exit_code": 1, "duration_ms": 0}
async def execute_code(req: ExecuteRequest):
    """Run the submitted code in the sandbox and return the result dict.

    When the request names a conversation, the execution is also persisted
    and an "execution" event carrying the result fields is emitted to that
    conversation's realtime channels.
    """
    result = await _e2b_run(req.code, req.language, req.timeout)
    if req.conversation_id:
        await db.save_execution(
            req.conversation_id, req.language, req.code,
            result["output"], result["error"],
            result["exit_code"], result["duration_ms"],
        )
        await _emit(req.conversation_id, {"type": "execution", **result})
    return result
| # βββ Agent Loop βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| AGENT_SYSTEM = """You are Onehands β an autonomous AI developer agent. | |
| Given a task, you: | |
| 1. PLAN: Break it into numbered steps | |
| 2. EXECUTE: For each step, either write reasoning OR generate code | |
| 3. CODE: When writing code, wrap it in ```python ... ``` blocks | |
| 4. OBSERVE: After code runs, use the output to guide next step | |
| 5. FINISH: When done, summarize results | |
| Rules: | |
| - Be concise, action-oriented | |
| - One code block per step maximum | |
| - If a step requires browsing or file I/O, describe what you'd do | |
| - Always end with a FINAL ANSWER section | |
| """ | |
| def _extract_code(text: str) -> Optional[str]: | |
| """Extract first python code block from markdown text.""" | |
| import re | |
| pattern = r"```(?:python)?\n(.*?)```" | |
| m = re.search(pattern, text, re.DOTALL) | |
| return m.group(1).strip() if m else None | |
async def agent_task(req: AgentTaskRequest):
    """Run the autonomous agent loop for one task and return its trace.

    1. Create/use a conversation and store the task as a user message.
    2. Up to `req.max_steps` times: ask the LLM for the next step, store and
       emit its reply.
    3. When `req.execute_code` is set and the reply contains a code block,
       run it in the E2B sandbox, store and emit the result, and feed the
       result back to the LLM as a user message.
    4. Stop early when the reply contains a completion phrase or the LLM
       call fails.

    Returns:
        {"conv_id", "task", "steps", "trace"} where `trace` lists
        "thought", "execution" and "error" entries in order.
    """
    conv_id = req.conversation_id
    if not conv_id:
        row = await db.create_conversation(
            user_id=req.user_id,
            title=f"Agent: {req.task[:50]}",
            model=req.model,
            provider=req.provider,
        )
        conv_id = str(row["id"]) if row else str(uuid.uuid4())

    await db.save_message(conv_id, "user", req.task)
    await _emit(conv_id, {"type": "agent_start", "task": req.task, "conv_id": conv_id})

    messages: list[dict] = [
        {"role": "system", "content": AGENT_SYSTEM},
        {"role": "user", "content": f"TASK: {req.task}"},
    ]
    trace: list[dict] = []
    done_markers = ("final answer", "task complete", "done.", "finished.")

    step = 0
    for step in range(1, req.max_steps + 1):
        logger.info("Agent step %d/%d conv=%s", step, req.max_steps, conv_id)

        # LLM call
        try:
            content, used_provider, used_model = await _llm_call(
                provider=req.provider,
                model=req.model,
                messages=messages,
                temperature=0.3,
                max_tokens=2048,
                auto_fallback=True,
            )
        except Exception as e:
            error_msg = f"LLM failed at step {step}: {e}"
            logger.error("agent error conv=%s: %s", conv_id, error_msg)
            trace.append({"step": step, "type": "error", "content": error_msg})
            break

        messages.append({"role": "assistant", "content": content})
        await db.save_message(conv_id, "assistant", content, used_provider, used_model)
        await _emit(conv_id, {
            "type": "agent_step",
            "step": step,
            "content": content,
            "provider": used_provider,
            "model": used_model,
        })
        trace.append({"step": step, "type": "thought", "content": content})

        # Execute code if present
        observed = False
        if req.execute_code:
            code = _extract_code(content)
            if code:
                exec_result = await _e2b_run(code, "python", 30)
                await db.save_execution(
                    conv_id, "python", code,
                    exec_result["output"], exec_result["error"],
                    exec_result["exit_code"], exec_result["duration_ms"],
                )
                await _emit(conv_id, {
                    "type": "agent_execution",
                    "step": step,
                    **exec_result,
                })
                trace.append({"step": step, "type": "execution", **exec_result})
                # Feed result back
                obs = f"[Code execution result]\noutput: {exec_result['output']}\nerror: {exec_result['error']}\nexit_code: {exec_result['exit_code']}"
                messages.append({"role": "user", "content": obs})
                observed = True

        # Check if done
        lowered = content.lower()
        if any(kw in lowered for kw in done_markers):
            break

        if not observed:
            # Without an observation the transcript would end on an assistant
            # turn and the next call would send two assistant messages in a
            # row; add an explicit user turn so roles keep alternating.
            messages.append({"role": "user", "content": "Continue with the next step."})

    await _emit(conv_id, {"type": "agent_done", "steps": step, "conv_id": conv_id})
    return {
        "conv_id": conv_id,
        "task": req.task,
        "steps": step,
        "trace": trace,
    }
| # βββ Models βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
async def list_models():
    """Return the static model catalogue, grouped by provider.

    Each entry carries the model `id`, a display `name`, and a `context`
    figure (presumably the context window in tokens).
    """
    return {
        "gemini": [
            {"id": "gemini-2.0-flash", "name": "Gemini 2.0 Flash", "context": 1048576},
            {"id": "gemini-2.0-flash-thinking-exp", "name": "Gemini 2.0 Flash Thinking", "context": 32768},
            {"id": "gemini-1.5-pro", "name": "Gemini 1.5 Pro", "context": 2097152},
            {"id": "gemini-1.5-flash", "name": "Gemini 1.5 Flash", "context": 1048576},
        ],
        "sambanova": [
            {"id": "Meta-Llama-3.3-70B-Instruct", "name": "Llama 3.3 70B", "context": 131072},
            {"id": "Meta-Llama-3.1-405B-Instruct", "name": "Llama 3.1 405B", "context": 16384},
            {"id": "DeepSeek-R1", "name": "DeepSeek R1", "context": 32768},
            {"id": "Qwen2.5-72B-Instruct", "name": "Qwen 2.5 72B", "context": 32768},
        ],
        "github_llm": [
            {"id": "gpt-4o", "name": "GPT-4o", "context": 128000},
            {"id": "gpt-4o-mini", "name": "GPT-4o Mini", "context": 128000},
            {"id": "Meta-Llama-3.1-70B-Instruct", "name": "Llama 3.1 70B (GitHub)", "context": 131072},
            {"id": "Mistral-large-2407", "name": "Mistral Large", "context": 131072},
        ],
    }
| # βββ Entry ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Script entry point: serve the app with uvicorn on all interfaces, on the
# port given by $PORT (default 7860), with a single worker.
# NOTE(review): the "app:app" import string assumes this file is importable
# as module `app` - confirm the file name.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 7860)),
        log_level="info",
        workers=1,
    )