| |
| """Audit Ref-AVS style metadata for the TubeToken experiment plan. |
| |
| This script intentionally depends only on the dataset files. It does not import |
| the training code, so it can run before model dependencies are fully settled. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import csv |
| import json |
| import math |
| import os |
| from collections import Counter, defaultdict |
| from pathlib import Path |
| from statistics import mean, median |
| from typing import Dict, Iterable, List, Optional, Tuple |
|
|
| try: |
| from PIL import Image |
| except Exception: |
| Image = None |
|
|
|
|
# Lower-case substrings that flag a referring expression as audio-grounded.
# Matched case-insensitively via ``has_any`` against the expression text.
AUDIO_KEYWORDS = (
    "sound",
    "sounding",
    "making sound",
    "longest sound",
    "intermittent sound",
    "silent",
    "audio",
    "heard",
    "emitting",
    "playing instrument",
    "voice",
    "speaking",
    "talking",
    "singing",
    "barking",
    "meowing",
    "hitting",
)


# Lower-case substrings that flag an expression as relying on spatial language.
# Note these are plain substring matches, so e.g. "left" also matches "leftmost".
SPATIAL_KEYWORDS = (
    "left",
    "right",
    "top",
    "bottom",
    "front",
    "back",
    "behind",
    "next to",
    "near",
    "far",
    "middle",
    "center",
    "between",
    "above",
    "below",
    "under",
)
|
|
|
|
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line options for the Phase -1 audit.

    Args:
        argv: Optional explicit argument list. Defaults to ``None``, in which
            case argparse reads ``sys.argv[1:]`` as before; passing a list
            makes the function callable from tests or other scripts without
            touching process-global state.

    Returns:
        Namespace with ``data_dir``, ``out_dir``, ``frames``, ``small_area``
        and ``mask_sample_limit`` attributes.
    """
    parser = argparse.ArgumentParser(description="Audit Ref-AVS data for TubeToken Phase -1.")
    parser.add_argument("--data_dir", type=Path, default=Path("data"))
    parser.add_argument("--out_dir", type=Path, default=Path("runs/tubetoken_phase_minus1/audit"))
    parser.add_argument("--frames", type=int, default=10)
    parser.add_argument("--small_area", type=float, default=0.05)
    parser.add_argument("--mask_sample_limit", type=int, default=0, help="0 means audit every row.")
    return parser.parse_args(argv)
|
|
|
|
def read_metadata(path: Path) -> List[dict]:
    """Load a metadata CSV into a list of per-row dicts.

    ``newline=""`` is the csv module's documented requirement when handing it
    a file object; the explicit UTF-8 encoding removes the dependency on the
    platform's locale default, which previously could mis-decode the file.
    """
    with path.open("r", newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))
|
|
|
|
def video_id(row: dict) -> str:
    """Return the row's video id.

    Prefers a non-empty explicit ``vid`` column; otherwise takes everything
    before the last two underscore-separated fields of ``uid``.
    """
    explicit = row.get("vid")
    if explicit:
        return explicit
    return row["uid"].rsplit("_", 2)[0]
|
|
|
|
def fid_value(row: dict) -> str:
    """Normalise the ``fid`` column to a whitespace-stripped string ("" when absent)."""
    raw = row.get("fid", "")
    return str(raw).strip()
|
|
|
|
def object_key(row: dict) -> Tuple[str, str]:
    """Identify the annotated object as its ``(video_id, fid)`` pair."""
    return (video_id(row), fid_value(row))
|
|
|
|
def category_from_uid(row: dict) -> str:
    """Recover the object category embedded in ``uid``.

    ``uid`` normally looks like ``<vid>_<category>_<index>``: strip the video
    prefix when it matches, otherwise fall back to the middle underscore field,
    then drop the trailing index if one remains.
    """
    vid = video_id(row)
    uid = row.get("uid", "")
    prefix = vid + "_"
    if uid.startswith(prefix):
        suffix = uid[len(prefix):]
    else:
        suffix = uid.rsplit("_", 2)[-2]
    if "_" not in suffix:
        return suffix
    return suffix.rsplit("_", 1)[0]
|
|
|
|
def has_any(text: str, keywords: Iterable[str]) -> bool:
    """True when any keyword occurs as a substring of ``text`` (case-insensitive)."""
    lowered = text.lower()
    for keyword in keywords:
        if keyword in lowered:
            return True
    return False
|
|
|
|
def mask_path(data_dir: Path, vid: str, fid: str, t: int) -> Path:
    """Path of the ground-truth mask for frame ``t`` of object ``(vid, fid)``.

    Frame filenames are zero-padded to five digits. ``f"{t:05d}"`` matches the
    original ``f"0000{t}"`` exactly for t < 10, but stays five characters for
    t >= 10 where the old form produced six-character names (``000010.png``)
    that would never match files on disk if ``--frames`` exceeds 10.
    """
    return data_dir / "gt_mask" / vid / f"fid_{fid}" / f"{t:05d}.png"
