""" FastAPI backend - AI Developer Agent ==================================== Endpoints: GET / service info GET /health health check GET /api/runtime runtime + provider telemetry POST /api/tasks create + run a task (sync queued) GET /api/tasks list tasks GET /api/tasks/{id} get task GET /api/tasks/{id}/events list events (REST) GET /api/tasks/{id}/stream SSE event stream (live) POST /api/chat one-shot chat (streams) POST /api/llm/chat chat (non-streaming JSON) POST /api/deploy/huggingface push backend dir to HF Space POST /api/deploy/vercel deploy frontend dir to Vercel POST /api/git/push commit + push to GitHub branch All endpoints accept JSON bodies and return JSON unless documented otherwise. """ from __future__ import annotations import asyncio import json import logging import os import threading import time from typing import Any, Dict, List, Optional from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel from . import tasks from .agent import run_task from .llm_router import get_router from .executor import get_executor from . 
import deployers
# NOTE: the `import deployers` line above completes the relative import
# (`from . import deployers`) that begins at the end of the preceding source line.

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger("app")


def _parse_cors_origins(raw: str) -> List[str]:
    """Split a comma-separated origin list into clean entries.

    Whitespace around each entry is trimmed and empty entries are dropped, so
    a value such as ``"https://a.example, https://b.example"`` yields two
    usable origins instead of one origin with a leading space.
    """
    return [origin.strip() for origin in raw.split(",") if origin.strip()]


