Spaces:
Sleeping
Sleeping
| """ | |
| eval-card-registry CLI. | |
| Commands: | |
| seed Load known entities from seed/ YAML files | |
| stats Print registry summary | |
| sync Batch sync one or all EEE configs → eval_results table | |
| """ | |
| import json | |
| from pathlib import Path | |
| from typing import Optional | |
| import typer | |
| import yaml | |
| def _json_encode_if_needed(value): | |
| """Encode lists/dicts as JSON strings; pass through anything else. | |
| seed/models.yaml uses YAML-native lists for `tags` (e.g. `["open-weight"]`) | |
| while seed/benchmarks.yaml stores them pre-encoded as strings (e.g. | |
| `'["instruction-following"]'`). The canonical_* parquet columns are all | |
| VARCHAR, so we coerce on the way in to keep both formats supported. | |
| """ | |
| if isinstance(value, (list, dict)): | |
| return json.dumps(value) | |
| return value | |
| def _legacy_parent_model_id_to_parents(entry: dict) -> None: | |
| """Translate a legacy `parent_model_id: X` field to the typed `parents` | |
| list shape. Mutates the entry in place. | |
| Legacy core.yaml / sources/*.generated.yaml use a single scalar | |
| `parent_model_id` to express a family/variant relationship (e.g. | |
| Llama-3-8B → Llama-3). The new schema replaces this with a typed list | |
| of parent edges. This shim converts on load so existing YAML keeps | |
| working until each file is migrated to emit `parents` natively. | |
| No-op when `parents` is already present (new shape wins) or when neither | |
| field is set. | |
| """ | |
| if "parents" in entry and entry["parents"] is not None: | |
| entry.pop("parent_model_id", None) | |
| return | |
| legacy = entry.pop("parent_model_id", None) | |
| if legacy: | |
| entry["parents"] = [{"id": legacy, "relationship": "variant", "axis": "size"}] | |
| from eval_card_registry.store.hf_store import get_store | |
| from eval_card_registry.store import queries, schemas | |
| from eval_card_registry.store.queries import _is_na | |
| app = typer.Typer(help="eval-card-registry CLI") | |
def _load_store():
    """Return the shared store, triggering a table load on first access."""
    store = get_store()
    if store.loaded:
        return store
    store.load()
    return store
