Spaces:
Paused
Paused
| import os | |
| import gradio as gr | |
| import zipfile | |
| import tempfile | |
| from zerorvc import prepare | |
| from datasets import load_dataset, load_from_disk | |
| from .constants import ROOT_EXP_DIR, BATCH_SIZE | |
| from .zero import zero | |
| from .model import accelerator | |
def extract_audio_files(zip_file: str, target_dir: str) -> list[str]:
    """Extract a zip archive and list the audio files at its top level.

    The whole archive is extracted into ``target_dir``. Afterwards the
    entries sitting directly in ``target_dir`` are scanned and those whose
    name ends in ``.wav``, ``.mp3`` or ``.ogg`` are reported. The extension
    check is case-insensitive so that names such as ``TAKE1.WAV`` are
    recognised as audio too.

    Args:
        zip_file: Path of the zip archive to extract.
        target_dir: Directory the archive is extracted into.

    Returns:
        Paths (``target_dir`` joined with the entry name) of the audio files
        found directly in ``target_dir``, sorted by entry name.

    Raises:
        gr.Error: If no audio file is found directly in ``target_dir``
            after extraction.
    """
    audio_extensions = (".wav", ".mp3", ".ogg")

    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(target_dir)

    # os.listdir() order is arbitrary; sort so the result is deterministic.
    audio_files = [
        os.path.join(target_dir, name)
        for name in sorted(os.listdir(target_dir))
        if name.lower().endswith(audio_extensions)
    ]
    if not audio_files:
        raise gr.Error("No audio files found at the top level of the zip file")
    return audio_files
def make_dataset_from_zip(exp_dir: str, zip_file: str):
    """Extract an uploaded zip and run ``prepare`` with ``stage=1`` on it.

    The archive is extracted into ``<exp_dir>/raw_data`` and that directory
    is handed to ``prepare``.

    Args:
        exp_dir: Experiment directory. When empty, a fresh directory is
            created under ``ROOT_EXP_DIR``.
        zip_file: Path of the uploaded zip archive.

    Returns:
        A tuple ``(exp_dir, info)`` where ``exp_dir`` is the directory that
        was actually used and ``info`` is ``str()`` of the value returned by
        ``prepare``.

    Raises:
        gr.Error: If no zip file was provided, or if the archive has no
            audio files at its top level.
    """
    # Fail early with a readable message, and before creating any directory,
    # when the handler is triggered without an uploaded file.
    if not zip_file:
        raise gr.Error("Please upload a zip file first")

    if not exp_dir:
        exp_dir = tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    data_dir = os.path.join(exp_dir, "raw_data")
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(data_dir, exist_ok=True)

    extract_audio_files(zip_file, data_dir)

    ds = prepare(
        data_dir,
        accelerator=accelerator,
        batch_size=BATCH_SIZE,
        stage=1,
    )
    return exp_dir, str(ds)
def make_dataset_from_zip_stage_2(exp_dir: str):
    """Run ``prepare`` with ``stage=2`` on the experiment's ``raw_data`` directory.

    Args:
        exp_dir: Experiment directory containing the ``raw_data`` folder.

    Returns:
        A tuple ``(exp_dir, info)`` where ``info`` is ``str()`` of the value
        returned by ``prepare``.
    """
    raw_data_dir = os.path.join(exp_dir, "raw_data")
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 2,
    }
    prepared = prepare(raw_data_dir, **prepare_options)
    return exp_dir, str(prepared)
def make_dataset_from_zip_stage_3(exp_dir: str):
    """Run ``prepare`` with ``stage=3`` and save the result to disk.

    The input is ``<exp_dir>/raw_data``; the value returned by ``prepare``
    is saved to ``<exp_dir>/dataset`` via its ``save_to_disk`` method.

    Args:
        exp_dir: Experiment directory containing the ``raw_data`` folder.

    Returns:
        A tuple ``(exp_dir, info)`` where ``info`` is ``str()`` of the value
        returned by ``prepare``.
    """
    raw_data_dir = os.path.join(exp_dir, "raw_data")
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 3,
    }
    prepared = prepare(raw_data_dir, **prepare_options)

    output_path = os.path.join(exp_dir, "dataset")
    prepared.save_to_disk(output_path)
    return exp_dir, str(prepared)
