Spaces:
Sleeping
Sleeping
| """ | |
| 🔬 MALDI-TOF MS Template-Based Processing Platform (Hugging Face Spaces) | |
| Build a feature template from the training set, then batch-process validation sets. | |
| Original project: https://github.com/MengyuZhang163/MALDI-TOF-MS-1.3 | |
| Stack: Gradio + R (MALDIquant / MALDIquantForeign) | |
| """ | |
| import gradio as gr | |
| import pandas as pd | |
| import subprocess | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| import zipfile | |
| import io | |
| import os | |
| import gc | |
| import json | |
| import time | |
| import threading | |
| import glob | |
| # ============================================================ | |
| # Auto-install R at startup if not present | |
| # ============================================================ | |
def _ensure_r_installed():
    """Make sure ``Rscript`` is runnable, installing r-base via apt-get if not.

    Probes ``Rscript --version``. When the probe fails, runs ``apt-get update``
    and then ``apt-get install`` for R plus its build dependencies, and probes
    again. Every outcome is reported with ``print``.

    The function never raises: a host without ``apt-get`` (or without
    permission to run it) only produces a warning, so importing this module
    cannot crash on the installation attempt.
    """
    def _rscript_available():
        # A missing binary, a timeout or a non-zero exit all mean "not usable".
        try:
            subprocess.run(['Rscript', '--version'],
                           capture_output=True, timeout=5, check=True)
        except (OSError, subprocess.SubprocessError):
            return False
        return True

    if _rscript_available():
        print("[R] Rscript already available.")
        return

    print("[R] Rscript not found β installing r-base via apt-get...")
    cmds = [
        ['apt-get', 'update', '-qq'],
        ['apt-get', 'install', '-y', '-qq',
         'r-base', 'r-base-dev',
         'libxml2-dev', 'libcurl4-openssl-dev', 'libssl-dev'],
    ]
    for cmd in cmds:
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
        except OSError as exc:
            # apt-get itself is missing or not executable; report it like any
            # other failed command instead of aborting the import.
            print(f"[R] Warning: {' '.join(cmd)} failed:\n{exc}")
            continue
        if result.returncode != 0:
            print(f"[R] Warning: {' '.join(cmd)} failed:\n{result.stderr}")
        else:
            print(f"[R] {' '.join(cmd[:3])} ... done")

    if _rscript_available():
        print("[R] r-base installed successfully.")
    else:
        print("[R] ERROR: Rscript still not available after installation attempt.")
| _ensure_r_installed() | |
| # ============================================================ | |
| # Cache / temp-file management | |
| # ============================================================ | |
| TEMP_DIRS_REGISTRY = [] # track all temp dirs we create | |
def get_cache_size_mb():
    """Return the combined size, in MB, of every ``tmp*`` directory in the temp root.

    Only directories directly inside ``tempfile.gettempdir()`` whose name
    starts with ``tmp`` are considered; each is walked recursively. Files that
    cannot be stat'ed are skipped, and an ``OSError`` while listing the temp
    root ends the scan with whatever has been counted so far.
    """
    byte_total = 0
    scan_root = tempfile.gettempdir()
    try:
        for item in os.scandir(scan_root):
            if not (item.is_dir() and item.name.startswith('tmp')):
                continue
            for folder, _subdirs, names in os.walk(item.path):
                for name in names:
                    full_path = os.path.join(folder, name)
                    try:
                        byte_total += os.path.getsize(full_path)
                    except OSError:
                        # File vanished or is unreadable: ignore it.
                        continue
    except OSError:
        pass
    return byte_total / (1024 * 1024)
def cleanup_temp_files():
    """Delete all temporary directories and reset the application state.

    Removes every directory recorded in ``TEMP_DIRS_REGISTRY`` and then every
    ``tmp*`` directory directly inside ``tempfile.gettempdir()``. Afterwards
    the in-memory template is discarded via ``app_state.reset`` and a garbage
    collection is forced.

    Returns:
        The number of directories that were actually removed. A directory
        that still exists after the removal attempt is not counted.
    """
    removed = 0
    temp_root = tempfile.gettempdir()

    def _remove(path):
        # rmtree(ignore_errors=True) never raises, so success has to be
        # verified by checking that the directory is really gone.
        shutil.rmtree(path, ignore_errors=True)
        return not os.path.exists(path)

    for path in list(TEMP_DIRS_REGISTRY):
        if os.path.exists(path) and _remove(path):
            removed += 1
    TEMP_DIRS_REGISTRY.clear()

    try:
        # Snapshot the listing first so entries are not deleted while the
        # directory iterator is still open.
        with os.scandir(temp_root) as listing:
            targets = [entry.path for entry in listing
                       if entry.is_dir() and entry.name.startswith('tmp')]
    except OSError:
        targets = []
    for path in targets:
        if _remove(path):
            removed += 1

    app_state.reset(keep_template=False)
    gc.collect()
    return removed
def manual_cleanup():
    """Run a full cleanup and return a Markdown report for the cleanup button.

    The report lists the cache size before and after, the space freed (never
    negative), the number of directories removed and a timestamp.
    """
    before_mb = get_cache_size_mb()
    removed_dirs = cleanup_temp_files()
    after_mb = get_cache_size_mb()
    freed_mb = max(0, before_mb - after_mb)

    report_parts = [
        "π§Ή **Cleanup complete!**\n\n",
        f"- Before: {before_mb:.1f} MB\n",
        f"- After: {after_mb:.1f} MB\n",
        f"- Freed: {freed_mb:.1f} MB\n",
        f"- Directories removed: {removed_dirs}\n",
        "- Template status: reset (re-run Stage 1 to rebuild)\n\n",
        f"β° {time.strftime('%Y-%m-%d %H:%M:%S')}",
    ]
    return "".join(report_parts)
def get_cache_status():
    """Build a Markdown summary of temp-file usage and template availability.

    Includes the feature count when a template is held in memory and appends
    a warning once the cache exceeds 500 MB.
    """
    size = get_cache_size_mb()

    if app_state.template_created:
        template_status = "β Built"
    else:
        template_status = "β Not built"

    template = app_state.template_data
    n_features = 0 if template is None else len(template)

    pieces = [
        "πΎ **Cache Status**\n\n",
        f"- Temp files: **{size:.1f} MB**\n",
        f"- Feature template: {template_status}",
    ]
    if n_features > 0:
        pieces.append(f" ({n_features} features)")
    pieces.append(f"\n- Checked at: {time.strftime('%H:%M:%S')}")
    if size > 500:
        pieces.append("\n\nβ οΈ **Cache is large β cleanup recommended!**")
    return "".join(pieces)
