| """ |
| 变化检测器 - 计算差分图和变化量化 |
| """ |
| from pathlib import Path |
| from typing import Tuple, Optional, List, Dict, Any |
| import numpy as np |
| import nibabel as nib |
| from scipy import ndimage |
| from skimage import measure |
| import matplotlib.pyplot as plt |
| import matplotlib.colors as mcolors |
|
|
| from app.core.config import settings |
| from app.core.logging import logger |
| from app.core.exceptions import AnalysisError |
| from app.schemas.analysis import ComparisonResult, RECISTResponse |
|
|
|
|
class ChangeDetector:
    """Detect and quantify changes between a follow-up and a registered baseline image.

    The detector computes voxel-wise difference maps, renders heatmap
    overlays, summarises global and per-ROI changes, and offers helpers for
    RECIST 1.1 response classification and volume doubling time.
    """

    # Array axis that is held fixed when extracting a 2-D slice for each view.
    # Any unrecognised view name falls back to the coronal axis, as before.
    _VIEW_AXES = {"axial": 2, "sagittal": 0, "coronal": 1}
    _DEFAULT_VIEW_AXIS = 1

    # RECIST 1.1 thresholds on the change of the lesion diameter.
    _RECIST_PR_PERCENT = -30.0
    _RECIST_PD_PERCENT = 20.0
    _RECIST_PD_MIN_ABSOLUTE_MM = 5.0

    def __init__(
        self,
        threshold_hu: Optional[float] = None,
        gaussian_sigma: Optional[float] = None
    ):
        """
        Initialise the change detector.

        Args:
            threshold_hu: Minimum absolute HU difference treated as a
                significant change. ``None`` selects
                ``settings.DIFF_THRESHOLD_HU``; an explicit ``0`` is honoured.
            gaussian_sigma: Sigma of the Gaussian smoothing applied to the
                difference map. ``None`` selects ``settings.GAUSSIAN_SIGMA``;
                an explicit ``0`` is honoured.
        """
        # Compare against None explicitly: ``value or default`` would silently
        # replace a legitimate 0 with the configured default.
        self.threshold_hu = (
            settings.DIFF_THRESHOLD_HU if threshold_hu is None else threshold_hu
        )
        self.gaussian_sigma = (
            settings.GAUSSIAN_SIGMA if gaussian_sigma is None else gaussian_sigma
        )

    def compute_difference_map(
        self,
        followup: np.ndarray,
        warped_baseline: np.ndarray,
        mask: Optional[np.ndarray] = None
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute the difference map between follow-up and registered baseline.

        Args:
            followup: Follow-up image.
            warped_baseline: Baseline image after registration to the follow-up.
            mask: Optional mask multiplied into the smoothed difference so that
                only the masked region contributes to the significant changes.

        Returns:
            A tuple ``(raw difference map, significant change map)``. The raw
            map is ``followup - warped_baseline`` as float32 (unsmoothed and
            unmasked). The significant map holds the smoothed (and masked)
            difference where its magnitude exceeds ``threshold_hu`` and zero
            elsewhere.
        """
        logger.info("计算差分图...")

        # Cast first so integer inputs cannot wrap around on subtraction.
        diff_map = followup.astype(np.float32) - warped_baseline.astype(np.float32)

        # Smooth to suppress noise before thresholding.
        diff_smoothed = ndimage.gaussian_filter(diff_map, sigma=self.gaussian_sigma)

        if mask is not None:
            diff_smoothed = diff_smoothed * mask

        # Evaluate the threshold mask once and reuse it for both sides.
        exceeds_threshold = np.abs(diff_smoothed) > self.threshold_hu
        significant_changes = np.zeros_like(diff_smoothed)
        significant_changes[exceeds_threshold] = diff_smoothed[exceeds_threshold]

        return diff_map, significant_changes

    def generate_heatmap(
        self,
        diff_map: np.ndarray,
        background: np.ndarray,
        output_path: Path,
        slice_idx: Optional[int] = None,
        view: str = "axial"
    ) -> Path:
        """
        Render one slice of the difference map over a grayscale background.

        Args:
            diff_map: Difference map.
            background: Background image, indexed the same way as ``diff_map``.
            output_path: Destination image file; missing parent directories
                are created.
            slice_idx: Index of the slice to render. ``None`` selects the slice
                with the largest summed absolute difference.
            view: ``"axial"`` (slices along axis 2), ``"sagittal"`` (axis 0) or
                ``"coronal"`` (axis 1). Any other value is treated as coronal.

        Returns:
            Path of the saved heatmap file.
        """
        output_path = Path(output_path)
        axis = self._VIEW_AXES.get(view, self._DEFAULT_VIEW_AXIS)

        if slice_idx is None:
            # Sum |diff| over every axis except the slicing axis and pick the
            # slice that carries the most change.
            other_axes = tuple(a for a in range(diff_map.ndim) if a != axis)
            slice_changes = np.abs(diff_map).sum(axis=other_axes)
            slice_idx = int(np.argmax(slice_changes))

        bg_slice = np.take(background, slice_idx, axis=axis)
        diff_slice = np.take(diff_map, slice_idx, axis=axis)

        # Normalise to [-1, 1]; the floor of 1 avoids dividing by ~0 when the
        # slice contains no change.
        max_val = max(np.abs(diff_slice).max(), 1)
        diff_normalized = diff_slice / max_val

        # Per-pixel opacity proportional to the change magnitude so unchanged
        # regions leave the background visible.
        alpha = np.clip(np.abs(diff_normalized), 0, 1)

        fig, ax = plt.subplots(1, 1, figsize=(10, 10))
        try:
            ax.imshow(bg_slice.T, cmap='gray', origin='lower', aspect='equal')

            heatmap = ax.imshow(
                diff_normalized.T,
                cmap='RdBu_r',
                origin='lower',
                aspect='equal',
                alpha=alpha.T * 0.7,
                vmin=-1,
                vmax=1
            )

            cbar = fig.colorbar(heatmap, ax=ax, fraction=0.046, pad=0.04)
            cbar.set_label('HU Change (normalized)', fontsize=12)

            ax.set_title(f'{view.capitalize()} View - Slice {slice_idx}', fontsize=14)
            ax.axis('off')

            output_path.parent.mkdir(parents=True, exist_ok=True)
            fig.savefig(output_path, dpi=150, bbox_inches='tight', facecolor='black')
        finally:
            # Close this specific figure even when rendering or saving fails,
            # so figures do not accumulate in pyplot's global registry.
            plt.close(fig)

        logger.info(f"热力图已保存: {output_path}")
        return output_path

    def quantify_changes(
        self,
        diff_map: np.ndarray,
        significant: np.ndarray,
        spacing: Tuple[float, ...] = (1.0, 1.0, 1.0)
    ) -> Dict[str, Any]:
        """
        Quantify the overall change (no specific ROI required).

        Args:
            diff_map: Difference map. Accepted for interface compatibility; the
                statistics are derived from ``significant`` only.
            significant: Significant change map (zero where nothing changed).
            spacing: Voxel spacing in mm.

        Returns:
            Dictionary with voxel counts, percentages of the whole volume, the
            changed volume in cc, and HU statistics over the significant
            voxels. ``max_hu_increase`` is never negative and
            ``max_hu_decrease`` is never positive; both are 0.0 when there is
            no change in that direction.
        """
        # mm^3 -> cc
        voxel_volume_cc = np.prod(spacing) / 1000

        total_voxels = significant.size
        changed_values = significant[significant != 0]
        changed_voxels = changed_values.size
        increase_voxels = int((significant > 0).sum())
        decrease_voxels = int((significant < 0).sum())

        def percent_of_total(count: int) -> float:
            # Guard the empty-volume case instead of dividing by zero.
            return float(count / total_voxels * 100) if total_voxels > 0 else 0.0

        if changed_voxels > 0:
            # Clamp at zero: when every change has the same sign, the raw
            # max/min would report e.g. a negative "increase".
            max_hu_increase = max(float(changed_values.max()), 0.0)
            max_hu_decrease = min(float(changed_values.min()), 0.0)
            mean_hu_change = float(changed_values.mean())
        else:
            max_hu_increase = 0.0
            max_hu_decrease = 0.0
            mean_hu_change = 0.0

        return {
            "changed_voxels": int(changed_voxels),
            "total_voxels": int(total_voxels),
            "change_percent": percent_of_total(changed_voxels),
            "changed_volume_cc": float(changed_voxels * voxel_volume_cc),
            "max_hu_increase": max_hu_increase,
            "max_hu_decrease": max_hu_decrease,
            "mean_hu_change": mean_hu_change,
            "increase_voxels": increase_voxels,
            "decrease_voxels": decrease_voxels,
            "increase_percent": percent_of_total(increase_voxels),
            "decrease_percent": percent_of_total(decrease_voxels)
        }

    def quantify_roi_changes(
        self,
        followup: np.ndarray,
        warped_baseline: np.ndarray,
        segmentation: np.ndarray,
        roi_label: int,
        spacing: Tuple[float, ...] = (1.0, 1.0, 1.0)
    ) -> Dict[str, Any]:
        """
        Quantify the change inside one labelled ROI.

        Only a single segmentation is supplied, so the same ROI is applied to
        both time points: the follow-up volume and diameter equal the baseline
        values and the corresponding change percentages are reported as 0.0.
        Only the mean HU differs between the two time points.

        Args:
            followup: Follow-up image.
            warped_baseline: Baseline image after registration.
            segmentation: Label map.
            roi_label: Label value that identifies the ROI.
            spacing: Voxel spacing in mm, one value per image axis.

        Returns:
            Dictionary with volume (cc), diameter (mm) and mean HU figures, or
            ``{"error": "ROI not found"}`` when no voxel carries ``roi_label``.
        """
        roi_mask = (segmentation == roi_label)
        roi_voxel_count = int(roi_mask.sum())
        if roi_voxel_count == 0:
            return {"error": "ROI not found"}

        # mm^3 -> cc
        voxel_volume_cc = np.prod(spacing) / 1000
        baseline_volume = roi_voxel_count * voxel_volume_cc

        # Index with the mask directly; multiplying the full volume by the
        # mask first yields the same values at a higher cost.
        baseline_mean_hu = warped_baseline[roi_mask].mean()
        followup_mean_hu = followup[roi_mask].mean()
        hu_change = followup_mean_hu - baseline_mean_hu

        # Diameter estimate: largest bounding-box extent (max index minus min
        # index along each axis) converted to millimetres.
        coords = np.argwhere(roi_mask)
        extent_voxels = coords.max(axis=0) - coords.min(axis=0)
        spacing_mm = np.asarray(spacing, dtype=np.float64)
        if spacing_mm.size == extent_voxels.size:
            # Scale every axis by its own spacing so anisotropic voxels are
            # measured correctly.
            extent_mm = extent_voxels * spacing_mm
        else:
            # Spacing does not describe every axis: fall back to treating the
            # first spacing value as isotropic.
            extent_mm = extent_voxels * spacing_mm[0]
        baseline_diameter = float(extent_mm.max())
        followup_diameter = baseline_diameter

        return {
            "baseline_volume_cc": float(baseline_volume),
            "followup_volume_cc": float(baseline_volume),
            "volume_change_percent": 0.0,
            "baseline_diameter_mm": float(baseline_diameter),
            "followup_diameter_mm": float(followup_diameter),
            "diameter_change_percent": 0.0,
            "baseline_mean_hu": float(baseline_mean_hu),
            "followup_mean_hu": float(followup_mean_hu),
            "hu_change": float(hu_change)
        }

    def evaluate_recist(
        self,
        baseline_diameter: float,
        followup_diameter: float
    ) -> "RECISTResponse":
        """
        Classify treatment response following RECIST 1.1.

        Args:
            baseline_diameter: Longest diameter at baseline (mm).
            followup_diameter: Longest diameter at follow-up (mm).

        Returns:
            ``SD`` when the baseline diameter is 0 (no reference to compare
            against); ``CR`` when the follow-up diameter is 0; ``PR`` for a
            decrease of at least 30%; ``PD`` for an increase of at least 20%
            that is also at least 5 mm in absolute terms; ``SD`` otherwise.
        """
        if baseline_diameter == 0:
            return RECISTResponse.SD

        if followup_diameter == 0:
            return RECISTResponse.CR

        absolute_change = followup_diameter - baseline_diameter
        change_percent = (absolute_change / baseline_diameter) * 100

        if change_percent <= self._RECIST_PR_PERCENT:
            return RECISTResponse.PR

        # RECIST 1.1 requires both the relative and the absolute criterion
        # for progression, so small lesions are not over-called.
        if (
            change_percent >= self._RECIST_PD_PERCENT
            and absolute_change >= self._RECIST_PD_MIN_ABSOLUTE_MM
        ):
            return RECISTResponse.PD

        return RECISTResponse.SD

    def calculate_doubling_time(
        self,
        baseline_volume: float,
        followup_volume: float,
        days_between: int
    ) -> Optional[float]:
        """
        Compute the volume doubling time assuming exponential growth.

        Args:
            baseline_volume: Volume at baseline.
            followup_volume: Volume at follow-up (same unit as baseline).
            days_between: Number of days between the two scans.

        Returns:
            Doubling time in days, or ``None`` when any input is non-positive
            or the volume did not grow.
        """
        if baseline_volume <= 0 or followup_volume <= 0 or days_between <= 0:
            return None

        ratio = followup_volume / baseline_volume
        if ratio <= 1:
            # No growth: a doubling time is undefined.
            return None

        # DT = t * ln(2) / ln(V1 / V0)
        return float(days_between * np.log(2) / np.log(ratio))
|
|
|
|
|
|