"""Background audio analysis: detect subtle anomalies in quiet audio regions."""

from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

import numpy as np

# NOTE(review): torch and librosa are not referenced by any code visible in
# this module. They are still imported (and their names stay bound) in case
# other code relies on them, but the imports are optional so the NumPy-only
# analysis below keeps working when they are not installed.
try:
    import torch
except ImportError:
    torch = None

try:
    import librosa
except ImportError:
    librosa = None


class AnomalyType(Enum):
    """Category assigned to a detected background anomaly."""

    WHISPER = "whisper"
    DISTANT_VOICE = "distant_voice"
    SPEAKER_AUDIO = "speaker_audio"
    UNKNOWN = "unknown"


class AudioSource(Enum):
    """Category assigned to a recording by ``classify_audio_source``."""

    DIRECT = "direct"
    SPEAKER = "speaker"
    PHONE = "phone"
    UNKNOWN = "unknown"


@dataclass
class BackgroundAnomaly:
    """A detected background anomaly."""

    # Start and end of the anomaly, in seconds from the start of the waveform.
    start: float
    end: float
    anomaly_type: AnomalyType
    # Loudest window level (dB, measured on the amplified signal).
    amplitude_db: float
    # Score in [0, 1]; see BackgroundAnalyzer._calculate_confidence.
    confidence: float

    @property
    def duration(self) -> float:
        """Length of the anomaly in seconds."""
        return self.end - self.start


