File size: 40,371 Bytes
a969e99
 
 
 
 
 
 
 
e70d416
a969e99
 
 
 
 
 
e70d416
 
 
 
 
 
 
 
 
 
 
 
 
a23e42b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a969e99
e70d416
9d6d760
a969e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e70d416
 
 
 
 
a969e99
 
 
 
 
 
 
 
 
e70d416
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a23e42b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e70d416
 
a23e42b
e70d416
 
 
 
 
 
a23e42b
 
 
 
e70d416
 
 
 
 
 
 
 
 
 
 
a23e42b
 
 
e70d416
 
 
 
 
 
 
 
 
 
 
 
266f01b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a23e42b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a969e99
 
a23e42b
 
266f01b
 
 
 
 
 
a969e99
 
266f01b
 
 
 
 
 
 
 
e70d416
 
 
 
a969e99
 
 
 
 
 
 
e70d416
 
 
 
 
a969e99
e70d416
 
 
 
 
 
a23e42b
 
 
 
 
266f01b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e70d416
 
 
 
 
 
a969e99
 
 
 
e70d416
 
a969e99
 
 
a23e42b
 
 
266f01b
 
 
 
 
94d49c0
266f01b
 
 
e70d416
 
 
266f01b
 
 
 
 
 
e70d416
 
 
 
 
a969e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e70d416
a969e99
 
e70d416
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a969e99
 
 
 
e70d416
 
 
 
 
a23e42b
 
 
 
 
 
e921d3e
63cb6b2
 
a23e42b
 
a969e99
 
e70d416
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a969e99
e70d416
 
 
 
 
 
 
 
 
 
a969e99
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
"""
eval-card-registry CLI.

Commands:
  seed      Load known entities from seed/ YAML files
  stats     Print registry summary
  sync      Batch sync one or all EEE configs → eval_results table
"""
import json
from pathlib import Path
from typing import Optional

import typer
import yaml


def _json_encode_if_needed(value):
    """Encode lists/dicts as JSON strings; pass through anything else.

    seed/models.yaml uses YAML-native lists for `tags` (e.g. `["open-weight"]`)
    while seed/benchmarks.yaml stores them pre-encoded as strings (e.g.
    `'["instruction-following"]'`). The canonical_* parquet columns are all
    VARCHAR, so we coerce on the way in to keep both formats supported.
    """
    if isinstance(value, (list, dict)):
        return json.dumps(value)
    return value


def _legacy_parent_model_id_to_parents(entry: dict) -> None:
    """Translate a legacy `parent_model_id: X` field to the typed `parents`
    list shape. Mutates the entry in place.

    Legacy core.yaml / sources/*.generated.yaml use a single scalar
    `parent_model_id` to express a family/variant relationship (e.g.
    Llama-3-8B β†’ Llama-3). The new schema replaces this with a typed list
    of parent edges. This shim converts on load so existing YAML keeps
    working until each file is migrated to emit `parents` natively.

    No-op when `parents` is already present (new shape wins) or when neither
    field is set.
    """
    if "parents" in entry and entry["parents"] is not None:
        entry.pop("parent_model_id", None)
        return
    legacy = entry.pop("parent_model_id", None)
    if legacy:
        entry["parents"] = [{"id": legacy, "relationship": "variant", "axis": "size"}]

from eval_card_registry.store.hf_store import get_store
from eval_card_registry.store import queries, schemas
from eval_card_registry.store.queries import _is_na

app = typer.Typer(help="eval-card-registry CLI")


def _load_store():
    """Return the store from `get_store()`, calling `load()` if not yet loaded."""
    store = get_store()
    if store.loaded:
        return store
    store.load()
    return store


# ------------------------------------------------------------------
# seed
# ------------------------------------------------------------------

