CodeTribunal / src /code_tribunal /pipeline.py
amine-yagoub's picture
refactor: clean up core modules by removing comment headers and unused code
6a2abaa
"""Pipeline state machine with persistence and cancellation support."""
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Generator
class Phase(str, Enum):
    """Lifecycle phases of a pipeline run.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (e.g. ``Phase.TRIAL == "trial"``) and is emitted as that
    plain string by ``json.dumps``.
    """

    # Nothing has run yet.
    IDLE = "idle"

    # Working stages.
    EXTRACTING = "extracting"
    EVIDENCE = "evidence"
    GRAPH = "graph"
    INVESTIGATION = "investigation"
    TRIAL = "trial"
    VERDICT = "verdict"
    REPORT = "report"

    # End states: reached on success, or on failure / cancellation.
    COMPLETE = "complete"
    FAILED = "failed"
@dataclass
class PipelineEvent:
    """A single progress notification yielded by the pipeline for the UI.

    ``phase`` and ``status`` must be supplied; every other field is an
    optional payload that defaults to ``None``.
    """

    # Phase of the run this event reports on.
    phase: Phase
    # Status text for the event.
    status: str
    # Optional structured payload attached to the event.
    data: dict[str, Any] | None = field(default=None)
    # NOTE(review): presumably the role of the agent emitting the event — confirm with callers.
    agent_role: str | None = field(default=None)
    # NOTE(review): presumably an incremental text chunk for streaming output — confirm with callers.
    delta: str | None = field(default=None)
@dataclass
class PipelineState:
    """Serializable pipeline state for persistence.

    Instances are turned into a dict with ``dataclasses.asdict``, stored as
    JSON, and rebuilt from the decoded dict. JSON has no enum type, so a
    reloaded ``phase`` arrives as a plain string. ``__post_init__`` converts
    it back to a :class:`Phase` member so ``.name``/``.value`` and identity
    checks keep working after a resume.
    """

    # Identifier of the run; also used as the name of its storage directory.
    run_id: str
    # Current phase of the run.
    phase: Phase = Phase.IDLE
    # NOTE(review): presumably the directory the archive is extracted to — confirm.
    target_dir: str = ""
    # Archive path the run was created from.
    zip_path: str = ""
    # Per-stage results; JSON-compatible payloads (schema defined by the producers).
    evidence_report: dict | None = None
    code_graph_stats: dict | None = None
    investigation_reports: dict | None = None
    trial_transcript: str | None = None
    verdict: str | None = None
    final_report: str | None = None
    # Failure / cancellation message, if the run ended unsuccessfully.
    error: str | None = None
    # ISO-8601 timestamps (UTC) marking when the run started and ended.
    started_at: str = ""
    completed_at: str = ""
    # Counters summarising the scan.
    files_scanned: int = 0
    total_findings: int = 0

    def __post_init__(self) -> None:
        """Normalise ``phase`` to a :class:`Phase` member.

        Raises:
            ValueError: ``phase`` is not one of the :class:`Phase` values.
        """
        # Phase(member) returns the member itself; Phase("trial") -> Phase.TRIAL.
        self.phase = Phase(self.phase)
