| | |
"""
Memory-Efficient Face Recognition & Clustering for Hugging Face Spaces
Handles 8 high-res DSLR images with preprocessing to prevent OOM (Exit Code 137)
"""
| |
|
import gc
import json
import os
import shutil
import sys
import warnings
from collections import defaultdict
from pathlib import Path

import cv2
import face_recognition
import numpy as np
from PIL import Image
from sklearn.cluster import DBSCAN

warnings.filterwarnings('ignore')
| |
|
| | |
# Directory scanned for input images; also the default parent of the output tree.
ROOT_DIR = Path(".")
# File suffixes (lowercase, with the leading dot) that count as images.
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
# Images wider than this many pixels are downscaled before face detection.
MAX_WIDTH = 1000
# DBSCAN neighbourhood radius, applied to the Euclidean distance between encodings.
DBSCAN_EPS = 0.4
# With min_samples=1 every face is its own core point, so DBSCAN never assigns
# the -1 "noise" label; lone faces become single-member clusters instead.
DBSCAN_MIN_SAMPLES = 1
# File name of the JSON report written by main().
OUTPUT_JSON = "database.json"
| |
|
| |
|
def get_image_files(directory, extensions=None):
    """Return the image files found directly inside *directory*, sorted by path.

    The suffix comparison is case-insensitive, so ``photo.jpg``, ``photo.JPG``
    and ``photo.Jpg`` are all picked up. Sub-directories are never returned,
    even when their name ends in an image suffix. The scan is not recursive.

    Args:
        directory: ``pathlib.Path`` of the folder to scan.
        extensions: Optional iterable of suffixes (with the leading dot) to
            accept. Defaults to the module-level ``IMAGE_EXTENSIONS``.

    Returns:
        A sorted list of ``Path`` objects; empty when *directory* does not
        exist or contains no matching files.
    """
    if extensions is None:
        extensions = IMAGE_EXTENSIONS
    wanted = {ext.lower() for ext in extensions}

    # A missing folder yields "no images" rather than an exception.
    if not directory.is_dir():
        return []

    return sorted(
        entry
        for entry in directory.iterdir()
        if entry.suffix.lower() in wanted and entry.is_file()
    )
| |
|
| |
|
def preprocess_image(image_path, max_width=MAX_WIDTH):
    """Load an image with OpenCV, downscale it if too wide, and return it as RGB.

    Steps:
        1. Read the file with ``cv2.imread``.
        2. If the image is wider than *max_width*, shrink it (aspect ratio
           preserved) with ``cv2.INTER_AREA`` interpolation.
        3. Convert from OpenCV's BGR channel order to RGB.
        4. Drop the intermediate BGR array and run the garbage collector.

    Args:
        image_path: ``pathlib.Path`` of the image file.
        max_width: Maximum width in pixels; must be positive.

    Returns:
        The RGB image array, or ``None`` if the file could not be read or any
        step failed (a message is printed in both cases).
    """
    try:
        # Raised inside the try so an invalid width is reported like any other failure.
        if max_width <= 0:
            raise ValueError(f"max_width must be positive, got {max_width}")

        img = cv2.imread(str(image_path))
        if img is None:
            print(f"β οΈ Could not load: {image_path.name}")
            return None

        height, width = img.shape[:2]

        if width > max_width:
            scale = max_width / width
            # int() truncates towards zero; for very wide, very short images
            # that would give a 0-pixel dimension, which is not a valid resize
            # target, so clamp both sides to at least one pixel.
            new_width = max(1, int(width * scale))
            new_height = max(1, int(height * scale))
            img = cv2.resize(img, (new_width, new_height), interpolation=cv2.INTER_AREA)
            print(f"π Resized {image_path.name}: {width}x{height} β {new_width}x{new_height}")

        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        # Release the BGR copy straight away to keep peak memory low.
        del img
        gc.collect()

        return img_rgb

    except Exception as e:
        # Best-effort per image: report the problem and let the caller skip this file.
        print(f"β Error processing {image_path.name}: {e}")
        return None
| |
|
| |
|
def extract_face_encodings(image_path):
    """Detect the faces in one image and compute an encoding for each.

    The image is loaded through ``preprocess_image``; faces are located with
    the HOG model of ``face_recognition`` and then encoded.

    Args:
        image_path: ``pathlib.Path`` of the image file.

    Returns:
        A tuple ``(encodings, locations)``: the list returned by
        ``face_recognition.face_encodings`` and the list returned by
        ``face_recognition.face_locations``. Both are empty lists when the
        image cannot be loaded, contains no detectable face, or detection or
        encoding raises.
    """
    img_rgb = preprocess_image(image_path)
    if img_rgb is None:
        return [], []

    try:
        face_locations = face_recognition.face_locations(img_rgb, model="hog")

        if not face_locations:
            print(f"πΆ No faces found in: {image_path.name}")
            return [], []

        print(f"π€ Found {len(face_locations)} face(s) in: {image_path.name}")

        face_encodings_list = face_recognition.face_encodings(img_rgb, face_locations)
        return face_encodings_list, face_locations

    except Exception as e:
        # Best-effort per image: a failure here must not abort the whole run.
        print(f"β Error encoding {image_path.name}: {e}")
        return [], []

    finally:
        # Single cleanup point for every exit path: drop the pixel data and
        # collect immediately so the next image starts from a low watermark.
        del img_rgb
        gc.collect()
