# Source imported from daasime's "Add SOP Audio Analyzer app files" (commit ebba35f).
"""
Background Audio Analysis - detect subtle anomalies.
"""
import torch
import numpy as np
import librosa
from typing import List, Optional
from dataclasses import dataclass
from enum import Enum
class AnomalyType(Enum):
    """Categories of background anomaly reported by the analyzer.

    Assigned by ``BackgroundAnalyzer._classify_anomaly`` based on the
    spectral band-energy profile of an amplified background window.
    """

    WHISPER = "whisper"              # little low-freq, strong high-freq energy
    DISTANT_VOICE = "distant_voice"  # mid-band dominated (reverb-like, simplified)
    SPEAKER_AUDIO = "speaker_audio"  # band-limited (little high-freq energy)
    UNKNOWN = "unknown"              # no profile matched
class AudioSource(Enum):
    """How the audio appears to have reached the microphone.

    Assigned by ``BackgroundAnalyzer.classify_audio_source`` from the
    effective bandwidth (frequency below which 95% of energy lies).
    """

    DIRECT = "direct"    # full bandwidth — spoken directly at the mic
    SPEAKER = "speaker"  # bandwidth under ~8 kHz — loudspeaker playback
    PHONE = "phone"      # bandwidth under ~4 kHz — telephony band
    UNKNOWN = "unknown"  # too short / silent to classify
@dataclass
class BackgroundAnomaly:
    """One detected background anomaly, produced by
    ``BackgroundAnalyzer.detect_anomalies``.
    """

    start: float               # anomaly start time, seconds
    end: float                 # anomaly end time, seconds
    anomaly_type: AnomalyType  # classified category
    amplitude_db: float        # window level in dB (20*log10 of RMS)
    confidence: float          # detection confidence, clamped to [0, 1]

    @property
    def duration(self) -> float:
        """Length of the anomaly in seconds (end minus start)."""
        return self.end - self.start
