# Provenance (scraped from the hosting page): uploaded by PYAE1994,
# commit 5422cbb (verified) — "feat(phase5): Upload apps/backend/api/main.py".
"""FastAPI entrypoint — Phase 5 (Autonomous Reasoning Engine).
Routes (Phase 1 — all preserved):
GET / → landing JSON
GET /api/health → full health snapshot
POST /api/tasks → create + run task
GET /api/tasks → list latest tasks
GET /api/tasks/{id} → task detail with steps
POST /api/tasks/{id}/cancel → cooperative cancel + SIGKILL
GET /api/tasks/{id}/files → list workdir contents
GET /api/tasks/{id}/files/{path} → read a single sandbox file
Phase 2 routes (preserved):
GET /api/tasks/{id}/dag → DAG node status JSON
GET /api/memory → recent episodic memories
GET /api/memory/search?q=... → search episodic memory
Phase 4 routes (preserved):
POST /api/queue/submit → submit job to priority queue
GET /api/queue/status → queue depth + running count
GET /api/queue/jobs → list queued jobs
GET /api/queue/jobs/{job_id} → get specific job
POST /api/tasks/{id}/resume → resume from last checkpoint
GET /api/tasks/{id}/checkpoints → list task checkpoints
POST /api/workspace/projects → create workspace project
GET /api/workspace/projects → list workspace projects
GET /api/workspace/projects/{id} → get project detail
POST /api/workspace/projects/{id}/notes → add project note
GET /api/workspace/projects/{id}/notes → get project notes
Phase 5 routes (new):
POST /api/reason → run recursive plan-eval-replan reasoning
POST /api/debug → run self-debugging coding loop
POST /api/tasks/{id}/reason → attach reasoning to existing task
GET /api/reason/tools → list available dynamic tools
GET /api/tasks/{id}/reasoning → get reasoning result for a task
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import threading
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy.orm import Session

from ..agent import tools as agent_tools
from ..agent.e2b_sandbox import e2b_status, is_e2b_enabled
from ..agent.memory import get_memory
from ..agent.queue_engine import JobPriority, QueueJob, get_queue, get_worker
from ..agent.checkpoint import get_checkpoint_store
from ..agent.workspace_memory import get_workspace_memory
from ..agent.runtime import get_registry
from ..agent.runner_v2 import AgentRunnerV2
from ..agent.runner_v4 import AgentRunnerV4
from ..agent.runner_v5 import AgentRunnerV5
from ..agent.state_machine import AgentRunner  # Phase 1 legacy
from ..agent.reasoner import RecursiveReasoner, AVAILABLE_TOOLS
from ..agent.self_debug import SelfDebuggingEngine
from ..db.schema import Step, Task, get_session, init_db
from ..router.smart_router import get_router
# In-memory store for reasoning results (task_id → result dict).
# NOTE(review): process-local and unbounded — entries are never evicted and
# are lost on restart; written from runner threads and request handlers.
_reasoning_store: Dict[int, Dict[str, Any]] = {}

# ---------- App setup ---------- #
app = FastAPI(
    title="OpenHands-Genspark AI Developer Agent",
    description=(
        "Manus-Class Autonomous AI Developer — Phase 5 (Autonomous Reasoning Engine). "
        "Recursive planning, self-debugging code loops, dynamic tool selection."
    ),
    version="5.0.0",
)

# Comma-separated list of allowed origins. Entries are stripped, and blank
# entries (e.g. from a trailing comma or an empty variable) are dropped
# instead of being registered as an empty-string origin.
allowed = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "*").split(",")
    if origin.strip()
]
# NOTE(review): with the default "*" together with allow_credentials=True,
# Starlette answers with the caller's own Origin (credentials allowed) rather
# than a literal "*" — set ALLOWED_ORIGINS explicitly in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.on_event("startup")
def _on_startup() -> None:
    """Initialise the database, then launch the Phase 4 queue worker."""
    init_db()
    get_worker().start()  # Phase 4 background worker (preserved)
@app.on_event("shutdown")
def _on_shutdown() -> None:
    """Stop the queue worker that was started on application startup."""
    queue_worker = get_worker()
    queue_worker.stop()
# ---------- Pydantic Schemas ---------- #
class TaskCreate(BaseModel):
    """Request body for ``POST /api/tasks``."""

    title: str  # also used as the prompt when ``prompt`` is empty
    prompt: Optional[str] = None
    # "v1" | "v2" | "v4" | "v5"; create_task lowercases it and falls back to "v5" otherwise
    runner: Optional[str] = "v5"
    # 0=CRITICAL 1=HIGH 2=NORMAL 3=LOW; NOTE(review): create_task does not read this field
    priority: Optional[int] = 2
    project_id: Optional[str] = None  # forwarded only to the v4/v5 runners
class StepOut(BaseModel):
    """API view of a ``Step`` row, built from ORM objects via ``model_validate``."""

    # Pydantic v2 configuration (class-based ``Config`` is deprecated in v2);
    # ``from_attributes`` lets ``model_validate`` read ORM attributes.
    model_config = {"from_attributes": True}

    id: int
    description: str
    status: str
    # NOTE(review): both are non-optional; a NULL column would fail validation.
    output: str
    error_log: str
class TaskOut(BaseModel):
    """API view of a ``Task`` row together with its ordered steps."""

    # Pydantic v2 configuration (class-based ``Config`` is deprecated in v2).
    model_config = {"from_attributes": True}

    id: int
    title: str
    status: str
    created_at: datetime
    updated_at: datetime
    current_step_id: Optional[int]  # required key, but may be null
    runner_ver: Optional[str] = "v5"
    steps: List[StepOut] = []
class QueueSubmit(BaseModel):
    """Request body for ``POST /api/queue/submit``."""

    title: str  # task title; also the prompt fallback
    prompt: Optional[str] = None
    priority: Optional[int] = 2  # JobPriority value, clamped to 0..3 by queue_submit
    project_id: Optional[str] = None  # NOTE(review): not forwarded to QueueJob by queue_submit
class ProjectCreate(BaseModel):
    """Request body for ``POST /api/workspace/projects``.

    create_project replaces null optional fields with empty defaults before
    calling the workspace store.
    """

    name: str
    description: Optional[str] = ""
    tech_stack: Optional[List[str]] = []
    goals: Optional[List[str]] = []
    conventions: Optional[Dict[str, str]] = {}
class ProjectNote(BaseModel):
    """Request body for ``POST /api/workspace/projects/{id}/notes``."""

    note: str
    tags: Optional[List[str]] = []  # clients may also send null
# Phase 5 schemas
class ReasonRequest(BaseModel):
    """Request body for ``POST /api/reason`` and ``POST /api/tasks/{id}/reason``."""

    goal: str
    max_rounds: Optional[int] = 4  # capped at 8 by the route handlers
    success_threshold: Optional[float] = 0.80  # passed to RecursiveReasoner
    memory_context: Optional[str] = ""
    # Link to an existing task; /api/reason creates a synthetic task when omitted.
    # The per-task route uses the path id instead.
    task_id: Optional[int] = None
class DebugRequest(BaseModel):
    """Request body for ``POST /api/debug`` (self-debugging code loop)."""

    goal: str
    language: Optional[str] = "python"
    max_attempts: Optional[int] = 4  # capped at 8 by run_self_debug
    initial_code: Optional[str] = ""
    memory_context: Optional[str] = ""
    task_id: Optional[int] = None  # existing task to update; a new one is created when omitted
# ---------- Routes — Landing + Health ---------- #
@app.get("/")
def landing():
    """Describe the service: blueprint, phase, feature list and public endpoints."""
    feature_list = [
        "Recursive plan-eval-replan reasoning engine",
        "Self-debugging code generation loop (generate→execute→test→fix)",
        "Dynamic tool selection at node level",
        "Code critique + quality review engine",
        "Priority-based task queue (Redis/in-memory)",
        "Checkpoint-based crash recovery + resume",
        "Workspace project memory (Supabase PostgreSQL)",
        "Long-running async job execution",
        "Phase 1-4 capabilities fully preserved",
        "DAG-based multi-step planning",
        "Episodic memory (SQLite)",
        "Self-recovery with exponential back-off",
        "E2B cloud sandbox",
    ]
    earlier_phase_routes = [
        "GET /api/health",
        "POST /api/tasks",
        "GET /api/tasks",
        "GET /api/tasks/{id}",
        "POST /api/tasks/{id}/cancel",
        "POST /api/tasks/{id}/resume",
        "GET /api/tasks/{id}/dag",
        "GET /api/tasks/{id}/files",
        "GET /api/tasks/{id}/files/{path}",
        "GET /api/tasks/{id}/checkpoints",
        "GET /api/memory",
        "GET /api/memory/search?q=<query>",
        "POST /api/queue/submit",
        "GET /api/queue/status",
        "GET /api/queue/jobs",
        "GET /api/queue/jobs/{job_id}",
        "POST /api/workspace/projects",
        "GET /api/workspace/projects",
        "GET /api/workspace/projects/{id}",
        "POST /api/workspace/projects/{id}/notes",
        "GET /api/workspace/projects/{id}/notes",
    ]
    phase_5_routes = [
        "POST /api/reason",
        "POST /api/debug",
        "POST /api/tasks/{id}/reason",
        "GET /api/reason/tools",
        "GET /api/tasks/{id}/reasoning",
    ]
    return {
        "service": "OpenHands-Genspark AI Developer Agent",
        "blueprint": "v5.0 (Phase 5 — Autonomous Reasoning Engine)",
        "phase": "5",
        "features": feature_list,
        "endpoints": earlier_phase_routes + phase_5_routes,
    }
@app.get("/api/health")
def health():
    """Return a health snapshot: router keys, memory, queue, workspace and reasoning.

    ``time`` is the current UTC time in ISO-8601 with a trailing ``Z``.
    """
    router = get_router()
    snap = router.status_snapshot()
    active = sum(1 for k in snap if k["status"] == "ACTIVE")
    mem = get_memory()
    mem_stats = {"recent_memories": len(mem.task_history_summary(limit=20))}
    # Phase 4 stats
    q = get_queue()
    wm = get_workspace_memory()
    q_stats = {
        "queue_depth": q.queue_depth(),
        "running_jobs": q.running_count(),
        "backend": "redis" if hasattr(q, "_r") else "memory",
    }
    # datetime.utcnow() is deprecated since Python 3.12. Use an aware UTC
    # timestamp and keep the original "...Z" format for clients.
    now_utc = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return {
        "status": "ok",
        "time": now_utc,
        "phase": "5",
        "total_keys": len(snap),
        "active_keys": active,
        "providers": sorted({k["provider"] for k in snap}),
        "keys": snap,
        "sandbox_root": str(agent_tools.SANDBOX_ROOT),
        "e2b": e2b_status(),
        "memory": mem_stats,
        "version": "5.0.0",
        "queue": q_stats,
        "workspace": wm.status(),
        "reasoning": {
            "cached_results": len(_reasoning_store),
            "available_tools": list(AVAILABLE_TOOLS.keys()),
        },
        "capabilities": {
            "dag_planning": True,
            "episodic_memory": True,
            "self_recovery": True,
            "e2b_sandbox": is_e2b_enabled(),
            "task_queue": True,  # Phase 4
            "checkpoint_resume": True,  # Phase 4
            "workspace_memory": True,  # Phase 4
            "recursive_reasoning": True,  # Phase 5
            "self_debugging": True,  # Phase 5
            "dynamic_tool_selection": True,  # Phase 5
            "code_critique": True,  # Phase 5
            "browser_automation": False,  # Phase 3 (separate HF space)
            "multi_agent": False,  # Phase 6
        },
    }
# ---------- Task management ---------- #
_logger = logging.getLogger(__name__)


def _save_dag(task_id: int, dag_data: Any) -> None:
    """Persist *dag_data* as JSON on the Task row; a falsy DAG is a no-op."""
    if not dag_data:
        return
    from ..db.schema import SessionLocal  # local import, as in the original runners

    with SessionLocal() as db:
        task = db.get(Task, task_id)
        if task:
            task.dag_json = json.dumps(dag_data)
            db.commit()


def _run_task_v5(task_id: int, prompt: str, project_id: Optional[str] = None) -> None:
    """Thread target: run AgentRunnerV5, cache its result, and save its final DAG.

    ``asyncio.run`` gives the worker thread its own event loop. It also
    cancels leftover tasks and closes async generators on exit, which a bare
    ``new_event_loop()``/``close()`` pair did not do. Failures are logged,
    never raised, because nothing joins this daemon thread.
    """
    try:
        result = asyncio.run(AgentRunnerV5(task_id, project_id=project_id).run(prompt))
        _reasoning_store[task_id] = result
        # ``or {}`` also covers an explicit ``"reasoning": None``.
        reasoning = result.get("reasoning") or {}
        _save_dag(task_id, reasoning.get("final_dag"))
    except Exception as exc:
        _logger.exception("AgentRunnerV5 failed: %s", exc)


def _run_task_v4(task_id: int, prompt: str, project_id: Optional[str] = None) -> None:
    """Thread target: run AgentRunnerV4 and save the returned ``"dag"``."""
    try:
        result = asyncio.run(AgentRunnerV4(task_id, project_id).run(prompt))
        _save_dag(task_id, result.get("dag"))
    except Exception as exc:
        _logger.exception("AgentRunnerV4 failed: %s", exc)


def _run_task_v2(task_id: int, prompt: str) -> None:
    """Thread target: run AgentRunnerV2 and save the returned ``"dag"``."""
    try:
        result = asyncio.run(AgentRunnerV2(task_id).run(prompt))
        _save_dag(task_id, result.get("dag"))
    except Exception as exc:
        _logger.exception("AgentRunnerV2 failed: %s", exc)


def _run_task_v1(task_id: int, prompt: str) -> None:
    """Thread target: run the Phase 1 legacy AgentRunner (no DAG output)."""
    try:
        asyncio.run(AgentRunner(task_id).run(prompt))
    except Exception as exc:
        _logger.exception("AgentRunner v1 failed: %s", exc)
@app.post("/api/tasks", response_model=TaskOut)
def create_task(body: TaskCreate, db: Session = Depends(get_session)):
    """Create a Task row and start its runner on a background daemon thread.

    Unknown runner names fall back to ``"v5"``; the prompt defaults to the title.
    """
    requested = (body.runner or "v5").lower()
    version = requested if requested in ("v1", "v2", "v4", "v5") else "v5"

    task = Task(title=body.title, status="INIT", runner_ver=version)
    db.add(task)
    db.commit()
    db.refresh(task)

    text = body.prompt or body.title
    new_id = task.id
    get_registry().register(new_id)

    # Only the v4/v5 runners accept a project id.
    launchers = {
        "v5": (_run_task_v5, (new_id, text, body.project_id)),
        "v4": (_run_task_v4, (new_id, text, body.project_id)),
        "v2": (_run_task_v2, (new_id, text)),
        "v1": (_run_task_v1, (new_id, text)),
    }
    target, args = launchers[version]
    threading.Thread(target=target, args=args, daemon=True).start()
    return _serialize_task(task, db)
@app.get("/api/tasks", response_model=List[TaskOut])
def list_tasks(db: Session = Depends(get_session)):
    """Return the 50 most recently created tasks, newest first."""
    newest = (
        db.query(Task)
        .order_by(Task.created_at.desc())
        .limit(50)
        .all()
    )
    serialized = [_serialize_task(task, db) for task in newest]
    return serialized
@app.get("/api/tasks/{task_id}", response_model=TaskOut)
def get_task(task_id: int, db: Session = Depends(get_session)):
    """Return one task with its steps; 404 if it does not exist."""
    # Session.get replaces the legacy Query.get and matches the runner helpers.
    t = db.get(Task, task_id)
    if t is None:
        raise HTTPException(404, "Task not found")
    return _serialize_task(t, db)
@app.post("/api/tasks/{task_id}/cancel")
def cancel_task(task_id: int, db: Session = Depends(get_session)):
    """Request cooperative cancellation of a task through the runtime registry.

    Tasks already in a terminal status are reported as such and left alone.
    ``status`` is the value read before cancellation was requested.
    """
    t = db.get(Task, task_id)  # Session.get instead of legacy Query.get
    if t is None:
        raise HTTPException(404, "Task not found")
    if t.status in ("COMPLETE", "COMPLETED", "FAILED", "CANCELLED"):
        return {"ok": True, "already_terminal": True, "status": t.status}
    applied = get_registry().cancel(task_id)
    return {"ok": True, "applied": applied, "status": t.status}
@app.post("/api/tasks/{task_id}/resume")
def resume_task(task_id: int, db: Session = Depends(get_session)):
    """Resume a task from its last checkpoint on a new v5 runner thread.

    Raises 404 if the task is unknown and 400 if it has no checkpoint.
    """
    t = db.get(Task, task_id)  # Session.get instead of legacy Query.get
    if t is None:
        raise HTTPException(404, "Task not found")
    cp = get_checkpoint_store().load_latest(task_id)
    if not cp:
        raise HTTPException(400, "No checkpoint found for this task")
    # NOTE(review): create_task does not store the prompt on Task, so the title
    # is replayed as the prompt. The run always uses v5 with no project_id,
    # whatever the original runner_ver/project was — confirm this is intended.
    prompt = t.title
    get_registry().register(task_id)
    threading.Thread(target=_run_task_v5, args=(task_id, prompt, None), daemon=True).start()
    return {
        "ok": True,
        "task_id": task_id,
        "resumed_from": cp.node_id,
        "pending_nodes": cp.pending_nodes,
        "resume_count": cp.resume_count,
    }
# ---------- DAG endpoint (Phase 2, preserved) ---------- #
@app.get("/api/tasks/{task_id}/dag")
def get_task_dag(task_id: int, db: Session = Depends(get_session)):
    """Return the task's stored DAG JSON, or a placeholder when none is saved yet."""
    t = db.get(Task, task_id)  # Session.get instead of legacy Query.get
    if t is None:
        raise HTTPException(404, "Task not found")
    runner_ver = getattr(t, "runner_ver", "v5")
    if not t.dag_json:
        return {
            "task_id": task_id,
            "runner_ver": runner_ver,
            "dag": None,
            "message": "DAG not yet available",
        }
    try:
        dag_data = json.loads(t.dag_json)
    except (TypeError, ValueError):
        # Corrupt or non-string payload (JSONDecodeError is a ValueError):
        # report "no DAG" instead of failing the request.
        dag_data = None
    return {
        "task_id": task_id,
        "runner_ver": runner_ver,
        "status": t.status,
        "dag": dag_data,
    }
