reco-resnet-api / app /model.py
rkonan's picture
optimisation gradcam
1befe3f
import tensorflow as tf
import numpy as np
from PIL import Image
import tensorflow as tf
import logging
import numpy as np
from PIL import Image
from keras.applications.efficientnet_v2 import preprocess_input as effnet_preprocess
from keras.applications.resnet_v2 import preprocess_input as resnet_preprocess
import io
from tf_keras_vis.gradcam import Gradcam,GradcamPlusPlus
from tf_keras_vis.utils import normalize
import numpy as np
import tensorflow as tf
from tf_keras_vis.saliency import Saliency
from tf_keras_vis.utils import normalize
import numpy as np
import tensorflow as tf
from tf_keras_vis.saliency import Saliency
from tf_keras_vis.utils import normalize
import logging
import time
import pickle
import hashlib
from typing import TypedDict, Callable, Any
from app.log import logger
import os
import diskcache as dc
# Module-level thresholds. No function visible in this excerpt reads them.
# NOTE(review): presumably used by callers to flag low-confidence or
# high-entropy predictions — confirm against the importing modules.
confidence_threshold=0.55
entropy_threshold=2
# In-memory cache (plain dict).
# NOTE(review): not referenced by any function visible in this excerpt.
HEATMAP_CACHE = {}
# Optional: on-disk cache directory.
# NOTE(review): CACHE_DIR is reassigned two statements below, so
# "./cache_heatmaps" is created here but is not the directory backing `cache`.
CACHE_DIR = "./cache_heatmaps"
os.makedirs(CACHE_DIR, exist_ok=True)
# Directory that backs the diskcache store used by the prediction and
# heatmap helpers in this module.
CACHE_DIR = './cache'
os.makedirs(CACHE_DIR, exist_ok=True)
cache = dc.Cache(CACHE_DIR)
class ModelStruct(TypedDict):
    """Configuration record describing one loaded model and how to feed it."""

    # Human-readable identifier of the model (e.g. "ResNet50V2").
    model_name: str
    # Keras model whose ``predict`` method produces the class probabilities.
    model: tf.keras.Model
    # Keras model handed to the Grad-CAM computation.
    gradcam_model:tf.keras.Model
    # Callable applied to the float32 pixel array before prediction.
    preprocess_input: Callable[[np.ndarray], Any]
    # Size passed to ``Image.resize`` when decoding an input image.
    target_size: tuple[int, int]
    # Name of the layer passed as ``penultimate_layer`` to Grad-CAM.
    last_conv_layer:str
    # "gradcam++" selects GradcamPlusPlus; any other value selects Gradcam.
    gradcam_type:str
_model_cache: list[ModelStruct] | None = None


def load_model() -> list[ModelStruct]:
    """Return the model configurations, loading the Keras model on first call.

    The first call reads ``model/best_ResNet50V2_gradcam.keras`` (without
    compiling it) and stores a one-element list in the module-level
    ``_model_cache``; later calls return that same list object.

    Returns:
        The cached list of ``ModelStruct`` dictionaries.
    """
    global _model_cache

    # Already loaded: hand back the shared list.
    if _model_cache is not None:
        return _model_cache

    print("📦 Chargement du modèle Resnet50V2...")
    resnet = tf.keras.models.load_model(
        "model/best_ResNet50V2_gradcam.keras",
        compile=False,
    )

    # The same model instance serves both prediction and Grad-CAM.
    # "gradcam_type" may alternatively be set to "gradcam++".
    resnet_config = {
        "model_name": "ResNet50V2",
        "gradcam_model": resnet,
        "model": resnet,
        "preprocess_input": resnet_preprocess,
        "target_size": (224, 224),
        "last_conv_layer": "conv5_block3_out",
        "gradcam_type": "gradcam",
    }
    _model_cache = [resnet_config]
    return _model_cache
