Spaces:
Running
Running
| """ | |
| Voice Analysis Engine. | |
| Combines Wav2Vec2 deepfake detection with signal forensics. | |
| """ | |
| import logging | |
| import os | |
| import numpy as np | |
| import librosa | |
| import torch | |
| from scipy.stats import entropy | |
| from typing import Dict, Tuple, List, Optional | |
| from dataclasses import dataclass | |
| import warnings | |
| from config import settings | |
| logger = logging.getLogger(__name__) | |
| warnings.filterwarnings("ignore", category=FutureWarning) | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| # Heuristic thresholds (env-configurable for tuning) | |
| HEURISTIC_THRESHOLDS = { | |
| # Pitch scoring | |
| "pitch_optimal_stability": float(os.getenv("PITCH_OPTIMAL_STABILITY", "0.20")), | |
| "pitch_stability_range": float(os.getenv("PITCH_STABILITY_RANGE", "0.20")), | |
| "pitch_optimal_jitter": float(os.getenv("PITCH_OPTIMAL_JITTER", "0.04")), | |
| "pitch_jitter_range": float(os.getenv("PITCH_JITTER_RANGE", "0.05")), | |
| # Spectral scoring | |
| "spectral_optimal_entropy": float(os.getenv("SPECTRAL_OPTIMAL_ENTROPY", "5.8")), | |
| "spectral_entropy_range": float(os.getenv("SPECTRAL_ENTROPY_RANGE", "2.5")), | |
| "spectral_optimal_flatness": float(os.getenv("SPECTRAL_OPTIMAL_FLATNESS", "0.06")), | |
| "spectral_flatness_range": float(os.getenv("SPECTRAL_FLATNESS_RANGE", "0.08")), | |
| # Acoustic anomaly | |
| "anomaly_flatness_threshold": float(os.getenv("ANOMALY_FLATNESS_THRESHOLD", "0.13")), | |
| "anomaly_voiced_low": float(os.getenv("ANOMALY_VOICED_LOW", "0.35")), | |
| "anomaly_voiced_high": float(os.getenv("ANOMALY_VOICED_HIGH", "0.95")), | |
| "anomaly_hnr_low": float(os.getenv("ANOMALY_HNR_LOW", "6.0")), | |
| "anomaly_hnr_high": float(os.getenv("ANOMALY_HNR_HIGH", "35.0")), | |
| } | |
# Global model cache — populated lazily on first use and reused across requests.
_model = None  # Wav2Vec2 sequence-classification model, set by load_model()
_processor = None  # matching Wav2Vec2 feature extractor, set by load_model()
_device = None  # "cuda" or "cpu", resolved once by get_device()
@dataclass
class AnalysisResult:
    """Result of voice analysis.

    The @dataclass decorator was missing: the class is instantiated with
    keyword arguments (AnalysisResult(classification=..., ...)), which
    raises TypeError on a plain class with only annotations.

    Attributes:
        classification: "AI_GENERATED" or "HUMAN".
        confidence_score: Confidence in the classification, 0.0 to 1.0.
        explanation: Human-readable forensic explanation.
        features: Individual feature scores for debugging.
    """
    classification: str  # "AI_GENERATED" or "HUMAN"
    confidence_score: float  # 0.0 to 1.0
    explanation: str
    features: Dict[str, float]  # Individual feature scores for debugging
def get_device():
    """Return the torch device string to run inference on, caching it.

    Returns:
        "cuda" when a GPU is available, otherwise "cpu". The choice is
        resolved once and stored in the module-level cache.
    """
    global _device
    if _device is None:
        _device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info("Using device: %s", _device)
    return _device
