Spaces:
Sleeping
Sleeping
| """Pipeline state machine with persistence and cancellation support.""" | |
| import json | |
| import uuid | |
| from dataclasses import asdict, dataclass, field | |
| from datetime import datetime, timezone | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import Any, Generator | |
class Phase(str, Enum):
    """Lifecycle phases of a pipeline run.

    Mixes in ``str`` so members compare equal to their string values and
    serialize as plain strings under ``json.dumps`` (see ``Pipeline._save``).
    Phases are listed in their normal execution order; ``FAILED`` is the
    terminal state for both errors and user cancellation.
    """

    IDLE = "idle"
    EXTRACTING = "extracting"
    EVIDENCE = "evidence"
    GRAPH = "graph"
    INVESTIGATION = "investigation"
    TRIAL = "trial"
    VERDICT = "verdict"
    REPORT = "report"
    COMPLETE = "complete"
    FAILED = "failed"
@dataclass
class PipelineEvent:
    """Event yielded by the pipeline for UI consumption.

    Fix: the ``@dataclass`` decorator was missing — without it the class has
    no generated ``__init__`` and the non-defaulted fields (``phase``,
    ``status``) are bare annotations that never become attributes. The file
    already imports ``dataclass`` and uses the same pattern for
    ``PipelineState``.
    """

    phase: Phase                         # pipeline phase this event belongs to
    status: str                          # short machine-readable status string
    data: dict[str, Any] | None = None   # optional structured payload
    agent_role: str | None = None        # role of the agent emitting the event, if any
    delta: str | None = None             # incremental text chunk (streaming output)
@dataclass
class PipelineState:
    """Serializable pipeline state for persistence.

    Fix: the ``@dataclass`` decorator was missing. It is required by the
    rest of the file — ``Pipeline.create_run`` constructs this class with
    keyword arguments, ``Pipeline.resume_run`` does ``PipelineState(**data)``,
    and ``Pipeline._save`` calls ``dataclasses.asdict`` on it; all of those
    fail on a plain class.
    """

    run_id: str                                  # unique id, 12 hex chars
    phase: Phase = Phase.IDLE                    # current lifecycle phase
    target_dir: str = ""                         # extracted-source working directory
    zip_path: str = ""                           # path to the uploaded archive
    evidence_report: dict | None = None          # output of the evidence phase
    code_graph_stats: dict | None = None         # output of the graph phase
    investigation_reports: dict | None = None    # output of the investigation phase
    trial_transcript: str | None = None          # output of the trial phase
    verdict: str | None = None                   # output of the verdict phase
    final_report: str | None = None              # output of the report phase
    error: str | None = None                     # failure/cancellation message, if any
    started_at: str = ""                         # ISO-8601 UTC timestamp
    completed_at: str = ""                       # ISO-8601 UTC timestamp ("" while running)
    files_scanned: int = 0                       # progress counter
    total_findings: int = 0                      # progress counter
class Pipeline:
    """Stateful pipeline engine with disk persistence and cancellation.

    State is persisted to ``<storage_dir>/<run_id>/state.json`` after every
    mutation so a run survives process restarts and can be resumed with
    :meth:`resume_run`.
    """

    def __init__(self, storage_dir: str = ".tribunal_data"):
        self._storage = Path(storage_dir)
        self.state: PipelineState | None = None
        self._cancelled = False

    def create_run(self, zip_path: str) -> PipelineState:
        """Create a new pipeline run and persist initial state."""
        # 12 hex chars keeps ids short while remaining collision-safe here.
        run_id = uuid.uuid4().hex[:12]
        self.state = PipelineState(
            run_id=run_id,
            zip_path=zip_path,
            started_at=datetime.now(timezone.utc).isoformat(),
        )
        self._cancelled = False
        self._save()
        return self.state

    def resume_run(self, run_id: str) -> PipelineState:
        """Resume a previously saved run.

        Raises:
            FileNotFoundError: if no saved state exists for ``run_id``.
        """
        data = self._load(run_id)
        # Fix: JSON round-trips the phase as a plain string; coerce it back
        # to the Phase enum so ``state.phase`` has a consistent type after
        # resume (previously it stayed a str).
        if "phase" in data:
            data["phase"] = Phase(data["phase"])
        self.state = PipelineState(**data)
        self._cancelled = False
        return self.state

    def cancel(self) -> None:
        """Mark the current run as cancelled (terminal FAILED phase)."""
        self._cancelled = True
        self._finalize(Phase.FAILED, error="Cancelled by user")

    def is_cancelled(self) -> bool:
        """Return True if :meth:`cancel` has been called for this run."""
        return self._cancelled

    def update_phase(self, phase: Phase) -> None:
        """Update the current phase and persist."""
        if self.state:
            self.state.phase = phase
            self._save()

    def update(self, **kwargs) -> None:
        """Update arbitrary state fields and persist.

        Unknown field names are silently ignored so callers can pass
        partial payloads without pre-filtering.
        """
        if not self.state:
            return
        for k, v in kwargs.items():
            if hasattr(self.state, k):
                setattr(self.state, k, v)
        self._save()

    def complete(self) -> None:
        """Mark the run as complete."""
        self._finalize(Phase.COMPLETE)

    def fail(self, error: str) -> None:
        """Mark the run as failed with the given error message."""
        self._finalize(Phase.FAILED, error=error)

    def _finalize(self, phase: Phase, error: str | None = None) -> None:
        """Set a terminal phase, stamp completion time, and persist."""
        if not self.state:
            return
        self.state.phase = phase
        if error is not None:
            self.state.error = error
        self.state.completed_at = datetime.now(timezone.utc).isoformat()
        self._save()

    def _save(self) -> None:
        """Persist the current state atomically to disk."""
        if not self.state:
            return
        path = self._storage / self.state.run_id / "state.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file and rename so a crash mid-write never leaves
        # a truncated state.json behind. ``default=str`` covers any field
        # value json cannot serialize natively.
        payload = json.dumps(asdict(self.state), indent=2, default=str)
        tmp = path.with_suffix(".json.tmp")
        tmp.write_text(payload, encoding="utf-8")
        tmp.replace(path)

    def _load(self, run_id: str) -> dict:
        """Load a saved state dict for ``run_id``; raise if absent."""
        path = self._storage / run_id / "state.json"
        if not path.exists():
            raise FileNotFoundError(f"No saved state for run {run_id}")
        return json.loads(path.read_text(encoding="utf-8"))
def list_runs(storage_dir: str = ".tribunal_data") -> list[dict]:
    """List all saved pipeline runs, newest first.

    Returns the raw state dictionaries as stored on disk. Corrupt or
    unreadable state files are skipped (best effort, by design).

    Fix: the previous version sorted by directory name, but run ids are
    random uuid hex, so that order was meaningless. Runs are now ordered
    by their ``started_at`` timestamp (ISO-8601 strings sort
    chronologically), newest first.
    """
    storage = Path(storage_dir)
    if not storage.exists():
        return []
    runs: list[dict] = []
    for entry in storage.iterdir():
        if not entry.is_dir():
            continue  # ignore stray files in the storage root
        state_file = entry / "state.json"
        if not state_file.exists():
            continue
        try:
            runs.append(json.loads(state_file.read_text(encoding="utf-8")))
        except (json.JSONDecodeError, OSError):
            pass  # deliberate best-effort: skip corrupt/unreadable runs
    runs.sort(key=lambda r: r.get("started_at", ""), reverse=True)
    return runs