""" Agent orchestrator - ties planner + executor + retry + repair + browser. `run_task(task_id, title, description)` is a generator yielding event dicts suitable for SSE streaming. Persists everything to tasks.db. """ from __future__ import annotations import json import time import logging import traceback from typing import Any, Dict, Generator, List, Optional from . import tasks from .planner import plan_task, repair_plan from .executor import get_executor from .classifier import classify from .browser import run_browser_action from .llm_router import get_router logger = logging.getLogger("agent") def _event(task_id: str, kind: str, message: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: payload = { "task_id": task_id, "kind": kind, "message": message, "ts": time.time(), "data": data or {}, } tasks.log_event(task_id, kind, json.dumps({"message": message, "data": data or {}})[:7000]) return payload def run_task(task_id: str, title: str, description: str) -> Generator[Dict[str, Any], None, None]: """Generator yielding event dicts. Persists to SQLite.""" try: tasks.update_state(task_id, "planning") yield _event(task_id, "state", "planning") yield _event(task_id, "thought", f"Planning task: {title}") plan = plan_task(title, description) yield _event(task_id, "plan", "Plan generated", {"steps": plan}) if not plan: yield _event(task_id, "warn", "Empty plan – using fallback note") plan = [{"type": "note", "msg": "No actions planned"}] tasks.update_state(task_id, "executing") yield _event(task_id, "state", "executing") executor = get_executor() for idx, step in enumerate(plan): tasks.save_checkpoint(task_id, idx, {"plan": plan, "current": step}) stype = step.get("type", "note") yield _event(task_id, "step.start", f"[{idx + 1}/{len(plan)}] {stype}", {"step": step}) try: result = _execute_step(task_id, step) except Exception as e: tb = traceback.format_exc() result = {"ok": False, "stderr": str(e), "traceback": tb} yield _event(task_id, "step.result", "ok" if result.get("ok") else "fail", {"step_index": idx, "result": result}) # If failed, attempt one repair cycle if not result.get("ok"): tasks.update_state(task_id, "repairing") yield _event(task_id, "state", "repairing") err_text = (result.get("stderr") or "") + "\n" + (result.get("traceback") or "") err_class = classify(err_text) if err_class: yield _event(task_id, "diagnose", f"Detected: {err_class.category}", {"detail": err_class.detail, "fix": err_class.suggested_fix}) repair_actions = repair_plan(err_class.category, err_class.detail) for ridx, ra in enumerate(repair_actions): yield _event(task_id, "repair.start", f"repair[{ridx + 1}]", {"action": ra}) rresult = _execute_step(task_id, ra) yield _event(task_id, "repair.result", "ok" if rresult.get("ok") else "fail", {"result": rresult}) # retry original step once tasks.update_state(task_id, "retrying") yield _event(task_id, "state", "retrying") tasks.record_retry(task_id, 1, err_text[:1000]) try: retry_result = _execute_step(task_id, step) except Exception as e: retry_result = {"ok": False, "stderr": str(e)} yield _event(task_id, "retry.result", "ok" if retry_result.get("ok") else "fail", {"step_index": idx, "result": retry_result}) else: yield _event(task_id, "warn", "No automatic repair – continuing") tasks.update_state(task_id, "executing") yield _event(task_id, "state", "executing") tasks.update_state(task_id, "completed") yield _event(task_id, "state", "completed") yield _event(task_id, "done", f"Task {task_id} completed") except Exception as e: logger.exception("run_task fatal") tasks.update_state(task_id, "failed") yield _event(task_id, "error", f"Fatal: {e}", {"traceback": traceback.format_exc()}) def _execute_step(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]: executor = get_executor() stype = step.get("type", "note") if stype == "shell": cmd = step.get("cmd", "") if not cmd: return {"ok": True, "stdout": "(empty cmd)"} r = executor.shell(cmd, timeout=float(step.get("timeout", 120))) return {"ok": r.ok, "stdout": r.stdout[-3000:], "stderr": r.stderr[-3000:], "exit_code": r.exit_code, "duration_ms": r.duration_ms} if stype == "python": code = step.get("code", "") r = executor.python(code, timeout=float(step.get("timeout", 120))) return {"ok": r.ok, "stdout": r.stdout[-3000:], "stderr": r.stderr[-3000:], "exit_code": r.exit_code, "duration_ms": r.duration_ms} if stype == "browser": br = run_browser_action(step) return { "ok": br.ok, "stdout": (br.text or "")[:3000], "stderr": br.error or "", "exit_code": 0 if br.ok else 1, "screenshot_b64": br.screenshot_b64[:500] if br.screenshot_b64 else "", "url": br.url, } if stype == "git": op = step.get("op", "status") args = step.get("args", "") cmd = f"git {op} {args}".strip() r = executor.shell(cmd, timeout=120) return {"ok": r.ok, "stdout": r.stdout, "stderr": r.stderr, "exit_code": r.exit_code} if stype == "deploy": # Real deploy is invoked via dedicated /deploy endpoints; here we just log. target = step.get("target", "unknown") msg = f"Deploy step requested: {target}. Use /deploy endpoints for real deployment." return {"ok": True, "stdout": msg} if stype == "note": return {"ok": True, "stdout": step.get("msg", "")} if stype == "sleep": time.sleep(float(step.get("seconds", 1))) return {"ok": True, "stdout": f"slept {step.get('seconds', 1)}s"} if stype == "llm": router = get_router() prompt = step.get("prompt", "") try: out = router.chat([{"role": "user", "content": prompt}], temperature=0.2, max_tokens=800) return {"ok": True, "stdout": out[:3000]} except Exception as e: return {"ok": False, "stderr": str(e)} return {"ok": False, "stderr": f"unknown step type: {stype}"}