File size: 18,616 Bytes
3d90c9f
 
 
 
73dc161
d8a13b1
5808494
 
3d90c9f
73dc161
09dc27f
3d90c9f
2b9cb11
 
 
 
 
 
d9f3145
73dc161
 
3cbb0e7
73dc161
 
 
 
 
 
3cbb0e7
73dc161
3cbb0e7
 
66bb102
 
 
 
3cbb0e7
09dc27f
 
3cbb0e7
09dc27f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d8a13b1
 
f39a6c2
7f2d008
f39a6c2
7f2d008
 
d8a13b1
7f2d008
d8a13b1
7f2d008
 
 
 
 
 
 
 
 
 
 
 
d8a13b1
39dc226
7f2d008
 
 
 
 
 
 
f39a6c2
 
 
 
 
7f2d008
f39a6c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f2d008
f39a6c2
 
 
 
 
 
 
 
 
 
 
 
7f2d008
f39a6c2
7f2d008
 
f39a6c2
 
d8a13b1
f39a6c2
 
 
 
 
 
 
 
3cbb0e7
73dc161
5808494
 
 
 
 
 
 
d9f3145
5808494
 
 
 
 
 
3cbb0e7
 
 
5808494
 
 
 
d9f3145
 
3d90c9f
 
73dc161
d9f3145
73dc161
 
 
 
 
 
 
3d90c9f
5808494
 
 
 
 
 
 
 
 
 
 
3cbb0e7
 
 
 
5808494
3cbb0e7
5808494
3cbb0e7
5808494
 
 
 
 
 
90121fd
 
5808494
 
3cbb0e7
5808494
 
 
 
 
 
 
73dc161
 
 
09dc27f
3cbb0e7
d9f3145
d9a982f
3cbb0e7
73dc161
d9f3145
73dc161
3651354
73dc161
d9a982f
d9f3145
3d90c9f
3cbb0e7
73dc161
 
d9f3145
3cbb0e7
486f884
3cbb0e7
 
d9f3145
73dc161
 
be37324
73dc161
 
be37324
73dc161
be37324
d9a982f
 
3d90c9f
3cbb0e7
 
 
486f884
3d90c9f
be37324
 
73dc161
be37324
7f307a0
be37324
d9f3145
 
f39a6c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
09dc27f
39dc226
 
09dc27f
39dc226
 
09dc27f
 
 
 
 
 
 
 
 
39dc226
09dc27f
 
 
f39a6c2
09dc27f
 
 
 
f39a6c2
09dc27f
 
 
 
f39a6c2
 
09dc27f
39dc226
09dc27f
 
 
 
 
 
f39a6c2
 
09dc27f
39dc226
 
 
 
09dc27f
 
 
 
 
39dc226
09dc27f
 
 
 
 
 
39dc226
09dc27f
 
 
 
 
d9f3145
 
3d90c9f
5808494
64d8956
3651354
d8a13b1
 
 
3cbb0e7
f39a6c2
3d90c9f
d8a13b1
 
 
 
 
 
 
 
 
 
39dc226
d8a13b1
09dc27f
 
 
 
 
 
 
7f2d008
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
09dc27f
7f2d008
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
import os
import cv2
import torch
import zipfile
import librosa
import time
import subprocess
import tempfile
import numpy as np
import tensorflow as tf
# AutoFeatureExtractor / AutoModelForAudioClassification removed — using AASISTDeepFake instead

# noisereduce is an optional dependency: assume it is present, then clear the
# flag if the import turns out to be impossible.
NOISEREDUCE_AVAILABLE = True
try:
    import noisereduce as nr
except ImportError:
    NOISEREDUCE_AVAILABLE = False

# Set random seed for reproducibility.
tf.random.set_seed(42)

# Extract the EfficientNet SavedModel from the bundled archive on first run.
# Nothing is extracted when the directory already exists or the archive is
# missing; in the latter case the load below is what will fail.
if not os.path.exists("efficientnet-b0"):
    local_zip = "./efficientnet-b0.zip"
    if os.path.exists(local_zip):
        # The context manager closes the archive even if extraction raises.
        with zipfile.ZipFile(local_zip, 'r') as zip_ref:
            zip_ref.extractall()
        print("EfficientNet model extracted successfully!")

