|
|
import cv2 |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import json |
|
|
from PIL import Image |
|
|
from insightface.app import FaceAnalysis |
|
|
from insightface.model_zoo import get_model |
|
|
from generate import generate_face_image |
|
|
import os |
|
|
import shutil |
|
|
from tqdm import tqdm |
|
|
import os |
|
|
import cv2 |
|
|
import json |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from PIL import Image |
|
|
from tqdm import tqdm |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from insightface.app import FaceAnalysis |
|
|
from insightface.model_zoo import get_model |
|
|
from concurrent.futures import as_completed |
|
|
|
|
|
|
|
|
def face_swap_with_csv_info( |
|
|
frame: np.ndarray, |
|
|
face_info: dict, |
|
|
src_face, |
|
|
inswapper_model_path: str |
|
|
) -> np.ndarray: |
|
|
class FakeFace: |
|
|
pass |
|
|
|
|
|
try: |
|
|
dst_face = FakeFace() |
|
|
dst_face.bbox = [face_info["x1"], face_info["y1"], face_info["x2"], face_info["y2"]] |
|
|
|
|
|
|
|
|
if "landmark_2d_106" in face_info and face_info["landmark_2d_106"]: |
|
|
landmarks = json.loads(face_info["landmark_2d_106"]) |
|
|
elif "landmarks" in face_info and face_info["landmarks"]: |
|
|
landmarks = json.loads(face_info["landmarks"]) |
|
|
else: |
|
|
print("β No landmarks found in CSV row.") |
|
|
return frame |
|
|
|
|
|
dst_face.landmark_2d_106 = np.array(landmarks) |
|
|
dst_face.kps = dst_face.landmark_2d_106 |
|
|
|
|
|
|
|
|
dst_face.embedding = np.array(json.loads(face_info["embedding"])) if "embedding" in face_info and face_info["embedding"] else np.zeros((512,)) |
|
|
dst_face.normed_embedding = np.array(json.loads(face_info["normed_embedding"])) if "normed_embedding" in face_info and face_info["normed_embedding"] else np.zeros((512,)) |
|
|
except KeyError as e: |
|
|
print(f"β Missing key in CSV row: {e}") |
|
|
return frame |
|
|
|
|
|
|
|
|
inswapper = get_model(inswapper_model_path, providers=["CPUExecutionProvider"]) |
|
|
|
|
|
try: |
|
|
swapped = inswapper.get(frame, dst_face, src_face) |
|
|
return swapped |
|
|
except Exception as e: |
|
|
print(f"β Face swap failed: {e}") |
|
|
return frame |
|
|
|
|
|
def face_swap_on_frame_folder( |
|
|
frame_folder: str, |
|
|
csv_path: str, |
|
|
generated_image_path: str, |
|
|
output_folder: str, |
|
|
inswapper_model_path: str = "models/faceswap/inswapper_128.onnx" |
|
|
) -> None: |
|
|
os.makedirs(output_folder, exist_ok=True) |
|
|
|
|
|
|
|
|
generated_face_pil = generated_image_path.convert("RGB") |
|
|
generated_face_np = np.array(generated_face_pil) |
|
|
|
|
|
|
|
|
face_analyser = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"]) |
|
|
face_analyser.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.1) |
|
|
embedding_model = get_model("models/buffalo_l/w600k_r50.onnx", providers=["CPUExecutionProvider"]) |
|
|
embedding_model.prepare(ctx_id=0) |
|
|
face_analyser.models["embedding"] = embedding_model |
|
|
|
|
|
src_faces = face_analyser.get(generated_face_np) |
|
|
if not src_faces: |
|
|
print("β No face detected in generated face image.") |
|
|
return |
|
|
src_face = src_faces[0] |
|
|
|
|
|
|
|
|
swapper = get_model(inswapper_model_path, providers=["CPUExecutionProvider"]) |
|
|
|
|
|
|
|
|
df = pd.read_csv(csv_path) |
|
|
if df.empty: |
|
|
print("β CSV is empty.") |
|
|
return |
|
|
|
|
|
grouped = df.groupby("frame") |
|
|
|
|
|
for frame_id, group in grouped: |
|
|
frame_path_jpg = os.path.join(frame_folder, f"{frame_id}.jpg") |
|
|
frame_path_png = os.path.join(frame_folder, f"{frame_id}.png") |
|
|
frame_path = frame_path_jpg if os.path.exists(frame_path_jpg) else frame_path_png |
|
|
|
|
|
if not os.path.exists(frame_path): |
|
|
print(f"β οΈ Frame not found: {frame_path}") |
|
|
continue |
|
|
|
|
|
frame = cv2.imread(frame_path) |
|
|
if frame is None: |
|
|
print(f"β οΈ Could not read frame {frame_id}") |
|
|
continue |
|
|
|
|
|
for i, row in group.iterrows(): |
|
|
|
|
|
class FakeFace: pass |
|
|
dst_face = FakeFace() |
|
|
dst_face.bbox = [row["x1"], row["y1"], row["x2"], row["y2"]] |
|
|
|
|
|
try: |
|
|
landmarks = json.loads(row.get("landmark_2d_106", row.get("landmarks", "[]"))) |
|
|
if not landmarks: |
|
|
continue |
|
|
dst_face.landmark_2d_106 = np.array(landmarks) |
|
|
dst_face.kps = dst_face.landmark_2d_106 |
|
|
dst_face.embedding = np.array(json.loads(row.get("embedding", "[]"))) if "embedding" in row else np.zeros((512,)) |
|
|
dst_face.normed_embedding = np.array(json.loads(row.get("normed_embedding", "[]"))) if "normed_embedding" in row else np.zeros((512,)) |
|
|
except Exception as e: |
|
|
print(f"β οΈ Failed parsing row {i}: {e}") |
|
|
continue |
|
|
|
|
|
try: |
|
|
frame = swapper.get(frame, dst_face, src_face) |
|
|
except Exception as e: |
|
|
print(f"β Swap failed for frame {frame_id}, face {i}: {e}") |
|
|
continue |
|
|
|
|
|
|
|
|
out_path = os.path.join(output_folder, os.path.basename(frame_path)) |
|
|
cv2.imwrite(out_path, frame) |
|
|
print(f"β
Saved swapped frame: {out_path}") |
|
|
|
|
|
def face_swap_multiple_identities( |
|
|
frame_folder: str, |
|
|
identity_csv_paths: list, |
|
|
generated_images: list, |
|
|
output_folder: str = None, |
|
|
inswapper_model_path: str = "models/faceswap/inswapper_128.onnx", |
|
|
max_workers: int = 4, |
|
|
streamlit_progress=None, |
|
|
progress_range=(0, 100) |
|
|
) -> None: |
|
|
if output_folder is None: |
|
|
output_folder = 'output_frames' |
|
|
os.makedirs(output_folder, exist_ok=True) |
|
|
os.makedirs(output_folder, exist_ok=True) |
|
|
|
|
|
|
|
|
face_analyser = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"]) |
|
|
face_analyser.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.1) |
|
|
embedding_model = get_model("models/buffalo_l/w600k_r50.onnx", providers=["CPUExecutionProvider"]) |
|
|
embedding_model.prepare(ctx_id=0) |
|
|
face_analyser.models["embedding"] = embedding_model |
|
|
swapper = get_model(inswapper_model_path, providers=["CPUExecutionProvider"]) |
|
|
|
|
|
|
|
|
src_faces = [] |
|
|
for img in generated_images: |
|
|
if isinstance(img, str): |
|
|
img = Image.open(img) |
|
|
img = img.convert("RGB") |
|
|
face_np = np.array(img) |
|
|
faces = face_analyser.get(face_np) |
|
|
src_faces.append(faces[0] if faces else None) |
|
|
|
|
|
|
|
|
identity_dfs = [pd.read_csv(p) for p in identity_csv_paths] |
|
|
frame_to_faces = {} |
|
|
for identity_idx, df in enumerate(identity_dfs): |
|
|
for _, row in df.iterrows(): |
|
|
frame = int(row["frame"]) |
|
|
frame_to_faces.setdefault(frame, []).append((identity_idx, row)) |
|
|
|
|
|
def process_frame(frame_id): |
|
|
frame_path_jpg = os.path.join(frame_folder, f"{frame_id}.jpg") |
|
|
frame_path_png = os.path.join(frame_folder, f"{frame_id}.png") |
|
|
frame_path = frame_path_jpg if os.path.exists(frame_path_jpg) else frame_path_png |
|
|
|
|
|
if not os.path.exists(frame_path): |
|
|
return |
|
|
|
|
|
frame = cv2.imread(frame_path) |
|
|
if frame is None: |
|
|
return |
|
|
|
|
|
for identity_idx, row in frame_to_faces[frame_id]: |
|
|
src_face = src_faces[identity_idx] |
|
|
if src_face is None: |
|
|
continue |
|
|
|
|
|
class FakeFace: pass |
|
|
dst_face = FakeFace() |
|
|
dst_face.bbox = [row["x1"], row["y1"], row["x2"], row["y2"]] |
|
|
try: |
|
|
landmarks = json.loads(row.get("landmark_2d_106", row.get("landmarks", "[]"))) |
|
|
if not landmarks: |
|
|
continue |
|
|
dst_face.landmark_2d_106 = np.array(landmarks) |
|
|
dst_face.kps = dst_face.landmark_2d_106 |
|
|
dst_face.embedding = np.array(json.loads(row.get("embedding", "[]"))) if "embedding" in row else np.zeros((512,)) |
|
|
dst_face.normed_embedding = np.array(json.loads(row.get("normed_embedding", "[]"))) if "normed_embedding" in row else np.zeros((512,)) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
try: |
|
|
frame = swapper.get(frame, dst_face, src_face) |
|
|
except Exception: |
|
|
continue |
|
|
|
|
|
out_path = os.path.join(output_folder, os.path.basename(frame_path)) |
|
|
cv2.imwrite(out_path, frame) |
|
|
|
|
|
frame_ids = sorted(frame_to_faces.keys()) |
|
|
start_p, end_p = progress_range |
|
|
total = len(frame_ids) |
|
|
completed = 0 |
|
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor: |
|
|
futures = {executor.submit(process_frame, fid): fid for fid in frame_ids} |
|
|
for future in as_completed(futures): |
|
|
completed += 1 |
|
|
if streamlit_progress: |
|
|
pct = start_p + int((completed / total) * (end_p - start_p)) |
|
|
streamlit_progress.progress(pct) |
|
|
|
|
|
def test_faceswap_on_first_frame( |
|
|
input_video: str, |
|
|
csv_path: str, |
|
|
face_index: int = 0, |
|
|
inswapper_model_path: str = "/Users/sophiemaw/Documents/VASR_NEW/pythonProject/models/faceswap/inswapper_128.onnx", |
|
|
output_path: str = "swapped_test_frame.jpg" |
|
|
) -> None: |
|
|
df = pd.read_csv(csv_path) |
|
|
if df.empty: |
|
|
print("β CSV is empty.") |
|
|
return |
|
|
|
|
|
row = df.iloc[face_index] |
|
|
frame_num = int(row["frame"]) |
|
|
|
|
|
cap = cv2.VideoCapture(input_video) |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num) |
|
|
ret, frame = cap.read() |
|
|
cap.release() |
|
|
|
|
|
if not ret: |
|
|
print(f"β Could not read frame {frame_num}.") |
|
|
return |
|
|
|
|
|
|
|
|
face_analyser = FaceAnalysis(name="buffalo_l", providers=["CPUExecutionProvider"]) |
|
|
face_analyser.prepare(ctx_id=0, det_size=(640, 640), det_thresh=0.1) |
|
|
embedding_model = get_model("models/buffalo_l/w600k_r50.onnx", providers=["CPUExecutionProvider"]) |
|
|
embedding_model.prepare(ctx_id=0) |
|
|
face_analyser.models["embedding"] = embedding_model |
|
|
|
|
|
|
|
|
attempts = 0 |
|
|
while True: |
|
|
attempts += 1 |
|
|
gen_face_pil = generate_face_image() |
|
|
gen_face_np = np.asarray(gen_face_pil.convert("RGB")).astype(np.uint8) |
|
|
gen_face_np = cv2.resize(gen_face_np, (256, 256)) |
|
|
|
|
|
print(f"π§ Attempt {attempts}: Generated face shape:", gen_face_np.shape) |
|
|
|
|
|
cv2.imshow("Generated Face", cv2.cvtColor(gen_face_np, cv2.COLOR_RGB2BGR)) |
|
|
cv2.waitKey(500) |
|
|
|
|
|
try: |
|
|
src_faces = face_analyser.get(gen_face_np) |
|
|
except Exception as e: |
|
|
print("βΌοΈ Face detection failed:", e) |
|
|
src_faces = [] |
|
|
|
|
|
if src_faces: |
|
|
print(f"β
Face detected in generated image after {attempts} attempt(s).") |
|
|
src_face = src_faces[0] |
|
|
break |
|
|
else: |
|
|
print("β Still no face detected after processing.") |
|
|
|
|
|
cv2.destroyAllWindows() |
|
|
|
|
|
swapped = face_swap_with_csv_info( |
|
|
frame=frame, |
|
|
face_info=row, |
|
|
src_face=src_face, |
|
|
inswapper_model_path=inswapper_model_path |
|
|
) |
|
|
|
|
|
cv2.imwrite(output_path, swapped) |
|
|
print(f"β
Swapped frame saved to {output_path}") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
csv_path = 'meta_data/test_detections.csv' |
|
|
input_video = "/Users/sophiemaw/Downloads/CONFIDENTIAL DO NOT SHARE Edna & Paul 29.10.10 Part 2 00.12.46.531.mov" |
|
|
test_faceswap_on_first_frame(input_video, csv_path) |
|
|
|