@app.command()
def seed(
    local: bool = typer.Option(False, "--local", help="Write to fixtures/ instead of HF Hub"),
    seed_dir: str = typer.Option("./seed", "--seed-dir"),
    prune_stale: bool = typer.Option(
        False,
        "--prune-stale/--no-prune-stale",
        help="Remove reviewed seed entities and seed aliases absent from the current YAML snapshot.",
    ),
):
    """Load known canonical entities from seed YAML files."""
    import os
    if local:
        os.environ["LOCAL_MODE"] = "true"

    store = _load_store()
    seed_path = Path(seed_dir)

    # ------------------------------------------------------------------
    # Models — three-layer load from seed/models/:
    #   sources/*.generated.yaml  → external catalog data (e.g. models.dev),
    #                               flat lists, never hand-edited
    #   core.yaml                 → curated canonicals (the source of truth),
    #                               flat list OR {skip_ids, entries} dict
    #   enrichments/aliases.yaml  → optional alias-only entries ({id, aliases})
    #                               that union onto whatever exists
    #
    # Merge order: sources → core → enrichments. Field-level merge per entry
    # (aliases / tags UNION; other scalars prefer non-empty, last-write-wins).
    # `skip_ids` from core drops generated entries we don't want.
    # ------------------------------------------------------------------
    def _load_models_merged() -> list[dict]:
        """Load every model seed layer and merge them into one entry per id.

        Layers are absorbed in this order, later layers merging onto earlier
        entries that share an ``id``:

        1. ``models/sources/*.generated.yaml`` under the seed dir (files in
           sorted path order; each must be a flat list).
        2. ``models/core.yaml`` (a flat list, or a mapping with ``entries``
           plus optional ``skip_ids`` / ``skip_source_ids``).
        3. ``models/enrichments/aliases.yaml`` (a flat list).

        A missing directory or file simply contributes no entries. Loaded
        entries are mutated in place by the legacy ``parent_model_id``
        translation.

        Returns:
            The merged entries, ordered by the first appearance of each id.

        Raises:
            typer.BadParameter: If a file has an unexpected top-level shape,
                or an entry is not a mapping carrying an ``id``.
        """
        models_dir = seed_path / "models"
        sources_dir = models_dir / "sources"
        core_file = models_dir / "core.yaml"
        enrichments_file = models_dir / "enrichments" / "aliases.yaml"

        def _read_yaml(path, default):
            """Parse one YAML file; a falsy/empty document yields `default`."""
            # Decode as UTF-8 explicitly instead of relying on the locale's
            # preferred encoding (assumes the seed files are UTF-8 encoded),
            # so non-ASCII names/aliases load identically on every platform.
            with open(path, encoding="utf-8") as f:
                return yaml.safe_load(f) or default

        source_entries: list[dict] = []
        core_entries: list[dict] = []
        enrichment_entries: list[dict] = []
        skip_ids: set[str] = set()
        skip_source_ids: set[str] = set()

        if sources_dir.is_dir():
            for src_path in sorted(sources_dir.glob("*.generated.yaml")):
                loaded = _read_yaml(src_path, [])
                if not isinstance(loaded, list):
                    raise typer.BadParameter(f"{src_path} must be a flat list")
                source_entries.extend(loaded)

        if core_file.exists():
            loaded = _read_yaml(core_file, {})
            if isinstance(loaded, list):
                core_entries = loaded
            elif isinstance(loaded, dict):
                core_entries = loaded.get("entries", []) or []
                skip_ids = set(loaded.get("skip_ids", []) or [])
                # `skip_source_ids` drops these ids from sources/enrichments only,
                # leaving core entries authoritative. Used when models.dev (or any
                # auto-generated source) ships bad aliases for a model that core.yaml
                # curates correctly — otherwise the loader's UNION-merge would
                # re-introduce the bad aliases on every refresh.
                skip_source_ids = set(loaded.get("skip_source_ids", []) or [])
            else:
                raise typer.BadParameter(f"{core_file} unexpected shape {type(loaded)}")

        if enrichments_file.exists():
            loaded = _read_yaml(enrichments_file, [])
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{enrichments_file} must be a flat list")
            enrichment_entries = loaded

        def _merge_into(target: dict, src: dict) -> dict:
            """Merge two entries with the same canonical_id.

            Returns a new dict; neither `target` nor `src` is modified.

            Field-level merge policy:
            - `aliases`: UNION (case-insensitive dedup).
            - `tags`: UNION (case-insensitive dedup). Both YAML-list and
              JSON-encoded-string forms supported. Protects against session
              additions overwriting `[open-weight, moe]` with `[open-weight]`.
            - `parents`: UNION by edge `id`; an edge present on both sides is
              field-merged, with non-empty `src` values winning.
            - Other scalars: prefer non-empty across the pair; when both
              sides have a non-empty value, last-write-wins. Protects against
              session-batch entries that omit `architecture` /
              `params_billions` from silently overwriting earlier rich entries.

            "Empty" means: None, "", [], {}, or default-looking '{}' / '[]'.
            """
            existing_aliases = list(target.get("aliases") or [])
            existing_lc = {a.lower() for a in existing_aliases if a}
            new_aliases = list(src.get("aliases") or [])
            for a in new_aliases:
                if a and a.lower() not in existing_lc:
                    existing_aliases.append(a)
                    existing_lc.add(a.lower())

            def _decode_list_field(v):
                """Decode a list-valued field (`tags` / `parents`) that may be
                either a YAML list or a JSON-encoded string.

                Returns `(items, was_json)`: a best-effort list, plus a flag
                telling the caller to re-encode the merged result as JSON.
                """
                if v is None:
                    return [], False
                if isinstance(v, list):
                    return list(v), False
                if isinstance(v, str):
                    s = v.strip()
                    if not s or s in ("[]", "null"):
                        return [], True
                    try:
                        d = json.loads(s)
                        if isinstance(d, list):
                            return list(d), True
                    except (ValueError, TypeError):
                        pass
                # Anything else (non-JSON string, JSON that is not a list,
                # other scalar) is kept as a single-item list.
                return [v], False

            # Union tags (handles both list and JSON-string formats)
            tgt_tags, tgt_was_json = _decode_list_field(target.get("tags"))
            src_tags, src_was_json = _decode_list_field(src.get("tags"))
            seen_tags_lc = {str(t).lower() for t in tgt_tags}
            for t in src_tags:
                if t is not None and str(t).lower() not in seen_tags_lc:
                    tgt_tags.append(t)
                    seen_tags_lc.add(str(t).lower())
            # Re-encode if either source was a JSON string (the parquet column
            # is VARCHAR; _json_encode_if_needed downstream handles either).
            tags_merged = json.dumps(tgt_tags) if (tgt_was_json or src_was_json) else tgt_tags

            def _is_empty(v) -> bool:
                """True for None, empty list/dict, or a blank / '[]' / '{}' string."""
                if v is None:
                    return True
                if isinstance(v, (list, dict)) and len(v) == 0:
                    return True
                if isinstance(v, str) and v.strip() in ("", "[]", "{}"):
                    return True
                return False

            # Union `parents` by id. For an edge present in both, field-merge
            # within the edge so a later source can fill in `axis` (or correct
            # `relationship`) without duplicating the edge. Edges from the
            # target preserve their order; new edges from src are appended.
            tgt_parents, tgt_p_was_json = _decode_list_field(target.get("parents"))
            src_parents, src_p_was_json = _decode_list_field(src.get("parents"))
            parents_by_id: dict[str, dict] = {}
            parents_order: list[str] = []
            for p in tgt_parents:
                if isinstance(p, dict) and p.get("id"):
                    pid = p["id"]
                    if pid not in parents_by_id:
                        parents_order.append(pid)
                        parents_by_id[pid] = dict(p)
            for p in src_parents:
                if not isinstance(p, dict) or not p.get("id"):
                    continue
                pid = p["id"]
                if pid in parents_by_id:
                    merged_edge = dict(parents_by_id[pid])
                    for k, v in p.items():
                        if _is_empty(v):
                            continue
                        merged_edge[k] = v
                    parents_by_id[pid] = merged_edge
                else:
                    parents_order.append(pid)
                    parents_by_id[pid] = dict(p)
            parents_list = [parents_by_id[pid] for pid in parents_order]
            parents_merged = (
                json.dumps(parents_list)
                if (tgt_p_was_json or src_p_was_json)
                else parents_list
            )

            merged = dict(target)
            for k, v in src.items():
                if k in ("aliases", "tags", "parents"):
                    continue  # handled separately
                if _is_empty(v):
                    continue
                merged[k] = v
            merged["aliases"] = existing_aliases
            merged["tags"] = tags_merged
            # Only emit `parents` if at least one side had any (avoids creating
            # a spurious empty list on entries that never had a parents field).
            if tgt_parents or src_parents:
                merged["parents"] = parents_merged
            return merged

        by_id: dict[str, dict] = {}

        def _absorb(entries: list[dict], extra_skip: set[str] = frozenset()) -> None:
            """Fold `entries` into `by_id`, skipping ids in `skip_ids | extra_skip`."""
            drop = skip_ids | extra_skip
            for e in entries:
                # Reject non-mapping items (e.g. a stray scalar in the list)
                # with the same user-facing error instead of a raw TypeError.
                if not isinstance(e, dict) or "id" not in e:
                    raise typer.BadParameter(f"models seed entry missing id: {e!r}")
                if e["id"] in drop:
                    continue
                # Translate legacy `parent_model_id` scalar to the typed
                # `parents` list before any merge / column-filter step.
                _legacy_parent_model_id_to_parents(e)
                if e["id"] in by_id:
                    by_id[e["id"]] = _merge_into(by_id[e["id"]], e)
                else:
                    by_id[e["id"]] = e

        # Sources/enrichments respect both skip_ids and skip_source_ids;
        # core entries respect only skip_ids so curated overrides always apply.
        _absorb(source_entries, extra_skip=skip_source_ids)
        _absorb(core_entries)
        _absorb(enrichment_entries, extra_skip=skip_source_ids)
        return list(by_id.values())

    # ------------------------------------------------------------------
    # Benchmarks — two-source load:
    #   seed/benchmarks.yaml                 → curated canonicals (the
    #                                          source of truth, hand-edited)
    #   seed/benchmarks_generated/*.yaml     → bulk auto-generated entries
    #                                          (e.g. AIR-Bench 2024's 373
    #                                          categories from
    #                                          scripts/refresh_air_bench_taxonomy.py)
    #
    # Merge order: generated → curated. Field-level merge per id (aliases
    # union; other scalars prefer non-empty, last-write-wins) so curated
    # entries can refine an auto-generated row without losing its aliases.
    # Generator scripts must use stable canonical_ids so refreshes are
    # idempotent.
    # ------------------------------------------------------------------
    def _load_benchmarks_merged() -> list[dict]:
        """Load generated and curated benchmark seeds, merged per id.

        Generated entries come from every ``benchmarks_generated/*.yaml``
        under the seed dir (sorted path order); curated entries come from
        ``benchmarks.yaml``. Both are optional, and each file must be a flat
        list. Generated entries are indexed first (a later duplicate id
        replaces an earlier one); curated entries are then merged on top.

        Returns:
            One entry per id, in first-seen order.

        Raises:
            typer.BadParameter: If a file is not a flat list, or an entry is
                not a mapping carrying an ``id``.
        """
        curated_path = seed_path / "benchmarks.yaml"
        generated_dir = seed_path / "benchmarks_generated"

        def _read_flat_list(path) -> list[dict]:
            """Parse one YAML file that must hold a flat list (empty → [])."""
            # Decode as UTF-8 explicitly instead of relying on the locale's
            # preferred encoding (assumes the seed files are UTF-8 encoded).
            with open(path, encoding="utf-8") as f:
                loaded = yaml.safe_load(f) or []
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{path} must be a flat list")
            return loaded

        generated_entries: list[dict] = []
        if generated_dir.is_dir():
            for src_path in sorted(generated_dir.glob("*.yaml")):
                generated_entries.extend(_read_flat_list(src_path))

        curated_entries: list[dict] = []
        if curated_path.exists():
            curated_entries = _read_flat_list(curated_path)

        def _merge_benchmark(generated: dict, curated: dict) -> dict:
            """Curated wins on every field it specifies; aliases are
            unioned (case-insensitive dedup) so generator-emitted aliases
            survive even when curated narrows the entry."""
            merged = dict(generated)
            for k, v in curated.items():
                if k == "aliases":
                    continue
                merged[k] = v
            existing = list(generated.get("aliases") or [])
            existing_lc = {a.lower() for a in existing if a}
            for a in (curated.get("aliases") or []):
                if a and a.lower() not in existing_lc:
                    existing.append(a)
                    existing_lc.add(a.lower())
            merged["aliases"] = existing
            return merged

        by_id: dict[str, dict] = {}
        for entry in generated_entries:
            # Non-mapping items get the same user-facing error as a missing id
            # rather than surfacing as a raw TypeError.
            if not isinstance(entry, dict) or "id" not in entry:
                raise typer.BadParameter(f"benchmarks generated entry missing id: {entry!r}")
            by_id[entry["id"]] = entry
        for entry in curated_entries:
            if not isinstance(entry, dict) or "id" not in entry:
                raise typer.BadParameter(f"benchmarks seed entry missing id: {entry!r}")
            if entry["id"] in by_id:
                by_id[entry["id"]] = _merge_benchmark(by_id[entry["id"]], entry)
            else:
                by_id[entry["id"]] = entry
        return list(by_id.values())

    # ------------------------------------------------------------------
    # Families — translate seed/families.yaml's nested {slug: {fields}}
    # shape into flat dicts ready for upsert. The YAML uses the slug as
    # the mapping key for human friendliness (`mmlu:` reads as a header);
    # the table needs `id` as a column.
    #
    # Output schema mirrors `canonical_families`: list-valued fields
    # (`benchmark_ids`, `folder_aliases`, `composite_keys`) are
    # JSON-encoded so they round-trip through the parquet StringDtype
    # column without losing structure.
    # ------------------------------------------------------------------
    def _load_families_seed() -> list[dict]:
        """Flatten ``families.yaml`` ({slug: {fields}}) into one dict per family.

        Each output dict uses the mapping key as ``id`` and renames the YAML
        keys ``display`` → ``display_name`` and ``benchmarks`` →
        ``benchmark_ids``. List- and dict-valued fields are returned as native
        Python containers.

        Returns:
            One dict per family in YAML order, or ``[]`` when the file does
            not exist or is empty.

        Raises:
            typer.BadParameter: If the top level or a family entry is not a
                mapping, or a benchmark id is listed under two different
                families.
        """
        path = seed_path / "families.yaml"
        if not path.exists():
            return []
        # Decode as UTF-8 explicitly instead of relying on the locale's
        # preferred encoding (assumes the seed files are UTF-8 encoded).
        with open(path, encoding="utf-8") as f:
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raise typer.BadParameter(f"{path} must be a top-level mapping {{slug: {{...}}}}")

        out: list[dict] = []
        # Validation: each benchmark may only appear in one curated family.
        # Maps benchmark id -> slug of the family that first claimed it.
        seen_benchmarks: dict[str, str] = {}
        for slug, fields in raw.items():
            if not isinstance(fields, dict):
                raise typer.BadParameter(f"family {slug!r} entry must be a mapping, got {type(fields).__name__}")
            benchmark_ids = list(fields.get("benchmarks") or [])
            for bid in benchmark_ids:
                # A repeat within the same family is tolerated; only a
                # cross-family collision is an error.
                if bid in seen_benchmarks and seen_benchmarks[bid] != slug:
                    raise typer.BadParameter(
                        f"benchmark {bid!r} listed in two families: "
                        f"{seen_benchmarks[bid]!r} and {slug!r}"
                    )
                seen_benchmarks[bid] = slug
            entry = {
                "id": slug,
                "display_name": fields.get("display") or slug,
                "category": fields.get("category"),
                "benchmark_ids": benchmark_ids,
                "primary_benchmark_key": fields.get("primary_benchmark_key"),
                "folder_aliases": list(fields.get("folder_aliases") or []),
                "composite_keys": list(fields.get("composite_keys") or []),
                "tags": fields.get("tags") or [],
                "metadata": fields.get("metadata") or {},
                "review_status": fields.get("review_status") or "reviewed",
            }
            out.append(entry)
        return out

    # ------------------------------------------------------------------
    # Composites — same translation as families. YAML shape:
    #   {slug: {display, configs: [...], category?, family_id?}}
    # ------------------------------------------------------------------
    def _load_composites_seed() -> list[dict]:
        """Flatten ``composites.yaml`` ({slug: {fields}}) into one dict per composite.

        Each output dict uses the mapping key as ``id`` and renames the YAML
        keys ``display`` → ``display_name`` and ``configs`` →
        ``source_configs`` (each config coerced to ``str``). When ``configs``
        is absent or null, the slug itself is used, in kebab and snake form.

        Returns:
            One dict per composite in YAML order, or ``[]`` when the file
            does not exist or is empty.

        Raises:
            typer.BadParameter: If the top level or a composite entry is not
                a mapping.
        """
        path = seed_path / "composites.yaml"
        if not path.exists():
            return []
        # Decode as UTF-8 explicitly instead of relying on the locale's
        # preferred encoding (assumes the seed files are UTF-8 encoded).
        with open(path, encoding="utf-8") as f:
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raise typer.BadParameter(f"{path} must be a top-level mapping {{slug: {{...}}}}")

        out: list[dict] = []
        for slug, fields in raw.items():
            if not isinstance(fields, dict):
                raise typer.BadParameter(f"composite {slug!r} entry must be a mapping, got {type(fields).__name__}")
            raw_configs = fields.get("configs")
            if raw_configs is None:
                # Display-only override (no explicit `configs:`): implicit
                # single source_config equal to the slug. Some upstream
                # EEE folders are kebab (`arc-agi`), others snake
                # (`helm_classic`); ship both forms so the producer's
                # composite_config_map JOIN matches whichever the data
                # uses. De-dup when slug has no `-`.
                kebab = slug
                snake = slug.replace("-", "_")
                source_configs = [kebab] if kebab == snake else [kebab, snake]
            else:
                source_configs = [str(c) for c in raw_configs]
            entry = {
                "id": slug,
                "display_name": fields.get("display") or slug,
                "category": fields.get("category"),
                "source_configs": source_configs,
                "family_id": fields.get("family_id"),
                "tags": fields.get("tags") or [],
                "metadata": fields.get("metadata") or {},
                "review_status": fields.get("review_status") or "reviewed",
            }
            out.append(entry)
        return out

    # ------------------------------------------------------------------
    # Orgs — two-file load:
    #   seed/orgs.yaml            → curated first-party labs (the source
    #                               of truth, hand-edited)
    #   seed/orgs.generated.yaml  → auto-created orgs from hub-stats refresh
    #                               (HF authors that aren't curated labs)
    #
    # Curated wins on id collision. Unlike the models merge (field-level),
    # orgs use a simple "drop generated entry if id is in curated" policy:
    # curated entries are deliberate and richer; auto-created entries are
    # thin (just id, display_name, kind=unknown), so a partial overlay
    # would never improve the curated record.
    # ------------------------------------------------------------------
    def _load_orgs_merged() -> list[dict]:
        """Load curated and generated org seeds; curated wins on id collision.

        Reads ``orgs.yaml`` (curated) and ``orgs.generated.yaml`` (generated)
        under the seed dir. Both are optional and must be flat lists. A
        generated entry is kept only when its id is not already curated; no
        field-level merge is performed.

        Returns:
            All curated entries in file order, followed by the surviving
            generated entries in file order.

        Raises:
            typer.BadParameter: If either file is not a flat list, or a
                generated entry has no ``id``.
        """
        curated_path = seed_path / "orgs.yaml"
        generated_path = seed_path / "orgs.generated.yaml"

        def _read_flat_list(path) -> list[dict]:
            """Parse one YAML file that must hold a flat list (missing/empty → [])."""
            if not path.exists():
                return []
            # Decode as UTF-8 explicitly instead of relying on the locale's
            # preferred encoding (assumes the seed files are UTF-8 encoded).
            with open(path, encoding="utf-8") as f:
                loaded = yaml.safe_load(f) or []
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{path} must be a flat list")
            return loaded

        curated = _read_flat_list(curated_path)
        generated = _read_flat_list(generated_path)

        # Curated entries without an id are not rejected here; they are simply
        # left out of the collision set.
        curated_ids = {e["id"] for e in curated if "id" in e}
        out = list(curated)
        for e in generated:
            if "id" not in e:
                raise typer.BadParameter(f"orgs.generated.yaml entry missing id: {e!r}")
            if e["id"] not in curated_ids:
                out.append(e)
        return out

    # table name, yaml file, label, entity_type (for alias creation)
    seed_specs = [
        # Orgs: load via merge helper to combine curated + auto-generated.
        ("canonical_orgs", "__merged_orgs__", "orgs", "org"),
        # Benchmarks: load via merge helper. Curated entries live in
        # seed/benchmarks.yaml; bulk-generated entries (e.g. AIR-Bench
        # 2024's 373 categories from the refresh script) live in
        # seed/benchmarks_generated/*.yaml. Sentinel path triggers the
        # _load_benchmarks_merged() helper.
        ("canonical_benchmarks", "__merged_benchmarks__", "benchmarks", "benchmark"),
        ("canonical_metrics", seed_path / "metrics.yaml", "metrics", "metric"),
        ("eval_harnesses", seed_path / "harnesses.yaml", "harnesses", "harness"),
        # Families & composites are first-class registry entities since
        # the hierarchy-alignment work (notes/hierarchy-alignment.md
        # §4 / §7 Step 2). Their YAML uses {slug: {...}} shape, so we
        # need translation loaders rather than the flat-list path.
        # entity_type='family'/'composite' aliases are emitted for
        # consistency but aren't consulted by the resolver today.
        ("canonical_families", "__nested_families__", "families", "family"),
        ("canonical_composites", "__nested_composites__", "composites", "composite"),
        # Models: load via the merge helper; pass a sentinel path that
        # signals the loop below to invoke _load_models_merged() instead of
        # reading a single YAML file.
        ("canonical_models", "__merged_models__", "models", "model"),
    ]

    alias_count = 0
    # Track all seed entity IDs and alias keys so we can remove stale ones.
    # Alias key: (raw_value, entity_type, canonical_id, source_config)
    seed_snapshot: list[tuple[str, str, set[str], set[tuple[str, str, str, Optional[str]]]]] = []

    # Build the alias index once so add_alias collision checks are O(1) instead
    # of O(N) DataFrame mask scans. Combined with buffered=True below, this
    # avoids the O(N²) pd.concat-per-row cost on ~1k entities + ~13k aliases.
    queries._rebuild_alias_index(store)

    for table, yaml_file, label, entity_type in seed_specs:
        table_columns = set(schemas.empty(table).columns)
        if yaml_file == "__merged_models__":
            items = _load_models_merged()
            if not items:
                typer.echo(f"  [skip] no model entries found in seed/models.yaml or _overrides/")
                continue
        elif yaml_file == "__merged_orgs__":
            items = _load_orgs_merged()
            if not items:
                typer.echo(f"  [skip] no org entries found in seed/orgs.yaml or seed/orgs.generated.yaml")
                continue
        elif yaml_file == "__merged_benchmarks__":
            items = _load_benchmarks_merged()
            if not items:
                typer.echo(f"  [skip] no benchmark entries found in seed/benchmarks.yaml or seed/benchmarks_generated/")
                continue
        elif yaml_file == "__nested_families__":
            items = _load_families_seed()
            if not items:
                typer.echo(f"  [skip] no family entries found in seed/families.yaml")
                continue
        elif yaml_file == "__nested_composites__":
            items = _load_composites_seed()
            if not items:
                typer.echo(f"  [skip] no composite entries found in seed/composites.yaml")
                continue
        else:
            if not yaml_file.exists():
                typer.echo(f"  [skip] {yaml_file} not found")
                continue
            with open(yaml_file) as f:
                items = yaml.safe_load(f) or []

        yaml_ids: set[str] = set()
        yaml_alias_keys: set[tuple[str, str, str, Optional[str]]] = set()

        for original_item in items:
            item = dict(original_item)
            # Pop 'aliases' / 'scoped_aliases' before upserting — not table columns.
            extra_aliases = item.pop("aliases", []) or []
            scoped_aliases = item.pop("scoped_aliases", {}) or {}
            # Normalize list/dict columns: YAML may have native lists/dicts,
            # but the canonical_* parquet columns are VARCHAR, so encode if
            # needed. `parents` is a list-of-edges on canonical_models.
            # `benchmark_ids` / `folder_aliases` / `composite_keys` are
            # list-valued on canonical_families. `source_configs` is
            # list-valued on canonical_composites.
            for col in (
                "tags", "metadata", "parents",
                "input_modalities", "output_modalities",
                "benchmark_ids", "folder_aliases", "composite_keys",
                "source_configs",
            ):
                if col in item:
                    item[col] = _json_encode_if_needed(item[col])
            entity_item = {k: v for k, v in item.items() if k in table_columns}
            unknown_keys = sorted(set(item.keys()) - table_columns)
            if unknown_keys:
                typer.echo(
                    f"  [warn] {label} entry {item.get('id', '?')!r} has unknown "
                    f"key(s) {unknown_keys} β€” silently dropped. Check for typos."
                )
            if "id" not in entity_item:
                raise typer.BadParameter(f"{label} seed entry is missing required id: {original_item!r}")
            queries.upsert_entity(store, table, entity_item, buffered=True)
            canonical_id = entity_item["id"]
            display_name = entity_item.get("display_name", "")
            yaml_ids.add(canonical_id)

            # Global aliases (source_config=None): matched regardless of caller's source_config.
            # Scoped aliases (source_config=<name>): matched only when the caller passes that
            # source_config — lets short tokens ("Overall", "Arabic") map to different
            # benchmarks depending on which EEE config they came from.
            global_aliases = {canonical_id, display_name} | set(extra_aliases)

            alias_specs: list[tuple[str, Optional[str]]] = [
                (raw, None) for raw in global_aliases if raw
            ]
            for source_cfg, raw_values in scoped_aliases.items():
                for raw in raw_values or []:
                    if raw:
                        alias_specs.append((raw, source_cfg))

            for raw_value, source_cfg in alias_specs:
                # Index stale-removal by (raw_value, entity_type, canonical_id, source_config)
                yaml_alias_keys.add((raw_value, entity_type, canonical_id, source_cfg))
                try:
                    queries.add_alias(store, {
                        "raw_value": raw_value,
                        "entity_type": entity_type,
                        "canonical_id": canonical_id,
                        "source_config": source_cfg,
                        "source_field": "seed",
                        "status": "confirmed",
                        "strategy": "seed",
                        "confidence": 1.0,
                        "notes": None,
                    }, buffered=True)
                    alias_count += 1
                except ValueError:
                    # add_alias raises on uniqueness collision: an alias row
                    # already exists for (entity_type, raw_value, source_config).
                    # YAML is the source of truth, so if the existing row points
                    # at a different canonical_id, this is a YAML rename and we
                    # must REPOINT the existing row — NOT silently swallow it.
                    # Without this, stale-removal at the end of seed would then
                    # delete the row (its old key is no longer in
                    # yaml_alias_keys), causing total alias loss.
                    aliases_df = store.table("aliases")
                    mask = (
                        (aliases_df["raw_value"] == raw_value)
                        & (aliases_df["entity_type"] == entity_type)
                        & (aliases_df["status"] != "rejected")
                    )
                    if source_cfg is not None:
                        mask = mask & (aliases_df["source_config"] == source_cfg)
                    else:
                        mask = mask & aliases_df["source_config"].isna()
                    existing = aliases_df[mask]
                    if existing.empty:
                        # Collision came from the pending buffer (this run added
                        # the same key earlier). For same-canonical re-adds this
                        # is a no-op; for different-canonical we must mutate the
                        # pending dict in place so the rename isn't lost on
                        # flush. _alias_index points at the same dict, so
                        # updating it here keeps the index consistent.
                        for p in queries._get_pending(store, "aliases"):
                            if (p.get("entity_type") == entity_type
                                    and p.get("raw_value") == raw_value
                                    and queries._source_config_key(p.get("source_config")) == queries._source_config_key(source_cfg)
                                    and p.get("status") != "rejected"):
                                if p["canonical_id"] != canonical_id:
                                    prev = p["canonical_id"]
                                    p["canonical_id"] = canonical_id
                                    p["source_field"] = "seed"
                                    p["status"] = "confirmed"
                                    p["strategy"] = "seed"
                                    p["confidence"] = 1.0
                                    typer.echo(
                                        f"  [rename] alias {raw_value!r} ({entity_type}) "
                                        f"moved {prev!r} -> {canonical_id!r} (pending)"
                                    )
                                    alias_count += 1
                                break
                        continue
                    row = existing.iloc[0]
                    if row["canonical_id"] != canonical_id:
                        # Rename: repoint the existing row at the new canonical.
                        queries.update_alias(store, row["id"], {
                            "canonical_id": canonical_id,
                            "source_field": "seed",
                            "status": "confirmed",
                            "strategy": "seed",
                            "confidence": 1.0,
                        })
                        typer.echo(
                            f"  [rename] alias {raw_value!r} ({entity_type}) "
                            f"moved {row['canonical_id']!r} -> {canonical_id!r}"
                        )
                        alias_count += 1
                    # else: identical re-seed of an existing alias — no-op.

        seed_snapshot.append((table, entity_type, yaml_ids, yaml_alias_keys))
        typer.echo(f"  {label}: {len(items)}")

    # Flush all buffered upserts (entities + aliases) into their tables in a
    # single pd.concat per table. prune_stale below reads store.table(...)
    # directly, so this must happen before that block.
    queries.flush_pending(store)

    # Derive denormalized parent-walk caches now that all canonical_models
    # rows are present. `root_model_id` and `lineage_origin_org_id` are
    # computed from `parents` and need the full graph to be in place.
    lineage_counts = queries.derive_model_lineage_fields(store)
    typer.echo(
        f"  derived: root_model_id={lineage_counts['root_set']}, "
        f"lineage_origin_org_id={lineage_counts['lineage_set']}, "
        f"open_weights_inherited={lineage_counts['open_weights_inherited']}, "
        f"release_date_from_id={lineage_counts['release_date_derived_from_id']}"
    )

    removed_entities = 0
    removed_aliases = 0
    if prune_stale:
        # Remove seed-originated entities and aliases that are no longer in the YAML.
        # Only touches rows that were created by seed (strategy == "seed"), never
        # sync-created aliases or auto-draft entities.
        for table, entity_type, yaml_ids, yaml_alias_keys in seed_snapshot:
            # Remove stale seed aliases for this entity type.
            aliases_df = store.table("aliases")
            seed_mask = (aliases_df["strategy"] == "seed") & (aliases_df["entity_type"] == entity_type)
            if seed_mask.any():
                seed_aliases = aliases_df[seed_mask]
                stale_alias_mask = seed_mask.copy()
                for idx in seed_aliases.index:
                    row = seed_aliases.loc[idx]
                    sc = row.get("source_config")
                    if _is_na(sc):
                        sc = None
                    key = (row["raw_value"], row["entity_type"], row["canonical_id"], sc)
                    if key in yaml_alias_keys:
                        stale_alias_mask[idx] = False
                n_stale = stale_alias_mask.sum()
                if n_stale > 0:
                    store.set_table("aliases", aliases_df[~stale_alias_mask].reset_index(drop=True))
                    removed_aliases += int(n_stale)

            # Remove stale seed entities — only those with review_status "reviewed"
            # that came from seed and are no longer in the YAML.
            entity_df = store.table(table)
            if len(entity_df) > 0:
                stale = entity_df["id"].isin(yaml_ids)
                stale_entities = entity_df[~stale & (entity_df["review_status"] == "reviewed")]
                # Only remove if every alias for this entity is also seed-originated,
                # meaning it wasn't referenced by sync data.
                current_aliases = store.table("aliases")
                for eid in stale_entities["id"]:
                    entity_aliases = current_aliases[
                        (current_aliases["canonical_id"] == eid)
                        & (current_aliases["entity_type"] == entity_type)
                    ]
                    if len(entity_aliases) == 0 or (entity_aliases["strategy"] == "seed").all():
                        entity_df = entity_df[entity_df["id"] != eid]
                        # Also remove any remaining aliases pointing to it.
                        current_aliases = current_aliases[
                            ~((current_aliases["canonical_id"] == eid)
                              & (current_aliases["entity_type"] == entity_type))
                        ]
                        removed_entities += 1
                store.set_table(table, entity_df.reset_index(drop=True))
                store.set_table("aliases", current_aliases.reset_index(drop=True))

    typer.echo(f"  aliases: {alias_count} added, {removed_aliases} removed")
    if removed_entities:
        typer.echo(f"  stale entities removed: {removed_entities}")

    store.push_to_hub()
    typer.echo("Seed complete.")


