# MALDI-TOF-MS / app.py
# Hugging Face Space file view; uploader: fudan-renjun
# Last commit: "Update app.py" (6e5ecfc, verified)
"""
🔬 MALDI-TOF MS Template-Based Processing Platform (Hugging Face Spaces)
Build a feature template from training set, then batch-process validation sets.
Original project: https://github.com/MengyuZhang163/MALDI-TOF-MS-1.3
Stack: Gradio + R (MALDIquant / MALDIquantForeign)
"""
import gradio as gr
import pandas as pd
import subprocess
import tempfile
import shutil
from pathlib import Path
import zipfile
import io
import os
import gc
import json
import time
import threading
import glob
# ============================================================
# Auto-install R at startup if not present
# ============================================================
def _ensure_r_installed():
    """Make sure ``Rscript`` is callable, installing r-base via apt-get if not.

    This is best-effort: every failure is reported with a ``[R]``-prefixed
    ``print`` and nothing is raised, so a host without apt-get (or without the
    rights to use it) cannot crash the module at import time.
    """
    def rscript_available():
        # A missing binary raises OSError; a hang or non-zero exit raises a
        # SubprocessError (TimeoutExpired / CalledProcessError).
        try:
            subprocess.run(['Rscript', '--version'],
                           capture_output=True, timeout=5, check=True)
        except (OSError, subprocess.SubprocessError):
            return False
        return True

    if rscript_available():
        print("[R] Rscript already available.")
        return

    print("[R] Rscript not found — installing r-base via apt-get...")
    cmds = [
        ['apt-get', 'update', '-qq'],
        ['apt-get', 'install', '-y', '-qq',
         'r-base', 'r-base-dev',
         'libxml2-dev', 'libcurl4-openssl-dev', 'libssl-dev'],
    ]
    for cmd in cmds:
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as exc:
            # apt-get itself cannot be executed (e.g. non-Debian host);
            # the remaining commands would fail the same way.
            print(f"[R] Warning: could not run {' '.join(cmd)}: {exc}")
            break
        if result.returncode != 0:
            print(f"[R] Warning: {' '.join(cmd)} failed:\n{result.stderr}")
        else:
            print(f"[R] {' '.join(cmd[:3])} ... done")

    if rscript_available():
        print("[R] r-base installed successfully.")
    else:
        print("[R] ERROR: Rscript still not available after installation attempt.")
# Executed at import time, before the Gradio UI further down is built.
_ensure_r_installed()
# ============================================================
# Cache / temp-file management
# ============================================================
# Output directories appended by the processing callbacks; emptied again by
# cleanup_temp_files().
TEMP_DIRS_REGISTRY = [] # track all temp dirs we create
def get_cache_size_mb():
    """Sum the file sizes under every ``tmp*`` directory of the temp root.

    Only directories directly inside ``tempfile.gettempdir()`` whose name
    starts with ``tmp`` are walked. Files that cannot be stat'ed are skipped,
    and an ``OSError`` while scanning the root stops the scan but keeps the
    bytes counted so far.

    Returns:
        float: total size in megabytes (bytes / 1024 / 1024).
    """
    scan_root = tempfile.gettempdir()
    byte_count = 0
    try:
        for item in os.scandir(scan_root):
            if not item.is_dir() or not item.name.startswith('tmp'):
                continue
            for folder, _subdirs, names in os.walk(item.path):
                for name in names:
                    full_path = os.path.join(folder, name)
                    try:
                        byte_count += os.path.getsize(full_path)
                    except OSError:
                        continue
    except OSError:
        pass
    return byte_count / (1024 * 1024)
def cleanup_temp_files():
    """Delete temporary directories and reset the application state.

    Removes every path recorded in ``TEMP_DIRS_REGISTRY`` and then every
    ``tmp*`` directory directly under ``tempfile.gettempdir()``. Afterwards the
    registry is empty, ``app_state`` is reset including the feature template,
    and a garbage-collection pass is forced.

    Returns:
        int: number of directories that were actually removed.
    """
    def remove_tree(path):
        # rmtree(ignore_errors=True) never raises, so check the outcome
        # instead of counting every attempt as a success.
        shutil.rmtree(path, ignore_errors=True)
        return not os.path.exists(path)

    cleaned = 0
    for registered in list(TEMP_DIRS_REGISTRY):
        if os.path.exists(registered) and remove_tree(registered):
            cleaned += 1
    TEMP_DIRS_REGISTRY.clear()

    # Snapshot the candidates first so nothing is deleted while the directory
    # is still being scanned; the context manager closes the scandir handle.
    try:
        with os.scandir(tempfile.gettempdir()) as entries:
            candidates = [entry.path for entry in entries
                          if entry.is_dir() and entry.name.startswith('tmp')]
    except OSError:
        candidates = []
    for path in candidates:
        if remove_tree(path):
            cleaned += 1

    app_state.reset(keep_template=False)
    gc.collect()
    return cleaned
def manual_cleanup():
    """Callback for the manual-cleanup button.

    Measures the temp-directory size before and after ``cleanup_temp_files()``.

    Returns:
        str: Markdown report with the sizes, the space freed, the number of
        directories removed and a timestamp.
    """
    size_before = get_cache_size_mb()
    cleaned = cleanup_temp_files()
    size_after = get_cache_size_mb()
    # Other processes may have written temp files in the meantime; never
    # report a negative amount.
    freed = max(0, size_before - size_after)
    return (
        f"🧹 **Cleanup complete!**\n\n"
        f"- Before: {size_before:.1f} MB\n"
        f"- After: {size_after:.1f} MB\n"
        f"- Freed: {freed:.1f} MB\n"
        f"- Directories removed: {cleaned}\n"
        f"- Template status: reset (re-run Stage 1 to rebuild)\n\n"
        f"⏰ {time.strftime('%Y-%m-%d %H:%M:%S')}"
    )
