Buckets:
| # /// script | |
| # requires-python = ">=3.12" | |
| # dependencies = ["tqdm"] | |
| # /// | |
| """Split recordings into WAV/TXT samples using JSON segments.""" | |
| import json | |
| import shutil | |
| import subprocess | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from pathlib import Path | |
| from tqdm import tqdm | |
# Directory containing this script; recordings and their JSON transcripts are
# looked up here.
DATA_DIR = Path(__file__).parent
# Destination for the generated .wav/.txt sample pairs (recreated on each run).
OUTPUT_DIR = DATA_DIR / "samples"
# Segments whose padded duration, in seconds, falls outside this range are
# skipped.
MIN_DURATION = 1.0
MAX_DURATION = 30.0
# Seconds of extra audio kept before the start and after the end of a segment.
PADDING = 0.05
def parse_timestamp_to_seconds(timestamp: str) -> tuple[float, float]:
    """Convert a ``"start-end"`` timestamp range into seconds.

    The string is split on its first ``-``; each side is stripped of
    surrounding whitespace and parsed with ``parse_single_time``.

    Returns:
        ``(start_seconds, end_seconds)``.

    Raises:
        ValueError: if there is no ``-`` separator, or either side is not a
            supported time format.
    """
    start_text, end_text = map(str.strip, timestamp.split("-", 1))
    start_seconds = parse_single_time(time_text=start_text)
    end_seconds = parse_single_time(time_text=end_text)
    return start_seconds, end_seconds
| def parse_single_time(*, time_text: str) -> float: | |
| parts = time_text.split(":") | |
| if len(parts) == 2: | |
| minutes, seconds = parts | |
| return int(minutes) * 60 + float(seconds) | |
| if len(parts) == 3: | |
| hours, minutes, seconds = parts | |
| return int(hours) * 3600 + int(minutes) * 60 + float(seconds) | |
| raise ValueError(f"Unsupported timestamp format: {time_text}") | |
def get_jobs(*, audio_path: Path, transcript_path: Path, recording_index: int) -> list[tuple[float, float, str, Path, Path]]:
    """Build the extraction jobs for one recording from its JSON transcript.

    The transcript is read as a JSON sequence of segment objects, each with a
    ``"timestamp"`` range string and an optional ``"text"``. Segments with no
    text are skipped. The remaining ones are padded by ``PADDING`` seconds on
    both sides (the start is clamped at 0) and dropped when the padded
    duration is outside ``[MIN_DURATION, MAX_DURATION]``.

    Args:
        audio_path: The recording the transcript belongs to. Not read here;
            kept so the signature mirrors the caller's keyword arguments.
        transcript_path: Path of the JSON transcript file.
        recording_index: Number embedded in every sample id of this recording.

    Returns:
        One ``(start_time, duration, text, wav_path, txt_path)`` tuple per
        kept segment, with output paths under ``OUTPUT_DIR``.

    Raises:
        KeyError: if a segment with text has no ``"timestamp"`` key.
        ValueError: if a timestamp cannot be parsed.
    """
    # JSON text is UTF-8; decode it explicitly instead of relying on the
    # platform's locale encoding.
    segments = json.loads(transcript_path.read_text(encoding="utf-8"))
    jobs = []
    kept_segments = 0
    for segment in segments:
        # `or ""` also covers an explicit null "text" value.
        text = (segment.get("text") or "").strip()
        if not text:
            continue
        start_time, end_time = parse_timestamp_to_seconds(timestamp=segment["timestamp"])
        start_time = max(0.0, start_time - PADDING)
        end_time = end_time + PADDING
        duration = end_time - start_time
        if duration < MIN_DURATION or duration > MAX_DURATION:
            continue
        # Only kept segments are numbered, so sample ids stay contiguous.
        kept_segments += 1
        sample_id = f"recording_{recording_index:03d}_s{kept_segments:03d}"
        wav_path = OUTPUT_DIR / f"{sample_id}.wav"
        txt_path = OUTPUT_DIR / f"{sample_id}.txt"
        jobs.append((start_time, duration, text, wav_path, txt_path))
    return jobs
