File size: 6,039 Bytes
19ea92a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import os
import numpy as np
import cv2 as cv
from mtcnn.mtcnn import MTCNN
from keras_facenet import FaceNet

class FaceEmbeddingGenerator:
    """Generate FaceNet embeddings for a dataset laid out as one folder per identity.

    For every sub-directory of ``directory``, each image is face-detected with
    MTCNN, cropped, resized to 160x160, embedded with FaceNet, and the results
    are saved as ``<output_path>/<folder>_embeddings.npz``.
    """

    def __init__(self, directory, output_path):
        self.directory = directory        # root dataset dir: one sub-folder per class/person
        self.output_path = output_path    # directory receiving the per-folder .npz archives
        self.target_size = (160, 160)     # FaceNet expects 160x160 RGB input
        self.detector = MTCNN()
        self.embedder = FaceNet()
        self.embeddings = []              # accumulated embeddings for the folder in progress
        self.labels = []                  # parallel list of labels (folder names)

    def extract_face(self, filename):
        """Load an image, detect the first face, and return it resized to target_size.

        Raises:
            ValueError: if the file cannot be read or no face is detected.
        """
        img = cv.imread(filename)
        if img is None:
            # BUGFIX: the message previously printed a literal "(unknown)"
            # instead of the offending path.
            raise ValueError(f"Image {filename} not found or invalid format.")
        img = cv.cvtColor(img, cv.COLOR_BGR2RGB)

        # Resize large images to avoid excessive memory usage during detection
        if max(img.shape[:2]) > 1024:
            scale_factor = 1024 / max(img.shape[:2])
            img = cv.resize(img, None, fx=scale_factor, fy=scale_factor)

        detection = self.detector.detect_faces(img)
        if not detection:
            raise ValueError(f"No face detected in {filename}.")

        x, y, w, h = detection[0]['box']
        # MTCNN can return slightly negative box coordinates; clamp them.
        x, y = abs(x), abs(y)
        face = img[y:y + h, x:x + w]
        face_arr = cv.resize(face, self.target_size)
        return face_arr

    def get_embedding(self, face_img):
        """Generate an embedding from a face image."""
        face_img = face_img.astype('float32')
        face_img = np.expand_dims(face_img, axis=0)  # model expects a batch axis
        return self.embedder.embeddings(face_img)[0]  # 512D vector

    def save_batch(self, folder_output_path, embeddings, labels):
        """Save a batch of embeddings to a temporary checkpoint next to folder_output_path."""
        temp_path = folder_output_path.replace(".npz", "_temp.npz")
        np.savez_compressed(temp_path, embeddings=np.asarray(embeddings), labels=np.asarray(labels))
        print(f"Saved intermediate embeddings to {temp_path}")

    def process_folder(self, folder_path, label, batch_size=50, output_path=None):
        """Process a single folder of images in batches.

        Args:
            folder_path: directory containing images of one identity.
            label: label assigned to every embedding from this folder.
            batch_size: number of images per intermediate checkpoint.
            output_path: folder-specific ``.npz`` path used for checkpoints;
                defaults to ``self.output_path`` for backward compatibility.
        """
        # BUGFIX: checkpoints used to be written against self.output_path (a
        # directory), so the ".npz" -> "_temp.npz" substitution was a no-op and
        # every folder overwrote the same stray file. Callers should now pass
        # the folder-specific archive path.
        if output_path is None:
            output_path = self.output_path

        batch_embeddings = []
        batch_labels = []
        count = 0

        for filename in os.listdir(folder_path):
            try:
                filepath = os.path.join(folder_path, filename)
                face = self.extract_face(filepath)
                embedding = self.get_embedding(face)
                batch_embeddings.append(embedding)
                batch_labels.append(label)
                count += 1

                if count % batch_size == 0:
                    # Save batch and clear memory
                    self.save_batch(output_path, batch_embeddings, batch_labels)
                    self.embeddings.extend(batch_embeddings)
                    self.labels.extend(batch_labels)
                    batch_embeddings = []
                    batch_labels = []

            except Exception as e:
                # Best-effort per-file processing: log and continue so one bad
                # image does not abort the whole folder.
                print(f"Error processing {filename}: {e}")

        # Save remaining data
        if batch_embeddings:
            self.save_batch(output_path, batch_embeddings, batch_labels)
            self.embeddings.extend(batch_embeddings)
            self.labels.extend(batch_labels)

    def process_all_classes(self):
        """Process all folders and save embeddings for each folder separately."""
        for sub_dir in os.listdir(self.directory):
            sub_dir_path = os.path.join(self.directory, sub_dir)
            if not os.path.isdir(sub_dir_path):
                continue

            # Define output file for this folder
            folder_output_path = os.path.join(self.output_path, f"{sub_dir}_embeddings.npz")

            # Skip folder if its embeddings already exist (resumable runs)
            if os.path.exists(folder_output_path):
                print(f"Skipping folder {sub_dir} as embeddings already exist.")
                continue

            print(f"Processing folder: {sub_dir}")

            # Clear previous embeddings and labels
            self.embeddings = []
            self.labels = []

            # Process the current folder, checkpointing against its own path
            self.process_folder(sub_dir_path, sub_dir, output_path=folder_output_path)

            # Save the embeddings and labels for the current folder
            np.savez_compressed(folder_output_path, embeddings=np.asarray(self.embeddings), labels=np.asarray(self.labels))
            print(f"Saved embeddings for folder {sub_dir} to {folder_output_path}")

            # Remove the intermediate checkpoint now that the final file exists,
            # so later merges cannot double-count its rows.
            temp_path = folder_output_path.replace(".npz", "_temp.npz")
            if os.path.exists(temp_path):
                os.remove(temp_path)

