| # Example Usage - Pneumonia Consolidation Segmentation | |
| This notebook demonstrates how to use the pneumonia consolidation segmentation tools. | |
| ## Setup | |
| ```python | |
| import sys | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| import matplotlib.pyplot as plt | |
| # Add parent directory to path | |
| sys.path.append('..') | |
| # Import our modules | |
| from preprocessing_consolidation import enhance_consolidation | |
| from dice_calculator_app import ( | |
| calculate_dice_coefficient, | |
| calculate_iou, | |
| calculate_precision_recall, | |
| create_overlay_visualization | |
| ) | |
| ``` | |
| ## 1. Preprocessing Images | |
| ### Enhance a single image to see consolidation better | |
| ```python | |
| # Path to your chest X-ray | |
| input_image = "../data/Pacientes/7035909/7035909_20240326.jpg" | |
| output_image = "../dice/enhanced_images/7035909_enhanced.jpg" | |
| # Enhance the image | |
| enhanced = enhance_consolidation(input_image, output_image) | |
| # Visualize comparison | |
| fig, axes = plt.subplots(1, 2, figsize=(12, 6)) | |
| original = cv2.imread(input_image, cv2.IMREAD_GRAYSCALE) | |
| axes[0].imshow(original, cmap='gray') | |
| axes[0].set_title('Original X-ray') | |
| axes[0].axis('off') | |
| axes[1].imshow(enhanced, cmap='gray') | |
| axes[1].set_title('Enhanced (CLAHE + Sharpening)') | |
| axes[1].axis('off') | |
| plt.tight_layout() | |
| plt.show() | |
| ``` | |
| ### Batch process multiple images | |
| ```python | |
| from preprocessing_consolidation import batch_enhance_consolidation | |
| # Process all patient images | |
| input_dir = "../data/Pacientes/" | |
| output_dir = "../dice/enhanced_images/" | |
| batch_enhance_consolidation(input_dir, output_dir, image_extension='.jpg') | |
| ``` | |
| ## 2. Create Sample Masks for Testing | |
| Let's create some sample masks to demonstrate the Dice calculation. | |
| ```python | |
| def create_sample_masks(image_path): | |
| """Create sample ground truth and prediction masks for demo.""" | |
| # Load image | |
| img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE) | |
| h, w = img.shape | |
| # Create ground truth mask (simulated consolidation in lower right lung) | |
| ground_truth = np.zeros((h, w), dtype=np.uint8) | |
| center_y, center_x = int(h * 0.6), int(w * 0.7) | |
| # Create irregular shape for consolidation | |
| for i in range(h): | |
| for j in range(w): | |
| dist = np.sqrt((i - center_y)**2 + (j - center_x)**2) | |
| noise = np.random.randn() * 20 | |
| if dist + noise < 80: | |
| ground_truth[i, j] = 255 | |
| # Create predicted mask (similar but slightly different) | |
| prediction = np.zeros((h, w), dtype=np.uint8) | |
| center_y_pred = int(h * 0.58) # Slightly shifted | |
| center_x_pred = int(w * 0.72) | |
| for i in range(h): | |
| for j in range(w): | |
| dist = np.sqrt((i - center_y_pred)**2 + (j - center_x_pred)**2) | |
| noise = np.random.randn() * 25 | |
| if dist + noise < 75: # Slightly smaller | |
| prediction[i, j] = 255 | |
| return ground_truth, prediction | |
| # Create sample masks | |
| image_path = "../data/Pacientes/7035909/7035909_20240326.jpg" | |
| gt_mask, pred_mask = create_sample_masks(image_path) | |
| # Save masks | |
| cv2.imwrite("../dice/annotations/ground_truth/sample_gt.png", gt_mask) | |
| cv2.imwrite("../dice/annotations/predictions/sample_pred.png", pred_mask) | |
| print("Sample masks created!") | |
| ``` | |
| ## 3. Calculate Dice Coefficient | |
| ```python | |
| # Load masks | |
| ground_truth = cv2.imread("../dice/annotations/ground_truth/sample_gt.png", cv2.IMREAD_GRAYSCALE) | |
| prediction = cv2.imread("../dice/annotations/predictions/sample_pred.png", cv2.IMREAD_GRAYSCALE) | |
| # Calculate metrics | |
| dice = calculate_dice_coefficient(ground_truth, prediction) | |
| iou = calculate_iou(ground_truth, prediction) | |
| precision, recall = calculate_precision_recall(ground_truth, prediction) | |
| f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 | |
| print("Segmentation Metrics:") | |
| print(f" Dice Coefficient: {dice:.4f}") | |
| print(f" IoU (Jaccard): {iou:.4f}") | |
| print(f" Precision: {precision:.4f}") | |
| print(f" Recall: {recall:.4f}") | |
| print(f" F1 Score: {f1:.4f}") | |
| # Interpretation | |
| if dice > 0.85: | |
| quality = "Excellent ✓" | |
| elif dice > 0.70: | |
| quality = "Good (acceptable for fuzzy borders)" | |
| else: | |
| quality = "Needs review" | |
| print(f"\nQuality Assessment: {quality}") | |
| ``` | |
| ## 4. Visualize Results | |
| ```python | |
| # Load original image | |
| original = cv2.imread(image_path) | |
| # Create overlay visualization | |
| overlay = create_overlay_visualization(original, ground_truth, prediction, alpha=0.5) | |
| # Display all views | |
| fig, axes = plt.subplots(2, 2, figsize=(12, 12)) | |
| axes[0, 0].imshow(cv2.cvtColor(original, cv2.COLOR_BGR2RGB)) | |
| axes[0, 0].set_title('Original X-ray') | |
| axes[0, 0].axis('off') | |
| axes[0, 1].imshow(ground_truth, cmap='Greens') | |
| axes[0, 1].set_title('Ground Truth Mask') | |
| axes[0, 1].axis('off') | |
| axes[1, 0].imshow(prediction, cmap='Reds') | |
| axes[1, 0].set_title('Predicted Mask') | |
| axes[1, 0].axis('off') | |
| axes[1, 1].imshow(overlay) | |
| axes[1, 1].set_title(f'Overlay (Dice: {dice:.3f})') | |
| axes[1, 1].axis('off') | |
| # Add legend | |
| legend_elements = [ | |
| plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='g', markersize=10, label='Ground Truth'), | |
| plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='r', markersize=10, label='Prediction'), | |
| plt.Line2D([0], [0], marker='o', color='w', markerfacecolor='y', markersize=10, label='Overlap') | |
| ] | |
| axes[1, 1].legend(handles=legend_elements, loc='upper right') | |
| plt.tight_layout() | |
| plt.savefig('../dice/results/example_visualization.png', dpi=150, bbox_inches='tight') | |
| plt.show() | |
| print("Visualization saved to: dice/results/example_visualization.png") | |
| ``` | |
| ## 5. Batch Calculate Dice Scores | |
| Process multiple mask pairs and generate report. | |
| ```python | |
| import pandas as pd | |
| from pathlib import Path | |
| def batch_calculate_dice(gt_dir, pred_dir, results_file): | |
| """Calculate Dice for all mask pairs in directories.""" | |
| gt_dir = Path(gt_dir) | |
| pred_dir = Path(pred_dir) | |
| results = [] | |
| # Find all ground truth masks | |
| gt_masks = list(gt_dir.glob("*.png")) + list(gt_dir.glob("*.jpg")) | |
| for gt_path in gt_masks: | |
| # Find corresponding prediction | |
| pred_path = pred_dir / gt_path.name | |
| if not pred_path.exists(): | |
| print(f"Warning: No prediction found for {gt_path.name}") | |
| continue | |
| # Load masks | |
| gt = cv2.imread(str(gt_path), cv2.IMREAD_GRAYSCALE) | |
| pred = cv2.imread(str(pred_path), cv2.IMREAD_GRAYSCALE) | |
| # Calculate metrics | |
| dice = calculate_dice_coefficient(gt, pred) | |
| iou = calculate_iou(gt, pred) | |
| precision, recall = calculate_precision_recall(gt, pred) | |
| f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0 | |
| results.append({ | |
| 'Image': gt_path.name, | |
| 'Dice': dice, | |
| 'IoU': iou, | |
| 'Precision': precision, | |
| 'Recall': recall, | |
| 'F1': f1 | |
| }) | |
| print(f"Processed: {gt_path.name} - Dice: {dice:.4f}") | |
| # Create DataFrame | |
| df = pd.DataFrame(results) | |
| # Calculate summary statistics | |
| summary = { | |
| 'Metric': ['Mean', 'Std', 'Min', 'Max', 'Median'], | |
| 'Dice': [ | |
| df['Dice'].mean(), | |
| df['Dice'].std(), | |
| df['Dice'].min(), | |
| df['Dice'].max(), | |
| df['Dice'].median() | |
| ] | |
| } | |
| summary_df = pd.DataFrame(summary) | |
| # Save results | |
| with pd.ExcelWriter(results_file, engine='openpyxl') as writer: | |
| df.to_excel(writer, sheet_name='Individual Results', index=False) | |
| summary_df.to_excel(writer, sheet_name='Summary', index=False) | |
| print(f"\nResults saved to: {results_file}") | |
| print("\nSummary Statistics:") | |
| print(summary_df.to_string(index=False)) | |
| return df, summary_df | |
| # Run batch processing | |
| gt_directory = "../dice/annotations/ground_truth/" | |
| pred_directory = "../dice/annotations/predictions/" | |
| results_excel = "../dice/results/dice_scores_report.xlsx" | |
| df_results, df_summary = batch_calculate_dice(gt_directory, pred_directory, results_excel) | |
| ``` | |
| ## 6. Working with Real Patient Data | |
| Example of processing actual patient X-rays from your dataset. | |
| ```python | |
| # Get list of patient directories | |
| patients_dir = Path("../data/Pacientes/") | |
| patient_folders = [d for d in patients_dir.iterdir() if d.is_dir() and d.name.isdigit()] | |
| print(f"Found {len(patient_folders)} patient folders") | |
| # Process first 5 patients as example | |
| for patient_dir in patient_folders[:5]: | |
| patient_id = patient_dir.name | |
| print(f"\nProcessing Patient: {patient_id}") | |
| # Find X-ray image | |
| images = list(patient_dir.glob("*.jpg")) | |
| if images: | |
| xray_path = images[0] | |
| print(f" X-ray: {xray_path.name}") | |
| # Enhance image | |
| output_path = f"../dice/enhanced_images/{patient_id}_enhanced.jpg" | |
| enhanced = enhance_consolidation(str(xray_path), output_path) | |
| print(f" Enhanced image saved: {output_path}") | |
| # Here you would: | |
| # 1. Load or create annotations | |
| # 2. Calculate Dice if annotations exist | |
| # 3. Generate reports | |
| else: | |
| print(f" No images found") | |
| ``` | |
| ## 7. Quality Control Report | |
| Generate a comprehensive quality control report. | |
| ```python | |
| def generate_qc_report(results_df, output_path): | |
| """Generate quality control report with visualizations.""" | |
| fig, axes = plt.subplots(2, 2, figsize=(14, 10)) | |
| # 1. Dice score distribution | |
| axes[0, 0].hist(results_df['Dice'], bins=20, color='steelblue', edgecolor='black') | |
| axes[0, 0].axvline(0.7, color='orange', linestyle='--', label='Good threshold') | |
| axes[0, 0].axvline(0.85, color='green', linestyle='--', label='Excellent threshold') | |
| axes[0, 0].set_xlabel('Dice Coefficient') | |
| axes[0, 0].set_ylabel('Frequency') | |
| axes[0, 0].set_title('Distribution of Dice Scores') | |
| axes[0, 0].legend() | |
| # 2. Dice vs IoU scatter | |
| axes[0, 1].scatter(results_df['Dice'], results_df['IoU'], alpha=0.6) | |
| axes[0, 1].plot([0, 1], [0, 1], 'r--', label='Perfect correlation') | |
| axes[0, 1].set_xlabel('Dice Coefficient') | |
| axes[0, 1].set_ylabel('IoU') | |
| axes[0, 1].set_title('Dice vs IoU Correlation') | |
| axes[0, 1].legend() | |
| # 3. Precision-Recall scatter | |
| axes[1, 0].scatter(results_df['Recall'], results_df['Precision'], | |
| c=results_df['Dice'], cmap='viridis', alpha=0.6) | |
| axes[1, 0].set_xlabel('Recall') | |
| axes[1, 0].set_ylabel('Precision') | |
| axes[1, 0].set_title('Precision vs Recall (colored by Dice)') | |
| plt.colorbar(axes[1, 0].collections[0], ax=axes[1, 0], label='Dice') | |
| # 4. Quality categories | |
| categories = pd.cut(results_df['Dice'], | |
| bins=[0, 0.7, 0.85, 1.0], | |
| labels=['Needs Review', 'Good', 'Excellent']) | |
| category_counts = categories.value_counts() | |
| axes[1, 1].bar(range(len(category_counts)), category_counts.values, | |
| color=['red', 'orange', 'green']) | |
| axes[1, 1].set_xticks(range(len(category_counts))) | |
| axes[1, 1].set_xticklabels(category_counts.index, rotation=45) | |
| axes[1, 1].set_ylabel('Count') | |
| axes[1, 1].set_title('Segmentation Quality Distribution') | |
| plt.tight_layout() | |
| plt.savefig(output_path, dpi=150, bbox_inches='tight') | |
| plt.show() | |
| print(f"Quality control report saved: {output_path}") | |
| # Print summary | |
| print("\n=== Quality Control Summary ===") | |
| print(f"Total cases: {len(results_df)}") | |
| print(f"\nQuality breakdown:") | |
| for cat, count in category_counts.items(): | |
| pct = (count / len(results_df)) * 100 | |
| print(f" {cat}: {count} ({pct:.1f}%)") | |
| # Generate report if we have results | |
| if len(df_results) > 0: | |
| generate_qc_report(df_results, '../dice/results/quality_control_report.png') | |
| ``` | |
| ## Next Steps | |
| 1. **Annotate Real Data**: Use CVAT or Label Studio to create ground truth masks | |
| 2. **Train ML Model**: Use annotated data to train segmentation model | |
| 3. **Validate**: Use this toolkit to validate model predictions | |
| 4. **Iterate**: Refine annotations and model based on Dice scores | |
| ## Resources | |
| - [CVAT Installation](https://opencv.github.io/cvat/docs/) | |
| - [SAM Download](https://github.com/facebookresearch/segment-anything) | |
| - [Medical Image Segmentation Best Practices](https://arxiv.org/abs/1904.03882) | |