import os import cv2 import shutil import numpy as np from tqdm import tqdm from sklearn.cluster import DBSCAN from facenet_pytorch import InceptionResnetV1 import torch from torchvision import transforms from PIL import Image from facenet_pytorch import InceptionResnetV1 import torch import numpy as np from PIL import Image from torchvision import transforms from sklearn.cluster import DBSCAN import os import cv2 from tqdm import tqdm from facenet_pytorch import InceptionResnetV1 import torch import numpy as np from PIL import Image from torchvision import transforms from sklearn.cluster import DBSCAN import os import cv2 from tqdm import tqdm import os import pandas as pd import re import streamlit def enhance_image(img): # Convert to LAB and equalize lightness lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB) l, a, b = cv2.split(lab) l = cv2.equalizeHist(l) lab = cv2.merge((l, a, b)) return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR) def group_faces_by_identity_facenet_fixed( faces_folder: str = "output_faces", output_folder: str = "grouped_faces_facenet", eps_start: float = 0.01, eps_step: float = 0.01, eps_end: float = 0.99, num_identities: int = 2, log_params: bool = True, param_log_path: str = "grouping_params.txt", streamlit_progress=None, progress_range=(0, 100) ): # Create output folder early to avoid missing directory errors later os.makedirs(output_folder, exist_ok=True) # Gather image paths image_paths = sorted([ os.path.join(faces_folder, fname) for fname in os.listdir(faces_folder) if fname.lower().endswith((".jpg", ".png")) ]) if not image_paths: print("❌ No images found in faces folder.") return # Load FaceNet model model = InceptionResnetV1(pretrained='vggface2').eval() # Preprocessing transform transform = transforms.Compose([ transforms.Resize((160, 160)), transforms.ToTensor(), transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3) ]) print("📡 Extracting embeddings...") embeddings = [] total_images = len(image_paths) start, end = progress_range for i, path in enumerate(tqdm(image_paths, desc="Extracting embeddings")): if streamlit_progress: progress_value = start + (end - start) * (i + 1) / total_images streamlit_progress.progress(int(progress_value)) try: img = Image.open(path).convert('RGB') img_tensor = transform(img).unsqueeze(0) with torch.no_grad(): emb = model(img_tensor).squeeze().numpy() embeddings.append(emb) except Exception as e: print(f"⚠️ Skipping {path}: {e}") if not embeddings: print("❌ No valid images were embedded.") return embeddings = np.stack(embeddings) print("🔗 Clustering with varying eps...") best_result = None for current_eps in np.arange(eps_start, eps_end + eps_step, eps_step): clustering = DBSCAN(eps=current_eps, min_samples=2, metric='euclidean').fit(embeddings) labels = clustering.labels_ num_clusters = len(set(labels)) - (1 if -1 in labels else 0) num_unknowns = list(labels).count(-1) print(f"🔍 eps={current_eps:.3f} → {num_clusters} clusters, {num_unknowns} unknowns") if num_clusters == num_identities: if best_result is None or num_unknowns < best_result["unknowns"]: best_result = { "eps": current_eps, "labels": labels, "unknowns": num_unknowns, "clusters": num_clusters } if num_unknowns == 0: break if best_result is None: print(f"❌ No clustering resulted in exactly {num_identities} identities.") return False print(f"✅ Best result: eps={best_result['eps']:.3f}, clusters={best_result['clusters']}, unknowns={best_result['unknowns']}") labels = best_result["labels"] for path, label in zip(image_paths, labels): label_str = f"identity_{label}" if label != -1 else "unknown" identity_dir = os.path.join(output_folder, label_str) os.makedirs(identity_dir, exist_ok=True) fname = os.path.basename(path) cv2.imwrite(os.path.join(identity_dir, fname), cv2.imread(path)) if log_params: with open(param_log_path, "w") as f: f.write(f"Best eps: {best_result['eps']:.3f}\n") f.write(f"Identities: {best_result['clusters']}\n") f.write(f"Unknowns: {best_result['unknowns']}\n") print(f"📁 Saved results to: {output_folder}") if log_params: print(f"📝 Params logged to: {param_log_path}") return True def group_faces_by_identity_facenet_fixed_( faces_folder: str = "output_faces", output_folder: str = "grouped_faces_facenet", eps_start: float = 0.01, eps_step: float = 0.01, eps_end: float = 0.99, num_identities: int = 2, log_params: bool = True, param_log_path: str = "grouping_params.txt", streamlit_progress=None, progress_range=(0, 100) ): # Load FaceNet model = InceptionResnetV1(pretrained='vggface2').eval() # Preprocess transform = transforms.Compose([ transforms.Resize((160, 160)), transforms.ToTensor(), transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3) ]) print("📡 Extracting embeddings...") embeddings,image_paths = [],[] total_images = len(image_paths) start, end = progress_range for i, path in enumerate(tqdm(image_paths, desc="Extracting embeddings")): if streamlit_progress: progress_value = start + (end - start) * (i + 1) / total_images streamlit_progress.progress(int(progress_value)) try: img = Image.open(path).convert('RGB') img_tensor = transform(img).unsqueeze(0) with torch.no_grad(): emb = model(img_tensor).squeeze().numpy() embeddings.append(emb) except Exception as e: print(f"⚠️ Skipping {path}: {e}") continue if not embeddings: print("❌ No valid images found.") return embeddings = np.stack(embeddings) # Try all eps values print("🔗 Clustering with varying eps...") best_result = None for i, current_eps in enumerate(np.arange(eps_start, eps_end + eps_step, eps_step)): clustering = DBSCAN(eps=current_eps, min_samples=2, metric='euclidean').fit(embeddings) labels = clustering.labels_ num_clusters = len(set(labels)) - (1 if -1 in labels else 0) num_unknowns = list(labels).count(-1) print(f"🔍 eps={current_eps:.3f} → {num_clusters} clusters, {num_unknowns} unknowns") # Check if this is best so far if num_clusters == num_identities: if best_result is None or num_unknowns < best_result["unknowns"]: best_result = { "eps": current_eps, "labels": labels, "unknowns": num_unknowns, "clusters": num_clusters } if num_unknowns == 0: break # Ideal case if best_result is None: print(f"❌ No clustering resulted in exactly {num_identities} identities.") return # Save grouped faces print(f"✅ Best result: eps={best_result['eps']:.3f}, clusters={best_result['clusters']}, unknowns={best_result['unknowns']}") labels = best_result["labels"] os.makedirs(output_folder, exist_ok=True) for path, label in zip(image_paths, labels): label_str = f"identity_{label}" if label != -1 else "unknown" identity_dir = os.path.join(output_folder, label_str) os.makedirs(identity_dir, exist_ok=True) fname = os.path.basename(path) cv2.imwrite(os.path.join(identity_dir, fname), cv2.imread(path)) if log_params: with open(param_log_path, "w") as f: f.write(f"Best eps: {best_result['eps']:.3f}\n") f.write(f"Identities: {best_result['clusters']}\n") f.write(f"Unknowns: {best_result['unknowns']}\n") print(f"📁 Saved results to: {output_folder}") if log_params: print(f"📝 Params logged to: {param_log_path}") def group_faces_by_identity_facenet( faces_folder: str = "output_faces", output_folder: str = "grouped_faces_facenet", eps: float = 0.6, num_identities: int = 1 ): # Clear existing output folder if os.path.exists(output_folder): shutil.rmtree(output_folder) os.makedirs(output_folder, exist_ok=True) # Initialize FaceNet model model = InceptionResnetV1(pretrained='vggface2').eval() embeddings = [] image_paths = [] transform = transforms.Compose([ transforms.Resize((160, 160)), # FaceNet expects 160x160 input transforms.ToTensor(), transforms.Normalize(mean=[0.5]*3, std=[0.5]*3) ]) print("📡 Extracting embeddings...") for fname in tqdm(sorted(os.listdir(faces_folder))): path = os.path.join(faces_folder, fname) try: img = Image.open(path).convert('RGB') img_tensor = transform(img).unsqueeze(0) with torch.no_grad(): emb = model(img_tensor).squeeze().numpy() embeddings.append(emb) image_paths.append(path) except Exception as e: print(f"⚠️ Skipping {path}: {e}") if not embeddings: print("❌ No valid images found for embedding.") return embeddings = np.stack(embeddings) print("🔗 Clustering...") clustering = DBSCAN(eps=eps, min_samples=2, metric='euclidean').fit(embeddings) labels = clustering.labels_ for path, label in zip(image_paths, labels): label_str = f"identity_{label}" if label != -1 else "unknown" identity_dir = os.path.join(output_folder, label_str) os.makedirs(identity_dir, exist_ok=True) fname = os.path.basename(path) cv2.imwrite(os.path.join(identity_dir, fname), cv2.imread(path)) num_clusters = len(set(labels)) - (1 if -1 in labels else 0) print(f"✅ Grouped {len(image_paths)} images into {num_clusters} identities.") def split_csv_by_identity( grouped_folder: str = "grouped_faces_facenet", original_csv: str = "meta_data/frames_detections.csv", output_dir: str = "meta_data/identity_csvs" ): """ Creates separate CSVs for each identity based on grouped face images. Args: grouped_folder (str): Path to grouped identity folders (e.g., identity_0/, identity_1/). original_csv (str): CSV file with all detections. output_dir (str): Output directory to save identity-specific CSVs. """ # Clear existing output CSV folder if os.path.exists(output_dir): shutil.rmtree(output_dir) os.makedirs(output_dir, exist_ok=True) df = pd.read_csv(original_csv) # Index for fast lookup df_lookup = {} for i, row in df.iterrows(): key = f"frame_{int(row['frame'])}_face_{i}.jpg" df_lookup[key] = row for identity_name in os.listdir(grouped_folder): identity_path = os.path.join(grouped_folder, identity_name) if not os.path.isdir(identity_path): continue identity_rows = [] for fname in os.listdir(identity_path): if not fname.endswith(".jpg"): continue match = re.match(r"frame_(\d+)_face_(\d+)\.jpg", fname) if not match: print(f"⚠️ Could not parse filename: {fname}") continue key = fname if key in df_lookup: identity_rows.append(df_lookup[key]) else: print(f"⚠️ No match found in CSV for {fname}") if identity_rows: out_csv_path = os.path.join(output_dir, f"{identity_name}.csv") pd.DataFrame(identity_rows).to_csv(out_csv_path, index=False) print(f"✅ Saved {len(identity_rows)} rows to {out_csv_path}") else: print(f"⚠️ No valid entries for {identity_name}") def test(): group_faces_by_identity("output_faces", "grouped_faces") if __name__ == "__main__": test()