dealflow-ai / src /state.py
PeterBot22's picture
feat: DealFlow AI MVP — 3-agent CrewAI due diligence system on HF Spaces
8dcf472 verified
"""
DealFlow AI — State / Handoff Protocol
Implements file-based state persistence across max_turns_per_run = 150 boundaries.
On each run END: write state/handoff.json
On each run START: read state/handoff.json and resume from in_progress
"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
_DEFAULT_STATE_PATH = Path(__file__).parent.parent / "state" / "handoff.json"
def load_handoff(state_path: Path | None = None) -> dict[str, Any]:
"""Load persisted handoff state, or return a fresh default."""
path = state_path or _DEFAULT_STATE_PATH
if path.exists():
try:
return json.loads(path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
return {
"phase": "not_started",
"completed": [],
"in_progress": None,
"next_steps": [
"install_deps",
"configure_env",
"start_vllm",
"launch_gradio",
"run_full_test",
],
"blockers": [],
"files_modified": [],
"test_status": {"passed": 0, "failed": 0, "skip": 0},
"last_updated": None,
"run_count": 0,
}
def save_handoff(
state: dict[str, Any],
state_path: Path | None = None,
) -> None:
"""Persist handoff state to disk."""
path = state_path or _DEFAULT_STATE_PATH
path.parent.mkdir(parents=True, exist_ok=True)
state = {**state, "last_updated": datetime.now(timezone.utc).isoformat()}
path.write_text(json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8")
def advance_phase(
state: dict[str, Any],
completed_item: str | None = None,
next_in_progress: str | None = None,
new_blockers: list[str] | None = None,
files_modified: list[str] | None = None,
test_status: dict[str, int] | None = None,
) -> dict[str, Any]:
"""Return updated state dict (immutable pattern)."""
state = dict(state)
completed = list(state.get("completed", []))
next_steps = list(state.get("next_steps", []))
if completed_item and completed_item not in completed:
completed.append(completed_item)
if completed_item in next_steps:
next_steps.remove(completed_item)
state["completed"] = completed
state["next_steps"] = next_steps
state["in_progress"] = next_in_progress
state["phase"] = next_in_progress or "idle"
state["run_count"] = state.get("run_count", 0) + 1
if new_blockers is not None:
state["blockers"] = new_blockers
if files_modified:
existing = list(state.get("files_modified", []))
for f in files_modified:
if f not in existing:
existing.append(f)
state["files_modified"] = existing
if test_status:
state["test_status"] = test_status
return state