Spaces:
Sleeping
Sleeping
| """ | |
| Audio post-processing for phone announcements. | |
| Handles background music mixing, normalization, and export. | |
| """ | |
| import io | |
| import os | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Optional, Union | |
| import numpy as np | |
| from loguru import logger | |
@dataclass
class AudioProcessingConfig:
    """Configuration for audio post-processing.

    This must be a dataclass: ``AudioProcessor.process`` rebuilds a per-call
    config with ``AudioProcessingConfig(**{**config.__dict__, **overrides})``,
    which needs both a keyword-accepting ``__init__`` and instance attributes
    stored in ``__dict__``.
    """

    # Background music settings
    # Preset name or file path of the music track; None/empty disables mixing.
    background_music_path: Optional[str] = None
    # Gain in dB applied to the music before it is summed with the speech.
    music_volume_db: float = -20.0

    # Fade settings (milliseconds, applied to the final mix)
    fade_in_ms: int = 500
    fade_out_ms: int = 500

    # Padding (silence before/after speech, in milliseconds)
    padding_start_ms: int = 300
    padding_end_ms: int = 300

    # Output settings
    normalize: bool = True
    # Target level for speech normalization. The processor measures plain RMS
    # in dBFS, so this is an RMS target rather than a true LUFS value.
    target_loudness_db: float = -16.0
    output_sample_rate: int = 44100
    # Format name handed to the exporter (e.g. "mp3", "wav").
    output_format: str = "mp3"
