File size: 15,819 Bytes
c5c9261
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
"""
Forensic Analyzers — Four independent analysis engines that examine audio
for specific signatures of AI generation vs natural human speech.

Each analyzer returns a score (0=human, 1=AI) and a list of detected artifacts.
The final detection fuses all analyzer scores for maximum accuracy.
"""
import numpy as np
import librosa
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import concurrent.futures

# Module-level logger; the analyzers and ForensicEngine below report failures through it.
logger = logging.getLogger(__name__)


@dataclass
class AnalyzerResult:
    """Result from a single forensic analyzer.

    Each analyzer's ``analyze`` method returns one of these;
    ``ForensicEngine.analyze`` copies the fields into a plain dict keyed by
    ``name``.
    """
    # Analyzer identifier, e.g. "spectral_analysis"; used as the report key.
    name: str
    score: float  # 0.0 = definitely human, 1.0 = definitely AI
    verdict: str  # "HUMAN" or "AI_GENERATED"
    # Labels of the heuristics that fired (empty when none did or on error).
    artifacts_found: List[str] = field(default_factory=list)
    # Measurements behind the score; holds an "error" entry when the analyzer failed.
    details: Dict[str, Any] = field(default_factory=dict)


@dataclass
class AudioProfile:
    """Technical profile of the audio sample.

    NOTE(review): nothing in the visible part of this file creates or reads
    this class, so the field notes below are inferred from the names and
    defaults only -- confirm against the code that populates it.
    """
    duration_sec: float = 0.0  # presumably clip length in seconds
    snr_db: float = 0.0  # presumably signal-to-noise ratio in dB
    clipping_detected: bool = False
    silence_ratio: float = 0.0  # presumably fraction of the clip that is silent
    rms_energy: float = 0.0
    sample_rate: int = 16000  # defaults to 16 kHz
    num_segments: int = 1


# ===============================================================
#  Spectral Analyzer
# ===============================================================

class SpectralAnalyzer:
    """
    Frequency-domain forensic check.

    Summarises three librosa frame features and flags every statistic that
    falls under a fixed threshold:
    - spectral flatness: low frame-to-frame spread, or a very low mean
    - spectral bandwidth: low frame-to-frame spread
    - spectral centroid: low coefficient of variation
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score a waveform from its spectral statistics.

        Args:
            y: Audio samples handed to the librosa feature extractors.
            sr: Sample rate of ``y``.

        Returns:
            An AnalyzerResult named "spectral_analysis". The score is 0.3 per
            flagged artifact, capped at 1.0. If anything raises, the error is
            logged and the result carries a neutral 0.5 score, no artifacts,
            and the error text under details["error"].
        """
        flagged = []
        stats = {}

        try:
            # Flatness: both its spread and its mean are tested.
            flat = librosa.feature.spectral_flatness(y=y)[0]
            flat_mean = float(np.mean(flat))
            flat_std = float(np.std(flat))
            stats["spectral_flatness_mean"] = round(flat_mean, 4)
            stats["spectral_flatness_std"] = round(flat_std, 4)

            flatness_checks = (
                (flat_std < 0.02, "unnaturally_uniform_spectral_texture"),
                (flat_mean < 0.005, "overly_tonal_spectrum"),
            )
            flagged.extend(label for tripped, label in flatness_checks if tripped)

            # Bandwidth: only the spread is tested; the mean is reported.
            bw = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]
            bw_mean = float(np.mean(bw))
            bw_std = float(np.std(bw))
            stats["spectral_bandwidth_mean"] = round(bw_mean, 1)
            stats["spectral_bandwidth_std"] = round(bw_std, 1)

            if bw_std < 200:
                flagged.append("unnaturally_consistent_bandwidth")

            # Centroid: coefficient of variation (epsilon guards a zero mean).
            cent = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
            cent_cv = float(np.std(cent) / (np.mean(cent) + 1e-10))
            stats["spectral_centroid_cv"] = round(cent_cv, 4)

            if cent_cv < 0.15:
                flagged.append("unnaturally_stable_spectral_centroid")

            # Each flagged artifact contributes 0.3, saturating at 1.0.
            score = min(1.0, 0.3 * len(flagged))

        except Exception as e:
            # Best effort: report "undecided" rather than failing the caller.
            logger.warning(f"SpectralAnalyzer error: {e}")
            score = 0.5
            flagged = []
            stats["error"] = str(e)

        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="spectral_analysis",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=stats,
        )


# ===============================================================
#  Temporal Analyzer
# ===============================================================

