# Imports are listed one per line in their original order; none are removed
# because other parts of the project may rely on them being loaded here.
import os
import cv2
import torch
import zipfile
import librosa
import time
import subprocess
import tempfile
import numpy as np
import tensorflow as tf
# AutoFeatureExtractor / AutoModelForAudioClassification removed — using AASISTDeepFake instead

# noisereduce is optional: record whether it could be imported.
try:
    import noisereduce as nr
    NOISEREDUCE_AVAILABLE = True
except ImportError:
    NOISEREDUCE_AVAILABLE = False

# Set random seed for reproducibility.
tf.random.set_seed(42)

# Extract EfficientNet model if not already extracted
if not os.path.exists("efficientnet-b0"):
    local_zip = "./efficientnet-b0.zip"
    if os.path.exists(local_zip):
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(local_zip, 'r') as zip_ref:
            zip_ref.extractall()
        print("EfficientNet model extracted successfully!")

# Load EfficientNet model (image/video)
efficientnet_model = tf.keras.layers.TFSMLayer(
    "efficientnet-b0/", call_endpoint="serving_default"
)

# ─────────────────────────────────────────────────────────────────────────────
# Audio: AASISTDeepFake (our trained model)
# Replaces the 3-model HuggingFace ensemble.
# ─────────────────────────────────────────────────────────────────────────────
AUDIO_SAMPLE_RATE = 16000
AUDIO_CHECKPOINT = "best_aasist.pt"
# Update this to the optimal F1 threshold printed at the end of your training run
# (Cell 14 output: "Optimal threshold: X.XXXX")
AUDIO_THRESHOLD = 0.5

_audio_detector = None  # lazy-loaded on first audio call


def _get_audio_detector():
    """Lazy-load AASISTDeepFake — avoids startup delay if tab isn't used.

    Returns
    -------
    AudioDetectorInference
        The module-wide detector instance, created on the first call from
        ``AUDIO_CHECKPOINT`` / ``AUDIO_THRESHOLD`` and reused afterwards.
    """
    global _audio_detector
    if _audio_detector is None:
        # Imported here so the dependency is only needed once audio is used.
        from audio_detector_inference import AudioDetectorInference
        print("[Audio] Loading AASISTDeepFake ...")
        _audio_detector = AudioDetectorInference(
            checkpoint=AUDIO_CHECKPOINT,
            threshold=AUDIO_THRESHOLD,
        )
        print("[Audio] ✅ AASISTDeepFake ready")
    return _audio_detector


# ─────────────────────────────────────────────────────────────────────────────
# TEXT DETECTOR: HybridAITextDetector (DeBERTa + BiLSTM + CNN + Transformer)
# ─────────────────────────────────────────────────────────────────────────────
TEXT_CHECKPOINT = "best_text_detector.pt"
TEXT_THRESHOLD = 0.5  # update with optimal F1 threshold from your training run

_text_detector = None  # lazy-loaded on first call


def _get_text_detector():
    """Lazy-load the text detector (avoids startup delay if tab isn't used).

    Returns
    -------
    TextDetectorInference
        The module-wide detector instance, created on the first call from
        ``TEXT_CHECKPOINT`` / ``TEXT_THRESHOLD`` and reused afterwards.
    """
    global _text_detector
    if _text_detector is None:
        # Imported here so the dependency is only needed once text is used.
        from text_detector_inference import TextDetectorInference
        print("[Text] Loading HybridAITextDetector ...")
        _text_detector = TextDetectorInference(
            checkpoint=TEXT_CHECKPOINT,
            threshold=TEXT_THRESHOLD,
        )
        print("[Text] ✅ Text detector ready")
    return _text_detector


# ─────────────────────────────────────────────────────────────────────────────
# ACOUSTIC FEATURE ANALYZER
# ─────────────────────────────────────────────────────────────────────────────
AI_SYNTH_THRESHOLD = 0.60


def _clamp01(value):
    """Clamp *value* to the closed interval [0.0, 1.0]."""
    return max(0.0, min(1.0, value))


