File size: 6,883 Bytes
763ef0d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""
Agent orchestrator - ties planner + executor + retry + repair + browser.

`run_task(task_id, title, description)` is a generator yielding event dicts
suitable for SSE streaming. Persists everything to tasks.db.
"""
from __future__ import annotations

import json
import time
import logging
import traceback
from typing import Any, Dict, Generator, List, Optional

from . import tasks
from .planner import plan_task, repair_plan
from .executor import get_executor
from .classifier import classify
from .browser import run_browser_action
from .llm_router import get_router

# Module logger (named "agent"); run_task uses it to log fatal errors with their traceback.
logger = logging.getLogger("agent")


def _event(task_id: str, kind: str, message: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build one stream event and persist a copy of it via ``tasks.log_event``.

    Args:
        task_id: Task the event belongs to; stamped on the event.
        kind: Event kind (e.g. ``"state"``, ``"step.result"``).
        message: Short human-readable message.
        data: Optional structured payload; ``None`` (or an empty dict) becomes ``{}``.

    Returns:
        The event dict with keys ``task_id``, ``kind``, ``message``, ``ts``
        (``time.time()``) and ``data``.
    """
    data = data or {}
    payload = {
        "task_id": task_id,
        "kind": kind,
        "message": message,
        "ts": time.time(),
        "data": data,
    }
    # default=str: persisting an event must not abort the task just because a
    # payload value (e.g. bytes or an arbitrary object inside a step result) is
    # not JSON-serializable; such values are stored as their str() form.
    record = json.dumps({"message": message, "data": data}, default=str)
    # Truncated to keep stored rows bounded. A truncated record is no longer
    # valid JSON, so readers should not assume it parses.
    tasks.log_event(task_id, kind, record[:7000])
    return payload


def run_task(task_id: str, title: str, description: str) -> Generator[Dict[str, Any], None, None]:
    """Run a task end to end, yielding event dicts as it progresses.

    Flow: plan the task, then execute each step in order. For any step that
    fails, run one diagnose -> repair -> retry cycle before moving on. State
    transitions are written with ``tasks.update_state`` and every yielded
    event is also persisted by ``_event``.

    A failing step (even after its retry) does not stop the run: once every
    step has been attempted the task is marked ``completed``. Exceptions
    raised while executing a step, a repair action or a retry are reported as
    failed results. Any other exception (planning, persistence,
    classification, ...) marks the task ``failed`` and yields a final
    ``error`` event.

    Args:
        task_id: Identifier used for persistence; stamped on every event.
        title: Short task title, passed to the planner.
        description: Full task description, passed to the planner.

    Yields:
        Event dicts as built by ``_event``.
    """
    try:
        tasks.update_state(task_id, "planning")
        yield _event(task_id, "state", "planning")
        yield _event(task_id, "thought", f"Planning task: {title}")

        plan = plan_task(title, description)
        yield _event(task_id, "plan", "Plan generated", {"steps": plan})

        if not plan:
            yield _event(task_id, "warn", "Empty plan – using fallback note")
            plan = [{"type": "note", "msg": "No actions planned"}]

        tasks.update_state(task_id, "executing")
        yield _event(task_id, "state", "executing")

        # The returned executor is not needed here (_execute_step fetches its
        # own). The call is kept so executor setup happens before any step runs.
        get_executor()
        total = len(plan)
        for idx, step in enumerate(plan):
            tasks.save_checkpoint(task_id, idx, {"plan": plan, "current": step})
            stype = step.get("type", "note")
            yield _event(task_id, "step.start", f"[{idx + 1}/{total}] {stype}", {"step": step})

            result = _run_step_safely(task_id, step)
            yield _event(task_id, "step.result", "ok" if result.get("ok") else "fail", {"step_index": idx, "result": result})

            if not result.get("ok"):
                # Exactly one diagnose -> repair -> retry cycle per failed step.
                yield from _repair_and_retry(task_id, idx, step, result)

        tasks.update_state(task_id, "completed")
        yield _event(task_id, "state", "completed")
        yield _event(task_id, "done", f"Task {task_id} completed")
    except Exception as e:
        logger.exception("run_task fatal")
        tasks.update_state(task_id, "failed")
        yield _event(task_id, "error", f"Fatal: {e}", {"traceback": traceback.format_exc()})


