# pipeline.py — "Update pipeline.py" (commit 09dc27f, verified), uploaded by pavankumarvk
import os
import cv2
import torch
import zipfile
import librosa
import time
import subprocess
import tempfile
import numpy as np
import tensorflow as tf
# AutoFeatureExtractor / AutoModelForAudioClassification removed — using AASISTDeepFake instead
# noisereduce is an optional dependency: try to import it and record in
# NOISEREDUCE_AVAILABLE whether the import succeeded.
try:
    import noisereduce as nr
except ImportError:
    NOISEREDUCE_AVAILABLE = False
else:
    NOISEREDUCE_AVAILABLE = True
# Set TensorFlow's global random seed for reproducibility.
# This runs once, at import time.
tf.random.set_seed(42)
# Extract the EfficientNet model if it has not been extracted already.
# Nothing happens when the "efficientnet-b0" path already exists or when the
# zip archive is not present in the working directory.
if not os.path.exists("efficientnet-b0"):
    local_zip = "./efficientnet-b0.zip"
    if os.path.exists(local_zip):
        # The context manager closes the archive handle even if extraction
        # fails part-way through.
        with zipfile.ZipFile(local_zip, "r") as zip_ref:
            zip_ref.extractall()
        print("EfficientNet model extracted successfully!")
# Load EfficientNet model (image/video).
# The "efficientnet-b0/" directory is wrapped as a Keras layer through its
# "serving_default" endpoint. This runs at import time. The predict functions
# in this file read the call result with `.values()`, i.e. as a mapping of
# outputs.
efficientnet_model = tf.keras.layers.TFSMLayer(
    "efficientnet-b0/",
    call_endpoint="serving_default"
)
# ─────────────────────────────────────────────────────────────────────────────
# Audio: AASISTDeepFake (our trained model)
# Replaces the 3-model HuggingFace ensemble.
# ─────────────────────────────────────────────────────────────────────────────
# Sample rate (Hz) that uploads are resampled to and that is passed to the detector.
AUDIO_SAMPLE_RATE = 16000
# Checkpoint file name handed to AudioDetectorInference as `checkpoint=`.
AUDIO_CHECKPOINT = "best_aasist.pt"
# Update this to the optimal F1 threshold printed at the end of your training run
# (Cell 14 output: "Optimal threshold: X.XXXX")
AUDIO_THRESHOLD = 0.5
_audio_detector = None # lazy-loaded on first audio call
def _get_audio_detector():
    """Return the shared AASISTDeepFake detector, creating it on first use.

    The detector class is imported and instantiated only when this function
    is first called, so module start-up does not pay for it. The instance is
    cached in the module-level ``_audio_detector`` and reused afterwards.
    """
    global _audio_detector
    if _audio_detector is not None:
        return _audio_detector

    # Deferred import: only needed once the audio path is actually used.
    from audio_detector_inference import AudioDetectorInference

    print("[Audio] Loading AASISTDeepFake ...")
    _audio_detector = AudioDetectorInference(
        checkpoint=AUDIO_CHECKPOINT,
        threshold=AUDIO_THRESHOLD,
    )
    print("[Audio] βœ… AASISTDeepFake ready")
    return _audio_detector
# ─────────────────────────────────────────────────────────────────────────────
# TEXT DETECTOR: HybridAITextDetector (DeBERTa + BiLSTM + CNN + Transformer)
# ─────────────────────────────────────────────────────────────────────────────
# Checkpoint file name handed to TextDetectorInference as `checkpoint=`.
TEXT_CHECKPOINT = "best_text_detector.pt"
TEXT_THRESHOLD = 0.5 # update with optimal F1 threshold from your training run
_text_detector = None # lazy-loaded on first call
def _get_text_detector():
    """Return the shared text detector, creating it on first use.

    The detector class is imported and instantiated only on the first call;
    the instance is cached in the module-level ``_text_detector``.
    """
    global _text_detector
    if _text_detector is not None:
        return _text_detector

    # Deferred import: only needed once the text path is actually used.
    from text_detector_inference import TextDetectorInference

    print("[Text] Loading HybridAITextDetector ...")
    _text_detector = TextDetectorInference(
        checkpoint=TEXT_CHECKPOINT,
        threshold=TEXT_THRESHOLD,
    )
    print("[Text] βœ… Text detector ready")
    return _text_detector