# ------------------------------------------------------------------
# stats
# ------------------------------------------------------------------

@app.command()
def stats(
    local: bool = typer.Option(False, "--local", help="Read from fixtures/ instead of HF Hub"),
):
    """Print registry entity counts and pending review summary.

    For each canonical entity table, reports the row count and how many rows
    have review_status "draft" (0 when the column is absent). Then reports the
    alias total with the number of "uncertain" aliases, followed by row counts
    for eval_results, resolution_log and sync_runs.
    """
    import os

    # Set LOCAL_MODE before the store is loaded so the loader can see it.
    if local:
        os.environ["LOCAL_MODE"] = "true"

    store = _load_store()

    # (padded display label, table name) pairs; labels are pre-padded so the
    # "total=" columns line up in the output.
    entity_tables = (
        ("models    ", "canonical_models"),
        ("benchmarks", "canonical_benchmarks"),
        ("metrics   ", "canonical_metrics"),
        ("harnesses ", "eval_harnesses"),
    )
    for label, table_name in entity_tables:
        frame = store.table(table_name)
        total = len(frame)
        # Tables without a review_status column report zero drafts.
        if "review_status" in frame.columns:
            draft = int((frame["review_status"] == "draft").sum())
        else:
            draft = 0
        typer.echo(f"  {label}  total={total}  draft={draft}")

    aliases_df = store.table("aliases")
    uncertain = 0
    if "status" in aliases_df.columns:
        uncertain = int((aliases_df["status"] == "uncertain").sum())
    typer.echo(f"\n  aliases        total={len(aliases_df)}  uncertain={uncertain}")

    # Remaining tables only get a plain row count.
    typer.echo(f"  eval_results   total={len(store.table('eval_results'))}")
    typer.echo(f"  resolution_log total={len(store.table('resolution_log'))}")
    typer.echo(f"  sync_runs      total={len(store.table('sync_runs'))}")