# ---------- Checkpoint endpoints (Phase 4, preserved) ---------- #
@app.get("/api/tasks/{task_id}/checkpoints")
def list_checkpoints(task_id: int):
    """List every stored checkpoint for a task, with a count."""
    checkpoints = get_checkpoint_store().list_checkpoints(task_id)
    total = len(checkpoints)
    return {"task_id": task_id, "checkpoints": checkpoints, "count": total}
# ---------- Queue endpoints (Phase 4, preserved) ---------- #
@app.post("/api/queue/submit")
def queue_submit(body: QueueSubmit, db: Session = Depends(get_session)):
    """Create a QUEUED v5 task and submit it to the priority queue.

    ``priority`` is clamped to 0 (CRITICAL) .. 3 (LOW). A null priority
    defaults to 2 (NORMAL).
    """
    t = Task(title=body.title, status="QUEUED", runner_ver="v5")
    db.add(t)
    db.commit()
    db.refresh(t)
    job_id = str(uuid.uuid4())[:12]
    prompt = body.prompt or body.title
    # ``body.priority or 2`` silently turned 0 (CRITICAL) into 2 (NORMAL);
    # only a missing value should fall back to the default.
    requested = 2 if body.priority is None else body.priority
    priority = JobPriority(min(max(requested, 0), 3))
    job = QueueJob(
        job_id=job_id,
        task_id=t.id,
        prompt=prompt,
        priority=priority,
    )
    # Register before enqueueing (the order create_task uses) so the task is
    # known to the registry before any worker can dequeue the job.
    get_registry().register(t.id)
    queue = get_queue()
    queue.enqueue(job)
    return {
        "ok": True,
        "job_id": job_id,
        "task_id": t.id,
        "priority": int(priority),
        "priority_name": priority.name,
        "queue_depth": queue.queue_depth(),
    }
