Spaces:
Sleeping
Sleeping
| """ | |
| Background Audio Analysis - detect subtle anomalies. | |
| """ | |
| import torch | |
| import numpy as np | |
| import librosa | |
| from typing import List, Optional | |
| from dataclasses import dataclass | |
| from enum import Enum | |
class AnomalyType(Enum):
    """Labels that can be attached to a detected background anomaly."""

    # Breathy, low-volume speech.
    WHISPER = "whisper"
    # A voice that is far away from the microphone.
    DISTANT_VOICE = "distant_voice"
    # Audio replayed through a loudspeaker or phone.
    SPEAKER_AUDIO = "speaker_audio"
    # Fallback when no other label applies.
    UNKNOWN = "unknown"
class AudioSource(Enum):
    """Possible origins of a recording, as reported by the source classifier."""

    # Captured straight from the microphone.
    DIRECT = "direct"
    # Replayed through a loudspeaker.
    SPEAKER = "speaker"
    # Telephone-band audio.
    PHONE = "phone"
    # Fallback when the origin cannot be determined.
    UNKNOWN = "unknown"
@dataclass
class BackgroundAnomaly:
    """A detected background anomaly.

    The ``@dataclass`` decorator generates the keyword-accepting
    ``__init__`` that the analyzer relies on when it builds instances with
    ``BackgroundAnomaly(start=..., end=..., ...)``; without it the class
    has no constructor parameters and that call raises ``TypeError``.
    """

    # Start of the anomalous region, in seconds from the start of the audio.
    start: float
    # End of the anomalous region, in seconds from the start of the audio.
    end: float
    # Category assigned to the region (an ``AnomalyType`` member).
    anomaly_type: "AnomalyType"
    # RMS level of the region in dB.
    amplitude_db: float
    # Detection confidence; the analyzer produces values in [0.0, 1.0].
    confidence: float

    def duration(self) -> float:
        """Return the length of the anomaly (``end - start``) in seconds."""
        return self.end - self.start
