# InSono — main.py
# Author: MilicMilos
# Commit 8122dbe: Replace medical image segmentation model with a new framework
import os
import io
import base64
import uuid
import time
import requests as http_requests
from datetime import datetime, timezone
from flask import Flask, request, jsonify, send_from_directory, send_file
import cv2
import numpy as np
import pydicom
from PIL import Image
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import mm
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image as RLImage, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER, TA_LEFT
# Flask application; the single-page frontend is served from ./static.
app = Flask(__name__, static_folder='static', static_url_path='/static')
# On-disk working directories, created eagerly at import time.
UPLOAD_FOLDER = 'uploads'
PROCESSED_FOLDER = 'processed'
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50MB — enforced by the upload handlers
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(PROCESSED_FOLDER, exist_ok=True)
# Upload types accepted by the API (case-insensitive on the extension).
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'dcm'}
def allowed_file(filename):
    """Return True when *filename* has an extension we accept."""
    if '.' not in filename:
        return False
    suffix = filename.rsplit('.', 1)[1].lower()
    return suffix in ALLOWED_EXTENSIONS
def get_file_extension(filename):
    """Return the lowercase extension of *filename*, or '' when it has none."""
    if '.' not in filename:
        return ''
    _, _, tail = filename.rpartition('.')
    return tail.lower()
def dicom_to_image(dicom_path):
    """Convert a DICOM file to an 8-bit BGR numpy image.

    Returns (pixel_array, dataset) where pixel_array is uint8 with 3
    channels and dataset is the parsed pydicom Dataset.
    """
    ds = pydicom.dcmread(dicom_path)
    pixel_array = ds.pixel_array
    # Normalize to 8-bit via min-max scaling.
    if pixel_array.dtype != np.uint8:
        lo = pixel_array.min()
        hi = pixel_array.max()
        # BUG FIX: a constant-valued frame (hi == lo) previously divided
        # by zero; map it to an all-zero 8-bit image instead.
        if hi == lo:
            pixel_array = np.zeros(pixel_array.shape, dtype=np.uint8)
        else:
            pixel_array = ((pixel_array - lo) / (hi - lo) * 255).astype(np.uint8)
    # Convert to 3 channels if grayscale.
    if len(pixel_array.shape) == 2:
        pixel_array = cv2.cvtColor(pixel_array, cv2.COLOR_GRAY2BGR)
    return pixel_array, ds
def extract_dicom_metadata(ds):
    """Collect a safe subset of DICOM tags from dataset *ds*.

    Equipment/study tags are copied as strings; StudyDate is reformatted
    from YYYYMMDD to YYYY-MM-DD when possible; patient identifiers are
    replaced with the literal '[ANONYMIZED]'.
    """
    metadata = {}
    if hasattr(ds, 'Modality'):
        metadata['Modality'] = str(ds.Modality)
    if hasattr(ds, 'StudyDate'):
        raw = str(ds.StudyDate)
        # DICOM dates are YYYYMMDD; anything else is passed through.
        metadata['StudyDate'] = (
            f"{raw[:4]}-{raw[4:6]}-{raw[6:8]}" if len(raw) == 8 else raw
        )
    for tag, out_key in (
        ('SeriesDescription', 'SeriesDescription'),
        ('Manufacturer', 'Manufacturer'),
        ('ManufacturerModelName', 'EquipmentModel'),
        ('InstitutionName', 'InstitutionName'),
    ):
        if hasattr(ds, tag):
            metadata[out_key] = str(getattr(ds, tag))
    # Never expose patient identity, even though the tags are present.
    for pii_tag in ('PatientID', 'PatientName'):
        if hasattr(ds, pii_tag):
            metadata[pii_tag] = '[ANONYMIZED]'
    return metadata
def apply_gaussian_blur(image, kernel_size=5):
    """Blur *image* with a square Gaussian kernel for noise reduction.

    OpenCV requires odd kernel sizes, so even values are bumped up by one.
    """
    k = kernel_size if kernel_size % 2 else kernel_size + 1
    return cv2.GaussianBlur(image, (k, k), 0)
def apply_clahe(image, clip_limit=2.0, tile_grid_size=8):
    """Contrast Limited Adaptive Histogram Equalization.

    Color (3-channel) images are equalized on the L channel in LAB space
    so chroma is untouched; single-channel images are equalized directly.
    """
    clahe = cv2.createCLAHE(clipLimit=clip_limit,
                            tileGridSize=(tile_grid_size, tile_grid_size))
    is_color = len(image.shape) == 3 and image.shape[2] == 3
    if not is_color:
        return clahe.apply(image)
    lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    lightness, chan_a, chan_b = cv2.split(lab)
    lab = cv2.merge([clahe.apply(lightness), chan_a, chan_b])
    return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
def apply_pseudonymize(image, box_width=80, box_height=40):
    """Black out the top-left corner of *image* for patient privacy.

    Draws a box_width x box_height black rectangle anchored at (0, 0),
    clipped to the image bounds. Returns a new array; the input is not
    modified.
    """
    redacted = image.copy()
    rows, cols = redacted.shape[:2]
    h = min(box_height, rows)
    w = min(box_width, cols)
    redacted[:h, :w] = 0
    return redacted
def image_to_base64(image):
    """PNG-encode a numpy image and return it as a base64 ASCII string."""
    encoded_png = cv2.imencode('.png', image)[1]
    return base64.b64encode(encoded_png).decode('utf-8')
def calculate_quality_metrics(image):
    """Compute sharpness, contrast, and SNR statistics for *image*.

    Returns a dict with a 0-100 sharpness score plus label, the raw
    Laplacian variance, the std/mean contrast ratio, SNR in dB, and the
    grayscale mean/std those ratios are based on.
    """
    # All metrics are computed on a single luminance channel.
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
    as_float = gray.astype(np.float64)
    # Sharpness: variance of the Laplacian, clamped to 0-2000 and mapped
    # onto a 0-100 scale (2000 chosen as a practical upper bound here).
    lap_var = cv2.Laplacian(gray, cv2.CV_64F).var()
    sharpness_score = round((min(lap_var, 2000) / 2000) * 100, 1)
    sharpness_rating = "Poor"
    for floor, label in ((80, "Excellent"), (60, "Good"), (40, "Fair")):
        if sharpness_score >= floor:
            sharpness_rating = label
            break
    # Contrast: coefficient of variation (std / mean); 0 for a black frame.
    mean_val = as_float.mean()
    std_val = as_float.std()
    contrast_ratio = round(std_val / mean_val, 2) if mean_val > 0 else 0
    # SNR: mean over std, expressed in decibels; 0 when undefined.
    if std_val > 0:
        snr_linear = mean_val / std_val
        snr_db = round(20 * np.log10(snr_linear), 1) if snr_linear > 0 else 0
    else:
        snr_db = 0
    return {
        'sharpness_score': sharpness_score,
        'sharpness_rating': sharpness_rating,
        'sharpness_raw': round(lap_var, 2),
        'contrast_ratio': contrast_ratio,
        'snr_db': snr_db,
        'mean': round(mean_val, 2),
        'std': round(std_val, 2)
    }
# In-memory store for batch uploads, keyed by batch_id.
# NOTE(review): process-local and unbounded — entries are lost on restart
# and never evicted; confirm this is acceptable for the deployment model.
batch_storage = {}
def srad_denoise(image, iterations=3, delta_t=0.08, kappa=40):
    """Speckle-reducing anisotropic diffusion (SRAD-style) denoiser.

    Color images are processed per channel. *iterations* and *delta_t*
    control the diffusion; *kappa* is accepted for API compatibility but
    is not referenced by this implementation. Returns uint8 output
    clipped to [0, 255].
    """
    if image.ndim == 3:
        per_channel = [srad_denoise(ch, iterations, delta_t, kappa)
                       for ch in cv2.split(image)]
        return cv2.merge(per_channel)
    work = image.astype(np.float64)
    for _ in range(iterations):
        # Forward differences toward each 4-neighbor (wrap-around via roll).
        diffs = [np.roll(work, step, axis=ax) - work
                 for step, ax in ((-1, 0), (1, 0), (-1, 1), (1, 1))]
        magnitude = np.sqrt(diffs[0]**2 + diffs[1]**2 + diffs[2]**2 + diffs[3]**2)
        # Instantaneous coefficient of variation, guarded against /0.
        q = magnitude / (work + 1e-10)
        q0 = np.sqrt(np.mean(q**2))
        coeff = 1.0 / (1.0 + ((q**2 - q0**2) / (q0**2 * (1 + q0**2) + 1e-10)))
        work = work + delta_t * (
            coeff * diffs[0] + coeff * diffs[1] + coeff * diffs[2] + coeff * diffs[3]
        )
    return np.clip(work, 0, 255).astype(np.uint8)
