Spaces:
Sleeping
Sleeping
File size: 6,883 Bytes
"""
Agent orchestrator - ties planner + executor + retry + repair + browser.
`run_task(task_id, title, description)` is a generator yielding event dicts
suitable for SSE streaming. Persists everything to tasks.db.
"""
from __future__ import annotations
import json
import time
import logging
import traceback
from typing import Any, Dict, Generator, List, Optional
from . import tasks
from .planner import plan_task, repair_plan
from .executor import get_executor
from .classifier import classify
from .browser import run_browser_action
from .llm_router import get_router
# Module-level logger registered under the fixed name "agent"; run_task uses it
# to record fatal errors with their stack trace.
logger = logging.getLogger("agent")
def _event(task_id: str, kind: str, message: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build one event dict for a task and persist it through ``tasks.log_event``.

    Args:
        task_id: Identifier of the task the event belongs to.
        kind: Event category (for example ``"state"`` or ``"step.result"``).
        message: Short human-readable text for the event.
        data: Optional extra payload; a falsy value is replaced by ``{}``.

    Returns:
        A dict with the keys ``task_id``, ``kind``, ``message``, ``ts``
        (``time.time()`` at creation) and ``data``.
    """
    details = data or {}
    stamp = time.time()

    # The slice is applied to the serialized text, so a record longer than
    # 7000 characters is stored cut off mid-document.
    serialized = json.dumps({"message": message, "data": details})
    tasks.log_event(task_id, kind, serialized[:7000])

    return {
        "task_id": task_id,
        "kind": kind,
        "message": message,
        "ts": stamp,
        "data": details,
    }
def _run_step_safely(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Run one step via ``_execute_step``, turning any exception into a failed result dict."""
    try:
        return _execute_step(task_id, step)
    except Exception as exc:
        # Boundary handler: a single broken step (plan, repair or retry) must
        # be reported as a failure, not abort the whole task.
        return {"ok": False, "stderr": str(exc), "traceback": traceback.format_exc()}


def run_task(task_id: str, title: str, description: str) -> Generator[Dict[str, Any], None, None]:
    """Plan and execute a task, yielding one event dict per stage.

    The task moves through the states ``planning`` -> ``executing`` ->
    ``completed``. When a step fails, the state switches to ``repairing``; if
    ``classify`` recognises the error, the actions from ``repair_plan`` are
    run and the original step is retried once (state ``retrying``). Execution
    then continues with the next step regardless of the retry outcome, so the
    task still ends in ``completed`` even when individual steps failed.

    Every event is created through ``_event``; state changes, checkpoints and
    retries are recorded through the ``tasks`` module.

    Args:
        task_id: Identifier of the task being run.
        title: Task title passed to the planner.
        description: Task description passed to the planner.

    Yields:
        Event dicts of kind ``state``, ``thought``, ``plan``, ``warn``,
        ``step.start``, ``step.result``, ``diagnose``, ``repair.start``,
        ``repair.result``, ``retry.result``, ``done`` and, on an unexpected
        failure, ``error`` (after the state has been set to ``failed``).
    """
    try:
        tasks.update_state(task_id, "planning")
        yield _event(task_id, "state", "planning")
        yield _event(task_id, "thought", f"Planning task: {title}")

        plan = plan_task(title, description)
        yield _event(task_id, "plan", "Plan generated", {"steps": plan})
        if not plan:
            yield _event(task_id, "warn", "Empty plan – using fallback note")
            plan = [{"type": "note", "msg": "No actions planned"}]

        tasks.update_state(task_id, "executing")
        yield _event(task_id, "state", "executing")

        total = len(plan)
        for idx, step in enumerate(plan):
            tasks.save_checkpoint(task_id, idx, {"plan": plan, "current": step})
            stype = step.get("type", "note")
            yield _event(task_id, "step.start", f"[{idx + 1}/{total}] {stype}", {"step": step})

            result = _run_step_safely(task_id, step)
            yield _event(
                task_id,
                "step.result",
                "ok" if result.get("ok") else "fail",
                {"step_index": idx, "result": result},
            )
            if result.get("ok"):
                continue

            # The step failed: attempt one repair cycle.
            tasks.update_state(task_id, "repairing")
            yield _event(task_id, "state", "repairing")
            err_text = (result.get("stderr") or "") + "\n" + (result.get("traceback") or "")
            err_class = classify(err_text)
            if err_class:
                yield _event(
                    task_id,
                    "diagnose",
                    f"Detected: {err_class.category}",
                    {"detail": err_class.detail, "fix": err_class.suggested_fix},
                )
                # Tolerate a planner that returns None instead of an empty list.
                repair_actions = repair_plan(err_class.category, err_class.detail) or []
                for ridx, ra in enumerate(repair_actions):
                    yield _event(task_id, "repair.start", f"repair[{ridx + 1}]", {"action": ra})
                    # Guarded like the main step so a raising repair action
                    # is reported as "fail" and does not kill the task.
                    rresult = _run_step_safely(task_id, ra)
                    yield _event(
                        task_id,
                        "repair.result",
                        "ok" if rresult.get("ok") else "fail",
                        {"result": rresult},
                    )

                # Retry the original step once.
                tasks.update_state(task_id, "retrying")
                yield _event(task_id, "state", "retrying")
                tasks.record_retry(task_id, 1, err_text[:1000])
                retry_result = _run_step_safely(task_id, step)
                yield _event(
                    task_id,
                    "retry.result",
                    "ok" if retry_result.get("ok") else "fail",
                    {"step_index": idx, "result": retry_result},
                )
            else:
                yield _event(task_id, "warn", "No automatic repair – continuing")

            # Leave the repair/retry detour and resume normal execution.
            tasks.update_state(task_id, "executing")
            yield _event(task_id, "state", "executing")

        tasks.update_state(task_id, "completed")
        yield _event(task_id, "state", "completed")
        yield _event(task_id, "done", f"Task {task_id} completed")
    except Exception as e:
        # Top-level boundary: log, mark the task failed and tell the consumer.
        logger.exception("run_task fatal")
        tasks.update_state(task_id, "failed")
        yield _event(task_id, "error", f"Fatal: {e}", {"traceback": traceback.format_exc()})
def _process_result(proc: Any) -> Dict[str, Any]:
    """Convert an executor result into a step-result dict, keeping the last 3000 chars of output."""
    return {
        "ok": proc.ok,
        # `or ""` guards against an executor that reports missing output as None.
        "stdout": (proc.stdout or "")[-3000:],
        "stderr": (proc.stderr or "")[-3000:],
        "exit_code": proc.exit_code,
        "duration_ms": proc.duration_ms,
    }


def _execute_step(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a single plan step and return its result dict.

    The step's ``"type"`` key (default ``"note"``) selects the handler:

    * ``shell``   - runs ``step["cmd"]`` through the executor; an empty
      command succeeds without running anything.
    * ``python``  - runs ``step["code"]`` through the executor.
    * ``browser`` - delegates to ``run_browser_action``.
    * ``git``     - runs ``git <op> <args>`` through the executor's shell.
    * ``deploy``  - only returns a message pointing at the /deploy endpoints.
    * ``note``    - returns ``step["msg"]`` as stdout.
    * ``sleep``   - sleeps ``step["seconds"]`` (default 1) seconds.
    * ``llm``     - sends ``step["prompt"]`` to the LLM router.

    ``shell``, ``python`` and ``git`` honour an optional ``"timeout"`` key
    (seconds, default 120).

    Args:
        task_id: Identifier of the owning task (not used by any handler here).
        step: Step description; must behave like a dict.

    Returns:
        A dict that always has ``"ok"`` and usually ``"stdout"``/``"stderr"``;
        executor-backed steps add ``"exit_code"`` and ``"duration_ms"``. An
        unknown type yields ``{"ok": False, "stderr": "unknown step type: ..."}``.

    Raises:
        Exception: Whatever the underlying handler raises (for example an
            invalid ``timeout``/``seconds`` value). Only the ``llm`` handler
            converts its own errors into a failed result.
    """
    stype = step.get("type", "note")

    # The executor is acquired only by the branches that need it, so pure
    # steps (note, deploy, sleep, ...) do not depend on it being available.
    if stype == "shell":
        cmd = step.get("cmd", "")
        if not cmd:
            return {"ok": True, "stdout": "(empty cmd)"}
        r = get_executor().shell(cmd, timeout=float(step.get("timeout", 120)))
        return _process_result(r)

    if stype == "python":
        code = step.get("code", "")
        r = get_executor().python(code, timeout=float(step.get("timeout", 120)))
        return _process_result(r)

    if stype == "browser":
        br = run_browser_action(step)
        return {
            "ok": br.ok,
            "stdout": (br.text or "")[:3000],
            "stderr": br.error or "",
            "exit_code": 0 if br.ok else 1,
            # Only the first 500 characters of the screenshot are kept.
            "screenshot_b64": br.screenshot_b64[:500] if br.screenshot_b64 else "",
            "url": br.url,
        }

    if stype == "git":
        op = step.get("op", "status")
        args = step.get("args", "")
        # SECURITY NOTE(review): op/args are interpolated into a shell string
        # unescaped. Left as-is because args may carry several arguments, but
        # untrusted plans can inject arbitrary shell commands here.
        cmd = f"git {op} {args}".strip()
        r = get_executor().shell(cmd, timeout=float(step.get("timeout", 120)))
        # Same truncated result shape as the shell/python branches.
        return _process_result(r)

    if stype == "deploy":
        # Real deploy is invoked via dedicated /deploy endpoints; here we just log.
        target = step.get("target", "unknown")
        msg = f"Deploy step requested: {target}. Use /deploy endpoints for real deployment."
        return {"ok": True, "stdout": msg}

    if stype == "note":
        return {"ok": True, "stdout": step.get("msg", "")}

    if stype == "sleep":
        time.sleep(float(step.get("seconds", 1)))
        return {"ok": True, "stdout": f"slept {step.get('seconds', 1)}s"}

    if stype == "llm":
        router = get_router()
        prompt = step.get("prompt", "")
        try:
            out = router.chat([{"role": "user", "content": prompt}], temperature=0.2, max_tokens=800)
            return {"ok": True, "stdout": out[:3000]}
        except Exception as e:
            return {"ok": False, "stderr": str(e)}

    return {"ok": False, "stderr": f"unknown step type: {stype}"}
|