class BackgroundAnalyzer:
    """Analyze background audio for anomalies.

    All level thresholds are expressed in dB relative to an amplitude of
    1.0, and ``amplify_background`` rescales its output so the peak does
    not exceed 0.95.
    NOTE(review): this suggests waveforms are expected to be normalised to
    [-1, 1] -- confirm against the callers.
    """

    # Band edges (Hz) used by ``classify_audio_source``.
    _PHONE_CUTOFF_HZ = 4000.0
    _SPEAKER_CUTOFF_HZ = 8000.0
    # The speaker cutoff is capped at this fraction of the Nyquist frequency.
    # At 16 kHz the Nyquist frequency *is* 8000 Hz, so an uncapped
    # ``max_freq < 8000`` test is practically always true and ``DIRECT``
    # could never be returned.
    # NOTE(review): 0.9 is a heuristic -- tune against labelled recordings.
    _MAX_CUTOFF_NYQUIST_FRACTION = 0.9

    def __init__(self, sample_rate: int = 16000):
        """Create an analyzer.

        Args:
            sample_rate: Sampling rate of the waveforms in Hz.

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")
        self.sample_rate = sample_rate

    @staticmethod
    def _window_starts(total: int, window_size: int, hop: int) -> List[int]:
        """Return start indices of analysis windows covering ``total`` samples.

        Windows are spaced ``hop`` samples apart. The last aligned window is
        included, and one extra window anchored at the end is appended when
        the regular grid would leave a tail uncovered. A signal shorter than
        one window is treated as a single (short) window starting at 0.
        """
        if total <= 0:
            return []
        if total <= window_size:
            return [0]
        starts = list(range(0, total - window_size + 1, hop))
        if starts[-1] + window_size < total:
            starts.append(total - window_size)
        return starts

    def amplify_background(self, waveform: np.ndarray,
                           threshold_db: float = -40,
                           boost_db: float = 25) -> np.ndarray:
        """Amplify quiet background regions.

        The signal is scanned in 100 ms windows with 50% overlap. Every
        sample covered by at least one window whose RMS level is below
        ``threshold_db`` is boosted by ``boost_db`` exactly once, no matter
        how many quiet windows overlap it.

        Args:
            waveform: Audio waveform (numpy array). Not modified.
            threshold_db: Regions below this level are amplified.
            boost_db: Amount to boost by, in dB.

        Returns:
            A new array with the boosted signal, rescaled so its peak does
            not exceed 0.95. Floating-point inputs keep their dtype; other
            dtypes are converted to float64.
        """
        samples = np.asarray(waveform)
        if not np.issubdtype(samples.dtype, np.floating):
            # In-place scaling by a float gain is not possible on integers.
            samples = samples.astype(np.float64)
        amplified = samples.copy()

        total = len(amplified)
        if total == 0:
            return amplified

        # Pure silence: nothing to boost.
        if np.sqrt(np.mean(samples ** 2)) == 0:
            return amplified

        window_size = max(1, int(0.1 * self.sample_rate))  # 100ms windows
        hop = max(1, window_size // 2)
        boost_factor = 10 ** (boost_db / 20)

        # Collect the quiet samples first and apply the gain afterwards, so
        # overlapping windows cannot boost the same sample twice.
        quiet = np.zeros(total, dtype=bool)
        for start in self._window_starts(total, window_size, hop):
            window = samples[start:start + window_size]
            window_rms = np.sqrt(np.mean(window ** 2))
            if window_rms > 0:
                window_db = 20 * np.log10(window_rms + 1e-10)
                if window_db < threshold_db:
                    quiet[start:start + window_size] = True
        amplified[quiet] *= boost_factor

        # Normalize to prevent clipping
        max_amp = np.abs(amplified).max()
        if max_amp > 0.95:
            amplified = amplified * 0.95 / max_amp
        return amplified

    def detect_anomalies(self, waveform: np.ndarray,
                         speech_segments: Optional[List] = None,
                         threshold_db: float = -50) -> List["BackgroundAnomaly"]:
        """Detect anomalies in background audio.

        The waveform is first passed through ``amplify_background`` and then
        scanned in 500 ms windows with a hop of a quarter window. A window
        becomes an anomaly when its level exceeds ``threshold_db`` and its
        confidence is above 0.3. Adjacent anomalies of the same type are
        merged.

        Args:
            waveform: Audio waveform.
            speech_segments: Optional VAD segments to exclude. Each item
                must expose ``start`` and ``end`` attributes; a window is
                skipped when its centre time lies inside a segment.
            threshold_db: Minimum amplitude to consider.

        Returns:
            List of detected anomalies, sorted by start time.
        """
        amplified = self.amplify_background(waveform)
        total = len(amplified)

        window_size = max(1, int(0.5 * self.sample_rate))  # 500ms
        hop = max(1, window_size // 4)

        anomalies = []
        for start in self._window_starts(total, window_size, hop):
            stop = min(start + window_size, total)
            start_time = start / self.sample_rate
            end_time = stop / self.sample_rate

            # Skip if in main speech
            if speech_segments:
                centre = (start_time + end_time) / 2
                if any(s.start <= centre <= s.end for s in speech_segments):
                    continue

            window = amplified[start:stop]
            window_rms = np.sqrt(np.mean(window ** 2))
            if window_rms == 0:
                continue
            window_db = 20 * np.log10(window_rms + 1e-10)

            # Written as a positive test so a NaN level is never reported.
            if window_db > threshold_db:
                confidence = self._calculate_confidence(
                    window, window_db, threshold_db)
                if confidence > 0.3:  # Minimum confidence threshold
                    anomalies.append(BackgroundAnomaly(
                        start=start_time,
                        end=end_time,
                        anomaly_type=self._classify_anomaly(window),
                        amplitude_db=float(window_db),
                        confidence=float(confidence),
                    ))

        # Merge adjacent anomalies
        return self._merge_anomalies(anomalies)

    def _classify_anomaly(self, window: np.ndarray) -> "AnomalyType":
        """Classify the type of anomaly from its spectral energy split.

        The spectrum is divided into low (< 300 Hz), mid (300-3000 Hz) and
        high (>= 3000 Hz) bands; the label depends on each band's share of
        the total energy. Windows shorter than 512 samples are ``UNKNOWN``.
        """
        if len(window) < 512:
            return AnomalyType.UNKNOWN

        power = np.abs(np.fft.rfft(window)) ** 2
        freqs = np.fft.rfftfreq(len(window), 1 / self.sample_rate)

        low_energy = np.sum(power[freqs < 300])
        mid_energy = np.sum(power[(freqs >= 300) & (freqs < 3000)])
        high_energy = np.sum(power[freqs >= 3000])
        # The epsilon keeps the ratios finite for an all-zero window.
        total = low_energy + mid_energy + high_energy + 1e-10

        # Whisper: less low frequency, more high frequency
        if low_energy / total < 0.1 and high_energy / total > 0.3:
            return AnomalyType.WHISPER
        # Speaker/Phone: limited bandwidth
        if high_energy / total < 0.1:
            return AnomalyType.SPEAKER_AUDIO
        # Distant voice: high reverb indicator (simplified)
        if mid_energy / total > 0.5:
            return AnomalyType.DISTANT_VOICE
        return AnomalyType.UNKNOWN

    def _calculate_confidence(self, window: np.ndarray,
                              db: float, threshold: float) -> float:
        """Calculate confidence score for anomaly.

        The score grows linearly with the level above ``threshold`` and
        saturates at 1.0 once the level is 20 dB above it; it is never
        negative. ``window`` is accepted but not used.
        """
        db_above = db - threshold
        confidence = min(1.0, db_above / 20)  # Saturate at 20dB above
        return max(0.0, confidence)

    def _merge_anomalies(self, anomalies: List["BackgroundAnomaly"],
                         max_gap: float = 0.5) -> List["BackgroundAnomaly"]:
        """Merge adjacent anomalies of same type.

        Args:
            anomalies: Anomalies in any order; the list is not modified.
            max_gap: Two anomalies of the same type are merged when the
                later one starts less than this many seconds after the
                earlier one ends (overlapping ones always qualify).

        Returns:
            Anomalies sorted by start time. A merged entry spans both
            inputs and keeps the larger amplitude and confidence.
        """
        if not anomalies:
            return []

        ordered = sorted(anomalies, key=lambda a: a.start)
        merged = [ordered[0]]
        for anomaly in ordered[1:]:
            last = merged[-1]
            if (anomaly.anomaly_type == last.anomaly_type and
                    anomaly.start - last.end < max_gap):
                merged[-1] = BackgroundAnomaly(
                    start=last.start,
                    # max() so a shorter later anomaly cannot shrink the span.
                    end=max(last.end, anomaly.end),
                    anomaly_type=last.anomaly_type,
                    amplitude_db=max(last.amplitude_db, anomaly.amplitude_db),
                    confidence=max(last.confidence, anomaly.confidence),
                )
            else:
                merged.append(anomaly)
        return merged

    def classify_audio_source(self, waveform: np.ndarray) -> "AudioSource":
        """Classify the source of audio (direct, speaker, phone).

        The decision is based on the frequency below which 95% of the
        spectral energy lies. Waveforms shorter than 1024 samples, or with
        no energy at all, are ``UNKNOWN``.
        """
        if len(waveform) < 1024:
            return AudioSource.UNKNOWN

        power = np.abs(np.fft.rfft(waveform)) ** 2
        freqs = np.fft.rfftfreq(len(waveform), 1 / self.sample_rate)

        total_energy = np.sum(power)
        if total_energy == 0:
            return AudioSource.UNKNOWN

        # Find effective bandwidth: first bin where 95% of energy is reached.
        cumulative = np.cumsum(power)
        idx_95 = np.searchsorted(cumulative, 0.95 * total_energy)
        max_freq = freqs[min(idx_95, len(freqs) - 1)]

        # The spectrum stops at Nyquist, so a cutoff at or above it can
        # never be exceeded; keep the speaker cutoff strictly below it.
        nyquist = self.sample_rate / 2
        speaker_cutoff = min(self._SPEAKER_CUTOFF_HZ,
                             self._MAX_CUTOFF_NYQUIST_FRACTION * nyquist)

        # Phone typically cuts off around 3.4kHz
        if max_freq < self._PHONE_CUTOFF_HZ:
            return AudioSource.PHONE
        # Speaker typically has limited high freq
        if max_freq < speaker_cutoff:
            return AudioSource.SPEAKER
        return AudioSource.DIRECT