# Purpose: Audio load/resample/sliding-chunk utilities for HF Spaces # Dependencies: soundfile, torch, numpy """HF Space 전용 — demo/ 나 vendor/ 의존성 없음. - load_audio: soundfile 우선, 실패시 ffmpeg WAV 변환 fallback - sliding_chunks: production infer.py::_sliding_chunks 와 동일한 규칙 · stride=CHUNK_SAMPLES (4s) · 꼬리 chunk 는 actual_ratio >= 0.5 일 때만 유지 · 최소 1 chunk 보장 (짧은 곡도 padding) """ from __future__ import annotations import subprocess import tempfile from pathlib import Path import numpy as np import soundfile as sf import torch import torch.nn.functional as F from config import SR, MAX_DURATION_SEC, CHUNK_SAMPLES _NEEDS_FFMPEG = {".m4a", ".aac", ".wma", ".opus", ".mp4", ".webm"} def _ffmpeg_to_wav(path: str) -> str | None: tmp = tempfile.mktemp(suffix=".wav") try: r = subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", str(path), "-f", "wav", "-acodec", "pcm_f32le", "-ac", "2", "-ar", str(SR), "-t", str(MAX_DURATION_SEC), "-y", tmp], capture_output=True, timeout=30, ) return tmp if r.returncode == 0 else None except Exception: return None def load_audio(path: str) -> tuple[np.ndarray, bool]: """Return (audio[samples, channels] float32, is_stereo).""" ext = Path(path).suffix.lower() converted = None if ext in _NEEDS_FFMPEG: converted = _ffmpeg_to_wav(path) if converted is None: raise RuntimeError(f"Failed to convert {ext} via ffmpeg") path = converted try: audio, sr = sf.read(str(path), dtype="float32", always_2d=True) if sr != SR: try: import torchaudio t = torch.from_numpy(audio.T) resampler = torchaudio.transforms.Resample(sr, SR) audio = resampler(t).T.numpy() except Exception: # scipy fallback (linear) — 품질 낮지만 crash 방지 from scipy.signal import resample_poly up, down = SR, sr audio = np.stack([ resample_poly(audio[:, c], up, down) for c in range(audio.shape[1]) ], axis=1).astype(np.float32) max_samples = MAX_DURATION_SEC * SR if len(audio) > max_samples: audio = audio[:max_samples] is_stereo = audio.shape[1] >= 2 return audio.astype(np.float32), is_stereo finally: if converted: Path(converted).unlink(missing_ok=True) def load_audio_mono_tensor(path: str) -> tuple[torch.Tensor, np.ndarray, bool]: audio, is_stereo = load_audio(path) if is_stereo and audio.shape[1] >= 2: mono = (audio[:, 0] + audio[:, 1]) / 2.0 else: mono = audio[:, 0] return torch.from_numpy(mono), audio, is_stereo def sliding_chunks(wav: torch.Tensor, chunk_size: int = CHUNK_SAMPLES, min_actual_ratio: float = 0.5) -> list[tuple[torch.Tensor, dict]]: """production 과 동일 규칙으로 곡 전체를 4s stride 로 sliding. 반환: [(chunk_tensor, metadata), ...] — metadata = start_sample, actual_samples, actual_ratio, rms """ n = wav.shape[0] chunks: list[tuple[torch.Tensor, dict]] = [] if n < chunk_size // 2: # 2초 미만 — 빈 결과 (호출측에서 "Too Short" 처리) return chunks for start in range(0, n, chunk_size): c = wav[start:start + chunk_size] actual = c.shape[0] actual_ratio = actual / chunk_size if actual_ratio < min_actual_ratio: continue if actual < chunk_size: c = F.pad(c, (0, chunk_size - actual)) rms = float(torch.sqrt(torch.mean(c ** 2))) chunks.append((c, { "start_sample": int(start), "actual_samples": int(actual), "actual_ratio": float(actual_ratio), "rms": rms, })) if not chunks: # 2~4 초 곡 — 1 chunk 는 padding 해서 보장 c = wav[:chunk_size] c = F.pad(c, (0, chunk_size - c.shape[0])) chunks.append((c, { "start_sample": 0, "actual_samples": int(n), "actual_ratio": float(n / chunk_size), "rms": float(torch.sqrt(torch.mean(c ** 2))), })) return chunks def get_audio_info(audio: np.ndarray, is_stereo: bool) -> dict: duration = len(audio) / SR return { "duration": duration, "sr": SR, "channels": "Stereo" if is_stereo else "Mono", "samples": len(audio), }