Spaces:
Sleeping
Sleeping
| """ | |
| Agent orchestrator - ties planner + executor + retry + repair + browser. | |
| `run_task(task_id, title, description)` is a generator yielding event dicts | |
| suitable for SSE streaming. Persists everything to tasks.db. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import time | |
| import logging | |
| import traceback | |
| from typing import Any, Dict, Generator, List, Optional | |
| from . import tasks | |
| from .planner import plan_task, repair_plan | |
| from .executor import get_executor | |
| from .classifier import classify | |
| from .browser import run_browser_action | |
| from .llm_router import get_router | |
| logger = logging.getLogger("agent") | |
def _event(task_id: str, kind: str, message: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build one event dict for *task_id* and record it in the task log.

    The message and data are serialised to JSON, cut to at most 7000
    characters, and handed to ``tasks.log_event``. The returned dict is the
    complete, uncut event.

    Args:
        task_id: Identifier of the task the event belongs to.
        kind: Event kind, stored under the ``"kind"`` key.
        message: Human-readable message.
        data: Optional extra payload; a falsy value becomes an empty dict.

    Returns:
        Dict with the keys ``task_id``, ``kind``, ``message``, ``ts`` and ``data``.
    """
    details = data or {}
    # Timestamp is taken before logging so it marks event creation.
    created_at = time.time()

    # The log entry is cut at a fixed length; a cut entry may not be valid JSON.
    serialized = json.dumps({"message": message, "data": details})
    tasks.log_event(task_id, kind, serialized[:7000])

    return {
        "task_id": task_id,
        "kind": kind,
        "message": message,
        "ts": created_at,
        "data": details,
    }