def compute_gradcam_v2(model, image_array, class_index=None, layer_name=None, gradcam_type="gradcam"):
    """Compute a Grad-CAM map while logging device and timing diagnostics.

    Args:
        model: Keras model handed to the tf-keras-vis Grad-CAM classes.
        image_array: Image array. A 3-D array receives a leading batch axis;
            any other rank is converted to a tensor unchanged.
        class_index: Index of the output column to explain. When None, the
            arg-max of the first output row is used.
        layer_name: Layer passed as ``penultimate_layer``. When None, the last
            layer whose name contains "conv" and whose output shape has four
            dimensions is used.
        gradcam_type: "gradcam++" selects ``GradcamPlusPlus``; any other value
            selects ``Gradcam``.

    Returns:
        The first map of the batch after ``tf_keras_vis.utils.normalize``.

    Raises:
        ValueError: If ``layer_name`` is None and no matching layer is found.
    """
    logging.info(f"🔍 Lancement du calcul Grad-CAM — type: {gradcam_type}")

    # GPU visibility is only reported; it does not change the code path below.
    gpus = tf.config.list_physical_devices("GPU")
    if gpus:
        logging.info(f"✅ GPU(s) détecté(s) : {[gpu.name for gpu in gpus]}")
    else:
        logging.warning("⚠️ Aucun GPU visible pour TensorFlow.")

    start_time = time.time()

    # Build the float32 input tensor, adding the batch axis for a single image.
    batched = image_array if image_array.ndim != 3 else np.expand_dims(image_array, axis=0)
    input_tensor = tf.convert_to_tensor(batched, dtype=tf.float32)

    # Report where the input and the first trainable weight live.
    logging.info(f"📦 Device input_tensor : {input_tensor.device}")
    if not model.trainable_variables:
        logging.warning("❗ Aucun poids trainable trouvé dans le modèle.")
    else:
        try:
            logging.info(f"📦 Device premier poids modèle : {model.trainable_variables[0].device}")
        except AttributeError:
            logging.warning("⚠️ Impossible d'accéder au device du premier poids (Variable object).")

    # Pick the Grad-CAM flavour.
    cam_class = GradcamPlusPlus if gradcam_type == "gradcam++" else Gradcam
    gradcam = cam_class(model, clone=False)

    def loss(output):
        # Score function: the column of the requested (or predicted) class.
        if class_index is None:
            return output[:, tf.argmax(output[0])]
        return output[:, class_index]

    # Resolve the target layer when the caller did not name one.
    if layer_name is None:
        layer_name = next(
            (
                layer.name
                for layer in reversed(model.layers)
                if "conv" in layer.name and len(layer.output_shape) == 4
            ),
            None,
        )
        if layer_name is None:
            raise ValueError("❌ Aucune couche conv 2D trouvée.")

    logging.info(f"🎯 Couche utilisée pour la Grad-CAM : {layer_name}")

    # Try under an explicit GPU device scope; a RuntimeError triggers a second
    # attempt without any device scope.
    try:
        with tf.device("/GPU:0"):
            cam = gradcam(loss, input_tensor, penultimate_layer=layer_name)
        logging.info("🚀 Grad-CAM exécutée avec /GPU:0")
    except RuntimeError as e:
        logging.warning(f"⚠️ Fallback CPU — erreur GPU : {e}")
        cam = gradcam(loss, input_tensor, penultimate_layer=layer_name)
        logging.info("🏃 Grad-CAM exécutée sur CPU")

    # Keep the first map of the batch and normalise it.
    cam = normalize(cam[0])

    elapsed = time.time() - start_time
    logging.info(f"✅ Grad-CAM terminée en {elapsed:.2f} sec — shape: {cam.shape}")
    return cam
