# VASR_ — src/face_detection.py
# (uploaded by sophiemaw, commit 20312b9)
import cv2 # video handling
import pandas as pd # saving detections
from tqdm import tqdm # progress bar
import load_models
from load_models import warm_up_models
import os
import pandas as pd
from tqdm import tqdm
import cv2
import os
import pandas as pd
from tqdm import tqdm
from insightface.app import FaceAnalysis
import json
import shutil
import streamlit
#VIDEO INPUT
def detect_faces_insightface_to_csv(
    input_video: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    start_frame: int = 0,
    end_frame: int = None
) -> None:
    """
    Detect faces in a video with InsightFace and save one row per detection to CSV.

    Each row holds the frame index, the integer bounding box (x1, y1, x2, y2),
    and JSON-encoded landmarks and normalized embedding.

    Args:
        input_video (str): Path to the input video file.
        output_csv (str): Destination CSV path. Defaults to
            "meta_data/<video basename>_detections.csv".
        app (FaceAnalysis): Prepared detector; a CPU 'buffalo_l' model is
            created when None.
        start_frame (int): First frame (inclusive) to process.
        end_frame (int): Last frame (inclusive); defaults to the final frame.

    Raises:
        ValueError: If the requested frame range is invalid.
    """
    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        # Derive the CSV name from the video filename. (A leftover debug line
        # previously forced base_name = 'test', sending every video's
        # detections to the same file.)
        base_name = os.path.splitext(os.path.basename(input_video))[0]
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")
    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)
    cap = cv2.VideoCapture(input_video)
    try:
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if end_frame is None:
            end_frame = total_frames - 1
        # Raise instead of assert: asserts are stripped under `python -O`.
        if not (0 <= start_frame <= end_frame < total_frames):
            raise ValueError("Invalid frame range.")
        all_detections = []
        print(f"🔍 Detecting faces from frame {start_frame} to {end_frame} with InsightFace...")
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        current_frame = start_frame
        with tqdm(total=end_frame - start_frame + 1, desc="Frames processed") as pbar:
            while current_frame <= end_frame:
                ret, frame = cap.read()
                if not ret:
                    # Stop early on a decode failure; keep what was collected.
                    print(f"⚠️ Frame {current_frame} could not be read.")
                    break
                faces = app.get(frame)
                for face in faces:
                    x1, y1, x2, y2 = map(int, face.bbox)
                    landmarks = face.kps.tolist() if hasattr(face, 'kps') else []
                    embedding = getattr(face, 'normed_embedding', None)
                    normed_embedding = embedding.tolist() if embedding is not None else []
                    all_detections.append({
                        "frame": current_frame,
                        "x1": x1,
                        "y1": y1,
                        "x2": x2,
                        "y2": y2,
                        # JSON-encode nested lists so they survive the CSV round trip.
                        "landmarks": json.dumps(landmarks),
                        "normed_embedding": json.dumps(normed_embedding)
                    })
                current_frame += 1
                pbar.update(1)
    finally:
        # Release the capture even if detection raises mid-loop.
        cap.release()
    df = pd.DataFrame(all_detections)
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
def extract_faces_from_csv(
    input_video: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10  # number of pixels to expand each side
) -> None:
    """
    Extract and save 112x112 face crops from a video using bounding boxes in a CSV.

    Args:
        input_video (str): Path to original video.
        csv_path (str): Path to CSV with face bounding boxes
            (columns: frame, x1, y1, x2, y2).
        output_folder (str): Directory to save cropped face images.
        padding (int): Padding in pixels to add around each face crop.

    Raises:
        ValueError: If the input video cannot be opened.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return
    os.makedirs(output_folder, exist_ok=True)
    cap = cv2.VideoCapture(input_video)
    # Raise instead of assert: asserts are stripped under `python -O`.
    if not cap.isOpened():
        raise ValueError("Failed to open input video")
    try:
        # Seek/read each referenced frame exactly once, then crop from the cache.
        frame_cache = {}
        print("📦 Extracting faces...")
        for frame_id in tqdm(sorted(df["frame"].unique()), desc="Reading frames"):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
            ret, frame = cap.read()
            if not ret:
                print(f"⚠️ Could not read frame {frame_id}")
                continue
            frame_cache[frame_id] = frame.copy()
        for i, row in tqdm(df.iterrows(), total=len(df), desc="Cropping faces"):
            frame_id = row["frame"]
            x1, y1, x2, y2 = int(row["x1"]), int(row["y1"]), int(row["x2"]), int(row["y2"])
            frame = frame_cache.get(frame_id)
            if frame is None:
                # Frame was unreadable above; skip its detections.
                continue
            height, width = frame.shape[:2]
            # Clamp the padded box to the image bounds.
            x1_p = max(0, x1 - padding)
            y1_p = max(0, y1 - padding)
            x2_p = min(width, x2 + padding)
            y2_p = min(height, y2 + padding)
            crop = frame[y1_p:y2_p, x1_p:x2_p]
            if crop.size == 0:
                # Degenerate box after clamping: cv2.resize would raise.
                print(f"⚠️ Empty crop for frame {frame_id}, face {i} — skipping.")
                continue
            img_resized = cv2.resize(crop, (112, 112))
            crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
            cv2.imwrite(crop_path, img_resized)
    finally:
        cap.release()
    print(f"✅ Saved 112x112 face crops to: {output_folder}")
#FRAMES INPUT
def detect_faces_insightface_from_frames(
    frames_folder: str,
    output_csv: str = None,
    app: FaceAnalysis = None,
    streamlit_progress=None,
    progress_range=(0, 100)
) -> None:
    """
    Detect faces in a folder of frame images with InsightFace and save a CSV.

    Frames are expected to be named "<index>.jpg" / "<index>.png"; files whose
    stem is not an integer are ignored (previously they crashed the sort).

    Args:
        frames_folder (str): Folder containing numerically named frame images.
        output_csv (str): Destination CSV; defaults to
            "meta_data/<folder name>_detections.csv".
        app (FaceAnalysis): Prepared detector; a CPU 'buffalo_l' model is
            created when None.
        streamlit_progress: Optional Streamlit progress bar updated per frame.
        progress_range (tuple): (start, end) percentage window the bar spans.
    """
    def _frame_index(filename: str) -> int:
        # Frames are named "<int>.<ext>"; the stem is the frame index.
        return int(os.path.splitext(filename)[0])

    # Keep only images with an integer stem so sorting cannot crash on stray
    # files (thumbnails, hidden files, etc.).
    frame_paths = sorted(
        (f for f in os.listdir(frames_folder)
         if f.lower().endswith((".jpg", ".png")) and os.path.splitext(f)[0].isdigit()),
        key=_frame_index
    )
    if output_csv is None:
        os.makedirs("meta_data", exist_ok=True)
        base_name = os.path.basename(os.path.normpath(frames_folder))
        output_csv = os.path.join("meta_data", f"{base_name}_detections.csv")
    if app is None:
        app = FaceAnalysis(name='buffalo_l', providers=['CPUExecutionProvider'])
        app.prepare(ctx_id=0)
    all_detections = []
    print("✅ Total frames found:", len(frame_paths))
    print("✅ First few frames:", frame_paths[:10])
    print("✅ Last few frames:", frame_paths[-10:])
    print(f"🔍 Detecting faces in {len(frame_paths)} frames with InsightFace...")
    num_faces = 0
    total = len(frame_paths)
    start, end = progress_range
    for i, filename in enumerate(frame_paths):
        frame_index = _frame_index(filename)
        frame_path = os.path.join(frames_folder, filename)
        frame = cv2.imread(frame_path)
        if streamlit_progress:
            # Map progress into the caller-supplied percentage window.
            progress_value = start + (end - start) * (i + 1) / total
            streamlit_progress.progress(int(progress_value))
        if frame is None:
            print(f"⚠️ Could not read frame: {frame_path}")
            continue
        faces = app.get(frame)
        for face in faces:
            x1, y1, x2, y2 = map(int, face.bbox)
            landmarks = face.kps.tolist() if hasattr(face, 'kps') else []
            embedding = getattr(face, 'normed_embedding', None)
            normed_embedding = embedding.tolist() if embedding is not None else []
            all_detections.append({
                "frame": frame_index,
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                # JSON-encode nested lists so they survive the CSV round trip.
                "landmarks": json.dumps(landmarks),
                "normed_embedding": json.dumps(normed_embedding)
            })
            num_faces += 1
    df = pd.DataFrame(all_detections)
    df.to_csv(output_csv, index=False)
    print(f"✅ InsightFace detections saved to {output_csv}")
    print(f"✅ Number of faces detected: {num_faces}")
def extract_faces_from_frames_folder(
    frames_folder: str,
    csv_path: str,
    output_folder: str = "output_faces",
    padding: int = 10,
    streamlit_progress=None,
    progress_range=(0, 100)
) -> None:
    """
    Extract and save 112x112 face crops using bounding boxes from a CSV and
    frame images in a folder.

    Args:
        frames_folder (str): Path to folder containing frame images (named
            numerically, "<frame_id>.jpg" or "<frame_id>.png").
        csv_path (str): Path to CSV with face bounding boxes.
        output_folder (str): Directory to save cropped face images; wiped and
            recreated on every run so stale crops never mix in.
        padding (int): Padding in pixels to add around each face crop.
        streamlit_progress: Optional Streamlit progress bar updated per row.
        progress_range (tuple): (start, end) percentage window the bar spans.
    """
    df = pd.read_csv(csv_path)
    if df.empty:
        print("❌ CSV is empty — no faces to extract.")
        return
    # Clean the output folder before saving crops
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)
    # Preload each referenced frame once into a cache
    frame_cache = {}
    unique_frames = sorted(df["frame"].unique())
    print("📦 Reading frames from folder...")
    for frame_id in tqdm(unique_frames, desc="Reading frames"):
        # Detection accepts both .jpg and .png frames, so try both here too
        # (previously only .jpg was tried, silently dropping .png frames).
        frame = None
        frame_path = None
        for ext in (".jpg", ".png"):
            frame_path = os.path.join(frames_folder, f"{frame_id}{ext}")
            frame = cv2.imread(frame_path)
            if frame is not None:
                break
        if frame is None:
            print(f"⚠️ Could not read frame {frame_path}")
            continue
        frame_cache[frame_id] = frame
    print("✂️ Cropping faces...")
    total_rows = len(df)
    start, end = progress_range
    for i, row in tqdm(df.iterrows(), total=total_rows, desc="Cropping faces"):
        if streamlit_progress:
            # Map progress into the caller-supplied percentage window.
            progress_value = start + (end - start) * (i + 1) / total_rows
            streamlit_progress.progress(int(progress_value))
        frame_id = row["frame"]
        x1, y1, x2, y2 = int(row["x1"]), int(row["y1"]), int(row["x2"]), int(row["y2"])
        frame = frame_cache.get(frame_id)
        if frame is None:
            # Frame image was missing/unreadable; skip its detections.
            continue
        height, width = frame.shape[:2]
        # Clamp the padded box to the image bounds.
        x1_p = max(0, x1 - padding)
        y1_p = max(0, y1 - padding)
        x2_p = min(width, x2 + padding)
        y2_p = min(height, y2 + padding)
        crop = frame[y1_p:y2_p, x1_p:x2_p]
        if crop.size == 0:
            # Degenerate box after clamping: cv2.resize would raise.
            print(f"⚠️ Empty crop for frame {frame_id}, face {i} — skipping.")
            continue
        img_resized = cv2.resize(crop, (112, 112))
        crop_path = os.path.join(output_folder, f"frame_{frame_id}_face_{i}.jpg")
        cv2.imwrite(crop_path, img_resized)
    print(f"✅ Saved 112x112 face crops to: {output_folder}")
if __name__ == "__main__":
    from grouping import group_faces_by_identity_facenet

    # Smoke-test pipeline: detect faces in the first 11 frames of a video,
    # crop them, then group the crops by identity.
    input_video = "/Users/sophiemaw/Downloads/CONFIDENTIAL DO NOT SHARE Edna & Paul 29.10.10 Part 2 00.12.46.531.mov"
    csv_path = 'meta_data/test_detections.csv'
    # Pass output_csv explicitly so detection and extraction always agree on
    # the CSV location instead of relying on the detector's default naming.
    detect_faces_insightface_to_csv(input_video=input_video, output_csv=csv_path, end_frame=10)
    extract_faces_from_csv(input_video=input_video, csv_path=csv_path, padding=30)
    group_faces_by_identity_facenet()