Spaces:
Runtime error
Runtime error
| import tensorflow as tf | |
| import numpy as np | |
| from PIL import Image | |
| import tensorflow as tf | |
| import logging | |
| import numpy as np | |
| from PIL import Image | |
| from keras.applications.efficientnet_v2 import preprocess_input as effnet_preprocess | |
| from keras.applications.resnet_v2 import preprocess_input as resnet_preprocess | |
| import io | |
| from tf_keras_vis.gradcam import Gradcam,GradcamPlusPlus | |
| from tf_keras_vis.utils import normalize | |
| import numpy as np | |
| import tensorflow as tf | |
| from tf_keras_vis.saliency import Saliency | |
| from tf_keras_vis.utils import normalize | |
| import numpy as np | |
| import tensorflow as tf | |
| from tf_keras_vis.saliency import Saliency | |
| from tf_keras_vis.utils import normalize | |
| import logging | |
| import time | |
| import pickle | |
| import hashlib | |
| from typing import TypedDict, Callable, Any | |
| from app.log import logger | |
| import os | |
| import diskcache as dc | |
# Decision thresholds. Nothing in the visible part of this file reads them;
# presumably they are consumed by callers elsewhere — TODO confirm.
confidence_threshold = 0.55
entropy_threshold = 2

# Plain in-memory dict. The visible code never reads or writes it; it is kept
# because other modules may import the name.
HEATMAP_CACHE = {}

# Legacy heatmap directory. It is still created at import time, exactly as
# before, but it gets its own name so that CACHE_DIR is bound only once.
# Nothing in the visible code stores anything in this directory.
_LEGACY_HEATMAP_CACHE_DIR = "./cache_heatmaps"
os.makedirs(_LEGACY_HEATMAP_CACHE_DIR, exist_ok=True)

# Directory backing the on-disk cache shared by predictions and heatmaps.
CACHE_DIR = "./cache"
os.makedirs(CACHE_DIR, exist_ok=True)
cache = dc.Cache(CACHE_DIR)
class ModelStruct(TypedDict):
    """Configuration record describing one loaded model.

    One such dict is built per model by ``load_model`` and then handed to the
    prediction and heatmap helpers of this module.
    """

    # Human-readable model identifier (e.g. "ResNet50V2").
    model_name: str
    # Model whose ``predict`` method is called for classification.
    model: tf.keras.Model
    # Model handed to the Grad-CAM computation.
    gradcam_model: tf.keras.Model
    # Function applied to the float32 pixel array before prediction.
    preprocess_input: Callable[[np.ndarray], Any]
    # Size passed to ``PIL.Image.resize`` when preparing the input image.
    target_size: tuple[int, int]
    # Name of the layer used as ``penultimate_layer`` for Grad-CAM.
    last_conv_layer: str
    # "gradcam++" selects Grad-CAM++; any other value selects plain Grad-CAM.
    gradcam_type: str
# Lazily populated list of model configurations (None until first load).
_model_cache: list[ModelStruct] | None = None


def load_model() -> list[ModelStruct]:
    """Return the model configurations, loading the Keras model on first use.

    The first call reads ``model/best_ResNet50V2_gradcam.keras`` (without
    compiling it) and stores a one-element list in the module-level
    ``_model_cache``. Later calls return that same list object.

    Returns:
        The cached list of ``ModelStruct`` dicts.
    """
    global _model_cache
    if _model_cache is not None:
        return _model_cache

    # Use the module logger (as the rest of this file does) instead of print,
    # so the message follows the application's logging configuration.
    logger.info("📦 Chargement du modèle Resnet50V2...")
    model = tf.keras.models.load_model(
        "model/best_ResNet50V2_gradcam.keras", compile=False
    )
    _model_cache = [
        {
            "model_name": "ResNet50V2",
            # The same model instance serves prediction and Grad-CAM.
            "gradcam_model": model,
            "model": model,
            "preprocess_input": resnet_preprocess,
            "target_size": (224, 224),
            "last_conv_layer": "conv5_block3_out",
            "gradcam_type": "gradcam",  # alternative: "gradcam++"
        }
    ]
    return _model_cache
