"""Pipeline state machine with persistence and cancellation support."""
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Generator
class Phase(str, Enum):
    """Lifecycle phases of a pipeline run.

    Inherits from ``str`` so members serialize as their plain string values
    (e.g. in the persisted JSON state) and compare equal to raw strings
    (``Phase.IDLE == "idle"``).
    """

    IDLE = "idle"                    # initial state before any work starts
    EXTRACTING = "extracting"
    EVIDENCE = "evidence"
    GRAPH = "graph"
    INVESTIGATION = "investigation"
    TRIAL = "trial"
    VERDICT = "verdict"
    REPORT = "report"
    COMPLETE = "complete"            # terminal: success
    FAILED = "failed"                # terminal: error or user cancellation
@dataclass
class PipelineEvent:
    """Event yielded by the pipeline for UI consumption."""

    phase: Phase                        # phase the event belongs to
    status: str                         # short status label for display
    data: dict[str, Any] | None = None  # optional structured payload
    agent_role: str | None = None       # originating agent role, if any
    delta: str | None = None            # incremental text chunk, if streaming
@dataclass
class PipelineState:
    """Serializable pipeline state for persistence.

    Persisted as JSON via ``dataclasses.asdict``; all fields must therefore
    stay JSON-friendly. NOTE(review): after a JSON round-trip ``phase`` comes
    back as a plain string — the str-enum mixin keeps comparisons working.
    """

    run_id: str                              # unique id, doubles as storage dir name
    phase: Phase = Phase.IDLE                # current lifecycle phase
    target_dir: str = ""                     # extraction target directory
    zip_path: str = ""                       # input archive path
    evidence_report: dict | None = None      # output of the evidence phase
    code_graph_stats: dict | None = None     # output of the graph phase
    investigation_reports: dict | None = None
    trial_transcript: str | None = None
    verdict: str | None = None
    final_report: str | None = None
    error: str | None = None                 # set when phase == FAILED
    started_at: str = ""                     # ISO-8601 UTC timestamp
    completed_at: str = ""                   # ISO-8601 UTC timestamp (terminal only)
    files_scanned: int = 0
    total_findings: int = 0
class Pipeline:
    """Stateful pipeline engine with disk persistence and cancellation.

    State is written to ``<storage_dir>/<run_id>/state.json`` after every
    mutation, so a run can be resumed or inspected after a crash.
    """

    def __init__(self, storage_dir: str = ".tribunal_data"):
        self._storage = Path(storage_dir)
        self.state: PipelineState | None = None
        self._cancelled = False

    def create_run(self, zip_path: str) -> PipelineState:
        """Create a new pipeline run and persist its initial state.

        Args:
            zip_path: Path to the input archive the pipeline will process.

        Returns:
            The freshly created (and already persisted) state object.
        """
        run_id = uuid.uuid4().hex[:12]  # short id keeps storage dir names readable
        self.state = PipelineState(
            run_id=run_id,
            zip_path=zip_path,
            started_at=datetime.now(timezone.utc).isoformat(),
        )
        self._cancelled = False
        self._save()
        return self.state

    def resume_run(self, run_id: str) -> PipelineState:
        """Resume a previously saved run.

        Raises:
            FileNotFoundError: If no state file exists for ``run_id``.
        """
        data = self._load(run_id)
        # Drop keys unknown to the current schema so state files written by
        # an older/newer version don't raise TypeError in the constructor.
        known = PipelineState.__dataclass_fields__
        data = {k: v for k, v in data.items() if k in known}
        # JSON round-trips the phase as a plain string; restore the enum so
        # downstream code sees a real Phase member, not a bare str.
        if "phase" in data:
            data["phase"] = Phase(data["phase"])
        self.state = PipelineState(**data)
        self._cancelled = False
        return self.state

    def cancel(self) -> None:
        """Mark the current run as cancelled and persist the failure state."""
        self._cancelled = True
        if self.state:
            self.state.phase = Phase.FAILED
            self.state.error = "Cancelled by user"
            self.state.completed_at = datetime.now(timezone.utc).isoformat()
            self._save()

    @property
    def is_cancelled(self) -> bool:
        """True once cancel() has been called since the last create/resume."""
        return self._cancelled

    def update_phase(self, phase: Phase) -> None:
        """Update the current phase and persist."""
        if self.state:
            self.state.phase = phase
            self._save()

    def update(self, **kwargs) -> None:
        """Update arbitrary known state fields and persist.

        Unknown keyword names are silently ignored so callers can pass a
        superset of fields without version-skew concerns.
        """
        if not self.state:
            return
        for k, v in kwargs.items():
            if hasattr(self.state, k):
                setattr(self.state, k, v)
        self._save()

    def complete(self) -> None:
        """Mark the run as complete and stamp the completion time."""
        if self.state:
            self.state.phase = Phase.COMPLETE
            self.state.completed_at = datetime.now(timezone.utc).isoformat()
            self._save()

    def fail(self, error: str) -> None:
        """Mark the run as failed with the given error message."""
        if self.state:
            self.state.phase = Phase.FAILED
            self.state.error = error
            self.state.completed_at = datetime.now(timezone.utc).isoformat()
            self._save()

    def _save(self) -> None:
        # Persist current state as pretty-printed JSON. default=str is a
        # safety net for any non-JSON-native value that lands in state.
        if not self.state:
            return
        path = self._storage / self.state.run_id / "state.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(asdict(self.state), indent=2, default=str))

    def _load(self, run_id: str) -> dict:
        # Read the raw persisted dict; schema filtering happens in resume_run().
        path = self._storage / run_id / "state.json"
        if not path.exists():
            raise FileNotFoundError(f"No saved state for run {run_id}")
        return json.loads(path.read_text())

    @staticmethod
    def list_runs(storage_dir: str = ".tribunal_data") -> list[dict]:
        """List all saved pipeline runs, most recently started first.

        Unreadable or corrupt state files are skipped (best-effort listing).
        """
        storage = Path(storage_dir)
        if not storage.exists():
            return []
        runs = []
        for d in storage.iterdir():
            state_file = d / "state.json"
            if state_file.exists():
                try:
                    runs.append(json.loads(state_file.read_text()))
                except (json.JSONDecodeError, OSError):
                    pass
        # Sort by start timestamp (ISO-8601 sorts lexicographically), not by
        # random hex run-id directory name, so newest runs come first.
        runs.sort(key=lambda r: r.get("started_at", ""), reverse=True)
        return runs