File size: 4,349 Bytes
cf54850
 
eff3d67
 
 
 
 
cf54850
 
 
eff3d67
 
 
 
 
cf54850
eff3d67
 
969e16d
 
 
 
 
 
 
 
 
cf54850
eff3d67
 
19d9b40
eff3d67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf54850
eff3d67
 
 
 
969e16d
 
eff3d67
 
 
 
969e16d
 
eff3d67
 
 
 
 
969e16d
 
 
 
 
 
 
eff3d67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cf54850
eff3d67
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from __future__ import annotations

import cv2
import librosa
import numpy as np
import torch
from huggingface_hub import hf_hub_download


class LipSyncModule:
    """
    LipFD pretrained lip-sync deepfake detector.
    Output score is in [0, 1], higher means more likely fake.

    The constructor never raises: if the checkpoint or the ``lipfd`` package
    cannot be loaded, ``available`` is False, ``load_error`` holds the reason
    and ``score`` returns a neutral 0.5.
    """

    # Hugging Face Hub location of the pretrained checkpoint.
    CHECKPOINT_REPO = "akagtag/LipFD-checkpoint"
    CHECKPOINT_FILE = "ckpt.pth"
    # Fewer usable lip crops than this and the video is not scored.
    MIN_FRAMES = 5
    # (width, height) every lip crop is resized to.
    LIP_CROP_SIZE = (96, 96)
    # Per-item sigmoid score above which a segment is reported.
    SEGMENT_THRESHOLD = 0.6

    def __init__(self, cache_dir: str = "/data/model_cache"):
        """Load the detector on CPU, recording (not raising) any failure.

        Args:
            cache_dir: Directory passed to ``hf_hub_download`` as its cache.
        """
        self.device = "cpu"
        self.cache_dir = cache_dir
        self.available = False
        self.load_error = ""
        # Defined up front so the attribute exists whatever _load_model does.
        self.model = None
        # Haar cascade is created on first use and then reused for all frames.
        self._face_cascade = None
        try:
            self._load_model()
            self.available = True
        except Exception as exc:
            # Deliberate best-effort boundary: a missing model must not crash
            # the caller, it only disables this module.
            self.model = None
            self.load_error = str(exc)
            print(f"LipSyncModule unavailable: {exc}")

    @staticmethod
    def _select_compatible_weights(state_dict, current) -> dict:
        """Return checkpoint entries that fit the model, keyed by model name.

        A leading ``module.`` prefix is stripped from checkpoint keys; an
        entry is kept only when the stripped name exists in ``current`` with
        an identical shape.

        Raises:
            RuntimeError: if no entry matches. Loading nothing would leave
                the network with its initial weights while still looking
                usable.
        """
        compatible = {}
        for key, value in state_dict.items():
            name = key.removeprefix("module.")
            target = current.get(name)
            if target is not None and target.shape == value.shape:
                compatible[name] = value
        if not compatible:
            raise RuntimeError(
                "checkpoint has no weights compatible with LipFDNet"
            )
        return compatible

    def _load_model(self) -> None:
        """Download the checkpoint and build the network in eval mode.

        Raises:
            Exception: anything raised by the import, the download,
                ``torch.load`` or weight selection; ``__init__`` catches it.
        """
        # Imported here so that importing this file does not require lipfd,
        # and before the download so a missing package fails without any
        # network traffic.
        from lipfd.model import LipFDNet

        ckpt_path = hf_hub_download(
            repo_id=self.CHECKPOINT_REPO,
            filename=self.CHECKPOINT_FILE,
            cache_dir=self.cache_dir,
        )

        model = LipFDNet()
        # NOTE(review): torch.load unpickles the file, and the checkpoint
        # comes from a third-party Hub repo. Consider weights_only=True if
        # the minimum supported torch version allows it.
        state_dict = torch.load(ckpt_path, map_location="cpu")
        if isinstance(state_dict, dict) and "state_dict" in state_dict:
            state_dict = state_dict["state_dict"]
        compatible = self._select_compatible_weights(
            state_dict, model.state_dict()
        )
        # strict=False because mismatching entries were filtered out above.
        model.load_state_dict(compatible, strict=False)
        model.eval()
        self.model = model

    def to_gpu(self) -> None:
        """Move the model to CUDA; no-op when the module is unavailable."""
        if not self.available:
            return
        # Update self.device only after the move succeeded, so a failed
        # transfer cannot leave device and model out of sync.
        self.model = self.model.to("cuda")
        self.device = "cuda"

    def to_cpu(self) -> None:
        """Move the model to the CPU; no-op when the module is unavailable."""
        if not self.available:
            return
        self.model = self.model.to("cpu")
        self.device = "cpu"

    @torch.no_grad()
    def score(self, video_path: str) -> dict:
        """Score one video file.

        Args:
            video_path: Path handed to OpenCV and librosa.

        Returns:
            A dict with ``"s1"`` (mean sigmoid of the model output, or 0.5
            when nothing could be scored) and ``"segments"`` (see
            ``_get_segments``). A ``"note"`` key explains a 0.5 fallback.
        """
        if not self.available:
            return {
                "s1": 0.5,
                "segments": [],
                "note": f"module_unavailable: {self.load_error}",
            }

        frames, audio, fps = self._preprocess(video_path)

        if frames is None or audio is None:
            return {"s1": 0.5, "segments": [], "note": "no_face_or_audio"}

        frames_t = torch.tensor(frames, dtype=torch.float32).to(self.device)
        audio_t = torch.tensor(audio, dtype=torch.float32).to(self.device)

        logits = self.model(frames_t, audio_t)
        score = torch.sigmoid(logits).mean().item()

        return {"s1": score, "segments": self._get_segments(logits, fps)}

    def _preprocess(self, video_path: str):
        """Extract lip crops and a mel spectrogram from a video file.

        Returns:
            ``(frames, mel, fps)``. ``frames`` is an array of lip crops
            scaled to [0, 1] with axes reordered to (N, C, H, W); ``mel`` is
            the librosa mel spectrogram of the audio resampled to 16 kHz.
            ``frames`` and ``mel`` are both None when fewer than
            ``MIN_FRAMES`` crops were found or the audio is empty.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            # A reported FPS of 0 falls back to 30.
            fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
            frames = []
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                lip_crop = self._extract_lip_region(frame)
                if lip_crop is not None and lip_crop.size > 0:
                    frames.append(cv2.resize(lip_crop, self.LIP_CROP_SIZE))
        finally:
            # Release the capture even if detection or resizing raised.
            cap.release()

        if len(frames) < self.MIN_FRAMES:
            return None, None, fps

        audio, sr = librosa.load(video_path, sr=16000)
        if audio.size == 0:
            return None, None, fps

        mel = librosa.feature.melspectrogram(y=audio, sr=sr)
        frames_arr = np.array(frames).transpose(0, 3, 1, 2) / 255.0
        return frames_arr, mel, fps

    def _get_face_cascade(self):
        """Return the frontal-face Haar cascade, creating it on first use."""
        # getattr keeps this working on instances built without __init__.
        cascade = getattr(self, "_face_cascade", None)
        if cascade is None:
            cascade = cv2.CascadeClassifier(
                cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
            )
            self._face_cascade = cascade
        return cascade

    def _extract_lip_region(self, frame):
        """Crop the lower-central part of the first detected face.

        Args:
            frame: BGR image as returned by ``cv2.VideoCapture.read``.

        Returns:
            A slice of ``frame`` covering the bottom 35% of the face box and
            the middle 60% of its width, or None when no face is detected.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self._get_face_cascade().detectMultiScale(gray, 1.3, 5)
        if len(faces) == 0:
            return None
        x, y, w, h = faces[0]
        lip_y = y + int(h * 0.65)
        lip_h = int(h * 0.35)
        lip_x = x + int(w * 0.2)
        lip_w = int(w * 0.6)
        return frame[lip_y : lip_y + lip_h, lip_x : lip_x + lip_w]

    def _get_segments(self, logits, fps: float) -> list[dict]:
        """List the model outputs whose sigmoid exceeds the threshold.

        Args:
            logits: Tensor of raw model outputs; it is flattened.
            fps: Frame rate used to turn an index into seconds.

        Returns:
            One ``{"time", "score"}`` dict per flattened element above
            ``SEGMENT_THRESHOLD``, with time rounded to 2 decimals and score
            to 3.
        """
        # NOTE(review): time is index / fps, which assumes one output per
        # consecutive video frame. _preprocess drops frames without a face,
        # so the index may drift from the real timestamp - confirm.
        scores = torch.sigmoid(logits).detach().cpu().flatten().numpy()
        return [
            {"time": round(i / fps, 2), "score": round(float(score), 3)}
            for i, score in enumerate(scores)
            if score > self.SEGMENT_THRESHOLD
        ]