| _invert_labels: bool = False | |
| def _detect_label_inversion(model): | |
| """Check once at load time whether this model needs label flipping.""" | |
| global _invert_labels | |
| name = getattr(model.config, '_name_or_path', '').lower() | |
| _invert_labels = 'shivam-2211' in name or 'voice-detection-model' in name | |
| if _invert_labels: | |
| logger.info("Label inversion enabled for model: %s", name) | |
def _load_into_cache(model_name: str) -> None:
    """Load processor + model for `model_name` into the module cache.

    Moves the model to the active device, switches it to eval mode and
    runs label-inversion detection. Propagates whatever exception
    `from_pretrained` raises on failure.
    """
    global _model, _processor
    from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2FeatureExtractor
    _processor = Wav2Vec2FeatureExtractor.from_pretrained(model_name)
    _model = Wav2Vec2ForSequenceClassification.from_pretrained(model_name)
    _model.to(get_device())
    _model.eval()
    _detect_label_inversion(_model)


def load_model():
    """
    Load the Wav2Vec2 deepfake detection model (cached after first call).

    Model priority:
      1. Local fine-tuned model (for development)
      2. HuggingFace Hub model (for production/deployment)
      3. Fallback to public backup model

    Returns:
        Tuple of (model, processor) from the module cache.

    Raises:
        RuntimeError: if both the primary and the backup model fail to load.
    """
    global _model, _processor
    if _model is None:
        local_path = settings.VOICE_MODEL_LOCAL_PATH
        hf_model = settings.VOICE_MODEL_ID
        backup_model = settings.VOICE_MODEL_BACKUP_ID
        if os.path.exists(local_path):
            logger.info("Loading local model from: %s", local_path)
            model_name = local_path
        else:
            logger.info("Loading model from HuggingFace Hub: %s", hf_model)
            model_name = hf_model
        try:
            _load_into_cache(model_name)
            logger.info("Model loaded: %s", model_name)
        except Exception as e:
            logger.error("Failed to load model %s: %s", model_name, e)
            if model_name == backup_model:
                # Primary was already the backup — nothing left to try.
                raise
            logger.warning("Trying backup model...")
            try:
                _load_into_cache(backup_model)
                logger.info("Backup model loaded: %s", backup_model)
            except Exception as e2:
                raise RuntimeError(f"Could not load any model: {e2}")
    return _model, _processor
