File size: 31,297 Bytes
d90b8a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
"""
app.py — 主旋律 / ハモリ 分離 & L/R パンニング プロトタイプ
=====================================================================
使い方:
    python app.py <入力ファイル> [--output output_panned.wav] [--inst-vol 0.15]

入力: mp3 / wav / mp4 / mov など ffmpeg が対応する任意フォーマット
出力: 左ch=主旋律(メインボーカル) / 右ch=ハモリ(コーラス) のステレオ WAV

アルゴリズム概要:
    1. subprocess で ffmpeg を直接呼び出し → 安定した音声抽出
    2. Demucs (htdemucs_6s) で6音源分離
         → vocals / other / drums / bass / guitar / piano
    3. パンニング
         - vocals → 左ch 100%
         - other  → 右ch 100% (コーラス/ハモリ)
         - 伴奏   → センター & 音量縮小 (--inst-vol で調整, 0=ミュート)
    4. ミックスして output_panned.wav を出力
"""

import argparse
import os
import sys
import tempfile
import shutil
import subprocess
from pathlib import Path

import numpy as np
import soundfile as sf

# ── torchaudio.save モンキーパッチ ─────────────────────────────
# Windows 環境で torchcodec の DLL が読み込めず torchaudio.save が
# 失敗する問題を回避するため、soundfile ベースの保存処理で置き換える。
# これにより Demucs の内部での WAV 書き出しが正常に動作する。
def _patched_torchaudio_save(uri, src, sample_rate, **kwargs):
    """torchaudio.save を soundfile で代替する"""
    import numpy as np
    import soundfile as sf
    data = src.detach().cpu().numpy()
    # torchaudio は (C, T) 形式、soundfile は (T, C) を期待する
    if data.ndim == 2:
        data = data.T  # (T, C) に変換
    elif data.ndim == 1:
        pass  # モノラルはそのまま
    sf.write(str(uri), data, sample_rate, subtype="PCM_16")

try:
    import torchaudio
    torchaudio.save = _patched_torchaudio_save
    # 新しいバージョンの内部モジュールもパッチ
    try:
        import torchaudio.backend.soundfile_backend as _sfb
        _sfb.save = _patched_torchaudio_save
    except Exception:
        pass
    try:
        import torchaudio._backend.utils as _bu
        # save 関数が複数箇所に散在するため torchaudio 本体だけで十分
    except Exception:
        pass
except ImportError:
    pass  # torchaudio がない場合は無視


# ── UVR MDX-NET によるリード/バッキングボーカル分離 ────────────
# UVR_MDXNET_KARA_2 モデル: ボーカルステムから
#   Lead (リードボーカル) と Backing (バッキング/ハモリ) を分離する

MDX_MODEL_REPO = "seanghay/uvr_models"
MDX_MODEL_FILE = "UVR_MDXNET_KARA_2.onnx"

# モデルの設定値 (UVR_MDXNET_KARA_2: 入力形状 [batch, 4, 2048, 256])
MDX_CONFIG = {
    "dim_f": 2048,    # 周波数ビン数(モデルの入力チャネル / 2)
    "dim_t": 256,     # 時間フレーム数
    "n_fft": 6144,    # STFT ウィンドウサイズ
    "hop_length": 1024,
    "sample_rate": 44100,
    "overlap": 0.75,
}


def download_mdx_model(cache_dir: Path) -> Path:
    """HuggingFace から UVR MDX-NET KARA モデルを取得する"""
    model_path = cache_dir / MDX_MODEL_FILE
    if model_path.exists():
        print(f"[INFO] MDX モデルはキャッシュ済み: {model_path}")
        return model_path

    print(f"[INFO] UVR MDX-NET KARA モデルをダウンロード中... (初回のみ・約53MB)")
    try:
        from huggingface_hub import hf_hub_download
        downloaded = hf_hub_download(
            repo_id=MDX_MODEL_REPO,
            filename=MDX_MODEL_FILE,
            local_dir=str(cache_dir),
        )
        return Path(downloaded)
    except Exception as e:
        raise RuntimeError(
            f"MDX モデルのダウンロードに失敗しました: {e}\n"
            "インターネット接続を確認してください。"
        )