@app.get("/api/queue/status")
def queue_status():
    """Report queue depth, running-job count and which queue backend is active."""
    queue = get_queue()
    depth = queue.queue_depth()
    running = queue.running_count()
    # The Redis-backed queue is recognised by its ``_r`` client attribute.
    backend = "redis" if hasattr(queue, "_r") else "memory"
    return {"queue_depth": depth, "running_jobs": running, "backend": backend}
@app.get("/api/queue/jobs")
def list_queue_jobs():
    """List the jobs currently waiting in the queue."""
    waiting = get_queue().list_queued()
    serialized = [queued_job.to_dict() for queued_job in waiting]
    return {"jobs": serialized, "count": len(waiting)}
@app.get("/api/queue/jobs/{job_id}")
def get_queue_job(job_id: str):
    """Return one queued job by id; 404 if the queue does not know it."""
    found = get_queue().get_job(job_id)
    if found:
        return found.to_dict()
    raise HTTPException(404, "Job not found")
# ---------- Memory endpoints (Phase 2, preserved) ---------- #
@app.get("/api/memory")
def get_memory_history(limit: int = Query(default=20, le=100)):
    """Return up to ``limit`` recent episodic memories.

    ``count`` is the number of memories actually returned. It previously
    echoed the requested ``limit``, unlike every other list endpoint.
    """
    memories = get_memory().task_history_summary(limit=limit)
    return {"memories": memories, "count": len(memories)}
