Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import tempfile, os, shutil, zipfile, glob, time | |
| import os | |
| from PIL import Image, ImageDraw, ImageFont | |
| os.environ["GRADIO_TEMP_DIR"] = "/tmp/gradio" | |
| os.environ["TMPDIR"] = "/tmp" | |
| os.environ["HF_HOME"] = "/tmp/hfhome" | |
| os.environ["HF_HUB_CACHE"] = "/tmp/hfhome/hub" | |
| os.environ["XDG_CACHE_HOME"] = "/tmp/xdgcache" # au cas où | |
| os.environ["TORCH_HOME"] = "/tmp/torchhome" | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| from torchvision import transforms | |
| from transformers import SegformerForSemanticSegmentation | |
| import tifffile | |
| from scipy.ndimage import label | |
| from huggingface_hub import hf_hub_download | |
| import json | |
from datetime import datetime, timezone
| COUNTER_PATH = "/home/user/app/usage_counter.json" # plus stable que /tmp | |
| def _load_counter(): | |
| try: | |
| with open(COUNTER_PATH, "r") as f: | |
| return json.load(f) | |
| except Exception: | |
| return { | |
| "total_images": 0, | |
| "total_jobs": 0, | |
| "total_demo_images": 0, | |
| "total_demo_jobs": 0, | |
| "last_update_utc": None, | |
| } | |
def _save_counter(c):
    """Atomically persist the counter dict, stamping the UTC update time.

    Writes to a sibling ``.tmp`` file then ``os.replace``s it so a crash
    mid-write cannot corrupt the counter file.

    Fix: ``datetime.utcnow()`` is deprecated (naive datetime); use an
    aware UTC datetime instead, keeping the exact same ``...Z`` format.
    """
    c["last_update_utc"] = (
        datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")
    )
    os.makedirs(os.path.dirname(COUNTER_PATH), exist_ok=True)
    tmp = COUNTER_PATH + ".tmp"
    with open(tmp, "w") as f:
        json.dump(c, f, indent=2)
    os.replace(tmp, COUNTER_PATH)  # atomic on POSIX
def bump_counter(n_images: int, is_demo: bool):
    """Increment the persisted usage counters and return the updated dict.

    Demo runs and real runs are tracked under separate key pairs
    (``total_demo_*`` vs ``total_*``).
    """
    counter = _load_counter()
    prefix = "total_demo_" if is_demo else "total_"
    counter[prefix + "images"] += int(n_images)
    counter[prefix + "jobs"] += 1
    _save_counter(counter)
    return counter
# Live-preview geometry: maximum preview width and a neutral placeholder image
PREVSIZE=600
PLACEHOLDER_W, PLACEHOLDER_H = PREVSIZE, 340
# Light-grey RGB canvas shown before/after processing
PLACEHOLDER = Image.new("RGB", (PLACEHOLDER_W, PLACEHOLDER_H), (245, 245, 245))
| def _safe_rmtree(p): | |
| try: shutil.rmtree(p, ignore_errors=True) | |
| except Exception: pass | |
# -- Startup cleanup (in case anything is left over from a previous run) --
CACHE_DIR = "/tmp/hfhome/transformers"  # lives under HF_HOME set above

# Fix: the original ran two overlapping cleanup loops back-to-back; the second
# path list is a superset of the first, so a single pass is sufficient
# (deletion is idempotent anyway).
for _p in [
    "/home/user/.cache", "/home/user/.local/share/Trash", "/home/user/app/tmp",
    "/home/user/.gradio", "/tmp/gradio", "/tmp/hfhome", "/tmp/xdgcache", "/tmp/torchhome",
]:
    _safe_rmtree(_p)
# ======== CONFIG ========
DEVICE = "cpu"  # CPU-only Space
TARGET_SIZE = (341, 512)  # (height, width) expected by the models — see smart_resize
TARGET_AR = TARGET_SIZE[1] / TARGET_SIZE[0]  # W / H ≈ 1.50 (landscape)
AR_TOLERANCE = 0.20  # 20% — beyond this gap, zero-pad instead of stretching
AR_MODEL_ID = "RomainFernandez/MicroDeepAerenchyma-Lacuna"
CE_MODEL_ID = "RomainFernandez/MicroDeepAerenchyma-Cortex"
# (Optional) demo dataset -> holds a small ZIP (a few images 341x512 or close)
DEMO_DATASET_ID = "RomainFernandez/MicroDeepAerenchyma-Demo"  # to be created by the maintainer
DEMO_ZIP_FILENAME = "demo_images.zip"  # file name expected inside the dataset
# ======== MODELS (load once, at import time) ========
ar_model = SegformerForSemanticSegmentation.from_pretrained(
    AR_MODEL_ID, cache_dir=CACHE_DIR).to(DEVICE).eval()