def compute_gradcam_v2(model, image_array, class_index=None, layer_name=None, gradcam_type="gradcam"):
    """Compute a Grad-CAM (or Grad-CAM++) map, logging device and timing info.

    Args:
        model: Keras model handed to tf-keras-vis (used with ``clone=False``).
        image_array: A single image (3 dimensions) or an already batched
            array. A single image gets a leading batch axis.
        class_index: Output index to explain. When None, the arg-max of the
            first sample's output is used.
        layer_name: Name of the layer passed as ``penultimate_layer``. When
            None, the last layer whose name contains "conv" and whose output
            has 4 dimensions is used.
        gradcam_type: "gradcam++" selects ``GradcamPlusPlus``; any other value
            selects ``Gradcam``.

    Returns:
        The map of the first sample in the batch, passed through the
        tf-keras-vis ``normalize`` helper.

    Raises:
        ValueError: If ``layer_name`` is None and no suitable layer exists.
    """
    logging.info(f"🔍 Lancement du calcul Grad-CAM — type: {gradcam_type}")

    # GPU availability check (also drives the device choice further down).
    gpus = tf.config.list_physical_devices("GPU")
    if not gpus:
        logging.warning("⚠️ Aucun GPU visible pour TensorFlow.")
    else:
        logging.info(f"✅ GPU(s) détecté(s) : {[gpu.name for gpu in gpus]}")

    start_time = time.perf_counter()

    # Build the input tensor, adding the batch axis for a single image.
    if image_array.ndim == 3:
        input_tensor = tf.convert_to_tensor(np.expand_dims(image_array, axis=0), dtype=tf.float32)
    else:
        input_tensor = tf.convert_to_tensor(image_array, dtype=tf.float32)

    # Report where the input tensor and the first model weight live.
    logging.info(f"📦 Device input_tensor : {input_tensor.device}")
    if model.trainable_variables:
        try:
            logging.info(f"📦 Device premier poids modèle : {model.trainable_variables[0].device}")
        except AttributeError:
            logging.warning("⚠️ Impossible d'accéder au device du premier poids (Variable object).")
    else:
        logging.warning("❗ Aucun poids trainable trouvé dans le modèle.")

    # Pick the Grad-CAM variant.
    if gradcam_type == "gradcam++":
        gradcam = GradcamPlusPlus(model, clone=False)
    else:
        gradcam = Gradcam(model, clone=False)

    def loss(output):
        # Score of the requested class, or of the arg-max class of sample 0.
        class_idx = tf.argmax(output[0]) if class_index is None else class_index
        return output[:, class_idx]

    # Auto-detect the target layer when the caller did not name one.
    if layer_name is None:
        for layer in reversed(model.layers):
            if "conv" not in layer.name:
                continue
            # ``output_shape`` is not available on every Keras version;
            # fall back to the shape of the layer's output tensor.
            shape = getattr(layer, "output_shape", None)
            if shape is None:
                shape = getattr(getattr(layer, "output", None), "shape", None)
            if shape is not None and len(shape) == 4:
                layer_name = layer.name
                break
        if layer_name is None:
            raise ValueError("❌ Aucune couche conv 2D trouvée.")
    logging.info(f"🎯 Couche utilisée pour la Grad-CAM : {layer_name}")

    # Only request the GPU when one was detected, so the "GPU" log line is
    # never emitted for a machine without a GPU.
    cam = None
    if gpus:
        try:
            with tf.device("/GPU:0"):
                cam = gradcam(loss, input_tensor, penultimate_layer=layer_name)
            logging.info("🚀 Grad-CAM exécutée avec /GPU:0")
        except RuntimeError as e:
            logging.warning(f"⚠️ Fallback CPU — erreur GPU : {e}")
    if cam is None:
        # Pin the computation to the CPU so the fallback really avoids the GPU.
        with tf.device("/CPU:0"):
            cam = gradcam(loss, input_tensor, penultimate_layer=layer_name)
        logging.info("🏃 Grad-CAM exécutée sur CPU")

    cam = cam[0]  # keep the map of the first (only) sample
    cam = normalize(cam)

    elapsed = time.perf_counter() - start_time
    logging.info(f"✅ Grad-CAM terminée en {elapsed:.2f} sec — shape: {cam.shape}")
    return cam
