import os
import tempfile
import time
import nibabel as nib
import numpy as np
from PIL import Image
import gradio as gr
import modal
# Look up the deployed Modal class once at import time. Any exception raised
# while doing so is reported and leaves ``segmenter_instance`` as None, so the
# module still imports; run_pipeline() checks for None before using it.
segmenter_instance = None
try:
    Segmenter = modal.Cls.from_name("ct-summary-backend", "Segmenter")
    segmenter_instance = Segmenter()
except Exception as connect_error:
    print(f"[LOCAL] Failed to connect to Modal backend: {connect_error}")
# Removed global ping that blocked HF Space startup
def slice_3d_volumetric_scan(nifti_path):
    """Write the middle slice of a NIfTI volume to a temporary PNG file.

    The slice is taken at the midpoint of the third array axis, rotated with
    ``np.rot90`` and min-max scaled to 0-255 before being saved as an 8-bit
    image. A slice with no value range is rendered as all zeros.

    Args:
        nifti_path: Path of a volume that ``nibabel.load`` can read.

    Returns:
        Path of the PNG file, or ``None`` when any step fails (the error is
        printed, never raised).
    """
    png_path = None
    try:
        volume = nib.load(nifti_path).get_fdata()
        z_mid = volume.shape[2] // 2
        slice_data = np.rot90(volume[:, :, z_mid])

        data_min, data_max = np.min(slice_data), np.max(slice_data)
        value_range = data_max - data_min
        if value_range > 0:
            normalized = 255.0 * (slice_data - data_min) / value_range
        else:
            normalized = np.zeros_like(slice_data)
        img_uint8 = normalized.astype(np.uint8)

        # Reserve a unique file name, then close the handle before PIL writes
        # to the path. Leaving it open leaks one descriptor per call and
        # prevents the file from being reopened on Windows.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp_img:
            png_path = tmp_img.name
        Image.fromarray(img_uint8).save(png_path)
        return png_path
    except Exception as e:
        print(f"Visualization Error: {e}")
        if png_path is not None:
            # Best-effort cleanup of the reserved file; the caller only needs
            # to know that no preview is available.
            try:
                os.remove(png_path)
            except OSError:
                pass
        return None
