PYAE1994's picture
feat: Onehands AI Backend v2.0 — smart routing, agent loop, streaming, E2B execution
e192219
"""
Onehands AI Backend — Hugging Face Space
==========================================
Full-featured FastAPI backend for the Onehands autonomous AI platform.
Stack:
• FastAPI + uvicorn
• Supabase (PostgreSQL via asyncpg) — conversations, messages, executions
• Upstash Redis — pub/sub, SSE bridge, caching
• Smart API Router — Gemini / SambaNova / GitHub LLM with auto-heal cooldowns
• E2B — secure sandboxed code execution
• WebSocket + SSE — realtime event streaming
• Agent Loop — autonomous task planning & multi-step execution
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import uuid
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Optional
import httpx
from fastapi import (
Depends, FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
import persistence as db
from smart_router import Provider, _to_gemini_format, router as smart_router
# ─── Logging ──────────────────────────────────────────────────────────────────
# Configure the root logger once at import time (level INFO, timestamped format).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# Logger used throughout this module.
logger = logging.getLogger("onehands")
# ─── Config ───────────────────────────────────────────────────────────────────
# E2B API key read from the environment; empty string when the variable is unset.
E2B_API_KEY = os.environ.get("E2B_API_KEY", "")
# Origins handed to the CORS middleware registered on the app.
# NOTE(review): the trailing "*" entry makes the explicit origins redundant and,
# together with allow_credentials=True on the middleware, appears to allow
# credentialed cross-origin requests from any site — confirm this is intended
# before shipping to production.
ALLOWED_ORIGINS: list[str] = [
    "https://onehands-development.vercel.app",
    "https://onehands.vercel.app",
    "https://pyaesonegtckglay-dotcom-onehands-development.vercel.app",
    "http://localhost:3000",
    "http://localhost:3001",
    "http://localhost:5173",
    "*",
]
# ─── Lifespan ─────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise persistence on startup and always release it on shutdown.

    Startup awaits ``db.init_db()`` then ``db.init_redis()``. Shutdown awaits
    ``db.close()``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    await db.init_db()
    await db.init_redis()
    logger.info("🚀 Onehands backend ready")
    try:
        yield
    finally:
        # Run cleanup even when the server throws an exception or cancellation
        # into the generator at the yield point; otherwise the connections
        # opened above would never be closed.
        await db.close()
        logger.info("🛑 Onehands backend shutdown")
# ─── App ──────────────────────────────────────────────────────────────────────
# The FastAPI application object; `lifespan` runs the startup/shutdown hooks.
app = FastAPI(
    title="Onehands AI Backend",
    description=(
        "Autonomous AI platform backend: multi-provider LLM routing, "
        "code execution, persistent conversations, realtime streaming."
    ),
    version="2.0.0",
    lifespan=lifespan,
)
# CORS: all methods and headers are allowed for the origins in ALLOWED_ORIGINS.
# NOTE(review): ALLOWED_ORIGINS contains "*" while allow_credentials=True —
# this combination looks like it permits credentialed requests from any
# origin; confirm against the Starlette CORSMiddleware documentation.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─── WebSocket manager ────────────────────────────────────────────────────────
class WSManager:
    """Keep track of open WebSocket connections grouped by room name.

    ``_rooms`` maps a room name to the list of sockets currently joined to
    it. A room key exists only while at least one socket is in the room.
    """

    def __init__(self) -> None:
        self._rooms: dict[str, list[WebSocket]] = {}

    async def connect(self, ws: WebSocket, room: str) -> None:
        """Accept the WebSocket handshake and add the socket to ``room``."""
        await ws.accept()
        self._rooms.setdefault(room, []).append(ws)
        logger.info("WS+ room=%s total=%d", room, len(self._rooms[room]))

    def disconnect(self, ws: WebSocket, room: str) -> None:
        """Remove ``ws`` from ``room``; a no-op if it is not a member.

        The room key is dropped once the last socket leaves. Previously an
        empty list was stored for every room name ever seen (including
        names that never had a member), so the mapping grew without bound.
        """
        remaining = [c for c in self._rooms.get(room, []) if c is not ws]
        if remaining:
            self._rooms[room] = remaining
        else:
            self._rooms.pop(room, None)

    async def broadcast(self, room: str, data: dict) -> None:
        """Send ``data`` as JSON to every socket in ``room``.

        Sockets whose send raises are treated as dead and removed from the
        room after the loop; the error itself is not propagated.
        """
        dead: list[WebSocket] = []
        # Iterate over a copy: disconnect() may rebind the room's list.
        for ws in list(self._rooms.get(room, [])):
            try:
                await ws.send_json(data)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws, room)


# Process-wide manager shared by the WebSocket route and the event emitter.
ws_mgr = WSManager()
# ─── Schemas ──────────────────────────────────────────────────────────────────
class ChatRequest(BaseModel):
    """Request body for the non-streaming ``POST /chat`` endpoint."""

    # Existing conversation to continue; a new one is created when omitted.
    conversation_id: Optional[str] = Field(default=None)
    # The user's message text (required).
    message: str
    # Preferred model id and provider name.
    model: str = Field(default="gemini-2.0-flash")
    provider: str = Field(default="gemini")
    # Sampling parameters forwarded to the LLM call.
    temperature: float = Field(default=0.7)
    max_tokens: int = Field(default=4096)
    # Optional system message placed ahead of the history.
    system_prompt: Optional[str] = Field(default=None)
    stream: bool = Field(default=False)
    # When true the smart router may pick another provider/model.
    auto_fallback: bool = Field(default=True)
class StreamChatRequest(BaseModel):
    """Request body for the SSE ``POST /chat/stream`` endpoint."""

    # Existing conversation to continue; a new one is created when omitted.
    conversation_id: Optional[str] = Field(default=None)
    # The user's message text (required).
    message: str
    # Model id and provider name used for the streaming call.
    model: str = Field(default="gemini-2.0-flash")
    provider: str = Field(default="gemini")
    # Sampling parameters forwarded to the provider.
    temperature: float = Field(default=0.7)
    max_tokens: int = Field(default=4096)
    # Optional system message placed ahead of the history.
    system_prompt: Optional[str] = Field(default=None)
class ExecuteRequest(BaseModel):
    """Request body for ``POST /execute`` (sandboxed code execution)."""

    # When set, the execution is persisted and announced on this conversation.
    conversation_id: Optional[str] = Field(default=None)
    # Source code to run (required).
    code: str
    language: str = Field(default="python")
    # Timeout value handed to the sandbox runner.
    timeout: int = Field(default=30)
class ConversationCreate(BaseModel):
    """Request body for ``POST /conversations``."""

    user_id: str = Field(default="anonymous")
    # The route substitutes "New conversation" when no title is supplied.
    title: Optional[str] = Field(default=None)
    model: str = Field(default="gemini-2.0-flash")
    provider: str = Field(default="gemini")
class AgentTaskRequest(BaseModel):
    """Request body for ``POST /agent/task`` (autonomous agent loop)."""

    # Natural-language task for the agent (required).
    task: str
    # Existing conversation to attach to; a new one is created when omitted.
    conversation_id: Optional[str] = Field(default=None)
    model: str = Field(default="gemini-2.0-flash")
    provider: str = Field(default="gemini")
    # Upper bound on LLM round-trips; validated to lie in 1..20.
    max_steps: int = Field(8, ge=1, le=20)
    # When false, code blocks in the model output are not executed.
    execute_code: bool = Field(default=True)
    user_id: str = Field(default="anonymous")
# ─── Helpers ──────────────────────────────────────────────────────────────────
def _build_oai_messages(system: Optional[str], history: list, user_msg: str) -> list:
msgs: list[dict] = []
if system:
msgs.append({"role": "system", "content": system})
for row in history:
msgs.append({"role": row["role"], "content": row["content"]})
msgs.append({"role": "user", "content": user_msg})
return msgs
async def _emit(conv_id: str, event: dict):
    """Publish ``event`` for a conversation, then broadcast it over WebSocket.

    The event is first handed to ``db.publish_event`` and afterwards sent to
    every WebSocket in the room named ``conv_id``. The two steps run in that
    order; if the publish raises, the broadcast is not attempted.
    """
    await db.publish_event(conv_id, event)
    await ws_mgr.broadcast(conv_id, event)
async def _llm_call(
    provider: str,
    model: str,
    messages: list,
    temperature: float,
    max_tokens: int,
    auto_fallback: bool = True,
) -> tuple[str, str, str]:
    """Run one chat completion and return ``(content, used_provider, used_model)``.

    With ``auto_fallback`` the smart router chooses the backend, treating
    ``provider``/``model`` as preferences, and reports what it actually used.
    Without it the named provider is called directly and echoed back.

    Raises:
        HTTPException: 400 when ``auto_fallback`` is false and ``provider``
            is not one of ``gemini``, ``sambanova`` or ``github_llm``.
    """
    if auto_fallback:
        routed = await smart_router.auto_chat(
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
            preferred_provider=provider,
            preferred_model=model,
        )
        return routed["content"], routed["provider"], routed["model"]

    # Strict mode: call exactly the requested provider.
    if provider == "gemini":
        reply = await smart_router.call_gemini(
            model, _to_gemini_format(messages), temperature, max_tokens
        )
        text = reply["candidates"][0]["content"]["parts"][0]["text"]
    elif provider in ("sambanova", "github_llm"):
        # Both return the same choices/message/content response layout.
        if provider == "sambanova":
            backend = smart_router.call_sambanova
        else:
            backend = smart_router.call_github_llm
        reply = await backend(model, messages, temperature, max_tokens)
        text = reply["choices"][0]["message"]["content"]
    else:
        raise HTTPException(status_code=400, detail=f"Unknown provider: {provider}")
    return text, provider, model
# ─── Routes: root / health ────────────────────────────────────────────────────
@app.get("/")
async def root():
    """Return static service metadata and the list of advertised endpoints."""
    advertised = [
        "/chat", "/chat/stream", "/execute",
        "/agent/task", "/conversations",
        "/health", "/health/keys",
        "/ws/{room}", "/models",
    ]
    info = {
        "service": "Onehands AI Backend",
        "version": "2.0.0",
        "status": "running",
        "docs": "/docs",
    }
    info["endpoints"] = advertised
    return info
@app.get("/health")
async def health():
    """Report database, Redis and smart-router health.

    ``status`` is ``"ok"`` only when both the database and Redis are
    reachable, otherwise ``"degraded"``.
    """
    redis_ok = await db.redis_ping()
    # Read the database state once so "status" and "database" cannot disagree
    # if the connection state changes between two separate checks.
    db_ok = db.db_connected()
    return {
        "status": "ok" if (db_ok and redis_ok) else "degraded",
        "database": "connected" if db_ok else "disconnected",
        "redis": "connected" if redis_ok else "disconnected",
        "smart_router": smart_router.health(),
        "timestamp": time.time(),
    }
@app.get("/health/keys")
async def health_keys():
    """Return the smart router's health report as-is."""
    return smart_router.health()