| # ------------------------------------------------------------------ | |
| # seed | |
| # ------------------------------------------------------------------ | |
def seed(
    local: bool = typer.Option(False, "--local", help="Write to fixtures/ instead of HF Hub"),
    seed_dir: str = typer.Option("./seed", "--seed-dir"),
    prune_stale: bool = typer.Option(
        False,
        "--prune-stale/--no-prune-stale",
        help="Remove reviewed seed entities and seed aliases absent from the current YAML snapshot.",
    ),
):
    """Load known canonical entities from seed YAML files.

    Args:
        local: Flip LOCAL_MODE so the store reads/writes local fixtures
            instead of the HF Hub.
        seed_dir: Root directory holding the seed YAML files.
        prune_stale: When set, remove seed-originated (strategy == "seed")
            entities and aliases that are absent from the current YAML
            snapshot. Never touches sync-created aliases or auto-draft
            entities.

    Side effects: upserts entities/aliases into the store tables, derives
    lineage caches, then pushes all tables via store.push_to_hub().
    """
    import os
    if local:
        os.environ["LOCAL_MODE"] = "true"
    store = _load_store()
    seed_path = Path(seed_dir)

    # ------------------------------------------------------------------
    # Models — three-layer load from seed/models/:
    #   sources/*.generated.yaml   → external catalog data (e.g. models.dev),
    #                                flat lists, never hand-edited
    #   core.yaml                  → curated canonicals (the source of truth),
    #                                flat list OR {skip_ids, entries} dict
    #   enrichments/aliases.yaml   → optional alias-only entries ({id, aliases})
    #                                that union onto whatever exists
    #
    # Merge order: sources → core → enrichments. Field-level merge per entry
    # (aliases / tags UNION; other scalars prefer non-empty, last-write-wins).
    # `skip_ids` from core drops generated entries we don't want.
    # ------------------------------------------------------------------
    def _load_models_merged() -> list[dict]:
        """Return the merged model entries from the three seed layers."""
        models_dir = seed_path / "models"
        sources_dir = models_dir / "sources"
        core_file = models_dir / "core.yaml"
        enrichments_file = models_dir / "enrichments" / "aliases.yaml"
        source_entries: list[dict] = []
        core_entries: list[dict] = []
        enrichment_entries: list[dict] = []
        skip_ids: set[str] = set()
        if sources_dir.is_dir():
            for src_path in sorted(sources_dir.glob("*.generated.yaml")):
                with open(src_path) as f:
                    loaded = yaml.safe_load(f) or []
                if not isinstance(loaded, list):
                    raise typer.BadParameter(f"{src_path} must be a flat list")
                source_entries.extend(loaded)
        skip_source_ids: set[str] = set()
        if core_file.exists():
            with open(core_file) as f:
                loaded = yaml.safe_load(f) or {}
            if isinstance(loaded, list):
                core_entries = loaded
            elif isinstance(loaded, dict):
                core_entries = loaded.get("entries", []) or []
                skip_ids = set(loaded.get("skip_ids", []) or [])
                # `skip_source_ids` drops these ids from sources/enrichments only,
                # leaving core entries authoritative. Used when models.dev (or any
                # auto-generated source) ships bad aliases for a model that core.yaml
                # curates correctly — otherwise the loader's UNION-merge would
                # re-introduce the bad aliases on every refresh.
                skip_source_ids = set(loaded.get("skip_source_ids", []) or [])
            else:
                raise typer.BadParameter(f"{core_file} unexpected shape {type(loaded)}")
        if enrichments_file.exists():
            with open(enrichments_file) as f:
                loaded = yaml.safe_load(f) or []
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{enrichments_file} must be a flat list")
            enrichment_entries = loaded

        def _merge_into(target: dict, src: dict) -> dict:
            """Merge two entries with the same canonical_id.

            Field-level merge policy:
              - `aliases`: UNION (case-insensitive dedup).
              - `tags`: UNION (case-insensitive dedup). Both YAML-list and
                JSON-encoded-string forms supported. Protects against session
                additions overwriting `[open-weight, moe]` with `[open-weight]`.
              - Other scalars: prefer non-empty across the pair; when both
                sides have a non-empty value, last-write-wins. Protects against
                session-batch entries that omit `architecture` /
                `params_billions` from silently overwriting earlier rich entries.
                "Empty" means: None, "", [], {}, or default-looking '{}' / '[]'.
            """
            import json as _json
            existing_aliases = list(target.get("aliases") or [])
            existing_lc = {a.lower() for a in existing_aliases if a}
            new_aliases = list(src.get("aliases") or [])
            for a in new_aliases:
                if a and a.lower() not in existing_lc:
                    existing_aliases.append(a)
                    existing_lc.add(a.lower())

            def _decode_list_field(v):
                """tags / metadata may be either YAML-list or JSON-encoded
                string. Return a list (best-effort) and a boolean indicating
                whether to re-encode on write."""
                if v is None:
                    return [], False
                if isinstance(v, list):
                    return list(v), False
                if isinstance(v, str):
                    s = v.strip()
                    if not s or s in ("[]", "null"):
                        return [], True
                    try:
                        d = _json.loads(s)
                        if isinstance(d, list):
                            return list(d), True
                    except (ValueError, TypeError):
                        pass
                # Non-list JSON or plain string: treat as a one-element list.
                return [v], False

            # Union tags (handles both list and JSON-string formats)
            tgt_tags, tgt_was_json = _decode_list_field(target.get("tags"))
            src_tags, src_was_json = _decode_list_field(src.get("tags"))
            seen_tags_lc = {str(t).lower() for t in tgt_tags}
            for t in src_tags:
                if t is not None and str(t).lower() not in seen_tags_lc:
                    tgt_tags.append(t)
                    seen_tags_lc.add(str(t).lower())
            # Re-encode if either source was a JSON string (the parquet column
            # is VARCHAR; _json_encode_if_needed downstream handles either).
            tags_merged = _json.dumps(tgt_tags) if (tgt_was_json or src_was_json) else tgt_tags

            def _is_empty(v) -> bool:
                """True for None, empty containers/strings, and '{}' / '[]'."""
                if v is None:
                    return True
                if isinstance(v, (list, dict)) and len(v) == 0:
                    return True
                if isinstance(v, str) and v.strip() in ("", "[]", "{}"):
                    return True
                return False

            # Union `parents` by id. For an edge present in both, field-merge
            # within the edge so a later source can fill in `axis` (or correct
            # `relationship`) without duplicating the edge. Edges from the
            # target preserve their order; new edges from src are appended.
            tgt_parents, tgt_p_was_json = _decode_list_field(target.get("parents"))
            src_parents, src_p_was_json = _decode_list_field(src.get("parents"))
            parents_by_id: dict[str, dict] = {}
            parents_order: list[str] = []
            for p in tgt_parents:
                if isinstance(p, dict) and p.get("id"):
                    pid = p["id"]
                    if pid not in parents_by_id:
                        parents_order.append(pid)
                    parents_by_id[pid] = dict(p)
            for p in src_parents:
                if not isinstance(p, dict) or not p.get("id"):
                    continue
                pid = p["id"]
                if pid in parents_by_id:
                    merged_edge = dict(parents_by_id[pid])
                    for k, v in p.items():
                        if _is_empty(v):
                            continue
                        merged_edge[k] = v
                    parents_by_id[pid] = merged_edge
                else:
                    parents_order.append(pid)
                    parents_by_id[pid] = dict(p)
            parents_list = [parents_by_id[pid] for pid in parents_order]
            parents_merged = (
                _json.dumps(parents_list)
                if (tgt_p_was_json or src_p_was_json)
                else parents_list
            )
            merged = dict(target)
            for k, v in src.items():
                if k in ("aliases", "tags", "parents"):
                    continue  # handled separately
                if _is_empty(v):
                    continue
                merged[k] = v
            merged["aliases"] = existing_aliases
            merged["tags"] = tags_merged
            # Only emit `parents` if at least one side had any (avoids creating
            # a spurious empty list on entries that never had a parents field).
            if tgt_parents or src_parents:
                merged["parents"] = parents_merged
            return merged

        by_id: dict[str, dict] = {}

        def _absorb(entries: list[dict], extra_skip: set[str] = frozenset()) -> None:
            """Fold *entries* into by_id, honoring skip sets and the merge policy."""
            drop = skip_ids | extra_skip
            for e in entries:
                if "id" not in e:
                    raise typer.BadParameter(f"models seed entry missing id: {e!r}")
                if e["id"] in drop:
                    continue
                # Translate legacy `parent_model_id` scalar to the typed
                # `parents` list before any merge / column-filter step.
                _legacy_parent_model_id_to_parents(e)
                if e["id"] in by_id:
                    by_id[e["id"]] = _merge_into(by_id[e["id"]], e)
                else:
                    by_id[e["id"]] = e

        # Sources/enrichments respect both skip_ids and skip_source_ids;
        # core entries respect only skip_ids so curated overrides always apply.
        _absorb(source_entries, extra_skip=skip_source_ids)
        _absorb(core_entries)
        _absorb(enrichment_entries, extra_skip=skip_source_ids)
        return list(by_id.values())

    # ------------------------------------------------------------------
    # Benchmarks — two-source load:
    #   seed/benchmarks.yaml             → curated canonicals (the
    #                                      source of truth, hand-edited)
    #   seed/benchmarks_generated/*.yaml → bulk auto-generated entries
    #                                      (e.g. AIR-Bench 2024's 373
    #                                      categories from
    #                                      scripts/refresh_air_bench_taxonomy.py)
    #
    # Merge order: generated → curated. Field-level merge per id (aliases
    # union; other scalars prefer non-empty, last-write-wins) so curated
    # entries can refine an auto-generated row without losing its aliases.
    # Generator scripts must use stable canonical_ids so refreshes are
    # idempotent.
    # ------------------------------------------------------------------
    def _load_benchmarks_merged() -> list[dict]:
        """Return benchmark entries: generated first, curated layered on top."""
        curated_path = seed_path / "benchmarks.yaml"
        generated_dir = seed_path / "benchmarks_generated"
        generated_entries: list[dict] = []
        if generated_dir.is_dir():
            for src_path in sorted(generated_dir.glob("*.yaml")):
                with open(src_path) as f:
                    loaded = yaml.safe_load(f) or []
                if not isinstance(loaded, list):
                    raise typer.BadParameter(f"{src_path} must be a flat list")
                generated_entries.extend(loaded)
        curated_entries: list[dict] = []
        if curated_path.exists():
            with open(curated_path) as f:
                loaded = yaml.safe_load(f) or []
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{curated_path} must be a flat list")
            curated_entries = loaded

        def _merge_benchmark(generated: dict, curated: dict) -> dict:
            """Curated wins on every field it specifies; aliases are
            unioned (case-insensitive dedup) so generator-emitted aliases
            survive even when curated narrows the entry."""
            merged = dict(generated)
            for k, v in curated.items():
                if k == "aliases":
                    continue
                merged[k] = v
            existing = list(generated.get("aliases") or [])
            existing_lc = {a.lower() for a in existing if a}
            for a in (curated.get("aliases") or []):
                if a and a.lower() not in existing_lc:
                    existing.append(a)
                    existing_lc.add(a.lower())
            merged["aliases"] = existing
            return merged

        by_id: dict[str, dict] = {}
        for entry in generated_entries:
            if "id" not in entry:
                raise typer.BadParameter(f"benchmarks generated entry missing id: {entry!r}")
            by_id[entry["id"]] = entry
        for entry in curated_entries:
            if "id" not in entry:
                raise typer.BadParameter(f"benchmarks seed entry missing id: {entry!r}")
            if entry["id"] in by_id:
                by_id[entry["id"]] = _merge_benchmark(by_id[entry["id"]], entry)
            else:
                by_id[entry["id"]] = entry
        return list(by_id.values())

    # ------------------------------------------------------------------
    # Families — translate seed/families.yaml's nested {slug: {fields}}
    # shape into flat dicts ready for upsert. The YAML uses the slug as
    # the mapping key for human friendliness (`mmlu:` reads as a header);
    # the table needs `id` as a column.
    #
    # Output schema mirrors `canonical_families`: list-valued fields
    # (`benchmark_ids`, `folder_aliases`, `composite_keys`) are
    # JSON-encoded so they round-trip through the parquet StringDtype
    # column without losing structure.
    # ------------------------------------------------------------------
    def _load_families_seed() -> list[dict]:
        """Return flat family entries translated from the nested YAML shape."""
        path = seed_path / "families.yaml"
        if not path.exists():
            return []
        with open(path) as f:
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raise typer.BadParameter(f"{path} must be a top-level mapping {{slug: {{...}}}}")
        out: list[dict] = []
        # Validation: each benchmark may only appear in one curated family.
        seen_benchmarks: dict[str, str] = {}
        for slug, fields in raw.items():
            if not isinstance(fields, dict):
                raise typer.BadParameter(f"family {slug!r} entry must be a mapping, got {type(fields).__name__}")
            benchmark_ids = list(fields.get("benchmarks") or [])
            for bid in benchmark_ids:
                if bid in seen_benchmarks and seen_benchmarks[bid] != slug:
                    raise typer.BadParameter(
                        f"benchmark {bid!r} listed in two families: "
                        f"{seen_benchmarks[bid]!r} and {slug!r}"
                    )
                seen_benchmarks[bid] = slug
            entry = {
                "id": slug,
                "display_name": fields.get("display") or slug,
                "category": fields.get("category"),
                "benchmark_ids": benchmark_ids,
                "primary_benchmark_key": fields.get("primary_benchmark_key"),
                "folder_aliases": list(fields.get("folder_aliases") or []),
                "composite_keys": list(fields.get("composite_keys") or []),
                "tags": fields.get("tags") or [],
                "metadata": fields.get("metadata") or {},
                "review_status": fields.get("review_status") or "reviewed",
            }
            out.append(entry)
        return out

    # ------------------------------------------------------------------
    # Composites — same translation as families. YAML shape:
    #   {slug: {display, configs: [...], category?, family_id?}}
    # ------------------------------------------------------------------
    def _load_composites_seed() -> list[dict]:
        """Return flat composite entries translated from the nested YAML shape."""
        path = seed_path / "composites.yaml"
        if not path.exists():
            return []
        with open(path) as f:
            raw = yaml.safe_load(f) or {}
        if not isinstance(raw, dict):
            raise typer.BadParameter(f"{path} must be a top-level mapping {{slug: {{...}}}}")
        out: list[dict] = []
        for slug, fields in raw.items():
            if not isinstance(fields, dict):
                raise typer.BadParameter(f"composite {slug!r} entry must be a mapping, got {type(fields).__name__}")
            raw_configs = fields.get("configs")
            if raw_configs is None:
                # Display-only override (no explicit `configs:`): implicit
                # single source_config equal to the slug. Some upstream
                # EEE folders are kebab (`arc-agi`), others snake
                # (`helm_classic`); ship both forms so the producer's
                # composite_config_map JOIN matches whichever the data
                # uses. De-dup when slug has no `-`.
                kebab = slug
                snake = slug.replace("-", "_")
                source_configs = [kebab] if kebab == snake else [kebab, snake]
            else:
                source_configs = [str(c) for c in raw_configs]
            entry = {
                "id": slug,
                "display_name": fields.get("display") or slug,
                "category": fields.get("category"),
                "source_configs": source_configs,
                "family_id": fields.get("family_id"),
                "tags": fields.get("tags") or [],
                "metadata": fields.get("metadata") or {},
                "review_status": fields.get("review_status") or "reviewed",
            }
            out.append(entry)
        return out

    # ------------------------------------------------------------------
    # Orgs — two-file load:
    #   seed/orgs.yaml           → curated first-party labs (the source
    #                              of truth, hand-edited)
    #   seed/orgs.generated.yaml → auto-created orgs from hub-stats refresh
    #                              (HF authors that aren't curated labs)
    #
    # Curated wins on id collision. Unlike the models merge (field-level),
    # orgs use a simple "drop generated entry if id is in curated" policy:
    # curated entries are deliberate and richer; auto-created entries are
    # thin (just id, display_name, kind=unknown), so a partial overlay
    # would never improve the curated record.
    # ------------------------------------------------------------------
    def _load_orgs_merged() -> list[dict]:
        """Return org entries: curated plus any generated orgs not shadowed."""
        curated_path = seed_path / "orgs.yaml"
        generated_path = seed_path / "orgs.generated.yaml"
        curated: list[dict] = []
        if curated_path.exists():
            with open(curated_path) as f:
                loaded = yaml.safe_load(f) or []
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{curated_path} must be a flat list")
            curated = loaded
        generated: list[dict] = []
        if generated_path.exists():
            with open(generated_path) as f:
                loaded = yaml.safe_load(f) or []
            if not isinstance(loaded, list):
                raise typer.BadParameter(f"{generated_path} must be a flat list")
            generated = loaded
        curated_ids = {e["id"] for e in curated if "id" in e}
        out = list(curated)
        for e in generated:
            if "id" not in e:
                raise typer.BadParameter(f"orgs.generated.yaml entry missing id: {e!r}")
            if e["id"] not in curated_ids:
                out.append(e)
        return out

    # table name, yaml file, label, entity_type (for alias creation)
    seed_specs = [
        # Orgs: load via merge helper to combine curated + auto-generated.
        ("canonical_orgs", "__merged_orgs__", "orgs", "org"),
        # Benchmarks: load via merge helper. Curated entries live in
        # seed/benchmarks.yaml; bulk-generated entries (e.g. AIR-Bench
        # 2024's 373 categories from the refresh script) live in
        # seed/benchmarks_generated/*.yaml. Sentinel path triggers the
        # _load_benchmarks_merged() helper.
        ("canonical_benchmarks", "__merged_benchmarks__", "benchmarks", "benchmark"),
        ("canonical_metrics", seed_path / "metrics.yaml", "metrics", "metric"),
        ("eval_harnesses", seed_path / "harnesses.yaml", "harnesses", "harness"),
        # Families & composites are first-class registry entities since
        # the hierarchy-alignment work (notes/hierarchy-alignment.md
        # §4 / §7 Step 2). Their YAML uses {slug: {...}} shape, so we
        # need translation loaders rather than the flat-list path.
        # entity_type='family'/'composite' aliases are emitted for
        # consistency but aren't consulted by the resolver today.
        ("canonical_families", "__nested_families__", "families", "family"),
        ("canonical_composites", "__nested_composites__", "composites", "composite"),
        # Models: load via the merge helper; pass a sentinel path that
        # signals the loop below to invoke _load_models_merged() instead of
        # reading a single YAML file.
        ("canonical_models", "__merged_models__", "models", "model"),
    ]
    alias_count = 0
    # Track all seed entity IDs and alias keys so we can remove stale ones.
    # Alias key: (raw_value, entity_type, canonical_id, source_config)
    seed_snapshot: list[tuple[str, str, set[str], set[tuple[str, str, str, Optional[str]]]]] = []
    # Build the alias index once so add_alias collision checks are O(1) instead
    # of O(N) DataFrame mask scans. Combined with buffered=True below, this
    # avoids the O(N²) pd.concat-per-row cost on ~1k entities + ~13k aliases.
    queries._rebuild_alias_index(store)
    for table, yaml_file, label, entity_type in seed_specs:
        table_columns = set(schemas.empty(table).columns)
        if yaml_file == "__merged_models__":
            items = _load_models_merged()
            if not items:
                typer.echo(f" [skip] no model entries found in seed/models.yaml or _overrides/")
                continue
        elif yaml_file == "__merged_orgs__":
            items = _load_orgs_merged()
            if not items:
                typer.echo(f" [skip] no org entries found in seed/orgs.yaml or seed/orgs.generated.yaml")
                continue
        elif yaml_file == "__merged_benchmarks__":
            items = _load_benchmarks_merged()
            if not items:
                typer.echo(f" [skip] no benchmark entries found in seed/benchmarks.yaml or seed/benchmarks_generated/")
                continue
        elif yaml_file == "__nested_families__":
            items = _load_families_seed()
            if not items:
                typer.echo(f" [skip] no family entries found in seed/families.yaml")
                continue
        elif yaml_file == "__nested_composites__":
            items = _load_composites_seed()
            if not items:
                typer.echo(f" [skip] no composite entries found in seed/composites.yaml")
                continue
        else:
            # Flat-list path: yaml_file is a real Path, not a sentinel string.
            if not yaml_file.exists():
                typer.echo(f" [skip] {yaml_file} not found")
                continue
            with open(yaml_file) as f:
                items = yaml.safe_load(f) or []
        yaml_ids: set[str] = set()
        yaml_alias_keys: set[tuple[str, str, str, Optional[str]]] = set()
        for original_item in items:
            item = dict(original_item)
            # Pop 'aliases' / 'scoped_aliases' before upserting — not table columns.
            extra_aliases = item.pop("aliases", []) or []
            scoped_aliases = item.pop("scoped_aliases", {}) or {}
            # Normalize list/dict columns: YAML may have native lists/dicts,
            # but the canonical_* parquet columns are VARCHAR, so encode if
            # needed. `parents` is a list-of-edges on canonical_models.
            # `benchmark_ids` / `folder_aliases` / `composite_keys` are
            # list-valued on canonical_families. `source_configs` is
            # list-valued on canonical_composites.
            for col in (
                "tags", "metadata", "parents",
                "input_modalities", "output_modalities",
                "benchmark_ids", "folder_aliases", "composite_keys",
                "source_configs",
            ):
                if col in item:
                    item[col] = _json_encode_if_needed(item[col])
            entity_item = {k: v for k, v in item.items() if k in table_columns}
            unknown_keys = sorted(set(item.keys()) - table_columns)
            if unknown_keys:
                typer.echo(
                    f" [warn] {label} entry {item.get('id', '?')!r} has unknown "
                    f"key(s) {unknown_keys} — silently dropped. Check for typos."
                )
            if "id" not in entity_item:
                raise typer.BadParameter(f"{label} seed entry is missing required id: {original_item!r}")
            queries.upsert_entity(store, table, entity_item, buffered=True)
            canonical_id = entity_item["id"]
            display_name = entity_item.get("display_name", "")
            yaml_ids.add(canonical_id)
            # Global aliases (source_config=None): matched regardless of caller's source_config.
            # Scoped aliases (source_config=<name>): matched only when the caller passes that
            # source_config — lets short tokens ("Overall", "Arabic") map to different
            # benchmarks depending on which EEE config they came from.
            global_aliases = {canonical_id, display_name} | set(extra_aliases)
            alias_specs: list[tuple[str, Optional[str]]] = [
                (raw, None) for raw in global_aliases if raw
            ]
            for source_cfg, raw_values in scoped_aliases.items():
                for raw in raw_values or []:
                    if raw:
                        alias_specs.append((raw, source_cfg))
            for raw_value, source_cfg in alias_specs:
                # Index stale-removal by (raw_value, entity_type, canonical_id, source_config)
                yaml_alias_keys.add((raw_value, entity_type, canonical_id, source_cfg))
                try:
                    queries.add_alias(store, {
                        "raw_value": raw_value,
                        "entity_type": entity_type,
                        "canonical_id": canonical_id,
                        "source_config": source_cfg,
                        "source_field": "seed",
                        "status": "confirmed",
                        "strategy": "seed",
                        "confidence": 1.0,
                        "notes": None,
                    }, buffered=True)
                    alias_count += 1
                except ValueError:
                    # add_alias raises on uniqueness collision: an alias row
                    # already exists for (entity_type, raw_value, source_config).
                    # YAML is the source of truth, so if the existing row points
                    # at a different canonical_id, this is a YAML rename and we
                    # must REPOINT the existing row — NOT silently swallow it.
                    # Without this, stale-removal at the end of seed would then
                    # delete the row (its old key is no longer in
                    # yaml_alias_keys), causing total alias loss.
                    aliases_df = store.table("aliases")
                    mask = (
                        (aliases_df["raw_value"] == raw_value)
                        & (aliases_df["entity_type"] == entity_type)
                        & (aliases_df["status"] != "rejected")
                    )
                    if source_cfg is not None:
                        mask = mask & (aliases_df["source_config"] == source_cfg)
                    else:
                        mask = mask & aliases_df["source_config"].isna()
                    existing = aliases_df[mask]
                    if existing.empty:
                        # Collision came from the pending buffer (this run added
                        # the same key earlier). For same-canonical re-adds this
                        # is a no-op; for different-canonical we must mutate the
                        # pending dict in place so the rename isn't lost on
                        # flush. _alias_index points at the same dict, so
                        # updating it here keeps the index consistent.
                        for p in queries._get_pending(store, "aliases"):
                            if (p.get("entity_type") == entity_type
                                    and p.get("raw_value") == raw_value
                                    and queries._source_config_key(p.get("source_config")) == queries._source_config_key(source_cfg)
                                    and p.get("status") != "rejected"):
                                if p["canonical_id"] != canonical_id:
                                    prev = p["canonical_id"]
                                    p["canonical_id"] = canonical_id
                                    p["source_field"] = "seed"
                                    p["status"] = "confirmed"
                                    p["strategy"] = "seed"
                                    p["confidence"] = 1.0
                                    typer.echo(
                                        f" [rename] alias {raw_value!r} ({entity_type}) "
                                        f"moved {prev!r} -> {canonical_id!r} (pending)"
                                    )
                                    alias_count += 1
                                break
                        continue
                    row = existing.iloc[0]
                    if row["canonical_id"] != canonical_id:
                        # Rename: repoint the existing row at the new canonical.
                        queries.update_alias(store, row["id"], {
                            "canonical_id": canonical_id,
                            "source_field": "seed",
                            "status": "confirmed",
                            "strategy": "seed",
                            "confidence": 1.0,
                        })
                        typer.echo(
                            f" [rename] alias {raw_value!r} ({entity_type}) "
                            f"moved {row['canonical_id']!r} -> {canonical_id!r}"
                        )
                        alias_count += 1
                    # else: identical re-seed of an existing alias — no-op.
        seed_snapshot.append((table, entity_type, yaml_ids, yaml_alias_keys))
        typer.echo(f" {label}: {len(items)}")
    # Flush all buffered upserts (entities + aliases) into their tables in a
    # single pd.concat per table. prune_stale below reads store.table(...)
    # directly, so this must happen before that block.
    queries.flush_pending(store)
    # Derive denormalized parent-walk caches now that all canonical_models
    # rows are present. `root_model_id` and `lineage_origin_org_id` are
    # computed from `parents` and need the full graph to be in place.
    lineage_counts = queries.derive_model_lineage_fields(store)
    typer.echo(
        f" derived: root_model_id={lineage_counts['root_set']}, "
        f"lineage_origin_org_id={lineage_counts['lineage_set']}, "
        f"open_weights_inherited={lineage_counts['open_weights_inherited']}, "
        f"release_date_from_id={lineage_counts['release_date_derived_from_id']}"
    )
    removed_entities = 0
    removed_aliases = 0
    if prune_stale:
        # Remove seed-originated entities and aliases that are no longer in the YAML.
        # Only touches rows that were created by seed (strategy == "seed"), never
        # sync-created aliases or auto-draft entities.
        for table, entity_type, yaml_ids, yaml_alias_keys in seed_snapshot:
            # Remove stale seed aliases for this entity type.
            aliases_df = store.table("aliases")
            seed_mask = (aliases_df["strategy"] == "seed") & (aliases_df["entity_type"] == entity_type)
            if seed_mask.any():
                seed_aliases = aliases_df[seed_mask]
                stale_alias_mask = seed_mask.copy()
                for idx in seed_aliases.index:
                    row = seed_aliases.loc[idx]
                    sc = row.get("source_config")
                    # NaN source_config means a global alias; normalize to None
                    # so the key matches what was recorded in yaml_alias_keys.
                    if _is_na(sc):
                        sc = None
                    key = (row["raw_value"], row["entity_type"], row["canonical_id"], sc)
                    if key in yaml_alias_keys:
                        stale_alias_mask[idx] = False
                n_stale = stale_alias_mask.sum()
                if n_stale > 0:
                    store.set_table("aliases", aliases_df[~stale_alias_mask].reset_index(drop=True))
                    removed_aliases += int(n_stale)
            # Remove stale seed entities — only those with review_status "reviewed"
            # that came from seed and are no longer in the YAML.
            entity_df = store.table(table)
            if len(entity_df) > 0:
                stale = entity_df["id"].isin(yaml_ids)
                stale_entities = entity_df[~stale & (entity_df["review_status"] == "reviewed")]
                # Only remove if every alias for this entity is also seed-originated,
                # meaning it wasn't referenced by sync data.
                current_aliases = store.table("aliases")
                for eid in stale_entities["id"]:
                    entity_aliases = current_aliases[
                        (current_aliases["canonical_id"] == eid)
                        & (current_aliases["entity_type"] == entity_type)
                    ]
                    if len(entity_aliases) == 0 or (entity_aliases["strategy"] == "seed").all():
                        entity_df = entity_df[entity_df["id"] != eid]
                        # Also remove any remaining aliases pointing to it.
                        current_aliases = current_aliases[
                            ~((current_aliases["canonical_id"] == eid)
                              & (current_aliases["entity_type"] == entity_type))
                        ]
                        removed_entities += 1
                store.set_table(table, entity_df.reset_index(drop=True))
                store.set_table("aliases", current_aliases.reset_index(drop=True))
    typer.echo(f" aliases: {alias_count} added, {removed_aliases} removed")
    if removed_entities:
        typer.echo(f" stale entities removed: {removed_entities}")
    store.push_to_hub()
    typer.echo("Seed complete.")
