import os
import re
import shutil

import cv2
import numpy as np
import pandas as pd
import torch
from PIL import Image
from facenet_pytorch import InceptionResnetV1
from sklearn.cluster import DBSCAN
from torchvision import transforms
from tqdm import tqdm
|
|
def enhance_image(img):
    """Boost contrast by equalizing the L (lightness) channel in LAB color space."""
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    l = cv2.equalizeHist(l)
    lab = cv2.merge((l, a, b))
    return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
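

# A gentler alternative (illustrative sketch, not wired into the pipeline):
# CLAHE caps contrast amplification per tile and tends to blow out faces less
# than global histogram equalization. `enhance_image_clahe` is a hypothetical
# helper name, not part of the original module.
def enhance_image_clahe(img, clip_limit=2.0, tile_grid_size=(8, 8)):
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=tile_grid_size)
    lab = cv2.merge((clahe.apply(l), a, b))
    return cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)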
|
|
|
|
|
def group_faces_by_identity_facenet_fixed(
    faces_folder: str = "output_faces",
    output_folder: str = "grouped_faces_facenet",
    eps_start: float = 0.01,
    eps_step: float = 0.01,
    eps_end: float = 0.99,
    num_identities: int = 2,
    log_params: bool = True,
    param_log_path: str = "grouping_params.txt",
    streamlit_progress=None,
    progress_range=(0, 100)
):
    """Embed face crops with FaceNet, then sweep DBSCAN's eps until clustering
    yields exactly `num_identities` clusters, preferring the eps that leaves
    the fewest unclustered ("unknown") faces.

    Returns True on success, False otherwise.
    """
    os.makedirs(output_folder, exist_ok=True)

    image_paths = sorted([
        os.path.join(faces_folder, fname)
        for fname in os.listdir(faces_folder)
        if fname.lower().endswith((".jpg", ".png"))
    ])

    if not image_paths:
        print("❌ No images found in faces folder.")
        return False

    # Pretrained FaceNet (Inception-ResNet V1, VGGFace2 weights) in eval mode.
    model = InceptionResnetV1(pretrained='vggface2').eval()

    # FaceNet expects 160x160 inputs normalized to [-1, 1].
    transform = transforms.Compose([
        transforms.Resize((160, 160)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3)
    ])

    print("Extracting embeddings...")
    embeddings = []
    valid_paths = []  # keep paths aligned with embeddings when an image fails
    total_images = len(image_paths)
    start, end = progress_range

    for i, path in enumerate(tqdm(image_paths, desc="Extracting embeddings")):
        if streamlit_progress:
            progress_value = start + (end - start) * (i + 1) / total_images
            streamlit_progress.progress(int(progress_value))
        try:
            img = Image.open(path).convert('RGB')
            img_tensor = transform(img).unsqueeze(0)
            with torch.no_grad():
                emb = model(img_tensor).squeeze().numpy()
            embeddings.append(emb)
            valid_paths.append(path)
        except Exception as e:
            print(f"⚠️ Skipping {path}: {e}")

    if not embeddings:
        print("❌ No valid images were embedded.")
        return False

    embeddings = np.stack(embeddings)

    print("Clustering with varying eps...")
    best_result = None
    for current_eps in np.arange(eps_start, eps_end + eps_step, eps_step):
        clustering = DBSCAN(eps=current_eps, min_samples=2, metric='euclidean').fit(embeddings)
        labels = clustering.labels_
        num_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        num_unknowns = list(labels).count(-1)

        print(f"eps={current_eps:.3f} → {num_clusters} clusters, {num_unknowns} unknowns")

        if num_clusters == num_identities:
            if best_result is None or num_unknowns < best_result["unknowns"]:
                best_result = {
                    "eps": current_eps,
                    "labels": labels,
                    "unknowns": num_unknowns,
                    "clusters": num_clusters
                }
            if num_unknowns == 0:
                break

    if best_result is None:
        print(f"❌ No clustering resulted in exactly {num_identities} identities.")
        return False

    print(f"✅ Best result: eps={best_result['eps']:.3f}, "
          f"clusters={best_result['clusters']}, unknowns={best_result['unknowns']}")
    labels = best_result["labels"]

    # Copy each face crop into its identity folder (-1 = DBSCAN noise → "unknown").
    for path, label in zip(valid_paths, labels):
        label_str = f"identity_{label}" if label != -1 else "unknown"
        identity_dir = os.path.join(output_folder, label_str)
        os.makedirs(identity_dir, exist_ok=True)
        fname = os.path.basename(path)
        cv2.imwrite(os.path.join(identity_dir, fname), cv2.imread(path))

    if log_params:
        with open(param_log_path, "w") as f:
            f.write(f"Best eps: {best_result['eps']:.3f}\n")
            f.write(f"Identities: {best_result['clusters']}\n")
            f.write(f"Unknowns: {best_result['unknowns']}\n")

    print(f"Saved results to: {output_folder}")
    if log_params:
        print(f"Params logged to: {param_log_path}")

    return True
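
# Example wiring to a Streamlit progress bar (a minimal sketch; assumes a
# Streamlit app context -- `st.progress` is Streamlit's standard progress
# widget, and `progress_range` lets several pipeline stages share one bar):
#
#     import streamlit as st
#
#     bar = st.progress(0)
#     group_faces_by_identity_facenet_fixed(
#         faces_folder="output_faces",
#         output_folder="grouped_faces_facenet",
#         num_identities=2,
#         streamlit_progress=bar,
#         progress_range=(0, 50),  # embedding fills the first half of the bar
#     )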
|
|
def group_faces_by_identity_facenet(
    faces_folder: str = "output_faces",
    output_folder: str = "grouped_faces_facenet",
    eps: float = 0.6,
    num_identities: int = 1
):
    """Single-pass variant: embed face crops with FaceNet and cluster them once
    with a fixed DBSCAN eps. `num_identities` is informational only; the actual
    cluster count depends on eps.
    """
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)

    model = InceptionResnetV1(pretrained='vggface2').eval()

    embeddings = []
    image_paths = []

    transform = transforms.Compose([
        transforms.Resize((160, 160)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3)
    ])

    print("Extracting embeddings...")
    for fname in tqdm(sorted(os.listdir(faces_folder))):
        path = os.path.join(faces_folder, fname)
        try:
            img = Image.open(path).convert('RGB')
            img_tensor = transform(img).unsqueeze(0)
            with torch.no_grad():
                emb = model(img_tensor).squeeze().numpy()
            embeddings.append(emb)
            image_paths.append(path)
        except Exception as e:
            print(f"⚠️ Skipping {path}: {e}")

    if not embeddings:
        print("❌ No valid images found for embedding.")
        return

    embeddings = np.stack(embeddings)

    print("Clustering...")
    clustering = DBSCAN(eps=eps, min_samples=2, metric='euclidean').fit(embeddings)
    labels = clustering.labels_

    for path, label in zip(image_paths, labels):
        label_str = f"identity_{label}" if label != -1 else "unknown"
        identity_dir = os.path.join(output_folder, label_str)
        os.makedirs(identity_dir, exist_ok=True)
        fname = os.path.basename(path)
        cv2.imwrite(os.path.join(identity_dir, fname), cv2.imread(path))

    num_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    print(f"✅ Grouped {len(image_paths)} images into {num_clusters} identities.")
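
# Optional heuristic (illustrative sketch, not called by the pipeline): a
# common way to pick a DBSCAN eps is the k-distance curve. `suggest_eps` is a
# hypothetical helper returning the median distance to each point's k-th
# nearest neighbour as a rough starting eps, cheaper than a full sweep.
def suggest_eps(embeddings, k=2):
    from sklearn.neighbors import NearestNeighbors

    # n_neighbors includes the query point itself, hence k + 1.
    nn = NearestNeighbors(n_neighbors=k + 1).fit(embeddings)
    dists, _ = nn.kneighbors(embeddings)
    return float(np.median(dists[:, k]))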
|
|
def split_csv_by_identity(
    grouped_folder: str = "grouped_faces_facenet",
    original_csv: str = "meta_data/frames_detections.csv",
    output_dir: str = "meta_data/identity_csvs"
):
    """Create a separate CSV of detections for each grouped identity.

    Args:
        grouped_folder (str): Path to grouped identity folders (e.g., identity_0/, identity_1/).
        original_csv (str): CSV file with all detections.
        output_dir (str): Output directory for the identity-specific CSVs.
    """
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    df = pd.read_csv(original_csv)

    # Map each expected crop filename back to its detection row; assumes crops
    # were saved as frame_<frame>_face_<row index>.jpg.
    df_lookup = {}
    for i, row in df.iterrows():
        key = f"frame_{int(row['frame'])}_face_{i}.jpg"
        df_lookup[key] = row

    for identity_name in os.listdir(grouped_folder):
        identity_path = os.path.join(grouped_folder, identity_name)
        if not os.path.isdir(identity_path):
            continue

        identity_rows = []

        for fname in os.listdir(identity_path):
            if not fname.endswith(".jpg"):
                continue

            match = re.match(r"frame_(\d+)_face_(\d+)\.jpg", fname)
            if not match:
                print(f"⚠️ Could not parse filename: {fname}")
                continue

            if fname in df_lookup:
                identity_rows.append(df_lookup[fname])
            else:
                print(f"⚠️ No match found in CSV for {fname}")

        if identity_rows:
            out_csv_path = os.path.join(output_dir, f"{identity_name}.csv")
            pd.DataFrame(identity_rows).to_csv(out_csv_path, index=False)
            print(f"✅ Saved {len(identity_rows)} rows to {out_csv_path}")
        else:
            print(f"⚠️ No valid entries for {identity_name}")
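
# End-to-end usage sketch (assumes face crops are named frame_<N>_face_<M>.jpg,
# matching the lookup keys built in split_csv_by_identity):
#
#     group_faces_by_identity_facenet_fixed("output_faces", "grouped_faces_facenet")
#     split_csv_by_identity("grouped_faces_facenet",
#                           "meta_data/frames_detections.csv",
#                           "meta_data/identity_csvs")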
|
|
def test():
    group_faces_by_identity_facenet("output_faces", "grouped_faces")


if __name__ == "__main__":
    test()