def denoise_image(image, strength='medium'):
    """Denoise *image* with SRAD at a named strength.

    Strength is one of 'light'/'medium'/'strong' (unknown values fall
    back to 'medium'); 'none' or None returns an untouched copy.
    """
    if strength in (None, 'none'):
        return image.copy()
    presets = {
        'light': {'iterations': 2, 'delta_t': 0.05, 'kappa': 50},
        'medium': {'iterations': 3, 'delta_t': 0.08, 'kappa': 40},
        'strong': {'iterations': 5, 'delta_t': 0.1, 'kappa': 30}
    }
    chosen = presets.get(strength, presets['medium'])
    return srad_denoise(image, **chosen)
def sharpen_image(image, strength='medium'):
    """Sharpen *image* with an unsharp mask at a named strength.

    'none' or None returns an untouched copy; unknown strengths fall back
    to the 'medium' amount (1.5).
    """
    if strength in (None, 'none'):
        return image.copy()
    amount = {'light': 1.0, 'medium': 1.5, 'strong': 2.0}.get(strength, 1.5)
    soft = cv2.GaussianBlur(image, (0, 0), 3)
    # Unsharp mask: boost the original, subtract the blurred copy.
    return cv2.addWeighted(image, 1.0 + amount, soft, -amount, 0)
def calculate_image_quality_score(image):
    """Composite 0-100 quality score with clipping/tonal-range penalties.

    Starts from calculate_overall_quality and subtracts penalties for
    more than 2% pure-black or pure-white pixels and for using fewer than
    half of the 256 tonal levels. Returns (score, base_metrics).
    """
    metrics = calculate_quality_metrics(image)
    score = calculate_overall_quality(metrics['sharpness_score'],
                                      metrics['contrast_ratio'],
                                      metrics['snr_db'])
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
    pixel_count = gray.size
    # Penalize clipped shadows (0) and highlights (255) beyond a 2% allowance.
    for clipped in (np.sum(gray == 0), np.sum(gray == 255)):
        fraction = clipped / pixel_count
        if fraction > 0.02:
            score -= min(15, (fraction - 0.02) * 500)
    # Penalize a narrow tonal range (fewer than 128 distinct gray levels).
    coverage = len(np.unique(gray)) / 256.0
    if coverage < 0.5:
        score -= (0.5 - coverage) * 20
    return max(0, round(score, 1)), metrics
def apply_enhancement_pipeline(image, params):
    """Run the configurable denoise -> CLAHE -> sharpen pipeline.

    *params* toggles each stage (denoise_enabled / clahe_enabled /
    sharpen_enabled, all default True) and tunes it (denoise_strength,
    clahe_clip_limit, sharpen_strength). Quality is re-scored after every
    applied stage so the caller can chart the progression.
    """
    working = image.copy()
    history = []

    def record(step_name):
        # Score the current state and log it under *step_name*.
        step_score, _ = calculate_image_quality_score(working)
        history.append({'step': step_name, 'quality': step_score})
        return step_score

    orig_score = record('original')
    denoise_level = params.get('denoise_strength', 'medium')
    if params.get('denoise_enabled', True) and denoise_level and denoise_level != 'none':
        working = denoise_image(working, denoise_level)
        record('after_denoise')
    if params.get('clahe_enabled', True):
        working = apply_clahe(working, clip_limit=params.get('clahe_clip_limit', 2.5))
        record('after_clahe')
    sharpen_level = params.get('sharpen_strength', 'medium')
    if params.get('sharpen_enabled', True) and sharpen_level and sharpen_level != 'none':
        working = sharpen_image(working, sharpen_level)
        record('after_sharpen')
    final_score, final_metrics = calculate_image_quality_score(working)
    return {
        'enhanced_image': working,
        'quality_history': history,
        'quality_original': orig_score,
        'quality_final': final_score,
        'improvement': round(final_score - orig_score, 1),
        'params': params,
        'final_metrics': final_metrics
    }
def quick_enhance_pipeline(image):
    """One-click preset pipeline for ultrasound frames.

    Fixed sequence: SRAD denoise (3 iterations) -> CLAHE (clip 2.0) ->
    unsharp mask (alpha 1.5, beta -0.5). Quality is re-scored after each
    stage so the caller can chart the progression.
    """
    result = image.copy()
    quality_history = []
    orig_score, orig_metrics = calculate_image_quality_score(result)
    quality_history.append({'step': 'original', 'quality': orig_score})
    result = srad_denoise(result, iterations=3, delta_t=0.08, kappa=40)
    step_score, _ = calculate_image_quality_score(result)
    quality_history.append({'step': 'after_denoise', 'quality': step_score})
    result = apply_clahe(result, clip_limit=2.0)
    step_score, _ = calculate_image_quality_score(result)
    quality_history.append({'step': 'after_clahe', 'quality': step_score})
    # Unsharp mask applied inline (same math as sharpen_image 'medium'... beta -0.5).
    blurred = cv2.GaussianBlur(result, (0, 0), 3)
    result = cv2.addWeighted(result, 1.5, blurred, -0.5, 0)
    step_score, _ = calculate_image_quality_score(result)
    quality_history.append({'step': 'after_sharpen', 'quality': step_score})
    final_score, final_metrics = calculate_image_quality_score(result)
    return {
        'enhanced_image': result,
        'quality_history': quality_history,
        'quality_original': orig_score,
        'quality_final': final_score,
        'improvement': round(final_score - orig_score, 1),
        'params': {
            'denoise_method': 'SRAD',
            'denoise_strength': 'medium',
            # BUG FIX: previously reported 10 iterations, but the actual
            # srad_denoise call above uses iterations=3 (and /api/process
            # logs "3 iterations" for this mode).
            'denoise_iterations': 3,
            'clahe_clip_limit': 2.0,
            'clahe_tile_grid': '8x8',
            'sharpen_method': 'unsharp_mask',
            'sharpen_alpha': 1.5,
            'sharpen_beta': -0.5
        },
        'final_metrics': final_metrics
    }
from models import segment_image, is_segmentation_available
from utils.image_processing import render_overlay
def auto_optimize_pipeline(image, optimize_individually=True):
    """Grid-search a small enhancement space and keep the best result.

    Tries denoise {none, light} x CLAHE clip {1.1, 1.3, 1.5} x sharpen
    {none, light} (12 combinations), scoring each with
    calculate_image_quality_score; the original image wins by default if
    nothing beats its score. The winning parameters are then replayed
    stage by stage to build a per-step quality history.

    NOTE(review): `optimize_individually` is accepted but never read in
    this body — batch_process passes it; confirm whether per-image vs
    shared optimization was meant to branch here.
    """
    orig_score, _ = calculate_image_quality_score(image)
    best_score = orig_score
    best_result = image.copy()
    best_params = {'denoise_strength': 'none', 'clahe_clip_limit': 1.3, 'sharpen_strength': 'none'}
    best_history = [{'step': 'original', 'quality': orig_score}]
    denoise_levels = ['none', 'light']
    clahe_limits = [1.1, 1.3, 1.5]
    sharpen_levels = ['none', 'light']
    # Exhaustive search; intermediate results are reused across the inner loops.
    for denoise in denoise_levels:
        denoised = denoise_image(image, denoise)
        for clip_limit in clahe_limits:
            enhanced = apply_clahe(denoised, clip_limit=clip_limit)
            for sharpen in sharpen_levels:
                final = sharpen_image(enhanced, sharpen)
                score, metrics = calculate_image_quality_score(final)
                if score > best_score:
                    best_score = score
                    best_result = final
                    best_params = {
                        'denoise_strength': denoise,
                        'clahe_clip_limit': clip_limit,
                        'sharpen_strength': sharpen
                    }
    # Replay the winning parameters to record per-stage quality.
    # NOTE(review): CLAHE is replayed unconditionally even when the search
    # kept the untouched original, so the history can describe a different
    # image than best_result — confirm intended.
    current = image.copy()
    best_history = [{'step': 'original', 'quality': orig_score}]
    if best_params['denoise_strength'] != 'none':
        current = denoise_image(current, best_params['denoise_strength'])
        d_score, _ = calculate_image_quality_score(current)
        best_history.append({'step': 'after_denoise', 'quality': d_score})
    current = apply_clahe(current, clip_limit=best_params['clahe_clip_limit'])
    c_score, _ = calculate_image_quality_score(current)
    best_history.append({'step': 'after_clahe', 'quality': c_score})
    if best_params['sharpen_strength'] != 'none':
        current = sharpen_image(current, best_params['sharpen_strength'])
        s_score, _ = calculate_image_quality_score(current)
        best_history.append({'step': 'after_sharpen', 'quality': s_score})
    final_score, final_metrics = calculate_image_quality_score(best_result)
    return {
        'enhanced_image': best_result,
        'quality_history': best_history,
        'quality_original': orig_score,
        'quality_final': final_score,
        'improvement': round(final_score - orig_score, 1),
        'params': best_params,
        'final_metrics': final_metrics
    }
