# AI Developer Agent
# AI Developer Agent v1.0 backend
# 763ef0d
"""
Agent orchestrator - ties planner + executor + retry + repair + browser.
`run_task(task_id, title, description)` is a generator yielding event dicts
suitable for SSE streaming. Persists everything to tasks.db.
"""
from __future__ import annotations
import json
import time
import logging
import traceback
from typing import Any, Dict, Generator, List, Optional
from . import tasks
from .planner import plan_task, repair_plan
from .executor import get_executor
from .classifier import classify
from .browser import run_browser_action
from .llm_router import get_router
# Module-level logger (named "agent"); run_task uses it to record fatal errors.
logger = logging.getLogger("agent")
def _event(task_id: str, kind: str, message: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build one event dict for *task_id* and persist a copy of it.

    Args:
        task_id: Identifier of the task the event belongs to.
        kind: Event kind label (for example "state" or "step.result").
        message: Short human-readable message.
        data: Optional extra payload; a missing or falsy value becomes ``{}``.

    Returns:
        A dict with the keys ``task_id``, ``kind``, ``message``, ``ts``
        (``time.time()`` at creation) and ``data``.
    """
    details = data or {}
    event = dict(
        task_id=task_id,
        kind=kind,
        message=message,
        ts=time.time(),
        data=details,
    )
    # Only message + data are persisted, as JSON text cut to 7000 characters.
    # NOTE(review): the cut is a plain string slice, so an oversized event is
    # stored as incomplete JSON - confirm readers of the log tolerate that.
    serialized = json.dumps({"message": message, "data": details})
    tasks.log_event(task_id, kind, serialized[:7000])
    return event
def _run_step_safely(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Run one step via ``_execute_step``, turning a raised exception into a failure result.

    Returns:
        The dict returned by ``_execute_step``, or
        ``{"ok": False, "stderr": str(exc), "traceback": <formatted traceback>}``
        when it raises.
    """
    try:
        return _execute_step(task_id, step)
    except Exception as exc:
        return {"ok": False, "stderr": str(exc), "traceback": traceback.format_exc()}


def run_task(task_id: str, title: str, description: str) -> Generator[Dict[str, Any], None, None]:
    """Plan and execute a task, yielding an event dict for every progress update.

    The task state is persisted through ``tasks.update_state`` as it moves
    through "planning", "executing", "repairing", "retrying" and finally
    "completed" (or "failed" on an unexpected error). Every yielded event is
    built by ``_event``, which also persists it.

    Flow:
        1. Ask ``plan_task`` for a list of step dicts; an empty plan is replaced
           by a single "note" step.
        2. For each step: save a checkpoint, execute it, and emit
           "step.start" / "step.result" events.
        3. If a step fails, classify the error text. When a class is detected,
           run the repair actions from ``repair_plan`` and retry the original
           step once ("diagnose", "repair.start", "repair.result",
           "retry.result" events); otherwise emit a "warn" event.
        4. Emit "state"/"done" events on completion, or an "error" event with a
           traceback if anything outside a step raises.

    Args:
        task_id: Identifier of the task being run.
        title: Task title passed to the planner.
        description: Task description passed to the planner.

    Yields:
        Event dicts as produced by ``_event``.
    """
    try:
        tasks.update_state(task_id, "planning")
        yield _event(task_id, "state", "planning")
        yield _event(task_id, "thought", f"Planning task: {title}")
        plan = plan_task(title, description)
        yield _event(task_id, "plan", "Plan generated", {"steps": plan})
        if not plan:
            yield _event(task_id, "warn", "Empty plan – using fallback note")
            plan = [{"type": "note", "msg": "No actions planned"}]

        tasks.update_state(task_id, "executing")
        yield _event(task_id, "state", "executing")
        # The original bound this result to an unused local. The call itself is
        # kept so that an error raised while obtaining the executor still aborts
        # the task through the fatal handler below.
        get_executor()

        total = len(plan)
        for idx, step in enumerate(plan):
            tasks.save_checkpoint(task_id, idx, {"plan": plan, "current": step})
            stype = step.get("type", "note")
            yield _event(task_id, "step.start", f"[{idx + 1}/{total}] {stype}", {"step": step})

            result = _run_step_safely(task_id, step)
            yield _event(
                task_id,
                "step.result",
                "ok" if result.get("ok") else "fail",
                {"step_index": idx, "result": result},
            )
            if result.get("ok"):
                continue

            # The step failed: attempt one repair cycle.
            tasks.update_state(task_id, "repairing")
            yield _event(task_id, "state", "repairing")
            err_text = (result.get("stderr") or "") + "\n" + (result.get("traceback") or "")
            err_class = classify(err_text)
            if err_class:
                yield _event(
                    task_id,
                    "diagnose",
                    f"Detected: {err_class.category}",
                    {"detail": err_class.detail, "fix": err_class.suggested_fix},
                )
                # Tolerate a planner that returns None instead of an empty list.
                repair_actions = repair_plan(err_class.category, err_class.detail) or []
                for ridx, action in enumerate(repair_actions):
                    yield _event(task_id, "repair.start", f"repair[{ridx + 1}]", {"action": action})
                    # Guarded like the main step, so a raising repair action is
                    # reported as a failed repair instead of aborting the task.
                    repair_result = _run_step_safely(task_id, action)
                    yield _event(
                        task_id,
                        "repair.result",
                        "ok" if repair_result.get("ok") else "fail",
                        {"result": repair_result},
                    )

                # Retry the original step once.
                tasks.update_state(task_id, "retrying")
                yield _event(task_id, "state", "retrying")
                tasks.record_retry(task_id, 1, err_text[:1000])
                retry_result = _run_step_safely(task_id, step)
                yield _event(
                    task_id,
                    "retry.result",
                    "ok" if retry_result.get("ok") else "fail",
                    {"step_index": idx, "result": retry_result},
                )
            else:
                yield _event(task_id, "warn", "No automatic repair – continuing")

            # NOTE(review): execution continues with the next step even when
            # the retry failed, and the task is still marked "completed" at the
            # end - confirm this best-effort behaviour is intended.
            tasks.update_state(task_id, "executing")
            yield _event(task_id, "state", "executing")

        tasks.update_state(task_id, "completed")
        yield _event(task_id, "state", "completed")
        yield _event(task_id, "done", f"Task {task_id} completed")
    except Exception as e:
        # Top-level boundary: anything not handled per step marks the task failed.
        logger.exception("run_task fatal")
        tasks.update_state(task_id, "failed")
        yield _event(task_id, "error", f"Fatal: {e}", {"traceback": traceback.format_exc()})
# Maximum number of characters of captured output kept in a step result.
_MAX_OUTPUT_CHARS = 3000
# Timeout (seconds) used when a shell/python step does not specify one.
_DEFAULT_TIMEOUT_S = 120.0


def _tail(text: Optional[str], limit: int = _MAX_OUTPUT_CHARS) -> str:
    """Return the last *limit* characters of *text*, treating None as ''."""
    return (text or "")[-limit:]


def _step_timeout(step: Dict[str, Any]) -> float:
    """Return the step's "timeout" as a float, defaulting when absent or None."""
    raw = step.get("timeout")
    return _DEFAULT_TIMEOUT_S if raw is None else float(raw)


def _execute_step(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a single plan step and return a result dict.

    The step's ``"type"`` key (default ``"note"``) selects the behaviour:

    * ``shell``   - run ``step["cmd"]`` through the executor; an empty command
      is a successful no-op.
    * ``python``  - run ``step["code"]`` through the executor.
    * ``browser`` - delegate the whole step to ``run_browser_action``.
    * ``git``     - run ``git <op> <args>`` through the executor's shell.
    * ``deploy``  - only returns a message; no deployment happens here.
    * ``note``    - returns ``step["msg"]``.
    * ``sleep``   - sleep for ``step["seconds"]`` (default 1).
    * ``llm``     - send ``step["prompt"]`` to the LLM router.

    The executor is obtained only by the step types that use it (shell,
    python, git), so the other step types do not depend on it.

    Args:
        task_id: Identifier of the owning task (not used by the body).
        step: Step description dict.

    Returns:
        A dict that always has ``"ok"``; depending on the step type it also
        carries ``"stdout"``, ``"stderr"``, ``"exit_code"``, ``"duration_ms"``,
        ``"screenshot_b64"`` and ``"url"``. Captured output is capped at
        3000 characters. Unknown step types yield ``{"ok": False, ...}``.

    Raises:
        Whatever the executor, browser helper or router lookup raises; callers
        are expected to handle it.
    """
    stype = step.get("type", "note")

    if stype == "shell":
        cmd = step.get("cmd", "")
        if not cmd:
            return {"ok": True, "stdout": "(empty cmd)"}
        r = get_executor().shell(cmd, timeout=_step_timeout(step))
        return {
            "ok": r.ok,
            "stdout": _tail(r.stdout),
            "stderr": _tail(r.stderr),
            "exit_code": r.exit_code,
            "duration_ms": r.duration_ms,
        }

    if stype == "python":
        code = step.get("code", "")
        r = get_executor().python(code, timeout=_step_timeout(step))
        return {
            "ok": r.ok,
            "stdout": _tail(r.stdout),
            "stderr": _tail(r.stderr),
            "exit_code": r.exit_code,
            "duration_ms": r.duration_ms,
        }

    if stype == "browser":
        br = run_browser_action(step)
        return {
            "ok": br.ok,
            "stdout": (br.text or "")[:3000],
            "stderr": br.error or "",
            "exit_code": 0 if br.ok else 1,
            # Only a 500-character prefix of the screenshot is kept.
            "screenshot_b64": br.screenshot_b64[:500] if br.screenshot_b64 else "",
            "url": br.url,
        }

    if stype == "git":
        op = step.get("op", "status")
        args = step.get("args", "")
        cmd = f"git {op} {args}".strip()
        r = get_executor().shell(cmd, timeout=120)
        # Output is capped like shell/python steps so results stay bounded.
        return {
            "ok": r.ok,
            "stdout": _tail(r.stdout),
            "stderr": _tail(r.stderr),
            "exit_code": r.exit_code,
        }

    if stype == "deploy":
        # Real deploy is invoked via dedicated /deploy endpoints; here we just log.
        target = step.get("target", "unknown")
        msg = f"Deploy step requested: {target}. Use /deploy endpoints for real deployment."
        return {"ok": True, "stdout": msg}

    if stype == "note":
        return {"ok": True, "stdout": step.get("msg", "")}

    if stype == "sleep":
        seconds = step.get("seconds")
        if seconds is None:
            seconds = 1
        time.sleep(float(seconds))
        return {"ok": True, "stdout": f"slept {seconds}s"}

    if stype == "llm":
        router = get_router()
        prompt = step.get("prompt", "")
        try:
            out = router.chat([{"role": "user", "content": prompt}], temperature=0.2, max_tokens=800)
            return {"ok": True, "stdout": out[:3000]}
        except Exception as e:
            return {"ok": False, "stderr": str(e)}

    return {"ok": False, "stderr": f"unknown step type: {stype}"}