duarteocarmo/voice / split_samples.py
duarteocarmo's picture
download
raw
4.03 kB
# /// script
# requires-python = ">=3.12"
# dependencies = ["tqdm"]
# ///
"""Split recordings into WAV/TXT samples using JSON segments."""
import json
import shutil
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
# Directory containing this script; main() looks here for recording_*.m4a
# files and their sibling .json transcripts.
DATA_DIR = Path(__file__).parent
# Destination for the generated .wav/.txt pairs. main() deletes and recreates
# this directory on every run.
OUTPUT_DIR = DATA_DIR / "samples"
# get_jobs() drops any padded segment shorter than MIN_DURATION or longer
# than MAX_DURATION (both in seconds).
MIN_DURATION = 1.0
MAX_DURATION = 30.0
# Seconds of extra audio get_jobs() adds before the start (clamped at 0) and
# after the end of every segment.
PADDING = 0.05
def parse_timestamp_to_seconds(timestamp: str) -> tuple[float, float]:
    """Convert a ``"<start> - <end>"`` range into a pair of offsets in seconds.

    The string is split on its first hyphen only; surrounding whitespace on
    each side is ignored and each side is parsed by ``parse_single_time``.

    Args:
        timestamp: Range text such as ``"00:01.5 - 00:04"``.

    Returns:
        ``(start_seconds, end_seconds)``.

    Raises:
        ValueError: If the text contains no hyphen, or either side is not a
            supported time format.
    """
    begin_text, finish_text = (piece.strip() for piece in timestamp.split("-", 1))
    begin_seconds = parse_single_time(time_text=begin_text)
    finish_seconds = parse_single_time(time_text=finish_text)
    return begin_seconds, finish_seconds
def parse_single_time(*, time_text: str) -> float:
parts = time_text.split(":")
if len(parts) == 2:
minutes, seconds = parts
return int(minutes) * 60 + float(seconds)
if len(parts) == 3:
hours, minutes, seconds = parts
return int(hours) * 3600 + int(minutes) * 60 + float(seconds)
raise ValueError(f"Unsupported timestamp format: {time_text}")
def get_jobs(
    *, audio_path: Path, transcript_path: Path, recording_index: int
) -> list[tuple[float, float, str, Path, Path]]:
    """Build the extraction jobs for one recording from its JSON transcript.

    The transcript is read as a sequence of objects with a ``"timestamp"``
    range and an optional ``"text"``. Segments with blank text are skipped.
    Each remaining segment is widened by ``PADDING`` on both sides (the start
    is clamped at 0) and kept only if its padded duration lies within
    ``[MIN_DURATION, MAX_DURATION]``.

    Args:
        audio_path: The recording the jobs belong to. Currently not read here.
        transcript_path: JSON file holding the segments.
        recording_index: Number used in the ``recording_NNN_sMMM`` sample id.

    Returns:
        One ``(start_time, duration, text, wav_path, txt_path)`` tuple per
        kept segment, with output paths inside ``OUTPUT_DIR``.

    Raises:
        KeyError: If a segment with text has no ``"timestamp"``.
        ValueError: If the JSON or a timestamp cannot be parsed.
    """
    # Hand json the raw bytes so it decodes them as UTF-8/16/32 itself instead
    # of going through the platform's locale encoding, which would corrupt or
    # reject non-ASCII transcripts on non-UTF-8 systems.
    segments = json.loads(transcript_path.read_bytes())
    jobs = []
    kept_segments = 0
    for segment in segments:
        text = segment.get("text", "").strip()
        if not text:
            continue
        start_time, end_time = parse_timestamp_to_seconds(timestamp=segment["timestamp"])
        start_time = max(0.0, start_time - PADDING)
        end_time = end_time + PADDING
        duration = end_time - start_time
        if duration < MIN_DURATION or duration > MAX_DURATION:
            continue
        # Number only the segments that survive filtering so ids stay dense.
        kept_segments += 1
        sample_id = f"recording_{recording_index:03d}_s{kept_segments:03d}"
        wav_path = OUTPUT_DIR / f"{sample_id}.wav"
        txt_path = OUTPUT_DIR / f"{sample_id}.txt"
        jobs.append((start_time, duration, text, wav_path, txt_path))
    return jobs