class TemporalAnalyzer:
    """
    Time-domain forensic check.

    Flags a waveform whose timing statistics fall under fixed thresholds:
    - frame-to-frame RMS energy changes that are very small
    - zero-crossing rate with a low coefficient of variation
    - evenly spaced transitions into low-amplitude samples
    - near-constant RMS energy across 100 ms chunks
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score a waveform from its temporal statistics.

        Args:
            y: Audio samples.
            sr: Sample rate of ``y``; also sets the frame, hop and chunk sizes.

        Returns:
            An AnalyzerResult named "temporal_analysis". The score is 0.3 per
            flagged artifact, capped at 1.0. If anything raises, the error is
            logged and the result carries a neutral 0.5 score, no artifacts,
            and the error text under details["error"].
        """
        flagged = []
        stats = {}

        try:
            # 25 ms analysis frames advanced in 10 ms hops.
            win = int(0.025 * sr)
            hop = int(0.010 * sr)

            # Energy contour: spread of frame-to-frame RMS change, relative to mean RMS.
            rms = librosa.feature.rms(y=y, frame_length=win, hop_length=hop)[0]
            if len(rms) > 10:
                roughness = float(np.std(np.diff(rms)) / (np.mean(rms) + 1e-10))
                stats["energy_roughness"] = round(roughness, 4)
                if roughness < 0.08:
                    flagged.append("unnaturally_smooth_energy_contour")

            # Zero-crossing rate: coefficient of variation over the same frames.
            zcr = librosa.feature.zero_crossing_rate(y, frame_length=win, hop_length=hop)[0]
            zcr_cv = float(np.std(zcr) / (np.mean(zcr) + 1e-10))
            stats["zcr_coefficient_of_variation"] = round(zcr_cv, 4)
            if zcr_cv < 0.25:
                flagged.append("unnaturally_consistent_zero_crossings")

            # Pause regularity: a sample is "silent" when its magnitude is below
            # three times the 10th percentile of all magnitudes.
            # NOTE(review): the mask is per-sample, so brief dips near waveform
            # zero crossings also count as pause starts -- confirm this is intended.
            magnitude = np.abs(y)
            quiet_level = np.percentile(magnitude, 10)
            quiet = magnitude < quiet_level * 3
            onsets = np.where(np.diff(quiet.astype(int)) == 1)[0]

            n_onsets = len(onsets)
            if n_onsets >= 3:
                gaps = np.diff(onsets) / sr
                gap_cv = float(np.std(gaps) / (np.mean(gaps) + 1e-10))
                stats["pause_interval_cv"] = round(gap_cv, 4)
                stats["num_pauses"] = n_onsets

                if n_onsets > 3 and gap_cv < 0.2:
                    flagged.append("metronomic_pause_timing")

            # Chunk-energy spread: only for clips longer than half a second.
            if float(len(y)) / sr > 0.5:
                chunk_len = int(0.1 * sr)
                n_chunks = len(y) // chunk_len
                if n_chunks > 4:
                    # Trailing samples that do not fill a chunk are dropped.
                    grid = y[:n_chunks * chunk_len].reshape(n_chunks, chunk_len)
                    chunk_rms = np.sqrt(np.mean(grid ** 2, axis=1))
                    if np.std(chunk_rms) < 0.001:
                        flagged.append("repetitive_energy_pattern")

            # Each flagged artifact contributes 0.3, saturating at 1.0.
            score = min(1.0, 0.3 * len(flagged))

        except Exception as e:
            # Best effort: report "undecided" rather than failing the caller.
            logger.warning(f"TemporalAnalyzer error: {e}")
            score = 0.5
            flagged = []
            stats["error"] = str(e)

        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="temporal_analysis",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=stats,
        )


# ===============================================================
#  Formant Analyzer
# ===============================================================