# ------------------------------------------------------------------
# sync
# ------------------------------------------------------------------

@app.command()
def sync(
    config: Optional[str] = typer.Option(None, "--config", help="EEE config name"),
    all_configs: bool = typer.Option(False, "--all", help="Sync all EEE configs"),
    rerun: bool = typer.Option(False, "--rerun", help="Re-resolve all raw strings even if already aliased"),
    local: bool = typer.Option(False, "--local"),
):
    """
    Batch sync EEE config(s) → writes resolved results to eval_results table.
    Each result row is one (model × benchmark × metric) combination with resolved canonical IDs.

    Requires either --config <name> or --all; when both are given, --all wins.
    A config that fails is reported on stderr and the remaining configs still
    run. Tables are persisted once after all configs have been attempted.
    """
    import os

    # Set LOCAL_MODE before the store is loaded so the loader can see it.
    if local:
        os.environ["LOCAL_MODE"] = "true"

    if not config and not all_configs:
        typer.echo("Specify --config <name> or --all", err=True)
        raise typer.Exit(1)

    from eval_card_registry.services.ingestion import run_sync

    store = _load_store()

    configs_to_run: list[str]
    if all_configs:
        # Only --all needs the `datasets` package, so import it lazily; a
        # single-config sync no longer pays for (or depends on) this import.
        import datasets as ds_lib

        configs_to_run = ds_lib.get_dataset_config_names("evaleval/EEE_datastore")
    else:
        configs_to_run = [config]

    failed = []
    for cfg in configs_to_run:
        typer.echo(f"Syncing {cfg}...")
        try:
            counts = run_sync(cfg, store, rerun=rerun)
            typer.echo(f"  {cfg}: {counts}")
        except Exception as e:
            # Deliberate best-effort boundary: one bad config must not abort
            # the rest of the batch. The failure is surfaced on stderr and
            # summarised after the loop.
            typer.echo(f"  {cfg}: FAILED — {e}", err=True)
            failed.append(cfg)

    # Persist whatever was synced, even if some configs failed.
    typer.echo("Persisting tables...")
    store.push_to_hub()

    # NOTE(review): the command still exits 0 when configs failed — confirm
    # whether callers (e.g. CI) would prefer a non-zero exit status here.
    if failed:
        typer.echo(f"Done with {len(failed)} failed config(s): {', '.join(failed)}")
    else:
        typer.echo("Done.")