def _run_step_safely(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Run one step via ``_execute_step``, turning any exception into a failed result.

    The failure result carries ``stderr`` (the exception message) and
    ``traceback``; both are fed to the classifier by the repair cycle.
    """
    try:
        return _execute_step(task_id, step)
    except Exception as e:
        return {"ok": False, "stderr": str(e), "traceback": traceback.format_exc()}


def _repair_and_retry(
    task_id: str, idx: int, step: Dict[str, Any], result: Dict[str, Any]
) -> Generator[Dict[str, Any], None, None]:
    """Diagnose the failed step at ``idx``, run repair actions, then retry it once.

    The task goes through ``repairing`` (and ``retrying`` when a diagnosis
    was found) and is always left back in ``executing``. If ``classify``
    finds nothing, a warning is emitted and no retry happens.
    """
    tasks.update_state(task_id, "repairing")
    yield _event(task_id, "state", "repairing")
    err_text = (result.get("stderr") or "") + "\n" + (result.get("traceback") or "")
    err_class = classify(err_text)
    if err_class:
        yield _event(task_id, "diagnose", f"Detected: {err_class.category}", {"detail": err_class.detail, "fix": err_class.suggested_fix})
        # Treat a None return from the repair planner like an empty plan.
        repair_actions = repair_plan(err_class.category, err_class.detail) or []
        for ridx, action in enumerate(repair_actions):
            yield _event(task_id, "repair.start", f"repair[{ridx + 1}]", {"action": action})
            # A raising repair action is reported as a failed repair rather
            # than aborting the whole task.
            rresult = _run_step_safely(task_id, action)
            yield _event(task_id, "repair.result", "ok" if rresult.get("ok") else "fail", {"result": rresult})
        tasks.update_state(task_id, "retrying")
        yield _event(task_id, "state", "retrying")
        tasks.record_retry(task_id, 1, err_text[:1000])
        retry_result = _run_step_safely(task_id, step)
        yield _event(task_id, "retry.result", "ok" if retry_result.get("ok") else "fail", {"step_index": idx, "result": retry_result})
    else:
        yield _event(task_id, "warn", "No automatic repair – continuing")
    tasks.update_state(task_id, "executing")
    yield _event(task_id, "state", "executing")


def _format_exec_result(r: Any) -> Dict[str, Any]:
    """Convert an executor run result into the step-result dict shape.

    ``r`` is expected to expose ``ok``, ``stdout``, ``stderr``, ``exit_code``
    and ``duration_ms``. Only the last 3000 characters of ``stdout`` and
    ``stderr`` are kept so that event payloads stay bounded.
    """
    return {
        "ok": r.ok,
        "stdout": r.stdout[-3000:],
        "stderr": r.stderr[-3000:],
        "exit_code": r.exit_code,
        "duration_ms": r.duration_ms,
    }


def _execute_step(task_id: str, step: Dict[str, Any]) -> Dict[str, Any]:
    """Execute one plan step and return its result dict.

    Dispatches on ``step["type"]`` (default ``"note"``). Every result has an
    ``ok`` flag; most also carry ``stdout`` and/or ``stderr``. The executor is
    fetched only for ``shell``, ``python`` and ``git`` steps. Exceptions from
    the executor or the browser propagate to the caller. Errors from an
    ``llm`` step are caught and returned as ``ok: False``. An unrecognized
    type yields a failed result rather than raising.

    Args:
        task_id: Not used by any current step type; kept for interface stability.
        step: Step dict from the plan; the keys read depend on its ``type``.
    """
    stype = step.get("type", "note")

    if stype == "shell":
        cmd = step.get("cmd", "")
        if not cmd:
            return {"ok": True, "stdout": "(empty cmd)"}
        r = get_executor().shell(cmd, timeout=float(step.get("timeout", 120)))
        return _format_exec_result(r)

    if stype == "python":
        code = step.get("code", "")
        r = get_executor().python(code, timeout=float(step.get("timeout", 120)))
        return _format_exec_result(r)

    if stype == "browser":
        br = run_browser_action(step)
        return {
            "ok": br.ok,
            "stdout": (br.text or "")[:3000],
            "stderr": br.error or "",
            "exit_code": 0 if br.ok else 1,
            # Only a prefix of the screenshot is kept to keep the event small.
            "screenshot_b64": br.screenshot_b64[:500] if br.screenshot_b64 else "",
            "url": br.url,
        }

    if stype == "git":
        op = step.get("op", "status")
        args = step.get("args", "")
        cmd = f"git {op} {args}".strip()
        # Same timeout handling and output truncation as shell steps; git
        # output (log, diff) can otherwise be arbitrarily large.
        r = get_executor().shell(cmd, timeout=float(step.get("timeout", 120)))
        return _format_exec_result(r)

    if stype == "deploy":
        # Real deploy is invoked via dedicated /deploy endpoints; here we just log.
        target = step.get("target", "unknown")
        msg = f"Deploy step requested: {target}. Use /deploy endpoints for real deployment."
        return {"ok": True, "stdout": msg}

    if stype == "note":
        return {"ok": True, "stdout": step.get("msg", "")}

    if stype == "sleep":
        time.sleep(float(step.get("seconds", 1)))
        return {"ok": True, "stdout": f"slept {step.get('seconds', 1)}s"}

    if stype == "llm":
        router = get_router()
        prompt = step.get("prompt", "")
        try:
            out = router.chat([{"role": "user", "content": prompt}], temperature=0.2, max_tokens=800)
            return {"ok": True, "stdout": out[:3000]}
        except Exception as e:
            return {"ok": False, "stderr": str(e)}

    return {"ok": False, "stderr": f"unknown step type: {stype}"}