Spaces:
Sleeping
Sleeping
| """ | |
| Forensic Analyzers — Four independent analysis engines that examine audio | |
| for specific signatures of AI generation vs natural human speech. | |
| Each analyzer returns a score (0=human, 1=AI) and a list of detected artifacts. | |
| The final detection fuses all analyzer scores for maximum accuracy. | |
| """ | |
| import numpy as np | |
| import librosa | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import List, Dict, Any, Optional | |
| import concurrent.futures | |
| logger = logging.getLogger(__name__) | |
@dataclass
class AnalyzerResult:
    """Result from a single forensic analyzer.

    Declared as a dataclass so that it can be built with keyword arguments
    and so that ``field(default_factory=...)`` gives every instance its own
    ``artifacts_found`` list and ``details`` dict instead of a shared one.
    """

    name: str  # identifier of the analyzer that produced this result
    score: float  # 0.0 = definitely human, 1.0 = definitely AI
    verdict: str  # "HUMAN" or "AI_GENERATED"
    artifacts_found: List[str] = field(default_factory=list)
    details: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AudioProfile:
    """Technical profile of the audio sample.

    Declared as a dataclass so individual fields can be supplied as keyword
    arguments; every field has a default, so ``AudioProfile()`` is valid too.
    """

    duration_sec: float = 0.0
    snr_db: float = 0.0
    clipping_detected: bool = False
    silence_ratio: float = 0.0
    rms_energy: float = 0.0
    sample_rate: int = 16000
    num_segments: int = 1
