opd_zt / scripts /extract_archives.py
sdzt's picture
Add files using upload-large-folder tool
bf46e5d verified
Raw
History Blame Contribute Delete
3.4 kB
"""
Extract TB zip archives and LV178K tar.gz archives into ./videos/ subdirs.
Idempotent: skips already-extracted dirs (presence-based, not hash).
For TB: data/raw/tb/long_video_*.zip -> data/videos/tb/long_video/{ActivityNet,COIN,...}/...
For LV178K: data/raw/lv178k/<split>/*.tar.gz -> data/videos/lv178k/<split>/...
"""
from __future__ import annotations
import argparse
import os
import sys
import tarfile
import time
import zipfile
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
ROOT = Path("/mnt/local-fast/opd_zt")
RAW = ROOT / "data" / "raw"
VIDEOS = ROOT / "data" / "videos"
def log(msg: str) -> None:
print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {msg}", flush=True)
def extract_zip(zip_path: Path, dest: Path) -> str:
marker = dest / f".done_{zip_path.name}"
if marker.exists():
return f"SKIP {zip_path.name}"
dest.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path) as zf:
zf.extractall(dest)
marker.touch()
return f"OK {zip_path.name}"
def extract_tar(tar_path: Path, dest: Path) -> str:
marker = dest / f".done_{tar_path.name}"
if marker.exists():
return f"SKIP {tar_path.name}"
dest.mkdir(parents=True, exist_ok=True)
# Try detecting mode; LV178K uses .tar.gz
mode = "r:gz" if tar_path.name.endswith((".tar.gz", ".tgz")) else "r:*"
with tarfile.open(tar_path, mode) as tf:
tf.extractall(dest, filter="data")
marker.touch()
return f"OK {tar_path.name}"
def find_lv178k_archives() -> list[Path]:
base = RAW / "lv178k"
if not base.exists():
return []
# Anything ending in .tar.gz, .tgz, or .gz
return sorted(
p for p in base.rglob("*")
if p.is_file() and p.name.endswith((".tar.gz", ".tgz"))
)
def main() -> None:
p = argparse.ArgumentParser()
p.add_argument("--workers", type=int, default=4)
p.add_argument("--only", choices=["tb", "lv178k", "all"], default="all")
args = p.parse_args()
jobs: list[tuple] = [] # (kind, path, dest)
if args.only in ("tb", "all"):
tb_dest = VIDEOS / "tb"
for z in sorted((RAW / "tb").glob("long_video_*.zip")):
jobs.append(("zip", z, tb_dest))
if args.only in ("lv178k", "all"):
for t in find_lv178k_archives():
# Keep split subdir
rel = t.parent.relative_to(RAW / "lv178k")
jobs.append(("tar", t, VIDEOS / "lv178k" / rel))
log(f"queued {len(jobs)} extraction jobs with {args.workers} workers")
if not jobs:
log("nothing to do.")
return
with ProcessPoolExecutor(max_workers=args.workers) as ex:
fut_to_job = {}
for kind, path, dest in jobs:
fn = extract_zip if kind == "zip" else extract_tar
fut = ex.submit(fn, path, dest)
fut_to_job[fut] = (kind, path)
done = 0
for fut in as_completed(fut_to_job):
kind, path = fut_to_job[fut]
try:
msg = fut.result()
except Exception as e:
msg = f"FAIL {path.name}: {type(e).__name__}: {e}"
done += 1
log(f" [{done}/{len(jobs)}] {msg}")
log("ALL DONE")
if __name__ == "__main__":
try:
main()
except Exception:
import traceback
traceback.print_exc()
sys.exit(1)