def extract_signal_features(audio: np.ndarray, sr: int, fast_mode: bool = False) -> Dict[str, float]:
    """Extract signal-based features (pitch, entropy, silence).

    Args:
        audio: Mono waveform samples as a 1-D numpy array.
        sr: Sample rate of `audio` in Hz.
        fast_mode: When True, use cheaper approximations (smaller FFT,
            centroid-based pitch proxy, RMS-gated voicing, no HPSS) suited
            to the realtime path.

    Returns:
        Dict of scalar features. On any extraction error a fixed set of
        neutral default values is returned instead of raising.
    """
    features = {}
    try:
        # Use smaller FFT in fast mode for realtime throughput.
        n_fft = 512 if fast_mode else 2048
        hop_length = 256 if fast_mode else 512
        # Magnitude spectrogram shared by all spectral features below.
        S = np.abs(librosa.stft(audio, n_fft=n_fft, hop_length=hop_length))
        # Pitch analysis.
        if fast_mode:
            # Approximate pitch variability from centroid dynamics to avoid expensive pYIN on realtime path.
            spec_centroid = librosa.feature.spectral_centroid(S=S, sr=sr)[0]
            centroid_mean = float(np.mean(spec_centroid) + 1e-8)
            features["pitch_stability"] = float(np.clip(np.var(spec_centroid) / (centroid_mean ** 2), 0.0, 1.5))
            features["jitter"] = float(np.clip(np.mean(np.abs(np.diff(spec_centroid))) / centroid_mean, 0.0, 0.2))
            # Energy-gate proxy for voicing: frames above a fixed RMS floor.
            voiced_flag = librosa.feature.rms(y=audio, frame_length=n_fft, hop_length=hop_length)[0] > 0.02
        else:
            f0, voiced_flag, _ = librosa.pyin(
                audio,
                fmin=librosa.note_to_hz('C2'),
                fmax=librosa.note_to_hz('C7'),
                sr=sr
            )
            # pyin marks unvoiced frames as NaN — keep only voiced estimates.
            f0_voiced = f0[~np.isnan(f0)]
            if len(f0_voiced) > 10:
                # Coefficient of variation and mean frame-to-frame jitter of F0.
                pitch_mean = np.mean(f0_voiced)
                pitch_std = np.std(f0_voiced)
                features["pitch_stability"] = pitch_std / pitch_mean if pitch_mean > 0 else 0
                features["jitter"] = np.mean(np.abs(np.diff(f0_voiced))) / pitch_mean if pitch_mean > 0 else 0
            else:
                # Too few voiced frames for reliable pitch stats — neutral defaults.
                features["pitch_stability"] = 0.5
                features["jitter"] = 0.05
        # Spectral features
        spec_centroid = librosa.feature.spectral_centroid(S=S, sr=sr)[0]
        features["spectral_centroid_var"] = float(np.var(spec_centroid))
        spec_flatness = librosa.feature.spectral_flatness(S=S)[0]
        features["spectral_flatness"] = float(np.mean(spec_flatness))
        # Entropy — mean Shannon entropy of per-frame normalized spectra.
        S_norm = S / (np.sum(S, axis=0, keepdims=True) + 1e-10)
        frame_entropies = [entropy(frame + 1e-10) for frame in S_norm.T]
        features["spectral_entropy"] = float(np.mean(frame_entropies))
        # Silence detection
        silence_threshold = 1e-5
        features["silence_ratio"] = float(np.sum(np.abs(audio) < silence_threshold) / len(audio))
        # Exact-zero samples suggest digital editing/generation artifacts.
        features["perfect_silence"] = float(np.sum(audio == 0) / len(audio))
        # Zero crossing rate
        zcr = librosa.feature.zero_crossing_rate(audio)[0]
        features["zcr_variance"] = float(np.var(zcr))
        # Additional acoustic heuristics for suspicious audio artifacts.
        spec_rolloff = librosa.feature.spectral_rolloff(S=S, sr=sr)[0]
        features["spectral_rolloff_var"] = float(np.var(spec_rolloff))
        features["voiced_ratio"] = float(np.mean(voiced_flag.astype(np.float32))) if voiced_flag is not None else 0.0
        rms = librosa.feature.rms(y=audio)[0]
        features["rms_var"] = float(np.var(rms))
        if fast_mode:
            # Cheap HNR approximation from flatness and entropy for realtime throughput.
            hnr_db = float(max(0.0, 30.0 - (features["spectral_flatness"] * 120.0)))
        else:
            # Harmonic/percussive separation gives an HNR estimate in dB.
            harmonic, percussive = librosa.effects.hpss(audio)
            harmonic_rms = float(np.sqrt(np.mean(np.square(harmonic))) + 1e-8)
            percussive_rms = float(np.sqrt(np.mean(np.square(percussive))) + 1e-8)
            hnr_db = float(20.0 * np.log10(harmonic_rms / percussive_rms))
        features["harmonic_noise_ratio_db"] = hnr_db
    except Exception as e:
        # Best-effort: never crash the analysis — return neutral defaults.
        logger.warning("Feature extraction error: %s", e)
        features = {
            "pitch_stability": 0.5,
            "jitter": 0.05,
            "spectral_centroid_var": 1000,
            "spectral_flatness": 0.1,
            "spectral_entropy": 5.0,
            "silence_ratio": 0.0,
            "perfect_silence": 0.0,
            "zcr_variance": 0.01,
            "spectral_rolloff_var": 50000.0,
            "voiced_ratio": 0.65,
            "rms_var": 0.005,
            "harmonic_noise_ratio_db": 14.0,
        }
    return features
