| """
|
| Audio Forensics Analyzer for deepfake detection.
|
|
|
| Extracts low-level audio features that help distinguish
|
| AI-generated speech from human speech.
|
| """
|
|
|
| import numpy as np
|
| from scipy import signal
|
| from scipy.fft import fft
|
|
|
| from app.utils.logger import get_logger
|
|
|
| logger = get_logger(__name__)
|
|
|
|
|
class AudioForensicsAnalyzer:
    """
    Forensic analysis of audio to detect AI-generated patterns.

    Analyzes spectral characteristics, pitch stability, and silence patterns
    to identify artifacts typical of neural vocoders.
    """

    def __init__(self, sample_rate: int = 16000):
        """Initialize analyzer with the sample rate (Hz) of incoming audio."""
        self.sample_rate = sample_rate

    def analyze(self, audio_array: np.ndarray) -> dict:
        """
        Perform comprehensive forensic analysis on audio.

        Args:
            audio_array: Normalized audio samples (16kHz, mono)

        Returns:
            Dictionary with forensic metrics and AI likelihood indicators

        Raises:
            ValueError: If audio_array is empty. The sub-analyzers divide by
                the sample count and would otherwise fail with opaque numpy
                warnings or errors.
        """
        # Fail fast on empty input instead of letting numpy emit
        # divide-by-zero warnings or crash inside hilbert/fft.
        if audio_array.size == 0:
            raise ValueError("audio_array must contain at least one sample")

        logger.debug("Starting forensic audio analysis")

        spectral = self._analyze_spectral(audio_array)
        temporal = self._analyze_temporal(audio_array)
        pitch = self._analyze_pitch_stability(audio_array)
        energy = self._analyze_energy_patterns(audio_array)

        forensics = {
            "spectral": spectral,
            "temporal": temporal,
            "pitch": pitch,
            "energy": energy,
            "ai_indicators": self._compute_ai_indicators(spectral, temporal, pitch, energy),
        }

        logger.debug("Forensic analysis complete", indicators=forensics["ai_indicators"])
        return forensics

    def _analyze_spectral(self, audio: np.ndarray) -> dict:
        """
        Analyze spectral characteristics (assumes non-empty audio).

        Computes centroid, flatness, rolloff, and bandwidth from the
        magnitude spectrum of the whole signal.
        """
        n = len(audio)
        # Magnitude spectrum, positive frequencies only.
        fft_vals = np.abs(fft(audio))[:n // 2]
        freqs = np.fft.fftfreq(n, 1 / self.sample_rate)[:n // 2]

        # Centroid: magnitude-weighted mean frequency ("brightness").
        # +1e-10 guards against division by zero on silent input.
        spectral_centroid = np.sum(freqs * fft_vals) / (np.sum(fft_vals) + 1e-10)

        # Flatness: geometric / arithmetic mean of magnitudes.
        # ~1 for noise-like spectra, ~0 for tonal spectra.
        geometric_mean = np.exp(np.mean(np.log(fft_vals + 1e-10)))
        arithmetic_mean = np.mean(fft_vals) + 1e-10
        spectral_flatness = geometric_mean / arithmetic_mean

        # Rolloff: frequency below which 85% of cumulative magnitude lies.
        cumsum = np.cumsum(fft_vals)
        rolloff_idx = np.searchsorted(cumsum, 0.85 * cumsum[-1])
        spectral_rolloff = freqs[min(rolloff_idx, len(freqs) - 1)]

        # Bandwidth: magnitude-weighted spread around the centroid.
        spectral_bandwidth = np.sqrt(
            np.sum(((freqs - spectral_centroid) ** 2) * fft_vals) / (np.sum(fft_vals) + 1e-10)
        )

        return {
            "centroid_hz": round(float(spectral_centroid), 2),
            "flatness": round(float(spectral_flatness), 4),
            "rolloff_hz": round(float(spectral_rolloff), 2),
            "bandwidth_hz": round(float(spectral_bandwidth), 2),
        }

    def _analyze_temporal(self, audio: np.ndarray) -> dict:
        """Analyze temporal characteristics (assumes non-empty audio)."""
        # Zero-crossing rate: sign changes per sample.
        zero_crossings = np.sum(np.abs(np.diff(np.sign(audio)))) / 2
        zcr = zero_crossings / len(audio)

        rms = np.sqrt(np.mean(audio ** 2))

        # Short-time energy over 25 ms frames with a 10 ms hop.
        frame_size = int(0.025 * self.sample_rate)
        hop_size = int(0.010 * self.sample_rate)

        energies = []
        for i in range(0, len(audio) - frame_size, hop_size):
            frame = audio[i:i + frame_size]
            energies.append(np.sum(frame ** 2))

        # Audio shorter than one frame yields no energies; report 0 variance.
        energy_variance = np.var(energies) if energies else 0

        # Anything at or below 1% of the peak amplitude counts as silence.
        # FIX: use <= (not <) so an all-zero signal — where the threshold
        # itself is 0 — reports silence_ratio 1.0 instead of 0.0.
        silence_threshold = 0.01 * np.max(np.abs(audio))
        silence_samples = np.sum(np.abs(audio) <= silence_threshold)
        silence_ratio = silence_samples / len(audio)

        return {
            "zero_crossing_rate": round(float(zcr), 6),
            "rms_energy": round(float(rms), 6),
            "energy_variance": round(float(energy_variance), 8),
            "silence_ratio": round(float(silence_ratio), 4),
        }

    def _analyze_pitch_stability(self, audio: np.ndarray) -> dict:
        """
        Analyze pitch stability via frame-wise autocorrelation.

        AI-generated speech often has unnaturally stable pitch.
        Humans have natural pitch variations (jitter).
        """
        frame_size = int(0.030 * self.sample_rate)  # 30 ms frames
        hop_size = int(0.010 * self.sample_rate)    # 10 ms hop

        pitches = []
        for i in range(0, len(audio) - frame_size, hop_size):
            frame = audio[i:i + frame_size]

            # Autocorrelation; keep non-negative lags only.
            corr = np.correlate(frame, frame, mode='full')
            corr = corr[len(corr) // 2:]

            # Skip the decaying lobe around lag 0: start the search at the
            # first lag where the autocorrelation turns upward.
            d = np.diff(corr)
            start = np.where(d > 0)[0]

            if len(start) > 0:
                start = start[0]
                peak = np.argmax(corr[start:]) + start
                # Require a reasonably strong peak relative to lag 0 before
                # trusting it as a pitch period.
                if peak > 0 and corr[peak] > 0.3 * corr[0]:
                    pitch = self.sample_rate / peak
                    if 50 < pitch < 500:  # plausible speech F0 range
                        pitches.append(pitch)

        # Not enough voiced frames to measure variation.
        if len(pitches) < 2:
            return {
                "mean_pitch_hz": 0,
                "pitch_std": 0,
                "pitch_stability": 1.0,
                "jitter": 0,
            }

        pitches = np.array(pitches)
        mean_pitch = np.mean(pitches)
        pitch_std = np.std(pitches)

        # 1.0 = perfectly stable pitch; approaches 0 as relative spread grows.
        pitch_stability = 1.0 / (1.0 + pitch_std / (mean_pitch + 1e-10))

        # Jitter: mean relative frame-to-frame pitch change.
        jitter = np.mean(np.abs(np.diff(pitches))) / (mean_pitch + 1e-10)

        return {
            "mean_pitch_hz": round(float(mean_pitch), 2),
            "pitch_std": round(float(pitch_std), 4),
            "pitch_stability": round(float(pitch_stability), 4),
            "jitter": round(float(jitter), 6),
        }

    def _analyze_energy_patterns(self, audio: np.ndarray) -> dict:
        """Analyze energy envelope patterns (assumes non-empty audio)."""
        # Amplitude envelope via the analytic signal.
        analytic_signal = signal.hilbert(audio)
        envelope = np.abs(analytic_signal)

        # Mean absolute sample-to-sample envelope change; human speech
        # envelopes tend to be rougher than vocoder output.
        envelope_diff = np.abs(np.diff(envelope))
        envelope_roughness = np.mean(envelope_diff)

        # How uniform the significant envelope peaks are (1.0 = identical).
        peaks, _ = signal.find_peaks(envelope, height=0.1 * np.max(envelope))

        if len(peaks) > 1:
            peak_heights = envelope[peaks]
            peak_consistency = 1.0 - (np.std(peak_heights) / (np.mean(peak_heights) + 1e-10))
        else:
            # Too few peaks to judge; return a neutral score.
            peak_consistency = 0.5

        return {
            "envelope_roughness": round(float(envelope_roughness), 6),
            "peak_consistency": round(float(peak_consistency), 4),
            "dynamic_range": round(float(np.max(envelope) - np.min(envelope)), 4),
        }

    def _compute_ai_indicators(
        self,
        spectral: dict,
        temporal: dict,
        pitch: dict,
        energy: dict,
    ) -> dict:
        """
        Compute features indicating AI generation (Tuned for modern TTS).

        Modern AI (ElevenLabs etc) adds simulated breaths and jitter, so we must
        be more sensitive to 'slightly too perfect' signals.
        """
        indicators = {}

        # Pitch stability above ~0.75 already reads as suspiciously regular.
        pitch_stability = pitch.get("pitch_stability", 0.5)
        indicators["pitch_regularity"] = min(1.0, pitch_stability / 0.75)

        # Low jitter (< ~2.5% relative variation) suggests synthesis.
        jitter = pitch.get("jitter", 0.02)
        indicators["low_jitter"] = max(0.0, 1.0 - (jitter / 0.025))

        # Very smooth amplitude envelopes are typical of vocoders.
        roughness = energy.get("envelope_roughness", 0.01)
        indicators["smooth_envelope"] = max(0.0, 1.0 - (roughness / 0.03))

        # Extremely low zero-crossing rate flags unnatural, near-static audio.
        zcr = temporal.get("zero_crossing_rate", 0.1)
        indicators["unnatural_silence"] = 1.0 if zcr < 0.01 else 0.0

        # Only count energy consistency once it is clearly too uniform.
        peak_consistency = energy.get("peak_consistency", 0.5)
        indicators["energy_consistency"] = peak_consistency if peak_consistency > 0.8 else 0.0

        # Weighted signals; pitch regularity is the strongest discriminator.
        scores = [
            indicators["pitch_regularity"] * 1.2,
            indicators["low_jitter"] * 1.0,
            indicators["smooth_envelope"] * 0.8,
            indicators["unnatural_silence"] * 0.5,
            indicators["energy_consistency"] * 0.6
        ]

        # Blend the single strongest signal with the overall average so one
        # glaring artifact can dominate, but broad weak signals still add up.
        strongest_signal = max(scores)
        average_signal = sum(scores) / len(scores)

        combined_likelihood = (strongest_signal * 0.7) + (average_signal * 0.3)

        indicators["combined_ai_likelihood"] = min(1.0, combined_likelihood)

        return indicators

    def get_explanation_factors(self, forensics: dict, classification: str = None) -> list[str]:
        """
        Get human-readable factors that contributed to detection.

        Args:
            forensics: Forensics analysis data
            classification: The final classification (AI_GENERATED or HUMAN)

        Returns list of detected indicators in plain English.
        """
        factors = []
        indicators = forensics.get("ai_indicators", {})

        if classification == "AI_GENERATED":
            # Report every indicator that fired meaningfully (> 0.4).
            if indicators.get("pitch_regularity", 0) > 0.4:
                factors.append("unnaturally consistent pitch patterns")
            if indicators.get("low_jitter", 0) > 0.4:
                factors.append("absence of natural voice micro-variations")
            if indicators.get("energy_consistency", 0) > 0.4:
                factors.append("mechanical energy envelope patterns")
            if indicators.get("smooth_envelope", 0) > 0.4:
                factors.append("artificially smooth amplitude transitions")
            if indicators.get("unnatural_silence", 0) > 0.3:
                factors.append("irregular silence patterns")

            # Fallback when the call is classified AI but no single
            # indicator crossed its reporting threshold.
            if not factors:
                factors.append("subtle synthetic audio artifacts")

        else:
            # HUMAN (or unknown) classification: explain natural traits.
            if forensics["pitch"]["jitter"] > 0.015:
                factors.append("natural pitch variations")
            if forensics["energy"]["envelope_roughness"] > 0.015:
                factors.append("organic voice texture")
            if 0.05 < forensics["temporal"]["silence_ratio"] < 0.25:
                factors.append("natural breathing patterns")

            if not factors:
                factors.append("natural human voice characteristics")

        # Defensive final fallback; both branches already guarantee at
        # least one factor, so this normally returns `factors` unchanged.
        return factors if factors else ["voice characteristics analyzed"]
|
|
|
|
|