#!/usr/bin/env python3
"""
Memory-Efficient Face Recognition & Clustering for Hugging Face Spaces
Handles 8 high-res DSLR images with preprocessing to prevent OOM (Exit Code 137)
"""
import os
import sys
import json
import gc
import cv2
import numpy as np
from pathlib import Path
from collections import defaultdict
import face_recognition
from sklearn.cluster import DBSCAN
from PIL import Image
import warnings
warnings.filterwarnings('ignore')
# Configuration — tunables for the whole pipeline.
ROOT_DIR = Path(".")  # directory scanned for input images; also the copy source
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}  # recognized image suffixes (lowercase; upper-case variants are globbed too)
MAX_WIDTH = 1000 # Resize to prevent memory overload
DBSCAN_EPS = 0.4 # Distance threshold for clustering (tune based on your needs)
DBSCAN_MIN_SAMPLES = 1 # Minimum samples to form a cluster
OUTPUT_JSON = "database.json"  # where the image->cluster database is written
def get_image_files(directory):
    """Return a sorted, de-duplicated list of image files in *directory*.

    Both lower- and upper-case variants of every extension in
    IMAGE_EXTENSIONS are matched (non-recursive glob).
    """
    found = set()
    for suffix in IMAGE_EXTENSIONS:
        for pattern in (f"*{suffix}", f"*{suffix.upper()}"):
            found.update(directory.glob(pattern))
    return sorted(found)
def preprocess_image(image_path, max_width=MAX_WIDTH):
    """Load an image from disk in a memory-conscious way.

    Steps:
      1. Read with OpenCV (BGR layout).
      2. Downscale so the width is at most *max_width* pixels — prevents
         OOM (exit code 137) on high-resolution DSLR shots.
      3. Convert BGR -> RGB, the layout face_recognition expects.
      4. Drop the intermediate BGR buffer and force garbage collection.

    Returns:
        The RGB numpy array, or None when the file cannot be decoded or
        any step raises.
    """
    try:
        img = cv2.imread(str(image_path))
        if img is None:
            # cv2.imread returns None (no exception) on unreadable files.
            print(f"[WARN] Could not load: {image_path.name}")
            return None
        height, width = img.shape[:2]
        # Downscale large images; INTER_AREA gives best quality when shrinking.
        if width > max_width:
            scale = max_width / width
            new_width = int(width * scale)
            new_height = int(height * scale)
            img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)
            print(f"[INFO] Resized {image_path.name}: {width}x{height} -> {new_width}x{new_height}")
        # face_recognition expects RGB; OpenCV loads BGR.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        # Free the BGR copy immediately to keep peak memory low.
        del img
        gc.collect()
        return img_rgb
    except Exception as e:
        # Best-effort pipeline: log and skip the image rather than abort.
        print(f"[ERROR] Error processing {image_path.name}: {e}")
        return None
def extract_face_encodings(image_path):
    """Extract all face encodings from a single image.

    Returns:
        (encodings, locations) — parallel lists, one entry per detected
        face; both empty when the image cannot be loaded, contains no
        faces, or encoding fails.
    """
    img_rgb = preprocess_image(image_path)
    if img_rgb is None:
        return [], []
    try:
        # "hog" is faster and far less memory-hungry than the "cnn" model.
        face_locations = face_recognition.face_locations(img_rgb, model="hog")
        if not face_locations:
            print(f"[INFO] No faces found in: {image_path.name}")
            del img_rgb
            gc.collect()
            return [], []
        print(f"[INFO] Found {len(face_locations)} face(s) in: {image_path.name}")
        face_encodings_list = face_recognition.face_encodings(img_rgb, face_locations)
        # Release the image buffer as soon as encodings are computed.
        del img_rgb
        gc.collect()
        return face_encodings_list, face_locations
    except Exception as e:
        # Best-effort: log, clean up the buffer, and skip this image.
        print(f"[ERROR] Error encoding {image_path.name}: {e}")
        del img_rgb
        gc.collect()
        return [], []
