from __future__ import annotations
import cv2
import librosa
import numpy as np
import torch
from huggingface_hub import hf_hub_download
class LipSyncModule:
    """LipFD pretrained lip-sync deepfake detector.

    Output score is in [0, 1], higher means more likely fake.

    Construction never raises: if the checkpoint or the model code cannot be
    loaded, ``available`` stays ``False``, ``load_error`` holds the reason and
    ``score`` returns a neutral 0.5 result.
    """

    # Haar cascade used for face detection. Created lazily on first use and
    # then reused, because building it re-reads the XML file from disk.
    _face_cascade = None

    def __init__(self, cache_dir: str = "/data/model_cache"):
        """Try to load the model, recording any failure instead of raising.

        Args:
            cache_dir: Directory passed to ``hf_hub_download`` as its cache.
        """
        self.device = "cpu"
        self.cache_dir = cache_dir
        self.available = False
        self.load_error = ""
        self.model = None
        try:
            self._load_model()
            self.available = True
        except Exception as exc:
            # Deliberate best-effort boundary: callers check `available`.
            self.model = None
            self.load_error = str(exc)
            print(f"LipSyncModule unavailable: {exc}")

    def _load_model(self) -> None:
        """Download the checkpoint and load every shape-compatible tensor.

        Raises:
            RuntimeError: If the model has parameters but no checkpoint tensor
                matches any of them by name and shape.
        """
        ckpt_path = hf_hub_download(
            repo_id="akagtag/LipFD-checkpoint",
            filename="ckpt.pth",
            cache_dir=self.cache_dir,
        )
        # Imported here so that a missing `lipfd` package only disables this
        # module (the error is caught in __init__) instead of breaking import.
        from lipfd.model import LipFDNet

        self.model = LipFDNet()
        # NOTE(security): torch.load unpickles the file, which can execute
        # arbitrary code. Only load checkpoints from a trusted repository;
        # consider `weights_only=True` if the installed torch supports it.
        state_dict = torch.load(ckpt_path, map_location="cpu")
        if isinstance(state_dict, dict) and "state_dict" in state_dict:
            state_dict = state_dict["state_dict"]

        current = self.model.state_dict()
        compatible = {}
        for key, value in state_dict.items():
            # Strip the "module." prefix found on some checkpoint keys.
            name = key.removeprefix("module.")
            if name in current and current[name].shape == value.shape:
                compatible[name] = value

        if current and not compatible:
            # Nothing matched: the model would keep its initial weights and
            # produce meaningless scores while claiming to be available.
            raise RuntimeError(
                "checkpoint has no tensors compatible with LipFDNet"
            )
        skipped = len(state_dict) - len(compatible)
        if skipped:
            print(f"LipSyncModule: skipped {skipped} incompatible checkpoint keys")

        self.model.load_state_dict(compatible, strict=False)
        self.model.eval()

    def to_gpu(self) -> None:
        """Move the model to CUDA; no-op when the module is unavailable."""
        if not self.available:
            return
        # Move first, record second: if the move raises, `device` must still
        # describe where the model actually lives.
        self.model = self.model.to("cuda")
        self.device = "cuda"

    def to_cpu(self) -> None:
        """Move the model to the CPU; no-op when the module is unavailable."""
        if not self.available:
            return
        self.model = self.model.to("cpu")
        self.device = "cpu"

    @torch.no_grad()
    def score(self, video_path: str) -> dict:
        """Score a video file for lip-sync manipulation.

        Args:
            video_path: Path handed to OpenCV (frames) and librosa (audio).

        Returns:
            A dict with ``"s1"`` (mean sigmoid of the model output) and
            ``"segments"`` (see ``_get_segments``). When the module is
            unavailable, or too few lip crops / no audio could be extracted,
            ``"s1"`` is 0.5, ``"segments"`` is empty and ``"note"`` explains why.
        """
        if not self.available:
            return {
                "s1": 0.5,
                "segments": [],
                "note": f"module_unavailable: {self.load_error}",
            }
        frames, audio, fps = self._preprocess(video_path)
        if frames is None or audio is None:
            return {"s1": 0.5, "segments": [], "note": "no_face_or_audio"}
        frames_t = torch.tensor(frames, dtype=torch.float32).to(self.device)
        audio_t = torch.tensor(audio, dtype=torch.float32).to(self.device)
        logits = self.model(frames_t, audio_t)
        score = torch.sigmoid(logits).mean().item()
        return {"s1": score, "segments": self._get_segments(logits, fps)}

    def _preprocess(self, video_path: str):
        """Extract lip crops and a mel spectrogram from ``video_path``.

        Returns:
            ``(frames, mel, fps)``. ``frames`` is a channels-first array of
            96x96 lip crops scaled to [0, 1]; ``mel`` is the output of
            ``librosa.feature.melspectrogram`` on 16 kHz audio. ``frames`` and
            ``mel`` are both ``None`` when fewer than 5 lip crops were found
            or the audio is missing or unreadable.
        """
        cap = cv2.VideoCapture(video_path)
        frames = []
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # `not fps > 0` also rejects NaN, which a plain `or` lets through.
            if not fps > 0:
                fps = 30.0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                lip_crop = self._extract_lip_region(frame)
                if lip_crop is not None and lip_crop.size > 0:
                    frames.append(cv2.resize(lip_crop, (96, 96)))
        finally:
            # Release the capture handle even if detection or resize raises.
            cap.release()

        if len(frames) < 5:
            return None, None, fps

        try:
            audio, sr = librosa.load(video_path, sr=16000)
        except Exception as exc:
            # A video without a decodable audio track is reported through the
            # same "no audio" result as an empty track, not as a crash.
            print(f"LipSyncModule audio load failed: {exc}")
            return None, None, fps
        if audio.size == 0:
            return None, None, fps

        mel = librosa.feature.melspectrogram(y=audio, sr=sr)
        # (N, H, W, C) -> (N, C, H, W), scaled from 0..255 to 0..1.
        frames_arr = np.array(frames).transpose(0, 3, 1, 2) / 255.0
        return frames_arr, mel, fps

    def _extract_lip_region(self, frame):
        """Return the lower-central part of the first detected face.

        Args:
            frame: BGR image as returned by ``cv2.VideoCapture.read``.

        Returns:
            A slice of ``frame`` covering the bottom 35% and middle 60% of the
            face box, or ``None`` when no face is detected.
        """
        face_cascade = self._face_cascade
        if face_cascade is None:
            # Build the detector once; it is identical for every frame.
            face_cascade = cv2.CascadeClassifier(
                cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
            )
            self._face_cascade = face_cascade
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=5)
        if len(faces) == 0:
            return None
        x, y, w, h = faces[0]
        lip_y = y + int(h * 0.65)
        lip_h = int(h * 0.35)
        lip_x = x + int(w * 0.2)
        lip_w = int(w * 0.6)
        return frame[lip_y : lip_y + lip_h, lip_x : lip_x + lip_w]

    def _get_segments(self, logits, fps: float) -> list[dict]:
        """List the positions whose sigmoid score exceeds 0.6.

        Args:
            logits: Model output tensor; it is flattened before use.
            fps: Frame rate used to convert an index into seconds.

        Returns:
            One ``{"time", "score"}`` dict per flattened index above 0.6,
            with time rounded to 2 decimals and score to 3.
        """
        # NOTE(review): time = index / fps assumes one logit per source frame.
        # Frames without a detected face are dropped in _preprocess, so the
        # timestamps may drift from the real video time - confirm.
        scores = torch.sigmoid(logits).detach().cpu().flatten().numpy()
        return [
            {"time": round(i / fps, 2), "score": round(float(score), 3)}
            for i, score in enumerate(scores)
            if score > 0.6
        ]
|