@app.post("/health/reload-keys")
async def reload_keys():
    """Ask the smart router to reload its keys, then return its health report.

    NOTE(review): this calls the router's private ``_reload_keys`` and no
    authentication is visible on this route — confirm both are intended.
    """
    smart_router._reload_keys()
    return {"status": "reloaded", "health": smart_router.health()}
# ─── Conversations ────────────────────────────────────────────────────────────
@app.post("/conversations", status_code=201)
async def create_conversation(data: ConversationCreate):
    """Create a conversation and return the stored row.

    A missing or empty title is replaced by ``"New conversation"``.

    Raises:
        HTTPException: 503 when the persistence layer returns no row.
    """
    title = data.title or "New conversation"
    created = await db.create_conversation(
        user_id=data.user_id,
        title=title,
        model=data.model,
        provider=data.provider,
    )
    if created:
        return created
    raise HTTPException(status_code=503, detail="Database unavailable")
@app.get("/conversations")
async def list_conversations(user_id: str = "anonymous"):
    """Return whatever ``db.list_conversations`` yields for ``user_id``.

    ``user_id`` is a query parameter defaulting to ``"anonymous"``.
    """
    return await db.list_conversations(user_id)
@app.get("/conversations/{conv_id}/messages")
async def get_messages(conv_id: str):
    """Return the stored messages of conversation ``conv_id``."""
    return await db.get_conversation_messages(conv_id)