def generate_explanation(
    classification: str,
    ml_confidence: float,
    features: Dict[str, float]
) -> str:
    """Generate a data-driven forensic explanation for the classification.

    Args:
        classification: "AI_GENERATED" or "HUMAN".
        ml_confidence: Model confidence in [0, 1]; drives the wording tier.
        features: Signal features used to derive the naturalness scores.

    Returns:
        A human-readable explanation string.
    """
    # Per-dimension naturalness scores on a 0-100 scale.
    pitch_score = _calculate_pitch_score(features)
    spectral_score = _calculate_spectral_score(features)
    temporal_score = _calculate_temporal_score(features)
    # Mean of the three dimensions; higher reads as more human-like.
    authenticity_score = (pitch_score + spectral_score + temporal_score) / 3

    # Map model confidence onto an explanation wording tier.
    if ml_confidence >= 0.95:
        tier = "high"
    elif ml_confidence >= 0.75:
        tier = "moderate"
    else:
        tier = "low"

    builder = (
        _explain_ai_detection
        if classification == "AI_GENERATED"
        else _explain_human_detection
    )
    return builder(
        tier, ml_confidence, authenticity_score,
        pitch_score, spectral_score, temporal_score, features,
    )
def _calculate_pitch_score(features: Dict[str, float]) -> float:
    """Calculate pitch naturalness score (0-100). Higher = more human-like.

    Uses peaked scoring centred on the human sweet-spot so that both
    extremes (too perfect = AI, too erratic = glitch) are penalised.
    """
    def _peaked(value: float, optimal: float, spread: float) -> float:
        # Linear falloff from 100 at the optimum to 0 one `spread` away.
        deviation = abs(value - optimal) / spread
        return max(0.0, min(100.0, 100.0 * (1.0 - deviation)))

    stability = features.get("pitch_stability", 0.5)
    wobble = features.get("jitter", 0.05)

    # Human sweet-spot: stability ≈ 0.15-0.25 (natural micro-variation).
    # AI tends to be TOO stable (> 0.30) — penalise perfection.
    stability_score = _peaked(
        stability,
        HEURISTIC_THRESHOLDS["pitch_optimal_stability"],
        HEURISTIC_THRESHOLDS["pitch_stability_range"],
    )
    # Human jitter ≈ 0.02-0.06 (natural pitch wobble).
    # AI jitter often < 0.01 (too clean/monotone).
    jitter_score = _peaked(
        wobble,
        HEURISTIC_THRESHOLDS["pitch_optimal_jitter"],
        HEURISTIC_THRESHOLDS["pitch_jitter_range"],
    )
    return stability_score * 0.6 + jitter_score * 0.4
def _calculate_spectral_score(features: Dict[str, float]) -> float:
    """Calculate spectral naturalness score (0-100). Higher = more human-like.

    Peaked scoring — too-uniform spectrum (low flatness, very high
    entropy) is penalised as suspicious synthetic perfection.
    """
    # Renamed from `entropy` to avoid shadowing scipy.stats.entropy,
    # which is imported at module level.
    spectral_entropy = features.get("spectral_entropy", 5.0)
    flatness = features.get("spectral_flatness", 0.1)
    # Human sweet-spot: entropy ≈ 5.0-6.5 (rich harmonic content)
    # AI can have extremely high entropy (uniform noise floor) or
    # very low entropy (monotone vocoder).
    optimal_entropy = HEURISTIC_THRESHOLDS["spectral_optimal_entropy"]
    entropy_dev = abs(spectral_entropy - optimal_entropy) / HEURISTIC_THRESHOLDS["spectral_entropy_range"]
    entropy_score = max(0.0, min(100.0, 100.0 * (1.0 - entropy_dev)))
    # Human flatness ≈ 0.03-0.10 (varied spectral shape)
    # AI often has very low (< 0.02) or very high (> 0.15) flatness.
    optimal_flatness = HEURISTIC_THRESHOLDS["spectral_optimal_flatness"]
    flatness_dev = abs(flatness - optimal_flatness) / HEURISTIC_THRESHOLDS["spectral_flatness_range"]
    flatness_score = max(0.0, min(100.0, 100.0 * (1.0 - flatness_dev)))
    return (entropy_score * 0.5 + flatness_score * 0.5)