app = FastAPI(title="AI Developer Agent", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    # Defaults to "*" (any origin) when CORS_ALLOW_ORIGINS is not set.
    allow_origins=_parse_cors_origins(os.getenv("CORS_ALLOW_ORIGINS", "*")),
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ---------------------------------------------------------------------------
# In-memory task queue (background worker)
# ---------------------------------------------------------------------------
# NOTE(review): `_task_queue` is not referenced anywhere else in this module;
# it is kept so that any external reference keeps working.
_task_queue: "asyncio.Queue" = asyncio.Queue()

# task_id -> queues of the SSE clients currently streaming that task.
_active_subscribers: Dict[str, List[asyncio.Queue]] = {}


def _publish(task_id: str, event: Dict[str, Any]) -> None:
    """Deliver ``event`` to every queue subscribed to ``task_id`` (best effort).

    A failure to deliver to one subscriber is logged and does not prevent
    delivery to the remaining subscribers.

    NOTE(review): this function is invoked from the worker thread started in
    ``create_task``, while the queues are ``asyncio.Queue`` objects, which are
    not thread-safe. Subscriber queues should marshal ``put_nowait`` onto
    their own event loop -- verify against the SSE stream endpoint.
    """
    # Iterate over a snapshot so subscribers that register or unregister while
    # we are publishing cannot disturb the loop.
    for subscriber in list(_active_subscribers.get(task_id, ())):
        try:
            subscriber.put_nowait(event)
        except Exception:
            # Stay best-effort, but do not hide the failure entirely.
            logger.warning(
                "could not deliver event to a subscriber of task %s",
                task_id,
                exc_info=True,
            )


def _worker_run(task_id: str, title: str, description: str) -> None:
    """Run the agent generator in a thread and publish events.

    Every event yielded by ``run_task`` is forwarded to the live subscribers.
    If the agent raises, the traceback is logged and a terminal ``error``
    event is published instead.
    """
    try:
        for ev in run_task(task_id, title, description):
            _publish(task_id, ev)
    except Exception as e:
        logger.exception("worker crashed")
        _publish(
            task_id,
            {
                "task_id": task_id,
                "kind": "error",
                "message": str(e),
                "ts": time.time(),
                # The SSE stream only closes on a done/error event flagged as
                # final; nothing else will be published after a crash, so mark
                # this event final to let connected clients finish.
                "data": {"final": True},
            },
        )


# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
class CreateTaskBody(BaseModel):
    """Request body for ``POST /api/tasks``."""

    title: str
    description: str = ""
    # Stored with the task; an omitted payload is saved as an empty dict.
    payload: Optional[Dict[str, Any]] = None


class ChatBody(BaseModel):
    """Request body shared by ``POST /api/chat`` and ``POST /api/llm/chat``."""

    messages: List[Dict[str, str]]
    model: Optional[str] = None
    temperature: float = 0.2
    max_tokens: int = 1500
    preferred_provider: Optional[str] = None


class HFDeployBody(BaseModel):
    """Request body for ``POST /api/deploy/huggingface``."""

    repo_id: str
    source_dir: str = "."
    commit_message: str = "Update from AI Developer Agent"


class VercelDeployBody(BaseModel):
    """Request body for ``POST /api/deploy/vercel``."""

    project_name: str
    source_dir: str
    framework: Optional[str] = "nextjs"
    target: str = "production"
    install_command: Optional[str] = None
    build_command: Optional[str] = None
    env: Optional[Dict[str, str]] = None


class GitPushBody(BaseModel):
    """Request body for ``POST /api/git/push``."""

    repo_dir: str = "."
    branch: str = "genspark_ai_developer"
    commit_message: str = "AI Developer Agent commit"
    remote_url: Optional[str] = None


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/")
def index():
    """Return static service information and the list of endpoints."""
    return {
        "service": "AI Developer Agent",
        "version": "1.0.0",
        "ok": True,
        "endpoints": [
            "/health",
            "/api/runtime",
            "/api/tasks",
            "/api/tasks/{id}/stream",
            "/api/chat",
            "/api/llm/chat",
            "/api/deploy/huggingface",
            "/api/deploy/vercel",
            "/api/git/push",
        ],
    }


@app.get("/health")
def health():
    """Report liveness, the known provider names and the executor in use."""
    router = get_router()
    # Fetch the executor once and tolerate one without a sandbox attribute so
    # the health check itself cannot fail with AttributeError.
    sandbox = getattr(get_executor(), "sandbox", None)
    sandbox_ready = bool(sandbox and getattr(sandbox, "available", False))
    return {
        "ok": True,
        "ts": time.time(),
        "providers": list(router.telemetry().keys()),
        "executor": "e2b" if sandbox_ready else "local",
    }


@app.get("/api/runtime")
def runtime():
    """Return executor runtime details plus provider telemetry and DB path."""
    # Work on a copy so the extra keys never leak into whatever mapping the
    # executor returned (it may be cached or shared).
    info = dict(get_executor().inspect_runtime())
    info["providers"] = get_router().telemetry()
    info["db"] = tasks.DB_PATH
    return info


# ----- Tasks ---------------------------------------------------------------
@app.post("/api/tasks")
def create_task(body: CreateTaskBody):
    """Persist a new task and start running it on a daemon thread.

    Returns immediately with the new task id; progress is published by the
    worker thread via ``_worker_run``.
    """
    task_id = tasks.create_task(body.title, body.description, body.payload or {})
    worker = threading.Thread(
        target=_worker_run,
        args=(task_id, body.title, body.description),
        daemon=True,
    )
    worker.start()
    return {"task_id": task_id, "title": body.title, "state": "queued"}


@app.get("/api/tasks")
def list_tasks(limit: int = 50):
    """List tasks; ``limit`` is forwarded to ``tasks.list_tasks``."""
    return {"tasks": tasks.list_tasks(limit=limit)}


@app.get("/api/tasks/{task_id}")
def get_task(task_id: str):
    """Return a single task.

    Raises:
        HTTPException: 404 when ``tasks.get_task`` returns a falsy value.
    """
    task = tasks.get_task(task_id)
    if not task:
        raise HTTPException(404, "task not found")
    return task
@app.get("/api/tasks/{task_id}/events")
def get_events(task_id: str, since_id: int = 0, limit: int = 1000):
    """Return stored events of a task over plain REST.

    ``since_id`` and ``limit`` are forwarded unchanged to ``tasks.get_events``.
    """
    return {"events": tasks.get_events(task_id, since_id=since_id, limit=limit)}


class _LoopBoundQueue(asyncio.Queue):
    """``asyncio.Queue`` whose ``put_nowait`` is safe to call from any thread.

    ``asyncio.Queue`` is not thread-safe, yet events are published from the
    worker threads that run the agent. Calls made outside the owning event
    loop are therefore re-scheduled onto that loop with
    ``call_soon_threadsafe``, which also wakes up a consumer blocked in
    ``get()``.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        super().__init__()
        self._owner_loop = loop

    def put_nowait(self, item: Any) -> None:
        try:
            current = asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, i.e. we are on a worker thread.
            current = None
        if current is self._owner_loop:
            super().put_nowait(item)
        else:
            self._owner_loop.call_soon_threadsafe(super().put_nowait, item)


def _is_final_event(ev: Dict[str, Any]) -> bool:
    """Return True for a ``done``/``error`` event whose data is flagged final."""
    if ev.get("kind") not in ("done", "error"):
        return False
    data = ev.get("data")
    # `data` may be missing or None (or not a dict at all); treat that as
    # "not final" instead of raising.
    return isinstance(data, dict) and bool(data.get("final"))


def _sse_frame(ev: Dict[str, Any], with_id: bool = False) -> str:
    """Format one event as a Server-Sent Events frame.

    Events without a ``kind`` fall back to ``message``, the default SSE event
    type, rather than raising ``KeyError``.
    """
    head = f"id: {ev['id']}\n" if with_id and "id" in ev else ""
    return f"{head}event: {ev.get('kind', 'message')}\ndata: {json.dumps(ev)}\n\n"


@app.get("/api/tasks/{task_id}/stream")
async def stream_events(task_id: str, request: Request):
    """Server-Sent Events stream. Replays historical events then live events.

    The stream ends when the client disconnects or when a ``done``/``error``
    event flagged as final has been sent, whether that event came from the
    stored history or arrived live. A ``:keepalive`` comment is sent after
    every 15 seconds without events.
    """

    async def gen():
        # Subscribe BEFORE reading history so nothing published while the
        # history is being replayed can be missed.
        q = _LoopBoundQueue(asyncio.get_running_loop())
        _active_subscribers.setdefault(task_id, []).append(q)
        try:
            # 1) Replay history
            last_id = 0
            finished = False
            for ev in tasks.get_events(task_id, since_id=0, limit=2000):
                ev_id = ev.get("id")
                if isinstance(ev_id, int):
                    last_id = max(last_id, ev_id)
                finished = finished or _is_final_event(ev)
                yield _sse_frame(ev, with_id=True)
            if finished:
                # The task already completed; no live event will ever arrive.
                return

            # 2) Live events
            while True:
                if await request.is_disconnected():
                    break
                try:
                    ev = await asyncio.wait_for(q.get(), timeout=15.0)
                except asyncio.TimeoutError:
                    # heartbeat
                    yield ":keepalive\n\n"
                    continue
                # Skip events already sent during replay. Only possible when
                # live events carry the same integer id as stored ones;
                # events without an id are always forwarded.
                ev_id = ev.get("id")
                if isinstance(ev_id, int) and ev_id <= last_id:
                    continue
                yield _sse_frame(ev)
                if _is_final_event(ev):
                    break
        finally:
            subscribers = _active_subscribers.get(task_id)
            if subscribers is not None:
                try:
                    subscribers.remove(q)
                except ValueError:
                    pass
                # Drop the empty list so finished tasks do not accumulate
                # entries in the registry forever.
                if not subscribers:
                    _active_subscribers.pop(task_id, None)

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


# ----- LLM endpoints -------------------------------------------------------
@app.post("/api/llm/chat")
def llm_chat(body: ChatBody):
    """Non-streaming chat: return the full completion as JSON.

    Any router failure is logged and reported as a 500 JSON response of the
    form ``{"ok": False, "error": ...}``.
    """
    router = get_router()
    try:
        text = router.chat(
            body.messages,
            model=body.model,
            temperature=body.temperature,
            max_tokens=body.max_tokens,
            preferred_provider=body.preferred_provider,
        )
        return {"ok": True, "text": text, "telemetry": router.telemetry()}
    except Exception as e:
        # Keep the traceback server-side; the client only receives the message.
        logger.exception("llm chat failed")
        return JSONResponse({"ok": False, "error": str(e)}, status_code=500)


@app.post("/api/chat")
def chat_stream(body: ChatBody):
    """SSE chat stream.

    Each chunk is sent as ``data: {"delta": ...}``. The stream always ends
    with ``data: [DONE]``; if the router fails mid-stream, a
    ``data: {"error": ...}`` frame is sent first.
    """
    router = get_router()

    def gen():
        try:
            for chunk in router.stream(
                body.messages,
                model=body.model,
                temperature=body.temperature,
                max_tokens=body.max_tokens,
                preferred_provider=body.preferred_provider,
            ):
                yield f"data: {json.dumps({'delta': chunk})}\n\n"
        except Exception as e:
            # Without this the connection would just drop and the client
            # could not tell a failure from a slow stream.
            logger.exception("chat stream failed")
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


# ----- Deploy endpoints ----------------------------------------------------
# NOTE(review): `source_dir` / `repo_dir` below come straight from the request
# body and are only checked to be existing directories, so any directory the
# server process can read may be deployed or pushed. Consider restricting
# them to an allow-listed workspace root.
@app.post("/api/deploy/huggingface")
def deploy_hf(body: HFDeployBody):
    """Push ``source_dir`` to a Hugging Face Space and record the outcome.

    Raises:
        HTTPException: 400 when ``source_dir`` is not an existing directory.
    """
    src = os.path.abspath(body.source_dir)
    if not os.path.isdir(src):
        raise HTTPException(400, f"source_dir not found: {src}")
    result = deployers.hf_push_space(
        source_dir=src,
        repo_id=body.repo_id,
        commit_message=body.commit_message,
    )
    if result.get("ok"):
        tasks.record_deployment("", "huggingface", result.get("url", ""), "ok")
    else:
        tasks.record_deployment("", "huggingface", "", "failed")
    return result


@app.post("/api/deploy/vercel")
def deploy_vercel(body: VercelDeployBody):
    """Deploy ``source_dir`` to Vercel and record the outcome.

    Raises:
        HTTPException: 400 when ``source_dir`` is not an existing directory.
    """
    src = os.path.abspath(body.source_dir)
    if not os.path.isdir(src):
        raise HTTPException(400, f"source_dir not found: {src}")
    files = deployers.collect_files_for_vercel(src)
    result = deployers.vercel_deploy_via_api(
        project_name=body.project_name,
        files=files,
        target=body.target,
        env=body.env,
        framework=body.framework,
        install_command=body.install_command,
        build_command=body.build_command,
    )
    if result.get("ok"):
        tasks.record_deployment("", "vercel", result.get("url", ""), "ok")
    else:
        # Record failures too, mirroring the Hugging Face endpoint.
        tasks.record_deployment("", "vercel", "", "failed")
    return result


@app.post("/api/git/push")
def git_push(body: GitPushBody):
    """Commit and push ``repo_dir`` to the requested branch.

    Raises:
        HTTPException: 400 when ``repo_dir`` is not an existing directory.
    """
    repo_dir = os.path.abspath(body.repo_dir)
    if not os.path.isdir(repo_dir):
        raise HTTPException(400, f"repo_dir not found: {repo_dir}")
    return deployers.github_push(
        repo_dir=repo_dir,
        branch=body.branch,
        commit_message=body.commit_message,
        remote_url=body.remote_url,
    )


# ---------------------------------------------------------------------------
# Startup self-check
# ---------------------------------------------------------------------------
@app.on_event("startup")
def startup_check():
    """Initialise the task DB and log runtime/provider info (best effort).

    Failures are logged as warnings and never prevent the app from starting.
    """
    logger.info("AI Developer Agent starting")
    try:
        tasks.init_db()
        info = get_executor().inspect_runtime()
        logger.info("Runtime: %s", info)
        logger.info("Providers: %s", list(get_router().telemetry().keys()))
    except Exception as e:
        # Include the traceback: a failed DB init is otherwise hard to diagnose.
        logger.warning("Startup check error: %s", e, exc_info=True)


# Allow running directly
if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "7860"))
    uvicorn.run("apps.backend.app:app", host="0.0.0.0", port=port, log_level="info")