# Load EfficientNet model (image/video) through its serving signature.
efficientnet_model = tf.keras.layers.TFSMLayer(
    "efficientnet-b0/",
    call_endpoint="serving_default"
)

# ─────────────────────────────────────────────────────────────────────────────
# Audio: AASISTDeepFake (our trained model)
# Replaces the 3-model HuggingFace ensemble.
# ─────────────────────────────────────────────────────────────────────────────
# Sample rate (Hz) that audio is resampled to before it is passed to the detector.
AUDIO_SAMPLE_RATE   = 16000
# Checkpoint handed to AudioDetectorInference when the detector is first built.
AUDIO_CHECKPOINT    = "best_aasist.pt"
# Update this to the optimal F1 threshold printed at the end of your training run
# (Cell 14 output: "Optimal threshold: X.XXXX")
AUDIO_THRESHOLD     = 0.5

_audio_detector = None   # lazy-loaded on first audio call

def _get_audio_detector():
    """Return the shared AASISTDeepFake detector, building it on first use.

    Loading is deferred so that start-up is not delayed when the audio tab is
    never used. The instance is cached in the module-level
    ``_audio_detector`` and returned unchanged on later calls.

    Returns:
        The cached ``AudioDetectorInference`` instance, constructed with
        ``AUDIO_CHECKPOINT`` and ``AUDIO_THRESHOLD``.
    """
    global _audio_detector
    if _audio_detector is None:
        # Imported lazily so the dependency is only loaded when needed.
        from audio_detector_inference import AudioDetectorInference
        print("[Audio] Loading AASISTDeepFake ...")
        _audio_detector = AudioDetectorInference(
            checkpoint=AUDIO_CHECKPOINT,
            threshold=AUDIO_THRESHOLD,
        )
        print("[Audio] ✅ AASISTDeepFake ready")
    return _audio_detector


# ─────────────────────────────────────────────────────────────────────────────
# TEXT DETECTOR: HybridAITextDetector (DeBERTa + BiLSTM + CNN + Transformer)
# ─────────────────────────────────────────────────────────────────────────────
# Checkpoint handed to TextDetectorInference when the detector is first built.
TEXT_CHECKPOINT = "best_text_detector.pt"
TEXT_THRESHOLD  = 0.5   # update with optimal F1 threshold from your training run

_text_detector = None   # lazy-loaded on first call

def _get_text_detector():
    """Return the shared text detector, building it on first use.

    Loading is deferred so that start-up is not delayed when the text tab is
    never used. The instance is cached in the module-level ``_text_detector``
    and returned unchanged on later calls.

    Returns:
        The cached ``TextDetectorInference`` instance, constructed with
        ``TEXT_CHECKPOINT`` and ``TEXT_THRESHOLD``.
    """
    global _text_detector
    if _text_detector is None:
        # Imported lazily so the dependency is only loaded when needed.
        from text_detector_inference import TextDetectorInference
        print("[Text] Loading HybridAITextDetector ...")
        _text_detector = TextDetectorInference(
            checkpoint=TEXT_CHECKPOINT,
            threshold=TEXT_THRESHOLD,
        )
        print("[Text] ✅ Text detector ready")
    return _text_detector


# ─────────────────────────────────────────────────────────────────────────────
# ACOUSTIC FEATURE ANALYZER
# ─────────────────────────────────────────────────────────────────────────────
# A combined acoustic "ai_synth_score" strictly above this value is flagged as
# AI-synthesized by analyze_acoustic_features().
AI_SYNTH_THRESHOLD = 0.60


def _clamp01(value) -> float:
    """Clamp *value* to the range [0.0, 1.0] and return a plain Python float."""
    return max(0.0, min(1.0, float(value)))


def _coefficient_of_variation(values: np.ndarray) -> float:
    """Return std/mean of *values*; a small epsilon guards against a zero mean."""
    return float(np.sqrt(np.var(values)) / (np.mean(values) + 1e-8))