def _validate_scan_local(nifti_path):
"""Minimal validation: 3D only, not a mask. Let TotalSegmentator handle the rest."""
try:
img = nib.load(nifti_path)
data = img.get_fdata()
except Exception as e:
return False, f"Not supported file or wrong CT scan. Could not read volume: {e}"
if len(data.shape) != 3:
return False, f"Not supported file or wrong CT scan. Expected 3D volume, got {len(data.shape)}D shape {data.shape}."
unique_count = len(np.unique(data))
print(f"[LOCAL] Validation: shape={data.shape}, unique_values={unique_count}, min={np.min(data):.1f}, max={np.max(data):.1f}")
if unique_count < 50:
return False, "Not supported file or wrong CT scan. Uploaded file appears to be a segmentation mask (too few unique values)."
return True, None
# Order in which report sections are emitted by the HTML builders; a section
# with no entries in a given result is skipped there.
SECTION_ORDER = [
    "Solid Organs",
    "Gastrointestinal",
    "Thoracic",
    "Genitourinary",
    "Other Structures",
]
# Build the HTML fragment shown in the on-screen report panel.
#
# Keys read from ``findings``:
#   - "error": if truthy, only a "Processing issue" message containing it is
#     returned.
#   - "alerts": iterable of mappings with "name" and "note" plus an optional
#     "volume" (formatted to one decimal, in cm³).
#   - "sections": mapping from a SECTION_ORDER name to a list of entries, each
#     with "name", "volume" and "status".
#   - "total_structures": read with a default of 0.
# Returns the accumulated HTML string.
#
# NOTE(review): indentation is missing and several string literals here are
# split across lines by a bare quote, as if their HTML tags had been stripped.
# `cls` and `total_structures` are assigned but never interpolated in what
# remains. Restore the markup before relying on this function.
def build_preview_html(findings: dict) -> str:
# Error results short-circuit: nothing else in ``findings`` is inspected.
if findings.get("error"):
return (
'
'
f'Processing issue: {findings["error"]}'
'
'
)
alerts = findings.get("alerts", [])
sections = findings.get("sections", {})
total_structures = findings.get("total_structures", 0)
# Summary banner: one line per alert, or a single "no findings" message.
if alerts:
html = ''
html += f'
⚠ {len(alerts)} finding(s) outside expected range
'
html += '
'
for a in alerts:
vol_str = f" — {a['volume']:.1f} cm³" if a.get("volume") is not None else ""
html += f'- {a["name"]}{vol_str}
{a["note"]} '
html += '
'
else:
html = (
''
'✓ No findings outside expected range across measured structures.'
'
'
)
html += ''
# Per-section listing, in SECTION_ORDER; sections with no entries are skipped.
for section_name in SECTION_ORDER:
entries = sections.get(section_name)
if not entries:
continue
html += f'
{section_name}
'
for e in entries:
# Entries whose status is "alert" get the alert CSS class name.
cls = "preview-metric-alert" if e["status"] == "alert" else "preview-metric"
html += f'
{e["name"]}{e["volume"]:.1f} cm³
'
html += '
'
html += (
f''
)
return html
# Build the full report document and return it wrapped by _wrap_html().
#
# Parameters:
#   findings   - same mapping shape as read below: "error", "alerts",
#                "sections", "total_structures".
#   scan_label - passed through unchanged to _wrap_html().
#   for_pdf    - passed through unchanged to _wrap_html().
# Returns the string produced by _wrap_html(body, scan_label, for_pdf).
#
# NOTE(review): indentation is missing and several string literals here are
# split across lines by a bare quote, as if their HTML tags had been stripped.
# `status_class` is assigned but never interpolated in what remains. Restore
# the markup before relying on this function.
def build_report_html(findings: dict, scan_label: str, for_pdf: bool = False) -> str:
# Error results produce a document containing only the error message.
if findings.get("error"):
body = f'Processing issue: {findings["error"]}
'
return _wrap_html(body, scan_label, for_pdf)
alerts = findings.get("alerts", [])
sections = findings.get("sections", {})
total_structures = findings.get("total_structures", 0)
# Summary: one bullet per alert ("name (volume) — note"), or an all-clear text.
if alerts:
body = ''
body += f'
⚠ {len(alerts)} finding(s) outside expected range
'
body += '
'
for a in alerts:
vol_str = f" ({a['volume']:.1f} cm³)" if a.get("volume") is not None else ""
body += f'- {a["name"]}{vol_str} — {a["note"]}
'
body += '
'
else:
body = ''
body += '
✓ No findings outside expected range
'
body += '
All measured structures fall within typical adult volume ranges for the available reference set.
'
body += '
'
# Per-section listing, in SECTION_ORDER; sections with no entries are skipped.
for section_name in SECTION_ORDER:
entries = sections.get(section_name)
if not entries:
continue
body += f'{section_name}
'
for e in entries:
status_class = "status-alert" if e["status"] == "alert" else "status-normal"
# An entry's "note" is optional; it is appended only when present and truthy.
note_html = f'{e["note"]}
' if e.get("note") else ""
body += (
f'- '
f'{e["name"]}: {e["volume"]:.1f} cm³'
f'{note_html}
'
)
body += '
'
# Closing line: structure count plus the screening-only disclaimer.
body += (
f'Total structures measured: {total_structures}. '
f'Volumes are approximate, derived from a fast-mode segmentation pass and intended '
f'for screening purposes only — not a substitute for radiologist review.
'
)
return _wrap_html(body, scan_label, for_pdf)
# Wrap a report body in the surrounding document template.
#
# Parameters:
#   content_html - body markup, interpolated into the returned template.
#   scan_label   - accepted, but not referenced in the template text visible here.
#   for_pdf      - when true, `page_rule` holds an @page rule (A4 size, margins,
#                  "Page X of Y" in the bottom-right margin box); otherwise it
#                  is the empty string.
# Returns the template string.
#
# NOTE(review): the returned f-string currently contains nothing but
# {content_html}; `page_rule` and `scan_label` are unused, which suggests the
# <html>/<style> wrapper was stripped from this copy. Restore it before use.
def _wrap_html(content_html: str, scan_label: str, for_pdf: bool) -> str:
page_rule = """
@page {
size: A4;
margin: 20mm 15mm 20mm 15mm;
@bottom-right {
content: "Page " counter(page) " of " counter(pages);
font-family: 'Helvetica Neue', Helvetica, Arial, sans-serif;
font-size: 9pt;
color: #64748b;
}
}
""" if for_pdf else ""
return f"""
{content_html}
"""
# Gradio click handler: validate the upload, render a preview slice, run the
# remote segmentation and build the on-screen report and the PDF.
#
# Parameters:
#   file_obj - uploaded file object (its `.name` is used as the path), or None.
#   progress - Gradio progress tracker, called with a fraction and a description.
# Returns a 3-tuple matching the click outputs:
#   (preview image path or None, HTML string, PDF path or None).
# Every failure path returns an HTML message in the second slot instead of
# raising.
#
# NOTE(review): indentation is missing and several returned string literals are
# split across lines by a bare quote, as if their HTML tags had been stripped.
# Restore the markup before relying on this function.
# NOTE(review): nothing visible here deletes the preview PNG or the directory
# created by tempfile.mkdtemp(), so they accumulate per request - confirm
# whether cleanup happens elsewhere.
def run_pipeline(file_obj, progress=gr.Progress()):
t_start = time.time()
if file_obj is None:
return None, 'Upload a NIfTI (.nii or .nii.gz) volume to begin.
', None
scan_label = "Whole Body CT (Auto-Detected)"
# --- Local validation ---
progress(0.05, desc="Validating file...")
is_valid, err_msg = _validate_scan_local(file_obj.name)
if not is_valid:
return None, f'{err_msg}
', None
# --- Slice extraction ---
progress(0.15, desc="Extracting preview slice...")
slice_path = slice_3d_volumetric_scan(file_obj.name)
if slice_path is None:
return None, 'Failed to extract preview slice.
', None
# The backend handle is None when the import-time Modal lookup failed; the
# preview slice is still returned so the user sees their upload.
if segmenter_instance is None:
err_html = (
''
"Could not connect to the Modal backend. Confirm the 'ct-summary-backend' app is deployed."
'
'
)
return slice_path, err_html, None
try:
# --- Read file ---
progress(0.25, desc="Reading file...")
# The whole file is read into memory and sent to the backend as bytes.
with open(file_obj.name, "rb") as f:
file_bytes = f.read()
# --- Pre-flight ping ---
progress(0.30, desc="Connecting to backend...")
try:
segmenter_instance.ping.remote()
except Exception as e:
return slice_path, f'Backend unreachable: {e}
', None
# --- Modal remote call ---
progress(0.35, desc="Uploading & running segmentation (~20-30s)...")
t0 = time.time()
findings = segmenter_instance.validate_and_report.remote(file_bytes)
t_remote = time.time() - t0
print(f"[frontend timing] Modal remote call: {t_remote:.1f}s")
# --- Preview HTML ---
progress(0.80, desc="Building report...")
report_preview = build_preview_html(findings)
# --- PDF generation ---
progress(0.90, desc="Generating PDF...")
# weasyprint is imported here rather than at module import time.
from weasyprint import HTML
pdf_html = build_report_html(findings, scan_label, for_pdf=True)
pdf_dir = tempfile.mkdtemp()
pdf_path = os.path.join(pdf_dir, "ct_report.pdf")
HTML(string=pdf_html).write_pdf(pdf_path)
progress(1.0, desc="Done")
print(f"[frontend timing] TOTAL pipeline: {time.time() - t_start:.1f}s")
return slice_path, report_preview, pdf_path
# Any failure after the preview slice was produced is shown as a message.
except Exception as e:
err_html = f'Pipeline execution failed: {e}
'
return slice_path, err_html, None
# Theme for the app: Gradio's Soft theme with blue/slate hues, then explicit
# colour overrides for the page body, blocks and the primary button.
clinical_theme = gr.themes.Soft(primary_hue="blue", neutral_hue="slate")
clinical_theme = clinical_theme.set(
    body_background_fill="#0f172a",
    block_background_fill="#1e293b",
    block_border_color="#334155",
    button_primary_background_fill="#2563eb",
    button_primary_text_color="#ffffff",
    body_text_color="#f1f5f9",
)
# Extra stylesheet passed to gr.Blocks(css=...). It styles the image column
# (.full-height-image), the report container (.report-frame) and the
# preview-* classes; build_preview_html() selects between preview-metric and
# preview-metric-alert per entry.
custom_css = """
.gradio-container { font-family: 'Helvetica Neue', Arial, sans-serif; }
h1, h2, h3, h4, h5, h6 { color: #ffffff !important; }
#main-heading { text-align: center; }
.full-height-image { height: 790px !important; }
.full-height-image img { height: 100% !important; object-fit: contain; }
.report-frame {
background-color: #ffffff !important;
border-radius: 6px;
border: 1px solid #334155;
min-height: 300px;
max-height: 790px !important;
padding: 16px;
font-family: 'Helvetica Neue', Arial, sans-serif;
overflow-y: auto !important;
}
.report-frame, .report-frame * {
color: #1e293b !important;
}
.report-frame h1, .report-frame h2, .report-frame h3 { color: #0f172a !important; }
.preview-alert {
border-radius: 4px;
padding: 12px 14px;
margin-bottom: 16px;
border: 1px solid;
font-size: 10.5pt;
}
.preview-alert-warning, .preview-alert-warning * { background-color: #fef2f2; border-color: #fecaca; color: #991b1b !important; }
.preview-alert-ok, .preview-alert-ok * { background-color: #f0fdf4; border-color: #bbf7d0; color: #166534 !important; }
.preview-alert-error, .preview-alert-error * { background-color: #fef2f2; border-color: #fecaca; color: #991b1b !important; }
.preview-alert-title { font-weight: bold; margin-bottom: 6px; }
.preview-alert ul { margin: 6px 0 0 0; padding-left: 18px; }
.preview-alert li { margin-bottom: 8px; }
.preview-note, .preview-note * { font-size: 9pt; color: #7f1d1d !important; }
.preview-section-title, .preview-section-title * {
font-size: 10.5pt;
font-weight: bold;
color: #1e3a8a !important;
background-color: #eff6ff;
padding: 4px 8px;
margin-top: 14px;
margin-bottom: 6px;
border-left: 3px solid #2563eb;
text-transform: uppercase;
letter-spacing: 0.5px;
}
.preview-metric, .preview-metric * {
display: flex;
justify-content: space-between;
font-size: 10.5pt;
padding: 3px 6px;
border-bottom: 1px solid #f1f5f9;
color: #1e293b !important;
}
.preview-metric-alert, .preview-metric-alert * {
display: flex;
justify-content: space-between;
font-size: 10.5pt;
padding: 3px 6px;
border-bottom: 1px solid #f1f5f9;
color: #991b1b !important;
font-weight: bold;
background-color: #fef2f2;
}
.preview-footnote, .preview-footnote * {
font-size: 9pt;
color: #64748b !important;
font-style: italic;
margin-top: 14px;
}
"""
# Initial content of the report panel (used as the gr.HTML value) before any
# analysis has been run.
# NOTE(review): only plain text is left in this literal; any wrapping markup it
# once had appears to have been stripped - confirm against the deployed copy.
PLACEHOLDER_HTML = """
Upload a CT volume (.nii / .nii.gz) and run the analysis to see the metrics here.
"""
# UI definition: a title, one row with two equally scaled columns, and the
# click wiring that sends the uploaded file through run_pipeline().
# NOTE(review): indentation is missing in this copy, so which components sit
# inside which `with` block cannot be read from the text - confirm the nesting.
with gr.Blocks(theme=clinical_theme, css=custom_css, title="CT Report Generator") as demo:
# NOTE(review): both Markdown headers below use elem_id="main-heading", so the
# id is duplicated in the page.
gr.Markdown("# Automated 3D Imaging Extraction & Reporting Pipeline", elem_id="main-heading")
gr.Markdown(
"Upload a 3D CT volume to generate a structured report with volume-based alerts.",
elem_id="main-heading"
)
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 1. Cross-Section Visualization")
# Receives the PNG path returned as the first element of run_pipeline().
image_output = gr.Image(
label="Middle Z-Axis Cross-Section",
type="filepath",
height=790,
elem_classes=["full-height-image"]
)
with gr.Column(scale=1):
gr.Markdown("### 2. Upload & Analyze")
# NOTE(review): ".gz" admits any gzip file, not only .nii.gz; the content is
# checked later by _validate_scan_local().
file_input = gr.File(
label="Upload 3D Volumetric Scan (.nii.gz / .nii)",
file_types=[".gz", ".nii"]
)
submit_btn = gr.Button("Analyze Scan & Generate Report", variant="primary")
gr.Markdown("#### Metrics & Alerts")
# Starts with the placeholder text; replaced by the HTML from run_pipeline().
report_output = gr.HTML(value=PLACEHOLDER_HTML, elem_classes=["report-frame"])
pdf_download = gr.DownloadButton("Download Official PDF Report", variant="secondary")
# The three outputs line up with run_pipeline()'s (image, html, pdf) tuple.
submit_btn.click(
fn=run_pipeline,
inputs=[file_input],
outputs=[image_output, report_output, pdf_download]
)
# Start the Gradio server only when the file is executed as a script.
# server_name="0.0.0.0" binds to all network interfaces rather than loopback only.
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0")