"""Create a concrete category-expansion proposal and estimate coverage impact.

Inputs:
- data/analysis/tag_group_uncovered_after_topn_combined200.csv
- data/category_registry.csv
- data/tag_groups.json
- fluffyrock_3m.csv
- data/eval_samples/e621_sfw_sample_1000_seed123_buffer10000.jsonl

Outputs:
- data/analysis/category_expansion_proposal.csv
- data/analysis/category_expansion_coverage.json
"""

from __future__ import annotations

import csv
import json
import re
from collections import Counter
from pathlib import Path
from typing import Dict, List, Set, Tuple

REPO_ROOT = Path(__file__).resolve().parents[1]
UNCOVERED_PATH = REPO_ROOT / "data" / "analysis" / "tag_group_uncovered_after_topn_combined200.csv"
REGISTRY_PATH = REPO_ROOT / "data" / "category_registry.csv"
TAG_GROUPS_PATH = REPO_ROOT / "data" / "tag_groups.json"
FLUFFYROCK_PATH = REPO_ROOT / "fluffyrock_3m.csv"
SAMPLE_PATH = REPO_ROOT / "data" / "eval_samples" / "e621_sfw_sample_1000_seed123_buffer10000.jsonl"
OUT_PROPOSAL = REPO_ROOT / "data" / "analysis" / "category_expansion_proposal.csv"
OUT_COVERAGE = REPO_ROOT / "data" / "analysis" / "category_expansion_coverage.json"

MIN_COUNT = 200
TOP_N_GROUPS = 15
MAX_STEPS = 25

# Prefixes that mark a tag as a "<color>_<something>" attribute.
_COLOR_PREFIXES = (
    "red_",
    "blue_",
    "green_",
    "yellow_",
    "black_",
    "white_",
    "brown_",
    "grey_",
    "purple_",
    "orange_",
    "teal_",
)

# "hair" only when it starts a word-like token, so "long_hair" and "hairband"
# match but "chair" / "wheelchair" do not.
_HAIR_TOKEN_RE = re.compile(r"(?<![A-Za-z0-9])hair")


def _load_counts(path: Path) -> Dict[str, int]:
    """Read a CSV and map each row's first column to the integer in its third.

    Rows with fewer than three columns are skipped. An empty or non-integer
    third column is recorded as 0. When a key repeats, the last row wins.
    """
    out: Dict[str, int] = {}
    with path.open("r", encoding="utf-8", newline="") as f:
        for row in csv.reader(f):
            if len(row) < 3:
                continue
            try:
                out[row[0]] = int(row[2]) if row[2] else 0
            except ValueError:
                out[row[0]] = 0
    return out


def _load_sample_tags(path: Path, counts: Dict[str, int], min_count: int) -> List[Set[str]]:
    """Load per-sample tag sets from a JSONL file, keeping only frequent tags.

    Each non-blank line is parsed as a JSON object. Its
    ``tags_ground_truth_categorized`` field is expected to hold a JSON-encoded
    string that decodes to a dict whose values are lists of tags. A string tag
    is kept when ``counts.get(tag, 0) >= min_count``.

    Samples are skipped when the field is missing/empty, when it cannot be
    decoded, or when no tag survives the filter. A line that is not valid JSON
    still raises, so a corrupt file is not silently truncated.
    """
    rows: List[Set[str]] = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            obj = json.loads(line)
            raw = obj.get("tags_ground_truth_categorized", "")
            if not raw:
                continue
            try:
                decoded = json.loads(raw)
            except (TypeError, ValueError):
                # TypeError: field is not a str/bytes; ValueError: bad JSON.
                continue
            if not isinstance(decoded, dict):
                continue
            tags: Set[str] = set()
            for vals in decoded.values():
                if not isinstance(vals, list):
                    continue
                tags.update(
                    t for t in vals if isinstance(t, str) and counts.get(t, 0) >= min_count
                )
            if tags:
                rows.append(tags)
    return rows