def compute_gradcam(model, image_array, class_index=None, layer_name=None, gradcam_type="gradcam"):
    """Compute the Grad-CAM map of an image for a Keras model.

    Args:
        model: tf.keras.Model handed to tf-keras-vis (``clone=False``).
        image_array: Preprocessed float32 image, either a single image
            (H, W, 3) or an already batched array. A single image gets a
            leading batch axis.
        class_index: Target class index. When None, the arg-max of the first
            sample's output is used.
        layer_name: Name of the layer passed as ``penultimate_layer``. When
            None, the last layer whose name contains "conv" and whose output
            has 4 dimensions is used.
        gradcam_type: "gradcam++" selects ``GradcamPlusPlus``; any other value
            selects ``Gradcam``.

    Returns:
        The map of the first sample in the batch, passed through the
        tf-keras-vis ``normalize`` helper.

    Raises:
        ValueError: If ``layer_name`` is None and no suitable layer exists.
    """
    logging.info(f"Lancement calcul de la gradcam avec le type {gradcam_type}")

    if image_array.ndim == 3:
        input_tensor = np.expand_dims(image_array, axis=0)
    else:
        input_tensor = image_array

    if gradcam_type == "gradcam++":
        gradcam = GradcamPlusPlus(model, clone=False)
    else:
        gradcam = Gradcam(model, clone=False)

    def loss(output):
        # Score of the requested class, or of the arg-max class of sample 0.
        if class_index is None:
            class_index_local = tf.argmax(output[0])
        else:
            class_index_local = class_index
        return output[:, class_index_local]

    # Auto-detect the target layer when the caller did not name one.
    if layer_name is None:
        for layer in reversed(model.layers):
            if "conv" not in layer.name:
                continue
            # ``output_shape`` is not available on every Keras version;
            # fall back to the shape of the layer's output tensor.
            shape = getattr(layer, "output_shape", None)
            if shape is None:
                shape = getattr(getattr(layer, "output", None), "shape", None)
            if shape is not None and len(shape) == 4:
                layer_name = layer.name
                break
        if layer_name is None:
            raise ValueError("Aucune couche convolutionnelle 2D trouvée dans le modèle.")

    cam = gradcam(loss, input_tensor, penultimate_layer=layer_name)
    cam = cam[0]  # keep the map of the first (only) sample

    # Rescale with the tf-keras-vis ``normalize`` helper.
    cam = normalize(cam)
    return cam
def preprocess_image(image_bytes, target_size, preprocess_input):
    """Decode image bytes and build the two batched arrays used downstream.

    Args:
        image_bytes: Encoded image content, readable by PIL.
        target_size: Size passed as-is to ``PIL.Image.resize``.
        preprocess_input: Callable applied to a copy of the float32 pixels.

    Returns:
        A ``(preprocessed_input, raw_input)`` tuple. Both carry a leading
        batch axis. The first is the output of ``preprocess_input``; the
        second is the pixel array divided by 255.

    Raises:
        ValueError: If the bytes cannot be decoded, or if the decoded array
            is not 3-dimensional with 3 channels.
    """
    try:
        logger.info("📤 Lecture des bytes et conversion en image PIL")
        pil_image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
    except Exception as e:
        logger.exception("❌ Erreur lors de l'ouverture de l'image")
        raise ValueError("Impossible de décoder l'image") from e

    logger.info(f"📐 Redimensionnement de l'image à la taille {target_size}")
    resized = pil_image.resize(target_size)

    pixels = np.array(resized, dtype=np.float32)
    logger.debug(f"🔍 Shape de l'image après conversion en tableau : {pixels.shape}")

    has_rgb_layout = pixels.ndim == 3 and pixels.shape[-1] == 3
    if not has_rgb_layout:
        logger.error(f"❌ Image invalide : shape={pixels.shape}")
        raise ValueError("Image must have 3 channels (RGB)")

    logger.info("🎨 Conversion et prétraitement de l'image")

    # Model input: preprocess a copy so ``pixels`` stays untouched, then batch.
    preprocessed_input = np.expand_dims(preprocess_input(pixels.copy()), axis=0)

    # Second output: pixels divided by 255 (no model preprocessing), batched.
    raw_input = np.expand_dims(pixels / 255.0, axis=0)

    logger.debug(f"🧪 Shape après ajout de la dimension batch : {preprocessed_input.shape}")
    return preprocessed_input, raw_input