@app.get("/api/memory/search")
def search_memory(q: str = Query(..., min_length=2)):
    """Return the 10 episodic memories most similar to query ``q``."""
    hits = get_memory().retrieve_similar(q, top_k=10)
    return {"query": q, "results": hits, "count": len(hits)}
# ---------- Workspace Project endpoints (Phase 4, preserved) ---------- #
@app.post("/api/workspace/projects")
def create_project(body: ProjectCreate):
    """Create a workspace project; null optional fields become empty defaults."""
    workspace = get_workspace_memory()
    fields = {
        "name": body.name,
        "description": body.description or "",
        "tech_stack": body.tech_stack or [],
        "goals": body.goals or [],
        "conventions": body.conventions or {},
    }
    created = workspace.create_project(**fields)
    return {"ok": True, "project": created.to_dict()}
@app.get("/api/workspace/projects")
def list_projects():
    """List all workspace projects."""
    projects = get_workspace_memory().list_projects()
    return {"projects": projects}
@app.get("/api/workspace/projects/{project_id}")
def get_project(project_id: str):
    """Return one workspace project as a dict; 404 if it does not exist."""
    project = get_workspace_memory().get_project(project_id)
    if project:
        return project.to_dict()
    raise HTTPException(404, "Project not found")
@app.post("/api/workspace/projects/{project_id}/notes")
def add_project_note(project_id: str, body: ProjectNote):
    """Attach a note (with optional tags) to an existing project; 404 if unknown."""
    wm = get_workspace_memory()
    if not wm.get_project(project_id):
        raise HTTPException(404, "Project not found")
    # A JSON ``null`` tags value arrives as None; normalise it to an empty
    # list, the same way create_project treats its optional list fields.
    wm.add_note(project_id, body.note, body.tags or [])
    return {"ok": True}
