""" DealFlow AI — State / Handoff Protocol Implements file-based state persistence across max_turns_per_run = 150 boundaries. On each run END: write state/handoff.json On each run START: read state/handoff.json and resume from in_progress """ from __future__ import annotations import json from datetime import datetime, timezone from pathlib import Path from typing import Any _DEFAULT_STATE_PATH = Path(__file__).parent.parent / "state" / "handoff.json" def load_handoff(state_path: Path | None = None) -> dict[str, Any]: """Load persisted handoff state, or return a fresh default.""" path = state_path or _DEFAULT_STATE_PATH if path.exists(): try: return json.loads(path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): pass return { "phase": "not_started", "completed": [], "in_progress": None, "next_steps": [ "install_deps", "configure_env", "start_vllm", "launch_gradio", "run_full_test", ], "blockers": [], "files_modified": [], "test_status": {"passed": 0, "failed": 0, "skip": 0}, "last_updated": None, "run_count": 0, } def save_handoff( state: dict[str, Any], state_path: Path | None = None, ) -> None: """Persist handoff state to disk.""" path = state_path or _DEFAULT_STATE_PATH path.parent.mkdir(parents=True, exist_ok=True) state = {**state, "last_updated": datetime.now(timezone.utc).isoformat()} path.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8") def advance_phase( state: dict[str, Any], completed_item: str | None = None, next_in_progress: str | None = None, new_blockers: list[str] | None = None, files_modified: list[str] | None = None, test_status: dict[str, int] | None = None, ) -> dict[str, Any]: """Return updated state dict (immutable pattern).""" state = dict(state) completed = list(state.get("completed", [])) next_steps = list(state.get("next_steps", [])) if completed_item and completed_item not in completed: completed.append(completed_item) if completed_item in next_steps: next_steps.remove(completed_item) state["completed"] = completed state["next_steps"] = next_steps state["in_progress"] = next_in_progress state["phase"] = next_in_progress or "idle" state["run_count"] = state.get("run_count", 0) + 1 if new_blockers is not None: state["blockers"] = new_blockers if files_modified: existing = list(state.get("files_modified", [])) for f in files_modified: if f not in existing: existing.append(f) state["files_modified"] = existing if test_status: state["test_status"] = test_status return state