def _run_step_safely(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Run one step via ``_execute_step``; an exception becomes a failed result dict."""
    try:
        return _execute_step(task_id, step)
    except Exception as exc:
        return {"ok": False, "stderr": str(exc), "traceback": traceback.format_exc()}


def run_task(task_id: str, title: str, description: str) -> Generator[Dict[str, Any], None, None]:
    """Plan and execute a task, yielding one event dict per progress update.

    Flow: state "planning" -> ``plan_task`` -> state "executing" -> every plan
    step in order. A step that fails gets one repair cycle: the error text is
    classified; if a class is found, the repair actions from ``repair_plan``
    are run and the original step is retried once. Otherwise a warning is
    emitted. A failed step, repair action or retry never aborts the task: the
    loop moves on, and the task is marked "completed" once every step has been
    attempted. Any exception escaping this flow marks the task "failed" and is
    reported as a final "error" event.

    Args:
        task_id: Identifier used for state updates, checkpoints and events.
        title: Task title; passed to the planner and shown in the first
            "thought" event.
        description: Task description; passed to the planner.

    Yields:
        Event dicts as returned by ``_event``.
    """
    try:
        tasks.update_state(task_id, "planning")
        yield _event(task_id, "state", "planning")
        yield _event(task_id, "thought", f"Planning task: {title}")

        plan = plan_task(title, description)
        yield _event(task_id, "plan", "Plan generated", {"steps": plan})
        if not plan:
            yield _event(task_id, "warn", "Empty plan – using fallback note")
            plan = [{"type": "note", "msg": "No actions planned"}]

        tasks.update_state(task_id, "executing")
        yield _event(task_id, "state", "executing")

        # The returned handle is not needed here (_execute_step obtains its
        # own); the call is kept so that a failure to obtain an executor still
        # aborts the task before any step starts.
        get_executor()

        for idx, step in enumerate(plan):
            tasks.save_checkpoint(task_id, idx, {"plan": plan, "current": step})
            stype = step.get("type", "note")
            yield _event(task_id, "step.start", f"[{idx + 1}/{len(plan)}] {stype}", {"step": step})

            result = _run_step_safely(task_id, step)
            yield _event(task_id, "step.result", "ok" if result.get("ok") else "fail", {"step_index": idx, "result": result})
            if result.get("ok"):
                continue

            # The step failed: attempt one repair cycle.
            tasks.update_state(task_id, "repairing")
            yield _event(task_id, "state", "repairing")
            err_text = (result.get("stderr") or "") + "\n" + (result.get("traceback") or "")
            err_class = classify(err_text)
            if err_class:
                yield _event(task_id, "diagnose", f"Detected: {err_class.category}", {"detail": err_class.detail, "fix": err_class.suggested_fix})
                repair_actions = repair_plan(err_class.category, err_class.detail)
                for ridx, ra in enumerate(repair_actions or []):
                    yield _event(task_id, "repair.start", f"repair[{ridx + 1}]", {"action": ra})
                    # Guarded like the main step: a raising repair action is
                    # reported as a failed repair instead of killing the task.
                    rresult = _run_step_safely(task_id, ra)
                    yield _event(task_id, "repair.result", "ok" if rresult.get("ok") else "fail", {"result": rresult})

                # Retry the original step once.
                tasks.update_state(task_id, "retrying")
                yield _event(task_id, "state", "retrying")
                tasks.record_retry(task_id, 1, err_text[:1000])
                retry_result = _run_step_safely(task_id, step)
                yield _event(task_id, "retry.result", "ok" if retry_result.get("ok") else "fail", {"step_index": idx, "result": retry_result})
            else:
                yield _event(task_id, "warn", "No automatic repair – continuing")

            # Back to normal execution for the remaining steps.
            tasks.update_state(task_id, "executing")
            yield _event(task_id, "state", "executing")

        tasks.update_state(task_id, "completed")
        yield _event(task_id, "state", "completed")
        yield _event(task_id, "done", f"Task {task_id} completed")
    except Exception as e:
        logger.exception("run_task fatal")
        tasks.update_state(task_id, "failed")
        yield _event(task_id, "error", f"Fatal: {e}", {"traceback": traceback.format_exc()})
| def _execute_step(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]: | |
| executor = get_executor() | |
| stype = step.get("type", "note") | |
| if stype == "shell": | |
| cmd = step.get("cmd", "") | |
| if not cmd: | |
| return {"ok": True, "stdout": "(empty cmd)"} | |
| r = executor.shell(cmd, timeout=float(step.get("timeout", 120))) | |
| return {"ok": r.ok, "stdout": r.stdout[-3000:], "stderr": r.stderr[-3000:], "exit_code": r.exit_code, "duration_ms": r.duration_ms} | |
| if stype == "python": | |
| code = step.get("code", "") | |
| r = executor.python(code, timeout=float(step.get("timeout", 120))) | |
| return {"ok": r.ok, "stdout": r.stdout[-3000:], "stderr": r.stderr[-3000:], "exit_code": r.exit_code, "duration_ms": r.duration_ms} | |
| if stype == "browser": | |
| br = run_browser_action(step) | |
| return { | |
| "ok": br.ok, | |
| "stdout": (br.text or "")[:3000], | |
| "stderr": br.error or "", | |
| "exit_code": 0 if br.ok else 1, | |
| "screenshot_b64": br.screenshot_b64[:500] if br.screenshot_b64 else "", | |
| "url": br.url, | |
| } | |
| if stype == "git": | |
| op = step.get("op", "status") | |
| args = step.get("args", "") | |
| cmd = f"git {op} {args}".strip() | |
| r = executor.shell(cmd, timeout=120) | |
| return {"ok": r.ok, "stdout": r.stdout, "stderr": r.stderr, "exit_code": r.exit_code} | |
| if stype == "deploy": | |
| # Real deploy is invoked via dedicated /deploy endpoints; here we just log. | |
| target = step.get("target", "unknown") | |
| msg = f"Deploy step requested: {target}. Use /deploy endpoints for real deployment." | |
| return {"ok": True, "stdout": msg} | |
| if stype == "note": | |
| return {"ok": True, "stdout": step.get("msg", "")} | |
| if stype == "sleep": | |
| time.sleep(float(step.get("seconds", 1))) | |
| return {"ok": True, "stdout": f"slept {step.get('seconds', 1)}s"} | |
| if stype == "llm": | |
| router = get_router() | |
| prompt = step.get("prompt", "") | |
| try: | |
| out = router.chat([{"role": "user", "content": prompt}], temperature=0.2, max_tokens=800) | |
| return {"ok": True, "stdout": out[:3000]} | |
| except Exception as e: | |
| return {"ok": False, "stderr": str(e)} | |
| return {"ok": False, "stderr": f"unknown step type: {stype}"} | |