def make_dataset_from_repo(repo: str, hf_token: str):
    """Load a dataset repository and run ``prepare`` with ``stage=1`` on it.

    Args:
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset`` as ``token``.

    Returns:
        ``str()`` of the value returned by ``prepare``.
    """
    source = load_dataset(repo, token=hf_token)
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 1,
    }
    prepared = prepare(source, **prepare_options)
    return str(prepared)
def make_dataset_from_repo_stage_2(repo: str, hf_token: str):
    """Load a dataset repository and run ``prepare`` with ``stage=2`` on it.

    The repository is loaded again with ``load_dataset`` on every call.

    Args:
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset`` as ``token``.

    Returns:
        ``str()`` of the value returned by ``prepare``.
    """
    source = load_dataset(repo, token=hf_token)
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 2,
    }
    prepared = prepare(source, **prepare_options)
    return str(prepared)
def make_dataset_from_repo_stage_3(exp_dir: str, repo: str, hf_token: str):
    """Load a dataset repository, run ``prepare`` with ``stage=3`` and save it.

    The value returned by ``prepare`` is saved to ``<exp_dir>/dataset`` via
    its ``save_to_disk`` method.

    Args:
        exp_dir: Experiment directory. When empty, a fresh directory is
            created under ``ROOT_EXP_DIR`` (after ``prepare`` has returned).
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset`` as ``token``.

    Returns:
        A tuple ``(exp_dir, info)`` where ``exp_dir`` is the directory that
        was actually used and ``info`` is ``str()`` of the prepared dataset.
    """
    source = load_dataset(repo, token=hf_token)
    prepare_options = {
        "accelerator": accelerator,
        "batch_size": BATCH_SIZE,
        "stage": 3,
    }
    prepared = prepare(source, **prepare_options)

    # Fall back to a new experiment directory only when none was supplied.
    exp_dir = exp_dir or tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    output_path = os.path.join(exp_dir, "dataset")
    prepared.save_to_disk(output_path)
    return exp_dir, str(prepared)
def use_dataset(exp_dir: str, repo: str, hf_token: str):
    """Fetch a dataset repository and save it into the experiment directory.

    The loaded dataset is written to ``<exp_dir>/dataset`` via its
    ``save_to_disk`` method; ``prepare`` is not called.

    Args:
        exp_dir: Experiment directory. When empty, a fresh directory is
            created under ``ROOT_EXP_DIR`` (after the dataset is loaded).
        repo: Dataset repository identifier passed to ``load_dataset``.
        hf_token: Token passed to ``load_dataset`` as ``token``.

    Returns:
        A tuple ``(exp_dir, info)`` where ``exp_dir`` is the directory that
        was actually used and ``info`` is ``str()`` of the loaded dataset.
    """
    gr.Info("Fetching dataset")
    fetched = load_dataset(repo, token=hf_token)

    # Fall back to a new experiment directory only when none was supplied.
    exp_dir = exp_dir or tempfile.mkdtemp(dir=ROOT_EXP_DIR)
    print(f"Using exp dir: {exp_dir}")

    output_path = os.path.join(exp_dir, "dataset")
    fetched.save_to_disk(output_path)
    return exp_dir, str(fetched)
def upload_dataset(exp_dir: str, repo: str, hf_token: str):
    """Push the dataset stored in the experiment directory to a repository.

    The dataset is read from ``<exp_dir>/dataset`` with ``load_from_disk``
    and pushed with ``push_to_hub(..., private=True)``.

    Args:
        exp_dir: Experiment directory expected to contain ``dataset``.
        repo: Target repository identifier passed to ``push_to_hub``.
        hf_token: Token passed to ``push_to_hub`` as ``token``.

    Raises:
        gr.Error: If ``<exp_dir>/dataset`` does not exist.
    """
    dataset_path = os.path.join(exp_dir, "dataset")
    if os.path.exists(dataset_path):
        gr.Info("Uploading dataset")
        stored = load_from_disk(dataset_path)
        stored.push_to_hub(repo, token=hf_token, private=True)
        gr.Info("Dataset uploaded successfully")
        return
    raise gr.Error("Dataset not found")
