import os

import numpy as np
import cv2 as cv
from mtcnn.mtcnn import MTCNN
from keras_facenet import FaceNet


class FaceEmbeddingGenerator:
    """Extract faces from a class-per-folder image dataset and save FaceNet embeddings.

    Each subdirectory of `directory` is treated as one class (label = folder
    name); its embeddings are written to `<output_path>/<label>_embeddings.npz`.
    """

    def __init__(self, directory, output_path):
        self.directory = directory          # root of the dataset (one subfolder per class)
        self.output_path = output_path      # directory where per-class .npz files are written
        self.target_size = (160, 160)       # FaceNet expects 160x160 RGB inputs
        self.detector = MTCNN()
        self.embedder = FaceNet()
        self.embeddings = []                # accumulated embeddings for the folder in progress
        self.labels = []                    # labels parallel to self.embeddings

    def extract_face(self, filename):
        """Read an image, detect the first face, and return it resized to target_size.

        Raises:
            ValueError: if the image cannot be read, no face is detected,
                or the detected box has zero area.
        """
        img = cv.imread(filename)
        if img is None:
            # Include the offending path so failures are traceable.
            raise ValueError(f"Image {filename} not found or invalid format.")
        img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

        # Resize large images to avoid excessive memory usage in MTCNN.
        if max(img.shape[:2]) > 1024:
            scale_factor = 1024 / max(img.shape[:2])
            img = cv.resize(img, None, fx=scale_factor, fy=scale_factor)

        detection = self.detector.detect_faces(img)
        if not detection:
            raise ValueError(f"No face detected in {filename}.")

        x, y, w, h = detection[0]['box']
        # MTCNN can return slightly negative coordinates; clamp them.
        x, y = abs(x), abs(y)
        face = img[y:y + h, x:x + w]
        if face.size == 0:
            # A degenerate box would make cv.resize throw an opaque error.
            raise ValueError(f"Detected face box is empty in {filename}.")
        return cv.resize(face, self.target_size)

    def get_embedding(self, face_img):
        """Generate a 512-D FaceNet embedding from a face image."""
        face_img = face_img.astype('float32')
        face_img = np.expand_dims(face_img, axis=0)  # add batch dimension
        return self.embedder.embeddings(face_img)[0]

    def save_batch(self, folder_output_path, embeddings, labels):
        """Save a batch of embeddings to a temporary checkpoint file.

        The temp file is derived from `folder_output_path` (a .npz path) by
        inserting a `_temp` suffix, so `merge_all_embeddings` can skip it.
        """
        temp_path = folder_output_path.replace(".npz", "_temp.npz")
        np.savez_compressed(temp_path,
                            embeddings=np.asarray(embeddings),
                            labels=np.asarray(labels))
        print(f"Saved intermediate embeddings to {temp_path}")

    def process_folder(self, folder_path, label, batch_size=50):
        """Process a single folder of images in batches of `batch_size`.

        Per-image failures are logged and skipped so one bad file does not
        abort the whole folder.
        """
        # Checkpoint path for this class; was previously self.output_path (a
        # directory), which made save_batch write to "<output_dir>.npz".
        checkpoint_path = os.path.join(self.output_path, f"{label}.npz")

        batch_embeddings = []
        batch_labels = []
        count = 0
        for filename in os.listdir(folder_path):
            try:
                filepath = os.path.join(folder_path, filename)
                face = self.extract_face(filepath)
                embedding = self.get_embedding(face)
                batch_embeddings.append(embedding)
                batch_labels.append(label)
                count += 1
                if count % batch_size == 0:
                    # Save batch checkpoint and clear the in-batch buffers.
                    self.save_batch(checkpoint_path, batch_embeddings, batch_labels)
                    self.embeddings.extend(batch_embeddings)
                    self.labels.extend(batch_labels)
                    batch_embeddings = []
                    batch_labels = []
            except Exception as e:
                # Best-effort: report which file failed and keep going.
                print(f"Error processing {filename}: {e}")

        # Save any remaining data that did not fill a complete batch.
        if batch_embeddings:
            self.save_batch(checkpoint_path, batch_embeddings, batch_labels)
            self.embeddings.extend(batch_embeddings)
            self.labels.extend(batch_labels)

    def process_all_classes(self):
        """Process all class folders, saving one embeddings file per folder."""
        for sub_dir in sorted(os.listdir(self.directory)):
            sub_dir_path = os.path.join(self.directory, sub_dir)
            if not os.path.isdir(sub_dir_path):
                continue

            # Output file for this class folder.
            folder_output_path = os.path.join(self.output_path,
                                              f"{sub_dir}_embeddings.npz")

            # Skip folders already processed (allows resuming after a crash).
            if os.path.exists(folder_output_path):
                print(f"Skipping folder {sub_dir} as embeddings already exist.")
                continue

            print(f"Processing folder: {sub_dir}")
            # Reset accumulators for the new class.
            self.embeddings = []
            self.labels = []

            self.process_folder(sub_dir_path, sub_dir)

            np.savez_compressed(folder_output_path,
                                embeddings=np.asarray(self.embeddings),
                                labels=np.asarray(self.labels))
            print(f"Saved embeddings for folder {sub_dir} to {folder_output_path}")


def merge_all_embeddings(output_dir, final_output_file):
    """Merge all per-folder embeddings in `output_dir` into a single NPZ file.

    Intermediate `*_temp.npz` checkpoint files are excluded so embeddings are
    not double-counted.

    Raises:
        ValueError: if no embeddings files are found.
    """
    all_embeddings = []
    all_labels = []

    for filename in sorted(os.listdir(output_dir)):
        # Only merge final per-class files; skip non-NPZ and temp checkpoints.
        if not filename.endswith(".npz") or filename.endswith("_temp.npz"):
            continue
        filepath = os.path.join(output_dir, filename)
        with np.load(filepath) as data:
            all_embeddings.append(data['embeddings'])
            all_labels.append(data['labels'])

    if not all_embeddings:
        raise ValueError(f"No embeddings files found in {output_dir}.")

    merged_embeddings = np.vstack(all_embeddings)
    merged_labels = np.hstack(all_labels)

    np.savez_compressed(final_output_file,
                        embeddings=merged_embeddings,
                        labels=merged_labels)
    print(f"Final merged embeddings saved to {final_output_file}")


if __name__ == "__main__":
    # Usage: point at a dataset with one subfolder per identity.
    data_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/DATASET"
    output_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/tmp"

    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    # Initialize and process all classes.
    face_generator = FaceEmbeddingGenerator(data_dir, output_dir)
    face_generator.process_all_classes()

    # Path for the final merged embeddings file.
    final_output_file = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/Face_Embedding_v5.npz"

    # Merge all per-class embeddings into one file.
    merge_all_embeddings(output_dir, final_output_file)