| def _calculate_temporal_score(features: Dict[str, float]) -> float: | |
| """Calculate temporal/rhythm naturalness score (0-100). Higher = more human-like.""" | |
| zcr_var = features.get("zcr_variance", 0.01) | |
| silence_ratio = features.get("silence_ratio", 0.0) | |
| perfect_silence = features.get("perfect_silence", 0.0) | |
| # Penalize digital silence (exact zeros) - strong AI indicator | |
| digital_penalty = min(50, perfect_silence * 500) | |
| zcr_score = min(100, max(0, zcr_var / 0.02 * 100)) | |
| return max(0, zcr_score - digital_penalty) | |
def _calculate_acoustic_anomaly_score(features: Dict[str, float]) -> float:
    """
    Estimate suspicious acoustic artifact intensity (0-100).
    Higher score indicates stronger synthetic/spoof-like signal artifacts.
    """
    t = HEURISTIC_THRESHOLDS

    perfect_silence = features.get("perfect_silence", 0.0)
    flatness = features.get("spectral_flatness", 0.1)
    rolloff_var = features.get("spectral_rolloff_var", 50000.0)
    voiced_ratio = features.get("voiced_ratio", 0.65)
    hnr_db = features.get("harmonic_noise_ratio_db", 14.0)

    # Exact-zero samples are a strong digital-processing fingerprint.
    digital = min(100.0, perfect_silence * 10000.0)
    flat = min(100.0, max(0.0, (flatness - t["anomaly_flatness_threshold"]) * 500.0))
    rolloff = min(100.0, max(0.0, (np.log10(rolloff_var + 1.0) - 3.8) * 45.0))

    # Voiced ratio outside the plausible human band is suspicious.
    if voiced_ratio < t["anomaly_voiced_low"]:
        voiced = min(100.0, (t["anomaly_voiced_low"] - voiced_ratio) * 180.0)
    elif voiced_ratio > t["anomaly_voiced_high"]:
        voiced = min(100.0, (voiced_ratio - t["anomaly_voiced_high"]) * 180.0)
    else:
        voiced = 0.0

    # Harmonic-to-noise ratio outside the [low, high] dB band is suspicious.
    if hnr_db < t["anomaly_hnr_low"]:
        hnr = min(100.0, (t["anomaly_hnr_low"] - hnr_db) * 8.0)
    elif hnr_db > t["anomaly_hnr_high"]:
        # Raised from 28 dB — clean human recordings regularly exceed 28 dB.
        hnr = min(100.0, (hnr_db - t["anomaly_hnr_high"]) * 4.0)
    else:
        hnr = 0.0

    weighted = (
        digital * 0.35
        + flat * 0.20
        + rolloff * 0.20
        + voiced * 0.15
        + hnr * 0.10
    )
    return float(max(0.0, min(100.0, weighted)))
