# phase 3: end-to-end pipeline (no ui)
## purpose
Tie together Phase 1 (data loading) and Phase 2 (DeepISLES inference) into a cohesive pipeline. At the end of this phase, we can run stroke segmentation on any case from ISLES24-MR-Lite with a single function call.
## deliverables
- [ ] `src/stroke_deepisles_demo/pipeline.py` - Main orchestration
- [ ] `src/stroke_deepisles_demo/metrics.py` - Optional Dice computation
- [ ] CLI entry point for testing
- [ ] Unit tests with full mocking
- [ ] Integration test for complete flow
## vertical slice outcome
After this phase, you can run:
```python
from stroke_deepisles_demo.pipeline import run_pipeline_on_case

# Run segmentation on a specific case
result = run_pipeline_on_case("sub-001")
print(f"Input DWI: {result.input_files.dwi}")
print(f"Input ADC: {result.input_files.adc}")
print(f"Prediction: {result.prediction_mask}")
print(f"Ground truth: {result.ground_truth}")
if result.dice_score is not None:  # guard: :.3f would fail on None
    print(f"Dice score: {result.dice_score:.3f}")
```
Or via CLI:
```bash
uv run stroke-demo run --case sub-001 --fast
uv run stroke-demo run --index 0 --output ./results
uv run stroke-demo list  # List all available cases
```
## module structure
```
src/stroke_deepisles_demo/
├── pipeline.py     # Main orchestration
├── metrics.py      # Dice score computation
└── cli.py          # CLI entry point (optional)
```
## interfaces and types
### `pipeline.py`
```python
"""End-to-end pipeline orchestration."""
from __future__ import annotations

import tempfile
from dataclasses import dataclass
from pathlib import Path

from stroke_deepisles_demo.core.config import settings
from stroke_deepisles_demo.core.types import CaseFiles, InferenceResult
from stroke_deepisles_demo.data import CaseAdapter, load_isles_dataset, stage_case_for_deepisles
from stroke_deepisles_demo.inference import run_deepisles_on_folder


@dataclass(frozen=True)
class PipelineResult:
    """Complete result of running the pipeline on a case."""
    case_id: str
    input_files: CaseFiles
    staged_dir: Path
    prediction_mask: Path
    ground_truth: Path | None
    dice_score: float | None  # None if ground truth unavailable or not computed
    elapsed_seconds: float


@dataclass(frozen=True)
class PipelineSummary:
    """Summary statistics from multiple pipeline runs."""
    num_cases: int
    num_successful: int
    num_failed: int
    mean_dice: float | None
    std_dice: float | None
    min_dice: float | None
    max_dice: float | None
    mean_elapsed_seconds: float


def run_pipeline_on_case(
    case_id: str | int,
    *,
    dataset_id: str | None = None,
    output_dir: Path | None = None,
    fast: bool = True,
    gpu: bool = True,
    compute_dice: bool = True,
    cleanup_staging: bool = False,
) -> PipelineResult:
    """
    Run the complete segmentation pipeline on a single case.

    This function:
    1. Loads the case from HuggingFace Hub (or cache)
    2. Stages NIfTI files with DeepISLES-expected naming
    3. Runs the DeepISLES Docker container
    4. Optionally computes the Dice score against ground truth
    5. Returns all paths and metrics

    Args:
        case_id: Case identifier (string) or index (int)
        dataset_id: HF dataset ID (default from settings)
        output_dir: Directory for results (default: temp dir)
        fast: Use SEALS-only mode (ISLES'22 winner, DWI+ADC only, no FLAIR needed)
        gpu: Use GPU acceleration
        compute_dice: Compute Dice score if ground truth is available
        cleanup_staging: Remove staging directory after inference

    Returns:
        PipelineResult with all paths and optional metrics

    Raises:
        DataLoadError: If the case cannot be loaded
        MissingInputError: If required files are missing
        DeepISLESError: If inference fails

    Example:
        >>> result = run_pipeline_on_case("sub-001", fast=True)
        >>> print(f"Dice: {result.dice_score:.3f}")
    """
    ...


def run_pipeline_on_batch(
    case_ids: list[str | int],
    *,
    max_workers: int = 1,
    **kwargs,
) -> list[PipelineResult]:
    """
    Run the pipeline on multiple cases.

    Note: Parallel execution typically requires one GPU per worker;
    the default is sequential.

    Args:
        case_ids: List of case identifiers or indices
        max_workers: Number of parallel workers (default 1 for sequential)
        **kwargs: Passed to run_pipeline_on_case

    Returns:
        List of PipelineResult, one per case
    """
    ...


def get_pipeline_summary(results: list[PipelineResult]) -> PipelineSummary:
    """
    Compute summary statistics from multiple pipeline results.

    Returns:
        Summary with mean Dice, success rate, etc.
    """
    ...


# Internal helper
def _load_or_get_adapter(
    dataset_id: str | None = None,
    cache: dict | None = None,
) -> CaseAdapter:
    """Load the dataset and return an adapter, using the cache if available."""
    ...
```
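The aggregation in `get_pipeline_summary` is mostly bookkeeping, so it is worth sketching in full. The version below is self-contained (a minimal `Result` stand-in replaces `PipelineResult`, and `summarize` returns a plain dict rather than the frozen dataclass); treating a missing Dice score as a failed run is an assumption taken from the test plan in this phase:

```python
from __future__ import annotations

import statistics
from dataclasses import dataclass


@dataclass(frozen=True)
class Result:  # minimal stand-in for PipelineResult
    dice_score: float | None
    elapsed_seconds: float


def summarize(results: list[Result]) -> dict:
    """Aggregate Dice statistics, skipping runs without a score."""
    dices = [r.dice_score for r in results if r.dice_score is not None]
    return {
        "num_cases": len(results),
        "num_successful": len(dices),
        "num_failed": len(results) - len(dices),
        "mean_dice": statistics.fmean(dices) if dices else None,
        "std_dice": statistics.stdev(dices) if len(dices) > 1 else None,
        "min_dice": min(dices) if dices else None,
        "max_dice": max(dices) if dices else None,
        "mean_elapsed_seconds": (
            statistics.fmean(r.elapsed_seconds for r in results) if results else 0.0
        ),
    }
```

Keeping the None-filtering in one place makes the `mean_dice`/`num_failed` semantics easy to test without mocks.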
### `metrics.py`
```python
"""Metrics for evaluating segmentation quality."""
from __future__ import annotations

from pathlib import Path

import nibabel as nib
import numpy as np
from numpy.typing import NDArray


def compute_dice(
    prediction: Path | NDArray[np.float64],
    ground_truth: Path | NDArray[np.float64],
    *,
    threshold: float = 0.5,
) -> float:
    """
    Compute the Dice similarity coefficient between prediction and ground truth.

    Dice = 2 * |P ∩ G| / (|P| + |G|)

    Args:
        prediction: Path to NIfTI file or numpy array
        ground_truth: Path to NIfTI file or numpy array
        threshold: Threshold for binarization (if needed)

    Returns:
        Dice coefficient in [0, 1]

    Raises:
        ValueError: If shapes don't match
    """
    ...


def compute_volume_ml(
    mask: Path | NDArray[np.float64],
    voxel_size_mm: tuple[float, float, float] | None = None,
) -> float:
    """
    Compute lesion volume in milliliters.

    Args:
        mask: Path to NIfTI file or numpy array
        voxel_size_mm: Voxel dimensions in mm (read from NIfTI header if None)

    Returns:
        Volume in milliliters (mL)
    """
    ...


def load_nifti_as_array(path: Path) -> tuple[NDArray[np.float64], tuple[float, ...]]:
    """
    Load a NIfTI file and return its data array with voxel dimensions.

    Returns:
        Tuple of (data_array, voxel_sizes_mm)
    """
    ...
```
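The metric bodies are small enough to sketch in full. This is an array-only version (the `Path` branch would go through `load_nifti_as_array` first); the function names here are illustrative, and the edge-case choices — two empty masks score 1.0, mismatched shapes raise — are assumptions taken from the test plan below:

```python
import numpy as np


def dice_from_arrays(pred, gt, threshold: float = 0.5) -> float:
    """Dice = 2|P ∩ G| / (|P| + |G|), with 1.0 for two empty masks."""
    pred = np.asarray(pred) > threshold
    gt = np.asarray(gt) > threshold
    if pred.shape != gt.shape:
        raise ValueError(f"shape mismatch: {pred.shape} vs {gt.shape}")
    denom = int(pred.sum()) + int(gt.sum())
    if denom == 0:
        return 1.0  # both masks empty: perfect agreement by convention
    return 2.0 * int(np.logical_and(pred, gt).sum()) / denom


def volume_ml_from_array(mask, voxel_size_mm=(1.0, 1.0, 1.0)) -> float:
    """Voxel count times voxel volume (mm^3), converted to mL (1 mL = 1000 mm^3)."""
    n_voxels = int((np.asarray(mask) > 0.5).sum())
    voxel_mm3 = voxel_size_mm[0] * voxel_size_mm[1] * voxel_size_mm[2]
    return n_voxels * voxel_mm3 / 1000.0
```

Binarizing both inputs before the overlap count keeps the same code path working for probability maps and hard masks.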
### `cli.py` (optional)
```python
"""Command-line interface for stroke-deepisles-demo."""
from __future__ import annotations

import argparse
import sys
from pathlib import Path


def main(argv: list[str] | None = None) -> int:
    """Main CLI entry point."""
    parser = argparse.ArgumentParser(
        prog="stroke-demo",
        description="Run DeepISLES stroke segmentation on HF datasets",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # List command
    list_parser = subparsers.add_parser("list", help="List available cases")
    list_parser.add_argument("--dataset", default=None, help="HF dataset ID")

    # Run command
    run_parser = subparsers.add_parser("run", help="Run segmentation")
    run_parser.add_argument("--case", type=str, help="Case ID (e.g., sub-001)")
    run_parser.add_argument(
        "--index", type=int, help="Case index (alternative to --case)"
    )
    run_parser.add_argument(
        "--output", type=Path, default=None, help="Output directory"
    )
    run_parser.add_argument(
        # BooleanOptionalAction gives --fast/--no-fast; a plain
        # store_true with default=True could never be turned off.
        "--fast",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Use fast mode",
    )
    run_parser.add_argument("--no-gpu", action="store_true", help="Disable GPU")

    args = parser.parse_args(argv)
    if args.command == "list":
        return cmd_list(args)
    elif args.command == "run":
        return cmd_run(args)
    return 0


def cmd_list(args: argparse.Namespace) -> int:
    """Handle 'list' command."""
    ...


def cmd_run(args: argparse.Namespace) -> int:
    """Handle 'run' command."""
    ...


if __name__ == "__main__":
    sys.exit(main())
```
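`cmd_run` has to decide between `--case` and `--index`, which are mutually exclusive. A tiny hypothetical helper (`resolve_case` is not part of the interface above, just one way to do the validation) keeps that check out of the handler body:

```python
from __future__ import annotations


def resolve_case(case: str | None, index: int | None) -> str | int:
    """Return whichever of --case/--index was given; reject none or both."""
    if (case is None) == (index is None):
        raise SystemExit("error: provide exactly one of --case or --index")
    return case if case is not None else index
```

The XOR-style `(case is None) == (index is None)` test catches both the "neither given" and "both given" cases in one branch, and `index=0` still counts as given.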
### pyproject.toml addition for CLI
```toml
[project.scripts]
stroke-demo = "stroke_deepisles_demo.cli:main"
```
## tdd plan
### test file structure
```
tests/
├── test_pipeline.py   # Pipeline orchestration tests
├── test_metrics.py    # Metrics computation tests
└── test_cli.py        # CLI tests (optional)
```
### tests to write first (TDD order)
#### 1. `tests/test_metrics.py` - Pure functions, no mocks needed
```python
"""Tests for metrics module."""
from __future__ import annotations

from pathlib import Path

import nibabel as nib
import numpy as np
import pytest

from stroke_deepisles_demo.metrics import (
    compute_dice,
    compute_volume_ml,
    load_nifti_as_array,
)


class TestComputeDice:
    """Tests for compute_dice."""

    def test_identical_masks_return_one(self) -> None:
        """Dice of identical masks is 1.0."""
        mask = np.array([[[1, 1, 0], [0, 1, 0], [0, 0, 1]]])
        dice = compute_dice(mask, mask)
        assert dice == 1.0

    def test_no_overlap_returns_zero(self) -> None:
        """Dice of non-overlapping masks is 0.0."""
        pred = np.array([[[1, 1, 0], [0, 0, 0], [0, 0, 0]]])
        gt = np.array([[[0, 0, 0], [0, 0, 0], [0, 0, 1]]])
        dice = compute_dice(pred, gt)
        assert dice == 0.0

    def test_partial_overlap(self) -> None:
        """Dice with partial overlap is between 0 and 1."""
        pred = np.array([[[1, 1, 0], [0, 0, 0], [0, 0, 0]]])
        gt = np.array([[[1, 0, 0], [0, 0, 0], [0, 0, 0]]])
        dice = compute_dice(pred, gt)
        # Overlap: 1, Pred: 2, GT: 1 -> Dice = 2*1 / (2+1) = 0.667
        assert 0.6 < dice < 0.7

    def test_empty_masks_return_one(self) -> None:
        """Dice of two empty masks is 1.0 (both agree on nothing)."""
        empty = np.zeros((10, 10, 10))
        dice = compute_dice(empty, empty)
        assert dice == 1.0

    def test_accepts_file_paths(self, temp_dir: Path) -> None:
        """Can compute Dice from NIfTI file paths."""
        mask = np.array([[[1, 1, 0], [0, 1, 0], [0, 0, 1]]]).astype(np.float32)
        img = nib.Nifti1Image(mask, np.eye(4))
        pred_path = temp_dir / "pred.nii.gz"
        gt_path = temp_dir / "gt.nii.gz"
        nib.save(img, pred_path)
        nib.save(img, gt_path)
        dice = compute_dice(pred_path, gt_path)
        assert dice == 1.0

    def test_shape_mismatch_raises(self) -> None:
        """Raises ValueError if shapes don't match."""
        pred = np.zeros((10, 10, 10))
        gt = np.zeros((10, 10, 5))
        with pytest.raises(ValueError, match="shape"):
            compute_dice(pred, gt)


class TestComputeVolumeMl:
    """Tests for compute_volume_ml."""

    def test_computes_volume_from_voxel_size(self) -> None:
        """Volume is computed correctly from voxel dimensions."""
        # 10x10x10 = 1000 voxels of 1mm^3 each = 1000mm^3 = 1mL
        mask = np.ones((10, 10, 10))
        volume = compute_volume_ml(mask, voxel_size_mm=(1.0, 1.0, 1.0))
        assert volume == pytest.approx(1.0, rel=0.01)

    def test_reads_voxel_size_from_nifti(self, temp_dir: Path) -> None:
        """Reads voxel size from the NIfTI header."""
        mask = np.ones((10, 10, 10)).astype(np.float32)
        # Affine with 2mm voxels
        affine = np.diag([2.0, 2.0, 2.0, 1.0])
        img = nib.Nifti1Image(mask, affine)
        path = temp_dir / "mask.nii.gz"
        nib.save(img, path)
        # 1000 voxels * 8mm^3 = 8000mm^3 = 8mL
        volume = compute_volume_ml(path)
        assert volume == pytest.approx(8.0, rel=0.01)


class TestLoadNiftiAsArray:
    """Tests for load_nifti_as_array."""

    def test_returns_array_and_voxel_sizes(self, temp_dir: Path) -> None:
        """Returns the data array and voxel dimensions."""
        data = np.random.rand(10, 10, 10).astype(np.float32)
        affine = np.diag([1.5, 1.5, 2.0, 1.0])
        img = nib.Nifti1Image(data, affine)
        path = temp_dir / "test.nii.gz"
        nib.save(img, path)
        arr, voxels = load_nifti_as_array(path)
        assert arr.shape == (10, 10, 10)
        assert voxels == pytest.approx((1.5, 1.5, 2.0), rel=0.01)
```
#### 2. `tests/test_pipeline.py` - Full orchestration with mocks
```python
"""Tests for pipeline orchestration."""
from __future__ import annotations

from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from stroke_deepisles_demo.core.types import CaseFiles
from stroke_deepisles_demo.pipeline import (
    PipelineResult,
    PipelineSummary,
    get_pipeline_summary,
    run_pipeline_on_case,
)


class TestRunPipelineOnCase:
    """Tests for run_pipeline_on_case."""

    @pytest.fixture
    def mock_dependencies(self, temp_dir: Path):
        """Mock all external dependencies."""
        with patch(
            "stroke_deepisles_demo.pipeline.load_isles_dataset"
        ) as mock_load, patch(
            "stroke_deepisles_demo.pipeline.CaseAdapter"
        ) as mock_adapter_cls, patch(
            "stroke_deepisles_demo.pipeline.stage_case_for_deepisles"
        ) as mock_stage, patch(
            "stroke_deepisles_demo.pipeline.run_deepisles_on_folder"
        ) as mock_inference, patch(
            "stroke_deepisles_demo.pipeline.compute_dice"
        ) as mock_dice:
            # Configure mocks
            mock_adapter = MagicMock()
            mock_adapter.get_case.return_value = CaseFiles(
                dwi=temp_dir / "dwi.nii.gz",
                adc=temp_dir / "adc.nii.gz",
                flair=None,
                ground_truth=temp_dir / "gt.nii.gz",
            )
            mock_adapter_cls.return_value = mock_adapter
            mock_stage.return_value = MagicMock(
                input_dir=temp_dir / "staged",
                dwi_path=temp_dir / "staged" / "dwi.nii.gz",
                adc_path=temp_dir / "staged" / "adc.nii.gz",
                flair_path=None,
            )
            mock_inference.return_value = MagicMock(
                prediction_path=temp_dir / "results" / "pred.nii.gz",
                elapsed_seconds=10.5,
            )
            mock_dice.return_value = 0.85
            yield {
                "load": mock_load,
                "adapter_cls": mock_adapter_cls,
                "adapter": mock_adapter,
                "stage": mock_stage,
                "inference": mock_inference,
                "dice": mock_dice,
            }
    def test_returns_pipeline_result(self, mock_dependencies, temp_dir) -> None:
        """Returns PipelineResult with the expected fields."""
        result = run_pipeline_on_case("sub-001")
        assert isinstance(result, PipelineResult)
        assert result.case_id == "sub-001"

    def test_loads_case_from_adapter(self, mock_dependencies, temp_dir) -> None:
        """Loads the case using CaseAdapter."""
        run_pipeline_on_case("sub-001")
        mock_dependencies["adapter"].get_case.assert_called_once_with("sub-001")

    def test_stages_files_for_deepisles(self, mock_dependencies, temp_dir) -> None:
        """Stages files with correct naming."""
        run_pipeline_on_case("sub-001")
        mock_dependencies["stage"].assert_called_once()

    def test_runs_deepisles_inference(self, mock_dependencies, temp_dir) -> None:
        """Runs DeepISLES on the staged directory."""
        run_pipeline_on_case("sub-001", fast=True, gpu=False)
        mock_dependencies["inference"].assert_called_once()
        call_kwargs = mock_dependencies["inference"].call_args.kwargs
        assert call_kwargs.get("fast") is True
        assert call_kwargs.get("gpu") is False

    def test_computes_dice_when_ground_truth_available(
        self, mock_dependencies, temp_dir
    ) -> None:
        """Computes the Dice score when ground truth is available."""
        result = run_pipeline_on_case("sub-001", compute_dice=True)
        mock_dependencies["dice"].assert_called_once()
        assert result.dice_score == 0.85

    def test_skips_dice_when_disabled(self, mock_dependencies, temp_dir) -> None:
        """Skips Dice computation when compute_dice=False."""
        result = run_pipeline_on_case("sub-001", compute_dice=False)
        mock_dependencies["dice"].assert_not_called()
        assert result.dice_score is None

    def test_handles_missing_ground_truth(self, mock_dependencies, temp_dir) -> None:
        """Handles cases without ground truth gracefully."""
        # Modify the mock to return no ground truth
        mock_dependencies["adapter"].get_case.return_value = CaseFiles(
            dwi=temp_dir / "dwi.nii.gz",
            adc=temp_dir / "adc.nii.gz",
            flair=None,
            ground_truth=None,
        )
        result = run_pipeline_on_case("sub-001", compute_dice=True)
        assert result.dice_score is None
        assert result.ground_truth is None

    def test_accepts_integer_index(self, mock_dependencies, temp_dir) -> None:
        """Accepts an integer index as the case identifier."""
        mock_dependencies["adapter"].get_case_by_index.return_value = (
            "sub-001",
            CaseFiles(
                dwi=temp_dir / "dwi.nii.gz",
                adc=temp_dir / "adc.nii.gz",
                flair=None,
                ground_truth=None,
            ),
        )
        result = run_pipeline_on_case(0)
        assert result.case_id == "sub-001"
class TestGetPipelineSummary:
    """Tests for get_pipeline_summary."""

    def test_computes_mean_dice(self) -> None:
        """Computes the mean Dice from results."""
        results = [
            MagicMock(dice_score=0.8, elapsed_seconds=10),
            MagicMock(dice_score=0.9, elapsed_seconds=12),
            MagicMock(dice_score=0.7, elapsed_seconds=8),
        ]
        summary = get_pipeline_summary(results)
        assert summary.mean_dice == pytest.approx(0.8, rel=0.01)

    def test_handles_none_dice_scores(self) -> None:
        """Handles results with None Dice scores."""
        results = [
            MagicMock(dice_score=0.8, elapsed_seconds=10),
            MagicMock(dice_score=None, elapsed_seconds=12),
            MagicMock(dice_score=0.7, elapsed_seconds=8),
        ]
        summary = get_pipeline_summary(results)
        # Mean of 0.8 and 0.7 only
        assert summary.mean_dice == pytest.approx(0.75, rel=0.01)

    def test_counts_successful_and_failed(self) -> None:
        """Counts successful and failed runs."""
        results = [
            MagicMock(dice_score=0.8, elapsed_seconds=10),
            MagicMock(dice_score=None, elapsed_seconds=0),  # Failed
        ]
        summary = get_pipeline_summary(results)
        assert summary.num_cases == 2
        assert summary.num_successful == 1
        assert summary.num_failed == 1


@pytest.mark.integration
class TestPipelineIntegration:
    """Integration tests for the full pipeline."""

    @pytest.mark.slow
    def test_run_on_real_case(self) -> None:
        """Run the pipeline on an actual ISLES24-MR-Lite case."""
        # Requires: network, Docker, DeepISLES image
        # Run with: pytest -m "integration and slow"
        result = run_pipeline_on_case(
            0,  # First case
            fast=True,
            gpu=False,
            compute_dice=True,
        )
        assert result.prediction_mask.exists()
        assert result.dice_score is not None
        assert 0 <= result.dice_score <= 1
```
### what to mock
- `load_isles_dataset` - Avoid network calls
- `CaseAdapter` - Return synthetic CaseFiles
- `stage_case_for_deepisles` - Return mock staged paths
- `run_deepisles_on_folder` - Avoid Docker
- `compute_dice` - Return a fixed value for deterministic tests
### what to test for real
- Dice computation (pure NumPy)
- Volume computation (pure NumPy + nibabel)
- NIfTI loading
- Integration: full pipeline on real data
## "done" criteria
Phase 3 is complete when:
1. All unit tests pass: `uv run pytest tests/test_pipeline.py tests/test_metrics.py -v`
2. Dice computation is correct for known test cases
3. Pipeline orchestrates all components correctly
4. CLI works: `uv run stroke-demo list` and `uv run stroke-demo run --index 0`
5. Integration test passes: `uv run pytest -m "integration and slow"`
6. Type checking passes: `uv run mypy src/stroke_deepisles_demo/pipeline.py src/stroke_deepisles_demo/metrics.py`
7. Code coverage for the pipeline module > 80%
## implementation notes
- Use dataclasses for results (immutable, typed)
- Consider caching the loaded dataset in a module-level variable
- Dice should handle edge cases (empty masks, shape mismatches)
- CLI is optional but useful for manual testing
- Batch processing is sequential by default (GPU constraint)
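The module-level cache mentioned above pairs well with dependency injection, so unit tests never trigger a real dataset download. A minimal sketch (the `get_adapter` name and `loader` parameter are illustrative, not part of the `_load_or_get_adapter` interface):

```python
from __future__ import annotations

from typing import Any, Callable

# Process-wide cache: one loaded dataset/adapter per dataset ID.
_ADAPTER_CACHE: dict[str, Any] = {}


def get_adapter(dataset_id: str, loader: Callable[[str], Any]) -> Any:
    """Return the adapter for dataset_id, invoking loader at most once."""
    if dataset_id not in _ADAPTER_CACHE:
        _ADAPTER_CACHE[dataset_id] = loader(dataset_id)
    return _ADAPTER_CACHE[dataset_id]
```

Passing the loader in as an argument means tests can supply a stub and assert it was called exactly once, without patching module globals.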