def analyze_acoustic_features(x: np.ndarray, sr: int) -> dict:
    """Score how "synthetic" a waveform looks from four hand-crafted features.

    Each feature is mapped to a score in [0, 1] and the scores are combined
    with fixed weights (energy 0.35, flatness 0.20, pitch 0.30, ZCR 0.15).

    Parameters
    ----------
    x : np.ndarray
        Audio samples passed straight to librosa.
        (assumes mono float waveform — confirm against callers)
    sr : int
        Sample rate in Hz; only used for pitch estimation.

    Returns
    -------
    dict
        The four per-feature scores, the weighted ``ai_synth_score`` and
        ``is_ai_synthesized`` (score strictly above ``AI_SYNTH_THRESHOLD``).
    """
    frame_length = 1024
    hop_length = 256

    # Energy: a low coefficient of variation of the RMS envelope scores high.
    rms = librosa.feature.rms(y=x, frame_length=frame_length, hop_length=hop_length)[0]
    rms_variance = np.var(rms)
    rms_mean = np.mean(rms) + 1e-8  # epsilon avoids division by zero on silence
    rms_cv = np.sqrt(rms_variance) / rms_mean
    energy_synth_score = _clamp01(1.0 - (rms_cv / 0.5))
    print(f"[Acoustic] Energy CoV={rms_cv:.4f} → synth_score={energy_synth_score:.4f}")

    # Spectral flatness: a higher mean flatness scores high.
    spec_flatness = librosa.feature.spectral_flatness(y=x, hop_length=hop_length)[0]
    mean_flatness = np.mean(spec_flatness)
    flatness_synth_score = _clamp01(mean_flatness / 0.1)
    print(f"[Acoustic] Spectral flatness={mean_flatness:.5f} → synth_score={flatness_synth_score:.4f}")

    # Pitch: a low relative spread of f0 scores high. Best effort — any failure
    # falls back to the neutral score 0.5, but is now logged instead of hidden.
    try:
        f0 = librosa.yin(x, fmin=50, fmax=500, sr=sr, hop_length=hop_length)
        voiced = f0[f0 > 0]
        if len(voiced) > 10:
            pitch_variance = np.std(voiced) / (np.mean(voiced) + 1e-8)
            pitch_synth_score = _clamp01(1.0 - (pitch_variance / 0.15))
        else:
            pitch_synth_score = 0.5
    except Exception as exc:
        print(f"[Acoustic] Pitch estimation failed ({exc}); using neutral score 0.5")
        pitch_synth_score = 0.5
    print(f"[Acoustic] Pitch variance score={pitch_synth_score:.4f}")

    # Zero-crossing rate: a low coefficient of variation scores high.
    zcr = librosa.feature.zero_crossing_rate(x, hop_length=hop_length)[0]
    zcr_variance = np.var(zcr)
    zcr_mean = np.mean(zcr) + 1e-8
    zcr_cv = np.sqrt(zcr_variance) / zcr_mean
    zcr_synth_score = _clamp01(1.0 - (zcr_cv / 0.5))
    print(f"[Acoustic] ZCR CoV={zcr_cv:.4f} → synth_score={zcr_synth_score:.4f}")

    ai_synth_score = (
        energy_synth_score * 0.35
        + flatness_synth_score * 0.20
        + pitch_synth_score * 0.30
        + zcr_synth_score * 0.15
    )
    print(f"[Acoustic] Overall AI synth score={ai_synth_score:.4f} (threshold={AI_SYNTH_THRESHOLD})")

    return {
        "energy_synth_score": energy_synth_score,
        "flatness_synth_score": flatness_synth_score,
        "pitch_synth_score": pitch_synth_score,
        "zcr_synth_score": zcr_synth_score,
        "ai_synth_score": ai_synth_score,
        "is_ai_synthesized": ai_synth_score > AI_SYNTH_THRESHOLD,
    }


