import contextlib
import os
import subprocess
import tempfile
import time
import zipfile

# Third-party packages, kept in their original relative order.
import cv2
import torch
import librosa
import numpy as np
import tensorflow as tf
| |
|
|
# noisereduce is optional: record whether the import succeeded instead of
# failing at start-up when the package is missing.
try:
    import noisereduce as nr
    NOISEREDUCE_AVAILABLE = True
except ImportError:
    NOISEREDUCE_AVAILABLE = False

# Seed TensorFlow's global random generator.
tf.random.set_seed(42)

# Unpack the bundled SavedModel when its directory is absent. Nothing happens
# if the directory already exists or the archive is missing.
if not os.path.exists("efficientnet-b0"):
    local_zip = "./efficientnet-b0.zip"
    if os.path.exists(local_zip):
        # The context manager closes the archive even when extraction fails.
        with zipfile.ZipFile(local_zip, "r") as zip_ref:
            zip_ref.extractall()
        print("EfficientNet model extracted successfully!")

# Wrap the SavedModel directory as a Keras layer that invokes its
# "serving_default" endpoint.
efficientnet_model = tf.keras.layers.TFSMLayer(
    "efficientnet-b0/",
    call_endpoint="serving_default",
)
|
|
| |
| |
| |
| |
AUDIO_SAMPLE_RATE = 16000            # Hz; rate audio is resampled to before detection
AUDIO_CHECKPOINT = "best_aasist.pt"  # checkpoint passed to AudioDetectorInference
AUDIO_THRESHOLD = 0.5                # threshold passed to AudioDetectorInference

# Module-wide detector instance; stays None until the first request needs it.
_audio_detector = None


def _get_audio_detector():
    """Lazy-load AASISTDeepFake — avoids startup delay if tab isn't used.

    The detector is built on the first call from AUDIO_CHECKPOINT and
    AUDIO_THRESHOLD, stored in the module-level ``_audio_detector`` and
    returned unchanged on every later call.

    Returns
    -------
    AudioDetectorInference
        The shared detector instance.
    """
    global _audio_detector
    if _audio_detector is None:
        # Imported here so the module is only loaded when audio is analysed.
        from audio_detector_inference import AudioDetectorInference
        print("[Audio] Loading AASISTDeepFake ...")
        _audio_detector = AudioDetectorInference(
            checkpoint=AUDIO_CHECKPOINT,
            threshold=AUDIO_THRESHOLD,
        )
        print("[Audio] ✅ AASISTDeepFake ready")
    return _audio_detector
|
|
|
|
| |
| |
| |
TEXT_CHECKPOINT = "best_text_detector.pt"  # checkpoint passed to TextDetectorInference
TEXT_THRESHOLD = 0.5                       # threshold passed to TextDetectorInference

# Module-wide detector instance; stays None until the first request needs it.
_text_detector = None


def _get_text_detector():
    """Lazy-load the text detector (avoids startup delay if tab isn't used).

    The detector is built on the first call from TEXT_CHECKPOINT and
    TEXT_THRESHOLD, stored in the module-level ``_text_detector`` and
    returned unchanged on every later call.

    Returns
    -------
    TextDetectorInference
        The shared detector instance.
    """
    global _text_detector
    if _text_detector is None:
        # Imported here so the module is only loaded when text is analysed.
        from text_detector_inference import TextDetectorInference
        print("[Text] Loading HybridAITextDetector ...")
        _text_detector = TextDetectorInference(
            checkpoint=TEXT_CHECKPOINT,
            threshold=TEXT_THRESHOLD,
        )
        print("[Text] ✅ Text detector ready")
    return _text_detector
|
|
|
|
| |
| |
| |
AI_SYNTH_THRESHOLD = 0.60  # ai_synth_score above this sets "is_ai_synthesized"