def extract_one(
    *,
    audio_path: Path,
    start_time: float,
    duration: float,
    text: str,
    wav_path: Path,
    txt_path: Path,
) -> None:
    """Cut one segment out of a recording and write its WAV/TXT pair.

    ffmpeg seeks to ``start_time`` in ``audio_path``, takes ``duration``
    seconds, and writes a mono, 44.1 kHz, 16-bit PCM WAV to ``wav_path``,
    overwriting any existing file. The transcript text is written to
    ``txt_path`` only after ffmpeg succeeds.

    Args:
        audio_path: Source recording.
        start_time: Offset into the recording, in seconds.
        duration: Length of the clip, in seconds.
        text: Transcript of the clip.
        wav_path: Destination of the audio clip.
        txt_path: Destination of the transcript text.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
            ffmpeg's stderr is attached to the exception as a note.
        FileNotFoundError: If the ``ffmpeg`` executable is not on PATH.
    """
    command = [
        "ffmpeg",
        "-y",
        "-ss",
        str(start_time),
        "-i",
        str(audio_path),
        "-t",
        str(duration),
        "-vn",
        "-acodec",
        "pcm_s16le",
        "-ac",
        "1",
        "-ar",
        "44100",
        str(wav_path),
    ]
    try:
        subprocess.run(
            args=command,
            check=True,
            capture_output=True,
            # Several ffmpeg processes run in parallel; keep them away from
            # the shared terminal's stdin so none of them reads keystrokes or
            # leaves the terminal in a modified state.
            stdin=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError as error:
        # The output is captured, so the default message only shows the exit
        # status. Attach ffmpeg's own diagnostics to the same exception.
        stderr_text = (error.stderr or b"").decode("utf-8", errors="replace").strip()
        if stderr_text:
            error.add_note(f"ffmpeg stderr:\n{stderr_text}")
        raise
    # Write UTF-8 explicitly so non-ASCII transcripts do not depend on the
    # platform's locale encoding.
    txt_path.write_text(text, encoding="utf-8")
def main() -> None:
    """Split every transcribed recording next to this script into samples.

    ``OUTPUT_DIR`` is deleted and recreated, so each run starts from an empty
    directory. Every ``recording_*.m4a`` in ``DATA_DIR`` that has a sibling
    ``.json`` transcript is split with up to 8 parallel ffmpeg jobs;
    recordings without a transcript are skipped. The first failing job
    aborts the run by re-raising its exception.
    """
    if OUTPUT_DIR.exists():
        shutil.rmtree(OUTPUT_DIR)
    OUTPUT_DIR.mkdir(parents=True)
    recordings = sorted(DATA_DIR.glob("recording_*.m4a"))
    total_samples = 0
    # The index counts every recording found, including skipped ones, so a
    # recording's sample ids do not change when another transcript is added.
    for index, audio_path in enumerate(recordings, start=1):
        transcript_path = audio_path.with_suffix(".json")
        if not transcript_path.exists():
            continue
        jobs = get_jobs(audio_path=audio_path, transcript_path=transcript_path, recording_index=index)
        print(f"{audio_path.name}: {len(jobs)} samples")
        # ThreadPoolExecutor rejects max_workers=0, so use one worker when
        # there are no jobs.
        workers = min(8, len(jobs) or 1)
        # Context-manage the bar so it is closed even when a job raises. The
        # pool is listed last so it shuts down before the bar closes.
        with (
            tqdm(total=len(jobs), desc=audio_path.stem, leave=True) as progress,
            ThreadPoolExecutor(max_workers=workers) as pool,
        ):
            futures = [
                pool.submit(
                    extract_one,
                    audio_path=audio_path,
                    start_time=start_time,
                    duration=duration,
                    text=text,
                    wav_path=wav_path,
                    txt_path=txt_path,
                )
                for start_time, duration, text, wav_path, txt_path in jobs
            ]
            for future in as_completed(futures):
                # result() re-raises any exception from the worker thread.
                future.result()
                progress.update(1)
        total_samples += len(jobs)
    print(f"Done: {total_samples} samples saved to {OUTPUT_DIR}")
# Run the splitter only when executed as a script, so importing this module
# does not wipe and rebuild the samples directory.
if __name__ == "__main__":
    main()

Xet Storage Details

Size:
4.03 kB
·
Xet hash:
6ad603aa721b61f1bd920e88588cfb38c9c12e8656182df56d8e186f78f27b0a

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.