@app.get("/api/workspace/projects/{project_id}/notes")
def get_project_notes(project_id: str, limit: int = Query(default=20, le=100)):
    """Return up to ``limit`` notes for a project (no existence check is made)."""
    project_notes = get_workspace_memory().get_notes(project_id, limit=limit)
    total = len(project_notes)
    return {"project_id": project_id, "notes": project_notes, "count": total}
# ---------- Phase 5 — Reasoning endpoints ---------- #
@app.get("/api/reason/tools")
def list_tools():
    """List every tool the reasoner can select dynamically, with descriptions."""
    catalog = []
    for tool_name, tool_description in AVAILABLE_TOOLS.items():
        catalog.append({"name": tool_name, "description": tool_description})
    return {"tools": catalog, "count": len(catalog)}
@app.post("/api/reason")
async def run_reasoning(body: ReasonRequest, db: Session = Depends(get_session)):
    """Run the recursive plan-eval-replan reasoning engine on a goal.

    Without ``task_id``, a synthetic task is created to anchor the run. The
    result is cached in ``_reasoning_store``. Its final DAG (if any) and
    COMPLETE/FAILED status are written to the task. Engine failures map to
    HTTP 500.
    """
    task_id = body.task_id
    if not task_id:
        # Create a synthetic task to anchor the reasoning run
        t = Task(title=f"Reasoning: {body.goal[:80]}", status="INIT", runner_ver="v5")
        db.add(t)
        db.commit()
        db.refresh(t)
        task_id = t.id
    # ``or`` would replace an explicit 0.0 threshold; only None means "default".
    threshold = 0.80 if body.success_threshold is None else body.success_threshold
    reasoner = RecursiveReasoner(
        task_id=task_id,
        max_rounds=max(1, min(body.max_rounds or 4, 8)),  # clamp to 1..8
        success_threshold=threshold,
    )
    # NOTE(review): the DB session is synchronous, so the queries below block
    # the event loop inside this async handler.
    try:
        result = await reasoner.reason(
            body.goal,
            memory_context=body.memory_context or "",
        )
        result_dict = result.to_dict()
    except Exception as exc:
        raise HTTPException(500, f"Reasoning failed: {exc}") from exc

    _reasoning_store[task_id] = result_dict
    t2 = db.get(Task, task_id)  # Session.get instead of legacy Query.get
    if t2:
        # Persist the DAG like reason_on_task does, so /api/tasks/{id}/dag works.
        if result.final_dag:
            t2.dag_json = json.dumps(result.final_dag)
        t2.status = "COMPLETE" if result.success else "FAILED"
        db.commit()
    return {
        "ok": True,
        "task_id": task_id,
        "result": result_dict,
    }
