| |
| """ |
| NeuroScan AI 端到端测试用例 |
| 模拟完整的诊断流程 |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import requests |
| import numpy as np |
| import nibabel as nib |
| import tempfile |
| import zipfile |
| import pydicom |
| from pathlib import Path |
| from datetime import datetime |
| from pydicom.dataset import Dataset, FileDataset |
| from pydicom.uid import generate_uid |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent)) |
|
|
| |
| BASE_URL = "http://localhost:8080" |
| API_PREFIX = "/api/v1" |
|
|
| |
| TEST_RESULTS_DIR = Path(__file__).parent / "results" |
| TEST_RESULTS_DIR.mkdir(exist_ok=True) |
|
|
| |
| TEST_DATA_DIR = Path(__file__).parent / "test_data" |
| TEST_DATA_DIR.mkdir(exist_ok=True) |
|
|
|
|
| def log_result(test_name: str, success: bool, message: str, data: dict = None): |
| """记录测试结果""" |
| result = { |
| "test_name": test_name, |
| "success": success, |
| "message": message, |
| "timestamp": datetime.now().isoformat(), |
| "data": data |
| } |
| |
| result_file = TEST_RESULTS_DIR / f"e2e_{test_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" |
| with open(result_file, 'w', encoding='utf-8') as f: |
| json.dump(result, f, ensure_ascii=False, indent=2, default=str) |
| |
| status = "✅ PASS" if success else "❌ FAIL" |
| print(f"{status} - {test_name}: {message}") |
| return result |
|
|
|
|
| def create_synthetic_dicom_series( |
| output_dir: Path, |
| patient_id: str = "TEST_PATIENT", |
| study_date: str = "20260124", |
| series_description: str = "Test CT Series", |
| num_slices: int = 32, |
| image_size: int = 64, |
| with_lesion: bool = False, |
| lesion_size: int = 5 |
| ): |
| """ |
| 创建合成的 DICOM 系列用于测试 |
| """ |
| output_dir.mkdir(parents=True, exist_ok=True) |
| |
| |
| study_uid = generate_uid() |
| series_uid = generate_uid() |
| frame_of_ref_uid = generate_uid() |
| |
| |
| volume = np.random.randint(-1000, -900, (num_slices, image_size, image_size), dtype=np.int16) |
| |
| |
| cx, cy, cz = image_size // 2, image_size // 2, num_slices // 2 |
| for z in range(num_slices): |
| for y in range(image_size): |
| for x in range(image_size): |
| dist = np.sqrt((x - cx)**2 + (y - cy)**2 + (z - cz)**2) |
| if dist < 20: |
| volume[z, y, x] = 50 |
| |
| |
| if with_lesion: |
| for z in range(num_slices): |
| for y in range(image_size): |
| for x in range(image_size): |
| dist = np.sqrt((x - cx)**2 + (y - cy)**2 + (z - cz)**2) |
| if dist < lesion_size: |
| volume[z, y, x] = 100 |
| |
| dicom_files = [] |
| |
| for i in range(num_slices): |
| |
| file_meta = pydicom.Dataset() |
| file_meta.MediaStorageSOPClassUID = pydicom.uid.CTImageStorage |
| file_meta.MediaStorageSOPInstanceUID = generate_uid() |
| file_meta.TransferSyntaxUID = pydicom.uid.ExplicitVRLittleEndian |
| |
| |
| ds = FileDataset( |
| None, {}, |
| file_meta=file_meta, |
| preamble=b"\0" * 128 |
| ) |
| |
| |
| ds.PatientName = f"Test^Patient^{patient_id}" |
| ds.PatientID = patient_id |
| ds.PatientBirthDate = "19800101" |
| ds.PatientSex = "M" |
| |
| |
| ds.StudyInstanceUID = study_uid |
| ds.StudyDate = study_date |
| ds.StudyTime = "120000" |
| ds.StudyDescription = "Test CT Study" |
| ds.AccessionNumber = "TEST001" |
| |
| |
| ds.SeriesInstanceUID = series_uid |
| ds.SeriesNumber = 1 |
| ds.SeriesDescription = series_description |
| ds.Modality = "CT" |
| |
| |
| ds.Manufacturer = "NeuroScan Test" |
| ds.ManufacturerModelName = "Test Scanner" |
| |
| |
| ds.SOPClassUID = pydicom.uid.CTImageStorage |
| ds.SOPInstanceUID = file_meta.MediaStorageSOPInstanceUID |
| ds.InstanceNumber = i + 1 |
| ds.ImageType = ["ORIGINAL", "PRIMARY", "AXIAL"] |
| |
| |
| ds.FrameOfReferenceUID = frame_of_ref_uid |
| ds.ImagePositionPatient = [0.0, 0.0, float(i * 2.5)] |
| ds.ImageOrientationPatient = [1.0, 0.0, 0.0, 0.0, 1.0, 0.0] |
| |
| |
| ds.Rows = image_size |
| ds.Columns = image_size |
| ds.PixelSpacing = [1.0, 1.0] |
| ds.SliceThickness = 2.5 |
| ds.SliceLocation = float(i * 2.5) |
| |
| |
| ds.BitsAllocated = 16 |
| ds.BitsStored = 16 |
| ds.HighBit = 15 |
| ds.PixelRepresentation = 1 |
| ds.SamplesPerPixel = 1 |
| ds.PhotometricInterpretation = "MONOCHROME2" |
| |
| |
| ds.RescaleIntercept = 0 |
| ds.RescaleSlope = 1 |
| ds.RescaleType = "HU" |
| |
| |
| ds.WindowCenter = 40 |
| ds.WindowWidth = 400 |
| |
| |
| ds.PixelData = volume[i].tobytes() |
| |
| |
| filename = output_dir / f"slice_{i:04d}.dcm" |
| ds.save_as(filename) |
| dicom_files.append(filename) |
| |
| return dicom_files, volume |
|
|
|
|
| class TestE2ESingleAnalysis: |
| """端到端单次分析测试""" |
| |
| def __init__(self): |
| self.session = requests.Session() |
| self.session.trust_env = False |
| |
| def run(self): |
| print("\n" + "="*60) |
| print("端到端测试 1: 单次扫描分析流程") |
| print("="*60) |
| |
| try: |
| |
| print(" 步骤 1: 创建合成 DICOM 数据...") |
| dicom_dir = TEST_DATA_DIR / "e2e_single_scan" |
| dicom_files, volume = create_synthetic_dicom_series( |
| dicom_dir, |
| patient_id="E2E_SINGLE_001", |
| with_lesion=True, |
| lesion_size=5 |
| ) |
| print(f" 创建了 {len(dicom_files)} 个 DICOM 文件") |
| |
| |
| print(" 步骤 2: 打包 DICOM 为 ZIP...") |
| zip_path = TEST_DATA_DIR / "e2e_single_scan.zip" |
| with zipfile.ZipFile(zip_path, 'w') as zf: |
| for dcm_file in dicom_files: |
| zf.write(dcm_file, dcm_file.name) |
| print(f" ZIP 文件大小: {zip_path.stat().st_size / 1024:.1f} KB") |
| |
| |
| print(" 步骤 3: 上传 DICOM 到 API...") |
| with open(zip_path, 'rb') as f: |
| files = {'file': ('e2e_single_scan.zip', f, 'application/zip')} |
| data = { |
| 'patient_id': 'E2E_SINGLE_001', |
| 'study_date': '2026-01-24' |
| } |
| response = self.session.post(f"{BASE_URL}{API_PREFIX}/ingest", files=files, data=data) |
| |
| if response.status_code != 200: |
| |
| print(f" 上传响应: {response.status_code}") |
| |
| |
| print(" 步骤 4: 验证核心服务...") |
| from app.services.dicom.windowing import apply_ct_window |
| from app.services.analysis.feature_extractor import FeatureExtractor |
| from app.services.analysis.roi_extractor import ROIExtractor |
| |
| |
| windowed = apply_ct_window(volume.astype(np.float32), window_center=40, window_width=400) |
| |
| |
| mask = (volume > 80).astype(np.uint8) |
| |
| |
| extractor = FeatureExtractor(spacing=(1.0, 1.0, 2.5)) |
| if mask.sum() > 0: |
| nodule_finding = extractor.extract_features( |
| volume.astype(np.float32), |
| mask, |
| nodule_id="e2e_test_001", |
| organ="lung", |
| location="test" |
| ) |
| features = { |
| "volume_cc": nodule_finding.volume_cc, |
| "max_diameter_mm": nodule_finding.max_diameter_mm, |
| "mean_hu": nodule_finding.mean_hu |
| } |
| print(f" 提取特征: 体积={nodule_finding.volume_cc:.2f} cc") |
| |
| |
| |
| |
| return log_result("e2e_single_analysis", True, "单次分析端到端测试完成", { |
| "dicom_files_created": len(dicom_files), |
| "volume_shape": list(volume.shape), |
| "lesion_voxels": int(mask.sum()), |
| "features": features if mask.sum() > 0 else None |
| }) |
| |
| except Exception as e: |
| import traceback |
| return log_result("e2e_single_analysis", False, f"错误: {str(e)}", { |
| "traceback": traceback.format_exc() |
| }) |
|
|
|
|
| class TestE2ELongitudinalAnalysis: |
| """端到端纵向对比分析测试""" |
| |
| def __init__(self): |
| self.session = requests.Session() |
| self.session.trust_env = False |
| |
| def run(self): |
| print("\n" + "="*60) |
| print("端到端测试 2: 纵向对比分析流程") |
| print("="*60) |
| |
| try: |
| |
| print(" 步骤 1: 创建基线扫描...") |
| baseline_dir = TEST_DATA_DIR / "e2e_baseline" |
| baseline_files, baseline_volume = create_synthetic_dicom_series( |
| baseline_dir, |
| patient_id="E2E_LONG_001", |
| study_date="20250724", |
| series_description="Baseline CT", |
| with_lesion=True, |
| lesion_size=5 |
| ) |
| print(f" 基线: {len(baseline_files)} 个 DICOM 文件") |
| |
| |
| print(" 步骤 2: 创建随访扫描(病灶增大)...") |
| followup_dir = TEST_DATA_DIR / "e2e_followup" |
| followup_files, followup_volume = create_synthetic_dicom_series( |
| followup_dir, |
| patient_id="E2E_LONG_001", |
| study_date="20260124", |
| series_description="Follow-up CT", |
| with_lesion=True, |
| lesion_size=8 |
| ) |
| print(f" 随访: {len(followup_files)} 个 DICOM 文件") |
| |
| |
| print(" 步骤 3: 执行图像配准...") |
| from app.services.registration.registrator import ImageRegistrator |
| |
| registrator = ImageRegistrator() |
| |
| |
| registered_array, transform = registrator.rigid_registration( |
| followup_volume.astype(np.float32), |
| baseline_volume.astype(np.float32), |
| spacing=(1.0, 1.0, 2.5) |
| ) |
| print(f" 配准完成,输出形状: {registered_array.shape}") |
| |
| |
| print(" 步骤 4: 计算变化...") |
| from app.services.analysis.change_detector import ChangeDetector |
| |
| detector = ChangeDetector() |
| diff_map, significant_changes = detector.compute_difference_map( |
| followup_volume.astype(np.float32), |
| registered_array |
| ) |
| |
| |
| segmentation = np.zeros_like(baseline_volume, dtype=np.uint8) |
| cx, cy, cz = 32, 32, 16 |
| segmentation[cx-10:cx+10, cy-10:cy+10, cz-10:cz+10] = 1 |
| |
| changes = detector.quantify_changes( |
| followup_volume.astype(np.float32), |
| registered_array, |
| segmentation, |
| roi_label=1, |
| spacing=(1.0, 1.0, 2.5) |
| ) |
| print(f" 变化量化: HU 变化={changes.get('hu_change', 0):.1f}") |
| |
| |
| print(" 步骤 5: RECIST 评估...") |
| |
| |
| baseline_mask = (baseline_volume > 80).astype(np.uint8) |
| followup_mask = (registered_array > 80).astype(np.uint8) |
| |
| from app.services.analysis.feature_extractor import FeatureExtractor |
| extractor = FeatureExtractor(spacing=(1.0, 1.0, 2.5)) |
| |
| baseline_finding = extractor.extract_features( |
| baseline_volume.astype(np.float32), |
| baseline_mask, |
| nodule_id="baseline_001", |
| organ="lung", |
| location="test" |
| ) |
| |
| followup_finding = extractor.extract_features( |
| registered_array, |
| followup_mask, |
| nodule_id="followup_001", |
| organ="lung", |
| location="test" |
| ) |
| |
| |
| baseline_diameter = baseline_finding.max_diameter_mm |
| followup_diameter = followup_finding.max_diameter_mm |
| |
| if baseline_diameter > 0: |
| diameter_change_percent = ((followup_diameter - baseline_diameter) / baseline_diameter) * 100 |
| else: |
| diameter_change_percent = 0 |
| |
| |
| if diameter_change_percent >= 20: |
| recist_response = "PD (Progressive Disease)" |
| elif diameter_change_percent <= -30: |
| recist_response = "PR (Partial Response)" |
| elif followup_diameter == 0: |
| recist_response = "CR (Complete Response)" |
| else: |
| recist_response = "SD (Stable Disease)" |
| |
| print(f" 基线直径: {baseline_diameter:.1f} mm") |
| print(f" 随访直径: {followup_diameter:.1f} mm") |
| print(f" 变化: {diameter_change_percent:.1f}%") |
| print(f" RECIST 评估: {recist_response}") |
| |
| return log_result("e2e_longitudinal_analysis", True, "纵向分析端到端测试完成", { |
| "baseline_files": len(baseline_files), |
| "followup_files": len(followup_files), |
| "baseline_diameter_mm": baseline_diameter, |
| "followup_diameter_mm": followup_diameter, |
| "diameter_change_percent": diameter_change_percent, |
| "recist_response": recist_response, |
| "changes": changes |
| }) |
| |
| except Exception as e: |
| import traceback |
| return log_result("e2e_longitudinal_analysis", False, f"错误: {str(e)}", { |
| "traceback": traceback.format_exc() |
| }) |
|
|
|
|
| class TestE2EReportGeneration: |
| """端到端报告生成测试(模拟)""" |
| |
| def run(self): |
| print("\n" + "="*60) |
| print("端到端测试 3: 报告生成流程") |
| print("="*60) |
| |
| try: |
| |
| analysis_result = { |
| "patient_id": "E2E_REPORT_001", |
| "study_date": "2026-01-24", |
| "findings": [ |
| { |
| "location": "右肺上叶", |
| "type": "结节", |
| "size_mm": 12.5, |
| "hu_mean": 35, |
| "characteristics": "边界清晰,密度均匀" |
| }, |
| { |
| "location": "肝脏 S7 段", |
| "type": "低密度灶", |
| "size_mm": 8.2, |
| "hu_mean": 25, |
| "characteristics": "边界模糊" |
| } |
| ], |
| "recist_response": "SD", |
| "measurements": { |
| "target_lesion_sum": 20.7, |
| "baseline_sum": 19.5, |
| "change_percent": 6.2 |
| } |
| } |
| |
| |
| report_template = """ |
| ================================================================================ |
| NeuroScan AI 影像诊断报告 |
| ================================================================================ |
| |
| 患者信息 |
| -------- |
| 患者 ID: {patient_id} |
| 检查日期: {study_date} |
| |
| 影像发现 |
| -------- |
| {findings_text} |
| |
| RECIST 1.1 评估 |
| --------------- |
| 疗效评估: {recist_response} |
| 靶病灶径线和: {target_sum:.1f} mm |
| 基线径线和: {baseline_sum:.1f} mm |
| 变化率: {change_percent:.1f}% |
| |
| 诊断意见 |
| -------- |
| 根据 RECIST 1.1 标准,本次检查显示疾病{status}。 |
| 建议{recommendation}。 |
| |
| -------------------------------------------------------------------------------- |
| 报告生成时间: {report_time} |
| 本报告由 NeuroScan AI 系统自动生成,仅供参考,最终诊断请以临床医师意见为准。 |
| ================================================================================ |
| """ |
| |
| |
| findings_text = "" |
| for i, finding in enumerate(analysis_result["findings"], 1): |
| findings_text += f""" |
| {i}. {finding['location']} - {finding['type']} |
| - 大小: {finding['size_mm']:.1f} mm |
| - 平均 HU 值: {finding['hu_mean']} |
| - 特征: {finding['characteristics']} |
| """ |
| |
| |
| recist = analysis_result["recist_response"] |
| if recist == "PD": |
| status = "进展" |
| recommendation = "密切随访,考虑调整治疗方案" |
| elif recist == "PR": |
| status = "部分缓解" |
| recommendation = "继续当前治疗方案,定期复查" |
| elif recist == "CR": |
| status = "完全缓解" |
| recommendation = "定期随访监测" |
| else: |
| status = "稳定" |
| recommendation = "继续观察,3个月后复查" |
| |
| |
| report = report_template.format( |
| patient_id=analysis_result["patient_id"], |
| study_date=analysis_result["study_date"], |
| findings_text=findings_text, |
| recist_response=recist, |
| target_sum=analysis_result["measurements"]["target_lesion_sum"], |
| baseline_sum=analysis_result["measurements"]["baseline_sum"], |
| change_percent=analysis_result["measurements"]["change_percent"], |
| status=status, |
| recommendation=recommendation, |
| report_time=datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| ) |
| |
| |
| report_file = TEST_RESULTS_DIR / f"e2e_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" |
| with open(report_file, 'w', encoding='utf-8') as f: |
| f.write(report) |
| |
| print(report) |
| |
| return log_result("e2e_report_generation", True, "报告生成测试完成", { |
| "report_file": str(report_file), |
| "findings_count": len(analysis_result["findings"]), |
| "recist_response": recist |
| }) |
| |
| except Exception as e: |
| import traceback |
| return log_result("e2e_report_generation", False, f"错误: {str(e)}", { |
| "traceback": traceback.format_exc() |
| }) |
|
|
|
|
| def run_all_tests(): |
| """运行所有端到端测试""" |
| print("\n" + "="*60) |
| print("NeuroScan AI 端到端测试") |
| print(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") |
| print("="*60) |
| |
| tests = [ |
| TestE2ESingleAnalysis(), |
| TestE2ELongitudinalAnalysis(), |
| TestE2EReportGeneration(), |
| ] |
| |
| results = [] |
| passed = 0 |
| failed = 0 |
| |
| for test in tests: |
| try: |
| result = test.run() |
| results.append(result) |
| if result["success"]: |
| passed += 1 |
| else: |
| failed += 1 |
| except Exception as e: |
| print(f"❌ 测试异常: {str(e)}") |
| failed += 1 |
| |
| |
| print("\n" + "="*60) |
| print("端到端测试总结") |
| print("="*60) |
| print(f"总计: {len(tests)} 个测试") |
| print(f"通过: {passed} ✅") |
| print(f"失败: {failed} ❌") |
| print(f"通过率: {passed/len(tests)*100:.1f}%") |
| |
| |
| summary = { |
| "timestamp": datetime.now().isoformat(), |
| "total_tests": len(tests), |
| "passed": passed, |
| "failed": failed, |
| "pass_rate": f"{passed/len(tests)*100:.1f}%", |
| "results": results |
| } |
| |
| summary_file = TEST_RESULTS_DIR / f"e2e_test_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" |
| with open(summary_file, 'w', encoding='utf-8') as f: |
| json.dump(summary, f, ensure_ascii=False, indent=2, default=str) |
| |
| print(f"\n测试结果已保存到: {TEST_RESULTS_DIR}") |
| print(f"测试数据保存到: {TEST_DATA_DIR}") |
| |
| return passed == len(tests) |
|
|
|
|
| if __name__ == "__main__": |
| success = run_all_tests() |
| sys.exit(0 if success else 1) |
|
|
|
|