def auto_cleanup_worker():
    """Background loop: every 30 minutes, clean the cache if it exceeds 500 MB.

    Intended to run in a daemon thread; it never returns. A failure in one
    cycle is logged and the loop carries on, so a transient error cannot
    kill the worker. Only ``Exception`` is caught, so interpreter-shutdown
    signals are not swallowed.
    """
    while True:
        time.sleep(1800)
        try:
            size = get_cache_size_mb()
            if size > 500:
                cleanup_temp_files()
                print(f"[Auto-cleanup] Cache was {size:.1f} MB β cleaned @ {time.strftime('%H:%M:%S')}")
        except Exception as exc:
            # Report instead of hiding the problem; the next cycle retries.
            print(f"[Auto-cleanup] Error during cleanup cycle: {exc}")
| _cleanup_thread = threading.Thread(target=auto_cleanup_worker, daemon=True) | |
| _cleanup_thread.start() | |
| # ============================================================ | |
| # Global application state | |
| # ============================================================ | |
class AppState:
    """In-memory state shared between Gradio callbacks."""

    def __init__(self):
        # Whether a feature template has been built.
        self.template_created = False
        # Feature template (DataFrame) or None when not built.
        self.template_data = None
        # Parameter dict stored alongside the template.
        self.processing_params = None
        self.train_result = None

    def reset(self, keep_template=True):
        """Clear cached results; drop the template too unless ``keep_template``.

        NOTE(review): the nesting of this method was reconstructed from a
        copy of the file that had lost its indentation. It assumes the
        template flag, template data and processing parameters are cleared
        only when ``keep_template`` is false, while ``train_result`` is always
        cleared. Confirm against the original file.
        """
        discard_template = not keep_template
        if discard_template:
            self.template_created = False
            self.template_data = None
            self.processing_params = None
        self.train_result = None
        gc.collect()
| app_state = AppState() | |
| # ============================================================ | |
| # R environment helpers | |
| # ============================================================ | |
def check_r_installation():
    """Return True when ``Rscript --version`` runs and exits with status 0.

    Any failure to launch or complete the probe (missing binary, timeout,
    ...) yields False. Only ``Exception`` is caught, so ``KeyboardInterrupt``
    and ``SystemExit`` still propagate.
    """
    try:
        probe = subprocess.run(['Rscript', '--version'],
                               capture_output=True, text=True, timeout=5)
    except Exception:
        return False
    return probe.returncode == 0
def check_r_packages_installed():
    """Return True when MALDIquant, MALDIquantForeign and readxl all load in R.

    Runs a one-line R expression that attaches the three packages and prints
    ``OK``. The result is True only if the process exits with status 0 and
    ``OK`` appears on stdout. Any failure to run the probe yields False.
    Only ``Exception`` is caught, so ``KeyboardInterrupt`` and ``SystemExit``
    still propagate.
    """
    probe_expr = ('library(MALDIquant); library(MALDIquantForeign); '
                  'library(readxl); cat("OK")')
    try:
        probe = subprocess.run(['Rscript', '-e', probe_expr],
                               capture_output=True, text=True, timeout=10)
    except Exception:
        return False
    return probe.returncode == 0 and "OK" in probe.stdout
def install_r_packages():
    """Run ``install_r_packages.R`` (located next to this file) with Rscript.

    Returns:
        ``(ok, message)``: ``ok`` is True only when the script exits with
        status 0; ``message`` carries the script's stdout on success, its
        stderr on failure, or a description of why it could not be run.
    """
    script = Path(__file__).parent / 'install_r_packages.R'
    if not script.exists():
        return False, "β install_r_packages.R not found"

    try:
        proc = subprocess.run(['Rscript', str(script)],
                              capture_output=True, text=True, timeout=1200)
    except Exception as e:
        return False, f"β Installation error: {str(e)}"

    if proc.returncode != 0:
        return False, f"β R package installation failed\n\n{proc.stderr}"
    return True, f"β R packages installed successfully!\n\n{proc.stdout}"
def check_environment():
    """Return a two-line text report on R and its required packages.

    The package check is only attempted when R itself is available;
    otherwise the second line states that it was skipped.
    """
    r_ok = check_r_installation()
    r_label = 'β Installed' if r_ok else 'β Not found'
    report = [f"R environment: {r_label}"]

    if not r_ok:
        report.append("R packages: β οΈ Skipped (R not available)")
    else:
        pkg_ok = check_r_packages_installed()
        pkg_label = 'β Installed' if pkg_ok else 'β Not installed'
        report.append(f"R packages (MALDIquant etc.): {pkg_label}")

    return "\n".join(report)
| # ============================================================ | |
| # Utility functions | |
| # ============================================================ | |
def extract_files_from_zip(zip_path):
    """Extract TXT and Excel files from a ZIP archive.

    Args:
        zip_path: Path (or file-like object) accepted by ``zipfile.ZipFile``.

    Returns:
        ``(txt_files, excel_file)`` where ``txt_files`` is a list of
        ``(content_bytes, base_name)`` for every ``.txt`` member in archive
        order, and ``excel_file`` is ``(content_bytes, base_name)`` for the
        first ``.xlsx``/``.xls`` member, or ``None`` if there is none.

    macOS metadata is ignored: any member under a ``__MACOSX`` directory at
    any depth, and any AppleDouble resource-fork file (base name starting
    with ``._``). Such files carry a ``.txt``/``.xlsx`` suffix but hold
    binary metadata, not data.
    """
    txt_files = []
    excel_file = None
    with zipfile.ZipFile(zip_path, 'r') as archive:
        for member in archive.namelist():
            member_path = Path(member)
            base_name = member_path.name
            if '__MACOSX' in member_path.parts or base_name.startswith('._'):
                continue
            lowered = member.lower()
            if lowered.endswith('.txt'):
                txt_files.append((archive.read(member), base_name))
            elif lowered.endswith(('.xlsx', '.xls')) and excel_file is None:
                # Only the first spreadsheet is used; later ones are ignored.
                excel_file = (archive.read(member), base_name)
    return txt_files, excel_file
