# 43.oT_eV / Meissonic / train / fix_extrach.py
# BryanW: upload code from /mnt/43.oT_eV (commit c2925de, verified)
#!/usr/bin/env python3
import os, json, glob, csv
from pathlib import Path
OUTDIR = "/mnt/VideoGen/dataset/OpenVid1M/extracted_features_17_256_256_full_set"
MERGED = f"{OUTDIR}/metadata.json"
CSV_PATH = "/mnt/VideoGen/dataset/OpenVid1M/video_reorg/OpenVid1M_reorganized.csv"
WORLD = 8
def safe_json_load(p):
    """Load JSON from path *p*, returning None on any read/parse failure.

    Tolerates missing, unreadable, truncated, or corrupt metadata files
    so callers can fall through to the next candidate.
    """
    try:
        with open(p, "r") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file.  ValueError covers
        # json.JSONDecodeError (its subclass) for truncated/corrupt JSON.
        # Narrower than the original bare `except Exception`, so genuine
        # programming errors (e.g. TypeError on a bad path) still surface.
        return None
def count_csv_rows(csv_path):
    """Return the number of data rows in *csv_path*, excluding the header.

    Uses csv.reader rather than a raw line count so quoted fields that
    contain embedded newlines are still counted as a single row.
    """
    with open(csv_path, "r", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header; default=None guards an empty file
        return sum(1 for _ in reader)
def pick_header_source():
    """Return the first per-rank metadata file that actually parses as JSON.

    Resume files (metadata_process_<r>.resume.json) are preferred over the
    older metadata_process_<r>.json files, which may have been truncated
    mid-write.  Unlike the original "first resume file that exists" pick,
    every candidate is validated with safe_json_load, so a corrupt resume
    file falls through to the next candidate instead of being returned and
    failing later in main().  Returns None when no rank file is loadable.
    """
    candidates = [f"{OUTDIR}/metadata_process_{r}.resume.json" for r in range(WORLD)]
    candidates += [f"{OUTDIR}/metadata_process_{r}.json" for r in range(WORLD)]
    for p in candidates:
        if os.path.exists(p) and safe_json_load(p) is not None:
            return p
    return None
def main():
    """Rebuild metadata.json with a full header, preserving merged samples.

    Loads the previously merged metadata, reports sample/duplicate counts
    against the source CSV, reconstructs a header (preferring values from a
    loadable per-rank metadata file, falling back to defaults matching the
    17x256x256 extraction run), then atomically rewrites metadata.json,
    keeping a one-time .bak of the original.
    """
    merged = safe_json_load(MERGED)
    if merged is None:
        # Explicit raise instead of assert: asserts are stripped under
        # `python -O`, which would let a corrupt/missing file slip through.
        raise RuntimeError(f"Cannot load {MERGED}")
    samples = merged.get("samples", [])
    n_list = len(samples)
    # Duplicates can appear when overlapping rank outputs were merged;
    # report them (entries without an "index" are skipped from the count).
    idxs = [s.get("index") for s in samples if isinstance(s, dict)]
    idxs_int = [int(x) for x in idxs if x is not None]
    n_unique = len(set(idxs_int))
    n_dup = n_list - n_unique
    print("---- Current merged metadata.json ----")
    print(f"samples list length = {n_list:,}")
    print(f"unique indices = {n_unique:,}")
    print(f"duplicates in samples list = {n_dup:,}")
    n_csv = count_csv_rows(CSV_PATH)
    print("---- CSV ----")
    print(f"CSV rows (expected total) = {n_csv:,}")
    header_src = pick_header_source()
    header = {}
    if header_src is not None:
        src = safe_json_load(header_src)
        if src is not None:
            header = src
            print(f"---- Header source ----\n{header_src}")
        else:
            print("---- Header source ----\nNone (json.load failed), will use minimal header")
    else:
        print("---- Header source ----\nNone found, will use minimal header")
    # build new header like your 128-version format
    new_meta = {
        "num_samples_original": int(header.get("num_samples_original", n_csv)),
        "resume_from_index": int(header.get("resume_from_index", 0)),
        "num_samples_this_run": int(header.get("num_samples_this_run", n_csv)),
        "num_attempted": int(header.get("num_attempted", n_unique)),  # fallback
        "num_extracted": int(header.get("num_extracted", n_unique)),  # fallback
        "num_failed": int(header.get("num_failed", 0)),
        "num_processes": int(header.get("num_processes", WORLD)),
        "ranks_seen": header.get("ranks_seen", list(range(WORLD))),
        "world_size_used": int(header.get("world_size_used", WORLD)),
        "extract_video": bool(header.get("extract_video", True)),
        "extract_text": bool(header.get("extract_text", True)),
        "text_encoder_architecture": header.get("text_encoder_architecture", "umt5-xxl"),
        "video_tokenizer_model_id": header.get("video_tokenizer_model_id", "Cosmos-0.1-Tokenizer-DV4x8x8"),
        "codebook_size": header.get("codebook_size", 64000),
        "mask_token_id": header.get("mask_token_id", 64000),
        "num_frames": int(header.get("num_frames", 17)),
        "video_height": int(header.get("video_height", 256)),
        "video_width": int(header.get("video_width", 256)),
        "prompt_prefix": header.get("prompt_prefix", None),
        "text_dtype": header.get("text_dtype", "bf16"),
        "save_attention_mask": bool(header.get("save_attention_mask", True)),
        "empty_embeds_shape": header.get("empty_embeds_shape", [1, 512, 4096]),
        "empty_embeds_path": header.get("empty_embeds_path", "empty_embeds.npy"),
        "samples": samples,
    }
    # One-time backup: only taken when no .bak exists yet, so reruns of this
    # script never clobber the original backup with an already-rewritten file.
    bak = f"{MERGED}.bak"
    if os.path.exists(MERGED) and not os.path.exists(bak):
        os.replace(MERGED, bak)
        print(f"Backup: {MERGED} -> {bak}")
    # Atomic write: dump to a temp file, then rename over the target so a
    # crash mid-dump cannot leave a truncated metadata.json behind.
    tmp = f"{MERGED}.tmp"
    with open(tmp, "w") as f:
        json.dump(new_meta, f, indent=2)
    os.replace(tmp, MERGED)
    print(f"Wrote: {MERGED}")


if __name__ == "__main__":
    main()