""" ๐ฌ MALDI-TOF MS Template-Based Processing Platform (Hugging Face Spaces) Build a feature template from training set, then batch-process validation sets. Original project: https://github.com/MengyuZhang163/MALDI-TOF-MS-1.3 Stack: Gradio + R (MALDIquant / MALDIquantForeign) """ import gradio as gr import pandas as pd import subprocess import tempfile import shutil from pathlib import Path import zipfile import io import os import gc import json import time import threading import glob # ============================================================ # Auto-install R at startup if not present # ============================================================ def _ensure_r_installed(): """Install r-base via apt if Rscript is not found.""" try: subprocess.run(['Rscript', '--version'], capture_output=True, timeout=5, check=True) print("[R] Rscript already available.") return except Exception: pass print("[R] Rscript not found โ installing r-base via apt-get...") cmds = [ ['apt-get', 'update', '-qq'], ['apt-get', 'install', '-y', '-qq', 'r-base', 'r-base-dev', 'libxml2-dev', 'libcurl4-openssl-dev', 'libssl-dev'], ] for cmd in cmds: result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"[R] Warning: {' '.join(cmd)} failed:\n{result.stderr}") else: print(f"[R] {' '.join(cmd[:3])} ... 
# ============================================================
# Cache / temp-file management
# ============================================================

TEMP_DIRS_REGISTRY = []  # track all temp dirs we create


def get_cache_size_mb():
    """Return total size (MB) of all tmp* directories.

    Only directories directly under ``tempfile.gettempdir()`` whose name
    starts with ``tmp`` are measured. Files that cannot be stat'ed are
    skipped; if the temp root itself cannot be listed, whatever was summed
    so far is returned.
    """
    total = 0
    temp_root = tempfile.gettempdir()
    try:
        for entry in os.scandir(temp_root):
            if not (entry.is_dir() and entry.name.startswith('tmp')):
                continue
            for dirpath, _dirnames, filenames in os.walk(entry.path):
                for filename in filenames:
                    try:
                        total += os.path.getsize(os.path.join(dirpath, filename))
                    except OSError:
                        # File vanished or is unreadable; ignore it.
                        pass
    except OSError:
        pass
    return total / (1024 * 1024)


def _remove_tree(path):
    """Delete directory *path*; return 1 if it is gone afterwards, else 0."""
    if not os.path.exists(path):
        return 0
    shutil.rmtree(path, ignore_errors=True)
    # rmtree(ignore_errors=True) never raises, so check the outcome
    # explicitly instead of assuming success.
    return 0 if os.path.exists(path) else 1


def cleanup_temp_files():
    """Delete all temporary files and reset application state.

    Removes every directory recorded in ``TEMP_DIRS_REGISTRY`` and then
    every ``tmp*`` directory in the system temp root, resets ``app_state``
    (dropping the template) and forces a garbage collection.

    Returns:
        Number of directories that were actually removed.
    """
    cleaned = 0

    for registered in list(TEMP_DIRS_REGISTRY):
        cleaned += _remove_tree(registered)
    TEMP_DIRS_REGISTRY.clear()

    # Snapshot the candidates first so nothing is deleted while the
    # directory listing is still being iterated.
    try:
        with os.scandir(tempfile.gettempdir()) as entries:
            candidates = [
                entry.path for entry in entries
                if entry.is_dir() and entry.name.startswith('tmp')
            ]
    except OSError:
        candidates = []

    for candidate in candidates:
        cleaned += _remove_tree(candidate)

    app_state.reset(keep_template=False)
    gc.collect()
    return cleaned


def manual_cleanup():
    """Callback for the manual-cleanup button.

    Returns a Markdown report with the cache size before/after, the space
    freed and the number of directories removed.
    """
    size_before = get_cache_size_mb()
    cleaned = cleanup_temp_files()
    size_after = get_cache_size_mb()
    freed = max(0, size_before - size_after)
    return (
        f"๐งน **Cleanup complete!**\n\n"
        f"- Before: {size_before:.1f} MB\n"
        f"- After: {size_after:.1f} MB\n"
        f"- Freed: {freed:.1f} MB\n"
        f"- Directories removed: {cleaned}\n"
        f"- Template status: reset (re-run Stage 1 to rebuild)\n\n"
        f"โฐ {time.strftime('%Y-%m-%d %H:%M:%S')}"
    )


def get_cache_status():
    """Return a summary of current cache usage.

    The Markdown summary covers the temp-file size, whether a feature
    template is held in ``app_state`` (with its feature count) and a
    warning once the cache exceeds 500 MB.
    """
    size = get_cache_size_mb()
    template_status = "โ Built" if app_state.template_created else "โ Not built"
    n_features = len(app_state.template_data) if app_state.template_data is not None else 0

    status = (
        f"๐พ **Cache Status**\n\n"
        f"- Temp files: **{size:.1f} MB**\n"
        f"- Feature template: {template_status}"
    )
    if n_features > 0:
        status += f" ({n_features} features)"
    status += f"\n- Checked at: {time.strftime('%H:%M:%S')}"
    if size > 500:
        status += "\n\nโ ๏ธ **Cache is large โ cleanup recommended!**"
    return status