ce_model = SegformerForSemanticSegmentation.from_pretrained(
    CE_MODEL_ID, cache_dir=CACHE_DIR).to(DEVICE).eval()
# ---- OPTIONAL HF CACHE PURGE (reduces disk usage) ----
try:
    from huggingface_hub import scan_cache_dir
    # CACHE_DIR = "/tmp/hfhome/transformers" -> go one level up: "/tmp/hfhome"
    cache_root = os.path.abspath(os.path.join(CACHE_DIR, ".."))
    info = scan_cache_dir(cache_root)
    # keep only the two repos this app actually uses
    keep = {AR_MODEL_ID, CE_MODEL_ID}
    for repo in info.repos:
        if getattr(repo, "repo_id", None) not in keep:
            # remove the unused repo entirely
            repo_path = getattr(repo, "repo_path", None)
            if repo_path and os.path.exists(repo_path):
                _safe_rmtree(repo_path)
except Exception:
    # non-blocking if the HF cache layout ever changes
    pass
# Preprocessing: grayscale PIL image -> float tensor in [0,1], replicated to 3 channels
_to_tensor = transforms.Compose([
    transforms.ToTensor(),  # PIL → float [0,1]
    transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # grayscale → 3-ch
])
def smart_resize(image_L):
    """Resize for the model, zero-padding when the aspect ratio diverges.

    When the image's W/H ratio is close enough to TARGET_AR (relative gap
    <= AR_TOLERANCE) a plain resize is used, matching the training-time
    behaviour.  Otherwise the image is resized preserving its aspect
    ratio and zero-padded (black) up to TARGET_SIZE.

    Returns
    -------
    prepared : PIL.Image — grayscale image at TARGET_SIZE
    pad_info : dict — metadata describing the transform
    """
    src_w, src_h = image_L.size
    dst_h, dst_w = TARGET_SIZE
    if abs((src_w / src_h) / TARGET_AR - 1.0) <= AR_TOLERANCE:
        # Compatible ratio -> direct resize (training behaviour)
        out = image_L.resize((dst_w, dst_h), Image.BILINEAR)
        return out, {
            "padded": False,
            "pad_top": 0, "pad_left": 0,
            "effective_h": dst_h, "effective_w": dst_w,
            "scale_x": dst_w / src_w, "scale_y": dst_h / src_h,
        }
    # Ratio too different -> keep aspect + zero-pad onto a black canvas
    scale = min(dst_w / src_w, dst_h / src_h)
    inner_w = int(round(src_w * scale))
    inner_h = int(round(src_h * scale))
    inner = image_L.resize((inner_w, inner_h), Image.BILINEAR)
    canvas = Image.new("L", (dst_w, dst_h), 0)
    off_x = (dst_w - inner_w) // 2
    off_y = (dst_h - inner_h) // 2
    canvas.paste(inner, (off_x, off_y))
    return canvas, {
        "padded": True,
        "pad_top": off_y, "pad_left": off_x,
        "effective_h": inner_h, "effective_w": inner_w,
        "scale_x": scale, "scale_y": scale,
    }
def predict_mask(model, image_L):
    """Run a segmentation model on a grayscale PIL image.

    Handles zero-padding transparently: when smart_resize padded the
    input, the predicted mask is cropped back to the unpadded region.

    Returns (mask ndarray of class ids, pad_info dict).
    """
    prepared, pad_info = smart_resize(image_L)
    batch = _to_tensor(prepared).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        logits = model(batch).logits
        upsampled = torch.nn.functional.interpolate(
            logits, size=batch.shape[-2:], mode="bilinear", align_corners=False
        )
        mask = torch.argmax(upsampled, dim=1)[0].cpu().numpy()
    if pad_info["padded"]:
        top, left = pad_info["pad_top"], pad_info["pad_left"]
        height, width = pad_info["effective_h"], pad_info["effective_w"]
        mask = mask[top:top + height, left:left + width]
    return mask, pad_info
