# voice-detection-api/app/core/forensics.py
# vineetshukla.work@gmail.com
# final commit
# c5c9261
"""
Forensic Analyzers — Four independent analysis engines that examine audio
for specific signatures of AI generation vs natural human speech.
Each analyzer returns a score (0=human, 1=AI) and a list of detected artifacts.
The final detection fuses all analyzer scores for maximum accuracy.
"""
import numpy as np
import librosa
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import concurrent.futures
logger = logging.getLogger(__name__)
@dataclass
class AnalyzerResult:
    """
    Outcome reported by one forensic analyzer.

    Only ``name``, ``score`` and ``verdict`` are required; the two
    collection fields default to fresh empty containers per instance.
    """

    # Identifier of the analyzer that produced this result.
    name: str
    # Likelihood of synthesis: 0.0 = definitely human, 1.0 = definitely AI.
    score: float
    # Label derived from the score: "HUMAN" or "AI_GENERATED".
    verdict: str
    # Names of the suspicious signatures that were triggered.
    artifacts_found: List[str] = field(default_factory=list)
    # Raw measurements behind the decision (and an "error" entry on failure).
    details: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AudioProfile:
    """
    Technical summary of an audio sample.

    Every field has a default, so ``AudioProfile()`` is a valid instance.
    Units are implied by the field-name suffixes (``_sec``, ``_db``).
    """

    # Length of the sample.
    duration_sec: float = 0.0
    # Signal-to-noise figure.
    snr_db: float = 0.0
    # Whether clipping was detected in the sample.
    clipping_detected: bool = False
    # Share of the sample classified as silence.
    silence_ratio: float = 0.0
    # RMS energy of the sample.
    rms_energy: float = 0.0
    # Sampling rate; defaults to 16000.
    sample_rate: int = 16000
    # Number of segments the sample is divided into; defaults to 1.
    num_segments: int = 1
# ===============================================================
# Spectral Analyzer
# ===============================================================
class SpectralAnalyzer:
    """
    Frequency-domain forensic analyzer.

    Computes three frame-wise librosa descriptors (spectral flatness,
    bandwidth and centroid) and records an artifact whenever one of them is
    suspiciously steady, or when the spectrum is very tonal overall. Every
    artifact contributes 0.3 to the score, capped at 1.0.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score an audio signal on its spectral statistics.

        Args:
            y: Audio samples, handed directly to the librosa feature functions.
            sr: Sample rate of ``y``.

        Returns:
            An AnalyzerResult named "spectral_analysis". If any step raises,
            the score falls back to 0.5, the artifact list is emptied and the
            exception text is stored under details["error"]; measurements
            recorded before the failure remain in details.
        """
        artifacts = []
        details = {}
        try:
            self._check_flatness(y, artifacts, details)
            self._check_bandwidth(y, sr, artifacts, details)
            self._check_centroid(y, sr, artifacts, details)
            # No HPSS or full-STFT based checks are run here (removed for speed).
            # More triggered checks means a higher likelihood of synthesis.
            score = min(1.0, 0.3 * len(artifacts))
        except Exception as e:
            logger.warning(f"SpectralAnalyzer error: {e}")
            score = 0.5
            artifacts = []
            details["error"] = str(e)

        # NOTE: the 0.5 failure fallback is not below the threshold, so a
        # failed run is labelled "AI_GENERATED".
        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="spectral_analysis",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=artifacts,
            details=details,
        )

    @staticmethod
    def _check_flatness(y, artifacts, details):
        """Record flatness mean/std; flag low frame-to-frame spread and very low mean."""
        per_frame = librosa.feature.spectral_flatness(y=y)[0]
        flat_mean = float(np.mean(per_frame))
        flat_std = float(np.std(per_frame))
        details["spectral_flatness_mean"] = round(flat_mean, 4)
        details["spectral_flatness_std"] = round(flat_std, 4)
        # Heuristic: natural speech is expected to vary more in flatness.
        if flat_std < 0.02:
            artifacts.append("unnaturally_uniform_spectral_texture")
        # Heuristic: a very low mean flatness indicates a strongly tonal spectrum.
        if flat_mean < 0.005:
            artifacts.append("overly_tonal_spectrum")

    @staticmethod
    def _check_bandwidth(y, sr, artifacts, details):
        """Record bandwidth mean/std; flag a bandwidth that barely changes."""
        per_frame = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
        bw_mean = float(np.mean(per_frame))
        bw_std = float(np.std(per_frame))
        details["spectral_bandwidth_mean"] = round(bw_mean, 1)
        details["spectral_bandwidth_std"] = round(bw_std, 1)
        if bw_std < 200:
            artifacts.append("unnaturally_consistent_bandwidth")

    @staticmethod
    def _check_centroid(y, sr, artifacts, details):
        """Record the centroid's coefficient of variation; flag a stable centroid."""
        per_frame = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
        # The small epsilon keeps the ratio finite when the mean centroid is zero.
        variation = float(np.std(per_frame) / (np.mean(per_frame) + 1e-10))
        details["spectral_centroid_cv"] = round(variation, 4)
        if variation < 0.15:
            artifacts.append("unnaturally_stable_spectral_centroid")