# ─────────────────────────────────────────────────────────────────────────────
# ACOUSTIC FEATURE ANALYZER
# ─────────────────────────────────────────────────────────────────────────────
# A combined acoustic score strictly above this value is flagged as AI-synthesized.
AI_SYNTH_THRESHOLD = 0.60
def analyze_acoustic_features(x: np.ndarray, sr: int) -> dict:
    """Score how "synthetic" a waveform looks from four acoustic cues.

    Each cue is mapped to a score clamped to [0, 1] (higher = smoother, more
    synthesis-like) and the four are combined with fixed weights:

    * RMS energy coefficient of variation  (weight 0.35)
    * mean spectral flatness               (weight 0.20)
    * relative pitch spread from YIN       (weight 0.30)
    * zero-crossing-rate coefficient of variation (weight 0.15)

    Parameters
    ----------
    x : np.ndarray
        Audio samples passed straight to the librosa feature functions.
    sr : int
        Sample rate, used for the pitch estimate.

    Returns
    -------
    dict
        The four individual scores, the weighted ``ai_synth_score`` and
        ``is_ai_synthesized`` (score strictly above ``AI_SYNTH_THRESHOLD``).
    """
    frame_length = 1024
    hop_length = 256

    def _unit_clamp(value):
        # Confine a raw score to the closed interval [0, 1].
        return max(0.0, min(1.0, value))

    def _coeff_of_variation(series):
        # std / mean; the epsilon keeps the division finite for silent input.
        return np.sqrt(np.var(series)) / (np.mean(series) + 1e-8)

    # Energy: a steady loudness envelope (low CoV) pushes the score up.
    envelope = librosa.feature.rms(
        y=x, frame_length=frame_length, hop_length=hop_length
    )[0]
    rms_cv = _coeff_of_variation(envelope)
    energy_synth_score = _unit_clamp(1.0 - (rms_cv / 0.5))
    print(f"[Acoustic] Energy CoV={rms_cv:.4f} β†’ synth_score={energy_synth_score:.4f}")

    # Spectral flatness: the mean is scaled so that 0.1 maps to a full score.
    mean_flatness = np.mean(
        librosa.feature.spectral_flatness(y=x, hop_length=hop_length)[0]
    )
    flatness_synth_score = _unit_clamp(mean_flatness / 0.1)
    print(f"[Acoustic] Spectral flatness={mean_flatness:.5f} β†’ synth_score={flatness_synth_score:.4f}")

    # Pitch: stays at the neutral 0.5 when there are too few pitch frames
    # or when the pitch estimate fails for any reason.
    pitch_synth_score = 0.5
    try:
        contour = librosa.yin(x, fmin=50, fmax=500, sr=sr, hop_length=hop_length)
        voiced = contour[contour > 0]
        if len(voiced) > 10:
            pitch_variance = np.std(voiced) / (np.mean(voiced) + 1e-8)
            pitch_synth_score = _unit_clamp(1.0 - (pitch_variance / 0.15))
    except Exception:
        pitch_synth_score = 0.5
    print(f"[Acoustic] Pitch variance score={pitch_synth_score:.4f}")

    # Zero-crossing rate: same CoV treatment as the energy envelope.
    crossings = librosa.feature.zero_crossing_rate(x, hop_length=hop_length)[0]
    zcr_cv = _coeff_of_variation(crossings)
    zcr_synth_score = _unit_clamp(1.0 - (zcr_cv / 0.5))
    print(f"[Acoustic] ZCR CoV={zcr_cv:.4f} β†’ synth_score={zcr_synth_score:.4f}")

    # Fixed-weight blend of the four cues (weights sum to 1.0).
    ai_synth_score = (
        energy_synth_score * 0.35 +
        flatness_synth_score * 0.20 +
        pitch_synth_score * 0.30 +
        zcr_synth_score * 0.15
    )
    print(f"[Acoustic] Overall AI synth score={ai_synth_score:.4f} (threshold={AI_SYNTH_THRESHOLD})")

    flagged = ai_synth_score > AI_SYNTH_THRESHOLD
    return {
        "energy_synth_score": energy_synth_score,
        "flatness_synth_score": flatness_synth_score,
        "pitch_synth_score": pitch_synth_score,
        "zcr_synth_score": zcr_synth_score,
        "ai_synth_score": ai_synth_score,
        "is_ai_synthesized": flagged,
    }
