Spaces:
Running
Running
File size: 8,517 Bytes
4d6b6c4 298639e 4d6b6c4 29a8a4a 4d6b6c4 29a8a4a 4d6b6c4 298639e 4d6b6c4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
"""
Audio processing utilities.
Simple validation and file handling.
"""
import logging
import uuid
from pathlib import Path
from typing import Optional, Tuple
from app.core.config import get_settings
import ffmpeg
import asyncio
from app.services.vocal_separator import VocalSeparator
from app.services.denoiser import DenoiserService
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Settings object fetched once at import time; every function below reads it.
settings = get_settings()
class AudioProcessingError(Exception):
    """Raised when an uploaded audio file fails validation or conversion."""
class AudioProcessor:
    """
    Validate, store and convert uploaded audio files.

    Every method is a class or static method; the class is a namespace
    around the module-level ``settings`` object and the FFmpeg helpers.
    """

    # Snapshots of the configured values, taken when the class is defined.
    # NOTE(review): the methods below read ``settings`` directly and the FFmpeg
    # output is hard-coded to 16000 Hz / 1 channel, so these three attributes
    # are not used inside this class — confirm whether callers rely on them.
    ALLOWED_EXTENSIONS = settings.allowed_extensions
    TARGET_SAMPLE_RATE = settings.sample_rate
    TARGET_CHANNELS = settings.channels

    @staticmethod
    def _extension_of(filename: str, default: str = '') -> str:
        """Return the lower-cased text after the last '.' of *filename*, or *default* if it has no '.'."""
        if '.' not in filename:
            return default
        return filename.rsplit('.', 1)[-1].lower()

    @classmethod
    def validate_file(cls, filename: str, file_size: int) -> None:
        """
        Validate uploaded file.

        Args:
            filename: Original filename
            file_size: File size in bytes

        Raises:
            AudioProcessingError: If the extension is not allowed or the
                file exceeds the configured maximum size
        """
        # Check extension
        ext = cls._extension_of(filename)
        if ext not in settings.allowed_extensions:
            raise AudioProcessingError(
                f"File type '.{ext}' not supported. "
                f"Allowed: {', '.join(settings.allowed_extensions)}"
            )

        # Check size
        if file_size > settings.max_upload_size_bytes:
            raise AudioProcessingError(
                f"File too large ({file_size / 1024 / 1024:.1f}MB). "
                f"Maximum size: {settings.max_upload_size_mb}MB"
            )

    @classmethod
    async def save_upload(cls, file_content: bytes, original_filename: str) -> Path:
        """
        Save uploaded file to disk under a freshly generated UUID name.

        Args:
            file_content: Raw file bytes
            original_filename: Original filename, used only for its extension

        Returns:
            Path to saved file
        """
        import aiofiles

        ext = cls._extension_of(original_filename, default='wav')
        # The extension is client-supplied text. Refuse anything that is not
        # purely alphanumeric so path separators or '..' can never end up in
        # the path we write to, even if this method is called without
        # validate_file() having run first.
        if not ext.isalnum():
            ext = 'wav'

        # Generate unique filename
        unique_filename = f"{uuid.uuid4()}.{ext}"
        file_path = settings.upload_dir / unique_filename

        # Save file
        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(file_content)

        logger.info(f"Saved upload: {file_path} ({len(file_content) / 1024:.1f}KB)")
        return file_path

    @classmethod
    async def convert_to_wav(cls, input_path: Path) -> Path:
        """
        Convert audio to 16kHz mono WAV using FFmpeg.

        Args:
            input_path: Path to input audio file

        Returns:
            Path to converted WAV file

        Raises:
            AudioProcessingError: If FFmpeg reports an error
        """
        output_path = settings.processed_dir / f"{input_path.stem}_processed.wav"
        loop = asyncio.get_running_loop()
        try:
            # FFmpeg blocks, so run it in the default executor to keep the
            # event loop responsive.
            await loop.run_in_executor(
                None, cls._run_ffmpeg_conversion, input_path, output_path
            )
        except ffmpeg.Error as e:
            # stderr is raw bytes from FFmpeg; never let a decode problem mask
            # the real failure.
            error_msg = e.stderr.decode(errors='replace') if e.stderr else str(e)
            logger.error(f"FFmpeg error: {error_msg}")
            # Do not leave a partially written output file behind.
            await cls.cleanup_files(output_path)
            raise AudioProcessingError(f"Audio conversion failed: {error_msg}") from e

        logger.info(f"Converted to WAV: {output_path}")
        return output_path

    @staticmethod
    def _run_ffmpeg_conversion(input_path: Path, output_path: Path) -> None:
        """Run the actual FFmpeg conversion (blocking)."""
        stream = ffmpeg.input(str(input_path))

        # Apply normalization if enabled (loudnorm is best for speech consistency)
        if settings.enable_loudnorm:
            logger.debug("Applying loudnorm normalization...")
            stream = stream.filter('loudnorm', I=-20, TP=-2, LRA=7)

        # Apply noise reduction if enabled (Note: basic filters are kept as minor cleanup)
        if settings.enable_noise_reduction:
            logger.debug("Applying subtle highpass filter...")
            stream = (
                stream
                .filter('highpass', f=60)
                .filter('lowpass', f=7500)
                .filter(
                    # Silence trimming
                    'silenceremove',
                    stop_periods=-1,
                    stop_duration=0.4,
                    stop_threshold='-45dB'
                )
            )

        # 16-bit PCM, 16 kHz, mono; any existing output file is overwritten.
        (
            stream.output(
                str(output_path),
                acodec='pcm_s16le',
                ar=16000,
                ac=1
            )
            .overwrite_output()
            .run(quiet=True, capture_stderr=True)
        )

    @classmethod
    async def get_audio_duration(cls, filepath: Path) -> float:
        """
        Get audio file duration in seconds.

        Args:
            filepath: Path to audio file

        Returns:
            Duration in seconds, or 0.0 when the file cannot be probed or
            the probe result carries no usable duration
        """
        loop = asyncio.get_running_loop()
        try:
            probe = await loop.run_in_executor(None, ffmpeg.probe, str(filepath))
            return float(probe['format'].get('duration', 0))
        except (ffmpeg.Error, KeyError, TypeError, ValueError) as e:
            # Duration is best-effort: a failed probe or a malformed probe
            # result must not abort the caller.
            logger.warning(f"Could not probe audio duration: {e}")
            return 0.0

    @classmethod
    async def cleanup_files(cls, *paths: Path) -> None:
        """
        Remove temporary files.

        Falsy entries and paths that do not exist are skipped. Failures are
        logged and never raised, so cleanup cannot mask another error.
        """
        for path in paths:
            try:
                if path and path.exists():
                    path.unlink()
                    logger.debug(f"Cleaned up: {path}")
            except Exception as e:
                logger.warning(f"Failed to cleanup {path}: {e}")

    @classmethod
    async def process_upload(cls, file_content: bytes, filename: str) -> Tuple[Path, float]:
        """
        Full upload processing pipeline: validate, save, optionally denoise
        and separate vocals, then convert.

        Args:
            file_content: Uploaded file bytes
            filename: Original filename

        Returns:
            Tuple of (processed WAV path, duration in seconds)

        Raises:
            AudioProcessingError: If validation or conversion fails
        """
        # Validate
        cls.validate_file(filename, len(file_content))

        # Save original
        original_path = await cls.save_upload(file_content, filename)

        # Bound before the try block so the cleanup code can always refer to them.
        denoised_path: Optional[Path] = None
        vocals_path: Optional[Path] = None
        wav_path: Optional[Path] = None
        try:
            source = original_path

            # Step 1: Denoising (Speech Enhancement)
            if settings.enable_denoiser:
                denoised_path = await DenoiserService.enhance_audio(original_path)
                source = denoised_path

            # Step 2: Vocal separation using MDX-Net
            if settings.enable_vocal_separation:
                vocals_path = await VocalSeparator.separate_vocals(source)
                source = vocals_path

            # Step 3: Convert to 16kHz mono WAV (includes normalization)
            wav_path = await cls.convert_to_wav(source)

            # Get duration
            duration = await cls.get_audio_duration(wav_path)
        except Exception:
            # The converted file is only useful to the caller on success.
            if wav_path is not None:
                await cls.cleanup_files(wav_path)
            raise
        finally:
            # Intermediate files are removed on success, failure and
            # cancellation alike. A stage may hand back the path it was
            # given, so de-duplicate before deleting.
            intermediates = []
            for candidate in (original_path, denoised_path, vocals_path):
                if candidate and candidate not in intermediates:
                    intermediates.append(candidate)
            await cls.cleanup_files(*intermediates)

        return wav_path, duration