def cluster_faces(encodings, image_paths, face_to_image_map):
    """Group face encodings into identity clusters with DBSCAN.

    Args:
        encodings: list of 128-d face encodings.
        image_paths, face_to_image_map: accepted for interface
            compatibility with the pipeline; not used here.

    Returns:
        One cluster label per encoding (-1 marks noise/unclustered),
        or [] when there is nothing to cluster.
    """
    if not encodings:
        print("[WARN] No face encodings to cluster")
        return []
    print(f"[INFO] Clustering {len(encodings)} face encodings...")
    encodings_array = np.array(encodings)
    # Euclidean distance matches face_recognition's own comparison metric.
    # n_jobs=1 avoids worker-process memory spikes in constrained environments.
    clustering = DBSCAN(
        eps=DBSCAN_EPS,
        min_samples=DBSCAN_MIN_SAMPLES,
        metric='euclidean',
        n_jobs=1
    )
    labels = clustering.fit_predict(encodings_array)
    # -1 is DBSCAN's noise label, not a cluster.
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise = list(labels).count(-1)
    print(f"[INFO] Clustering complete: {n_clusters} unique persons, {n_noise} unclustered faces")
    return labels
def organize_by_clusters(image_paths, face_encodings, face_labels, face_to_image_map):
    """Build the JSON-serializable database mapping images to face clusters.

    Args:
        image_paths: list of Path objects, indexed by image index.
        face_encodings: flat list of encodings (used only for the count).
        face_labels: DBSCAN label per face (-1 = noise).
        face_to_image_map: face index -> image index.

    Returns:
        dict with "metadata", "images" (per-file cluster assignment) and
        "clusters" (cluster id -> list of filenames).
    """
    # Which cluster ids appear in each image; the noise label -1 is skipped.
    clusters_per_image = defaultdict(set)
    for face_idx, label in enumerate(face_labels):
        if label != -1:
            clusters_per_image[face_to_image_map[face_idx]].add(int(label))

    unique_persons = len(set(face_labels)) - (1 if -1 in face_labels else 0)
    database = {
        "metadata": {
            "total_images": len(image_paths),
            "total_faces_detected": len(face_encodings),
            "unique_persons": unique_persons,
            "clustering_algorithm": "DBSCAN",
            "parameters": {
                "eps": DBSCAN_EPS,
                "min_samples": DBSCAN_MIN_SAMPLES,
                "max_image_width": MAX_WIDTH
            }
        },
        "images": {},
        "clusters": defaultdict(list)
    }

    for img_idx, img_path in enumerate(image_paths):
        clusters = sorted(clusters_per_image.get(img_idx, set()))
        # No clustered faces -> "unknown"; otherwise the lowest cluster id
        # is treated as the image's primary person.
        cluster_id = f"Person_{clusters[0]}" if clusters else "unknown"
        database["images"][img_path.name] = {
            "cluster_id": cluster_id,
            "all_clusters": clusters,
            "path": str(img_path)
        }
        if cluster_id != "unknown":
            database["clusters"][cluster_id].append(img_path.name)

    # Plain dict so json.dump can serialize it.
    database["clusters"] = dict(database["clusters"])
    return database
