#!/usr/bin/env python3
"""
Memory-Efficient Face Recognition & Clustering for Hugging Face Spaces
Handles 8 high-res DSLR images with preprocessing to prevent OOM (Exit Code 137)
"""

from __future__ import annotations

import os  # noqa: F401  (kept: other parts of the project may rely on it)
import sys
import json
import gc
import shutil
import warnings
from collections import defaultdict
from pathlib import Path

import cv2
import numpy as np
import face_recognition
from sklearn.cluster import DBSCAN
from PIL import Image  # noqa: F401  (kept: other parts of the project may rely on it)

warnings.filterwarnings('ignore')

# Configuration
ROOT_DIR = Path(".")
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
MAX_WIDTH = 1000  # Resize to prevent memory overload
DBSCAN_EPS = 0.4  # Distance threshold for clustering (tune based on your needs)
DBSCAN_MIN_SAMPLES = 1  # Minimum samples to form a cluster
OUTPUT_JSON = "database.json"


def get_image_files(directory):
    """Return the image files found directly inside *directory*, sorted.

    Only regular files are returned, the scan is not recursive, and the
    suffix is compared case-insensitively against ``IMAGE_EXTENSIONS`` so
    that mixed-case names such as ``photo.Jpg`` are picked up too.

    Args:
        directory: Directory to scan (``Path`` or path string).

    Returns:
        Sorted list of ``Path`` objects, one per image file.
    """
    directory = Path(directory)
    return sorted(
        entry
        for entry in directory.iterdir()
        if entry.is_file() and entry.suffix.lower() in IMAGE_EXTENSIONS
    )


def preprocess_image(image_path, max_width=MAX_WIDTH):
    """Load an image and shrink it so face detection stays within memory.

    Steps:
        1. Read the file with OpenCV.
        2. Downscale (keeping the aspect ratio) if wider than *max_width*.
        3. Convert BGR to RGB, the channel order face_recognition is fed.
        4. Drop the intermediate buffer and force a garbage collection.

    Args:
        image_path: ``Path`` of the image to load.
        max_width: Maximum width in pixels; wider images are downscaled.

    Returns:
        The RGB image as a numpy array, or ``None`` if the file could not
        be read or processed (a message is printed in that case).
    """
    try:
        img = cv2.imread(str(image_path))
        if img is None:
            print(f"⚠️ Could not load: {image_path.name}")
            return None

        height, width = img.shape[:2]

        if width > max_width:
            scale = max_width / width
            # Clamp to 1 px: a very wide, very short image would otherwise
            # round down to a zero dimension, which cv2.resize rejects.
            new_width = max(1, int(width * scale))
            new_height = max(1, int(height * scale))
            img = cv2.resize(
                img, (new_width, new_height), interpolation=cv2.INTER_AREA
            )
            print(
                f"📐 Resized {image_path.name}: "
                f"{width}x{height} → {new_width}x{new_height}"
            )

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Release the BGR buffer before handing the RGB copy to the caller.
        del img
        gc.collect()

        return img_rgb
    except Exception as e:
        # Deliberate best-effort: one unreadable image must not abort the run.
        print(f"❌ Error processing {image_path.name}: {e}")
        return None


def extract_face_encodings(image_path):
    """Detect faces in one image and compute an encoding for each.

    Args:
        image_path: ``Path`` of the image to analyse.

    Returns:
        A ``(encodings, locations)`` tuple of two parallel lists as returned
        by ``face_recognition.face_encodings`` and
        ``face_recognition.face_locations``. Both lists are empty when the
        image cannot be loaded, contains no faces, or processing fails.
    """
    img_rgb = preprocess_image(image_path)
    if img_rgb is None:
        return [], []

    try:
        # "hog" is faster/less memory than "cnn"
        face_locations = face_recognition.face_locations(img_rgb, model="hog")
        if not face_locations:
            print(f"😶 No faces found in: {image_path.name}")
            return [], []

        print(f"👤 Found {len(face_locations)} face(s) in: {image_path.name}")
        face_encodings_list = face_recognition.face_encodings(
            img_rgb, face_locations
        )
        return face_encodings_list, face_locations
    except Exception as e:
        # Deliberate best-effort: skip this image and keep going.
        print(f"❌ Error encoding {image_path.name}: {e}")
        return [], []
    finally:
        # Single cleanup point for every exit path above.
        del img_rgb
        gc.collect()


