File size: 6,039 Bytes
19ea92a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import os
import numpy as np
import cv2 as cv
from mtcnn.mtcnn import MTCNN
from keras_facenet import FaceNet
class FaceEmbeddingGenerator:
    """Extract faces from a dataset directory and generate FaceNet embeddings.

    Expects ``directory`` to contain one sub-folder per class (person); each
    sub-folder's images are detected with MTCNN, cropped, embedded with
    FaceNet, and saved as ``<class>_embeddings.npz`` under ``output_path``.
    """

    def __init__(self, directory, output_path):
        self.directory = directory        # root dataset dir (one sub-dir per class)
        self.output_path = output_path    # dir that receives per-class .npz files
        self.target_size = (160, 160)     # FaceNet expects 160x160 RGB input
        self.detector = MTCNN()
        self.embedder = FaceNet()
        self.embeddings = []              # accumulated embeddings for current class
        self.labels = []                  # labels parallel to self.embeddings

    def extract_face(self, filename):
        """Load an image, detect the first face, return it resized to target_size.

        Raises ValueError when the file cannot be read or no face is found.
        """
        img = cv.imread(filename)
        if img is None:
            # BUG FIX: message previously contained the literal text
            # "(unknown)" instead of interpolating the offending path.
            raise ValueError(f"Image {filename} not found or invalid format.")
        img = cv.cvtColor(img, cv.COLOR_BGR2RGB)
        # Resize large images to avoid excessive memory usage during detection
        if max(img.shape[:2]) > 1024:
            scale_factor = 1024 / max(img.shape[:2])
            img = cv.resize(img, None, fx=scale_factor, fy=scale_factor)
        detection = self.detector.detect_faces(img)
        if not detection:
            # BUG FIX: same "(unknown)" placeholder replaced with the path
            raise ValueError(f"No face detected in {filename}.")
        x, y, w, h = detection[0]['box']
        # MTCNN can return slightly negative box coordinates; clamp them
        x, y = abs(x), abs(y)
        face = img[y:y + h, x:x + w]
        face_arr = cv.resize(face, self.target_size)
        return face_arr

    def get_embedding(self, face_img):
        """Generate an embedding from a face image."""
        face_img = face_img.astype('float32')
        face_img = np.expand_dims(face_img, axis=0)
        return self.embedder.embeddings(face_img)[0]  # 512D vector

    def save_batch(self, folder_output_path, embeddings, labels):
        """Save a batch of embeddings to a temporary "<name>_temp.npz" file."""
        temp_path = folder_output_path.replace(".npz", "_temp.npz")
        np.savez_compressed(temp_path, embeddings=np.asarray(embeddings), labels=np.asarray(labels))
        print(f"Saved intermediate embeddings to {temp_path}")

    def process_folder(self, folder_path, label, batch_size=50):
        """Process a single folder of images in batches of ``batch_size``."""
        # BUG FIX: checkpoints were previously saved against self.output_path
        # (a directory, containing no ".npz"), so the "_temp" substitution was
        # a no-op and every class wrote to the same bogus path. Derive the
        # per-class .npz path here instead.
        class_output_path = os.path.join(self.output_path, f"{label}_embeddings.npz")
        batch_embeddings = []
        batch_labels = []
        count = 0
        for filename in os.listdir(folder_path):
            try:
                filepath = os.path.join(folder_path, filename)
                face = self.extract_face(filepath)
                embedding = self.get_embedding(face)
                batch_embeddings.append(embedding)
                batch_labels.append(label)
                count += 1
                if count % batch_size == 0:
                    # Checkpoint the batch, keep it in memory, start a new one
                    self.save_batch(class_output_path, batch_embeddings, batch_labels)
                    self.embeddings.extend(batch_embeddings)
                    self.labels.extend(batch_labels)
                    batch_embeddings = []
                    batch_labels = []
            except Exception as e:
                # Best-effort: skip unreadable/faceless images but keep going.
                # BUG FIX: message previously printed "(unknown)" instead of
                # the failing filename.
                print(f"Error processing {filename}: {e}")
        # Save remaining partial batch
        if batch_embeddings:
            self.save_batch(class_output_path, batch_embeddings, batch_labels)
            self.embeddings.extend(batch_embeddings)
            self.labels.extend(batch_labels)

    def process_all_classes(self):
        """Process all folders and save embeddings for each folder separately."""
        for sub_dir in os.listdir(self.directory):
            sub_dir_path = os.path.join(self.directory, sub_dir)
            if not os.path.isdir(sub_dir_path):
                continue
            # Define output file for this folder
            folder_output_path = os.path.join(self.output_path, f"{sub_dir}_embeddings.npz")
            # Skip folder if its embeddings already exist (resume support)
            if os.path.exists(folder_output_path):
                print(f"Skipping folder {sub_dir} as embeddings already exist.")
                continue
            print(f"Processing folder: {sub_dir}")
            # Clear accumulators left over from the previous class
            self.embeddings = []
            self.labels = []
            # Process the current folder
            self.process_folder(sub_dir_path, sub_dir)
            # Save the embeddings and labels for the current folder
            np.savez_compressed(folder_output_path, embeddings=np.asarray(self.embeddings), labels=np.asarray(self.labels))
            print(f"Saved embeddings for folder {sub_dir} to {folder_output_path}")
def merge_all_embeddings(output_dir, final_output_file):
    """Merge all per-class embedding files in ``output_dir`` into one NPZ.

    Skips non-NPZ files and "_temp.npz" checkpoint files written by
    ``FaceEmbeddingGenerator.save_batch`` so batches are not double-counted.
    Raises ValueError if no embedding files are found.
    """
    all_embeddings = []
    all_labels = []
    # Iterate over all files in the output directory
    for filename in os.listdir(output_dir):
        # BUG FIX: previously only non-.npz files were skipped, so the
        # intermediate "_temp.npz" checkpoints were merged too, duplicating
        # every checkpointed batch in the final file.
        if not filename.endswith(".npz") or filename.endswith("_temp.npz"):
            continue
        filepath = os.path.join(output_dir, filename)
        # Use a context manager so the file handle is closed promptly
        with np.load(filepath) as data:
            all_embeddings.append(data['embeddings'])
            all_labels.append(data['labels'])
    if not all_embeddings:
        # BUG FIX: np.vstack([]) used to raise an opaque error here
        raise ValueError(f"No embedding files found in {output_dir}.")
    # Combine all embeddings and labels
    all_embeddings = np.vstack(all_embeddings)
    all_labels = np.hstack(all_labels)
    # Save the merged embeddings and labels into a final NPZ file
    np.savez_compressed(final_output_file, embeddings=all_embeddings, labels=all_labels)
    print(f"Final merged embeddings saved to {final_output_file}")
# Usage: run this module as a script to build and merge embeddings.
# Guarded so importing the module (e.g. to reuse FaceEmbeddingGenerator or
# merge_all_embeddings) does not kick off the full pipeline as a side effect.
if __name__ == "__main__":
    data_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/DATASET"  # Replace with the path to your dataset
    output_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/tmp"  # Replace with your desired output directory
    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)
    # Initialize and process all classes
    face_generator = FaceEmbeddingGenerator(data_dir, output_dir)
    face_generator.process_all_classes()
    # Path for the final merged embeddings file
    final_output_file = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/Face_Embedding_v5.npz"
    # Merge all embeddings
    merge_all_embeddings(output_dir, final_output_file)
|