# greenIA_space_test / utils.py
# (Hugging Face Space page residue: uploaded by Imposteur04, verified commit f6a6e50, "add functions")
from datasets import Dataset, Image as DatasetsImage
import os
from PIL import Image
import torch
from transformers import AutoFeatureExtractor, AutoModelForImageClassification
from torchvision import transforms
import numpy as np
# build_dataset function (unchanged)
def build_dataset(base_path):
    """Build a Dataset of candle image paths labeled 0 (Normal) or 1 (Anomaly).

    Only the "candle/Data/Images" subtree of *base_path* is scanned; the
    "Normal" and "Anomaly" folders are visited in that order, each missing
    folder being silently skipped.
    """
    records = {"image_path": [], "label": []}
    images_root = os.path.join(base_path, "candle", "Data", "Images")
    # Folder name -> numeric label: 0 = normal, 1 = anomaly.
    for folder_name, label in (("Normal", 0), ("Anomaly", 1)):
        folder_path = os.path.join(images_root, folder_name)
        if not os.path.exists(folder_path):
            continue
        for filename in os.listdir(folder_path):
            records["image_path"].append(os.path.join(folder_path, filename))
            records["label"].append(label)
    return Dataset.from_dict(records)
# Module-level model setup (runs on import): load the ResNet-50 preprocessing
# config and classifier once, shared by the functions below.
feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/resnet-50")
# Prefer GPU when available; all inference tensors are moved to this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForImageClassification.from_pretrained("microsoft/resnet-50")
model.to(device)
# Inference-only: disable dropout/batch-norm updates.
model.eval()
# load_and_preprocess_image function (unchanged)
def load_and_preprocess_image(img_path):
    """Open *img_path* as RGB and preprocess it with the feature extractor.

    Returns the "pixel_values" tensor (a batch of one) ready for the model.
    """
    rgb_image = Image.open(img_path).convert("RGB")
    processed = feature_extractor(images=rgb_image, return_tensors="pt")
    return processed["pixel_values"]
# extract_features function (unchanged)
def extract_features(image_paths):
    """Extract a feature matrix for every image in *image_paths*.

    The classifier's logits are used as the per-image feature vector; the
    result is a 2-D array with one row per image.
    """
    vectors = []
    with torch.no_grad():  # pure inference, no gradients needed
        for path in image_paths:
            pixel_values = load_and_preprocess_image(path).to(device)
            logits = model(pixel_values=pixel_values).logits
            vectors.append(logits.cpu().numpy())
    return np.vstack(vectors)
def adaptive_threshold(anomaly_scores):
    """Return the Tukey upper fence (Q3 + 1.5 * IQR) of *anomaly_scores*."""
    q1, q3 = np.percentile(anomaly_scores, [25, 75])
    iqr = q3 - q1
    # 1.5 is the conventional Tukey fence multiplier; adjust if needed.
    return q3 + 1.5 * iqr
def analyze_image(image_path, all_features_for_threshold):
    """Analyze one image and decide whether it is an anomaly.

    Parameters
    ----------
    image_path : str
        Path to the image to analyze.
    all_features_for_threshold : array-like
        Reference scores used to compute the adaptive threshold.

    Returns
    -------
    tuple
        (result, anomaly_score) where result is "Anomalie" or "Normale".
    """
    # Bug fix: joblib was used here but never imported anywhere in the file,
    # so the original code raised NameError on first call.
    import joblib

    # Features for this single image (one row of model logits).
    features = extract_features([image_path])

    # Dimensionality reduction with the PCA fitted at training time.
    # NOTE(review): re-loading the pickled transformer/detector on every call
    # is wasteful; consider caching them at module level.
    pca = joblib.load("pca_transformer.pkl")
    features_reduced = pca.transform(features)

    # Anomaly score from the trained detector (for e.g. IsolationForest,
    # decision_function yields lower values for more anomalous samples).
    anomaly_detector = joblib.load("anomaly_detector.pkl")
    anomaly_score = anomaly_detector.decision_function(features_reduced)[0]

    # Adaptive threshold derived from the reference scores.
    # NOTE(review): adaptive_threshold returns the *upper* Tukey fence
    # (Q3 + 1.5*IQR) while the comparison below flags scores *below* it —
    # confirm the intended fence direction against the detector's score scale.
    threshold = adaptive_threshold(all_features_for_threshold)

    result = "Anomalie" if anomaly_score < threshold else "Normale"
    return result, anomaly_score