Spaces:
Runtime error
Runtime error
| from datasets import Dataset, Image as DatasetsImage | |
| import os | |
| from PIL import Image | |
| import torch | |
| from transformers import AutoFeatureExtractor, AutoModelForImageClassification | |
| from torchvision import transforms | |
| import numpy as np | |
| # Fonction build_dataset (inchangée) | |
def build_dataset(base_path):
    """Build a labelled dataset of image paths for the "candle" category.

    Only ``<base_path>/candle/Data/Images`` is scanned. Files found in its
    ``Normal`` sub-directory get label 0 and files found in its ``Anomaly``
    sub-directory get label 1. A sub-directory that does not exist is
    skipped, so the result may be empty.

    Args:
        base_path: Root directory that contains the ``candle`` folder.

    Returns:
        A ``Dataset`` built with ``Dataset.from_dict`` holding two columns:
        ``"image_path"`` and ``"label"``.
    """
    # Only the "candle" category is targeted.
    candle_path = os.path.join(base_path, "candle", "Data", "Images")

    image_paths = []
    labels = []
    for folder_name, label in (("Normal", 0), ("Anomaly", 1)):
        folder = os.path.join(candle_path, folder_name)
        # isdir (rather than exists) so a stray file with that name does not
        # reach os.listdir and raise.
        if not os.path.isdir(folder):
            continue
        # os.listdir returns entries in arbitrary order; sorting makes the
        # dataset order reproducible across runs and machines.
        for entry in sorted(os.listdir(folder)):
            entry_path = os.path.join(folder, entry)
            # Nested directories are not images and would fail when opened.
            if os.path.isfile(entry_path):
                image_paths.append(entry_path)
                labels.append(label)

    return Dataset.from_dict({
        "image_path": image_paths,
        "label": labels,
    })
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# The preprocessor and the classifier are loaded from the same checkpoint name.
feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/resnet-50")
model = AutoModelForImageClassification.from_pretrained("microsoft/resnet-50")

# Move the weights to the selected device and switch to evaluation mode;
# the model is only used for inference in this file.
model.to(device)
model.eval()
| # Fonction load_and_preprocess_image (inchangée) | |
def load_and_preprocess_image(img_path):
    """Load an image from disk and preprocess it with the feature extractor.

    Args:
        img_path: Path of the image file, passed to ``PIL.Image.open``.

    Returns:
        The ``"pixel_values"`` entry of the module-level
        ``feature_extractor`` output, requested as PyTorch tensors
        (``return_tensors="pt"``).
    """
    # The context manager releases the file handle even if the conversion
    # fails; convert() returns an independent image, so it stays usable
    # after the file is closed.
    with Image.open(img_path) as raw_image:
        image = raw_image.convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")
    return inputs["pixel_values"]
| # Fonction extract_features (inchangée) | |
def extract_features(image_paths):
    """Compute one feature vector per image path using the model's logits.

    Each path is loaded with ``load_and_preprocess_image``, moved to the
    module-level ``device`` and passed through ``model`` with gradient
    tracking disabled. The logits of every forward pass are converted to
    NumPy and stacked vertically.

    Args:
        image_paths: Iterable of image file paths.

    Returns:
        The ``np.vstack`` of all per-image logits arrays (presumably one row
        per image, since each forward pass receives a single image).

    Raises:
        ValueError: If ``image_paths`` is empty, because ``np.vstack`` needs
            at least one array.
    """
    with torch.no_grad():
        # The logits themselves serve as the feature representation.
        per_image_logits = [
            model(pixel_values=load_and_preprocess_image(path).to(device))
            .logits.cpu()
            .numpy()
            for path in image_paths
        ]
    return np.vstack(per_image_logits)
def adaptive_threshold(anomaly_scores, *, factor=1.5):
    """Compute an adaptive threshold with the IQR method.

    The threshold is the upper fence ``Q3 + factor * (Q3 - Q1)``, where Q1
    and Q3 are the 25th and 75th percentiles of ``anomaly_scores``.

    Args:
        anomaly_scores: Array-like of numeric scores.
        factor: Multiplier applied to the inter-quartile range. The default
            of 1.5 reproduces the previously hard-coded value; raise it for
            a more permissive threshold, lower it for a stricter one.

    Returns:
        The threshold as a NumPy floating-point scalar.

    Raises:
        ValueError: If ``anomaly_scores`` is empty.
    """
    scores = np.asarray(anomaly_scores)
    if scores.size == 0:
        # Percentiles of an empty array are undefined; fail loudly instead
        # of letting NaN flow into later comparisons.
        raise ValueError("anomaly_scores must contain at least one value")

    q1, q3 = np.percentile(scores, [25, 75])
    iqr = q3 - q1
    return q3 + factor * iqr
def analyze_image(image_path, all_features_for_threshold):
    """Analyse one image and decide whether it is an anomaly.

    The image's features are extracted with ``extract_features``, reduced
    with the PCA stored in ``pca_transformer.pkl`` and scored with the
    detector stored in ``anomaly_detector.pkl``. The score is then compared
    with an IQR-based threshold derived from ``all_features_for_threshold``.

    Args:
        image_path: Path of the image to analyse.
        all_features_for_threshold: Array-like of reference values from
            which the threshold is computed.
            NOTE(review): presumably the detector's decision scores on the
            training set, despite the name -- confirm against the caller.

    Returns:
        A tuple ``(result, anomaly_score)`` where ``result`` is
        ``"Anomalie"`` or ``"Normale"`` and ``anomaly_score`` is the first
        value returned by the detector's ``decision_function``.
    """
    # joblib has no top-level import in the visible part of this file, so
    # it is imported here to avoid a NameError at call time.
    import joblib

    # Extract the features of the image.
    features = extract_features([image_path])

    # Apply the PCA transformation.
    # NOTE: both artifacts are re-read from disk on every call.
    pca = joblib.load("pca_transformer.pkl")
    features_reduced = pca.transform(features)

    # Predict the anomaly score.
    anomaly_detector = joblib.load("anomaly_detector.pkl")
    anomaly_score = anomaly_detector.decision_function(features_reduced)[0]

    # adaptive_threshold returns the UPPER fence (Q3 + 1.5 * IQR). Testing
    # "score < upper fence" would flag nearly every sample, so the LOWER
    # fence (Q1 - 1.5 * IQR) is needed here. Negating the scores before and
    # after the call mirrors the distribution and yields exactly that.
    # NOTE(review): assumes the scikit-learn convention that lower
    # decision_function values are more abnormal -- confirm for the
    # detector actually stored in the pickle.
    reference_scores = np.asarray(all_features_for_threshold, dtype=float)
    threshold = -adaptive_threshold(-reference_scores)

    # Decide whether the image is an anomaly.
    result = "Anomalie" if anomaly_score < threshold else "Normale"
    return result, anomaly_score