def extract_one(*, audio_path: Path, start_time: float, duration: float, text: str, wav_path: Path, txt_path: Path) -> None:
    """Cut one segment out of a recording and write its WAV/TXT pair.

    Runs ``ffmpeg`` to extract ``duration`` seconds starting at ``start_time``
    from ``audio_path`` into ``wav_path`` as mono, 44.1 kHz, 16-bit PCM, then
    writes ``text`` to ``txt_path``.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
            The captured stderr is attached to the exception as a note.
        FileNotFoundError: if the ``ffmpeg`` executable cannot be found.
    """
    try:
        subprocess.run(
            args=[
                "ffmpeg",
                "-y",
                "-ss",
                str(start_time),
                "-i",
                str(audio_path),
                "-t",
                str(duration),
                "-vn",
                "-acodec",
                "pcm_s16le",
                "-ac",
                "1",
                "-ar",
                "44100",
                str(wav_path),
            ],
            check=True,
            capture_output=True,
            # Several ffmpeg processes run in parallel; keep them from reading
            # the terminal's stdin that they would otherwise inherit.
            stdin=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError as error:
        # The default message only carries the exit status; surface ffmpeg's
        # own diagnostics so the failure can be understood.
        if error.stderr:
            error.add_note(error.stderr.decode(errors="replace"))
        raise
    # Write the transcript as UTF-8 regardless of the platform's locale.
    txt_path.write_text(text, encoding="utf-8")
def main() -> None:
    """Regenerate every sample from the recordings next to this script.

    Deletes and recreates ``OUTPUT_DIR``, then, for each ``recording_*.m4a``
    in ``DATA_DIR`` that has a sibling ``.json`` transcript, extracts its
    segments in parallel (up to 8 worker threads) with a progress bar.
    Recordings without a transcript are skipped but still consume an index.

    The first failing extraction aborts the run: jobs that have not started
    yet are cancelled and the exception is re-raised.
    """
    if OUTPUT_DIR.exists():
        shutil.rmtree(OUTPUT_DIR)
    OUTPUT_DIR.mkdir(parents=True)
    recordings = sorted(DATA_DIR.glob("recording_*.m4a"))
    total_samples = 0
    for index, audio_path in enumerate(recordings, start=1):
        transcript_path = audio_path.with_suffix(".json")
        if not transcript_path.exists():
            continue
        jobs = get_jobs(audio_path=audio_path, transcript_path=transcript_path, recording_index=index)
        print(f"{audio_path.name}: {len(jobs)} samples")
        # ThreadPoolExecutor rejects max_workers=0, hence the `or 1`.
        workers = min(8, len(jobs) or 1)
        progress = tqdm(total=len(jobs), desc=audio_path.stem, leave=True)
        try:
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = [
                    pool.submit(
                        extract_one,
                        audio_path=audio_path,
                        start_time=start_time,
                        duration=duration,
                        text=text,
                        wav_path=wav_path,
                        txt_path=txt_path,
                    )
                    for start_time, duration, text, wav_path, txt_path in jobs
                ]
                try:
                    for future in as_completed(futures):
                        # Re-raises any exception from the worker.
                        future.result()
                        progress.update(1)
                except BaseException:
                    # Fail fast: without this the pool would still run every
                    # queued job before the error propagates.
                    pool.shutdown(wait=False, cancel_futures=True)
                    raise
        finally:
            # Close the bar even when a job fails.
            progress.close()
        total_samples += len(jobs)
    print(f"Done: {total_samples} samples saved to {OUTPUT_DIR}")
# Run the extraction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Xet Storage Details
- Size:
- 4.03 kB
- Xet hash:
- 6ad603aa721b61f1bd920e88588cfb38c9c12e8656182df56d8e186f78f27b0a
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.