{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "provenance": [],
      "gpuType": "T4"
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "language_info": {
      "name": "python"
    },
    "accelerator": "GPU"
  },
  "cells": [
    {
      "cell_type": "markdown",
      "source": [
        "# Vocal dataset builder\n",
        "\n",
        "Downloads audio from YouTube (or reads it from a Google Drive folder), isolates the vocals with Demucs, optionally slices them on silence, and copies the results to `MyDrive/dataset/<project_name>`.\n",
        "\n",
        "Run the cells from top to bottom: mount Drive, fill in the parameters, then process."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "#@title Mount Google Drive\n",
        "from google.colab import drive\n",
        "drive.mount('/content/drive')"
      ],
      "metadata": {
        "id": "RkuSSLb7t39L",
        "cellView": "form"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "#@title Parameters\n",
        "#@markdown ### **1. General Settings**\n",
        "project_name = \"\" #@param {type:\"string\"}\n",
        "mode = \"Splitting\" #@param [\"Splitting\", \"Separate\"]\n",
        "demucs_model = \"htdemucs\" #@param [\"htdemucs\", \"demucs\", \"htdemucs_ft\", \"demucs_extra\"]\n",
        "\n",
        "#@markdown ---\n",
        "#@markdown ### **2. Input Source**\n",
        "dataset_source = \"Youtube\" #@param [\"Youtube\", \"Drive\"]\n",
        "#@markdown **If YouTube:** Provide one or more URLs, separated by commas.\n",
        "youtube_urls = \"\" #@param {type:\"string\"}\n",
        "#@markdown **If Drive:** Provide the full path to the FOLDER containing your audio files.\n",
        "google_drive_folder_path = \"\" #@param {type:\"string\"}\n",
        "\n",
        "#@markdown ---\n",
        "#@markdown ### **3. Processing Settings**\n",
        "#@markdown **YouTube Trimming (Optional):** Use HH:MM:SS format.\n",
        "start_time = \"\" #@param {type:\"string\"}\n",
        "end_time = \"\" #@param {type:\"string\"}\n",
        "#@markdown **Long Audio Handling:** Split audio longer than this duration before processing with Demucs.\n",
        "chunk_duration_in_minutes = \"30 minutes\" #@param [\"10 minutes\", \"15 minutes\", \"20 minutes\", \"30 minutes\", \"45 minutes\", \"60 minutes\"]\n",
        "\n",
        "#@markdown ---\n",
        "#@markdown ### **4. Output Settings**\n",
        "#@markdown Sample rate for the output files. `0` uses the original sample rate.\n",
        "output_sample_rate = \"48000\" #@param [\"0\", \"8000\", \"16000\", \"22050\", \"32000\", \"44100\", \"48000\"]\n",
        "output_format = \"mp3\" #@param [\"wav\", \"mp3\"]\n",
        "\n",
        "# --- Process Parameters for the next cell ---\n",
        "# Chunk length in seconds for each form option.\n",
        "chunk_duration_map = {\n",
        "    \"10 minutes\": 600,\n",
        "    \"15 minutes\": 900,\n",
        "    \"20 minutes\": 1200,\n",
        "    \"30 minutes\": 1800,\n",
        "    \"45 minutes\": 2700,\n",
        "    \"60 minutes\": 3600\n",
        "}\n",
        "chunk_duration = chunk_duration_map[chunk_duration_in_minutes]\n",
        "output_sr = int(output_sample_rate)\n",
        "\n",
        "# Map new variables to old names for compatibility with the processing script\n",
        "url = youtube_urls\n",
        "drive_path = google_drive_folder_path\n",
        "dataset = dataset_source"
      ],
      "metadata": {
        "id": "23UmiGqUt_0a",
        "cellView": "form"
      },
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "#@title Process Dataset\n",
        "import glob\n",
        "import os\n",
        "import shutil\n",
        "import subprocess\n",
        "import sys\n",
        "\n",
        "print(\"Memulai proses...\\n\")\n",
        "\n",
        "# Make sure the Colab runtime is using a GPU\n",
        "print(\"GPU Info:\")\n",
        "!nvidia-smi\n",
        "print(\"\\n\")\n",
        "\n",
        "# --- Helper Functions ---\n",
        "def get_duration(file_path):\n",
        "    \"\"\"Return the duration of an audio file in seconds, or None on failure.\n",
        "\n",
        "    The value is read with ffprobe from the container's `format=duration` entry.\n",
        "    \"\"\"\n",
        "    try:\n",
        "        result = subprocess.run(\n",
        "            [\"ffprobe\", \"-v\", \"error\", \"-show_entries\", \"format=duration\",\n",
        "             \"-of\", \"default=noprint_wrappers=1:nokey=1\", file_path],\n",
        "            stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)\n",
        "        return float(result.stdout.strip())\n",
        "    except Exception as e:\n",
        "        print(f\"Gagal mendapatkan durasi audio untuk {file_path}: {e}\")\n",
        "        return None\n",
        "\n",
        "def run_command(command):\n",
        "    \"\"\"Run an external command, echo its stdout/stderr, and return its exit code.\n",
        "\n",
        "    Args:\n",
        "        command: A list of arguments (preferred; executed without a shell, so\n",
        "            file names and form values are never interpreted by the shell) or\n",
        "            a single string (executed through the shell).\n",
        "    \"\"\"\n",
        "    result = subprocess.run(\n",
        "        command,\n",
        "        shell=isinstance(command, str),\n",
        "        stdin=subprocess.DEVNULL,  # never block on an interactive prompt\n",
        "        capture_output=True,\n",
        "        text=True,\n",
        "    )\n",
        "    if result.stdout:\n",
        "        print(result.stdout)\n",
        "    if result.stderr:\n",
        "        print(result.stderr)\n",
        "    return result.returncode\n",
        "\n",
        "def run_ffmpeg(args):\n",
        "    \"\"\"Run ffmpeg with quiet logging and overwrite enabled; return True on exit code 0.\"\"\"\n",
        "    return run_command([\"ffmpeg\", \"-hide_banner\", \"-loglevel\", \"error\", \"-y\", *args]) == 0\n",
        "\n",
        "def separate_vocals(audio_path):\n",
        "    \"\"\"Run Demucs two-stem (vocals) separation on one file.\n",
        "\n",
        "    Returns the absolute path of the resulting vocals.wav, or None if Demucs\n",
        "    did not produce it.\n",
        "    \"\"\"\n",
        "    track_name = os.path.splitext(os.path.basename(audio_path))[0]\n",
        "    vocals_path = os.path.join(\"separated\", demucs_model, track_name, \"vocals.wav\")\n",
        "    if os.path.exists(vocals_path):\n",
        "        os.remove(vocals_path)  # a stale result must not mask a failed run\n",
        "    run_command([sys.executable, \"-m\", \"demucs.separate\", \"--two-stems\", \"vocals\",\n",
        "                 \"-n\", demucs_model, \"-o\", \"separated\", audio_path])\n",
        "    return os.path.abspath(vocals_path) if os.path.exists(vocals_path) else None\n",
        "\n",
        "def finalize_vocals(vocals_wav, target_path):\n",
        "    \"\"\"Convert a vocals WAV to the requested output format and sample rate.\n",
        "\n",
        "    `target_path` must differ from `vocals_wav`: ffmpeg cannot convert a file\n",
        "    in place. Returns the absolute path of the file to use downstream:\n",
        "    `target_path` on success, otherwise the unconverted WAV moved next to it\n",
        "    so that every input still gets a uniquely named result.\n",
        "    \"\"\"\n",
        "    convert_args = [\"-i\", vocals_wav]\n",
        "    if output_sr != 0:\n",
        "        convert_args += [\"-ar\", str(output_sr)]\n",
        "    convert_args.append(target_path)\n",
        "    if run_ffmpeg(convert_args) and os.path.exists(target_path):\n",
        "        os.remove(vocals_wav)  # Clean up the intermediate .wav\n",
        "        return os.path.abspath(target_path)\n",
        "    print(f\"Warning: Gagal mengonversi vokal. Menyimpan file .wav asli.\")\n",
        "    if os.path.exists(target_path):\n",
        "        os.remove(target_path)  # drop a partial conversion result\n",
        "    fallback_path = os.path.splitext(target_path)[0] + \".wav\"\n",
        "    shutil.move(vocals_wav, fallback_path)\n",
        "    return os.path.abspath(fallback_path)\n",
        "\n",
        "# --- Input Validation ---\n",
        "if not project_name:\n",
        "    raise ValueError(\"Error: Project Name tidak boleh kosong!\")\n",
        "if dataset == \"Youtube\" and not url:\n",
        "    raise ValueError(\"Error: URL tidak boleh kosong untuk dataset Youtube!\")\n",
        "if dataset == \"Drive\" and not drive_path:\n",
        "    raise ValueError(\"Error: Drive Path tidak boleh kosong untuk dataset Drive!\")\n",
        "\n",
        "# --- Install Dependencies ---\n",
        "print(\"Menginstal/memperbarui dependensi (yt_dlp, ffmpeg, demucs, librosa)...\")\n",
        "# sys.executable makes pip target the interpreter this kernel is running.\n",
        "run_command([sys.executable, \"-m\", \"pip\", \"install\", \"--upgrade\", \"--quiet\",\n",
        "             \"yt_dlp\", \"ffmpeg-python\", \"demucs\", \"librosa\", \"soundfile\"])\n",
        "\n",
        "# === STEP 1: Gather All Audio Sources ===\n",
        "source_audio_paths = []\n",
        "temp_download_folder = \"temp_audio_downloads\"\n",
        "os.makedirs(temp_download_folder, exist_ok=True)\n",
        "\n",
        "if dataset == \"Youtube\":\n",
        "    urls = [u.strip() for u in url.split(',') if u.strip()]\n",
        "    print(f\"Ditemukan {len(urls)} URL YouTube untuk diproses.\")\n",
        "    import yt_dlp  # imported here because the install step above may have just added it\n",
        "    for i, u in enumerate(urls):\n",
        "        print(f\"\\nDownloading audio ({i+1}/{len(urls)}) dari: {u}\")\n",
        "        try:\n",
        "            ydl_opts = {\n",
        "                'format': 'bestaudio/best',\n",
        "                'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'wav'}],\n",
        "                'outtmpl': f'{temp_download_folder}/{project_name}_yt_{i+1}.%(ext)s'\n",
        "            }\n",
        "            with yt_dlp.YoutubeDL(ydl_opts) as ydl:\n",
        "                ydl.download([u])\n",
        "            downloaded_file = os.path.abspath(f\"{temp_download_folder}/{project_name}_yt_{i+1}.wav\")\n",
        "            if not os.path.exists(downloaded_file):\n",
        "                raise FileNotFoundError(f\"File hasil download tidak ditemukan: {downloaded_file}\")\n",
        "\n",
        "            # Trimming Logic: either bound may be given on its own\n",
        "            if start_time or end_time:\n",
        "                print(f\"Melakukan trimming audio dari {start_time or 'awal'} ke {end_time or 'akhir'}...\")\n",
        "                trimmed_file = os.path.abspath(f\"{temp_download_folder}/{project_name}_yt_{i+1}_trimmed.wav\")\n",
        "                trim_args = [\"-i\", downloaded_file]\n",
        "                if start_time:\n",
        "                    trim_args += [\"-ss\", start_time]\n",
        "                if end_time:\n",
        "                    trim_args += [\"-to\", end_time]\n",
        "                trim_args += [\"-c\", \"copy\", trimmed_file]\n",
        "                if run_ffmpeg(trim_args) and os.path.exists(trimmed_file):\n",
        "                    source_audio_paths.append(trimmed_file)\n",
        "                else:\n",
        "                    print(f\"Warning: Gagal melakukan trimming, file akan diproses penuh.\")\n",
        "                    source_audio_paths.append(downloaded_file)\n",
        "            else:\n",
        "                source_audio_paths.append(downloaded_file)\n",
        "        except Exception as e:\n",
        "            print(f\"Gagal mendownload atau memproses URL {u}: {e}\")\n",
        "elif dataset == \"Drive\":\n",
        "    print(f\"Mencari file audio di folder: {drive_path}\")\n",
        "    allowed_extensions = [\"*.wav\", \"*.mp3\", \"*.flac\", \"*.m4a\"]\n",
        "    for ext in allowed_extensions:\n",
        "        # glob.escape keeps folder names containing [, ], * or ? from being read as patterns\n",
        "        source_audio_paths.extend(sorted(glob.glob(os.path.join(glob.escape(drive_path), ext))))\n",
        "    print(f\"Ditemukan {len(source_audio_paths)} file audio.\")\n",
        "\n",
        "if not source_audio_paths:\n",
        "    raise Exception(\"Tidak ada file audio sumber yang ditemukan. Hentikan proses.\")\n",
        "\n",
        "# === STEP 2: Process Each Audio Source with Demucs ===\n",
        "all_vocals_paths = []\n",
        "for idx, audio_input in enumerate(source_audio_paths):\n",
        "    current_audio_name = os.path.splitext(os.path.basename(audio_input))[0]\n",
        "    print(f\"\\n--- Memproses file {idx+1}/{len(source_audio_paths)}: {current_audio_name} ---\")\n",
        "    duration = get_duration(audio_input)\n",
        "    if duration is None:\n",
        "        print(f\"Melewatkan file karena tidak bisa mendapatkan durasi.\")\n",
        "        continue\n",
        "    print(f\"Durasi audio: {duration:.0f} detik.\")\n",
        "\n",
        "    # One uniquely named result per input, so copies on Drive never overwrite each other.\n",
        "    final_vocals_target = os.path.join(\"separated\", demucs_model, f\"{current_audio_name}_vocals.{output_format}\")\n",
        "\n",
        "    if duration > chunk_duration:\n",
        "        print(f\"Audio lebih panjang dari {chunk_duration} detik. Melakukan splitting audio menjadi beberapa chunk...\")\n",
        "        chunk_folder = os.path.join(\"chunks\", current_audio_name)\n",
        "        os.makedirs(chunk_folder, exist_ok=True)\n",
        "        split_args = [\"-i\", audio_input, \"-vn\", \"-f\", \"segment\", \"-segment_time\", str(chunk_duration)]\n",
        "        if audio_input.lower().endswith(\".wav\"):\n",
        "            # Stream copy is only valid when the input is already WAV;\n",
        "            # other formats are re-encoded into the .wav segments.\n",
        "            split_args += [\"-c\", \"copy\"]\n",
        "        split_args.append(os.path.join(chunk_folder, f\"{current_audio_name}_%03d.wav\"))\n",
        "        run_ffmpeg(split_args)\n",
        "        chunk_pattern = os.path.join(glob.escape(chunk_folder), glob.escape(current_audio_name) + \"_*.wav\")\n",
        "        chunk_files = sorted(glob.glob(chunk_pattern))\n",
        "        if not chunk_files:\n",
        "            print(f\"Warning: Gagal membuat chunk untuk {current_audio_name}. Melewatkan file ini.\")\n",
        "            continue\n",
        "\n",
        "        print(f\"Memproses {len(chunk_files)} chunk dengan Demucs...\")\n",
        "        chunk_vocals_files = []\n",
        "        for chunk_file in chunk_files:\n",
        "            chunk_vocals = separate_vocals(chunk_file)\n",
        "            if chunk_vocals:\n",
        "                chunk_vocals_files.append(chunk_vocals)\n",
        "            else:\n",
        "                print(f\"Warning: Hasil Demucs untuk {chunk_file} tidak ditemukan.\")\n",
        "\n",
        "        if not chunk_vocals_files:\n",
        "            print(f\"Warning: Tidak ada vokal yang berhasil diekstrak untuk {current_audio_name}. Melewatkan file ini.\")\n",
        "            continue\n",
        "\n",
        "        list_file = \"chunks_list.txt\"\n",
        "        with open(list_file, \"w\") as f:\n",
        "            for vocals_file in chunk_vocals_files:\n",
        "                # A single quote inside the quoted path must be written as '\\''\n",
        "                escaped_path = vocals_file.replace(\"'\", \"'\\\\''\")\n",
        "                f.write(f\"file '{escaped_path}'\\n\")\n",
        "\n",
        "        combined_vocals_path = os.path.join(\"separated\", demucs_model, f\"{current_audio_name}_vocals_combined.wav\")\n",
        "        concat_ok = run_ffmpeg([\"-f\", \"concat\", \"-safe\", \"0\", \"-i\", list_file, \"-c\", \"copy\", combined_vocals_path])\n",
        "        if not concat_ok or not os.path.exists(combined_vocals_path):\n",
        "            print(f\"Warning: Gagal menggabungkan chunk vokal untuk {current_audio_name}. Melewatkan file ini.\")\n",
        "            continue\n",
        "        print(\"Penggabungan vokal dari chunk selesai.\")\n",
        "        vocals_wav = combined_vocals_path\n",
        "    else:\n",
        "        print(\"Memproses audio penuh dengan Demucs...\")\n",
        "        vocals_wav = separate_vocals(audio_input)\n",
        "        if vocals_wav is None:\n",
        "            print(f\"Warning: Gagal memisahkan vokal untuk {current_audio_name}.\")\n",
        "            continue\n",
        "\n",
        "    # Convert the vocals to the desired output format and sample rate\n",
        "    print(f\"Mengonversi vokal ke format {output_format.upper()} (Sample Rate: {output_sr if output_sr != 0 else 'asli'})...\")\n",
        "    all_vocals_paths.append(finalize_vocals(vocals_wav, final_vocals_target))\n",
        "    print(\"Proses pemisahan vokal selesai.\")\n",
        "\n",
        "if not all_vocals_paths:\n",
        "    raise Exception(\"Tidak ada vokal yang berhasil diekstrak. Hentikan proses.\")\n",
        "\n",
        "# === STEP 3: Splitting Vocals (if mode = \"Splitting\") ===\n",
        "if mode == \"Splitting\":\n",
        "    print(\"\\n--- Melakukan Splitting pada Semua Hasil Vokal ---\")\n",
        "    # Imported here because the install step above may have just added them.\n",
        "    import numpy as np\n",
        "    import librosa\n",
        "    import soundfile as sf\n",
        "\n",
        "    def get_rms(y, frame_length=2048, hop_length=512, pad_mode=\"constant\"):\n",
        "        \"\"\"Return the RMS of `y` per frame as a 1-D array (for 1-D input).\n",
        "\n",
        "        `y` is padded by frame_length // 2 on both sides, cut into overlapping\n",
        "        frames of `frame_length` samples taken every `hop_length` samples, and\n",
        "        the root of the mean squared magnitude of each frame is returned.\n",
        "        \"\"\"\n",
        "        padding = (int(frame_length // 2), int(frame_length // 2))\n",
        "        y = np.pad(y, padding, mode=pad_mode)\n",
        "        axis = -1\n",
        "        # Build a strided view with one extra trailing axis that walks inside each frame.\n",
        "        out_strides = y.strides + (y.strides[axis],)\n",
        "        x_shape_trimmed = list(y.shape)\n",
        "        x_shape_trimmed[axis] -= frame_length - 1\n",
        "        out_shape = tuple(x_shape_trimmed) + (frame_length,)\n",
        "        xw = np.lib.stride_tricks.as_strided(y, shape=out_shape, strides=out_strides)\n",
        "        if axis < 0:\n",
        "            target_axis = axis - 1\n",
        "        else:\n",
        "            target_axis = axis + 1\n",
        "        xw = np.moveaxis(xw, -1, target_axis)\n",
        "        # Keep every hop_length-th frame.\n",
        "        slices = [slice(None)] * xw.ndim\n",
        "        slices[axis] = slice(0, None, hop_length)\n",
        "        x = xw[tuple(slices)]\n",
        "        power = np.mean(np.abs(x)**2, axis=-2, keepdims=True)\n",
        "        return np.sqrt(power).squeeze(0)\n",
        "\n",
        "    class Slicer:\n",
        "        \"\"\"Cut a mono waveform into clips at silent sections.\n",
        "\n",
        "        `threshold` is in dB; `min_length`, `min_interval`, `hop_size` and\n",
        "        `max_sil_kept` are in milliseconds and are converted to frame counts\n",
        "        (units of `hop_size`) using the sample rate `sr`.\n",
        "        \"\"\"\n",
        "\n",
        "        def __init__(self, sr, threshold=-40., min_length=5000, min_interval=300, hop_size=20, max_sil_kept=5000):\n",
        "            if not min_length >= min_interval >= hop_size:\n",
        "                raise ValueError('min_length >= min_interval >= hop_size harus terpenuhi')\n",
        "            if not max_sil_kept >= hop_size:\n",
        "                raise ValueError('max_sil_kept >= hop_size harus terpenuhi')\n",
        "            min_interval = sr * min_interval / 1000\n",
        "            self.threshold = 10 ** (threshold/20.)  # dB -> linear amplitude\n",
        "            self.hop_size = round(sr * hop_size / 1000)\n",
        "            self.win_size = min(round(min_interval), 4 * self.hop_size)\n",
        "            self.min_length = round(sr * min_length / 1000 / self.hop_size)\n",
        "            self.min_interval = round(min_interval / self.hop_size)\n",
        "            self.max_sil_kept = round(sr * max_sil_kept / 1000 / self.hop_size)\n",
        "\n",
        "        def _apply_slice(self, waveform, begin, end):\n",
        "            \"\"\"Return the samples between frame indices `begin` and `end`.\"\"\"\n",
        "            return waveform[begin*self.hop_size: min(len(waveform), end*self.hop_size)]\n",
        "\n",
        "        def slice(self, waveform):\n",
        "            \"\"\"Split `waveform` at silences and return the list of clips.\n",
        "\n",
        "            Returns `[waveform]` unchanged when it is too short or when no\n",
        "            cut point is found.\n",
        "            \"\"\"\n",
        "            # Compare frames with frames: min_length is a frame count, not a sample count.\n",
        "            if (len(waveform) + self.hop_size - 1) // self.hop_size <= self.min_length:\n",
        "                return [waveform]\n",
        "            rms_list = get_rms(waveform, frame_length=self.win_size, hop_length=self.hop_size)\n",
        "            sil_tags = []\n",
        "            silence_start = None\n",
        "            clip_start = 0\n",
        "            for i, rms in enumerate(rms_list):\n",
        "                # Keep looping while the frame is silent; remember where the silence began.\n",
        "                if rms < self.threshold:\n",
        "                    if silence_start is None:\n",
        "                        silence_start = i\n",
        "                    continue\n",
        "                # Non-silent frame with no pending silence: nothing to decide.\n",
        "                if silence_start is None:\n",
        "                    continue\n",
        "                # Forget the silence if it is too short or the current clip is too short.\n",
        "                is_leading_silence = silence_start == 0 and i > self.max_sil_kept\n",
        "                need_slice_middle = i - silence_start >= self.min_interval and i - clip_start >= self.min_length\n",
        "                if not is_leading_silence and not need_slice_middle:\n",
        "                    silence_start = None\n",
        "                    continue\n",
        "                # Record the range of silent frames to remove.\n",
        "                if i - silence_start <= self.max_sil_kept:\n",
        "                    pos = rms_list[silence_start: i+1].argmin() + silence_start\n",
        "                    sil_tags.append((0, pos) if silence_start == 0 else (pos, pos))\n",
        "                elif i - silence_start <= self.max_sil_kept * 2:\n",
        "                    pos = rms_list[i-self.max_sil_kept: silence_start+self.max_sil_kept+1].argmin() + i-self.max_sil_kept\n",
        "                    pos_l = rms_list[silence_start: silence_start+self.max_sil_kept+1].argmin() + silence_start\n",
        "                    pos_r = rms_list[i-self.max_sil_kept: i+1].argmin() + i-self.max_sil_kept\n",
        "                    sil_tags.append((0, pos_r) if silence_start == 0 else (min(pos_l, pos), max(pos_r, pos)))\n",
        "                else:\n",
        "                    pos_l = rms_list[silence_start: silence_start+self.max_sil_kept+1].argmin() + silence_start\n",
        "                    pos_r = rms_list[i-self.max_sil_kept: i+1].argmin() + i-self.max_sil_kept\n",
        "                    sil_tags.append((0, pos_r) if silence_start == 0 else (pos_l, pos_r))\n",
        "                # The next clip starts where the removed silence ends.\n",
        "                clip_start = sil_tags[-1][1]\n",
        "                silence_start = None\n",
        "            # Deal with trailing silence.\n",
        "            total_frames = len(rms_list)\n",
        "            if silence_start is not None and total_frames - silence_start >= self.min_interval:\n",
        "                silence_end = min(total_frames, silence_start+self.max_sil_kept)\n",
        "                pos = rms_list[silence_start: silence_end+1].argmin() + silence_start\n",
        "                sil_tags.append((pos, total_frames+1))\n",
        "            if len(sil_tags) == 0:\n",
        "                return [waveform]\n",
        "            chunks = []\n",
        "            if sil_tags[0][0] > 0:\n",
        "                chunks.append(self._apply_slice(waveform, 0, sil_tags[0][0]))\n",
        "            for i in range(len(sil_tags)-1):\n",
        "                chunks.append(self._apply_slice(waveform, sil_tags[i][1], sil_tags[i+1][0]))\n",
        "            if sil_tags[-1][1] < total_frames:\n",
        "                chunks.append(self._apply_slice(waveform, sil_tags[-1][1], total_frames))\n",
        "            return chunks\n",
        "\n",
        "    output_slicer_dir = f\"dataset/{project_name}\"\n",
        "    os.makedirs(output_slicer_dir, exist_ok=True)\n",
        "    try:\n",
        "        global_chunk_count = 0\n",
        "        # None keeps each file's own sample rate; otherwise librosa resamples if needed.\n",
        "        target_sr = output_sr if output_sr != 0 else None\n",
        "        for vocal_file in all_vocals_paths:\n",
        "            print(f\"Slicing {os.path.basename(vocal_file)}...\")\n",
        "            if not os.path.exists(vocal_file):\n",
        "                print(f\" Warning: File vokal tidak ditemukan: {vocal_file}. Melewatkan.\")\n",
        "                continue\n",
        "\n",
        "            # Always use the rate the samples really have (the value returned by the\n",
        "            # loader); writing them with any other rate would change pitch and speed.\n",
        "            audio, sr = librosa.load(vocal_file, sr=target_sr, mono=True)\n",
        "\n",
        "            slicer = Slicer(sr=sr, threshold=-40, min_length=5000, min_interval=500, hop_size=10, max_sil_kept=500)\n",
        "            for chunk in slicer.slice(audio):\n",
        "                if len(chunk) == 0:\n",
        "                    continue  # nothing to write for an empty slice\n",
        "                sf.write(f\"{output_slicer_dir}/split_{global_chunk_count}.{output_format}\", chunk, sr)\n",
        "                global_chunk_count += 1\n",
        "        print(f\"\\nSplitting selesai. Total {global_chunk_count} file dibuat.\")\n",
        "    except Exception as e:\n",
        "        print(f\"Terjadi kesalahan saat splitting: {e}\")\n",
        "        raise\n",
        "\n",
        "# === STEP 4: Copy results to Google Drive ===\n",
        "print(\"\\n--- Menyalin Hasil ke Google Drive ---\")\n",
        "base_drive_folder = f\"/content/drive/MyDrive/dataset/{project_name}\"\n",
        "vocals_drive_folder = f\"{base_drive_folder}/vocals_only\"\n",
        "sliced_drive_folder = f\"{base_drive_folder}/sliced_mixed\"\n",
        "\n",
        "os.makedirs(vocals_drive_folder, exist_ok=True)\n",
        "os.makedirs(sliced_drive_folder, exist_ok=True)\n",
        "\n",
        "print(f\"Menyalin vokal mentah ke: {vocals_drive_folder}\")\n",
        "for vocal_path in all_vocals_paths:\n",
        "    if os.path.exists(vocal_path):\n",
        "        shutil.copy(vocal_path, vocals_drive_folder)\n",
        "\n",
        "if mode == \"Splitting\":\n",
        "    print(f\"Menyalin dataset yang sudah di-slice ke: {sliced_drive_folder}\")\n",
        "    local_sliced_folder = f\"dataset/{project_name}\"\n",
        "    shutil.copytree(local_sliced_folder, sliced_drive_folder, dirs_exist_ok=True)\n",
        "\n",
        "# --- Cleanup ---\n",
        "shutil.rmtree(temp_download_folder, ignore_errors=True)\n",
        "shutil.rmtree(\"chunks\", ignore_errors=True)\n",
        "shutil.rmtree(\"separated\", ignore_errors=True)\n",
        "if os.path.exists(\"chunks_list.txt\"):\n",
        "    os.remove(\"chunks_list.txt\")\n",
        "\n",
        "print(\"\\nProses selesai!\")"
      ],
      "metadata": {
        "id": "0L7br10ouMlL",
        "cellView": "form"
      },
      "execution_count": null,
      "outputs": []
    }
  ]
}