def largest_cc(binary):
    """Return a uint8 mask of the largest connected component of *binary*.

    Returns an all-zero array of the same shape when no component exists.
    """
    labelled, count = label(binary)
    if count == 0:
        return binary * 0
    # bincount index 0 is the background; offset by 1 to recover the label id
    sizes = np.bincount(labelled.ravel())[1:]
    winner = int(sizes.argmax()) + 1
    return (labelled == winner).astype(np.uint8)
def safe_list_images(root):
    """Recursively collect image files under *root*, sorted by full path.

    Matches by case-insensitive extension; directories are skipped.
    """
    allowed = (".png", ".jpg", ".jpeg", ".tif", ".tiff")
    matches = [
        path
        for path in glob.glob(os.path.join(root, "**/*"), recursive=True)
        if os.path.isfile(path) and path.lower().endswith(allowed)
    ]
    matches.sort()
    return matches
# Track the session's temporary working directories so they can be cleaned up on unload
ACTIVE_WORKDIRS = set()
# Crude single-job lock for this CPU Space (checked/cleared by run_job)
BUSY = False
def overlay_counter(img_np: np.ndarray, idx: int, total: int) -> Image.Image:
    """Stamp 'Image idx/total' top-left on an opaque black box, larger font."""
    canvas = Image.fromarray(img_np) if isinstance(img_np, np.ndarray) else img_np.copy()
    painter = ImageDraw.Draw(canvas)
    label_txt = f"Image {idx}/{total}"
    try:
        width = canvas.width
    except Exception:
        width = 800
    # Font size ~3% of image width, clamped to [18, 48]
    size = max(18, min(int(width * 0.03), 48))
    # Prefer a real TTF (HF Spaces usually ships DejaVuSans)
    font = None
    for ttf_path in (
        "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
    ):
        try:
            font = ImageFont.truetype(ttf_path, size)
            break
        except Exception:
            continue
    if font is None:
        font = ImageFont.load_default()  # fallback
    pad_x, pad_y = max(6, size // 3), max(4, size // 4)
    # text bounding box
    left, top, right, bottom = painter.textbbox((0, 0), label_txt, font=font)
    text_w, text_h = right - left, bottom - top
    # opaque black background (convert to RGBA + composite for semi-opacity)
    painter.rectangle((0, 0, text_w + 2 * pad_x, text_h + 2 * pad_y), fill=(0, 0, 0))
    painter.text((pad_x, pad_y), label_txt, fill=(255, 255, 255), font=font)
    return canvas
def run_job(files_or_zip, use_demo=False, progress=gr.Progress()):
    """Process a batch of images and stream (zip, message, preview, workdir).

    Parameters
    ----------
    files_or_zip : list[str] | None
        Uploaded file paths (images and/or ZIPs); ignored when use_demo=True.
    use_demo : bool
        If True, download and process the demo ZIP from DEMO_DATASET_ID.
    progress : gr.Progress
        Gradio progress tracker (injected by Gradio at call time).

    Yields
    ------
    (final_zip_path | None, status_message, preview PIL image, workdir | None)

    Fixes vs. the previous revision:
    - ``current_preview`` is now initialised *before* the low-disk guard,
      which previously yielded it while still unbound (NameError on low disk).
    - ``BUSY = True`` was commented out, so the single-job lock never
      engaged even though ``finally`` cleared it; it is restored here.
    """
    # Guard: minimum free disk space required (5 GB)
    MIN_FREE = 5 * (1024**3)
    current_preview = PLACEHOLDER.copy()  # must exist before any yield below
    try:
        usage = shutil.disk_usage("/")
        if usage.free < MIN_FREE:
            yield None, (
                "⚠️ Low free disk on Space. Please retry later "
                "(administrator: increase persistent storage or clean caches)."
            ), current_preview, None
            return
    except Exception:
        pass  # disk check is best-effort only
    global BUSY
    first_img_time = None  # duration measured on the first image
    # Refuse immediately if another job is running
    if BUSY:
        yield None, (
            "⚠️ Server already in use — please come back in a few minutes "
            "(another batch is running on this CPU Space)."
        ), current_preview, None
        return
    BUSY = True  # engage the single-job lock (released in `finally`)
    try:
        # --- Ephemeral workspace for this job ---
        workdir = tempfile.mkdtemp(prefix="job_")
        ACTIVE_WORKDIRS.add(workdir)
        in_root = os.path.join(workdir, "in")
        out_root = os.path.join(workdir, "out")
        os.makedirs(in_root, exist_ok=True)
        os.makedirs(out_root, exist_ok=True)
        # --- Collect inputs ---
        if use_demo:
            demo_zip = hf_hub_download(repo_id=DEMO_DATASET_ID, filename=DEMO_ZIP_FILENAME, repo_type="dataset")
            with zipfile.ZipFile(demo_zip) as z:
                z.extractall(in_root)
        else:
            # files_or_zip is a LIST of paths (strings) when type="filepath"
            for p in files_or_zip or []:
                if not p:
                    continue
                src = p if isinstance(p, str) else p.name
                if src.lower().endswith(".zip"):
                    with zipfile.ZipFile(src) as z:
                        z.extractall(in_root)
                else:
                    # copy a single image file
                    shutil.copy(src, os.path.join(in_root, os.path.basename(src)))
        file_list = safe_list_images(in_root)
        total = len(file_list)
        if not file_list:
            yield None, "No images found. Upload multiple images or a ZIP (recommended for folders).", current_preview, None
            return
        start = time.time()
        # --- constants for RAW contrast ---
        p_lo, p_hi, gamma = 5.0, 99.5, 1.30
        yield None, f"Found {total} image(s). Starting…", current_preview, workdir
        progress(0.0)  # initialise
        # Group by relative sub-folder so the output mirrors the input tree
        rel_map = {}
        for p in file_list:
            rel_dir = os.path.dirname(os.path.relpath(p, in_root))  # "" at root
            rel_map.setdefault(rel_dir, []).append(p)
        processed = 0
        for rel_dir, paths in rel_map.items():
            out_dir = os.path.join(out_root, rel_dir)
            pred_dir = os.path.join(out_dir, "predicted_images")
            ar_dir = os.path.join(pred_dir, "pred_AR")
            cortex_dir = os.path.join(pred_dir, "pred_Cortex")
            endo_dir = os.path.join(pred_dir, "pred_Endoderm")
            global_dir = os.path.join(pred_dir, "pred_global")
            for d in (ar_dir, cortex_dir, endo_dir, global_dir):
                os.makedirs(d, exist_ok=True)
            rows = []
            for p in paths:
                image_name = os.path.basename(p)
                progress(processed / total if total > 0 else 0.0)
                im = Image.open(p).convert("L")
                # ---- Interim RAW preview (published from the 2nd image on) ----
                # Build a RAW "hint" at network size via smart_resize
                prepared_hint, _ = smart_resize(im)
                raw_hint = np.array(prepared_hint, dtype=np.uint8)
                # soft contrast stretch
                lo_hint, hi_hint = np.percentile(raw_hint, (p_lo, p_hi))
                if hi_hint > lo_hint:
                    x = (raw_hint.astype(np.float32) - lo_hint) / (hi_hint - lo_hint)
                    x = np.clip(x, 0.0, 1.0)
                    x = np.power(x, 1.0 / gamma)
                    raw_boost_hint = (x * 255.0).astype(np.uint8)
                else:
                    raw_boost_hint = raw_hint
                raw_rgb_hint = np.repeat(raw_boost_hint[..., None], 3, axis=-1).astype(np.uint8)
                black = np.zeros_like(raw_rgb_hint, dtype=np.uint8)
                interim = np.concatenate([raw_rgb_hint, black], axis=1).astype(np.uint8)
                # From the 2nd image onward: wait a fraction of the first image's
                # duration (capped), then publish the interim preview
                if processed >= 1 and first_img_time is not None and first_img_time > 0:
                    delay = min(0.7 * first_img_time, 2)
                    if delay > 0.02:
                        time.sleep(delay)
                    interim_prev = overlay_counter(interim, processed + 1, total)
                    if interim_prev.width > PREVSIZE:
                        hh = int(interim_prev.height * (PREVSIZE / interim_prev.width))
                        interim_prev = interim_prev.resize((PREVSIZE, hh), Image.LANCZOS)
                    current_preview = interim_prev
                    yield None, f"Preview (RAW) {processed+1}/{total}: {image_name}", current_preview, workdir
                # --- Predictions ---
                t0_img = time.time()
                # Original dimensions before resampling (PIL: (width, height))
                original_width, original_height = im.size
                pred_ar, pad_info = predict_mask(ar_model, im)
                pred_ce, _ = predict_mask(ce_model, im)
                scale_factor_x = pad_info["scale_x"]
                scale_factor_y = pad_info["scale_y"]
                mask_ar_raw = (pred_ar == 1).astype(np.uint8) * 255
                mask_cortex = (pred_ce == 1).astype(np.uint8) * 255
                mask_endo = (pred_ce == 2).astype(np.uint8) * 255
                # Largest connected component of cortex+endoderm
                combined_ce = ((mask_cortex > 0) | (mask_endo > 0)).astype(np.uint8)
                largest = largest_cc(combined_ce)
                # Clip aerenchyma to cortex+endoderm envelope:
                # zero out any AR pixel outside the largest CE component
                ar_outside = int(((mask_ar_raw > 0) & (largest == 0)).sum())
                mask_ar = (mask_ar_raw * largest).astype(np.uint8)  # 0 outside CE
                ar_in = ((mask_ar > 0) & (largest > 0)).astype(np.uint8)
                cortex_in = ((mask_cortex > 0) & (largest > 0)).astype(np.uint8)
                total_ce = int(largest.sum())
                total_cortex = int(cortex_in.sum())
                ar_px = int(ar_in.sum())
                AR_in_CE = (ar_px / total_ce * 100) if total_ce > 0 else 0.0
                AR_in_Cortex = (ar_px / total_cortex * 100) if total_cortex > 0 else 0.0
                # Surfaces back in original-pixel space (approximation):
                # area factor = 1 / (scale_x * scale_y)
                area_scale_factor = 1.0 / (scale_factor_x * scale_factor_y)
                ar_px_original = ar_px * area_scale_factor
                cortex_px_original = total_cortex * area_scale_factor
                stele_px_original = (total_ce - total_cortex) * area_scale_factor
                rows.append({
                    "image_name": image_name,
                    "original_width": original_width,
                    "original_height": original_height,
                    "resampled_width": pad_info["effective_w"],
                    "resampled_height": pad_info["effective_h"],
                    "zero_padded": pad_info["padded"],
                    "scale_factor_x": round(scale_factor_x, 4),
                    "scale_factor_y": round(scale_factor_y, 4),
                    "AR_percent_in_Cortex+Endoderm": AR_in_CE,
                    "AR_percent_in_Cortex_only": AR_in_Cortex,
                    "Cortex_surface_in_pixels": total_cortex,
                    "Stele_surface_in_pixels": total_ce - total_cortex,
                    "AR_surface_in_pixels": ar_px,
                    "Cortex_surface_original_pixels": round(cortex_px_original, 2),
                    "Stele_surface_original_pixels": round(stele_px_original, 2),
                    "AR_surface_original_pixels": round(ar_px_original, 2),
                    "AR_pixels_removed_outside_cortex": ar_outside
                })
                # Write the three masks; re-create the directory and retry once if
                # a concurrent cleanup removed it (same guard as before, deduplicated)
                for d, mask in ((ar_dir, mask_ar), (cortex_dir, mask_cortex), (endo_dir, mask_endo)):
                    try:
                        tifffile.imwrite(os.path.join(d, image_name), mask, compression="deflate")
                    except FileNotFoundError:
                        os.makedirs(d, exist_ok=True)
                        tifffile.imwrite(os.path.join(d, image_name), mask, compression="deflate")
                # Panel TIFF [raw_boost | overlay RGB]
                h, w = mask_ar.shape
                imr = im.resize((w, h), Image.BILINEAR)
                raw = np.array(imr, dtype=np.uint8)
                # --- RAW contrast (softer): slightly tight percentiles reduce clipping
                lo, hi = np.percentile(raw, (p_lo, p_hi))
                if hi > lo:
                    # linear rescale to 0..1
                    x = (raw.astype(np.float32) - lo) / (hi - lo)
                    x = np.clip(x, 0.0, 1.0)
                    # gentle gamma curve (>1 compresses the highlights)
                    x = np.power(x, 1.0 / gamma)  # 1/gamma ~ 0.833
                    raw_boost = (x * 255.0).astype(np.uint8)
                else:
                    raw_boost = raw
                raw_rgb = np.repeat(raw_boost[..., None], 3, axis=-1).astype(np.uint8)
                RED_SCALE, GREEN_SCALE = 0.5, 0.3  # tunable overlay intensities
                r = np.clip(mask_ar.astype(np.float32) * RED_SCALE, 0, 255).astype(np.uint8)
                g = np.clip(mask_cortex.astype(np.float32) * GREEN_SCALE, 0, 255).astype(np.uint8)
                b = raw_boost  # keep the boosted raw in the blue channel
                rgb = np.stack([r, g, b], axis=-1)
                panel = np.concatenate([raw_rgb, rgb], axis=1).astype(np.uint8)
                base = os.path.splitext(image_name)[0]
                os.makedirs(global_dir, exist_ok=True)
                tifffile.imwrite(os.path.join(global_dir, base + "_panel.tif"), panel, photometric="rgb", compression="deflate")
                # Fix the reference duration after the 1st image (lower bound 20 ms)
                img_elapsed = time.time() - t0_img
                if processed == 0 and first_img_time is None:
                    first_img_time = max(0.02, img_elapsed)
                # Preview with counter (idx = processed+1), kept to a reasonable
                # size to avoid shipping huge images to the client
                preview_img = overlay_counter(panel, processed + 1, total)
                if preview_img.width > PREVSIZE:
                    hh = int(preview_img.height * (PREVSIZE / preview_img.width))
                    preview_img = preview_img.resize((PREVSIZE, hh), Image.LANCZOS)
                current_preview = preview_img  # replace the current preview
                pad_note = " [zero-padded]" if pad_info["padded"] else ""
                yield None, f"Processed {processed+1}/{total}: {image_name}{pad_note}", current_preview, workdir
                processed += 1
                progress(processed / total if total > 0 else 1.0)
            if rows:
                os.makedirs(out_dir, exist_ok=True)
                pd.DataFrame(rows).to_csv(os.path.join(out_dir, "pred_metrics.csv"), index=False)
        # --- Final ZIP ---
        zip_path = os.path.join(workdir, "results.zip")
        shutil.make_archive(zip_path[:-4], "zip", out_root)
        # Copy into Gradio's temp dir so it can serve the file
        gradio_tmp = os.environ.get("GRADIO_TEMP_DIR", "/tmp/gradio")
        os.makedirs(gradio_tmp, exist_ok=True)
        final_zip = os.path.join(gradio_tmp, f"results_{int(time.time())}.zip")
        shutil.copy2(zip_path, final_zip)
        # Free disk space immediately: drop the raw outputs
        _safe_rmtree(out_root)
        elapsed = time.time() - start
        avg_time = elapsed / processed if processed > 0 else 0
        c = bump_counter(processed, is_demo=use_demo)
        yield final_zip, (f"Done. {processed} image(s) in {elapsed:.1f}s "
                          f"(avg {avg_time:.2f}s/image).\n"
                          f" Total number processed since 2026-01: {c['total_images']}.\n"
                          f" Total number jobs since 2026-01: {c['total_jobs']}.\n"
                          f" Download the ZIP below."), current_preview, workdir
        # workdir is NOT removed here so the download can complete
    finally:
        BUSY = False
def cleanup_all():
    """Clean up at the end of a user session: workdirs plus caches.

    Fix: the original body was duplicated — it iterated ACTIVE_WORKDIRS
    twice and removed each workdir twice per pass (``_safe_rmtree`` then
    ``shutil.rmtree`` again), and purged two overlapping cache lists.
    A single pass over each is sufficient; deletion is idempotent.
    """
    for wd in list(ACTIVE_WORKDIRS):
        _safe_rmtree(wd)  # never raises
        ACTIVE_WORKDIRS.discard(wd)
    # Purge all the usual cache/temp locations as well (superset of both
    # original lists)
    for _p in [
        "/tmp/gradio", "/tmp/hfhome", "/tmp/xdgcache", "/tmp/torchhome",
        "/home/user/.cache", "/home/user/.gradio", "/home/user/.local/share/Trash",
    ]:
        _safe_rmtree(_p)
# ======== UI: Gradio Blocks app ========
with gr.Blocks(theme=gr.themes.Soft(), title="Deep Aerenchyma") as demo:
    BANNER_URL = "https://huggingface.co/spaces/RomainFernandez/DeepAerenchyma/resolve/main/banner.png"
    # ═══════════════════════════════════════════════════════════
    # ALL CSS
    # ═══════════════════════════════════════════════════════════
    # NOTE: inside this f-string, {{ }} are literal CSS braces;
    # only {BANNER_URL} is interpolated.
    gr.HTML(f"""
<style>
/* ── HEADER compact ── */
.banner-wrap {{ margin: 2px 0 4px; text-align:center; }}
.banner-wrap img {{
  display:inline-block; max-width:100%; height:auto;
  max-height:clamp(54px, 10vh, 100px);
  border-radius:10px; box-shadow: 0 2px 6px rgba(0,0,0,.06);
}}
#titleblock h1 {{
  font-size: 1.4rem !important; line-height: 1.2 !important;
  margin: 4px 0 2px !important;
}}
#titleblock p {{
  margin: 1px 0 3px !important; font-size: 0.91rem !important;
}}
/* ── UPLOAD ZONE ── */
#uploader [data-testid="dropzone"] {{
  min-height: 100px !important; padding: 16px !important;
  border: 2px dashed #888 !important; border-radius: 12px !important;
  transition: border-color 0.2s, background 0.2s;
}}
#uploader [data-testid="dropzone"]:hover {{
  border-color: #2563eb !important; background: rgba(37,99,235,0.04) !important;
}}
/* ── LIVE PREVIEW ── */
#live_preview {{ min-height: 180px; }}
#live_preview img {{ width: 100% !important; height: auto !important; object-fit: contain; }}
/* ── STATUS BADGE (dark mode safe) ── */
.status-badge {{
  display: inline-block; padding: 4px 14px; border-radius: 20px;
  font-size: 0.88rem; font-weight: 600; margin: 4px 0 8px;
}}
.status-idle {{ background: #e5e7eb; color: #374151; }}
.status-processing {{ background: #dbeafe; color: #1e40af; animation: pulse-st 1.5s ease-in-out infinite; }}
.status-done {{ background: #d1fae5; color: #065f46; }}
.status-error {{ background: #fee2e2; color: #991b1b; }}
@keyframes pulse-st {{ 0%,100%{{ opacity:1; }} 50%{{ opacity:0.55; }} }}
/* ── RESULTS BLOCK ── */
#result_block {{
  border: 2px solid #059669; border-radius: 14px;
  padding: 18px 20px; text-align: center; margin-top: 8px;
}}
.result-title {{ font-size: 1.25rem; font-weight: 700; color: #059669; margin-bottom: 6px; }}
/* ── GLOBAL / RESPONSIVE ── */
html, body {{ height:100%; overflow-y:auto !important; }}
.gradio-container {{ min-height:100vh; overflow-y:auto !important; }}
@media (max-width:600px) {{
  .gradio-container {{ padding:6px !important; }}
  .banner-wrap img {{ max-height:50px; }}
  #result_block {{
    position:sticky; bottom:0; z-index:50;
    box-shadow:0 -2px 12px rgba(0,0,0,.12);
  }}
}}
</style>
<div class="banner-wrap">
  <img src="{BANNER_URL}" alt="Deep Aerenchyma">
</div>
""")
    # ═══════════════ 0. HEADER ═══════════════
    gr.Markdown(
        "# Deep Aerenchyma\n"
        "High-throughput root aerenchyma segmentation — drop images or a ZIP, results are automatic.",
        elem_id="titleblock",
    )
    # ═══════════════ 1. UPLOAD ZONE ═══════════════
    files = gr.Files(
        label="Drop images or a ZIP file here",
        file_count="multiple",
        type="filepath",  # handlers receive plain path strings
        elem_id="uploader",
    )
    demo_btn = gr.Button(
        "Test with demo images",
        variant="secondary",
    )
    # ═══════════════ 2. STATUS + LIVE PREVIEW ═══════════════
    status_html = gr.HTML(
        '<span class="status-badge status-idle">Waiting for images\u2026</span>',
    )
    live_preview = gr.Image(
        label="Live processing preview",
        value=PLACEHOLDER,
        interactive=False,
        elem_id="live_preview",
        height=None,
    )
    with gr.Accordion("Show logs", open=False):
        log = gr.Textbox(label="Processing logs", lines=4, interactive=False)
    # ═══════════════ 3. RESULTS BLOCK ═══════════════
    with gr.Column(visible=False, elem_id="result_block") as result_block:
        result_title_html = gr.HTML(
            '<div class="result-title">\u2705 Results ready</div>',
        )
        out_zip = gr.DownloadButton(
            "⬇ Download results (ZIP)",
            variant="primary",
            size="lg",
            visible=True,
        )
        newrun_btn = gr.Button(
            "Process new images",
            variant="secondary",
            size="lg",
        )
    # ═══════════════ STATE ═══════════════
    # Holds the last job's working directory (opaque to the client)
    workdir_state = gr.State(value=None)
    # ═══════════════ HELPERS ═══════════════
    def _badge(kind, text):
        """Render a status pill using the CSS classes declared above."""
        return f'<span class="status-badge status-{kind}">{text}</span>'
    # ═══════════════ DRIVERS ═══════════════
    def driver(uploaded_files):
        """Upload handler: streams dict-style component updates while run_job runs."""
        if not uploaded_files:
            return
        if BUSY:
            # Another batch is running on this CPU Space — bail out early
            yield {
                log: "Server already in use -- please retry in a few minutes.",
                status_html: _badge("error", "Server busy"),
                live_preview: PLACEHOLDER, workdir_state: None,
                files: gr.update(interactive=False),
                demo_btn: gr.update(interactive=False),
                result_block: gr.update(visible=False),
            }
            return
        # Lock the inputs and reset the UI before starting
        yield {
            status_html: _badge("processing", "Processing started..."),
            files: gr.update(interactive=False),
            demo_btn: gr.update(interactive=False),
            result_block: gr.update(visible=False),
            log: "Processing started...",
            live_preview: PLACEHOLDER, workdir_state: None,
        }
        # Relay every intermediate update from the generator job
        for zip_path, msg, preview, wd in run_job(uploaded_files, use_demo=False):
            upd = {
                log: msg, live_preview: preview, workdir_state: wd,
                status_html: _badge("processing", msg),
            }
            if zip_path is not None:
                # Final yield: surface the download button and result block
                upd[out_zip] = gr.DownloadButton(
                    "⬇ Download results (ZIP)",
                    value=zip_path, variant="primary", size="lg",
                )
                upd[result_block] = gr.update(visible=True)
                upd[status_html] = _badge("done", "Done!")
            yield upd
        # Re-enable the inputs once the job has finished
        yield {
            files: gr.update(interactive=True),
            demo_btn: gr.update(interactive=True),
        }
    def driver_demo():
        """Demo-button handler: same streaming flow as driver(), demo dataset input."""
        if BUSY:
            yield {
                log: "Server already in use -- please retry in a few minutes.",
                status_html: _badge("error", "Server busy"),
                live_preview: PLACEHOLDER, workdir_state: None,
                files: gr.update(interactive=False),
                demo_btn: gr.update(interactive=False),
                result_block: gr.update(visible=False),
            }
            return
        yield {
            status_html: _badge("processing", "Loading demo images..."),
            files: gr.update(interactive=False),
            demo_btn: gr.update(interactive=False),
            result_block: gr.update(visible=False),
            log: "Loading demo images...",
            live_preview: PLACEHOLDER, workdir_state: None,
        }
        for zip_path, msg, preview, wd in run_job(None, use_demo=True):
            upd = {
                log: msg, live_preview: preview, workdir_state: wd,
                status_html: _badge("processing", msg),
            }
            if zip_path is not None:
                upd[out_zip] = gr.DownloadButton(
                    "⬇ Download results (ZIP)",
                    value=zip_path, variant="primary", size="lg",
                )
                upd[result_block] = gr.update(visible=True)
                upd[status_html] = _badge("done", "Done!")
            yield upd
        yield {
            files: gr.update(interactive=True),
            demo_btn: gr.update(interactive=True),
        }
    def reset_for_new_run():
        """Restore the idle UI state after a completed run."""
        return {
            result_block: gr.update(visible=False),
            live_preview: PLACEHOLDER,
            log: "",
            status_html: _badge("idle", "Waiting for images..."),
            files: gr.update(value=None),
            out_zip: gr.DownloadButton(
                "⬇ Download results (ZIP)",
                value=None, variant="primary", size="lg",
            ),
        }
    # ═══════════════ EVENT WIRING ═══════════════
    all_outputs = [log, live_preview, workdir_state,
                   status_html, files, demo_btn, result_block, out_zip]
    files.change(
        driver,
        inputs=[files],
        outputs=all_outputs,
        show_progress="hidden",
    )
    demo_btn.click(
        driver_demo,
        inputs=None,
        outputs=all_outputs,
        show_progress="hidden",
    )
    newrun_btn.click(
        reset_for_new_run,
        inputs=None,
        outputs=[result_block, live_preview, log, status_html, files, out_zip],
    )
    # Clean up session workdirs/caches when the browser session ends
    demo.unload(lambda: cleanup_all())

demo.queue().launch(ssr_mode=False, allowed_paths=["/tmp"])