def compute_gradcam(model, image_array, class_index=None, layer_name=None,gradcam_type="gradcam"):
    """Compute the Grad-CAM map of one image for a Keras model.

    Args:
        model: Keras model handed to the tf-keras-vis Grad-CAM classes.
        image_array: Image array. A 3-D array receives a leading batch axis;
            any other rank is used unchanged.
        class_index: Index of the target class. When None, the arg-max of the
            first output row (the predicted class) is used.
        layer_name: Name of the layer passed as ``penultimate_layer``. When
            None, the last layer whose name contains "conv" and whose output
            shape has four dimensions is used.
        gradcam_type: "gradcam++" selects ``GradcamPlusPlus``; any other value
            selects ``Gradcam``.

    Returns:
        The first map of the batch after ``tf_keras_vis.utils.normalize``.

    Raises:
        ValueError: If ``layer_name`` is None and no matching layer is found.
    """
    logging.info(f"Lancement calcul de la gradcam avec le type {gradcam_type}")

    # A single image gets a batch axis; batched input passes through.
    input_tensor = np.expand_dims(image_array, axis=0) if image_array.ndim == 3 else image_array

    cam_class = GradcamPlusPlus if gradcam_type == "gradcam++" else Gradcam
    gradcam = cam_class(model, clone=False)

    def loss(output):
        # Score function: the column of the requested (or predicted) class.
        target = tf.argmax(output[0]) if class_index is None else class_index
        return output[:, target]

    # Without an explicit layer, fall back to the last 4-D "conv" layer.
    if layer_name is None:
        layer_name = next(
            (
                layer.name
                for layer in reversed(model.layers)
                if "conv" in layer.name and len(layer.output_shape) == 4
            ),
            None,
        )
        if layer_name is None:
            raise ValueError("Aucune couche convolutionnelle 2D trouvée dans le modèle.")

    heatmaps = gradcam(loss, input_tensor, penultimate_layer=layer_name)

    # Keep the first map of the batch and normalise it.
    return normalize(heatmaps[0])
def preprocess_image(image_bytes, target_size, preprocess_input):
    """Decode encoded image bytes and build the two batched arrays derived from them.

    Args:
        image_bytes: Encoded image content, opened with PIL and converted to RGB.
        target_size: Size passed to ``Image.resize``.
        preprocess_input: Callable applied to a copy of the float32 pixel array.

    Returns:
        A ``(preprocessed_input, raw_input)`` tuple, both with a leading batch
        axis: the output of ``preprocess_input`` and the pixel array divided
        by 255.

    Raises:
        ValueError: If the bytes cannot be opened as an image, or if the
            resulting array is not 3-D with three channels.
    """
    try:
        logger.info("📤 Lecture des bytes et conversion en image PIL")
        buffer = io.BytesIO(image_bytes)
        image = Image.open(buffer).convert("RGB")
    except Exception as e:
        logger.exception("❌ Erreur lors de l'ouverture de l'image")
        raise ValueError("Impossible de décoder l'image") from e

    logger.info(f"📐 Redimensionnement de l'image à la taille {target_size}")
    resized = image.resize(target_size)
    image_array = np.array(resized).astype(np.float32)
    logger.debug(f"🔍 Shape de l'image après conversion en tableau : {image_array.shape}")

    is_rgb = image_array.ndim == 3 and image_array.shape[-1] == 3
    if not is_rgb:
        logger.error(f"❌ Image invalide : shape={image_array.shape}")
        raise ValueError("Image must have 3 channels (RGB)")

    logger.info("🎨 Conversion et prétraitement de l'image")

    # Prediction input: model-specific preprocessing on a copy, then batch axis.
    preprocessed_input = np.expand_dims(preprocess_input(image_array.copy()), axis=0)

    # Grad-CAM input: plain 1/255 scaling, no model preprocessing, batched.
    scaled = image_array / 255.0
    raw_input = scaled[np.newaxis, ...]

    logger.debug(f"🧪 Shape après ajout de la dimension batch : {preprocessed_input.shape}")
    return preprocessed_input, raw_input