def auto_cleanup_worker():
    """Background thread: check every 30 min, auto-clean if >500 MB.

    Runs forever. A failure in one cycle is logged and the loop continues,
    so a transient error cannot kill the worker.
    """
    while True:
        time.sleep(1800)
        try:
            size = get_cache_size_mb()
            if size > 500:
                cleanup_temp_files()
                print(f"[Auto-cleanup] Cache was {size:.1f} MB โ cleaned @ {time.strftime('%H:%M:%S')}")
        except Exception as exc:
            # Keep the worker alive, but do not hide the failure.
            print(f"[Auto-cleanup] Cleanup cycle failed: {exc}")


_cleanup_thread = threading.Thread(target=auto_cleanup_worker, daemon=True)
_cleanup_thread.start()

# ============================================================
# Global application state
# ============================================================


class AppState:
    """Holds in-memory state between Gradio interactions."""

    def __init__(self):
        self.template_created = False
        self.template_data = None  # DataFrame
        self.processing_params = None
        self.train_result = None

    def reset(self, keep_template=True):
        """Clear cached results; also drop the template unless *keep_template*.

        NOTE(review): the template, its flag and the processing parameters
        are treated here as one unit that ``keep_template`` preserves —
        confirm this matches the intended nesting of the original method.
        """
        if not keep_template:
            self.template_created = False
            self.template_data = None
            self.processing_params = None
        self.train_result = None
        gc.collect()


app_state = AppState()

# ============================================================
# R environment helpers
# ============================================================