# ===============================================================
# Temporal Analyzer
# ===============================================================
class TemporalAnalyzer:
    """
    Time-domain forensic analyzer.

    Runs four checks on the waveform: smoothness of the RMS energy contour,
    consistency of the zero-crossing rate, regularity of the spacing between
    quiet-region onsets, and spread of energy across 100 ms chunks. Every
    triggered check contributes 0.3 to the score, capped at 1.0.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score an audio signal on its temporal statistics.

        Args:
            y: Audio samples.
            sr: Sample rate of ``y``; also used to size the 25 ms analysis
                window and 10 ms hop.

        Returns:
            An AnalyzerResult named "temporal_analysis". If any step raises,
            the score falls back to 0.5, the artifact list is emptied and the
            exception text is stored under details["error"]; measurements
            recorded before the failure remain in details.
        """
        artifacts = []
        details = {}
        try:
            window = int(0.025 * sr)
            hop = int(0.010 * sr)
            self._check_energy_contour(y, window, hop, artifacts, details)
            self._check_zero_crossings(y, window, hop, artifacts, details)
            self._check_pause_timing(y, sr, artifacts, details)
            self._check_chunk_energy(y, sr, artifacts)
            score = min(1.0, 0.3 * len(artifacts))
        except Exception as e:
            logger.warning(f"TemporalAnalyzer error: {e}")
            score = 0.5
            artifacts = []
            details["error"] = str(e)

        # NOTE: the 0.5 failure fallback is not below the threshold, so a
        # failed run is labelled "AI_GENERATED".
        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="temporal_analysis",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=artifacts,
            details=details,
        )

    @staticmethod
    def _check_energy_contour(y, window, hop, artifacts, details):
        """Flag an RMS contour whose frame-to-frame changes are small relative to its mean."""
        contour = librosa.feature.rms(y=y, frame_length=window, hop_length=hop)[0]
        # Too few frames to judge smoothness.
        if len(contour) <= 10:
            return
        step_changes = np.diff(contour)
        roughness = float(np.std(step_changes) / (np.mean(contour) + 1e-10))
        details["energy_roughness"] = round(roughness, 4)
        if roughness < 0.08:
            artifacts.append("unnaturally_smooth_energy_contour")

    @staticmethod
    def _check_zero_crossings(y, window, hop, artifacts, details):
        """Flag a zero-crossing rate with a low coefficient of variation."""
        rate = librosa.feature.zero_crossing_rate(y, frame_length=window, hop_length=hop)[0]
        variation = float(np.std(rate) / (np.mean(rate) + 1e-10))
        details["zcr_coefficient_of_variation"] = round(variation, 4)
        if variation < 0.25:
            artifacts.append("unnaturally_consistent_zero_crossings")

    @staticmethod
    def _check_pause_timing(y, sr, artifacts, details):
        """Flag near-constant spacing between onsets of quiet samples."""
        magnitude = np.abs(y)
        # "Quiet" means below three times the 10th-percentile magnitude.
        # NOTE(review): this thresholds individual samples, not an envelope,
        # so onsets may not correspond to speech pauses — confirm intent.
        quiet = magnitude < np.percentile(magnitude, 10) * 3
        transitions = np.diff(quiet.astype(int))
        onsets = np.where(transitions == 1)[0]
        if len(onsets) < 3:
            return
        gaps = np.diff(onsets) / sr
        variation = float(np.std(gaps) / (np.mean(gaps) + 1e-10))
        details["pause_interval_cv"] = round(variation, 4)
        details["num_pauses"] = len(onsets)
        # Statistics are recorded from three onsets; the flag needs more than three.
        if len(onsets) > 3 and variation < 0.2:
            artifacts.append("metronomic_pause_timing")

    @staticmethod
    def _check_chunk_energy(y, sr, artifacts):
        """Flag audio whose RMS energy is almost identical across 100 ms chunks."""
        # Only applied to signals longer than half a second.
        if float(len(y)) / sr <= 0.5:
            return
        samples_per_chunk = int(0.1 * sr)
        chunk_count = len(y) // samples_per_chunk
        if chunk_count <= 4:
            return
        # Trailing samples that do not fill a whole chunk are dropped.
        grid = y[:chunk_count * samples_per_chunk].reshape(chunk_count, samples_per_chunk)
        per_chunk_rms = np.sqrt(np.mean(grid ** 2, axis=1))
        if np.std(per_chunk_rms) < 0.001:
            artifacts.append("repetitive_energy_pattern")