| |
|
| |
|
def cluster_faces(encodings, image_paths, face_to_image_map):
    """Group face encodings into persons with DBSCAN.

    Args:
        encodings: Sequence (list or NumPy array) of face-encoding vectors.
        image_paths: Unused; kept so existing callers keep working.
        face_to_image_map: Unused; kept so existing callers keep working.

    Returns:
        The array of cluster labels produced by ``DBSCAN.fit_predict``, one
        per encoding (``-1`` marks noise), or an empty list when there is
        nothing to cluster.
    """
    # len() instead of truthiness: `not array` is ambiguous for NumPy arrays.
    if len(encodings) == 0:
        print("β οΈ No face encodings to cluster")
        return []

    print(f"π¬ Clustering {len(encodings)} face encodings...")

    encodings_array = np.asarray(encodings)

    clustering = DBSCAN(
        eps=DBSCAN_EPS,
        min_samples=DBSCAN_MIN_SAMPLES,
        metric='euclidean',
        n_jobs=1,
    )
    labels = clustering.fit_predict(encodings_array)

    # The noise label (-1) is not a person, so leave it out of the cluster count.
    n_noise = int(np.count_nonzero(labels == -1))
    n_clusters = len(set(labels.tolist())) - (1 if n_noise else 0)

    print(f"β Clustering complete: {n_clusters} unique persons, {n_noise} unclustered faces")

    return labels
| |
|
| |
|
def organize_by_clusters(image_paths, face_encodings, face_labels, face_to_image_map):
    """Build the JSON-serialisable report that maps images to person clusters.

    Args:
        image_paths: Sequence of ``pathlib.Path`` objects, one per scanned image.
        face_encodings: All collected face encodings (only their count is used).
        face_labels: Cluster label of each face; ``-1`` means "no cluster".
        face_to_image_map: For each face, the index into *image_paths* of the
            image it came from.

    Returns:
        A dict with three keys: ``"metadata"`` (counts and parameters),
        ``"images"`` (per file name: primary ``cluster_id``, ``all_clusters``
        and ``path``) and ``"clusters"`` (``Person_N`` -> list of file names).
        An image is listed only under its lowest-numbered person; images with
        no clustered face get ``cluster_id`` ``"unknown"`` and appear in no
        cluster list.
    """
    # Collect, per image index, the set of person labels seen in that image.
    persons_per_image = defaultdict(set)
    for face_idx, label in enumerate(face_labels):
        if label != -1:
            persons_per_image[face_to_image_map[face_idx]].add(int(label))

    # The -1 label, when present, is not a person and must not be counted.
    noise_label_count = 1 if -1 in face_labels else 0

    images = {}
    members = defaultdict(list)
    for img_idx, img_path in enumerate(image_paths):
        person_ids = sorted(persons_per_image.get(img_idx, ()))
        # The lowest label acts as the image's primary person.
        cluster_id = f"Person_{person_ids[0]}" if person_ids else "unknown"

        images[img_path.name] = {
            "cluster_id": cluster_id,
            "all_clusters": person_ids,
            "path": str(img_path),
        }
        if person_ids:
            members[cluster_id].append(img_path.name)

    return {
        "metadata": {
            "total_images": len(image_paths),
            "total_faces_detected": len(face_encodings),
            "unique_persons": len(set(face_labels)) - noise_label_count,
            "clustering_algorithm": "DBSCAN",
            "parameters": {
                "eps": DBSCAN_EPS,
                "min_samples": DBSCAN_MIN_SAMPLES,
                "max_image_width": MAX_WIDTH,
            },
        },
        "images": images,
        # Plain dict so the result can be passed to json.dump unchanged.
        "clusters": dict(members),
    }
| |
|
| |
|
def _copy_into_folder(src, folder, filename):
    """Copy *src* into *folder* as *filename* and report it; skip a missing *src*."""
    if src.exists():
        shutil.copy2(src, folder / filename)
        print(f"π Copied {filename} β {folder.name}/")


