File size: 3,469 Bytes
6462c1c
 
 
 
 
 
 
 
 
 
 
 
f084485
 
6462c1c
f084485
 
 
 
 
 
6462c1c
f084485
 
 
 
 
 
6462c1c
 
 
 
 
 
fe6cf06
2a161ab
3be26bb
 
 
fe6cf06
6462c1c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f6a6e50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from datasets import Dataset, Image as DatasetsImage
import os
from PIL import Image
import torch
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
from torchvision import transforms
import numpy as np

# Fonction build_dataset (inchangée)
def build_dataset(base_path):
    image_paths = []
    labels = []
    # On cible uniquement le dossier "candle"
    candle_path = os.path.join(base_path, "candle", "Data", "Images") 

    # Images normales
    normal_path = os.path.join(candle_path, "Normal")
    if os.path.exists(normal_path):
        for img in os.listdir(normal_path):
            image_paths.append(os.path.join(normal_path, img))
            labels.append(0)  # Normal

    # Images anomalies
    anomaly_path = os.path.join(candle_path, "Anomaly")
    if os.path.exists(anomaly_path):
        for img in os.listdir(anomaly_path):
            image_paths.append(os.path.join(anomaly_path, img))
            labels.append(1)  # Anomalie

    return Dataset.from_dict({
        "image_path": image_paths,
        "label": labels,
    })

feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/resnet-50")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForImageClassification.from_pretrained("microsoft/resnet-50")
model.to(device)
model.eval()

# Fonction load_and_preprocess_image (inchangée)
def load_and_preprocess_image(img_path):
    """
    Charge et prétraite une image en utilisant le feature extractor.
    """
    image = Image.open(img_path).convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")
    return inputs["pixel_values"]

# Fonction extract_features (inchangée)
def extract_features(image_paths):
    """
    Extrait des caractéristiques pour chaque image de la liste image_paths.
    Ici, on utilise les logits du modèle comme représentation.
    """
    features = []
    with torch.no_grad():
        for img_path in image_paths:
            image_tensor = load_and_preprocess_image(img_path).to(device)
            outputs = model(pixel_values=image_tensor)
            # Dans ce cas, nous utilisons les logits comme vecteur de caractéristiques.
            feature_vector = outputs.logits.cpu().numpy()
            features.append(feature_vector)
    return np.vstack(features)

def adaptive_threshold(anomaly_scores):
  """
  Calcule un seuil adaptatif en utilisant la méthode IQR.
  """
  Q1 = np.percentile(anomaly_scores, 25)
  Q3 = np.percentile(anomaly_scores, 75)
  IQR = Q3 - Q1
  threshold = Q3 + 1.5 * IQR  # Ajuster le facteur 1.5 si besoin
  return threshold

def analyze_image(image_path, all_features_for_threshold):
    """
    Analyse une image et détermine si elle est une anomalie.
    """
    # Extraire les caractéristiques de l'image
    features = extract_features([image_path])

    # Appliquer la transformation PCA
    pca = joblib.load("pca_transformer.pkl")
    features_reduced = pca.transform(features)

    # Prédire le score d'anomalie
    anomaly_detector = joblib.load("anomaly_detector.pkl")
    anomaly_score = anomaly_detector.decision_function(features_reduced)[0]

    # Calculer le seuil adaptatif pour l'ensemble d'entraînement
    threshold = adaptive_threshold(all_features_for_threshold)

    # Déterminer si l'image est une anomalie
    if anomaly_score < threshold:
        result = "Anomalie"
    else:
        result = "Normale"

    return result, anomaly_score