class AudioProcessor:
    """
    Post-processor for TTS audio.

    Adds background music, applies fades, normalizes, and exports.
    All signal processing happens on mono float32 numpy arrays; pydub is
    only used to decode music files and to encode the final result.
    """

    # Default background music directory
    ASSETS_DIR = Path(__file__).parent / "data" / "assets"

    # File extensions recognised as background music, in lookup priority
    # order. Shared by preset resolution and preset listing so that every
    # name returned by ``list_available_music`` can actually be resolved.
    MUSIC_EXTENSIONS = ("mp3", "wav", "flac", "ogg")

    def __init__(self, config: Optional["AudioProcessingConfig"] = None):
        """Store the default config, creating one with defaults if omitted."""
        self.config = config or AudioProcessingConfig()

    def process(
        self,
        audio: np.ndarray,
        sample_rate: int,
        output_path: Optional[str] = None,
        **override_config,
    ) -> Union[bytes, str]:
        """
        Process audio with background music, fades, and normalization.

        Pipeline: mono/float32 conversion -> +3 dB boost -> optional RMS
        normalization -> silence padding -> optional music mix -> fades ->
        resampling to the output rate -> peak limiting -> export.

        Args:
            audio: Input audio as numpy array (1-D, or 2-D as (samples, channels))
            sample_rate: Sample rate of input audio
            output_path: Optional path to save the output (returns bytes if None)
            **override_config: Override any config settings for this call

        Returns:
            ``output_path`` if it is provided, otherwise the encoded audio as
            bytes in ``output_format`` ("mp3" unless overridden).

        Raises:
            TypeError: If ``override_config`` contains a key that is not a
                config field (raised by the config constructor).
        """
        # Merge config overrides
        config = AudioProcessingConfig(**{**self.config.__dict__, **override_config})

        # Work in numpy to avoid pydub.set_frame_rate (it changes speed/pitch).
        speech = self._ensure_mono_float32(audio)
        speech_sr = int(sample_rate)

        # Boost speech slightly for clarity (+3 dB). When normalization is
        # enabled the signal is rescaled to the target level right after, so
        # the boost only matters on the non-normalized path.
        speech = self._apply_gain_db(speech, 3.0)

        # Normalize speech BEFORE adding music so music doesn't make speech quieter.
        if config.normalize:
            speech = self._normalize_numpy(speech, config.target_loudness_db)

        # Add padding (silence before/after speech)
        lead = self._ms_to_samples(speech_sr, config.padding_start_ms)
        if lead > 0:
            speech = np.concatenate([np.zeros(lead, dtype=np.float32), speech])
        tail = self._ms_to_samples(speech_sr, config.padding_end_ms)
        if tail > 0:
            speech = np.concatenate([speech, np.zeros(tail, dtype=np.float32)])

        mixed = speech

        # Mix with background music if specified
        if config.background_music_path:
            mixed = self._mix_background_music_numpy(
                speech=mixed,
                speech_sample_rate=speech_sr,
                music_path=config.background_music_path,
                music_gain_db=config.music_volume_db,
            )

        # Apply fades to the final mix
        if config.fade_in_ms > 0:
            mixed = self._apply_fade_in(mixed, speech_sr, config.fade_in_ms)
        if config.fade_out_ms > 0:
            mixed = self._apply_fade_out(mixed, speech_sr, config.fade_out_ms)

        # Resample the final audio to the requested output sample rate (real resampling)
        out_sr = int(config.output_sample_rate)
        if speech_sr != out_sr:
            mixed = self._resample_numpy(mixed, orig_sr=speech_sr, target_sr=out_sr)

        # Prevent clipping after mixing
        mixed = self._peak_limit(mixed, peak=0.98)

        # Export via pydub (mp3/wav/etc.)
        audio_segment = self._numpy_to_audiosegment(mixed, out_sr)

        if output_path:
            audio_segment.export(output_path, format=config.output_format)
            return output_path

        buffer = io.BytesIO()
        audio_segment.export(buffer, format=config.output_format)
        return buffer.getvalue()

    @staticmethod
    def _ms_to_samples(sample_rate: int, duration_ms: float) -> int:
        """Convert a duration in milliseconds to a rounded sample count."""
        return int(round(sample_rate * (duration_ms / 1000.0)))

    def _numpy_to_audiosegment(
        self, audio: np.ndarray, sample_rate: int
    ) -> "AudioSegment":
        """Convert a mono float array in [-1, 1] to a 16-bit pydub AudioSegment."""
        from pydub import AudioSegment

        # Ensure float32
        if audio.dtype != np.float32:
            audio = audio.astype(np.float32)

        # Clip and convert to int16
        audio = np.clip(audio, -1.0, 1.0)
        audio_int16 = (audio * 32767).astype(np.int16)

        # Create AudioSegment
        return AudioSegment(
            data=audio_int16.tobytes(),
            sample_width=2,  # 16-bit
            frame_rate=sample_rate,
            channels=1,  # Mono
        )

    def _resolve_music_path(self, music_path: str) -> str:
        """
        Resolve a preset music name/path to an existing file path.

        Lookup order: an existing absolute path, then ``<name>.<ext>`` in the
        assets directory for every extension in ``MUSIC_EXTENSIONS``, then the
        name as-is inside the assets directory. If nothing matches, the input
        is returned unchanged so the caller can check for existence itself.
        """
        if not music_path:
            return music_path

        # Absolute path
        if os.path.isabs(music_path) and os.path.exists(music_path):
            return music_path

        # Preset name: try every supported extension, not only ".mp3", so
        # presets reported by list_available_music() always resolve.
        for ext in self.MUSIC_EXTENSIONS:
            candidate = self.ASSETS_DIR / f"{music_path}.{ext}"
            if candidate.exists():
                return str(candidate)

        # Relative file name that already carries its extension.
        candidate = self.ASSETS_DIR / music_path
        if candidate.exists():
            return str(candidate)

        return music_path

    def _load_audio_file_numpy(self, path: str) -> tuple[np.ndarray, int]:
        """Load an audio file to mono float32 numpy, returning (samples, sample_rate)."""
        from pydub import AudioSegment

        seg = AudioSegment.from_file(path)
        seg = seg.set_channels(1)
        sample_rate = int(seg.frame_rate)
        samples = np.array(seg.get_array_of_samples())

        # Convert PCM integers to float32 in [-1, 1]; full scale is
        # 2 ** (bits - 1) for the segment's sample width.
        max_val = float(1 << (8 * seg.sample_width - 1))
        audio = (samples.astype(np.float32) / max_val).clip(-1.0, 1.0)
        return audio, sample_rate

    def _resample_numpy(
        self, audio: np.ndarray, orig_sr: int, target_sr: int
    ) -> np.ndarray:
        """Resample ``audio`` from ``orig_sr`` to ``target_sr`` using librosa."""
        if orig_sr == target_sr:
            return audio
        # Imported lazily so librosa is only required when resampling happens.
        import librosa

        return librosa.resample(audio, orig_sr=orig_sr, target_sr=target_sr)

    def _mix_background_music_numpy(
        self,
        speech: np.ndarray,
        speech_sample_rate: int,
        music_path: str,
        music_gain_db: float,
    ) -> np.ndarray:
        """
        Mix background music into speech (numpy domain) without changing speech tempo.

        The music is resampled to the speech rate, looped or trimmed to the
        speech length, gain-adjusted and summed with the speech. This is
        best-effort: if the file is missing or cannot be processed, the
        problem is logged and the speech is returned unchanged.
        """
        resolved = self._resolve_music_path(music_path)
        if not os.path.exists(resolved):
            logger.warning(f"Background music not found: {resolved}")
            return speech

        try:
            music, music_sr = self._load_audio_file_numpy(resolved)
            if music_sr != speech_sample_rate:
                music = self._resample_numpy(
                    music, orig_sr=music_sr, target_sr=speech_sample_rate
                )

            # Loop / trim to match speech length
            if music.size == 0:
                return speech
            if music.shape[0] < speech.shape[0]:
                reps = int(np.ceil(speech.shape[0] / music.shape[0]))
                music = np.tile(music, reps)
            music = music[: speech.shape[0]]

            # Apply music gain
            music = self._apply_gain_db(music, music_gain_db)
            return (speech + music).astype(np.float32)
        except Exception as e:
            # Deliberate catch-all: background music must never break speech output.
            logger.error(f"Failed to add background music: {e}")
            return speech

    def _ensure_mono_float32(self, audio: np.ndarray) -> np.ndarray:
        """
        Return ``audio`` as a 1-D float32 array clipped to [-1, 1].

        Signed-integer PCM (e.g. int16) is scaled by its full-scale value
        first; casting it straight to float and clipping would flatten the
        signal to +/-1. 2-D input is treated as (samples, channels) and
        averaged across channels.
        """
        samples = np.asarray(audio)

        if np.issubdtype(samples.dtype, np.signedinteger):
            full_scale = float(-int(np.iinfo(samples.dtype).min))
            samples = samples.astype(np.float64) / full_scale

        if samples.ndim == 2:
            # If (n, channels), downmix
            samples = samples.mean(axis=1)

        if samples.dtype != np.float32:
            samples = samples.astype(np.float32)
        return np.clip(samples, -1.0, 1.0)

    def _apply_gain_db(self, audio: np.ndarray, gain_db: float) -> np.ndarray:
        """Scale ``audio`` by ``gain_db`` decibels and return it as float32."""
        factor = float(10 ** (gain_db / 20.0))
        return (audio * factor).astype(np.float32)

    def _rms_dbfs(self, audio: np.ndarray) -> float:
        """
        Return the RMS level of ``audio`` in dBFS.

        A 1e-9 floor keeps the logarithm finite for silence; empty input is
        treated as silence instead of producing NaN.
        """
        if np.size(audio) == 0:
            rms = 0.0
        else:
            rms = float(np.sqrt(np.mean(np.square(audio), dtype=np.float64)))
        return float(20.0 * np.log10(rms + 1e-9))

    def _normalize_numpy(self, audio: np.ndarray, target_dbfs: float) -> np.ndarray:
        """Apply the gain that moves the RMS level of ``audio`` to ``target_dbfs``."""
        current = self._rms_dbfs(audio)
        gain_db = float(target_dbfs - current)
        return self._apply_gain_db(audio, gain_db)

    def _apply_fade_in(
        self, audio: np.ndarray, sample_rate: int, fade_ms: int
    ) -> np.ndarray:
        """Return a copy of ``audio`` with a linear 0 -> 1 ramp over the first ``fade_ms``."""
        # Never fade over more samples than the signal has.
        n = min(self._ms_to_samples(sample_rate, fade_ms), audio.shape[0])
        if n <= 0:
            return audio
        ramp = np.linspace(0.0, 1.0, n, dtype=np.float32)
        out = audio.copy()
        out[:n] *= ramp
        return out

    def _apply_fade_out(
        self, audio: np.ndarray, sample_rate: int, fade_ms: int
    ) -> np.ndarray:
        """Return a copy of ``audio`` with a linear 1 -> 0 ramp over the last ``fade_ms``."""
        # Never fade over more samples than the signal has. The n <= 0 guard
        # also protects the ``out[-n:]`` slice, which would select the whole
        # array for n == 0.
        n = min(self._ms_to_samples(sample_rate, fade_ms), audio.shape[0])
        if n <= 0:
            return audio
        ramp = np.linspace(1.0, 0.0, n, dtype=np.float32)
        out = audio.copy()
        out[-n:] *= ramp
        return out

    def _peak_limit(self, audio: np.ndarray, peak: float = 0.98) -> np.ndarray:
        """
        Scale ``audio`` down so its absolute peak does not exceed ``peak``.

        Signals already within the limit are only clipped to [-1, 1]; silent
        or empty input is returned untouched. The whole signal is scaled
        uniformly, so relative levels are preserved.
        """
        max_abs = float(np.max(np.abs(audio))) if audio.size else 0.0
        if max_abs <= 0:
            return audio
        if max_abs <= peak:
            return np.clip(audio, -1.0, 1.0)
        scale = float(peak / max_abs)
        return np.clip(audio * scale, -1.0, 1.0).astype(np.float32)

    def list_available_music(self) -> list[str]:
        """
        List available background music files in the assets directory.

        Returns:
            Sorted, de-duplicated file stems (names without extension) of all
            files matching ``MUSIC_EXTENSIONS``; an empty list if the assets
            directory does not exist.
        """
        logger.debug(f"Looking for music in: {self.ASSETS_DIR}")
        logger.debug(f"ASSETS_DIR exists: {self.ASSETS_DIR.exists()}")

        if not self.ASSETS_DIR.exists():
            logger.warning(f"Assets directory not found: {self.ASSETS_DIR}")
            return []

        music_files = []
        for ext in self.MUSIC_EXTENSIONS:
            found = list(self.ASSETS_DIR.glob(f"*.{ext}"))
            logger.debug(f"Found {len(found)} .{ext} files")
            music_files.extend(f.stem for f in found)

        result = sorted(set(music_files))
        logger.info(f"Available background music: {result}")
        return result