def analyze_acoustic_features(x: np.ndarray, sr: int) -> dict:
    """Score how little a waveform's energy, pitch and zero-crossing rate vary.

    Four sub-scores, each clamped to [0, 1], are blended with fixed weights
    (energy 0.35, spectral flatness 0.20, pitch 0.30, ZCR 0.15):

    * energy   -- ``1 - CoV(RMS) / 0.5``; steadier loudness scores higher.
    * flatness -- ``mean spectral flatness / 0.1``; flatter spectra score
      higher.
    * pitch    -- ``1 - CoV(f0) / 0.15``; steadier pitch scores higher. A
      neutral 0.5 is used when 10 or fewer pitch frames are available or
      when pitch estimation raises.
    * zcr      -- ``1 - CoV(ZCR) / 0.5``; steadier zero-crossing rate scores
      higher.

    Parameters
    ----------
    x : np.ndarray
        Audio samples, handed to librosa unchanged.
    sr : int
        Sample rate in Hz; only the pitch estimator reads it.

    Returns
    -------
    dict
        ``energy_synth_score``, ``flatness_synth_score``,
        ``pitch_synth_score``, ``zcr_synth_score`` and ``ai_synth_score`` as
        plain floats, plus the bool ``is_ai_synthesized``
        (``ai_synth_score > AI_SYNTH_THRESHOLD``).
    """
    frame_length = 1024
    hop_length = 256

    def unit(value) -> float:
        """Clamp *value* to [0, 1] and return it as a built-in float."""
        return float(max(0.0, min(1.0, value)))

    # Energy: coefficient of variation of the frame-wise RMS.
    rms = librosa.feature.rms(y=x, frame_length=frame_length, hop_length=hop_length)[0]
    rms_cv = np.sqrt(np.var(rms)) / (np.mean(rms) + 1e-8)  # epsilon guards silence
    energy_synth_score = unit(1.0 - (rms_cv / 0.5))
    print(f"[Acoustic] Energy CoV={rms_cv:.4f} → synth_score={energy_synth_score:.4f}")

    # Spectral flatness, averaged over frames.
    spec_flatness = librosa.feature.spectral_flatness(y=x, hop_length=hop_length)[0]
    mean_flatness = np.mean(spec_flatness)
    flatness_synth_score = unit(mean_flatness / 0.1)
    print(f"[Acoustic] Spectral flatness={mean_flatness:.5f} → synth_score={flatness_synth_score:.4f}")

    # Pitch: best effort. A failure must not abort the whole analysis, so it
    # is reported and replaced by a neutral score.
    try:
        f0 = librosa.yin(x, fmin=50, fmax=500, sr=sr, hop_length=hop_length)
        voiced = f0[f0 > 0]  # keep positive estimates only
        if len(voiced) > 10:
            pitch_variance = np.std(voiced) / (np.mean(voiced) + 1e-8)
            pitch_synth_score = unit(1.0 - (pitch_variance / 0.15))
        else:
            pitch_synth_score = 0.5
    except Exception as exc:
        print(f"[Acoustic] Pitch estimation failed ({exc}); using neutral score")
        pitch_synth_score = 0.5
    print(f"[Acoustic] Pitch variance score={pitch_synth_score:.4f}")

    # Zero-crossing rate: coefficient of variation across frames.
    zcr = librosa.feature.zero_crossing_rate(x, hop_length=hop_length)[0]
    zcr_cv = np.sqrt(np.var(zcr)) / (np.mean(zcr) + 1e-8)
    zcr_synth_score = unit(1.0 - (zcr_cv / 0.5))
    print(f"[Acoustic] ZCR CoV={zcr_cv:.4f} → synth_score={zcr_synth_score:.4f}")

    # Weighted blend of the four sub-scores; the weights sum to 1.0.
    ai_synth_score = float(
        energy_synth_score * 0.35 +
        flatness_synth_score * 0.20 +
        pitch_synth_score * 0.30 +
        zcr_synth_score * 0.15
    )
    print(f"[Acoustic] Overall AI synth score={ai_synth_score:.4f} (threshold={AI_SYNTH_THRESHOLD})")

    return {
        "energy_synth_score": energy_synth_score,
        "flatness_synth_score": flatness_synth_score,
        "pitch_synth_score": pitch_synth_score,
        "zcr_synth_score": zcr_synth_score,
        "ai_synth_score": ai_synth_score,
        "is_ai_synthesized": bool(ai_synth_score > AI_SYNTH_THRESHOLD),
    }