def guided_optimize(image, philosophy='balanced'):
    """Grid-search enhancement parameters within a chosen 'philosophy'.

    Philosophies restrict the search space: 'subtle' (no denoise/sharpen,
    mild CLAHE), 'balanced' (light denoise/sharpen, moderate CLAHE),
    'aggressive' (light denoise/sharpen, stronger CLAHE). Unknown values
    fall back to 'balanced'. After the search, the winning parameters are
    replayed stage by stage to build the per-step quality history.
    """
    ranges = {
        'subtle': {
            'denoise': ['none'],
            'clahe': [1.0, 1.2],
            'sharpen': ['none']
        },
        'balanced': {
            'denoise': ['light'],
            'clahe': [1.2, 1.4],
            'sharpen': ['light']
        },
        'aggressive': {
            'denoise': ['light'],
            'clahe': [1.5, 1.8],
            'sharpen': ['light']
        }
    }
    param_range = ranges.get(philosophy, ranges['balanced'])
    orig_score, _ = calculate_image_quality_score(image)
    best_score = orig_score
    best_result = image.copy()
    # Seed with the mildest combination in the selected range.
    best_params = {
        'denoise_strength': param_range['denoise'][0],
        'clahe_clip_limit': param_range['clahe'][0],
        'sharpen_strength': param_range['sharpen'][0]
    }
    for denoise in param_range['denoise']:
        denoised = denoise_image(image, denoise)
        for clip_limit in param_range['clahe']:
            enhanced = apply_clahe(denoised, clip_limit=clip_limit)
            for sharpen in param_range['sharpen']:
                final = sharpen_image(enhanced, sharpen)
                score, _ = calculate_image_quality_score(final)
                if score > best_score:
                    best_score = score
                    best_result = final
                    best_params = {
                        'denoise_strength': denoise,
                        'clahe_clip_limit': clip_limit,
                        'sharpen_strength': sharpen
                    }
    # Replay the winning parameters to record per-stage quality.
    # NOTE(review): CLAHE is replayed unconditionally, so when the original
    # image won the search, the history can describe a different image than
    # best_result — confirm intended (same pattern as auto_optimize_pipeline).
    current = image.copy()
    best_history = [{'step': 'original', 'quality': orig_score}]
    if best_params['denoise_strength'] != 'none':
        current = denoise_image(current, best_params['denoise_strength'])
        d_score, _ = calculate_image_quality_score(current)
        best_history.append({'step': 'after_denoise', 'quality': d_score})
    current = apply_clahe(current, clip_limit=best_params['clahe_clip_limit'])
    c_score, _ = calculate_image_quality_score(current)
    best_history.append({'step': 'after_clahe', 'quality': c_score})
    if best_params['sharpen_strength'] != 'none':
        current = sharpen_image(current, best_params['sharpen_strength'])
        s_score, _ = calculate_image_quality_score(current)
        best_history.append({'step': 'after_sharpen', 'quality': s_score})
    final_score, final_metrics = calculate_image_quality_score(best_result)
    return {
        'enhanced_image': best_result,
        'quality_history': best_history,
        'quality_original': orig_score,
        'quality_final': final_score,
        'improvement': round(final_score - orig_score, 1),
        'params': best_params,
        'philosophy': philosophy,
        'final_metrics': final_metrics
    }
def calculate_overall_quality(sharpness, contrast, snr):
    """Weighted 0-100 composite of sharpness (40%), contrast (30%), SNR (30%).

    Each component is normalized onto 0-100 before weighting: sharpness is
    capped at 100, contrast is scaled x100 and capped, SNR maps 0-10 dB
    onto 0-100 (clamped at both ends).
    """
    sharpness_part = min(100, sharpness)
    contrast_part = min(100, contrast * 100)
    snr_part = min(100, max(0, (snr / 10) * 100))
    total = sharpness_part * 0.4 + contrast_part * 0.3 + snr_part * 0.3
    return round(total, 1)
def get_quality_label(score):
    """Map a 0-100 quality score onto a human-readable rating."""
    for floor, label in ((80, "Excellent"), (60, "Good"), (40, "Fair")):
        if score >= floor:
            return label
    return "Poor"
def find_uploaded_file(file_id):
    """Locate an uploaded file by its UUID, trying every allowed extension.

    Returns the path of the first match, or None when nothing exists.
    """
    candidates = (
        os.path.join(UPLOAD_FOLDER, f"{file_id}.{ext}")
        for ext in ALLOWED_EXTENSIONS
    )
    return next((path for path in candidates if os.path.exists(path)), None)
def load_image_from_path(filepath):
    """Load *filepath* as a numpy image, handling DICOM transparently.

    Non-DICOM files go through cv2.imread, which returns None on failure.
    """
    if get_file_extension(filepath) == 'dcm':
        pixels, _ds = dicom_to_image(filepath)
        return pixels
    return cv2.imread(filepath)
def make_thumbnail_base64(image, size=100):
    """Downscale *image* so its longest side is *size* px; return base64 PNG."""
    height, width = image.shape[:2]
    ratio = size / max(height, width)
    target = (int(width * ratio), int(height * ratio))
    small = cv2.resize(image, target, interpolation=cv2.INTER_AREA)
    png = cv2.imencode('.png', small)[1]
    return base64.b64encode(png).decode('utf-8')
@app.route('/api/health', methods=['GET'])
def health_check():
    """Liveness probe: report status and the current UTC timestamp."""
    payload = {
        'status': 'ok',
        'message': 'InSono backend is ready',
        'timestamp': datetime.now(timezone.utc).isoformat()
    }
    return jsonify(payload), 200
@app.route('/')
def index():
    """Serve the single-page frontend from the static folder."""
    return send_from_directory('static', 'index.html')
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Accept a single image upload (PNG/JPG/JPEG/DICOM).

    Validates type and size, saves the file under a fresh UUID name, and
    returns dimensions, a base64 PNG preview, quality metrics, and (for
    DICOM) anonymized metadata. The saved file is removed again if it
    cannot be decoded or processing fails.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'File type not supported. Please upload PNG, JPG, or DICOM files.'}), 400
    # Check file size: seek to the end to measure, then rewind for saving.
    file.seek(0, 2)
    file_size = file.tell()
    file.seek(0)
    if file_size > MAX_FILE_SIZE:
        return jsonify({'error': 'File size exceeds 50MB limit'}), 400
    # Generate unique filename so concurrent uploads never collide.
    file_ext = get_file_extension(file.filename)
    unique_id = str(uuid.uuid4())
    filename = f"{unique_id}.{file_ext}"
    filepath = os.path.join(UPLOAD_FOLDER, filename)
    file.save(filepath)
    try:
        # Process based on file type.
        is_dicom = file_ext == 'dcm'
        dicom_metadata = {}
        if is_dicom:
            image, ds = dicom_to_image(filepath)
            dicom_metadata = extract_dicom_metadata(ds)
        else:
            image = cv2.imread(filepath)
        if image is None:
            # Undecodable file: remove it so it cannot be referenced later.
            os.remove(filepath)
            return jsonify({'error': 'Could not read image file'}), 400
        # Get image dimensions and color space.
        height, width = image.shape[:2]
        color_space = 'RGB' if len(image.shape) == 3 and image.shape[2] == 3 else 'Grayscale'
        # Convert to base64 for the frontend preview.
        image_base64 = image_to_base64(image)
        # Calculate quality metrics for the original image.
        quality_metrics = calculate_quality_metrics(image)
        return jsonify({
            'success': True,
            'file_id': unique_id,
            'filename': file.filename,
            'file_size': file_size,
            'width': width,
            'height': height,
            'color_space': color_space,
            'is_dicom': is_dicom,
            'dicom_metadata': dicom_metadata,
            'image': image_base64,
            'quality_metrics': quality_metrics
        })
    except Exception as e:
        # Clean up the saved file before reporting the failure.
        if os.path.exists(filepath):
            os.remove(filepath)
        return jsonify({'error': f'Error processing file: {str(e)}'}), 500