| # ------------------------------------------------------------------ | |
| # stats | |
| # ------------------------------------------------------------------ | |
def stats(
    local: bool = typer.Option(False, "--local", help="Read from fixtures/ instead of HF Hub"),
):
    """Print registry entity counts and pending review summary."""
    import os
    if local:
        os.environ["LOCAL_MODE"] = "true"
    store = _load_store()

    def _counts(table_name):
        """Return (total rows, draft rows) for a canonical table."""
        frame = store.table(table_name)
        n_draft = 0
        if "review_status" in frame.columns:
            n_draft = int((frame["review_status"] == "draft").sum())
        return len(frame), n_draft

    entity_tables = [
        ("models ", "canonical_models"),
        ("benchmarks", "canonical_benchmarks"),
        ("metrics ", "canonical_metrics"),
        ("harnesses ", "eval_harnesses"),
    ]
    for label, table_name in entity_tables:
        total, draft = _counts(table_name)
        typer.echo(f" {label} total={total} draft={draft}")
    alias_frame = store.table("aliases")
    if "status" in alias_frame.columns:
        uncertain = int((alias_frame["status"] == "uncertain").sum())
    else:
        uncertain = 0
    typer.echo(f"\n aliases total={len(alias_frame)} uncertain={uncertain}")
    for name in ("eval_results", "resolution_log", "sync_runs"):
        typer.echo(f" {name} total={len(store.table(name))}")