@app.post("/api/debug")
async def run_self_debug(body: DebugRequest, db: Session = Depends(get_session)):
    """Run the self-debugging code loop (generate → execute → test → fix).

    Without ``task_id``, a new task is created. The task status becomes
    COMPLETE or FAILED according to the result. Engine failures map to
    HTTP 500.
    """
    task_id = body.task_id
    if not task_id:
        t = Task(title=f"Debug: {body.goal[:80]}", status="INIT", runner_ver="v5")
        db.add(t)
        db.commit()
        db.refresh(t)
        task_id = t.id
    engine = SelfDebuggingEngine(
        task_id=task_id,
        max_attempts=max(1, min(body.max_attempts or 4, 8)),  # clamp to 1..8
        language=body.language or "python",
        run_tests_flag=True,
    )
    # Only the engine call is guarded: DB errors below are not "Self-debug failed".
    try:
        result = await engine.debug(
            body.goal,
            initial_code=body.initial_code or "",
            memory_context=body.memory_context or "",
        )
        result_dict = result.to_dict()
    except Exception as exc:
        raise HTTPException(500, f"Self-debug failed: {exc}") from exc

    t2 = db.get(Task, task_id)  # Session.get instead of legacy Query.get
    if t2:
        t2.status = "COMPLETE" if result.success else "FAILED"
        db.commit()
    return {
        "ok": True,
        "task_id": task_id,
        "result": result_dict,
    }