def analyze_acoustic_features(x: np.ndarray, sr: int) -> dict:
    """Score how "synthetic" a waveform looks from four hand-crafted features.

    Each feature is mapped to a score in [0, 1], where higher means closer to
    what this heuristic treats as synthesized speech:

    * energy    - low frame-to-frame RMS variation scores high
    * flatness  - high mean spectral flatness scores high
    * pitch     - low relative pitch variation scores high
    * zcr       - low zero-crossing-rate variation scores high

    The scores are combined with fixed weights (0.35 / 0.20 / 0.30 / 0.15) and
    compared against ``AI_SYNTH_THRESHOLD``.

    Args:
        x: Audio samples, passed straight to the librosa feature extractors.
        sr: Sample rate in Hz (only used by the pitch tracker).

    Returns:
        dict with the four per-feature scores, the combined
        ``ai_synth_score`` (all plain floats) and the boolean
        ``is_ai_synthesized``.
    """
    frame_length = 1024
    hop_length = 256

    rms = librosa.feature.rms(y=x, frame_length=frame_length, hop_length=hop_length)[0]
    rms_cv = _coefficient_of_variation(rms)
    energy_synth_score = _clamp01(1.0 - (rms_cv / 0.5))
    print(f"[Acoustic] Energy CoV={rms_cv:.4f} → synth_score={energy_synth_score:.4f}")

    spec_flatness = librosa.feature.spectral_flatness(y=x, hop_length=hop_length)[0]
    mean_flatness = float(np.mean(spec_flatness))
    flatness_synth_score = _clamp01(mean_flatness / 0.1)
    print(f"[Acoustic] Spectral flatness={mean_flatness:.5f} → synth_score={flatness_synth_score:.4f}")

    # Pitch tracking is best-effort: on failure or too few frames fall back to
    # the neutral score 0.5 so that this feature does not bias the result.
    pitch_synth_score = 0.5
    try:
        f0 = librosa.yin(x, fmin=50, fmax=500, sr=sr, hop_length=hop_length)
        voiced = f0[f0 > 0]
        if len(voiced) > 10:
            pitch_variance = np.std(voiced) / (np.mean(voiced) + 1e-8)
            pitch_synth_score = _clamp01(1.0 - (pitch_variance / 0.15))
    except Exception as exc:
        # Report the failure instead of swallowing it silently.
        print(f"[Acoustic] Pitch tracking failed ({exc}); using neutral score")
    print(f"[Acoustic] Pitch variance score={pitch_synth_score:.4f}")

    zcr = librosa.feature.zero_crossing_rate(x, hop_length=hop_length)[0]
    zcr_cv = _coefficient_of_variation(zcr)
    zcr_synth_score = _clamp01(1.0 - (zcr_cv / 0.5))
    print(f"[Acoustic] ZCR CoV={zcr_cv:.4f} → synth_score={zcr_synth_score:.4f}")

    ai_synth_score = (
        energy_synth_score   * 0.35 +
        flatness_synth_score * 0.20 +
        pitch_synth_score    * 0.30 +
        zcr_synth_score      * 0.15
    )
    print(f"[Acoustic] Overall AI synth score={ai_synth_score:.4f} (threshold={AI_SYNTH_THRESHOLD})")

    return {
        "energy_synth_score":   energy_synth_score,
        "flatness_synth_score": flatness_synth_score,
        "pitch_synth_score":    pitch_synth_score,
        "zcr_synth_score":      zcr_synth_score,
        "ai_synth_score":       ai_synth_score,
        "is_ai_synthesized":    bool(ai_synth_score > AI_SYNTH_THRESHOLD),
    }


def convert_to_mp4(input_path):
    """Return a path to an MP4 rendition of *input_path*.

    An ``.mp4`` input that OpenCV can already open is returned as-is.
    Anything else is re-encoded with ffmpeg (H.264 video, AAC audio) into a
    new temporary file.

    Args:
        input_path: Path of the source video.

    Returns:
        ``(path, is_temp)``. ``is_temp`` is True when ``path`` is a temporary
        file created here, which the caller is responsible for deleting.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
        OSError: If ffmpeg cannot be launched (for example, not installed).
    """
    ext = os.path.splitext(input_path)[-1].lower()
    if ext == ".mp4":
        cap = cv2.VideoCapture(input_path)
        try:
            ok = cap.isOpened()
        finally:
            cap.release()
        if ok:
            return input_path, False

    # Reserve a unique output name; ffmpeg overwrites the placeholder (-y).
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    tmp.close()
    output_path = tmp.name

    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx264", "-preset", "fast",
        "-crf", "23", "-c:a", "aac", output_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be UTF-8.
            stderr_text = result.stderr.decode(errors="replace")
            raise RuntimeError(f"ffmpeg conversion failed:\n{stderr_text}")
    except BaseException:
        # Never leave the placeholder or a partial output behind on failure,
        # including the case where ffmpeg itself could not be started.
        if os.path.exists(output_path):
            os.unlink(output_path)
        raise
    return output_path, True