def convert_to_mp4(input_path):
    """Return a path to an MP4 version of *input_path* that OpenCV can open.

    Parameters
    ----------
    input_path : str
        Path of the source video.

    Returns
    -------
    tuple
        ``(path, is_temp)``. When the input already has an ``.mp4`` extension
        and OpenCV can open it, the input path is returned with
        ``is_temp=False``. Otherwise the file is transcoded with ffmpeg into a
        new temporary file and ``is_temp=True``; the caller must delete it.

    Raises
    ------
    RuntimeError
        If ffmpeg exits with a non-zero status.
    """
    ext = os.path.splitext(input_path)[-1].lower()
    if ext == ".mp4":
        cap = cv2.VideoCapture(input_path)
        try:
            ok = cap.isOpened()
        finally:
            cap.release()
        if ok:
            return input_path, False

    # Reserve a temp file name; ffmpeg overwrites it because of "-y".
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    tmp.close()
    output_path = tmp.name

    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx264", "-preset", "fast", "-crf", "23",
        "-c:a", "aac",
        output_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be valid UTF-8.
            raise RuntimeError(
                f"ffmpeg conversion failed:\n{result.stderr.decode(errors='replace')}"
            )
    except Exception:
        # Also covers subprocess.run itself failing (e.g. ffmpeg not installed),
        # which previously left the temp file behind.
        if os.path.exists(output_path):
            os.unlink(output_path)
        raise
    return output_path, True


class DetectionPipeline:
    """Turn a video path or an image array into 224x224 model inputs.

    Parameters
    ----------
    n_frames : int or None
        Number of evenly spaced frames to sample from a video; ``None``
        samples every frame.
    batch_size : int
        A sampled frame is emitted each time the count of decoded sampled
        frames is a multiple of this value (the last sampled frame is always
        emitted).
    resize : float or None
        Optional scale factor applied to each frame before the final resize.
    input_modality : str
        ``'video'`` or ``'image'``.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Run the pipeline.

        Parameters
        ----------
        filename : str or array
            Video path for the ``'video'`` modality; image array for the
            ``'image'`` modality.

        Returns
        -------
        list or array
            A list of 224x224 RGB frames (video) or one 224x224 image (image).

        Raises
        ------
        RuntimeError
            If the video cannot be opened or yields no frames.
        ValueError
            If the modality is unknown or no image was supplied.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            return self._extract_video_frames(filename)

        if self.input_modality == 'image':
            image = filename  # Gradio already delivers RGB — no conversion needed
            if image is None:
                raise ValueError("No image provided.")
            return cv2.resize(image, (224, 224))

        raise ValueError(f"Invalid input modality: {self.input_modality}")

    def _extract_video_frames(self, filename):
        """Decode the sampled frames of *filename* as 224x224 RGB arrays."""
        converted_path, is_temp = convert_to_mp4(filename)
        v_cap = None
        faces = []
        try:
            v_cap = cv2.VideoCapture(converted_path)
            if not v_cap.isOpened():
                raise RuntimeError(f"OpenCV could not open video: {converted_path}")

            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if v_len <= 0:
                raise RuntimeError("Video has 0 frames after conversion.")

            sample = (
                np.arange(0, v_len)
                if self.n_frames is None
                else np.linspace(0, v_len - 1, self.n_frames).astype(int)
            )
            # A set gives O(1) membership tests instead of scanning the array
            # for every frame.
            wanted = {int(i) for i in sample}
            last_wanted = max(wanted) if wanted else -1

            n_decoded = 0  # count of sampled frames decoded so far
            # Nothing past the last sampled index is needed, so stop there.
            for j in range(last_wanted + 1):
                v_cap.grab()
                if j not in wanted:
                    continue
                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if self.resize is not None:
                    h, w = frame.shape[:2]
                    frame = cv2.resize(frame, (int(w * self.resize), int(h * self.resize)))
                n_decoded += 1
                if n_decoded % self.batch_size == 0 or j == last_wanted:
                    faces.append(cv2.resize(frame, (224, 224)))
        finally:
            # Release the capture before deleting the file it may still hold.
            if v_cap is not None:
                v_cap.release()
            if is_temp and os.path.exists(converted_path):
                os.unlink(converted_path)

        if not faces:
            raise RuntimeError("No frames could be extracted from the video.")
        return faces


detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
detection_image_pipeline = DetectionPipeline(batch_size=1, input_modality='image')


def _predict_real_fake(face):
    """Run EfficientNet on one frame and return ``(real, fake)`` scores."""
    scaled = face / 255
    outputs = efficientnet_model(np.expand_dims(scaled, axis=0))
    # The serving endpoint returns a dict; its first value is used.
    pred = next(iter(outputs.values())).numpy()[0]
    return pred[0], pred[1]


def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from the mean score of sampled frames.

    Parameters
    ----------
    input_video : str
        Path of the video file.

    Returns
    -------
    str
        A user-facing verdict; REAL when the mean "real" score is >= 0.5.
    """
    faces = detection_video_pipeline(input_video)
    real_res, fake_res = [], []
    for face in faces:
        real, fake = _predict_real_fake(face)
        real_res.append(real)
        fake_res.append(fake)

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"[Video] Real={real_mean:.4f} | Fake={fake_mean:.4f}")

    if real_mean >= 0.5:
        return "✅ The video is REAL."
    return "🚨 The video is FAKE."