|
|
|
|
def read_binary_mask_stats(path: Path) -> Optional[Tuple[int, int, int]]:
    """Count foreground pixels in one mask image.

    Returns ``(positive_pixels, width, height)``, or ``None`` when PIL is not
    installed (module-level ``Image`` is None) or the file does not exist.
    """
    if Image is None:
        return None
    if not path.exists():
        return None
    with Image.open(path) as img:
        gray = img.convert("L")
        w, h = gray.size
        # histogram()[0] counts value-0 (background) pixels; the rest are foreground.
        foreground = sum(gray.histogram()[1:])
    return foreground, w, h
|
|
|
|
def row_mask_stats(data_dir: Path, row: dict, frames: int, small_area: float) -> dict:
    """Compute per-expression mask statistics over the first ``frames`` frames.

    Frames whose mask is missing or unreadable count as empty. Derived flags:
      * ``late_target``    – first visible frame falls in the second half,
      * ``small_target``   – mean visible area is positive but below ``small_area``,
      * ``partial_target`` – visible in at least one but fewer than half the frames,
      * ``area_unstable``  – coefficient of variation of visible areas >= 1.
    ``width``/``height`` reflect the last successfully read mask (None if none).
    """
    vid = video_id(row)
    fid = fid_value(row)
    pixel_counts: List[int] = []
    frame_areas: List[float] = []
    missing = 0
    width = height = None

    for t in range(frames):
        stats = read_binary_mask_stats(mask_path(data_dir, vid, fid, t))
        if stats is None:
            missing += 1
            pixel_counts.append(0)
            frame_areas.append(0.0)
        else:
            count, width, height = stats
            pixel_counts.append(count)
            # max(..., 1) guards against a degenerate zero-area image.
            frame_areas.append(count / max(width * height, 1))

    visible_idx = [i for i, count in enumerate(pixel_counts) if count > 0]
    visible_areas = [frame_areas[i] for i in visible_idx]
    # enumerate yields ascending indices, so the first entry is the minimum.
    first_visible = visible_idx[0] if visible_idx else None
    mean_visible_area = mean(visible_areas) if visible_areas else 0.0
    mean_all_area = mean(frame_areas) if frame_areas else 0.0

    area_cv = 0.0
    if len(visible_areas) > 1 and mean_visible_area > 0:
        variance = sum((a - mean_visible_area) ** 2 for a in visible_areas) / len(visible_areas)
        area_cv = math.sqrt(variance) / mean_visible_area

    n_visible = len(visible_idx)
    return {
        "visible_frames": n_visible,
        "visible_ratio": n_visible / frames,
        "first_visible": first_visible,
        "late_target": first_visible is not None and first_visible > 0.5 * frames,
        "mean_visible_area": mean_visible_area,
        "mean_all_area": mean_all_area,
        "small_target": 0 < mean_visible_area < small_area,
        "partial_target": 0 < n_visible < 0.5 * frames,
        "area_cv": area_cv,
        "area_unstable": area_cv >= 1.0,
        "missing_masks": missing,
        "width": width,
        "height": height,
    }
|
|
|
|
def pct(num: int, den: int) -> float:
    """Percentage ``num / den``, defined as 0.0 when the denominator is zero."""
    if den == 0:
        return 0.0
    return 100.0 * num / den
|
|
|
|
def summarize_counts(values: List[int]) -> dict:
    """Summary of a count distribution: mean/median/max plus how many entries
    are >= 2 and >= 3. All-zero summary for empty input."""
    if not values:
        return {"mean": 0, "median": 0, "max": 0, "ge2": 0, "ge3": 0}
    at_least_2 = sum(1 for v in values if v >= 2)
    at_least_3 = sum(1 for v in values if v >= 3)
    return {
        "mean": mean(values),
        "median": median(values),
        "max": max(values),
        "ge2": at_least_2,
        "ge3": at_least_3,
    }