class BackgroundAnalyzer:
    """Analyze background audio for subtle anomalies.

    Pipeline: boost quiet regions (``amplify_background``), scan them in
    windows (``detect_anomalies``), classify each hit by its spectral
    band-energy profile, and merge adjacent hits of the same type.
    """

    def __init__(self):
        # All window sizes and time conversions assume 16 kHz mono audio.
        self.sample_rate = 16000

    def amplify_background(self, waveform: np.ndarray,
                           threshold_db: float = -40,
                           boost_db: float = 25) -> np.ndarray:
        """
        Amplify quiet background regions.

        Args:
            waveform: Audio waveform (numpy float array)
            threshold_db: Windows whose RMS level is below this are amplified
            boost_db: Amount to boost by (applied exactly once per sample)

        Returns:
            Amplified copy of the waveform, peak-limited to 0.95.
            A fully silent input is returned unchanged.
        """
        rms = np.sqrt(np.mean(waveform ** 2))
        if rms == 0:
            # Pure silence: nothing to boost.
            return waveform

        window_size = int(0.1 * self.sample_rate)  # 100 ms windows
        hop = window_size // 2
        # Loop-invariant gain, hoisted out of the loop.
        boost_factor = 10 ** (boost_db / 20)

        # BUG FIX: the previous version multiplied each below-threshold
        # window in place; with 50%-overlapping windows a quiet sample lies
        # in two windows and was boosted TWICE (2*boost_db instead of
        # boost_db). Build a per-sample mask first, then apply the gain once.
        boost_mask = np.zeros(len(waveform), dtype=bool)
        for pos in range(0, len(waveform) - window_size, hop):
            window = waveform[pos:pos + window_size]
            window_rms = np.sqrt(np.mean(window ** 2))
            if window_rms > 0:
                window_db = 20 * np.log10(window_rms + 1e-10)
                if window_db < threshold_db:
                    boost_mask[pos:pos + window_size] = True

        amplified = waveform.copy()
        amplified[boost_mask] *= boost_factor

        # Normalize to prevent clipping.
        max_amp = np.abs(amplified).max()
        if max_amp > 0.95:
            amplified = amplified * 0.95 / max_amp
        return amplified

    def detect_anomalies(self, waveform: np.ndarray,
                         speech_segments: Optional[List] = None,
                         threshold_db: float = -50) -> List["BackgroundAnomaly"]:
        """
        Detect anomalies in background audio.

        Args:
            waveform: Audio waveform
            speech_segments: Optional VAD segments (objects with ``.start``
                and ``.end`` in seconds) to exclude from analysis
            threshold_db: Minimum amplified level to consider anomalous

        Returns:
            List of merged BackgroundAnomaly instances, sorted by start time.
        """
        anomalies: List["BackgroundAnomaly"] = []
        amplified = self.amplify_background(waveform)

        window_size = int(0.5 * self.sample_rate)  # 500 ms windows
        hop = window_size // 4
        for i in range(0, len(amplified) - window_size, hop):
            start_time = i / self.sample_rate
            end_time = (i + window_size) / self.sample_rate

            # Skip windows whose midpoint falls inside known speech.
            if speech_segments:
                midpoint = start_time + 0.25
                if any(s.start <= midpoint <= s.end for s in speech_segments):
                    continue

            window = amplified[i:i + window_size]
            window_rms = np.sqrt(np.mean(window ** 2))
            if window_rms == 0:
                continue
            window_db = 20 * np.log10(window_rms + 1e-10)

            if window_db > threshold_db:
                anomaly_type = self._classify_anomaly(window)
                confidence = self._calculate_confidence(window, window_db, threshold_db)
                if confidence > 0.3:  # minimum confidence gate
                    anomalies.append(BackgroundAnomaly(
                        start=start_time,
                        end=end_time,
                        anomaly_type=anomaly_type,
                        amplitude_db=window_db,
                        confidence=confidence,
                    ))

        # Merge adjacent anomalies of the same type into longer spans.
        return self._merge_anomalies(anomalies)

    def _classify_anomaly(self, window: np.ndarray) -> "AnomalyType":
        """Classify an anomalous window by its spectral band-energy profile."""
        # Too short for a meaningful spectrum.
        if len(window) < 512:
            return AnomalyType.UNKNOWN

        spectrum = np.abs(np.fft.rfft(window))
        freqs = np.fft.rfftfreq(len(window), 1 / self.sample_rate)

        # Energy in low (<300 Hz), mid (300-3000 Hz) and high (>=3000 Hz) bands.
        low_mask = freqs < 300
        mid_mask = (freqs >= 300) & (freqs < 3000)
        high_mask = freqs >= 3000
        low_energy = np.sum(spectrum[low_mask] ** 2)
        mid_energy = np.sum(spectrum[mid_mask] ** 2)
        high_energy = np.sum(spectrum[high_mask] ** 2)
        total = low_energy + mid_energy + high_energy + 1e-10  # avoid div-by-zero

        # Whisper: little low-frequency energy, strong high-frequency content.
        if low_energy / total < 0.1 and high_energy / total > 0.3:
            return AnomalyType.WHISPER
        # Speaker/phone playback: band-limited, little high-frequency content.
        if high_energy / total < 0.1:
            return AnomalyType.SPEAKER_AUDIO
        # Distant voice: mid-band dominated (simplified reverb heuristic).
        if mid_energy / total > 0.5:
            return AnomalyType.DISTANT_VOICE
        return AnomalyType.UNKNOWN

    def _calculate_confidence(self, window: np.ndarray,
                              db: float, threshold: float) -> float:
        """Confidence in [0, 1]: level above threshold, saturating at +20 dB."""
        db_above = db - threshold
        confidence = min(1.0, db_above / 20)  # saturate at 20 dB above threshold
        return max(0.0, confidence)

    def _merge_anomalies(self, anomalies: List["BackgroundAnomaly"],
                         max_gap: float = 0.5) -> List["BackgroundAnomaly"]:
        """Merge same-type anomalies separated by less than ``max_gap`` seconds."""
        if not anomalies:
            return []

        anomalies = sorted(anomalies, key=lambda a: a.start)
        merged = [anomalies[0]]
        for anomaly in anomalies[1:]:
            last = merged[-1]
            if (anomaly.anomaly_type == last.anomaly_type and
                    anomaly.start - last.end < max_gap):
                # Extend the last span; keep the strongest level/confidence seen.
                merged[-1] = BackgroundAnomaly(
                    start=last.start,
                    end=anomaly.end,
                    anomaly_type=last.anomaly_type,
                    amplitude_db=max(last.amplitude_db, anomaly.amplitude_db),
                    confidence=max(last.confidence, anomaly.confidence),
                )
            else:
                merged.append(anomaly)
        return merged

    def classify_audio_source(self, waveform: np.ndarray) -> "AudioSource":
        """Classify the source of audio (direct, speaker, phone) by its
        effective bandwidth: the frequency below which 95% of energy lies."""
        if len(waveform) < 1024:
            return AudioSource.UNKNOWN

        spectrum = np.abs(np.fft.rfft(waveform))
        freqs = np.fft.rfftfreq(len(waveform), 1 / self.sample_rate)

        total_energy = np.sum(spectrum ** 2)
        if total_energy == 0:
            return AudioSource.UNKNOWN

        cumsum = np.cumsum(spectrum ** 2)
        idx_95 = np.searchsorted(cumsum, 0.95 * total_energy)
        max_freq = freqs[min(idx_95, len(freqs) - 1)]

        # Telephony typically cuts off around 3.4 kHz.
        if max_freq < 4000:
            return AudioSource.PHONE
        # Loudspeakers typically lack high-frequency content.
        if max_freq < 8000:
            return AudioSource.SPEAKER
        return AudioSource.DIRECT