|
|
import os |
|
|
import numpy as np |
|
|
import cv2 as cv |
|
|
from mtcnn.mtcnn import MTCNN |
|
|
from keras_facenet import FaceNet |
|
|
|
|
|
class FaceEmbeddingGenerator:
    """Turn a directory tree of face images into per-folder embedding archives.

    ``directory`` is expected to contain one sub-directory per class; the
    sub-directory name is used as the label of every image inside it.  For
    each sub-directory an ``<name>_embeddings.npz`` archive with the arrays
    ``embeddings`` and ``labels`` is written into ``output_path``.
    """

    # Images whose longest side exceeds this many pixels are shrunk before
    # face detection.
    MAX_IMAGE_SIDE = 1024

    def __init__(self, directory, output_path):
        """Store the paths and create the face detector and embedder.

        Args:
            directory: Root folder holding one sub-folder of images per class.
            output_path: Folder that receives the per-class ``.npz`` archives.
        """
        self.directory = directory
        self.output_path = output_path
        # Size every cropped face is resized to before it is embedded.
        self.target_size = (160, 160)
        self.detector = MTCNN()
        self.embedder = FaceNet()
        self.embeddings = []
        self.labels = []

    def extract_face(self, filename):
        """Load an image and return its first detected face, resized.

        Args:
            filename: Path of the image file to read.

        Returns:
            The RGB face crop resized to ``self.target_size``.

        Raises:
            ValueError: If the image cannot be read, no face is detected, or
                the detected box does not overlap the image.
        """
        img = cv.imread(filename)
        if img is None:
            raise ValueError(f"Image {filename} not found or invalid format.")
        img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

        # Shrink very large images (aspect ratio preserved) before detection.
        longest_side = max(img.shape[:2])
        if longest_side > self.MAX_IMAGE_SIDE:
            scale_factor = self.MAX_IMAGE_SIDE / longest_side
            img = cv.resize(img, None, fx=scale_factor, fy=scale_factor)

        detection = self.detector.detect_faces(img)
        if not detection:
            raise ValueError(f"No face detected in {filename}.")

        x, y, w, h = detection[0]['box']
        # The box may start at a negative coordinate.  Clamp it to the image
        # instead of taking abs(), which would move the crop window away
        # from the face.  The far edge is clamped too so that a negative
        # value is never interpreted as a from-the-end index.
        x1, y1 = max(0, x), max(0, y)
        x2, y2 = max(0, x + w), max(0, y + h)
        face = img[y1:y2, x1:x2]
        if face.size == 0:
            raise ValueError(f"Detected face box is empty in {filename}.")
        return cv.resize(face, self.target_size)

    def get_embedding(self, face_img):
        """Generate an embedding from a face image.

        Args:
            face_img: Face crop as returned by :meth:`extract_face`.

        Returns:
            The embedding the embedder produced for this single image.
        """
        face_img = face_img.astype('float32')
        # The embedder works on a batch, so wrap the image in a batch of one.
        face_img = np.expand_dims(face_img, axis=0)
        return self.embedder.embeddings(face_img)[0]

    @staticmethod
    def _checkpoint_path(folder_output_path):
        """Return the ``*_temp.npz`` path that belongs to an archive path."""
        # Only strip a trailing ".npz"; str.replace would also rewrite a
        # ".npz" that happens to appear in a directory name.
        if folder_output_path.endswith(".npz"):
            folder_output_path = folder_output_path[:-len(".npz")]
        return f"{folder_output_path}_temp.npz"

    def _folder_output_path(self, label):
        """Return the final archive path for the folder labelled ``label``."""
        return os.path.join(self.output_path, f"{label}_embeddings.npz")

    def _remove_checkpoint(self, folder_output_path):
        """Delete the checkpoint of ``folder_output_path`` if it exists."""
        try:
            os.remove(self._checkpoint_path(folder_output_path))
        except FileNotFoundError:
            pass

    def save_batch(self, folder_output_path, embeddings, labels):
        """Save a batch of embeddings to a temporary file.

        Args:
            folder_output_path: Final ``.npz`` path of the folder; the data is
                written next to it with a ``_temp.npz`` suffix.
            embeddings: Sequence of embedding vectors.
            labels: Sequence of labels, one per embedding.
        """
        temp_path = self._checkpoint_path(folder_output_path)
        np.savez_compressed(temp_path, embeddings=np.asarray(embeddings), labels=np.asarray(labels))
        print(f"Saved intermediate embeddings to {temp_path}")

    def _flush_batch(self, folder_output_path, folder_start, batch_embeddings, batch_labels):
        """Append a finished batch to the instance lists and checkpoint it."""
        self.embeddings.extend(batch_embeddings)
        self.labels.extend(batch_labels)
        try:
            # Checkpoint everything gathered for this folder so far, not just
            # the latest batch, so the file is usable after an interruption.
            self.save_batch(
                folder_output_path,
                self.embeddings[folder_start:],
                self.labels[folder_start:],
            )
        except OSError as e:
            # A failed checkpoint must not abort the run; the data is still
            # held in memory.
            print(f"Could not write checkpoint for {folder_output_path}: {e}")

    def process_folder(self, folder_path, label, batch_size=50):
        """Process a single folder of images in batches.

        Every file in ``folder_path`` is run through face extraction and
        embedding.  Files that fail are reported and skipped.  Results are
        appended to ``self.embeddings`` / ``self.labels`` and checkpointed
        after every ``batch_size`` successfully processed images.

        Args:
            folder_path: Folder containing the images of one class.
            label: Label stored for every image of this folder.
            batch_size: Number of successful images between checkpoints.
        """
        folder_output_path = self._folder_output_path(label)
        folder_start = len(self.embeddings)
        batch_embeddings = []
        batch_labels = []

        # Sorted so that the order of the stored embeddings is reproducible.
        for filename in sorted(os.listdir(folder_path)):
            filepath = os.path.join(folder_path, filename)
            try:
                face = self.extract_face(filepath)
                embedding = self.get_embedding(face)
            except Exception as e:
                # Best effort: one unreadable image or failed detection must
                # not stop the rest of the folder from being processed.
                print(f"Error processing {filename}: {e}")
                continue

            batch_embeddings.append(embedding)
            batch_labels.append(label)

            if len(batch_embeddings) >= batch_size:
                self._flush_batch(folder_output_path, folder_start, batch_embeddings, batch_labels)
                batch_embeddings = []
                batch_labels = []

        # Flush whatever is left over after the last full batch.
        if batch_embeddings:
            self._flush_batch(folder_output_path, folder_start, batch_embeddings, batch_labels)

    def process_all_classes(self):
        """Process all folders and save embeddings for each folder separately.

        Folders whose archive already exists are skipped, so an interrupted
        run can be resumed.  A folder that yields no embedding at all gets no
        archive, because an empty archive cannot be stacked with the others
        and would mark the folder as done.
        """
        os.makedirs(self.output_path, exist_ok=True)

        for sub_dir in sorted(os.listdir(self.directory)):
            sub_dir_path = os.path.join(self.directory, sub_dir)
            if not os.path.isdir(sub_dir_path):
                continue

            folder_output_path = self._folder_output_path(sub_dir)
            if os.path.exists(folder_output_path):
                print(f"Skipping folder {sub_dir} as embeddings already exist.")
                continue

            print(f"Processing folder: {sub_dir}")

            # Start from a clean slate so each archive holds one class only.
            self.embeddings = []
            self.labels = []
            self.process_folder(sub_dir_path, sub_dir)

            if not self.embeddings:
                print(f"No embeddings extracted for folder {sub_dir}; nothing saved.")
                self._remove_checkpoint(folder_output_path)
                continue

            np.savez_compressed(folder_output_path, embeddings=np.asarray(self.embeddings), labels=np.asarray(self.labels))
            print(f"Saved embeddings for folder {sub_dir} to {folder_output_path}")
            # The checkpoint is redundant once the final archive exists, and
            # leaving it behind would let it be picked up as a data file.
            self._remove_checkpoint(folder_output_path)