| # =============================================================== | |
| # Spectral Analyzer | |
| # =============================================================== | |
class SpectralAnalyzer:
    """
    Frequency-domain forensic check.

    Computes three frame-level librosa features and flags the signal when
    they vary less than the hard-coded thresholds allow:
    - spectral flatness (low spread, or a very low mean)
    - spectral bandwidth (low spread)
    - spectral centroid (low coefficient of variation)
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score ``y`` from its spectral statistics.

        Every flagged artifact adds 0.3 to the score, capped at 1.0. If any
        step raises, the artifacts are discarded, the score falls back to
        0.5 and the message is stored under ``details["error"]``; statistics
        recorded before the failure stay in ``details``.
        """
        details = {}
        flagged = []
        try:
            # Flatness: judged on both its spread and its average level.
            flat_frames = librosa.feature.spectral_flatness(y=y)[0]
            flat_mean = float(np.mean(flat_frames))
            flat_std = float(np.std(flat_frames))
            details["spectral_flatness_mean"] = round(flat_mean, 4)
            details["spectral_flatness_std"] = round(flat_std, 4)
            if flat_std < 0.02:
                flagged.append("unnaturally_uniform_spectral_texture")
            if flat_mean < 0.005:
                flagged.append("overly_tonal_spectrum")

            # Bandwidth: only the frame-to-frame spread is thresholded.
            bw_frames = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
            bw_mean = float(np.mean(bw_frames))
            bw_std = float(np.std(bw_frames))
            details["spectral_bandwidth_mean"] = round(bw_mean, 1)
            details["spectral_bandwidth_std"] = round(bw_std, 1)
            if bw_std < 200:
                flagged.append("unnaturally_consistent_bandwidth")

            # Centroid: std relative to mean (epsilon guards a zero mean).
            centroid_frames = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
            centroid_spread = np.std(centroid_frames)
            centroid_level = np.mean(centroid_frames) + 1e-10
            centroid_cv = float(centroid_spread / centroid_level)
            details["spectral_centroid_cv"] = round(centroid_cv, 4)
            if centroid_cv < 0.15:
                flagged.append("unnaturally_stable_spectral_centroid")

            # The more artifacts were flagged, the more AI-like the score.
            ai_score = min(1.0, 0.3 * len(flagged))
        except Exception as e:
            logger.warning(f"SpectralAnalyzer error: {e}")
            ai_score = 0.5
            flagged = []
            details["error"] = str(e)

        if ai_score >= 0.5:
            verdict = "AI_GENERATED"
        else:
            verdict = "HUMAN"
        return AnalyzerResult(
            name="spectral_analysis",
            score=round(ai_score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=details,
        )
| # =============================================================== | |
| # Temporal Analyzer | |
| # =============================================================== | |
class TemporalAnalyzer:
    """
    Time-domain forensic check.

    Flags the signal when any of these fall below hard-coded thresholds:
    - roughness of the short-time RMS energy contour
    - variation of the zero-crossing rate
    - variation of the spacing between detected pause onsets
    - spread of the RMS energy across 100 ms chunks
    """

    @staticmethod
    def _relative_spread(values) -> float:
        """Return std / mean of ``values`` (epsilon guards a zero mean)."""
        return float(np.std(values) / (np.mean(values) + 1e-10))

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score ``y`` from its temporal statistics.

        Every flagged artifact adds 0.3 to the score, capped at 1.0. If any
        step raises, the artifacts are discarded, the score falls back to
        0.5 and the message is stored under ``details["error"]``.
        """
        details = {}
        flagged = []
        try:
            # 25 ms analysis window with a 10 ms hop, expressed in samples.
            win = int(0.025 * sr)
            hop = int(0.010 * sr)

            # Energy contour: needs more than 10 frames to be judged.
            rms_frames = librosa.feature.rms(y=y, frame_length=win, hop_length=hop)[0]
            if len(rms_frames) > 10:
                frame_steps = np.diff(rms_frames)
                energy_roughness = float(
                    np.std(frame_steps) / (np.mean(rms_frames) + 1e-10)
                )
                details["energy_roughness"] = round(energy_roughness, 4)
                if energy_roughness < 0.08:
                    flagged.append("unnaturally_smooth_energy_contour")

            # Zero-crossing rate: how much it varies from frame to frame.
            zcr_frames = librosa.feature.zero_crossing_rate(
                y, frame_length=win, hop_length=hop
            )[0]
            zcr_cv = self._relative_spread(zcr_frames)
            details["zcr_coefficient_of_variation"] = round(zcr_cv, 4)
            if zcr_cv < 0.25:
                flagged.append("unnaturally_consistent_zero_crossings")

            # Pause onsets: samples quieter than 3x the 10th-percentile
            # magnitude count as silent; an onset is a loud -> silent step.
            magnitude = np.abs(y)
            quiet_level = np.percentile(magnitude, 10)
            silent = magnitude < quiet_level * 3
            transitions = np.diff(silent.astype(int))
            onsets = np.where(transitions == 1)[0]
            onset_count = len(onsets)
            if onset_count >= 3:
                gaps_sec = np.diff(onsets) / sr
                interval_cv = self._relative_spread(gaps_sec)
                details["pause_interval_cv"] = round(interval_cv, 4)
                details["num_pauses"] = onset_count
                if interval_cv < 0.2 and onset_count > 3:
                    flagged.append("metronomic_pause_timing")

            # Chunk energies: only for clips longer than half a second.
            if len(y) / sr > 0.5:
                chunk_len = int(0.1 * sr)
                chunk_count = len(y) // chunk_len
                if chunk_count > 4:
                    # Drop the trailing remainder so the reshape is exact.
                    usable = chunk_count * chunk_len
                    grid = y[:usable].reshape(chunk_count, chunk_len)
                    chunk_rms = np.sqrt(np.mean(grid ** 2, axis=1))
                    if np.std(chunk_rms) < 0.001:
                        flagged.append("repetitive_energy_pattern")

            ai_score = min(1.0, 0.3 * len(flagged))
        except Exception as e:
            logger.warning(f"TemporalAnalyzer error: {e}")
            ai_score = 0.5
            flagged = []
            details["error"] = str(e)

        if ai_score >= 0.5:
            verdict = "AI_GENERATED"
        else:
            verdict = "HUMAN"
        return AnalyzerResult(
            name="temporal_analysis",
            score=round(ai_score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=details,
        )
| # =============================================================== | |
| # Formant Analyzer | |
| # =============================================================== | |
class FormantAnalyzer:
    """
    MFCC-based forensic check; MFCC statistics stand in for formants.

    Flags the signal when the coefficients are too stable, change too
    smoothly, correlate too strongly between adjacent frames, or have too
    similar a spread across coefficients.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score ``y`` from statistics of its 13 MFCCs.

        Every flagged artifact adds 0.3 to the score, capped at 1.0. If any
        step raises, the artifacts are discarded, the score falls back to
        0.5 and the message is stored under ``details["error"]``.
        """
        details = {}
        flagged = []
        try:
            coeffs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)

            # Stability: per-coefficient std / |mean|, skipping coefficient 0.
            upper = coeffs[1:]
            abs_means = np.abs(np.mean(upper, axis=1)) + 1e-10
            spreads = np.std(upper, axis=1)
            avg_mfcc_cv = float(np.mean(spreads / abs_means))
            details["avg_mfcc_cv"] = round(avg_mfcc_cv, 4)
            if avg_mfcc_cv < 0.5:
                flagged.append("unnaturally_stable_formant_structure")

            # Smoothness: mean magnitude of the delta of the delta.
            first_delta = librosa.feature.delta(coeffs)
            second_delta = librosa.feature.delta(first_delta)
            delta_roughness = float(np.mean(np.abs(second_delta)))
            details["delta_mfcc_roughness"] = round(delta_roughness, 4)
            if delta_roughness < 0.3:
                flagged.append("overly_smooth_formant_transitions")

            # Adjacent-frame correlation: needs more than 10 frames.
            if coeffs.shape[1] > 10:
                per_frame = coeffs.T
                centered = per_frame - np.mean(per_frame, axis=1, keepdims=True)
                scale = np.std(per_frame, axis=1, keepdims=True) + 1e-10
                unit = centered / scale
                # Mean product of standardized frame i and frame i + 1.
                neighbour_corr = np.mean(unit[:-1] * unit[1:], axis=1)
                mean_corr = float(np.mean(neighbour_corr))
                details["inter_frame_correlation"] = round(mean_corr, 4)
                if mean_corr > 0.95:
                    flagged.append("excessive_inter_frame_correlation")

            # Uniformity: range of the per-coefficient spreads from above,
            # used as a cheap proxy for mel-band energy variance.
            mfcc_var_range = float(spreads.max() - spreads.min())
            if mfcc_var_range < 2.0:
                flagged.append("uniform_mel_band_energy")

            ai_score = min(1.0, 0.3 * len(flagged))
        except Exception as e:
            logger.warning(f"FormantAnalyzer error: {e}")
            ai_score = 0.5
            flagged = []
            details["error"] = str(e)

        if ai_score >= 0.5:
            verdict = "AI_GENERATED"
        else:
            verdict = "HUMAN"
        return AnalyzerResult(
            name="formant_analysis",
            score=round(ai_score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=details,
        )
| # =============================================================== | |
| # Artifact Detector | |
| # =============================================================== | |
class ArtifactDetector:
    """
    Raw-waveform forensic check.

    Looks for sample-to-sample jumps (clicks), an imbalance between the
    positive and negative halves of the waveform, and near-silent samples
    with almost no variation.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score ``y`` from waveform-level artifacts.

        Every flagged artifact adds 0.25 to the score, capped at 1.0. If any
        step raises, the artifacts are discarded, the score falls back to
        0.5 and the message is stored under ``details["error"]``.
        """
        details = {}
        flagged = []
        try:
            # Clicks: adjacent-sample jumps larger than six standard
            # deviations of the signal, expressed as a rate per second.
            jumps = np.abs(np.diff(y))
            jump_limit = np.std(y) * 6
            click_count = np.count_nonzero(jumps > jump_limit)
            duration_sec = len(y) / sr
            click_rate = click_count / duration_sec
            details["click_rate_per_sec"] = round(click_rate, 2)
            if click_rate > 10:
                flagged.append("synthesis_click_artifacts")

            # Symmetry: RMS of positive samples over RMS of negative ones.
            above = y[y > 0]
            below = y[y < 0]
            if above.size and below.size:
                rms_above = np.sqrt(np.mean(above ** 2))
                rms_below = np.sqrt(np.mean(below ** 2))
                symmetry = float(rms_above / (rms_below + 1e-10))
                details["waveform_symmetry"] = round(symmetry, 4)
                if abs(symmetry - 1.0) > 0.3:
                    flagged.append("asymmetric_waveform")

            # Silence: spread of the samples whose magnitude is below 0.001.
            near_zero = np.abs(y) < 0.001
            if near_zero.any():
                quiet_samples = y[near_zero]
                silence_noise_floor = float(np.std(quiet_samples))
                details["silence_noise_floor"] = round(silence_noise_floor, 6)
                enough_silence = len(quiet_samples) > sr * 0.05
                if silence_noise_floor < 1e-6 and enough_silence:
                    flagged.append("digitally_perfect_silence")

            # No periodicity check is implemented here; the original only
            # carried a note about a possible ZCR-based one.
            ai_score = min(1.0, 0.25 * len(flagged))
        except Exception as e:
            logger.warning(f"ArtifactDetector error: {e}")
            ai_score = 0.5
            flagged = []
            details["error"] = str(e)

        if ai_score >= 0.5:
            verdict = "AI_GENERATED"
        else:
            verdict = "HUMAN"
        return AnalyzerResult(
            name="artifact_detection",
            score=round(ai_score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=details,
        )
| # =============================================================== | |
| # Forensic Engine (orchestrates all analyzers) | |
| # =============================================================== | |
class ForensicEngine:
    """
    Runs all forensic analyzers and produces a combined result.

    The four analyzers are submitted to a shared thread pool; their results
    are collected into a plain dict keyed by each result's ``name``.
    """

    # Relative weight of each analyzer in the fused score, keyed by the
    # ``name`` each analyzer puts on its result.
    _WEIGHTS = {
        "spectral_analysis": 0.30,
        "temporal_analysis": 0.25,
        "formant_analysis": 0.25,
        "artifact_detection": 0.20,
    }
    # Weight applied to a result whose name is not listed in _WEIGHTS.
    _DEFAULT_WEIGHT = 0.25
    # Score meaning "no evidence either way"; the same value the analyzers
    # fall back to when they fail.
    _NEUTRAL_SCORE = 0.5

    def __init__(self):
        self.spectral = SpectralAnalyzer()
        self.temporal = TemporalAnalyzer()
        self.formant = FormantAnalyzer()
        self.artifact = ArtifactDetector()
        # One worker per analyzer so all four can be in flight at once.
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def analyze(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """
        Run all analyzers through the thread pool and return a combined report.

        Args:
            y: audio samples, passed unchanged to every analyzer.
            sr: sample rate, passed unchanged to every analyzer.

        Returns:
            A dict mapping each result's ``name`` to a dict with ``score``,
            ``verdict``, ``artifacts_found`` and ``details``. If collecting a
            result raises, an ``"error"`` entry is stored instead, carrying a
            neutral score, the message, and the key of the analyzer that
            failed (a later failure overwrites an earlier one).
        """
        # Map each future to a short key so a failure can be attributed.
        tasks = {
            self._executor.submit(self.spectral.analyze, y, sr): "spectral",
            self._executor.submit(self.temporal.analyze, y, sr): "temporal",
            self._executor.submit(self.formant.analyze, y, sr): "formant",
            self._executor.submit(self.artifact.analyze, y, sr): "artifact",
        }

        results = {}
        for future in concurrent.futures.as_completed(tasks):
            analyzer_key = tasks[future]
            try:
                result = future.result()
                results[result.name] = {
                    "score": result.score,
                    "verdict": result.verdict,
                    "artifacts_found": result.artifacts_found,
                    "details": result.details,
                }
            except Exception as e:
                logger.error("Analyzer %s failed: %s", analyzer_key, e)
                # Strict fallback: neutral score plus enough context to tell
                # which analyzer is missing from the report.
                results["error"] = {
                    "score": 0.5,
                    "verdict": "UNKNOWN",
                    "details": str(e),
                    "analyzer": analyzer_key,
                }
        return results

    def compute_forensic_score(self, forensic_results: Dict[str, Any]) -> float:
        """
        Compute a weighted forensic score.

        Args:
            forensic_results: report in the shape returned by ``analyze``.
                The ``"error"`` entry is ignored; an entry without a
                ``"score"`` counts as 0.5.

        Returns:
            0.0 (definitely human) to 1.0 (definitely AI), rounded to four
            decimals. When no entry contributes any weight (empty report, or
            only the ``"error"`` entry) the neutral 0.5 is returned rather
            than 0.0, so a failed analysis is not reported as "human".
        """
        weighted_sum = 0.0
        total_weight = 0.0
        for name, result in forensic_results.items():
            if name == "error":
                continue
            w = self._WEIGHTS.get(name, self._DEFAULT_WEIGHT)
            weighted_sum += result.get("score", self._NEUTRAL_SCORE) * w
            total_weight += w

        if total_weight <= 0.0:
            return self._NEUTRAL_SCORE
        return round(weighted_sum / total_weight, 4)

    def get_all_artifacts(self, forensic_results: Dict[str, Any]) -> List[str]:
        """
        Collect all artifacts found across all analyzers.

        Entries without an ``"artifacts_found"`` key (such as the ``"error"``
        entry) contribute nothing. Order follows the report's iteration
        order; duplicates are kept.
        """
        return [
            artifact
            for result in forensic_results.values()
            for artifact in result.get("artifacts_found", [])
        ]
# Singleton instance: one shared ForensicEngine (and its thread pool) is
# created when this module is imported.
forensic_engine = ForensicEngine()