class FormantAnalyzer:
    """
    Formant-structure forensic check, using 13 MFCCs as a stand-in for
    formant tracks.

    Flags low per-coefficient variation, small second-order deltas, highly
    correlated adjacent frames, and a narrow range of per-coefficient spread.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score a waveform from its MFCC statistics.

        Args:
            y: Audio samples.
            sr: Sample rate of ``y``.

        Returns:
            An AnalyzerResult named "formant_analysis". The score is 0.3 per
            flagged artifact, capped at 1.0. If anything raises, the error is
            logged and the result carries a neutral 0.5 score, no artifacts,
            and the error text under details["error"].
        """
        flagged = []
        stats = {}

        try:
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)

            # Per-coefficient variation, skipping coefficient 0.
            upper = mfccs[1:]
            coef_means = np.abs(np.mean(upper, axis=1)) + 1e-10
            coef_stds = np.std(upper, axis=1)
            avg_cv = float(np.mean(coef_stds / coef_means))
            stats["avg_mfcc_cv"] = round(avg_cv, 4)

            if avg_cv < 0.5:
                flagged.append("unnaturally_stable_formant_structure")

            # Mean magnitude of the delta applied twice (delta of delta).
            first_delta = librosa.feature.delta(mfccs)
            second_delta = librosa.feature.delta(first_delta)
            roughness = float(np.mean(np.abs(second_delta)))
            stats["delta_mfcc_roughness"] = round(roughness, 4)

            if roughness < 0.3:
                flagged.append("overly_smooth_formant_transitions")

            # Correlation between each frame and its successor.
            if mfccs.shape[1] > 10:
                per_frame = mfccs.T
                centred = per_frame - per_frame.mean(axis=1, keepdims=True)
                scale = per_frame.std(axis=1, keepdims=True) + 1e-10
                z = centred / scale

                # Mean of products of standardised values = correlation per pair.
                pair_corr = np.mean(z[:-1] * z[1:], axis=1)
                avg_corr = float(np.mean(pair_corr))

                stats["inter_frame_correlation"] = round(avg_corr, 4)

                if avg_corr > 0.95:
                    flagged.append("excessive_inter_frame_correlation")

            # Range of the per-coefficient standard deviations computed above;
            # used as a cheap proxy for mel-band energy uniformity.
            spread_range = float(np.max(coef_stds) - np.min(coef_stds))
            if spread_range < 2.0:
                flagged.append("uniform_mel_band_energy")

            # Each flagged artifact contributes 0.3, saturating at 1.0.
            score = min(1.0, 0.3 * len(flagged))

        except Exception as e:
            # Best effort: report "undecided" rather than failing the caller.
            logger.warning(f"FormantAnalyzer error: {e}")
            score = 0.5
            flagged = []
            stats["error"] = str(e)

        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="formant_analysis",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=stats,
        )


# ===============================================================
#  Artifact Detector
# ===============================================================

class ArtifactDetector:
    """
    Raw-waveform forensic check.

    Looks for three sample-level signatures: frequent large sample-to-sample
    jumps, unequal RMS of the positive and negative halves, and low-level
    samples with almost no variation.
    """

    def analyze(self, y: np.ndarray, sr: int) -> AnalyzerResult:
        """
        Score a waveform from sample-level statistics.

        Args:
            y: Audio samples.
            sr: Sample rate of ``y``.

        Returns:
            An AnalyzerResult named "artifact_detection". The score is 0.25
            per flagged artifact, capped at 1.0. If anything raises, the error
            is logged and the result carries a neutral 0.5 score, no
            artifacts, and the error text under details["error"].
        """
        flagged = []
        stats = {}

        try:
            # Clicks / pops: sample-to-sample jumps above six standard deviations.
            jumps = np.abs(np.diff(y))
            jump_limit = np.std(y) * 6
            n_clicks = np.count_nonzero(jumps > jump_limit)
            click_rate = n_clicks / (len(y) / sr)
            stats["click_rate_per_sec"] = round(click_rate, 2)

            if click_rate > 10:
                flagged.append("synthesis_click_artifacts")

            # Symmetry: RMS of positive samples over RMS of negative samples.
            above = y[y > 0]
            below = y[y < 0]
            if len(above) > 0 and len(below) > 0:
                rms_above = np.sqrt(np.mean(above ** 2))
                rms_below = np.sqrt(np.mean(below ** 2))
                symmetry = float(rms_above / (rms_below + 1e-10))
                stats["waveform_symmetry"] = round(symmetry, 4)

                if abs(symmetry - 1.0) > 0.3:
                    flagged.append("asymmetric_waveform")

            # Silence quality: spread of the samples whose magnitude is under 0.001.
            quiet_mask = np.abs(y) < 0.001
            if np.any(quiet_mask):
                quiet = y[quiet_mask]
                noise_floor = float(np.std(quiet))
                stats["silence_noise_floor"] = round(noise_floor, 6)

                # Only flagged when more than 50 ms worth of samples are that quiet.
                if noise_floor < 1e-6 and len(quiet) > sr * 0.05:
                    flagged.append("digitally_perfect_silence")

            # No periodicity check is implemented here; only the three checks
            # above contribute to the score.

            # Each flagged artifact contributes 0.25, saturating at 1.0.
            score = min(1.0, 0.25 * len(flagged))

        except Exception as e:
            # Best effort: report "undecided" rather than failing the caller.
            logger.warning(f"ArtifactDetector error: {e}")
            score = 0.5
            flagged = []
            stats["error"] = str(e)

        verdict = "HUMAN" if score < 0.5 else "AI_GENERATED"
        return AnalyzerResult(
            name="artifact_detection",
            score=round(score, 4),
            verdict=verdict,
            artifacts_found=flagged,
            details=stats,
        )