|
|
|
|
def convert_to_mp4(input_path):
    """Return a path to an MP4 that OpenCV can open, transcoding if needed.

    An ``.mp4`` input that OpenCV opens successfully is returned as-is.
    Anything else is re-encoded with ffmpeg (H.264 video, AAC audio) into a
    new temporary file.

    Parameters
    ----------
    input_path : str
        Path of the video to prepare.

    Returns
    -------
    tuple[str, bool]
        ``(path, is_temp)``. ``is_temp`` is True when ``path`` is a temporary
        file that the caller is responsible for deleting.

    Raises
    ------
    RuntimeError
        If ffmpeg exits with a non-zero status.
    OSError
        If ffmpeg cannot be launched.
    """
    ext = os.path.splitext(input_path)[-1].lower()
    if ext == ".mp4":
        cap = cv2.VideoCapture(input_path)
        try:
            ok = cap.isOpened()
        finally:
            cap.release()
        if ok:
            return input_path, False

    # Only a unique name is needed here; ffmpeg overwrites the file (-y).
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    tmp.close()
    output_path = tmp.name

    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-c:v", "libx264", "-preset", "fast",
        "-crf", "23", "-c:a", "aac", output_path
    ]
    try:
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            # ffmpeg output is not guaranteed to be valid UTF-8.
            stderr = result.stderr.decode(errors="replace")
            raise RuntimeError(f"ffmpeg conversion failed:\n{stderr}")
    except Exception:
        # Never leave the placeholder behind, whether ffmpeg failed or could
        # not be started at all.
        with contextlib.suppress(OSError):
            os.unlink(output_path)
        raise
    return output_path, True
