Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import base64 | |
| import uuid | |
| import time | |
| import requests as http_requests | |
| from datetime import datetime, timezone | |
| from flask import Flask, request, jsonify, send_from_directory, send_file | |
| import cv2 | |
| import numpy as np | |
| import pydicom | |
| from PIL import Image | |
| from reportlab.lib.pagesizes import A4 | |
| from reportlab.lib.units import mm | |
| from reportlab.lib import colors | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.lib.enums import TA_CENTER, TA_LEFT | |
| app = Flask(__name__, static_folder='static', static_url_path='/static') | |
| UPLOAD_FOLDER = 'uploads' | |
| PROCESSED_FOLDER = 'processed' | |
| MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB | |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
| os.makedirs(PROCESSED_FOLDER, exist_ok=True) | |
| ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'dcm'} | |
| def allowed_file(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| def get_file_extension(filename): | |
| return filename.rsplit('.', 1)[1].lower() if '.' in filename else '' | |
| def dicom_to_image(dicom_path): | |
| """Convert DICOM file to numpy array image.""" | |
| ds = pydicom.dcmread(dicom_path) | |
| pixel_array = ds.pixel_array | |
| # Normalize to 8-bit | |
| if pixel_array.dtype != np.uint8: | |
| pixel_array = ((pixel_array - pixel_array.min()) / | |
| (pixel_array.max() - pixel_array.min()) * 255).astype(np.uint8) | |
| # Convert to 3 channel if grayscale | |
| if len(pixel_array.shape) == 2: | |
| pixel_array = cv2.cvtColor(pixel_array, cv2.COLOR_GRAY2BGR) | |
| return pixel_array, ds | |
| def extract_dicom_metadata(ds): | |
| """Extract relevant DICOM metadata.""" | |
| metadata = {} | |
| # Basic tags | |
| if hasattr(ds, 'Modality'): | |
| metadata['Modality'] = str(ds.Modality) | |
| if hasattr(ds, 'StudyDate'): | |
| # Format date nicely if possible | |
| date_str = str(ds.StudyDate) | |
| if len(date_str) == 8: | |
| metadata['StudyDate'] = f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}" | |
| else: | |
| metadata['StudyDate'] = date_str | |
| if hasattr(ds, 'SeriesDescription'): | |
| metadata['SeriesDescription'] = str(ds.SeriesDescription) | |
| if hasattr(ds, 'Manufacturer'): | |
| metadata['Manufacturer'] = str(ds.Manufacturer) | |
| if hasattr(ds, 'ManufacturerModelName'): | |
| metadata['EquipmentModel'] = str(ds.ManufacturerModelName) | |
| if hasattr(ds, 'InstitutionName'): | |
| metadata['InstitutionName'] = str(ds.InstitutionName) | |
| if hasattr(ds, 'PatientID'): | |
| metadata['PatientID'] = '[ANONYMIZED]' | |
| if hasattr(ds, 'PatientName'): | |
| metadata['PatientName'] = '[ANONYMIZED]' | |
| return metadata | |
| def apply_gaussian_blur(image, kernel_size=5): | |
| """Apply Gaussian blur for noise reduction.""" | |
| if kernel_size % 2 == 0: | |
| kernel_size += 1 | |
| return cv2.GaussianBlur(image, (kernel_size, kernel_size), 0) | |
| def apply_clahe(image, clip_limit=2.0, tile_grid_size=8): | |
| """Apply CLAHE (Contrast Limited Adaptive Histogram Equalization).""" | |
| # Convert to LAB color space | |
| if len(image.shape) == 3 and image.shape[2] == 3: | |
| lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB) | |
| l, a, b = cv2.split(lab) | |
| # Apply CLAHE to L channel | |
| clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(tile_grid_size, tile_grid_size)) | |
| l = clahe.apply(l) | |
| # Merge channels | |
| lab = cv2.merge([l, a, b]) | |
| return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR) | |
| else: | |
| clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(tile_grid_size, tile_grid_size)) | |
| return clahe.apply(image) | |
| def apply_pseudonymize(image, box_width=80, box_height=40): | |
| """Apply black box to top-left corner for patient privacy. | |
| Applies a fixed-size black rectangle at position (0,0). | |
| Box size is exactly 80px wide x 40px tall, regardless of image size. | |
| """ | |
| result = image.copy() | |
| img_height, img_width = result.shape[:2] | |
| # Ensure box doesn't exceed image bounds | |
| actual_width = min(box_width, img_width) | |
| actual_height = min(box_height, img_height) | |
| # Apply black box at top-left corner (0,0) | |
| result[0:actual_height, 0:actual_width] = 0 | |
| return result | |
| def image_to_base64(image): | |
| """Convert numpy array to base64 string.""" | |
| _, buffer = cv2.imencode('.png', image) | |
| return base64.b64encode(buffer).decode('utf-8') | |
| def calculate_quality_metrics(image): | |
| """Calculate image quality metrics: sharpness, contrast, and SNR.""" | |
| # Convert to grayscale if color | |
| if len(image.shape) == 3: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| else: | |
| gray = image | |
| gray_float = gray.astype(np.float64) | |
| # 1. Sharpness Score (Laplacian variance method) | |
| laplacian = cv2.Laplacian(gray, cv2.CV_64F) | |
| laplacian_var = laplacian.var() | |
| # Normalize to 0-100 scale (typical range 0-2000 for ultrasound) | |
| sharpness_raw = min(laplacian_var, 2000) | |
| sharpness_score = round((sharpness_raw / 2000) * 100, 1) | |
| # Determine sharpness rating | |
| if sharpness_score >= 80: | |
| sharpness_rating = "Excellent" | |
| elif sharpness_score >= 60: | |
| sharpness_rating = "Good" | |
| elif sharpness_score >= 40: | |
| sharpness_rating = "Fair" | |
| else: | |
| sharpness_rating = "Poor" | |
| # 2. Contrast Ratio (std / mean) | |
| mean_val = gray_float.mean() | |
| std_val = gray_float.std() | |
| contrast_ratio = round(std_val / mean_val, 2) if mean_val > 0 else 0 | |
| # 3. SNR Estimate (mean / std in dB) | |
| if std_val > 0: | |
| snr_linear = mean_val / std_val | |
| snr_db = round(20 * np.log10(snr_linear), 1) if snr_linear > 0 else 0 | |
| else: | |
| snr_db = 0 | |
| return { | |
| 'sharpness_score': sharpness_score, | |
| 'sharpness_rating': sharpness_rating, | |
| 'sharpness_raw': round(laplacian_var, 2), | |
| 'contrast_ratio': contrast_ratio, | |
| 'snr_db': snr_db, | |
| 'mean': round(mean_val, 2), | |
| 'std': round(std_val, 2) | |
| } | |
| batch_storage = {} | |
| def srad_denoise(image, iterations=3, delta_t=0.08, kappa=40): | |
| if len(image.shape) == 3: | |
| channels = cv2.split(image) | |
| denoised_channels = [srad_denoise(ch, iterations, delta_t, kappa) for ch in channels] | |
| return cv2.merge(denoised_channels) | |
| img = image.astype(np.float64) | |
| for _ in range(iterations): | |
| grad_n = np.roll(img, -1, axis=0) - img | |
| grad_s = np.roll(img, 1, axis=0) - img | |
| grad_e = np.roll(img, -1, axis=1) - img | |
| grad_w = np.roll(img, 1, axis=1) - img | |
| grad_mag = np.sqrt(grad_n**2 + grad_s**2 + grad_e**2 + grad_w**2) | |
| q = grad_mag / (img + 1e-10) | |
| q0 = np.sqrt(np.mean(q**2)) | |
| c = 1.0 / (1.0 + ((q**2 - q0**2) / (q0**2 * (1 + q0**2) + 1e-10))) | |
| img = img + delta_t * ( | |
| c * grad_n + c * grad_s + c * grad_e + c * grad_w | |
| ) | |
| return np.clip(img, 0, 255).astype(np.uint8) | |
| def denoise_image(image, strength='medium'): | |
| if strength == 'none' or strength is None: | |
| return image.copy() | |
| params = { | |
| 'light': {'iterations': 2, 'delta_t': 0.05, 'kappa': 50}, | |
| 'medium': {'iterations': 3, 'delta_t': 0.08, 'kappa': 40}, | |
| 'strong': {'iterations': 5, 'delta_t': 0.1, 'kappa': 30} | |
| } | |
| p = params.get(strength, params['medium']) | |
| return srad_denoise(image, iterations=p['iterations'], delta_t=p['delta_t'], kappa=p['kappa']) | |
| def sharpen_image(image, strength='medium'): | |
| if strength == 'none' or strength is None: | |
| return image.copy() | |
| amounts = {'light': 1.0, 'medium': 1.5, 'strong': 2.0} | |
| amount = amounts.get(strength, 1.5) | |
| blurred = cv2.GaussianBlur(image, (0, 0), 3) | |
| sharpened = cv2.addWeighted(image, 1.0 + amount, blurred, -amount, 0) | |
| return sharpened | |
| def calculate_image_quality_score(image): | |
| metrics = calculate_quality_metrics(image) | |
| score = calculate_overall_quality(metrics['sharpness_score'], metrics['contrast_ratio'], metrics['snr_db']) | |
| if len(image.shape) == 3: | |
| gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| else: | |
| gray = image | |
| total_pixels = gray.size | |
| black_pct = np.sum(gray == 0) / total_pixels | |
| white_pct = np.sum(gray == 255) / total_pixels | |
| if black_pct > 0.02: | |
| score -= min(15, (black_pct - 0.02) * 500) | |
| if white_pct > 0.02: | |
| score -= min(15, (white_pct - 0.02) * 500) | |
| unique_vals = len(np.unique(gray)) | |
| tonal_range = unique_vals / 256.0 | |
| if tonal_range < 0.5: | |
| score -= (0.5 - tonal_range) * 20 | |
| score = max(0, round(score, 1)) | |
| return score, metrics | |
| def apply_enhancement_pipeline(image, params): | |
| result = image.copy() | |
| quality_history = [] | |
| orig_score, orig_metrics = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'original', 'quality': orig_score}) | |
| denoise_strength = params.get('denoise_strength', 'medium') | |
| denoise_active = params.get('denoise_enabled', True) and denoise_strength and denoise_strength != 'none' | |
| if denoise_active: | |
| result = denoise_image(result, denoise_strength) | |
| step_score, _ = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'after_denoise', 'quality': step_score}) | |
| if params.get('clahe_enabled', True): | |
| result = apply_clahe(result, clip_limit=params.get('clahe_clip_limit', 2.5)) | |
| step_score, _ = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'after_clahe', 'quality': step_score}) | |
| sharpen_strength = params.get('sharpen_strength', 'medium') | |
| sharpen_active = params.get('sharpen_enabled', True) and sharpen_strength and sharpen_strength != 'none' | |
| if sharpen_active: | |
| result = sharpen_image(result, sharpen_strength) | |
| step_score, _ = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'after_sharpen', 'quality': step_score}) | |
| final_score, final_metrics = calculate_image_quality_score(result) | |
| return { | |
| 'enhanced_image': result, | |
| 'quality_history': quality_history, | |
| 'quality_original': orig_score, | |
| 'quality_final': final_score, | |
| 'improvement': round(final_score - orig_score, 1), | |
| 'params': params, | |
| 'final_metrics': final_metrics | |
| } | |
| def quick_enhance_pipeline(image): | |
| result = image.copy() | |
| quality_history = [] | |
| orig_score, orig_metrics = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'original', 'quality': orig_score}) | |
| result = srad_denoise(result, iterations=3, delta_t=0.08, kappa=40) | |
| step_score, _ = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'after_denoise', 'quality': step_score}) | |
| result = apply_clahe(result, clip_limit=2.0) | |
| step_score, _ = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'after_clahe', 'quality': step_score}) | |
| blurred = cv2.GaussianBlur(result, (0, 0), 3) | |
| result = cv2.addWeighted(result, 1.5, blurred, -0.5, 0) | |
| step_score, _ = calculate_image_quality_score(result) | |
| quality_history.append({'step': 'after_sharpen', 'quality': step_score}) | |
| final_score, final_metrics = calculate_image_quality_score(result) | |
| return { | |
| 'enhanced_image': result, | |
| 'quality_history': quality_history, | |
| 'quality_original': orig_score, | |
| 'quality_final': final_score, | |
| 'improvement': round(final_score - orig_score, 1), | |
| 'params': { | |
| 'denoise_method': 'SRAD', | |
| 'denoise_strength': 'medium', | |
| 'denoise_iterations': 10, | |
| 'clahe_clip_limit': 2.0, | |
| 'clahe_tile_grid': '8x8', | |
| 'sharpen_method': 'unsharp_mask', | |
| 'sharpen_alpha': 1.5, | |
| 'sharpen_beta': -0.5 | |
| }, | |
| 'final_metrics': final_metrics | |
| } | |
| from models import segment_image, is_segmentation_available | |
| from utils.image_processing import render_overlay | |
| def auto_optimize_pipeline(image, optimize_individually=True): | |
| orig_score, _ = calculate_image_quality_score(image) | |
| best_score = orig_score | |
| best_result = image.copy() | |
| best_params = {'denoise_strength': 'none', 'clahe_clip_limit': 1.3, 'sharpen_strength': 'none'} | |
| best_history = [{'step': 'original', 'quality': orig_score}] | |
| denoise_levels = ['none', 'light'] | |
| clahe_limits = [1.1, 1.3, 1.5] | |
| sharpen_levels = ['none', 'light'] | |
| for denoise in denoise_levels: | |
| denoised = denoise_image(image, denoise) | |
| for clip_limit in clahe_limits: | |
| enhanced = apply_clahe(denoised, clip_limit=clip_limit) | |
| for sharpen in sharpen_levels: | |
| final = sharpen_image(enhanced, sharpen) | |
| score, metrics = calculate_image_quality_score(final) | |
| if score > best_score: | |
| best_score = score | |
| best_result = final | |
| best_params = { | |
| 'denoise_strength': denoise, | |
| 'clahe_clip_limit': clip_limit, | |
| 'sharpen_strength': sharpen | |
| } | |
| current = image.copy() | |
| best_history = [{'step': 'original', 'quality': orig_score}] | |
| if best_params['denoise_strength'] != 'none': | |
| current = denoise_image(current, best_params['denoise_strength']) | |
| d_score, _ = calculate_image_quality_score(current) | |
| best_history.append({'step': 'after_denoise', 'quality': d_score}) | |
| current = apply_clahe(current, clip_limit=best_params['clahe_clip_limit']) | |
| c_score, _ = calculate_image_quality_score(current) | |
| best_history.append({'step': 'after_clahe', 'quality': c_score}) | |
| if best_params['sharpen_strength'] != 'none': | |
| current = sharpen_image(current, best_params['sharpen_strength']) | |
| s_score, _ = calculate_image_quality_score(current) | |
| best_history.append({'step': 'after_sharpen', 'quality': s_score}) | |
| final_score, final_metrics = calculate_image_quality_score(best_result) | |
| return { | |
| 'enhanced_image': best_result, | |
| 'quality_history': best_history, | |
| 'quality_original': orig_score, | |
| 'quality_final': final_score, | |
| 'improvement': round(final_score - orig_score, 1), | |
| 'params': best_params, | |
| 'final_metrics': final_metrics | |
| } | |
| def guided_optimize(image, philosophy='balanced'): | |
| ranges = { | |
| 'subtle': { | |
| 'denoise': ['none'], | |
| 'clahe': [1.0, 1.2], | |
| 'sharpen': ['none'] | |
| }, | |
| 'balanced': { | |
| 'denoise': ['light'], | |
| 'clahe': [1.2, 1.4], | |
| 'sharpen': ['light'] | |
| }, | |
| 'aggressive': { | |
| 'denoise': ['light'], | |
| 'clahe': [1.5, 1.8], | |
| 'sharpen': ['light'] | |
| } | |
| } | |
| param_range = ranges.get(philosophy, ranges['balanced']) | |
| orig_score, _ = calculate_image_quality_score(image) | |
| best_score = orig_score | |
| best_result = image.copy() | |
| best_params = { | |
| 'denoise_strength': param_range['denoise'][0], | |
| 'clahe_clip_limit': param_range['clahe'][0], | |
| 'sharpen_strength': param_range['sharpen'][0] | |
| } | |
| for denoise in param_range['denoise']: | |
| denoised = denoise_image(image, denoise) | |
| for clip_limit in param_range['clahe']: | |
| enhanced = apply_clahe(denoised, clip_limit=clip_limit) | |
| for sharpen in param_range['sharpen']: | |
| final = sharpen_image(enhanced, sharpen) | |
| score, _ = calculate_image_quality_score(final) | |
| if score > best_score: | |
| best_score = score | |
| best_result = final | |
| best_params = { | |
| 'denoise_strength': denoise, | |
| 'clahe_clip_limit': clip_limit, | |
| 'sharpen_strength': sharpen | |
| } | |
| current = image.copy() | |
| best_history = [{'step': 'original', 'quality': orig_score}] | |
| if best_params['denoise_strength'] != 'none': | |
| current = denoise_image(current, best_params['denoise_strength']) | |
| d_score, _ = calculate_image_quality_score(current) | |
| best_history.append({'step': 'after_denoise', 'quality': d_score}) | |
| current = apply_clahe(current, clip_limit=best_params['clahe_clip_limit']) | |
| c_score, _ = calculate_image_quality_score(current) | |
| best_history.append({'step': 'after_clahe', 'quality': c_score}) | |
| if best_params['sharpen_strength'] != 'none': | |
| current = sharpen_image(current, best_params['sharpen_strength']) | |
| s_score, _ = calculate_image_quality_score(current) | |
| best_history.append({'step': 'after_sharpen', 'quality': s_score}) | |
| final_score, final_metrics = calculate_image_quality_score(best_result) | |
| return { | |
| 'enhanced_image': best_result, | |
| 'quality_history': best_history, | |
| 'quality_original': orig_score, | |
| 'quality_final': final_score, | |
| 'improvement': round(final_score - orig_score, 1), | |
| 'params': best_params, | |
| 'philosophy': philosophy, | |
| 'final_metrics': final_metrics | |
| } | |
| def calculate_overall_quality(sharpness, contrast, snr): | |
| sharpness_norm = min(100, sharpness) | |
| contrast_norm = min(100, contrast * 100) | |
| snr_norm = min(100, max(0, (snr / 10) * 100)) | |
| score = (sharpness_norm * 0.4 + contrast_norm * 0.3 + snr_norm * 0.3) | |
| return round(score, 1) | |
| def get_quality_label(score): | |
| if score >= 80: | |
| return "Excellent" | |
| elif score >= 60: | |
| return "Good" | |
| elif score >= 40: | |
| return "Fair" | |
| else: | |
| return "Poor" | |
| def find_uploaded_file(file_id): | |
| for ext in ALLOWED_EXTENSIONS: | |
| potential_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.{ext}") | |
| if os.path.exists(potential_path): | |
| return potential_path | |
| return None | |
| def load_image_from_path(filepath): | |
| file_ext = get_file_extension(filepath) | |
| if file_ext == 'dcm': | |
| image, _ = dicom_to_image(filepath) | |
| else: | |
| image = cv2.imread(filepath) | |
| return image | |
| def make_thumbnail_base64(image, size=100): | |
| h, w = image.shape[:2] | |
| scale = size / max(h, w) | |
| new_w, new_h = int(w * scale), int(h * scale) | |
| thumb = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA) | |
| _, buffer = cv2.imencode('.png', thumb) | |
| return base64.b64encode(buffer).decode('utf-8') | |
| def health_check(): | |
| return jsonify({ | |
| 'status': 'ok', | |
| 'message': 'InSono backend is ready', | |
| 'timestamp': datetime.now(timezone.utc).isoformat() | |
| }), 200 | |
| def index(): | |
| return send_from_directory('static', 'index.html') | |
| def upload_file(): | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file provided'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'error': 'No file selected'}), 400 | |
| if not allowed_file(file.filename): | |
| return jsonify({'error': 'File type not supported. Please upload PNG, JPG, or DICOM files.'}), 400 | |
| # Check file size | |
| file.seek(0, 2) | |
| file_size = file.tell() | |
| file.seek(0) | |
| if file_size > MAX_FILE_SIZE: | |
| return jsonify({'error': 'File size exceeds 50MB limit'}), 400 | |
| # Generate unique filename | |
| file_ext = get_file_extension(file.filename) | |
| unique_id = str(uuid.uuid4()) | |
| filename = f"{unique_id}.{file_ext}" | |
| filepath = os.path.join(UPLOAD_FOLDER, filename) | |
| file.save(filepath) | |
| try: | |
| # Process based on file type | |
| is_dicom = file_ext == 'dcm' | |
| dicom_metadata = {} | |
| if is_dicom: | |
| image, ds = dicom_to_image(filepath) | |
| dicom_metadata = extract_dicom_metadata(ds) | |
| else: | |
| image = cv2.imread(filepath) | |
| if image is None: | |
| os.remove(filepath) | |
| return jsonify({'error': 'Could not read image file'}), 400 | |
| # Get image dimensions and color space | |
| height, width = image.shape[:2] | |
| color_space = 'RGB' if len(image.shape) == 3 and image.shape[2] == 3 else 'Grayscale' | |
| # Convert to base64 for preview | |
| image_base64 = image_to_base64(image) | |
| # Calculate quality metrics for original image | |
| quality_metrics = calculate_quality_metrics(image) | |
| return jsonify({ | |
| 'success': True, | |
| 'file_id': unique_id, | |
| 'filename': file.filename, | |
| 'file_size': file_size, | |
| 'width': width, | |
| 'height': height, | |
| 'color_space': color_space, | |
| 'is_dicom': is_dicom, | |
| 'dicom_metadata': dicom_metadata, | |
| 'image': image_base64, | |
| 'quality_metrics': quality_metrics | |
| }) | |
| except Exception as e: | |
| if os.path.exists(filepath): | |
| os.remove(filepath) | |
| return jsonify({'error': f'Error processing file: {str(e)}'}), 500 | |
| def process_image(): | |
| start_time = time.time() | |
| data = request.json | |
| if not data or 'file_id' not in data: | |
| return jsonify({'error': 'No file ID provided'}), 400 | |
| file_id = data['file_id'] | |
| mode = data.get('mode', 'legacy') | |
| pseudonymize = data.get('pseudonymize', False) | |
| filepath = None | |
| for ext in ALLOWED_EXTENSIONS: | |
| potential_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.{ext}") | |
| if os.path.exists(potential_path): | |
| filepath = potential_path | |
| break | |
| if not filepath: | |
| return jsonify({'error': 'File not found'}), 404 | |
| try: | |
| file_ext = get_file_extension(filepath) | |
| if file_ext == 'dcm': | |
| image, _ = dicom_to_image(filepath) | |
| else: | |
| image = cv2.imread(filepath) | |
| if image is None: | |
| return jsonify({'error': 'Could not read image'}), 500 | |
| processed = image.copy() | |
| processing_log = [] | |
| quality_history = [] | |
| enhancement_params = {} | |
| result = None | |
| if mode == 'quick': | |
| result = quick_enhance_pipeline(image) | |
| processed = result['enhanced_image'] | |
| enhancement_params = result['params'] | |
| quality_history = result['quality_history'] | |
| processing_log.append("Quick Enhancement Pipeline (Medical Ultrasound)") | |
| processing_log.append("SRAD Denoising (3 iterations, ultrasound-optimized)") | |
| processing_log.append("CLAHE (clipLimit=2.0, tileGrid=8x8)") | |
| processing_log.append("Unsharp Mask (alpha=1.5, beta=-0.5)") | |
| elif mode == 'advanced': | |
| custom_params = data.get('custom_params', {}) | |
| denoise_enabled = custom_params.get('denoise_enabled', True) | |
| denoise_strength = custom_params.get('denoise_strength', 'medium') | |
| clahe_enabled = custom_params.get('clahe_enabled', True) | |
| clahe_clip_limit = custom_params.get('clahe_clip_limit', 2.5) | |
| sharpen_enabled = custom_params.get('sharpen_enabled', True) | |
| sharpen_strength = custom_params.get('sharpen_strength', 'medium') | |
| pipeline_params = { | |
| 'denoise_enabled': denoise_enabled, | |
| 'denoise_strength': denoise_strength, | |
| 'clahe_enabled': clahe_enabled, | |
| 'clahe_clip_limit': clahe_clip_limit, | |
| 'sharpen_enabled': sharpen_enabled, | |
| 'sharpen_strength': sharpen_strength | |
| } | |
| result = apply_enhancement_pipeline(image, pipeline_params) | |
| processed = result['enhanced_image'] | |
| enhancement_params = { | |
| 'denoise_strength': denoise_strength if denoise_enabled else 'none', | |
| 'clahe_clip_limit': clahe_clip_limit if clahe_enabled else None, | |
| 'sharpen_strength': sharpen_strength if sharpen_enabled else 'none' | |
| } | |
| quality_history = result['quality_history'] | |
| if denoise_enabled: | |
| h_values = {'light': 5, 'medium': 10, 'strong': 15} | |
| processing_log.append(f"Denoising ({denoise_strength}, h={h_values.get(denoise_strength, 10)})") | |
| if clahe_enabled: | |
| processing_log.append(f"CLAHE (clip limit: {clahe_clip_limit})") | |
| if sharpen_enabled: | |
| amounts = {'light': 1.0, 'medium': 1.5, 'strong': 2.0} | |
| processing_log.append(f"Sharpening ({sharpen_strength}, amount={amounts.get(sharpen_strength, 1.5)})") | |
| else: | |
| blur_amount = data.get('blur_amount', 0) | |
| clahe_enabled = data.get('clahe_enabled', False) | |
| clahe_clip_limit = data.get('clahe_clip_limit', 2.0) | |
| if blur_amount > 0: | |
| kernel_size = int(blur_amount * 2) + 1 | |
| processed = apply_gaussian_blur(processed, kernel_size) | |
| processing_log.append(f"Gaussian Blur (kernel: {kernel_size}x{kernel_size})") | |
| if clahe_enabled: | |
| processed = apply_clahe(processed, clip_limit=clahe_clip_limit) | |
| processing_log.append(f"CLAHE (clip limit: {clahe_clip_limit})") | |
| if pseudonymize: | |
| processed = apply_pseudonymize(processed) | |
| processing_log.append("Pseudonymization (80x40px)") | |
| processed_base64 = image_to_base64(processed) | |
| original_quality = calculate_quality_metrics(image) | |
| processed_quality = calculate_quality_metrics(processed) | |
| sharpness_improvement = round(processed_quality['sharpness_score'] - original_quality['sharpness_score'], 1) | |
| contrast_improvement = round(((processed_quality['contrast_ratio'] - original_quality['contrast_ratio']) / original_quality['contrast_ratio'] * 100) if original_quality['contrast_ratio'] > 0 else 0, 1) | |
| snr_improvement = round(processed_quality['snr_db'] - original_quality['snr_db'], 1) | |
| processing_duration = round(time.time() - start_time, 3) | |
| processing_timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ') | |
| original_overall = calculate_overall_quality( | |
| original_quality['sharpness_score'], original_quality['contrast_ratio'], original_quality['snr_db'] | |
| ) | |
| processed_overall = calculate_overall_quality( | |
| processed_quality['sharpness_score'], processed_quality['contrast_ratio'], processed_quality['snr_db'] | |
| ) | |
| response_data = { | |
| 'success': True, | |
| 'processed_image': processed_base64, | |
| 'processing_log': processing_log, | |
| 'processing_duration': processing_duration, | |
| 'processing_timestamp': processing_timestamp, | |
| 'original_quality': original_quality, | |
| 'processed_quality': processed_quality, | |
| 'quality_improvement': { | |
| 'sharpness': sharpness_improvement, | |
| 'contrast_percent': contrast_improvement, | |
| 'snr': snr_improvement | |
| }, | |
| 'mode': mode, | |
| 'enhancement_params': enhancement_params, | |
| 'quality_history': quality_history, | |
| 'original_overall': original_overall, | |
| 'processed_overall': processed_overall | |
| } | |
| return jsonify(response_data) | |
| except Exception as e: | |
| return jsonify({'error': f'Error processing image: {str(e)}'}), 500 | |
| def download_image(): | |
| data = request.json | |
| if not data or 'image' not in data: | |
| return jsonify({'error': 'No image data provided'}), 400 | |
| try: | |
| image_data = base64.b64decode(data['image']) | |
| return send_file( | |
| io.BytesIO(image_data), | |
| mimetype='image/png', | |
| as_attachment=True, | |
| download_name='processed_ultrasound.png' | |
| ) | |
| except Exception as e: | |
| return jsonify({'error': f'Error downloading: {str(e)}'}), 500 | |
| segmentation_storage = {} | |
| def ai_segment(): | |
| data = request.json | |
| if not data or 'file_id' not in data: | |
| return jsonify({'error': 'No file_id provided'}), 400 | |
| file_id = data['file_id'] | |
| x_pct = data.get('x', 50) | |
| y_pct = data.get('y', 50) | |
| print(f"[API] ai-segment: file_id={file_id}, x_pct={x_pct}, y_pct={y_pct}") | |
| filepath = find_uploaded_file(file_id) | |
| if not filepath: | |
| return jsonify({'error': 'File not found'}), 404 | |
| try: | |
| image = load_image_from_path(filepath) | |
| if image is None: | |
| return jsonify({'error': 'Could not read image'}), 500 | |
| img_h, img_w = image.shape[:2] | |
| pixel_x = int(float(x_pct) / 100.0 * img_w) | |
| pixel_y = int(float(y_pct) / 100.0 * img_h) | |
| pixel_x = max(0, min(pixel_x, img_w - 1)) | |
| pixel_y = max(0, min(pixel_y, img_h - 1)) | |
| total_pixels = img_h * img_w | |
| print(f"[API] Image: {img_w}x{img_h}, click pixel: ({pixel_x}, {pixel_y})") | |
| result = segment_image(image, pixel_x, pixel_y) | |
| if not result.get('success'): | |
| error_msg = result.get('message', 'Segmentation failed') | |
| error_type = result.get('error_type', 'unknown') | |
| print(f"[API] Segmentation failed: {error_type} — {error_msg}") | |
| return jsonify({ | |
| 'error': error_msg, | |
| 'error_type': error_type | |
| }), 503 | |
| masks_list = result['masks'] | |
| method = result['method'] | |
| masks_list.sort(key=lambda m: m['area']) | |
| while len(masks_list) < 3: | |
| masks_list.append(masks_list[-1]) | |
| masks_list = masks_list[:3] | |
| default_idx = min(1, len(masks_list) - 1) | |
| segmentation_storage[file_id] = { | |
| 'masks': masks_list, | |
| 'image': image | |
| } | |
| overlays = [] | |
| for i, m in enumerate(masks_list): | |
| area_pct = round((m['area'] / total_pixels) * 100, 2) | |
| overlays.append({ | |
| 'segmented_image': render_overlay(image, m['mask']), | |
| 'mask_area_percent': area_pct, | |
| 'score': m['score'], | |
| 'index': i | |
| }) | |
| selected_area_pct = round((masks_list[default_idx]['area'] / total_pixels) * 100, 2) | |
| print(f"[API] Segmentation success: {len(overlays)} masks, method={method}") | |
| return jsonify({ | |
| 'success': True, | |
| 'segmented_image': overlays[default_idx]['segmented_image'], | |
| 'method': method, | |
| 'mask_area_percent': selected_area_pct, | |
| 'selected_index': default_idx, | |
| 'masks': overlays, | |
| 'mask_count': len(overlays) | |
| }) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |
| return jsonify({'error': f'Error during segmentation: {str(e)}'}), 500 | |
| def ai_segment_select(): | |
| data = request.json | |
| if not data or 'file_id' not in data or 'mask_index' not in data: | |
| return jsonify({'error': 'file_id and mask_index required'}), 400 | |
| file_id = data['file_id'] | |
| mask_index = int(data['mask_index']) | |
| if file_id not in segmentation_storage: | |
| return jsonify({'error': 'No segmentation data found. Please segment again.'}), 404 | |
| stored = segmentation_storage[file_id] | |
| masks_list = stored['masks'] | |
| image = stored['image'] | |
| if mask_index < 0 or mask_index >= len(masks_list): | |
| return jsonify({'error': f'Invalid mask index. Valid: 0-{len(masks_list)-1}'}), 400 | |
| selected = masks_list[mask_index] | |
| total_pixels = image.shape[0] * image.shape[1] | |
| area_pct = round((selected['area'] / total_pixels) * 100, 2) | |
| return jsonify({ | |
| 'success': True, | |
| 'segmented_image': render_overlay(image, selected['mask']), | |
| 'mask_area_percent': area_pct, | |
| 'selected_index': mask_index | |
| }) | |
| def segmentation_status(): | |
| available = is_segmentation_available() | |
| import psutil | |
| mem = psutil.virtual_memory() | |
| return jsonify({ | |
| 'model': 'MONAI UNet', | |
| 'available': available, | |
| 'ram': { | |
| 'total_gb': round(mem.total / (1024**3), 1), | |
| 'available_gb': round(mem.available / (1024**3), 1), | |
| 'sufficient': True | |
| } | |
| }) | |
| def segmentation_methods_legacy(): | |
| available = is_segmentation_available() | |
| return jsonify({ | |
| 'methods': [{ | |
| 'name': 'monai', | |
| 'available': available, | |
| 'tier': 1, | |
| 'description': 'MONAI UNet - medical image segmentation' | |
| }] | |
| }) | |
| def batch_upload(): | |
| files = request.files.getlist('files') | |
| if not files or len(files) == 0: | |
| return jsonify({'error': 'No files provided'}), 400 | |
| if len(files) > 50: | |
| return jsonify({'error': 'Maximum 50 files allowed per batch'}), 400 | |
| batch_id = str(uuid.uuid4()) | |
| results = [] | |
| errors = [] | |
| for file in files: | |
| if file.filename == '': | |
| continue | |
| if not allowed_file(file.filename): | |
| errors.append({'filename': file.filename, 'error': 'Unsupported file type'}) | |
| continue | |
| file.seek(0, 2) | |
| file_size = file.tell() | |
| file.seek(0) | |
| if file_size > MAX_FILE_SIZE: | |
| errors.append({'filename': file.filename, 'error': 'File exceeds 50MB limit'}) | |
| continue | |
| file_ext = get_file_extension(file.filename) | |
| unique_id = str(uuid.uuid4()) | |
| filename = f"{unique_id}.{file_ext}" | |
| filepath = os.path.join(UPLOAD_FOLDER, filename) | |
| file.save(filepath) | |
| try: | |
| is_dicom = file_ext == 'dcm' | |
| if is_dicom: | |
| image, ds = dicom_to_image(filepath) | |
| else: | |
| image = cv2.imread(filepath) | |
| if image is None: | |
| os.remove(filepath) | |
| errors.append({'filename': file.filename, 'error': 'Could not read image'}) | |
| continue | |
| height, width = image.shape[:2] | |
| color_space = 'RGB' if len(image.shape) == 3 and image.shape[2] == 3 else 'Grayscale' | |
| quality = calculate_quality_metrics(image) | |
| overall_score = calculate_overall_quality( | |
| quality['sharpness_score'], quality['contrast_ratio'], quality['snr_db'] | |
| ) | |
| thumbnail = make_thumbnail_base64(image) | |
| result = { | |
| 'file_id': unique_id, | |
| 'filename': file.filename, | |
| 'file_size': file_size, | |
| 'width': width, | |
| 'height': height, | |
| 'color_space': color_space, | |
| 'is_dicom': is_dicom, | |
| 'quality_metrics': quality, | |
| 'overall_quality': overall_score, | |
| 'quality_label': get_quality_label(overall_score), | |
| 'thumbnail': thumbnail, | |
| 'status': 'uploaded' | |
| } | |
| results.append(result) | |
| except Exception as e: | |
| if os.path.exists(filepath): | |
| os.remove(filepath) | |
| errors.append({'filename': file.filename, 'error': str(e)}) | |
| batch_storage[batch_id] = { | |
| 'items': results, | |
| 'created': datetime.now(timezone.utc).isoformat(), | |
| 'processed': False | |
| } | |
| return jsonify({ | |
| 'success': True, | |
| 'batch_id': batch_id, | |
| 'uploaded': len(results), | |
| 'failed': len(errors), | |
| 'results': results, | |
| 'errors': errors | |
| }) | |
| def batch_process(): | |
| start_time = time.time() | |
| data = request.json | |
| if not data or 'batch_id' not in data: | |
| return jsonify({'error': 'No batch_id provided'}), 400 | |
| batch_id = data['batch_id'] | |
| if batch_id not in batch_storage: | |
| return jsonify({'error': 'Batch not found'}), 404 | |
| mode = data.get('mode', 'auto') | |
| pseudonymize = data.get('pseudonymize', False) | |
| optimize_individually = data.get('optimize_individually', True) | |
| philosophy = data.get('philosophy', 'balanced') | |
| auto_optimize_guided = data.get('auto_optimize_guided', True) | |
| custom_params = data.get('custom_params', {}) | |
| batch = batch_storage[batch_id] | |
| processed_count = 0 | |
| error_count = 0 | |
| shared_auto_params = None | |
| denoise_contributions = [] | |
| clahe_contributions = [] | |
| sharpen_contributions = [] | |
| for item in batch['items']: | |
| filepath = find_uploaded_file(item['file_id']) | |
| if not filepath: | |
| item['status'] = 'error' | |
| item['error'] = 'File not found on disk' | |
| error_count += 1 | |
| continue | |
| try: | |
| image = load_image_from_path(filepath) | |
| if image is None: | |
| item['status'] = 'error' | |
| item['error'] = 'Could not read image' | |
| error_count += 1 | |
| continue | |
| if mode == 'ai' or mode == 'auto': | |
| if optimize_individually or shared_auto_params is None: | |
| result = auto_optimize_pipeline(image) | |
| if not optimize_individually and shared_auto_params is None: | |
| shared_auto_params = result['params'] | |
| else: | |
| result = apply_enhancement_pipeline(image, { | |
| 'denoise_enabled': True, | |
| 'denoise_strength': shared_auto_params['denoise_strength'], | |
| 'clahe_enabled': True, | |
| 'clahe_clip_limit': shared_auto_params['clahe_clip_limit'], | |
| 'sharpen_enabled': True, | |
| 'sharpen_strength': shared_auto_params['sharpen_strength'] | |
| }) | |
| elif mode == 'guided': | |
| if auto_optimize_guided: | |
| result = guided_optimize(image, philosophy) | |
| else: | |
| preset_params = { | |
| 'subtle': {'denoise_strength': 'light', 'clahe_clip_limit': 1.5, 'sharpen_strength': 'light'}, | |
| 'balanced': {'denoise_strength': 'medium', 'clahe_clip_limit': 2.5, 'sharpen_strength': 'medium'}, | |
| 'aggressive': {'denoise_strength': 'strong', 'clahe_clip_limit': 4.0, 'sharpen_strength': 'strong'} | |
| } | |
| p = preset_params.get(philosophy, preset_params['balanced']) | |
| result = apply_enhancement_pipeline(image, { | |
| 'denoise_enabled': True, 'denoise_strength': p['denoise_strength'], | |
| 'clahe_enabled': True, 'clahe_clip_limit': p['clahe_clip_limit'], | |
| 'sharpen_enabled': True, 'sharpen_strength': p['sharpen_strength'] | |
| }) | |
| else: | |
| result = apply_enhancement_pipeline(image, { | |
| 'denoise_enabled': custom_params.get('denoise_enabled', True), | |
| 'denoise_strength': custom_params.get('denoise_strength', 'medium'), | |
| 'clahe_enabled': custom_params.get('clahe_enabled', True), | |
| 'clahe_clip_limit': custom_params.get('clahe_clip_limit', 2.5), | |
| 'sharpen_enabled': custom_params.get('sharpen_enabled', True), | |
| 'sharpen_strength': custom_params.get('sharpen_strength', 'medium') | |
| }) | |
| processed_img = result['enhanced_image'] | |
| if pseudonymize: | |
| processed_img = apply_pseudonymize(processed_img) | |
| processed_quality = result['final_metrics'] | |
| orig_score = result['quality_original'] | |
| proc_score = result['quality_final'] | |
| qh = result['quality_history'] | |
| if len(qh) >= 2: | |
| denoise_step = next((s for s in qh if s['step'] == 'after_denoise'), None) | |
| clahe_step = next((s for s in qh if s['step'] == 'after_clahe'), None) | |
| sharpen_step = next((s for s in qh if s['step'] == 'after_sharpen'), None) | |
| orig_q = qh[0]['quality'] | |
| if denoise_step: | |
| denoise_contributions.append(round(denoise_step['quality'] - orig_q, 1)) | |
| if clahe_step: | |
| prev_q = denoise_step['quality'] if denoise_step else orig_q | |
| clahe_contributions.append(round(clahe_step['quality'] - prev_q, 1)) | |
| if sharpen_step: | |
| prev_q = clahe_step['quality'] if clahe_step else (denoise_step['quality'] if denoise_step else orig_q) | |
| sharpen_contributions.append(round(sharpen_step['quality'] - prev_q, 1)) | |
| item['original_quality'] = calculate_quality_metrics(image) | |
| item['processed_quality'] = processed_quality | |
| item['original_overall'] = orig_score | |
| item['processed_overall'] = proc_score | |
| item['quality_improvement'] = result['improvement'] | |
| item['overall_quality'] = proc_score | |
| item['quality_label'] = get_quality_label(proc_score) | |
| item['processed_thumbnail'] = make_thumbnail_base64(processed_img) | |
| item['enhancement_params'] = result['params'] | |
| item['quality_history'] = result['quality_history'] | |
| item['enhancement_mode'] = mode | |
| item['status'] = 'processed' | |
| processed_count += 1 | |
| except Exception as e: | |
| item['status'] = 'error' | |
| item['error'] = str(e) | |
| error_count += 1 | |
| batch['processed'] = True | |
| batch['settings'] = { | |
| 'mode': mode, | |
| 'pseudonymize': pseudonymize, | |
| 'philosophy': philosophy if mode == 'guided' else None, | |
| 'optimize_individually': optimize_individually if mode == 'auto' else None, | |
| 'custom_params': custom_params if mode == 'custom' else None, | |
| } | |
| processed_items = [i for i in batch['items'] if i['status'] == 'processed'] | |
| avg_quality = round(sum(i['processed_overall'] for i in processed_items) / len(processed_items), 1) if processed_items else 0 | |
| best_quality = max((i['processed_overall'] for i in processed_items), default=0) | |
| avg_improvement = round(sum(i['quality_improvement'] for i in processed_items) / len(processed_items), 1) if processed_items else 0 | |
| avg_denoise = round(sum(denoise_contributions) / len(denoise_contributions), 1) if denoise_contributions else 0 | |
| avg_clahe = round(sum(clahe_contributions) / len(clahe_contributions), 1) if clahe_contributions else 0 | |
| avg_sharpen = round(sum(sharpen_contributions) / len(sharpen_contributions), 1) if sharpen_contributions else 0 | |
| duration = round(time.time() - start_time, 3) | |
| return jsonify({ | |
| 'success': True, | |
| 'batch_id': batch_id, | |
| 'processed_count': processed_count, | |
| 'error_count': error_count, | |
| 'processing_duration': duration, | |
| 'statistics': { | |
| 'total': len(batch['items']), | |
| 'processed': processed_count, | |
| 'errors': error_count, | |
| 'avg_quality': avg_quality, | |
| 'best_quality': best_quality, | |
| 'avg_improvement': avg_improvement, | |
| 'avg_denoise_contribution': avg_denoise, | |
| 'avg_clahe_contribution': avg_clahe, | |
| 'avg_sharpen_contribution': avg_sharpen | |
| }, | |
| 'results': batch['items'] | |
| }) | |
| def filter_batch(): | |
| data = request.json | |
| if not data or 'batch_id' not in data: | |
| return jsonify({'error': 'No batch_id provided'}), 400 | |
| batch_id = data['batch_id'] | |
| if batch_id not in batch_storage: | |
| return jsonify({'error': 'Batch not found'}), 404 | |
| threshold = data.get('threshold', 70) | |
| top_n = data.get('top_n', None) | |
| batch = batch_storage[batch_id] | |
| processed_items = [i for i in batch['items'] if i['status'] == 'processed'] | |
| sorted_items = sorted(processed_items, key=lambda x: x.get('processed_overall', x.get('overall_quality', 0)), reverse=True) | |
| if top_n: | |
| included = sorted_items[:top_n] | |
| excluded = sorted_items[top_n:] | |
| else: | |
| included = [i for i in sorted_items if i.get('processed_overall', i.get('overall_quality', 0)) >= threshold] | |
| excluded = [i for i in sorted_items if i.get('processed_overall', i.get('overall_quality', 0)) < threshold] | |
| avg_quality = round(sum(i.get('processed_overall', i.get('overall_quality', 0)) for i in processed_items) / len(processed_items), 1) if processed_items else 0 | |
| best_quality = max((i.get('processed_overall', i.get('overall_quality', 0)) for i in processed_items), default=0) | |
| return jsonify({ | |
| 'success': True, | |
| 'included': included, | |
| 'excluded': excluded, | |
| 'statistics': { | |
| 'total_count': len(processed_items), | |
| 'passed_count': len(included), | |
| 'failed_count': len(excluded), | |
| 'average_quality': avg_quality, | |
| 'best_quality': best_quality | |
| } | |
| }) | |
| def batch_report(): | |
| data = request.json | |
| if not data or 'batch_id' not in data: | |
| return jsonify({'error': 'No batch_id provided'}), 400 | |
| batch_id = data['batch_id'] | |
| if batch_id not in batch_storage: | |
| return jsonify({'error': 'Batch not found'}), 404 | |
| batch = batch_storage[batch_id] | |
| file_ids = data.get('file_ids', None) | |
| if file_ids: | |
| items = [i for i in batch['items'] if i['file_id'] in file_ids and i['status'] == 'processed'] | |
| else: | |
| items = [i for i in batch['items'] if i['status'] == 'processed'] | |
| items = sorted(items, key=lambda x: x.get('processed_overall', x.get('overall_quality', 0)), reverse=True) | |
| if not items: | |
| return jsonify({'error': 'No processed images to include in report'}), 400 | |
| try: | |
| buffer = io.BytesIO() | |
| doc = SimpleDocTemplate(buffer, pagesize=A4, | |
| topMargin=15*mm, bottomMargin=15*mm, | |
| leftMargin=15*mm, rightMargin=15*mm) | |
| styles = getSampleStyleSheet() | |
| story = [] | |
| section_style = ParagraphStyle('Section', parent=styles['Heading2'], | |
| fontSize=14, textColor=colors.white, | |
| backColor=colors.HexColor('#2563eb'), | |
| borderPadding=(4, 6, 4, 6), | |
| spaceAfter=4*mm, spaceBefore=6*mm) | |
| label_style = ParagraphStyle('Label', parent=styles['Normal'], | |
| fontSize=10, textColor=colors.HexColor('#334155')) | |
| small_style = ParagraphStyle('Small', parent=styles['Normal'], | |
| fontSize=8, textColor=colors.HexColor('#94a3b8')) | |
| ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') | |
| timestamp_style = ParagraphStyle('Timestamp', parent=styles['Normal'], | |
| fontSize=10, textColor=colors.HexColor('#94a3b8'), | |
| spaceAfter=4*mm) | |
| story.append(Paragraph(f'Generated: {ts}', timestamp_style)) | |
| story.append(Paragraph('BATCH SUMMARY', section_style)) | |
| settings = batch.get('settings', {}) | |
| all_processed = [i for i in batch['items'] if i['status'] == 'processed'] | |
| avg_q = round(sum(i.get('processed_overall', i.get('overall_quality', 0)) for i in all_processed) / len(all_processed), 1) if all_processed else 0 | |
| best_q = max((i.get('processed_overall', i.get('overall_quality', 0)) for i in all_processed), default=0) | |
| avg_imp = round(sum(i.get('quality_improvement', 0) for i in all_processed) / len(all_processed), 1) if all_processed else 0 | |
| mode_label = {'auto': 'Auto Optimize', 'guided': 'Guided', 'custom': 'Custom'}.get(settings.get('mode', 'auto'), 'Auto Optimize') | |
| summary_data = [ | |
| ['Total Images', str(len(batch['items']))], | |
| ['Processed', str(len(all_processed))], | |
| ['Enhancement Mode', mode_label], | |
| ['Avg Quality Score', f'{avg_q}/100'], | |
| ['Best Quality Score', f'{best_q}/100'], | |
| ['Avg Improvement', f'+{avg_imp}' if avg_imp >= 0 else str(avg_imp)], | |
| ['Pseudonymized', 'Yes' if settings.get('pseudonymize') else 'No'], | |
| ] | |
| if settings.get('mode') == 'guided' and settings.get('philosophy'): | |
| summary_data.insert(4, ['Philosophy', settings['philosophy'].capitalize()]) | |
| if settings.get('mode') == 'custom' and settings.get('custom_params'): | |
| cp = settings['custom_params'] | |
| if cp.get('denoise_enabled'): | |
| summary_data.append(['Denoise', cp.get('denoise_strength', 'medium').capitalize()]) | |
| if cp.get('clahe_enabled'): | |
| summary_data.append(['CLAHE Clip Limit', str(cp.get('clahe_clip_limit', 2.5))]) | |
| if cp.get('sharpen_enabled'): | |
| summary_data.append(['Sharpen', cp.get('sharpen_strength', 'medium').capitalize()]) | |
| summary_table = Table(summary_data, colWidths=[55*mm, 50*mm]) | |
| summary_table.setStyle(TableStyle([ | |
| ('FONTSIZE', (0, 0), (-1, -1), 9), | |
| ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'), | |
| ('TEXTCOLOR', (0, 0), (0, -1), colors.HexColor('#334155')), | |
| ('TEXTCOLOR', (1, 0), (1, -1), colors.HexColor('#1e293b')), | |
| ('BOTTOMPADDING', (0, 0), (-1, -1), 3), | |
| ('TOPPADDING', (0, 0), (-1, -1), 3), | |
| ('LINEBELOW', (0, 0), (-1, -2), 0.5, colors.HexColor('#e2e8f0')), | |
| ])) | |
| story.append(summary_table) | |
| story.append(Spacer(1, 6*mm)) | |
| story.append(Paragraph('QUALITY RANKING', section_style)) | |
| rank_header = ['Rank', 'Filename', 'Quality', 'Sharpness', 'Contrast', 'SNR (dB)'] | |
| rank_rows = [rank_header] | |
| for idx, item in enumerate(items): | |
| pq = item.get('processed_quality', item.get('quality_metrics', {})) | |
| score = item.get('processed_overall', item.get('overall_quality', 0)) | |
| rank_rows.append([ | |
| str(idx + 1), | |
| item['filename'][:25], | |
| f'{score}', | |
| str(pq.get('sharpness_score', 'N/A')), | |
| str(pq.get('contrast_ratio', 'N/A')), | |
| str(pq.get('snr_db', 'N/A')) | |
| ]) | |
| rank_table = Table(rank_rows, colWidths=[12*mm, 52*mm, 20*mm, 24*mm, 22*mm, 22*mm]) | |
| rank_table.setStyle(TableStyle([ | |
| ('FONTSIZE', (0, 0), (-1, -1), 8), | |
| ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), | |
| ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2563eb')), | |
| ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), | |
| ('ALIGN', (0, 0), (-1, -1), 'CENTER'), | |
| ('ALIGN', (1, 1), (1, -1), 'LEFT'), | |
| ('BOTTOMPADDING', (0, 0), (-1, -1), 3), | |
| ('TOPPADDING', (0, 0), (-1, -1), 3), | |
| ('LINEBELOW', (0, 0), (-1, -1), 0.5, colors.HexColor('#e2e8f0')), | |
| ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#f8fafc')]), | |
| ])) | |
| story.append(rank_table) | |
| for idx, item in enumerate(items): | |
| story.append(PageBreak()) | |
| score = item.get('processed_overall', item.get('overall_quality', 0)) | |
| label = get_quality_label(score) | |
| story.append(Paragraph(f'Image {idx+1}: {item["filename"]}', section_style)) | |
| story.append(Paragraph(f'Quality Score: {score}/100 ({label})', label_style)) | |
| ep = item.get('enhancement_params', {}) | |
| if ep: | |
| parts = [] | |
| if ep.get('denoise_strength'): | |
| parts.append(f"Denoise: {ep['denoise_strength'].capitalize()}") | |
| if ep.get('clahe_clip_limit'): | |
| parts.append(f"CLAHE: {ep['clahe_clip_limit']}") | |
| if ep.get('sharpen_strength'): | |
| parts.append(f"Sharpen: {ep['sharpen_strength'].capitalize()}") | |
| if parts: | |
| story.append(Paragraph(f'Enhancement: {" | ".join(parts)}', small_style)) | |
| qh = item.get('quality_history', []) | |
| if len(qh) > 1: | |
| steps_str = ' -> '.join([f"{s['step'].replace('_', ' ').title()}: {s['quality']}" for s in qh]) | |
| story.append(Paragraph(f'Quality Steps: {steps_str}', small_style)) | |
| story.append(Spacer(1, 3*mm)) | |
| filepath = find_uploaded_file(item['file_id']) | |
| if filepath: | |
| try: | |
| image = load_image_from_path(filepath) | |
| if image is not None: | |
| h, w = image.shape[:2] | |
| max_w = 170 * mm | |
| aspect = h / w | |
| img_w = min(max_w, 170 * mm) | |
| img_h = img_w * aspect | |
| if img_h > 80 * mm: | |
| img_h = 80 * mm | |
| img_w = img_h / aspect | |
| orig_buf = io.BytesIO() | |
| _, enc = cv2.imencode('.png', image) | |
| orig_buf.write(enc.tobytes()) | |
| orig_buf.seek(0) | |
| story.append(Paragraph('Original Image', label_style)) | |
| story.append(Spacer(1, 2*mm)) | |
| story.append(RLImage(orig_buf, width=img_w, height=img_h)) | |
| story.append(Spacer(1, 4*mm)) | |
| ep = item.get('enhancement_params', {}) | |
| processed = image.copy() | |
| if ep.get('denoise_strength') or ep.get('denoise_enabled', False): | |
| processed = denoise_image(processed, ep.get('denoise_strength', 'medium')) | |
| if ep.get('clahe_clip_limit') or ep.get('clahe_enabled', False): | |
| processed = apply_clahe(processed, clip_limit=ep.get('clahe_clip_limit', 2.5)) | |
| if ep.get('sharpen_strength') or ep.get('sharpen_enabled', False): | |
| processed = sharpen_image(processed, ep.get('sharpen_strength', 'medium')) | |
| if settings.get('pseudonymize'): | |
| processed = apply_pseudonymize(processed) | |
| proc_buf = io.BytesIO() | |
| _, enc2 = cv2.imencode('.png', processed) | |
| proc_buf.write(enc2.tobytes()) | |
| proc_buf.seek(0) | |
| story.append(Paragraph('Processed Image', label_style)) | |
| story.append(Spacer(1, 2*mm)) | |
| story.append(RLImage(proc_buf, width=img_w, height=img_h)) | |
| story.append(Spacer(1, 4*mm)) | |
| except Exception: | |
| story.append(Paragraph('(Images could not be loaded)', small_style)) | |
| pq = item.get('processed_quality', item.get('quality_metrics', {})) | |
| oq = item.get('original_quality', item.get('quality_metrics', {})) | |
| metrics_data = [ | |
| ['Metric', 'Original', 'Processed', 'Change'], | |
| ['Sharpness', str(oq.get('sharpness_score', 'N/A')), str(pq.get('sharpness_score', 'N/A')), | |
| str(round(pq.get('sharpness_score', 0) - oq.get('sharpness_score', 0), 1)) if 'sharpness_score' in pq and 'sharpness_score' in oq else 'N/A'], | |
| ['Contrast', str(oq.get('contrast_ratio', 'N/A')), str(pq.get('contrast_ratio', 'N/A')), | |
| str(round(pq.get('contrast_ratio', 0) - oq.get('contrast_ratio', 0), 3)) if 'contrast_ratio' in pq and 'contrast_ratio' in oq else 'N/A'], | |
| ['SNR (dB)', str(oq.get('snr_db', 'N/A')), str(pq.get('snr_db', 'N/A')), | |
| str(round(pq.get('snr_db', 0) - oq.get('snr_db', 0), 1)) if 'snr_db' in pq and 'snr_db' in oq else 'N/A'], | |
| ] | |
| metrics_table = Table(metrics_data, colWidths=[35*mm, 35*mm, 35*mm, 35*mm]) | |
| metrics_table.setStyle(TableStyle([ | |
| ('FONTSIZE', (0, 0), (-1, -1), 9), | |
| ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), | |
| ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2563eb')), | |
| ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), | |
| ('ALIGN', (0, 0), (-1, -1), 'CENTER'), | |
| ('BOTTOMPADDING', (0, 0), (-1, -1), 4), | |
| ('TOPPADDING', (0, 0), (-1, -1), 4), | |
| ('LINEBELOW', (0, 0), (-1, -1), 0.5, colors.HexColor('#e2e8f0')), | |
| ])) | |
| story.append(metrics_table) | |
| story.append(Spacer(1, 10*mm)) | |
| story.append(Paragraph('Generated by InSono v1.0', small_style)) | |
| story.append(Paragraph('Research Use Only - Not for clinical diagnosis', small_style)) | |
| doc.build(story) | |
| buffer.seek(0) | |
| return send_file( | |
| buffer, | |
| mimetype='application/pdf', | |
| as_attachment=True, | |
| download_name=f'insono_batch_report_{datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")}.pdf' | |
| ) | |
| except Exception as e: | |
| return jsonify({'error': f'Error generating PDF: {str(e)}'}), 500 | |
| def preload_segmentation_model(): | |
| import threading | |
| def _load(): | |
| try: | |
| print("[Startup] Pre-loading MONAI segmentation model...") | |
| from models import preload_model | |
| model = preload_model() | |
| if model is not None: | |
| print("[Startup] MONAI model pre-loaded successfully") | |
| else: | |
| print("[Startup] MONAI model not available (will retry on first request)") | |
| except Exception as e: | |
| print(f"[Startup] MONAI pre-load failed: {e}") | |
| t = threading.Thread(target=_load, daemon=True) | |
| t.start() | |
| if __name__ == '__main__': | |
| import sys | |
| port = int(os.environ.get('PORT', sys.argv[1] if len(sys.argv) > 1 else 7860)) | |
| preload_segmentation_model() | |
| app.run(host='0.0.0.0', port=port, debug=True) | |