@app.delete("/conversations/{conv_id}")
async def delete_conversation(conv_id: str):
    """Delete conversation ``conv_id`` and report the persistence layer's result.

    The value returned by ``db.delete_conversation`` is passed through under
    the ``"deleted"`` key; no error status is raised here when it is falsy.
    """
    ok = await db.delete_conversation(conv_id)
    return {"deleted": ok}
# ─── Chat ─────────────────────────────────────────────────────────────────────
@app.post("/chat")
async def chat(req: ChatRequest):
    """Answer one user message and persist both sides of the exchange.

    Steps: ensure a conversation exists, load its history, save the user
    message, call the LLM, save and emit the assistant reply.

    Returns:
        A dict with ``conv_id``, ``role``, ``content`` and the ``model`` /
        ``provider`` that actually produced the reply.

    Raises:
        HTTPException: whatever ``_llm_call`` raises deliberately (e.g. 400
            for an unknown provider), or 502 for any other LLM failure.
    """
    # Ensure conversation. If the row cannot be created, fall back to a
    # random id so the request can still be served.
    conv_id = req.conversation_id
    if not conv_id:
        row = await db.create_conversation(
            title=req.message[:60],
            model=req.model,
            provider=req.provider,
        )
        conv_id = str(row["id"]) if row else str(uuid.uuid4())

    # History is loaded before the new user message is saved, so the message
    # is not duplicated in the prompt.
    history = await db.get_conversation_messages(conv_id)
    messages = _build_oai_messages(req.system_prompt, history, req.message)
    await db.save_message(conv_id, "user", req.message)

    try:
        content, used_provider, used_model = await _llm_call(
            provider=req.provider,
            model=req.model,
            messages=messages,
            temperature=req.temperature,
            max_tokens=req.max_tokens,
            auto_fallback=req.auto_fallback,
        )
    except HTTPException:
        # Keep deliberate HTTP errors (such as the 400 for an unknown
        # provider) instead of rewriting them as a 502 below.
        raise
    except Exception as e:
        logger.exception("chat error: %s", e)
        raise HTTPException(status_code=502, detail=str(e)) from e

    # Save & emit
    await db.save_message(conv_id, "assistant", content, used_provider, used_model)
    await _emit(conv_id, {"type": "message", "role": "assistant", "content": content})
    return {
        "conv_id": conv_id,
        "role": "assistant",
        "content": content,
        "model": used_model,
        "provider": used_provider,
    }
