import os
import json
import base64
import argparse
from pathlib import Path
# Self-bootstrapping dependency check: if `datasets` is not installed,
# install it (plus the audio backends its loaders need) into the current
# interpreter and retry the import.
# NOTE(review): only datasets is pinned (2.19.1); soundfile/librosa/
# huggingface_hub are unpinned — confirm they stay compatible with the pin.
try:
    from datasets import load_dataset
except ImportError:
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "datasets==2.19.1", "soundfile", "librosa", "huggingface_hub"])
    from datasets import load_dataset
| |
|
def _extract_text(row):
    """Return the transcript text from a dataset row.

    Different HF speech datasets name the transcript column differently
    (Common Voice uses "sentence", FLEURS uses "transcription"); check the
    known names in a fixed priority order. Returns "" when none is present.
    """
    for key in ("sentence", "transcription", "text", "raw_transcription"):
        if key in row:
            return row[key]
    return ""


def _encode_wav_base64(audio_array, sampling_rate):
    """Encode a raw audio sample array as a base64 string of WAV bytes."""
    # soundfile is third-party and may have been pip-installed at import
    # time above, so import it lazily here (hoisted out of the per-row
    # loop, where the original re-imported it for every sample).
    import soundfile as sf
    from io import BytesIO

    buf = BytesIO()
    sf.write(buf, audio_array, sampling_rate, format='WAV')
    return base64.b64encode(buf.getvalue()).decode('utf-8')


def main():
    """Download a small number of Wolof speech samples from Hugging Face
    (Common Voice 17.0 and FLEURS), encode each clip as base64 WAV, and
    write all samples to <output>/hf_samples.json.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)

    results = []

    # Common Voice 17.0 is gated on the Hub: without a token the download
    # will most likely fail with an authentication error.
    token = os.environ.get("HF_TOKEN")
    if not token:
        print("⚠️ Warning: No HF_TOKEN found in environment. Common Voice 17.0 might fail because it is gated.", flush=True)

    sources = [
        {"name": "CommonVoice", "path": "mozilla-foundation/common_voice_17_0", "config": "wo", "split": "test", "limit": 25},
        {"name": "FLEURS", "path": "google/fleurs", "config": "wo_sn", "split": "test", "limit": 25}
    ]

    for source in sources:
        print(f"\n=> Loading {source['name']} ({source['path']} - {source['config']}) limit {source['limit']}...", flush=True)
        try:
            # streaming=True avoids downloading the whole dataset; we only
            # consume `limit` rows. SECURITY NOTE(review): trust_remote_code
            # =True executes loader code from the dataset repo locally —
            # acceptable for these well-known repos, but keep the source
            # list vetted rather than user-supplied.
            ds = load_dataset(source["path"], source["config"], split=source["split"], streaming=True, token=token, trust_remote_code=True)

            count = 0
            for row in ds:
                if count >= source["limit"]:
                    break

                # Skip rows with no decodable audio payload.
                audio = row.get("audio")
                if not audio or "array" not in audio:
                    continue

                results.append({
                    "source": source["name"],
                    "original_text": _extract_text(row),
                    # 16 kHz is the conventional speech default when the
                    # row omits a sampling rate.
                    "audio_base64": _encode_wav_base64(audio["array"], audio.get("sampling_rate", 16000)),
                })
                count += 1
                if count % 5 == 0:
                    print(f"Downloaded {count}/{source['limit']} from {source['name']}", flush=True)

            # Mojibake fix: this success line was corrupted in the original
            # source (the emoji's bytes split the statement across lines).
            print(f"✅ Success for {source['name']}: {count} samples.", flush=True)

        except Exception as e:
            # Best-effort by design: one source failing (auth, network,
            # schema) must not abort fetching from the remaining sources.
            print(f"❌ Failed to load {source['name']}: {str(e)}", flush=True)

    out_file = out_dir / "hf_samples.json"
    with open(out_file, "w") as f:
        json.dump(results, f)

    print(f"\n🏁 Finished fetching. Saved {len(results)} total samples to {out_file}", flush=True)
| |
|
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()
| |
|