| """ |
| Extract TB zip archives and LV178K tar.gz/.tgz archives into data/videos/ subdirs. |
| Idempotent: an archive is skipped when its `.done_<archive name>` marker file already exists in the destination dir (presence-based, not hash). |
| |
| For TB: data/raw/tb/long_video_*.zip -> data/videos/tb/long_video/{ActivityNet,COIN,...}/... |
| For LV178K: data/raw/lv178k/<split>/*.tar.gz (or *.tgz) -> data/videos/lv178k/<split>/... |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import os |
| import sys |
| import tarfile |
| import time |
| import zipfile |
| from pathlib import Path |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
|
|
# Dataset root on the local fast disk; every other path is derived from it.
ROOT = Path("/mnt/local-fast/opd_zt")
# Downloaded archives live here (input side).
RAW = ROOT.joinpath("data", "raw")
# Extracted video trees are written here (output side).
VIDEOS = ROOT.joinpath("data", "videos")
|
|
|
|
def log(msg: str) -> None:
    """Print *msg* prefixed with a local ``[YYYY-MM-DD HH:MM:SS]`` timestamp.

    Output is flushed immediately so progress lines show up without delay.
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
|
|
|
|
def extract_zip(zip_path: Path, dest: Path) -> str:
    """Unpack the zip archive *zip_path* into the directory *dest*.

    A marker file named ``.done_<archive name>`` inside *dest* records a
    finished extraction; when it is already present nothing is extracted.

    Returns:
        ``"SKIP <archive name>"`` when the marker already exists, otherwise
        ``"OK <archive name>"`` after extracting and creating the marker.
    """
    archive_name = zip_path.name
    done_flag = dest / f".done_{archive_name}"

    if not done_flag.exists():
        dest.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(dest)
        # Written only after extractall() returned without raising.
        done_flag.touch()
        return f"OK {archive_name}"

    return f"SKIP {archive_name}"
|
|
|
|
| def extract_tar(tar_path: Path, dest: Path) -> str: |
| marker = dest / f".done_{tar_path.name}" |
| if marker.exists(): |
| return f"SKIP {tar_path.name}" |
| dest.mkdir(parents=True, exist_ok=True) |
| |
| mode = "r:gz" if tar_path.name.endswith((".tar.gz", ".tgz")) else "r:*" |
| with tarfile.open(tar_path, mode) as tf: |
| tf.extractall(dest, filter="data") |
| marker.touch() |
| return f"OK {tar_path.name}" |
|
|
|
|
def find_lv178k_archives() -> list[Path]:
    """Return every ``.tar.gz`` / ``.tgz`` file below ``RAW / "lv178k"``, sorted.

    The search is recursive. An empty list is returned when the base
    directory does not exist.
    """
    base = RAW / "lv178k"
    if not base.exists():
        return []

    suffixes = (".tar.gz", ".tgz")
    archives = [
        entry
        for entry in base.rglob("*")
        if entry.is_file() and entry.name.endswith(suffixes)
    ]
    archives.sort()
    return archives
|
|
|
|
def main() -> None:
    """Queue every TB zip and LV178K tarball and extract them in a process pool.

    Command-line options:
        --workers N             size of the process pool (default 4)
        --only {tb,lv178k,all}  restrict the run to one dataset (default all)

    TB zips matching ``RAW/tb/long_video_*.zip`` are all extracted into
    ``VIDEOS/tb``. Each LV178K archive is extracted into the directory under
    ``VIDEOS/lv178k`` that mirrors its parent directory under ``RAW/lv178k``.

    A failing archive does not stop the others; each outcome is logged as
    its job completes.

    Raises:
        SystemExit: with status 1 when at least one archive failed, so that
            a calling shell or scheduler can detect a partial extraction.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--workers", type=int, default=4)
    p.add_argument("--only", choices=["tb", "lv178k", "all"], default="all")
    args = p.parse_args()

    # Each job is (kind, archive path, destination directory).
    jobs: list[tuple[str, Path, Path]] = []

    if args.only in ("tb", "all"):
        tb_dest = VIDEOS / "tb"
        jobs.extend(
            ("zip", z, tb_dest)
            for z in sorted((RAW / "tb").glob("long_video_*.zip"))
        )

    if args.only in ("lv178k", "all"):
        lv_raw = RAW / "lv178k"
        lv_dest = VIDEOS / "lv178k"
        for t in find_lv178k_archives():
            # Mirror the archive's directory layout under the output root.
            rel = t.parent.relative_to(lv_raw)
            jobs.append(("tar", t, lv_dest / rel))

    log(f"queued {len(jobs)} extraction jobs with {args.workers} workers")
    if not jobs:
        log("nothing to do.")
        return

    extractors = {"zip": extract_zip, "tar": extract_tar}
    failed = 0
    with ProcessPoolExecutor(max_workers=args.workers) as ex:
        fut_to_path = {
            ex.submit(extractors[kind], path, dest): path
            for kind, path, dest in jobs
        }
        for done, fut in enumerate(as_completed(fut_to_path), start=1):
            path = fut_to_path[fut]
            try:
                msg = fut.result()
            except Exception as e:
                # Keep going: one bad archive must not abort the others,
                # but remember it so the exit status reflects the failure.
                failed += 1
                msg = f"FAIL {path.name}: {type(e).__name__}: {e}"
            log(f" [{done}/{len(jobs)}] {msg}")

    if failed:
        log(f"DONE with {failed} failed archive(s) out of {len(jobs)}")
        sys.exit(1)

    log("ALL DONE")
|
|
|
|
# Script entry point: run main() and turn any uncaught exception into a
# printed traceback plus exit status 1.
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Imported here because it is only needed on the failure path.
        from traceback import print_exc

        print_exc()
        raise SystemExit(1)
|
|