def mdx_separate_lead_backing(
    vocals_wav: np.ndarray,
    sr: int,
    model_path: Path,
) -> tuple[np.ndarray, np.ndarray]:
    """
    UVR MDX-NET KARA モデルを使ってボーカルを
    リードボーカル と バッキング/ハモリ に分離する。

    モデル入力形状: [1, 4, dim_f, dim_t] = [1, 4, 2048, 256]
    vocals_wav: (samples, 2) float32 のステレオ配列
    戻り値: (lead, backing) それぞれ (samples, 2) float32
    """
    import onnxruntime as ort

    print("[INFO] UVR MDX-NET KARA でリード/バッキング分離を開始...")

    dim_f      = MDX_CONFIG["dim_f"]       # 2048
    dim_t      = MDX_CONFIG["dim_t"]       # 256
    n_fft      = MDX_CONFIG["n_fft"]       # 6144
    hop_length = MDX_CONFIG["hop_length"]  # 1024
    overlap    = MDX_CONFIG["overlap"]     # 0.75

    # ONNX セッション
    sess_opts = ort.SessionOptions()
    sess_opts.log_severity_level = 3
    session = ort.InferenceSession(
        str(model_path),
        sess_options=sess_opts,
        providers=["CPUExecutionProvider"],
    )
    input_name = session.get_inputs()[0].name

    # 入力: (samples, 2) → (2, T)
    mix = vocals_wav.T.astype(np.float32)  # (2, T)
    n_channels, T = mix.shape

    # チャンクサイズ = dim_t フレーム分のサンプル数
    chunk_samples = hop_length * dim_t
    step_samples  = int(chunk_samples * (1 - overlap))
    pad_size      = chunk_samples   # 前後にゼロパディング

    mix_padded = np.pad(mix, ((0, 0), (pad_size, pad_size + chunk_samples)))

    # 出力バッファ
    out_sum   = np.zeros((n_channels, mix_padded.shape[1]), dtype=np.float64)
    out_count = np.zeros(mix_padded.shape[1], dtype=np.float64)

    # ハン窓フェード
    fade_len = step_samples
    fade_in  = np.linspace(0.0, 1.0, fade_len)
    fade_out = fade_in[::-1]
    win      = np.ones(chunk_samples)
    win[:fade_len]  = fade_in
    win[-fade_len:] = fade_out

    starts = range(0, mix_padded.shape[1] - chunk_samples, step_samples)
    total  = len(list(starts))
    print(f"[INFO] {total} チャンクを処理中...")

    for idx, start in enumerate(range(0, mix_padded.shape[1] - chunk_samples, step_samples)):
        chunk = mix_padded[:, start: start + chunk_samples]  # (2, chunk_samples)

        # STFT → (2, n_fft//2+1, dim_t)
        stft_l = _np_stft(chunk[0], n_fft, hop_length, dim_t)
        stft_r = _np_stft(chunk[1], n_fft, hop_length, dim_t)

        # 上位 dim_f ビンだけ使う
        sl = stft_l[:dim_f, :]   # (dim_f, dim_t)
        sr_ = stft_r[:dim_f, :]

        # real/imag → (1, 4, dim_f, dim_t)
        model_input = np.stack([
            sl.real, sl.imag,
            sr_.real, sr_.imag,
        ], axis=0)[np.newaxis].astype(np.float32)  # (1,4,dim_f,dim_t)

        # 推論 → (1, 4, dim_f, dim_t)
        pred = session.run(None, {input_name: model_input})[0][0]  # (4, dim_f, dim_t)

        # マスクをそのまま分離マスクとして適用
        sep_l_stft = (pred[0] + 1j * pred[1]) * sl
        sep_r_stft = (pred[2] + 1j * pred[3]) * sr_

        # 周波数ビンを元の長さに戻す (残りはゼロ)
        full_l = np.zeros((n_fft // 2 + 1, dim_t), dtype=np.complex64)
        full_r = np.zeros((n_fft // 2 + 1, dim_t), dtype=np.complex64)
        full_l[:dim_f] = sep_l_stft
        full_r[:dim_f] = sep_r_stft

        # iSTFT
        sig_l = _np_istft(full_l, n_fft, hop_length, chunk_samples)
        sig_r = _np_istft(full_r, n_fft, hop_length, chunk_samples)
        sep = np.stack([sig_l, sig_r])  # (2, chunk_samples)

        # オーバーラップ加算
        out_sum[:, start: start + chunk_samples]  += sep * win
        out_count[start: start + chunk_samples]   += win

        if (idx + 1) % max(1, total // 10) == 0:
            print(f"  {idx+1}/{total} チャンク完了")

    np.maximum(out_count, 1e-8, out_count)
    result = out_sum / out_count

    # パディング除去
    lead    = result[:, pad_size: pad_size + T]   # (2, T)
    backing = mix - lead                           # 残差

    print("[INFO] リード/バッキング分離完了")
    return lead.T.astype(np.float32), backing.T.astype(np.float32)


def _np_stft(signal: np.ndarray, n_fft: int, hop_length: int, n_frames: int) -> np.ndarray:
    """
    NumPy で STFT を計算する。
    signal: (T,) float32
    戻り値: (n_fft//2+1, n_frames) complex64
    """
    window = np.hanning(n_fft).astype(np.float32)
    # 必要なサンプル数になるようにパディング
    required = n_fft + hop_length * (n_frames - 1)
    if signal.shape[0] < required:
        signal = np.pad(signal, (0, required - signal.shape[0]))
    else:
        signal = signal[:required]

    out = np.zeros((n_fft // 2 + 1, n_frames), dtype=np.complex64)
    for i in range(n_frames):
        s = i * hop_length
        frame = signal[s: s + n_fft] * window
        out[:, i] = np.fft.rfft(frame, n=n_fft).astype(np.complex64)
    return out


def _np_istft(stft: np.ndarray, n_fft: int, hop_length: int, length: int) -> np.ndarray:
    """
    NumPy で iSTFT を計算する。
    stft: (n_fft//2+1, T) complex64
    戻り値: (length,) float32
    """
    window = np.hanning(n_fft).astype(np.float32)
    n_frames = stft.shape[1]
    out  = np.zeros(length + n_fft, dtype=np.float64)
    wsum = np.zeros(length + n_fft, dtype=np.float64)
    for i in range(n_frames):
        frame = np.fft.irfft(stft[:, i], n=n_fft).real * window
        s = i * hop_length
        e = s + n_fft
        if s >= length:
            break
        out[s:e]  += frame
        wsum[s:e] += window ** 2
    np.maximum(wsum, 1e-8, wsum)
    return (out / wsum)[:length].astype(np.float32)




def download_mdx_model(cache_dir: Path) -> Path:
    """HuggingFace から UVR MDX-NET KARA モデルを取得する"""
    model_path = cache_dir / MDX_MODEL_FILE
    if model_path.exists():
        print(f"[INFO] MDX モデルはキャッシュ済み: {model_path}")
        return model_path

    print(f"[INFO] UVR MDX-NET KARA モデルをダウンロード中... (初回のみ)")
    try:
        from huggingface_hub import hf_hub_download
        downloaded = hf_hub_download(
            repo_id=MDX_MODEL_REPO,
            filename=MDX_MODEL_FILE,
            local_dir=str(cache_dir),
        )
        return Path(downloaded)
    except Exception as e:
        raise RuntimeError(
            f"MDX モデルのダウンロードに失敗しました: {e}\n"
            "インターネット接続を確認してください。"
        )




# ── ユーティリティ ──────────────────────────────────────────────


VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".m4v", ".flv", ".webm", ".ts"}
AUDIO_EXTENSIONS = {".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a"}


def is_url(text: str) -> bool:
    """http:// or https:// で始まる文字列を URL と判定する"""
    return text.startswith("http://") or text.startswith("https://")


def download_audio_from_url(url: str, tmp_dir: Path) -> Path:
    """
    yt-dlp で YouTube などの URL から音声をダウンロードし WAV に変換する。
    YouTube 以外にも yt-dlp が対応するサイトは利用可能。
    戻り値: 一時 WAV ファイルの Path
    """
    try:
        import yt_dlp
    except ImportError:
        raise ImportError(
            "yt-dlp が見つかりません。\n"
            "pip install yt-dlp でインストールしてください。"
        )

    out_wav = tmp_dir / "source.wav"
    print(f"[INFO] URL を検出。yt-dlp で音声をダウンロード中...")
    print(f"       URL: {url}")

    ydl_opts = {
        # WAV に直接変換して一時ディレクトリに保存
        "format": "bestaudio/best",
        "outtmpl": str(tmp_dir / "ydl_download.%(ext)s"),
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
                "preferredquality": "0",
            }
        ],
        "quiet": False,
        "no_warnings": False,
        "noplaylist": True,   # プレイリストでなく先頭の1曲のみ取得
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        title = info.get("title", "unknown")
        print(f"[INFO] ダウンロード完了: {title}")

    # yt-dlp が生成した WAV を探す
    wav_candidates = list(tmp_dir.glob("ydl_download*.wav"))
    if not wav_candidates:
        raise FileNotFoundError(
            "yt-dlp の変換後 WAV が見つかりません。"
            "ffmpeg が正しくインストールされているか確認してください。"
        )
    downloaded_wav = wav_candidates[0]

    # ffmpeg でサンプルレートを 44100Hz に統一
    ffmpeg = find_ffmpeg()
    result = subprocess.run(
        [
            ffmpeg, "-y",
            "-i", str(downloaded_wav),
            "-ar", "44100",
            "-ac", "2",
            str(out_wav),
        ],
        capture_output=True, text=True, encoding="utf-8", errors="replace",
    )
    if result.returncode != 0:
        print(f"[ffmpeg stderr]\n{result.stderr[-2000:]}")
        raise RuntimeError("音声変換に失敗しました")

    print(f"[INFO] 音声変換完了 → {out_wav.name}")
    return out_wav


def find_ffmpeg() -> str:
    """ffmpeg の実行パスを返す。見つからなければ例外を投げる"""
    ffmpeg_cmd = shutil.which("ffmpeg")
    if ffmpeg_cmd:
        return ffmpeg_cmd
    candidates = [
        r"C:\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
        r"C:\ProgramData\chocolatey\bin\ffmpeg.exe",
    ]
    for c in candidates:
        if Path(c).exists():
            return c
    raise FileNotFoundError(
        "ffmpeg が見つかりません。\n"
        "Windows: winget install Gyan.FFmpeg\n"
        "Mac:     brew install ffmpeg\n"
        "Linux:   sudo apt install ffmpeg"
    )


def check_audio_stream(input_path: Path) -> None:
    """
    ffprobe で音声ストリームの有無を確認する。
    音声がない場合は分かりやすいエラーメッセージを出して終了する。
    """
    ffprobe_cmd = shutil.which("ffprobe")
    if not ffprobe_cmd:
        # ffprobe がない場合はスキップ(ffmpegで失敗させる)
        return

    result = subprocess.run(
        [
            ffprobe_cmd,
            "-v", "quiet",
            "-select_streams", "a",   # 音声ストリームのみ
            "-show_entries", "stream=codec_type",
            "-of", "csv=p=0",
            str(input_path),
        ],
        capture_output=True, text=True, encoding="utf-8", errors="replace",
    )
    if not result.stdout.strip():
        raise ValueError(
            f"'{input_path.name}' に音声トラックが見つかりません。\n"
            "このファイルには映像のみが含まれているため処理できません。\n"
            "音声トラック付きのファイル(mp3/wav/mp4など)を指定してください。"
        )


def extract_audio(input_path: Path, out_wav: Path) -> None:
    """
    subprocess で ffmpeg を直接呼び出して音声を WAV に変換。
    動画・音声どちらにも対応。事前に音声ストリームの有無を確認する。
    """
    ffmpeg = find_ffmpeg()
    # 音声トラックの有無を事前確認
    check_audio_stream(input_path)
    print(f"[INFO] ffmpeg で音声変換中: {input_path.name}{out_wav.name}")

    cmd = [
        ffmpeg,
        "-y",                    # 上書き確認をスキップ
        "-i", str(input_path),
        "-vn",                   # 映像トラックを除外
        "-acodec", "pcm_s16le",  # 16bit PCM
        "-ar", "44100",          # サンプルレート
        "-ac", "2",              # ステレオ
        str(out_wav),
    ]

    result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="replace")
    if result.returncode != 0:
        print(f"[ffmpeg stderr]\n{result.stderr[-2000:]}")  # 末尾2000文字だけ表示
        raise RuntimeError(f"ffmpeg が失敗しました (returncode={result.returncode})")

    print(f"[INFO] 音声変換完了 → {out_wav}")


def prepare_audio(input_path_or_url: str | Path, tmp_dir: Path) -> Path:
    """URL もしくはファイルパスを受け取り、WAV に変換して返す"""
    # URLの場合は yt-dlp でダウンロード
    if isinstance(input_path_or_url, str) and is_url(input_path_or_url):
        return download_audio_from_url(input_path_or_url, tmp_dir)

    input_path = Path(input_path_or_url)
    ext = input_path.suffix.lower()
    out_wav = tmp_dir / "source.wav"

    if ext == ".wav":
        # WAV でもサンプルレートが異なる場合があるため ffmpeg で統一する
        extract_audio(input_path, out_wav)
        return out_wav

    if ext in VIDEO_EXTENSIONS or ext in AUDIO_EXTENSIONS:
        extract_audio(input_path, out_wav)
        return out_wav

    # 不明な拡張子でも ffmpeg に渡してみる
    print(f"[WARN] 未知の拡張子 '{ext}'。ffmpeg で変換を試みます。")
    extract_audio(input_path, out_wav)
    return out_wav


# ── Demucs 分離 ──────────────────────────────────────────────────


def run_demucs(wav_path: Path, out_dir: Path, model: str) -> tuple[dict, int]:
    """
    Demucs でステム分離を実行。
    戻り値: (stems_dict, sample_rate)
        stems_dict = {stem_name: np.ndarray(shape=[samples, 2], dtype=float32)}
    """
    print(f"\n[INFO] Demucs ({model}) で音源分離を開始...")
    print("       初回実行時はモデルのダウンロードが発生します(数百MB)。しばらくお待ちください。\n")

    cmd = [
        sys.executable, "-m", "demucs",
        "-n", model,
        "-o", str(out_dir),
        str(wav_path),
    ]

    # リアルタイムで進捗を表示しながら実行
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
    output_lines = []
    for line in proc.stdout:
        line_stripped = line.rstrip()
        output_lines.append(line_stripped)
        print(f"  {line_stripped}")
    proc.wait()

    if proc.returncode != 0:
        raise RuntimeError(
            f"Demucs が失敗しました (returncode={proc.returncode})\n"
            + "\n".join(output_lines[-20:])
        )

    # ── 出力ディレクトリを特定 ──
    # Demucs の出力構造: <out_dir>/<model>/<入力ファイル名(拡張子なし)>/
    stem_dir = out_dir / model / wav_path.stem
    if not stem_dir.exists():
        # ディレクトリ名が一致しない場合はざっくり検索
        found = list(out_dir.rglob("*.wav"))
        if not found:
            raise FileNotFoundError(
                f"Demucs の出力ファイルが見つかりません。出力先: {out_dir}\n"
                f"分離されたモデル名のフォルダを確認してください。"
            )
        stem_dir = found[0].parent
        print(f"[INFO] ステムディレクトリを自動検出: {stem_dir}")

    # ── WAV ファイルを読み込む ──
    stems: dict[str, np.ndarray] = {}
    sr = 44100
    for wav_file in sorted(stem_dir.glob("*.wav")):
        data, sr = sf.read(str(wav_file), always_2d=True)
        stems[wav_file.stem] = data.astype(np.float32)
        print(f"  [stem] {wav_file.stem:12s}: {data.shape} @ {sr}Hz")

    if not stems:
        raise FileNotFoundError(f"ステム WAV が見つかりません: {stem_dir}")

    print(f"\n[INFO] 分離完了。検出ステム: {list(stems.keys())}")
    return stems, sr


# ── ハモリ推定フォールバック ──────────────────────────────────────


def fallback_split_vocals(vocals: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    htdemucs_6s で 'other' が得られない場合のフォールバック。
    ステレオの Mid/Side 分解でハモリ成分を推定する。
    - Mid (L+R)/2 → 主旋律(モノラル中心成分)
    - Side (L-R)/2 → ハモリ候補(ステレオ広がり成分)
    """
    print("[INFO] フォールバック: Mid/Side 分解でハモリを推定します")
    L = vocals[:, 0]
    R = vocals[:, 1] if vocals.shape[1] > 1 else vocals[:, 0]

    mid = ((L + R) / 2.0).astype(np.float32)
    side = ((L - R) / 2.0).astype(np.float32)

    main_vocal = np.stack([mid, mid], axis=1)
    harmony = np.stack([side, side], axis=1)
    return main_vocal, harmony


# ── パンニング & ミックス ─────────────────────────────────────────


def to_stereo(audio: np.ndarray) -> np.ndarray:
    """入力を (samples, 2) のステレオ配列に整形"""
    if audio.ndim == 1:
        audio = np.stack([audio, audio], axis=1)
    elif audio.shape[1] == 1:
        audio = np.concatenate([audio, audio], axis=1)
    return audio[:, :2]  # 3ch以上は最初の2chだけ使う


def pan_to_left(audio: np.ndarray) -> np.ndarray:
    """ステレオ音源を左チャンネルのみに振る (Rch=0)"""
    stereo = to_stereo(audio)
    mono = np.mean(stereo, axis=1, keepdims=True)
    return np.concatenate([mono, np.zeros_like(mono)], axis=1)


def pan_to_right(audio: np.ndarray) -> np.ndarray:
    """ステレオ音源を右チャンネルのみに振る (Lch=0)"""
    stereo = to_stereo(audio)
    mono = np.mean(stereo, axis=1, keepdims=True)
    return np.concatenate([np.zeros_like(mono), mono], axis=1)


def mid_side_split(vocals: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """
    vocals ステムを Mid/Side 分解して主旋律とハモリを推定する。

    Mid  = (L + R) / 2  → センターに定位したメインボーカル → 左ch
    Side = (L - R) / 2  → 左右にパンされたハモリ/コーラス  → 右ch

    プロの録音では:
    - メインボーカルはセンター(Mid 成分が大きい)
    - ハモリやコーラスは左右にパン(Side 成分に現れる)
    """
    L = vocals[:, 0].astype(np.float32)
    R = vocals[:, 1].astype(np.float32) if vocals.shape[1] > 1 else L.copy()

    mid  = (L + R) / 2.0
    side = (L - R) / 2.0

    # Side 成分が極端に小さい場合(ほぼモノラル録音)は警告
    mid_rms  = float(np.sqrt(np.mean(mid  ** 2)) + 1e-9)
    side_rms = float(np.sqrt(np.mean(side ** 2)) + 1e-9)
    ratio = side_rms / mid_rms
    print(f"[INFO] Mid/Side 比率: Mid={mid_rms:.4f}, Side={side_rms:.4f}, Side/Mid={ratio:.3f}")
    if ratio < 0.05:
        print("[WARN] Side 成分が非常に小さいです。元音源がモノラル録音の可能性があります。")
        print("       左右の差が小さいため、ハモリ分離の効果が限定的になる場合があります。")

    main_vocal = np.stack([mid, mid], axis=1)   # モノ → ステレオ
    harmony    = np.stack([side, side], axis=1)
    return main_vocal, harmony


def mix_stems(
    stems: dict[str, np.ndarray],
    model: str,
    inst_vol: float,
    mdx_model_path: Path | None = None,
    sr: int = 44100,
) -> np.ndarray:
    """
    分離済みステムを L/R パンニングしてミックスする。

    mdx_model_path が指定されている場合:
        UVR MDX-NET KARA で vocals を Lead/Backing に AI 分離
        Lead(主旋律)→ 左ch, Backing(ハモリ)→ 右ch
    未指定の場合:
        Mid/Side 分解(フォールバック)
    """
    n_samples = max(s.shape[0] for s in stems.values())

    def pad(arr: np.ndarray) -> np.ndarray:
        arr = to_stereo(arr)
        if arr.shape[0] < n_samples:
            arr = np.pad(arr, ((0, n_samples - arr.shape[0]), (0, 0)))
        return arr[:n_samples]

    mix = np.zeros((n_samples, 2), dtype=np.float32)

    if "vocals" in stems:
        vocals_padded = pad(stems["vocals"])

        if mdx_model_path is not None:
            try:
                lead, backing = mdx_separate_lead_backing(vocals_padded, sr, mdx_model_path)
                # 長さを揃える
                L = min(lead.shape[0], n_samples)
                lead_out    = np.zeros((n_samples, 2), dtype=np.float32)
                backing_out = np.zeros((n_samples, 2), dtype=np.float32)
                lead_out[:L]    = lead[:L]
                backing_out[:L] = backing[:L]
                print("[INFO] パンニング: Lead(主旋律) → L, Backing(ハモリ) → R")
                mix += pan_to_left(lead_out)
                mix += pan_to_right(backing_out)
            except Exception as e:
                print(f"[WARN] UVR 推論失敗 ({e})。Mid/Side にフォールバックします。")
                main_vocal, harmony = mid_side_split(vocals_padded)
                mix += pan_to_left(main_vocal)
                mix += pan_to_right(harmony)
        else:
            print("[INFO] vocals を Mid/Side 分解: Mid → L(主旋律), Side → R(ハモリ)")
            main_vocal, harmony = mid_side_split(vocals_padded)
            mix += pan_to_left(main_vocal)
            mix += pan_to_right(harmony)

        # vocals 以外はすべて伴奏としてセンターに縮小
        inst_keys = [k for k in stems if k != "vocals"]
        if inst_vol > 0 and inst_keys:
            print(f"[INFO] 伴奏ステム {inst_keys} → センター x {inst_vol}")
            for k in inst_keys:
                mix += pad(stems[k]) * inst_vol
        elif inst_keys:
            print(f"[INFO] 伴奏ステム {inst_keys} → ミュート (inst-vol=0)")
    else:
        print("[WARN] 'vocals' ステムが見つかりません。全ステムをセンター出力します。")
        for k, arr in stems.items():
            mix += pad(arr)

    # ピーク正規化(クリッピング防止)
    peak = float(np.max(np.abs(mix)))
    if peak > 1e-6:
        mix = mix / peak * 0.95
    return mix



# ── メインフロー ──────────────────────────────────────────────────


def main():
    parser = argparse.ArgumentParser(
        description="主旋律 / ハモリ 分離 & L/R パンニング ツール",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
例:
  python app.py song.mp3
  python app.py movie.mp4 --output result.wav
  python app.py https://www.youtube.com/watch?v=XXXXXXXXXXX
  python app.py song.mp3 --inst-vol 0.0   # 伴奏ミュート
  python app.py song.mp3 --model htdemucs  # 軽量モデル
        """,
    )
    parser.add_argument("input", help="入力ファイル (mp3/wav/mp4/mov など) または URL (YouTube など)")
    parser.add_argument(
        "--output", default="output_panned.wav",
        help="出力 WAV ファイル名 (デフォルト: output_panned.wav)",
    )
    parser.add_argument(
        "--inst-vol", type=float, default=0.15,
        help="伴奏の音量係数 0.0(ミュート)〜1.0 (デフォルト: 0.15)",
    )
    parser.add_argument(
        "--model",
        default="htdemucs_6s",
        choices=["htdemucs_6s", "htdemucs", "mdx_extra", "mdx"],
        help="Demucs モデル (デフォルト: htdemucs_6s)",
    )
    parser.add_argument(
        "--no-mdx", action="store_true",
        help="UVR MDX-NET によるリード/バッキング分離をスキップし Mid/Side 分解を使用",
    )
    args = parser.parse_args()

    # URL かファイルパスか判定
    input_arg = args.input
    if is_url(input_arg):
        # URL の場合はファイル存在チェック不要
        input_for_process = input_arg
    else:
        input_path = Path(input_arg).resolve()
        if not input_path.exists():
            print(f"[ERROR] ファイルが見つかりません: {input_path}")
            sys.exit(1)
        input_for_process = input_path

    tmp_dir = Path(tempfile.mkdtemp(prefix="musicbot_"))
    demucs_out = tmp_dir / "demucs_out"
    demucs_out.mkdir(parents=True, exist_ok=True)

    try:
        # Step 1: 音声準備(URL またはファイル)
        wav_path = prepare_audio(input_for_process, tmp_dir)

        # Step 2: Demucs 分離
        model = args.model
        stems, sr = run_demucs(wav_path, demucs_out, model)

        # htdemucs_6s で 'other' がない場合は htdemucs にフォールバック
        if model == "htdemucs_6s" and "other" not in stems:
            print("\n[WARN] htdemucs_6s で 'other' ステムが見つかりません。htdemucs に切り替えます。")
            shutil.rmtree(demucs_out)
            demucs_out.mkdir(parents=True, exist_ok=True)
            model = "htdemucs"
            stems, sr = run_demucs(wav_path, demucs_out, model)

        # Step 3: UVR MDX-NET モデルを取得(必要なら)
        mdx_model_path = None
        if not args.no_mdx:
            mdx_cache = Path.home() / ".cache" / "musicbot" / "models"
            mdx_cache.mkdir(parents=True, exist_ok=True)
            try:
                mdx_model_path = download_mdx_model(mdx_cache)
            except Exception as e:
                print(f"[WARN] MDX モデルの取得に失敗しました: {e}")
                print("       Mid/Side 分解コードにフォールバックします。")

        # Step 4: パンニング & ミックス
        print("\n[INFO] L/R パンニング & ミックス処理中...")
        mixed = mix_stems(stems, model, args.inst_vol, mdx_model_path=mdx_model_path, sr=sr)

        # Step 4: 出力
        output_path = Path(args.output).resolve()
        sf.write(str(output_path), mixed, sr, subtype="PCM_16")

        print(f"""
╔══════════════════════════════════════╗
║  ✅  処理完了!                      ║
╠══════════════════════════════════════╣
║  出力: {output_path.name:<31}
║  左チャンネル (L): 主旋律             ║
║  右チャンネル (R): ハモリ/コーラス    ║
║  伴奏音量係数  : {args.inst_vol:<22.2f}
╚══════════════════════════════════════╝
{output_path}
""")

    except KeyboardInterrupt:
        print("\n[INFO] ユーザーによって中断されました。")
        sys.exit(0)
    except Exception as e:
        print(f"\n[ERROR] 処理中にエラーが発生しました: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


if __name__ == "__main__":
    main()