Spaces:
Sleeping
Sleeping
| """ | |
| Main Audio Analyzer - orchestrates all analysis phases. | |
| """ | |
| import os | |
| import uuid | |
| import json | |
| import time | |
| import tempfile | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Callable | |
| from dataclasses import dataclass, asdict | |
| import numpy as np | |
| import torch | |
| import torchaudio | |
def to_python_type(obj):
    """Recursively convert numpy values to native Python types.

    Intended for preparing nested results for ``json.dumps``.

    Args:
        obj: Any value. Numpy scalars (bool / integer / floating), numpy
            arrays, dicts, lists and plain tuples are converted; dict keys
            are converted as well as values.

    Returns:
        An equivalent structure built from ``bool``, ``int``, ``float``,
        ``list``, ``tuple`` and ``dict``. Values of any other type
        (including namedtuples) are returned unchanged.
    """
    # np.bool_ is not a subclass of bool, so it needs its own conversion.
    if isinstance(obj, (bool, np.bool_)):
        return bool(obj)
    # np.integer / np.floating are the abstract bases of every sized variant
    # (int32, int64, float32, float64, ...).
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        # Keys are converted too: json.dumps rejects numpy scalar keys.
        return {to_python_type(k): to_python_type(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [to_python_type(item) for item in obj]
    # Plain tuples keep their tuple type; namedtuples are left untouched so
    # their class is not lost.
    if isinstance(obj, tuple) and not hasattr(obj, '_fields'):
        return tuple(to_python_type(item) for item in obj)
    return obj
| from .phase1_foundation import ( | |
| AudioPreprocessor, | |
| VoiceActivityDetector, | |
| SpeakerDiarizer, | |
| VoiceprintExtractor, | |
| VoiceprintResult | |
| ) | |
| from .phase2_background import BackgroundAnalyzer, BackgroundAnomaly | |
| from .phase6_synthetic import SyntheticDetector, WakeWordDetector, PlaybackDetector | |
| from .fraud_detection import ( | |
| WhisperDetector, WhisperResult, | |
| ReadingPatternAnalyzer, ReadingPatternResult, | |
| SuspiciousPauseDetector, PauseResult | |
| ) | |
| from .database import Database | |
@dataclass
class SpeakerResult:
    """Result for a detected speaker.

    Declared as a dataclass: instances are built with keyword arguments and
    serialized with ``dataclasses.asdict``.
    """

    voiceprint_id: str
    label: str
    role: str  # "main" or "additional"
    total_seconds: float
    quality: str
    is_synthetic: bool
    synthetic_score: float
    is_playback: bool = False
    playback_score: float = 0.0
    # None (not an empty list) is the "not provided" default for list fields.
    playback_indicators: Optional[List[str]] = None
    times_seen: int = 1
    is_flagged: bool = False
    segments: Optional[List[dict]] = None
    clip_path: Optional[str] = None  # Path to audio sample for this speaker
@dataclass
class AnalysisResult:
    """Complete analysis result for one audio file.

    Holds the speaker, background, wake-word, playback and fraud-detection
    findings, and derives a consolidated risk score from them.
    ``risk_score``, ``risk_label`` and ``risk_color`` are computed
    properties, not stored fields.
    """

    test_id: str
    filename: str
    duration_seconds: float
    analyzed_at: str
    # Speakers (quoted so the annotation is a forward reference)
    main_speaker: Optional['SpeakerResult']
    additional_speakers: List['SpeakerResult']
    # Background
    background_anomalies: List[dict]
    # Wake words
    wake_words: List[dict]
    assistant_responses: List[dict]
    # Prompt voice (audio from question prompts)
    prompt_voice_detected: bool
    prompt_voice_seconds: float
    # Playback detection (global)
    playback_detected: bool = False
    playback_score: float = 0.0
    playback_indicators: Optional[List[str]] = None
    # Fraud detection - Whisper (background voices)
    whisper_detected: bool = False
    whisper_instances: Optional[List[dict]] = None
    # Fraud detection - Reading pattern
    reading_pattern_detected: bool = False
    reading_confidence: float = 0.0
    reading_indicators: Optional[List[str]] = None
    # Fraud detection - Suspicious pauses
    suspicious_pauses_detected: bool = False
    suspicious_pauses: Optional[List[dict]] = None
    longest_pause: float = 0.0

    @property
    def risk_score(self) -> int:
        """Consolidated risk score 0-100.

        Weighted sum of the individual signals (synthetic 25, playback 15,
        reading 20, whispers 15, pauses 10, wake words 10, repeat voiceprint
        5), then raised to a floor when a hard fraud indicator is present,
        and finally capped at 100.
        """
        score = 0.0
        if self.main_speaker:
            score += self.main_speaker.synthetic_score * 25
        score += self.playback_score * 15
        score += self.reading_confidence * 20

        # Count-based signals saturate: 3 whispers, 3 pauses or 2 wake words
        # already contribute their full weight.
        whisper_count = len(self.whisper_instances) if self.whisper_instances else 0
        score += min(whisper_count, 3) / 3 * 15
        pause_count = len(self.suspicious_pauses) if self.suspicious_pauses else 0
        score += min(pause_count, 3) / 3 * 10
        wake_count = len(self.wake_words) if self.wake_words else 0
        score += min(wake_count, 2) / 2 * 10

        if self.main_speaker and self.main_speaker.times_seen >= 3:
            score += 5

        # Hard fraud indicators force minimum risk levels
        additional_count = len(self.additional_speakers) if self.additional_speakers else 0
        if additional_count > 0:
            score = max(score, 45)  # 2+ speakers = MEDIUM minimum
        if wake_count > 0:
            score = max(score, 35)  # Wake word = MEDIUM minimum
        if whisper_count > 0:
            score = max(score, 40)  # Whispers = MEDIUM minimum
        return int(round(min(score, 100)))

    @property
    def risk_label(self) -> str:
        """Risk bucket: "LOW" (<= 30), "MEDIUM" (<= 60) or "HIGH"."""
        s = self.risk_score
        if s <= 30:
            return "LOW"
        elif s <= 60:
            return "MEDIUM"
        else:
            return "HIGH"

    @property
    def risk_color(self) -> str:
        """Hex color for the risk bucket, using the same thresholds as ``risk_label``."""
        s = self.risk_score
        if s <= 30:
            return "#22c55e"
        elif s <= 60:
            return "#eab308"
        else:
            return "#ef4444"

    def to_dict(self) -> dict:
        """Convert to a dictionary of native Python types.

        Includes the computed ``risk_score`` and ``risk_label``; list fields
        that are ``None`` are emitted as empty lists.
        """
        result = {
            'test_id': self.test_id,
            'filename': self.filename,
            'duration_seconds': float(self.duration_seconds),
            'analyzed_at': self.analyzed_at,
            'risk_score': self.risk_score,
            'risk_label': self.risk_label,
            'main_speaker': to_python_type(asdict(self.main_speaker)) if self.main_speaker else None,
            'additional_speakers': [to_python_type(asdict(s)) for s in self.additional_speakers],
            'background_anomalies': to_python_type(self.background_anomalies),
            'wake_words': to_python_type(self.wake_words),
            'assistant_responses': to_python_type(self.assistant_responses),
            'prompt_voice_detected': bool(self.prompt_voice_detected),
            'prompt_voice_seconds': float(self.prompt_voice_seconds),
            'playback_detected': bool(self.playback_detected),
            'playback_score': float(self.playback_score),
            'playback_indicators': self.playback_indicators or [],
            # Fraud detection fields
            'whisper_detected': bool(self.whisper_detected),
            'whisper_instances': to_python_type(self.whisper_instances or []),
            'reading_pattern_detected': bool(self.reading_pattern_detected),
            'reading_confidence': float(self.reading_confidence),
            'reading_indicators': self.reading_indicators or [],
            'suspicious_pauses_detected': bool(self.suspicious_pauses_detected),
            'suspicious_pauses': to_python_type(self.suspicious_pauses or []),
            'longest_pause': float(self.longest_pause),
        }
        return result

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict(), indent=2)

    @staticmethod
    def _speaker_from_dict(data: dict, default_role: str) -> 'SpeakerResult':
        """Rebuild one SpeakerResult from its stored dict, defaulting missing keys."""
        return SpeakerResult(
            voiceprint_id=data.get('voiceprint_id', ''),
            label=data.get('label', ''),
            role=data.get('role', default_role),
            total_seconds=data.get('total_seconds', 0),
            quality=data.get('quality', 'Unknown'),
            is_synthetic=data.get('is_synthetic', False),
            synthetic_score=data.get('synthetic_score', 0),
            is_playback=data.get('is_playback', False),
            playback_score=data.get('playback_score', 0),
            playback_indicators=data.get('playback_indicators'),
            times_seen=data.get('times_seen', 1),
            is_flagged=data.get('is_flagged', False),
            segments=data.get('segments'),
            clip_path=data.get('clip_path'),
        )

    @classmethod
    def from_dict(cls, d: dict) -> 'AnalysisResult':
        """Reconstruct an AnalysisResult from a stored dict (e.g. from DB JSON).

        Missing keys fall back to neutral defaults. Any stored
        ``risk_score`` / ``risk_label`` entries are ignored because those
        values are recomputed from the fields.
        """
        main_data = d.get('main_speaker')
        main = cls._speaker_from_dict(main_data, 'main') if main_data else None
        # `or []` also covers a stored null, not just a missing key.
        additional = [
            cls._speaker_from_dict(s, 'additional')
            for s in (d.get('additional_speakers') or [])
        ]
        return cls(
            test_id=d.get('test_id', ''),
            filename=d.get('filename', ''),
            duration_seconds=d.get('duration_seconds', 0),
            analyzed_at=d.get('analyzed_at', ''),
            main_speaker=main,
            additional_speakers=additional,
            background_anomalies=d.get('background_anomalies', []),
            wake_words=d.get('wake_words', []),
            assistant_responses=d.get('assistant_responses', []),
            prompt_voice_detected=d.get('prompt_voice_detected', False),
            prompt_voice_seconds=d.get('prompt_voice_seconds', 0),
            playback_detected=d.get('playback_detected', False),
            playback_score=d.get('playback_score', 0),
            playback_indicators=d.get('playback_indicators'),
            whisper_detected=d.get('whisper_detected', False),
            whisper_instances=d.get('whisper_instances'),
            reading_pattern_detected=d.get('reading_pattern_detected', False),
            reading_confidence=d.get('reading_confidence', 0),
            reading_indicators=d.get('reading_indicators'),
            suspicious_pauses_detected=d.get('suspicious_pauses_detected', False),
            suspicious_pauses=d.get('suspicious_pauses'),
            longest_pause=d.get('longest_pause', 0),
        )
class AudioAnalyzer:
    """Main analyzer that orchestrates all phases.

    Each phase component is created on first access through a property, so
    constructing the analyzer does not instantiate any of them.
    """

    def __init__(self, db_path: Optional[str] = None,
                 clips_dir: Optional[str] = None,
                 device: Optional[str] = None):
        """
        Initialize analyzer.

        Args:
            db_path: Path to SQLite database. Defaults to
                ``<DATA_DIR>/db/voiceprints.db``.
            clips_dir: Directory to save audio clips. Defaults to
                ``<DATA_DIR>/clips``; created if missing.
            device: torch device (cuda/cpu)

        ``DATA_DIR`` is read from the environment and falls back to "data".
        """
        self.device = device
        data_dir = os.environ.get("DATA_DIR", "data")
        if db_path is None:
            db_path = os.path.join(data_dir, "db", "voiceprints.db")
        if clips_dir is None:
            clips_dir = os.path.join(data_dir, "clips")
        self.clips_dir = clips_dir
        os.makedirs(clips_dir, exist_ok=True)

        # Initialize database
        self.db = Database(db_path)

        # Initialize components (lazy loaded)
        self._preprocessor = None
        self._vad = None
        self._diarizer = None
        self._voiceprint = None
        self._background = None
        self._synthetic = None
        self._playback = None
        self._wake_words = None
        # Fraud detectors
        self._whisper_detector = None
        self._reading_pattern = None
        self._pause_detector = None

    # ------------------------------------------------------------------
    # Lazily constructed components. These must be properties: the rest of
    # the class uses them as attributes (e.g. ``self.vad.detect(...)``).
    # ------------------------------------------------------------------

    @property
    def preprocessor(self):
        """AudioPreprocessor, created on first access."""
        if self._preprocessor is None:
            self._preprocessor = AudioPreprocessor()
        return self._preprocessor

    @property
    def vad(self):
        """VoiceActivityDetector, created on first access."""
        if self._vad is None:
            self._vad = VoiceActivityDetector(device=self.device)
        return self._vad

    @property
    def diarizer(self):
        """SpeakerDiarizer, created on first access."""
        if self._diarizer is None:
            self._diarizer = SpeakerDiarizer(device=self.device)
        return self._diarizer

    @property
    def voiceprint_extractor(self):
        """VoiceprintExtractor, created on first access."""
        if self._voiceprint is None:
            self._voiceprint = VoiceprintExtractor(device=self.device)
        return self._voiceprint

    @property
    def background_analyzer(self):
        """BackgroundAnalyzer, created on first access."""
        if self._background is None:
            self._background = BackgroundAnalyzer()
        return self._background

    @property
    def synthetic_detector(self):
        """SyntheticDetector, created on first access."""
        if self._synthetic is None:
            self._synthetic = SyntheticDetector(device=self.device)
        return self._synthetic

    @property
    def playback_detector(self):
        """PlaybackDetector, created on first access."""
        if self._playback is None:
            self._playback = PlaybackDetector()
        return self._playback

    @property
    def wake_word_detector(self):
        """WakeWordDetector (model_size="base"), created on first access."""
        if self._wake_words is None:
            self._wake_words = WakeWordDetector(model_size="base")
        return self._wake_words

    @property
    def whisper_detector(self):
        """WhisperDetector, created on first access."""
        if self._whisper_detector is None:
            self._whisper_detector = WhisperDetector()
        return self._whisper_detector

    @property
    def reading_pattern_analyzer(self):
        """ReadingPatternAnalyzer, created on first access."""
        if self._reading_pattern is None:
            self._reading_pattern = ReadingPatternAnalyzer()
        return self._reading_pattern

    @property
    def pause_detector(self):
        """SuspiciousPauseDetector, created on first access."""
        if self._pause_detector is None:
            self._pause_detector = SuspiciousPauseDetector()
        return self._pause_detector

    def analyze(self, audio_path: str,
                test_id: Optional[str] = None,
                progress_callback: Optional[Callable[[str, int], None]] = None,
                log_callback: Optional[Callable[[str], None]] = None) -> 'AnalysisResult':
        """
        Run full analysis on audio file.

        Args:
            audio_path: Path to audio file
            test_id: Optional test ID (generated if not provided)
            progress_callback: Optional callback for progress updates,
                called with (message, percent)
            log_callback: Optional callback for detailed technical logs,
                called with one elapsed-time-prefixed line per entry

        Returns:
            AnalysisResult with all findings

        Raises:
            ValueError: If the preprocessed audio is shorter than 20 seconds.
        """
        t0 = time.time()

        def update_progress(msg: str, pct: int):
            if progress_callback:
                progress_callback(msg, pct)

        def log(msg: str):
            elapsed = time.time() - t0
            entry = f"[{elapsed:06.1f}s] {msg}"
            if log_callback:
                log_callback(entry)

        # Generate test ID
        if test_id is None:
            test_id = f"test_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
        filename = os.path.basename(audio_path)
        log(f"Starting analysis: {filename}")
        log(f"Test ID: {test_id}")

        # Step 1: Preprocess
        update_progress("Preprocessing audio...", 5)
        log("Step 1/9: Preprocessing audio (torchaudio)")
        waveform, sample_rate, metadata = self.preprocessor.process_file(audio_path)
        duration = metadata['normalized_duration']
        log(f" torchaudio.load() → {duration:.1f}s audio, {sample_rate}Hz, mono")
        log(f" Normalized amplitude, resampled to {sample_rate}Hz")

        # Validate minimum audio duration (20 seconds)
        MIN_DURATION = 20.0
        if duration < MIN_DURATION:
            log(f" ERROR: Audio too short ({duration:.1f}s < {MIN_DURATION:.0f}s)")
            raise ValueError(f"Audio too short: {duration:.1f}s. Minimum required: {MIN_DURATION:.0f}s")

        # Reserve a temp file name for the normalized audio used by the
        # path-based components. delete=False so it outlives the `with`.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            temp_path = f.name

        try:
            # Saving happens inside the try so the temp file is removed even
            # if the write itself fails.
            torchaudio.save(temp_path, waveform, sample_rate)

            # Step 2: VAD
            update_progress("Detecting voice activity...", 15)
            log("Step 2/9: Voice Activity Detection (SpeechBrain VAD)")
            log(" Model: speechbrain/vad-crdnn-libriparty")
            speech_segments = self.vad.detect(temp_path)
            total_speech = sum(s.end - s.start for s in speech_segments)
            speech_pct = (total_speech / duration * 100) if duration > 0 else 0
            log(f" Result: {len(speech_segments)} speech segments detected")
            log(f" Total speech: {total_speech:.1f}s ({speech_pct:.1f}% of audio)")

            # Step 3: Speaker Diarization
            update_progress("Identifying speakers...", 30)
            log("Step 3/9: Speaker Diarization (SpeechBrain ECAPA-TDNN)")
            log(" Model: speechbrain/spkrec-ecapa-voxceleb")
            log(" Clustering: sklearn AgglomerativeClustering")
            speakers = self.diarizer.diarize(temp_path, speech_segments)
            log(f" Result: {len(speakers)} speaker(s) identified")

            # Step 4: Process speakers
            update_progress("Extracting voiceprints...", 45)
            log("Step 4/9: Voiceprint Extraction (ECAPA-TDNN 192-dim)")
            main_speaker_result = None
            additional_speakers = []
            speaker_list = list(speakers.values())

            # First pass: recalculate actual speaking time for all speakers
            for speaker_info in speaker_list:
                actual_speaking_time = sum(seg.end - seg.start for seg in speaker_info.segments)
                actual_speaking_time = min(actual_speaking_time, duration)  # Cap to audio duration
                speaker_info.total_seconds = actual_speaking_time

            # Re-sort by speaking time (most speaking = main speaker)
            speaker_list = sorted(speaker_list, key=lambda s: s.total_seconds, reverse=True)
            for i, speaker_info in enumerate(speaker_list):
                role = "main" if i == 0 else "additional"
                log(f" Speaker {i} ({role}): {speaker_info.total_seconds:.1f}s speaking time")

            for i, speaker_info in enumerate(speaker_list):
                # Every later step needs the voiceprint, so a speaker without
                # an embedding cannot be processed and is skipped.
                # NOTE(review): confirm skipping is the intended handling.
                if speaker_info.embedding is None:
                    continue
                role = "main" if i == 0 else "additional"

                # Extract voiceprint
                vp_result = self.voiceprint_extractor.extract_from_embedding(
                    speaker_info.embedding,
                    speaker_info.total_seconds
                )

                # Check for synthetic: run detection on this speaker's audio
                synthetic_result = self._detect_synthetic_for_speaker(
                    waveform, sample_rate, speaker_info
                )

                # Check the database for a matching voiceprint
                existing_vp, similarity = self.db.find_matching_voiceprint(
                    vp_result.to_bytes(),
                    threshold=0.75
                )
                if existing_vp:
                    vp_id = existing_vp.id
                    times_seen = existing_vp.times_seen + 1
                    # Flag once a voice has been seen 4+ times (or was
                    # already flagged).
                    is_flagged = existing_vp.is_flagged or times_seen >= 4
                    log(f" DB match: Speaker {i} → {vp_id} (similarity: {similarity:.2f}, seen {times_seen}x)")
                else:
                    vp_id = vp_result.voiceprint_id
                    times_seen = 1
                    is_flagged = False
                    log(f" DB match: Speaker {i} → NEW voiceprint {vp_id}")
                log(f" Synthetic score: {synthetic_result.score:.2f} (is_synthetic: {synthetic_result.is_synthetic})")

                # Save clip for this speaker
                clip_path = self._save_speaker_clip(
                    waveform, sample_rate, speaker_info, test_id, vp_id
                )

                # Add to database
                self.db.add_voiceprint(
                    vp_id=vp_id,
                    embedding=vp_result.to_bytes(),
                    test_id=test_id,
                    filename=filename,
                    role=role,
                    duration=speaker_info.total_seconds,
                    clip_path=clip_path
                )

                speaker_result = SpeakerResult(
                    voiceprint_id=vp_id,
                    label=speaker_info.speaker_id,
                    role=role,
                    total_seconds=speaker_info.total_seconds,
                    quality=self.voiceprint_extractor.quality_label(vp_result.quality_score),
                    is_synthetic=synthetic_result.is_synthetic,
                    synthetic_score=synthetic_result.score,
                    times_seen=times_seen,
                    is_flagged=is_flagged,
                    segments=[{'start': s.start, 'end': s.end} for s in speaker_info.segments],
                    clip_path=clip_path
                )
                if i == 0:
                    main_speaker_result = speaker_result
                else:
                    additional_speakers.append(speaker_result)

            # Step 5: Background Analysis
            update_progress("Analyzing background audio...", 55)
            log("Step 5/9: Background Analysis (librosa)")
            log(" Analyzing non-speech segments for anomalies")
            waveform_np = waveform.squeeze().numpy()
            anomalies = self.background_analyzer.detect_anomalies(
                waveform_np, speech_segments
            )
            log(f" Result: {len(anomalies)} background anomalies detected")
            for a in anomalies:
                log(f" {a.anomaly_type.value} at {a.start:.1f}s-{a.end:.1f}s ({a.amplitude_db:.1f}dB, conf: {a.confidence:.2f})")

            # Step 6: Playback Detection (detect if audio is from speakers)
            update_progress("Detecting playback/replay...", 65)
            log("Step 6/9: Playback Detection (librosa pitch/spectral)")
            playback_result = self.playback_detector.detect(waveform_np)
            log(f" Score: {playback_result.score:.2f}, is_playback: {playback_result.is_playback}")
            if playback_result.indicators:
                log(f" Indicators: {', '.join(playback_result.indicators)}")

            # Step 7: Wake Word Detection
            update_progress("Detecting wake words...", 70)
            log("Step 7/9: Wake Word Detection (OpenAI Whisper)")
            log(" Model: openai-whisper (base)")
            wake_analysis = self.wake_word_detector.analyze(temp_path)
            # Read every key with .get() so a missing entry degrades to
            # "nothing found" instead of raising KeyError later on.
            wake_words = wake_analysis.get('wake_words', [])
            assistant_responses = wake_analysis.get('assistant_responses', [])
            word_timestamps = wake_analysis.get('word_timestamps', [])
            transcription = wake_analysis.get('transcription', '')
            n_wake = len(wake_words)
            n_words = len(word_timestamps)
            log(f" Transcribed {n_words} words, {n_wake} wake words found")
            for w in wake_words:
                log(f" Wake word: '{w.word}' ({w.assistant}) at {w.time:.1f}s")

            # Step 8: Fraud Detection - Whisper, Reading Pattern, Suspicious Pauses
            update_progress("Running fraud detection...", 80)
            log("Step 8/9: Fraud Detection Modules")

            # 8a: Whisper detection (background voices)
            log(" 8a: Whisper Detector (background voice analysis)")
            main_speaker_segs = []
            if main_speaker_result and main_speaker_result.segments:
                main_speaker_segs = main_speaker_result.segments
            whisper_result = self.whisper_detector.detect(
                waveform_np, sample_rate, main_speaker_segs
            )
            log(f" Result: {len(whisper_result.instances)} background whispers detected")

            # 8b: Reading pattern detection (uses wake word transcription)
            log(" 8b: Reading Pattern Analyzer")
            reading_result = self.reading_pattern_analyzer.analyze(
                transcription, word_timestamps, duration
            )
            log(f" is_reading: {reading_result.is_reading} (confidence: {reading_result.confidence:.2f})")
            if reading_result.indicators:
                log(f" Indicators: {', '.join(reading_result.indicators)}")

            # 8c: Suspicious pause detection
            log(" 8c: Suspicious Pause Detector")
            speech_segments_dict = [{'start': s.start, 'end': s.end} for s in speech_segments]
            pause_result = self.pause_detector.detect(speech_segments_dict, duration)
            log(f" Result: {len(pause_result.pauses)} suspicious pauses (longest: {pause_result.longest_pause:.1f}s)")
            for p in pause_result.pauses:
                log(f" Pause at {p.start:.1f}s-{p.end:.1f}s ({p.duration:.1f}s) - {p.context}")

            # Step 9: Compile results
            update_progress("Compiling results...", 90)
            log("Step 9/9: Compiling results & saving to database")

            # Detect prompt voice (simplified: assume first few seconds might
            # be prompt). Length is end - start, as everywhere else in this
            # method, rather than relying on a separate duration attribute.
            prompt_seconds = sum(
                s.end - s.start for s in speech_segments
                if s.start < 5.0  # First 5 seconds
            )

            result = AnalysisResult(
                test_id=test_id,
                filename=filename,
                duration_seconds=duration,
                analyzed_at=datetime.now().isoformat(),
                main_speaker=main_speaker_result,
                additional_speakers=additional_speakers,
                background_anomalies=[
                    {
                        'start': a.start,
                        'end': a.end,
                        'type': a.anomaly_type.value,
                        'amplitude_db': a.amplitude_db,
                        'confidence': a.confidence
                    }
                    for a in anomalies
                ],
                wake_words=[
                    {
                        'word': w.word,
                        'assistant': w.assistant,
                        'time': w.time,
                        'confidence': w.confidence,
                        'context': w.context
                    }
                    for w in wake_words
                ],
                assistant_responses=assistant_responses,
                prompt_voice_detected=prompt_seconds > 0,
                prompt_voice_seconds=prompt_seconds,
                playback_detected=playback_result.is_playback,
                playback_score=playback_result.score,
                playback_indicators=playback_result.indicators,
                # Fraud detection results
                whisper_detected=whisper_result.detected,
                whisper_instances=[
                    {'start': w.start, 'end': w.end, 'confidence': w.confidence}
                    for w in whisper_result.instances
                ],
                reading_pattern_detected=reading_result.is_reading,
                reading_confidence=reading_result.confidence,
                reading_indicators=reading_result.indicators,
                suspicious_pauses_detected=pause_result.detected,
                suspicious_pauses=[
                    {'start': p.start, 'end': p.end, 'duration': p.duration, 'context': p.context}
                    for p in pause_result.pauses
                ],
                longest_pause=pause_result.longest_pause
            )

            # Save analysis to database
            self.db.save_test_analysis(
                test_id=test_id,
                filename=filename,
                duration=duration,
                results=result.to_dict()
            )
            log(f" Saved to SQLite database (test_id: {test_id})")

            total_time = time.time() - t0
            n_speakers = len(additional_speakers) + (1 if main_speaker_result else 0)
            n_alerts = (
                len(anomalies)
                + (1 if playback_result.is_playback else 0)
                + n_wake
                + len(whisper_result.instances)
                + len(pause_result.pauses)
            )
            log(f"Analysis complete in {total_time:.1f}s — {n_speakers} speaker(s), {n_alerts} alert(s)")
            update_progress("Analysis complete!", 100)
            return result
        finally:
            # Cleanup temp file
            if os.path.exists(temp_path):
                os.remove(temp_path)

    def _detect_synthetic_for_speaker(self, waveform, sample_rate, speaker_info):
        """Run synthetic detection on speaker's audio.

        Combines both SyntheticDetector (voice characteristics) and
        PlaybackDetector (TTS/speaker playback) for better detection.

        Args:
            waveform: Audio tensor, sliced as ``waveform[:, start:end]``.
            sample_rate: Samples per second, used to map segment times to
                sample indices.
            speaker_info: Object whose ``segments`` have ``start``/``end``
                times in seconds.

        Returns:
            A SyntheticResult built from the combined score (threshold 0.45),
            or from score 0.0 when no segment could be extracted.
        """
        from .phase6_synthetic import SyntheticResult

        # Concatenate speaker segments
        segments_audio = []
        for seg in speaker_info.segments[:5]:  # Limit to first 5 segments
            start_sample = int(seg.start * sample_rate)
            end_sample = int(seg.end * sample_rate)
            # Segments that run past the end of the audio are dropped.
            if end_sample <= waveform.shape[1]:
                segments_audio.append(waveform[:, start_sample:end_sample])
        if not segments_audio:
            return SyntheticResult.from_score(0.0)
        speaker_audio = np.concatenate([s.squeeze().numpy() for s in segments_audio])

        # Run both detectors on speaker's audio
        synthetic_result = self.synthetic_detector.detect(speaker_audio)
        playback_result = self.playback_detector.detect(speaker_audio)

        # Combine scores: if either detects synthetic/TTS, flag it
        # Playback with TTS indicators is strong evidence of synthetic
        tts_indicators = ['tts_flat_pitch', 'tts_low_pitch_variation', 'tts_regular_timing',
                          'smooth_spectrum', 'slightly_smooth_spectrum']
        # Guard against a detector that reports no indicators as None.
        found_indicators = playback_result.indicators or []
        has_tts_indicators = any(ind in found_indicators for ind in tts_indicators)

        # Calculate combined score
        if has_tts_indicators:
            # Strong TTS evidence from playback detector
            combined_score = max(synthetic_result.score, playback_result.score * 0.9)
        else:
            # Weight synthetic detector more, but consider playback
            combined_score = synthetic_result.score * 0.7 + playback_result.score * 0.3

        # Boost if both detectors agree
        if synthetic_result.score > 0.4 and playback_result.score > 0.4:
            combined_score = min(1.0, combined_score * 1.2)
        return SyntheticResult.from_score(combined_score, threshold=0.45)

    def _save_speaker_clip(self, waveform, sample_rate, speaker_info, test_id, vp_id):
        """Save audio clip for a speaker (minimum 10 seconds for voice sample).

        Segments are merged where they overlap, then concatenated in time
        order until at least 10 seconds are collected (or segments run out).

        Returns:
            Path of the written 16-bit PCM WAV file, or None when the speaker
            has no segments or none of them fits inside the waveform.
        """
        segments = sorted(speaker_info.segments, key=lambda s: s.start)
        if not segments:
            return None

        # Merge overlapping segments first
        merged_segments = []
        for seg in segments:
            if merged_segments and seg.start <= merged_segments[-1][1]:
                # Overlap - extend previous segment
                merged_segments[-1] = (merged_segments[-1][0], max(merged_segments[-1][1], seg.end))
            else:
                merged_segments.append((seg.start, seg.end))

        # Concatenate segments until we have at least 10 seconds for voice sample
        target_duration = 10.0
        clips = []
        total_duration = 0.0
        for start, end in merged_segments:
            start_sample = int(start * sample_rate)
            end_sample = int(end * sample_rate)
            # Segments that run past the end of the audio are dropped.
            if end_sample <= waveform.shape[1]:
                clips.append(waveform[:, start_sample:end_sample])
                total_duration += (end - start)
            if total_duration >= target_duration:
                break
        if not clips:
            return None

        # Concatenate all clips
        clip = torch.cat(clips, dim=1)

        # Convert to int16 PCM for browser compatibility
        clip_np = clip.squeeze(0).numpy()
        clip_int16 = np.clip(clip_np * 32767, -32768, 32767).astype(np.int16)

        # Save clip
        import soundfile as sf
        clip_filename = f"{test_id}_{vp_id}_{total_duration:.1f}s.wav"
        clip_path = os.path.join(self.clips_dir, clip_filename)
        sf.write(clip_path, clip_int16, sample_rate, subtype='PCM_16')
        return clip_path

    def get_voiceprint_history(self, vp_id: str) -> List[dict]:
        """Get appearance history for a voiceprint.

        Returns:
            One dict per appearance with keys ``test_id``, ``filename``,
            ``role``, ``duration``, ``date`` (ISO string or None) and
            ``clip_path``.
        """
        appearances = self.db.get_voiceprint_appearances(vp_id)
        return [
            {
                'test_id': a.test_id,
                'filename': a.test_filename,
                'role': a.role,
                'duration': a.duration_seconds,
                'date': a.detected_at.isoformat() if a.detected_at else None,
                'clip_path': a.clip_path
            }
            for a in appearances
        ]

    def get_database_stats(self) -> dict:
        """Get database statistics."""
        return self.db.get_stats()