File size: 2,908 Bytes
8dcf472
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""
DealFlow AI — State / Handoff Protocol
Implements file-based state persistence across max_turns_per_run = 150 boundaries.

On each run END: write state/handoff.json
On each run START: read state/handoff.json and resume from in_progress
"""
from __future__ import annotations

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

_DEFAULT_STATE_PATH = Path(__file__).parent.parent / "state" / "handoff.json"


def load_handoff(state_path: Path | None = None) -> dict[str, Any]:
    """Load persisted handoff state, or return a fresh default."""
    path = state_path or _DEFAULT_STATE_PATH
    if path.exists():
        try:
            return json.loads(path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError):
            pass
    return {
        "phase": "not_started",
        "completed": [],
        "in_progress": None,
        "next_steps": [
            "install_deps",
            "configure_env",
            "start_vllm",
            "launch_gradio",
            "run_full_test",
        ],
        "blockers": [],
        "files_modified": [],
        "test_status": {"passed": 0, "failed": 0, "skip": 0},
        "last_updated": None,
        "run_count": 0,
    }


def save_handoff(
    state: dict[str, Any],
    state_path: Path | None = None,
) -> None:
    """Persist handoff state to disk."""
    path = state_path or _DEFAULT_STATE_PATH
    path.parent.mkdir(parents=True, exist_ok=True)
    state = {**state, "last_updated": datetime.now(timezone.utc).isoformat()}
    path.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8")


def advance_phase(
    state: dict[str, Any],
    completed_item: str | None = None,
    next_in_progress: str | None = None,
    new_blockers: list[str] | None = None,
    files_modified: list[str] | None = None,
    test_status: dict[str, int] | None = None,
) -> dict[str, Any]:
    """Return updated state dict (immutable pattern)."""
    state = dict(state)
    completed = list(state.get("completed", []))
    next_steps = list(state.get("next_steps", []))

    if completed_item and completed_item not in completed:
        completed.append(completed_item)
        if completed_item in next_steps:
            next_steps.remove(completed_item)

    state["completed"] = completed
    state["next_steps"] = next_steps
    state["in_progress"] = next_in_progress
    state["phase"] = next_in_progress or "idle"
    state["run_count"] = state.get("run_count", 0) + 1

    if new_blockers is not None:
        state["blockers"] = new_blockers
    if files_modified:
        existing = list(state.get("files_modified", []))
        for f in files_modified:
            if f not in existing:
                existing.append(f)
        state["files_modified"] = existing
    if test_status:
        state["test_status"] = test_status

    return state