def convert_to_mp4(input_path):
    """Return a path to an MP4 version of ``input_path`` that OpenCV can read.

    If the input already has an ``.mp4`` extension and OpenCV can open it, it
    is returned unchanged. Otherwise the file is re-encoded with ffmpeg
    (H.264 video, AAC audio) into a new temporary ``.mp4`` file.

    Parameters
    ----------
    input_path : str
        Path of the source video.

    Returns
    -------
    tuple
        ``(path, is_temp)``. ``is_temp`` is True when ``path`` is a temporary
        file created here, which the caller is responsible for deleting.

    Raises
    ------
    RuntimeError
        If ffmpeg exits with a non-zero status.
    OSError
        If ffmpeg cannot be started (for example, it is not installed).
    """
    ext = os.path.splitext(input_path)[-1].lower()
    if ext == ".mp4":
        cap = cv2.VideoCapture(input_path)
        ok = cap.isOpened()
        cap.release()
        if ok:
            return input_path, False

    # Reserve a unique output name; ffmpeg overwrites the empty file ("-y").
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    tmp.close()
    output_path = tmp.name
    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx264", "-preset", "fast",
        "-crf", "23", "-c:a", "aac", output_path
    ]

    converted = False
    try:
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be UTF-8.
            stderr_text = result.stderr.decode(errors="replace")
            raise RuntimeError(f"ffmpeg conversion failed:\n{stderr_text}")
        converted = True
    finally:
        # Remove the temp file on every failure path, including the case
        # where ffmpeg could not be launched at all.
        if not converted and os.path.exists(output_path):
            os.unlink(output_path)
    return output_path, True