def get_cache_status():
    """Return a Markdown summary of current cache usage.

    The summary lists the temp-file size, whether a feature template is held
    in ``app_state`` (with its feature count) and the time of the check. A
    warning is appended when usage exceeds 500 MB.
    """
    size = get_cache_size_mb()
    template_status = "✅ Built" if app_state.template_created else "❌ Not built"
    n_features = len(app_state.template_data) if app_state.template_data is not None else 0
    status = (
        f"💾 **Cache Status**\n\n"
        f"- Temp files: **{size:.1f} MB**\n"
        f"- Feature template: {template_status}"
    )
    if n_features > 0:
        status += f" ({n_features} features)"
    status += f"\n- Checked at: {time.strftime('%H:%M:%S')}"
    if size > 500:
        status += "\n\n⚠️ **Cache is large — cleanup recommended!**"
    return status
def auto_cleanup_worker():
    """Background loop: every 30 minutes, clean up if temp usage exceeds 500 MB.

    Never returns; it is meant to run in a daemon thread. A failure during one
    check is printed and the loop carries on with the next interval.
    """
    while True:
        time.sleep(1800)
        try:
            size = get_cache_size_mb()
            if size > 500:
                cleanup_temp_files()
                print(f"[Auto-cleanup] Cache was {size:.1f} MB — cleaned @ {time.strftime('%H:%M:%S')}")
        except Exception as exc:
            # Keep the worker alive, but leave a trace instead of silently
            # swallowing the error.
            print(f"[Auto-cleanup] Check failed: {exc}")


# Daemon thread: it must not keep the interpreter alive at shutdown.
_cleanup_thread = threading.Thread(target=auto_cleanup_worker, daemon=True)
_cleanup_thread.start()
# ============================================================
# Global application state
# ============================================================
class AppState:
    """In-memory state shared between Gradio callbacks.

    Attributes are plain instance fields:
      * ``template_created`` - flag, True once a template is stored
      * ``template_data``    - the feature template (DataFrame) or None
      * ``processing_params``- parameters stored with the template, or None
      * ``train_result``     - slot for a training result, or None
    """

    def __init__(self):
        self.template_created = False
        self.template_data = self.processing_params = self.train_result = None

    def reset(self, keep_template=True):
        """Clear ``train_result``; also drop the template unless ``keep_template``."""
        self.train_result = None
        if not keep_template:
            self.template_data = self.processing_params = None
            self.template_created = False
        gc.collect()