def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    Parameters
    ----------
    input_image : array
        Image array handed to the image pipeline.

    Returns
    -------
    str
        A user-facing verdict; REAL when the "real" score is > 0.5.
    """
    face = detection_image_pipeline(input_image)
    real, fake = _predict_real_fake(face)
    print(f"[Image] Real={real:.4f} | Fake={fake:.4f}")

    if real > 0.5:
        return "✅ The image is REAL."
    return "🚨 The image is FAKE."


def _to_mono_float32(x: np.ndarray) -> np.ndarray:
    """Convert raw samples to float32, scaled to roughly [-1, 1], and mix to mono.

    Signed-integer PCM is scaled by its dtype's full-scale value, so quiet
    int16 clips and int32 input are normalised correctly. Any other dtype keeps
    the previous rule: divide by 32768 only when the peak exceeds 1.0.
    2-D input is averaged over axis 1.
    """
    if np.issubdtype(x.dtype, np.signedinteger):
        full_scale = float(np.iinfo(x.dtype).max) + 1.0
        x = x.astype(np.float32) / full_scale
    else:
        x = x.astype(np.float32)
        if x.size and np.abs(x).max() > 1.0:
            x = x / 32768.0
    if x.ndim == 2:
        x = x.mean(axis=1)
    return x


def is_live_mic_recording(sr: int, x: np.ndarray) -> bool:
    """Guess from sample rate, duration and loudness whether audio came from a mic.

    Returns ``True`` when the sample rate is 48000 Hz, or when it is 44100 Hz,
    the clip is shorter than 15 s and its RMS level is below 0.15.

    NOTE(review): this is a heuristic on metadata only — an uploaded file
    recorded at 48 kHz matches it too, and the caller then skips the model.
    """
    duration = len(x) / sr

    if sr == 48000:
        print("[Audio] Detected: 48000 Hz → Live mic recording")
        return True

    if sr == 44100 and duration < 15.0:
        x_float = _to_mono_float32(x)
        rms = np.sqrt(np.mean(x_float ** 2))
        print(f"[Audio] SR=44100, duration={duration:.2f}s, RMS={rms:.4f}")
        if rms < 0.15:
            print("[Audio] Detected: Low RMS + short duration → Live mic recording")
            return True

    return False


def fake_processing_steps(x: np.ndarray, sr: int):
    """Print a fixed sequence of "processing" log lines with short sleeps.

    NOTE(review): neither ``x`` nor ``sr`` is used and no model is run here —
    every score and vote printed below is a hard-coded constant. The logs
    therefore do not describe any real analysis; confirm this is intended
    before relying on them.
    """
    print("[Audio] Step 1/6 — Converting audio format …")
    time.sleep(0.3)
    print("[Audio] Step 2/6 — Applying noise reduction …")
    time.sleep(0.4)
    print("[Audio] Step 3/6 — Extracting acoustic features …")
    time.sleep(0.5)
    print("[Audio] Step 4/6 — Running Model 1: MelodyMachine/Deepfake-audio-detection-V2 …")
    time.sleep(0.6)
    print("[Audio] MelodyMachine/Deepfake-audio-detection-V2 → real=0.8821 fake=0.1179 → vote: real")
    print("[Audio] Step 5/6 — Running Model 2: MelodyMachine/Deepfake-audio-detection …")
    time.sleep(0.5)
    print("[Audio] MelodyMachine/Deepfake-audio-detection → real=0.9103 fake=0.0897 → vote: real")
    print("[Audio] Step 6/6 — Running Model 3: Gustking/wav2vec2-large-xlsr …")
    time.sleep(0.6)
    print("[Audio] Gustking/wav2vec2-large-xlsr → real=0.9425 fake=0.0575 → vote: real")
    print("[Audio] Vote tally: {'real': 3, 'ai_synth': 0, 'fake': 0}")
    print("[Audio] Final decision: real")


# get_real_fake_probs() removed — was only used by the HF ensemble
# single_model_vote() removed — was only used by the HF ensemble


def run_aasist(x: np.ndarray) -> str:
    """
    Run AASISTDeepFake on a preprocessed (16 kHz, float32, mono) waveform.

    Acoustic feature override is applied on top: if the model says Real but
    acoustic analysis detects TTS-like smoothness, the result is upgraded to
    AI Synthesized.

    Parameters
    ----------
    x : np.ndarray
        Waveform passed to the detector together with ``AUDIO_SAMPLE_RATE``.

    Returns
    -------
    str
        A formatted verdict, or an error message when the detector result
        contains an ``"error"`` key.
    """
    detector = _get_audio_detector()
    result = detector.predict(x, AUDIO_SAMPLE_RATE)

    if "error" in result:
        print(f"[Audio] ❌ AASIST error: {result['error']}")
        return f"❌ Audio detection failed: {result['error']}"

    aasist_label = result["label"]  # "Real" or "Fake"
    real_prob = result["real_prob"]
    fake_prob = result["fake_prob"]

    print(f"[Audio] AASIST → {aasist_label} "
          f"(real={real_prob:.4f} fake={fake_prob:.4f})")

    # ── Acoustic override (catches TTS content AASIST may miss) ──────────────
    acoustic = analyze_acoustic_features(x, AUDIO_SAMPLE_RATE)

    if aasist_label == "Fake":
        final = "fake"
    elif aasist_label == "Real" and acoustic["is_ai_synthesized"]:
        print(
            f"[Audio] Acoustic override: AASIST=Real but "
            f"ai_synth_score={acoustic['ai_synth_score']:.4f} > {AI_SYNTH_THRESHOLD}"
            f" → AI Synthesized"
        )
        final = "ai_synth"
    else:
        # Any label other than "Fake" without an override is reported as real.
        final = "real"

    print(f"[Audio] Final decision: {final}")

    if final == "real":
        conf_pct = f"{real_prob*100:.1f}"
        return (
            f"✅ Real Human Voice\n\n"
            f"Confidence {conf_pct}% (P(real)={real_prob:.4f})"
        )
    if final == "ai_synth":
        return (
            f"🤖 AI Synthesized / Voice Cloned\n\n"
            f"Model said Real ({real_prob*100:.1f}%) but acoustic features\n"
            f"detected unnaturally smooth synthesis patterns.\n"
            f"AI synthesis score: {acoustic['ai_synth_score']:.4f}"
        )
    conf_pct = f"{fake_prob*100:.1f}"
    return (
        f"🚨 Fake / Manipulated Audio\n\n"
        f"Confidence {conf_pct}% (P(fake)={fake_prob:.4f})"
    )


def deepfakes_audio_predict(input_audio):
    """Classify an audio clip as real, AI-synthesized or fake.

    Parameters
    ----------
    input_audio : tuple or None
        ``(sample_rate, samples)``; ``None`` or an empty clip yields a warning
        message instead of an exception.

    Returns
    -------
    str
        A user-facing result string.
    """
    if input_audio is None:
        return "⚠️ Please provide an audio clip to analyse."

    sr, x = input_audio
    x = np.asarray(x)
    if not sr or x.size == 0:
        return "⚠️ The audio clip is empty — please record or upload again."

    print(f"[Audio] Input SR={sr} Hz | samples={len(x)} | dtype={x.dtype}")

    # NOTE(review): input judged to be a live mic recording is reported as
    # real without running the detector (see is_live_mic_recording).
    if is_live_mic_recording(sr, x):
        fake_processing_steps(x, sr)
        return "✅ Real Human Voice"

    print("[Audio] Source: 📁 Uploaded file → running ensemble + acoustic analysis …")

    x = _to_mono_float32(x)

    if sr != AUDIO_SAMPLE_RATE:
        print(f"[Audio] Resampling {sr} Hz → {AUDIO_SAMPLE_RATE} Hz …")
        x = librosa.resample(x, orig_sr=sr, target_sr=AUDIO_SAMPLE_RATE)

    print(f"[Audio] After resample: {len(x)} samples ({len(x) / AUDIO_SAMPLE_RATE:.2f}s)")

    # Cap at 30 seconds to prevent OOM on very long uploads
    MAX_AUDIO = AUDIO_SAMPLE_RATE * 30
    if len(x) > MAX_AUDIO:
        print(f"[Audio] Trimming to 30s ({len(x)} → {MAX_AUDIO} samples)")
        x = x[:MAX_AUDIO]

    return run_aasist(x)


# ─────────────────────────────────────────────────────────────────────────────
# TEXT DEEPFAKE DETECTION
# Hybrid DeBERTa-v3-small + BiLSTM + CNN + Transformer
# Returns: "✅ Human-Written" / "🤖 AI-Generated"
# ─────────────────────────────────────────────────────────────────────────────

def deepfakes_text_predict(input_text: str) -> str:
    """
    Detect whether the input text is human-written or AI-generated.

    Parameters
    ----------
    input_text : str
        The text to analyse (articles, essays, descriptions, etc.)

    Returns
    -------
    str
        A formatted result string for display in the Gradio textbox. Empty
        input, input under 10 words and detector failures all return a
        message rather than raising.
    """
    if not input_text or not input_text.strip():
        return "⚠️ Please enter some text to analyse."

    text = input_text.strip()
    word_count = len(text.split())
    print(f"[Text] Input: {word_count} words")

    if word_count < 10:
        return (
            "⚠️ Input too short — please provide at least 10 words for a reliable result.\n"
            f" (You entered {word_count} word{'s' if word_count != 1 else ''})"
        )

    try:
        detector = _get_text_detector()
        result = detector.predict(text)

        if "error" in result:
            return f"❌ Error: {result['error']}"

        label = result["label"]
        ai_prob = result["ai_prob"]
        human_prob = result["human_prob"]
        confidence = result["confidence"]

        print(f"[Text] label={label} | ai_prob={ai_prob:.4f} | human_prob={human_prob:.4f}")

        # ── Format output ─────────────────────────────────────────────────────
        if label == "AI-Generated":
            verdict_icon = "🤖"
            verdict_text = "AI-Generated Text"
        else:
            verdict_icon = "✅"
            verdict_text = "Human-Written Text"

        # Confidence bar (ASCII, 20 chars); clamped so an out-of-range
        # confidence cannot produce a bar of the wrong width.
        bar_filled = max(0, min(20, round(confidence * 20)))
        bar = "█" * bar_filled + "░" * (20 - bar_filled)

        output = (
            f"{verdict_icon} {verdict_text}\n"
            f"\n"
            f"Confidence [{bar}] {confidence*100:.1f}%\n"
            f"\n"
            f"P(AI-Generated) : {ai_prob*100:.1f}%\n"
            f"P(Human-Written) : {human_prob*100:.1f}%\n"
            f"\n"
            f"Words analysed : {word_count}\n"
            f"(First 128 tokens used — ~100 words)"
        )
        return output

    except Exception as e:
        # Top-level boundary for the UI: report the failure instead of crashing.
        print(f"[Text] ❌ Prediction failed: {e}")
        return (
            f"❌ Text detection failed: {str(e)}\n"
            f"Make sure {TEXT_CHECKPOINT} is present in the Space."
        )