def check_r_installation():
    """Return True if ``Rscript --version`` exits with status 0 within 5 s."""
    try:
        result = subprocess.run(['Rscript', '--version'],
                                capture_output=True, text=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        # Rscript missing, not executable, or timed out.
        return False
    return result.returncode == 0


def check_r_packages_installed():
    """Return True if MALDIquant, MALDIquantForeign and readxl all load in R."""
    try:
        result = subprocess.run(
            ['Rscript', '-e',
             'library(MALDIquant); library(MALDIquantForeign); library(readxl); cat("OK")'],
            capture_output=True, text=True, timeout=10
        )
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0 and "OK" in result.stdout
def install_r_packages():
    """Run ``install_r_packages.R`` (located next to this file) with Rscript.

    Returns:
        ``(ok, message)`` where *message* is a user-facing status text that
        includes R's stdout on success or stderr on failure.
    """
    install_script = Path(__file__).parent / 'install_r_packages.R'
    if not install_script.exists():
        return False, "โ install_r_packages.R not found"

    try:
        result = subprocess.run(
            ['Rscript', str(install_script)],
            capture_output=True, text=True, timeout=1200
        )
    except Exception as e:
        # UI boundary: report any launch/timeout problem as text.
        return False, f"โ Installation error: {str(e)}"

    if result.returncode == 0:
        return True, f"โ R packages installed successfully!\n\n{result.stdout}"
    return False, f"โ R package installation failed\n\n{result.stderr}"


def check_environment():
    """Return a multi-line status text for R and the required R packages."""
    lines = []
    r_ok = check_r_installation()
    lines.append(f"R environment: {'โ Installed' if r_ok else 'โ Not found'}")
    if r_ok:
        pkg_ok = check_r_packages_installed()
        lines.append(f"R packages (MALDIquant etc.): {'โ Installed' if pkg_ok else 'โ Not installed'}")
    else:
        lines.append("R packages: โ ๏ธ Skipped (R not available)")
    return "\n".join(lines)


# ============================================================
# Utility functions
# ============================================================


def extract_files_from_zip(zip_path):
    """Extract TXT and Excel files from a ZIP archive.

    Archive metadata is ignored wherever it sits in the tree: anything
    under a ``__MACOSX`` folder, AppleDouble ``._*`` files and Excel
    ``~$*`` lock files.

    Args:
        zip_path: Path to the archive, or a binary file-like object.

    Returns:
        ``(txt_files, excel_file)``. *txt_files* is a list of
        ``(content_bytes, base_name)`` tuples in archive order.
        *excel_file* is the same kind of tuple for the first
        ``.xlsx``/``.xls`` member, or ``None`` when there is none.
    """
    txt_files = []
    excel_file = None

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        for info in zip_ref.infolist():
            if info.is_dir():
                continue
            member = Path(info.filename)
            base_name = member.name
            if '__MACOSX' in member.parts or base_name.startswith(('._', '~$')):
                continue

            lowered = base_name.lower()
            if lowered.endswith('.txt'):
                txt_files.append((zip_ref.read(info), base_name))
            elif lowered.endswith(('.xlsx', '.xls')) and excel_file is None:
                # Only the first workbook is used.
                excel_file = (zip_ref.read(info), base_name)

    return txt_files, excel_file


def run_r_script(script_content, work_dir):
    """Write and execute an R script, returning (stdout, stderr, returncode).

    The script is saved as ``process.R`` inside *work_dir* and run with
    *work_dir* as the working directory. A timeout (20 min) or any launch
    error is reported through the stderr slot with return code 1.
    """
    script_path = Path(work_dir) / "process.R"
    with open(script_path, 'w', encoding='utf-8') as f:
        f.write(script_content)

    try:
        result = subprocess.run(
            ['Rscript', str(script_path)],
            cwd=work_dir, capture_output=True, text=True, timeout=1200
        )
    except subprocess.TimeoutExpired:
        return "", "Processing timed out (>20 min)", 1
    except Exception as e:
        return "", f"Error running R script: {str(e)}", 1
    return result.stdout, result.stderr, result.returncode
# ============================================================
# Stage 1: Build training-set template
# ============================================================


def _training_alignment_block(params):
    """Return the R snippet that aligns the average spectra ('' if skipped)."""
    if params.get('skip_alignment', False):
        return ""

    hws = params['halfWindowSize']
    snr = params['SNR']
    tol = params['tolerance']

    relaxed_block = ""
    if params.get('relaxed_params', True):
        relaxed_block = f'''
    cat("Retrying with relaxed parameters...\\n")
    tryCatch({{
        avgSpectra <<- alignSpectra(avgSpectra,
                                    halfWindowSize = {hws},
                                    SNR = max(1.5, {snr} - 0.5),
                                    tolerance = {tol} * 2,
                                    warpingMethod = "lowess")
        alignment_success <<- TRUE
        cat("Alignment succeeded with relaxed parameters.\\n")
    }}, error = function(e2) {{
        cat("Relaxed parameters also failed โ skipping alignment.\\n")
    }})
'''

    return f'''
cat("Aligning average spectra...\\n")
alignment_success <- FALSE
tryCatch({{
    avgSpectra <- alignSpectra(avgSpectra,
                               halfWindowSize = {hws},
                               SNR = {snr},
                               tolerance = {tol},
                               warpingMethod = "lowess")
    alignment_success <- TRUE
    cat("Alignment complete.\\n")
}}, error = function(e) {{
    cat(sprintf("Alignment failed: %s\\n", e$message))
    {relaxed_block}
}})
if (!alignment_success) {{
    cat("Warning: spectral alignment failed โ continuing with unaligned data.\\n")
}}
'''


def _build_training_r_script(params, excel_path, train_dir, work_dir):
    """Return the full Stage 1 R script for the given inputs and parameters."""

    def r_lit(path):
        # Single-quoted R string literal. The Excel name comes from the
        # uploaded ZIP, so backslashes and quotes must be escaped to keep
        # it from terminating the literal and injecting R code.
        text = Path(path).as_posix().replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    work_dir = Path(work_dir)
    skip_flag = 'TRUE' if params.get('skip_alignment', False) else 'FALSE'

    return f"""
user_lib <- Sys.getenv("R_LIBS_USER")
if (user_lib == "") {{
    user_lib <- "~/R/library"
}}
if (!dir.exists(user_lib)) {{
    dir.create(user_lib, recursive = TRUE)
}}
.libPaths(c(user_lib, .libPaths()))

library('MALDIquant')
library('MALDIquantForeign')
library('readxl')

cat("Starting training-set processing...\\n")

cat("Reading Excel and TXT files...\\n")
samples <- read_excel({r_lit(excel_path)})
txt_files <- list.files({r_lit(train_dir)}, pattern = "\\\\.txt$",
                        full.names = TRUE, ignore.case = TRUE)
cat(sprintf("Found %d TXT file(s)\\n", length(txt_files)))

training_spectra <- list()
failed_files <- c()
for (f in txt_files) {{
    tryCatch({{
        s <- importTxt(f)
        if (length(s) > 0) {{
            training_spectra <- c(training_spectra, s)
        }}
    }}, error = function(e) {{
        cat(sprintf(" โ ๏ธ Skipping %s: %s\\n", basename(f), e$message))
        failed_files <<- c(failed_files, basename(f))
    }})
}}
cat(sprintf("Successfully imported: %d spectra\\n", length(training_spectra)))
if (length(failed_files) > 0) {{
    cat(sprintf("Skipped (bad format): %d file(s)\\n", length(failed_files)))
}}
if (length(training_spectra) == 0) {{
    stop("No spectra imported. Please check TXT file format.")
}}

cat("Pre-processing (1/5): intensity transformation...\\n")
training_spectra <- transformIntensity(training_spectra, method = "sqrt")

cat("Pre-processing (2/5): smoothing...\\n")
training_spectra <- smoothIntensity(training_spectra, method = "SavitzkyGolay",
                                    halfWindowSize = {params['halfWindowSize']})

cat("Pre-processing (3/5): baseline removal...\\n")
training_spectra <- removeBaseline(training_spectra, method = "SNIP",
                                   iterations = {params['iterations']})

cat("Pre-processing (4/5): intensity calibration...\\n")
training_spectra <- calibrateIntensity(training_spectra, method = "TIC")

cat("Pre-processing (5/5): assigning labels...\\n")
spectrum_files <- sapply(training_spectra, function(s) basename(s@metaData$file))
train_labels <- samples$group[match(spectrum_files, samples$file)]
if (any(is.na(train_labels))) {{
    stop(sprintf("No group found in the Excel sheet for: %s",
                 paste(unique(spectrum_files[is.na(train_labels)]), collapse = ", ")))
}}

cat("Computing average spectra...\\n")
avgSpectra <- averageMassSpectra(training_spectra, labels = train_labels)
cat(sprintf("Average spectra computed: %d group(s)\\n", length(avgSpectra)))

{_training_alignment_block(params)}

cat("Detecting peaks and building feature template...\\n")
train_peaks <- detectPeaks(avgSpectra, method = "MAD",
                           halfWindowSize = {params['halfWindowSize']},
                           SNR = {params['SNR']})

cat("Binning peaks...\\n")
train_binned <- binPeaks(train_peaks, tolerance = 2)

cat("Extracting feature m/z values...\\n")
feature_mz <- as.numeric(unique(unlist(lapply(train_binned, function(p) p@mass))))
feature_mz <- sort(feature_mz)
cat(sprintf("Training-set features: %d peaks\\n", length(feature_mz)))
cat(sprintf("m/z range: %.0f - %.0f\\n", min(feature_mz), max(feature_mz)))

cat("Saving feature template...\\n")
feature_template <- data.frame(
    feature_id = paste0("mz_", round(feature_mz)),
    mz = feature_mz
)
write.csv(feature_template, file = {r_lit(work_dir / 'feature_template.csv')}, row.names = FALSE)

cat("Building training-set intensity matrix...\\n")
train_intensity_matrix <- intensityMatrix(train_binned, avgSpectra)
bin_centers <- as.numeric(colnames(train_intensity_matrix))
colnames(train_intensity_matrix) <- paste0("mz_", round(bin_centers))
rownames(train_intensity_matrix) <- unique(train_labels)

train_df <- as.data.frame(train_intensity_matrix)
train_df <- cbind(group = rownames(train_df), train_df)
write.csv(train_df, file = {r_lit(work_dir / 'peak_intensity_train.csv')}, row.names = FALSE)

cat("Saving processing parameters...\\n")
params_df <- data.frame(
    parameter = c('halfWindowSize', 'SNR', 'tolerance', 'iterations', 'skip_alignment'),
    value = c({params['halfWindowSize']}, {params['SNR']}, {params['tolerance']},
              {params['iterations']}, {skip_flag})
)
write.csv(params_df, {r_lit(work_dir / 'processing_params.csv')}, row.names = FALSE)

cat("Training-set processing complete!\\n")
cat(sprintf(" Groups: %d\\n", nrow(train_df)))
cat(sprintf(" Features: %d\\n", ncol(train_df) - 1))
"""


def process_training_set(train_zip_file, halfWindowSize, SNR, tolerance, iterations,
                         skip_alignment, relaxed_params, progress=gr.Progress()):
    """Process the training set and build a feature template.

    Args:
        train_zip_file: Uploaded ZIP (object with ``.name`` or a plain
            path) holding the TXT spectra and one Excel sheet with
            ``file`` and ``group`` columns.
        halfWindowSize, SNR, tolerance, iterations: Numeric processing
            parameters forwarded to the R pipeline.
        skip_alignment: If true, the alignment step is left out.
        relaxed_params: If true, a failed alignment is retried once with
            looser SNR/tolerance.
        progress: Gradio progress callback.

    Returns:
        ``(summary, train_csv, template_csv, params_csv, r_log)``. The
        three paths are ``None`` on failure and *summary* then carries the
        error text. On success the template and parameters are also stored
        in ``app_state`` for Stage 2.
    """
    if train_zip_file is None:
        return "โ Please upload a training-set ZIP file first.", None, None, None, ""
    if not check_r_installation():
        return "โ R is not installed โ cannot process data!", None, None, None, ""
    if not check_r_packages_installed():
        return "โ R packages not installed! Click 'Install R Packages' first.", None, None, None, ""

    params = {
        'halfWindowSize': int(halfWindowSize),
        'SNR': float(SNR),
        'tolerance': float(tolerance),
        'iterations': int(iterations),
        'skip_alignment': skip_alignment,
        'relaxed_params': relaxed_params,
    }

    progress(0.1, desc="๐ Step 1/6: Extracting files...")
    temp_dir = tempfile.mkdtemp()
    work_dir = Path(temp_dir)

    try:
        train_dir = work_dir / "train"
        train_dir.mkdir()

        # Gradio may hand over a temp-file wrapper or a plain path string.
        zip_path = getattr(train_zip_file, 'name', train_zip_file)
        txt_files, excel_file = extract_files_from_zip(zip_path)
        if not txt_files:
            return "โ No TXT files found in the ZIP archive.", None, None, None, ""
        if not excel_file:
            return "โ No Excel file found in the ZIP archive.", None, None, None, ""

        for content, name in txt_files:
            # Normalise line endings so the R importer sees plain LF.
            cleaned = content.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            with open(train_dir / name, 'wb') as f:
                f.write(cleaned)

        excel_path = train_dir / excel_file[1]
        with open(excel_path, 'wb') as f:
            f.write(excel_file[0])

        progress(0.2, desc="๐ Step 2/6: Generating R script...")
        r_script = _build_training_r_script(params, excel_path, train_dir, work_dir)

        progress(0.3, desc="๐ฌ Step 3/6: Processing training set (may take a few minutes)...")
        stdout, stderr, returncode = run_r_script(r_script, temp_dir)
        if returncode != 0:
            return f"โ Processing failed!\n\n{stderr}", None, None, None, stdout

        progress(0.8, desc="๐ Step 4/6: Reading results...")
        template_df = pd.read_csv(work_dir / 'feature_template.csv')
        train_df = pd.read_csv(work_dir / 'peak_intensity_train.csv')

        app_state.template_created = True
        app_state.template_data = template_df
        app_state.processing_params = params

        progress(0.95, desc="๐พ Step 5/6: Saving output files...")
        # Results are copied out because temp_dir is removed in `finally`.
        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        for result_name in ('peak_intensity_train.csv', 'feature_template.csv',
                            'processing_params.csv'):
            shutil.copy(work_dir / result_name, output_dir / result_name)

        progress(1.0, desc="โ Done!")

        n_groups = len(train_df)
        n_features = len(template_df)
        mz_range = f"{template_df['mz'].min():.0f} โ {template_df['mz'].max():.0f}"

        summary = (
            f"โ **Training set processed โ feature template built!**\n\n"
            f"๐ **Summary:**\n"
            f"- Groups: **{n_groups}**\n"
            f"- Features: **{n_features}**\n"
            f"- m/z range: **{mz_range}**\n\n"
            f"๐ก You can now switch to Stage 2 to process the validation set."
        )
        return (
            summary,
            str(output_dir / 'peak_intensity_train.csv'),
            str(output_dir / 'feature_template.csv'),
            str(output_dir / 'processing_params.csv'),
            stdout
        )
    except Exception as e:
        return f"โ Unexpected error: {str(e)}", None, None, None, ""
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
# ============================================================
# Stage 2: Process validation set
# ============================================================


def _validation_alignment_block(params):
    """Return the R snippet that aligns the validation spectra.

    Honours ``skip_alignment`` and ``relaxed_params`` the same way Stage 1
    does; with the UI defaults the generated code is the standard attempt
    followed by one relaxed retry.
    """
    if params.get('skip_alignment', False):
        return 'cat("Skipping spectral alignment (disabled in parameters).\\n")\n'

    hws = params['halfWindowSize']
    snr = params['SNR']
    tol = params['tolerance']

    if params.get('relaxed_params', True):
        on_error = f'''
    cat("Standard alignment failed โ trying relaxed parameters...\\n")
    cat(sprintf("Error: %s\\n", e$message))
    tryCatch({{
        validation_spectra <<- alignSpectra(validation_spectra,
                                            halfWindowSize = {hws},
                                            SNR = max(1.5, {snr} - 0.5),
                                            tolerance = {tol} * 1.5,
                                            warpingMethod = "lowess")
        cat("Alignment succeeded with relaxed parameters.\\n")
    }}, error = function(e2) {{
        cat("Relaxed alignment also failed โ skipping alignment step.\\n")
    }})
'''
    else:
        on_error = '''
    cat(sprintf("Alignment failed: %s\\n", e$message))
    cat("Continuing with unaligned spectra.\\n")
'''

    return f'''
cat("Aligning validation spectra...\\n")
tryCatch({{
    validation_spectra <- alignSpectra(validation_spectra,
                                       halfWindowSize = {hws},
                                       SNR = {snr},
                                       tolerance = {tol},
                                       warpingMethod = "lowess")
    cat("Alignment complete.\\n")
}}, error = function(e) {{
    {on_error}
}})
'''


def _build_validation_r_script(params, template_path, valid_dir, output_csv):
    """Return the full Stage 2 R script for the given inputs and parameters."""

    def r_lit(path):
        # Single-quoted R string literal with forward slashes and escaped
        # backslashes/quotes.
        text = Path(path).as_posix().replace("\\", "\\\\").replace("'", "\\'")
        return f"'{text}'"

    return f"""
user_lib <- Sys.getenv("R_LIBS_USER")
if (user_lib == "") {{
    user_lib <- "~/R/library"
}}
if (!dir.exists(user_lib)) {{
    dir.create(user_lib, recursive = TRUE)
}}
.libPaths(c(user_lib, .libPaths()))

library('MALDIquant')
library('MALDIquantForeign')

cat("Processing validation set with training template...\\n")

template <- read.csv({r_lit(template_path)})
template_mz <- template$mz
cat(sprintf("Feature template: %d m/z values\\n", length(template_mz)))

cat("Reading validation TXT files...\\n")
txt_files <- list.files({r_lit(valid_dir)}, pattern = "\\\\.txt$",
                        full.names = TRUE, ignore.case = TRUE)
cat(sprintf("Found %d TXT file(s)\\n", length(txt_files)))

validation_spectra <- list()
failed_files <- c()
for (f in txt_files) {{
    tryCatch({{
        s <- importTxt(f)
        if (length(s) > 0) {{
            validation_spectra <- c(validation_spectra, s)
        }}
    }}, error = function(e) {{
        cat(sprintf(" โ ๏ธ Skipping %s: %s\\n", basename(f), e$message))
        failed_files <<- c(failed_files, basename(f))
    }})
}}
cat(sprintf("Successfully imported: %d spectra\\n", length(validation_spectra)))
if (length(failed_files) > 0) {{
    cat(sprintf("Skipped (bad format): %d file(s)\\n", length(failed_files)))
}}
if (length(validation_spectra) == 0) {{
    stop("No spectra imported. Please check TXT file format.")
}}

cat("Pre-processing (1/4): intensity transformation...\\n")
validation_spectra <- transformIntensity(validation_spectra, method = "sqrt")

cat("Pre-processing (2/4): smoothing...\\n")
validation_spectra <- smoothIntensity(validation_spectra, method = "SavitzkyGolay",
                                      halfWindowSize = {params['halfWindowSize']})

cat("Pre-processing (3/4): baseline removal...\\n")
validation_spectra <- removeBaseline(validation_spectra, method = "SNIP",
                                     iterations = {params['iterations']})

cat("Pre-processing (4/4): intensity calibration...\\n")
validation_spectra <- calibrateIntensity(validation_spectra, method = "TIC")

{_validation_alignment_block(params)}

cat("Extracting intensities using template...\\n")
n_samples <- length(validation_spectra)
n_features <- length(template_mz)
intensity_matrix <- matrix(0, nrow = n_samples, ncol = n_features)

for (i in seq_len(n_samples)) {{
    if (i %% 50 == 0) {{
        cat(sprintf(" Progress: %d/%d\\n", i, n_samples))
    }}
    spec <- validation_spectra[[i]]
    for (j in seq_len(n_features)) {{
        target_mz <- template_mz[j]
        if (length(spec@mass) > 0) {{
            idx <- which(abs(spec@mass - target_mz) <= 2)
            if (length(idx) > 0) {{
                closest_idx <- idx[which.min(abs(spec@mass[idx] - target_mz))]
                intensity_matrix[i, j] <- spec@intensity[closest_idx]
            }}
        }}
    }}
}}

colnames(intensity_matrix) <- paste0("mz_", round(template_mz))
sample_names <- sapply(validation_spectra, function(s) basename(s@metaData$file))
rownames(intensity_matrix) <- sample_names

cat("Saving validation results...\\n")
valid_df <- as.data.frame(intensity_matrix)
valid_df <- cbind(sample = rownames(valid_df), valid_df)
write.csv(valid_df, file = {r_lit(output_csv)}, row.names = FALSE)

cat("Validation-set processing complete!\\n")
cat(sprintf(" Samples: %d\\n", nrow(valid_df)))
cat(sprintf(" Features: %d (consistent with training set)\\n", ncol(valid_df) - 1))
"""


def process_validation_set(valid_zip_file, template_csv_file,
                           halfWindowSize, SNR, tolerance, iterations,
                           skip_alignment, relaxed_params, progress=gr.Progress()):
    """Apply the training-set template to a validation set.

    The template comes from *template_csv_file* when one is uploaded (it
    must have an ``mz`` column), otherwise from the Stage 1 result held in
    ``app_state``. With the in-memory template the Stage 1 parameters are
    reused; with an uploaded template the values passed here are used.

    Args:
        valid_zip_file: Uploaded ZIP (object with ``.name`` or a plain
            path) containing the validation TXT spectra.
        template_csv_file: Optional uploaded ``feature_template.csv``.
        halfWindowSize, SNR, tolerance, iterations: Numeric processing
            parameters forwarded to the R pipeline.
        skip_alignment: If true, the alignment step is left out.
        relaxed_params: If true, a failed alignment is retried once with
            looser SNR/tolerance.
        progress: Gradio progress callback.

    Returns:
        ``(summary, validation_csv, r_log)``; the path is ``None`` on
        failure and *summary* then carries the error text.
    """
    if valid_zip_file is None:
        return "โ Please upload a validation-set ZIP file first.", None, ""

    template_from_upload = False
    if template_csv_file is not None:
        try:
            # Gradio may hand over a temp-file wrapper or a plain path.
            uploaded_template = pd.read_csv(getattr(template_csv_file, 'name', template_csv_file))
            if 'mz' not in uploaded_template.columns:
                return "โ Uploaded template CSV is missing the 'mz' column.", None, ""
            template_df = uploaded_template
            template_from_upload = True
        except Exception as e:
            return f"โ Failed to read template CSV: {str(e)}", None, ""
    elif app_state.template_created and app_state.template_data is not None:
        template_df = app_state.template_data
    else:
        return ("โ No feature template available!\n"
                "Complete Stage 1 first, or upload a saved feature_template.csv."), None, ""

    if not check_r_installation():
        return "โ R is not installed!", None, ""
    if not check_r_packages_installed():
        return "โ R packages not installed!", None, ""

    if not template_from_upload and app_state.processing_params:
        params = app_state.processing_params
    else:
        params = {
            'halfWindowSize': int(halfWindowSize),
            'SNR': float(SNR),
            'tolerance': float(tolerance),
            'iterations': int(iterations),
            'skip_alignment': skip_alignment,
            'relaxed_params': relaxed_params,
        }

    progress(0.1, desc="๐ Step 1/5: Extracting validation files...")
    temp_dir = tempfile.mkdtemp()
    work_dir = Path(temp_dir)

    try:
        valid_dir = work_dir / "validation"
        valid_dir.mkdir()

        txt_files, _ = extract_files_from_zip(getattr(valid_zip_file, 'name', valid_zip_file))
        if not txt_files:
            return "โ No TXT files found in the ZIP archive.", None, ""

        for content, name in txt_files:
            # Normalise line endings so the R importer sees plain LF.
            cleaned = content.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            with open(valid_dir / name, 'wb') as f:
                f.write(cleaned)

        progress(0.2, desc="๐ Step 2/5: Preparing feature template...")
        template_path = work_dir / 'feature_template.csv'
        template_df.to_csv(template_path, index=False)
        n_features = len(template_df)
        template_source = "uploaded CSV" if template_from_upload else "Stage 1 in-memory template"

        progress(0.3, desc="๐ Step 3/5: Generating R script...")
        result_csv = work_dir / 'peak_intensity_validation.csv'
        r_script = _build_validation_r_script(params, template_path, valid_dir, result_csv)

        progress(0.4, desc="๐ฌ Step 4/5: Processing validation set (may take a few minutes)...")
        stdout, stderr, returncode = run_r_script(r_script, temp_dir)
        if returncode != 0:
            return f"โ Processing failed!\n\n{stderr}", None, stdout

        progress(0.9, desc="๐ Step 5/5: Reading results...")
        valid_df = pd.read_csv(result_csv)

        # Results are copied out because temp_dir is removed in `finally`.
        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))
        shutil.copy(result_csv, output_dir / 'peak_intensity_validation.csv')

        progress(1.0, desc="โ Done!")

        summary = (
            f"โ **Validation set processed!**\n\n"
            f"๐ **Summary:**\n"
            f"- Template source: **{template_source}**\n"
            f"- Template features: **{n_features}**\n"
            f"- Validation samples: **{len(valid_df)}**\n"
            f"- Output features: **{len(valid_df.columns) - 1}**\n"
            f"- Feature consistency: โ Aligned with template"
        )
        return summary, str(output_dir / 'peak_intensity_validation.csv'), stdout
    except Exception as e:
        return f"โ Unexpected error: {str(e)}", None, ""
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
# ============================================================
# Data Preparation: generate sample-info Excel from folder ZIP
# ============================================================


