"""Anomaly detection on the "candle" image set using ResNet-50 logits.

Images are read from ``<base_path>/candle/Data/Images/{Normal,Anomaly}``,
turned into feature vectors with a pretrained ``microsoft/resnet-50``
classifier, reduced with a saved PCA transformer and scored with a saved
anomaly detector.

Importing this module loads the feature extractor and the model.
"""

import os

import joblib
import numpy as np
import torch
from datasets import Dataset, Image as DatasetsImage
from PIL import Image
from torchvision import transforms
from transformers import AutoFeatureExtractor, AutoModelForImageClassification


def _collect_images(folder, label):
    """Return ``(paths, labels)`` for the regular files directly in *folder*.

    A missing folder yields two empty lists. Entries are sorted by name so
    the resulting dataset order does not depend on the file system.
    """
    if not os.path.isdir(folder):
        return [], []
    paths = [
        os.path.join(folder, name)
        for name in sorted(os.listdir(folder))
        # Skip sub-directories: they cannot be opened as images later on.
        if os.path.isfile(os.path.join(folder, name))
    ]
    return paths, [label] * len(paths)


def build_dataset(base_path):
    """Build a dataset of image paths and labels for the "candle" category.

    Args:
        base_path: Root directory containing ``candle/Data/Images``.

    Returns:
        A ``Dataset`` with two columns: ``image_path`` (str) and ``label``
        (0 for files under ``Normal``, 1 for files under ``Anomaly``).
        A missing sub-folder simply contributes no rows.
    """
    # Only the "candle" folder is targeted.
    candle_path = os.path.join(base_path, "candle", "Data", "Images")

    image_paths = []
    labels = []
    for subfolder, label in (("Normal", 0), ("Anomaly", 1)):
        paths, folder_labels = _collect_images(
            os.path.join(candle_path, subfolder), label
        )
        image_paths.extend(paths)
        labels.extend(folder_labels)

    return Dataset.from_dict({
        "image_path": image_paths,
        "label": labels,
    })


feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/resnet-50")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForImageClassification.from_pretrained("microsoft/resnet-50")
model.to(device)
model.eval()


def load_and_preprocess_image(img_path):
    """Load an image and preprocess it with the module-level feature extractor.

    Args:
        img_path: Path of the image file to open.

    Returns:
        The ``pixel_values`` tensor produced by the feature extractor.
    """
    # The context manager closes the underlying file; ``convert`` returns an
    # independent in-memory image, so it stays usable after the file closes.
    with Image.open(img_path) as raw_image:
        image = raw_image.convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")
    return inputs["pixel_values"]


def extract_features(image_paths):
    """Extract one feature vector per image, using the model logits.

    Args:
        image_paths: Iterable of image file paths.

    Returns:
        A NumPy array with the per-image logits stacked vertically. For an
        empty input, an empty array of shape ``(0, model.config.num_labels)``
        is returned instead of letting ``np.vstack`` fail.
    """
    features = []
    with torch.no_grad():
        for img_path in image_paths:
            pixel_values = load_and_preprocess_image(img_path).to(device)
            outputs = model(pixel_values=pixel_values)
            # The logits serve as the feature vector for this image.
            features.append(outputs.logits.cpu().numpy())

    if not features:
        return np.empty((0, model.config.num_labels), dtype=np.float32)
    return np.vstack(features)


def adaptive_threshold(anomaly_scores, *, factor=1.5, lower_tail=False):
    """Compute an adaptive threshold with the IQR (Tukey fence) method.

    Args:
        anomaly_scores: Non-empty array-like of reference scores.
        factor: Multiplier applied to the IQR (1.5 is the usual choice).
        lower_tail: When False (default), return the upper fence
            ``Q3 + factor * IQR``; when True, return the lower fence
            ``Q1 - factor * IQR``.

    Returns:
        The threshold as a float.

    Raises:
        ValueError: If ``anomaly_scores`` is empty.
    """
    scores = np.asarray(anomaly_scores, dtype=float)
    if scores.size == 0:
        raise ValueError("anomaly_scores must contain at least one value")

    q1, q3 = np.percentile(scores, [25, 75])
    iqr = q3 - q1
    if lower_tail:
        return q1 - factor * iqr
    return q3 + factor * iqr


def analyze_image(image_path, all_features_for_threshold):
    """Analyze one image and decide whether it is an anomaly.

    Args:
        image_path: Path of the image to analyze.
        all_features_for_threshold: Reference values handed unchanged to
            ``adaptive_threshold``. Despite the name they are used as
            anomaly scores (e.g. detector scores of the training set), not
            as raw feature vectors.

    Returns:
        A tuple ``(result, anomaly_score)`` where ``result`` is
        ``"Anomalie"`` or ``"Normale"``.
    """
    # Extract the features of the image.
    features = extract_features([image_path])

    # NOTE(security): joblib.load unpickles arbitrary objects; both .pkl
    # files must come from a trusted source.
    pca = joblib.load("pca_transformer.pkl")
    features_reduced = pca.transform(features)

    # Score the reduced features with the saved detector.
    anomaly_detector = joblib.load("anomaly_detector.pkl")
    anomaly_score = anomaly_detector.decision_function(features_reduced)[0]

    # An image is flagged when its score falls BELOW the threshold, so the
    # threshold has to be the lower IQR fence; the upper fence would label
    # almost every typical score as an anomaly.
    # NOTE(review): assumes the detector follows the scikit-learn convention
    # (lower decision_function value = more abnormal) - confirm against the
    # saved model.
    threshold = adaptive_threshold(all_features_for_threshold, lower_tail=True)

    result = "Anomalie" if anomaly_score < threshold else "Normale"
    return result, anomaly_score