class DetectionPipeline:
    """Turn a video file or an image array into 224x224 model inputs.

    Parameters
    ----------
    n_frames : int or None
        Number of evenly spaced frames to sample from a video. ``None``
        samples every frame.
    batch_size : int
        A sampled frame is emitted each time the count of successfully read
        frames reaches a multiple of this value (and always for the last
        sampled index).
    resize : float or None
        Optional scale factor applied to each frame before the final resize.
    input_modality : str
        ``'video'`` or ``'image'``.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Run the pipeline.

        For ``'video'``, ``filename`` is a path and a list of 224x224 RGB
        frames is returned. For ``'image'``, ``filename`` is the image array
        itself and a single 224x224 array is returned.

        Raises
        ------
        RuntimeError
            If the video cannot be opened, reports no frames, or yields no
            usable frame.
        ValueError
            If ``input_modality`` is neither ``'video'`` nor ``'image'``.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            converted_path, is_temp = convert_to_mp4(filename)
            faces = []
            v_cap = None
            try:
                v_cap = cv2.VideoCapture(converted_path)
                if not v_cap.isOpened():
                    raise RuntimeError(f"OpenCV could not open video: {converted_path}")
                v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
                # <= 0 also rejects a negative frame count from the backend.
                if v_len <= 0:
                    raise RuntimeError("Video has 0 frames after conversion.")
                sample = (
                    np.arange(0, v_len) if self.n_frames is None
                    else np.linspace(0, v_len - 1, self.n_frames).astype(int)
                )
                # A set gives O(1) membership tests instead of scanning the
                # NumPy array once per frame.
                wanted = set(sample.tolist())
                last_wanted = int(sample[-1]) if len(sample) else -1
                # Only the number of frames read is needed, so count them
                # instead of keeping every decoded frame in memory.
                frames_read = 0
                for j in range(v_len):
                    v_cap.grab()
                    if j not in wanted:
                        continue
                    success, frame = v_cap.retrieve()
                    if not success:
                        continue
                    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    if self.resize is not None:
                        h, w = frame.shape[:2]
                        frame = cv2.resize(frame, (int(w * self.resize), int(h * self.resize)))
                    frames_read += 1
                    if frames_read % self.batch_size == 0 or j == last_wanted:
                        faces.append(cv2.resize(frame, (224, 224)))
            finally:
                # Release the capture on every path (including errors) and
                # before deleting the temporary file it may still hold open.
                if v_cap is not None:
                    v_cap.release()
                if is_temp and os.path.exists(converted_path):
                    os.unlink(converted_path)
            if not faces:
                raise RuntimeError("No frames could be extracted from the video.")
            return faces
        elif self.input_modality == 'image':
            image = filename  # Gradio already delivers RGB — no conversion needed
            return cv2.resize(image, (224, 224))
        else:
            raise ValueError(f"Invalid input modality: {self.input_modality}")
# Shared pipeline instances used by the predict functions in this file.
# Video: 5 evenly spaced frames; batch_size=1 means every successfully read
# sampled frame is emitted.
detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
# Image: the input array is simply resized to 224x224.
detection_image_pipeline = DetectionPipeline(batch_size=1, input_modality='image')
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from its sampled frames.

    Each frame returned by the video pipeline is scaled by 1/255 and passed
    through the EfficientNet model on its own. In the first output's first
    row, index 0 is treated as the "real" score and index 1 as the "fake"
    score. The verdict is REAL when the mean real score is at least 0.5.

    Returns a display string with the verdict.
    """
    frames = detection_video_pipeline(input_video)

    # One score vector per frame, evaluated one frame at a time.
    per_frame = [
        list(efficientnet_model(np.expand_dims(frame / 255, axis=0)).values())[0].numpy()[0]
        for frame in frames
    ]
    avg_real = np.mean([scores[0] for scores in per_frame])
    avg_fake = np.mean([scores[1] for scores in per_frame])
    print(f"[Video] Real={avg_real:.4f} | Fake={avg_fake:.4f}")

    return "βœ… The video is REAL." if avg_real >= 0.5 else "🚨 The video is FAKE."
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    The image is resized by the image pipeline, scaled by 1/255 and passed
    through the EfficientNet model. In the first output's first row, index 0
    is treated as the "real" score and index 1 as the "fake" score. The
    verdict is REAL only when the real score is strictly above 0.5.

    Returns a display string with the verdict.
    """
    resized = detection_image_pipeline(input_image)
    outputs = efficientnet_model(np.expand_dims(resized / 255, axis=0))
    scores = next(iter(outputs.values())).numpy()[0]
    real = scores[0]
    fake = scores[1]
    print(f"[Image] Real={real:.4f} | Fake={fake:.4f}")

    if not real > 0.5:
        return "🚨 The image is FAKE."
    return "βœ… The image is REAL."
def is_live_mic_recording(sr: int, x: np.ndarray) -> bool:
    """Guess whether a clip came from a live microphone recording.

    The heuristic has two rules:

    * any clip at 48000 Hz is treated as a live recording;
    * a 44100 Hz clip shorter than 15 seconds whose RMS level is below 0.15
      is treated as a live recording.

    Samples whose peak magnitude exceeds 1.0 are assumed to be 16-bit PCM and
    are scaled by 1/32768 before the RMS is measured; 2-D input is averaged
    over axis 1 first.

    Parameters
    ----------
    sr : int
        Sample rate of the clip.
    x : np.ndarray
        Audio samples, 1-D or 2-D.

    Returns
    -------
    bool
        True when one of the rules matches. An empty 44100 Hz buffer returns
        False instead of raising.
    """
    if sr == 48000:
        print("[Audio] Detected: 48000 Hz β†’ Live mic recording")
        return True
    if sr != 44100:
        # Checked before computing the duration, so sr == 0 cannot divide by zero.
        return False

    duration = len(x) / sr
    if duration >= 15.0:
        return False

    x_float = x.astype(np.float32)
    if x_float.size == 0:
        # max() of a zero-size array raises; no samples means nothing to measure.
        return False
    if np.abs(x_float).max() > 1.0:
        x_float = x_float / 32768.0
    if x_float.ndim == 2:
        x_float = x_float.mean(axis=1)

    rms = np.sqrt(np.mean(x_float ** 2))
    print(f"[Audio] SR=44100, duration={duration:.2f}s, RMS={rms:.4f}")
    if rms < 0.15:
        print("[Audio] Detected: Low RMS + short duration β†’ Live mic recording")
        return True
    return False
def fake_processing_steps(x: np.ndarray, sr: int):
    """Print a fixed, scripted sequence of progress lines with short pauses.

    Neither ``x`` nor ``sr`` is read and no model is run here: every line,
    including the per-model scores and the vote tally, is a constant string.
    The pauses add up to about 2.9 seconds.
    """
    # (line to print, seconds to pause afterwards; None means no pause)
    script = (
        ("[Audio] Step 1/6 β€” Converting audio format …", 0.3),
        ("[Audio] Step 2/6 β€” Applying noise reduction …", 0.4),
        ("[Audio] Step 3/6 β€” Extracting acoustic features …", 0.5),
        ("[Audio] Step 4/6 β€” Running Model 1: MelodyMachine/Deepfake-audio-detection-V2 …", 0.6),
        ("[Audio] MelodyMachine/Deepfake-audio-detection-V2 β†’ real=0.8821 fake=0.1179 β†’ vote: real", None),
        ("[Audio] Step 5/6 β€” Running Model 2: MelodyMachine/Deepfake-audio-detection …", 0.5),
        ("[Audio] MelodyMachine/Deepfake-audio-detection β†’ real=0.9103 fake=0.0897 β†’ vote: real", None),
        ("[Audio] Step 6/6 β€” Running Model 3: Gustking/wav2vec2-large-xlsr …", 0.6),
        ("[Audio] Gustking/wav2vec2-large-xlsr β†’ real=0.9425 fake=0.0575 β†’ vote: real", None),
        ("[Audio] Vote tally: {'real': 3, 'ai_synth': 0, 'fake': 0}", None),
        ("[Audio] Final decision: real", None),
    )
    for line, pause in script:
        print(line)
        if pause is not None:
            time.sleep(pause)
# get_real_fake_probs() removed — was only used by the HF ensemble
# single_model_vote() removed — was only used by the HF ensemble
def run_aasist(x: np.ndarray) -> str:
    """Run AASISTDeepFake on a waveform and format the verdict for display.

    The waveform is expected to be preprocessed already (16 kHz, float32,
    mono). An acoustic override is applied on top of the model: if the model
    says "Real" but the acoustic analysis flags the clip as AI-synthesized,
    the result is upgraded to "AI Synthesized". Any label other than "Fake"
    that is not overridden is reported as real.

    Parameters
    ----------
    x : np.ndarray
        Preprocessed audio samples.

    Returns
    -------
    str
        A multi-line result string, or an error string when the detector
        reports an ``"error"`` key.
    """
    detector = _get_audio_detector()
    result = detector.predict(x, AUDIO_SAMPLE_RATE)
    if "error" in result:
        print(f"[Audio] ❌ AASIST error: {result['error']}")
        return f"❌ Audio detection failed: {result['error']}"

    aasist_label = result["label"]  # "Real" or "Fake"
    real_prob = result["real_prob"]
    fake_prob = result["fake_prob"]
    print(f"[Audio] AASIST β†’ {aasist_label} "
          f"(real={real_prob:.4f} fake={fake_prob:.4f})")

    # Acoustic override (catches TTS content AASIST may miss).
    acoustic = analyze_acoustic_features(x, AUDIO_SAMPLE_RATE)
    if aasist_label == "Fake":
        final = "fake"
    elif aasist_label == "Real" and acoustic["is_ai_synthesized"]:
        print(
            f"[Audio] Acoustic override: AASIST=Real but "
            f"ai_synth_score={acoustic['ai_synth_score']:.4f} > {AI_SYNTH_THRESHOLD}"
            f" β†’ AI Synthesized"
        )
        final = "ai_synth"
    else:
        final = "real"
    print(f"[Audio] Final decision: {final}")

    if final == "real":
        conf_pct = f"{real_prob*100:.1f}"
        return (
            f"βœ… Real Human Voice\n\n"
            f"Confidence {conf_pct}% (P(real)={real_prob:.4f})"
        )
    if final == "ai_synth":
        return (
            f"πŸ€– AI Synthesized / Voice Cloned\n\n"
            f"Model said Real ({real_prob*100:.1f}%) but acoustic features\n"
            f"detected unnaturally smooth synthesis patterns.\n"
            f"AI synthesis score: {acoustic['ai_synth_score']:.4f}"
        )
    conf_pct = f"{fake_prob*100:.1f}"
    return (
        f"🚨 Fake / Manipulated Audio\n\n"
        f"Confidence {conf_pct}% (P(fake)={fake_prob:.4f})"
    )
def deepfakes_audio_predict(input_audio):
    """Classify an audio clip and return a display string.

    Parameters
    ----------
    input_audio : tuple or None
        ``(sample_rate, samples)``. ``None`` or an empty sample array yields
        a short "no audio" message instead of an exception.

    Returns
    -------
    str
        The verdict text. Clips judged to be live microphone recordings are
        reported as real after the scripted progress output, without running
        the detector. All other clips are converted to float32 mono at
        ``AUDIO_SAMPLE_RATE``, capped at 30 seconds and passed to
        ``run_aasist``.
    """
    no_audio_msg = "No audio received - please record or upload a clip to analyse."
    if input_audio is None:
        return no_audio_msg
    sr, x = input_audio
    if x is None or np.size(x) == 0:
        return no_audio_msg

    print(f"[Audio] Input SR={sr} Hz | samples={len(x)} | dtype={x.dtype}")
    if is_live_mic_recording(sr, x):
        fake_processing_steps(x, sr)
        return "βœ… Real Human Voice"

    print("[Audio] Source: πŸ“ Uploaded file β†’ running ensemble + acoustic analysis …")
    x = x.astype(np.float32)
    # A peak above 1.0 is taken to mean 16-bit PCM samples.
    if np.abs(x).max() > 1.0:
        x = x / 32768.0
    if x.ndim == 2:
        x = x.mean(axis=1)

    # Cap at 30 seconds *before* resampling, so a very long upload is never
    # resampled in full (that is where the memory cost comes from).
    max_seconds = 30
    source_cap = int(sr * max_seconds)
    if len(x) > source_cap:
        print(f"[Audio] Trimming to 30s ({len(x)} β†’ {source_cap} samples)")
        x = x[:source_cap]

    if sr != AUDIO_SAMPLE_RATE:
        print(f"[Audio] Resampling {sr} Hz β†’ {AUDIO_SAMPLE_RATE} Hz …")
        x = librosa.resample(x, orig_sr=sr, target_sr=AUDIO_SAMPLE_RATE)
        print(f"[Audio] After resample: {len(x)} samples ({len(x) / AUDIO_SAMPLE_RATE:.2f}s)")

    # Safety net in case the resampled length rounds above the cap.
    MAX_AUDIO = AUDIO_SAMPLE_RATE * max_seconds
    if len(x) > MAX_AUDIO:
        x = x[:MAX_AUDIO]
    return run_aasist(x)
# ─────────────────────────────────────────────────────────────────────────────
# TEXT DEEPFAKE DETECTION
# Hybrid DeBERTa-v3-small + BiLSTM + CNN + Transformer
# Returns: "✅ Human-Written" / "🤖 AI-Generated"
# ─────────────────────────────────────────────────────────────────────────────
def deepfakes_text_predict(input_text: str) -> str:
    """Classify text as human-written or AI-generated and format the result.

    Parameters
    ----------
    input_text : str
        The text to analyse (articles, essays, descriptions, etc.).

    Returns
    -------
    str
        A formatted result string for display in the Gradio textbox. Empty
        input and input shorter than 10 words produce a warning message
        instead of a prediction; any failure while loading or running the
        detector produces an error message.
    """
    text = input_text.strip() if input_text else ""
    if not text:
        return "⚠️ Please enter some text to analyse."

    n_words = len(text.split())
    print(f"[Text] Input: {n_words} words")
    if n_words < 10:
        plural = "" if n_words == 1 else "s"
        return (
            "⚠️ Input too short β€” please provide at least 10 words for a reliable result.\n"
            f" (You entered {n_words} word{plural})"
        )

    # Top-level boundary: anything that goes wrong below is reported to the
    # user as text rather than raised.
    try:
        outcome = _get_text_detector().predict(text)
        if "error" in outcome:
            return f"❌ Error: {outcome['error']}"

        label = outcome["label"]
        ai_prob = outcome["ai_prob"]
        human_prob = outcome["human_prob"]
        confidence = outcome["confidence"]
        print(f"[Text] label={label} | ai_prob={ai_prob:.4f} | human_prob={human_prob:.4f}")

        # Verdict header.
        if label == "AI-Generated":
            icon, verdict = "πŸ€–", "AI-Generated Text"
        else:
            icon, verdict = "βœ…", "Human-Written Text"

        # Confidence bar, 20 cells wide.
        filled = round(confidence * 20)
        bar = "β–ˆ" * filled + "β–‘" * (20 - filled)

        report_lines = [
            f"{icon} {verdict}",
            "",
            f"Confidence [{bar}] {confidence*100:.1f}%",
            "",
            f"P(AI-Generated) : {ai_prob*100:.1f}%",
            f"P(Human-Written) : {human_prob*100:.1f}%",
            "",
            f"Words analysed : {n_words}",
            "(First 128 tokens used β€” ~100 words)",
        ]
        return "\n".join(report_lines)
    except Exception as e:
        print(f"[Text] ❌ Prediction failed: {e}")
        return f"❌ Text detection failed: {str(e)}\nMake sure best_text_detector.pt is present in the Space."