Spaces:
Sleeping
Sleeping
| """ | |
| app.py — 主旋律 / ハモリ 分離 & L/R パンニング プロトタイプ | |
| ===================================================================== | |
| 使い方: | |
| python app.py <入力ファイル> [--output output_panned.wav] [--inst-vol 0.15] | |
| 入力: mp3 / wav / mp4 / mov など ffmpeg が対応する任意フォーマット | |
| 出力: 左ch=主旋律(メインボーカル) / 右ch=ハモリ(コーラス) のステレオ WAV | |
| アルゴリズム概要: | |
| 1. subprocess で ffmpeg を直接呼び出し → 安定した音声抽出 | |
| 2. Demucs (htdemucs_6s) で6音源分離 | |
| → vocals / other / drums / bass / guitar / piano | |
| 3. パンニング | |
| - vocals → 左ch 100% | |
| - other → 右ch 100% (コーラス/ハモリ) | |
| - 伴奏 → センター & 音量縮小 (--inst-vol で調整, 0=ミュート) | |
| 4. ミックスして output_panned.wav を出力 | |
| """ | |
| import argparse | |
| import os | |
| import sys | |
| import tempfile | |
| import shutil | |
| import subprocess | |
| from pathlib import Path | |
| import numpy as np | |
| import soundfile as sf | |
| # ── torchaudio.save モンキーパッチ ───────────────────────────── | |
| # Windows 環境で torchcodec の DLL が読み込めず torchaudio.save が | |
| # 失敗する問題を回避するため、soundfile ベースの保存処理で置き換える。 | |
| # これにより Demucs の内部での WAV 書き出しが正常に動作する。 | |
| def _patched_torchaudio_save(uri, src, sample_rate, **kwargs): | |
| """torchaudio.save を soundfile で代替する""" | |
| import numpy as np | |
| import soundfile as sf | |
| data = src.detach().cpu().numpy() | |
| # torchaudio は (C, T) 形式、soundfile は (T, C) を期待する | |
| if data.ndim == 2: | |
| data = data.T # (T, C) に変換 | |
| elif data.ndim == 1: | |
| pass # モノラルはそのまま | |
| sf.write(str(uri), data, sample_rate, subtype="PCM_16") | |
| try: | |
| import torchaudio | |
| torchaudio.save = _patched_torchaudio_save | |
| # 新しいバージョンの内部モジュールもパッチ | |
| try: | |
| import torchaudio.backend.soundfile_backend as _sfb | |
| _sfb.save = _patched_torchaudio_save | |
| except Exception: | |
| pass | |
| try: | |
| import torchaudio._backend.utils as _bu | |
| # save 関数が複数箇所に散在するため torchaudio 本体だけで十分 | |
| except Exception: | |
| pass | |
| except ImportError: | |
| pass # torchaudio がない場合は無視 | |
| # ── UVR MDX-NET によるリード/バッキングボーカル分離 ──────────── | |
| # UVR_MDXNET_KARA_2 モデル: ボーカルステムから | |
| # Lead (リードボーカル) と Backing (バッキング/ハモリ) を分離する | |
| MDX_MODEL_REPO = "seanghay/uvr_models" | |
| MDX_MODEL_FILE = "UVR_MDXNET_KARA_2.onnx" | |
| # モデルの設定値 (UVR_MDXNET_KARA_2: 入力形状 [batch, 4, 2048, 256]) | |
| MDX_CONFIG = { | |
| "dim_f": 2048, # 周波数ビン数(モデルの入力チャネル / 2) | |
| "dim_t": 256, # 時間フレーム数 | |
| "n_fft": 6144, # STFT ウィンドウサイズ | |
| "hop_length": 1024, | |
| "sample_rate": 44100, | |
| "overlap": 0.75, | |
| } | |
| def download_mdx_model(cache_dir: Path) -> Path: | |
| """HuggingFace から UVR MDX-NET KARA モデルを取得する""" | |
| model_path = cache_dir / MDX_MODEL_FILE | |
| if model_path.exists(): | |
| print(f"[INFO] MDX モデルはキャッシュ済み: {model_path}") | |
| return model_path | |
| print(f"[INFO] UVR MDX-NET KARA モデルをダウンロード中... (初回のみ・約53MB)") | |
| try: | |
| from huggingface_hub import hf_hub_download | |
| downloaded = hf_hub_download( | |
| repo_id=MDX_MODEL_REPO, | |
| filename=MDX_MODEL_FILE, | |
| local_dir=str(cache_dir), | |
| ) | |
| return Path(downloaded) | |
| except Exception as e: | |
| raise RuntimeError( | |
| f"MDX モデルのダウンロードに失敗しました: {e}\n" | |
| "インターネット接続を確認してください。" | |
| ) | |
| def mdx_separate_lead_backing( | |
| vocals_wav: np.ndarray, | |
| sr: int, | |
| model_path: Path, | |
| ) -> tuple[np.ndarray, np.ndarray]: | |
| """ | |
| UVR MDX-NET KARA モデルを使ってボーカルを | |
| リードボーカル と バッキング/ハモリ に分離する。 | |
| モデル入力形状: [1, 4, dim_f, dim_t] = [1, 4, 2048, 256] | |
| vocals_wav: (samples, 2) float32 のステレオ配列 | |
| 戻り値: (lead, backing) それぞれ (samples, 2) float32 | |
| """ | |
| import onnxruntime as ort | |
| print("[INFO] UVR MDX-NET KARA でリード/バッキング分離を開始...") | |
| dim_f = MDX_CONFIG["dim_f"] # 2048 | |
| dim_t = MDX_CONFIG["dim_t"] # 256 | |
| n_fft = MDX_CONFIG["n_fft"] # 6144 | |
| hop_length = MDX_CONFIG["hop_length"] # 1024 | |
| overlap = MDX_CONFIG["overlap"] # 0.75 | |
| # ONNX セッション | |
| sess_opts = ort.SessionOptions() | |
| sess_opts.log_severity_level = 3 | |
| session = ort.InferenceSession( | |
| str(model_path), | |
| sess_options=sess_opts, | |
| providers=["CPUExecutionProvider"], | |
| ) | |
| input_name = session.get_inputs()[0].name | |
| # 入力: (samples, 2) → (2, T) | |
| mix = vocals_wav.T.astype(np.float32) # (2, T) | |
| n_channels, T = mix.shape | |
| # チャンクサイズ = dim_t フレーム分のサンプル数 | |
| chunk_samples = hop_length * dim_t | |
| step_samples = int(chunk_samples * (1 - overlap)) | |
| pad_size = chunk_samples # 前後にゼロパディング | |
| mix_padded = np.pad(mix, ((0, 0), (pad_size, pad_size + chunk_samples))) | |
| # 出力バッファ | |
| out_sum = np.zeros((n_channels, mix_padded.shape[1]), dtype=np.float64) | |
| out_count = np.zeros(mix_padded.shape[1], dtype=np.float64) | |
| # ハン窓フェード | |
| fade_len = step_samples | |
| fade_in = np.linspace(0.0, 1.0, fade_len) | |
| fade_out = fade_in[::-1] | |
| win = np.ones(chunk_samples) | |
| win[:fade_len] = fade_in | |
| win[-fade_len:] = fade_out | |
| starts = range(0, mix_padded.shape[1] - chunk_samples, step_samples) | |
| total = len(list(starts)) | |
| print(f"[INFO] {total} チャンクを処理中...") | |
| for idx, start in enumerate(range(0, mix_padded.shape[1] - chunk_samples, step_samples)): | |
| chunk = mix_padded[:, start: start + chunk_samples] # (2, chunk_samples) | |
| # STFT → (2, n_fft//2+1, dim_t) | |
| stft_l = _np_stft(chunk[0], n_fft, hop_length, dim_t) | |
| stft_r = _np_stft(chunk[1], n_fft, hop_length, dim_t) | |
| # 上位 dim_f ビンだけ使う | |
| sl = stft_l[:dim_f, :] # (dim_f, dim_t) | |
| sr_ = stft_r[:dim_f, :] | |
| # real/imag → (1, 4, dim_f, dim_t) | |
| model_input = np.stack([ | |
| sl.real, sl.imag, | |
| sr_.real, sr_.imag, | |
| ], axis=0)[np.newaxis].astype(np.float32) # (1,4,dim_f,dim_t) | |
| # 推論 → (1, 4, dim_f, dim_t) | |
| pred = session.run(None, {input_name: model_input})[0][0] # (4, dim_f, dim_t) | |
| # マスクをそのまま分離マスクとして適用 | |
| sep_l_stft = (pred[0] + 1j * pred[1]) * sl | |
| sep_r_stft = (pred[2] + 1j * pred[3]) * sr_ | |
| # 周波数ビンを元の長さに戻す (残りはゼロ) | |
| full_l = np.zeros((n_fft // 2 + 1, dim_t), dtype=np.complex64) | |
| full_r = np.zeros((n_fft // 2 + 1, dim_t), dtype=np.complex64) | |
| full_l[:dim_f] = sep_l_stft | |
| full_r[:dim_f] = sep_r_stft | |
| # iSTFT | |
| sig_l = _np_istft(full_l, n_fft, hop_length, chunk_samples) | |
| sig_r = _np_istft(full_r, n_fft, hop_length, chunk_samples) | |
| sep = np.stack([sig_l, sig_r]) # (2, chunk_samples) | |
| # オーバーラップ加算 | |
| out_sum[:, start: start + chunk_samples] += sep * win | |
| out_count[start: start + chunk_samples] += win | |
| if (idx + 1) % max(1, total // 10) == 0: | |
| print(f" {idx+1}/{total} チャンク完了") | |
| np.maximum(out_count, 1e-8, out_count) | |
| result = out_sum / out_count | |
| # パディング除去 | |
| lead = result[:, pad_size: pad_size + T] # (2, T) | |
| backing = mix - lead # 残差 | |
| print("[INFO] リード/バッキング分離完了") | |
| return lead.T.astype(np.float32), backing.T.astype(np.float32) | |
| def _np_stft(signal: np.ndarray, n_fft: int, hop_length: int, n_frames: int) -> np.ndarray: | |
| """ | |
| NumPy で STFT を計算する。 | |
| signal: (T,) float32 | |
| 戻り値: (n_fft//2+1, n_frames) complex64 | |
| """ | |
| window = np.hanning(n_fft).astype(np.float32) | |
| # 必要なサンプル数になるようにパディング | |
| required = n_fft + hop_length * (n_frames - 1) | |
| if signal.shape[0] < required: | |
| signal = np.pad(signal, (0, required - signal.shape[0])) | |
| else: | |
| signal = signal[:required] | |
| out = np.zeros((n_fft // 2 + 1, n_frames), dtype=np.complex64) | |
| for i in range(n_frames): | |
| s = i * hop_length | |
| frame = signal[s: s + n_fft] * window | |
| out[:, i] = np.fft.rfft(frame, n=n_fft).astype(np.complex64) | |
| return out | |
| def _np_istft(stft: np.ndarray, n_fft: int, hop_length: int, length: int) -> np.ndarray: | |
| """ | |
| NumPy で iSTFT を計算する。 | |
| stft: (n_fft//2+1, T) complex64 | |
| 戻り値: (length,) float32 | |
| """ | |
| window = np.hanning(n_fft).astype(np.float32) | |
| n_frames = stft.shape[1] | |
| out = np.zeros(length + n_fft, dtype=np.float64) | |
| wsum = np.zeros(length + n_fft, dtype=np.float64) | |
| for i in range(n_frames): | |
| frame = np.fft.irfft(stft[:, i], n=n_fft).real * window | |
| s = i * hop_length | |
| e = s + n_fft | |
| if s >= length: | |
| break | |
| out[s:e] += frame | |
| wsum[s:e] += window ** 2 | |
| np.maximum(wsum, 1e-8, wsum) | |
| return (out / wsum)[:length].astype(np.float32) | |
| def download_mdx_model(cache_dir: Path) -> Path: | |
| """HuggingFace から UVR MDX-NET KARA モデルを取得する""" | |
| model_path = cache_dir / MDX_MODEL_FILE | |
| if model_path.exists(): | |
| print(f"[INFO] MDX モデルはキャッシュ済み: {model_path}") | |
| return model_path | |
| print(f"[INFO] UVR MDX-NET KARA モデルをダウンロード中... (初回のみ)") | |
| try: | |
| from huggingface_hub import hf_hub_download | |
| downloaded = hf_hub_download( | |
| repo_id=MDX_MODEL_REPO, | |
| filename=MDX_MODEL_FILE, | |
| local_dir=str(cache_dir), | |
| ) | |
| return Path(downloaded) | |
| except Exception as e: | |
| raise RuntimeError( | |
| f"MDX モデルのダウンロードに失敗しました: {e}\n" | |
| "インターネット接続を確認してください。" | |
| ) | |
| # ── ユーティリティ ────────────────────────────────────────────── | |
| VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".flv", ".webm", ".ts"} | |
| AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"} | |
| def is_url(text: str) -> bool: | |
| """http:// or https:// で始まる文字列を URL と判定する""" | |
| return text.startswith("http://") or text.startswith("https://") | |
| def download_audio_from_url(url: str, tmp_dir: Path) -> Path: | |
| """ | |
| yt-dlp で YouTube などの URL から音声をダウンロードし WAV に変換する。 | |
| YouTube 以外にも yt-dlp が対応するサイトは利用可能。 | |
| 戻り値: 一時 WAV ファイルの Path | |
| """ | |
| try: | |
| import yt_dlp | |
| except ImportError: | |
| raise ImportError( | |
| "yt-dlp が見つかりません。\n" | |
| "pip install yt-dlp でインストールしてください。" | |
| ) | |
| out_wav = tmp_dir / "source.wav" | |
| print(f"[INFO] URL を検出。yt-dlp で音声をダウンロード中...") | |
| print(f" URL: {url}") | |
| ydl_opts = { | |
| # WAV に直接変換して一時ディレクトリに保存 | |
| "format": "bestaudio/best", | |
| "outtmpl": str(tmp_dir / "ydl_download.%(ext)s"), | |
| "postprocessors": [ | |
| { | |
| "key": "FFmpegExtractAudio", | |
| "preferredcodec": "wav", | |
| "preferredquality": "0", | |
| } | |
| ], | |
| "quiet": False, | |
| "no_warnings": False, | |
| "noplaylist": True, # プレイリストでなく先頭の1曲のみ取得 | |
| } | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=True) | |
| title = info.get("title", "unknown") | |
| print(f"[INFO] ダウンロード完了: {title}") | |
| # yt-dlp が生成した WAV を探す | |
| wav_candidates = list(tmp_dir.glob("ydl_download*.wav")) | |
| if not wav_candidates: | |
| raise FileNotFoundError( | |
| "yt-dlp の変換後 WAV が見つかりません。" | |
| "ffmpeg が正しくインストールされているか確認してください。" | |
| ) | |
| downloaded_wav = wav_candidates[0] | |
| # ffmpeg でサンプルレートを 44100Hz に統一 | |
| ffmpeg = find_ffmpeg() | |
| result = subprocess.run( | |
| [ | |
| ffmpeg, "-y", | |
| "-i", str(downloaded_wav), | |
| "-ar", "44100", | |
| "-ac", "2", | |
| str(out_wav), | |
| ], | |
| capture_output=True, text=True, encoding="utf-8", errors="replace", | |
| ) | |
| if result.returncode != 0: | |
| print(f"[ffmpeg stderr]\n{result.stderr[-2000:]}") | |
| raise RuntimeError("音声変換に失敗しました") | |
| print(f"[INFO] 音声変換完了 → {out_wav.name}") | |
| return out_wav | |
| def find_ffmpeg() -> str: | |
| """ffmpeg の実行パスを返す。見つからなければ例外を投げる""" | |
| ffmpeg_cmd = shutil.which("ffmpeg") | |
| if ffmpeg_cmd: | |
| return ffmpeg_cmd | |
| candidates = [ | |
| r"C:\ffmpeg\bin\ffmpeg.exe", | |
| r"C:\Program Files\ffmpeg\bin\ffmpeg.exe", | |
| r"C:\ProgramData\chocolatey\bin\ffmpeg.exe", | |
| ] | |
| for c in candidates: | |
| if Path(c).exists(): | |
| return c | |
| raise FileNotFoundError( | |
| "ffmpeg が見つかりません。\n" | |
| "Windows: winget install Gyan.FFmpeg\n" | |
| "Mac: brew install ffmpeg\n" | |
| "Linux: sudo apt install ffmpeg" | |
| ) | |
| def check_audio_stream(input_path: Path) -> None: | |
| """ | |
| ffprobe で音声ストリームの有無を確認する。 | |
| 音声がない場合は分かりやすいエラーメッセージを出して終了する。 | |
| """ | |
| ffprobe_cmd = shutil.which("ffprobe") | |
| if not ffprobe_cmd: | |
| # ffprobe がない場合はスキップ(ffmpegで失敗させる) | |
| return | |
| result = subprocess.run( | |
| [ | |
| ffprobe_cmd, | |
| "-v", "quiet", | |
| "-select_streams", "a", # 音声ストリームのみ | |
| "-show_entries", "stream=codec_type", | |
| "-of", "csv=p=0", | |
| str(input_path), | |
| ], | |
| capture_output=True, text=True, encoding="utf-8", errors="replace", | |
| ) | |
| if not result.stdout.strip(): | |
| raise ValueError( | |
| f"'{input_path.name}' に音声トラックが見つかりません。\n" | |
| "このファイルには映像のみが含まれているため処理できません。\n" | |
| "音声トラック付きのファイル(mp3/wav/mp4など)を指定してください。" | |
| ) | |
| def extract_audio(input_path: Path, out_wav: Path) -> None: | |
| """ | |
| subprocess で ffmpeg を直接呼び出して音声を WAV に変換。 | |
| 動画・音声どちらにも対応。事前に音声ストリームの有無を確認する。 | |
| """ | |
| ffmpeg = find_ffmpeg() | |
| # 音声トラックの有無を事前確認 | |
| check_audio_stream(input_path) | |
| print(f"[INFO] ffmpeg で音声変換中: {input_path.name} → {out_wav.name}") | |
| cmd = [ | |
| ffmpeg, | |
| "-y", # 上書き確認をスキップ | |
| "-i", str(input_path), | |
| "-vn", # 映像トラックを除外 | |
| "-acodec", "pcm_s16le", # 16bit PCM | |
| "-ar", "44100", # サンプルレート | |
| "-ac", "2", # ステレオ | |
| str(out_wav), | |
| ] | |
| result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="replace") | |
| if result.returncode != 0: | |
| print(f"[ffmpeg stderr]\n{result.stderr[-2000:]}") # 末尾2000文字だけ表示 | |
| raise RuntimeError(f"ffmpeg が失敗しました (returncode={result.returncode})") | |
| print(f"[INFO] 音声変換完了 → {out_wav}") | |
| def prepare_audio(input_path_or_url: str | Path, tmp_dir: Path) -> Path: | |
| """URL もしくはファイルパスを受け取り、WAV に変換して返す""" | |
| # URLの場合は yt-dlp でダウンロード | |
| if isinstance(input_path_or_url, str) and is_url(input_path_or_url): | |
| return download_audio_from_url(input_path_or_url, tmp_dir) | |
| input_path = Path(input_path_or_url) | |
| ext = input_path.suffix.lower() | |
| out_wav = tmp_dir / "source.wav" | |
| if ext == ".wav": | |
| # WAV でもサンプルレートが異なる場合があるため ffmpeg で統一する | |
| extract_audio(input_path, out_wav) | |
| return out_wav | |
| if ext in VIDEO_EXTENSIONS or ext in AUDIO_EXTENSIONS: | |
| extract_audio(input_path, out_wav) | |
| return out_wav | |
| # 不明な拡張子でも ffmpeg に渡してみる | |
| print(f"[WARN] 未知の拡張子 '{ext}'。ffmpeg で変換を試みます。") | |
| extract_audio(input_path, out_wav) | |
| return out_wav | |
| # ── Demucs 分離 ────────────────────────────────────────────────── | |
| def run_demucs(wav_path: Path, out_dir: Path, model: str) -> tuple[dict, int]: | |
| """ | |
| Demucs でステム分離を実行。 | |
| 戻り値: (stems_dict, sample_rate) | |
| stems_dict = {stem_name: np.ndarray(shape=[samples, 2], dtype=float32)} | |
| """ | |
| print(f"\n[INFO] Demucs ({model}) で音源分離を開始...") | |
| print(" 初回実行時はモデルのダウンロードが発生します(数百MB)。しばらくお待ちください。\n") | |
| cmd = [ | |
| sys.executable, "-m", "demucs", | |
| "-n", model, | |
| "-o", str(out_dir), | |
| str(wav_path), | |
| ] | |
| # リアルタイムで進捗を表示しながら実行 | |
| proc = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| text=True, | |
| encoding="utf-8", | |
| errors="replace", | |
| ) | |
| output_lines = [] | |
| for line in proc.stdout: | |
| line_stripped = line.rstrip() | |
| output_lines.append(line_stripped) | |
| print(f" {line_stripped}") | |
| proc.wait() | |
| if proc.returncode != 0: | |
| raise RuntimeError( | |
| f"Demucs が失敗しました (returncode={proc.returncode})\n" | |
| + "\n".join(output_lines[-20:]) | |
| ) | |
| # ── 出力ディレクトリを特定 ── | |
| # Demucs の出力構造: <out_dir>/<model>/<入力ファイル名(拡張子なし)>/ | |
| stem_dir = out_dir / model / wav_path.stem | |
| if not stem_dir.exists(): | |
| # ディレクトリ名が一致しない場合はざっくり検索 | |
| found = list(out_dir.rglob("*.wav")) | |
| if not found: | |
| raise FileNotFoundError( | |
| f"Demucs の出力ファイルが見つかりません。出力先: {out_dir}\n" | |
| f"分離されたモデル名のフォルダを確認してください。" | |
| ) | |
| stem_dir = found[0].parent | |
| print(f"[INFO] ステムディレクトリを自動検出: {stem_dir}") | |
| # ── WAV ファイルを読み込む ── | |
| stems: dict[str, np.ndarray] = {} | |
| sr = 44100 | |
| for wav_file in sorted(stem_dir.glob("*.wav")): | |
| data, sr = sf.read(str(wav_file), always_2d=True) | |
| stems[wav_file.stem] = data.astype(np.float32) | |
| print(f" [stem] {wav_file.stem:12s}: {data.shape} @ {sr}Hz") | |
| if not stems: | |
| raise FileNotFoundError(f"ステム WAV が見つかりません: {stem_dir}") | |
| print(f"\n[INFO] 分離完了。検出ステム: {list(stems.keys())}") | |
| return stems, sr | |
| # ── ハモリ推定フォールバック ────────────────────────────────────── | |
| def fallback_split_vocals(vocals: np.ndarray) -> tuple[np.ndarray, np.ndarray]: | |
| """ | |
| htdemucs_6s で 'other' が得られない場合のフォールバック。 | |
| ステレオの Mid/Side 分解でハモリ成分を推定する。 | |
| - Mid (L+R)/2 → 主旋律(モノラル中心成分) | |
| - Side (L-R)/2 → ハモリ候補(ステレオ広がり成分) | |
| """ | |
| print("[INFO] フォールバック: Mid/Side 分解でハモリを推定します") | |
| L = vocals[:, 0] | |
| R = vocals[:, 1] if vocals.shape[1] > 1 else vocals[:, 0] | |
| mid = ((L + R) / 2.0).astype(np.float32) | |
| side = ((L - R) / 2.0).astype(np.float32) | |
| main_vocal = np.stack([mid, mid], axis=1) | |
| harmony = np.stack([side, side], axis=1) | |
| return main_vocal, harmony | |
| # ── パンニング & ミックス ───────────────────────────────────────── | |
| def to_stereo(audio: np.ndarray) -> np.ndarray: | |
| """入力を (samples, 2) のステレオ配列に整形""" | |
| if audio.ndim == 1: | |
| audio = np.stack([audio, audio], axis=1) | |
| elif audio.shape[1] == 1: | |
| audio = np.concatenate([audio, audio], axis=1) | |
| return audio[:, :2] # 3ch以上は最初の2chだけ使う | |
| def pan_to_left(audio: np.ndarray) -> np.ndarray: | |
| """ステレオ音源を左チャンネルのみに振る (Rch=0)""" | |
| stereo = to_stereo(audio) | |
| mono = np.mean(stereo, axis=1, keepdims=True) | |
| return np.concatenate([mono, np.zeros_like(mono)], axis=1) | |
| def pan_to_right(audio: np.ndarray) -> np.ndarray: | |
| """ステレオ音源を右チャンネルのみに振る (Lch=0)""" | |
| stereo = to_stereo(audio) | |
| mono = np.mean(stereo, axis=1, keepdims=True) | |
| return np.concatenate([np.zeros_like(mono), mono], axis=1) | |
| def mid_side_split(vocals: np.ndarray) -> tuple[np.ndarray, np.ndarray]: | |
| """ | |
| vocals ステムを Mid/Side 分解して主旋律とハモリを推定する。 | |
| Mid = (L + R) / 2 → センターに定位したメインボーカル → 左ch | |
| Side = (L - R) / 2 → 左右にパンされたハモリ/コーラス → 右ch | |
| プロの録音では: | |
| - メインボーカルはセンター(Mid 成分が大きい) | |
| - ハモリやコーラスは左右にパン(Side 成分に現れる) | |
| """ | |
| L = vocals[:, 0].astype(np.float32) | |
| R = vocals[:, 1].astype(np.float32) if vocals.shape[1] > 1 else L.copy() | |
| mid = (L + R) / 2.0 | |
| side = (L - R) / 2.0 | |
| # Side 成分が極端に小さい場合(ほぼモノラル録音)は警告 | |
| mid_rms = float(np.sqrt(np.mean(mid ** 2)) + 1e-9) | |
| side_rms = float(np.sqrt(np.mean(side ** 2)) + 1e-9) | |
| ratio = side_rms / mid_rms | |
| print(f"[INFO] Mid/Side 比率: Mid={mid_rms:.4f}, Side={side_rms:.4f}, Side/Mid={ratio:.3f}") | |
| if ratio < 0.05: | |
| print("[WARN] Side 成分が非常に小さいです。元音源がモノラル録音の可能性があります。") | |
| print(" 左右の差が小さいため、ハモリ分離の効果が限定的になる場合があります。") | |
| main_vocal = np.stack([mid, mid], axis=1) # モノ → ステレオ | |
| harmony = np.stack([side, side], axis=1) | |
| return main_vocal, harmony | |
| def mix_stems( | |
| stems: dict[str, np.ndarray], | |
| model: str, | |
| inst_vol: float, | |
| mdx_model_path: Path | None = None, | |
| sr: int = 44100, | |
| ) -> np.ndarray: | |
| """ | |
| 分離済みステムを L/R パンニングしてミックスする。 | |
| mdx_model_path が指定されている場合: | |
| UVR MDX-NET KARA で vocals を Lead/Backing に AI 分離 | |
| Lead(主旋律)→ 左ch, Backing(ハモリ)→ 右ch | |
| 未指定の場合: | |
| Mid/Side 分解(フォールバック) | |
| """ | |
| n_samples = max(s.shape[0] for s in stems.values()) | |
| def pad(arr: np.ndarray) -> np.ndarray: | |
| arr = to_stereo(arr) | |
| if arr.shape[0] < n_samples: | |
| arr = np.pad(arr, ((0, n_samples - arr.shape[0]), (0, 0))) | |
| return arr[:n_samples] | |
| mix = np.zeros((n_samples, 2), dtype=np.float32) | |
| if "vocals" in stems: | |
| vocals_padded = pad(stems["vocals"]) | |
| if mdx_model_path is not None: | |
| try: | |
| lead, backing = mdx_separate_lead_backing(vocals_padded, sr, mdx_model_path) | |
| # 長さを揃える | |
| L = min(lead.shape[0], n_samples) | |
| lead_out = np.zeros((n_samples, 2), dtype=np.float32) | |
| backing_out = np.zeros((n_samples, 2), dtype=np.float32) | |
| lead_out[:L] = lead[:L] | |
| backing_out[:L] = backing[:L] | |
| print("[INFO] パンニング: Lead(主旋律) → L, Backing(ハモリ) → R") | |
| mix += pan_to_left(lead_out) | |
| mix += pan_to_right(backing_out) | |
| except Exception as e: | |
| print(f"[WARN] UVR 推論失敗 ({e})。Mid/Side にフォールバックします。") | |
| main_vocal, harmony = mid_side_split(vocals_padded) | |
| mix += pan_to_left(main_vocal) | |
| mix += pan_to_right(harmony) | |
| else: | |
| print("[INFO] vocals を Mid/Side 分解: Mid → L(主旋律), Side → R(ハモリ)") | |
| main_vocal, harmony = mid_side_split(vocals_padded) | |
| mix += pan_to_left(main_vocal) | |
| mix += pan_to_right(harmony) | |
| # vocals 以外はすべて伴奏としてセンターに縮小 | |
| inst_keys = [k for k in stems if k != "vocals"] | |
| if inst_vol > 0 and inst_keys: | |
| print(f"[INFO] 伴奏ステム {inst_keys} → センター x {inst_vol}") | |
| for k in inst_keys: | |
| mix += pad(stems[k]) * inst_vol | |
| elif inst_keys: | |
| print(f"[INFO] 伴奏ステム {inst_keys} → ミュート (inst-vol=0)") | |
| else: | |
| print("[WARN] 'vocals' ステムが見つかりません。全ステムをセンター出力します。") | |
| for k, arr in stems.items(): | |
| mix += pad(arr) | |
| # ピーク正規化(クリッピング防止) | |
| peak = float(np.max(np.abs(mix))) | |
| if peak > 1e-6: | |
| mix = mix / peak * 0.95 | |
| return mix | |
| # ── メインフロー ────────────────────────────────────────────────── | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="主旋律 / ハモリ 分離 & L/R パンニング ツール", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| 例: | |
| python app.py song.mp3 | |
| python app.py movie.mp4 --output result.wav | |
| python app.py https://www.youtube.com/watch?v=XXXXXXXXXXX | |
| python app.py song.mp3 --inst-vol 0.0 # 伴奏ミュート | |
| python app.py song.mp3 --model htdemucs # 軽量モデル | |
| """, | |
| ) | |
| parser.add_argument("input", help="入力ファイル (mp3/wav/mp4/mov など) または URL (YouTube など)") | |
| parser.add_argument( | |
| "--output", default="output_panned.wav", | |
| help="出力 WAV ファイル名 (デフォルト: output_panned.wav)", | |
| ) | |
| parser.add_argument( | |
| "--inst-vol", type=float, default=0.15, | |
| help="伴奏の音量係数 0.0(ミュート)〜1.0 (デフォルト: 0.15)", | |
| ) | |
| parser.add_argument( | |
| "--model", | |
| default="htdemucs_6s", | |
| choices=["htdemucs_6s", "htdemucs", "mdx_extra", "mdx"], | |
| help="Demucs モデル (デフォルト: htdemucs_6s)", | |
| ) | |
| parser.add_argument( | |
| "--no-mdx", action="store_true", | |
| help="UVR MDX-NET によるリード/バッキング分離をスキップし Mid/Side 分解を使用", | |
| ) | |
| args = parser.parse_args() | |
| # URL かファイルパスか判定 | |
| input_arg = args.input | |
| if is_url(input_arg): | |
| # URL の場合はファイル存在チェック不要 | |
| input_for_process = input_arg | |
| else: | |
| input_path = Path(input_arg).resolve() | |
| if not input_path.exists(): | |
| print(f"[ERROR] ファイルが見つかりません: {input_path}") | |
| sys.exit(1) | |
| input_for_process = input_path | |
| tmp_dir = Path(tempfile.mkdtemp(prefix="musicbot_")) | |
| demucs_out = tmp_dir / "demucs_out" | |
| demucs_out.mkdir(parents=True, exist_ok=True) | |
| try: | |
| # Step 1: 音声準備(URL またはファイル) | |
| wav_path = prepare_audio(input_for_process, tmp_dir) | |
| # Step 2: Demucs 分離 | |
| model = args.model | |
| stems, sr = run_demucs(wav_path, demucs_out, model) | |
| # htdemucs_6s で 'other' がない場合は htdemucs にフォールバック | |
| if model == "htdemucs_6s" and "other" not in stems: | |
| print("\n[WARN] htdemucs_6s で 'other' ステムが見つかりません。htdemucs に切り替えます。") | |
| shutil.rmtree(demucs_out) | |
| demucs_out.mkdir(parents=True, exist_ok=True) | |
| model = "htdemucs" | |
| stems, sr = run_demucs(wav_path, demucs_out, model) | |
| # Step 3: UVR MDX-NET モデルを取得(必要なら) | |
| mdx_model_path = None | |
| if not args.no_mdx: | |
| mdx_cache = Path.home() / ".cache" / "musicbot" / "models" | |
| mdx_cache.mkdir(parents=True, exist_ok=True) | |
| try: | |
| mdx_model_path = download_mdx_model(mdx_cache) | |
| except Exception as e: | |
| print(f"[WARN] MDX モデルの取得に失敗しました: {e}") | |
| print(" Mid/Side 分解コードにフォールバックします。") | |
| # Step 4: パンニング & ミックス | |
| print("\n[INFO] L/R パンニング & ミックス処理中...") | |
| mixed = mix_stems(stems, model, args.inst_vol, mdx_model_path=mdx_model_path, sr=sr) | |
| # Step 4: 出力 | |
| output_path = Path(args.output).resolve() | |
| sf.write(str(output_path), mixed, sr, subtype="PCM_16") | |
| print(f""" | |
| ╔══════════════════════════════════════╗ | |
| ║ ✅ 処理完了! ║ | |
| ╠══════════════════════════════════════╣ | |
| ║ 出力: {output_path.name:<31}║ | |
| ║ 左チャンネル (L): 主旋律 ║ | |
| ║ 右チャンネル (R): ハモリ/コーラス ║ | |
| ║ 伴奏音量係数 : {args.inst_vol:<22.2f}║ | |
| ╚══════════════════════════════════════╝ | |
| → {output_path} | |
| """) | |
| except KeyboardInterrupt: | |
| print("\n[INFO] ユーザーによって中断されました。") | |
| sys.exit(0) | |
| except Exception as e: | |
| print(f"\n[ERROR] 処理中にエラーが発生しました: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| sys.exit(1) | |
| finally: | |
| shutil.rmtree(tmp_dir, ignore_errors=True) | |
| if __name__ == "__main__": | |
| main() | |