"""FastAPI entrypoint — Phase 5 (Autonomous Reasoning Engine). Routes (Phase 1 — all preserved): GET / → landing JSON GET /api/health → full health snapshot POST /api/tasks → create + run task GET /api/tasks → list latest tasks GET /api/tasks/{id} → task detail with steps POST /api/tasks/{id}/cancel → cooperative cancel + SIGKILL GET /api/tasks/{id}/files → list workdir contents GET /api/tasks/{id}/files/{path} → read a single sandbox file Phase 2 routes (preserved): GET /api/tasks/{id}/dag → DAG node status JSON GET /api/memory → recent episodic memories GET /api/memory/search?q=... → search episodic memory Phase 4 routes (preserved): POST /api/queue/submit → submit job to priority queue GET /api/queue/status → queue depth + running count GET /api/queue/jobs → list queued jobs GET /api/queue/jobs/{job_id} → get specific job POST /api/tasks/{id}/resume → resume from last checkpoint GET /api/tasks/{id}/checkpoints → list task checkpoints POST /api/workspace/projects → create workspace project GET /api/workspace/projects → list workspace projects GET /api/workspace/projects/{id} → get project detail POST /api/workspace/projects/{id}/notes → add project note GET /api/workspace/projects/{id}/notes → get project notes Phase 5 routes (new): POST /api/reason → run recursive plan-eval-replan reasoning POST /api/debug → run self-debugging coding loop POST /api/tasks/{id}/reason → attach reasoning to existing task GET /api/reason/tools → list available dynamic tools GET /api/tasks/{id}/reasoning → get reasoning result for a task """ from __future__ import annotations import asyncio import json import os import threading import uuid from datetime import datetime from typing import Any, Dict, List, Optional from fastapi import Depends, FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from sqlalchemy.orm import Session from ..agent import tools as agent_tools from ..agent.e2b_sandbox import e2b_status, is_e2b_enabled from ..agent.memory import get_memory from ..agent.queue_engine import JobPriority, QueueJob, get_queue, get_worker from ..agent.checkpoint import get_checkpoint_store from ..agent.workspace_memory import get_workspace_memory from ..agent.runtime import get_registry from ..agent.runner_v2 import AgentRunnerV2 from ..agent.runner_v4 import AgentRunnerV4 from ..agent.runner_v5 import AgentRunnerV5 from ..agent.state_machine import AgentRunner # Phase 1 legacy from ..agent.reasoner import RecursiveReasoner, AVAILABLE_TOOLS from ..agent.self_debug import SelfDebuggingEngine from ..db.schema import Step, Task, get_session, init_db from ..router.smart_router import get_router # In-memory store for reasoning results (task_id → result dict) _reasoning_store: Dict[int, Dict[str, Any]] = {} # ---------- App setup ---------- # app = FastAPI( title="OpenHands-Genspark AI Developer Agent", description=( "Manus-Class Autonomous AI Developer — Phase 5 (Autonomous Reasoning Engine). " "Recursive planning, self-debugging code loops, dynamic tool selection." ), version="5.0.0", ) allowed = os.getenv("ALLOWED_ORIGINS", "*").split(",") app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in allowed], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.on_event("startup") def _on_startup() -> None: init_db() # Start Phase 4 queue worker (preserved) worker = get_worker() worker.start() @app.on_event("shutdown") def _on_shutdown() -> None: get_worker().stop() # ---------- Pydantic Schemas ---------- # class TaskCreate(BaseModel): title: str prompt: Optional[str] = None runner: Optional[str] = "v5" # "v1" | "v2" | "v4" | "v5" priority: Optional[int] = 2 # 0=CRITICAL 1=HIGH 2=NORMAL 3=LOW project_id: Optional[str] = None class StepOut(BaseModel): id: int description: str status: str output: str error_log: str class Config: from_attributes = True class TaskOut(BaseModel): id: int title: str status: str created_at: datetime updated_at: datetime current_step_id: Optional[int] runner_ver: Optional[str] = "v5" steps: List[StepOut] = [] class Config: from_attributes = True class QueueSubmit(BaseModel): title: str prompt: Optional[str] = None priority: Optional[int] = 2 project_id: Optional[str] = None class ProjectCreate(BaseModel): name: str description: Optional[str] = "" tech_stack: Optional[List[str]] = [] goals: Optional[List[str]] = [] conventions: Optional[Dict[str, str]] = {} class ProjectNote(BaseModel): note: str tags: Optional[List[str]] = [] # Phase 5 schemas class ReasonRequest(BaseModel): goal: str max_rounds: Optional[int] = 4 success_threshold: Optional[float] = 0.80 memory_context: Optional[str] = "" task_id: Optional[int] = None # link to existing task class DebugRequest(BaseModel): goal: str language: Optional[str] = "python" max_attempts: Optional[int] = 4 initial_code: Optional[str] = "" memory_context: Optional[str] = "" task_id: Optional[int] = None # ---------- Routes — Landing + Health ---------- # @app.get("/") def landing(): return { "service": "OpenHands-Genspark AI Developer Agent", "blueprint": "v5.0 (Phase 5 — Autonomous Reasoning Engine)", "phase": "5", "features": [ "Recursive plan-eval-replan reasoning engine", "Self-debugging code generation loop (generate→execute→test→fix)", "Dynamic tool selection at node level", "Code critique + quality review engine", "Priority-based task queue (Redis/in-memory)", "Checkpoint-based crash recovery + resume", "Workspace project memory (Supabase PostgreSQL)", "Long-running async job execution", "Phase 1-4 capabilities fully preserved", "DAG-based multi-step planning", "Episodic memory (SQLite)", "Self-recovery with exponential back-off", "E2B cloud sandbox", ], "endpoints": [ "GET /api/health", "POST /api/tasks", "GET /api/tasks", "GET /api/tasks/{id}", "POST /api/tasks/{id}/cancel", "POST /api/tasks/{id}/resume", "GET /api/tasks/{id}/dag", "GET /api/tasks/{id}/files", "GET /api/tasks/{id}/files/{path}", "GET /api/tasks/{id}/checkpoints", "GET /api/memory", "GET /api/memory/search?q=", "POST /api/queue/submit", "GET /api/queue/status", "GET /api/queue/jobs", "GET /api/queue/jobs/{job_id}", "POST /api/workspace/projects", "GET /api/workspace/projects", "GET /api/workspace/projects/{id}", "POST /api/workspace/projects/{id}/notes", "GET /api/workspace/projects/{id}/notes", # Phase 5 "POST /api/reason", "POST /api/debug", "POST /api/tasks/{id}/reason", "GET /api/reason/tools", "GET /api/tasks/{id}/reasoning", ], } @app.get("/api/health") def health(): router = get_router() snap = router.status_snapshot() active = sum(1 for k in snap if k["status"] == "ACTIVE") mem = get_memory() mem_stats = {"recent_memories": len(mem.task_history_summary(limit=20))} # Phase 4 stats q = get_queue() wm = get_workspace_memory() q_stats = { "queue_depth": q.queue_depth(), "running_jobs": q.running_count(), "backend": "redis" if hasattr(q, "_r") else "memory", } return { "status": "ok", "time": datetime.utcnow().isoformat() + "Z", "phase": "5", "total_keys": len(snap), "active_keys": active, "providers": sorted({k["provider"] for k in snap}), "keys": snap, "sandbox_root": str(agent_tools.SANDBOX_ROOT), "e2b": e2b_status(), "memory": mem_stats, "version": "5.0.0", "queue": q_stats, "workspace": wm.status(), "reasoning": { "cached_results": len(_reasoning_store), "available_tools": list(AVAILABLE_TOOLS.keys()), }, "capabilities": { "dag_planning": True, "episodic_memory": True, "self_recovery": True, "e2b_sandbox": is_e2b_enabled(), "task_queue": True, # Phase 4 "checkpoint_resume": True, # Phase 4 "workspace_memory": True, # Phase 4 "recursive_reasoning": True, # Phase 5 "self_debugging": True, # Phase 5 "dynamic_tool_selection": True, # Phase 5 "code_critique": True, # Phase 5 "browser_automation": False, # Phase 3 (separate HF space) "multi_agent": False, # Phase 6 }, } # ---------- Task management ---------- # def _run_task_v5(task_id: int, prompt: str, project_id: Optional[str] = None) -> None: loop = asyncio.new_event_loop() try: runner = AgentRunnerV5(task_id, project_id=project_id) result = loop.run_until_complete(runner.run(prompt)) # Store reasoning result _reasoning_store[task_id] = result # Save DAG if present dag_data = result.get("reasoning", {}).get("final_dag") if dag_data: from ..db.schema import SessionLocal with SessionLocal() as db: t = db.get(Task, task_id) if t: t.dag_json = json.dumps(dag_data) db.commit() except Exception as exc: import logging logging.exception("AgentRunnerV5 failed: %s", exc) finally: loop.close() def _run_task_v4(task_id: int, prompt: str, project_id: Optional[str] = None) -> None: loop = asyncio.new_event_loop() try: result = loop.run_until_complete(AgentRunnerV4(task_id, project_id).run(prompt)) dag_data = result.get("dag") if dag_data: from ..db.schema import SessionLocal with SessionLocal() as db: t = db.get(Task, task_id) if t: t.dag_json = json.dumps(dag_data) db.commit() except Exception as exc: import logging logging.exception("AgentRunnerV4 failed: %s", exc) finally: loop.close() def _run_task_v2(task_id: int, prompt: str) -> None: loop = asyncio.new_event_loop() try: result = loop.run_until_complete(AgentRunnerV2(task_id).run(prompt)) dag_data = result.get("dag") if dag_data: from ..db.schema import SessionLocal with SessionLocal() as db: t = db.get(Task, task_id) if t: t.dag_json = json.dumps(dag_data) db.commit() except Exception as exc: import logging logging.exception("AgentRunnerV2 failed: %s", exc) finally: loop.close() def _run_task_v1(task_id: int, prompt: str) -> None: loop = asyncio.new_event_loop() try: loop.run_until_complete(AgentRunner(task_id).run(prompt)) except Exception as exc: import logging logging.exception("AgentRunner v1 failed: %s", exc) finally: loop.close() @app.post("/api/tasks", response_model=TaskOut) def create_task(body: TaskCreate, db: Session = Depends(get_session)): runner_ver = (body.runner or "v5").lower() if runner_ver not in ("v1", "v2", "v4", "v5"): runner_ver = "v5" t = Task(title=body.title, status="INIT", runner_ver=runner_ver) db.add(t) db.commit() db.refresh(t) prompt = body.prompt or body.title task_id = t.id get_registry().register(task_id) if runner_ver == "v5": th = threading.Thread(target=_run_task_v5, args=(task_id, prompt, body.project_id), daemon=True) elif runner_ver == "v4": th = threading.Thread(target=_run_task_v4, args=(task_id, prompt, body.project_id), daemon=True) elif runner_ver == "v2": th = threading.Thread(target=_run_task_v2, args=(task_id, prompt), daemon=True) else: th = threading.Thread(target=_run_task_v1, args=(task_id, prompt), daemon=True) th.start() return _serialize_task(t, db) @app.get("/api/tasks", response_model=List[TaskOut]) def list_tasks(db: Session = Depends(get_session)): rows = db.query(Task).order_by(Task.created_at.desc()).limit(50).all() return [_serialize_task(r, db) for r in rows] @app.get("/api/tasks/{task_id}", response_model=TaskOut) def get_task(task_id: int, db: Session = Depends(get_session)): t = db.query(Task).get(task_id) if not t: raise HTTPException(404, "Task not found") return _serialize_task(t, db) @app.post("/api/tasks/{task_id}/cancel") def cancel_task(task_id: int, db: Session = Depends(get_session)): t = db.query(Task).get(task_id) if not t: raise HTTPException(404, "Task not found") if t.status in ("COMPLETE", "COMPLETED", "FAILED", "CANCELLED"): return {"ok": True, "already_terminal": True, "status": t.status} applied = get_registry().cancel(task_id) return {"ok": True, "applied": applied, "status": t.status} @app.post("/api/tasks/{task_id}/resume") def resume_task(task_id: int, db: Session = Depends(get_session)): """Resume a task from its last checkpoint.""" t = db.query(Task).get(task_id) if not t: raise HTTPException(404, "Task not found") cp = get_checkpoint_store().load_latest(task_id) if not cp: raise HTTPException(400, "No checkpoint found for this task") prompt = t.title get_registry().register(task_id) th = threading.Thread(target=_run_task_v5, args=(task_id, prompt, None), daemon=True) th.start() return { "ok": True, "task_id": task_id, "resumed_from": cp.node_id, "pending_nodes": cp.pending_nodes, "resume_count": cp.resume_count, } # ---------- DAG endpoint (Phase 2, preserved) ---------- # @app.get("/api/tasks/{task_id}/dag") def get_task_dag(task_id: int, db: Session = Depends(get_session)): t = db.query(Task).get(task_id) if not t: raise HTTPException(404, "Task not found") if not t.dag_json: return { "task_id": task_id, "runner_ver": getattr(t, "runner_ver", "v5"), "dag": None, "message": "DAG not yet available", } try: dag_data = json.loads(t.dag_json) except Exception: dag_data = None return { "task_id": task_id, "runner_ver": getattr(t, "runner_ver", "v5"), "status": t.status, "dag": dag_data, } # ---------- Checkpoint endpoints (Phase 4, preserved) ---------- # @app.get("/api/tasks/{task_id}/checkpoints") def list_checkpoints(task_id: int): cps = get_checkpoint_store().list_checkpoints(task_id) return { "task_id": task_id, "checkpoints": cps, "count": len(cps), } # ---------- Queue endpoints (Phase 4, preserved) ---------- # @app.post("/api/queue/submit") def queue_submit(body: QueueSubmit, db: Session = Depends(get_session)): """Submit a task to the priority queue.""" t = Task(title=body.title, status="QUEUED", runner_ver="v5") db.add(t) db.commit() db.refresh(t) job_id = str(uuid.uuid4())[:12] prompt = body.prompt or body.title priority = JobPriority(min(max(body.priority or 2, 0), 3)) job = QueueJob( job_id=job_id, task_id=t.id, prompt=prompt, priority=priority, ) get_queue().enqueue(job) get_registry().register(t.id) return { "ok": True, "job_id": job_id, "task_id": t.id, "priority": int(priority), "priority_name": priority.name, "queue_depth": get_queue().queue_depth(), } @app.get("/api/queue/status") def queue_status(): q = get_queue() return { "queue_depth": q.queue_depth(), "running_jobs": q.running_count(), "backend": "redis" if hasattr(q, "_r") else "memory", } @app.get("/api/queue/jobs") def list_queue_jobs(): jobs = get_queue().list_queued() return { "jobs": [j.to_dict() for j in jobs], "count": len(jobs), } @app.get("/api/queue/jobs/{job_id}") def get_queue_job(job_id: str): job = get_queue().get_job(job_id) if not job: raise HTTPException(404, "Job not found") return job.to_dict() # ---------- Memory endpoints (Phase 2, preserved) ---------- # @app.get("/api/memory") def get_memory_history(limit: int = Query(default=20, le=100)): mem = get_memory() return {"memories": mem.task_history_summary(limit=limit), "count": limit} @app.get("/api/memory/search") def search_memory(q: str = Query(..., min_length=2)): mem = get_memory() results = mem.retrieve_similar(q, top_k=10) return {"query": q, "results": results, "count": len(results)} # ---------- Workspace Project endpoints (Phase 4, preserved) ---------- # @app.post("/api/workspace/projects") def create_project(body: ProjectCreate): wm = get_workspace_memory() proj = wm.create_project( name=body.name, description=body.description or "", tech_stack=body.tech_stack or [], goals=body.goals or [], conventions=body.conventions or {}, ) return {"ok": True, "project": proj.to_dict()} @app.get("/api/workspace/projects") def list_projects(): wm = get_workspace_memory() return {"projects": wm.list_projects()} @app.get("/api/workspace/projects/{project_id}") def get_project(project_id: str): wm = get_workspace_memory() proj = wm.get_project(project_id) if not proj: raise HTTPException(404, "Project not found") return proj.to_dict() @app.post("/api/workspace/projects/{project_id}/notes") def add_project_note(project_id: str, body: ProjectNote): wm = get_workspace_memory() proj = wm.get_project(project_id) if not proj: raise HTTPException(404, "Project not found") wm.add_note(project_id, body.note, body.tags) return {"ok": True} @app.get("/api/workspace/projects/{project_id}/notes") def get_project_notes(project_id: str, limit: int = Query(default=20, le=100)): wm = get_workspace_memory() notes = wm.get_notes(project_id, limit=limit) return {"project_id": project_id, "notes": notes, "count": len(notes)} # ---------- Phase 5 — Reasoning endpoints ---------- # @app.get("/api/reason/tools") def list_tools(): """List all dynamically selectable tools.""" return { "tools": [ {"name": k, "description": v} for k, v in AVAILABLE_TOOLS.items() ], "count": len(AVAILABLE_TOOLS), } @app.post("/api/reason") async def run_reasoning(body: ReasonRequest, db: Session = Depends(get_session)): """Run the recursive plan-eval-replan reasoning engine on a goal.""" task_id = body.task_id if not task_id: # Create a synthetic task to anchor the reasoning run t = Task(title=f"Reasoning: {body.goal[:80]}", status="INIT", runner_ver="v5") db.add(t) db.commit() db.refresh(t) task_id = t.id reasoner = RecursiveReasoner( task_id=task_id, max_rounds=min(body.max_rounds or 4, 8), success_threshold=body.success_threshold or 0.80, ) try: result = await reasoner.reason( body.goal, memory_context=body.memory_context or "", ) result_dict = result.to_dict() _reasoning_store[task_id] = result_dict # Update task status t2 = db.query(Task).get(task_id) if t2: t2.status = "COMPLETE" if result.success else "FAILED" db.commit() return { "ok": True, "task_id": task_id, "result": result_dict, } except Exception as exc: raise HTTPException(500, f"Reasoning failed: {exc}") @app.post("/api/debug") async def run_self_debug(body: DebugRequest, db: Session = Depends(get_session)): """Run the self-debugging code generation loop.""" task_id = body.task_id if not task_id: t = Task(title=f"Debug: {body.goal[:80]}", status="INIT", runner_ver="v5") db.add(t) db.commit() db.refresh(t) task_id = t.id engine = SelfDebuggingEngine( task_id=task_id, max_attempts=min(body.max_attempts or 4, 8), language=body.language or "python", run_tests_flag=True, ) try: result = await engine.debug( body.goal, initial_code=body.initial_code or "", memory_context=body.memory_context or "", ) result_dict = result.to_dict() t2 = db.query(Task).get(task_id) if t2: t2.status = "COMPLETE" if result.success else "FAILED" db.commit() return { "ok": True, "task_id": task_id, "result": result_dict, } except Exception as exc: raise HTTPException(500, f"Self-debug failed: {exc}") @app.post("/api/tasks/{task_id}/reason") async def reason_on_task(task_id: int, body: ReasonRequest, db: Session = Depends(get_session)): """Attach recursive reasoning to an existing task.""" t = db.query(Task).get(task_id) if not t: raise HTTPException(404, "Task not found") reasoner = RecursiveReasoner( task_id=task_id, max_rounds=min(body.max_rounds or 4, 8), success_threshold=body.success_threshold or 0.80, ) result = await reasoner.reason( body.goal or t.title, memory_context=body.memory_context or "", ) result_dict = result.to_dict() _reasoning_store[task_id] = result_dict if result.final_dag: t.dag_json = json.dumps(result.final_dag) t.status = "COMPLETE" if result.success else "FAILED" db.commit() return {"ok": True, "task_id": task_id, "result": result_dict} @app.get("/api/tasks/{task_id}/reasoning") def get_task_reasoning(task_id: int): """Get the stored reasoning result for a task.""" result = _reasoning_store.get(task_id) if not result: return {"task_id": task_id, "reasoning": None, "message": "No reasoning result found"} return {"task_id": task_id, "reasoning": result} # ---------- File endpoints (Phase 1 preserved) ---------- # @app.get("/api/tasks/{task_id}/files") def list_task_files(task_id: int, path: str = "."): workdir = agent_tools.get_workdir(task_id) res = agent_tools.fs_list(workdir, path) return { "task_id": task_id, "path": path, "ok": res.ok, "output": res.output, "entries": res.data.get("entries", []), "workdir": str(workdir), } @app.get("/api/tasks/{task_id}/files/{file_path:path}") def read_task_file(task_id: int, file_path: str): workdir = agent_tools.get_workdir(task_id) res = agent_tools.fs_read(workdir, file_path) if not res.ok: raise HTTPException(404, res.output) return { "task_id": task_id, "path": file_path, "content": res.output, "truncated": res.data.get("truncated", False), "size": res.data.get("size", 0), } # ---------- Helpers ---------- # def _serialize_task(t: Task, db: Session) -> TaskOut: steps = ( db.query(Step) .filter(Step.task_id == t.id) .order_by(Step.id.asc()) .all() ) return TaskOut( id=t.id, title=t.title, status=t.status, created_at=t.created_at, updated_at=t.updated_at, current_step_id=t.current_step_id, runner_ver=getattr(t, "runner_ver", "v5"), steps=[StepOut.model_validate(s) for s in steps], )