artifactnet / inference /audio_utils.py
intrect's picture
feat(space): CPU ONNX runtime build (v9.4, full-song sliding aggregation)
0020ddc
raw
history blame
4.68 kB
# Purpose: Audio load/resample/sliding-chunk utilities for HF Spaces
# Dependencies: soundfile, torch, numpy
"""HF Space only — no dependency on demo/ or vendor/.
- load_audio: soundfile first; on failure, fall back to ffmpeg WAV conversion
- sliding_chunks: same rules as production infer.py::_sliding_chunks
  · stride=CHUNK_SAMPLES (4s)
  · the tail chunk is kept only when actual_ratio >= 0.5
  · at least 1 chunk is guaranteed (short tracks are padded too)
"""
from __future__ import annotations
import subprocess
import tempfile
from pathlib import Path
import numpy as np
import soundfile as sf
import torch
import torch.nn.functional as F
from config import SR, MAX_DURATION_SEC, CHUNK_SAMPLES
# Lower-cased file extensions that load_audio always transcodes to WAV with
# ffmpeg before handing the file to soundfile.
_NEEDS_FFMPEG = {".m4a", ".aac", ".wma", ".opus", ".mp4", ".webm"}
def _ffmpeg_to_wav(path: str) -> str | None:
    """Transcode ``path`` to a temporary WAV file with ffmpeg.

    The output is 32-bit float PCM, forced to 2 channels, resampled to ``SR``
    and truncated to ``MAX_DURATION_SEC`` seconds.

    Args:
        path: Path of the input audio file.

    Returns:
        Path of the temporary WAV file (the caller must delete it), or
        ``None`` when ffmpeg is unavailable, exits non-zero, or does not
        finish within the 30 second timeout. No temporary file is left
        behind in the ``None`` case.
    """
    # Create the output file atomically instead of using tempfile.mktemp(),
    # which only returns a guessable name. The handle is closed immediately
    # so ffmpeg can overwrite the file (hence "-y").
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
        tmp = handle.name

    try:
        result = subprocess.run(
            ["ffmpeg", "-hide_banner", "-loglevel", "error",
             "-i", str(path), "-f", "wav", "-acodec", "pcm_f32le",
             "-ac", "2", "-ar", str(SR), "-t", str(MAX_DURATION_SEC),
             "-y", tmp],
            capture_output=True, timeout=30,
        )
        succeeded = result.returncode == 0
    except Exception:
        # Deliberately best-effort: a missing ffmpeg binary, a timeout or any
        # other launch problem is reported to the caller as None.
        succeeded = False

    if succeeded:
        return tmp

    # Do not leak the (possibly partially written) output on failure.
    Path(tmp).unlink(missing_ok=True)
    return None
def load_audio(path: str) -> tuple[np.ndarray, bool]:
    """Read an audio file as float32 at ``SR`` Hz, capped at ``MAX_DURATION_SEC``.

    Files whose extension is listed in ``_NEEDS_FFMPEG`` are transcoded to WAV
    with ffmpeg before being read. Every other file is read with soundfile
    directly; if soundfile rejects it, an ffmpeg transcode is attempted as a
    fallback before giving up.

    Args:
        path: Path of the audio file.

    Returns:
        ``(audio, is_stereo)`` where ``audio`` is a float32 array shaped
        ``[samples, channels]`` and ``is_stereo`` is True when ``audio`` has
        at least two channels.

    Raises:
        RuntimeError: If a ``_NEEDS_FFMPEG`` file cannot be converted, or if
            soundfile cannot read the file and the ffmpeg fallback fails too
            (in that case the original soundfile error is re-raised).
    """
    ext = Path(path).suffix.lower()
    converted = None
    try:
        if ext in _NEEDS_FFMPEG:
            converted = _ffmpeg_to_wav(path)
            if converted is None:
                raise RuntimeError(f"Failed to convert {ext} via ffmpeg")
            audio, sr = sf.read(converted, dtype="float32", always_2d=True)
        else:
            try:
                audio, sr = sf.read(str(path), dtype="float32", always_2d=True)
            except RuntimeError:
                # soundfile signals unreadable/unsupported files with
                # RuntimeError (or a subclass); give ffmpeg one chance.
                converted = _ffmpeg_to_wav(path)
                if converted is None:
                    raise
                audio, sr = sf.read(converted, dtype="float32", always_2d=True)

        if sr != SR:
            try:
                import torchaudio
                # torchaudio resamples along the last axis, so work on
                # [channels, samples] and transpose back afterwards.
                t = torch.from_numpy(audio.T)
                resampler = torchaudio.transforms.Resample(sr, SR)
                audio = resampler(t).T.numpy()
            except Exception:
                # scipy polyphase fallback so a missing or broken torchaudio
                # does not crash loading.
                from scipy.signal import resample_poly
                audio = np.stack(
                    [resample_poly(audio[:, c], SR, sr)
                     for c in range(audio.shape[1])],
                    axis=1,
                ).astype(np.float32)

        # int() keeps the slice bound valid even if the configured duration
        # is a float.
        max_samples = int(MAX_DURATION_SEC * SR)
        if len(audio) > max_samples:
            audio = audio[:max_samples]

        is_stereo = audio.shape[1] >= 2
        return audio.astype(np.float32), is_stereo
    finally:
        # Remove the temporary WAV regardless of which path created it.
        if converted:
            Path(converted).unlink(missing_ok=True)