class DatasetTab:
    """Gradio tab for creating, fetching and uploading datasets.

    Call ``ui()`` inside a Gradio layout context to create the components,
    then ``build()`` to attach the event handlers. ``build()`` reads the
    component attributes that ``ui()`` assigns, so ``ui()`` must run first.
    """

    def __init__(self):
        pass

    def ui(self):
        """Create the tab's components and store them as attributes."""
        gr.Markdown("# Dataset")
        gr.Markdown("The suggested dataset size is > 5 minutes of audio.")

        gr.Markdown("## Create Dataset from ZIP")
        gr.Markdown(
            "Create a dataset by simply upload a zip file containing audio files. The audio files should be at the top level of the zip file."
        )
        with gr.Row():
            self.zip_file = gr.File(
                label="Upload a zip file containing audio files",
                # Gradio documents extensions with a leading dot; a bare
                # word is treated as a file-type category such as "image".
                file_types=[".zip"],
            )
            self.make_ds_from_dir = gr.Button(
                value="Create Dataset from ZIP", variant="primary"
            )

        gr.Markdown("## Create Dataset from Dataset Repository")
        gr.Markdown(
            "You can also create a dataset from any Hugging Face dataset repository that has 'audio' column."
        )
        with gr.Row():
            self.repo = gr.Textbox(
                label="Hugging Face Dataset Repository",
                placeholder="username/dataset-name",
            )
            self.make_ds_from_repo = gr.Button(
                value="Create Dataset from Repo", variant="primary"
            )

        gr.Markdown("## Sync Preprocessed Dataset")
        gr.Markdown(
            "After you have preprocessed the dataset, you can upload the dataset to Hugging Face. And fetch it back later directly."
        )
        with gr.Row():
            self.preprocessed_repo = gr.Textbox(
                label="Hugging Face Dataset Repository",
                placeholder="username/dataset-name",
            )
            self.fetch_ds = gr.Button(value="Fetch Dataset", variant="primary")
            self.upload_ds = gr.Button(value="Upload Dataset", variant="primary")

        self.ds_state = gr.Textbox(label="Dataset Info", lines=5)

    def build(self, exp_dir: gr.Textbox, hf_token: gr.Textbox):
        """Attach click handlers to the buttons created by ``ui()``.

        Args:
            exp_dir: Textbox holding the experiment directory. It is both an
                input and an output of most handlers, so a directory created
                by a handler is written back to it.
            hf_token: Textbox holding the Hugging Face token passed to the
                repository handlers.
        """
        # ZIP flow: the three stages are chained with .success(), so each
        # later stage is only triggered when the previous one did not error.
        self.make_ds_from_dir.click(
            fn=make_dataset_from_zip,
            inputs=[exp_dir, self.zip_file],
            outputs=[exp_dir, self.ds_state],
        ).success(
            fn=make_dataset_from_zip_stage_2,
            inputs=[exp_dir],
            outputs=[exp_dir, self.ds_state],
        ).success(
            fn=make_dataset_from_zip_stage_3,
            inputs=[exp_dir],
            outputs=[exp_dir, self.ds_state],
        )

        # Repository flow: only the final stage receives and updates exp_dir.
        self.make_ds_from_repo.click(
            fn=make_dataset_from_repo,
            inputs=[self.repo, hf_token],
            outputs=[self.ds_state],
        ).success(
            fn=make_dataset_from_repo_stage_2,
            inputs=[self.repo, hf_token],
            outputs=[self.ds_state],
        ).success(
            fn=make_dataset_from_repo_stage_3,
            inputs=[exp_dir, self.repo, hf_token],
            outputs=[exp_dir, self.ds_state],
        )

        self.fetch_ds.click(
            fn=use_dataset,
            inputs=[exp_dir, self.preprocessed_repo, hf_token],
            outputs=[exp_dir, self.ds_state],
        )

        self.upload_ds.click(
            fn=upload_dataset,
            inputs=[exp_dir, self.preprocessed_repo, hf_token],
            outputs=[],
        )