harmosplit / app.py
indigo0511
initial: HarmoSplit app
d90b8a8
"""
app.py — 主旋律 / ハモリ 分離 & L/R パンニング プロトタイプ
=====================================================================
使い方:
python app.py <入力ファイル> [--output output_panned.wav] [--inst-vol 0.15]
入力: mp3 / wav / mp4 / mov など ffmpeg が対応する任意フォーマット
出力: 左ch=主旋律(メインボーカル) / 右ch=ハモリ(コーラス) のステレオ WAV
アルゴリズム概要:
1. subprocess で ffmpeg を直接呼び出し → 安定した音声抽出
2. Demucs (htdemucs_6s) で6音源分離
→ vocals / other / drums / bass / guitar / piano
3. パンニング
- vocals → 左ch 100%
- other → 右ch 100% (コーラス/ハモリ)
- 伴奏 → センター & 音量縮小 (--inst-vol で調整, 0=ミュート)
4. ミックスして output_panned.wav を出力
"""
import argparse
import os
import sys
import tempfile
import shutil
import subprocess
from pathlib import Path
import numpy as np
import soundfile as sf
# ── torchaudio.save モンキーパッチ ─────────────────────────────
# Windows 環境で torchcodec の DLL が読み込めず torchaudio.save が
# 失敗する問題を回避するため、soundfile ベースの保存処理で置き換える。
# これにより Demucs の内部での WAV 書き出しが正常に動作する。
def _patched_torchaudio_save(uri, src, sample_rate, **kwargs):
"""torchaudio.save を soundfile で代替する"""
import numpy as np
import soundfile as sf
data = src.detach().cpu().numpy()
# torchaudio は (C, T) 形式、soundfile は (T, C) を期待する
if data.ndim == 2:
data = data.T # (T, C) に変換
elif data.ndim == 1:
pass # モノラルはそのまま
sf.write(str(uri), data, sample_rate, subtype="PCM_16")
try:
import torchaudio
torchaudio.save = _patched_torchaudio_save
# 新しいバージョンの内部モジュールもパッチ
try:
import torchaudio.backend.soundfile_backend as _sfb
_sfb.save = _patched_torchaudio_save
except Exception:
pass
try:
import torchaudio._backend.utils as _bu
# save 関数が複数箇所に散在するため torchaudio 本体だけで十分
except Exception:
pass
except ImportError:
pass # torchaudio がない場合は無視
# ── UVR MDX-NET によるリード/バッキングボーカル分離 ────────────
# UVR_MDXNET_KARA_2 モデル: ボーカルステムから
# Lead (リードボーカル) と Backing (バッキング/ハモリ) を分離する
MDX_MODEL_REPO = "seanghay/uvr_models"
MDX_MODEL_FILE = "UVR_MDXNET_KARA_2.onnx"
# モデルの設定値 (UVR_MDXNET_KARA_2: 入力形状 [batch, 4, 2048, 256])
MDX_CONFIG = {
"dim_f": 2048, # 周波数ビン数(モデルの入力チャネル / 2)
"dim_t": 256, # 時間フレーム数
"n_fft": 6144, # STFT ウィンドウサイズ
"hop_length": 1024,
"sample_rate": 44100,
"overlap": 0.75,
}
def download_mdx_model(cache_dir: Path) -> Path:
"""HuggingFace から UVR MDX-NET KARA モデルを取得する"""
model_path = cache_dir / MDX_MODEL_FILE
if model_path.exists():
print(f"[INFO] MDX モデルはキャッシュ済み: {model_path}")
return model_path
print(f"[INFO] UVR MDX-NET KARA モデルをダウンロード中... (初回のみ・約53MB)")
try:
from huggingface_hub import hf_hub_download
downloaded = hf_hub_download(
repo_id=MDX_MODEL_REPO,
filename=MDX_MODEL_FILE,
local_dir=str(cache_dir),
)
return Path(downloaded)
except Exception as e:
raise RuntimeError(
f"MDX モデルのダウンロードに失敗しました: {e}\n"
"インターネット接続を確認してください。"
)
def mdx_separate_lead_backing(
vocals_wav: np.ndarray,
sr: int,
model_path: Path,
) -> tuple[np.ndarray, np.ndarray]:
"""
UVR MDX-NET KARA モデルを使ってボーカルを
リードボーカル と バッキング/ハモリ に分離する。
モデル入力形状: [1, 4, dim_f, dim_t] = [1, 4, 2048, 256]
vocals_wav: (samples, 2) float32 のステレオ配列
戻り値: (lead, backing) それぞれ (samples, 2) float32
"""
import onnxruntime as ort
print("[INFO] UVR MDX-NET KARA でリード/バッキング分離を開始...")
dim_f = MDX_CONFIG["dim_f"] # 2048
dim_t = MDX_CONFIG["dim_t"] # 256
n_fft = MDX_CONFIG["n_fft"] # 6144
hop_length = MDX_CONFIG["hop_length"] # 1024
overlap = MDX_CONFIG["overlap"] # 0.75
# ONNX セッション
sess_opts = ort.SessionOptions()
sess_opts.log_severity_level = 3
session = ort.InferenceSession(
str(model_path),
sess_options=sess_opts,
providers=["CPUExecutionProvider"],
)
input_name = session.get_inputs()[0].name
# 入力: (samples, 2) → (2, T)
mix = vocals_wav.T.astype(np.float32) # (2, T)
n_channels, T = mix.shape
# チャンクサイズ = dim_t フレーム分のサンプル数
chunk_samples = hop_length * dim_t
step_samples = int(chunk_samples * (1 - overlap))
pad_size = chunk_samples # 前後にゼロパディング
mix_padded = np.pad(mix, ((0, 0), (pad_size, pad_size + chunk_samples)))
# 出力バッファ
out_sum = np.zeros((n_channels, mix_padded.shape[1]), dtype=np.float64)
out_count = np.zeros(mix_padded.shape[1], dtype=np.float64)
# ハン窓フェード
fade_len = step_samples
fade_in = np.linspace(0.0, 1.0, fade_len)
fade_out = fade_in[::-1]
win = np.ones(chunk_samples)
win[:fade_len] = fade_in
win[-fade_len:] = fade_out
starts = range(0, mix_padded.shape[1] - chunk_samples, step_samples)
total = len(list(starts))
print(f"[INFO] {total} チャンクを処理中...")
for idx, start in enumerate(range(0, mix_padded.shape[1] - chunk_samples, step_samples)):
chunk = mix_padded[:, start: start + chunk_samples] # (2, chunk_samples)
# STFT → (2, n_fft//2+1, dim_t)
stft_l = _np_stft(chunk[0], n_fft, hop_length, dim_t)
stft_r = _np_stft(chunk[1], n_fft, hop_length, dim_t)
# 上位 dim_f ビンだけ使う
sl = stft_l[:dim_f, :] # (dim_f, dim_t)
sr_ = stft_r[:dim_f, :]
# real/imag → (1, 4, dim_f, dim_t)
model_input = np.stack([
sl.real, sl.imag,
sr_.real, sr_.imag,
], axis=0)[np.newaxis].astype(np.float32) # (1,4,dim_f,dim_t)
# 推論 → (1, 4, dim_f, dim_t)
pred = session.run(None, {input_name: model_input})[0][0] # (4, dim_f, dim_t)
# マスクをそのまま分離マスクとして適用
sep_l_stft = (pred[0] + 1j * pred[1]) * sl
sep_r_stft = (pred[2] + 1j * pred[3]) * sr_
# 周波数ビンを元の長さに戻す (残りはゼロ)
full_l = np.zeros((n_fft // 2 + 1, dim_t), dtype=np.complex64)
full_r = np.zeros((n_fft // 2 + 1, dim_t), dtype=np.complex64)
full_l[:dim_f] = sep_l_stft
full_r[:dim_f] = sep_r_stft
# iSTFT
sig_l = _np_istft(full_l, n_fft, hop_length, chunk_samples)
sig_r = _np_istft(full_r, n_fft, hop_length, chunk_samples)
sep = np.stack([sig_l, sig_r]) # (2, chunk_samples)
# オーバーラップ加算
out_sum[:, start: start + chunk_samples] += sep * win
out_count[start: start + chunk_samples] += win
if (idx + 1) % max(1, total // 10) == 0:
print(f" {idx+1}/{total} チャンク完了")
np.maximum(out_count, 1e-8, out_count)
result = out_sum / out_count
# パディング除去
lead = result[:, pad_size: pad_size + T] # (2, T)
backing = mix - lead # 残差
print("[INFO] リード/バッキング分離完了")
return lead.T.astype(np.float32), backing.T.astype(np.float32)
def _np_stft(signal: np.ndarray, n_fft: int, hop_length: int, n_frames: int) -> np.ndarray:
"""
NumPy で STFT を計算する。
signal: (T,) float32
戻り値: (n_fft//2+1, n_frames) complex64
"""
window = np.hanning(n_fft).astype(np.float32)
# 必要なサンプル数になるようにパディング
required = n_fft + hop_length * (n_frames - 1)
if signal.shape[0] < required:
signal = np.pad(signal, (0, required - signal.shape[0]))
else:
signal = signal[:required]
out = np.zeros((n_fft // 2 + 1, n_frames), dtype=np.complex64)
for i in range(n_frames):
s = i * hop_length
frame = signal[s: s + n_fft] * window
out[:, i] = np.fft.rfft(frame, n=n_fft).astype(np.complex64)
return out
def _np_istft(stft: np.ndarray, n_fft: int, hop_length: int, length: int) -> np.ndarray:
"""
NumPy で iSTFT を計算する。
stft: (n_fft//2+1, T) complex64
戻り値: (length,) float32
"""
window = np.hanning(n_fft).astype(np.float32)
n_frames = stft.shape[1]
out = np.zeros(length + n_fft, dtype=np.float64)
wsum = np.zeros(length + n_fft, dtype=np.float64)
for i in range(n_frames):
frame = np.fft.irfft(stft[:, i], n=n_fft).real * window
s = i * hop_length
e = s + n_fft
if s >= length:
break
out[s:e] += frame
wsum[s:e] += window ** 2
np.maximum(wsum, 1e-8, wsum)
return (out / wsum)[:length].astype(np.float32)
def download_mdx_model(cache_dir: Path) -> Path:
"""HuggingFace から UVR MDX-NET KARA モデルを取得する"""
model_path = cache_dir / MDX_MODEL_FILE
if model_path.exists():
print(f"[INFO] MDX モデルはキャッシュ済み: {model_path}")
return model_path
print(f"[INFO] UVR MDX-NET KARA モデルをダウンロード中... (初回のみ)")
try:
from huggingface_hub import hf_hub_download
downloaded = hf_hub_download(
repo_id=MDX_MODEL_REPO,
filename=MDX_MODEL_FILE,
local_dir=str(cache_dir),
)
return Path(downloaded)
except Exception as e:
raise RuntimeError(
f"MDX モデルのダウンロードに失敗しました: {e}\n"
"インターネット接続を確認してください。"
)
# ── ユーティリティ ──────────────────────────────────────────────
VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".flv", ".webm", ".ts"}
AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"}
def is_url(text: str) -> bool:
"""http:// or https:// で始まる文字列を URL と判定する"""
return text.startswith("http://") or text.startswith("https://")
def download_audio_from_url(url: str, tmp_dir: Path) -> Path:
"""
yt-dlp で YouTube などの URL から音声をダウンロードし WAV に変換する。
YouTube 以外にも yt-dlp が対応するサイトは利用可能。
戻り値: 一時 WAV ファイルの Path
"""
try:
import yt_dlp
except ImportError:
raise ImportError(
"yt-dlp が見つかりません。\n"
"pip install yt-dlp でインストールしてください。"
)
out_wav = tmp_dir / "source.wav"
print(f"[INFO] URL を検出。yt-dlp で音声をダウンロード中...")
print(f" URL: {url}")
ydl_opts = {
# WAV に直接変換して一時ディレクトリに保存
"format": "bestaudio/best",
"outtmpl": str(tmp_dir / "ydl_download.%(ext)s"),
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "wav",
"preferredquality": "0",
}
],
"quiet": False,
"no_warnings": False,
"noplaylist": True, # プレイリストでなく先頭の1曲のみ取得
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
title = info.get("title", "unknown")
print(f"[INFO] ダウンロード完了: {title}")
# yt-dlp が生成した WAV を探す
wav_candidates = list(tmp_dir.glob("ydl_download*.wav"))
if not wav_candidates:
raise FileNotFoundError(
"yt-dlp の変換後 WAV が見つかりません。"
"ffmpeg が正しくインストールされているか確認してください。"
)
downloaded_wav = wav_candidates[0]
# ffmpeg でサンプルレートを 44100Hz に統一
ffmpeg = find_ffmpeg()
result = subprocess.run(
[
ffmpeg, "-y",
"-i", str(downloaded_wav),
"-ar", "44100",
"-ac", "2",
str(out_wav),
],
capture_output=True, text=True, encoding="utf-8", errors="replace",
)
if result.returncode != 0:
print(f"[ffmpeg stderr]\n{result.stderr[-2000:]}")
raise RuntimeError("音声変換に失敗しました")
print(f"[INFO] 音声変換完了 → {out_wav.name}")
return out_wav
def find_ffmpeg() -> str:
"""ffmpeg の実行パスを返す。見つからなければ例外を投げる"""
ffmpeg_cmd = shutil.which("ffmpeg")
if ffmpeg_cmd:
return ffmpeg_cmd
candidates = [
r"C:\ffmpeg\bin\ffmpeg.exe",
r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
r"C:\ProgramData\chocolatey\bin\ffmpeg.exe",
]
for c in candidates:
if Path(c).exists():
return c
raise FileNotFoundError(
"ffmpeg が見つかりません。\n"
"Windows: winget install Gyan.FFmpeg\n"
"Mac: brew install ffmpeg\n"
"Linux: sudo apt install ffmpeg"
)
def check_audio_stream(input_path: Path) -> None:
"""
ffprobe で音声ストリームの有無を確認する。
音声がない場合は分かりやすいエラーメッセージを出して終了する。
"""
ffprobe_cmd = shutil.which("ffprobe")
if not ffprobe_cmd:
# ffprobe がない場合はスキップ(ffmpegで失敗させる)
return
result = subprocess.run(
[
ffprobe_cmd,
"-v", "quiet",
"-select_streams", "a", # 音声ストリームのみ
"-show_entries", "stream=codec_type",
"-of", "csv=p=0",
str(input_path),
],
capture_output=True, text=True, encoding="utf-8", errors="replace",
)
if not result.stdout.strip():
raise ValueError(
f"'{input_path.name}' に音声トラックが見つかりません。\n"
"このファイルには映像のみが含まれているため処理できません。\n"
"音声トラック付きのファイル(mp3/wav/mp4など)を指定してください。"
)
def extract_audio(input_path: Path, out_wav: Path) -> None:
"""
subprocess で ffmpeg を直接呼び出して音声を WAV に変換。
動画・音声どちらにも対応。事前に音声ストリームの有無を確認する。
"""
ffmpeg = find_ffmpeg()
# 音声トラックの有無を事前確認
check_audio_stream(input_path)
print(f"[INFO] ffmpeg で音声変換中: {input_path.name}{out_wav.name}")
cmd = [
ffmpeg,
"-y", # 上書き確認をスキップ
"-i", str(input_path),
"-vn", # 映像トラックを除外
"-acodec", "pcm_s16le", # 16bit PCM
"-ar", "44100", # サンプルレート
"-ac", "2", # ステレオ
str(out_wav),
]
result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="replace")
if result.returncode != 0:
print(f"[ffmpeg stderr]\n{result.stderr[-2000:]}") # 末尾2000文字だけ表示
raise RuntimeError(f"ffmpeg が失敗しました (returncode={result.returncode})")
print(f"[INFO] 音声変換完了 → {out_wav}")
def prepare_audio(input_path_or_url: str | Path, tmp_dir: Path) -> Path:
"""URL もしくはファイルパスを受け取り、WAV に変換して返す"""
# URLの場合は yt-dlp でダウンロード
if isinstance(input_path_or_url, str) and is_url(input_path_or_url):
return download_audio_from_url(input_path_or_url, tmp_dir)
input_path = Path(input_path_or_url)
ext = input_path.suffix.lower()
out_wav = tmp_dir / "source.wav"
if ext == ".wav":
# WAV でもサンプルレートが異なる場合があるため ffmpeg で統一する
extract_audio(input_path, out_wav)
return out_wav
if ext in VIDEO_EXTENSIONS or ext in AUDIO_EXTENSIONS:
extract_audio(input_path, out_wav)
return out_wav
# 不明な拡張子でも ffmpeg に渡してみる
print(f"[WARN] 未知の拡張子 '{ext}'。ffmpeg で変換を試みます。")
extract_audio(input_path, out_wav)
return out_wav
# ── Demucs 分離 ──────────────────────────────────────────────────
def run_demucs(wav_path: Path, out_dir: Path, model: str) -> tuple[dict, int]:
"""
Demucs でステム分離を実行。
戻り値: (stems_dict, sample_rate)
stems_dict = {stem_name: np.ndarray(shape=[samples, 2], dtype=float32)}
"""
print(f"\n[INFO] Demucs ({model}) で音源分離を開始...")
print(" 初回実行時はモデルのダウンロードが発生します(数百MB)。しばらくお待ちください。\n")
cmd = [
sys.executable, "-m", "demucs",
"-n", model,
"-o", str(out_dir),
str(wav_path),
]
# リアルタイムで進捗を表示しながら実行
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="replace",
)
output_lines = []
for line in proc.stdout:
line_stripped = line.rstrip()
output_lines.append(line_stripped)
print(f" {line_stripped}")
proc.wait()
if proc.returncode != 0:
raise RuntimeError(
f"Demucs が失敗しました (returncode={proc.returncode})\n"
+ "\n".join(output_lines[-20:])
)
# ── 出力ディレクトリを特定 ──
# Demucs の出力構造: <out_dir>/<model>/<入力ファイル名(拡張子なし)>/
stem_dir = out_dir / model / wav_path.stem
if not stem_dir.exists():
# ディレクトリ名が一致しない場合はざっくり検索
found = list(out_dir.rglob("*.wav"))
if not found:
raise FileNotFoundError(
f"Demucs の出力ファイルが見つかりません。出力先: {out_dir}\n"
f"分離されたモデル名のフォルダを確認してください。"
)
stem_dir = found[0].parent
print(f"[INFO] ステムディレクトリを自動検出: {stem_dir}")
# ── WAV ファイルを読み込む ──
stems: dict[str, np.ndarray] = {}
sr = 44100
for wav_file in sorted(stem_dir.glob("*.wav")):
data, sr = sf.read(str(wav_file), always_2d=True)
stems[wav_file.stem] = data.astype(np.float32)
print(f" [stem] {wav_file.stem:12s}: {data.shape} @ {sr}Hz")
if not stems:
raise FileNotFoundError(f"ステム WAV が見つかりません: {stem_dir}")
print(f"\n[INFO] 分離完了。検出ステム: {list(stems.keys())}")
return stems, sr
# ── ハモリ推定フォールバック ──────────────────────────────────────
def fallback_split_vocals(vocals: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
htdemucs_6s で 'other' が得られない場合のフォールバック。
ステレオの Mid/Side 分解でハモリ成分を推定する。
- Mid (L+R)/2 → 主旋律(モノラル中心成分)
- Side (L-R)/2 → ハモリ候補(ステレオ広がり成分)
"""
print("[INFO] フォールバック: Mid/Side 分解でハモリを推定します")
L = vocals[:, 0]
R = vocals[:, 1] if vocals.shape[1] > 1 else vocals[:, 0]
mid = ((L + R) / 2.0).astype(np.float32)
side = ((L - R) / 2.0).astype(np.float32)
main_vocal = np.stack([mid, mid], axis=1)
harmony = np.stack([side, side], axis=1)
return main_vocal, harmony
# ── パンニング & ミックス ─────────────────────────────────────────
def to_stereo(audio: np.ndarray) -> np.ndarray:
"""入力を (samples, 2) のステレオ配列に整形"""
if audio.ndim == 1:
audio = np.stack([audio, audio], axis=1)
elif audio.shape[1] == 1:
audio = np.concatenate([audio, audio], axis=1)
return audio[:, :2] # 3ch以上は最初の2chだけ使う
def pan_to_left(audio: np.ndarray) -> np.ndarray:
"""ステレオ音源を左チャンネルのみに振る (Rch=0)"""
stereo = to_stereo(audio)
mono = np.mean(stereo, axis=1, keepdims=True)
return np.concatenate([mono, np.zeros_like(mono)], axis=1)
def pan_to_right(audio: np.ndarray) -> np.ndarray:
"""ステレオ音源を右チャンネルのみに振る (Lch=0)"""
stereo = to_stereo(audio)
mono = np.mean(stereo, axis=1, keepdims=True)
return np.concatenate([np.zeros_like(mono), mono], axis=1)
def mid_side_split(vocals: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
vocals ステムを Mid/Side 分解して主旋律とハモリを推定する。
Mid = (L + R) / 2 → センターに定位したメインボーカル → 左ch
Side = (L - R) / 2 → 左右にパンされたハモリ/コーラス → 右ch
プロの録音では:
- メインボーカルはセンター(Mid 成分が大きい)
- ハモリやコーラスは左右にパン(Side 成分に現れる)
"""
L = vocals[:, 0].astype(np.float32)
R = vocals[:, 1].astype(np.float32) if vocals.shape[1] > 1 else L.copy()
mid = (L + R) / 2.0
side = (L - R) / 2.0
# Side 成分が極端に小さい場合(ほぼモノラル録音)は警告
mid_rms = float(np.sqrt(np.mean(mid ** 2)) + 1e-9)
side_rms = float(np.sqrt(np.mean(side ** 2)) + 1e-9)
ratio = side_rms / mid_rms
print(f"[INFO] Mid/Side 比率: Mid={mid_rms:.4f}, Side={side_rms:.4f}, Side/Mid={ratio:.3f}")
if ratio < 0.05:
print("[WARN] Side 成分が非常に小さいです。元音源がモノラル録音の可能性があります。")
print(" 左右の差が小さいため、ハモリ分離の効果が限定的になる場合があります。")
main_vocal = np.stack([mid, mid], axis=1) # モノ → ステレオ
harmony = np.stack([side, side], axis=1)
return main_vocal, harmony
def mix_stems(
stems: dict[str, np.ndarray],
model: str,
inst_vol: float,
mdx_model_path: Path | None = None,
sr: int = 44100,
) -> np.ndarray:
"""
分離済みステムを L/R パンニングしてミックスする。
mdx_model_path が指定されている場合:
UVR MDX-NET KARA で vocals を Lead/Backing に AI 分離
Lead(主旋律)→ 左ch, Backing(ハモリ)→ 右ch
未指定の場合:
Mid/Side 分解(フォールバック)
"""
n_samples = max(s.shape[0] for s in stems.values())
def pad(arr: np.ndarray) -> np.ndarray:
arr = to_stereo(arr)
if arr.shape[0] < n_samples:
arr = np.pad(arr, ((0, n_samples - arr.shape[0]), (0, 0)))
return arr[:n_samples]
mix = np.zeros((n_samples, 2), dtype=np.float32)
if "vocals" in stems:
vocals_padded = pad(stems["vocals"])
if mdx_model_path is not None:
try:
lead, backing = mdx_separate_lead_backing(vocals_padded, sr, mdx_model_path)
# 長さを揃える
L = min(lead.shape[0], n_samples)
lead_out = np.zeros((n_samples, 2), dtype=np.float32)
backing_out = np.zeros((n_samples, 2), dtype=np.float32)
lead_out[:L] = lead[:L]
backing_out[:L] = backing[:L]
print("[INFO] パンニング: Lead(主旋律) → L, Backing(ハモリ) → R")
mix += pan_to_left(lead_out)
mix += pan_to_right(backing_out)
except Exception as e:
print(f"[WARN] UVR 推論失敗 ({e})。Mid/Side にフォールバックします。")
main_vocal, harmony = mid_side_split(vocals_padded)
mix += pan_to_left(main_vocal)
mix += pan_to_right(harmony)
else:
print("[INFO] vocals を Mid/Side 分解: Mid → L(主旋律), Side → R(ハモリ)")
main_vocal, harmony = mid_side_split(vocals_padded)
mix += pan_to_left(main_vocal)
mix += pan_to_right(harmony)
# vocals 以外はすべて伴奏としてセンターに縮小
inst_keys = [k for k in stems if k != "vocals"]
if inst_vol > 0 and inst_keys:
print(f"[INFO] 伴奏ステム {inst_keys} → センター x {inst_vol}")
for k in inst_keys:
mix += pad(stems[k]) * inst_vol
elif inst_keys:
print(f"[INFO] 伴奏ステム {inst_keys} → ミュート (inst-vol=0)")
else:
print("[WARN] 'vocals' ステムが見つかりません。全ステムをセンター出力します。")
for k, arr in stems.items():
mix += pad(arr)
# ピーク正規化(クリッピング防止)
peak = float(np.max(np.abs(mix)))
if peak > 1e-6:
mix = mix / peak * 0.95
return mix
# ── メインフロー ──────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="主旋律 / ハモリ 分離 & L/R パンニング ツール",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
例:
python app.py song.mp3
python app.py movie.mp4 --output result.wav
python app.py https://www.youtube.com/watch?v=XXXXXXXXXXX
python app.py song.mp3 --inst-vol 0.0 # 伴奏ミュート
python app.py song.mp3 --model htdemucs # 軽量モデル
""",
)
parser.add_argument("input", help="入力ファイル (mp3/wav/mp4/mov など) または URL (YouTube など)")
parser.add_argument(
"--output", default="output_panned.wav",
help="出力 WAV ファイル名 (デフォルト: output_panned.wav)",
)
parser.add_argument(
"--inst-vol", type=float, default=0.15,
help="伴奏の音量係数 0.0(ミュート)〜1.0 (デフォルト: 0.15)",
)
parser.add_argument(
"--model",
default="htdemucs_6s",
choices=["htdemucs_6s", "htdemucs", "mdx_extra", "mdx"],
help="Demucs モデル (デフォルト: htdemucs_6s)",
)
parser.add_argument(
"--no-mdx", action="store_true",
help="UVR MDX-NET によるリード/バッキング分離をスキップし Mid/Side 分解を使用",
)
args = parser.parse_args()
# URL かファイルパスか判定
input_arg = args.input
if is_url(input_arg):
# URL の場合はファイル存在チェック不要
input_for_process = input_arg
else:
input_path = Path(input_arg).resolve()
if not input_path.exists():
print(f"[ERROR] ファイルが見つかりません: {input_path}")
sys.exit(1)
input_for_process = input_path
tmp_dir = Path(tempfile.mkdtemp(prefix="musicbot_"))
demucs_out = tmp_dir / "demucs_out"
demucs_out.mkdir(parents=True, exist_ok=True)
try:
# Step 1: 音声準備(URL またはファイル)
wav_path = prepare_audio(input_for_process, tmp_dir)
# Step 2: Demucs 分離
model = args.model
stems, sr = run_demucs(wav_path, demucs_out, model)
# htdemucs_6s で 'other' がない場合は htdemucs にフォールバック
if model == "htdemucs_6s" and "other" not in stems:
print("\n[WARN] htdemucs_6s で 'other' ステムが見つかりません。htdemucs に切り替えます。")
shutil.rmtree(demucs_out)
demucs_out.mkdir(parents=True, exist_ok=True)
model = "htdemucs"
stems, sr = run_demucs(wav_path, demucs_out, model)
# Step 3: UVR MDX-NET モデルを取得(必要なら)
mdx_model_path = None
if not args.no_mdx:
mdx_cache = Path.home() / ".cache" / "musicbot" / "models"
mdx_cache.mkdir(parents=True, exist_ok=True)
try:
mdx_model_path = download_mdx_model(mdx_cache)
except Exception as e:
print(f"[WARN] MDX モデルの取得に失敗しました: {e}")
print(" Mid/Side 分解コードにフォールバックします。")
# Step 4: パンニング & ミックス
print("\n[INFO] L/R パンニング & ミックス処理中...")
mixed = mix_stems(stems, model, args.inst_vol, mdx_model_path=mdx_model_path, sr=sr)
# Step 4: 出力
output_path = Path(args.output).resolve()
sf.write(str(output_path), mixed, sr, subtype="PCM_16")
print(f"""
╔══════════════════════════════════════╗
║ ✅ 処理完了! ║
╠══════════════════════════════════════╣
║ 出力: {output_path.name:<31}
║ 左チャンネル (L): 主旋律 ║
║ 右チャンネル (R): ハモリ/コーラス ║
║ 伴奏音量係数 : {args.inst_vol:<22.2f}
╚══════════════════════════════════════╝
{output_path}
""")
except KeyboardInterrupt:
print("\n[INFO] ユーザーによって中断されました。")
sys.exit(0)
except Exception as e:
print(f"\n[ERROR] 処理中にエラーが発生しました: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
if __name__ == "__main__":
main()