|
|
|
|
def write_csv(path: Path, rows: List[dict], fieldnames: List[str]) -> None:
    """Write ``rows`` to ``path``, keeping only ``fieldnames`` columns.

    Keys missing from a row are emitted as empty strings; extra keys are
    dropped. ``newline=""`` is the csv-module requirement for writers, and
    the explicit UTF-8 encoding keeps output independent of the locale
    default (mirroring how the metadata is read).
    """
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow({name: row.get(name, "") for name in fieldnames})
|
|
|
|
def main() -> None:
    """Run the Phase -1 audit end to end.

    Reads ``<data_dir>/metadata.csv``, enriches each expression row with
    derived ids and heuristic flags, optionally audits the ground-truth masks,
    and writes ``audit_summary.json``, ``audit_samples.csv``,
    ``h3_candidates.csv`` and ``audit_report.md`` to ``out_dir``.
    """
    args = parse_args()
    data_dir = args.data_dir
    out_dir = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    rows = read_metadata(data_dir / "metadata.csv")
    # Mask auditing can be expensive; a positive limit audits only a prefix
    # of the rows (0 means audit everything).
    if args.mask_sample_limit > 0:
        mask_rows = rows[: args.mask_sample_limit]
    else:
        mask_rows = rows

    by_split = Counter(row["split"] for row in rows)
    by_video: Dict[str, List[dict]] = defaultdict(list)
    by_object: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    by_video_category: Dict[Tuple[str, str], set] = defaultdict(set)

    # First pass: enrich every row with derived ids and keyword flags, and
    # build the grouping indexes consumed by the heuristics below.
    enriched: List[dict] = []
    for row in rows:
        vid = video_id(row)
        fid = fid_value(row)
        category = category_from_uid(row)
        expr = row.get("exp", "")
        row2 = dict(row)
        row2["vid"] = vid
        row2["fid"] = fid
        row2["category"] = category
        # "test_n" is the null split: expressions with no valid target.
        row2["is_null_split"] = row.get("split") == "test_n"
        row2["is_audio_keyword"] = has_any(expr, AUDIO_KEYWORDS)
        row2["is_spatial_keyword"] = has_any(expr, SPATIAL_KEYWORDS)
        by_video[vid].append(row2)
        by_object[(vid, fid)].append(row2)
        by_video_category[(vid, category)].add(fid)
        enriched.append(row2)

    # Per-uid mask statistics (only for the possibly-limited mask_rows).
    mask_stats_by_uid: Dict[str, dict] = {}
    for row in mask_rows:
        uid = row["uid"]
        mask_stats_by_uid[uid] = row_mask_stats(data_dir, row, args.frames, args.small_area)

    # Second pass: merge mask stats into each enriched row and attach the
    # multi-expression / distractor heuristics.
    sample_rows: List[dict] = []
    for row in enriched:
        stats = mask_stats_by_uid.get(row["uid"], {})
        same_cat_fids = by_video_category[(row["vid"], row["category"])]
        row2 = dict(row)
        row2.update(stats)
        # Two distinct fids sharing a category within one video suggests a
        # same-category distractor is present.
        row2["same_category_distractor_heuristic"] = len(same_cat_fids) >= 2
        row2["multi_expr_video"] = len(by_video[row["vid"]]) >= 2
        row2["multi_expr_object"] = len(by_object[(row["vid"], row["fid"])]) >= 2
        # H3 candidates: same object described by multiple expressions,
        # excluding the null split.
        row2["h3_candidate"] = row2["multi_expr_object"] and not row2["is_null_split"]
        sample_rows.append(row2)

    video_expr_counts = [len(v) for v in by_video.values()]
    object_expr_counts = [len(v) for v in by_object.values()]
    # NOTE(review): split is checked only on the first expression of each
    # object (v[0]); assumes all expressions of one object share a split.
    h3_objects = [k for k, v in by_object.items() if len(v) >= 2 and v[0]["split"] != "test_n"]
    null_rows = [r for r in enriched if r["is_null_split"]]
    audio_rows = [r for r in enriched if r["is_audio_keyword"]]
    spatial_rows = [r for r in enriched if r["is_spatial_keyword"]]
    same_cat_rows = [r for r in sample_rows if r.get("same_category_distractor_heuristic")]

    # Rows that actually carry mask stats ("visible_ratio" is only present
    # when row_mask_stats ran for that uid).
    audited_mask_rows = [r for r in sample_rows if "visible_ratio" in r]
    late_rows = [r for r in audited_mask_rows if r.get("late_target")]
    small_rows = [r for r in audited_mask_rows if r.get("small_target")]
    partial_rows = [r for r in audited_mask_rows if r.get("partial_target")]
    unstable_rows = [r for r in audited_mask_rows if r.get("area_unstable")]

    # Aggregate everything into a single JSON-serialisable summary.
    summary = {
        "data_dir": str(data_dir),
        "num_expressions": len(rows),
        "num_videos": len(by_video),
        "num_objects_vid_fid": len(by_object),
        "splits": dict(by_split),
        "expressions_per_video": summarize_counts(video_expr_counts),
        "expressions_per_object": summarize_counts(object_expr_counts),
        "multi_expression_videos": sum(c >= 2 for c in video_expr_counts),
        "multi_expression_objects": sum(c >= 2 for c in object_expr_counts),
        "h3_candidate_objects": len(h3_objects),
        "h3_candidate_expressions": sum(len(by_object[k]) for k in h3_objects),
        "null_split_expressions": len(null_rows),
        "null_split_percent": pct(len(null_rows), len(rows)),
        "audio_keyword_expressions": len(audio_rows),
        "audio_keyword_percent": pct(len(audio_rows), len(rows)),
        "spatial_keyword_expressions": len(spatial_rows),
        "spatial_keyword_percent": pct(len(spatial_rows), len(rows)),
        "same_category_distractor_heuristic_expressions": len(same_cat_rows),
        "same_category_distractor_heuristic_percent": pct(len(same_cat_rows), len(rows)),
        "mask_rows_audited": len(audited_mask_rows),
        "late_target_expressions": len(late_rows),
        "small_target_expressions": len(small_rows),
        "partial_target_expressions": len(partial_rows),
        "area_unstable_expressions": len(unstable_rows),
    }

    with (out_dir / "audit_summary.json").open("w") as f:
        json.dump(summary, f, indent=2, sort_keys=True)

    # Column order for both sample and candidate CSVs.
    fields = [
        "uid",
        "vid",
        "split",
        "fid",
        "category",
        "exp",
        "is_null_split",
        "is_audio_keyword",
        "is_spatial_keyword",
        "multi_expr_video",
        "multi_expr_object",
        "h3_candidate",
        "same_category_distractor_heuristic",
        "visible_frames",
        "visible_ratio",
        "first_visible",
        "late_target",
        "mean_visible_area",
        "mean_all_area",
        "small_target",
        "partial_target",
        "area_cv",
        "area_unstable",
        "missing_masks",
        "width",
        "height",
    ]
    write_csv(out_dir / "audit_samples.csv", sample_rows, fields)

    h3_rows = [r for r in sample_rows if r.get("h3_candidate")]
    write_csv(out_dir / "h3_candidates.csv", h3_rows, fields)

    # Human-readable markdown report mirroring the JSON summary.
    md = [
        "# TubeToken Phase -1 Audit",
        "",
        f"- Expressions: {summary['num_expressions']}",
        f"- Videos: {summary['num_videos']}",
        f"- Objects `(vid, fid)`: {summary['num_objects_vid_fid']}",
        f"- Splits: `{dict(by_split)}`",
        "",
        "## Multi-expression",
        "",
        f"- Expressions/video mean: {summary['expressions_per_video']['mean']:.3f}",
        f"- Expressions/video median: {summary['expressions_per_video']['median']}",
        f"- Videos with >=2 expressions: {summary['multi_expression_videos']}",
        f"- Expressions/object mean: {summary['expressions_per_object']['mean']:.3f}",
        f"- Objects with >=2 expressions: {summary['multi_expression_objects']}",
        f"- H3 candidate objects: {summary['h3_candidate_objects']}",
        f"- H3 candidate expressions: {summary['h3_candidate_expressions']}",
        "",
        "## Diagnostic Subsets",
        "",
        f"- Null split expressions: {summary['null_split_expressions']} ({summary['null_split_percent']:.2f}%)",
        f"- Audio-keyword expressions: {summary['audio_keyword_expressions']} ({summary['audio_keyword_percent']:.2f}%)",
        f"- Spatial-keyword expressions: {summary['spatial_keyword_expressions']} ({summary['spatial_keyword_percent']:.2f}%)",
        f"- Same-category distractor heuristic expressions: {summary['same_category_distractor_heuristic_expressions']} ({summary['same_category_distractor_heuristic_percent']:.2f}%)",
        f"- Mask rows audited: {summary['mask_rows_audited']}",
        f"- Late-target expressions: {summary['late_target_expressions']}",
        f"- Small-target expressions: {summary['small_target_expressions']}",
        f"- Partial-target expressions: {summary['partial_target_expressions']}",
        f"- Area-unstable expressions: {summary['area_unstable_expressions']}",
        "",
        "## Phase -1 H3 Decision Hint",
        "",
    ]
    # Decision hint: keep, downgrade-to-diagnostic, or drop H3 depending on
    # how much multi-expression structure the audit found.
    epv = summary["expressions_per_video"]["mean"]
    if epv > 1.5 and summary["h3_candidate_objects"] > 0:
        md.append("H3 can stay as a direct validation target: the data has multi-expression structure.")
    elif summary["h3_candidate_objects"] > 0:
        md.append("H3 should be treated as diagnostic: multi-expression objects exist, but average expressions/video is limited.")
    else:
        md.append("H3 should be downgraded: this audit did not find same-object multi-expression candidates.")
    md.append("")
    md.append("Generated files: `audit_summary.json`, `audit_samples.csv`, `h3_candidates.csv`.")

    (out_dir / "audit_report.md").write_text("\n".join(md) + "\n")
    print(json.dumps(summary, indent=2, sort_keys=True))
    print(f"\nWrote audit files to: {out_dir}")
|
|
|
|
# Script entry point: run the audit only when executed directly, not on import.
if __name__ == "__main__":
    main()
|
|