File size: 4,742 Bytes
1cdb3e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6a2abaa
1cdb3e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Pipeline state machine with persistence and cancellation support."""

import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Generator


class Phase(str, Enum):
    """Stage of a pipeline run.

    The ``str`` mixin makes each member compare equal to its string value
    (``Phase.TRIAL == "trial"``) and serialize to that value with ``json.dumps``.
    Members are declared in what appears to be execution order — NOTE(review):
    confirm against the code that drives the pipeline.
    """

    IDLE = "idle"  # default phase of a freshly created PipelineState
    EXTRACTING = "extracting"
    EVIDENCE = "evidence"
    GRAPH = "graph"
    INVESTIGATION = "investigation"
    TRIAL = "trial"
    VERDICT = "verdict"
    REPORT = "report"
    COMPLETE = "complete"  # set by Pipeline.complete()
    FAILED = "failed"  # set by Pipeline.fail() and Pipeline.cancel()


@dataclass
class PipelineEvent:
    """Event yielded by the pipeline for UI consumption.

    Only ``phase`` and ``status`` are required; the remaining fields are
    optional extras that default to ``None``.
    """

    phase: Phase  # pipeline phase this event belongs to
    status: str  # status text; its vocabulary is defined by the producer (not visible here)
    data: dict[str, Any] | None = None  # optional structured payload
    agent_role: str | None = None  # presumably the role of the emitting agent — confirm with producer
    delta: str | None = None  # presumably an incremental text chunk for streaming UIs — confirm


@dataclass
class PipelineState:
    """Serializable pipeline state for persistence.

    Instances are written to disk via ``dataclasses.asdict`` + ``json.dumps``
    and rebuilt with ``PipelineState(**data)``. Because JSON stores ``phase``
    as its plain string value, ``__post_init__`` converts it back to a
    ``Phase`` member so ``state.phase`` is always an enum after a reload.
    """

    run_id: str
    phase: Phase = Phase.IDLE
    target_dir: str = ""
    zip_path: str = ""
    evidence_report: dict | None = None
    code_graph_stats: dict | None = None
    investigation_reports: dict | None = None
    trial_transcript: str | None = None
    verdict: str | None = None
    final_report: str | None = None
    error: str | None = None  # failure message; None while the run is healthy
    started_at: str = ""  # ISO-8601 timestamp; empty until set by the owner
    completed_at: str = ""  # ISO-8601 timestamp; empty until the run finishes
    files_scanned: int = 0
    total_findings: int = 0

    def __post_init__(self) -> None:
        # Normalize a plain string (e.g. "trial" from a JSON file) to its Phase
        # member; Phase(Phase.X) is a no-op. Unknown values raise ValueError.
        self.phase = Phase(self.phase)


class Pipeline:
    """Stateful pipeline engine with disk persistence and cancellation.

    Each run's state lives at ``<storage_dir>/<run_id>/state.json`` and is
    rewritten after every state change.
    """

    def __init__(self, storage_dir: str = ".tribunal_data"):
        self._storage = Path(storage_dir)
        self.state: PipelineState | None = None
        self._cancelled = False

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def _state_fields() -> set[str]:
        """Return the names of all ``PipelineState`` dataclass fields."""
        from dataclasses import fields

        return {f.name for f in fields(PipelineState)}

    def create_run(self, zip_path: str) -> PipelineState:
        """Create a new pipeline run and persist its initial state.

        Args:
            zip_path: Path of the input archive, stored verbatim on the state.

        Returns:
            The new state, which is also stored as ``self.state``.
        """
        self.state = PipelineState(
            run_id=uuid.uuid4().hex[:12],
            zip_path=zip_path,
            started_at=self._now(),
        )
        self._cancelled = False
        self._save()
        return self.state

    def resume_run(self, run_id: str) -> PipelineState:
        """Load a previously saved run and make it the current state.

        Keys in the saved JSON that are not ``PipelineState`` fields are
        ignored, and ``phase`` is converted back from its stored string to a
        ``Phase`` member.

        Raises:
            FileNotFoundError: If no state file exists for ``run_id``.
            ValueError: If the file is not a JSON object or holds an unknown phase.
        """
        data = self._load(run_id)
        known = self._state_fields()
        kwargs = {k: v for k, v in data.items() if k in known}
        if "phase" in kwargs:
            kwargs["phase"] = Phase(kwargs["phase"])
        self.state = PipelineState(**kwargs)
        self._cancelled = False
        return self.state

    def cancel(self) -> None:
        """Flag the run as cancelled and, if a run is loaded, persist it as failed."""
        self._cancelled = True
        if self.state:
            self.state.phase = Phase.FAILED
            self.state.error = "Cancelled by user"
            self.state.completed_at = self._now()
            self._save()

    @property
    def is_cancelled(self) -> bool:
        """True once ``cancel()`` has been called for the current run."""
        return self._cancelled

    def update_phase(self, phase: Phase) -> None:
        """Set the current phase and persist; no-op when no run is loaded."""
        if self.state:
            self.state.phase = phase
            self._save()

    def update(self, **kwargs: Any) -> None:
        """Set the given ``PipelineState`` fields and persist.

        Only real dataclass fields are assigned; any other name is ignored,
        which also prevents overwriting methods or attributes like
        ``__class__``. A ``phase`` value is converted to ``Phase``. Values are
        validated before any assignment, so a bad ``phase`` leaves the state
        untouched. No-op when no run is loaded.

        Raises:
            ValueError: If ``phase`` is not a valid ``Phase`` value.
        """
        if not self.state:
            return
        known = self._state_fields()
        changes = {k: v for k, v in kwargs.items() if k in known}
        if "phase" in changes:
            changes["phase"] = Phase(changes["phase"])
        for name, value in changes.items():
            setattr(self.state, name, value)
        self._save()

    def complete(self) -> None:
        """Mark the run as complete, stamp ``completed_at``, and persist."""
        if self.state:
            self.state.phase = Phase.COMPLETE
            self.state.completed_at = self._now()
            self._save()

    def fail(self, error: str) -> None:
        """Mark the run as failed with ``error``, stamp ``completed_at``, and persist."""
        if self.state:
            self.state.phase = Phase.FAILED
            self.state.error = error
            self.state.completed_at = self._now()
            self._save()

    def _save(self) -> None:
        """Write the current state to ``<storage>/<run_id>/state.json``.

        The JSON goes to a sibling temp file that is then moved over the
        target, so an interrupted write leaves the previous file intact
        instead of a truncated one.
        """
        if not self.state:
            return
        path = self._storage / self.state.run_id / "state.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(asdict(self.state), indent=2, default=str)
        tmp = path.with_name(path.name + ".tmp")
        tmp.write_text(payload, encoding="utf-8")
        tmp.replace(path)

    def _load(self, run_id: str) -> dict:
        """Read and parse ``<storage>/<run_id>/state.json``.

        Raises:
            FileNotFoundError: If the state file does not exist.
            ValueError: If the file is not valid JSON or not a JSON object.
        """
        # NOTE(review): run_id is joined into the path unvalidated; if it can
        # come from untrusted input, reject separators and "..".
        path = self._storage / run_id / "state.json"
        if not path.exists():
            raise FileNotFoundError(f"No saved state for run {run_id}")
        data = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError(f"Saved state for run {run_id} is not a JSON object")
        return data

    @staticmethod
    def list_runs(storage_dir: str = ".tribunal_data") -> list[dict]:
        """List all saved pipeline runs as raw state dicts, newest first.

        Runs are ordered by ``started_at`` (descending). Run ids are random
        hex, so ordering by directory name would be arbitrary. Unreadable,
        malformed, or non-object state files are skipped.
        """
        storage = Path(storage_dir)
        if not storage.exists():
            return []
        runs: list[dict] = []
        for run_dir in storage.iterdir():
            state_file = run_dir / "state.json"
            if not state_file.is_file():
                continue
            try:
                data = json.loads(state_file.read_text(encoding="utf-8"))
            except (json.JSONDecodeError, UnicodeDecodeError, OSError):
                # Best-effort listing: one corrupt run must not hide the rest.
                continue
            if isinstance(data, dict):
                runs.append(data)
        runs.sort(
            key=lambda run: (str(run.get("started_at", "")), str(run.get("run_id", ""))),
            reverse=True,
        )
        return runs