neuroscan-ai / scripts /debug_backend.py
cyd0806's picture
Upload NeuroScan AI code
c2d8817 verified
#!/usr/bin/env python3
"""
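NeuroScan AI backend debugging script.

Exercises the backend's core features in isolation:
  1. DICOM/NIfTI loading
  2. Image registration (rigid + deformable)
  3. Change detection
  4. Report generation (template mode + LLM mode)
  5. Organ segmentation (optional; enabled with --with-segmentation)

Usage:
    python scripts/debug_backend.py
    python scripts/debug_backend.py --with-segmentation

Author: NeuroScan AI Team
Date: 2026-01-28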
"""
import os
import sys
from pathlib import Path
# Put the project root (two levels above this script) at the front of the
# import path so the `app.*` packages resolve when the script is run directly.
_this_script = Path(__file__).resolve()
project_root = _this_script.parent.parent
sys.path.insert(0, os.fspath(project_root))
import numpy as np
import nibabel as nib
from datetime import datetime
import json
import traceback
# PYTHONIOENCODING is only read when an interpreter starts, so assigning it
# here affects child processes only. The standard streams of *this* process
# are therefore switched to UTF-8 explicitly, so the emoji/CJK status output
# cannot raise UnicodeEncodeError on consoles using a legacy code page.
os.environ['PYTHONIOENCODING'] = 'utf-8'
for _stream in (sys.stdout, sys.stderr):
    _reconfigure = getattr(_stream, "reconfigure", None)
    if _reconfigure is None:
        # Stream was replaced by an object without reconfigure(); leave it.
        continue
    try:
        _reconfigure(encoding="utf-8")
    except (ValueError, OSError):
        # Best effort only: a stream that refuses re-encoding keeps its
        # current settings rather than aborting the script.
        pass
def print_header(title: str):
    """Print *title* framed above and below by a 60-character '=' rule.

    A blank line is emitted before the top rule.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" {title}")
    print(rule)
def print_step(step: str, status: str = "running"):
    """Print a step description prefixed by an icon for *status*.

    Known statuses are "running", "success", "error", "info" and "warning";
    any other value falls back to a bullet character. A blank line precedes
    the message.
    """
    icon = {
        "running": "🔄",
        "success": "✅",
        "error": "❌",
        "info": "ℹ️",
        "warning": "⚠️",
    }.get(status, "•")
    print(f"\n{icon} {step}")
def print_dict(d: dict, indent: int = 2):
    """Print each ``key: value`` pair of *d* on its own indented line.

    Floating-point values are shown with four decimal places. This covers
    Python floats and NumPy floating scalars alike; ``np.float32`` and friends
    are not ``float`` subclasses, so a plain ``isinstance(v, float)`` check
    would leave them unformatted.

    Args:
        d: Mapping to print.
        indent: Number of leading spaces for every line.
    """
    prefix = ' ' * indent
    for key, value in d.items():
        if isinstance(value, (float, np.floating)):
            print(f"{prefix}{key}: {value:.4f}")
        else:
            print(f"{prefix}{key}: {value}")
class BackendDebugger:
    """Runs the backend smoke tests and records their outcomes.

    Every ``test_*`` method imports the service it exercises lazily, catches
    any exception, prints a status line and stores an entry under
    ``self.results["tests"]``. ``run_all_tests`` chains the tests and writes
    the collected results to ``debug_results.json`` inside ``self.output_dir``.
    """

    def __init__(self):
        """Resolve the data/output directories and create the output directory."""
        self.data_dir = project_root / "data" / "processed"
        self.output_dir = project_root / "output" / "debug_results"
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Collected test outcomes, keyed by test name.
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "tests": {}
        }

    @staticmethod
    def _to_jsonable(value):
        """Recursively convert *value* into something ``json.dump`` accepts.

        NumPy scalars become native Python numbers (integers stay integers),
        NumPy arrays become nested lists, ``Path`` objects become strings, and
        dicts/lists/tuples are converted element-wise. Anything else is
        returned unchanged.
        """
        convert = BackendDebugger._to_jsonable
        if isinstance(value, dict):
            return {k: convert(v) for k, v in value.items()}
        if isinstance(value, (list, tuple)):
            return [convert(v) for v in value]
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        if isinstance(value, Path):
            return str(value)
        return value

    def find_test_data(self):
        """Locate a directory with test data under ``self.data_dir``.

        Directories named ``real_lung_*`` are preferred; otherwise the first
        sub-directory is used. Candidates are sorted so the choice is the same
        on every run.

        Returns:
            The chosen directory, or ``None`` when the data directory is
            missing or contains no sub-directories.
        """
        print_step("查找测试数据", "running")
        # iterdir() raises on a missing directory; treat that as "no data".
        if not self.data_dir.is_dir():
            print(f" 数据目录不存在: {self.data_dir}")
            return None
        # Prefer the real lung CT datasets.
        lung_dirs = sorted(d for d in self.data_dir.glob("real_lung_*") if d.is_dir())
        if lung_dirs:
            print(f" 找到 {len(lung_dirs)} 个肺部 CT 数据集:")
            for d in lung_dirs:
                files = list(d.glob("*.nii.gz"))
                print(f" - {d.name}: {len(files)} 个文件")
            return lung_dirs[0]
        # Fall back to any other data directory.
        all_dirs = sorted(d for d in self.data_dir.iterdir() if d.is_dir())
        if all_dirs:
            print(f" 找到 {len(all_dirs)} 个数据目录")
            return all_dirs[0]
        return None

    def test_dicom_loader(self):
        """Test 1: load a baseline and a follow-up NIfTI volume.

        Uses ``baseline.nii.gz`` / ``followup.nii.gz`` when both exist;
        otherwise falls back to the first two ``*.nii.gz`` files in sorted
        order, or the same file twice when only one is present.

        Returns:
            ``((baseline_path, data, img), (followup_path, data, img))`` on
            success, or ``(None, None)`` when no data is available or loading
            fails.
        """
        print_header("测试 1: DICOM/NIfTI 加载器")
        try:
            from app.services.dicom import DicomLoader
            loader = DicomLoader()
            print_step("DicomLoader 初始化成功", "success")

            test_dir = self.find_test_data()
            if test_dir is None:
                print_step("未找到测试数据,跳过加载测试", "warning")
                self.results["tests"]["dicom_loader"] = {
                    "status": "skipped",
                    "reason": "no test data found"
                }
                return None, None

            baseline_path = test_dir / "baseline.nii.gz"
            followup_path = test_dir / "followup.nii.gz"
            if not (baseline_path.exists() and followup_path.exists()):
                # Conventional names are missing: use whatever NIfTI files exist.
                nii_files = sorted(test_dir.glob("*.nii.gz"))
                if not nii_files:
                    print_step("未找到 NIfTI 文件,跳过加载测试", "warning")
                    self.results["tests"]["dicom_loader"] = {
                        "status": "skipped",
                        "reason": "no NIfTI files found"
                    }
                    return None, None
                baseline_path = nii_files[0]
                # With a single file, reuse it as the follow-up for testing.
                followup_path = nii_files[1] if len(nii_files) > 1 else nii_files[0]

            print_step(f"加载基线: {baseline_path.name}", "running")
            baseline_data, baseline_img = loader.load_nifti(baseline_path)
            print(f" 形状: {baseline_data.shape}")
            print(f" 体素大小: {baseline_img.header.get_zooms()[:3]}")
            print(f" 数据范围: [{baseline_data.min():.1f}, {baseline_data.max():.1f}]")

            print_step(f"加载随访: {followup_path.name}", "running")
            followup_data, followup_img = loader.load_nifti(followup_path)
            print(f" 形状: {followup_data.shape}")
            print(f" 体素大小: {followup_img.header.get_zooms()[:3]}")

            self.results["tests"]["dicom_loader"] = {
                "status": "success",
                "baseline_shape": list(baseline_data.shape),
                "followup_shape": list(followup_data.shape)
            }
            print_step("DICOM/NIfTI 加载测试通过", "success")
            return (baseline_path, baseline_data, baseline_img), (followup_path, followup_data, followup_img)
        except Exception as e:
            print_step(f"加载测试失败: {e}", "error")
            traceback.print_exc()
            self.results["tests"]["dicom_loader"] = {
                "status": "error",
                "error": str(e)
            }
            return None, None

    def test_registration(self, baseline_path: Path, followup_path: Path):
        """Test 2: register the baseline (moving) onto the follow-up (fixed).

        Args:
            baseline_path: Path of the baseline volume.
            followup_path: Path of the follow-up volume.

        Returns:
            The path reported by ``register_files`` for the warped image, or
            ``None`` on failure.
        """
        print_header("测试 2: 图像配准")
        try:
            from app.services.registration import ImageRegistrator
            import time

            print_step("初始化配准器", "running")
            registrator = ImageRegistrator()
            print_step("ImageRegistrator 初始化成功", "success")

            print_step("执行两级配准(刚性 + 非刚性)", "running")
            print(" 这可能需要 30-60 秒...")
            start_time = time.perf_counter()
            warped_path, transforms = registrator.register_files(
                followup_path,  # fixed
                baseline_path,  # moving
                use_deformable=True
            )
            elapsed = time.perf_counter() - start_time

            transform_names = list(transforms.keys())
            print(f" 配准完成!耗时: {elapsed:.1f} 秒")
            print(f" 输出文件: {warped_path}")
            print(f" 变换类型: {transform_names}")
            self.results["tests"]["registration"] = {
                "status": "success",
                "elapsed_seconds": elapsed,
                "warped_path": str(warped_path),
                "transforms": transform_names
            }
            print_step("配准测试通过", "success")
            return warped_path
        except Exception as e:
            print_step(f"配准测试失败: {e}", "error")
            traceback.print_exc()
            self.results["tests"]["registration"] = {
                "status": "error",
                "error": str(e)
            }
            return None

    def test_change_detection(self, followup_data: np.ndarray, warped_path: Path):
        """Test 3: difference map, change quantification and heatmap.

        Args:
            followup_data: Follow-up volume array.
            warped_path: Path of the registered baseline image.

        Returns:
            ``(changes, significant)`` on success, ``(None, None)`` on failure.
        """
        print_header("测试 3: 变化检测")
        try:
            from app.services.analysis import ChangeDetector
            from app.services.dicom import DicomLoader

            print_step("初始化变化检测器", "running")
            detector = ChangeDetector()
            loader = DicomLoader()

            # Load the registered image and take its voxel spacing.
            warped_data, warped_img = loader.load_nifti(warped_path)
            spacing = tuple(warped_img.header.get_zooms()[:3])

            print_step("计算差分图", "running")
            diff_map, significant = detector.compute_difference_map(
                followup_data,
                warped_data
            )
            print(f" 差分图范围: [{diff_map.min():.1f}, {diff_map.max():.1f}]")
            print(f" 显著变化体素: {(significant != 0).sum():,}")

            print_step("量化变化", "running")
            changes = detector.quantify_changes(diff_map, significant, spacing=spacing)
            print_dict(changes)

            print_step("生成热力图", "running")
            heatmap_path = self.output_dir / "diff_heatmap.png"
            detector.generate_heatmap(significant, followup_data, heatmap_path)
            print(f" 热力图保存至: {heatmap_path}")

            self.results["tests"]["change_detection"] = {
                "status": "success",
                # Convert NumPy values up front so the stored entry is plain
                # Python data (integer counts stay integers).
                "changes": self._to_jsonable(changes),
                "heatmap_path": str(heatmap_path)
            }
            print_step("变化检测测试通过", "success")
            return changes, significant
        except Exception as e:
            print_step(f"变化检测测试失败: {e}", "error")
            traceback.print_exc()
            self.results["tests"]["change_detection"] = {
                "status": "error",
                "error": str(e)
            }
            return None, None

    def test_report_generation(self, change_results: dict = None):
        """Test 4: generate reports in template mode and, if available, LLM mode.

        Args:
            change_results: Change metrics to embed in the report; when
                ``None`` a fixed set of sample values is used.

        Returns:
            Path of the template-mode Markdown report, or ``None`` on failure.
            A failure of the LLM mode alone is reported as a warning and does
            not fail the test.
        """
        print_header("测试 4: 报告生成")
        try:
            from app.services.report import ReportGenerator

            print_step("测试模板模式报告生成", "running")
            generator = ReportGenerator(llm_backend="template")

            if change_results is None:
                change_results = {
                    "changed_voxels": 15000,
                    "change_percent": 0.05,
                    "max_hu_increase": 50.0,
                    "max_hu_decrease": -45.0
                }

            # Synthetic inputs shared by the template and LLM runs.
            report_kwargs = {
                "patient_id": "TEST001",
                "baseline_date": "2025-06-15",
                "followup_date": "2026-01-28",
                "baseline_findings": [{
                    "organ": "右肺上叶",
                    "location": "前段",
                    "max_diameter_mm": 15.5,
                    "volume_cc": 1.2,
                    "mean_hu": -25,
                    "shape": "类圆形",
                    "density_type": "磨玻璃"
                }],
                "followup_findings": [{
                    "organ": "右肺上叶",
                    "location": "前段",
                    "max_diameter_mm": 12.3,
                    "volume_cc": 0.8,
                    "mean_hu": -20,
                    "shape": "类圆形",
                    "density_type": "磨玻璃"
                }],
                "registration_results": {
                    "rigid": "completed",
                    "deformable": "completed",
                    "spacing": (1.0, 1.0, 1.0)
                },
                "change_results": change_results,
                "modality": "CT",
            }

            report = generator.generate_longitudinal_report(**report_kwargs)

            report_path = self.output_dir / "test_report.md"
            generator.save_report(report, report_path, format="md")
            print(f" 模板报告保存至: {report_path}")

            html_path = self.output_dir / "test_report"
            generator.save_report(report, html_path, format="html")
            print(f" HTML 报告保存至: {html_path}.html")

            # LLM mode is optional: any failure is downgraded to a warning.
            print_step("测试 LLM 模式报告生成", "running")
            try:
                llm_generator = ReportGenerator(llm_backend="ollama")
                llm_report = llm_generator.generate_longitudinal_report(**report_kwargs)
                llm_report_path = self.output_dir / "test_report_llm.md"
                llm_generator.save_report(llm_report, llm_report_path, format="md")
                print(f" LLM 报告保存至: {llm_report_path}")
                print_step("LLM 模式测试通过", "success")
                self.results["tests"]["report_generation"] = {
                    "status": "success",
                    "template_report_path": str(report_path),
                    "llm_report_path": str(llm_report_path),
                    "llm_available": True
                }
            except Exception as llm_error:
                print_step(f"LLM 模式不可用: {llm_error}", "warning")
                self.results["tests"]["report_generation"] = {
                    "status": "success",
                    "template_report_path": str(report_path),
                    "llm_available": False,
                    "llm_error": str(llm_error)
                }

            print_step("报告生成测试通过", "success")
            return report_path
        except Exception as e:
            print_step(f"报告生成测试失败: {e}", "error")
            traceback.print_exc()
            self.results["tests"]["report_generation"] = {
                "status": "error",
                "error": str(e)
            }
            return None

    def test_segmentation(self, nifti_path: Path):
        """Test 5 (optional, slow): organ segmentation on a CUDA GPU.

        The test is recorded as skipped when CUDA is unavailable or less than
        4 GB of device memory is free.

        Args:
            nifti_path: Volume to segment.

        Returns:
            Path of the segmentation result, or ``None`` when the test is
            skipped or fails.
        """
        print_header("测试 5: 器官分割 (可选)")
        # Informational only; whether this test runs is decided by the caller.
        print("器官分割需要 GPU 且耗时较长(约 1-2 分钟)")
        try:
            from app.services.segmentation import OrganSegmentor
            import time
            import torch

            print_step("初始化分割器", "running")
            segmentor = OrganSegmentor()

            print_step("检查 GPU 状态", "running")
            if not torch.cuda.is_available():
                print_step("GPU 不可用,跳过分割测试", "warning")
                self.results["tests"]["segmentation"] = {
                    "status": "skipped",
                    "reason": "GPU not available"
                }
                return None

            total_bytes = torch.cuda.get_device_properties(0).total_memory
            print(f" GPU: {torch.cuda.get_device_name(0)}")
            print(f" 显存: {total_bytes / 1e9:.1f} GB")

            # Ask the driver for device-wide free memory. Subtracting this
            # process's own allocations from the total would ignore memory
            # held by other processes.
            if hasattr(torch.cuda, "mem_get_info"):
                free_bytes, _ = torch.cuda.mem_get_info(0)
            else:
                # Older torch without mem_get_info: fall back to the estimate.
                free_bytes = total_bytes - torch.cuda.memory_allocated(0)
            free_mem = free_bytes / 1e9
            print(f" 可用显存: {free_mem:.1f} GB")
            if free_mem < 4.0:
                print_step("显存不足(需要至少 4GB),跳过分割测试", "warning")
                self.results["tests"]["segmentation"] = {
                    "status": "skipped",
                    "reason": "insufficient GPU memory"
                }
                return None

            print_step("执行器官分割", "running")
            print(" 这可能需要 1-2 分钟...")
            start_time = time.perf_counter()
            seg_path, _organ_paths = segmentor.segment_file(
                nifti_path,
                save_individual_organs=False
            )
            elapsed = time.perf_counter() - start_time

            print(f" 分割完成!耗时: {elapsed:.1f} 秒")
            print(f" 分割结果: {seg_path}")
            self.results["tests"]["segmentation"] = {
                "status": "success",
                "elapsed_seconds": elapsed,
                "seg_path": str(seg_path)
            }
            print_step("分割测试通过", "success")
            return seg_path
        except Exception as e:
            print_step(f"分割测试失败: {e}", "error")
            traceback.print_exc()
            self.results["tests"]["segmentation"] = {
                "status": "error",
                "error": str(e)
            }
            return None

    def run_all_tests(self, skip_segmentation: bool = True):
        """Run the tests in order, then save the results and print a summary.

        Stops early (after saving the results) when the loader test yields no
        data. Registration failure skips change detection, but report
        generation still runs with sample change values.

        Args:
            skip_segmentation: When true, the segmentation test is not run.
        """
        print_header("NeuroScan AI 后端调试")
        print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"输出目录: {self.output_dir}")

        # Test 1: loader
        baseline_result, followup_result = self.test_dicom_loader()
        if baseline_result is None or followup_result is None:
            print_step("无法继续测试,需要有效的测试数据", "error")
            self.save_results()
            return
        baseline_path = baseline_result[0]
        followup_path, followup_data, _ = followup_result

        # Test 2: registration
        warped_path = self.test_registration(baseline_path, followup_path)

        # Test 3: change detection (needs the registered image)
        change_results = None
        if warped_path:
            change_results, _ = self.test_change_detection(followup_data, warped_path)

        # Test 4: report generation
        self.test_report_generation(change_results)

        # Test 5: segmentation (optional)
        if skip_segmentation:
            print_step("跳过分割测试(使用 --with-segmentation 启用)", "info")
        else:
            self.test_segmentation(baseline_path)

        self.save_results()
        self.print_summary()

    def save_results(self):
        """Write ``self.results`` to ``debug_results.json`` in the output directory.

        Values are passed through ``_to_jsonable`` first, so NumPy scalars,
        arrays or paths left in a result entry cannot make ``json.dump`` fail.
        """
        results_path = self.output_dir / "debug_results.json"
        with open(results_path, 'w', encoding='utf-8') as f:
            json.dump(self._to_jsonable(self.results), f, indent=2, ensure_ascii=False)
        print(f"\n测试结果已保存至: {results_path}")

    def print_summary(self):
        """Print how many recorded tests passed, failed or were skipped."""
        print_header("测试总结")
        statuses = [t.get("status") for t in self.results["tests"].values()]
        total = len(statuses)
        passed = statuses.count("success")
        failed = statuses.count("error")
        skipped = statuses.count("skipped")
        print(f"\n总计: {total} 个测试")
        print(f" ✅ 通过: {passed}")
        print(f" ❌ 失败: {failed}")
        print(f" ⏭️ 跳过: {skipped}")
        if failed == 0:
            print("\n🎉 所有测试通过!后端功能正常。")
        else:
            print("\n⚠️ 部分测试失败,请检查错误信息。")
        print(f"\n输出文件位于: {self.output_dir}")
def main():
    """Parse the command line, run the debug tests and report an exit status.

    Returns:
        ``1`` when at least one recorded test has status ``"error"``,
        otherwise ``0``, so shells and CI jobs can detect a failed run.
    """
    import argparse
    parser = argparse.ArgumentParser(description="NeuroScan AI 后端调试脚本")
    parser.add_argument(
        "--with-segmentation",
        action="store_true",
        help="包含分割测试(需要 GPU,耗时较长)"
    )
    args = parser.parse_args()

    debugger = BackendDebugger()
    debugger.run_all_tests(skip_segmentation=not args.with_segmentation)

    any_failed = any(
        entry.get("status") == "error"
        for entry in debugger.results["tests"].values()
    )
    return 1 if any_failed else 0


if __name__ == "__main__":
    # Propagate the test outcome as the process exit status.
    sys.exit(main())