File size: 12,167 Bytes
6e50f4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
"""Create a concrete category-expansion proposal and estimate coverage impact.



Inputs:
  - data/analysis/tag_group_uncovered_after_topn_combined200.csv
  - data/category_registry.csv
  - data/tag_groups.json
  - fluffyrock_3m.csv
  - data/eval_samples/e621_sfw_sample_1000_seed123_buffer10000.jsonl

Outputs:
  - data/analysis/category_expansion_proposal.csv
  - data/analysis/category_expansion_coverage.json
"""
from __future__ import annotations

import csv
import json
from collections import Counter
from pathlib import Path
from typing import Dict, List, Set, Tuple


# Directory one level above the folder containing this file; every input and
# output path below is resolved relative to it.
REPO_ROOT = Path(__file__).resolve().parents[1]
# CSV of candidate tags read by main(); its "tag", "fluffyrock_count" and
# "sample_occurrences" columns are copied into the proposal.
UNCOVERED_PATH = REPO_ROOT / "data" / "analysis" / "tag_group_uncovered_after_topn_combined200.csv"
# CSV read by _load_category_groups (category_name / tag / category_enabled).
REGISTRY_PATH = REPO_ROOT / "data" / "category_registry.csv"
# JSON object read by _load_wiki_groups (group name -> list of tags).
TAG_GROUPS_PATH = REPO_ROOT / "data" / "tag_groups.json"
# CSV read by _load_counts (tag in column 0, count in column 2).
FLUFFYROCK_PATH = REPO_ROOT / "fluffyrock_3m.csv"
# JSONL sample read by _load_sample_tags.
SAMPLE_PATH = REPO_ROOT / "data" / "eval_samples" / "e621_sfw_sample_1000_seed123_buffer10000.jsonl"

# Files written by main().
OUT_PROPOSAL = REPO_ROOT / "data" / "analysis" / "category_expansion_proposal.csv"
OUT_COVERAGE = REPO_ROOT / "data" / "analysis" / "category_expansion_coverage.json"

# Sample tags whose count in FLUFFYROCK_PATH is below this are ignored.
MIN_COUNT = 200
# Greedy step whose cumulative coverage is reported in the summary.
TOP_N_GROUPS = 15
# Upper bound on the number of greedy selection steps.
MAX_STEPS = 25


def _load_counts(path: Path) -> Dict[str, int]:
    """Read a tag-count CSV into a ``{tag: count}`` mapping.

    The tag is taken from column 0 and the count from column 2. Rows with
    fewer than three columns are skipped. An empty or non-integer count is
    stored as 0 (so a header row, if present, ends up as an entry with
    count 0). When a tag appears more than once, the last row wins.
    """
    tag_counts: Dict[str, int] = {}
    with path.open("r", encoding="utf-8", newline="") as handle:
        for record in csv.reader(handle):
            if len(record) >= 3:
                name, raw_count = record[0], record[2]
                parsed = 0
                if raw_count:
                    try:
                        parsed = int(raw_count)
                    except ValueError:
                        # Unparseable counts are treated as "never seen".
                        parsed = 0
                tag_counts[name] = parsed
    return tag_counts