# ─── Streaming chat ───────────────────────────────────────────────────────────
@app.post("/chat/stream")
async def chat_stream(req: StreamChatRequest):
    """SSE streaming chat endpoint.

    Streams ``chunk`` events from the requested provider. If streaming
    fails, one non-streaming call with automatic fallback is attempted and
    its full reply is sent as a single chunk. On success the reply is saved,
    emitted to the conversation room, and a final ``done`` event is sent.
    """
    conv_id = req.conversation_id
    if not conv_id:
        row = await db.create_conversation(
            title=req.message[:60], model=req.model, provider=req.provider
        )
        conv_id = str(row["id"]) if row else str(uuid.uuid4())
    history = await db.get_conversation_messages(conv_id)
    messages = _build_oai_messages(req.system_prompt, history, req.message)
    await db.save_message(conv_id, "user", req.message)

    def sse(payload: dict) -> str:
        """Format one payload as a server-sent-event data frame."""
        return f"data: {json.dumps(payload)}\n\n"

    async def event_gen() -> AsyncGenerator[str, None]:
        full_text = ""
        # Provider/model credited for the reply; replaced if the fallback
        # path ends up answering with a different backend.
        used_provider, used_model = req.provider, req.model
        try:
            if req.provider == "gemini":
                gen = smart_router.stream_gemini(
                    req.model, _to_gemini_format(messages),
                    req.temperature, req.max_tokens,
                )
            elif req.provider == "sambanova":
                gen = smart_router.stream_sambanova(
                    req.model, messages, req.temperature, req.max_tokens
                )
            elif req.provider == "github_llm":
                gen = smart_router.stream_github_llm(
                    req.model, messages, req.temperature, req.max_tokens
                )
            else:
                # Carries "type" like every other event; the "error" key is
                # unchanged for existing clients.
                yield sse({"type": "error", "error": f"Unknown provider: {req.provider}"})
                return
            async for chunk in gen:
                full_text += chunk
                yield sse({"type": "chunk", "content": chunk, "conv_id": conv_id})
        except Exception as e:
            logger.error("stream error: %s", e)
            # Fallback to non-streaming.
            # NOTE(review): chunks already sent before the failure are not
            # retracted, so a client may see partial text followed by the
            # full fallback reply.
            try:
                content, used_provider, used_model = await _llm_call(
                    provider=req.provider, model=req.model,
                    messages=messages, temperature=req.temperature,
                    max_tokens=req.max_tokens, auto_fallback=True,
                )
                full_text = content
                yield sse({"type": "chunk", "content": content, "conv_id": conv_id})
            except Exception as e2:
                yield sse({"type": "error", "error": str(e2)})
                return
        # Record the backend that really produced the text, not merely the
        # one that was requested.
        await db.save_message(conv_id, "assistant", full_text, used_provider, used_model)
        await _emit(conv_id, {"type": "done", "conv_id": conv_id, "content": full_text})
        yield sse({"type": "done", "conv_id": conv_id})

    return StreamingResponse(
        event_gen(),
        media_type="text/event-stream",
        # Disable caching and proxy buffering so events reach the client promptly.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
# ─── SSE subscribe ────────────────────────────────────────────────────────────
@app.get("/chat/stream/{conv_id}")
async def sse_subscribe(conv_id: str, request: Request):
    """Stream a conversation room's Redis pub/sub messages to the client via SSE.

    Each published message becomes a ``data:`` frame; when a poll returns
    nothing a comment keepalive frame is sent instead. The stream ends when
    the client disconnects.
    """
    channel = f"room:{conv_id}"

    async def event_source() -> AsyncGenerator[str, None]:
        client = db.get_redis()
        if not client:
            yield 'data: {"error":"Redis not available"}\n\n'
            return
        subscription = client.pubsub()
        await subscription.subscribe(channel)
        try:
            while not await request.is_disconnected():
                incoming = await subscription.get_message(
                    ignore_subscribe_messages=True, timeout=1.0
                )
                if incoming and incoming["type"] == "message":
                    frame = f"data: {incoming['data']}\n\n"
                else:
                    frame = ": keepalive\n\n"
                yield frame
                await asyncio.sleep(0.05)
        finally:
            # Always drop the subscription, including when the client leaves.
            await subscription.unsubscribe(channel)
            await subscription.aclose()

    response_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        event_source(),
        media_type="text/event-stream",
        headers=response_headers,
    )
