PoC_ASR_v1 / app /services /audio_processor.py
vyluong's picture
Upload folder using huggingface_hub
5ab6c6e verified
"""
Audio processing service using FFmpeg.
Handles file validation, optional denoising and vocal separation, conversion to 16 kHz mono WAV, and temporary-file cleanup.
"""
import os
import uuid
import asyncio
import logging
from pathlib import Path
from typing import Optional, Tuple
import ffmpeg
from app.core.config import get_settings
from app.services.vocal_separator import VocalSeparator
from app.services.denoiser import DenoiserService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Application settings, fetched once when this module is imported; every
# method of AudioProcessor reads its configuration from this object.
settings = get_settings()
class AudioProcessingError(Exception):
    """Raised when an upload fails validation or cannot be converted.

    Carries a human-readable message describing the failure (invalid
    extension, oversized file, FFmpeg conversion error, ...).
    """
class AudioProcessor:
    """Service for processing audio files.

    The class holds no instance state: every operation is a class or static
    method. The main entry point is ``process_upload``, which validates an
    upload, stores it, optionally denoises it and isolates vocals, converts
    the result to WAV with FFmpeg and removes the intermediate files.
    """

    # Accepted extensions; compared against the lower-cased text that follows
    # the last '.' of the uploaded filename (i.e. without the dot).
    ALLOWED_EXTENSIONS = settings.allowed_extensions
    # NOTE(review): these two values are read from settings but nothing in
    # this class uses them -- _run_ffmpeg_conversion hard-codes 16000 Hz / 1
    # channel. Confirm whether the conversion should be driven by them.
    TARGET_SAMPLE_RATE = settings.sample_rate
    TARGET_CHANNELS = settings.channels

    @staticmethod
    def _extract_extension(filename: str) -> str:
        """Return the lower-cased text after the last '.', or '' if no dot."""
        _, dot, ext = filename.rpartition('.')
        return ext.lower() if dot else ''

    @classmethod
    def validate_file(cls, filename: str, file_size: int) -> bool:
        """
        Validate an uploaded file's extension and size.

        Args:
            filename: Original filename
            file_size: File size in bytes

        Returns:
            True if valid

        Raises:
            AudioProcessingError: If the extension is not in
                ALLOWED_EXTENSIONS or the size exceeds
                settings.max_upload_size_bytes
        """
        # Check extension
        ext = cls._extract_extension(filename)
        if ext not in cls.ALLOWED_EXTENSIONS:
            raise AudioProcessingError(
                f"Invalid file type: .{ext}. Allowed: {', '.join(cls.ALLOWED_EXTENSIONS)}"
            )

        # Check size
        if file_size > settings.max_upload_size_bytes:
            raise AudioProcessingError(
                f"File too large: {file_size / (1024*1024):.1f}MB. "
                f"Maximum: {settings.max_upload_size_mb}MB"
            )

        return True

    @classmethod
    async def save_upload(cls, file_content: bytes, original_filename: str) -> Path:
        """
        Save uploaded bytes under settings.upload_dir with a random name.

        Only the extension of the original filename is reused; the stem is
        replaced by the first 8 hex characters of a UUID4. A filename without
        an extension (including one ending in a bare dot) is saved as ``.wav``.

        Args:
            file_content: File bytes
            original_filename: Original filename, used only for its extension

        Returns:
            Path to saved file

        Raises:
            AudioProcessingError: If the extension contains a path separator
        """
        ext = cls._extract_extension(original_filename) or 'wav'

        # The extension comes from a client-supplied name. Everything after
        # the last dot is taken verbatim, so "a./../../x" would otherwise
        # yield a path outside upload_dir when this method is called without
        # validate_file() having run first.
        if '/' in ext or '\\' in ext:
            raise AudioProcessingError(
                f"Invalid file extension in filename: {original_filename!r}"
            )

        filepath = settings.upload_dir / f"{uuid.uuid4().hex[:8]}.{ext}"

        # Write in the default executor so the event loop is not blocked
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, filepath.write_bytes, file_content)

        logger.debug(f"Saved upload: {filepath}")
        return filepath

    @classmethod
    async def convert_to_wav(cls, input_path: Path) -> Path:
        """
        Convert audio to 16kHz mono WAV using FFmpeg.

        The output is written to settings.processed_dir as
        ``<input stem>_processed.wav``; an existing file of that name is
        overwritten.

        Args:
            input_path: Path to input audio file

        Returns:
            Path to converted WAV file

        Raises:
            AudioProcessingError: If FFmpeg reports an error; the original
                ffmpeg.Error is attached as ``__cause__``
        """
        output_path = settings.processed_dir / f"{input_path.stem}_processed.wav"

        # Run ffmpeg conversion in executor to not block
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                None, cls._run_ffmpeg_conversion, input_path, output_path
            )
        except ffmpeg.Error as e:
            # FFmpeg's stderr is not guaranteed to be valid UTF-8; a strict
            # decode here would mask the real failure with UnicodeDecodeError.
            error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
            logger.error(f"FFmpeg error: {error_msg}")
            # Do not leave a truncated output file behind.
            await cls.cleanup_files(output_path)
            raise AudioProcessingError(f"Audio conversion failed: {error_msg}") from e

        logger.info(f"Converted to WAV: {output_path}")
        return output_path

    @staticmethod
    def _run_ffmpeg_conversion(input_path: Path, output_path: Path) -> None:
        """
        Run the actual FFmpeg conversion (blocking).

        Builds an FFmpeg graph that optionally applies ``loudnorm`` and an
        80 Hz ``highpass`` filter (both controlled by settings) and writes
        16-bit PCM, 16000 Hz, single-channel audio to *output_path*,
        overwriting any existing file.

        Args:
            input_path: Path to the source audio
            output_path: Path of the WAV file to create

        Raises:
            ffmpeg.Error: Propagated from ``run()`` when FFmpeg fails; stderr
                is captured so callers can report it
        """
        stream = ffmpeg.input(str(input_path))

        # Apply normalization if enabled (loudnorm is best for speech consistency)
        if settings.enable_loudnorm:
            logger.debug("Applying loudnorm normalization...")
            stream = stream.filter('loudnorm', I=-16, TP=-1.5, LRA=11)

        # Apply noise reduction if enabled (Note: basic filters are kept as minor cleanup)
        if settings.enable_noise_reduction:
            logger.debug("Applying subtle highpass filter...")
            stream = stream.filter('highpass', f=80)

        # NOTE(review): sample rate / channel count are hard-coded here while
        # the class exposes TARGET_SAMPLE_RATE / TARGET_CHANNELS -- confirm.
        (
            stream
            .output(
                str(output_path),
                acodec='pcm_s16le',
                ar=16000,
                ac=1
            )
            .overwrite_output()
            .run(quiet=True, capture_stderr=True)
        )

    @classmethod
    async def get_audio_duration(cls, filepath: Path) -> float:
        """
        Get audio file duration in seconds (best effort).

        Args:
            filepath: Path to audio file

        Returns:
            Duration in seconds, or 0.0 when the file cannot be probed or the
            probe result carries no usable duration
        """
        loop = asyncio.get_running_loop()
        try:
            probe = await loop.run_in_executor(None, ffmpeg.probe, str(filepath))
            return float(probe['format'].get('duration', 0))
        except (ffmpeg.Error, KeyError, TypeError, ValueError) as e:
            # Probe failures already degrade to 0.0; a missing 'format'
            # section or a non-numeric duration is treated the same way
            # instead of aborting the caller.
            logger.warning(f"Could not probe audio duration: {e}")
            return 0.0

    @classmethod
    async def cleanup_files(cls, *filepaths: Optional[Path]) -> None:
        """
        Delete temporary files (best effort).

        Falsy entries (e.g. None) and paths that do not exist are skipped.
        Failures are logged as warnings and never raised. The coroutine
        contains no suspension point, so it completes in a single step.

        Args:
            filepaths: Paths to files to delete
        """
        for filepath in filepaths:
            if not filepath:
                continue
            try:
                if filepath.exists():
                    filepath.unlink()
                    logger.debug(f"Cleaned up: {filepath}")
            except Exception as e:
                # Deliberately broad: cleanup must never mask the real result
                # or error of the pipeline.
                logger.warning(f"Failed to clean up {filepath}: {e}")

    @classmethod
    async def process_upload(cls, file_content: bytes, filename: str) -> Tuple[Path, float]:
        """
        Full upload processing pipeline: validate, save, enhance, convert.

        Steps:
            1. Validate extension and size.
            2. Save the upload to a temporary file.
            3. If settings.enable_denoiser: DenoiserService.enhance_audio.
            4. If settings.enable_vocal_separation:
               VocalSeparator.separate_vocals.
            5. Convert the result to WAV and probe its duration.

        Every intermediate file is removed whether the pipeline succeeds,
        fails or is cancelled. On failure the converted WAV, if it was
        already produced, is removed as well.

        Args:
            file_content: Uploaded file bytes
            filename: Original filename

        Returns:
            Tuple of (processed WAV path, duration in seconds)

        Raises:
            AudioProcessingError: If validation or conversion fails
        """
        # Validate
        cls.validate_file(filename, len(file_content))

        # Save original
        original_path = await cls.save_upload(file_content, filename)

        # Files to delete once the pipeline ends. Stages may hand back a path
        # that is already tracked, so each path is recorded only once.
        intermediates = [original_path]
        wav_path: Optional[Path] = None
        succeeded = False

        try:
            current = original_path

            # Step 1: Denoising (Speech Enhancement)
            if settings.enable_denoiser:
                current = await DenoiserService.enhance_audio(current)
                if current and current not in intermediates:
                    intermediates.append(current)

            # Step 2: Vocal separation using MDX-Net
            if settings.enable_vocal_separation:
                current = await VocalSeparator.separate_vocals(current)
                if current and current not in intermediates:
                    intermediates.append(current)

            # Step 3: Convert to 16kHz mono WAV (includes normalization)
            wav_path = await cls.convert_to_wav(current)

            # Get duration
            duration = await cls.get_audio_duration(wav_path)

            succeeded = True
            return wav_path, duration
        finally:
            # `finally` (rather than `except Exception`) also covers task
            # cancellation, which would otherwise leak the temporary files.
            if succeeded:
                # Never delete the file being handed back to the caller.
                doomed = [p for p in intermediates if p != wav_path]
            else:
                doomed = intermediates + ([wav_path] if wav_path else [])
            await cls.cleanup_files(*doomed)