def _load_sample_tags(path: Path, counts: Dict[str, int], min_count: int) -> List[Set[str]]:
    """Load one filtered tag set per sample from a JSONL file.

    Each non-blank line is parsed as a JSON object. Its
    ``tags_ground_truth_categorized`` field is expected to be a JSON-encoded
    string holding an object whose values are lists of tag names. A tag is
    kept only if it is a string and ``counts`` reports at least ``min_count``
    for it (tags missing from ``counts`` count as 0).

    Args:
        path: JSONL file to read.
        counts: Mapping of tag name to its count.
        min_count: Minimum count a tag needs to be kept.

    Returns:
        One set of kept tags per sample, in file order. Samples that end up
        with no kept tags are omitted.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    rows: List[Set[str]] = []
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            # Blank lines (e.g. a trailing newline at EOF) are not records;
            # feeding them to json.loads would abort the whole load.
            if not line.strip():
                continue
            obj = json.loads(line)
            # A record that is not a JSON object cannot carry the tag field.
            if not isinstance(obj, dict):
                continue
            raw = obj.get("tags_ground_truth_categorized", "")
            if not raw:
                continue
            try:
                d = json.loads(raw)
            except (TypeError, ValueError):
                # TypeError: field is not str/bytes; ValueError covers
                # json.JSONDecodeError for malformed payloads. Best-effort:
                # such samples are skipped rather than failing the run.
                continue
            if not isinstance(d, dict):
                continue
            tags: Set[str] = {
                t
                for vals in d.values()
                if isinstance(vals, list)
                for t in vals
                if isinstance(t, str) and counts.get(t, 0) >= min_count
            }
            if tags:
                rows.append(tags)
    return rows


def _load_wiki_groups(path: Path) -> Dict[str, Set[str]]:
    """Load a JSON object of ``group name -> list of tags`` as sets.

    Entries whose value is not a list are dropped; list values are converted
    to sets, so duplicate tags within a group collapse.
    """
    with path.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)

    groups: Dict[str, Set[str]] = {}
    for name, members in payload.items():
        if isinstance(members, list):
            groups[name] = set(members)
    return groups


def _load_category_groups(path: Path) -> Dict[str, Set[str]]:
    """Build ``"cat:<category_name>" -> {tags}`` from the registry CSV.

    Only rows whose ``category_enabled`` value (after stripping whitespace)
    is exactly ``1``, ``true`` or ``True`` are used. Rows with an empty
    ``category_name`` or ``tag`` after stripping are ignored.
    """
    enabled_values = ("1", "true", "True")
    by_category: Dict[str, Set[str]] = {}
    with path.open("r", encoding="utf-8", newline="") as handle:
        for record in csv.DictReader(handle):
            # Missing cells come back as None from DictReader; treat as "".
            enabled_flag = (record.get("category_enabled") or "").strip()
            category = (record.get("category_name") or "").strip()
            tag_name = (record.get("tag") or "").strip()
            if enabled_flag in enabled_values and category and tag_name:
                key = f"cat:{category}"
                if key not in by_category:
                    by_category[key] = set()
                by_category[key].add(tag_name)
    return by_category


def _greedy(groups: Dict[str, Set[str]], tag_occ: Counter, max_steps: int) -> Tuple[List[Dict[str, object]], Set[str]]:
    """Greedily pick groups that cover the most still-uncovered tag occurrences.

    At every step the group whose tags account for the largest number of
    not-yet-covered occurrences is selected; all of its tags are then marked
    covered. On equal gain the group encountered first while iterating
    ``groups`` wins. Selection stops after ``max_steps`` steps or as soon as
    no remaining group adds any coverage.

    Args:
        groups: Mapping of group name to the tags it contains.
        tag_occ: Occurrence count per tag; not modified.
        max_steps: Maximum number of groups to select.

    Returns:
        A pair ``(selected, chosen)``. ``selected`` has one dict per step with
        the keys ``step``, ``group``, ``gain_occurrences``,
        ``cumulative_covered_occurrences`` and ``cumulative_covered_pct``
        (percentage of all occurrences, rounded to 2 decimals). ``chosen`` is
        the set of selected group names.
    """
    uncovered = Counter(tag_occ)  # working copy; entries are zeroed as tags get covered
    chosen: Set[str] = set()
    selected: List[Dict[str, object]] = []
    total = sum(tag_occ.values())
    covered = 0
    for step in range(1, max_steps + 1):
        best = None
        best_gain = 0
        best_new: Set[str] = set()
        for g, tags in groups.items():
            if g in chosen:
                continue
            new_tags = {t for t in tags if uncovered.get(t, 0) > 0}
            gain = sum(uncovered[t] for t in new_tags)
            # Strict ">" keeps the first group seen on ties.
            if gain > best_gain:
                best, best_gain, best_new = g, gain, new_tags
        # Compare against None rather than truthiness: a group whose name is
        # the empty string is still a valid pick. ``best`` is only ever set
        # together with a strictly positive gain.
        if best is None:
            break
        chosen.add(best)
        for t in best_new:
            uncovered[t] = 0
        covered += best_gain
        selected.append(
            {
                "step": step,
                "group": best,
                "gain_occurrences": best_gain,
                "cumulative_covered_occurrences": covered,
                "cumulative_covered_pct": round(covered / total * 100.0, 2) if total else 0.0,
            }
        )
    return selected, chosen


def _recommend(tag: str) -> Tuple[str, str, str]:
    """Map a tag to a ``(proposed_action, target_category, reason)`` triple.

    Rules are checked top to bottom and the first match wins, so their order
    matters (for example ``red_hair`` is caught by the color-prefix rule
    before the hair rule is reached). Tags matching no rule come back as
    ``needs_review``.

    The hair rule matches on word boundaries: a tag qualifies when one of its
    ``_``/``-`` separated words starts with ``hair`` (``long_hair``,
    ``hairband``). A plain substring test would also catch unrelated tags
    such as ``chair`` or ``wheelchair``.
    """
    color_prefixes = (
        "red_", "blue_", "green_", "yellow_", "black_", "white_",
        "brown_", "grey_", "purple_", "orange_", "teal_",
    )

    if tag in {"solo", "duo", "trio", "group", "solo_focus"}:
        return "new_category", "character_count", "mutually exclusive count-like options"
    if "/" in tag or tag in {"romantic_couple", "interspecies"}:
        return "new_category", "relationship_pairing", "relationship/pairing semantics shown best together"
    if tag in {"muscular", "muscular_anthro", "slightly_chubby", "overweight", "thick_thighs", "wide_hips", "big_butt"}:
        return "new_category", "body_build", "body-shape alternatives useful side-by-side"
    if tag in {
        "canid", "canis", "felid", "felis", "equid", "domestic_dog", "domestic_cat",
        "wolf", "fox", "dragon", "reptile", "leporid", "rabbit", "horse", "pony",
        "pantherine", "bovid", "animal_humanoid", "hybrid",
    }:
        return "new_category", "species_specific", "taxonomy/detail species cluster"
    # str.startswith accepts a tuple of candidate prefixes.
    if tag.startswith(color_prefixes):
        return "merge_existing", "color_markings", "color-region/attribute tag"
    # Word-level match so that "chair"-like tags are not filed under hair.
    if any(word.startswith("hair") for word in tag.replace("-", "_").split("_")):
        return "merge_existing", "hair", "hair style/color detail"
    if tag in {"nipples", "areola", "butt", "navel", "feet", "belly", "abs", "pecs", "teeth", "tongue", "tail", "horn", "wings", "claws", "fangs", "fingers", "toes"}:
        return "merge_existing", "anatomy_features", "anatomy/body-part trait"
    if tag in {"half-closed_eyes", "eyelashes", "eyebrows"}:
        return "merge_existing", "expression_detail", "eye/expression detail"
    if tag in {"bodily_fluids", "saliva", "sweat", "nude", "bound", "bottomless", "hyper"}:
        return "deprioritize", "none", "sensitive/noisy for default non-explicit-centric UX"
    if tag in {"pose", "holding_object", "rear_view", "licking", "biped"}:
        return "merge_existing", "pose_action_detail", "pose/action detail"
    if tag in {"eyewear", "jewelry", "glasses", "hat", "gloves", "panties"}:
        return "merge_existing", "clothing_detail", "attire/accessory detail"
    if tag in {"fur", "tuft", "feathers", "not_furry", "anthrofied"}:
        return "merge_existing", "fur_style", "fur/covering style detail"
    return "needs_review", "uncategorized_review", "high-frequency uncovered tag needing manual judgment"


def main() -> None:
    """Write the expansion proposal CSV and the coverage summary JSON.

    Steps, in order:
      1. Load tag counts, the evaluation sample, wiki tag groups and the
         enabled registry categories; count per-tag occurrences in the sample.
      2. Measure baseline coverage (tags in any group, and greedy selection).
      3. Turn every row of the uncovered-tags CSV into a proposal row via
         ``_recommend`` and write the proposal CSV.
      4. Add the ``new_category`` / ``merge_existing`` proposals to a copy of
         the baseline groups and measure coverage again.
      5. Write the JSON summary and print a short recap.
    """

    def _tags_in_any_group(groups: Dict[str, Set[str]], occurrences: Counter) -> Set[str]:
        # Tags from the sample that belong to at least one group.
        grouped: Set[str] = set().union(*groups.values())
        return {t for t in occurrences if t in grouped}

    def _pct_at_top_n(steps: List[Dict[str, object]]) -> object:
        # Cumulative pct at step TOP_N_GROUPS, else at the last step, else 0.0.
        if len(steps) >= TOP_N_GROUPS:
            return steps[TOP_N_GROUPS - 1]["cumulative_covered_pct"]
        if steps:
            return steps[-1]["cumulative_covered_pct"]
        return 0.0

    def _unique_pct(covered: Set[str], occurrences: Counter) -> float:
        # Share of distinct sample tags that are covered, in percent.
        if not occurrences:
            return 0.0
        return round((len(covered) / len(occurrences) * 100.0), 2)

    counts = _load_counts(FLUFFYROCK_PATH)
    sample_rows = _load_sample_tags(SAMPLE_PATH, counts, MIN_COUNT)
    wiki_groups = _load_wiki_groups(TAG_GROUPS_PATH)
    category_groups = _load_category_groups(REGISTRY_PATH)

    # Registry categories override wiki groups on a name clash.
    base_groups = dict(wiki_groups)
    base_groups.update(category_groups)

    tag_occ = Counter(t for sample in sample_rows for t in sample)

    # Baseline coverage with current wiki+category groups.
    covered_any_base = _tags_in_any_group(base_groups, tag_occ)
    greedy_base = _greedy(base_groups, tag_occ, MAX_STEPS)[0]

    # Build proposal from uncovered-after-topN file (already ranked by frequency).
    art_group = wiki_groups.get("art", set())
    proposal_rows: List[Dict[str, str]] = []
    with UNCOVERED_PATH.open("r", encoding="utf-8", newline="") as src:
        for record in csv.DictReader(src):
            tag = record["tag"]
            action, target, why = _recommend(tag)
            in_art = "1" if tag in art_group else "0"
            proposal_rows.append(
                {
                    "tag": tag,
                    "fluffyrock_count": record.get("fluffyrock_count", ""),
                    "sample_occurrences": record.get("sample_occurrences", ""),
                    "proposed_action": action,
                    "target_category": target,
                    "in_art_tag_group": in_art,
                    "reason": why,
                }
            )

    columns = [
        "tag",
        "fluffyrock_count",
        "sample_occurrences",
        "proposed_action",
        "target_category",
        "in_art_tag_group",
        "reason",
    ]
    OUT_PROPOSAL.parent.mkdir(parents=True, exist_ok=True)
    with OUT_PROPOSAL.open("w", encoding="utf-8", newline="") as dst:
        writer = csv.DictWriter(dst, fieldnames=columns)
        writer.writeheader()
        writer.writerows(proposal_rows)

    # Apply recommendations to projection groups (on a deep-enough copy so the
    # baseline sets are not mutated).
    projected_groups: Dict[str, Set[str]] = {name: set(members) for name, members in base_groups.items()}
    for proposal in proposal_rows:
        if proposal["proposed_action"] in {"new_category", "merge_existing"}:
            target = proposal["target_category"].strip()
            if target and target != "none":
                projected_groups.setdefault(f"cat:{target}", set()).add(proposal["tag"])

    covered_any_projected = _tags_in_any_group(projected_groups, tag_occ)
    greedy_projected = _greedy(projected_groups, tag_occ, MAX_STEPS)[0]

    base_topn_pct = _pct_at_top_n(greedy_base)
    proj_topn_pct = _pct_at_top_n(greedy_projected)

    action_counts = dict(Counter(r["proposed_action"] for r in proposal_rows))
    art_rows = [r for r in proposal_rows if r["in_art_tag_group"] == "1"]

    summary = {
        "inputs": {
            "min_count": MIN_COUNT,
            "top_n_groups": TOP_N_GROUPS,
            "sample_file": str(SAMPLE_PATH),
            "proposal_source_uncovered": str(UNCOVERED_PATH),
        },
        "proposal_counts": action_counts,
        "art_tags_in_proposal": art_rows,
        "coverage_baseline": {
            "n_groups": len(base_groups),
            "unique_covered_pct": _unique_pct(covered_any_base, tag_occ),
            "top15_greedy_cumulative_pct": base_topn_pct,
            "top15_groups": [s["group"] for s in greedy_base[:TOP_N_GROUPS]],
        },
        "coverage_projected_with_proposal": {
            "n_groups": len(projected_groups),
            "unique_covered_pct": _unique_pct(covered_any_projected, tag_occ),
            "top15_greedy_cumulative_pct": proj_topn_pct,
            "top15_groups": [s["group"] for s in greedy_projected[:TOP_N_GROUPS]],
        },
        "outputs": {
            "proposal_csv": str(OUT_PROPOSAL),
            "coverage_json": str(OUT_COVERAGE),
        },
    }

    with OUT_COVERAGE.open("w", encoding="utf-8") as dst:
        json.dump(summary, dst, indent=2, ensure_ascii=False)

    baseline = summary["coverage_baseline"]
    projected = summary["coverage_projected_with_proposal"]
    print("Proposal rows:", len(proposal_rows))
    print("Proposal action counts:", summary["proposal_counts"])
    print("Baseline unique covered %:", baseline["unique_covered_pct"])
    print("Projected unique covered %:", projected["unique_covered_pct"])
    print("Baseline top15 greedy %:", baseline["top15_greedy_cumulative_pct"])
    print("Projected top15 greedy %:", projected["top15_greedy_cumulative_pct"])
    print("Outputs:", OUT_PROPOSAL, OUT_COVERAGE)


# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()