@app.route('/api/process', methods=['POST'])
def process_image():
    """Apply an enhancement pipeline to a previously uploaded image.

    JSON body: file_id (required), mode ('quick', 'advanced', or anything
    else for the legacy blur/CLAHE path), an optional pseudonymize flag,
    and mode-specific parameters. Responds with the processed image as
    base64 PNG, a human-readable processing log, per-stage quality
    history, and before/after quality metrics.
    """
    start_time = time.time()
    data = request.json
    if not data or 'file_id' not in data:
        return jsonify({'error': 'No file ID provided'}), 400
    file_id = data['file_id']
    mode = data.get('mode', 'legacy')
    pseudonymize = data.get('pseudonymize', False)
    # Resolve the stored file by probing every allowed extension.
    filepath = None
    for ext in ALLOWED_EXTENSIONS:
        potential_path = os.path.join(UPLOAD_FOLDER, f"{file_id}.{ext}")
        if os.path.exists(potential_path):
            filepath = potential_path
            break
    if not filepath:
        return jsonify({'error': 'File not found'}), 404
    try:
        file_ext = get_file_extension(filepath)
        if file_ext == 'dcm':
            image, _ = dicom_to_image(filepath)
        else:
            image = cv2.imread(filepath)
        if image is None:
            return jsonify({'error': 'Could not read image'}), 500
        processed = image.copy()
        processing_log = []
        quality_history = []
        enhancement_params = {}
        result = None
        if mode == 'quick':
            # Fixed preset: SRAD -> CLAHE -> unsharp mask.
            result = quick_enhance_pipeline(image)
            processed = result['enhanced_image']
            enhancement_params = result['params']
            quality_history = result['quality_history']
            processing_log.append("Quick Enhancement Pipeline (Medical Ultrasound)")
            processing_log.append("SRAD Denoising (3 iterations, ultrasound-optimized)")
            processing_log.append("CLAHE (clipLimit=2.0, tileGrid=8x8)")
            processing_log.append("Unsharp Mask (alpha=1.5, beta=-0.5)")
        elif mode == 'advanced':
            # Per-stage toggles and strengths supplied by the client.
            custom_params = data.get('custom_params', {})
            denoise_enabled = custom_params.get('denoise_enabled', True)
            denoise_strength = custom_params.get('denoise_strength', 'medium')
            clahe_enabled = custom_params.get('clahe_enabled', True)
            clahe_clip_limit = custom_params.get('clahe_clip_limit', 2.5)
            sharpen_enabled = custom_params.get('sharpen_enabled', True)
            sharpen_strength = custom_params.get('sharpen_strength', 'medium')
            pipeline_params = {
                'denoise_enabled': denoise_enabled,
                'denoise_strength': denoise_strength,
                'clahe_enabled': clahe_enabled,
                'clahe_clip_limit': clahe_clip_limit,
                'sharpen_enabled': sharpen_enabled,
                'sharpen_strength': sharpen_strength
            }
            result = apply_enhancement_pipeline(image, pipeline_params)
            processed = result['enhanced_image']
            enhancement_params = {
                'denoise_strength': denoise_strength if denoise_enabled else 'none',
                'clahe_clip_limit': clahe_clip_limit if clahe_enabled else None,
                'sharpen_strength': sharpen_strength if sharpen_enabled else 'none'
            }
            quality_history = result['quality_history']
            if denoise_enabled:
                # NOTE(review): the h= values below read like NLM-style
                # strengths, but denoise_image runs SRAD iterations —
                # this log text may be stale; confirm.
                h_values = {'light': 5, 'medium': 10, 'strong': 15}
                processing_log.append(f"Denoising ({denoise_strength}, h={h_values.get(denoise_strength, 10)})")
            if clahe_enabled:
                processing_log.append(f"CLAHE (clip limit: {clahe_clip_limit})")
            if sharpen_enabled:
                amounts = {'light': 1.0, 'medium': 1.5, 'strong': 2.0}
                processing_log.append(f"Sharpening ({sharpen_strength}, amount={amounts.get(sharpen_strength, 1.5)})")
        else:
            # Legacy path: optional Gaussian blur + optional CLAHE only.
            blur_amount = data.get('blur_amount', 0)
            clahe_enabled = data.get('clahe_enabled', False)
            clahe_clip_limit = data.get('clahe_clip_limit', 2.0)
            if blur_amount > 0:
                # Convert the slider value to an odd kernel size.
                kernel_size = int(blur_amount * 2) + 1
                processed = apply_gaussian_blur(processed, kernel_size)
                processing_log.append(f"Gaussian Blur (kernel: {kernel_size}x{kernel_size})")
            if clahe_enabled:
                processed = apply_clahe(processed, clip_limit=clahe_clip_limit)
                processing_log.append(f"CLAHE (clip limit: {clahe_clip_limit})")
        if pseudonymize:
            # Black out the top-left corner (fixed 80x40 px box).
            processed = apply_pseudonymize(processed)
            processing_log.append("Pseudonymization (80x40px)")
        processed_base64 = image_to_base64(processed)
        # Before/after quality comparison.
        original_quality = calculate_quality_metrics(image)
        processed_quality = calculate_quality_metrics(processed)
        sharpness_improvement = round(processed_quality['sharpness_score'] - original_quality['sharpness_score'], 1)
        contrast_improvement = round(((processed_quality['contrast_ratio'] - original_quality['contrast_ratio']) / original_quality['contrast_ratio'] * 100) if original_quality['contrast_ratio'] > 0 else 0, 1)
        snr_improvement = round(processed_quality['snr_db'] - original_quality['snr_db'], 1)
        processing_duration = round(time.time() - start_time, 3)
        processing_timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        original_overall = calculate_overall_quality(
            original_quality['sharpness_score'], original_quality['contrast_ratio'], original_quality['snr_db']
        )
        processed_overall = calculate_overall_quality(
            processed_quality['sharpness_score'], processed_quality['contrast_ratio'], processed_quality['snr_db']
        )
        response_data = {
            'success': True,
            'processed_image': processed_base64,
            'processing_log': processing_log,
            'processing_duration': processing_duration,
            'processing_timestamp': processing_timestamp,
            'original_quality': original_quality,
            'processed_quality': processed_quality,
            'quality_improvement': {
                'sharpness': sharpness_improvement,
                'contrast_percent': contrast_improvement,
                'snr': snr_improvement
            },
            'mode': mode,
            'enhancement_params': enhancement_params,
            'quality_history': quality_history,
            'original_overall': original_overall,
            'processed_overall': processed_overall
        }
        return jsonify(response_data)
    except Exception as e:
        return jsonify({'error': f'Error processing image: {str(e)}'}), 500
@app.route('/api/download', methods=['POST'])
def download_image():
    """Decode a base64 image from the request body and stream it back
    as a PNG attachment named processed_ultrasound.png."""
    payload = request.json
    if not payload or 'image' not in payload:
        return jsonify({'error': 'No image data provided'}), 400
    try:
        raw_bytes = base64.b64decode(payload['image'])
        return send_file(
            io.BytesIO(raw_bytes),
            mimetype='image/png',
            as_attachment=True,
            download_name='processed_ultrasound.png'
        )
    except Exception as e:
        return jsonify({'error': f'Error downloading: {str(e)}'}), 500