|
|
|
|
|
def merge_all_embeddings(output_dir, final_output_file):
    """Merge all folder embeddings into a single NPZ file.

    Every ``.npz`` archive in ``output_dir`` is expected to hold the arrays
    ``embeddings`` and ``labels``.  Intermediate ``*_temp.npz`` checkpoints,
    the merged file itself (if it lives in ``output_dir``) and archives
    without any embedding are skipped.

    Args:
        output_dir: Folder containing the per-folder ``.npz`` archives.
        final_output_file: Path of the merged archive to write.

    Raises:
        ValueError: If no archive in ``output_dir`` contains embeddings.
    """
    all_embeddings = []
    all_labels = []

    # np.savez_compressed appends ".npz" when the name lacks it, so compare
    # against the name that will actually be written.
    final_path = os.path.abspath(final_output_file)
    if not final_path.endswith(".npz"):
        final_path += ".npz"

    # Sorted so that the order of the merged rows is reproducible.
    for filename in sorted(os.listdir(output_dir)):
        if not filename.endswith(".npz") or filename.endswith("_temp.npz"):
            continue

        filepath = os.path.join(output_dir, filename)
        if os.path.abspath(filepath) == final_path:
            # A merged file left by a previous run must not be merged again.
            continue

        # The context manager closes the underlying archive file.
        with np.load(filepath) as data:
            embeddings = data['embeddings']
            labels = data['labels']

        if embeddings.size == 0:
            # An empty array cannot be stacked with the 2-D arrays below.
            print(f"Skipping {filename}: no embeddings stored.")
            continue

        all_embeddings.append(embeddings)
        all_labels.append(labels)

    if not all_embeddings:
        raise ValueError(f"No embeddings found in {output_dir}.")

    all_embeddings = np.vstack(all_embeddings)
    all_labels = np.hstack(all_labels)

    np.savez_compressed(final_output_file, embeddings=all_embeddings, labels=all_labels)
    print(f"Final merged embeddings saved to {final_output_file}")
|
|
|
|
|
|
|
|
# --- Pipeline configuration -------------------------------------------------
# data_dir:          root folder whose sub-folders are processed one by one.
# output_dir:        folder that receives one embeddings archive per sub-folder.
# final_output_file: single archive holding the merged embeddings and labels.
data_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/DATASET"
output_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/tmp"
final_output_file = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/Face_Embedding_v5.npz"

# --- Step 1: make sure the per-folder output location exists ----------------
os.makedirs(output_dir, exist_ok=True)

# --- Step 2: compute and store the embeddings of every class folder ---------
face_generator = FaceEmbeddingGenerator(data_dir, output_dir)
face_generator.process_all_classes()

# --- Step 3: combine the per-folder archives into the final file ------------
merge_all_embeddings(output_dir, final_output_file)
|
|
|
|
|
|