def hash_image_bytes(image_bytes: bytes) -> str:
    """Return the hexadecimal MD5 digest of ``image_bytes``.

    The digest is used only as a cache key, not for security, so the hash is
    created with ``usedforsecurity=False``. This yields the same digest as
    before while remaining usable on Python builds that restrict MD5 to
    non-security purposes (e.g. FIPS mode).

    Args:
        image_bytes: Raw encoded image content.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    return hashlib.md5(image_bytes, usedforsecurity=False).hexdigest()
def compute_entropy_safe(probas):
    """Return the Shannon entropy (natural log) of ``probas``.

    Only strictly positive entries contribute, so zero probabilities never
    reach ``np.log``.

    Args:
        probas: Array-like of probabilities, of any shape.

    Returns:
        ``-sum(p * ln(p))`` over the strictly positive entries.
    """
    values = np.array(probas)
    # Boolean indexing keeps the strictly positive entries as a flat array.
    positive = values[values > 0]
    return -np.sum(positive * np.log(positive))
def predict_with_model(config, image_bytes: bytes):
    """Predict the class of an encoded image by delegating to ``predict_with_cache``.

    Args:
        config: Model configuration dictionary, forwarded unchanged.
        image_bytes: Encoded image content, forwarded unchanged.

    Returns:
        Whatever ``predict_with_cache`` returns for the same arguments.
    """
    return predict_with_cache(config, image_bytes)
def predict_with_cache(config, image_bytes: bytes):
    """Predict the class of an encoded image, reusing cached results when available.

    The cache key combines the image digest with the model name, so different
    model configurations never share a cached prediction.

    Args:
        config: Model configuration dictionary. Reads ``model_name``
            (optional), ``target_size``, ``preprocess_input`` and ``model``.
        image_bytes: Encoded image content.

    Returns:
        A dictionary with the keys ``preds`` (list of class scores),
        ``predicted_class`` (int), ``confidence`` (float) and ``entropy``
        (float).

    Raises:
        ValueError: Propagated from ``preprocess_image`` when the image cannot
            be decoded.
    """
    hash_key = hash_image_bytes(image_bytes)
    model_name = config.get("model_name", "")
    pred_key = f"{hash_key}_{model_name}_pred"

    # Single lookup: avoids the check-then-read race where an entry could be
    # evicted between `in` and `[]`. Stored results are dicts, never None.
    cached = cache.get(pred_key)
    if cached is not None:
        logger.info(f"✅ prédiction trouvée dans le cache {hash_key}")
        return cached

    input_array, _ = preprocess_image(
        image_bytes, config["target_size"], config["preprocess_input"]
    )
    logger.info("🤖 Lancement de la prédiction avec le modèle")
    preds = config["model"].predict(input_array)

    # The input is a batch of one image, so only the first row is relevant.
    scores = preds[0]
    scores_list = scores.tolist()
    logger.debug(f"📈 Prédictions brutes : {scores_list}")

    predicted_class_index = int(np.argmax(scores))
    confidence = float(scores[predicted_class_index])
    entropy = float(compute_entropy_safe(scores))
    logger.info(f"✅ Prédiction : classe={predicted_class_index}, confiance={confidence:.4f},entropy={entropy:.4f}")

    result = {
        "preds": scores_list,
        "predicted_class": predicted_class_index,
        "confidence": confidence,
        "entropy": entropy,
    }
    cache[pred_key] = result
    return result
def get_heatmap(config, image_bytes: bytes, predicted_class_index):
    """Return the Grad-CAM heatmap of an image, reusing the disk cache when possible.

    The cache key combines the image digest with the model name, the Grad-CAM
    type and the class index, because the heatmap depends on all of them.
    Any failure is logged with its traceback and yields an empty heatmap
    instead of raising (best-effort behaviour).

    Args:
        config: Model configuration dictionary. Reads ``model_name`` and
            ``gradcam_type`` for the cache key, plus ``target_size``,
            ``preprocess_input``, ``gradcam_model``, ``last_conv_layer`` and
            ``gradcam_type`` for the computation.
        image_bytes: Encoded image content.
        predicted_class_index: Index of the class to explain.

    Returns:
        ``{"heatmap": <nested list>}`` on success, ``{"heatmap": []}`` on
        failure.
    """
    result = {}
    try:
        hash_key = hash_image_bytes(image_bytes)
        key_parts = (
            hash_key,
            config.get("model_name", ""),
            config.get("gradcam_type", ""),
            predicted_class_index,
        )
        heatmap_key = "_".join(str(part) for part in key_parts) + "_heatmap"

        # Check the disk-backed cache first. A single lookup avoids the
        # check-then-read race; stored heatmaps are lists, never None.
        cached = cache.get(heatmap_key)
        if cached is not None:
            logger.info(f"✅ Heatmap trouvée dans le cache {heatmap_key}")
            result["heatmap"] = cached
            return result

        # Not cached: compute it.
        # NOTE(review): Grad-CAM receives the 1/255-scaled array, not the
        # output of config["preprocess_input"] used for prediction — confirm
        # this is intended for the model in use.
        _, raw_input = preprocess_image(
            image_bytes, config["target_size"], config["preprocess_input"]
        )
        logger.info("✅ Début de la génération de la heatmap")
        start_time = time.time()
        heatmap = compute_gradcam(
            config["gradcam_model"],
            raw_input,
            class_index=predicted_class_index,
            layer_name=config["last_conv_layer"],
            gradcam_type=config["gradcam_type"],
        )
        elapsed_time = time.time() - start_time
        logger.info(f"✅ Heatmap générée en {elapsed_time:.2f} secondes")

        # Convert to nested lists so the result is JSON-serialisable.
        heatmap_list = heatmap.tolist()
        result["heatmap"] = heatmap_list

        # Store only successful computations.
        cache[heatmap_key] = heatmap_list
    except Exception as e:
        # Deliberate best-effort: keep the traceback in the log and return an
        # empty heatmap rather than failing the request.
        logger.exception(f"❌ Erreur lors de la génération de la heatmap: {e}")
        result["heatmap"] = []
    return result
def get_heatmap_old(config, image_bytes: bytes, predicted_class_index):
    """Compute the Grad-CAM heatmap of an image without consulting any cache.

    Any failure is logged and yields an empty heatmap instead of raising.

    Args:
        config: Model configuration dictionary. Reads ``target_size``,
            ``preprocess_input``, ``gradcam_model``, ``last_conv_layer`` and
            ``gradcam_type``.
        image_bytes: Encoded image content.
        predicted_class_index: Index of the class to explain.

    Returns:
        ``{"heatmap": <nested list>}`` on success, ``{"heatmap": []}`` on
        failure.
    """
    heatmap_values = []
    try:
        _, raw_input = preprocess_image(
            image_bytes, config["target_size"], config["preprocess_input"]
        )
        logger.info("✅ Début de la génération de la heatmap")
        start_time = time.time()

        # Log the inputs of the computation.
        logger.info(f"🖼️ Image d'entrée shape: {raw_input.shape}")
        logger.info(f"🎯 Index de classe prédite: {predicted_class_index}")
        logger.info(f"🛠️ Dernière couche utilisée: {config['last_conv_layer']}")

        heatmap = compute_gradcam(
            config["gradcam_model"],
            raw_input,
            class_index=predicted_class_index,
            layer_name=config["last_conv_layer"],
            gradcam_type=config["gradcam_type"],
        )
        elapsed_time = time.time() - start_time
        logger.info(f"✅ Heatmap générée en {elapsed_time:.2f} secondes")

        # Nested lists are JSON-serialisable.
        heatmap_values = heatmap.tolist()
    except Exception as e:
        logger.error(f"❌ Erreur lors de la génération de la heatmap: {e}")
        heatmap_values = []
    return {"heatmap": heatmap_values}