def create_output_folders(database, base_dir=ROOT_DIR):
    """Create per-person folders under 'organized_faces' and fill them.

    Images are COPIED (never moved) so the originals stay untouched.
    Unclustered images land in an 'unknown' folder.

    Returns:
        The Path of the output base directory.
    """
    import shutil  # hoisted: the original re-imported inside both copy loops

    output_base = base_dir / "organized_faces"
    output_base.mkdir(exist_ok=True)

    # One folder per identified person, filled with copies of its images.
    for cluster_id, filenames in database["clusters"].items():
        cluster_folder = output_base / cluster_id
        cluster_folder.mkdir(exist_ok=True)
        for filename in filenames:
            src = ROOT_DIR / filename
            if src.exists():
                # copy2 preserves file metadata (timestamps etc.).
                shutil.copy2(src, cluster_folder / filename)
                # Bug fix: the original printed the literal "(unknown)"
                # here instead of the copied filename.
                print(f"[COPY] {filename} -> {cluster_folder.name}/")

    # Images whose faces were never clustered go to 'unknown'.
    unknown_folder = output_base / "unknown"
    unknown_folder.mkdir(exist_ok=True)
    for filename, data in database["images"].items():
        if data["cluster_id"] == "unknown":
            src = ROOT_DIR / filename
            if src.exists():
                shutil.copy2(src, unknown_folder / filename)
                print(f"[COPY] {filename} -> unknown/")

    print(f"\n[DONE] Organized images saved to: {output_base}")
    return output_base
def main():
    """Run the pipeline: scan -> encode -> cluster -> save JSON -> organize folders."""
    print("=" * 60)
    print("Memory-Efficient Face Recognition & Clustering")
    print("=" * 60)
    print(f"Root directory: {ROOT_DIR.absolute()}")
    print(f"Max preprocessing width: {MAX_WIDTH}px")
    print("Aggressive GC enabled to prevent OOM")
    print("=" * 60 + "\n")

    # Step 1: Discover images.
    print("Scanning for images...")
    image_files = get_image_files(ROOT_DIR)
    if not image_files:
        print("ERROR: No images found in root directory!")
        sys.exit(1)
    print(f"Found {len(image_files)} images")

    # Step 2: Extract face encodings one image at a time to cap memory use.
    print("\nExtracting face encodings...")
    all_encodings = []
    face_to_image_map = []  # face index -> index into image_files
    for idx, img_path in enumerate(image_files):
        print(f"\n[{idx+1}/{len(image_files)}] Processing: {img_path.name}")
        encodings, locations = extract_face_encodings(img_path)
        for enc in encodings:
            all_encodings.append(enc)
            face_to_image_map.append(idx)
        # Aggressive GC after every image keeps peak RSS low on small hosts.
        gc.collect()
        print("Memory cleaned (GC invoked)")

    if not all_encodings:
        print("\nERROR: No faces detected in any images!")
        sys.exit(1)
    print(f"\nTotal faces collected: {len(all_encodings)}")

    # Step 3: Cluster faces.
    print("\n" + "=" * 60)
    labels = cluster_faces(all_encodings, image_files, face_to_image_map)

    # Step 4: Create database mapping.
    print("\nCreating database mapping...")
    database = organize_by_clusters(image_files, all_encodings, labels, face_to_image_map)

    # Step 5: Save JSON database.
    with open(OUTPUT_JSON, 'w') as f:
        json.dump(database, f, indent=2)
    print(f"Database saved: {OUTPUT_JSON}")

    # Step 6: Copy images into per-person folders.
    print("\nCreating organized folders...")
    output_dir = create_output_folders(database)

    # Final summary.
    print("\n" + "=" * 60)
    print("PROCESSING COMPLETE")
    print("=" * 60)
    print("Summary:")
    print(f" - Images processed: {database['metadata']['total_images']}")
    print(f" - Faces detected: {database['metadata']['total_faces_detected']}")
    print(f" - Unique persons: {database['metadata']['unique_persons']}")
    print("\nOutput:")
    print(f" - JSON database: {OUTPUT_JSON}")
    print(f" - Organized folders: {output_dir}")
    print("\nNext steps:")
    print(" - Check 'database.json' to review cluster assignments")
    print(" - Review 'organized_faces/' folders")
    print(" - Adjust DBSCAN_EPS parameter if clustering needs tuning")
    print("=" * 60)

    # Final cleanup of the largest in-memory structure.
    del all_encodings
    gc.collect()
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|