# ─── WebSocket ────────────────────────────────────────────────────────────────
@app.websocket("/ws/{room}")
async def websocket_ep(ws: WebSocket, room: str):
    """Join ``room`` and relay every received text frame to all its members.

    Frames that parse as JSON are broadcast as the parsed value; anything
    else is wrapped as ``{"raw": <text>}``. The socket is removed from the
    room however the loop ends.
    """
    await ws_mgr.connect(ws, room)
    try:
        while True:
            data = await ws.receive_text()
            try:
                msg = json.loads(data)
            except (ValueError, RecursionError):
                # Not valid (or too deeply nested) JSON: relay the raw text.
                msg = {"raw": data}
            await ws_mgr.broadcast(room, msg)
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    except Exception as e:
        logger.error("WS error room=%s: %s", room, e)
    finally:
        # Single cleanup point. Unlike per-handler cleanup this also runs
        # when the task is cancelled, so the socket does not linger in the room.
        ws_mgr.disconnect(ws, room)
# ─── Code Execution (E2B) ─────────────────────────────────────────────────────
async def _e2b_run(code: str, language: str, timeout: int) -> dict:
    """Run ``code`` in an E2B sandbox and summarise the outcome.

    Args:
        code: Source code to execute.
        language: Accepted for interface compatibility but not forwarded to
            the sandbox. NOTE(review): the code is run with the sandbox's
            default language regardless of this value — confirm whether it
            should be passed through.
        timeout: Passed to the ``Sandbox`` constructor as ``timeout``.

    Returns:
        A dict with ``output`` (joined stdout), ``error`` (joined stderr
        plus any reported execution error), ``exit_code`` (1 when ``error``
        is non-empty, else 0) and ``duration_ms``. Failures of the SDK
        itself (including a missing package or API key) are reported in the
        same shape with ``exit_code`` 1 rather than raised.
    """
    if not E2B_API_KEY:
        return {"output": "", "error": "E2B_API_KEY not configured", "exit_code": 1, "duration_ms": 0}
    sandbox = None
    try:
        import e2b_code_interpreter as e2b

        start = time.monotonic()
        # The SDK calls are blocking, so run them in worker threads.
        sandbox = await asyncio.to_thread(e2b.Sandbox, api_key=E2B_API_KEY, timeout=timeout)
        execution = await asyncio.to_thread(sandbox.run_code, code)
        duration = int((time.monotonic() - start) * 1000)
        stdout = "\n".join(execution.logs.stdout or [])
        stderr = "\n".join(execution.logs.stderr or [])
        # Per the E2B code-interpreter docs an exception raised by the
        # executed code is reported on ``execution.error`` rather than in the
        # stderr log. Read it defensively so such runs are not reported as
        # successful.
        run_error = getattr(execution, "error", None)
        if run_error is not None:
            detail = getattr(run_error, "traceback", None) or str(run_error)
            stderr = f"{stderr}\n{detail}".strip()
        return {
            "output": stdout,
            "error": stderr,
            "exit_code": 1 if stderr else 0,
            "duration_ms": duration,
        }
    except Exception as e:
        return {"output": "", "error": str(e), "exit_code": 1, "duration_ms": 0}
    finally:
        # Always release the sandbox, including when run_code raised; a
        # failed kill must not discard an otherwise valid result.
        if sandbox is not None:
            try:
                await asyncio.to_thread(sandbox.kill)
            except Exception as kill_err:
                logger.warning("E2B sandbox kill failed: %s", kill_err)