# In-memory segmentation results keyed by file_id: each entry holds the
# candidate masks and the full source image array for re-rendering.
# NOTE(review): unbounded and process-local — entries are never evicted
# and full images are memory-heavy; confirm acceptable for deployment.
segmentation_storage = {}
@app.route('/api/ai-segment', methods=['POST'])
def ai_segment():
    """Run point-prompted segmentation on an uploaded image.

    JSON body: file_id plus click coordinates x/y as percentages of the
    image size (default 50/50 = center). Returns up to three candidate
    mask overlays (sorted smallest area first, padded by repetition to
    three) and pre-selects the second smallest. Results are cached in
    segmentation_storage for /api/ai-segment-select.
    """
    data = request.json
    if not data or 'file_id' not in data:
        return jsonify({'error': 'No file_id provided'}), 400
    file_id = data['file_id']
    x_pct = data.get('x', 50)
    y_pct = data.get('y', 50)
    print(f"[API] ai-segment: file_id={file_id}, x_pct={x_pct}, y_pct={y_pct}")
    filepath = find_uploaded_file(file_id)
    if not filepath:
        return jsonify({'error': 'File not found'}), 404
    try:
        image = load_image_from_path(filepath)
        if image is None:
            return jsonify({'error': 'Could not read image'}), 500
        img_h, img_w = image.shape[:2]
        # Convert percentage click position into clamped pixel coordinates.
        pixel_x = int(float(x_pct) / 100.0 * img_w)
        pixel_y = int(float(y_pct) / 100.0 * img_h)
        pixel_x = max(0, min(pixel_x, img_w - 1))
        pixel_y = max(0, min(pixel_y, img_h - 1))
        total_pixels = img_h * img_w
        print(f"[API] Image: {img_w}x{img_h}, click pixel: ({pixel_x}, {pixel_y})")
        result = segment_image(image, pixel_x, pixel_y)
        if not result.get('success'):
            error_msg = result.get('message', 'Segmentation failed')
            error_type = result.get('error_type', 'unknown')
            # Log fix: separator between type and message was missing.
            print(f"[API] Segmentation failed: {error_type}: {error_msg}")
            return jsonify({
                'error': error_msg,
                'error_type': error_type
            }), 503
        masks_list = result['masks']
        method = result['method']
        # BUG FIX: an empty mask list used to raise IndexError below
        # (masks_list[-1]) and surface as a generic 500.
        if not masks_list:
            return jsonify({
                'error': 'Segmentation returned no masks',
                'error_type': 'no_masks'
            }), 503
        # Smallest area first; pad by repetition to exactly three candidates.
        masks_list.sort(key=lambda m: m['area'])
        while len(masks_list) < 3:
            masks_list.append(masks_list[-1])
        masks_list = masks_list[:3]
        default_idx = min(1, len(masks_list) - 1)
        # Cache masks + source image so /api/ai-segment-select can re-render.
        segmentation_storage[file_id] = {
            'masks': masks_list,
            'image': image
        }
        overlays = []
        for i, m in enumerate(masks_list):
            area_pct = round((m['area'] / total_pixels) * 100, 2)
            overlays.append({
                'segmented_image': render_overlay(image, m['mask']),
                'mask_area_percent': area_pct,
                'score': m['score'],
                'index': i
            })
        selected_area_pct = round((masks_list[default_idx]['area'] / total_pixels) * 100, 2)
        print(f"[API] Segmentation success: {len(overlays)} masks, method={method}")
        return jsonify({
            'success': True,
            'segmented_image': overlays[default_idx]['segmented_image'],
            'method': method,
            'mask_area_percent': selected_area_pct,
            'selected_index': default_idx,
            'masks': overlays,
            'mask_count': len(overlays)
        })
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({'error': f'Error during segmentation: {str(e)}'}), 500
@app.route('/api/ai-segment-select', methods=['POST'])
def ai_segment_select():
    """Re-render a previously computed segmentation with another mask.

    JSON body: file_id and mask_index into the candidates cached by
    /api/ai-segment. Returns the overlay for the chosen mask.
    """
    data = request.json
    if not data or 'file_id' not in data or 'mask_index' not in data:
        return jsonify({'error': 'file_id and mask_index required'}), 400
    file_id = data['file_id']
    # BUG FIX: a non-numeric mask_index used to raise ValueError/TypeError
    # and surface as an unhandled 500; reject it as a client error instead.
    try:
        mask_index = int(data['mask_index'])
    except (TypeError, ValueError):
        return jsonify({'error': 'mask_index must be an integer'}), 400
    if file_id not in segmentation_storage:
        return jsonify({'error': 'No segmentation data found. Please segment again.'}), 404
    stored = segmentation_storage[file_id]
    masks_list = stored['masks']
    image = stored['image']
    if mask_index < 0 or mask_index >= len(masks_list):
        return jsonify({'error': f'Invalid mask index. Valid: 0-{len(masks_list)-1}'}), 400
    selected = masks_list[mask_index]
    total_pixels = image.shape[0] * image.shape[1]
    area_pct = round((selected['area'] / total_pixels) * 100, 2)
    return jsonify({
        'success': True,
        'segmented_image': render_overlay(image, selected['mask']),
        'mask_area_percent': area_pct,
        'selected_index': mask_index
    })
@app.route('/api/segmentation-status', methods=['GET'])
def segmentation_status():
    """Report segmentation model availability and host RAM statistics.

    RAM figures come from psutil when installed; the 'ram' field is null
    otherwise. 'sufficient' is hard-coded True — no threshold is checked
    here.
    """
    available = is_segmentation_available()
    # BUG FIX: psutil is not imported at module level; if it is missing,
    # the endpoint used to crash with an unhandled ImportError (500).
    try:
        import psutil
        mem = psutil.virtual_memory()
        ram = {
            'total_gb': round(mem.total / (1024**3), 1),
            'available_gb': round(mem.available / (1024**3), 1),
            'sufficient': True
        }
    except ImportError:
        ram = None  # psutil not installed; RAM stats unavailable
    return jsonify({
        'model': 'MONAI UNet',
        'available': available,
        'ram': ram
    })
@app.route('/api/segmentation-methods', methods=['GET'])
def segmentation_methods_legacy():
    """Legacy listing endpoint: a single MONAI UNet method entry."""
    method_info = {
        'name': 'monai',
        'available': is_segmentation_available(),
        'tier': 1,
        'description': 'MONAI UNet - medical image segmentation'
    }
    return jsonify({'methods': [method_info]})
@app.route('/api/batch-upload', methods=['POST'])
def batch_upload():
    """Upload up to 50 images in one request.

    Each file is validated (type, size), saved under a fresh UUID name,
    quality-scored, and thumbnailed. Per-file failures are collected in
    `errors` without aborting the batch; successful results are stored in
    the in-memory batch_storage under a new batch_id.
    """
    files = request.files.getlist('files')
    if not files or len(files) == 0:
        return jsonify({'error': 'No files provided'}), 400
    if len(files) > 50:
        return jsonify({'error': 'Maximum 50 files allowed per batch'}), 400
    batch_id = str(uuid.uuid4())
    results = []
    errors = []
    for file in files:
        # Blank filenames are silently skipped (not reported as errors).
        if file.filename == '':
            continue
        if not allowed_file(file.filename):
            errors.append({'filename': file.filename, 'error': 'Unsupported file type'})
            continue
        # Size check: seek to the end to measure, then rewind for saving.
        file.seek(0, 2)
        file_size = file.tell()
        file.seek(0)
        if file_size > MAX_FILE_SIZE:
            errors.append({'filename': file.filename, 'error': 'File exceeds 50MB limit'})
            continue
        file_ext = get_file_extension(file.filename)
        unique_id = str(uuid.uuid4())
        filename = f"{unique_id}.{file_ext}"
        filepath = os.path.join(UPLOAD_FOLDER, filename)
        file.save(filepath)
        try:
            is_dicom = file_ext == 'dcm'
            if is_dicom:
                image, ds = dicom_to_image(filepath)
            else:
                image = cv2.imread(filepath)
            if image is None:
                # Undecodable file: clean up and record the failure.
                os.remove(filepath)
                errors.append({'filename': file.filename, 'error': 'Could not read image'})
                continue
            height, width = image.shape[:2]
            color_space = 'RGB' if len(image.shape) == 3 and image.shape[2] == 3 else 'Grayscale'
            quality = calculate_quality_metrics(image)
            overall_score = calculate_overall_quality(
                quality['sharpness_score'], quality['contrast_ratio'], quality['snr_db']
            )
            thumbnail = make_thumbnail_base64(image)
            result = {
                'file_id': unique_id,
                'filename': file.filename,
                'file_size': file_size,
                'width': width,
                'height': height,
                'color_space': color_space,
                'is_dicom': is_dicom,
                'quality_metrics': quality,
                'overall_quality': overall_score,
                'quality_label': get_quality_label(overall_score),
                'thumbnail': thumbnail,
                'status': 'uploaded'
            }
            results.append(result)
        except Exception as e:
            # Clean up the saved file if analysis failed mid-way.
            if os.path.exists(filepath):
                os.remove(filepath)
            errors.append({'filename': file.filename, 'error': str(e)})
    batch_storage[batch_id] = {
        'items': results,
        'created': datetime.now(timezone.utc).isoformat(),
        'processed': False
    }
    return jsonify({
        'success': True,
        'batch_id': batch_id,
        'uploaded': len(results),
        'failed': len(errors),
        'results': results,
        'errors': errors
    })
