Spaces:
Sleeping
Sleeping
| # audio_utils.py - Audio processing utilities | |
| import logging | |
| from typing import List, Optional | |
| import os | |
| import tempfile | |
| from pydub import AudioSegment | |
| from pydub.utils import which | |
| logger = logging.getLogger(__name__) | |
| class AudioProcessor: | |
| def __init__(self): | |
| self._check_dependencies() | |
| def _check_dependencies(self): | |
| """Check if required audio processing tools are available.""" | |
| # Check for ffmpeg | |
| if not which("ffmpeg"): | |
| logger.warning("ffmpeg not found. Some audio operations may fail.") | |
| def merge_and_convert_to_mp3( | |
| self, | |
| audio_files: List[str], | |
| output_path: str | |
| ) -> Optional[str]: | |
| """ | |
| Merge multiple audio files and convert to MP3. | |
| Args: | |
| audio_files: List of paths to audio files to merge | |
| output_path: Path for the output MP3 file | |
| Returns: | |
| Path to the merged MP3 file, or None if failed | |
| """ | |
| try: | |
| if not audio_files: | |
| logger.error("No audio files to merge") | |
| return None | |
| logger.info(f"Merging {len(audio_files)} audio files...") | |
| # Start with empty audio | |
| merged_audio = AudioSegment.empty() | |
| for i, audio_file in enumerate(audio_files): | |
| if not os.path.exists(audio_file): | |
| logger.warning(f"Audio file not found: {audio_file}") | |
| continue | |
| try: | |
| # Load audio segment | |
| segment = AudioSegment.from_wav(audio_file) | |
| # Add a small pause between segments (500ms) | |
| if i > 0: | |
| pause = AudioSegment.silent(duration=500) | |
| merged_audio += pause | |
| # Add the segment | |
| merged_audio += segment | |
| logger.info(f"Added segment {i+1}/{len(audio_files)}") | |
| except Exception as e: | |
| logger.error(f"Failed to process audio file {audio_file}: {e}") | |
| continue | |
| if len(merged_audio) == 0: | |
| logger.error("No audio content to export") | |
| return None | |
| # Normalize audio levels | |
| merged_audio = self._normalize_audio(merged_audio) | |
| # Export as MP3 | |
| logger.info(f"Exporting to MP3: {output_path}") | |
| merged_audio.export( | |
| output_path, | |
| format="mp3", | |
| bitrate="128k", | |
| parameters=["-q:a", "2"] # Good quality | |
| ) | |
| # Verify the file was created | |
| if os.path.exists(output_path) and os.path.getsize(output_path) > 0: | |
| duration = len(merged_audio) / 1000.0 # Convert to seconds | |
| logger.info(f"Successfully created MP3: {duration:.1f} seconds") | |
| return output_path | |
| else: | |
| logger.error("Failed to create MP3 file") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Failed to merge audio files: {e}") | |
| return None | |
| def _normalize_audio(self, audio: AudioSegment) -> AudioSegment: | |
| """Normalize audio levels.""" | |
| try: | |
| # Apply some basic audio processing | |
| # Normalize to -6dB to avoid clipping | |
| target_dBFS = -6.0 | |
| change_in_dBFS = target_dBFS - audio.dBFS | |
| normalized_audio = audio.apply_gain(change_in_dBFS) | |
| return normalized_audio | |
| except Exception as e: | |
| logger.warning(f"Failed to normalize audio: {e}") | |
| return audio | |