# Single shared instance used by all callbacks in this module.
app_state = AppState()
# ============================================================
# R environment helpers
# ============================================================
def check_r_installation():
    """Return True when ``Rscript --version`` can be run and exits with 0.

    A missing executable (``OSError``) or a timeout after 5 seconds
    (``subprocess.SubprocessError``) yields False. Other exceptions, such as
    ``KeyboardInterrupt``, are no longer swallowed.
    """
    try:
        result = subprocess.run(['Rscript', '--version'],
                                capture_output=True, text=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0
def check_r_packages_installed():
    """Return True when MALDIquant, MALDIquantForeign and readxl all load in R.

    Runs a one-line R expression that loads the three libraries and prints
    ``OK``. The check passes only if Rscript exits with 0 and ``OK`` appears
    on stdout. A missing Rscript or a timeout after 10 seconds yields False.
    """
    try:
        result = subprocess.run(
            ['Rscript', '-e',
             'library(MALDIquant); library(MALDIquantForeign); library(readxl); cat("OK")'],
            capture_output=True, text=True, timeout=10
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0 and "OK" in result.stdout
def install_r_packages():
    """Run ``install_r_packages.R`` (located next to this file) with Rscript.

    Returns:
        tuple[bool, str]: ``(success, message)``. The message carries R's
        stdout on success. On failure it carries stderr, falling back to
        stdout when stderr is empty. Execution errors, including the
        20-minute timeout, are reported as an "Installation error".
    """
    install_script = Path(__file__).parent / 'install_r_packages.R'
    if not install_script.exists():
        return False, "❌ install_r_packages.R not found"
    try:
        result = subprocess.run(
            ['Rscript', str(install_script)],
            capture_output=True, text=True, timeout=1200
        )
    except Exception as e:
        return False, f"❌ Installation error: {str(e)}"
    if result.returncode == 0:
        return True, f"✅ R packages installed successfully!\n\n{result.stdout}"
    # The script may have reported its diagnostics on stdout; do not show an
    # empty failure message in that case.
    details = result.stderr or result.stdout
    return False, f"❌ R package installation failed\n\n{details}"
def check_environment():
    """Return a two-line text report on R and the required R packages.

    The package check is only attempted when R itself is available.
    """
    lines = []
    r_ok = check_r_installation()
    lines.append(f"R environment: {'✅ Installed' if r_ok else '❌ Not found'}")
    if r_ok:
        pkg_ok = check_r_packages_installed()
        lines.append(f"R packages (MALDIquant etc.): {'✅ Installed' if pkg_ok else '❌ Not installed'}")
    else:
        lines.append("R packages: ⚠️ Skipped (R not available)")
    return "\n".join(lines)
# ============================================================
# Utility functions
# ============================================================
def extract_files_from_zip(zip_path):
    """Read the TXT spectra and the first Excel workbook from a ZIP archive.

    Archive members are ignored when they are directories, live anywhere under
    a ``__MACOSX`` folder, are hidden files (names starting with ``.``, e.g.
    macOS ``._name`` resource forks) or are Excel lock files (``~$name``).

    Args:
        zip_path: path or file-like object accepted by ``zipfile.ZipFile``.

    Returns:
        tuple: ``(txt_files, excel_file)``. ``txt_files`` is a list of
        ``(content_bytes, base_name)`` in archive order. ``excel_file`` is the
        same kind of pair for the first ``.xlsx``/``.xls`` member, or None.
        Only base names are returned, never archive-internal directories.
    """
    txt_files = []
    excel_file = None
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        for member in zip_ref.namelist():
            parts = Path(member).parts
            if not parts or member.endswith('/'):
                continue  # directory entry
            base_name = parts[-1]
            if '__MACOSX' in parts or base_name.startswith(('.', '~$')):
                continue  # OS metadata / editor lock files, not real data
            lowered = base_name.lower()
            if lowered.endswith('.txt'):
                txt_files.append((zip_ref.read(member), base_name))
            elif lowered.endswith(('.xlsx', '.xls')) and excel_file is None:
                excel_file = (zip_ref.read(member), base_name)
    return txt_files, excel_file
def run_r_script(script_content, work_dir):
    """Save *script_content* as ``process.R`` in *work_dir* and run it with Rscript.

    The script runs with *work_dir* as its working directory and a 20-minute
    timeout.

    Returns:
        tuple: ``(stdout, stderr, returncode)``. If the run times out or cannot
        be started, stdout is empty, stderr holds a description and the return
        code is 1.
    """
    r_file = Path(work_dir) / "process.R"
    r_file.write_text(script_content, encoding='utf-8')
    command = ['Rscript', str(r_file)]
    try:
        completed = subprocess.run(
            command,
            cwd=work_dir,
            capture_output=True, text=True, timeout=1200
        )
    except subprocess.TimeoutExpired:
        return "", "Processing timed out (>20 min)", 1
    except Exception as exc:
        return "", f"Error running R script: {str(exc)}", 1
    return completed.stdout, completed.stderr, completed.returncode
# ============================================================
# Stage 1: Build training-set template
# ============================================================
def process_training_set(train_zip_file, halfWindowSize, SNR, tolerance,
                         iterations, skip_alignment, relaxed_params,
                         progress=gr.Progress()):
    """Process the training set and build a feature template.

    Extracts the uploaded ZIP (TXT spectra plus one Excel sheet with ``file``
    and ``group`` columns) into a scratch directory, generates a MALDIquant R
    script, runs it, and on success stores the template and the parameters in
    ``app_state``. The three result CSVs are copied to a fresh directory that
    is registered in ``TEMP_DIRS_REGISTRY``.

    Args:
        train_zip_file: Gradio file object; its ``.name`` is the ZIP path.
        halfWindowSize, SNR, tolerance, iterations: numeric pipeline settings.
        skip_alignment: when true, no alignment code is generated.
        relaxed_params: when true, a failed alignment is retried once with a
            lower SNR and doubled tolerance.
        progress: Gradio progress reporter.

    Returns:
        tuple: ``(status_markdown, train_csv, template_csv, params_csv,
        r_stdout)``. The three paths are None on failure.
    """
    def failed(message, log=""):
        return message, None, None, None, log

    if train_zip_file is None:
        return failed("❌ Please upload a training-set ZIP file first.")
    if not check_r_installation():
        return failed("❌ R is not installed — cannot process data!")
    if not check_r_packages_installed():
        return failed("❌ R packages not installed! Click 'Install R Packages' first.")

    params = {
        'halfWindowSize': int(halfWindowSize),
        'SNR': float(SNR),
        'tolerance': float(tolerance),
        'iterations': int(iterations),
        'skip_alignment': skip_alignment,
        'relaxed_params': relaxed_params,
    }

    progress(0.1, desc="📁 Step 1/6: Extracting files...")
    temp_dir = tempfile.mkdtemp()
    train_dir = Path(temp_dir) / "train"
    train_dir.mkdir()
    # Forward slashes keep the paths valid inside R string literals on every OS.
    work_posix = Path(temp_dir).as_posix()

    try:
        txt_files, excel_file = extract_files_from_zip(train_zip_file.name)
        if not txt_files:
            return failed("❌ No TXT files found in the ZIP archive.")
        if not excel_file:
            return failed("❌ No Excel file found in the ZIP archive.")

        for content, name in txt_files:
            # Normalise line endings (CRLF / lone CR -> LF) before R reads them.
            cleaned = content.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            with open(train_dir / name, 'wb') as f:
                f.write(cleaned)

        # The workbook path is interpolated into R source below. Store it under
        # a fixed name so a quote or backslash in the uploaded filename cannot
        # break or inject R code. The extension is kept because readxl uses it
        # to pick the parser.
        excel_suffix = Path(excel_file[1]).suffix.lower()
        excel_path = train_dir / f"sample_info{excel_suffix}"
        with open(excel_path, 'wb') as f:
            f.write(excel_file[0])

        progress(0.2, desc="📝 Step 2/6: Generating R script...")

        alignment_block = ""
        if not params.get('skip_alignment', False):
            relaxed_block = ""
            if params.get('relaxed_params', True):
                # Runs inside the error handler, hence the `<<-` assignments.
                relaxed_block = f'''
    cat("Retrying with relaxed parameters...\\n")
    tryCatch({{
        avgSpectra <<- alignSpectra(avgSpectra,
            halfWindowSize = {params['halfWindowSize']},
            SNR = max(1.5, {params['SNR']} - 0.5),
            tolerance = {params['tolerance']} * 2,
            warpingMethod = "lowess")
        alignment_success <<- TRUE
        cat("Alignment succeeded with relaxed parameters.\\n")
    }}, error = function(e2) {{
        cat("Relaxed parameters also failed — skipping alignment.\\n")
    }})
'''
            alignment_block = f'''
cat("Aligning average spectra...\\n")
alignment_success <- FALSE
tryCatch({{
    avgSpectra <- alignSpectra(avgSpectra,
        halfWindowSize = {params['halfWindowSize']},
        SNR = {params['SNR']},
        tolerance = {params['tolerance']},
        warpingMethod = "lowess")
    alignment_success <- TRUE
    cat("Alignment complete.\\n")
}}, error = function(e) {{
    cat(sprintf("Alignment failed: %s\\n", e$message))
    {relaxed_block}
}})
if (!alignment_success) {{
    cat("Warning: spectral alignment failed — continuing with unaligned data.\\n")
}}
'''

        r_script = f"""
user_lib <- Sys.getenv("R_LIBS_USER")
if (user_lib == "") {{ user_lib <- "~/R/library" }}
if (!dir.exists(user_lib)) {{ dir.create(user_lib, recursive = TRUE) }}
.libPaths(c(user_lib, .libPaths()))
library('MALDIquant')
library('MALDIquantForeign')
library('readxl')
cat("Starting training-set processing...\\n")
cat("Reading Excel and TXT files...\\n")
samples <- read_excel('{excel_path.as_posix()}')
txt_files <- list.files('{train_dir.as_posix()}', pattern = "\\\\.txt$", full.names = TRUE)
cat(sprintf("Found %d TXT file(s)\\n", length(txt_files)))
training_spectra <- list()
failed_files <- c()
for (f in txt_files) {{
    tryCatch({{
        s <- importTxt(f)
        if (length(s) > 0) {{ training_spectra <- c(training_spectra, s) }}
    }}, error = function(e) {{
        cat(sprintf(" ⚠️ Skipping %s: %s\\n", basename(f), e$message))
        failed_files <<- c(failed_files, basename(f))
    }})
}}
cat(sprintf("Successfully imported: %d spectra\\n", length(training_spectra)))
if (length(failed_files) > 0) {{
    cat(sprintf("Skipped (bad format): %d file(s)\\n", length(failed_files)))
}}
if (length(training_spectra) == 0) {{
    stop("No spectra imported. Please check TXT file format.")
}}
cat("Pre-processing (1/5): intensity transformation...\\n")
training_spectra <- transformIntensity(training_spectra, method = "sqrt")
cat("Pre-processing (2/5): smoothing...\\n")
training_spectra <- smoothIntensity(training_spectra, method = "SavitzkyGolay",
    halfWindowSize = {params['halfWindowSize']})
cat("Pre-processing (3/5): baseline removal...\\n")
training_spectra <- removeBaseline(training_spectra, method = "SNIP",
    iterations = {params['iterations']})
cat("Pre-processing (4/5): intensity calibration...\\n")
training_spectra <- calibrateIntensity(training_spectra, method = "TIC")
cat("Pre-processing (5/5): assigning labels...\\n")
train_labels <- samples$group[match(
    sapply(training_spectra, function(s) basename(s@metaData$file)),
    samples$file
)]
cat("Computing average spectra...\\n")
avgSpectra <- averageMassSpectra(training_spectra, labels = train_labels)
cat(sprintf("Average spectra computed: %d group(s)\\n", length(avgSpectra)))
{alignment_block}
cat("Detecting peaks and building feature template...\\n")
train_peaks <- detectPeaks(avgSpectra,
    method = "MAD",
    halfWindowSize = {params['halfWindowSize']},
    SNR = {params['SNR']})
cat("Binning peaks...\\n")
train_binned <- binPeaks(train_peaks, tolerance = 2)
cat("Extracting feature m/z values...\\n")
feature_mz <- as.numeric(unique(unlist(lapply(train_binned, function(p) p@mass))))
feature_mz <- sort(feature_mz)
cat(sprintf("Training-set features: %d peaks\\n", length(feature_mz)))
cat(sprintf("m/z range: %.0f - %.0f\\n", min(feature_mz), max(feature_mz)))
cat("Saving feature template...\\n")
feature_template <- data.frame(
    feature_id = paste0("mz_", round(feature_mz)),
    mz = feature_mz
)
write.csv(feature_template, file = '{work_posix}/feature_template.csv', row.names = FALSE)
cat("Building training-set intensity matrix...\\n")
train_intensity_matrix <- intensityMatrix(train_binned, avgSpectra)
bin_centers <- as.numeric(colnames(train_intensity_matrix))
colnames(train_intensity_matrix) <- paste0("mz_", round(bin_centers))
rownames(train_intensity_matrix) <- unique(train_labels)
train_df <- as.data.frame(train_intensity_matrix)
train_df <- cbind(group = rownames(train_df), train_df)
write.csv(train_df, file = '{work_posix}/peak_intensity_train.csv', row.names = FALSE)
cat("Saving processing parameters...\\n")
params_df <- data.frame(
    parameter = c('halfWindowSize', 'SNR', 'tolerance', 'iterations', 'skip_alignment'),
    value = c({params['halfWindowSize']}, {params['SNR']}, {params['tolerance']},
              {params['iterations']}, {'TRUE' if params.get('skip_alignment', False) else 'FALSE'})
)
write.csv(params_df, '{work_posix}/processing_params.csv', row.names = FALSE)
cat("Training-set processing complete!\\n")
cat(sprintf(" Groups: %d\\n", nrow(train_df)))
cat(sprintf(" Features: %d\\n", ncol(train_df) - 1))
"""

        progress(0.3, desc="🔬 Step 3/6: Processing training set (may take a few minutes)...")
        stdout, stderr, returncode = run_r_script(r_script, temp_dir)
        if returncode != 0:
            return failed(f"❌ Processing failed!\n\n{stderr}", stdout)

        progress(0.8, desc="📊 Step 4/6: Reading results...")
        template_df = pd.read_csv(Path(temp_dir) / 'feature_template.csv')
        train_df = pd.read_csv(Path(temp_dir) / 'peak_intensity_train.csv')
        app_state.template_created = True
        app_state.template_data = template_df
        app_state.processing_params = params

        progress(0.95, desc="💾 Step 5/6: Saving output files...")
        # The scratch dir is removed in `finally`; downloads must outlive it.
        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        result_names = ('peak_intensity_train.csv', 'feature_template.csv',
                        'processing_params.csv')
        for result_name in result_names:
            shutil.copy(Path(temp_dir) / result_name, output_dir / result_name)

        progress(1.0, desc="✅ Done!")
        n_groups = len(train_df)
        n_features = len(template_df)
        mz_range = f"{template_df['mz'].min():.0f} – {template_df['mz'].max():.0f}"
        summary = (
            f"✅ **Training set processed — feature template built!**\n\n"
            f"📊 **Summary:**\n"
            f"- Groups: **{n_groups}**\n"
            f"- Features: **{n_features}**\n"
            f"- m/z range: **{mz_range}**\n\n"
            f"💡 You can now switch to Stage 2 to process the validation set."
        )
        return (
            summary,
            str(output_dir / 'peak_intensity_train.csv'),
            str(output_dir / 'feature_template.csv'),
            str(output_dir / 'processing_params.csv'),
            stdout
        )
    except Exception as e:
        return failed(f"❌ Unexpected error: {str(e)}")
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
# ============================================================
# Stage 2: Process validation set
# ============================================================
def process_validation_set(valid_zip_file, template_csv_file,
                           halfWindowSize, SNR, tolerance,
                           iterations, skip_alignment, relaxed_params,
                           progress=gr.Progress()):
    """Apply the training-set template to a validation set.

    The template comes from *template_csv_file* when one is uploaded, which
    must have an ``mz`` column, and otherwise from ``app_state`` (Stage 1).
    With the in-memory template the parameters stored by Stage 1 are reused;
    with an uploaded one the current UI values are used.

    Args:
        valid_zip_file: Gradio file object; its ``.name`` is the ZIP path.
        template_csv_file: optional Gradio file object for a saved template.
        halfWindowSize, SNR, tolerance, iterations: numeric pipeline settings.
        skip_alignment: when true, no alignment code is generated.
        relaxed_params: when true, a failed alignment is retried once with a
            lower SNR and 1.5x tolerance.
        progress: Gradio progress reporter.

    Returns:
        tuple: ``(status_markdown, validation_csv_path_or_None, r_stdout)``.
    """
    def failed(message, log=""):
        return message, None, log

    if valid_zip_file is None:
        return failed("❌ Please upload a validation-set ZIP file first.")

    template_from_upload = False
    if template_csv_file is not None:
        try:
            uploaded_template = pd.read_csv(template_csv_file.name)
        except Exception as e:
            return failed(f"❌ Failed to read template CSV: {str(e)}")
        if 'mz' not in uploaded_template.columns:
            return failed("❌ Uploaded template CSV is missing the 'mz' column.")
        template_df = uploaded_template
        template_from_upload = True
    elif app_state.template_created and app_state.template_data is not None:
        template_df = app_state.template_data
    else:
        return failed("❌ No feature template available!\n"
                      "Complete Stage 1 first, or upload a saved feature_template.csv.")

    if len(template_df) == 0:
        # A template without rows cannot produce an intensity matrix.
        return failed("❌ The feature template contains no features.")

    if not check_r_installation():
        return failed("❌ R is not installed!")
    if not check_r_packages_installed():
        return failed("❌ R packages not installed!")

    if not template_from_upload and app_state.processing_params:
        params = app_state.processing_params
    else:
        params = {
            'halfWindowSize': int(halfWindowSize),
            'SNR': float(SNR),
            'tolerance': float(tolerance),
            'iterations': int(iterations),
            'skip_alignment': skip_alignment,
            'relaxed_params': relaxed_params,
        }

    progress(0.1, desc="📁 Step 1/5: Extracting validation files...")
    temp_dir = tempfile.mkdtemp()
    valid_dir = Path(temp_dir) / "validation"
    valid_dir.mkdir()
    # Forward slashes keep the paths valid inside R string literals on every OS.
    work_posix = Path(temp_dir).as_posix()

    try:
        txt_files, _ = extract_files_from_zip(valid_zip_file.name)
        if not txt_files:
            return failed("❌ No TXT files found in the ZIP archive.")
        for content, name in txt_files:
            # Normalise line endings (CRLF / lone CR -> LF) before R reads them.
            cleaned = content.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            with open(valid_dir / name, 'wb') as f:
                f.write(cleaned)

        progress(0.2, desc="📋 Step 2/5: Preparing feature template...")
        template_path = Path(temp_dir) / 'feature_template.csv'
        template_df.to_csv(template_path, index=False)
        n_features = len(template_df)
        template_source = "uploaded CSV" if template_from_upload else "Stage 1 in-memory template"

        progress(0.3, desc="📝 Step 3/5: Generating R script...")

        # Honour the same two switches as Stage 1 instead of always aligning.
        alignment_block = 'cat("Alignment skipped (skip_alignment is set).\\n")\n'
        if not params.get('skip_alignment', False):
            if params.get('relaxed_params', True):
                # Runs inside the error handler, hence the `<<-` assignment.
                fallback_block = f'''
    cat("Standard alignment failed — trying relaxed parameters...\\n")
    cat(sprintf("Error: %s\\n", e$message))
    tryCatch({{
        validation_spectra <<- alignSpectra(validation_spectra,
            halfWindowSize = {params['halfWindowSize']},
            SNR = max(1.5, {params['SNR']} - 0.5),
            tolerance = {params['tolerance']} * 1.5,
            warpingMethod = "lowess")
        cat("Alignment succeeded with relaxed parameters.\\n")
    }}, error = function(e2) {{
        cat("Relaxed alignment also failed — skipping alignment step.\\n")
    }})
'''
            else:
                fallback_block = '''
    cat(sprintf("Alignment failed: %s\\n", e$message))
    cat("Continuing with unaligned data.\\n")
'''
            alignment_block = f'''
cat("Aligning validation spectra...\\n")
tryCatch({{
    validation_spectra <- alignSpectra(validation_spectra,
        halfWindowSize = {params['halfWindowSize']},
        SNR = {params['SNR']},
        tolerance = {params['tolerance']},
        warpingMethod = "lowess")
    cat("Alignment complete.\\n")
}}, error = function(e) {{
    {fallback_block}
}})
'''

        r_script = f"""
user_lib <- Sys.getenv("R_LIBS_USER")
if (user_lib == "") {{ user_lib <- "~/R/library" }}
if (!dir.exists(user_lib)) {{ dir.create(user_lib, recursive = TRUE) }}
.libPaths(c(user_lib, .libPaths()))
library('MALDIquant')
library('MALDIquantForeign')
cat("Processing validation set with training template...\\n")
template <- read.csv('{template_path.as_posix()}')
template_mz <- template$mz
cat(sprintf("Feature template: %d m/z values\\n", length(template_mz)))
cat("Reading validation TXT files...\\n")
txt_files <- list.files('{valid_dir.as_posix()}', pattern = "\\\\.txt$", full.names = TRUE)
cat(sprintf("Found %d TXT file(s)\\n", length(txt_files)))
validation_spectra <- list()
failed_files <- c()
for (f in txt_files) {{
    tryCatch({{
        s <- importTxt(f)
        if (length(s) > 0) {{ validation_spectra <- c(validation_spectra, s) }}
    }}, error = function(e) {{
        cat(sprintf(" ⚠️ Skipping %s: %s\\n", basename(f), e$message))
        failed_files <<- c(failed_files, basename(f))
    }})
}}
cat(sprintf("Successfully imported: %d spectra\\n", length(validation_spectra)))
if (length(failed_files) > 0) {{
    cat(sprintf("Skipped (bad format): %d file(s)\\n", length(failed_files)))
}}
if (length(validation_spectra) == 0) {{
    stop("No spectra imported. Please check TXT file format.")
}}
cat("Pre-processing (1/4): intensity transformation...\\n")
validation_spectra <- transformIntensity(validation_spectra, method = "sqrt")
cat("Pre-processing (2/4): smoothing...\\n")
validation_spectra <- smoothIntensity(validation_spectra, method = "SavitzkyGolay",
    halfWindowSize = {params['halfWindowSize']})
cat("Pre-processing (3/4): baseline removal...\\n")
validation_spectra <- removeBaseline(validation_spectra, method = "SNIP",
    iterations = {params['iterations']})
cat("Pre-processing (4/4): intensity calibration...\\n")
validation_spectra <- calibrateIntensity(validation_spectra, method = "TIC")
{alignment_block}
cat("Extracting intensities using template...\\n")
n_samples <- length(validation_spectra)
n_features <- length(template_mz)
intensity_matrix <- matrix(0, nrow = n_samples, ncol = n_features)
for (i in seq_len(n_samples)) {{
    if (i %% 50 == 0) {{
        cat(sprintf(" Progress: %d/%d\\n", i, n_samples))
    }}
    spec <- validation_spectra[[i]]
    for (j in seq_len(n_features)) {{
        target_mz <- template_mz[j]
        if (length(spec@mass) > 0) {{
            idx <- which(abs(spec@mass - target_mz) <= 2)
            if (length(idx) > 0) {{
                closest_idx <- idx[which.min(abs(spec@mass[idx] - target_mz))]
                intensity_matrix[i, j] <- spec@intensity[closest_idx]
            }}
        }}
    }}
}}
colnames(intensity_matrix) <- paste0("mz_", round(template_mz))
sample_names <- sapply(validation_spectra, function(s) basename(s@metaData$file))
rownames(intensity_matrix) <- sample_names
cat("Saving validation results...\\n")
valid_df <- as.data.frame(intensity_matrix)
valid_df <- cbind(sample = rownames(valid_df), valid_df)
write.csv(valid_df, file = '{work_posix}/peak_intensity_validation.csv', row.names = FALSE)
cat("Validation-set processing complete!\\n")
cat(sprintf(" Samples: %d\\n", nrow(valid_df)))
cat(sprintf(" Features: %d (consistent with training set)\\n", ncol(valid_df) - 1))
"""

        progress(0.4, desc="🔬 Step 4/5: Processing validation set (may take a few minutes)...")
        stdout, stderr, returncode = run_r_script(r_script, temp_dir)
        if returncode != 0:
            return failed(f"❌ Processing failed!\n\n{stderr}", stdout)

        progress(0.9, desc="📊 Step 5/5: Reading results...")
        valid_df = pd.read_csv(Path(temp_dir) / 'peak_intensity_validation.csv')
        # The scratch dir is removed in `finally`; the download must outlive it.
        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        shutil.copy(
            Path(temp_dir) / 'peak_intensity_validation.csv',
            output_dir / 'peak_intensity_validation.csv'
        )
        progress(1.0, desc="✅ Done!")
        summary = (
            f"✅ **Validation set processed!**\n\n"
            f"📊 **Summary:**\n"
            f"- Template source: **{template_source}**\n"
            f"- Template features: **{n_features}**\n"
            f"- Validation samples: **{len(valid_df)}**\n"
            f"- Output features: **{len(valid_df.columns) - 1}**\n"
            f"- Feature consistency: ✅ Aligned with template"
        )
        return summary, str(output_dir / 'peak_intensity_validation.csv'), stdout
    except Exception as e:
        return failed(f"❌ Unexpected error: {str(e)}")
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
# ============================================================
# Data Preparation: generate sample-info Excel from folder ZIP
# ============================================================
def generate_sample_excel(zip_file, group_mode, progress=gr.Progress()):
    """Build a sample-info Excel sheet from the folder structure of a ZIP.

    Every ``.txt`` member becomes one row with ``file`` (flattened file name)
    and ``group`` columns. The immediate parent folder is the group source;
    files at the archive root use ``"ungrouped"``.

    group_mode:
      - "Folder name + sample name": group = folder_sample (e.g. KPN-R_1-1)
      - "Folder name only":          group = folder name (e.g. KPN-R);
        samples in the same folder will be averaged in Stage 1

    Files sharing a base name across folders are renamed ``<folder>_<name>``,
    with a numeric suffix if that is taken too, so no spectrum is overwritten
    in the flattened output ZIP.

    Returns:
        tuple: ``(summary_markdown, excel_path, ready_zip_path, preview_df)``.
        The last three are None on failure.
    """
    if zip_file is None:
        return "❌ Please upload a ZIP file first.", None, None, None

    per_sample = group_mode == "Folder name + sample name"
    progress(0.1, desc="📂 Parsing ZIP file structure...")
    try:
        records = []
        txt_contents = {}
        # Counted from the real folder name; splitting the group string on
        # '_' would be wrong for folder names that contain underscores.
        folder_counts = {}
        with zipfile.ZipFile(zip_file.name, 'r') as zf:
            for file_path in sorted(zf.namelist()):
                if '__MACOSX' in file_path or file_path.startswith('.'):
                    continue
                if not file_path.lower().endswith('.txt'):
                    continue
                parts = Path(file_path).parts
                if not parts or any(part.startswith('.') for part in parts):
                    continue  # hidden files/folders, e.g. macOS "._name.txt"

                folder_name = parts[-2] if len(parts) >= 2 else "ungrouped"
                file_name = parts[-1]
                if file_name in txt_contents:
                    file_name = f"{folder_name}_{file_name}"
                    stem, suffix = Path(file_name).stem, Path(file_name).suffix
                    counter = 2
                    while file_name in txt_contents:
                        file_name = f"{stem}_{counter}{suffix}"
                        counter += 1
                sample_stem = Path(file_name).stem

                group_name = f"{folder_name}_{sample_stem}" if per_sample else folder_name
                records.append({'file': file_name, 'group': group_name})
                txt_contents[file_name] = zf.read(file_path)
                folder_counts[folder_name] = folder_counts.get(folder_name, 0) + 1

        if not records:
            return "❌ No TXT files found in the ZIP archive.", None, None, None

        progress(0.5, desc="📝 Generating Excel file...")
        df = pd.DataFrame(records)
        group_counts = df['group'].value_counts()
        n_groups = len(group_counts)
        n_files = len(df)

        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        excel_path = output_dir / 'sample_info.xlsx'
        df.to_excel(excel_path, index=False, engine='openpyxl')

        progress(0.7, desc="📦 Packing ready-to-use training ZIP...")
        ready_zip_path = output_dir / 'training_data_ready.zip'
        with zipfile.ZipFile(ready_zip_path, 'w', zipfile.ZIP_DEFLATED) as zout:
            for fname, content in txt_contents.items():
                zout.writestr(fname, content)
            zout.write(excel_path, 'sample_info.xlsx')

        progress(1.0, desc="✅ Done!")
        summary_lines = [
            f"✅ **Sample info Excel generated!**\n",
            f"📊 **Summary:**",
            f"- Grouping mode: **{group_mode}**",
            f"- Total files: **{n_files}**",
            f"- Groups (group): **{n_groups}**",
        ]
        if per_sample:
            summary_lines.append(f"- Folders: **{len(folder_counts)}**")
            summary_lines.append(f"\n📁 Samples per folder:")
            for folder, cnt in sorted(folder_counts.items()):
                summary_lines.append(f" - `{folder}`: {cnt} sample(s)")
            summary_lines.append(
                f"\n💡 Each sample has a unique group name (e.g. `KPN-R_1-1`). "
                f"In Stage 1, **each sample is kept separately** — no averaging."
            )
        else:
            summary_lines.append(f"\n📁 Samples per group:")
            for grp, cnt in group_counts.items():
                summary_lines.append(f" - `{grp}`: {cnt} sample(s)")
            summary_lines.append(
                f"\n⚠️ Samples in the same group will be **averaged** in Stage 1. "
                f"If there are few groups, the final dataset will have few rows."
            )
        summary_lines.append(
            f"\n💡 **Next step:** Download the 'Training Data ZIP' and upload it directly in Stage 1."
        )
        preview_df = df.head(20)
        return "\n".join(summary_lines), str(excel_path), str(ready_zip_path), preview_df
    except zipfile.BadZipFile:
        return "❌ The file is not a valid ZIP archive.", None, None, None
    except Exception as e:
        return f"❌ Unexpected error: {str(e)}", None, None, None
# ============================================================
# Gradio UI
# ============================================================
# Stylesheet handed to gr.Blocks(css=...). `.phase-header` is the class used by
# the banner <div> at the top of each tab.
CUSTOM_CSS = """
.main-title { text-align: center; margin-bottom: 0.5rem; }
.phase-header {
background: linear-gradient(90deg, #1f77b4 0%, #4a9eff 100%);
color: white; padding: 0.8rem 1rem; border-radius: 8px;
margin: 0.5rem 0; font-size: 1.1rem; font-weight: 600;
}
"""
# Builds the whole UI: environment tools, shared processing parameters, three
# workflow tabs (data preparation, Stage 1, Stage 2) and cache management.
with gr.Blocks(
    title="🔬 MALDI-TOF MS Template Processing Platform",
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky", neutral_hue="slate"),
    css=CUSTOM_CSS,
) as demo:
    gr.Markdown(
        "# 🔬 MALDI-TOF MS Template Processing Platform\n"
        "### Build a feature template from the training set, then batch-process validation sets\n"
        "---"
    )

    # ── Environment check ──────────────────────────────────
    with gr.Accordion("🔧 Environment Check & R Package Management", open=False):
        with gr.Row():
            env_check_btn = gr.Button("🔍 Check Environment", variant="secondary")
            install_btn = gr.Button("📦 Install R Packages", variant="primary")
        env_status = gr.Textbox(label="Environment Status", interactive=False, lines=3)
        env_check_btn.click(fn=check_environment, outputs=env_status)
        # install_r_packages() returns (ok, message); only the message is shown.
        install_btn.click(fn=lambda: install_r_packages()[1], outputs=env_status)

    # ── Processing parameters ──────────────────────────────
    with gr.Accordion("⚙️ Processing Parameters", open=False):
        with gr.Row():
            halfWindowSize = gr.Slider(10, 200, value=90, step=10,
                                       label="Half-window size (halfWindowSize)")
            SNR = gr.Slider(1.0, 10.0, value=2.0, step=0.5,
                            label="Signal-to-noise ratio threshold (SNR)")
        with gr.Row():
            tolerance = gr.Slider(0.001, 0.02, value=0.008, step=0.001,
                                  label="Alignment tolerance (tolerance)")
            iterations = gr.Slider(50, 200, value=100, step=10,
                                   label="Baseline removal iterations (iterations)")
        with gr.Row():
            skip_alignment = gr.Checkbox(label="Skip spectral alignment", value=False,
                                         info="Check this if alignment keeps failing")
            relaxed_params = gr.Checkbox(label="Use relaxed parameters", value=True,
                                         info="Automatically loosen parameters to improve success rate")
    # Order must match the parameter order of the two processing callbacks.
    param_inputs = [halfWindowSize, SNR, tolerance, iterations, skip_alignment, relaxed_params]

    # ── Main tabs ──────────────────────────────────────────
    with gr.Tabs():
        # ── Data Preparation ──
        with gr.TabItem("📂 Data Preparation: Generate Sample Info"):
            gr.HTML('<div class="phase-header">📂 Data Preparation: Auto-generate Sample Info Excel from Folder Structure</div>')
            gr.Markdown(
                "💡 **Automatically organise folder-grouped TXT files into the format required by Stage 1.**\n\n"
                "**Required ZIP structure:**\n"
                "```\n"
                "your_archive.zip/\n"
                " ├── KPN-R/ ← folder name = group\n"
                " │ ├── 1-1.txt\n"
                " │ ├── 1-2.txt\n"
                " ├── KPN-S/\n"
                " │ ├── 2-1.txt\n"
                " │ └── 2-2.txt\n"
                "```"
            )
            prep_zip = gr.File(label="Upload ZIP file (with grouped sub-folders)", file_types=[".zip"])
            group_mode = gr.Radio(
                choices=["Folder name + sample name", "Folder name only"],
                value="Folder name + sample name",
                label="📋 Grouping mode",
                info=(
                    "'Folder name + sample name' → each sample gets a unique group (e.g. KPN-R_1-1) — recommended; "
                    "'Folder name only' → samples in the same folder are averaged in Stage 1"
                )
            )
            prep_btn = gr.Button("📂 Generate Sample Info", variant="primary", size="lg")
            prep_status = gr.Markdown()
            with gr.Row():
                prep_excel_dl = gr.File(label="📋 Sample Info Excel (sample_info.xlsx)", interactive=False)
                prep_ready_zip_dl = gr.File(label="📦 Ready-to-use Training ZIP (use directly in Stage 1)", interactive=False)
            prep_preview = gr.Dataframe(label="📋 Preview (first 20 rows)", interactive=False)
            prep_btn.click(
                fn=generate_sample_excel,
                inputs=[prep_zip, group_mode],
                outputs=[prep_status, prep_excel_dl, prep_ready_zip_dl, prep_preview],
            )

        # ── Stage 1 ──
        with gr.TabItem("🎯 Stage 1: Build Training Template"):
            gr.HTML('<div class="phase-header">📊 Stage 1: Build Feature Template from Training Set</div>')
            gr.Markdown(
                "💡 **Process the training set and build the feature template (one-time step).**\n\n"
                "The ZIP file must contain: multiple `.txt` mass-spectrum files + one `.xlsx` sample-info file "
                "(with `file` and `group` columns)."
            )
            train_zip = gr.File(label="Upload Training Set ZIP", file_types=[".zip"])
            train_btn = gr.Button("🎯 Build Training Template", variant="primary", size="lg")
            train_status = gr.Markdown()
            with gr.Row():
                train_csv_dl = gr.File(label="📊 Training Results", interactive=False)
                template_csv_dl = gr.File(label="🎯 Feature Template", interactive=False)
                params_csv_dl = gr.File(label="⚙️ Processing Parameters", interactive=False)
            with gr.Accordion("📋 Processing Log", open=False):
                train_log = gr.Textbox(label="R Script Output", lines=15, interactive=False)
            train_btn.click(
                fn=process_training_set,
                inputs=[train_zip] + param_inputs,
                outputs=[train_status, train_csv_dl, template_csv_dl, params_csv_dl, train_log],
            )

        # ── Stage 2 ──
        with gr.TabItem("🔄 Stage 2: Process Validation Set"):
            gr.HTML('<div class="phase-header">🔄 Stage 2: Process Validation Set Using Template</div>')
            gr.Markdown(
                "💡 **Two ways to provide the feature template:**\n\n"
                "1. **After Stage 1** → the template is kept in memory; just upload the validation ZIP.\n"
                "2. **Upload saved template** → upload a previously saved `feature_template.csv`; "
                "no need to re-run Stage 1."
            )
            with gr.Row():
                valid_zip = gr.File(label="📁 Validation Set ZIP (required)", file_types=[".zip"])
                template_upload = gr.File(label="🎯 Feature Template CSV (optional — leave blank to use Stage 1 template)",
                                          file_types=[".csv"])
            valid_btn = gr.Button("🔄 Process Validation Set", variant="primary", size="lg")
            valid_status = gr.Markdown()
            valid_csv_dl = gr.File(label="📊 Validation Results", interactive=False)
            with gr.Accordion("📋 Processing Log", open=False):
                valid_log = gr.Textbox(label="R Script Output", lines=15, interactive=False)
            valid_btn.click(
                fn=process_validation_set,
                inputs=[valid_zip, template_upload] + param_inputs,
                outputs=[valid_status, valid_csv_dl, valid_log],
            )

    # ── Cache management ───────────────────────────────────
    with gr.Accordion("🧹 Cache Management", open=False):
        gr.Markdown(
            "💡 Temporary files are created during processing. "
            "For long sessions or large datasets, periodic cleanup is recommended.\n\n"
            "⏰ The system checks automatically every 30 minutes and cleans up if usage exceeds 500 MB."
        )
        cache_status_box = gr.Markdown(value="Click 'View Cache Status' to get information.")
        with gr.Row():
            check_cache_btn = gr.Button("📊 View Cache Status", variant="secondary")
            clean_cache_btn = gr.Button("🧹 Clear All Cache Now", variant="stop")
        gr.Markdown("⚠️ **Note:** Clearing the cache resets the feature template — Stage 1 must be re-run.")
        check_cache_btn.click(fn=get_cache_status, outputs=cache_status_box)
        clean_cache_btn.click(fn=manual_cleanup, outputs=cache_status_box)

    # ── Footer ─────────────────────────────────────────────
    gr.Markdown(
        "---\n"
        "**MALDI-TOF MS Template Processing Platform** | "
        "Pipeline: √-transform → SavitzkyGolay smoothing → SNIP baseline removal → "
        "TIC calibration → Lowess alignment → MAD peak detection | "
        "[Original project](https://github.com/MengyuZhang163/MALDI-TOF-MS-1.3)"
    )
# Start the server only when executed as a script; it listens on all
# interfaces (0.0.0.0) on port 7860.
if __name__ == "__main__":
    demo.launch(ssr_mode=False, server_name="0.0.0.0", server_port=7860)