def run_r_script(script_content, work_dir):
    """Save ``script_content`` as ``process.R`` in ``work_dir`` and run it.

    Returns:
        ``(stdout, stderr, returncode)`` of the Rscript process. When the
        run exceeds 20 minutes or cannot be started, stdout is empty,
        stderr holds an explanatory message and the return code is 1.
    """
    script_file = Path(work_dir) / "process.R"
    script_file.write_text(script_content, encoding='utf-8')

    try:
        completed = subprocess.run(
            ['Rscript', str(script_file)],
            cwd=work_dir,
            capture_output=True,
            text=True,
            timeout=1200,
        )
    except subprocess.TimeoutExpired:
        return "", "Processing timed out (>20 min)", 1
    except Exception as e:
        return "", f"Error running R script: {str(e)}", 1
    return completed.stdout, completed.stderr, completed.returncode
| # ============================================================ | |
| # Stage 1: Build training-set template | |
| # ============================================================ | |
def process_training_set(train_zip_file, halfWindowSize, SNR, tolerance,
                         iterations, skip_alignment, relaxed_params,
                         progress=gr.Progress()):
    """Process the training set and build a feature template.

    The uploaded ZIP must contain the spectra as TXT files plus one Excel
    sheet; the generated R script uses the sheet's ``file`` and ``group``
    columns to label each spectrum. Spectra are pre-processed, averaged per
    group, optionally aligned, and peak-picked; the binned peak positions
    become the feature template.

    Args:
        train_zip_file: Uploaded file object; its ``.name`` is the ZIP path.
        halfWindowSize, SNR, tolerance, iterations: Numeric processing
            parameters interpolated into the R script.
        skip_alignment: When true, no alignment code is generated.
        relaxed_params: When true, a failed alignment is retried once with
            a lower SNR and doubled tolerance.
        progress: Gradio progress callback.

    Returns:
        ``(summary, train_csv, template_csv, params_csv, r_stdout)``. The
        three paths are ``None`` on failure and ``summary`` then holds the
        error message.

    Side effects:
        On success, stores the template and parameters in ``app_state`` and
        registers the output directory in ``TEMP_DIRS_REGISTRY``. The working
        directory is always removed before returning.
    """
    if train_zip_file is None:
        return "β Please upload a training-set ZIP file first.", None, None, None, ""
    if not check_r_installation():
        return "β R is not installed β cannot process data!", None, None, None, ""
    if not check_r_packages_installed():
        return "β R packages not installed! Click 'Install R Packages' first.", None, None, None, ""

    params = {
        'halfWindowSize': int(halfWindowSize),
        'SNR': float(SNR),
        'tolerance': float(tolerance),
        'iterations': int(iterations),
        'skip_alignment': skip_alignment,
        'relaxed_params': relaxed_params,
    }

    progress(0.1, desc="π Step 1/6: Extracting files...")
    temp_dir = tempfile.mkdtemp()
    train_dir = Path(temp_dir) / "train"
    train_dir.mkdir()
    # Forward-slash form: backslashes would be escape sequences inside the
    # R string literals the paths are embedded in.
    work_dir_r = Path(temp_dir).as_posix()

    try:
        txt_files, excel_file = extract_files_from_zip(train_zip_file.name)
        if not txt_files:
            return "β No TXT files found in the ZIP archive.", None, None, None, ""
        if not excel_file:
            return "β No Excel file found in the ZIP archive.", None, None, None, ""

        for content, name in txt_files:
            # Normalise line endings before R parses the file.
            cleaned = content.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            with open(train_dir / name, 'wb') as f:
                f.write(cleaned)

        # The archive member name is untrusted and the path is embedded in R
        # source, so store the sheet under a fixed name (keeping only its
        # .xlsx/.xls suffix). A quote in the original name could otherwise
        # terminate the R string literal and inject code.
        excel_suffix = Path(excel_file[1]).suffix.lower()
        excel_path = train_dir / f"sample_info{excel_suffix}"
        excel_path.write_bytes(excel_file[0])

        progress(0.2, desc="π Step 2/6: Generating R script...")

        alignment_block = ""
        if not params.get('skip_alignment', False):
            relaxed_block = ""
            if params.get('relaxed_params', True):
                # Runs inside the error handler function, hence `<<-`.
                relaxed_block = f'''
cat("Retrying with relaxed parameters...\\n")
tryCatch({{
    avgSpectra <<- alignSpectra(avgSpectra,
        halfWindowSize = {params['halfWindowSize']},
        SNR = max(1.5, {params['SNR']} - 0.5),
        tolerance = {params['tolerance']} * 2,
        warpingMethod = "lowess")
    alignment_success <<- TRUE
    cat("Alignment succeeded with relaxed parameters.\\n")
}}, error = function(e2) {{
    cat("Relaxed parameters also failed β skipping alignment.\\n")
}})
'''
            alignment_block = f'''
cat("Aligning average spectra...\\n")
alignment_success <- FALSE
tryCatch({{
    avgSpectra <- alignSpectra(avgSpectra,
        halfWindowSize = {params['halfWindowSize']},
        SNR = {params['SNR']},
        tolerance = {params['tolerance']},
        warpingMethod = "lowess")
    alignment_success <- TRUE
    cat("Alignment complete.\\n")
}}, error = function(e) {{
    cat(sprintf("Alignment failed: %s\\n", e$message))
    {relaxed_block}
}})
if (!alignment_success) {{
    cat("Warning: spectral alignment failed β continuing with unaligned data.\\n")
}}
'''

        r_script = f"""
user_lib <- Sys.getenv("R_LIBS_USER")
if (user_lib == "") {{ user_lib <- "~/R/library" }}
if (!dir.exists(user_lib)) {{ dir.create(user_lib, recursive = TRUE) }}
.libPaths(c(user_lib, .libPaths()))
library('MALDIquant')
library('MALDIquantForeign')
library('readxl')
cat("Starting training-set processing...\\n")
cat("Reading Excel and TXT files...\\n")
samples <- read_excel('{excel_path.as_posix()}')
txt_files <- list.files('{train_dir.as_posix()}', pattern = "\\\\.txt$", full.names = TRUE)
cat(sprintf("Found %d TXT file(s)\\n", length(txt_files)))
training_spectra <- list()
failed_files <- c()
for (f in txt_files) {{
    tryCatch({{
        s <- importTxt(f)
        if (length(s) > 0) {{ training_spectra <- c(training_spectra, s) }}
    }}, error = function(e) {{
        cat(sprintf(" β οΈ Skipping %s: %s\\n", basename(f), e$message))
        failed_files <<- c(failed_files, basename(f))
    }})
}}
cat(sprintf("Successfully imported: %d spectra\\n", length(training_spectra)))
if (length(failed_files) > 0) {{
    cat(sprintf("Skipped (bad format): %d file(s)\\n", length(failed_files)))
}}
if (length(training_spectra) == 0) {{
    stop("No spectra imported. Please check TXT file format.")
}}
cat("Pre-processing (1/5): intensity transformation...\\n")
training_spectra <- transformIntensity(training_spectra, method = "sqrt")
cat("Pre-processing (2/5): smoothing...\\n")
training_spectra <- smoothIntensity(training_spectra, method = "SavitzkyGolay",
    halfWindowSize = {params['halfWindowSize']})
cat("Pre-processing (3/5): baseline removal...\\n")
training_spectra <- removeBaseline(training_spectra, method = "SNIP",
    iterations = {params['iterations']})
cat("Pre-processing (4/5): intensity calibration...\\n")
training_spectra <- calibrateIntensity(training_spectra, method = "TIC")
cat("Pre-processing (5/5): assigning labels...\\n")
train_labels <- samples$group[match(
    sapply(training_spectra, function(s) basename(s@metaData$file)),
    samples$file
)]
cat("Computing average spectra...\\n")
avgSpectra <- averageMassSpectra(training_spectra, labels = train_labels)
cat(sprintf("Average spectra computed: %d group(s)\\n", length(avgSpectra)))
{alignment_block}
cat("Detecting peaks and building feature template...\\n")
train_peaks <- detectPeaks(avgSpectra,
    method = "MAD",
    halfWindowSize = {params['halfWindowSize']},
    SNR = {params['SNR']})
cat("Binning peaks...\\n")
train_binned <- binPeaks(train_peaks, tolerance = 2)
cat("Extracting feature m/z values...\\n")
feature_mz <- as.numeric(unique(unlist(lapply(train_binned, function(p) p@mass))))
feature_mz <- sort(feature_mz)
cat(sprintf("Training-set features: %d peaks\\n", length(feature_mz)))
cat(sprintf("m/z range: %.0f - %.0f\\n", min(feature_mz), max(feature_mz)))
cat("Saving feature template...\\n")
feature_template <- data.frame(
    feature_id = paste0("mz_", round(feature_mz)),
    mz = feature_mz
)
write.csv(feature_template, file = '{work_dir_r}/feature_template.csv', row.names = FALSE)
cat("Building training-set intensity matrix...\\n")
train_intensity_matrix <- intensityMatrix(train_binned, avgSpectra)
bin_centers <- as.numeric(colnames(train_intensity_matrix))
colnames(train_intensity_matrix) <- paste0("mz_", round(bin_centers))
rownames(train_intensity_matrix) <- unique(train_labels)
train_df <- as.data.frame(train_intensity_matrix)
train_df <- cbind(group = rownames(train_df), train_df)
write.csv(train_df, file = '{work_dir_r}/peak_intensity_train.csv', row.names = FALSE)
cat("Saving processing parameters...\\n")
params_df <- data.frame(
    parameter = c('halfWindowSize', 'SNR', 'tolerance', 'iterations', 'skip_alignment'),
    value = c({params['halfWindowSize']}, {params['SNR']}, {params['tolerance']},
        {params['iterations']}, {'TRUE' if params.get('skip_alignment', False) else 'FALSE'})
)
write.csv(params_df, '{work_dir_r}/processing_params.csv', row.names = FALSE)
cat("Training-set processing complete!\\n")
cat(sprintf(" Groups: %d\\n", nrow(train_df)))
cat(sprintf(" Features: %d\\n", ncol(train_df) - 1))
"""

        progress(0.3, desc="π¬ Step 3/6: Processing training set (may take a few minutes)...")
        stdout, stderr, returncode = run_r_script(r_script, temp_dir)
        if returncode != 0:
            return f"β Processing failed!\n\n{stderr}", None, None, None, stdout

        progress(0.8, desc="π Step 4/6: Reading results...")
        template_df = pd.read_csv(Path(temp_dir) / 'feature_template.csv')
        train_df = pd.read_csv(Path(temp_dir) / 'peak_intensity_train.csv')
        app_state.template_created = True
        app_state.template_data = template_df
        app_state.processing_params = params

        progress(0.95, desc="πΎ Step 5/6: Saving output files...")
        # Results go to a separate directory because temp_dir is deleted in
        # the finally clause below.
        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        output_names = ('peak_intensity_train.csv',
                        'feature_template.csv',
                        'processing_params.csv')
        for output_name in output_names:
            shutil.copy(Path(temp_dir) / output_name, output_dir / output_name)

        progress(1.0, desc="β Done!")
        n_groups = len(train_df)
        n_features = len(template_df)
        mz_range = f"{template_df['mz'].min():.0f} β {template_df['mz'].max():.0f}"
        summary = (
            f"β **Training set processed β feature template built!**\n\n"
            f"π **Summary:**\n"
            f"- Groups: **{n_groups}**\n"
            f"- Features: **{n_features}**\n"
            f"- m/z range: **{mz_range}**\n\n"
            f"π‘ You can now switch to Stage 2 to process the validation set."
        )
        return (
            summary,
            str(output_dir / 'peak_intensity_train.csv'),
            str(output_dir / 'feature_template.csv'),
            str(output_dir / 'processing_params.csv'),
            stdout
        )
    except Exception as e:
        return f"β Unexpected error: {str(e)}", None, None, None, ""
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
| # ============================================================ | |
| # Stage 2: Process validation set | |
| # ============================================================ | |
def process_validation_set(valid_zip_file, template_csv_file,
                           halfWindowSize, SNR, tolerance,
                           iterations, skip_alignment, relaxed_params,
                           progress=gr.Progress()):
    """Apply the training-set template to a validation set.

    The template comes from ``template_csv_file`` when one is uploaded (it
    must have an ``mz`` column), otherwise from the Stage 1 template held in
    ``app_state``. For every validation spectrum and every template m/z, the
    generated R script takes the intensity of the closest data point within
    +/- 2 m/z, or 0 when there is none.

    Args:
        valid_zip_file: Uploaded file object; its ``.name`` is the ZIP path.
        template_csv_file: Optional uploaded template CSV.
        halfWindowSize, SNR, tolerance, iterations, skip_alignment,
        relaxed_params: Used only when the template was uploaded or no
            Stage 1 parameters are stored; otherwise the Stage 1 parameters
            are reused.
        progress: Gradio progress callback.

    Returns:
        ``(summary, validation_csv_path, r_stdout)``. The path is ``None`` on
        failure and ``summary`` then holds the error message.

    ``skip_alignment`` and ``relaxed_params`` are honoured the same way as in
    Stage 1: no alignment code is emitted when alignment is skipped, and the
    relaxed retry is emitted only when requested.
    """
    if valid_zip_file is None:
        return "β Please upload a validation-set ZIP file first.", None, ""

    template_from_upload = False
    if template_csv_file is not None:
        try:
            uploaded_template = pd.read_csv(template_csv_file.name)
            if 'mz' not in uploaded_template.columns:
                return "β Uploaded template CSV is missing the 'mz' column.", None, ""
            template_df = uploaded_template
            template_from_upload = True
        except Exception as e:
            return f"β Failed to read template CSV: {str(e)}", None, ""
    elif app_state.template_created and app_state.template_data is not None:
        template_df = app_state.template_data
    else:
        return ("β No feature template available!\n"
                "Complete Stage 1 first, or upload a saved feature_template.csv."), None, ""

    if len(template_df) == 0:
        # With zero features, R's `1:n_features` would iterate over c(1, 0)
        # and index outside the matrix; fail early with a clear message.
        return "❌ Feature template is empty - it contains no m/z values.", None, ""

    if not check_r_installation():
        return "β R is not installed!", None, ""
    if not check_r_packages_installed():
        return "β R packages not installed!", None, ""

    if not template_from_upload and app_state.processing_params:
        # Reuse the exact parameters the in-memory template was built with.
        params = app_state.processing_params
    else:
        params = {
            'halfWindowSize': int(halfWindowSize),
            'SNR': float(SNR),
            'tolerance': float(tolerance),
            'iterations': int(iterations),
            'skip_alignment': skip_alignment,
            'relaxed_params': relaxed_params,
        }

    progress(0.1, desc="π Step 1/5: Extracting validation files...")
    temp_dir = tempfile.mkdtemp()
    valid_dir = Path(temp_dir) / "validation"
    valid_dir.mkdir()
    # Forward-slash form: backslashes would be escape sequences inside the
    # R string literals the path is embedded in.
    work_dir_r = Path(temp_dir).as_posix()

    try:
        txt_files, _ = extract_files_from_zip(valid_zip_file.name)
        if not txt_files:
            return "β No TXT files found in the ZIP archive.", None, ""
        for content, name in txt_files:
            # Normalise line endings before R parses the file.
            cleaned = content.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            with open(valid_dir / name, 'wb') as f:
                f.write(cleaned)

        progress(0.2, desc="π Step 2/5: Preparing feature template...")
        template_path = Path(temp_dir) / 'feature_template.csv'
        template_df.to_csv(template_path, index=False)
        n_features = len(template_df)
        template_source = "uploaded CSV" if template_from_upload else "Stage 1 in-memory template"

        progress(0.3, desc="π Step 3/5: Generating R script...")

        if params.get('skip_alignment', False):
            alignment_block = 'cat("Skipping spectral alignment (disabled in parameters).\\n")\n'
        else:
            if params.get('relaxed_params', True):
                # Runs inside the error handler function, hence `<<-`.
                fallback_block = f'''
    cat("Standard alignment failed β trying relaxed parameters...\\n")
    cat(sprintf("Error: %s\\n", e$message))
    tryCatch({{
        validation_spectra <<- alignSpectra(validation_spectra,
            halfWindowSize = {params['halfWindowSize']},
            SNR = max(1.5, {params['SNR']} - 0.5),
            tolerance = {params['tolerance']} * 1.5,
            warpingMethod = "lowess")
        cat("Alignment succeeded with relaxed parameters.\\n")
    }}, error = function(e2) {{
        cat("Relaxed alignment also failed β skipping alignment step.\\n")
    }})
'''
            else:
                fallback_block = '''
    cat(sprintf("Alignment failed: %s\\n", e$message))
    cat("Continuing with unaligned data.\\n")
'''
            alignment_block = f'''
cat("Aligning validation spectra...\\n")
tryCatch({{
    validation_spectra <- alignSpectra(validation_spectra,
        halfWindowSize = {params['halfWindowSize']},
        SNR = {params['SNR']},
        tolerance = {params['tolerance']},
        warpingMethod = "lowess")
    cat("Alignment complete.\\n")
}}, error = function(e) {{
{fallback_block}
}})
'''

        r_script = f"""
user_lib <- Sys.getenv("R_LIBS_USER")
if (user_lib == "") {{ user_lib <- "~/R/library" }}
if (!dir.exists(user_lib)) {{ dir.create(user_lib, recursive = TRUE) }}
.libPaths(c(user_lib, .libPaths()))
library('MALDIquant')
library('MALDIquantForeign')
cat("Processing validation set with training template...\\n")
template <- read.csv('{template_path.as_posix()}')
template_mz <- template$mz
cat(sprintf("Feature template: %d m/z values\\n", length(template_mz)))
cat("Reading validation TXT files...\\n")
txt_files <- list.files('{valid_dir.as_posix()}', pattern = "\\\\.txt$", full.names = TRUE)
cat(sprintf("Found %d TXT file(s)\\n", length(txt_files)))
validation_spectra <- list()
failed_files <- c()
for (f in txt_files) {{
    tryCatch({{
        s <- importTxt(f)
        if (length(s) > 0) {{ validation_spectra <- c(validation_spectra, s) }}
    }}, error = function(e) {{
        cat(sprintf(" β οΈ Skipping %s: %s\\n", basename(f), e$message))
        failed_files <<- c(failed_files, basename(f))
    }})
}}
cat(sprintf("Successfully imported: %d spectra\\n", length(validation_spectra)))
if (length(failed_files) > 0) {{
    cat(sprintf("Skipped (bad format): %d file(s)\\n", length(failed_files)))
}}
if (length(validation_spectra) == 0) {{
    stop("No spectra imported. Please check TXT file format.")
}}
cat("Pre-processing (1/4): intensity transformation...\\n")
validation_spectra <- transformIntensity(validation_spectra, method = "sqrt")
cat("Pre-processing (2/4): smoothing...\\n")
validation_spectra <- smoothIntensity(validation_spectra, method = "SavitzkyGolay",
    halfWindowSize = {params['halfWindowSize']})
cat("Pre-processing (3/4): baseline removal...\\n")
validation_spectra <- removeBaseline(validation_spectra, method = "SNIP",
    iterations = {params['iterations']})
cat("Pre-processing (4/4): intensity calibration...\\n")
validation_spectra <- calibrateIntensity(validation_spectra, method = "TIC")
{alignment_block}
cat("Extracting intensities using template...\\n")
n_samples <- length(validation_spectra)
n_features <- length(template_mz)
intensity_matrix <- matrix(0, nrow = n_samples, ncol = n_features)
for (i in 1:n_samples) {{
    if (i %% 50 == 0) {{
        cat(sprintf(" Progress: %d/%d\\n", i, n_samples))
    }}
    spec <- validation_spectra[[i]]
    for (j in 1:n_features) {{
        target_mz <- template_mz[j]
        if (length(spec@mass) > 0) {{
            idx <- which(abs(spec@mass - target_mz) <= 2)
            if (length(idx) > 0) {{
                closest_idx <- idx[which.min(abs(spec@mass[idx] - target_mz))]
                intensity_matrix[i, j] <- spec@intensity[closest_idx]
            }}
        }}
    }}
}}
colnames(intensity_matrix) <- paste0("mz_", round(template_mz))
sample_names <- sapply(validation_spectra, function(s) basename(s@metaData$file))
rownames(intensity_matrix) <- sample_names
cat("Saving validation results...\\n")
valid_df <- as.data.frame(intensity_matrix)
valid_df <- cbind(sample = rownames(valid_df), valid_df)
write.csv(valid_df, file = '{work_dir_r}/peak_intensity_validation.csv', row.names = FALSE)
cat("Validation-set processing complete!\\n")
cat(sprintf(" Samples: %d\\n", nrow(valid_df)))
cat(sprintf(" Features: %d (consistent with training set)\\n", ncol(valid_df) - 1))
"""

        progress(0.4, desc="π¬ Step 4/5: Processing validation set (may take a few minutes)...")
        stdout, stderr, returncode = run_r_script(r_script, temp_dir)
        if returncode != 0:
            return f"β Processing failed!\n\n{stderr}", None, stdout

        progress(0.9, desc="π Step 5/5: Reading results...")
        valid_df = pd.read_csv(Path(temp_dir) / 'peak_intensity_validation.csv')
        # The result goes to a separate directory because temp_dir is
        # deleted in the finally clause below.
        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        shutil.copy(
            Path(temp_dir) / 'peak_intensity_validation.csv',
            output_dir / 'peak_intensity_validation.csv'
        )

        progress(1.0, desc="β Done!")
        summary = (
            f"β **Validation set processed!**\n\n"
            f"π **Summary:**\n"
            f"- Template source: **{template_source}**\n"
            f"- Template features: **{n_features}**\n"
            f"- Validation samples: **{len(valid_df)}**\n"
            f"- Output features: **{len(valid_df.columns) - 1}**\n"
            f"- Feature consistency: β Aligned with template"
        )
        return summary, str(output_dir / 'peak_intensity_validation.csv'), stdout
    except Exception as e:
        return f"β Unexpected error: {str(e)}", None, ""
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
| # ============================================================ | |
| # Data Preparation: generate sample-info Excel from folder ZIP | |
| # ============================================================ | |
def generate_sample_excel(zip_file, group_mode, progress=gr.Progress()):
    """Parse the folder structure of a ZIP and generate a sample-info Excel.

    Every ``.txt`` member becomes one row with the columns ``file`` and
    ``group``. The parent folder of each file determines its group.

    Args:
        zip_file: Uploaded file object; its ``.name`` is the ZIP path.
        group_mode:
            - "Folder name + sample name": group = folder_sample
              (e.g. KPN-R_1-1)
            - any other value ("Folder name only"): group = folder name
              (e.g. KPN-R) -> samples in the same folder will be averaged
              in Stage 1
        progress: Gradio progress callback.

    Returns:
        ``(summary, excel_path, ready_zip_path, preview_df)``. The last three
        are ``None`` on failure. ``ready_zip_path`` is a flat ZIP holding all
        TXT files plus ``sample_info.xlsx``; ``preview_df`` is the first 20
        rows.

    Per-folder sample counts are taken from the real folder name of each
    file, so folder names that contain ``_`` are reported correctly.
    macOS AppleDouble files (``._*``) are ignored.
    """
    if zip_file is None:
        return "β Please upload a ZIP file first.", None, None, None

    progress(0.1, desc="π Parsing ZIP file structure...")
    per_sample_groups = group_mode == "Folder name + sample name"
    try:
        records = []
        folder_counts = {}
        txt_contents = {}
        with zipfile.ZipFile(zip_file.name, 'r') as zf:
            for file_path in sorted(zf.namelist()):
                if '__MACOSX' in file_path or file_path.startswith('.'):
                    continue
                if not file_path.lower().endswith('.txt'):
                    continue
                parts = Path(file_path).parts
                folder_name = parts[-2] if len(parts) >= 2 else "ungrouped"
                file_name = parts[-1]
                if file_name.startswith('._'):
                    # Resource-fork metadata, not a spectrum.
                    continue
                if file_name in txt_contents:
                    # Same base name in another folder: prefix the folder so
                    # the flat output ZIP keeps both files.
                    file_name = f"{folder_name}_{file_name}"
                sample_stem = Path(file_name).stem
                group_name = (f"{folder_name}_{sample_stem}"
                              if per_sample_groups else folder_name)
                records.append({'file': file_name, 'group': group_name})
                folder_counts[folder_name] = folder_counts.get(folder_name, 0) + 1
                txt_contents[file_name] = zf.read(file_path)

        if not records:
            return "β No TXT files found in the ZIP archive.", None, None, None

        progress(0.5, desc="π Generating Excel file...")
        df = pd.DataFrame(records)
        group_counts = df['group'].value_counts()
        n_groups = len(group_counts)
        n_files = len(df)

        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        excel_path = output_dir / 'sample_info.xlsx'
        df.to_excel(excel_path, index=False, engine='openpyxl')

        progress(0.7, desc="π¦ Packing ready-to-use training ZIP...")
        ready_zip_path = output_dir / 'training_data_ready.zip'
        with zipfile.ZipFile(ready_zip_path, 'w', zipfile.ZIP_DEFLATED) as zout:
            for fname, content in txt_contents.items():
                zout.writestr(fname, content)
            zout.write(excel_path, 'sample_info.xlsx')

        progress(1.0, desc="β Done!")
        summary_lines = [
            f"β **Sample info Excel generated!**\n",
            f"π **Summary:**",
            f"- Grouping mode: **{group_mode}**",
            f"- Total files: **{n_files}**",
            f"- Groups (group): **{n_groups}**",
        ]
        if per_sample_groups:
            summary_lines.append(f"- Folders: **{len(folder_counts)}**")
            summary_lines.append(f"\nπ Samples per folder:")
            for folder, cnt in sorted(folder_counts.items()):
                summary_lines.append(f" - `{folder}`: {cnt} sample(s)")
            summary_lines.append(
                f"\nπ‘ Each sample has a unique group name (e.g. `KPN-R_1-1`). "
                f"In Stage 1, **each sample is kept separately** β no averaging."
            )
        else:
            summary_lines.append(f"\nπ Samples per group:")
            for grp, cnt in group_counts.items():
                summary_lines.append(f" - `{grp}`: {cnt} sample(s)")
            summary_lines.append(
                f"\nβ οΈ Samples in the same group will be **averaged** in Stage 1. "
                f"If there are few groups, the final dataset will have few rows."
            )
        summary_lines.append(
            f"\nπ‘ **Next step:** Download the 'Training Data ZIP' and upload it directly in Stage 1."
        )
        preview_df = df.head(20)
        return "\n".join(summary_lines), str(excel_path), str(ready_zip_path), preview_df
    except zipfile.BadZipFile:
        return "β The file is not a valid ZIP archive.", None, None, None
    except Exception as e:
        return f"β Unexpected error: {str(e)}", None, None, None