# ===============================================================
#  Forensic Engine (orchestrates all analyzers)
# ===============================================================

class ForensicEngine:
    """
    Runs all forensic analyzers and produces a combined result.

    The four analyzers are submitted to a shared 4-worker thread pool and
    their results are gathered into one report dict.
    """

    # Relative weight of each analyzer in the fused score, keyed by the
    # ``name`` each analyzer puts on its AnalyzerResult.
    _WEIGHTS = {
        "spectral_analysis": 0.30,
        "temporal_analysis": 0.25,
        "formant_analysis": 0.25,
        "artifact_detection": 0.20,
    }
    # Weight used for a report entry whose name is not listed above.
    _DEFAULT_WEIGHT = 0.25
    # Score meaning "cannot tell"; matches the analyzers' own error fallback.
    _NEUTRAL_SCORE = 0.5

    def __init__(self):
        self.spectral = SpectralAnalyzer()
        self.temporal = TemporalAnalyzer()
        self.formant = FormantAnalyzer()
        self.artifact = ArtifactDetector()
        # Pool is created once and reused by every analyze() call.
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    def analyze(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """
        Run all analyzers on the thread pool and return a combined report.

        Args:
            y: Audio samples, passed unchanged to every analyzer.
            sr: Sample rate of ``y``.

        Returns:
            Dict keyed by analyzer result name ("spectral_analysis", ...),
            each value holding "score", "verdict", "artifacts_found" and
            "details". If an analyzer raises, an "error" entry with a neutral
            0.5 score and verdict "UNKNOWN" is stored instead (a later failure
            overwrites an earlier one).
        """
        results = {}

        # Map each future to a short label so a failure can be attributed.
        tasks = {
            self._executor.submit(self.spectral.analyze, y, sr): "spectral",
            self._executor.submit(self.temporal.analyze, y, sr): "temporal",
            self._executor.submit(self.formant.analyze, y, sr): "formant",
            self._executor.submit(self.artifact.analyze, y, sr): "artifact",
        }

        for future in concurrent.futures.as_completed(tasks):
            try:
                result = future.result()
            except Exception as e:
                # Name the analyzer that crashed; the original log omitted it.
                logger.error(f"Analyzer '{tasks[future]}' failed: {e}")
                # Provide strict fallback for failures
                results["error"] = {"score": 0.5, "verdict": "UNKNOWN", "details": str(e)}
            else:
                results[result.name] = {
                    "score": result.score,
                    "verdict": result.verdict,
                    "artifacts_found": result.artifacts_found,
                    "details": result.details,
                }

        return results

    def compute_forensic_score(self, forensic_results: Dict[str, Any]) -> float:
        """
        Compute a weighted forensic score.

        Args:
            forensic_results: Report as returned by ``analyze``. The "error"
                entry is ignored; an entry without a "score" counts as 0.5.

        Returns:
            Weighted mean of the analyzer scores, rounded to 4 decimals:
            0.0 (definitely human) to 1.0 (definitely AI). When no analyzer
            contributed a score the neutral 0.5 is returned, rather than a
            0.0 that would read as "definitely human".
        """
        weighted_sum = 0.0
        total_weight = 0.0
        for name, result in forensic_results.items():
            if name == "error":
                continue
            w = self._WEIGHTS.get(name, self._DEFAULT_WEIGHT)
            weighted_sum += result.get("score", self._NEUTRAL_SCORE) * w
            total_weight += w

        if total_weight == 0.0:
            # Nothing to average: stay undecided instead of dividing by ~0.
            return self._NEUTRAL_SCORE

        return round(weighted_sum / total_weight, 4)

    def get_all_artifacts(self, forensic_results: Dict[str, Any]) -> List[str]:
        """
        Collect all artifacts found across all analyzers.

        Args:
            forensic_results: Report as returned by ``analyze``.

        Returns:
            Artifact labels concatenated in report order; entries without an
            "artifacts_found" list (such as "error") contribute nothing.
        """
        return [
            label
            for result in forensic_results.values()
            for label in result.get("artifacts_found", [])
        ]


# Singleton instance, created when this module is imported. Constructing it
# also builds the four analyzers and the engine's 4-worker thread pool.
forensic_engine = ForensicEngine()