def cluster_faces(encodings, image_paths, face_to_image_map):
    """Cluster face encodings with DBSCAN.

    Args:
        encodings: List of face encodings (one vector per detected face).
        image_paths: Unused; kept for interface compatibility.
        face_to_image_map: Unused; kept for interface compatibility.

    Returns:
        An array with one cluster label per encoding (``-1`` marks noise),
        or an empty list when *encodings* is empty.
    """
    if not encodings:
        print("⚠️ No face encodings to cluster")
        return []

    print(f"🔬 Clustering {len(encodings)} face encodings...")

    encodings_array = np.array(encodings)

    clustering = DBSCAN(
        eps=DBSCAN_EPS,
        min_samples=DBSCAN_MIN_SAMPLES,
        metric='euclidean',
        n_jobs=1,  # Use 1 to prevent memory spikes in constrained environments
    )
    labels = clustering.fit_predict(encodings_array)

    n_clusters = len(set(labels.tolist()) - {-1})
    n_noise = int(np.sum(labels == -1))
    print(
        f"✅ Clustering complete: {n_clusters} unique persons, "
        f"{n_noise} unclustered faces"
    )

    return labels


def organize_by_clusters(image_paths, face_encodings, face_labels, face_to_image_map):
    """Map face clusters back to images and build the JSON-ready database.

    Args:
        image_paths: All scanned image paths; indices match the values in
            *face_to_image_map*.
        face_encodings: All collected encodings (only their count is used).
        face_labels: Cluster label per face; ``-1`` means unclustered.
        face_to_image_map: For each face index, the index of its image.

    Returns:
        A dict with three keys:
            ``metadata`` - counts and the clustering parameters used,
            ``images``   - per file name: primary ``cluster_id``, the sorted
                           ``all_clusters`` list and the file ``path``,
            ``clusters`` - per ``Person_N`` id: the file names whose primary
                           cluster it is.
    """
    # Plain ints keep the result JSON-serializable regardless of whether the
    # labels arrive as a numpy array or a list.
    labels = [int(label) for label in face_labels]

    image_to_clusters = defaultdict(set)
    for face_idx, label in enumerate(labels):
        if label == -1:
            continue  # Skip noise/unclustered
        image_to_clusters[face_to_image_map[face_idx]].add(label)

    images = {}
    clusters_to_files = defaultdict(list)
    for img_idx, img_path in enumerate(image_paths):
        clusters = sorted(image_to_clusters.get(img_idx, ()))

        # The primary cluster is the lowest label found in the image; images
        # without any clustered face are filed under "unknown".
        cluster_id = f"Person_{clusters[0]}" if clusters else "unknown"

        images[img_path.name] = {
            "cluster_id": cluster_id,
            "all_clusters": clusters,
            "path": str(img_path),
        }
        if cluster_id != "unknown":
            clusters_to_files[cluster_id].append(img_path.name)

    return {
        "metadata": {
            "total_images": len(image_paths),
            "total_faces_detected": len(face_encodings),
            "unique_persons": len(set(labels) - {-1}),
            "clustering_algorithm": "DBSCAN",
            "parameters": {
                "eps": DBSCAN_EPS,
                "min_samples": DBSCAN_MIN_SAMPLES,
                "max_image_width": MAX_WIDTH,
            },
        },
        "images": images,
        # Regular dict for JSON serialization
        "clusters": dict(clusters_to_files),
    }


def _copy_image(database, filename, target_folder, base_dir):
    """Copy one catalogued image into *target_folder* if its source exists."""
    record = database["images"].get(filename, {})
    # Prefer the path recorded at scan time; fall back to base_dir.
    src = Path(record["path"]) if "path" in record else Path(base_dir) / filename
    if src.exists():
        # copy2 preserves file metadata; originals are left in place.
        shutil.copy2(src, target_folder / filename)
        print(f"📁 Copied {filename} → {target_folder.name}/")


