#!/usr/bin/env python3
"""Audit Ref-AVS style metadata for the TubeToken experiment plan.

This script intentionally depends only on the dataset files. It does not import
the training code, so it can run before model dependencies are fully settled.
"""

from __future__ import annotations

import argparse
import csv
import json
import math
import os
import sys
from collections import Counter, defaultdict
from pathlib import Path
from statistics import mean, median
from typing import Dict, Iterable, List, Optional, Tuple

try:
    from PIL import Image
except Exception:  # pragma: no cover - only used as an environment fallback
    Image = None


# Substrings whose presence marks an expression as audio-related.
AUDIO_KEYWORDS = (
    "sound",
    "sounding",
    "making sound",
    "longest sound",
    "intermittent sound",
    "silent",
    "audio",
    "heard",
    "emitting",
    "playing instrument",
    "voice",
    "speaking",
    "talking",
    "singing",
    "barking",
    "meowing",
    "hitting",
)

# Substrings whose presence marks an expression as spatially grounded.
SPATIAL_KEYWORDS = (
    "left",
    "right",
    "top",
    "bottom",
    "front",
    "back",
    "behind",
    "next to",
    "near",
    "far",
    "middle",
    "center",
    "between",
    "above",
    "below",
    "under",
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the audit script."""
    parser = argparse.ArgumentParser(description="Audit Ref-AVS data for TubeToken Phase -1.")
    parser.add_argument("--data_dir", type=Path, default=Path("data"))
    parser.add_argument("--out_dir", type=Path, default=Path("runs/tubetoken_phase_minus1/audit"))
    parser.add_argument("--frames", type=int, default=10)
    parser.add_argument("--small_area", type=float, default=0.05)
    parser.add_argument("--mask_sample_limit", type=int, default=0, help="0 means audit every row.")
    return parser.parse_args()


def read_metadata(path: Path) -> List[dict]:
    """Load the metadata CSV at *path* as a list of row dictionaries."""
    # Explicit encoding so the result does not depend on the platform locale.
    with path.open("r", newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))


def video_id(row: dict) -> str:
    """Return the row's video id.

    Uses the ``vid`` column when it is present and non-empty; otherwise the
    id is derived from ``uid`` by dropping its last two ``_``-separated
    fields.
    """
    return row.get("vid") or row["uid"].rsplit("_", 2)[0]


def fid_value(row: dict) -> str:
    """Return the row's ``fid`` as a stripped string ("" when absent)."""
    return str(row.get("fid", "")).strip()


def object_key(row: dict) -> Tuple[str, str]:
    """Return the ``(video id, fid)`` pair that identifies the row's object."""
    return video_id(row), fid_value(row)


def category_from_uid(row: dict) -> str:
    """Extract the category part of the row's ``uid``.

    When ``uid`` starts with ``"<vid>_"`` the remainder is used and its last
    ``_``-separated field is dropped. Otherwise the second-to-last
    ``_``-separated field of ``uid`` is used. A ``uid`` without any
    underscore is returned unchanged instead of raising ``IndexError``.
    """
    vid = video_id(row)
    uid = row.get("uid", "")
    if uid.startswith(vid + "_"):
        suffix = uid[len(vid) + 1 :]
    else:
        parts = uid.rsplit("_", 2)
        # Guard: fewer than two fields means there is no "[-2]" element.
        suffix = parts[-2] if len(parts) >= 2 else uid
    if "_" in suffix:
        return suffix.rsplit("_", 1)[0]
    return suffix


def has_any(text: str, keywords: Iterable[str]) -> bool:
    """Return True if any keyword occurs in *text*, ignoring the case of *text*.

    Matching is plain substring containment, so a keyword also matches inside
    a longer word (e.g. "top" is found in "laptop"). The keywords themselves
    are compared as given and are expected to be lowercase.
    """
    text = text.lower()
    return any(k in text for k in keywords)


def mask_path(data_dir: Path, vid: str, fid: str, t: int) -> Path:
    """Return the ground-truth mask path for frame *t* of object ``(vid, fid)``.

    The frame index is zero-padded to five digits. This is identical to the
    previous ``0000{t}`` form for t in 0..9 and keeps the width fixed for
    t >= 10 (assumes the dataset uses five-digit frame names - TODO confirm
    for clips longer than 10 frames).
    """
    return data_dir / "gt_mask" / vid / f"fid_{fid}" / f"{t:05d}.png"


def read_binary_mask_stats(path: Path) -> Optional[Tuple[int, int, int]]:
    """Return ``(positive_pixels, width, height)`` for the mask at *path*.

    Returns None when Pillow is unavailable or the file does not exist. A
    pixel counts as positive when its value is non-zero after conversion to
    8-bit grayscale.
    """
    if Image is None or not path.exists():
        return None
    with Image.open(path) as img:
        gray = img.convert("L")
        width, height = gray.size
        hist = gray.histogram()
    # Bin 0 is background; every other bin is foreground.
    positive = sum(hist[1:])
    return positive, width, height


def row_mask_stats(data_dir: Path, row: dict, frames: int, small_area: float) -> dict:
    """Compute per-expression visibility and area statistics from its masks.

    Args:
        data_dir: Dataset root that contains the ``gt_mask`` directory.
        row: Metadata row; the video id and fid are read from it.
        frames: Number of frames (indices ``0 .. frames - 1``) to inspect.
        small_area: Threshold on the mean visible area fraction below which
            the target is flagged as small.

    Returns:
        A dict with visibility counts and ratios, area statistics, the derived
        boolean flags, the number of unreadable masks, and the width/height
        of the last mask that could be read (None if none could).
    """
    vid = video_id(row)
    fid = fid_value(row)
    positives: List[int] = []
    areas: List[float] = []
    missing = 0
    width = height = None

    for t in range(frames):
        stats = read_binary_mask_stats(mask_path(data_dir, vid, fid, t))
        if stats is None:
            # An unreadable mask is counted as missing and as an empty frame.
            missing += 1
            positives.append(0)
            areas.append(0.0)
            continue
        pos, width, height = stats
        positives.append(pos)
        denom = max(width * height, 1)
        areas.append(pos / denom)

    visible = [i for i, pos in enumerate(positives) if pos > 0]
    visible_areas = [areas[i] for i in visible]
    first_visible = min(visible) if visible else None
    mean_visible_area = mean(visible_areas) if visible_areas else 0.0
    mean_all_area = mean(areas) if areas else 0.0

    # Coefficient of variation (population std / mean) over visible frames.
    area_cv = 0.0
    if len(visible_areas) > 1 and mean_visible_area > 0:
        var = sum((x - mean_visible_area) ** 2 for x in visible_areas) / len(visible_areas)
        area_cv = math.sqrt(var) / mean_visible_area

    # Guard against a non-positive frame count instead of dividing by zero.
    visible_ratio = len(visible) / frames if frames > 0 else 0.0

    return {
        "visible_frames": len(visible),
        "visible_ratio": visible_ratio,
        "first_visible": first_visible,
        "late_target": first_visible is not None and first_visible > 0.5 * frames,
        "mean_visible_area": mean_visible_area,
        "mean_all_area": mean_all_area,
        "small_target": mean_visible_area > 0 and mean_visible_area < small_area,
        "partial_target": 0 < len(visible) < 0.5 * frames,
        "area_cv": area_cv,
        "area_unstable": area_cv >= 1.0,
        "missing_masks": missing,
        "width": width,
        "height": height,
    }


def pct(num: int, den: int) -> float:
    """Return ``num / den`` as a percentage, or 0.0 when *den* is zero."""
    return 0.0 if den == 0 else 100.0 * num / den


def summarize_counts(values: List[int]) -> dict:
    """Summarize counts: mean, median, max, and how many are >= 2 and >= 3."""
    if not values:
        return {"mean": 0, "median": 0, "max": 0, "ge2": 0, "ge3": 0}
    return {
        "mean": mean(values),
        "median": median(values),
        "max": max(values),
        "ge2": sum(v >= 2 for v in values),
        "ge3": sum(v >= 3 for v in values),
    }


def write_csv(path: Path, rows: List[dict], fieldnames: List[str]) -> None:
    """Write *rows* to *path* as CSV, restricted to *fieldnames*.

    Keys missing from a row are written as empty cells; keys not listed in
    *fieldnames* are dropped.
    """
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow({k: row.get(k, "") for k in fieldnames})


def main() -> None:
    """Run the audit and write the JSON summary, CSV tables and Markdown report."""
    args = parse_args()
    data_dir = args.data_dir
    out_dir = args.out_dir
    out_dir.mkdir(parents=True, exist_ok=True)

    if Image is None:
        # Without Pillow every mask read returns None, so all mask-derived
        # statistics would silently be zero; make that visible.
        print(
            "warning: Pillow is not installed; every mask will be counted as missing.",
            file=sys.stderr,
        )

    rows = read_metadata(data_dir / "metadata.csv")
    if args.mask_sample_limit > 0:
        mask_rows = rows[: args.mask_sample_limit]
    else:
        mask_rows = rows

    by_split = Counter(row["split"] for row in rows)
    by_video: Dict[str, List[dict]] = defaultdict(list)
    by_object: Dict[Tuple[str, str], List[dict]] = defaultdict(list)
    by_video_category: Dict[Tuple[str, str], set] = defaultdict(set)

    # Pass 1: normalise ids, tag keyword flags, and build the grouping indexes.
    enriched: List[dict] = []
    for row in rows:
        vid = video_id(row)
        fid = fid_value(row)
        category = category_from_uid(row)
        expr = row.get("exp", "")
        row2 = dict(row)
        row2["vid"] = vid
        row2["fid"] = fid
        row2["category"] = category
        row2["is_null_split"] = row.get("split") == "test_n"
        row2["is_audio_keyword"] = has_any(expr, AUDIO_KEYWORDS)
        row2["is_spatial_keyword"] = has_any(expr, SPATIAL_KEYWORDS)
        by_video[vid].append(row2)
        by_object[(vid, fid)].append(row2)
        by_video_category[(vid, category)].add(fid)
        enriched.append(row2)

    # Pass 2: mask statistics for the (possibly truncated) row sample.
    mask_stats_by_uid: Dict[str, dict] = {}
    for row in mask_rows:
        uid = row["uid"]
        mask_stats_by_uid[uid] = row_mask_stats(data_dir, row, args.frames, args.small_area)

    # Pass 3: join mask statistics and group-level flags onto every row.
    sample_rows: List[dict] = []
    for row in enriched:
        stats = mask_stats_by_uid.get(row["uid"], {})
        same_cat_fids = by_video_category[(row["vid"], row["category"])]
        row2 = dict(row)
        row2.update(stats)
        row2["same_category_distractor_heuristic"] = len(same_cat_fids) >= 2
        row2["multi_expr_video"] = len(by_video[row["vid"]]) >= 2
        row2["multi_expr_object"] = len(by_object[(row["vid"], row["fid"])]) >= 2
        row2["h3_candidate"] = row2["multi_expr_object"] and not row2["is_null_split"]
        sample_rows.append(row2)

    video_expr_counts = [len(v) for v in by_video.values()]
    object_expr_counts = [len(v) for v in by_object.values()]
    # Object-level H3 candidacy is decided by the split of the object's first
    # expression only.
    h3_objects = [k for k, v in by_object.items() if len(v) >= 2 and v[0]["split"] != "test_n"]
    null_rows = [r for r in enriched if r["is_null_split"]]
    audio_rows = [r for r in enriched if r["is_audio_keyword"]]
    spatial_rows = [r for r in enriched if r["is_spatial_keyword"]]
    same_cat_rows = [r for r in sample_rows if r.get("same_category_distractor_heuristic")]
    # Rows outside the mask sample have no "visible_ratio" key.
    audited_mask_rows = [r for r in sample_rows if "visible_ratio" in r]
    late_rows = [r for r in audited_mask_rows if r.get("late_target")]
    small_rows = [r for r in audited_mask_rows if r.get("small_target")]
    partial_rows = [r for r in audited_mask_rows if r.get("partial_target")]
    unstable_rows = [r for r in audited_mask_rows if r.get("area_unstable")]

    summary = {
        "data_dir": str(data_dir),
        "num_expressions": len(rows),
        "num_videos": len(by_video),
        "num_objects_vid_fid": len(by_object),
        "splits": dict(by_split),
        "expressions_per_video": summarize_counts(video_expr_counts),
        "expressions_per_object": summarize_counts(object_expr_counts),
        "multi_expression_videos": sum(c >= 2 for c in video_expr_counts),
        "multi_expression_objects": sum(c >= 2 for c in object_expr_counts),
        "h3_candidate_objects": len(h3_objects),
        "h3_candidate_expressions": sum(len(by_object[k]) for k in h3_objects),
        "null_split_expressions": len(null_rows),
        "null_split_percent": pct(len(null_rows), len(rows)),
        "audio_keyword_expressions": len(audio_rows),
        "audio_keyword_percent": pct(len(audio_rows), len(rows)),
        "spatial_keyword_expressions": len(spatial_rows),
        "spatial_keyword_percent": pct(len(spatial_rows), len(rows)),
        "same_category_distractor_heuristic_expressions": len(same_cat_rows),
        "same_category_distractor_heuristic_percent": pct(len(same_cat_rows), len(rows)),
        "mask_rows_audited": len(audited_mask_rows),
        "late_target_expressions": len(late_rows),
        "small_target_expressions": len(small_rows),
        "partial_target_expressions": len(partial_rows),
        "area_unstable_expressions": len(unstable_rows),
    }

    with (out_dir / "audit_summary.json").open("w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2, sort_keys=True)

    fields = [
        "uid",
        "vid",
        "split",
        "fid",
        "category",
        "exp",
        "is_null_split",
        "is_audio_keyword",
        "is_spatial_keyword",
        "multi_expr_video",
        "multi_expr_object",
        "h3_candidate",
        "same_category_distractor_heuristic",
        "visible_frames",
        "visible_ratio",
        "first_visible",
        "late_target",
        "mean_visible_area",
        "mean_all_area",
        "small_target",
        "partial_target",
        "area_cv",
        "area_unstable",
        "missing_masks",
        "width",
        "height",
    ]
    write_csv(out_dir / "audit_samples.csv", sample_rows, fields)

    h3_rows = [r for r in sample_rows if r.get("h3_candidate")]
    write_csv(out_dir / "h3_candidates.csv", h3_rows, fields)

    md = [
        "# TubeToken Phase -1 Audit",
        "",
        f"- Expressions: {summary['num_expressions']}",
        f"- Videos: {summary['num_videos']}",
        f"- Objects `(vid, fid)`: {summary['num_objects_vid_fid']}",
        f"- Splits: `{dict(by_split)}`",
        "",
        "## Multi-expression",
        "",
        f"- Expressions/video mean: {summary['expressions_per_video']['mean']:.3f}",
        f"- Expressions/video median: {summary['expressions_per_video']['median']}",
        f"- Videos with >=2 expressions: {summary['multi_expression_videos']}",
        f"- Expressions/object mean: {summary['expressions_per_object']['mean']:.3f}",
        f"- Objects with >=2 expressions: {summary['multi_expression_objects']}",
        f"- H3 candidate objects: {summary['h3_candidate_objects']}",
        f"- H3 candidate expressions: {summary['h3_candidate_expressions']}",
        "",
        "## Diagnostic Subsets",
        "",
        f"- Null split expressions: {summary['null_split_expressions']} ({summary['null_split_percent']:.2f}%)",
        f"- Audio-keyword expressions: {summary['audio_keyword_expressions']} ({summary['audio_keyword_percent']:.2f}%)",
        f"- Spatial-keyword expressions: {summary['spatial_keyword_expressions']} ({summary['spatial_keyword_percent']:.2f}%)",
        f"- Same-category distractor heuristic expressions: {summary['same_category_distractor_heuristic_expressions']} ({summary['same_category_distractor_heuristic_percent']:.2f}%)",
        f"- Mask rows audited: {summary['mask_rows_audited']}",
        f"- Late-target expressions: {summary['late_target_expressions']}",
        f"- Small-target expressions: {summary['small_target_expressions']}",
        f"- Partial-target expressions: {summary['partial_target_expressions']}",
        f"- Area-unstable expressions: {summary['area_unstable_expressions']}",
        "",
        "## Phase -1 H3 Decision Hint",
        "",
    ]

    epv = summary["expressions_per_video"]["mean"]
    if epv > 1.5 and summary["h3_candidate_objects"] > 0:
        md.append("H3 can stay as a direct validation target: the data has multi-expression structure.")
    elif summary["h3_candidate_objects"] > 0:
        md.append("H3 should be treated as diagnostic: multi-expression objects exist, but average expressions/video is limited.")
    else:
        md.append("H3 should be downgraded: this audit did not find same-object multi-expression candidates.")
    md.append("")
    md.append("Generated files: `audit_summary.json`, `audit_samples.csv`, `h3_candidates.csv`.")
    (out_dir / "audit_report.md").write_text("\n".join(md) + "\n", encoding="utf-8")

    print(json.dumps(summary, indent=2, sort_keys=True))
    print(f"\nWrote audit files to: {out_dir}")


if __name__ == "__main__":
    main()