@app.post("/execute")
async def execute_code(req: ExecuteRequest):
    """Execute the submitted code and return the sandbox result dict.

    When the request names a conversation, the execution is also persisted
    and announced to that conversation as an ``execution`` event.
    """
    outcome = await _e2b_run(req.code, req.language, req.timeout)
    conv_id = req.conversation_id
    if not conv_id:
        # Nothing to record without a conversation.
        return outcome
    await db.save_execution(
        conv_id,
        req.language,
        req.code,
        outcome["output"],
        outcome["error"],
        outcome["exit_code"],
        outcome["duration_ms"],
    )
    await _emit(conv_id, {"type": "execution", **outcome})
    return outcome
# ─── Agent Loop ───────────────────────────────────────────────────────────────
# System prompt for the agent loop. It asks for ```python fenced code blocks
# and a FINAL ANSWER section, which the loop's code extraction and stop check
# rely on. The dash in the first line was previously mis-encoded (mojibake)
# and sent to the model as garbage characters.
AGENT_SYSTEM = """You are Onehands — an autonomous AI developer agent.
Given a task, you:
1. PLAN: Break it into numbered steps
2. EXECUTE: For each step, either write reasoning OR generate code
3. CODE: When writing code, wrap it in ```python ... ``` blocks
4. OBSERVE: After code runs, use the output to guide next step
5. FINISH: When done, summarize results
Rules:
- Be concise, action-oriented
- One code block per step maximum
- If a step requires browsing or file I/O, describe what you'd do
- Always end with a FINAL ANSWER section
"""
def _extract_code(text: str) -> Optional[str]:
"""Extract first python code block from markdown text."""
import re
pattern = r"```(?:python)?\n(.*?)```"
m = re.search(pattern, text, re.DOTALL)
return m.group(1).strip() if m else None
def _agent_finished(content: str) -> bool:
    """Return True when an agent reply contains a completion marker.

    "final answer" and "task complete" match anywhere; "done." and
    "finished." must start at a word boundary so that words such as
    "abandoned." or "unfinished." do not end the loop early.
    """
    import re

    marker = r"final answer|task complete|\b(?:done|finished)\."
    return re.search(marker, content.lower()) is not None