@app.route('/api/batch-process', methods=['POST'])
def batch_process():
    """Run the enhancement pipeline over every image in an uploaded batch.

    Request JSON:
        batch_id (str, required): id returned by the batch-upload endpoint.
        mode (str): 'auto'/'ai' = auto-optimized pipeline, 'guided' = philosophy
            preset, anything else is treated as custom. Default 'auto'.
        pseudonymize (bool): if true, apply the pseudonymization step to each
            processed image. Default False.
        optimize_individually (bool): auto mode only — optimize each image
            separately instead of reusing the first image's parameters.
        philosophy (str): guided mode preset name ('subtle'/'balanced'/'aggressive').
        auto_optimize_guided (bool): guided mode — run guided_optimize() with
            the philosophy instead of using the fixed preset table.
        custom_params (dict): custom mode per-stage switches and strengths.

    Mutates the batch entry in batch_storage in place (per-item results and
    settings), and returns JSON with per-item results plus aggregate statistics.
    """
    start_time = time.time()
    data = request.json
    if not data or 'batch_id' not in data:
        return jsonify({'error': 'No batch_id provided'}), 400
    batch_id = data['batch_id']
    if batch_id not in batch_storage:
        return jsonify({'error': 'Batch not found'}), 404
    mode = data.get('mode', 'auto')
    pseudonymize = data.get('pseudonymize', False)
    optimize_individually = data.get('optimize_individually', True)
    philosophy = data.get('philosophy', 'balanced')
    auto_optimize_guided = data.get('auto_optimize_guided', True)
    custom_params = data.get('custom_params', {})
    batch = batch_storage[batch_id]
    processed_count = 0
    error_count = 0
    # In shared-auto mode the first image's optimized parameters are cached
    # here and reused for the remainder of the batch.
    shared_auto_params = None
    # Per-stage quality deltas collected across all items for summary stats.
    denoise_contributions = []
    clahe_contributions = []
    sharpen_contributions = []
    for item in batch['items']:
        filepath = find_uploaded_file(item['file_id'])
        if not filepath:
            item['status'] = 'error'
            item['error'] = 'File not found on disk'
            error_count += 1
            continue
        try:
            image = load_image_from_path(filepath)
            if image is None:
                item['status'] = 'error'
                item['error'] = 'Could not read image'
                error_count += 1
                continue
            if mode == 'ai' or mode == 'auto':
                if optimize_individually or shared_auto_params is None:
                    result = auto_optimize_pipeline(image)
                    # First image in shared mode seeds the shared parameters.
                    if not optimize_individually and shared_auto_params is None:
                        shared_auto_params = result['params']
                else:
                    # Reuse the parameters discovered for the first image.
                    result = apply_enhancement_pipeline(image, {
                        'denoise_enabled': True,
                        'denoise_strength': shared_auto_params['denoise_strength'],
                        'clahe_enabled': True,
                        'clahe_clip_limit': shared_auto_params['clahe_clip_limit'],
                        'sharpen_enabled': True,
                        'sharpen_strength': shared_auto_params['sharpen_strength']
                    })
            elif mode == 'guided':
                if auto_optimize_guided:
                    result = guided_optimize(image, philosophy)
                else:
                    # Fixed per-philosophy presets; unknown names fall back to 'balanced'.
                    preset_params = {
                        'subtle': {'denoise_strength': 'light', 'clahe_clip_limit': 1.5, 'sharpen_strength': 'light'},
                        'balanced': {'denoise_strength': 'medium', 'clahe_clip_limit': 2.5, 'sharpen_strength': 'medium'},
                        'aggressive': {'denoise_strength': 'strong', 'clahe_clip_limit': 4.0, 'sharpen_strength': 'strong'}
                    }
                    p = preset_params.get(philosophy, preset_params['balanced'])
                    result = apply_enhancement_pipeline(image, {
                        'denoise_enabled': True, 'denoise_strength': p['denoise_strength'],
                        'clahe_enabled': True, 'clahe_clip_limit': p['clahe_clip_limit'],
                        'sharpen_enabled': True, 'sharpen_strength': p['sharpen_strength']
                    })
            else:
                # Custom mode: honor the caller's switches, defaulting each stage on.
                result = apply_enhancement_pipeline(image, {
                    'denoise_enabled': custom_params.get('denoise_enabled', True),
                    'denoise_strength': custom_params.get('denoise_strength', 'medium'),
                    'clahe_enabled': custom_params.get('clahe_enabled', True),
                    'clahe_clip_limit': custom_params.get('clahe_clip_limit', 2.5),
                    'sharpen_enabled': custom_params.get('sharpen_enabled', True),
                    'sharpen_strength': custom_params.get('sharpen_strength', 'medium')
                })
            processed_img = result['enhanced_image']
            if pseudonymize:
                processed_img = apply_pseudonymize(processed_img)
            processed_quality = result['final_metrics']
            orig_score = result['quality_original']
            proc_score = result['quality_final']
            qh = result['quality_history']
            # Attribute the quality change to each pipeline stage by diffing
            # against the previous *enabled* stage (entry 0 is the original).
            if len(qh) >= 2:
                denoise_step = next((s for s in qh if s['step'] == 'after_denoise'), None)
                clahe_step = next((s for s in qh if s['step'] == 'after_clahe'), None)
                sharpen_step = next((s for s in qh if s['step'] == 'after_sharpen'), None)
                orig_q = qh[0]['quality']
                if denoise_step:
                    denoise_contributions.append(round(denoise_step['quality'] - orig_q, 1))
                if clahe_step:
                    # Skipped stages are bridged: diff against whatever came before.
                    prev_q = denoise_step['quality'] if denoise_step else orig_q
                    clahe_contributions.append(round(clahe_step['quality'] - prev_q, 1))
                if sharpen_step:
                    prev_q = clahe_step['quality'] if clahe_step else (denoise_step['quality'] if denoise_step else orig_q)
                    sharpen_contributions.append(round(sharpen_step['quality'] - prev_q, 1))
            item['original_quality'] = calculate_quality_metrics(image)
            item['processed_quality'] = processed_quality
            item['original_overall'] = orig_score
            item['processed_overall'] = proc_score
            item['quality_improvement'] = result['improvement']
            item['overall_quality'] = proc_score
            item['quality_label'] = get_quality_label(proc_score)
            item['processed_thumbnail'] = make_thumbnail_base64(processed_img)
            item['enhancement_params'] = result['params']
            item['quality_history'] = result['quality_history']
            item['enhancement_mode'] = mode
            item['status'] = 'processed'
            processed_count += 1
        except Exception as e:
            # Keep processing the rest of the batch; record the failure per item.
            item['status'] = 'error'
            item['error'] = str(e)
            error_count += 1
    batch['processed'] = True
    # Remember the settings so the report endpoint can reproduce the pipeline.
    batch['settings'] = {
        'mode': mode,
        'pseudonymize': pseudonymize,
        'philosophy': philosophy if mode == 'guided' else None,
        'optimize_individually': optimize_individually if mode == 'auto' else None,
        'custom_params': custom_params if mode == 'custom' else None,
    }
    processed_items = [i for i in batch['items'] if i['status'] == 'processed']
    avg_quality = round(sum(i['processed_overall'] for i in processed_items) / len(processed_items), 1) if processed_items else 0
    best_quality = max((i['processed_overall'] for i in processed_items), default=0)
    avg_improvement = round(sum(i['quality_improvement'] for i in processed_items) / len(processed_items), 1) if processed_items else 0
    avg_denoise = round(sum(denoise_contributions) / len(denoise_contributions), 1) if denoise_contributions else 0
    avg_clahe = round(sum(clahe_contributions) / len(clahe_contributions), 1) if clahe_contributions else 0
    avg_sharpen = round(sum(sharpen_contributions) / len(sharpen_contributions), 1) if sharpen_contributions else 0
    duration = round(time.time() - start_time, 3)
    return jsonify({
        'success': True,
        'batch_id': batch_id,
        'processed_count': processed_count,
        'error_count': error_count,
        'processing_duration': duration,
        'statistics': {
            'total': len(batch['items']),
            'processed': processed_count,
            'errors': error_count,
            'avg_quality': avg_quality,
            'best_quality': best_quality,
            'avg_improvement': avg_improvement,
            'avg_denoise_contribution': avg_denoise,
            'avg_clahe_contribution': avg_clahe,
            'avg_sharpen_contribution': avg_sharpen
        },
        'results': batch['items']
    })