| # ------------------------------------------------------------------ | |
| # sync | |
| # ------------------------------------------------------------------ | |
def sync(
    config: Optional[str] = typer.Option(None, "--config", help="EEE config name"),
    all_configs: bool = typer.Option(False, "--all", help="Sync all EEE configs"),
    rerun: bool = typer.Option(False, "--rerun", help="Re-resolve all raw strings even if already aliased"),
    local: bool = typer.Option(False, "--local"),
):
    """
    Batch sync EEE config(s) → writes resolved results to eval_results table.

    Each result row is one (model × benchmark × metric) combination with
    resolved canonical IDs. Failures are collected per config and reported
    at the end; successfully-synced tables are still persisted.
    """
    import os
    if local:
        os.environ["LOCAL_MODE"] = "true"
    if not (config or all_configs):
        typer.echo("Specify --config <name> or --all", err=True)
        raise typer.Exit(1)
    from eval_card_registry.services.ingestion import run_sync
    import datasets as ds_lib
    store = _load_store()
    if all_configs:
        configs_to_run = ds_lib.get_dataset_config_names("evaleval/EEE_datastore")
    else:
        configs_to_run = [config]
    failed: list[str] = []
    for cfg in configs_to_run:
        typer.echo(f"Syncing {cfg}...")
        try:
            counts = run_sync(cfg, store, rerun=rerun)
        except Exception as e:
            typer.echo(f" {cfg}: FAILED — {e}", err=True)
            failed.append(cfg)
        else:
            typer.echo(f" {cfg}: {counts}")
    typer.echo("Persisting tables...")
    store.push_to_hub()
    if failed:
        typer.echo(f"Done with {len(failed)} failed config(s): {', '.join(failed)}")
    else:
        typer.echo("Done.")