def create_output_folders(database, base_dir=ROOT_DIR):
    """Copy images into ``organized_faces/Person_X`` and ``unknown`` folders.

    Images are copied, not moved, so the originals are preserved.

    Args:
        database: Structure produced by ``organize_by_clusters``.
        base_dir: Directory under which ``organized_faces`` is created.

    Returns:
        ``Path`` of the ``organized_faces`` directory.
    """
    base_dir = Path(base_dir)
    output_base = base_dir / "organized_faces"
    output_base.mkdir(parents=True, exist_ok=True)

    for cluster_id, filenames in database["clusters"].items():
        cluster_folder = output_base / cluster_id
        cluster_folder.mkdir(exist_ok=True)
        for filename in filenames:
            _copy_image(database, filename, cluster_folder, base_dir)

    # Images without a clustered face go to a shared 'unknown' folder.
    unknown_folder = output_base / "unknown"
    unknown_folder.mkdir(exist_ok=True)
    for filename, data in database["images"].items():
        if data["cluster_id"] == "unknown":
            _copy_image(database, filename, unknown_folder, base_dir)

    print(f"\n📂 Organized images saved to: {output_base}")
    return output_base


def main():
    """Run the pipeline: scan, encode, cluster, write JSON, copy files.

    Exits with status 1 when no images are found or no faces are detected.
    """
    print("=" * 60)
    print("🚀 Memory-Efficient Face Recognition & Clustering")
    print("=" * 60)
    print(f"📁 Root directory: {ROOT_DIR.absolute()}")
    print(f"🖼️ Max preprocessing width: {MAX_WIDTH}px")
    print("🧠 Aggressive GC enabled to prevent OOM")
    print("=" * 60 + "\n")

    # Step 1: Discover images
    print("🔍 Scanning for images...")
    image_files = get_image_files(ROOT_DIR)
    if not image_files:
        print("❌ No images found in root directory!")
        sys.exit(1)
    print(f"✅ Found {len(image_files)} images")

    # Step 2: Extract face encodings (with memory management)
    print("\n🔍 Extracting face encodings...")
    all_encodings = []
    face_to_image_map = []  # Maps face index to image index

    for idx, img_path in enumerate(image_files):
        print(f"\n[{idx+1}/{len(image_files)}] Processing: {img_path.name}")
        encodings, _ = extract_face_encodings(img_path)
        all_encodings.extend(encodings)
        face_to_image_map.extend([idx] * len(encodings))

        # Force garbage collection after each image
        gc.collect()
        print("🧠 Memory cleaned (GC invoked)")

    if not all_encodings:
        print("\n❌ No faces detected in any images!")
        sys.exit(1)

    print(f"\n📊 Total faces collected: {len(all_encodings)}")

    # Step 3: Cluster faces
    print("\n" + "=" * 60)
    labels = cluster_faces(all_encodings, image_files, face_to_image_map)

    # Step 4: Create database mapping
    print("\n📋 Creating database mapping...")
    database = organize_by_clusters(
        image_files, all_encodings, labels, face_to_image_map
    )

    # Step 5: Save JSON database
    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(database, f, indent=2)
    print(f"💾 Database saved: {OUTPUT_JSON}")

    # Step 6: Create organized folders
    print("\n📁 Creating organized folders...")
    output_dir = create_output_folders(database)

    # Final summary
    metadata = database['metadata']
    print("\n" + "=" * 60)
    print("✅ PROCESSING COMPLETE")
    print("=" * 60)
    print("📊 Summary:")
    print(f" • Images processed: {metadata['total_images']}")
    print(f" • Faces detected: {metadata['total_faces_detected']}")
    print(f" • Unique persons: {metadata['unique_persons']}")
    print("\n📂 Output:")
    print(f" • JSON database: {OUTPUT_JSON}")
    print(f" • Organized folders: {output_dir}")
    print("\n📝 Next steps:")
    print(" • Check 'database.json' to review cluster assignments")
    print(" • Review 'organized_faces/' folders")
    print(" • Adjust DBSCAN_EPS parameter if clustering needs tuning")
    print("=" * 60)


if __name__ == "__main__":
    main()