def generate_sample_excel(zip_file, group_mode, progress=gr.Progress()):
    """
    Parse folder structure from a ZIP and generate a sample-info Excel.

    group_mode:
      - "Folder name + sample name": group = folder_sample (e.g. KPN-R_1-1)
      - "Folder name only": group = folder name (e.g. KPN-R) — samples in
        the same folder will be averaged in Stage 1

    Args:
        zip_file: Uploaded ZIP (object with ``.name`` or a plain path)
            whose TXT files are organised in one folder per group.
        group_mode: One of the two mode strings above; any other value is
            treated as "Folder name only".
        progress: Gradio progress callback.

    Returns:
        ``(summary, excel_path, ready_zip_path, preview_df)``. The last
        three are ``None`` on failure and *summary* then carries the error
        text. *preview_df* holds the first 20 rows of the sheet.
    """
    if zip_file is None:
        return "โ Please upload a ZIP file first.", None, None, None

    progress(0.1, desc="๐ Parsing ZIP file structure...")
    per_sample = group_mode == "Folder name + sample name"

    try:
        records = []
        txt_contents = {}
        # Counted from the real folder name; folder names may themselves
        # contain '_', so they cannot be recovered from the group string.
        folder_counts = {}

        with zipfile.ZipFile(getattr(zip_file, 'name', zip_file), 'r') as zf:
            for file_path in sorted(zf.namelist()):
                if '__MACOSX' in file_path or file_path.startswith('.'):
                    continue
                if not file_path.lower().endswith('.txt'):
                    continue

                parts = Path(file_path).parts
                file_name = parts[-1]
                if file_name.startswith('.'):
                    # Hidden / AppleDouble ("._x.txt") entries inside folders.
                    continue
                folder_name = parts[-2] if len(parts) >= 2 else "ungrouped"
                sample_stem = Path(file_name).stem

                if file_name in txt_contents:
                    # Same base name in another folder: prefix the folder,
                    # then add a counter if that is still not unique.
                    prefixed = f"{folder_name}_{file_name}"
                    file_name = prefixed
                    counter = 2
                    while file_name in txt_contents:
                        file_name = f"{Path(prefixed).stem}_{counter}{Path(prefixed).suffix}"
                        counter += 1
                    sample_stem = Path(file_name).stem

                group_name = f"{folder_name}_{sample_stem}" if per_sample else folder_name
                records.append({'file': file_name, 'group': group_name})
                txt_contents[file_name] = zf.read(file_path)
                folder_counts[folder_name] = folder_counts.get(folder_name, 0) + 1

        if not records:
            return "โ No TXT files found in the ZIP archive.", None, None, None

        progress(0.5, desc="๐ Generating Excel file...")
        df = pd.DataFrame(records)
        group_counts = df['group'].value_counts()
        n_groups = len(group_counts)
        n_files = len(df)

        output_dir = Path(tempfile.mkdtemp())
        TEMP_DIRS_REGISTRY.append(str(output_dir))

        excel_path = output_dir / 'sample_info.xlsx'
        df.to_excel(excel_path, index=False, engine='openpyxl')

        progress(0.7, desc="๐ฆ Packing ready-to-use training ZIP...")
        ready_zip_path = output_dir / 'training_data_ready.zip'
        with zipfile.ZipFile(ready_zip_path, 'w', zipfile.ZIP_DEFLATED) as zout:
            # Flat layout: Stage 1 matches spectra to the sheet by base name.
            for fname, content in txt_contents.items():
                zout.writestr(fname, content)
            zout.write(excel_path, 'sample_info.xlsx')

        progress(1.0, desc="โ Done!")

        summary_lines = [
            f"โ **Sample info Excel generated!**\n",
            f"๐ **Summary:**",
            f"- Grouping mode: **{group_mode}**",
            f"- Total files: **{n_files}**",
            f"- Groups (group): **{n_groups}**",
        ]

        if per_sample:
            summary_lines.append(f"- Folders: **{len(folder_counts)}**")
            summary_lines.append(f"\n๐ Samples per folder:")
            for folder, cnt in sorted(folder_counts.items()):
                summary_lines.append(f" - `{folder}`: {cnt} sample(s)")
            summary_lines.append(
                f"\n๐ก Each sample has a unique group name (e.g. `KPN-R_1-1`). "
                f"In Stage 1, **each sample is kept separately** โ no averaging."
            )
        else:
            summary_lines.append(f"\n๐ Samples per group:")
            for grp, cnt in group_counts.items():
                summary_lines.append(f" - `{grp}`: {cnt} sample(s)")
            summary_lines.append(
                f"\nโ ๏ธ Samples in the same group will be **averaged** in Stage 1. "
                f"If there are few groups, the final dataset will have few rows."
            )

        summary_lines.append(
            f"\n๐ก **Next step:** Download the 'Training Data ZIP' and upload it directly in Stage 1."
        )

        preview_df = df.head(20)
        return "\n".join(summary_lines), str(excel_path), str(ready_zip_path), preview_df

    except zipfile.BadZipFile:
        return "โ The file is not a valid ZIP archive.", None, None, None
    except Exception as e:
        return f"โ Unexpected error: {str(e)}", None, None, None
) preview_df = df.head(20) return "\n".join(summary_lines), str(excel_path), str(ready_zip_path), preview_df except zipfile.BadZipFile: return "โ The file is not a valid ZIP archive.", None, None, None except Exception as e: return f"โ Unexpected error: {str(e)}", None, None, None # ============================================================ # Gradio UI # ============================================================ CUSTOM_CSS = """ .main-title { text-align: center; margin-bottom: 0.5rem; } .phase-header { background: linear-gradient(90deg, #1f77b4 0%, #4a9eff 100%); color: white; padding: 0.8rem 1rem; border-radius: 8px; margin: 0.5rem 0; font-size: 1.1rem; font-weight: 600; } """ with gr.Blocks( title="๐ฌ MALDI-TOF MS Template Processing Platform", theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky", neutral_hue="slate"), css=CUSTOM_CSS, ) as demo: gr.Markdown( "# ๐ฌ MALDI-TOF MS Template Processing Platform\n" "### Build a feature template from the training set, then batch-process validation sets\n" "---" ) # โโ Environment check โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ with gr.Accordion("๐ง Environment Check & R Package Management", open=False): with gr.Row(): env_check_btn = gr.Button("๐ Check Environment", variant="secondary") install_btn = gr.Button("๐ฆ Install R Packages", variant="primary") env_status = gr.Textbox(label="Environment Status", interactive=False, lines=3) env_check_btn.click(fn=check_environment, outputs=env_status) install_btn.click(fn=lambda: install_r_packages()[1], outputs=env_status) # โโ Processing parameters โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ with gr.Accordion("โ๏ธ Processing Parameters", open=False): with gr.Row(): halfWindowSize = gr.Slider(10, 200, value=90, step=10, label="Half-window size (halfWindowSize)") SNR = gr.Slider(1.0, 10.0, value=2.0, step=0.5, label="Signal-to-noise ratio threshold (SNR)") with gr.Row(): tolerance = gr.Slider(0.001, 0.02, value=0.008, step=0.001, label="Alignment tolerance (tolerance)") iterations = 
gr.Slider(50, 200, value=100, step=10, label="Baseline removal iterations (iterations)") with gr.Row(): skip_alignment = gr.Checkbox(label="Skip spectral alignment", value=False, info="Check this if alignment keeps failing") relaxed_params = gr.Checkbox(label="Use relaxed parameters", value=True, info="Automatically loosen parameters to improve success rate") param_inputs = [halfWindowSize, SNR, tolerance, iterations, skip_alignment, relaxed_params] # โโ Main tabs โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ with gr.Tabs(): # โโ Data Preparation โโ with gr.TabItem("๐ Data Preparation: Generate Sample Info"): gr.HTML('