def hash_image_bytes(image_bytes: bytes) -> str:
    """Return the hexadecimal MD5 digest of *image_bytes*.

    The digest is used in this module to build cache keys for an image.
    """
    # The digest is only a cache key, not a security measure. Saying so lets
    # MD5 keep working on Python builds that restrict it for security use.
    return hashlib.md5(image_bytes, usedforsecurity=False).hexdigest()
def compute_entropy_safe(probas):
    """Return the entropy (natural logarithm) of *probas*, ignoring zeros.

    Only strictly positive entries contribute, so ``log(0)`` is never
    evaluated. The input may have any shape; all entries are summed together.

    Args:
        probas: Array-like of probabilities.

    Returns:
        ``-sum(p * log(p))`` over the strictly positive entries, as a NumPy
        float. The result is ``0.0`` (never ``-0.0``) when no entry
        contributes, e.g. for a one-hot or an empty input.
    """
    probas = np.asarray(probas)
    # Keep only strictly positive probabilities.
    positive = probas[probas > 0]
    entropy = -np.sum(positive * np.log(positive))
    # Negating a zero sum yields IEEE negative zero, which would be displayed
    # as "-0.0000"; adding +0.0 normalises it to plain 0.0.
    return entropy + 0.0
def predict_with_model(config, image_bytes: bytes):
    """Run a prediction for *image_bytes*; delegates to ``predict_with_cache``.

    Args:
        config: Model configuration dict, forwarded unchanged.
        image_bytes: Encoded image content, forwarded unchanged.

    Returns:
        Whatever ``predict_with_cache`` returns for the same arguments.
    """
    prediction = predict_with_cache(config, image_bytes)
    return prediction
def predict_with_cache(config, image_bytes: bytes):
    """Classify an image, reusing a cached result when one exists.

    Args:
        config: Model configuration dict. Reads ``"model"``,
            ``"target_size"`` and ``"preprocess_input"``, plus ``"model_name"``
            when present.
        image_bytes: Encoded image content.

    Returns:
        A dict with the keys ``"preds"`` (list of scores of the first
        sample), ``"predicted_class"`` (arg-max index), ``"confidence"``
        (score of that class) and ``"entropy"``.

    Raises:
        ValueError: Propagated from ``preprocess_image`` for invalid images.
    """
    hash_key = hash_image_bytes(image_bytes)
    # Include the model name so two models never share a cached prediction
    # for the same image.
    model_name = config.get("model_name", "model")
    pred_key = f"{hash_key}_{model_name}_pred"

    # Single lookup: an ``in`` test followed by indexing could fail if the
    # entry disappears from the cache between the two operations.
    cached = cache.get(pred_key)
    if cached is not None:
        logger.info(f"✅ prédiction trouvée dans le cache {hash_key}")
        return cached

    input_array, _ = preprocess_image(
        image_bytes, config["target_size"], config["preprocess_input"]
    )

    logger.info("🤖 Lancement de la prédiction avec le modèle")
    preds = config["model"].predict(input_array)
    logger.debug(f"📈 Prédictions brutes : {preds[0].tolist()}")

    predicted_class_index = int(np.argmax(preds[0]))
    confidence = float(preds[0][predicted_class_index])
    entropy = float(compute_entropy_safe(preds))
    logger.info(f"✅ Prédiction : classe={predicted_class_index}, confiance={confidence:.4f},entropy={entropy:.4f}")

    result = {
        "preds": preds[0].tolist(),
        "predicted_class": predicted_class_index,
        "confidence": confidence,
        "entropy": entropy,
    }
    cache[pred_key] = result
    return result