def merge_all_embeddings(output_dir, final_output_file):
    """Merge all per-folder embedding archives into a single NPZ file.

    Args:
        output_dir: directory containing ``*_embeddings.npz`` files.
        final_output_file: path of the merged archive to write.

    Raises:
        ValueError: if no embedding files are found in ``output_dir``.
    """
    all_embeddings = []
    all_labels = []

    # Iterate over all files in the output directory
    for filename in os.listdir(output_dir):
        # Skip non-NPZ files
        if not filename.endswith(".npz"):
            continue
        # BUGFIX: intermediate "*_temp.npz" checkpoints written during batch
        # processing were previously merged alongside the final per-folder
        # files, duplicating their rows in the output.
        if filename.endswith("_temp.npz"):
            continue

        # Load the embeddings and labels from the file
        filepath = os.path.join(output_dir, filename)
        data = np.load(filepath)
        all_embeddings.append(data['embeddings'])
        all_labels.append(data['labels'])

    # Same exception type np.vstack([]) would raise, but with a clear message.
    if not all_embeddings:
        raise ValueError(f"No embedding files found in {output_dir}.")

    # Combine all embeddings and labels
    merged_embeddings = np.vstack(all_embeddings)
    merged_labels = np.hstack(all_labels)

    # Save the merged embeddings and labels into a final NPZ file
    np.savez_compressed(final_output_file, embeddings=merged_embeddings, labels=merged_labels)
    print(f"Final merged embeddings saved to {final_output_file}")

def main():
    """Run the full pipeline: embed every class folder, then merge the results."""
    # Replace these paths with your own dataset / output locations.
    data_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/DATASET"
    output_dir = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/tmp"
    final_output_file = "/home/shanin/Desktop/SHANIN/MAIN/ALL_CODE/Face_Recognition/Face_Embedding_v5.npz"

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Initialize and process all classes
    face_generator = FaceEmbeddingGenerator(data_dir, output_dir)
    face_generator.process_all_classes()

    # Merge all per-folder embeddings into the final archive
    merge_all_embeddings(output_dir, final_output_file)


# Guarded entry point so importing this module does not trigger a long
# embedding run as a side effect.
if __name__ == "__main__":
    main()