def _item_quality_score(item):
    """Quality score for a batch item: processed score if available, else the upload-time score."""
    return item.get('processed_overall', item.get('overall_quality', 0))


@app.route('/api/filter-batch', methods=['POST'])
def filter_batch():
    """Partition a batch's processed images into included/excluded sets.

    Request JSON:
        batch_id (str, required): id of a batch in batch_storage.
        threshold (number): minimum quality score to include. Default 70.
        top_n (int, optional): if truthy, include the top N by score and
            ignore the threshold.

    Returns JSON with 'included'/'excluded' item lists (sorted best-first)
    and aggregate statistics, or a JSON error (400/404).
    """
    data = request.json
    if not data or 'batch_id' not in data:
        return jsonify({'error': 'No batch_id provided'}), 400
    batch_id = data['batch_id']
    if batch_id not in batch_storage:
        return jsonify({'error': 'Batch not found'}), 404
    threshold = data.get('threshold', 70)
    top_n = data.get('top_n', None)
    batch = batch_storage[batch_id]
    processed_items = [i for i in batch['items'] if i['status'] == 'processed']
    # Rank best-first; the same score rule is used everywhere below.
    sorted_items = sorted(processed_items, key=_item_quality_score, reverse=True)
    if top_n:
        # Top-N selection: list is already sorted, so slicing gives the best N.
        included = sorted_items[:top_n]
        excluded = sorted_items[top_n:]
    else:
        included = [i for i in sorted_items if _item_quality_score(i) >= threshold]
        excluded = [i for i in sorted_items if _item_quality_score(i) < threshold]
    avg_quality = round(sum(_item_quality_score(i) for i in processed_items) / len(processed_items), 1) if processed_items else 0
    best_quality = max((_item_quality_score(i) for i in processed_items), default=0)
    return jsonify({
        'success': True,
        'included': included,
        'excluded': excluded,
        'statistics': {
            'total_count': len(processed_items),
            'passed_count': len(included),
            'failed_count': len(excluded),
            'average_quality': avg_quality,
            'best_quality': best_quality
        }
    })