# ===============================================================
# Formant Analyzer
# ===============================================================
class FormantAnalyzer:
    """
    Formant-structure forensic analyzer that uses MFCCs as a stand-in for
    explicit formant tracking.

    Four checks are run on a 13-coefficient MFCC matrix; each triggered
    check contributes 0.3 to the score, capped at 1.0.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score an audio signal on MFCC stability and smoothness.

        Args:
            y: Audio samples.
            sr: Sample rate of ``y``.

        Returns:
            An AnalyzerResult named "formant_analysis". If any step raises,
            the score falls back to 0.5, the artifact list is emptied and the
            exception text is stored under details["error"]; measurements
            recorded before the failure remain in details.
        """
        artifacts = []
        details = {}
        try:
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)

            # Check 1: per-coefficient variation, skipping coefficient 0.
            upper = mfccs[1:]
            coeff_abs_mean = np.abs(np.mean(upper, axis=1)) + 1e-10
            coeff_std = np.std(upper, axis=1)
            avg_mfcc_cv = float(np.mean(coeff_std / coeff_abs_mean))
            details["avg_mfcc_cv"] = round(avg_mfcc_cv, 4)
            if avg_mfcc_cv < 0.5:
                artifacts.append("unnaturally_stable_formant_structure")

            # Check 2: mean magnitude of the second-order delta.
            first_delta = librosa.feature.delta(mfccs)
            second_delta = librosa.feature.delta(first_delta)
            delta_roughness = float(np.mean(np.abs(second_delta)))
            details["delta_mfcc_roughness"] = round(delta_roughness, 4)
            if delta_roughness < 0.3:
                artifacts.append("overly_smooth_formant_transitions")

            # Check 3: correlation between each frame and the next one,
            # computed only when there are more than 10 frames.
            if mfccs.shape[1] > 10:
                per_frame = mfccs.T
                frame_mean = per_frame.mean(axis=1, keepdims=True)
                frame_scale = per_frame.std(axis=1, keepdims=True) + 1e-10
                standardized = (per_frame - frame_mean) / frame_scale
                # Mean product of standardized neighbours = correlation.
                neighbour_corr = np.mean(standardized[:-1] * standardized[1:], axis=1)
                mean_corr = float(np.mean(neighbour_corr))
                details["inter_frame_correlation"] = round(mean_corr, 4)
                if mean_corr > 0.95:
                    artifacts.append("excessive_inter_frame_correlation")

            # Check 4: spread of the per-coefficient standard deviations, used
            # as a cheap proxy for mel-band energy variation. Not recorded in
            # details.
            std_spread = float(np.max(coeff_std) - np.min(coeff_std))
            if std_spread < 2.0:
                artifacts.append("uniform_mel_band_energy")

            score = min(1.0, 0.3 * len(artifacts))
        except Exception as e:
            logger.warning(f"FormantAnalyzer error: {e}")
            score = 0.5
            artifacts = []
            details["error"] = str(e)

        # NOTE: the 0.5 failure fallback is not below the threshold, so a
        # failed run is labelled "AI_GENERATED".
        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="formant_analysis",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=artifacts,
            details=details,
        )
