images / app.py
nexacore's picture
Update app.py
c76265d verified
#!/usr/bin/env python3
"""
Memory-Efficient Face Recognition & Clustering for Hugging Face Spaces
Handles 8 high-res DSLR images with preprocessing to prevent OOM (Exit Code 137)
"""
import os
import sys
import json
import gc
import cv2
import numpy as np
from pathlib import Path
from collections import defaultdict
import face_recognition
from sklearn.cluster import DBSCAN
from PIL import Image
import warnings
warnings.filterwarnings('ignore')
# Configuration
ROOT_DIR = Path(".")
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
MAX_WIDTH = 1000 # Resize to prevent memory overload
DBSCAN_EPS = 0.4 # Distance threshold for clustering (tune based on your needs)
DBSCAN_MIN_SAMPLES = 1 # Minimum samples to form a cluster
OUTPUT_JSON = "database.json"
def get_image_files(directory):
"""Scan directory for image files."""
image_files = []
for ext in IMAGE_EXTENSIONS:
image_files.extend(directory.glob(f"*{ext}"))
image_files.extend(directory.glob(f"*{ext.upper()}"))
return sorted(list(set(image_files)))
def preprocess_image(image_path, max_width=MAX_WIDTH):
"""
Memory-efficient preprocessing:
1. Open with OpenCV
2. Resize if width > max_width
3. Convert RGB for face_recognition
4. Force garbage collection
"""
try:
# Read image with OpenCV (memory efficient)
img = cv2.imread(str(image_path))
if img is None:
print(f"⚠️ Could not load: {image_path.name}")
return None
height, width = img.shape[:2]
# Resize if too large (prevents OOM)
if width > max_width:
scale = max_width / width
new_width = int(width * scale)
new_height = int(height * scale)
img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)
print(f"πŸ“ Resized {image_path.name}: {width}x{height} β†’ {new_width}x{new_height}")
# Convert BGR (OpenCV) to RGB (face_recognition)
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# Clear original large image from memory immediately
del img
gc.collect()
return img_rgb
except Exception as e:
print(f"❌ Error processing {image_path.name}: {e}")
return None
def extract_face_encodings(image_path):
"""
Extract face encodings from a single image.
Returns list of encodings (multiple faces possible per image).
"""
img_rgb = preprocess_image(image_path)
if img_rgb is None:
return [], []
try:
# Detect face locations
face_locations = face_recognition.face_locations(img_rgb, model="hog") # "hog" is faster/less memory than "cnn"
if not face_locations:
print(f"😢 No faces found in: {image_path.name}")
del img_rgb
gc.collect()
return [], []
print(f"πŸ‘€ Found {len(face_locations)} face(s) in: {image_path.name}")
# Get encodings
face_encodings_list = face_recognition.face_encodings(img_rgb, face_locations)
# Clean up
del img_rgb
gc.collect()
return face_encodings_list, face_locations
except Exception as e:
print(f"❌ Error encoding {image_path.name}: {e}")
del img_rgb
gc.collect()
return [], []
def cluster_faces(encodings, image_paths, face_to_image_map):
"""
Cluster face encodings using DBSCAN.
Returns cluster labels for each face.
"""
if not encodings:
print("⚠️ No face encodings to cluster")
return []
print(f"πŸ”¬ Clustering {len(encodings)} face encodings...")
# Convert to numpy array
encodings_array = np.array(encodings)
# DBSCAN clustering
# metric='euclidean' is default, but face_recognition uses distance comparison
clustering = DBSCAN(
eps=DBSCAN_EPS,
min_samples=DBSCAN_MIN_SAMPLES,
metric='euclidean',
n_jobs=1 # Use 1 to prevent memory spikes in constrained environments
)
labels = clustering.fit_predict(encodings_array)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = list(labels).count(-1)
print(f"βœ… Clustering complete: {n_clusters} unique persons, {n_noise} unclustered faces")
return labels
def organize_by_clusters(image_paths, face_encodings, face_labels, face_to_image_map):
"""
Map face clusters back to images and create organization structure.
"""
# Track which images belong to which clusters
image_to_clusters = defaultdict(set)
for face_idx, label in enumerate(face_labels):
if label == -1:
continue # Skip noise/unclustered
img_idx = face_to_image_map[face_idx]
image_to_clusters[img_idx].add(int(label))
# Create database structure
database = {
"metadata": {
"total_images": len(image_paths),
"total_faces_detected": len(face_encodings),
"unique_persons": len(set(face_labels)) - (1 if -1 in face_labels else 0),
"clustering_algorithm": "DBSCAN",
"parameters": {
"eps": DBSCAN_EPS,
"min_samples": DBSCAN_MIN_SAMPLES,
"max_image_width": MAX_WIDTH
}
},
"images": {},
"clusters": defaultdict(list)
}
# Map each image to its cluster assignments
for img_idx, img_path in enumerate(image_paths):
clusters = sorted(list(image_to_clusters.get(img_idx, [])))
# If image has no faces or unclustered faces, mark as "unknown"
if not clusters:
cluster_id = "unknown"
else:
# Use primary cluster (if multiple faces, pick first)
cluster_id = f"Person_{clusters[0]}"
database["images"][img_path.name] = {
"cluster_id": cluster_id,
"all_clusters": clusters if clusters else [],
"path": str(img_path)
}
if cluster_id != "unknown":
database["clusters"][cluster_id].append(img_path.name)
# Convert defaultdict to regular dict for JSON serialization
database["clusters"] = dict(database["clusters"])
return database
def create_output_folders(database, base_dir=ROOT_DIR):
"""
Create 'Person_X' folders and organize images (copy, don't move to preserve originals).
"""
output_base = base_dir / "organized_faces"
output_base.mkdir(exist_ok=True)
# Create folders for each cluster
for cluster_id in database["clusters"].keys():
cluster_folder = output_base / cluster_id
cluster_folder.mkdir(exist_ok=True)
# Copy images to respective folders
for filename in database["clusters"][cluster_id]:
src = ROOT_DIR / filename
dst = cluster_folder / filename
if src.exists():
# Use shutil.copy2 to preserve metadata
import shutil
shutil.copy2(src, dst)
print(f"πŸ“ Copied {filename} β†’ {cluster_folder.name}/")
# Create 'unknown' folder for unclustered images
unknown_folder = output_base / "unknown"
unknown_folder.mkdir(exist_ok=True)
for filename, data in database["images"].items():
if data["cluster_id"] == "unknown":
src = ROOT_DIR / filename
dst = unknown_folder / filename
if src.exists():
import shutil
shutil.copy2(src, dst)
print(f"πŸ“ Copied {filename} β†’ unknown/")
print(f"\nπŸ“‚ Organized images saved to: {output_base}")
return output_base
def main():
print("=" * 60)
print("πŸš€ Memory-Efficient Face Recognition & Clustering")
print("=" * 60)
print(f"πŸ“ Root directory: {ROOT_DIR.absolute()}")
print(f"πŸ–ΌοΈ Max preprocessing width: {MAX_WIDTH}px")
print(f"🧠 Aggressive GC enabled to prevent OOM")
print("=" * 60 + "\n")
# Step 1: Discover images
print("πŸ” Scanning for images...")
image_files = get_image_files(ROOT_DIR)
if not image_files:
print("❌ No images found in root directory!")
sys.exit(1)
print(f"βœ… Found {len(image_files)} images")
# Step 2: Extract face encodings (with memory management)
print("\nπŸ” Extracting face encodings...")
all_encodings = []
face_to_image_map = [] # Maps face index to image index
for idx, img_path in enumerate(image_files):
print(f"\n[{idx+1}/{len(image_files)}] Processing: {img_path.name}")
encodings, locations = extract_face_encodings(img_path)
for enc in encodings:
all_encodings.append(enc)
face_to_image_map.append(idx)
# Force garbage collection after each image
gc.collect()
print(f"🧠 Memory cleaned (GC invoked)")
if not all_encodings:
print("\n❌ No faces detected in any images!")
sys.exit(1)
print(f"\nπŸ“Š Total faces collected: {len(all_encodings)}")
# Step 3: Cluster faces
print("\n" + "=" * 60)
labels = cluster_faces(all_encodings, image_files, face_to_image_map)
# Step 4: Create database mapping
print("\nπŸ“‹ Creating database mapping...")
database = organize_by_clusters(image_files, all_encodings, labels, face_to_image_map)
# Step 5: Save JSON database
with open(OUTPUT_JSON, 'w') as f:
json.dump(database, f, indent=2)
print(f"πŸ’Ύ Database saved: {OUTPUT_JSON}")
# Step 6: Create organized folders
print("\nπŸ“ Creating organized folders...")
output_dir = create_output_folders(database)
# Final summary
print("\n" + "=" * 60)
print("βœ… PROCESSING COMPLETE")
print("=" * 60)
print(f"πŸ“Š Summary:")
print(f" β€’ Images processed: {database['metadata']['total_images']}")
print(f" β€’ Faces detected: {database['metadata']['total_faces_detected']}")
print(f" β€’ Unique persons: {database['metadata']['unique_persons']}")
print(f"\nπŸ“‚ Output:")
print(f" β€’ JSON database: {OUTPUT_JSON}")
print(f" β€’ Organized folders: {output_dir}")
print("\nπŸ“ Next steps:")
print(" β€’ Check 'database.json' to review cluster assignments")
print(" β€’ Review 'organized_faces/' folders")
print(" β€’ Adjust DBSCAN_EPS parameter if clustering needs tuning")
print("=" * 60)
# Final cleanup
del all_encodings
gc.collect()
if __name__ == "__main__":
main()