def create_output_folders(database, base_dir=ROOT_DIR):
    """Copy every image into a per-person folder under ``organized_faces``.

    One ``Person_N`` folder is created per entry of ``database["clusters"]``,
    plus an ``unknown`` folder for images whose ``cluster_id`` is
    ``"unknown"``. Files are copied (with metadata, via ``shutil.copy2``), so
    the originals stay in place. Source files that no longer exist are skipped.

    Args:
        database: Report dict with ``"clusters"`` and ``"images"`` keys, as
            built by ``organize_by_clusters``.
        base_dir: ``pathlib.Path`` under which ``organized_faces`` is created.

    Returns:
        The ``Path`` of the ``organized_faces`` folder.
    """
    output_base = base_dir / "organized_faces"
    output_base.mkdir(parents=True, exist_ok=True)

    def source_of(filename):
        # Prefer the path recorded at scan time; fall back to base_dir for
        # entries that carry no path.
        entry = database["images"].get(filename, {})
        return Path(entry["path"]) if "path" in entry else base_dir / filename

    for cluster_id, filenames in database["clusters"].items():
        cluster_folder = output_base / cluster_id
        cluster_folder.mkdir(exist_ok=True)
        for filename in filenames:
            _copy_into_folder(source_of(filename), cluster_folder, filename)

    unknown_folder = output_base / "unknown"
    unknown_folder.mkdir(exist_ok=True)

    for filename, data in database["images"].items():
        if data["cluster_id"] == "unknown":
            _copy_into_folder(source_of(filename), unknown_folder, filename)

    print(f"\nπ Organized images saved to: {output_base}")
    return output_base
| |
|
| |
|
def main():
    """Run the whole pipeline: scan, encode, cluster, report, organise.

    Scans ``ROOT_DIR`` for images, extracts face encodings one image at a
    time (collecting garbage after each), clusters them, writes the report to
    ``OUTPUT_JSON`` and copies the images into per-person folders.

    Exits the process with status 1 when no image is found or when no face is
    detected in any image.
    """
    print("=" * 60)
    print("π Memory-Efficient Face Recognition & Clustering")
    print("=" * 60)
    print(f"π Root directory: {ROOT_DIR.absolute()}")
    print(f"πΌοΈ Max preprocessing width: {MAX_WIDTH}px")
    print("π§ Aggressive GC enabled to prevent OOM")
    print("=" * 60 + "\n")

    print("π Scanning for images...")
    image_files = get_image_files(ROOT_DIR)

    if not image_files:
        print("β No images found in root directory!")
        sys.exit(1)

    print(f"β Found {len(image_files)} images")

    print("\nπ Extracting face encodings...")
    all_encodings = []
    # face_to_image_map[i] is the index in image_files of the image that face i came from.
    face_to_image_map = []

    for idx, img_path in enumerate(image_files):
        print(f"\n[{idx+1}/{len(image_files)}] Processing: {img_path.name}")

        encodings, _ = extract_face_encodings(img_path)

        all_encodings.extend(encodings)
        face_to_image_map.extend([idx] * len(encodings))

        # Collect after every image so only one decoded image is alive at a time.
        gc.collect()
        print("π§ Memory cleaned (GC invoked)")

    if not all_encodings:
        print("\nβ No faces detected in any images!")
        sys.exit(1)

    print(f"\nπ Total faces collected: {len(all_encodings)}")

    print("\n" + "=" * 60)
    labels = cluster_faces(all_encodings, image_files, face_to_image_map)

    print("\nπ Creating database mapping...")
    database = organize_by_clusters(image_files, all_encodings, labels, face_to_image_map)

    with open(OUTPUT_JSON, 'w', encoding='utf-8') as f:
        json.dump(database, f, indent=2)
    print(f"πΎ Database saved: {OUTPUT_JSON}")

    print("\nπ Creating organized folders...")
    output_dir = create_output_folders(database)

    print("\n" + "=" * 60)
    print("β PROCESSING COMPLETE")
    print("=" * 60)
    print("π Summary:")
    print(f" β’ Images processed: {database['metadata']['total_images']}")
    print(f" β’ Faces detected: {database['metadata']['total_faces_detected']}")
    print(f" β’ Unique persons: {database['metadata']['unique_persons']}")
    print("\nπ Output:")
    print(f" β’ JSON database: {OUTPUT_JSON}")
    print(f" β’ Organized folders: {output_dir}")
    print("\nπ Next steps:")
    print(" β’ Check 'database.json' to review cluster assignments")
    print(" β’ Review 'organized_faces/' folders")
    print(" β’ Adjust DBSCAN_EPS parameter if clustering needs tuning")
    print("=" * 60)

    # Final cleanup before returning to the caller.
    del all_encodings
    gc.collect()
| |
|
| |
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
| |
|