def get_heatmap(config, image_bytes: bytes, predicted_class_index):
    """Return the Grad-CAM heatmap of an image, reusing the cache if possible.

    Args:
        config: Model configuration dict. Reads ``"gradcam_model"``,
            ``"target_size"``, ``"preprocess_input"``, ``"last_conv_layer"``
            and ``"gradcam_type"``, plus ``"model_name"`` when present.
        image_bytes: Encoded image content.
        predicted_class_index: Class index the heatmap should explain.

    Returns:
        A dict with a single ``"heatmap"`` key holding the map as nested
        lists, or an empty list when anything goes wrong (best effort: no
        exception is propagated).
    """
    result = {}
    try:
        hash_key = hash_image_bytes(image_bytes)
        # A heatmap depends on the image, the model, the Grad-CAM variant and
        # the explained class, so all of them are part of the key. Otherwise
        # a map computed for one class could be served for another.
        model_name = config.get("model_name", "model")
        heatmap_key = (
            f"{hash_key}_{model_name}_{config['gradcam_type']}"
            f"_{predicted_class_index}_heatmap"
        )

        # Single cache lookup (avoids a membership test followed by indexing).
        cached_heatmap = cache.get(heatmap_key)
        if cached_heatmap is not None:
            logger.info(f"✅ Heatmap trouvée dans le cache {heatmap_key}")
            result["heatmap"] = cached_heatmap
            return result

        # Not cached: compute it. Grad-CAM is run on the same preprocessed
        # tensor the model predicts on, as ``compute_gradcam`` documents, so
        # the map explains the forward pass that produced the prediction.
        # NOTE(review): the previous code passed the /255-scaled image here;
        # confirm no consumer relied on that.
        preprocessed_input, _ = preprocess_image(
            image_bytes, config["target_size"], config["preprocess_input"]
        )

        logger.info("✅ Début de la génération de la heatmap")
        start_time = time.perf_counter()

        heatmap = compute_gradcam(
            config["gradcam_model"],
            preprocessed_input,
            class_index=predicted_class_index,
            layer_name=config["last_conv_layer"],
            gradcam_type=config["gradcam_type"],
        )

        elapsed_time = time.perf_counter() - start_time
        logger.info(f"✅ Heatmap générée en {elapsed_time:.2f} secondes")

        # Nested lists so the result can be serialised to JSON.
        heatmap_list = heatmap.tolist()
        result["heatmap"] = heatmap_list

        # Store in the shared cache for later requests.
        cache[heatmap_key] = heatmap_list
    except Exception as e:
        # Best effort by design: log with traceback and return an empty map.
        logger.exception(f"❌ Erreur lors de la génération de la heatmap: {e}")
        result["heatmap"] = []
    return result
def get_heatmap_old(config, image_bytes: bytes, predicted_class_index):
    """Legacy heatmap generation without any caching.

    Args:
        config: Model configuration dict. Reads ``"gradcam_model"``,
            ``"target_size"``, ``"preprocess_input"``, ``"last_conv_layer"``
            and ``"gradcam_type"``.
        image_bytes: Encoded image content.
        predicted_class_index: Class index the heatmap should explain.

    Returns:
        A dict with a single ``"heatmap"`` key holding the map as nested
        lists, or an empty list when any step raises.
    """
    result = {}
    try:
        # Only the second array returned by preprocess_image is used here.
        # NOTE(review): that array is not run through config["preprocess_input"];
        # confirm this is the intended Grad-CAM input.
        _, unprocessed_batch = preprocess_image(
            image_bytes,
            config["target_size"],
            config["preprocess_input"],
        )

        logger.info("✅ Début de la génération de la heatmap")
        started_at = time.time()

        # Log the inputs of the computation.
        logger.info(f"🖼️ Image d'entrée shape: {unprocessed_batch.shape}")
        logger.info(f"🎯 Index de classe prédite: {predicted_class_index}")
        logger.info(f"🛠️ Dernière couche utilisée: {config['last_conv_layer']}")

        # Compute the heatmap.
        cam_map = compute_gradcam(
            config["gradcam_model"],
            unprocessed_batch,
            class_index=predicted_class_index,
            layer_name=config["last_conv_layer"],
            gradcam_type=config["gradcam_type"],
        )

        duration = time.time() - started_at
        logger.info(f"✅ Heatmap générée en {duration:.2f} secondes")

        # Nested lists so the result can be serialised to JSON.
        result["heatmap"] = cam_map.tolist()
    except Exception as e:
        logger.error(f"❌ Erreur lors de la génération de la heatmap: {e}")
        result["heatmap"] = []
    return result