AI Developer Agent
AI Developer Agent v1.0 backend
763ef0d
"""
FastAPI backend - AI Developer Agent
====================================
Endpoints:
GET / service info
GET /health health check
GET /api/runtime runtime + provider telemetry
POST /api/tasks create + run a task (sync queued)
GET /api/tasks list tasks
GET /api/tasks/{id} get task
GET /api/tasks/{id}/events list events (REST)
GET /api/tasks/{id}/stream SSE event stream (live)
POST /api/chat one-shot chat (streams)
POST /api/llm/chat chat (non-streaming JSON)
POST /api/deploy/huggingface push backend dir to HF Space
POST /api/deploy/vercel deploy frontend dir to Vercel
POST /api/git/push commit + push to GitHub branch
All endpoints accept JSON bodies and return JSON unless documented otherwise.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import threading
import time
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from . import tasks
from .agent import run_task
from .llm_router import get_router
from .executor import get_executor
from . import deployers
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("app")
app = FastAPI(title="AI Developer Agent", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("CORS_ALLOW_ORIGINS", "*").split(","),
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# In-memory task queue (background worker)
# ---------------------------------------------------------------------------
_task_queue: "asyncio.Queue" = asyncio.Queue()
_active_subscribers: Dict[str, List[asyncio.Queue]] = {}
def _publish(task_id: str, event: Dict[str, Any]) -> None:
for q in list(_active_subscribers.get(task_id, [])):
try:
q.put_nowait(event)
except Exception:
pass
def _worker_run(task_id: str, title: str, description: str) -> None:
"""Run the agent generator in a thread and publish events."""
try:
for ev in run_task(task_id, title, description):
_publish(task_id, ev)
except Exception as e:
logger.exception("worker crashed")
_publish(task_id, {"task_id": task_id, "kind": "error", "message": str(e), "ts": time.time(), "data": {}})
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
class CreateTaskBody(BaseModel):
title: str
description: str = ""
payload: Optional[Dict[str, Any]] = None
class ChatBody(BaseModel):
messages: List[Dict[str, str]]
model: Optional[str] = None
temperature: float = 0.2
max_tokens: int = 1500
preferred_provider: Optional[str] = None
class HFDeployBody(BaseModel):
repo_id: str
source_dir: str = "."
commit_message: str = "Update from AI Developer Agent"
class VercelDeployBody(BaseModel):
project_name: str
source_dir: str
framework: Optional[str] = "nextjs"
target: str = "production"
install_command: Optional[str] = None
build_command: Optional[str] = None
env: Optional[Dict[str, str]] = None
class GitPushBody(BaseModel):
repo_dir: str = "."
branch: str = "genspark_ai_developer"
commit_message: str = "AI Developer Agent commit"
remote_url: Optional[str] = None
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/")
def index():
return {
"service": "AI Developer Agent",
"version": "1.0.0",
"ok": True,
"endpoints": [
"/health", "/api/runtime", "/api/tasks", "/api/tasks/{id}/stream",
"/api/chat", "/api/llm/chat",
"/api/deploy/huggingface", "/api/deploy/vercel", "/api/git/push",
],
}
@app.get("/health")
def health():
router = get_router()
return {
"ok": True,
"ts": time.time(),
"providers": list(router.telemetry().keys()),
"executor": "e2b" if (get_executor().sandbox and get_executor().sandbox.available) else "local",
}
@app.get("/api/runtime")
def runtime():
info = get_executor().inspect_runtime()
info["providers"] = get_router().telemetry()
info["db"] = tasks.DB_PATH
return info
# ----- Tasks ---------------------------------------------------------------
@app.post("/api/tasks")
def create_task(body: CreateTaskBody):
task_id = tasks.create_task(body.title, body.description, body.payload or {})
t = threading.Thread(target=_worker_run, args=(task_id, body.title, body.description), daemon=True)
t.start()
return {"task_id": task_id, "title": body.title, "state": "queued"}
@app.get("/api/tasks")
def list_tasks(limit: int = 50):
return {"tasks": tasks.list_tasks(limit=limit)}
@app.get("/api/tasks/{task_id}")
def get_task(task_id: str):
t = tasks.get_task(task_id)
if not t:
raise HTTPException(404, "task not found")
return t
@app.get("/api/tasks/{task_id}/events")
def get_events(task_id: str, since_id: int = 0, limit: int = 1000):
return {"events": tasks.get_events(task_id, since_id=since_id, limit=limit)}
@app.get("/api/tasks/{task_id}/stream")
async def stream_events(task_id: str, request: Request):
"""Server-Sent Events stream. Replays historical events then live events."""
async def gen():
# 1) Replay history
last_id = 0
history = tasks.get_events(task_id, since_id=0, limit=2000)
for ev in history:
last_id = ev["id"]
yield f"id: {ev['id']}\nevent: {ev['kind']}\ndata: {json.dumps(ev)}\n\n"
# 2) Subscribe for live events
q: asyncio.Queue = asyncio.Queue()
_active_subscribers.setdefault(task_id, []).append(q)
try:
while True:
if await request.is_disconnected():
break
try:
ev = await asyncio.wait_for(q.get(), timeout=15.0)
yield f"event: {ev['kind']}\ndata: {json.dumps(ev)}\n\n"
if ev["kind"] in ("done", "error") and ev.get("data", {}).get("final"):
break
except asyncio.TimeoutError:
# heartbeat
yield ":keepalive\n\n"
finally:
try:
_active_subscribers.get(task_id, []).remove(q)
except ValueError:
pass
return StreamingResponse(gen(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ----- LLM endpoints -------------------------------------------------------
@app.post("/api/llm/chat")
def llm_chat(body: ChatBody):
router = get_router()
try:
text = router.chat(
body.messages, model=body.model, temperature=body.temperature,
max_tokens=body.max_tokens, preferred_provider=body.preferred_provider,
)
return {"ok": True, "text": text, "telemetry": router.telemetry()}
except Exception as e:
return JSONResponse({"ok": False, "error": str(e)}, status_code=500)
@app.post("/api/chat")
def chat_stream(body: ChatBody):
"""SSE chat stream."""
router = get_router()
def gen():
for chunk in router.stream(
body.messages, model=body.model, temperature=body.temperature,
max_tokens=body.max_tokens, preferred_provider=body.preferred_provider,
):
yield f"data: {json.dumps({'delta': chunk})}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(gen(), media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"})
# ----- Deploy endpoints ----------------------------------------------------
@app.post("/api/deploy/huggingface")
def deploy_hf(body: HFDeployBody):
src = os.path.abspath(body.source_dir)
if not os.path.isdir(src):
raise HTTPException(400, f"source_dir not found: {src}")
r = deployers.hf_push_space(source_dir=src, repo_id=body.repo_id, commit_message=body.commit_message)
if r.get("ok"):
tasks.record_deployment("", "huggingface", r.get("url", ""), "ok")
else:
tasks.record_deployment("", "huggingface", "", "failed")
return r
@app.post("/api/deploy/vercel")
def deploy_vercel(body: VercelDeployBody):
src = os.path.abspath(body.source_dir)
if not os.path.isdir(src):
raise HTTPException(400, f"source_dir not found: {src}")
files = deployers.collect_files_for_vercel(src)
r = deployers.vercel_deploy_via_api(
project_name=body.project_name, files=files, target=body.target,
env=body.env, framework=body.framework,
install_command=body.install_command, build_command=body.build_command,
)
if r.get("ok"):
tasks.record_deployment("", "vercel", r.get("url", ""), "ok")
return r
@app.post("/api/git/push")
def git_push(body: GitPushBody):
repo_dir = os.path.abspath(body.repo_dir)
if not os.path.isdir(repo_dir):
raise HTTPException(400, f"repo_dir not found: {repo_dir}")
return deployers.github_push(
repo_dir=repo_dir, branch=body.branch,
commit_message=body.commit_message, remote_url=body.remote_url,
)
# ---------------------------------------------------------------------------
# Startup self-check
# ---------------------------------------------------------------------------
@app.on_event("startup")
def startup_check():
logger.info("AI Developer Agent starting")
try:
tasks.init_db()
info = get_executor().inspect_runtime()
logger.info("Runtime: %s", info)
logger.info("Providers: %s", list(get_router().telemetry().keys()))
except Exception as e:
logger.warning("Startup check error: %s", e)
# Allow running directly
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "7860"))
uvicorn.run("apps.backend.app:app", host="0.0.0.0", port=port, log_level="info")