| | |
| | """ |
| | Example script to test the face clustering system |
| | Demonstrates how to use the API programmatically |
| | """ |
| |
|
| | import json |
| | from pathlib import Path |
| | from face_clustering import MemoryEfficientFaceClustering |
| |
|
| |
|
def analyze_results(database_path: str = "database.json"):
    """
    Analyze and display clustering results from database.json

    Prints three sections to stdout: the overall statistics read from the
    "metadata" object, a per-cluster listing of the filenames found in the
    "images" object, and a recommendation about the eps value derived from
    the ratio of clusters to images with faces.

    Args:
        database_path: Path to the database.json file
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(database_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    print("\n" + "=" * 60)
    print("CLUSTERING ANALYSIS")
    print("=" * 60)

    # Overall statistics
    metadata = data.get("metadata", {})
    print("\n📊 OVERALL STATISTICS:")
    print(f" Total images processed: {metadata.get('total_images', 0)}")
    print(f" Images with faces: {metadata.get('images_with_faces', 0)}")
    print(f" Unique people identified: {metadata.get('num_clusters', 0)}")
    print(f" Clustering threshold (eps): {metadata.get('dbscan_eps', 'N/A')}")

    # Group filenames by the cluster name stored with each image
    clusters = {}
    for filename, info in data.get("images", {}).items():
        cluster_name = info.get("cluster_name", "Unknown")
        clusters.setdefault(cluster_name, []).append(filename)

    print("\n👥 CLUSTER BREAKDOWN:")
    for cluster_name in sorted(clusters):
        images = clusters[cluster_name]
        print(f"\n 📁 {cluster_name} ({len(images)} images):")
        for img in sorted(images):
            print(f" • {img}")

    print("\n" + "=" * 60)
    print("💡 RECOMMENDATIONS")
    print("=" * 60)

    num_clusters = metadata.get('num_clusters', 0)
    num_faces = metadata.get('images_with_faces', 0)

    if num_faces <= 0:
        # Without any faces both ratio checks below are false, which would
        # otherwise report a successful clustering for an empty result.
        print("⚠️ No faces detected in the processed images")
        print(" Nothing to cluster - check the input images")
    elif num_clusters == 0:
        print("⚠️ All faces clustered as noise/outliers")
        print(" Try INCREASING eps value (e.g., 0.6 or 0.65)")
    elif num_clusters > num_faces * 0.7:
        print("⚠️ Too many clusters (many single-image clusters)")
        print(" Try INCREASING eps value to merge similar faces")
    elif num_clusters < num_faces * 0.2:
        print("⚠️ Very few clusters (might be merging different people)")
        print(" Try DECREASING eps value for stricter matching")
    else:
        print("✅ Clustering looks good!")

    print("\n")
| |
|
| |
|
def rename_files_from_database(database_path: str = "database.json", dry_run: bool = True):
    """
    Example: Rename files based on cluster assignments

    Every image whose cluster_id is >= 0 is looked up at
    clustered_faces/<cluster_name>/<filename> (relative to the current
    working directory) and renamed in place to <cluster_name>_<filename>.
    Images with a negative cluster_id are reported as skipped.

    Args:
        database_path: Path to database.json
        dry_run: If True, only print what would be renamed (don't actually rename)
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(database_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    print("\n" + "=" * 60)
    print("FILE RENAMING" + (" (DRY RUN)" if dry_run else ""))
    print("=" * 60)

    images_data = data.get("images", {})

    for filename, info in images_data.items():
        cluster_id = info.get("cluster_id", -1)
        cluster_name = info.get("cluster_name", "Unknown")

        if cluster_id < 0:
            print(f"Skipped: {filename} (cluster_id: {cluster_id})")
            continue

        old_path = Path("clustered_faces") / cluster_name / filename
        new_name = f"{cluster_name}_{Path(filename).name}"
        new_path = old_path.parent / new_name

        if dry_run:
            print(f"Would rename: {old_path} -> {new_path.name}")
        elif not old_path.exists():
            # Report the miss so an incomplete rename run is visible.
            print(f"Not found: {old_path}")
        elif new_path.exists():
            # Path.rename replaces an existing file on POSIX without
            # warning; never destroy a file that is already there.
            print(f"Not renamed: {new_path.name} already exists")
        else:
            old_path.rename(new_path)
            print(f"Renamed: {old_path.name} -> {new_path.name}")

    if dry_run:
        print("\nℹ️ This was a dry run. Set dry_run=False to actually rename files.")
| |
|
| |
|
def export_cluster_to_list(database_path: str = "database.json", cluster_id: int = 0):
    """
    Export all filenames from a specific cluster

    The matching filenames are also printed to stdout, in the order in
    which they appear in the database's "images" object.

    Args:
        database_path: Path to database.json
        cluster_id: Which cluster to export (0, 1, 2, etc.)

    Returns:
        List of filenames in that cluster
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding,
    # which would corrupt or reject non-ASCII filenames on some systems.
    with open(database_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    images_data = data.get("images", {})
    cluster_images = [
        filename
        for filename, info in images_data.items()
        if info.get("cluster_id") == cluster_id
    ]

    print(f"\n📋 Images in cluster {cluster_id} (Person_{cluster_id}):")
    for img in cluster_images:
        print(f" • {img}")

    return cluster_images
| |
|
| |
|
def main(db_path: str = "database.json"):
    """
    Example usage of the clustering system

    Walks through the example steps: a notice for the clustering step
    (which is not executed here), then - if the database file exists -
    the analysis, a dry-run rename and an export of cluster 0.

    Args:
        db_path: Path to the clustering database to inspect. Defaults to
            "database.json" in the current working directory.
    """
    print("=" * 60)
    print("FACE CLUSTERING - EXAMPLE USAGE")
    print("=" * 60)

    # Step 1: the clustering run itself is not performed by this example;
    # only a notice is printed.
    print("\n1️⃣ Running face clustering...")
    print(" (Make sure you have images in the current directory)")
    print(" [Skipped - uncomment code to run]")

    if Path(db_path).exists():
        # Step 2: summarize the stored clustering results
        print(f"\n2️⃣ Analyzing results from {db_path}...")
        analyze_results(db_path)

        # Step 3: show the renames without touching any file
        print("\n3️⃣ Demonstrating file renaming...")
        rename_files_from_database(db_path, dry_run=True)

        # Step 4: list the members of the first cluster
        print("\n4️⃣ Exporting cluster 0...")
        export_cluster_to_list(db_path, cluster_id=0)
    else:
        print(f"\n⚠️ {db_path} not found. Run clustering first!")

    print("\n" + "=" * 60)
    print("Example complete!")
    print("=" * 60)
| |
|
| |
|
# Run the walkthrough only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
| |
|