| def _explain_ai_detection( | |
| confidence_tier: str, | |
| ml_confidence: float, | |
| authenticity_score: float, | |
| pitch_score: float, | |
| spectral_score: float, | |
| temporal_score: float, | |
| features: Dict[str, float] | |
| ) -> str: | |
| """Generate explanation for AI-detected audio.""" | |
| # Find the weakest scores (most AI-like characteristics) | |
| scores = { | |
| "vocal pitch patterns": pitch_score, | |
| "spectral characteristics": spectral_score, | |
| "temporal dynamics": temporal_score | |
| } | |
| sorted_scores = sorted(scores.items(), key=lambda x: x[1]) | |
| # Build forensic-style explanation | |
| primary_indicator = sorted_scores[0][0] | |
| primary_score = sorted_scores[0][1] | |
| if confidence_tier == "high": | |
| intro = f"Strong synthetic markers detected (confidence: {ml_confidence:.0%}). " | |
| elif confidence_tier == "moderate": | |
| intro = f"Synthetic patterns identified (confidence: {ml_confidence:.0%}). " | |
| else: | |
| intro = f"Possible synthetic audio (confidence: {ml_confidence:.0%}). " | |
| # Specific findings based on lowest scoring area | |
| if primary_indicator == "vocal pitch patterns": | |
| jitter = features.get("jitter", 0) | |
| stability = features.get("pitch_stability", 0) | |
| detail = f"Pitch analysis shows unusually consistent patterns (stability: {stability:.3f}, micro-variation: {jitter:.4f}) - typical of synthesized speech." | |
| elif primary_indicator == "spectral characteristics": | |
| entropy = features.get("spectral_entropy", 0) | |
| flatness = features.get("spectral_flatness", 0) | |
| detail = f"Spectral fingerprint indicates synthetic generation (complexity: {entropy:.2f}, flatness: {flatness:.3f}) - lacking natural harmonic richness." | |
| else: | |
| perfect_silence = features.get("perfect_silence", 0) | |
| if perfect_silence > 0.005: | |
| detail = f"Digital artifacts detected: {perfect_silence:.1%} exact-zero samples found, indicating synthetic audio processing." | |
| else: | |
| detail = f"Temporal patterns suggest algorithmic generation - rhythm lacks natural human irregularities." | |
| # Add authenticity score as a unique metric | |
| authenticity_label = "very low" if authenticity_score < 25 else "low" if authenticity_score < 50 else "borderline" | |
| return f"{intro}{detail} Authenticity score: {authenticity_score:.0f}/100 ({authenticity_label})." | |
| def _explain_human_detection( | |
| confidence_tier: str, | |
| ml_confidence: float, | |
| authenticity_score: float, | |
| pitch_score: float, | |
| spectral_score: float, | |
| temporal_score: float, | |
| features: Dict[str, float] | |
| ) -> str: | |
| """Generate explanation for human-detected audio.""" | |
| # Find the strongest scores (most human-like characteristics) | |
| scores = { | |
| "vocal pitch patterns": pitch_score, | |
| "spectral characteristics": spectral_score, | |
| "temporal dynamics": temporal_score | |
| } | |
| sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True) | |
| primary_indicator = sorted_scores[0][0] | |
| primary_score = sorted_scores[0][1] | |
| if confidence_tier == "high": | |
| intro = f"Strong human voice markers detected (confidence: {ml_confidence:.0%}). " | |
| elif confidence_tier == "moderate": | |
| intro = f"Human speech patterns identified (confidence: {ml_confidence:.0%}). " | |
| else: | |
| intro = f"Likely human voice (confidence: {ml_confidence:.0%}). " | |
| # Specific findings based on highest scoring area | |
| if primary_indicator == "vocal pitch patterns": | |
| jitter = features.get("jitter", 0) | |
| stability = features.get("pitch_stability", 0) | |
| detail = f"Natural pitch dynamics confirmed (variability: {stability:.3f}, micro-fluctuations: {jitter:.4f}) - consistent with biological speech production." | |
| elif primary_indicator == "spectral characteristics": | |
| entropy = features.get("spectral_entropy", 0) | |
| detail = f"Rich harmonic structure detected (complexity score: {entropy:.2f}) - characteristic of natural vocal tract resonance." | |
| else: | |
| zcr_var = features.get("zcr_variance", 0) | |
| detail = f"Organic speech rhythm detected (variance: {zcr_var:.4f}) - natural breathing and articulation patterns present." | |
| # Add authenticity score | |
| authenticity_label = "excellent" if authenticity_score > 75 else "good" if authenticity_score > 50 else "moderate" | |
| return f"{intro}{detail} Authenticity score: {authenticity_score:.0f}/100 ({authenticity_label})." | |
def classify_with_model(audio: np.ndarray, sr: int) -> Tuple[str, float]:
    """
    Classify audio using the Wav2Vec2 model.

    Args:
        audio: Mono waveform as a numpy array.
        sr: Sample rate of `audio` in Hz.

    Returns:
        Tuple of (classification, confidence) where classification is
        "AI_GENERATED" or "HUMAN" and confidence is the softmax
        probability of the predicted class (after optional temperature
        scaling).
    """
    model, processor = load_model()
    device = get_device()
    # Normalize audio (peak normalization to [-1, 1])
    max_val = np.max(np.abs(audio))
    if max_val > 0:
        audio = audio / max_val
    # Resample to 16kHz if needed — the feature extractor expects 16 kHz input.
    target_sr = 16000
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
    # Process audio
    inputs = processor(
        audio,
        sampling_rate=target_sr,
        return_tensors="pt",
        padding=True
    )
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        # Temperature scaling: soften probability distribution so the
        # heuristic cross-check can still correct misclassifications.
        temperature = float(settings.MODEL_LOGIT_TEMPERATURE)
        if temperature > 1.0:
            logits = logits / temperature
        probabilities = torch.softmax(logits, dim=-1)
    # Get prediction
    predicted_class = torch.argmax(probabilities, dim=-1).item()
    confidence = probabilities[0][predicted_class].item()
    # Normalise id2label keys from str to int (HF convention mismatch).
    raw_id2label = getattr(model.config, 'id2label', None) or {}
    id2label = {int(k): v for k, v in raw_id2label.items()}
    label = id2label.get(predicted_class, 'UNKNOWN')
    logger.debug(
        "Model id2label=%s predicted_class=%d resolved_label=%s probs=%s",
        id2label, predicted_class, label,
        [f"{p:.4f}" for p in probabilities[0].cpu().tolist()],
    )
    # Label interpretation — see _detect_label_inversion() for rationale.
    if _invert_labels:
        # Inverted checkpoints: class id 0 means HUMAN regardless of label text.
        classification = "HUMAN" if predicted_class == 0 else "AI_GENERATED"
    else:
        if label.upper() in ['FAKE', 'SPOOF', 'SYNTHETIC', 'AI']:
            classification = "AI_GENERATED"
        else:
            # Anything else (REAL, BONAFIDE, UNKNOWN, ...) is treated as human.
            classification = "HUMAN"
    return classification, confidence