@app.post("/api/tasks/{task_id}/reason")
async def reason_on_task(task_id: int, body: ReasonRequest, db: Session = Depends(get_session)):
    """Attach recursive reasoning to an existing task (path id wins over body.task_id).

    The result is cached, and the final DAG plus COMPLETE/FAILED status are
    saved on the task. Engine failures map to HTTP 500, as in run_reasoning.
    """
    t = db.get(Task, task_id)  # Session.get instead of legacy Query.get
    if t is None:
        raise HTTPException(404, "Task not found")
    threshold = 0.80 if body.success_threshold is None else body.success_threshold
    reasoner = RecursiveReasoner(
        task_id=task_id,
        max_rounds=max(1, min(body.max_rounds or 4, 8)),  # clamp to 1..8
        success_threshold=threshold,
    )
    try:
        result = await reasoner.reason(
            body.goal or t.title,
            memory_context=body.memory_context or "",
        )
        result_dict = result.to_dict()
    except Exception as exc:
        raise HTTPException(500, f"Reasoning failed: {exc}") from exc

    _reasoning_store[task_id] = result_dict
    if result.final_dag:
        t.dag_json = json.dumps(result.final_dag)
    t.status = "COMPLETE" if result.success else "FAILED"
    db.commit()
    return {"ok": True, "task_id": task_id, "result": result_dict}