| # ============================================================ | |
| # Gradio UI | |
| # ============================================================ | |
| CUSTOM_CSS = """ | |
| .main-title { text-align: center; margin-bottom: 0.5rem; } | |
| .phase-header { | |
| background: linear-gradient(90deg, #1f77b4 0%, #4a9eff 100%); | |
| color: white; padding: 0.8rem 1rem; border-radius: 8px; | |
| margin: 0.5rem 0; font-size: 1.1rem; font-weight: 600; | |
| } | |
| """ | |
# Build the Gradio application object ``demo``.
#
# Layout, top to bottom: page heading, environment-check accordion,
# processing-parameter accordion, three tabs (data preparation, Stage 1,
# Stage 2), cache-management accordion and a footer.
#
# The callbacks wired up here (check_environment, install_r_packages,
# generate_sample_excel, process_training_set, process_validation_set,
# get_cache_status, manual_cleanup) are referenced by name only and are
# expected to be defined earlier in this module.
#
# NOTE(review): the nesting below was reconstructed from the order of the
# components; confirm row/accordion membership against the intended layout.
# NOTE(review): several labels contain mis-decoded emoji/punctuation
# (mojibake). They are reproduced unchanged here; restore the original
# characters from the upstream file rather than guessing.
with gr.Blocks(
    title="π¬ MALDI-TOF MS Template Processing Platform",
    theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky", neutral_hue="slate"),
    css=CUSTOM_CSS,
) as demo:
    gr.Markdown(
        "# π¬ MALDI-TOF MS Template Processing Platform\n"
        "### Build a feature template from the training set, then batch-process validation sets\n"
        "---"
    )

    # -- Environment check ---------------------------------------------
    with gr.Accordion("π§ Environment Check & R Package Management", open=False):
        with gr.Row():
            env_check_btn = gr.Button("π Check Environment", variant="secondary")
            install_btn = gr.Button("π¦ Install R Packages", variant="primary")
        env_status = gr.Textbox(label="Environment Status", interactive=False, lines=3)
        env_check_btn.click(fn=check_environment, outputs=env_status)
        # Only element [1] of install_r_packages()'s return value is shown
        # (presumably the human-readable message - verify against the helper).
        install_btn.click(fn=lambda: install_r_packages()[1], outputs=env_status)

    # -- Processing parameters -----------------------------------------
    with gr.Accordion("βοΈ Processing Parameters", open=False):
        with gr.Row():
            halfWindowSize = gr.Slider(10, 200, value=90, step=10,
                                       label="Half-window size (halfWindowSize)")
            SNR = gr.Slider(1.0, 10.0, value=2.0, step=0.5,
                            label="Signal-to-noise ratio threshold (SNR)")
        with gr.Row():
            tolerance = gr.Slider(0.001, 0.02, value=0.008, step=0.001,
                                  label="Alignment tolerance (tolerance)")
            iterations = gr.Slider(50, 200, value=100, step=10,
                                   label="Baseline removal iterations (iterations)")
        with gr.Row():
            skip_alignment = gr.Checkbox(label="Skip spectral alignment", value=False,
                                         info="Check this if alignment keeps failing")
            relaxed_params = gr.Checkbox(label="Use relaxed parameters", value=True,
                                         info="Automatically loosen parameters to improve success rate")

    # Shared by Stage 1 and Stage 2. Gradio passes inputs positionally, so
    # this order must match the parameter order of both processing callbacks.
    param_inputs = [halfWindowSize, SNR, tolerance, iterations, skip_alignment, relaxed_params]

    # -- Main tabs -----------------------------------------------------
    with gr.Tabs():
        # -- Data Preparation --
        with gr.TabItem("π Data Preparation: Generate Sample Info"):
            gr.HTML('<div class="phase-header">π Data Preparation: Auto-generate Sample Info Excel from Folder Structure</div>')
            gr.Markdown(
                "π‘ **Automatically organise folder-grouped TXT files into the format required by Stage 1.**\n\n"
                "**Required ZIP structure:**\n"
                "```\n"
                "your_archive.zip/\n"
                " βββ KPN-R/ β folder name = group\n"
                " β βββ 1-1.txt\n"
                " β βββ 1-2.txt\n"
                " βββ KPN-S/\n"
                " β βββ 2-1.txt\n"
                " β βββ 2-2.txt\n"
                "```"
            )
            prep_zip = gr.File(label="Upload ZIP file (with grouped sub-folders)", file_types=[".zip"])
            group_mode = gr.Radio(
                choices=["Folder name + sample name", "Folder name only"],
                value="Folder name + sample name",
                label="π Grouping mode",
                info=(
                    "'Folder name + sample name' β each sample gets a unique group (e.g. KPN-R_1-1) β recommended; "
                    "'Folder name only' β samples in the same folder are averaged in Stage 1"
                )
            )
            prep_btn = gr.Button("π Generate Sample Info", variant="primary", size="lg")
            prep_status = gr.Markdown()
            with gr.Row():
                prep_excel_dl = gr.File(label="π Sample Info Excel (sample_info.xlsx)", interactive=False)
                prep_ready_zip_dl = gr.File(label="π¦ Ready-to-use Training ZIP (use directly in Stage 1)", interactive=False)
            prep_preview = gr.Dataframe(label="π Preview (first 20 rows)", interactive=False)
            # Four outputs, in the order: status text, Excel path, ZIP path,
            # preview frame.
            prep_btn.click(
                fn=generate_sample_excel,
                inputs=[prep_zip, group_mode],
                outputs=[prep_status, prep_excel_dl, prep_ready_zip_dl, prep_preview],
            )

        # -- Stage 1 --
        with gr.TabItem("π― Stage 1: Build Training Template"):
            gr.HTML('<div class="phase-header">π Stage 1: Build Feature Template from Training Set</div>')
            gr.Markdown(
                "π‘ **Process the training set and build the feature template (one-time step).**\n\n"
                "The ZIP file must contain: multiple `.txt` mass-spectrum files + one `.xlsx` sample-info file "
                "(with `file` and `group` columns)."
            )
            train_zip = gr.File(label="Upload Training Set ZIP", file_types=[".zip"])
            train_btn = gr.Button("π― Build Training Template", variant="primary", size="lg")
            train_status = gr.Markdown()
            with gr.Row():
                train_csv_dl = gr.File(label="π Training Results", interactive=False)
                template_csv_dl = gr.File(label="π― Feature Template", interactive=False)
                params_csv_dl = gr.File(label="βοΈ Processing Parameters", interactive=False)
            with gr.Accordion("π Processing Log", open=False):
                train_log = gr.Textbox(label="R Script Output", lines=15, interactive=False)
            train_btn.click(
                fn=process_training_set,
                inputs=[train_zip] + param_inputs,
                outputs=[train_status, train_csv_dl, template_csv_dl, params_csv_dl, train_log],
            )

        # -- Stage 2 --
        with gr.TabItem("π Stage 2: Process Validation Set"):
            gr.HTML('<div class="phase-header">π Stage 2: Process Validation Set Using Template</div>')
            gr.Markdown(
                "π‘ **Two ways to provide the feature template:**\n\n"
                "1. **After Stage 1** β the template is kept in memory; just upload the validation ZIP.\n"
                "2. **Upload saved template** β upload a previously saved `feature_template.csv`; "
                "no need to re-run Stage 1."
            )
            with gr.Row():
                valid_zip = gr.File(label="π Validation Set ZIP (required)", file_types=[".zip"])
                template_upload = gr.File(label="π― Feature Template CSV (optional β leave blank to use Stage 1 template)",
                                          file_types=[".csv"])
            valid_btn = gr.Button("π Process Validation Set", variant="primary", size="lg")
            valid_status = gr.Markdown()
            valid_csv_dl = gr.File(label="π Validation Results", interactive=False)
            with gr.Accordion("π Processing Log", open=False):
                valid_log = gr.Textbox(label="R Script Output", lines=15, interactive=False)
            valid_btn.click(
                fn=process_validation_set,
                inputs=[valid_zip, template_upload] + param_inputs,
                outputs=[valid_status, valid_csv_dl, valid_log],
            )

    # -- Cache management ----------------------------------------------
    with gr.Accordion("π§Ή Cache Management", open=False):
        gr.Markdown(
            "π‘ Temporary files are created during processing. "
            "For long sessions or large datasets, periodic cleanup is recommended.\n\n"
            "β° The system checks automatically every 30 minutes and cleans up if usage exceeds 500 MB."
        )
        cache_status_box = gr.Markdown(value="Click 'View Cache Status' to get information.")
        with gr.Row():
            check_cache_btn = gr.Button("π View Cache Status", variant="secondary")
            clean_cache_btn = gr.Button("π§Ή Clear All Cache Now", variant="stop")
        gr.Markdown("β οΈ **Note:** Clearing the cache resets the feature template β Stage 1 must be re-run.")
        check_cache_btn.click(fn=get_cache_status, outputs=cache_status_box)
        clean_cache_btn.click(fn=manual_cleanup, outputs=cache_status_box)

    # -- Footer --------------------------------------------------------
    gr.Markdown(
        "---\n"
        "**MALDI-TOF MS Template Processing Platform** | "
        "Pipeline: β-transform β SavitzkyGolay smoothing β SNIP baseline removal β "
        "TIC calibration β Lowess alignment β MAD peak detection | "
        "[Original project](https://github.com/MengyuZhang163/MALDI-TOF-MS-1.3)"
    )
# Script entry point: start the Gradio server only when this file is run
# directly. It binds to all interfaces ("0.0.0.0") on port 7860 and passes
# ssr_mode=False to launch().
if __name__ == "__main__":
    demo.launch(ssr_mode=False, server_name="0.0.0.0", server_port=7860)