class Pipeline:
    """Stateful pipeline engine with disk persistence and cancellation.

    The current run lives in ``self.state``. After every mutation made
    through this class it is written to
    ``<storage_dir>/<run_id>/state.json``, so it can later be reopened with
    :meth:`resume_run`.
    """

    def __init__(self, storage_dir: str = ".tribunal_data"):
        """Create an engine that persists run state under ``storage_dir``."""
        self._storage = Path(storage_dir)
        self.state: PipelineState | None = None
        self._cancelled = False

    def create_run(self, zip_path: str) -> PipelineState:
        """Start a new run for ``zip_path`` and persist its initial state.

        Args:
            zip_path: Archive path recorded on the new state.

        Returns:
            The new state, which is also stored on ``self.state``.
        """
        self.state = PipelineState(
            run_id=uuid.uuid4().hex[:12],
            zip_path=zip_path,
            started_at=self._now(),
        )
        self._cancelled = False
        self._save()
        return self.state

    def resume_run(self, run_id: str) -> PipelineState:
        """Reload a previously saved run and make it the current one.

        Saved keys that are not ``PipelineState`` fields are ignored, so
        state files carrying extra keys can still be loaded. ``phase`` is
        converted back from its JSON string form to a :class:`Phase` member.

        Raises:
            FileNotFoundError: No saved state exists for ``run_id``.
            ValueError: The saved ``phase`` is not a valid :class:`Phase` value.
        """
        data = self._load(run_id)
        known = self._field_names()
        kwargs = {key: value for key, value in data.items() if key in known}
        if "phase" in kwargs:
            # JSON round-trips the str-enum as a plain string.
            kwargs["phase"] = Phase(kwargs["phase"])
        self.state = PipelineState(**kwargs)
        self._cancelled = False
        return self.state

    def cancel(self) -> None:
        """Request cancellation of the current run.

        This always sets the flag reported by :attr:`is_cancelled`. The run
        is marked FAILED with ``error="Cancelled by user"`` only if it has
        not already reached COMPLETE or FAILED, so cancelling a finished run
        does not overwrite its recorded outcome.
        """
        self._cancelled = True
        if self.state is not None and self.state.phase not in (
            Phase.COMPLETE,
            Phase.FAILED,
        ):
            self.state.error = "Cancelled by user"
            self._finish(Phase.FAILED)

    @property
    def is_cancelled(self) -> bool:
        """Whether :meth:`cancel` was called since the run was created/resumed."""
        return self._cancelled

    def update_phase(self, phase: Phase) -> None:
        """Set the current phase and persist; no-op without a current run."""
        if self.state is not None:
            self.state.phase = phase
            self._save()

    def update(self, **kwargs) -> None:
        """Assign the given ``PipelineState`` fields and persist.

        Keyword names that are not dataclass fields of ``PipelineState`` are
        silently ignored. Does nothing when there is no current run.
        """
        if self.state is None:
            return
        known = self._field_names()
        for name, value in kwargs.items():
            if name in known:
                setattr(self.state, name, value)
        self._save()

    def complete(self) -> None:
        """Mark the current run COMPLETE, stamp ``completed_at``, and persist."""
        self._finish(Phase.COMPLETE)

    def fail(self, error: str) -> None:
        """Mark the current run FAILED with ``error``, stamp ``completed_at``, and persist."""
        if self.state is None:
            return
        self.state.error = error
        self._finish(Phase.FAILED)

    def _finish(self, phase: Phase) -> None:
        """Move the current run to the end ``phase``, timestamp it, and save."""
        if self.state is None:
            return
        self.state.phase = phase
        self.state.completed_at = self._now()
        self._save()

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def _field_names() -> frozenset[str]:
        """Return the names of the dataclass fields declared on PipelineState."""
        from dataclasses import fields

        return frozenset(f.name for f in fields(PipelineState))

    def _state_path(self, run_id: str) -> Path:
        """Return the location of ``state.json`` for ``run_id``."""
        return self._storage / run_id / "state.json"

    def _save(self) -> None:
        """Write the current state to disk; no-op when there is no current run.

        The JSON goes to a sibling temporary file first, which is then moved
        over ``state.json`` with ``Path.replace`` (``os.replace``). A crash
        mid-write therefore cannot leave a truncated state file behind.
        """
        if self.state is None:
            return
        path = self._state_path(self.state.run_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(asdict(self.state), indent=2, default=str)
        tmp_path = path.with_name(path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(path)

    def _load(self, run_id: str) -> dict:
        """Read and decode the saved state for ``run_id``.

        Raises:
            FileNotFoundError: ``run_id`` is not a plain directory name, or no
                state file exists for it.
        """
        # Reject ids such as "..", "a/b" or absolute paths so lookups stay
        # inside the storage directory. Report them as "not found" to keep
        # the exception type callers already handle.
        if run_id in ("", ".", "..") or Path(run_id).name != run_id:
            raise FileNotFoundError(f"No saved state for run {run_id}")
        path = self._state_path(run_id)
        if not path.exists():
            raise FileNotFoundError(f"No saved state for run {run_id}")
        return json.loads(path.read_text(encoding="utf-8"))

    @staticmethod
    def list_runs(storage_dir: str = ".tribunal_data") -> list[dict]:
        """List all saved pipeline runs, newest first.

        Each entry is the decoded ``state.json`` dict of one run directory
        under ``storage_dir``. Entries are ordered by ``started_at``
        descending (this class writes it as an ISO-8601 UTC string, which
        sorts chronologically), with the directory name as a tie-breaker.
        State files that are unreadable, undecodable, or not a JSON object
        are skipped.

        Returns:
            A list of state dicts; empty if ``storage_dir`` is not a directory.
        """
        storage = Path(storage_dir)
        if not storage.is_dir():
            return []
        found: list[tuple[str, str, dict]] = []
        for run_dir in storage.iterdir():
            state_file = run_dir / "state.json"
            if not state_file.is_file():
                continue
            try:
                data = json.loads(state_file.read_text(encoding="utf-8"))
            except (OSError, ValueError):
                # ValueError covers JSONDecodeError and UnicodeDecodeError:
                # one corrupt file must not hide every other run.
                continue
            if isinstance(data, dict):
                found.append((str(data.get("started_at") or ""), run_dir.name, data))
        found.sort(key=lambda item: (item[0], item[1]), reverse=True)
        return [data for _, _, data in found]