Instructions to use Aditya2162/ivus-segmentation with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Keras
How to use Aditya2162/ivus-segmentation with Keras:
```python
# Available backend options are: "jax", "torch", "tensorflow".
import os
os.environ["KERAS_BACKEND"] = "jax"

import keras

model = keras.saving.load_model("hf://Aditya2162/ivus-segmentation")
```
- Notebooks
- Google Colab
- Kaggle
| #!/usr/bin/env python3 | |
| """Evaluation frame-bank workflow for manual annotation. | |
| This pathway is intentionally outside the core library pipeline. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import hashlib | |
| import json | |
| import os | |
| import sys | |
| import tempfile | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from matplotlib.backend_bases import MouseButton | |
| from matplotlib.lines import Line2D | |
| from matplotlib.widgets import Button, RadioButtons, Slider | |
# Directory two levels above this script's own folder (presumably the
# repository root -- confirm against the project layout). It is put at the
# front of sys.path so the `deepivus` import below resolves when the file is
# executed directly as a script.
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from deepivus.io.dicom import read_dicom

# Names of the per-group sub-folders scanned under both the data root and the
# bank output root.
GROUPS = ("bifurcation", "paul")
@dataclass
class FrameState:
    """Mutable annotation state for one sampled frame of a video.

    The class must be a dataclass: the loader constructs it with keyword
    arguments (``FrameState(frame=..., lumen_x=..., ...)``), which requires
    the generated ``__init__``.
    """

    # Index of the frame inside the DICOM image stack.
    frame: int
    # X / Y coordinates of the lumen contour points (parallel lists).
    lumen_x: list[float]
    lumen_y: list[float]
    # Optional confidence value stored with the lumen annotation.
    lumen_confidence: float | None
    # Whether the frame is flagged as a bifurcation.
    bifurcation: bool
def _now_iso() -> str:
    """Return the current naive local time as an ISO-8601 string with second precision."""
    current = datetime.now()
    return current.isoformat(timespec="seconds")
def _safe_float(value: Any) -> float | None:
    """Convert *value* to ``float``, mapping unusable inputs to ``None``.

    ``None``, values that ``float()`` rejects with ``TypeError`` or
    ``ValueError``, and NaN all yield ``None``; every other value is returned
    as a float.
    """
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if np.isnan(number) else number
def _video_id(group: str, dicom_path: Path) -> str:
    """Build the ``"<group>/<file stem>"`` identifier for a DICOM video."""
    stem = dicom_path.stem
    return f"{group}/{stem}"
def _stable_video_seed(video_id: str, base_seed: int) -> int:
    """Derive a reproducible 32-bit seed for one video.

    The first 16 hex digits of the SHA-256 digest of *video_id* are XOR-ed
    with *base_seed* and the result is masked to the low 32 bits.
    """
    hex_prefix = hashlib.sha256(video_id.encode("utf-8")).hexdigest()[:16]
    hashed = int(hex_prefix, 16)
    mixed = hashed ^ int(base_seed)
    return mixed & 0xFFFFFFFF
def _read_jsonl(path: Path) -> list[dict[str, Any]]:
    """Parse a JSON-lines file into a list of decoded records.

    A missing file yields an empty list. Lines that are empty after
    stripping whitespace are skipped; every other line is decoded with
    ``json.loads`` and any decoding error propagates to the caller.
    """
    if not path.exists():
        return []
    with path.open("r", encoding="utf-8") as handle:
        stripped = (raw.strip() for raw in handle)
        return [json.loads(text) for text in stripped if text]
def _write_jsonl_atomic(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write *rows* to *path* as JSON lines via a temp file and ``os.replace``.

    The records are serialised into a temporary file created in the target
    directory, flushed and fsync'd, and then moved over *path* with
    ``os.replace``. If anything fails before the replace completes, the
    temporary file is removed so failed saves do not litter the bank
    directory, and the original exception is re-raised.

    Args:
        path: Destination file; its parent directory is created if missing.
        rows: JSON-serialisable records, written one per line in order.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=path.parent, delete=False) as tmp:
            tmp_path = Path(tmp.name)
            for rec in rows:
                tmp.write(json.dumps(rec) + "\n")
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # The temp file was created with delete=False, so it has to be
        # cleaned up by hand; the handle is already closed at this point.
        if tmp_path is not None:
            try:
                tmp_path.unlink()
            except FileNotFoundError:
                pass
        raise
def _choose_frames(num_frames: int, frames_per_video: int, seed: int) -> list[int]:
    """Draw a seeded sample of distinct frame indices, returned in ascending order.

    At most ``frames_per_video`` indices are drawn without replacement from
    ``range(num_frames)``; when the video has fewer frames, all of them are
    returned.
    """
    sample_size = min(frames_per_video, num_frames)
    generator = np.random.default_rng(seed)
    drawn = generator.choice(num_frames, size=sample_size, replace=False)
    ordered = [int(idx) for idx in drawn]
    ordered.sort()
    return ordered
def _build_bank_file(
    dicom_path: Path,
    group: str,
    out_root: Path,
    frames_per_video: int,
    base_seed: int,
    force: bool,
) -> Path:
    """Create the JSONL frame bank for one DICOM video and return its path.

    The bank consists of one ``meta`` header record followed by one empty
    ``frame`` record per sampled frame. An existing bank is left untouched
    unless *force* is true.

    The file is written through ``_write_jsonl_atomic`` so that an
    interrupted build can never leave a truncated bank behind; such a file
    would otherwise be mistaken for a finished bank on the next run because
    only its existence is checked.

    Args:
        dicom_path: Source DICOM file.
        group: Group name; used for the output sub-folder and the video id.
        out_root: Root directory that holds the per-group bank folders.
        frames_per_video: Requested number of frames to sample.
        base_seed: Seed combined with the video id to get the sampling seed.
        force: Rebuild the bank even if it already exists.

    Returns:
        Path of the bank file (pre-existing or newly written).
    """
    video_id = _video_id(group, dicom_path)
    out_dir = out_root / group
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / f"{dicom_path.stem}.jsonl"
    if out_file.exists() and not force:
        return out_file

    # Only the second element returned by read_dicom is used here; its
    # leading axis length is taken as the number of frames.
    _, images = read_dicom(str(dicom_path))
    seed = _stable_video_seed(video_id, base_seed)
    sampled = _choose_frames(images.shape[0], frames_per_video, seed)
    # Frames of the "bifurcation" group start out flagged as bifurcations.
    default_bif = group == "bifurcation"

    meta = {
        "record_type": "meta",
        "schema": "deepivus_eval_frame_bank_v1",
        "created_at": _now_iso(),
        "video_id": video_id,
        "group": group,
        "dicom_path": str(dicom_path.resolve()),
        "seed": seed,
        "requested_frames_per_video": int(frames_per_video),
        "sampled_frames": sampled,
        "num_sampled_frames": len(sampled),
        "default_bifurcation": default_bif,
    }
    frame_records = [
        {
            "record_type": "frame",
            "sample_order": i,
            "frame": int(frame_idx),
            "bifurcation": default_bif,
            "lumen_confidence": None,
            "lumen": {"x": [], "y": []},
            "annotated_at": None,
            "updated_at": None,
        }
        for i, frame_idx in enumerate(sampled)
    ]
    _write_jsonl_atomic(out_file, [meta, *frame_records])
    return out_file
def build_all_banks(data_root: Path, out_root: Path, frames_per_video: int, seed: int, force: bool) -> list[Path]:
    """Build (or reuse) a frame bank for every ``*.dcm`` file of every group.

    Groups are visited in ``GROUPS`` order and the DICOM files of each group
    in sorted order; group folders missing under *data_root* are skipped.

    Returns:
        The bank paths, one per DICOM file, in processing order.
    """
    produced: list[Path] = []
    for group in GROUPS:
        source_dir = data_root / group
        if source_dir.exists():
            produced.extend(
                _build_bank_file(dcm, group, out_root, frames_per_video, seed, force)
                for dcm in sorted(source_dir.glob("*.dcm"))
            )
    return produced
def _load_bank_states(bank_path: Path) -> tuple[dict[str, Any], list[FrameState]]:
    """Load a bank file into its meta header and ordered frame states.

    Returns:
        ``(meta, states)`` where ``states`` follows the order of the header's
        ``sampled_frames`` list; sampled indices without a frame record are
        omitted.

    Raises:
        ValueError: If the file has no records or its first record is not a
            ``meta`` record.
    """
    records = _read_jsonl(bank_path)
    if not records:
        raise ValueError(f"Empty bank file: {bank_path}")
    header = records[0]
    if header.get("record_type") != "meta":
        raise ValueError(f"Missing meta header in {bank_path}")

    # Index the frame records by frame number; a later duplicate replaces an
    # earlier one.
    by_frame: dict[int, FrameState] = {}
    for record in records:
        if record.get("record_type") != "frame":
            continue
        index = int(record["frame"])
        lumen = record.get("lumen", {})
        by_frame[index] = FrameState(
            frame=index,
            lumen_x=[float(v) for v in lumen.get("x", [])],
            lumen_y=[float(v) for v in lumen.get("y", [])],
            lumen_confidence=_safe_float(record.get("lumen_confidence")),
            bifurcation=bool(record.get("bifurcation", False)),
        )

    wanted = [int(v) for v in header.get("sampled_frames", [])]
    ordered_states = [by_frame[idx] for idx in wanted if idx in by_frame]
    return header, ordered_states
def _list_bank_entries(out_root: Path) -> list[dict[str, Any]]:
    """Describe every non-empty bank file under *out_root* for the picker GUI.

    Each entry is a dict with the keys ``label``, ``group``, ``stem``,
    ``bank_path`` and ``dicom_path``. The label has the form
    ``"<group>/<stem> [<edited>/<sampled>]"``, where a frame counts as edited
    when it has an ``annotated_at`` timestamp or at least one lumen point.

    The DICOM path comes from the bank's meta header. When the header has no
    path, or the recorded path is not an existing file, the relative path
    ``data/<group>/<stem>.dcm`` is used instead.
    """
    entries: list[dict[str, Any]] = []
    for group in GROUPS:
        group_dir = out_root / group
        if not group_dir.exists():
            continue
        for bank_path in sorted(group_dir.glob("*.jsonl")):
            rows = _read_jsonl(bank_path)
            if not rows:
                continue
            meta = rows[0] if rows[0].get("record_type") == "meta" else {}
            sampled = [r for r in rows if r.get("record_type") == "frame"]
            edited_frames = {
                int(r["frame"])
                for r in sampled
                if r.get("annotated_at") is not None or len(r.get("lumen", {}).get("x", [])) > 0
            }
            # A missing "dicom_path" must not be turned into Path(""): that is
            # the current directory, which always exists and would silently
            # defeat the fallback below. is_file() additionally rejects a
            # recorded path that now points at a directory.
            recorded = meta.get("dicom_path")
            dicom_path = Path(recorded) if recorded else None
            if dicom_path is None or not dicom_path.is_file():
                # Fallback from filename/group when absolute path changed.
                dicom_path = Path("data") / group / f"{bank_path.stem}.dcm"
            label = f"{group}/{bank_path.stem} [{len(edited_frames)}/{len(sampled)}]"
            entries.append(
                {
                    "label": label,
                    "group": group,
                    "stem": bank_path.stem,
                    "bank_path": bank_path,
                    "dicom_path": dicom_path,
                }
            )
    return entries
class BankPicker:
    """Modal matplotlib window that lets the user choose one bank entry.

    ``entries`` are dicts providing at least the keys ``label``, ``group``,
    ``stem``, ``dicom_path`` and ``bank_path``.
    """

    def __init__(self, entries: list[dict[str, Any]]) -> None:
        self.entries = entries
        self.selected_idx = 0
        # Stays None until "Open Selected" is clicked.
        self.selected: dict[str, Any] | None = None
        self.fig = None
        self.radio = None
        self.info_text = None

    def _on_pick(self, label: str) -> None:
        """Radio callback: remember the first entry whose label matches, then refresh."""
        match = next(
            (pos for pos, item in enumerate(self.entries) if item["label"] == label),
            None,
        )
        if match is not None:
            self.selected_idx = match
        self._render_info()

    def _on_open(self, _event: Any) -> None:
        """Confirm the highlighted entry and close the window."""
        self.selected = self.entries[self.selected_idx]
        plt.close(self.fig)

    def _on_cancel(self, _event: Any) -> None:
        """Discard the selection and close the window."""
        self.selected = None
        plt.close(self.fig)

    def _render_info(self) -> None:
        """Show the group/stem, DICOM path and bank path of the highlighted entry."""
        chosen = self.entries[self.selected_idx]
        summary = "\n".join(
            (
                f"Selected: {chosen['group']}/{chosen['stem']}",
                f"DICOM: {chosen['dicom_path']}",
                f"Bank: {chosen['bank_path']}",
            )
        )
        self.info_text.set_text(summary)
        self.fig.canvas.draw_idle()

    def show(self) -> dict[str, Any] | None:
        """Open the picker and return the chosen entry once the window closes.

        Returns:
            The selected entry dict, or ``None`` when no entry was confirmed.

        Raises:
            ValueError: If there are no entries to choose from.
        """
        if not self.entries:
            raise ValueError("No bank files found to pick from.")

        option_labels = [item["label"] for item in self.entries]
        self.fig = plt.figure(figsize=(10, 8))
        self.fig.canvas.manager.set_window_title("DeepIVUS Eval Bank Picker")

        radio_axes = self.fig.add_axes([0.05, 0.2, 0.9, 0.74])
        self.radio = RadioButtons(radio_axes, option_labels, active=0)
        self.radio.on_clicked(self._on_pick)

        open_axes = self.fig.add_axes([0.2, 0.06, 0.25, 0.08])
        cancel_axes = self.fig.add_axes([0.55, 0.06, 0.25, 0.08])
        # Both buttons stay referenced by these locals while plt.show() below
        # is running.
        confirm_button = Button(open_axes, "Open Selected", color="#D6F5D6", hovercolor="#BFF0BF")
        dismiss_button = Button(cancel_axes, "Cancel", color="#F5D6D6", hovercolor="#F0BFBF")
        confirm_button.on_clicked(self._on_open)
        dismiss_button.on_clicked(self._on_cancel)

        self.info_text = self.fig.text(0.05, 0.01, "", fontsize=9)
        self._render_info()
        plt.show()
        return self.selected
class EvalFrameBankEditor:
    """Interactive matplotlib editor for the sampled frames of one video bank.

    The editor shows one sampled frame at a time, lets the user add, drag
    and clear lumen contour points and toggle the bifurcation flag, and
    writes the current frame back to the bank file after every edit.

    Mouse: right / middle / double-left / shift-left click adds a point; a
    plain left click grabs the nearest point for dragging.
    Keys: ``left``/``a`` and ``right``/``d`` change the sample, ``s`` saves,
    ``b`` toggles bifurcation, ``x``/``delete`` clears the lumen.
    """

    def __init__(self, dicom_path: Path, bank_path: Path) -> None:
        """Load the DICOM frames and the bank states.

        Raises:
            ValueError: If the bank contains no sampled frame records.
        """
        self.dicom_path = dicom_path
        self.bank_path = bank_path
        _, self.images = read_dicom(str(dicom_path))
        self.meta, self.states = _load_bank_states(bank_path)
        if not self.states:
            raise ValueError(f"No sampled frame records found in {bank_path}")
        # Position inside self.states (not the DICOM frame number).
        self.current = 0
        # Index of the lumen point being dragged, if any.
        self.drag_target: int | None = None
        # Maximum grab distance, compared against distances computed from the
        # event's data coordinates.
        self.drag_threshold_px = 10.0
        self.dirty = False
        # Set while the slider is moved programmatically so that the slider
        # callback does not re-enter _set_current.
        self.is_updating_slider = False
        self.fig = None
        self.ax_image = None
        self.image_artist = None
        self.lumen_line: Line2D | None = None
        self.lumen_points = None
        self.slider: Slider | None = None
        self.status_text = None
        # Every widget is stored on the instance: matplotlib widgets must stay
        # referenced to remain responsive, and _build_ui returns before the
        # event loop starts.
        self.prev_button = None
        self.next_button = None
        self.save_button = None
        self.bif_button = None
        self.clear_button = None

    def _state(self) -> FrameState:
        """Return the state of the sample currently on screen."""
        return self.states[self.current]

    def _frame_size(self, frame: int) -> tuple[int, int]:
        """Return ``(height, width)`` from the first two axes of one frame.

        Only the first two axes are read so that frames carrying an extra
        trailing axis (for example colour channels) do not break unpacking.
        """
        height, width = self.images[frame].shape[:2]
        return int(height), int(width)

    def _autosave_frame(self, reason: str) -> None:
        """Write the current frame's state back into the bank file.

        The bank is re-read from disk, the record of the current frame is
        replaced, placeholder records are generated for sampled frames that
        have none, and the result is written atomically in
        ``sampled_frames`` order.

        Args:
            reason: Short tag stored as ``last_save_reason`` in the record.

        Raises:
            ValueError: If the bank file has no records.
        """
        s = self._state()
        rows = _read_jsonl(self.bank_path)
        if not rows:
            raise ValueError(f"Bank file is empty: {self.bank_path}")
        meta = rows[0]
        sampled = [int(v) for v in meta.get("sampled_frames", [])]
        frame_records: dict[int, dict[str, Any]] = {
            int(r["frame"]): dict(r) for r in rows if r.get("record_type") == "frame"
        }
        now = _now_iso()
        existing = frame_records.get(s.frame, {})
        frame_records[s.frame] = {
            "record_type": "frame",
            "sample_order": existing.get("sample_order", self.current),
            "frame": int(s.frame),
            "bifurcation": bool(s.bifurcation),
            "lumen_confidence": s.lumen_confidence,
            "lumen": {"x": [float(v) for v in s.lumen_x], "y": [float(v) for v in s.lumen_y]},
            # Keep the first annotation timestamp; only updated_at moves.
            "annotated_at": existing.get("annotated_at") or now,
            "updated_at": now,
            "last_save_reason": reason,
        }
        ordered_frames: list[dict[str, Any]] = []
        for i, frame_idx in enumerate(sampled):
            rec = frame_records.get(frame_idx)
            if rec is None:
                rec = {
                    "record_type": "frame",
                    "sample_order": i,
                    "frame": int(frame_idx),
                    "bifurcation": bool(meta.get("default_bifurcation", False)),
                    "lumen_confidence": None,
                    "lumen": {"x": [], "y": []},
                    "annotated_at": None,
                    "updated_at": None,
                }
            rec["sample_order"] = i
            ordered_frames.append(rec)
        _write_jsonl_atomic(self.bank_path, [meta] + ordered_frames)
        self.dirty = False

    def _build_ui(self) -> None:
        """Create the figure, the widgets and all event connections."""
        self.fig = plt.figure(figsize=(13, 9))
        self.fig.canvas.manager.set_window_title("DeepIVUS Eval Frame-Bank Editor")
        self.ax_image = self.fig.add_axes([0.05, 0.22, 0.9, 0.74])
        self.ax_image.set_title("DeepIVUS Eval Frame-Bank Editor", fontsize=14, weight="bold")
        self.ax_image.set_axis_off()
        slider_ax = self.fig.add_axes([0.12, 0.13, 0.76, 0.035])
        self.slider = Slider(
            ax=slider_ax,
            label="Sample",
            valmin=0,
            valmax=len(self.states) - 1,
            valinit=0,
            valstep=1,
            color="#1f77b4",
        )
        prev_ax = self.fig.add_axes([0.12, 0.05, 0.1, 0.055])
        next_ax = self.fig.add_axes([0.24, 0.05, 0.1, 0.055])
        save_ax = self.fig.add_axes([0.38, 0.05, 0.14, 0.055])
        clear_ax = self.fig.add_axes([0.54, 0.05, 0.14, 0.055])
        bif_ax = self.fig.add_axes([0.72, 0.05, 0.16, 0.055])
        self.prev_button = Button(prev_ax, "Prev Frame", color="#E0E0E0", hovercolor="#D0D0D0")
        self.next_button = Button(next_ax, "Next Frame", color="#E0E0E0", hovercolor="#D0D0D0")
        self.save_button = Button(save_ax, "Save Frame", color="#D6F5D6", hovercolor="#BFF0BF")
        self.clear_button = Button(clear_ax, "Clear Lumen", color="#F9E6CC", hovercolor="#F4DDB8")
        self.bif_button = Button(bif_ax, "Bifurcation: No", color="#F5D6D6", hovercolor="#F0BFBF")
        self.status_text = self.fig.text(0.05, 0.18, "", fontsize=10)
        self.slider.on_changed(self._on_slider)
        self.prev_button.on_clicked(self._on_prev)
        self.next_button.on_clicked(self._on_next)
        self.save_button.on_clicked(self._on_save)
        self.clear_button.on_clicked(self._on_clear_lumen)
        self.bif_button.on_clicked(self._on_toggle_bifurcation)
        self.fig.canvas.mpl_connect("button_press_event", self._on_press)
        self.fig.canvas.mpl_connect("motion_notify_event", self._on_motion)
        self.fig.canvas.mpl_connect("button_release_event", self._on_release)
        self.fig.canvas.mpl_connect("key_press_event", self._on_key)
        self.fig.canvas.mpl_connect("close_event", self._on_close)

    def _render(self) -> None:
        """Redraw the image, the lumen contour, the bifurcation button and the status line."""
        s = self._state()
        image = self.images[s.frame]
        if self.image_artist is None:
            # First draw: create the artists once, later draws only update them.
            self.image_artist = self.ax_image.imshow(image, cmap="gray")
            self.lumen_line = self.ax_image.plot([], [], color="#1db954", lw=2)[0]
            self.lumen_points = self.ax_image.scatter([], [], c="#1db954", s=28, edgecolors="black", linewidths=0.4)
        else:
            self.image_artist.set_data(image)
        # Appending the first point again closes the contour polygon.
        self.lumen_line.set_data(s.lumen_x + s.lumen_x[:1], s.lumen_y + s.lumen_y[:1])
        lumen_offsets = np.c_[s.lumen_x, s.lumen_y] if s.lumen_x and s.lumen_y else np.empty((0, 2))
        self.lumen_points.set_offsets(lumen_offsets)
        bif_text = "Yes" if s.bifurcation else "No"
        bif_color = "#D6F5D6" if s.bifurcation else "#F5D6D6"
        self.bif_button.label.set_text(f"Bifurcation: {bif_text}")
        # Update the widget's resting colour as well as the axes face colour;
        # the Button restores its own `color` after a hover, which would
        # otherwise undo the state colour.
        self.bif_button.color = bif_color
        self.bif_button.ax.set_facecolor(bif_color)
        self.status_text.set_text(
            f"Video: {self.meta.get('video_id')} Sample {self.current + 1}/{len(self.states)}"
            f" DICOM frame: {s.frame} Lumen pts: {len(s.lumen_x)}"
            f" Add: right/middle/double-left/shift-left"
        )
        self.fig.canvas.draw_idle()

    def _set_current(self, idx: int) -> None:
        """Switch to sample *idx* (clamped to the valid range), saving pending edits first."""
        idx = int(np.clip(idx, 0, len(self.states) - 1))
        if idx == self.current:
            return
        if self.dirty:
            self._autosave_frame("sample_change")
        # A point index grabbed on the previous sample is meaningless here.
        self.drag_target = None
        self.current = idx
        self.is_updating_slider = True
        try:
            self.slider.set_val(idx)
        finally:
            # Never leave the guard set, or later slider moves would be ignored.
            self.is_updating_slider = False
        self._render()

    def _nearest_point(self, x: float, y: float) -> int | None:
        """Return the index of the lumen point closest to ``(x, y)``.

        Returns ``None`` when the frame has no points or the closest one is
        farther away than ``drag_threshold_px``.
        """
        s = self._state()
        if not s.lumen_x:
            return None
        pts = np.column_stack((np.asarray(s.lumen_x), np.asarray(s.lumen_y)))
        d = np.linalg.norm(pts - np.asarray([x, y]), axis=1)
        i = int(np.argmin(d))
        if float(d[i]) > self.drag_threshold_px:
            return None
        return i

    def _on_slider(self, val: float) -> None:
        """Slider callback; ignored while the slider is being set programmatically."""
        if self.is_updating_slider:
            return
        self._set_current(int(val))

    def _on_prev(self, _event: Any) -> None:
        """Go to the previous sample."""
        self._set_current(self.current - 1)

    def _on_next(self, _event: Any) -> None:
        """Go to the next sample."""
        self._set_current(self.current + 1)

    def _on_save(self, _event: Any) -> None:
        """Save the current frame on explicit request."""
        self._autosave_frame("manual_save")
        self._render()

    def _on_toggle_bifurcation(self, _event: Any) -> None:
        """Flip the bifurcation flag of the current frame and save."""
        s = self._state()
        s.bifurcation = not s.bifurcation
        self.dirty = True
        self._autosave_frame("bifurcation_toggle")
        self._render()

    def _on_clear_lumen(self, _event: Any) -> None:
        """Remove all lumen points of the current frame and save."""
        s = self._state()
        s.lumen_x = []
        s.lumen_y = []
        # Any point index grabbed for dragging no longer exists.
        self.drag_target = None
        self.dirty = True
        self._autosave_frame("clear_lumen")
        self._render()

    def _on_key(self, event: Any) -> None:
        """Dispatch the keyboard shortcuts."""
        if event.key in {"left", "a"}:
            self._set_current(self.current - 1)
        elif event.key in {"right", "d"}:
            self._set_current(self.current + 1)
        elif event.key == "s":
            self._on_save(event)
        elif event.key == "b":
            self._on_toggle_bifurcation(event)
        elif event.key in {"x", "delete"}:
            self._on_clear_lumen(event)

    def _on_press(self, event: Any) -> None:
        """Mouse press: start dragging a point or append a new lumen point."""
        if event.inaxes != self.ax_image:
            return
        if event.xdata is None or event.ydata is None:
            return
        s = self._state()
        h, w = self._frame_size(s.frame)
        # Clamp the click to the frame bounds.
        x = float(np.clip(event.xdata, 0, w - 1))
        y = float(np.clip(event.ydata, 0, h - 1))
        is_left = event.button in (1, MouseButton.LEFT)
        is_add_click = (
            event.button in (3, 2, MouseButton.RIGHT, MouseButton.MIDDLE)
            or (is_left and bool(getattr(event, "dblclick", False)))
            or (is_left and str(getattr(event, "key", "")).lower() == "shift")
        )
        # Left-click: drag nearest existing point.
        if is_left and not is_add_click:
            self.drag_target = self._nearest_point(x, y)
            return
        # Right/middle/double-left/shift-left: add lumen point.
        if is_add_click:
            s.lumen_x.append(x)
            s.lumen_y.append(y)
            self.dirty = True
            self._autosave_frame("point_add")
            self._render()

    def _on_motion(self, event: Any) -> None:
        """Mouse move: while dragging, move the grabbed point with the cursor."""
        if self.drag_target is None:
            return
        if event.inaxes != self.ax_image or event.xdata is None or event.ydata is None:
            return
        s = self._state()
        idx = self.drag_target
        if idx >= len(s.lumen_x) or idx >= len(s.lumen_y):
            # The grabbed point disappeared (for example the lumen was cleared
            # via the keyboard mid-drag); abandon the drag instead of raising
            # inside the event callback.
            self.drag_target = None
            return
        h, w = self._frame_size(s.frame)
        x = float(np.clip(event.xdata, 0, w - 1))
        y = float(np.clip(event.ydata, 0, h - 1))
        s.lumen_x[idx] = x
        s.lumen_y[idx] = y
        self.dirty = True
        self._render()

    def _on_release(self, _event: Any) -> None:
        """Mouse release: finish a drag and save if the point was moved."""
        if self.drag_target is None:
            return
        self.drag_target = None
        if self.dirty:
            self._autosave_frame("point_drag")
        self._render()

    def _on_close(self, _event: Any) -> None:
        """Window close: flush unsaved edits."""
        if self.dirty:
            self._autosave_frame("window_close")

    def show(self) -> None:
        """Build the UI, draw the first sample and call ``plt.show()``."""
        self._build_ui()
        self._render()
        plt.show()
def _resolve_group_from_dicom(dicom_path: Path) -> str:
    """Return the group a DICOM belongs to, taken from its parent folder name.

    Raises:
        ValueError: If the parent folder name is not one of ``GROUPS``.
    """
    parent_name = dicom_path.parent.name
    if parent_name in GROUPS:
        return parent_name
    raise ValueError(f"Expected DICOM under data/{{bifurcation,paul}}, got: {dicom_path}")
def _cmd_build(args: argparse.Namespace) -> None:
    """Handle the ``build`` sub-command: create banks for all videos and report the count."""
    options = {
        "data_root": Path(args.data_root),
        "out_root": Path(args.out_root),
        "frames_per_video": args.frames_per_video,
        "seed": args.seed,
        "force": args.force,
    }
    written = build_all_banks(**options)
    print(f"Prepared {len(written)} bank files under {args.out_root}")
def _cmd_annotate(args: argparse.Namespace) -> None:
    """Handle the ``annotate`` sub-command: open the editor for one video.

    The video is chosen, in order of precedence, by ``--dicom-path``, by
    ``--group`` together with ``--stem``, or interactively through the
    bank-picker window (building the banks first when none exist yet).
    Cancelling the picker returns without opening the editor.

    Raises:
        ValueError: If only one of ``--group`` / ``--stem`` is given.
        FileNotFoundError: If the resolved DICOM path is not an existing file.
    """
    data_root = Path(args.data_root)
    out_root = Path(args.out_root)
    if (args.group and not args.stem) or (args.stem and not args.group):
        raise ValueError("Provide both --group and --stem together, or provide neither.")
    if args.dicom_path:
        dicom_path = Path(args.dicom_path)
        group = _resolve_group_from_dicom(dicom_path)
    elif args.group and args.stem:
        group = args.group
        dicom_path = data_root / group / f"{args.stem}.dcm"
    else:
        entries = _list_bank_entries(out_root)
        if not entries:
            # Build banks if none exist yet.
            build_all_banks(
                data_root=data_root,
                out_root=out_root,
                frames_per_video=args.frames_per_video,
                seed=args.seed,
                force=False,
            )
            entries = _list_bank_entries(out_root)
        picked = BankPicker(entries).show()
        if picked is None:
            return
        dicom_path = Path(picked["dicom_path"])
        group = str(picked["group"])
        if not dicom_path.is_file():
            # The path stored with the bank entry may be stale, and the
            # entry's own fallback does not know about --data-root; retry
            # below the data root this command was actually given.
            candidate = data_root / group / f"{picked['stem']}.dcm"
            if candidate.is_file():
                dicom_path = candidate
    # is_file() rather than exists(): a directory is not a usable DICOM.
    if not dicom_path.is_file():
        raise FileNotFoundError(f"DICOM not found: {dicom_path}")
    bank_path = _build_bank_file(
        dicom_path=dicom_path,
        group=group,
        out_root=out_root,
        frames_per_video=args.frames_per_video,
        base_seed=args.seed,
        force=False,
    )
    editor = EvalFrameBankEditor(dicom_path=dicom_path, bank_path=bank_path)
    editor.show()
def _cmd_status(args: argparse.Namespace) -> None:
    """Handle the ``status`` sub-command: print annotation progress per bank.

    A frame counts as edited when it has an ``annotated_at`` timestamp or at
    least one lumen point. A video counts as completed when it has frame
    records and all of them are edited.
    """
    out_root = Path(args.out_root)
    total = 0
    completed = 0
    for group in GROUPS:
        group_dir = out_root / group
        if not group_dir.exists():
            continue
        for bank_path in sorted(group_dir.glob("*.jsonl")):
            frame_rows = [row for row in _read_jsonl(bank_path) if row.get("record_type") == "frame"]
            touched: set[int] = set()
            for row in frame_rows:
                if row.get("annotated_at") is not None or len(row.get("lumen", {}).get("x", [])) > 0:
                    touched.add(int(row["frame"]))
            total += 1
            if frame_rows and len(touched) >= len(frame_rows):
                completed += 1
            print(
                f"{group}/{bank_path.stem}: {len(touched)}/{len(frame_rows)} sampled frames edited"
            )
    print(f"Completed videos: {completed}/{total}")
def build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser with the ``build``, ``annotate`` and ``status`` sub-commands.

    Each sub-command stores its handler in the ``func`` default so the caller
    can dispatch with ``args.func(args)``.
    """
    parser = argparse.ArgumentParser(description="Eval frame-bank annotation workflow")
    commands = parser.add_subparsers(dest="command", required=True)

    def add_sampling_options(cmd: argparse.ArgumentParser) -> None:
        # Options shared, in this order, by `build` and `annotate`.
        cmd.add_argument("--data-root", default="data", help="Root data folder with bifurcation/ and paul/")
        cmd.add_argument("--out-root", default="evals/frame_bank_merged", help="Output bank root")
        cmd.add_argument("--frames-per-video", type=int, default=15)
        cmd.add_argument("--seed", type=int, default=20260218)

    build_cmd = commands.add_parser("build", help="Create locked sampled-frame bank files for all videos")
    add_sampling_options(build_cmd)
    build_cmd.add_argument("--force", action="store_true", help="Rebuild existing bank files")
    build_cmd.set_defaults(func=_cmd_build)

    annotate_cmd = commands.add_parser(
        "annotate",
        help="Open sampled-frame GUI for one video (or launch bank-picker GUI if no video is specified)",
    )
    add_sampling_options(annotate_cmd)
    annotate_cmd.add_argument("--dicom-path", default=None, help="Direct path to DICOM")
    annotate_cmd.add_argument("--group", choices=GROUPS, default=None, help="Group when using --stem")
    annotate_cmd.add_argument("--stem", default=None, help="File stem (e.g. FILE0003) when using --group")
    annotate_cmd.set_defaults(func=_cmd_annotate)

    status_cmd = commands.add_parser("status", help="Show annotation progress for bank files")
    status_cmd.add_argument("--out-root", default="evals/frame_bank_merged", help="Output bank root")
    status_cmd.set_defaults(func=_cmd_status)

    return parser
def main() -> None:
    """Parse the command line and run the handler of the selected sub-command."""
    args = build_parser().parse_args()
    handler = args.func
    handler(args)
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()