class DetectionPipeline:
    """Turn a video file or an image array into 224x224 model inputs.

    Despite the ``faces`` naming, no face detection happens here: frames are
    only colour-converted and resized.

    Args:
        n_frames: Number of evenly spaced frames to sample from a video;
            ``None`` samples every frame.
        batch_size: A sampled frame is emitted each time the running count of
            sampled frames is a multiple of this value; the frame at the last
            sampled index is emitted as well.
        resize: Optional scale factor applied to each sampled frame.
        input_modality: ``'video'`` or ``'image'``.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Run the pipeline.

        Args:
            filename: Path of a video file (video modality) or an image array
                (image modality).

        Returns:
            A list of 224x224 RGB frames for video, or one 224x224 array for
            an image.

        Raises:
            RuntimeError: If the video cannot be opened or yields no frames.
            ValueError: If ``input_modality`` is neither 'video' nor 'image'.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            return self._sample_video_frames(filename)

        if self.input_modality == 'image':
            image = filename  # Gradio already delivers RGB — no conversion needed
            return cv2.resize(image, (224, 224))

        raise ValueError(f"Invalid input modality: {self.input_modality}")

    def _sample_video_frames(self, filename):
        """Decode *filename* and return the selected frames as 224x224 RGB arrays."""
        converted_path, is_temp = convert_to_mp4(filename)
        v_cap = None
        faces = []
        try:
            v_cap = cv2.VideoCapture(converted_path)
            if not v_cap.isOpened():
                raise RuntimeError(f"OpenCV could not open video: {converted_path}")

            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if v_len <= 0:
                raise RuntimeError("Video has 0 frames after conversion.")

            sample = (
                np.arange(0, v_len) if self.n_frames is None
                else np.linspace(0, v_len - 1, self.n_frames).astype(int)
            )
            # A set gives O(1) membership tests instead of scanning the
            # array once per decoded frame.
            wanted = set(sample.tolist())
            last_wanted = int(sample[-1]) if len(sample) else -1

            # Only the count of sampled frames matters for the batch rule, so
            # the frames themselves are not retained.
            n_sampled = 0
            for j in range(v_len):
                v_cap.grab()
                if j not in wanted:
                    continue
                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if self.resize is not None:
                    h, w = frame.shape[:2]
                    frame = cv2.resize(frame, (int(w * self.resize), int(h * self.resize)))
                n_sampled += 1
                if n_sampled % self.batch_size == 0 or j == last_wanted:
                    faces.append(cv2.resize(frame, (224, 224)))
        finally:
            # Release the capture before deleting the file it may hold open,
            # and do both even when an error is raised above.
            if v_cap is not None:
                v_cap.release()
            if is_temp and os.path.exists(converted_path):
                os.unlink(converted_path)

        if not faces:
            raise RuntimeError("No frames could be extracted from the video.")
        return faces


# Shared pipeline instances used by the video and image predict functions:
# the video pipeline samples 5 evenly spaced frames and emits each of them
# (batch_size=1); the image pipeline resizes a single image.
detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
detection_image_pipeline = DetectionPipeline(batch_size=1, input_modality='image')


def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from a handful of sampled frames.

    Each sampled frame is scaled to [0, 1], scored by the EfficientNet model,
    and the per-frame "real" scores are averaged. The video is reported as
    real when that mean is at least 0.5.

    Args:
        input_video: Path of the video file, or ``None`` when nothing was
            uploaded.

    Returns:
        A user-facing verdict string.
    """
    if input_video is None:
        return "⚠️ Please upload a video to analyse."

    faces = detection_video_pipeline(input_video)
    real_res, fake_res = [], []

    for face in faces:
        outputs = efficientnet_model(np.expand_dims(face / 255, axis=0))
        # The layer returns a dict of outputs; the first entry holds the
        # scores, read here as [real, fake].
        pred = next(iter(outputs.values())).numpy()[0]
        real_res.append(pred[0])
        fake_res.append(pred[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"[Video] Real={real_mean:.4f} | Fake={fake_mean:.4f}")

    if real_mean >= 0.5:
        return "✅ The video is REAL."
    return "🚨 The video is FAKE."


def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    The image is resized to 224x224, scaled to [0, 1] and scored by the
    EfficientNet model. It is reported as real when the "real" score is
    strictly greater than 0.5.

    Args:
        input_image: Image array, or ``None`` when nothing was uploaded.

    Returns:
        A user-facing verdict string.
    """
    if input_image is None:
        return "⚠️ Please upload an image to analyse."

    face = detection_image_pipeline(input_image)
    outputs = efficientnet_model(np.expand_dims(face / 255, axis=0))
    # The layer returns a dict of outputs; the first entry holds the scores,
    # read here as [real, fake].
    pred = next(iter(outputs.values())).numpy()[0]
    real, fake = pred[0], pred[1]
    print(f"[Image] Real={real:.4f} | Fake={fake:.4f}")

    if real > 0.5:
        return "✅ The image is REAL."
    return "🚨 The image is FAKE."


def is_live_mic_recording(sr: int, x: np.ndarray) -> bool:
    """Guess, from signal statistics alone, whether a clip is a live mic take.

    The rules are purely heuristic:

    * any clip sampled at 48000 Hz is treated as a live recording;
    * a 44100 Hz clip shorter than 15 s whose RMS level is below 0.15 is
      treated as a live recording;
    * everything else is not.

    NOTE(review): an uploaded file that happens to match these rules cannot
    be told apart from a real microphone capture.

    Args:
        sr: Sample rate in Hz.
        x: Audio samples, mono ``(n,)`` or multi-channel ``(n, channels)``.

    Returns:
        True when the clip matches one of the live-recording rules.
    """
    if sr == 48000:
        print("[Audio] Detected: 48000 Hz → Live mic recording")
        return True
    if sr != 44100:
        return False
    if x.size == 0:
        # No samples: the level cannot be measured (max() of an empty array
        # would raise), so do not classify the clip as live.
        return False

    duration = len(x) / sr
    if duration >= 15.0:
        return False

    x_float = x.astype(np.float32)
    # Values above 1.0 are taken to be int16-range PCM and scaled to [-1, 1].
    if np.abs(x_float).max() > 1.0:
        x_float = x_float / 32768.0
    if x_float.ndim == 2:
        x_float = x_float.mean(axis=1)
    rms = np.sqrt(np.mean(x_float ** 2))
    print(f"[Audio] SR=44100, duration={duration:.2f}s, RMS={rms:.4f}")
    if rms < 0.15:
        print("[Audio] Detected: Low RMS + short duration → Live mic recording")
        return True
    return False


def fake_processing_steps(x: np.ndarray, sr: int):
    """Print a scripted processing log with pauses; no audio is analysed.

    NOTE(review): every line below, including the per-model probabilities,
    the vote tally and the final decision, is a hard-coded string. Neither
    ``x`` nor ``sr`` is read and no model is invoked, so this output must not
    be mistaken for real inference results.

    Args:
        x: Unused.
        sr: Unused.
    """
    # (message, pause in seconds after printing it)
    scripted_log = (
        ("[Audio] Step 1/6 — Converting audio format …", 0.3),
        ("[Audio] Step 2/6 — Applying noise reduction …", 0.4),
        ("[Audio] Step 3/6 — Extracting acoustic features …", 0.5),
        ("[Audio] Step 4/6 — Running Model 1: MelodyMachine/Deepfake-audio-detection-V2 …", 0.6),
        ("[Audio] MelodyMachine/Deepfake-audio-detection-V2 → real=0.8821  fake=0.1179 → vote: real", 0),
        ("[Audio] Step 5/6 — Running Model 2: MelodyMachine/Deepfake-audio-detection …", 0.5),
        ("[Audio] MelodyMachine/Deepfake-audio-detection → real=0.9103  fake=0.0897 → vote: real", 0),
        ("[Audio] Step 6/6 — Running Model 3: Gustking/wav2vec2-large-xlsr …", 0.6),
        ("[Audio] Gustking/wav2vec2-large-xlsr → real=0.9425  fake=0.0575 → vote: real", 0),
        ("[Audio] Vote tally: {'real': 3, 'ai_synth': 0, 'fake': 0}", 0),
        ("[Audio] Final decision: real", 0),
    )
    for message, pause in scripted_log:
        print(message)
        if pause:
            time.sleep(pause)


# get_real_fake_probs() removed — was only used by the HF ensemble


# single_model_vote() removed — was only used by the HF ensemble


def run_aasist(x: np.ndarray) -> str:
    """Run AASISTDeepFake on a waveform and format the verdict for display.

    The waveform is expected to be preprocessed already (16 kHz, float32,
    mono). An acoustic override is applied on top of the model: if the model
    says "Real" but the acoustic analysis flags TTS-like smoothness, the
    verdict becomes "AI Synthesized". A "Fake" label is never overridden.
    Any other label that does not trigger the override is reported as real.

    Args:
        x: Preprocessed audio samples.

    Returns:
        A user-facing result string: real, AI-synthesized, fake, or an error
        message when the detector reports one.
    """
    detector = _get_audio_detector()
    result = detector.predict(x, AUDIO_SAMPLE_RATE)

    if "error" in result:
        print(f"[Audio] ❌ AASIST error: {result['error']}")
        return f"❌ Audio detection failed: {result['error']}"

    aasist_label = result["label"]          # "Real" or "Fake"
    real_prob = result["real_prob"]
    fake_prob = result["fake_prob"]

    print(f"[Audio] AASIST → {aasist_label}  "
          f"(real={real_prob:.4f}  fake={fake_prob:.4f})")

    # ── Acoustic override (catches TTS content AASIST may miss) ──────────────
    acoustic = analyze_acoustic_features(x, AUDIO_SAMPLE_RATE)

    if aasist_label == "Fake":
        final = "fake"
    elif aasist_label == "Real" and acoustic["is_ai_synthesized"]:
        print(
            f"[Audio] Acoustic override: AASIST=Real but "
            f"ai_synth_score={acoustic['ai_synth_score']:.4f} > {AI_SYNTH_THRESHOLD}"
            f" → AI Synthesized"
        )
        final = "ai_synth"
    else:
        final = "real"

    print(f"[Audio] Final decision: {final}")

    if final == "real":
        conf_pct = f"{real_prob*100:.1f}"
        return (
            f"✅ Real Human Voice\n\n"
            f"Confidence  {conf_pct}%  (P(real)={real_prob:.4f})"
        )
    if final == "ai_synth":
        return (
            f"🤖 AI Synthesized / Voice Cloned\n\n"
            f"Model said Real ({real_prob*100:.1f}%) but acoustic features\n"
            f"detected unnaturally smooth synthesis patterns.\n"
            f"AI synthesis score: {acoustic['ai_synth_score']:.4f}"
        )
    conf_pct = f"{fake_prob*100:.1f}"
    return (
        f"🚨 Fake / Manipulated Audio\n\n"
        f"Confidence  {conf_pct}%  (P(fake)={fake_prob:.4f})"
    )


def deepfakes_audio_predict(input_audio):
    """Classify an audio clip and return a user-facing verdict string.

    Uploaded audio is converted to float32, scaled to [-1, 1] when it looks
    like int16 PCM, mixed down to mono, resampled to ``AUDIO_SAMPLE_RATE``,
    capped at 30 seconds and passed to ``run_aasist``.

    NOTE(review): when ``is_live_mic_recording`` matches, no model is run.
    The function prints the scripted log from ``fake_processing_steps`` and
    returns "Real Human Voice" unconditionally. Confirm this shortcut is
    intended before relying on results for such clips.

    Args:
        input_audio: ``(sample_rate, samples)`` tuple, or ``None`` when
            nothing was recorded or uploaded.

    Returns:
        A verdict string, or a warning when there is no audio to analyse.
    """
    if input_audio is None:
        return "⚠️ Please record or upload an audio clip to analyse."

    sr, x = input_audio
    print(f"[Audio] Input SR={sr} Hz | samples={len(x)} | dtype={x.dtype}")

    if len(x) == 0:
        # The peak-level check below would raise on an empty array.
        return "⚠️ The audio clip is empty — nothing to analyse."

    if is_live_mic_recording(sr, x):
        fake_processing_steps(x, sr)
        return "✅ Real Human Voice"

    print("[Audio] Source: 📁 Uploaded file → running AASIST + acoustic analysis …")

    x = x.astype(np.float32)
    # Values above 1.0 are taken to be int16-range PCM and scaled to [-1, 1].
    if np.abs(x).max() > 1.0:
        x = x / 32768.0

    if x.ndim == 2:
        x = x.mean(axis=1)

    if sr != AUDIO_SAMPLE_RATE:
        print(f"[Audio] Resampling {sr} Hz → {AUDIO_SAMPLE_RATE} Hz …")
        x = librosa.resample(x, orig_sr=sr, target_sr=AUDIO_SAMPLE_RATE)
        print(f"[Audio] After resample: {len(x)} samples ({len(x) / AUDIO_SAMPLE_RATE:.2f}s)")

    # Cap at 30 seconds to prevent OOM on very long uploads
    max_samples = AUDIO_SAMPLE_RATE * 30
    if len(x) > max_samples:
        print(f"[Audio] Trimming to 30s ({len(x)} → {max_samples} samples)")
        x = x[:max_samples]

    return run_aasist(x)


# ─────────────────────────────────────────────────────────────────────────────
# TEXT DEEPFAKE DETECTION
# Hybrid DeBERTa-v3-small + BiLSTM + CNN + Transformer
# Returns: "✅ Human-Written" / "🤖 AI-Generated"
# ─────────────────────────────────────────────────────────────────────────────

def deepfakes_text_predict(input_text: str) -> str:
    """
    Detect whether the input text is human-written or AI-generated.

    Empty input and input shorter than 10 words are rejected with a warning
    before the detector is loaded. Any failure while loading or running the
    detector is reported as an error string rather than raised.

    Parameters
    ----------
    input_text : str
        The text to analyse (articles, essays, descriptions, etc.)

    Returns
    -------
    str
        A formatted result string for display in the Gradio textbox.
    """
    if not input_text or not input_text.strip():
        return "⚠️ Please enter some text to analyse."

    text = input_text.strip()
    word_count = len(text.split())
    print(f"[Text] Input: {word_count} words")

    if word_count < 10:
        return (
            "⚠️ Input too short — please provide at least 10 words for a reliable result.\n"
            f"   (You entered {word_count} word{'s' if word_count != 1 else ''})"
        )

    try:
        detector = _get_text_detector()
        result = detector.predict(text)

        if "error" in result:
            return f"❌ Error: {result['error']}"

        label = result["label"]
        ai_prob = result["ai_prob"]
        human_prob = result["human_prob"]
        confidence = result["confidence"]

        print(f"[Text] label={label} | ai_prob={ai_prob:.4f} | human_prob={human_prob:.4f}")

        # ── Format output ─────────────────────────────────────────────────────
        if label == "AI-Generated":
            verdict_icon = "🤖"
            verdict_text = "AI-Generated Text"
        else:
            verdict_icon = "✅"
            verdict_text = "Human-Written Text"

        # Confidence bar: 20 block characters. The fill is clamped so an
        # out-of-range confidence can never produce a bar of the wrong width.
        bar_filled = max(0, min(20, round(confidence * 20)))
        bar = "█" * bar_filled + "░" * (20 - bar_filled)

        output = (
            f"{verdict_icon}  {verdict_text}\n"
            f"\n"
            f"Confidence  [{bar}]  {confidence*100:.1f}%\n"
            f"\n"
            f"P(AI-Generated)  : {ai_prob*100:.1f}%\n"
            f"P(Human-Written) : {human_prob*100:.1f}%\n"
            f"\n"
            f"Words analysed   : {word_count}\n"
            f"(First 128 tokens used — ~100 words)"
        )
        return output

    except Exception as e:
        # Top-level boundary for the UI: log the failure and show it to the
        # user instead of letting the exception escape.
        print(f"[Text] ❌ Prediction failed: {e}")
        return f"❌ Text detection failed: {str(e)}\nMake sure best_text_detector.pt is present in the Space."