Spaces:
Running
Running
| """ | |
| Audio processing utilities. | |
| Simple validation and file handling. | |
| """ | |
| import logging | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional, Tuple | |
| from app.core.config import get_settings | |
| import ffmpeg | |
| import asyncio | |
| from app.services.vocal_separator import VocalSeparator | |
| from app.services.denoiser import DenoiserService | |
# Module-wide logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Application settings object, fetched once when the module is imported and
# shared by every helper below.
settings = get_settings()
class AudioProcessingError(Exception):
    """Raised when an audio file fails validation or conversion."""
class AudioProcessor:
    """
    Upload-processing helpers: validate, store, enhance and convert audio.

    Every operation is exposed at class level (``AudioProcessor.method(...)``);
    no instance state is used.
    """

    # Snapshots of the application settings, taken when the class is created.
    ALLOWED_EXTENSIONS = settings.allowed_extensions
    TARGET_SAMPLE_RATE = settings.sample_rate
    TARGET_CHANNELS = settings.channels

    @classmethod
    def validate_file(cls, filename: str, file_size: int) -> None:
        """
        Validate uploaded file.

        Args:
            filename: Original filename
            file_size: File size in bytes

        Raises:
            AudioProcessingError: If the extension is not allowed or the file
                exceeds the configured maximum upload size
        """
        # Check extension (text after the last dot, lower-cased; empty when
        # the name contains no dot)
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        if ext not in settings.allowed_extensions:
            raise AudioProcessingError(
                f"File type '.{ext}' not supported. "
                f"Allowed: {', '.join(settings.allowed_extensions)}"
            )

        # Check size
        if file_size > settings.max_upload_size_bytes:
            raise AudioProcessingError(
                f"File too large ({file_size / 1024 / 1024:.1f}MB). "
                f"Maximum size: {settings.max_upload_size_mb}MB"
            )

    @classmethod
    async def save_upload(cls, file_content: bytes, original_filename: str) -> Path:
        """
        Save uploaded file to disk.

        Args:
            file_content: Raw file bytes
            original_filename: Original filename for extension

        Returns:
            Path to saved file
        """
        import aiofiles

        # Generate unique filename; fall back to 'wav' when the original name
        # carries no extension
        ext = original_filename.rsplit('.', 1)[-1].lower() if '.' in original_filename else 'wav'
        unique_filename = f"{uuid.uuid4()}.{ext}"
        file_path = settings.upload_dir / unique_filename

        # Save file
        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(file_content)

        logger.info(f"Saved upload: {file_path} ({len(file_content) / 1024:.1f}KB)")
        return file_path

    @classmethod
    async def convert_to_wav(cls, input_path: Path) -> Path:
        """
        Convert audio to 16kHz mono WAV using FFmpeg.

        Args:
            input_path: Path to input audio file

        Returns:
            Path to converted WAV file

        Raises:
            AudioProcessingError: If FFmpeg reports an error
        """
        output_filename = f"{input_path.stem}_processed.wav"
        output_path = settings.processed_dir / output_filename

        # Run the blocking ffmpeg call in the default executor so the event
        # loop stays responsive.
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                None,
                lambda: cls._run_ffmpeg_conversion(input_path, output_path),
            )
        except ffmpeg.Error as e:
            # stderr is raw bytes from ffmpeg; never let a decoding problem
            # mask the real conversion failure.
            error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
            logger.error(f"FFmpeg error: {error_msg}")
            # Do not leave a partial/empty output file behind.
            await cls.cleanup_files(output_path)
            raise AudioProcessingError(f"Audio conversion failed: {error_msg}") from e

        logger.info(f"Converted to WAV: {output_path}")
        return output_path

    @staticmethod
    def _run_ffmpeg_conversion(input_path: Path, output_path: Path) -> None:
        """Run the actual FFmpeg conversion (blocking)."""
        stream = ffmpeg.input(str(input_path))

        # Apply normalization if enabled (loudnorm is best for speech consistency)
        if settings.enable_loudnorm:
            logger.debug("Applying loudnorm normalization...")
            stream = stream.filter('loudnorm', I=-20, TP=-2, LRA=7)

        # Apply noise reduction if enabled (Note: basic filters are kept as minor cleanup)
        if settings.enable_noise_reduction:
            logger.debug("Applying subtle highpass filter...")
            stream = (
                stream
                .filter('highpass', f=60)
                .filter('lowpass', f=7500)
                .filter(
                    # Silence trimming
                    'silenceremove',
                    stop_periods=-1,
                    stop_duration=0.4,
                    stop_threshold='-45dB'
                )
            )

        # NOTE(review): the output format is hard-coded here rather than taken
        # from TARGET_SAMPLE_RATE / TARGET_CHANNELS -- confirm this is intended.
        (
            stream.output(
                str(output_path),
                acodec='pcm_s16le',
                ar=16000,
                ac=1
            )
            .overwrite_output()
            .run(quiet=True, capture_stderr=True)
        )

    @classmethod
    async def get_audio_duration(cls, filepath: Path) -> float:
        """
        Get audio file duration in seconds.

        Args:
            filepath: Path to audio file

        Returns:
            Duration in seconds, or 0.0 when the file cannot be probed or the
            probe result carries no usable duration
        """
        loop = asyncio.get_running_loop()
        try:
            probe = await loop.run_in_executor(
                None,
                lambda: ffmpeg.probe(str(filepath))
            )
            return float(probe['format'].get('duration', 0))
        except (ffmpeg.Error, KeyError, TypeError, ValueError) as e:
            # Duration is informational only: a failed probe or a malformed
            # probe result must not abort the caller.
            logger.warning(f"Could not probe audio duration: {e}")
            return 0.0

    @classmethod
    async def cleanup_files(cls, *paths: Path) -> None:
        """
        Remove temporary files.

        Best effort: empty entries are skipped and a failure to remove one
        path is logged without affecting the remaining paths.
        """
        for path in paths:
            try:
                if path and path.exists():
                    path.unlink()
                    logger.debug(f"Cleaned up: {path}")
            except Exception as e:
                logger.warning(f"Failed to cleanup {path}: {e}")

    @staticmethod
    def _unique_paths(*paths: Optional[Path]) -> list:
        """Return the non-empty paths with duplicates removed, order kept."""
        unique = []
        for path in paths:
            if path and path not in unique:
                unique.append(path)
        return unique

    @classmethod
    async def process_upload(cls, file_content: bytes, filename: str) -> Tuple[Path, float]:
        """
        Full upload processing pipeline: validate, save, convert.

        Args:
            file_content: Uploaded file bytes
            filename: Original filename

        Returns:
            Tuple of (processed WAV path, duration in seconds)

        Raises:
            AudioProcessingError: If validation or conversion fails. Any other
                exception raised by a pipeline step is re-raised after the
                files created so far have been removed.
        """
        # Validate
        cls.validate_file(filename, len(file_content))

        # Save original
        original_path = await cls.save_upload(file_content, filename)

        # Initialise every artefact up front so the error path can clean up
        # whatever exists, regardless of which step failed.
        denoised_path: Optional[Path] = None
        vocals_path: Optional[Path] = None
        wav_path: Optional[Path] = None

        try:
            source = original_path

            # Step 1: Denoising (Speech Enhancement)
            if settings.enable_denoiser:
                denoised_path = await DenoiserService.enhance_audio(original_path)
                source = denoised_path

            # Step 2: Vocal separation using MDX-Net
            if settings.enable_vocal_separation:
                vocals_path = await VocalSeparator.separate_vocals(source)
                source = vocals_path

            # Step 3: Convert to 16kHz mono WAV (includes normalization)
            wav_path = await cls.convert_to_wav(source)

            # Get duration
            duration = await cls.get_audio_duration(wav_path)
        except Exception:
            # Cleanup on error: intermediates plus the converted file, if any
            await cls.cleanup_files(
                *cls._unique_paths(original_path, denoised_path, vocals_path, wav_path)
            )
            raise

        # Cleanup intermediate files; a step may hand back its input path
        # unchanged, so duplicates are removed first.
        await cls.cleanup_files(
            *cls._unique_paths(original_path, denoised_path, vocals_path)
        )
        return wav_path, duration