class BackgroundAnalyzer:
    """Analyze background audio for anomalies."""

    def __init__(self, sample_rate: int = 16000):
        """
        Create an analyzer.

        Args:
            sample_rate: Sample rate (Hz) of the waveforms that will be
                analyzed. Defaults to 16000.

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")
        self.sample_rate = sample_rate

    def amplify_background(self, waveform: np.ndarray,
                           threshold_db: float = -40,
                           boost_db: float = 25) -> np.ndarray:
        """
        Amplify quiet background regions.

        The waveform is scanned in 100 ms windows with 50% overlap. Every
        sample covered by at least one window whose RMS level is below
        ``threshold_db`` is boosted by ``boost_db`` exactly once.

        Args:
            waveform: Audio waveform (numpy array; indexed along its first
                axis, so presumably 1-D). Not modified.
            threshold_db: Regions below this are amplified
            boost_db: Amount to boost by

        Returns:
            Amplified waveform. Empty or all-zero input is returned
            unchanged. If the peak would exceed 0.95 the whole result is
            rescaled so the peak is 0.95.
        """
        samples = np.asarray(waveform)

        # Nothing to amplify (also avoids max() on a zero-size array below).
        if samples.size == 0 or not samples.any():
            return samples

        # Work in floating point: integer arrays cannot be scaled in place by
        # a fractional factor. Floating inputs keep their dtype.
        if np.issubdtype(samples.dtype, np.floating):
            work_dtype = samples.dtype
        else:
            work_dtype = np.float64
        amplified = samples.astype(work_dtype)  # astype() always copies here

        window_size = max(1, int(0.1 * self.sample_rate))  # 100ms windows
        hop = max(1, window_size // 2)

        # Collect quiet samples in a mask first and boost once afterwards.
        # Scaling inside the loop would boost overlapping windows twice.
        quiet = np.zeros(len(amplified), dtype=bool)
        # "+ 1" so the last full window is included when it ends exactly at
        # the end of the signal.
        for offset in range(0, len(amplified) - window_size + 1, hop):
            window = amplified[offset:offset + window_size]
            window_rms = np.sqrt(np.mean(window ** 2))
            if window_rms > 0:
                window_db = 20 * np.log10(window_rms + 1e-10)
                if window_db < threshold_db:
                    quiet[offset:offset + window_size] = True

        amplified[quiet] *= 10 ** (boost_db / 20)

        # Normalize to prevent clipping
        max_amp = np.abs(amplified).max()
        if max_amp > 0.95:
            amplified = amplified * 0.95 / max_amp

        return amplified

    def detect_anomalies(self, waveform: np.ndarray,
                         speech_segments: Optional[List] = None,
                         threshold_db: float = -50,
                         min_confidence: float = 0.3) -> List[BackgroundAnomaly]:
        """
        Detect anomalies in background audio.

        The waveform is first passed through ``amplify_background`` (with its
        default settings) and then scanned in 500 ms windows with a hop of a
        quarter window. ``threshold_db`` is compared against the level of the
        *amplified* signal.

        Args:
            waveform: Audio waveform
            speech_segments: Optional VAD segments to exclude. Each segment
                must expose ``start`` and ``end`` attributes in seconds; a
                window is skipped when its centre falls inside a segment.
            threshold_db: Minimum amplitude to consider
            min_confidence: Windows whose confidence is not strictly above
                this value are discarded.

        Returns:
            List of detected anomalies, with adjacent same-type detections
            merged.
        """
        amplified = np.asarray(self.amplify_background(waveform))

        window_size = max(1, int(0.5 * self.sample_rate))  # 500ms
        hop = max(1, window_size // 4)
        # Offset of the window centre in seconds (0.25 s for a 500 ms window).
        half_window = window_size / (2 * self.sample_rate)

        anomalies = []
        # "+ 1" so the final full window is analyzed as well.
        for offset in range(0, len(amplified) - window_size + 1, hop):
            start_time = offset / self.sample_rate
            end_time = (offset + window_size) / self.sample_rate

            # Skip if in main speech
            if speech_segments:
                center = start_time + half_window
                if any(seg.start <= center <= seg.end
                       for seg in speech_segments):
                    continue

            window = amplified[offset:offset + window_size]
            window_rms = np.sqrt(np.mean(window ** 2))
            if window_rms == 0:
                continue

            window_db = float(20 * np.log10(window_rms + 1e-10))
            if window_db <= threshold_db:
                continue

            confidence = self._calculate_confidence(window, window_db,
                                                    threshold_db)
            if confidence <= min_confidence:
                continue

            # Classification needs an FFT, so only run it for windows that
            # are actually reported.
            anomalies.append(BackgroundAnomaly(
                start=start_time,
                end=end_time,
                anomaly_type=self._classify_anomaly(window),
                amplitude_db=window_db,
                confidence=confidence,
            ))

        # Merge adjacent anomalies
        return self._merge_anomalies(anomalies)

    def _classify_anomaly(self, window: np.ndarray) -> AnomalyType:
        """
        Classify the type of anomaly from its spectral energy distribution.

        Energy is split into three bands: low (< 300 Hz), mid (300-3000 Hz)
        and high (>= 3000 Hz). Windows shorter than 512 samples are reported
        as ``UNKNOWN``.
        """
        if len(window) < 512:
            return AnomalyType.UNKNOWN

        power = np.abs(np.fft.rfft(window)) ** 2
        freqs = np.fft.rfftfreq(len(window), 1 / self.sample_rate)

        low_energy = np.sum(power[freqs < 300])
        mid_energy = np.sum(power[(freqs >= 300) & (freqs < 3000)])
        high_energy = np.sum(power[freqs >= 3000])
        # Small epsilon avoids division by zero for an all-zero window.
        total = low_energy + mid_energy + high_energy + 1e-10

        # Whisper: less low frequency, more high frequency
        if low_energy / total < 0.1 and high_energy / total > 0.3:
            return AnomalyType.WHISPER

        # Speaker/Phone: limited bandwidth
        if high_energy / total < 0.1:
            return AnomalyType.SPEAKER_AUDIO

        # Distant voice: high reverb indicator (simplified)
        if mid_energy / total > 0.5:
            return AnomalyType.DISTANT_VOICE

        return AnomalyType.UNKNOWN

    def _calculate_confidence(self, window: np.ndarray, db: float,
                              threshold: float) -> float:
        """
        Calculate confidence score for anomaly.

        The score grows linearly with the level above ``threshold`` and
        saturates at 1.0 once the level is 20 dB above it; levels at or below
        the threshold score 0.0. ``window`` is currently unused and kept for
        interface compatibility.
        """
        db_above = db - threshold
        return float(max(0.0, min(1.0, db_above / 20)))

    def _merge_anomalies(self, anomalies: List[BackgroundAnomaly],
                         max_gap: float = 0.5) -> List[BackgroundAnomaly]:
        """
        Merge adjacent anomalies of same type.

        Anomalies are sorted by start time; an anomaly is folded into the
        previous one when both share a type and the gap between them is
        smaller than ``max_gap`` seconds (overlapping anomalies always
        qualify). The merged anomaly keeps the larger amplitude and
        confidence. The input list is not modified.
        """
        if not anomalies:
            return []

        ordered = sorted(anomalies, key=lambda a: a.start)
        merged = [ordered[0]]

        for anomaly in ordered[1:]:
            last = merged[-1]
            same_type = anomaly.anomaly_type == last.anomaly_type
            if same_type and anomaly.start - last.end < max_gap:
                merged[-1] = BackgroundAnomaly(
                    start=last.start,
                    # max(): a later-starting anomaly may end earlier than
                    # the one it is merged into; never shrink the span.
                    end=max(last.end, anomaly.end),
                    anomaly_type=last.anomaly_type,
                    amplitude_db=max(last.amplitude_db, anomaly.amplitude_db),
                    confidence=max(last.confidence, anomaly.confidence),
                )
            else:
                merged.append(anomaly)

        return merged

    def classify_audio_source(self, waveform: np.ndarray) -> AudioSource:
        """
        Classify the source of audio (direct, speaker, phone).

        The decision is based on the effective bandwidth: the frequency below
        which 95% of the spectral energy lies. Waveforms shorter than 1024
        samples, or with no energy, are reported as ``UNKNOWN``.

        NOTE(review): at the default 16 kHz sample rate the highest
        representable frequency is 8 kHz, which equals the speaker cut-off
        below, so ``DIRECT`` is only returned when the 95% point lands on the
        very last frequency bin. Higher sample rates make it reachable.
        """
        if len(waveform) < 1024:
            return AudioSource.UNKNOWN

        power = np.abs(np.fft.rfft(waveform)) ** 2
        freqs = np.fft.rfftfreq(len(waveform), 1 / self.sample_rate)

        total_energy = np.sum(power)
        if total_energy == 0:
            return AudioSource.UNKNOWN

        # Find effective bandwidth; clamp the index because rounding in the
        # cumulative sum can place the 95% point past the last bin.
        idx_95 = np.searchsorted(np.cumsum(power), 0.95 * total_energy)
        max_freq = freqs[min(idx_95, len(freqs) - 1)]

        # Phone typically cuts off around 3.4kHz
        if max_freq < 4000:
            return AudioSource.PHONE

        # Speaker typically has limited high freq
        if max_freq < 8000:
            return AudioSource.SPEAKER

        return AudioSource.DIRECT