def analyze_voice(audio: np.ndarray, sr: int, language: str = "English", realtime: bool = False, source: str = "file") -> AnalysisResult:
    """
    Analyze a voice sample and classify as AI-generated or Human.

    Args:
        audio: Audio waveform as numpy array
        sr: Sample rate
        language: Language of the audio (for context)
        realtime: When True (and REALTIME_LIGHTWEIGHT_AUDIO is enabled in
            settings), skip transformer inference and classify from
            heuristics only
        source: Audio origin, e.g. "file" or "mic"; the authenticity
            cross-check below is applied only to realtime mic input

    Returns:
        AnalysisResult with classification, confidence, and explanation

    Raises:
        ValueError: If audio is too short for reliable analysis
    """
    # Validate minimum audio duration (at least 0.5 seconds for reliable analysis)
    min_duration = 0.5  # seconds
    duration = len(audio) / sr
    if duration < min_duration:
        raise ValueError(f"Audio too short ({duration:.2f}s). Minimum {min_duration}s required for reliable analysis.")
    fast_mode = bool(realtime and settings.REALTIME_LIGHTWEIGHT_AUDIO)
    # Get model prediction (legacy/deep path) or defer to lightweight realtime heuristic.
    ml_fallback = False
    classification = "HUMAN"
    ml_confidence = 0.5
    if not fast_mode:
        try:
            classification, ml_confidence = classify_with_model(audio, sr)
        except Exception as e:
            # Degrade gracefully: keep neutral defaults and rely on heuristics.
            logger.error("ML model error: %s, falling back to signal analysis", e)
            ml_fallback = True
            classification = "HUMAN"
            ml_confidence = 0.5
    # Extract signal features for explainability.
    features = extract_signal_features(audio, sr, fast_mode=fast_mode)
    # Calculate scores explicitly for return.
    pitch_score = _calculate_pitch_score(features)
    spectral_score = _calculate_spectral_score(features)
    temporal_score = _calculate_temporal_score(features)
    authenticity_score = (pitch_score + spectral_score + temporal_score) / 3
    acoustic_anomaly_score = _calculate_acoustic_anomaly_score(features)
    # Lightweight realtime path avoids transformer inference for throughput.
    if fast_mode:
        # AI probability = stronger of anomaly evidence and inverse authenticity
        # (52 acts as the heuristic neutral point on the authenticity scale).
        ai_probability = max(
            acoustic_anomaly_score / 100.0,
            max(0.0, min(1.0, (52.0 - authenticity_score) / 52.0)),
        )
        classification = "AI_GENERATED" if ai_probability >= 0.56 else "HUMAN"
        ml_confidence = ai_probability if classification == "AI_GENERATED" else (1.0 - ai_probability)
        ml_confidence = float(max(0.5, min(0.99, ml_confidence)))
    # Authenticity cross-check (realtime mic only): when the model says
    # AI_GENERATED but signal forensics show human-like audio, moderate
    # the confidence or flip the classification. Not applied to file
    # uploads where the model should be trusted.
    if realtime and source == "mic" and classification == "AI_GENERATED" and authenticity_score > 35:
        moderation_factor = max(0.50, 1.0 - (authenticity_score - 35) / 100.0)
        if ml_confidence > moderation_factor:
            logger.info(
                "Authenticity cross-check: moderated AI confidence %.2f -> %.2f "
                "(authenticity=%.1f, anomaly=%.1f)",
                ml_confidence, moderation_factor,
                authenticity_score, acoustic_anomaly_score,
            )
            ml_confidence = moderation_factor
        # Override when signal evidence strongly contradicts the model.
        # NOTE(review): nesting reconstructed from a whitespace-mangled paste —
        # assumed to be a sibling of the moderation step above (i.e. it runs
        # even when no moderation occurred); confirm against the original file.
        if authenticity_score > 40 and acoustic_anomaly_score < 65:
            logger.info(
                "Authenticity override: flipping AI_GENERATED → HUMAN "
                "(authenticity=%.1f, anomaly=%.1f, original_conf=%.2f)",
                authenticity_score, acoustic_anomaly_score, ml_confidence,
            )
            classification = "HUMAN"
            ml_confidence = max(0.55, 1.0 - ml_confidence)  # invert confidence
    features["ml_confidence"] = ml_confidence
    features["ml_fallback"] = float(ml_fallback)
    features["realtime_heuristic_mode"] = float(fast_mode)
    features["audio_source"] = source  # NOTE(review): string value in a Dict[str, float]
    # Add computed high-level scores to features for API response.
    features["authenticity_score"] = round(authenticity_score, 1)
    features["pitch_naturalness"] = round(pitch_score, 1)
    features["spectral_naturalness"] = round(spectral_score, 1)
    features["temporal_naturalness"] = round(temporal_score, 1)
    features["acoustic_anomaly_score"] = round(acoustic_anomaly_score, 1)
    # Generate explanation
    explanation = generate_explanation(classification, ml_confidence, features)
    return AnalysisResult(
        classification=classification,
        confidence_score=round(ml_confidence, 2),
        explanation=explanation,
        features=features
    )
# Pre-load model at module import (optional, for faster first request)
def preload_model():
    """Warm the model cache so the first request does not pay load latency."""
    try:
        load_model()
    except Exception as err:
        # Best-effort warmup: a failed preload must not crash startup.
        logger.error("Model preload failed: %s", err)