# ===============================================================
# Artifact Detector
# ===============================================================
class ArtifactDetector:
    """
    Raw-waveform forensic analyzer.

    Runs three checks (click rate, positive/negative RMS symmetry and the
    noise floor of near-silent samples). Each triggered check contributes
    0.25 to the score, capped at 1.0.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score an audio signal on waveform-level synthesis artifacts.

        Args:
            y: Audio samples.
            sr: Sample rate of ``y``.

        Returns:
            An AnalyzerResult named "artifact_detection". If any step raises,
            the score falls back to 0.5, the artifact list is emptied and the
            exception text is stored under details["error"]; measurements
            recorded before the failure remain in details.
        """
        artifacts = []
        details = {}
        try:
            self._check_clicks(y, sr, artifacts, details)
            self._check_symmetry(y, artifacts, details)
            self._check_silence(y, sr, artifacts, details)
            # No periodicity check is performed here; only the three checks
            # above feed the score.
            score = min(1.0, 0.25 * len(artifacts))
        except Exception as e:
            logger.warning(f"ArtifactDetector error: {e}")
            score = 0.5
            artifacts = []
            details["error"] = str(e)

        # NOTE: the 0.5 failure fallback is not below the threshold, so a
        # failed run is labelled "AI_GENERATED".
        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="artifact_detection",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=artifacts,
            details=details,
        )

    @staticmethod
    def _check_clicks(y, sr, artifacts, details):
        """Flag more than 10 large sample-to-sample jumps per second."""
        jumps = np.abs(np.diff(y))
        # A jump counts as a click when it exceeds six standard deviations of the signal.
        limit = np.std(y) * 6
        click_count = np.count_nonzero(jumps > limit)
        duration_sec = len(y) / sr
        click_rate = click_count / duration_sec
        details["click_rate_per_sec"] = round(click_rate, 2)
        if click_rate > 10:
            artifacts.append("synthesis_click_artifacts")

    @staticmethod
    def _check_symmetry(y, artifacts, details):
        """Flag a positive/negative RMS ratio further than 0.3 from 1."""
        above = y[y > 0]
        below = y[y < 0]
        # Needs samples on both sides of zero.
        if not (above.size and below.size):
            return
        above_rms = np.sqrt(np.mean(above ** 2))
        below_rms = np.sqrt(np.mean(below ** 2))
        symmetry = float(above_rms / (below_rms + 1e-10))
        details["waveform_symmetry"] = round(symmetry, 4)
        if abs(symmetry - 1.0) > 0.3:
            artifacts.append("asymmetric_waveform")

    @staticmethod
    def _check_silence(y, sr, artifacts, details):
        """Flag near-silent samples that show essentially no noise."""
        near_silent = np.abs(y) < 0.001
        if not np.any(near_silent):
            return
        quiet_samples = y[near_silent]
        noise_floor = float(np.std(quiet_samples))
        details["silence_noise_floor"] = round(noise_floor, 6)
        # Only flagged when the quiet samples add up to more than 50 ms.
        if noise_floor < 1e-6 and len(quiet_samples) > sr * 0.05:
            artifacts.append("digitally_perfect_silence")