def load_audio_mono_tensor(path: str) -> tuple[torch.Tensor, np.ndarray, bool]:
    """Load ``path`` with ``load_audio`` and derive a mono waveform tensor.

    Returns:
        ``(mono, audio, is_stereo)``. ``mono`` is a tensor holding the average
        of the first two channels when the audio is stereo, otherwise channel
        0 as-is; ``audio`` and ``is_stereo`` are passed through unchanged from
        ``load_audio``.
    """
    audio, is_stereo = load_audio(path)
    first_channel = audio[:, 0]
    can_downmix = is_stereo and audio.shape[1] >= 2
    mono = (first_channel + audio[:, 1]) / 2.0 if can_downmix else first_channel
    return torch.from_numpy(mono), audio, is_stereo
def sliding_chunks(wav: torch.Tensor, chunk_size: int = CHUNK_SAMPLES,
                   min_actual_ratio: float = 0.5) -> list[tuple[torch.Tensor, dict]]:
    """Split ``wav`` into non-overlapping, fixed-size chunks.

    The window advances by ``chunk_size`` samples. A trailing chunk shorter
    than ``chunk_size`` is zero-padded on the right and kept only when its
    real length is at least ``min_actual_ratio`` of ``chunk_size``.

    Args:
        wav: Waveform indexed by sample along dimension 0.
        chunk_size: Number of samples per chunk (also the stride).
        min_actual_ratio: Minimum fraction of real samples a chunk needs.

    Returns:
        A list of ``(chunk, metadata)`` pairs. ``metadata`` has the keys
        ``start_sample``, ``actual_samples``, ``actual_ratio`` and ``rms``
        (computed on the chunk after padding). The list is empty when ``wav``
        has fewer than ``chunk_size // 2`` samples, so the caller can report
        the input as too short.
    """
    total = wav.shape[0]
    if total < chunk_size // 2:
        # Shorter than half a chunk: nothing to analyse.
        return []

    def _entry(offset: int, piece: torch.Tensor, valid: int) -> tuple[torch.Tensor, dict]:
        # Bundle a (possibly padded) chunk with its bookkeeping values.
        return piece, {
            "start_sample": int(offset),
            "actual_samples": int(valid),
            "actual_ratio": float(valid / chunk_size),
            "rms": float(torch.sqrt(torch.mean(piece ** 2))),
        }

    collected: list[tuple[torch.Tensor, dict]] = []
    for offset in range(0, total, chunk_size):
        piece = wav[offset:offset + chunk_size]
        valid = piece.shape[0]
        if valid / chunk_size < min_actual_ratio:
            continue
        shortfall = chunk_size - valid
        if shortfall > 0:
            piece = F.pad(piece, (0, shortfall))
        collected.append(_entry(offset, piece, valid))

    if collected:
        return collected

    # Nothing passed the ratio filter: guarantee one padded chunk taken from
    # the start of the waveform.
    head = wav[:chunk_size]
    head = F.pad(head, (0, chunk_size - head.shape[0]))
    collected.append(_entry(0, head, total))
    return collected
def get_audio_info(audio: np.ndarray, is_stereo: bool) -> dict:
    """Summarise a loaded clip for display.

    Returns:
        A dict with ``duration`` (``len(audio) / SR``), ``sr`` (the module's
        ``SR``), ``channels`` (``"Stereo"`` or ``"Mono"`` according to
        ``is_stereo``) and ``samples`` (``len(audio)``).
    """
    sample_count = len(audio)
    layout = "Stereo" if is_stereo else "Mono"
    return {
        "duration": sample_count / SR,
        "sr": SR,
        "channels": layout,
        "samples": sample_count,
    }