#!/usr/bin/env python3 """End-to-end coronary segmentation: CT image → binary mask → segmental labels. Usage: python scripts/run_pipeline.py --input /path/to/dicoms --output /path/to/results docker run --gpus all -v /data/in:/input -v /data/out:/output ct-heart-seg """ import argparse import logging import os import shutil import subprocess import sys import tempfile import time from pathlib import Path logger = logging.getLogger(__name__) BINARY_BUNDLE = "ct_binary_coronary_segmentation" SEGMENTAL_BUNDLE = "ct_segmental_coronary_segmentation" def find_bundle_dir(root: Path, name: str) -> Path: for prefix in ["", "bundle"]: d = root / prefix / name if prefix else root / name if (d / "configs").is_dir(): return d raise FileNotFoundError(f"Cannot find {name}/ in {root} or {root / 'bundle'}") def run_inference(bundle_dir: Path, extra_args: list[str], gpu: int, label: str): env = os.environ.copy() env["CUDA_VISIBLE_DEVICES"] = str(gpu) cmd = [ sys.executable, "-m", "monai.bundle", "run", "inference", "--config_file", "configs/ensemble_inference.yaml", *extra_args, ] logger.info("[%s] %s (cwd=%s)", label, " ".join(cmd), bundle_dir) t0 = time.time() result = subprocess.run(cmd, cwd=str(bundle_dir), env=env, capture_output=True, text=True) elapsed = time.time() - t0 if result.stdout: for line in result.stdout.strip().split("\n"): logger.info("[%s] %s", label, line) if result.stderr: for line in result.stderr.strip().split("\n"): logger.info("[%s] %s", label, line) if result.returncode != 0: logger.error("[%s] FAILED (exit %d) after %.1fs", label, result.returncode, elapsed) sys.exit(result.returncode) logger.info("[%s] Done in %.1fs", label, elapsed) def main(): parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--input", required=True) parser.add_argument("--output", required=True) parser.add_argument("--gpu", type=int, default=0) args = parser.parse_args() output_dir = Path(args.output) output_dir.mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[logging.StreamHandler(), logging.FileHandler(output_dir / "pipeline.log")], force=True, ) repo_root = Path(__file__).resolve().parent.parent binary_dir = find_bundle_dir(repo_root, BINARY_BUNDLE) segmental_dir = find_bundle_dir(repo_root, SEGMENTAL_BUNDLE) logger.info("Input: %s | Output: %s | GPU: %d", args.input, output_dir, args.gpu) t_start = time.time() with tempfile.TemporaryDirectory(prefix="binary_output_") as tmp_binary: run_inference(binary_dir, ["--dataset_dir", str(Path(args.input).resolve()), "--output_dir", tmp_binary], args.gpu, "binary") binary_out = output_dir / "binary" binary_out.mkdir(parents=True, exist_ok=True) for f in Path(tmp_binary).glob("*"): shutil.copy2(f, binary_out / f.name) logger.info("Binary: %d files → %s", len(list(binary_out.glob("*.nii.gz"))), binary_out) segmental_out = output_dir / "segmental" segmental_out.mkdir(parents=True, exist_ok=True) run_inference(segmental_dir, ["--binary_label_dir", tmp_binary, "--output_dir", str(segmental_out)], args.gpu, "segmental") logger.info("Segmental: %d files → %s", len(list(segmental_out.glob("*.nii.gz"))), segmental_out) logger.info("Pipeline complete in %.1fs", time.time() - t_start) if __name__ == "__main__": main()