def _load_wiki_groups(path: Path) -> Dict[str, Set[str]]:
    """Load a JSON object of ``name -> list`` and return ``name -> set``.

    Entries whose value is not a list are dropped.
    """
    with path.open("r", encoding="utf-8") as f:
        raw = json.load(f)
    return {k: set(v) for k, v in raw.items() if isinstance(v, list)}


def _load_category_groups(path: Path) -> Dict[str, Set[str]]:
    """Build ``"cat:<category_name>" -> {tag, ...}`` from the registry CSV.

    Only rows whose ``category_enabled`` column (stripped) is ``"1"``,
    ``"true"`` or ``"True"`` are used, and only when both ``category_name``
    and ``tag`` are non-empty after stripping.
    """
    groups: Dict[str, Set[str]] = {}
    with path.open("r", encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            if (row.get("category_enabled") or "").strip() not in {"1", "true", "True"}:
                continue
            category = (row.get("category_name") or "").strip()
            tag = (row.get("tag") or "").strip()
            if category and tag:
                groups.setdefault(f"cat:{category}", set()).add(tag)
    return groups


def _greedy(
    groups: Dict[str, Set[str]], tag_occ: Counter, max_steps: int
) -> Tuple[List[Dict[str, object]], Set[str]]:
    """Greedily pick groups that cover the most not-yet-covered occurrences.

    At each step the group with the largest sum of still-uncovered occurrence
    counts is chosen; ties go to the group that comes first in ``groups``.
    Selection stops after ``max_steps`` picks or when no remaining group adds
    any coverage.

    Returns:
        ``(selected, chosen)`` where ``selected`` is one dict per step with
        the keys ``step``, ``group``, ``gain_occurrences``,
        ``cumulative_covered_occurrences`` and ``cumulative_covered_pct``
        (percentage of ``sum(tag_occ.values())``, rounded to 2 decimals), and
        ``chosen`` is the set of selected group names.
    """
    uncovered = Counter(tag_occ)
    chosen: Set[str] = set()
    selected: List[Dict[str, object]] = []
    total = sum(tag_occ.values())
    covered = 0

    for step in range(1, max_steps + 1):
        best = None
        best_gain = 0
        best_new: Set[str] = set()
        for name, tags in groups.items():
            if name in chosen:
                continue
            gain = 0
            new_tags: Set[str] = set()
            for t in tags:
                c = uncovered.get(t, 0)
                if c > 0:
                    gain += c
                    new_tags.add(t)
            if gain > best_gain:
                best, best_gain, best_new = name, gain, new_tags

        # Compare against None explicitly: an empty-string group name is a
        # legitimate winner and must not be mistaken for "nothing found".
        if best is None or best_gain <= 0:
            break

        chosen.add(best)
        for t in best_new:
            uncovered[t] = 0
        covered += best_gain
        selected.append(
            {
                "step": step,
                "group": best,
                "gain_occurrences": best_gain,
                "cumulative_covered_occurrences": covered,
                "cumulative_covered_pct": round(covered / total * 100.0, 2) if total else 0.0,
            }
        )
    return selected, chosen


def _recommend(tag: str) -> Tuple[str, str, str]:
    """Return ``(proposed_action, target_category, reason)`` for a tag.

    Rules are checked in order and the first match wins, so for example
    ``red_hair`` is classified by its color prefix before the hair rule is
    reached. Tags matching no rule get ``needs_review``.
    """
    if tag in {"solo", "duo", "trio", "group", "solo_focus"}:
        return "new_category", "character_count", "mutually exclusive count-like options"
    if "/" in tag or tag in {"romantic_couple", "interspecies"}:
        return (
            "new_category",
            "relationship_pairing",
            "relationship/pairing semantics shown best together",
        )
    if tag in {
        "muscular",
        "muscular_anthro",
        "slightly_chubby",
        "overweight",
        "thick_thighs",
        "wide_hips",
        "big_butt",
    }:
        return "new_category", "body_build", "body-shape alternatives useful side-by-side"
    if tag in {
        "canid",
        "canis",
        "felid",
        "felis",
        "equid",
        "domestic_dog",
        "domestic_cat",
        "wolf",
        "fox",
        "dragon",
        "reptile",
        "leporid",
        "rabbit",
        "horse",
        "pony",
        "pantherine",
        "bovid",
        "animal_humanoid",
        "hybrid",
    }:
        return "new_category", "species_specific", "taxonomy/detail species cluster"
    if tag.startswith(_COLOR_PREFIXES):
        return "merge_existing", "color_markings", "color-region/attribute tag"
    # Token-aware match: a plain substring test would also catch "chair".
    if _HAIR_TOKEN_RE.search(tag):
        return "merge_existing", "hair", "hair style/color detail"
    if tag in {
        "nipples",
        "areola",
        "butt",
        "navel",
        "feet",
        "belly",
        "abs",
        "pecs",
        "teeth",
        "tongue",
        "tail",
        "horn",
        "wings",
        "claws",
        "fangs",
        "fingers",
        "toes",
    }:
        return "merge_existing", "anatomy_features", "anatomy/body-part trait"
    if tag in {"half-closed_eyes", "eyelashes", "eyebrows"}:
        return "merge_existing", "expression_detail", "eye/expression detail"
    if tag in {"bodily_fluids", "saliva", "sweat", "nude", "bound", "bottomless", "hyper"}:
        return "deprioritize", "none", "sensitive/noisy for default non-explicit-centric UX"
    if tag in {"pose", "holding_object", "rear_view", "licking", "biped"}:
        return "merge_existing", "pose_action_detail", "pose/action detail"
    if tag in {"eyewear", "jewelry", "glasses", "hat", "gloves", "panties"}:
        return "merge_existing", "clothing_detail", "attire/accessory detail"
    if tag in {"fur", "tuft", "feathers", "not_furry", "anthrofied"}:
        return "merge_existing", "fur_style", "fur/covering style detail"
    return (
        "needs_review",
        "uncategorized_review",
        "high-frequency uncovered tag needing manual judgment",
    )


def _covered_tags(tag_occ: Counter, groups: Dict[str, Set[str]]) -> Set[str]:
    """Return the tags in ``tag_occ`` that belong to at least one group."""
    # One union up front instead of scanning every group for every tag.
    members: Set[str] = set().union(*groups.values())
    return {t for t in tag_occ if t in members}


def _topn_pct(steps: List[Dict[str, object]], top_n: int) -> object:
    """Return the cumulative covered percentage after ``top_n`` greedy steps.

    Falls back to the last available step when fewer than ``top_n`` steps
    were taken, and to ``0.0`` when there are no steps at all.
    """
    if not steps:
        return 0.0
    return steps[min(top_n, len(steps)) - 1]["cumulative_covered_pct"]


def main() -> None:
    """Write the proposal CSV and the baseline-vs-projected coverage JSON."""
    counts = _load_counts(FLUFFYROCK_PATH)
    sample_rows = _load_sample_tags(SAMPLE_PATH, counts, MIN_COUNT)
    wiki_groups = _load_wiki_groups(TAG_GROUPS_PATH)
    category_groups = _load_category_groups(REGISTRY_PATH)
    # On a name clash the registry category replaces the wiki group.
    base_groups = {**wiki_groups, **category_groups}

    # Each sample contributes a set, so this counts samples per tag.
    tag_occ: Counter = Counter()
    for tags in sample_rows:
        tag_occ.update(tags)

    # Baseline coverage with current wiki+category groups.
    covered_any_base = _covered_tags(tag_occ, base_groups)
    greedy_base, _ = _greedy(base_groups, tag_occ, MAX_STEPS)

    # Build proposal from uncovered-after-topN file (already ranked by frequency).
    proposal_rows: List[Dict[str, str]] = []
    art_group = wiki_groups.get("art", set())
    with UNCOVERED_PATH.open("r", encoding="utf-8", newline="") as f:
        for row in csv.DictReader(f):
            tag = row["tag"]
            action, target, why = _recommend(tag)
            proposal_rows.append(
                {
                    "tag": tag,
                    "fluffyrock_count": row.get("fluffyrock_count", ""),
                    "sample_occurrences": row.get("sample_occurrences", ""),
                    "proposed_action": action,
                    "target_category": target,
                    "in_art_tag_group": "1" if tag in art_group else "0",
                    "reason": why,
                }
            )

    OUT_PROPOSAL.parent.mkdir(parents=True, exist_ok=True)
    with OUT_PROPOSAL.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=[
                "tag",
                "fluffyrock_count",
                "sample_occurrences",
                "proposed_action",
                "target_category",
                "in_art_tag_group",
                "reason",
            ],
        )
        writer.writeheader()
        writer.writerows(proposal_rows)

    # Apply recommendations to projection groups.
    projected_groups: Dict[str, Set[str]] = {k: set(v) for k, v in base_groups.items()}
    for row in proposal_rows:
        if row["proposed_action"] not in {"new_category", "merge_existing"}:
            continue
        target = row["target_category"].strip()
        if not target or target == "none":
            continue
        projected_groups.setdefault(f"cat:{target}", set()).add(row["tag"])

    covered_any_projected = _covered_tags(tag_occ, projected_groups)
    greedy_projected, _ = _greedy(projected_groups, tag_occ, MAX_STEPS)

    base_topn_pct = _topn_pct(greedy_base, TOP_N_GROUPS)
    proj_topn_pct = _topn_pct(greedy_projected, TOP_N_GROUPS)

    # NOTE: the "top15_*" key names are fixed strings; the actual cut-off is
    # TOP_N_GROUPS, which is also recorded under "inputs".
    summary = {
        "inputs": {
            "min_count": MIN_COUNT,
            "top_n_groups": TOP_N_GROUPS,
            "sample_file": str(SAMPLE_PATH),
            "proposal_source_uncovered": str(UNCOVERED_PATH),
        },
        "proposal_counts": dict(Counter(r["proposed_action"] for r in proposal_rows)),
        "art_tags_in_proposal": [r for r in proposal_rows if r["in_art_tag_group"] == "1"],
        "coverage_baseline": {
            "n_groups": len(base_groups),
            "unique_covered_pct": round((len(covered_any_base) / len(tag_occ) * 100.0), 2)
            if tag_occ
            else 0.0,
            "top15_greedy_cumulative_pct": base_topn_pct,
            "top15_groups": [x["group"] for x in greedy_base[:TOP_N_GROUPS]],
        },
        "coverage_projected_with_proposal": {
            "n_groups": len(projected_groups),
            "unique_covered_pct": round((len(covered_any_projected) / len(tag_occ) * 100.0), 2)
            if tag_occ
            else 0.0,
            "top15_greedy_cumulative_pct": proj_topn_pct,
            "top15_groups": [x["group"] for x in greedy_projected[:TOP_N_GROUPS]],
        },
        "outputs": {
            "proposal_csv": str(OUT_PROPOSAL),
            "coverage_json": str(OUT_COVERAGE),
        },
    }
    with OUT_COVERAGE.open("w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)

    print("Proposal rows:", len(proposal_rows))
    print("Proposal action counts:", summary["proposal_counts"])
    print("Baseline unique covered %:", summary["coverage_baseline"]["unique_covered_pct"])
    print(
        "Projected unique covered %:",
        summary["coverage_projected_with_proposal"]["unique_covered_pct"],
    )
    print(
        "Baseline top15 greedy %:",
        summary["coverage_baseline"]["top15_greedy_cumulative_pct"],
    )
    print(
        "Projected top15 greedy %:",
        summary["coverage_projected_with_proposal"]["top15_greedy_cumulative_pct"],
    )
    print("Outputs:", OUT_PROPOSAL, OUT_COVERAGE)


if __name__ == "__main__":
    main()