@app.post("/agent/task")
async def agent_task(req: AgentTaskRequest):
    """
    Autonomous agent loop:
    1. Create/use conversation
    2. LLM plans & executes step-by-step
    3. Code blocks run in E2B sandbox
    4. Results fed back to LLM
    5. Returns full trace

    The loop stops after ``req.max_steps`` LLM calls, when a reply contains
    a completion marker, or when an LLM call fails (recorded in the trace as
    an ``error`` entry). Returns ``conv_id``, ``task``, ``steps`` (number of
    steps started) and ``trace``.
    """
    conv_id = req.conversation_id
    if not conv_id:
        row = await db.create_conversation(
            user_id=req.user_id,
            title=f"Agent: {req.task[:50]}",
            model=req.model,
            provider=req.provider,
        )
        conv_id = str(row["id"]) if row else str(uuid.uuid4())
    await db.save_message(conv_id, "user", req.task)
    await _emit(conv_id, {"type": "agent_start", "task": req.task, "conv_id": conv_id})

    messages: list[dict] = [
        {"role": "system", "content": AGENT_SYSTEM},
        {"role": "user", "content": f"TASK: {req.task}"},
    ]
    trace: list[dict] = []
    step = 0  # stays 0 if the loop body never runs
    for step in range(1, req.max_steps + 1):
        logger.info("Agent step %d/%d conv=%s", step, req.max_steps, conv_id)
        # LLM call
        try:
            content, used_provider, used_model = await _llm_call(
                provider=req.provider,
                model=req.model,
                messages=messages,
                temperature=0.3,
                max_tokens=2048,
                auto_fallback=True,
            )
        except Exception as e:
            error_msg = f"LLM failed at step {step}: {e}"
            # Log the failure as well as recording it in the trace, so it is
            # visible server-side and not only in the HTTP response.
            logger.error("agent conv=%s: %s", conv_id, error_msg)
            trace.append({"step": step, "type": "error", "content": error_msg})
            break

        messages.append({"role": "assistant", "content": content})
        await db.save_message(conv_id, "assistant", content, used_provider, used_model)
        await _emit(conv_id, {
            "type": "agent_step",
            "step": step,
            "content": content,
            "provider": used_provider,
            "model": used_model,
        })
        trace.append({"step": step, "type": "thought", "content": content})

        # Execute code if present
        if req.execute_code:
            code = _extract_code(content)
            if code:
                exec_result = await _e2b_run(code, "python", 30)
                await db.save_execution(
                    conv_id, "python", code,
                    exec_result["output"], exec_result["error"],
                    exec_result["exit_code"], exec_result["duration_ms"],
                )
                await _emit(conv_id, {
                    "type": "agent_execution",
                    "step": step,
                    **exec_result,
                })
                trace.append({"step": step, "type": "execution", **exec_result})
                # Feed result back as the next user turn so the model can
                # observe the outcome of its own code.
                obs = f"[Code execution result]\noutput: {exec_result['output']}\nerror: {exec_result['error']}\nexit_code: {exec_result['exit_code']}"
                messages.append({"role": "user", "content": obs})

        # Check if done
        if _agent_finished(content):
            break

    await _emit(conv_id, {"type": "agent_done", "steps": step, "conv_id": conv_id})
    return {
        "conv_id": conv_id,
        "task": req.task,
        "steps": step,
        "trace": trace,
    }
# ─── Models ───────────────────────────────────────────────────────────────────
@app.get("/models")
async def list_models():
    """Return the static model catalogue grouped by provider.

    Every entry is a dict with ``id``, ``name`` and ``context`` keys.
    """
    def catalogue(*rows: tuple) -> list:
        # Expand (id, name, context) tuples into the response dict shape.
        return [
            {"id": model_id, "name": label, "context": window}
            for model_id, label, window in rows
        ]

    return {
        "gemini": catalogue(
            ("gemini-2.0-flash", "Gemini 2.0 Flash", 1048576),
            ("gemini-2.0-flash-thinking-exp", "Gemini 2.0 Flash Thinking", 32768),
            ("gemini-1.5-pro", "Gemini 1.5 Pro", 2097152),
            ("gemini-1.5-flash", "Gemini 1.5 Flash", 1048576),
        ),
        "sambanova": catalogue(
            ("Meta-Llama-3.3-70B-Instruct", "Llama 3.3 70B", 131072),
            ("Meta-Llama-3.1-405B-Instruct", "Llama 3.1 405B", 16384),
            ("DeepSeek-R1", "DeepSeek R1", 32768),
            ("Qwen2.5-72B-Instruct", "Qwen 2.5 72B", 32768),
        ),
        "github_llm": catalogue(
            ("gpt-4o", "GPT-4o", 128000),
            ("gpt-4o-mini", "GPT-4o Mini", 128000),
            ("Meta-Llama-3.1-70B-Instruct", "Llama 3.1 70B (GitHub)", 131072),
            ("Mistral-large-2407", "Mistral Large", 131072),
        ),
    }
# ─── Entry ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    # Listen on $PORT when set, otherwise 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    # NOTE(review): the "app:app" import string presumes this module is
    # importable as `app` — confirm against the deployed filename.
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=listen_port,
        log_level="info",
        workers=1,
    )