@app.get("/api/tasks/{task_id}/reasoning")
def get_task_reasoning(task_id: int):
    """Return the cached reasoning result for a task, or a 'not found' message."""
    stored = _reasoning_store.get(task_id)
    if stored:
        return {"task_id": task_id, "reasoning": stored}
    return {"task_id": task_id, "reasoning": None, "message": "No reasoning result found"}
# ---------- File endpoints (Phase 1 preserved) ---------- #
@app.get("/api/tasks/{task_id}/files")
def list_task_files(task_id: int, path: str = "."):
    """List entries under ``path`` inside the task's sandbox working directory."""
    sandbox = agent_tools.get_workdir(task_id)
    listing = agent_tools.fs_list(sandbox, path)
    payload = {"task_id": task_id, "path": path}
    payload["ok"] = listing.ok
    payload["output"] = listing.output
    payload["entries"] = listing.data.get("entries", [])
    payload["workdir"] = str(sandbox)
    return payload
@app.get("/api/tasks/{task_id}/files/{file_path:path}")
def read_task_file(task_id: int, file_path: str):
    """Read one file from the task's sandbox; any read failure becomes a 404."""
    sandbox = agent_tools.get_workdir(task_id)
    outcome = agent_tools.fs_read(sandbox, file_path)
    if outcome.ok:
        meta = outcome.data
        return {
            "task_id": task_id,
            "path": file_path,
            "content": outcome.output,
            "truncated": meta.get("truncated", False),
            "size": meta.get("size", 0),
        }
    raise HTTPException(404, outcome.output)
# ---------- Helpers ---------- #
def _serialize_task(t: Task, db: Session) -> TaskOut:
    """Build the ``TaskOut`` view of a task, with its steps in ascending id order."""
    step_query = db.query(Step).filter(Step.task_id == t.id)
    step_rows = step_query.order_by(Step.id.asc()).all()
    return TaskOut(
        id=t.id,
        title=t.title,
        status=t.status,
        created_at=t.created_at,
        updated_at=t.updated_at,
        current_step_id=t.current_step_id,
        runner_ver=getattr(t, "runner_ver", "v5"),
        steps=list(map(StepOut.model_validate, step_rows)),
    )