| | |
| | """ |
| | NeuroScan AI 后端调试脚本 |
| | |
| | 这个脚本用于独立测试后端的核心功能: |
| | 1. DICOM/NIfTI 加载 |
| | 2. 图像配准(刚性 + 非刚性) |
| | 3. 变化检测 |
| | 4. 报告生成(模板模式 + LLM 模式) |
| | |
| | 使用方法: |
| | python scripts/debug_backend.py |
| | |
| | 作者: NeuroScan AI Team |
| | 日期: 2026-01-28 |
| | """ |
| |
|
| | import os |
| | import sys |
| | from pathlib import Path |
| |
|
| | |
| | project_root = Path(__file__).resolve().parent.parent |
| | sys.path.insert(0, str(project_root)) |
| |
|
| | import numpy as np |
| | import nibabel as nib |
| | from datetime import datetime |
| | import json |
| | import traceback |
| |
|
| | |
| | os.environ['PYTHONIOENCODING'] = 'utf-8' |
| |
|
| |
|
| | def print_header(title: str): |
| | """打印标题""" |
| | print("\n" + "=" * 60) |
| | print(f" {title}") |
| | print("=" * 60) |
| |
|
| |
|
| | def print_step(step: str, status: str = "running"): |
| | """打印步骤""" |
| | symbols = { |
| | "running": "🔄", |
| | "success": "✅", |
| | "error": "❌", |
| | "info": "ℹ️", |
| | "warning": "⚠️" |
| | } |
| | print(f"\n{symbols.get(status, '•')} {step}") |
| |
|
| |
|
| | def print_dict(d: dict, indent: int = 2): |
| | """格式化打印字典""" |
| | for k, v in d.items(): |
| | if isinstance(v, float): |
| | print(f"{' ' * indent}{k}: {v:.4f}") |
| | else: |
| | print(f"{' ' * indent}{k}: {v}") |
| |
|
| |
|
| | class BackendDebugger: |
| | """后端调试器""" |
| | |
| | def __init__(self): |
| | self.data_dir = project_root / "data" / "processed" |
| | self.output_dir = project_root / "output" / "debug_results" |
| | self.output_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | self.results = { |
| | "timestamp": datetime.now().isoformat(), |
| | "tests": {} |
| | } |
| | |
| | def find_test_data(self): |
| | """查找可用的测试数据""" |
| | print_step("查找测试数据", "running") |
| | |
| | |
| | lung_dirs = list(self.data_dir.glob("real_lung_*")) |
| | |
| | if lung_dirs: |
| | print(f" 找到 {len(lung_dirs)} 个肺部 CT 数据集:") |
| | for d in lung_dirs: |
| | files = list(d.glob("*.nii.gz")) |
| | print(f" - {d.name}: {len(files)} 个文件") |
| | return lung_dirs[0] |
| | |
| | |
| | all_dirs = [d for d in self.data_dir.iterdir() if d.is_dir()] |
| | if all_dirs: |
| | print(f" 找到 {len(all_dirs)} 个数据目录") |
| | return all_dirs[0] |
| | |
| | return None |
| | |
| | def test_dicom_loader(self): |
| | """测试 DICOM/NIfTI 加载器""" |
| | print_header("测试 1: DICOM/NIfTI 加载器") |
| | |
| | try: |
| | from app.services.dicom import DicomLoader |
| | |
| | loader = DicomLoader() |
| | print_step("DicomLoader 初始化成功", "success") |
| | |
| | |
| | test_dir = self.find_test_data() |
| | if test_dir is None: |
| | print_step("未找到测试数据,跳过加载测试", "warning") |
| | return None, None |
| | |
| | |
| | baseline_path = test_dir / "baseline.nii.gz" |
| | followup_path = test_dir / "followup.nii.gz" |
| | |
| | if not baseline_path.exists(): |
| | |
| | nii_files = list(test_dir.glob("*.nii.gz")) |
| | if len(nii_files) >= 2: |
| | baseline_path = nii_files[0] |
| | followup_path = nii_files[1] |
| | elif len(nii_files) == 1: |
| | baseline_path = nii_files[0] |
| | followup_path = nii_files[0] |
| | |
| | print_step(f"加载基线: {baseline_path.name}", "running") |
| | baseline_data, baseline_img = loader.load_nifti(baseline_path) |
| | print(f" 形状: {baseline_data.shape}") |
| | print(f" 体素大小: {baseline_img.header.get_zooms()[:3]}") |
| | print(f" 数据范围: [{baseline_data.min():.1f}, {baseline_data.max():.1f}]") |
| | |
| | print_step(f"加载随访: {followup_path.name}", "running") |
| | followup_data, followup_img = loader.load_nifti(followup_path) |
| | print(f" 形状: {followup_data.shape}") |
| | print(f" 体素大小: {followup_img.header.get_zooms()[:3]}") |
| | |
| | self.results["tests"]["dicom_loader"] = { |
| | "status": "success", |
| | "baseline_shape": list(baseline_data.shape), |
| | "followup_shape": list(followup_data.shape) |
| | } |
| | |
| | print_step("DICOM/NIfTI 加载测试通过", "success") |
| | return (baseline_path, baseline_data, baseline_img), (followup_path, followup_data, followup_img) |
| | |
| | except Exception as e: |
| | print_step(f"加载测试失败: {e}", "error") |
| | traceback.print_exc() |
| | self.results["tests"]["dicom_loader"] = { |
| | "status": "error", |
| | "error": str(e) |
| | } |
| | return None, None |
| | |
| | def test_registration(self, baseline_path: Path, followup_path: Path): |
| | """测试图像配准""" |
| | print_header("测试 2: 图像配准") |
| | |
| | try: |
| | from app.services.registration import ImageRegistrator |
| | |
| | print_step("初始化配准器", "running") |
| | registrator = ImageRegistrator() |
| | print_step("ImageRegistrator 初始化成功", "success") |
| | |
| | print_step("执行两级配准(刚性 + 非刚性)", "running") |
| | print(" 这可能需要 30-60 秒...") |
| | |
| | import time |
| | start_time = time.time() |
| | |
| | warped_path, transforms = registrator.register_files( |
| | followup_path, |
| | baseline_path, |
| | use_deformable=True |
| | ) |
| | |
| | elapsed = time.time() - start_time |
| | |
| | print(f" 配准完成!耗时: {elapsed:.1f} 秒") |
| | print(f" 输出文件: {warped_path}") |
| | print(f" 变换类型: {list(transforms.keys())}") |
| | |
| | self.results["tests"]["registration"] = { |
| | "status": "success", |
| | "elapsed_seconds": elapsed, |
| | "warped_path": str(warped_path), |
| | "transforms": list(transforms.keys()) |
| | } |
| | |
| | print_step("配准测试通过", "success") |
| | return warped_path |
| | |
| | except Exception as e: |
| | print_step(f"配准测试失败: {e}", "error") |
| | traceback.print_exc() |
| | self.results["tests"]["registration"] = { |
| | "status": "error", |
| | "error": str(e) |
| | } |
| | return None |
| | |
| | def test_change_detection(self, followup_data: np.ndarray, warped_path: Path): |
| | """测试变化检测""" |
| | print_header("测试 3: 变化检测") |
| | |
| | try: |
| | from app.services.analysis import ChangeDetector |
| | from app.services.dicom import DicomLoader |
| | |
| | print_step("初始化变化检测器", "running") |
| | detector = ChangeDetector() |
| | loader = DicomLoader() |
| | |
| | |
| | warped_data, warped_img = loader.load_nifti(warped_path) |
| | spacing = tuple(warped_img.header.get_zooms()[:3]) |
| | |
| | print_step("计算差分图", "running") |
| | diff_map, significant = detector.compute_difference_map( |
| | followup_data, |
| | warped_data |
| | ) |
| | |
| | print(f" 差分图范围: [{diff_map.min():.1f}, {diff_map.max():.1f}]") |
| | print(f" 显著变化体素: {(significant != 0).sum():,}") |
| | |
| | print_step("量化变化", "running") |
| | changes = detector.quantify_changes(diff_map, significant, spacing=spacing) |
| | print_dict(changes) |
| | |
| | |
| | print_step("生成热力图", "running") |
| | heatmap_path = self.output_dir / "diff_heatmap.png" |
| | detector.generate_heatmap(significant, followup_data, heatmap_path) |
| | print(f" 热力图保存至: {heatmap_path}") |
| | |
| | self.results["tests"]["change_detection"] = { |
| | "status": "success", |
| | "changes": {k: float(v) if isinstance(v, (np.floating, np.integer)) else v |
| | for k, v in changes.items()}, |
| | "heatmap_path": str(heatmap_path) |
| | } |
| | |
| | print_step("变化检测测试通过", "success") |
| | return changes, significant |
| | |
| | except Exception as e: |
| | print_step(f"变化检测测试失败: {e}", "error") |
| | traceback.print_exc() |
| | self.results["tests"]["change_detection"] = { |
| | "status": "error", |
| | "error": str(e) |
| | } |
| | return None, None |
| | |
| | def test_report_generation(self, change_results: dict = None): |
| | """测试报告生成""" |
| | print_header("测试 4: 报告生成") |
| | |
| | try: |
| | from app.services.report import ReportGenerator |
| | |
| | |
| | print_step("测试模板模式报告生成", "running") |
| | generator = ReportGenerator(llm_backend="template") |
| | |
| | |
| | baseline_findings = [{ |
| | "organ": "右肺上叶", |
| | "location": "前段", |
| | "max_diameter_mm": 15.5, |
| | "volume_cc": 1.2, |
| | "mean_hu": -25, |
| | "shape": "类圆形", |
| | "density_type": "磨玻璃" |
| | }] |
| | |
| | followup_findings = [{ |
| | "organ": "右肺上叶", |
| | "location": "前段", |
| | "max_diameter_mm": 12.3, |
| | "volume_cc": 0.8, |
| | "mean_hu": -20, |
| | "shape": "类圆形", |
| | "density_type": "磨玻璃" |
| | }] |
| | |
| | registration_results = { |
| | "rigid": "completed", |
| | "deformable": "completed", |
| | "spacing": (1.0, 1.0, 1.0) |
| | } |
| | |
| | if change_results is None: |
| | change_results = { |
| | "changed_voxels": 15000, |
| | "change_percent": 0.05, |
| | "max_hu_increase": 50.0, |
| | "max_hu_decrease": -45.0 |
| | } |
| | |
| | |
| | report = generator.generate_longitudinal_report( |
| | patient_id="TEST001", |
| | baseline_date="2025-06-15", |
| | followup_date="2026-01-28", |
| | baseline_findings=baseline_findings, |
| | followup_findings=followup_findings, |
| | registration_results=registration_results, |
| | change_results=change_results, |
| | modality="CT" |
| | ) |
| | |
| | |
| | report_path = self.output_dir / "test_report.md" |
| | generator.save_report(report, report_path, format="md") |
| | print(f" 模板报告保存至: {report_path}") |
| | |
| | |
| | html_path = self.output_dir / "test_report" |
| | generator.save_report(report, html_path, format="html") |
| | print(f" HTML 报告保存至: {html_path}.html") |
| | |
| | |
| | print_step("测试 LLM 模式报告生成", "running") |
| | try: |
| | llm_generator = ReportGenerator(llm_backend="ollama") |
| | |
| | llm_report = llm_generator.generate_longitudinal_report( |
| | patient_id="TEST001", |
| | baseline_date="2025-06-15", |
| | followup_date="2026-01-28", |
| | baseline_findings=baseline_findings, |
| | followup_findings=followup_findings, |
| | registration_results=registration_results, |
| | change_results=change_results, |
| | modality="CT" |
| | ) |
| | |
| | llm_report_path = self.output_dir / "test_report_llm.md" |
| | llm_generator.save_report(llm_report, llm_report_path, format="md") |
| | print(f" LLM 报告保存至: {llm_report_path}") |
| | print_step("LLM 模式测试通过", "success") |
| | |
| | self.results["tests"]["report_generation"] = { |
| | "status": "success", |
| | "template_report_path": str(report_path), |
| | "llm_report_path": str(llm_report_path), |
| | "llm_available": True |
| | } |
| | |
| | except Exception as llm_error: |
| | print_step(f"LLM 模式不可用: {llm_error}", "warning") |
| | self.results["tests"]["report_generation"] = { |
| | "status": "success", |
| | "template_report_path": str(report_path), |
| | "llm_available": False, |
| | "llm_error": str(llm_error) |
| | } |
| | |
| | print_step("报告生成测试通过", "success") |
| | return report_path |
| | |
| | except Exception as e: |
| | print_step(f"报告生成测试失败: {e}", "error") |
| | traceback.print_exc() |
| | self.results["tests"]["report_generation"] = { |
| | "status": "error", |
| | "error": str(e) |
| | } |
| | return None |
| | |
| | def test_segmentation(self, nifti_path: Path): |
| | """测试器官分割(可选,耗时较长)""" |
| | print_header("测试 5: 器官分割 (可选)") |
| | |
| | |
| | print("器官分割需要 GPU 且耗时较长(约 1-2 分钟)") |
| | |
| | try: |
| | from app.services.segmentation import OrganSegmentor |
| | |
| | print_step("初始化分割器", "running") |
| | segmentor = OrganSegmentor() |
| | |
| | print_step("检查 GPU 状态", "running") |
| | import torch |
| | if torch.cuda.is_available(): |
| | gpu_name = torch.cuda.get_device_name(0) |
| | gpu_mem = torch.cuda.get_device_properties(0).total_memory / 1e9 |
| | print(f" GPU: {gpu_name}") |
| | print(f" 显存: {gpu_mem:.1f} GB") |
| | |
| | |
| | free_mem = (torch.cuda.get_device_properties(0).total_memory - |
| | torch.cuda.memory_allocated()) / 1e9 |
| | print(f" 可用显存: {free_mem:.1f} GB") |
| | |
| | if free_mem < 4.0: |
| | print_step("显存不足(需要至少 4GB),跳过分割测试", "warning") |
| | self.results["tests"]["segmentation"] = { |
| | "status": "skipped", |
| | "reason": "insufficient GPU memory" |
| | } |
| | return None |
| | else: |
| | print_step("GPU 不可用,跳过分割测试", "warning") |
| | self.results["tests"]["segmentation"] = { |
| | "status": "skipped", |
| | "reason": "GPU not available" |
| | } |
| | return None |
| | |
| | print_step("执行器官分割", "running") |
| | print(" 这可能需要 1-2 分钟...") |
| | |
| | import time |
| | start_time = time.time() |
| | |
| | seg_path, organ_paths = segmentor.segment_file( |
| | nifti_path, |
| | save_individual_organs=False |
| | ) |
| | |
| | elapsed = time.time() - start_time |
| | |
| | print(f" 分割完成!耗时: {elapsed:.1f} 秒") |
| | print(f" 分割结果: {seg_path}") |
| | |
| | self.results["tests"]["segmentation"] = { |
| | "status": "success", |
| | "elapsed_seconds": elapsed, |
| | "seg_path": str(seg_path) |
| | } |
| | |
| | print_step("分割测试通过", "success") |
| | return seg_path |
| | |
| | except Exception as e: |
| | print_step(f"分割测试失败: {e}", "error") |
| | traceback.print_exc() |
| | self.results["tests"]["segmentation"] = { |
| | "status": "error", |
| | "error": str(e) |
| | } |
| | return None |
| | |
| | def run_all_tests(self, skip_segmentation: bool = True): |
| | """运行所有测试""" |
| | print_header("NeuroScan AI 后端调试") |
| | print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") |
| | print(f"输出目录: {self.output_dir}") |
| | |
| | |
| | baseline_result, followup_result = self.test_dicom_loader() |
| | |
| | if baseline_result is None or followup_result is None: |
| | print_step("无法继续测试,需要有效的测试数据", "error") |
| | self.save_results() |
| | return |
| | |
| | baseline_path, baseline_data, baseline_img = baseline_result |
| | followup_path, followup_data, followup_img = followup_result |
| | |
| | |
| | warped_path = self.test_registration(baseline_path, followup_path) |
| | |
| | |
| | if warped_path: |
| | change_results, significant = self.test_change_detection(followup_data, warped_path) |
| | else: |
| | change_results = None |
| | |
| | |
| | self.test_report_generation(change_results) |
| | |
| | |
| | if not skip_segmentation: |
| | self.test_segmentation(baseline_path) |
| | else: |
| | print_step("跳过分割测试(使用 --with-segmentation 启用)", "info") |
| | |
| | |
| | self.save_results() |
| | |
| | |
| | self.print_summary() |
| | |
| | def save_results(self): |
| | """保存测试结果""" |
| | results_path = self.output_dir / "debug_results.json" |
| | with open(results_path, 'w', encoding='utf-8') as f: |
| | json.dump(self.results, f, indent=2, ensure_ascii=False) |
| | print(f"\n测试结果已保存至: {results_path}") |
| | |
| | def print_summary(self): |
| | """打印测试总结""" |
| | print_header("测试总结") |
| | |
| | total = len(self.results["tests"]) |
| | passed = sum(1 for t in self.results["tests"].values() if t.get("status") == "success") |
| | failed = sum(1 for t in self.results["tests"].values() if t.get("status") == "error") |
| | skipped = sum(1 for t in self.results["tests"].values() if t.get("status") == "skipped") |
| | |
| | print(f"\n总计: {total} 个测试") |
| | print(f" ✅ 通过: {passed}") |
| | print(f" ❌ 失败: {failed}") |
| | print(f" ⏭️ 跳过: {skipped}") |
| | |
| | if failed == 0: |
| | print("\n🎉 所有测试通过!后端功能正常。") |
| | else: |
| | print("\n⚠️ 部分测试失败,请检查错误信息。") |
| | |
| | print(f"\n输出文件位于: {self.output_dir}") |
| |
|
| |
|
| | def main(): |
| | """主函数""" |
| | import argparse |
| | |
| | parser = argparse.ArgumentParser(description="NeuroScan AI 后端调试脚本") |
| | parser.add_argument( |
| | "--with-segmentation", |
| | action="store_true", |
| | help="包含分割测试(需要 GPU,耗时较长)" |
| | ) |
| | |
| | args = parser.parse_args() |
| | |
| | debugger = BackendDebugger() |
| | debugger.run_all_tests(skip_segmentation=not args.with_segmentation) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|