|
|
""" |
|
|
Audio Processing Module for Speech Pathology Diagnosis |
|
|
|
|
|
This module provides audio processing utilities including: |
|
|
- Audio loading, resampling, and normalization |
|
|
- Audio chunking for phone-level analysis |
|
|
- Voice Activity Detection (VAD) integration |
|
|
- Streaming audio buffer management |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import numpy as np |
|
|
import librosa |
|
|
import soundfile as sf |
|
|
import webrtcvad |
|
|
from typing import List, Optional, Tuple, Union, Iterator |
|
|
from pathlib import Path |
|
|
from dataclasses import dataclass |
|
|
from collections import deque |
|
|
import io |
|
|
|
|
|
from config import AudioConfig |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
@dataclass
class AudioChunk:
    """
    Container for an audio chunk with metadata.

    Produced by AudioProcessor.chunk_audio; times are relative to the start
    of the source audio.

    Attributes:
        data: Audio samples as numpy array
        sample_rate: Sample rate in Hz
        start_time_ms: Start time in milliseconds
        end_time_ms: End time in milliseconds
        is_speech: Whether VAD detected speech in this chunk
        chunk_index: Index of chunk in sequence
    """
    data: np.ndarray
    sample_rate: int
    start_time_ms: float
    end_time_ms: float
    # False unless chunk_audio was called with apply_vad=True and the VAD
    # flagged this window as speech.
    is_speech: bool = False
    # Position of this chunk within the sequence of yielded chunks.
    chunk_index: int = 0
|
|
|
|
|
|
|
|
class AudioProcessor:
    """
    Audio processing utility for speech pathology diagnosis.

    Handles:
    - Loading audio from files, byte streams, or numpy arrays
    - Resampling to the configured target sample rate (16kHz by default)
    - Peak normalization to the [-1, 1] range
    - Chunking audio into phone-level frames (20ms by default)
    - Voice Activity Detection (VAD) integration via webrtcvad
    """

    def __init__(self, audio_config: Optional["AudioConfig"] = None):
        """
        Initialize AudioProcessor.

        Args:
            audio_config: Audio configuration. Uses default if None.
        """
        # Imported here (not at module top) to avoid import cycles with config.
        from config import default_audio_config

        self.config = audio_config or default_audio_config
        self.target_sr = self.config.sample_rate
        self.chunk_duration_ms = self.config.chunk_duration_ms
        self.hop_length_ms = self.config.hop_length_ms

        # Millisecond durations converted to sample counts at the target rate.
        self.chunk_size_samples = int(self.chunk_duration_ms * self.target_sr / 1000)
        self.hop_size_samples = int(self.hop_length_ms * self.target_sr / 1000)

        # VAD is best-effort: when webrtcvad is unavailable or fails to
        # initialize, self.vad stays None and _detect_speech treats every
        # chunk as speech instead of crashing the pipeline.
        try:
            self.vad = webrtcvad.Vad(self.config.vad_aggressiveness)
            logger.info(f"VAD initialized with aggressiveness={self.config.vad_aggressiveness}")
        except Exception as e:
            logger.warning(f"Failed to initialize VAD: {e}. VAD features will be disabled.")
            self.vad = None

        logger.info(f"AudioProcessor initialized: target_sr={self.target_sr}Hz, "
                    f"chunk_duration={self.chunk_duration_ms}ms, "
                    f"hop_length={self.hop_length_ms}ms")

    def load_audio(
        self,
        audio_source: Union[str, Path, np.ndarray, bytes],
        target_sr: Optional[int] = None
    ) -> Tuple[np.ndarray, int]:
        """
        Load audio from file, array, or bytes, then peak-normalize it.

        Raw numpy arrays carry no sample-rate metadata, so they are assumed
        to already be at the configured target rate and are resampled only
        when ``target_sr`` differs from it.

        Args:
            audio_source: Audio file path, numpy array, or bytes
            target_sr: Target sample rate (defaults to config sample_rate)

        Returns:
            Tuple of (audio_array, sample_rate)

        Raises:
            ValueError: If audio cannot be loaded or the source type is
                unsupported
        """
        target_sr = target_sr or self.target_sr

        try:
            if isinstance(audio_source, (str, Path)):
                # librosa handles decoding, resampling, and mono downmix.
                logger.debug(f"Loading audio from file: {audio_source}")
                audio_array, sr = librosa.load(str(audio_source), sr=target_sr, mono=True)
                logger.info(f"Loaded audio: {len(audio_array)} samples, {sr}Hz, "
                            f"{len(audio_array)/sr:.2f}s duration")

            elif isinstance(audio_source, bytes):
                logger.debug("Loading audio from bytes")
                audio_io = io.BytesIO(audio_source)
                audio_array, sr = librosa.load(audio_io, sr=target_sr, mono=True)
                logger.info(f"Loaded audio from bytes: {len(audio_array)} samples, {sr}Hz")

            elif isinstance(audio_source, np.ndarray):
                audio_array = audio_source
                if len(audio_array.shape) > 1:
                    audio_array = librosa.to_mono(audio_array)

                # Assume raw arrays are at the configured rate; resample only
                # when the caller requested a different one.
                if len(audio_array) > 0 and target_sr != self.target_sr:
                    audio_array = librosa.resample(
                        audio_array,
                        orig_sr=self.target_sr,
                        target_sr=target_sr
                    )
                # Bug fix: sr was previously bound only inside the non-empty
                # branch, so an empty array raised UnboundLocalError at the
                # return below (surfacing as a misleading ValueError).
                sr = target_sr
                logger.debug(f"Using audio array: {len(audio_array)} samples")

            else:
                raise ValueError(f"Unsupported audio source type: {type(audio_source)}")

            audio_array = self.normalize_audio(audio_array)

            return audio_array, sr

        except Exception as e:
            logger.error(f"Failed to load audio: {e}", exc_info=True)
            raise ValueError(f"Cannot load audio: {e}") from e

    def normalize_audio(self, audio: np.ndarray) -> np.ndarray:
        """
        Normalize audio to the [-1, 1] range by peak amplitude.

        Args:
            audio: Audio array

        Returns:
            Normalized audio array (empty or all-zero input is returned
            unchanged apart from clipping)
        """
        if len(audio) == 0:
            return audio

        max_val = np.abs(audio).max()
        if max_val > 0:
            audio = audio / max_val

        # Guard against floating-point drift pushing samples past +/-1.
        audio = np.clip(audio, -1.0, 1.0)

        return audio

    def resample_audio(
        self,
        audio: np.ndarray,
        orig_sr: int,
        target_sr: Optional[int] = None
    ) -> np.ndarray:
        """
        Resample audio to target sample rate.

        Args:
            audio: Audio array
            orig_sr: Original sample rate
            target_sr: Target sample rate (defaults to config sample_rate)

        Returns:
            Resampled audio array (the input unchanged when rates match)
        """
        target_sr = target_sr or self.target_sr

        # No-op fast path avoids an unnecessary (and lossy) resample.
        if orig_sr == target_sr:
            return audio

        logger.debug(f"Resampling from {orig_sr}Hz to {target_sr}Hz")
        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)

    def chunk_audio(
        self,
        audio: np.ndarray,
        sample_rate: Optional[int] = None,
        apply_vad: bool = False
    ) -> Iterator["AudioChunk"]:
        """
        Chunk audio into overlapping frames for phone-level analysis.

        Audio shorter than one chunk is yielded as a single (short) chunk.
        Otherwise fixed windows of chunk_size_samples are emitted every
        hop_size_samples; trailing samples that do not fill a complete
        window are dropped.

        Args:
            audio: Audio array
            sample_rate: Sample rate (defaults to config sample_rate)
            apply_vad: Whether to apply VAD to detect speech chunks

        Yields:
            AudioChunk objects
        """
        sample_rate = sample_rate or self.target_sr

        if len(audio) < self.chunk_size_samples:
            # Too short for a full window: emit everything as one chunk.
            yield AudioChunk(
                data=audio,
                sample_rate=sample_rate,
                start_time_ms=0.0,
                end_time_ms=len(audio) / sample_rate * 1000,
                is_speech=self._detect_speech(audio, sample_rate) if apply_vad else False,
                chunk_index=0
            )
            return

        chunk_index = 0
        for start_sample in range(0, len(audio) - self.chunk_size_samples + 1,
                                  self.hop_size_samples):
            end_sample = start_sample + self.chunk_size_samples
            chunk_data = audio[start_sample:end_sample]

            is_speech = self._detect_speech(chunk_data, sample_rate) if apply_vad else False

            yield AudioChunk(
                data=chunk_data,
                sample_rate=sample_rate,
                start_time_ms=start_sample / sample_rate * 1000,
                end_time_ms=end_sample / sample_rate * 1000,
                is_speech=is_speech,
                chunk_index=chunk_index
            )
            chunk_index += 1

    def _detect_speech(self, audio_chunk: np.ndarray, sample_rate: int) -> bool:
        """
        Detect if audio chunk contains speech using VAD.

        Falls back to True (assume speech) whenever VAD is unavailable, the
        sample rate is unsupported, or detection raises — downstream stages
        then process the chunk rather than silently dropping it.

        Args:
            audio_chunk: Audio chunk array (float samples, nominally [-1, 1])
            sample_rate: Sample rate

        Returns:
            True if speech detected, False otherwise
        """
        if self.vad is None:
            return True

        # webrtcvad only accepts these sample rates.
        if sample_rate not in [8000, 16000, 32000, 48000]:
            logger.warning(f"VAD requires sample rate 8/16/32/48kHz, got {sample_rate}Hz. Skipping VAD.")
            return True

        # webrtcvad expects exact 10/20/30ms frames; other durations may
        # still work but are not guaranteed (failures land in the except).
        frame_duration_ms = len(audio_chunk) / sample_rate * 1000
        if frame_duration_ms not in [10, 20, 30]:
            logger.debug(f"Frame duration {frame_duration_ms}ms not optimal for VAD. Using anyway.")

        try:
            # Clamp before scaling so out-of-range samples cannot wrap around
            # when cast to int16 (webrtcvad consumes 16-bit mono PCM bytes).
            int16_audio = (np.clip(audio_chunk, -1.0, 1.0) * 32767).astype(np.int16)
            audio_bytes = int16_audio.tobytes()
            return self.vad.is_speech(audio_bytes, sample_rate)
        except Exception as e:
            logger.warning(f"VAD detection failed: {e}. Assuming speech.")
            return True

    def get_speech_segments(
        self,
        audio: np.ndarray,
        sample_rate: Optional[int] = None,
        min_speech_duration_ms: float = 100.0
    ) -> List[Tuple[float, float]]:
        """
        Get speech segments from audio using VAD.

        Scans the chunk stream and merges consecutive speech chunks into
        (start, end) spans, discarding spans shorter than the minimum.

        Args:
            audio: Audio array
            sample_rate: Sample rate
            min_speech_duration_ms: Minimum duration of speech segment to include

        Returns:
            List of (start_ms, end_ms) tuples for speech segments. When VAD
            is disabled, the whole signal is returned as one segment.
        """
        sample_rate = sample_rate or self.target_sr

        if self.vad is None:
            # Without VAD, treat everything as speech.
            duration_ms = len(audio) / sample_rate * 1000
            return [(0.0, duration_ms)]

        speech_segments = []
        in_speech = False
        speech_start_ms = 0.0

        for chunk in self.chunk_audio(audio, sample_rate, apply_vad=True):
            if chunk.is_speech and not in_speech:
                # Speech onset.
                in_speech = True
                speech_start_ms = chunk.start_time_ms
            elif not chunk.is_speech and in_speech:
                # Speech offset: keep the span if it is long enough.
                in_speech = False
                duration_ms = chunk.start_time_ms - speech_start_ms
                if duration_ms >= min_speech_duration_ms:
                    speech_segments.append((speech_start_ms, chunk.start_time_ms))

        # Close out a segment still open at end-of-audio.
        if in_speech:
            duration_ms = len(audio) / sample_rate * 1000 - speech_start_ms
            if duration_ms >= min_speech_duration_ms:
                speech_segments.append((speech_start_ms, len(audio) / sample_rate * 1000))

        logger.info(f"Detected {len(speech_segments)} speech segments")
        return speech_segments

    def process_audio_file(
        self,
        file_path: Union[str, Path],
        apply_vad: bool = False
    ) -> Tuple[np.ndarray, int, List["AudioChunk"]]:
        """
        Complete audio processing pipeline: load, normalize, chunk.

        Args:
            file_path: Path to audio file
            apply_vad: Whether to apply VAD

        Returns:
            Tuple of (audio_array, sample_rate, chunks_list)

        Raises:
            ValueError: If the file cannot be loaded (propagated from
                load_audio)
        """
        logger.info(f"Processing audio file: {file_path}")

        audio, sr = self.load_audio(file_path)

        chunks = list(self.chunk_audio(audio, sr, apply_vad=apply_vad))

        logger.info(f"Processed audio: {len(audio)} samples, {len(chunks)} chunks")

        return audio, sr, chunks
|
|
|
|
|
|
|
|
class StreamingAudioBuffer:
    """
    Buffer for managing streaming audio chunks.

    Maintains a bounded sliding window of samples for real-time audio
    processing and tracks overflow/underflow events alongside throughput
    counters. When the window fills up, the oldest samples are discarded.
    """

    def __init__(
        self,
        buffer_duration_ms: float = 1000.0,
        chunk_duration_ms: float = 20.0,
        sample_rate: int = 16000
    ):
        """
        Initialize streaming audio buffer.

        Args:
            buffer_duration_ms: Maximum buffer duration in milliseconds
            chunk_duration_ms: Expected chunk duration in milliseconds
            sample_rate: Sample rate in Hz
        """
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.buffer_duration_ms = buffer_duration_ms

        # Durations expressed as sample counts at the configured rate.
        self.buffer_size_samples = int(buffer_duration_ms * sample_rate / 1000)
        self.chunk_size_samples = int(chunk_duration_ms * sample_rate / 1000)

        # maxlen gives a hard capacity bound even if manual trimming misses.
        self.buffer = deque(maxlen=self.buffer_size_samples)

        # Throughput / health counters.
        self.total_samples_received = 0
        self.total_chunks_received = 0
        self.overflow_count = 0
        self.underflow_count = 0

        logger.info(f"StreamingAudioBuffer initialized: "
                    f"buffer_duration={buffer_duration_ms}ms, "
                    f"chunk_duration={chunk_duration_ms}ms, "
                    f"sample_rate={sample_rate}Hz")

    def add_chunk(self, audio_chunk: np.ndarray) -> bool:
        """
        Append an audio chunk to the buffer.

        On overflow the oldest samples are discarded to make room, so the
        call itself always succeeds.

        Args:
            audio_chunk: Audio chunk array (empty chunks are ignored)

        Returns:
            True always
        """
        incoming = len(audio_chunk)
        if incoming == 0:
            return True

        # Trim the front of the window if this chunk would exceed capacity.
        excess = len(self.buffer) + incoming - self.buffer_size_samples
        if excess > 0:
            self.overflow_count += 1
            logger.warning(f"Buffer overflow! Dropping oldest samples. "
                           f"Buffer: {len(self.buffer)}/{self.buffer_size_samples} samples")
            while excess > 0 and self.buffer:
                self.buffer.popleft()
                excess -= 1

        self.buffer.extend(audio_chunk)
        self.total_samples_received += incoming
        self.total_chunks_received += 1

        return True

    def get_chunk(self, chunk_duration_ms: Optional[float] = None) -> Optional[np.ndarray]:
        """
        Pop the next chunk off the front of the buffer.

        Args:
            chunk_duration_ms: Chunk duration in milliseconds (defaults to configured)

        Returns:
            Audio chunk array, or None when not enough samples are buffered
        """
        duration = chunk_duration_ms or self.chunk_duration_ms
        needed = int(duration * self.sample_rate / 1000)

        if len(self.buffer) < needed:
            self.underflow_count += 1
            return None

        # Consume exactly `needed` samples from the front of the window.
        samples = []
        while len(samples) < needed:
            samples.append(self.buffer.popleft())

        return np.array(samples)

    def get_buffer(self, max_samples: Optional[int] = None) -> np.ndarray:
        """
        Snapshot the buffer contents without consuming them.

        Args:
            max_samples: Maximum number of samples to return (None = all)

        Returns:
            Audio array copied from the buffer
        """
        contents = list(self.buffer)
        if max_samples is not None:
            contents = contents[:max_samples]
        return np.array(contents)

    def clear(self):
        """Drop every buffered sample (counters are left untouched)."""
        self.buffer.clear()
        logger.debug("Buffer cleared")

    def get_stats(self) -> dict:
        """
        Get buffer statistics.

        Returns:
            Dictionary with fill level, capacity, utilization, throughput
            counters, and overflow/underflow counts
        """
        fill = len(self.buffer)
        return {
            "buffer_size_samples": fill,
            "buffer_capacity_samples": self.buffer_size_samples,
            "buffer_utilization": fill / self.buffer_size_samples,
            "total_samples_received": self.total_samples_received,
            "total_chunks_received": self.total_chunks_received,
            "overflow_count": self.overflow_count,
            "underflow_count": self.underflow_count,
            "buffer_duration_ms": fill / self.sample_rate * 1000
        }

    def has_enough_data(self, chunk_duration_ms: Optional[float] = None) -> bool:
        """
        Check whether a full chunk can currently be read.

        Args:
            chunk_duration_ms: Chunk duration in milliseconds

        Returns:
            True if buffer has enough samples
        """
        duration = chunk_duration_ms or self.chunk_duration_ms
        needed = int(duration * self.sample_rate / 1000)
        return len(self.buffer) >= needed

    def get_available_duration_ms(self) -> float:
        """
        Get available audio duration in buffer in milliseconds.

        Returns:
            Duration in milliseconds
        """
        return len(self.buffer) / self.sample_rate * 1000
|
|
|
|
|
|