@app.route('/api/batch-report', methods=['POST'])
def batch_report():
    """Build and return a PDF report for a batch's processed images.

    Request JSON:
        batch_id (str, required): id of a batch in batch_storage.
        file_ids (list, optional): restrict the report to these item ids.

    The PDF contains a generation timestamp, a batch summary table, a quality
    ranking table, and one page per image with original/processed renders plus
    an original-vs-processed metrics table. The processed render is recreated
    on the fly by re-applying each item's stored enhancement parameters.
    Returns the PDF as a download, or a JSON error (400/404/500).
    """
    data = request.json
    if not data or 'batch_id' not in data:
        return jsonify({'error': 'No batch_id provided'}), 400
    batch_id = data['batch_id']
    if batch_id not in batch_storage:
        return jsonify({'error': 'Batch not found'}), 404
    batch = batch_storage[batch_id]
    file_ids = data.get('file_ids', None)
    # Only successfully processed items are reportable; optionally filter by id.
    if file_ids:
        items = [i for i in batch['items'] if i['file_id'] in file_ids and i['status'] == 'processed']
    else:
        items = [i for i in batch['items'] if i['status'] == 'processed']
    # Rank best-first by processed score (falling back to the upload score).
    items = sorted(items, key=lambda x: x.get('processed_overall', x.get('overall_quality', 0)), reverse=True)
    if not items:
        return jsonify({'error': 'No processed images to include in report'}), 400
    try:
        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=A4,
                                topMargin=15*mm, bottomMargin=15*mm,
                                leftMargin=15*mm, rightMargin=15*mm)
        styles = getSampleStyleSheet()
        story = []
        # Shared paragraph styles: blue section banners, body labels, fine print.
        section_style = ParagraphStyle('Section', parent=styles['Heading2'],
                                       fontSize=14, textColor=colors.white,
                                       backColor=colors.HexColor('#2563eb'),
                                       borderPadding=(4, 6, 4, 6),
                                       spaceAfter=4*mm, spaceBefore=6*mm)
        label_style = ParagraphStyle('Label', parent=styles['Normal'],
                                     fontSize=10, textColor=colors.HexColor('#334155'))
        small_style = ParagraphStyle('Small', parent=styles['Normal'],
                                     fontSize=8, textColor=colors.HexColor('#94a3b8'))
        ts = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
        timestamp_style = ParagraphStyle('Timestamp', parent=styles['Normal'],
                                         fontSize=10, textColor=colors.HexColor('#94a3b8'),
                                         spaceAfter=4*mm)
        story.append(Paragraph(f'Generated: {ts}', timestamp_style))
        # --- Batch summary section ---
        story.append(Paragraph('BATCH SUMMARY', section_style))
        settings = batch.get('settings', {})
        all_processed = [i for i in batch['items'] if i['status'] == 'processed']
        avg_q = round(sum(i.get('processed_overall', i.get('overall_quality', 0)) for i in all_processed) / len(all_processed), 1) if all_processed else 0
        best_q = max((i.get('processed_overall', i.get('overall_quality', 0)) for i in all_processed), default=0)
        avg_imp = round(sum(i.get('quality_improvement', 0) for i in all_processed) / len(all_processed), 1) if all_processed else 0
        mode_label = {'auto': 'Auto Optimize', 'guided': 'Guided', 'custom': 'Custom'}.get(settings.get('mode', 'auto'), 'Auto Optimize')
        summary_data = [
            ['Total Images', str(len(batch['items']))],
            ['Processed', str(len(all_processed))],
            ['Enhancement Mode', mode_label],
            ['Avg Quality Score', f'{avg_q}/100'],
            ['Best Quality Score', f'{best_q}/100'],
            ['Avg Improvement', f'+{avg_imp}' if avg_imp >= 0 else str(avg_imp)],
            ['Pseudonymized', 'Yes' if settings.get('pseudonymize') else 'No'],
        ]
        # Mode-specific rows: guided shows the philosophy, custom shows the knobs.
        if settings.get('mode') == 'guided' and settings.get('philosophy'):
            summary_data.insert(4, ['Philosophy', settings['philosophy'].capitalize()])
        if settings.get('mode') == 'custom' and settings.get('custom_params'):
            cp = settings['custom_params']
            if cp.get('denoise_enabled'):
                summary_data.append(['Denoise', cp.get('denoise_strength', 'medium').capitalize()])
            if cp.get('clahe_enabled'):
                summary_data.append(['CLAHE Clip Limit', str(cp.get('clahe_clip_limit', 2.5))])
            if cp.get('sharpen_enabled'):
                summary_data.append(['Sharpen', cp.get('sharpen_strength', 'medium').capitalize()])
        summary_table = Table(summary_data, colWidths=[55*mm, 50*mm])
        summary_table.setStyle(TableStyle([
            ('FONTSIZE', (0, 0), (-1, -1), 9),
            ('FONTNAME', (0, 0), (0, -1), 'Helvetica-Bold'),
            ('TEXTCOLOR', (0, 0), (0, -1), colors.HexColor('#334155')),
            ('TEXTCOLOR', (1, 0), (1, -1), colors.HexColor('#1e293b')),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
            ('TOPPADDING', (0, 0), (-1, -1), 3),
            ('LINEBELOW', (0, 0), (-1, -2), 0.5, colors.HexColor('#e2e8f0')),
        ]))
        story.append(summary_table)
        story.append(Spacer(1, 6*mm))
        # --- Quality ranking table (one row per item, best first) ---
        story.append(Paragraph('QUALITY RANKING', section_style))
        rank_header = ['Rank', 'Filename', 'Quality', 'Sharpness', 'Contrast', 'SNR (dB)']
        rank_rows = [rank_header]
        for idx, item in enumerate(items):
            pq = item.get('processed_quality', item.get('quality_metrics', {}))
            score = item.get('processed_overall', item.get('overall_quality', 0))
            rank_rows.append([
                str(idx + 1),
                item['filename'][:25],
                f'{score}',
                str(pq.get('sharpness_score', 'N/A')),
                str(pq.get('contrast_ratio', 'N/A')),
                str(pq.get('snr_db', 'N/A'))
            ])
        rank_table = Table(rank_rows, colWidths=[12*mm, 52*mm, 20*mm, 24*mm, 22*mm, 22*mm])
        rank_table.setStyle(TableStyle([
            ('FONTSIZE', (0, 0), (-1, -1), 8),
            ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
            ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2563eb')),
            ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
            ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
            ('ALIGN', (1, 1), (1, -1), 'LEFT'),
            ('BOTTOMPADDING', (0, 0), (-1, -1), 3),
            ('TOPPADDING', (0, 0), (-1, -1), 3),
            ('LINEBELOW', (0, 0), (-1, -1), 0.5, colors.HexColor('#e2e8f0')),
            ('ROWBACKGROUNDS', (0, 1), (-1, -1), [colors.white, colors.HexColor('#f8fafc')]),
        ]))
        story.append(rank_table)
        # --- One detail page per image ---
        for idx, item in enumerate(items):
            story.append(PageBreak())
            score = item.get('processed_overall', item.get('overall_quality', 0))
            label = get_quality_label(score)
            story.append(Paragraph(f'Image {idx+1}: {item["filename"]}', section_style))
            story.append(Paragraph(f'Quality Score: {score}/100 ({label})', label_style))
            ep = item.get('enhancement_params', {})
            if ep:
                parts = []
                if ep.get('denoise_strength'):
                    parts.append(f"Denoise: {ep['denoise_strength'].capitalize()}")
                if ep.get('clahe_clip_limit'):
                    parts.append(f"CLAHE: {ep['clahe_clip_limit']}")
                if ep.get('sharpen_strength'):
                    parts.append(f"Sharpen: {ep['sharpen_strength'].capitalize()}")
                if parts:
                    story.append(Paragraph(f'Enhancement: {" | ".join(parts)}', small_style))
            qh = item.get('quality_history', [])
            if len(qh) > 1:
                steps_str = ' -> '.join([f"{s['step'].replace('_', ' ').title()}: {s['quality']}" for s in qh])
                story.append(Paragraph(f'Quality Steps: {steps_str}', small_style))
            story.append(Spacer(1, 3*mm))
            filepath = find_uploaded_file(item['file_id'])
            if filepath:
                try:
                    image = load_image_from_path(filepath)
                    if image is not None:
                        # Scale to page width (max 170mm wide, 80mm tall), keeping aspect.
                        h, w = image.shape[:2]
                        max_w = 170 * mm
                        aspect = h / w
                        img_w = min(max_w, 170 * mm)
                        img_h = img_w * aspect
                        if img_h > 80 * mm:
                            img_h = 80 * mm
                            img_w = img_h / aspect
                        orig_buf = io.BytesIO()
                        _, enc = cv2.imencode('.png', image)
                        orig_buf.write(enc.tobytes())
                        orig_buf.seek(0)
                        story.append(Paragraph('Original Image', label_style))
                        story.append(Spacer(1, 2*mm))
                        story.append(RLImage(orig_buf, width=img_w, height=img_h))
                        story.append(Spacer(1, 4*mm))
                        # Re-run the stored enhancement steps to recreate the
                        # processed image (only thumbnails were kept in memory).
                        ep = item.get('enhancement_params', {})
                        processed = image.copy()
                        if ep.get('denoise_strength') or ep.get('denoise_enabled', False):
                            processed = denoise_image(processed, ep.get('denoise_strength', 'medium'))
                        if ep.get('clahe_clip_limit') or ep.get('clahe_enabled', False):
                            processed = apply_clahe(processed, clip_limit=ep.get('clahe_clip_limit', 2.5))
                        if ep.get('sharpen_strength') or ep.get('sharpen_enabled', False):
                            processed = sharpen_image(processed, ep.get('sharpen_strength', 'medium'))
                        if settings.get('pseudonymize'):
                            processed = apply_pseudonymize(processed)
                        proc_buf = io.BytesIO()
                        _, enc2 = cv2.imencode('.png', processed)
                        proc_buf.write(enc2.tobytes())
                        proc_buf.seek(0)
                        story.append(Paragraph('Processed Image', label_style))
                        story.append(Spacer(1, 2*mm))
                        story.append(RLImage(proc_buf, width=img_w, height=img_h))
                        story.append(Spacer(1, 4*mm))
                except Exception:
                    # Image embedding is best-effort; the metrics table still renders.
                    story.append(Paragraph('(Images could not be loaded)', small_style))
            # Original-vs-processed metrics comparison; 'Change' only when both
            # sides carry the metric.
            pq = item.get('processed_quality', item.get('quality_metrics', {}))
            oq = item.get('original_quality', item.get('quality_metrics', {}))
            metrics_data = [
                ['Metric', 'Original', 'Processed', 'Change'],
                ['Sharpness', str(oq.get('sharpness_score', 'N/A')), str(pq.get('sharpness_score', 'N/A')),
                 str(round(pq.get('sharpness_score', 0) - oq.get('sharpness_score', 0), 1)) if 'sharpness_score' in pq and 'sharpness_score' in oq else 'N/A'],
                ['Contrast', str(oq.get('contrast_ratio', 'N/A')), str(pq.get('contrast_ratio', 'N/A')),
                 str(round(pq.get('contrast_ratio', 0) - oq.get('contrast_ratio', 0), 3)) if 'contrast_ratio' in pq and 'contrast_ratio' in oq else 'N/A'],
                ['SNR (dB)', str(oq.get('snr_db', 'N/A')), str(pq.get('snr_db', 'N/A')),
                 str(round(pq.get('snr_db', 0) - oq.get('snr_db', 0), 1)) if 'snr_db' in pq and 'snr_db' in oq else 'N/A'],
            ]
            metrics_table = Table(metrics_data, colWidths=[35*mm, 35*mm, 35*mm, 35*mm])
            metrics_table.setStyle(TableStyle([
                ('FONTSIZE', (0, 0), (-1, -1), 9),
                ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2563eb')),
                ('TEXTCOLOR', (0, 0), (-1, 0), colors.white),
                ('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                ('BOTTOMPADDING', (0, 0), (-1, -1), 4),
                ('TOPPADDING', (0, 0), (-1, -1), 4),
                ('LINEBELOW', (0, 0), (-1, -1), 0.5, colors.HexColor('#e2e8f0')),
            ]))
            story.append(metrics_table)
        story.append(Spacer(1, 10*mm))
        story.append(Paragraph('Generated by InSono v1.0', small_style))
        story.append(Paragraph('Research Use Only - Not for clinical diagnosis', small_style))
        doc.build(story)
        buffer.seek(0)
        return send_file(
            buffer,
            mimetype='application/pdf',
            as_attachment=True,
            download_name=f'insono_batch_report_{datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")}.pdf'
        )
    except Exception as e:
        return jsonify({'error': f'Error generating PDF: {str(e)}'}), 500
def preload_segmentation_model():
    """Kick off MONAI segmentation-model loading on a background daemon thread.

    Returns immediately so the server can start serving requests while the
    model loads. Any failure is printed and swallowed; the model layer will
    retry on the first segmentation request.
    """
    import threading

    def _background_load():
        # Best-effort: nothing here may propagate and take down the server.
        try:
            print("[Startup] Pre-loading MONAI segmentation model...")
            from models import preload_model
            loaded = preload_model()
            message = (
                "[Startup] MONAI model pre-loaded successfully"
                if loaded is not None
                else "[Startup] MONAI model not available (will retry on first request)"
            )
            print(message)
        except Exception as exc:
            print(f"[Startup] MONAI pre-load failed: {exc}")

    threading.Thread(target=_background_load, daemon=True).start()
if __name__ == '__main__':
    import sys

    # Port precedence: PORT env var > first CLI argument > 7860.
    port = int(os.environ.get('PORT', sys.argv[1] if len(sys.argv) > 1 else 7860))
    preload_segmentation_model()
    # Debug stays on by default (unchanged behavior for local development),
    # but can now be disabled via FLASK_DEBUG=0 for shared deployments — the
    # Werkzeug debugger allows arbitrary code execution if exposed.
    debug = os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')
    app.run(host='0.0.0.0', port=port, debug=debug)