# ===============================================================
# Forensic Engine (orchestrates all analyzers)
# ===============================================================
class ForensicEngine:
    """
    Runs all forensic analyzers and produces a combined result.

    The four analyzers are submitted to a thread pool owned by the engine,
    so they run concurrently. The pool is created once in ``__init__`` and
    is not shut down by this class.
    """

    # Relative weight of each analyzer in the fused score, keyed by the
    # ``name`` each analyzer reports in its result.
    _WEIGHTS = {
        "spectral_analysis": 0.30,
        "temporal_analysis": 0.25,
        "formant_analysis": 0.25,
        "artifact_detection": 0.20,
    }
    # Weight used for a result whose name is not listed in _WEIGHTS.
    _DEFAULT_WEIGHT = 0.25
    # Score returned when nothing usable is available (neither human nor AI).
    _NEUTRAL_SCORE = 0.5

    def __init__(self):
        self.spectral = SpectralAnalyzer()
        self.temporal = TemporalAnalyzer()
        self.formant = FormantAnalyzer()
        self.artifact = ArtifactDetector()
        # One worker per analyzer.
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def analyze(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """
        Run all analyzers in parallel and return a combined report.

        Args:
            y: Audio samples, passed unchanged to every analyzer.
            sr: Sample rate of ``y``.

        Returns:
            A dict keyed by analyzer result name, each value holding
            "score", "verdict", "artifacts_found" and "details". If an
            analyzer call itself raises, an "error" entry with a 0.5 score
            and an "UNKNOWN" verdict is stored instead.
        """
        pending = [
            self._executor.submit(analyzer.analyze, y, sr)
            for analyzer in (self.spectral, self.temporal, self.formant, self.artifact)
        ]

        results = {}
        # Collect in submission order so the report (and the artifact list
        # built from it) has a stable order regardless of which thread
        # finishes first. result() blocks until that analyzer is done.
        for future in pending:
            try:
                outcome = future.result()
            except Exception as e:
                logger.error(f"Analyzer failed: {e}")
                # Strict fallback so callers still receive a well-formed report.
                results["error"] = {"score": 0.5, "verdict": "UNKNOWN", "details": str(e)}
            else:
                results[outcome.name] = {
                    "score": outcome.score,
                    "verdict": outcome.verdict,
                    "artifacts_found": outcome.artifacts_found,
                    "details": outcome.details,
                }
        return results

    def compute_forensic_score(self, forensic_results: Dict[str, Any]) -> float:
        """
        Compute the weighted average of the analyzer scores.

        Args:
            forensic_results: Report in the shape returned by ``analyze``.
                The "error" entry is ignored; an entry without a "score"
                counts as 0.5.

        Returns:
            A value from 0.0 (definitely human) to 1.0 (definitely AI),
            rounded to four decimals. When no analyzer entry is present the
            neutral 0.5 is returned rather than 0.0, so a total failure is
            not reported as "definitely human".
        """
        weighted_sum = 0.0
        total_weight = 0.0
        for name, result in forensic_results.items():
            if name == "error":
                continue
            weight = self._WEIGHTS.get(name, self._DEFAULT_WEIGHT)
            weighted_sum += result.get("score", self._NEUTRAL_SCORE) * weight
            total_weight += weight

        if total_weight == 0.0:
            return self._NEUTRAL_SCORE
        return round(weighted_sum / total_weight, 4)

    def get_all_artifacts(self, forensic_results: Dict[str, Any]) -> List[str]:
        """
        Collect the artifacts found across all analyzers.

        Args:
            forensic_results: Report in the shape returned by ``analyze``.

        Returns:
            A flat list of artifact names in report order; entries without
            an "artifacts_found" key (such as "error") contribute nothing.
        """
        all_artifacts = []
        for result in forensic_results.values():
            all_artifacts.extend(result.get("artifacts_found", []))
        return all_artifacts
# Shared module-level engine, built once at import time; constructing it
# instantiates the four analyzers and the engine's 4-worker thread pool.
forensic_engine = ForensicEngine()