|
|
import cv2 |
|
|
import pandas as pd |
|
|
from tqdm import tqdm |
|
|
import load_models |
|
|
from load_models import warm_up_models |
|
|
import os |
|
|
import pandas as pd |
|
|
from tqdm import tqdm |
|
|
import cv2 |
|
|
import os |
|
|
import pandas as pd |
|
|
from tqdm import tqdm |
|
|
from insightface.app import FaceAnalysis |
|
|
import json |
|
|
import shutil |
|
|
import streamlit |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_faces_insightface_to_csv(
    input_video: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    start_frame: int = 0,
    end_frame: int = None
) -> None:
    """
    Detect faces in a video with InsightFace and save them to a CSV file.

    Each CSV row holds the frame index, the integer bounding box
    (x1, y1, x2, y2), the facial landmarks (JSON-encoded list), and the
    normalized face embedding (JSON-encoded list).

    Args:
        input_video (str): Path to the input video file.
        output_csv (str): Destination CSV path. Defaults to
            "meta_data/<video_basename>_detections.csv".
        app (FaceAnalysis): Pre-initialized detector; if None a CPU
            'buffalo_l' model is created and prepared.
        start_frame (int): First frame index to process (inclusive).
        end_frame (int): Last frame index to process (inclusive).
            Defaults to the last frame of the video.

    Raises:
        IOError: If the video cannot be opened.
        ValueError: If the requested frame range is invalid.
    """
    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        # Derive the CSV name from the video file name.
        base_name = os.path.splitext(os.path.basename(input_video))[0]
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")

    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)

    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        raise IOError(f"Failed to open input video: {input_video}")
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if end_frame is None:
        end_frame = total_frames - 1

    # Raise (don't assert) so the check survives `python -O`.
    if not (0 <= start_frame <= end_frame < total_frames):
        cap.release()
        raise ValueError("Invalid frame range.")

    all_detections = []

    print(f"🔍 Detecting faces from frame {start_frame} to {end_frame} with InsightFace...")
    cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
    current_frame = start_frame

    with tqdm(total=end_frame - start_frame + 1, desc="Frames processed") as pbar:
        while current_frame <= end_frame:
            ret, frame = cap.read()
            if not ret:
                # Stop early on a decode failure rather than looping forever.
                print(f"⚠️ Frame {current_frame} could not be read.")
                break

            faces = app.get(frame)
            for face in faces:
                x1, y1, x2, y2 = map(int, face.bbox)
                landmarks = face.kps.tolist() if hasattr(face, 'kps') else []
                # Embedding may be absent when recognition is disabled.
                normed_embedding = (
                    face.normed_embedding.tolist()
                    if hasattr(face, 'normed_embedding') and face.normed_embedding is not None
                    else []
                )

                all_detections.append({
                    "frame": current_frame,
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                    "landmarks": json.dumps(landmarks),
                    "normed_embedding": json.dumps(normed_embedding)
                })

            current_frame += 1
            pbar.update(1)

    cap.release()

    df = pd.DataFrame(all_detections)
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
|
|
def extract_faces_from_csv(
    input_video: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10
) -> None:
    """
    Extract and save face crops from a video using bounding boxes in a CSV.

    Crops are padded, clamped to the frame, resized to 112x112 and saved
    as "frame_<frame>_face_<row>.jpg" in *output_folder*.

    Args:
        input_video (str): Path to original video.
        csv_path (str): Path to CSV with face bounding boxes
            (columns: frame, x1, y1, x2, y2).
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.

    Raises:
        IOError: If the video cannot be opened.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return

    os.makedirs(output_folder, exist_ok=True)

    cap = cv2.VideoCapture(input_video)
    if not cap.isOpened():
        raise IOError("Failed to open input video")

    # Read each referenced frame once and keep it in memory so multiple
    # detections on the same frame don't re-seek the video.
    frame_cache = {}

    print("📦 Extracting faces...")
    for frame_id in tqdm(sorted(df["frame"].unique()), desc="Reading frames"):
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
        ret, frame = cap.read()
        if not ret:
            print(f"⚠️ Could not read frame {frame_id}")
            continue
        frame_cache[frame_id] = frame.copy()

    for i, row in tqdm(df.iterrows(), total=len(df), desc="Cropping faces"):
        frame_id = row["frame"]
        x1, y1, x2, y2 = int(row["x1"]), int(row["y1"]), int(row["x2"]), int(row["y2"])
        frame = frame_cache.get(frame_id)
        if frame is None:
            continue

        # Pad the box, clamped to the frame boundaries.
        height, width = frame.shape[:2]
        x1_p = max(0, x1 - padding)
        y1_p = max(0, y1 - padding)
        x2_p = min(width, x2 + padding)
        y2_p = min(height, y2 + padding)

        crop = frame[y1_p:y2_p, x1_p:x2_p]
        if crop.size == 0:
            # Degenerate box (zero width/height) — cv2.resize would raise.
            print(f"⚠️ Empty crop for frame {frame_id}, row {i} — skipped.")
            continue
        img_resized = cv2.resize(crop, (112, 112))

        crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
        cv2.imwrite(crop_path, img_resized)

    cap.release()
    print(f"✅ Saved 112x112 face crops to: {output_folder}")
|
|
|
|
|
|
|
|
def detect_faces_insightface_from_frames(
    frames_folder: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    streamlit_progress=None,
    progress_range=(0, 100)
) -> None:
    """
    Detect faces in a folder of frames with InsightFace and save results to a CSV.

    Frame images must be named numerically (e.g. "0.jpg", "1.png") so they
    can be sorted and used as the frame index. Each CSV row holds the frame
    index, the integer bounding box, the landmarks (JSON-encoded list), and
    the normalized embedding (JSON-encoded list).

    Args:
        frames_folder (str): Folder containing numerically named frame images.
        output_csv (str): Destination CSV path. Defaults to
            "meta_data/<folder_name>_detections.csv".
        app (FaceAnalysis): Pre-initialized detector; if None a CPU
            'buffalo_l' model is created and prepared.
        streamlit_progress: Optional Streamlit progress-bar object; its
            .progress() method is called with values mapped into
            *progress_range*.
        progress_range (tuple): (start, end) percentage window for the
            progress bar.
    """
    frame_paths = sorted(
        [f for f in os.listdir(frames_folder) if f.lower().endswith((".jpg", ".png"))],
        # Numeric sort: "10.jpg" must come after "2.jpg".
        key=lambda x: int(os.path.splitext(x)[0])
    )

    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        base_name = os.path.basename(os.path.normpath(frames_folder))
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")

    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)

    all_detections = []
    print("✅ Total frames found:", len(frame_paths))
    print("✅ First few frames:", frame_paths[:10])
    print("✅ Last few frames:", frame_paths[-10:])

    print(f"🔍 Detecting faces in {len(frame_paths)} frames with InsightFace...")
    num_faces = 0
    total = len(frame_paths)
    start, end = progress_range
    for i, filename in enumerate(frame_paths):
        frame_index = int(os.path.splitext(filename)[0])
        frame_path = os.path.join(frames_folder, filename)
        frame = cv2.imread(frame_path)

        if streamlit_progress:
            # Map progress into the caller-supplied percentage window.
            progress_value = start + (end - start) * (i + 1) / total
            streamlit_progress.progress(int(progress_value))

        if frame is None:
            print(f"⚠️ Could not read frame: {frame_path}")
            continue

        faces = app.get(frame)
        for face in faces:
            x1, y1, x2, y2 = map(int, face.bbox)
            landmarks = face.kps.tolist() if hasattr(face, 'kps') else []
            # Embedding may be absent when recognition is disabled.
            normed_embedding = (
                face.normed_embedding.tolist()
                if hasattr(face, 'normed_embedding') and face.normed_embedding is not None
                else []
            )

            all_detections.append({
                "frame": frame_index,
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                "landmarks": json.dumps(landmarks),
                "normed_embedding": json.dumps(normed_embedding)
            })
            num_faces += 1

    df = pd.DataFrame(all_detections)
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
    print(f"✅ Number of faces detected: {num_faces}")
|
|
|
|
|
def extract_faces_from_frames_folder(
    frames_folder: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10,
    streamlit_progress=None,
    progress_range=(0, 100)
) -> None:
    """
    Extract and save face crops using bounding boxes from a CSV and frames in a folder.

    Any existing *output_folder* is deleted first, so stale crops from a
    previous run never survive. Crops are padded, clamped to the frame,
    resized to 112x112 and saved as "frame_<frame>_face_<row>.jpg".

    Args:
        frames_folder (str): Path to folder containing frame images (named
            numerically, e.g. "17.jpg").
        csv_path (str): Path to CSV with face bounding boxes
            (columns: frame, x1, y1, x2, y2).
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.
        streamlit_progress: Optional Streamlit progress-bar object; its
            .progress() method is called with values mapped into
            *progress_range*.
        progress_range (tuple): (start, end) percentage window for the
            progress bar.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return

    # Start from a clean output directory.
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)

    # Load each referenced frame once, keyed by its frame index.
    frame_cache = {}
    unique_frames = sorted(df["frame"].unique())

    print("📦 Reading frames from folder...")
    for frame_id in tqdm(unique_frames, desc="Reading frames"):
        frame_path = os.path.join(frames_folder, f"{frame_id}.jpg")
        frame = cv2.imread(frame_path)
        if frame is None:
            print(f"⚠️ Could not read frame {frame_path}")
            continue
        frame_cache[frame_id] = frame

    print("✂️ Cropping faces...")
    total_rows = len(df)
    start, end = progress_range
    for i, row in tqdm(df.iterrows(), total=total_rows, desc="Cropping faces"):
        if streamlit_progress:
            # Map progress into the caller-supplied percentage window.
            progress_value = start + (end - start) * (i + 1) / total_rows
            streamlit_progress.progress(int(progress_value))
        frame_id = row["frame"]
        x1, y1, x2, y2 = int(row["x1"]), int(row["y1"]), int(row["x2"]), int(row["y2"])
        frame = frame_cache.get(frame_id)
        if frame is None:
            continue

        # Pad the box, clamped to the frame boundaries.
        height, width = frame.shape[:2]
        x1_p = max(0, x1 - padding)
        y1_p = max(0, y1 - padding)
        x2_p = min(width, x2 + padding)
        y2_p = min(height, y2 + padding)

        crop = frame[y1_p:y2_p, x1_p:x2_p]
        if crop.size == 0:
            # Degenerate box (zero width/height) — cv2.resize would raise.
            print(f"⚠️ Empty crop for frame {frame_id}, row {i} — skipped.")
            continue
        img_resized = cv2.resize(crop, (112, 112))

        crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
        cv2.imwrite(crop_path, img_resized)

    print(f"✅ Saved 112x112 face crops to: {output_folder}")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    from grouping import group_faces_by_identity_facenet

    # Local debug harness — paths are machine-specific; adjust before running.
    input_video = "/Users/sophiemaw/Downloads/CONFIDENTIAL DO NOT SHARE Edna & Paul 29.10.10 Part 2 00.12.46.531.mov"
    csv_path = 'meta_data/test_detections.csv'

    # Pass output_csv explicitly so detection and extraction agree on the
    # CSV location regardless of the detector's default-path logic.
    detect_faces_insightface_to_csv(input_video=input_video, output_csv=csv_path, end_frame=10)
    extract_faces_from_csv(input_video=input_video, csv_path=csv_path, padding=30)
    group_faces_by_identity_facenet()