"""Pipeline state machine with persistence and cancellation support.""" import json import uuid from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from enum import Enum from pathlib import Path from typing import Any, Generator class Phase(str, Enum): IDLE = "idle" EXTRACTING = "extracting" EVIDENCE = "evidence" GRAPH = "graph" INVESTIGATION = "investigation" TRIAL = "trial" VERDICT = "verdict" REPORT = "report" COMPLETE = "complete" FAILED = "failed" @dataclass class PipelineEvent: """Event yielded by the pipeline for UI consumption.""" phase: Phase status: str data: dict[str, Any] | None = None agent_role: str | None = None delta: str | None = None @dataclass class PipelineState: """Serializable pipeline state for persistence.""" run_id: str phase: Phase = Phase.IDLE target_dir: str = "" zip_path: str = "" evidence_report: dict | None = None code_graph_stats: dict | None = None investigation_reports: dict | None = None trial_transcript: str | None = None verdict: str | None = None final_report: str | None = None error: str | None = None started_at: str = "" completed_at: str = "" files_scanned: int = 0 total_findings: int = 0 class Pipeline: """Stateful pipeline engine with disk persistence and cancellation.""" def __init__(self, storage_dir: str = ".tribunal_data"): self._storage = Path(storage_dir) self.state: PipelineState | None = None self._cancelled = False def create_run(self, zip_path: str) -> PipelineState: """Create a new pipeline run and persist initial state.""" run_id = uuid.uuid4().hex[:12] self.state = PipelineState( run_id=run_id, zip_path=zip_path, started_at=datetime.now(timezone.utc).isoformat(), ) self._cancelled = False self._save() return self.state def resume_run(self, run_id: str) -> PipelineState: """Resume a previously saved run.""" data = self._load(run_id) self.state = PipelineState(**data) self._cancelled = False return self.state def cancel(self) -> None: """Mark the current run as cancelled.""" self._cancelled = True if self.state: self.state.phase = Phase.FAILED self.state.error = "Cancelled by user" self.state.completed_at = datetime.now(timezone.utc).isoformat() self._save() @property def is_cancelled(self) -> bool: return self._cancelled def update_phase(self, phase: Phase) -> None: """Update the current phase and persist.""" if self.state: self.state.phase = phase self._save() def update(self, **kwargs) -> None: """Update arbitrary state fields and persist.""" if not self.state: return for k, v in kwargs.items(): if hasattr(self.state, k): setattr(self.state, k, v) self._save() def complete(self) -> None: """Mark the run as complete.""" if self.state: self.state.phase = Phase.COMPLETE self.state.completed_at = datetime.now(timezone.utc).isoformat() self._save() def fail(self, error: str) -> None: """Mark the run as failed.""" if self.state: self.state.phase = Phase.FAILED self.state.error = error self.state.completed_at = datetime.now(timezone.utc).isoformat() self._save() def _save(self) -> None: if not self.state: return path = self._storage / self.state.run_id / "state.json" path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(asdict(self.state), indent=2, default=str)) def _load(self, run_id: str) -> dict: path = self._storage / run_id / "state.json" if not path.exists(): raise FileNotFoundError(f"No saved state for run {run_id}") return json.loads(path.read_text()) @staticmethod def list_runs(storage_dir: str = ".tribunal_data") -> list[dict]: """List all saved pipeline runs.""" storage = Path(storage_dir) if not storage.exists(): return [] runs = [] for d in sorted(storage.iterdir(), reverse=True): state_file = d / "state.json" if state_file.exists(): try: runs.append(json.loads(state_file.read_text())) except (json.JSONDecodeError, OSError): pass return runs