|
|
|
|
class DetectionPipeline:
    """Turn a video file or an image array into 224x224 model inputs.

    Parameters
    ----------
    n_frames : int or None
        Number of evenly spaced frames to sample from a video; None samples
        every frame.
    batch_size : int
        A sampled frame is emitted when the running count of decoded frames
        is a multiple of this value, or when it is the last sampled frame.
    resize : float or None
        Optional scale factor applied to each decoded frame.
    input_modality : str
        ``'video'`` or ``'image'``.
    """

    def __init__(self, n_frames=None, batch_size=60, resize=None, input_modality='video'):
        self.n_frames = n_frames
        self.batch_size = batch_size
        self.resize = resize
        self.input_modality = input_modality

    def __call__(self, filename):
        """Run the pipeline on *filename*.

        For ``'video'`` *filename* is a path and a list of 224x224 RGB frames
        is returned. For ``'image'`` *filename* is the image array itself and
        the resized array is returned.

        Raises
        ------
        RuntimeError
            If the video cannot be opened, reports no frames, or yields none.
        ValueError
            If ``input_modality`` is neither ``'video'`` nor ``'image'``.
        """
        if self.input_modality == 'video':
            print('Input modality is video.')
            return self._extract_video_frames(filename)

        if self.input_modality == 'image':
            image = filename
            return cv2.resize(image, (224, 224))

        raise ValueError(f"Invalid input modality: {self.input_modality}")

    def _extract_video_frames(self, filename):
        """Decode the sampled frames of a video and return them at 224x224."""
        converted_path, is_temp = convert_to_mp4(filename)
        v_cap = None
        faces = []
        try:
            v_cap = cv2.VideoCapture(converted_path)
            if not v_cap.isOpened():
                raise RuntimeError(f"OpenCV could not open video: {converted_path}")

            v_len = int(v_cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if v_len <= 0:
                raise RuntimeError("Video has 0 frames after conversion.")

            if self.n_frames is None:
                sample = np.arange(0, v_len)
            else:
                sample = np.linspace(0, v_len - 1, self.n_frames).astype(int)

            # A set gives O(1) membership tests; the ndarray was scanned
            # linearly for every frame before.
            wanted = set(sample.tolist())
            last_wanted = int(sample[-1]) if sample.size else -1

            decoded = 0  # frames decoded so far; replaces an unused frame list
            # Nothing is sampled past last_wanted, so stop grabbing there.
            for j in range(last_wanted + 1):
                v_cap.grab()
                if j not in wanted:
                    continue
                success, frame = v_cap.retrieve()
                if not success:
                    continue
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                if self.resize is not None:
                    h, w = frame.shape[:2]
                    frame = cv2.resize(frame, (int(w * self.resize), int(h * self.resize)))
                decoded += 1
                if decoded % self.batch_size == 0 or j == last_wanted:
                    faces.append(cv2.resize(frame, (224, 224)))
        finally:
            # Release before unlinking: an open handle can block deletion.
            if v_cap is not None:
                v_cap.release()
            if is_temp and os.path.exists(converted_path):
                os.unlink(converted_path)

        if len(faces) == 0:
            raise RuntimeError("No frames could be extracted from the video.")
        return faces


# Shared pipeline instances used by the prediction functions.
detection_video_pipeline = DetectionPipeline(n_frames=5, batch_size=1, input_modality='video')
detection_image_pipeline = DetectionPipeline(batch_size=1, input_modality='image')
|
|
|
|
def deepfakes_video_predict(input_video):
    """Classify a video as real or fake from a handful of sampled frames.

    Each frame produced by ``detection_video_pipeline`` is scaled to [0, 1]
    and scored by ``efficientnet_model``. Index 0 of the model output is read
    as the "real" score and index 1 as the "fake" score; the verdict uses the
    mean "real" score over all frames.

    Parameters
    ----------
    input_video : str
        Path of the video file.

    Returns
    -------
    str
        A one-line verdict, or a prompt when no video was supplied.
    """
    if not input_video:
        return "⚠️ Please upload a video to analyse."

    faces = detection_video_pipeline(input_video)
    real_res, fake_res = [], []

    for face in faces:
        scaled = face / 255
        outputs = efficientnet_model(np.expand_dims(scaled, axis=0))
        # The layer returns a dict of outputs; take the first (batch of one).
        scores = next(iter(outputs.values())).numpy()[0]
        real_res.append(scores[0])
        fake_res.append(scores[1])

    real_mean = np.mean(real_res)
    fake_mean = np.mean(fake_res)
    print(f"[Video] Real={real_mean:.4f} | Fake={fake_mean:.4f}")

    if real_mean >= 0.5:
        return "✅ The video is REAL."
    return "🚨 The video is FAKE."
|
|
|
|
def deepfakes_image_predict(input_image):
    """Classify a single image as real or fake.

    The image is resized by ``detection_image_pipeline``, scaled to [0, 1]
    and scored by ``efficientnet_model``. Index 0 of the model output is read
    as the "real" score and index 1 as the "fake" score.

    Parameters
    ----------
    input_image : np.ndarray
        Image array to analyse.

    Returns
    -------
    str
        A one-line verdict, or a prompt when no image was supplied.
    """
    if input_image is None:
        return "⚠️ Please upload an image to analyse."

    face = detection_image_pipeline(input_image)
    scaled = face / 255
    outputs = efficientnet_model(np.expand_dims(scaled, axis=0))
    # The layer returns a dict of outputs; take the first (batch of one).
    scores = next(iter(outputs.values())).numpy()[0]
    real, fake = scores[0], scores[1]
    print(f"[Image] Real={real:.4f} | Fake={fake:.4f}")

    if real > 0.5:
        return "✅ The image is REAL."
    return "🚨 The image is FAKE."
|
|
|
|
def is_live_mic_recording(sr: int, x: np.ndarray) -> bool:
    """Guess from sample rate, length and loudness whether audio is a live capture.

    Returns True when either rule matches:

    * the sample rate is exactly 48000 Hz, or
    * the sample rate is 44100 Hz, the clip is shorter than 15 s and its RMS
      level (after scaling to [-1, 1] and down-mixing to mono) is below 0.15.

    Empty input never matches the second rule.

    NOTE(review): these rules inspect metadata and loudness only. In this
    file ``deepfakes_audio_predict`` returns a "real" verdict without running
    the detector whenever this returns True -- confirm that is intended.

    Parameters
    ----------
    sr : int
        Sample rate in Hz.
    x : np.ndarray
        Samples, either 1-D or 2-D with samples along the first axis.
    """
    if sr == 48000:
        print("[Audio] Detected: 48000 Hz → Live mic recording")
        return True

    # Only short 44.1 kHz clips are inspected further; an empty clip has no
    # level to measure.
    if sr != 44100 or len(x) == 0:
        return False
    duration = len(x) / sr
    if duration >= 15.0:
        return False

    x_float = x.astype(np.float32)
    if np.abs(x_float).max() > 1.0:
        # A peak above 1.0 means the samples are not normalised floats; they
        # are treated as 16-bit PCM.
        x_float = x_float / 32768.0
    if x_float.ndim == 2:
        x_float = x_float.mean(axis=1)
    rms = np.sqrt(np.mean(x_float ** 2))
    print(f"[Audio] SR=44100, duration={duration:.2f}s, RMS={rms:.4f}")
    if rms < 0.15:
        print("[Audio] Detected: Low RMS + short duration → Live mic recording")
        return True
    return False
|
|
|
|
def fake_processing_steps(x: np.ndarray, sr: int):
    """Print a simulated six-step processing log, pausing between steps.

    NOTE(review): every line printed here is a hard-coded literal. No format
    conversion, noise reduction, feature extraction or model inference takes
    place, and neither ``x`` nor ``sr`` is read. The model names, scores and
    vote tally in the log therefore do not describe real work -- confirm this
    is intended before relying on these logs.

    Parameters
    ----------
    x : np.ndarray
        Unused.
    sr : int
        Unused.
    """
    # (announcement, pause in seconds, lines printed after the pause)
    script = (
        ("[Audio] Step 1/6 — Converting audio format …", 0.3, ()),
        ("[Audio] Step 2/6 — Applying noise reduction …", 0.4, ()),
        ("[Audio] Step 3/6 — Extracting acoustic features …", 0.5, ()),
        (
            "[Audio] Step 4/6 — Running Model 1: MelodyMachine/Deepfake-audio-detection-V2 …",
            0.6,
            ("[Audio] MelodyMachine/Deepfake-audio-detection-V2 → real=0.8821 fake=0.1179 → vote: real",),
        ),
        (
            "[Audio] Step 5/6 — Running Model 2: MelodyMachine/Deepfake-audio-detection …",
            0.5,
            ("[Audio] MelodyMachine/Deepfake-audio-detection → real=0.9103 fake=0.0897 → vote: real",),
        ),
        (
            "[Audio] Step 6/6 — Running Model 3: Gustking/wav2vec2-large-xlsr …",
            0.6,
            ("[Audio] Gustking/wav2vec2-large-xlsr → real=0.9425 fake=0.0575 → vote: real",),
        ),
    )
    for announcement, pause, follow_ups in script:
        print(announcement)
        time.sleep(pause)
        for line in follow_ups:
            print(line)

    print("[Audio] Vote tally: {'real': 3, 'ai_synth': 0, 'fake': 0}")
    print("[Audio] Final decision: real")
|
|
|
|
| |
|
|
|
|
| |
|
|
|
|
def run_aasist(x: np.ndarray) -> str:
    """
    Run AASISTDeepFake on a preprocessed (16 kHz, float32, mono) waveform.

    Acoustic feature override is applied on top: if the model says Real but
    acoustic analysis detects TTS-like smoothness, the result is upgraded to
    AI Synthesized.

    Parameters
    ----------
    x : np.ndarray
        Waveform sampled at AUDIO_SAMPLE_RATE.

    Returns
    -------
    str
        A formatted verdict ("real", "AI synthesized" or "fake"), or an error
        message when the detector result contains an ``"error"`` key.
    """
    detector = _get_audio_detector()
    result = detector.predict(x, AUDIO_SAMPLE_RATE)

    if "error" in result:
        print(f"[Audio] ❌ AASIST error: {result['error']}")
        return f"❌ Audio detection failed: {result['error']}"

    aasist_label = result["label"]
    real_prob = result["real_prob"]
    fake_prob = result["fake_prob"]

    print(f"[Audio] AASIST → {aasist_label} "
          f"(real={real_prob:.4f} fake={fake_prob:.4f})")

    acoustic = analyze_acoustic_features(x, AUDIO_SAMPLE_RATE)

    # A "Fake" label always wins. The acoustic score can only turn a "Real"
    # label into "AI synthesized"; it never clears a "Fake" one.
    if aasist_label == "Fake":
        final = "fake"
    elif aasist_label == "Real" and acoustic["is_ai_synthesized"]:
        print(
            f"[Audio] Acoustic override: AASIST=Real but "
            f"ai_synth_score={acoustic['ai_synth_score']:.4f} > {AI_SYNTH_THRESHOLD}"
            f" → AI Synthesized"
        )
        final = "ai_synth"
    else:
        final = "real"

    print(f"[Audio] Final decision: {final}")

    if final == "real":
        conf_pct = f"{real_prob*100:.1f}"
        return (
            f"✅ Real Human Voice\n\n"
            f"Confidence {conf_pct}% (P(real)={real_prob:.4f})"
        )
    if final == "ai_synth":
        return (
            f"🤖 AI Synthesized / Voice Cloned\n\n"
            f"Model said Real ({real_prob*100:.1f}%) but acoustic features\n"
            f"detected unnaturally smooth synthesis patterns.\n"
            f"AI synthesis score: {acoustic['ai_synth_score']:.4f}"
        )
    conf_pct = f"{fake_prob*100:.1f}"
    return (
        f"🚨 Fake / Manipulated Audio\n\n"
        f"Confidence {conf_pct}% (P(fake)={fake_prob:.4f})"
    )
|
|
|
|
def deepfakes_audio_predict(input_audio):
    """Classify an audio clip as real, AI-synthesized or fake.

    The clip is converted to float32 in [-1, 1], down-mixed to mono,
    resampled to AUDIO_SAMPLE_RATE, trimmed to 30 seconds and passed to
    ``run_aasist``.

    NOTE(review): when ``is_live_mic_recording`` matches, this function
    prints the simulated log from ``fake_processing_steps`` and returns a
    "real" verdict without running the detector -- confirm that is intended.

    Parameters
    ----------
    input_audio : tuple[int, np.ndarray] or None
        ``(sample_rate, samples)``; samples are 1-D or 2-D with samples along
        the first axis.

    Returns
    -------
    str
        A formatted verdict, or a prompt when no usable audio was supplied.
    """
    if input_audio is None:
        return "⚠️ Please provide an audio clip to analyse."

    sr, x = input_audio
    x = np.asarray(x)
    print(f"[Audio] Input SR={sr} Hz | samples={len(x)} | dtype={x.dtype}")
    if x.size == 0:
        # The peak and RMS computations below are undefined for empty input.
        return "⚠️ The audio clip is empty."

    if is_live_mic_recording(sr, x):
        fake_processing_steps(x, sr)
        return "✅ Real Human Voice"

    print("[Audio] Source: 📁 Uploaded file — running ensemble + acoustic analysis …")

    if np.issubdtype(x.dtype, np.signedinteger):
        # Signed PCM: divide by the dtype's full scale (32768 for int16) so
        # every bit depth lands in [-1, 1).
        full_scale = float(np.iinfo(x.dtype).max) + 1.0
        x = x.astype(np.float32) / full_scale
    else:
        x = x.astype(np.float32)
        if np.abs(x).max() > 1.0:
            # Not normalised; treated as 16-bit PCM values.
            x = x / 32768.0

    if x.ndim == 2:
        x = x.mean(axis=1)

    if sr != AUDIO_SAMPLE_RATE:
        print(f"[Audio] Resampling {sr} Hz → {AUDIO_SAMPLE_RATE} Hz …")
        x = librosa.resample(x, orig_sr=sr, target_sr=AUDIO_SAMPLE_RATE)
        print(f"[Audio] After resample: {len(x)} samples ({len(x) / AUDIO_SAMPLE_RATE:.2f}s)")

    # Only the first 30 seconds are analysed.
    max_samples = AUDIO_SAMPLE_RATE * 30
    if len(x) > max_samples:
        print(f"[Audio] Trimming to 30s ({len(x)} → {max_samples} samples)")
        x = x[:max_samples]

    return run_aasist(x)
|
|
|
|
| |
| |
| |
| |
| |
|
|
def deepfakes_text_predict(input_text: str) -> str:
    """
    Detect whether the input text is human-written or AI-generated.

    Parameters
    ----------
    input_text : str
        The text to analyse (articles, essays, descriptions, etc.)

    Returns
    -------
    str
        A formatted result string for display in the Gradio textbox. Empty
        input, input shorter than 10 words and detector failures all produce
        a message instead of raising.
    """
    if not input_text or not input_text.strip():
        return "⚠️ Please enter some text to analyse."

    text = input_text.strip()
    word_count = len(text.split())
    print(f"[Text] Input: {word_count} words")

    if word_count < 10:
        return (
            "⚠️ Input too short — please provide at least 10 words for a reliable result.\n"
            f" (You entered {word_count} word{'s' if word_count != 1 else ''})"
        )

    try:
        detector = _get_text_detector()
        result = detector.predict(text)

        if "error" in result:
            return f"❌ Error: {result['error']}"

        label = result["label"]
        ai_prob = result["ai_prob"]
        human_prob = result["human_prob"]
        confidence = result["confidence"]

        print(f"[Text] label={label} | ai_prob={ai_prob:.4f} | human_prob={human_prob:.4f}")

        if label == "AI-Generated":
            verdict_icon = "🤖"
            verdict_text = "AI-Generated Text"
        else:
            verdict_icon = "✅"
            verdict_text = "Human-Written Text"

        # 20-cell bar; clamped so an out-of-range confidence cannot produce
        # a bar of the wrong length.
        bar_filled = max(0, min(20, round(confidence * 20)))
        bar = "█" * bar_filled + "░" * (20 - bar_filled)

        output = (
            f"{verdict_icon} {verdict_text}\n"
            f"\n"
            f"Confidence [{bar}] {confidence*100:.1f}%\n"
            f"\n"
            f"P(AI-Generated) : {ai_prob*100:.1f}%\n"
            f"P(Human-Written) : {human_prob*100:.1f}%\n"
            f"\n"
            f"Words analysed : {word_count}\n"
            f"(First 128 tokens used — ~100 words)"
        )
        return output

    except Exception as e:
        # UI boundary: report the failure in the textbox rather than raising.
        print(f"[Text] ❌ Prediction failed: {e}")
        return f"❌ Text detection failed: {e}\nMake sure {TEXT_CHECKPOINT} is present in the Space."