PoC_ASR_v6_dev / app /services /audio_processor.py
vyluong's picture
Update app/services/audio_processor.py
413d60c verified
"""
Audio processing utilities.
Simple validation and file handling.
"""
import logging
import uuid
from pathlib import Path
from typing import Optional, Tuple
from app.core.config import get_settings
import ffmpeg
import asyncio
from app.services.denoiser import DenoiserService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, fetched once when this module is imported.
settings = get_settings()
class AudioProcessingError(Exception):
    """Error raised when audio validation or processing fails."""
class AudioProcessor:
    """Validate, store, and convert uploaded audio files.

    Every method is a class or static method; the class holds no instance
    state and reads its configuration from the module-level ``settings``.
    """

    # Values copied from the application settings when the class is defined.
    ALLOWED_EXTENSIONS = settings.allowed_extensions
    TARGET_SAMPLE_RATE = settings.sample_rate
    TARGET_CHANNELS = settings.channels

    @staticmethod
    def _extension_of(filename: str) -> str:
        """Return the lower-cased text after the last dot, or '' if there is no dot."""
        return filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''

    @classmethod
    def validate_file(cls, filename: str, file_size: int) -> None:
        """
        Validate an uploaded file's extension and size.

        Args:
            filename: Original filename
            file_size: File size in bytes

        Raises:
            AudioProcessingError: If the extension is not allowed or the
                file exceeds the configured maximum size
        """
        # Check extension
        ext = cls._extension_of(filename)
        if ext not in settings.allowed_extensions:
            raise AudioProcessingError(
                f"File type '.{ext}' not supported. "
                f"Allowed: {', '.join(settings.allowed_extensions)}"
            )

        # Check size
        if file_size > settings.max_upload_size_bytes:
            raise AudioProcessingError(
                f"File too large ({file_size / 1024 / 1024:.1f}MB). "
                f"Maximum size: {settings.max_upload_size_mb}MB"
            )

    @classmethod
    async def save_upload(cls, file_content: bytes, original_filename: str) -> Path:
        """
        Save uploaded file to disk under a freshly generated UUID name.

        Args:
            file_content: Raw file bytes
            original_filename: Original filename, used only for its extension

        Returns:
            Path to saved file
        """
        import aiofiles

        ext = cls._extension_of(original_filename)
        # The filename is user-controlled: an extension such as "b/c" or an
        # empty one ("name.") must never end up in the on-disk path, so fall
        # back to 'wav' (the same default used when there is no dot at all).
        if not (ext.isascii() and ext.isalnum()):
            ext = 'wav'

        # Generate unique filename
        file_path = settings.upload_dir / f"{uuid.uuid4()}.{ext}"

        # Save file
        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(file_content)

        logger.info(f"Saved upload: {file_path} ({len(file_content) / 1024:.1f}KB)")
        return file_path

    @classmethod
    async def convert_to_wav(cls, input_path: Path) -> Path:
        """
        Convert audio to 16kHz mono WAV using FFmpeg.

        The blocking FFmpeg call runs in the default executor so the event
        loop is not blocked.

        Args:
            input_path: Path to input audio file

        Returns:
            Path to converted WAV file

        Raises:
            AudioProcessingError: If FFmpeg reports an error
        """
        output_path = settings.processed_dir / f"{input_path.stem}_processed.wav"
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                None, cls._run_ffmpeg_conversion, input_path, output_path
            )
        except ffmpeg.Error as e:
            # stderr is raw bytes from the subprocess; never let a decoding
            # problem mask the real conversion failure.
            error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
            logger.error(f"FFmpeg error: {error_msg}")
            # Do not leave a partially written output file behind.
            await cls.cleanup_files(output_path)
            raise AudioProcessingError(f"Audio conversion failed: {error_msg}") from e

        logger.info(f"Converted to WAV: {output_path}")
        return output_path

    @staticmethod
    def _run_ffmpeg_conversion(input_path: Path, output_path: Path) -> None:
        """Run the actual FFmpeg conversion (blocking).

        Args:
            input_path: Path to the source audio file
            output_path: Path the PCM 16-bit WAV is written to (overwritten
                if it already exists)

        Raises:
            ffmpeg.Error: If the FFmpeg process fails
        """
        stream = ffmpeg.input(str(input_path))

        # Apply normalization if enabled (loudnorm is best for speech consistency)
        if settings.enable_loudnorm:
            logger.debug("Applying loudnorm normalization...")
            stream = stream.filter('loudnorm', I=-20, TP=-2, LRA=7)

        # Apply noise reduction if enabled (Note: basic filters are kept as minor cleanup)
        if settings.enable_noise_reduction:
            logger.debug("Applying subtle highpass filter...")
            stream = (
                stream
                .filter('highpass', f=60)
                .filter('lowpass', f=7500)
                .filter(
                    # Silence trimming
                    'silenceremove',
                    stop_periods=-1,
                    stop_duration=0.4,
                    stop_threshold='-45dB',
                )
            )

        # NOTE(review): the output format is hard-coded to 16 kHz mono and does
        # not read TARGET_SAMPLE_RATE / TARGET_CHANNELS - confirm whether the
        # settings values are meant to drive this.
        (
            stream.output(
                str(output_path),
                acodec='pcm_s16le',
                ar=16000,
                ac=1,
            )
            .overwrite_output()
            .run(quiet=True, capture_stderr=True)
        )

    @classmethod
    async def get_audio_duration(cls, filepath: Path) -> float:
        """
        Get audio file duration in seconds.

        Probing is best-effort: any probe failure or unusable probe output
        is logged and reported as a duration of 0.0.

        Args:
            filepath: Path to audio file

        Returns:
            Duration in seconds, or 0.0 if it could not be determined
        """
        loop = asyncio.get_running_loop()
        try:
            probe = await loop.run_in_executor(None, ffmpeg.probe, str(filepath))
            return float(probe['format'].get('duration', 0))
        except ffmpeg.Error as e:
            logger.warning(f"Could not probe audio duration: {e}")
        except (KeyError, TypeError, ValueError) as e:
            # Probe succeeded but has no 'format' section or a non-numeric
            # duration; treat it like any other probe failure.
            logger.warning(f"Unexpected probe output for {filepath}: {e!r}")
        return 0.0

    @classmethod
    async def cleanup_files(cls, *paths: Optional[Path]) -> None:
        """Remove temporary files.

        Cleanup is best-effort: falsy entries and already-missing files are
        skipped, and any other failure is logged instead of raised.

        Args:
            *paths: Files to delete; ``None`` entries are ignored
        """
        for path in paths:
            if not path:
                continue
            try:
                path.unlink()
            except FileNotFoundError:
                # Nothing to remove (never created or already cleaned up).
                continue
            except Exception as e:
                # Deliberately broad: a cleanup problem must never mask the
                # caller's real result or error.
                logger.warning(f"Failed to cleanup {path}: {e}")
            else:
                logger.debug(f"Cleaned up: {path}")

    @classmethod
    async def process_upload(cls, file_content: bytes, filename: str) -> Tuple[Path, float]:
        """
        Full upload processing pipeline: validate, save, convert.

        Optional denoising and vocal separation run before conversion when
        enabled in settings. Intermediate files are removed whether the
        pipeline succeeds or fails; on failure an already converted WAV is
        removed as well.

        Args:
            file_content: Uploaded file bytes
            filename: Original filename

        Returns:
            Tuple of (processed WAV path, duration in seconds)

        Raises:
            AudioProcessingError: If validation or conversion fails
        """
        # Validate
        cls.validate_file(filename, len(file_content))

        # Save original
        original_path = await cls.save_upload(file_content, filename)

        # Initialised up front so the cleanup below can always refer to them.
        denoised_path: Optional[Path] = None
        vocals_path: Optional[Path] = None
        wav_path: Optional[Path] = None
        try:
            source = original_path

            # Step 1: Denoising (Speech Enhancement)
            if settings.enable_denoiser:
                denoised_path = await DenoiserService.enhance_audio(source)
                source = denoised_path

            # Step 2: Vocal separation
            if settings.enable_vocal_separation:
                # NOTE(review): VocalSeparator is not imported anywhere in the
                # visible part of this file; enabling vocal separation raises
                # NameError until the import is restored.
                vocals_path = await VocalSeparator.separate_vocals(source)
                source = vocals_path

            # Step 3: Convert to 16kHz mono WAV (includes normalization)
            wav_path = await cls.convert_to_wav(source)

            # Get duration
            duration = await cls.get_audio_duration(wav_path)
            return wav_path, duration
        except Exception:
            # The result is never handed to the caller, so do not leak it.
            if wav_path is not None:
                await cls.cleanup_files(wav_path)
            raise
        finally:
            # Remove each distinct intermediate once, and never the final WAV.
            intermediates = []
            for path in (original_path, denoised_path, vocals_path):
                if path is not None and path != wav_path and path not in intermediates:
                    intermediates.append(path)
            await cls.cleanup_files(*intermediates)