# j-chim's picture
# Upload folder using huggingface_hub
# 94d49c0 verified
"""
eval-card-registry CLI.

Commands:
  seed   Load known entities from seed/ YAML files
  stats  Print registry summary
  sync   Batch sync one or all EEE configs → eval_results table
"""
import json
from pathlib import Path
from typing import Optional
import typer
import yaml
def _json_encode_if_needed(value):
"""Encode lists/dicts as JSON strings; pass through anything else.
seed/models.yaml uses YAML-native lists for `tags` (e.g. `["open-weight"]`)
while seed/benchmarks.yaml stores them pre-encoded as strings (e.g.
`'["instruction-following"]'`). The canonical_* parquet columns are all
VARCHAR, so we coerce on the way in to keep both formats supported.
"""
if isinstance(value, (list, dict)):
return json.dumps(value)
return value
def _legacy_parent_model_id_to_parents(entry: dict) -> None:
"""Translate a legacy `parent_model_id: X` field to the typed `parents`
list shape. Mutates the entry in place.
Legacy core.yaml / sources/*.generated.yaml use a single scalar
`parent_model_id` to express a family/variant relationship (e.g.
Llama-3-8B → Llama-3). The new schema replaces this with a typed list
of parent edges. This shim converts on load so existing YAML keeps
working until each file is migrated to emit `parents` natively.
No-op when `parents` is already present (new shape wins) or when neither
field is set.
"""
if "parents" in entry and entry["parents"] is not None:
entry.pop("parent_model_id", None)
return
legacy = entry.pop("parent_model_id", None)
if legacy:
entry["parents"] = [{"id": legacy, "relationship": "variant", "axis": "size"}]
from eval_card_registry.store.hf_store import get_store
from eval_card_registry.store import queries, schemas
from eval_card_registry.store.queries import _is_na
# Root Typer application object; functions decorated with @app.command()
# in this module are registered on it as subcommands.
app = typer.Typer(help="eval-card-registry CLI")
def _load_store():
    """Return the store from `get_store()`, loading it first if needed.

    `store.load()` is only invoked when `store.loaded` is falsy, so calling
    this helper repeatedly does not reload an already-loaded store.
    """
    store = get_store()
    if store.loaded:
        return store
    store.load()
    return store
# ------------------------------------------------------------------
# seed
# ------------------------------------------------------------------
@app.command()
def seed(
    local: bool = typer.Option(False, "--local", help="Write to fixtures/ instead of HF Hub"),
    seed_dir: str = typer.Option("./seed", "--seed-dir"),
    prune_stale: bool = typer.Option(
        False,
        "--prune-stale/--no-prune-stale",
        help="Remove reviewed seed entities and seed aliases absent from the current YAML snapshot.",
    ),
):
    """Load known canonical entities from seed YAML files."""
    # NOTE: the docstring above is deliberately left as a one-liner — Typer
    # surfaces a command's docstring as its --help text.
    #
    # Overall flow:
    #   1. Read every seed YAML under `seed_dir` (models / benchmarks / orgs
    #      are merged from several files; families / composites are
    #      translated from a {slug: {...}} mapping).
    #   2. Upsert each entity and its aliases through the buffered `queries`
    #      API, repointing an existing alias when the YAML moved it.
    #   3. Flush the buffers, derive model lineage fields, optionally prune
    #      rows that are no longer in the YAML, then `store.push_to_hub()`.
    #
    # Raises typer.BadParameter when a seed file has the wrong top-level
    # shape or an entry lacks `id`.
    import os

    if local:
        os.environ["LOCAL_MODE"] = "true"
    store = _load_store()
    seed_path = Path(seed_dir)

    # ------------------------------------------------------------------
    # Shared YAML / alias helpers
    # ------------------------------------------------------------------
    def _read_yaml(path: Path):
        """Parse one seed file. Decoded explicitly as UTF-8 so the result
        does not depend on the platform's locale encoding."""
        with open(path, encoding="utf-8") as f:
            return yaml.safe_load(f)

    def _read_yaml_list(path: Path) -> list:
        """Parse a seed file whose top level must be a list (empty → [])."""
        loaded = _read_yaml(path) or []
        if not isinstance(loaded, list):
            raise typer.BadParameter(f"{path} must be a flat list")
        return loaded

    def _read_yaml_mapping(path: Path) -> dict:
        """Parse a seed file whose top level must be a mapping (empty → {})."""
        raw = _read_yaml(path) or {}
        if not isinstance(raw, dict):
            raise typer.BadParameter(f"{path} must be a top-level mapping {{slug: {{...}}}}")
        return raw

    def _union_aliases(base, extra) -> list:
        """Return `base` plus every alias of `extra` not already present.

        Comparison is case-insensitive; order of first appearance is kept
        and falsy aliases in `extra` are ignored.
        """
        merged_aliases = list(base or [])
        seen_lc = {a.lower() for a in merged_aliases if a}
        for a in extra or []:
            if a and a.lower() not in seen_lc:
                merged_aliases.append(a)
                seen_lc.add(a.lower())
        return merged_aliases

    # ------------------------------------------------------------------
    # Models — three-layer load from seed/models/:
    #   sources/*.generated.yaml  → external catalog data (e.g. models.dev),
    #                               flat lists, never hand-edited
    #   core.yaml                 → curated canonicals (the source of truth),
    #                               flat list OR {skip_ids, entries} dict
    #   enrichments/aliases.yaml  → optional alias-only entries ({id, aliases})
    #                               that union onto whatever exists
    #
    # Merge order: sources → core → enrichments. Field-level merge per entry
    # (aliases / tags UNION; other scalars prefer non-empty, last-write-wins).
    # `skip_ids` from core drops generated entries we don't want.
    # ------------------------------------------------------------------
    def _load_models_merged() -> list[dict]:
        models_dir = seed_path / "models"
        sources_dir = models_dir / "sources"
        core_file = models_dir / "core.yaml"
        enrichments_file = models_dir / "enrichments" / "aliases.yaml"

        source_entries: list[dict] = []
        core_entries: list[dict] = []
        enrichment_entries: list[dict] = []
        skip_ids: set[str] = set()
        skip_source_ids: set[str] = set()

        if sources_dir.is_dir():
            # sorted() keeps the merge order deterministic across platforms.
            for src_path in sorted(sources_dir.glob("*.generated.yaml")):
                source_entries.extend(_read_yaml_list(src_path))

        if core_file.exists():
            loaded = _read_yaml(core_file) or {}
            if isinstance(loaded, list):
                core_entries = loaded
            elif isinstance(loaded, dict):
                core_entries = loaded.get("entries", []) or []
                skip_ids = set(loaded.get("skip_ids", []) or [])
                # `skip_source_ids` drops these ids from sources/enrichments only,
                # leaving core entries authoritative. Used when models.dev (or any
                # auto-generated source) ships bad aliases for a model that core.yaml
                # curates correctly — otherwise the loader's UNION-merge would
                # re-introduce the bad aliases on every refresh.
                skip_source_ids = set(loaded.get("skip_source_ids", []) or [])
            else:
                raise typer.BadParameter(f"{core_file} unexpected shape {type(loaded)}")

        if enrichments_file.exists():
            enrichment_entries = _read_yaml_list(enrichments_file)

        def _decode_list_field(v):
            """Normalise a list-valued field to `(list, was_json)`.

            The field may be a YAML list or a JSON-encoded string.
            `was_json` tells the caller to re-encode on write. A string
            that is not a JSON list is wrapped as a one-element list.
            """
            if v is None:
                return [], False
            if isinstance(v, list):
                return list(v), False
            if isinstance(v, str):
                s = v.strip()
                if not s or s in ("[]", "null"):
                    return [], True
                try:
                    d = json.loads(s)
                    if isinstance(d, list):
                        return list(d), True
                except (ValueError, TypeError):
                    pass
            return [v], False

        def _is_empty(v) -> bool:
            """True for None, empty list/dict, and the strings '', '[]', '{}'."""
            if v is None:
                return True
            if isinstance(v, (list, dict)) and len(v) == 0:
                return True
            if isinstance(v, str) and v.strip() in ("", "[]", "{}"):
                return True
            return False

        def _merge_into(target: dict, src: dict) -> dict:
            """Merge two entries with the same canonical_id into a new dict.

            Field-level merge policy:
            - `aliases`: UNION (case-insensitive dedup).
            - `tags`: UNION (case-insensitive dedup). Both YAML-list and
              JSON-encoded-string forms supported. Protects against session
              additions overwriting `[open-weight, moe]` with `[open-weight]`.
            - `parents`: UNION by edge id, field-merging edges present on
              both sides.
            - Other scalars: prefer non-empty across the pair; when both
              sides have a non-empty value, last-write-wins (`src`).
              Protects against session-batch entries that omit
              `architecture` / `params_billions` from silently overwriting
              earlier rich entries.
            "Empty" is whatever `_is_empty` reports.
            """
            aliases_merged = _union_aliases(target.get("aliases"), src.get("aliases"))

            # Union tags (handles both list and JSON-string formats).
            tgt_tags, tgt_was_json = _decode_list_field(target.get("tags"))
            src_tags, src_was_json = _decode_list_field(src.get("tags"))
            seen_tags_lc = {str(t).lower() for t in tgt_tags}
            for t in src_tags:
                if t is not None and str(t).lower() not in seen_tags_lc:
                    tgt_tags.append(t)
                    seen_tags_lc.add(str(t).lower())
            # Re-encode if either side was a JSON string (the parquet column
            # is VARCHAR; _json_encode_if_needed downstream handles either).
            tags_merged = json.dumps(tgt_tags) if (tgt_was_json or src_was_json) else tgt_tags

            # Union `parents` by id. For an edge present in both, field-merge
            # within the edge so a later source can fill in `axis` (or correct
            # `relationship`) without duplicating the edge. Edges from the
            # target preserve their order; new edges from src are appended.
            tgt_parents, tgt_p_was_json = _decode_list_field(target.get("parents"))
            src_parents, src_p_was_json = _decode_list_field(src.get("parents"))
            parents_by_id: dict[str, dict] = {}
            parents_order: list[str] = []
            for p in tgt_parents:
                if isinstance(p, dict) and p.get("id"):
                    pid = p["id"]
                    if pid not in parents_by_id:
                        parents_order.append(pid)
                    parents_by_id[pid] = dict(p)
            for p in src_parents:
                if not isinstance(p, dict) or not p.get("id"):
                    continue
                pid = p["id"]
                if pid in parents_by_id:
                    merged_edge = dict(parents_by_id[pid])
                    for k, v in p.items():
                        if _is_empty(v):
                            continue
                        merged_edge[k] = v
                    parents_by_id[pid] = merged_edge
                else:
                    parents_order.append(pid)
                    parents_by_id[pid] = dict(p)
            parents_list = [parents_by_id[pid] for pid in parents_order]
            parents_merged = (
                json.dumps(parents_list)
                if (tgt_p_was_json or src_p_was_json)
                else parents_list
            )

            merged = dict(target)
            for k, v in src.items():
                if k in ("aliases", "tags", "parents"):
                    continue  # handled separately above
                if _is_empty(v):
                    continue
                merged[k] = v
            merged["aliases"] = aliases_merged
            merged["tags"] = tags_merged
            # Only emit `parents` if at least one side had any (avoids creating
            # a spurious empty list on entries that never had a parents field).
            if tgt_parents or src_parents:
                merged["parents"] = parents_merged
            return merged

        by_id: dict[str, dict] = {}

        def _absorb(entries: list[dict], extra_skip: set[str] = frozenset()) -> None:
            """Fold `entries` into `by_id`, skipping ids in skip_ids/extra_skip."""
            drop = skip_ids | extra_skip
            for e in entries:
                if "id" not in e:
                    raise typer.BadParameter(f"models seed entry missing id: {e!r}")
                if e["id"] in drop:
                    continue
                # Translate legacy `parent_model_id` scalar to the typed
                # `parents` list before any merge / column-filter step.
                _legacy_parent_model_id_to_parents(e)
                if e["id"] in by_id:
                    by_id[e["id"]] = _merge_into(by_id[e["id"]], e)
                else:
                    by_id[e["id"]] = e

        # Sources/enrichments respect both skip_ids and skip_source_ids;
        # core entries respect only skip_ids so curated overrides always apply.
        _absorb(source_entries, extra_skip=skip_source_ids)
        _absorb(core_entries)
        _absorb(enrichment_entries, extra_skip=skip_source_ids)
        return list(by_id.values())

    # ------------------------------------------------------------------
    # Benchmarks — two-source load:
    #   seed/benchmarks.yaml              → curated canonicals (the
    #                                       source of truth, hand-edited)
    #   seed/benchmarks_generated/*.yaml  → bulk auto-generated entries
    #                                       (e.g. AIR-Bench 2024's 373
    #                                       categories from
    #                                       scripts/refresh_air_bench_taxonomy.py)
    #
    # Merge order: generated → curated. Per id, aliases are unioned and
    # every other key the curated entry sets overwrites the generated value
    # (even when the curated value is empty), so curated entries can refine
    # an auto-generated row without losing its aliases.
    # Generator scripts must use stable canonical_ids so refreshes are
    # idempotent.
    # ------------------------------------------------------------------
    def _load_benchmarks_merged() -> list[dict]:
        curated_path = seed_path / "benchmarks.yaml"
        generated_dir = seed_path / "benchmarks_generated"

        generated_entries: list[dict] = []
        if generated_dir.is_dir():
            for src_path in sorted(generated_dir.glob("*.yaml")):
                generated_entries.extend(_read_yaml_list(src_path))

        curated_entries: list[dict] = []
        if curated_path.exists():
            curated_entries = _read_yaml_list(curated_path)

        def _merge_benchmark(generated: dict, curated: dict) -> dict:
            """Curated wins on every field it specifies; aliases are
            unioned (case-insensitive dedup) so generator-emitted aliases
            survive even when curated narrows the entry."""
            merged = dict(generated)
            for k, v in curated.items():
                if k == "aliases":
                    continue
                merged[k] = v
            merged["aliases"] = _union_aliases(generated.get("aliases"), curated.get("aliases"))
            return merged

        by_id: dict[str, dict] = {}
        for entry in generated_entries:
            if "id" not in entry:
                raise typer.BadParameter(f"benchmarks generated entry missing id: {entry!r}")
            by_id[entry["id"]] = entry
        for entry in curated_entries:
            if "id" not in entry:
                raise typer.BadParameter(f"benchmarks seed entry missing id: {entry!r}")
            if entry["id"] in by_id:
                by_id[entry["id"]] = _merge_benchmark(by_id[entry["id"]], entry)
            else:
                by_id[entry["id"]] = entry
        return list(by_id.values())

    # ------------------------------------------------------------------
    # Families — translate seed/families.yaml's nested {slug: {fields}}
    # shape into flat dicts ready for upsert. The YAML uses the slug as
    # the mapping key for human friendliness (`mmlu:` reads as a header);
    # the table needs `id` as a column.
    #
    # List-valued fields (`benchmark_ids`, `folder_aliases`,
    # `composite_keys`) are returned as native lists here and JSON-encoded
    # by the main loop below before upsert.
    # ------------------------------------------------------------------
    def _load_families_seed() -> list[dict]:
        path = seed_path / "families.yaml"
        if not path.exists():
            return []
        raw = _read_yaml_mapping(path)
        out: list[dict] = []
        # Validation: each benchmark may only appear in one curated family.
        seen_benchmarks: dict[str, str] = {}
        for slug, fields in raw.items():
            if not isinstance(fields, dict):
                raise typer.BadParameter(f"family {slug!r} entry must be a mapping, got {type(fields).__name__}")
            benchmark_ids = list(fields.get("benchmarks") or [])
            for bid in benchmark_ids:
                if bid in seen_benchmarks and seen_benchmarks[bid] != slug:
                    raise typer.BadParameter(
                        f"benchmark {bid!r} listed in two families: "
                        f"{seen_benchmarks[bid]!r} and {slug!r}"
                    )
                seen_benchmarks[bid] = slug
            out.append({
                "id": slug,
                "display_name": fields.get("display") or slug,
                "category": fields.get("category"),
                "benchmark_ids": benchmark_ids,
                "primary_benchmark_key": fields.get("primary_benchmark_key"),
                "folder_aliases": list(fields.get("folder_aliases") or []),
                "composite_keys": list(fields.get("composite_keys") or []),
                "tags": fields.get("tags") or [],
                "metadata": fields.get("metadata") or {},
                "review_status": fields.get("review_status") or "reviewed",
            })
        return out

    # ------------------------------------------------------------------
    # Composites — same translation as families. YAML shape:
    #   {slug: {display, configs: [...], category?, family_id?}}
    # ------------------------------------------------------------------
    def _load_composites_seed() -> list[dict]:
        path = seed_path / "composites.yaml"
        if not path.exists():
            return []
        raw = _read_yaml_mapping(path)
        out: list[dict] = []
        for slug, fields in raw.items():
            if not isinstance(fields, dict):
                raise typer.BadParameter(f"composite {slug!r} entry must be a mapping, got {type(fields).__name__}")
            raw_configs = fields.get("configs")
            if raw_configs is None:
                # Display-only override (no explicit `configs:`): implicit
                # single source_config equal to the slug. Some upstream
                # EEE folders are kebab (`arc-agi`), others snake
                # (`helm_classic`); ship both forms so the producer's
                # composite_config_map JOIN matches whichever the data
                # uses. De-dup when slug has no `-`.
                kebab = slug
                snake = slug.replace("-", "_")
                source_configs = [kebab] if kebab == snake else [kebab, snake]
            else:
                source_configs = [str(c) for c in raw_configs]
            out.append({
                "id": slug,
                "display_name": fields.get("display") or slug,
                "category": fields.get("category"),
                "source_configs": source_configs,
                "family_id": fields.get("family_id"),
                "tags": fields.get("tags") or [],
                "metadata": fields.get("metadata") or {},
                "review_status": fields.get("review_status") or "reviewed",
            })
        return out

    # ------------------------------------------------------------------
    # Orgs — two-file load:
    #   seed/orgs.yaml            → curated first-party labs (the source
    #                               of truth, hand-edited)
    #   seed/orgs.generated.yaml  → auto-created orgs from hub-stats refresh
    #                               (HF authors that aren't curated labs)
    #
    # Curated wins on id collision. Unlike the models merge (field-level),
    # orgs use a simple "drop generated entry if id is in curated" policy:
    # curated entries are deliberate and richer; auto-created entries are
    # thin (just id, display_name, kind=unknown), so a partial overlay
    # would never improve the curated record.
    # ------------------------------------------------------------------
    def _load_orgs_merged() -> list[dict]:
        curated_path = seed_path / "orgs.yaml"
        generated_path = seed_path / "orgs.generated.yaml"
        curated: list[dict] = _read_yaml_list(curated_path) if curated_path.exists() else []
        generated: list[dict] = _read_yaml_list(generated_path) if generated_path.exists() else []

        curated_ids = {e["id"] for e in curated if "id" in e}
        out = list(curated)
        for e in generated:
            if "id" not in e:
                raise typer.BadParameter(f"orgs.generated.yaml entry missing id: {e!r}")
            if e["id"] not in curated_ids:
                out.append(e)
        return out

    def _repoint_colliding_alias(raw_value, entity_type, canonical_id, source_cfg) -> bool:
        """Handle an `add_alias` uniqueness collision; return True on a rename.

        add_alias raises when an alias row already exists for
        (entity_type, raw_value, source_config). YAML is the source of
        truth, so if the existing row points at a different canonical_id,
        this is a YAML rename and the existing row must be REPOINTED — not
        silently swallowed. Otherwise stale-removal at the end of seed would
        delete the row (its old key is no longer in yaml_alias_keys),
        causing total alias loss. An identical re-seed is a no-op (False).
        """
        aliases_df = store.table("aliases")
        mask = (
            (aliases_df["raw_value"] == raw_value)
            & (aliases_df["entity_type"] == entity_type)
            & (aliases_df["status"] != "rejected")
        )
        if source_cfg is not None:
            mask = mask & (aliases_df["source_config"] == source_cfg)
        else:
            mask = mask & aliases_df["source_config"].isna()
        existing = aliases_df[mask]

        if existing.empty:
            # Collision came from the pending buffer (this run added the
            # same key earlier). For same-canonical re-adds this is a no-op;
            # for different-canonical we must mutate the pending dict in
            # place so the rename isn't lost on flush. _alias_index points
            # at the same dict, so updating it here keeps the index
            # consistent.
            wanted_cfg = queries._source_config_key(source_cfg)
            for p in queries._get_pending(store, "aliases"):
                if (
                    p.get("entity_type") == entity_type
                    and p.get("raw_value") == raw_value
                    and queries._source_config_key(p.get("source_config")) == wanted_cfg
                    and p.get("status") != "rejected"
                ):
                    if p["canonical_id"] == canonical_id:
                        return False
                    prev = p["canonical_id"]
                    p["canonical_id"] = canonical_id
                    p["source_field"] = "seed"
                    p["status"] = "confirmed"
                    p["strategy"] = "seed"
                    p["confidence"] = 1.0
                    typer.echo(
                        f" [rename] alias {raw_value!r} ({entity_type}) "
                        f"moved {prev!r} -> {canonical_id!r} (pending)"
                    )
                    return True
            return False

        row = existing.iloc[0]
        if row["canonical_id"] == canonical_id:
            return False  # identical re-seed of an existing alias
        # Rename: repoint the existing row at the new canonical.
        queries.update_alias(store, row["id"], {
            "canonical_id": canonical_id,
            "source_field": "seed",
            "status": "confirmed",
            "strategy": "seed",
            "confidence": 1.0,
        })
        typer.echo(
            f" [rename] alias {raw_value!r} ({entity_type}) "
            f"moved {row['canonical_id']!r} -> {canonical_id!r}"
        )
        return True

    # table name, yaml file (or sentinel string), label, entity_type (for
    # alias creation). A sentinel string selects one of the loaders in
    # `sentinel_loaders` instead of reading a single flat-list YAML file.
    seed_specs = [
        # Orgs: curated + auto-generated.
        ("canonical_orgs", "__merged_orgs__", "orgs", "org"),
        # Benchmarks: curated seed/benchmarks.yaml + bulk-generated
        # seed/benchmarks_generated/*.yaml.
        ("canonical_benchmarks", "__merged_benchmarks__", "benchmarks", "benchmark"),
        ("canonical_metrics", seed_path / "metrics.yaml", "metrics", "metric"),
        ("eval_harnesses", seed_path / "harnesses.yaml", "harnesses", "harness"),
        # Families & composites are first-class registry entities since
        # the hierarchy-alignment work (notes/hierarchy-alignment.md
        # §4 / §7 Step 2). Their YAML uses {slug: {...}} shape, so we
        # need translation loaders rather than the flat-list path.
        # entity_type='family'/'composite' aliases are emitted for
        # consistency but aren't consulted by the resolver today.
        ("canonical_families", "__nested_families__", "families", "family"),
        ("canonical_composites", "__nested_composites__", "composites", "composite"),
        # Models: three-layer merge under seed/models/.
        ("canonical_models", "__merged_models__", "models", "model"),
    ]
    # sentinel → (loader, text printed after " [skip] " when nothing loads)
    sentinel_loaders = {
        "__merged_models__": (
            _load_models_merged,
            "no model entries found in seed/models/ "
            "(sources/*.generated.yaml, core.yaml, enrichments/aliases.yaml)",
        ),
        "__merged_orgs__": (
            _load_orgs_merged,
            "no org entries found in seed/orgs.yaml or seed/orgs.generated.yaml",
        ),
        "__merged_benchmarks__": (
            _load_benchmarks_merged,
            "no benchmark entries found in seed/benchmarks.yaml or seed/benchmarks_generated/",
        ),
        "__nested_families__": (
            _load_families_seed,
            "no family entries found in seed/families.yaml",
        ),
        "__nested_composites__": (
            _load_composites_seed,
            "no composite entries found in seed/composites.yaml",
        ),
    }
    # Columns whose YAML value may be a native list/dict but whose parquet
    # column is VARCHAR. `parents` is a list-of-edges on canonical_models;
    # `benchmark_ids` / `folder_aliases` / `composite_keys` are list-valued
    # on canonical_families; `source_configs` on canonical_composites.
    json_columns = (
        "tags", "metadata", "parents",
        "input_modalities", "output_modalities",
        "benchmark_ids", "folder_aliases", "composite_keys",
        "source_configs",
    )

    alias_count = 0
    # Track all seed entity IDs and alias keys so we can remove stale ones.
    # Alias key: (raw_value, entity_type, canonical_id, source_config)
    seed_snapshot: list[tuple[str, str, set[str], set[tuple[str, str, str, Optional[str]]]]] = []
    # Build the alias index once so add_alias collision checks are O(1) instead
    # of O(N) DataFrame mask scans. Combined with buffered=True below, this
    # avoids the O(N²) pd.concat-per-row cost on ~1k entities + ~13k aliases.
    queries._rebuild_alias_index(store)

    for table, yaml_file, label, entity_type in seed_specs:
        table_columns = set(schemas.empty(table).columns)
        if isinstance(yaml_file, str):
            loader, empty_message = sentinel_loaders[yaml_file]
            items = loader()
            if not items:
                typer.echo(f" [skip] {empty_message}")
                continue
        else:
            if not yaml_file.exists():
                typer.echo(f" [skip] {yaml_file} not found")
                continue
            # Same shape check as every other list-shaped seed file.
            items = _read_yaml_list(yaml_file)

        yaml_ids: set[str] = set()
        yaml_alias_keys: set[tuple[str, str, str, Optional[str]]] = set()
        for original_item in items:
            item = dict(original_item)
            # Pop 'aliases' / 'scoped_aliases' before upserting — not table columns.
            extra_aliases = item.pop("aliases", []) or []
            scoped_aliases = item.pop("scoped_aliases", {}) or {}
            for col in json_columns:
                if col in item:
                    item[col] = _json_encode_if_needed(item[col])

            entity_item = {k: v for k, v in item.items() if k in table_columns}
            unknown_keys = sorted(set(item.keys()) - table_columns)
            if unknown_keys:
                typer.echo(
                    f" [warn] {label} entry {item.get('id', '?')!r} has unknown "
                    f"key(s) {unknown_keys} — silently dropped. Check for typos."
                )
            if "id" not in entity_item:
                raise typer.BadParameter(f"{label} seed entry is missing required id: {original_item!r}")
            queries.upsert_entity(store, table, entity_item, buffered=True)
            canonical_id = entity_item["id"]
            display_name = entity_item.get("display_name", "")
            yaml_ids.add(canonical_id)

            # Global aliases (source_config=None): matched regardless of caller's source_config.
            # Scoped aliases (source_config=<name>): matched only when the caller passes that
            # source_config — lets short tokens ("Overall", "Arabic") map to different
            # benchmarks depending on which EEE config they came from.
            global_aliases = {canonical_id, display_name} | set(extra_aliases)
            alias_specs: list[tuple[str, Optional[str]]] = [
                (raw, None) for raw in global_aliases if raw
            ]
            for source_cfg, raw_values in scoped_aliases.items():
                for raw in raw_values or []:
                    if raw:
                        alias_specs.append((raw, source_cfg))

            for raw_value, source_cfg in alias_specs:
                # Index stale-removal by (raw_value, entity_type, canonical_id, source_config)
                yaml_alias_keys.add((raw_value, entity_type, canonical_id, source_cfg))
                try:
                    queries.add_alias(store, {
                        "raw_value": raw_value,
                        "entity_type": entity_type,
                        "canonical_id": canonical_id,
                        "source_config": source_cfg,
                        "source_field": "seed",
                        "status": "confirmed",
                        "strategy": "seed",
                        "confidence": 1.0,
                        "notes": None,
                    }, buffered=True)
                except ValueError:
                    # Uniqueness collision: count it only if it was a rename.
                    if _repoint_colliding_alias(raw_value, entity_type, canonical_id, source_cfg):
                        alias_count += 1
                else:
                    alias_count += 1

        seed_snapshot.append((table, entity_type, yaml_ids, yaml_alias_keys))
        typer.echo(f" {label}: {len(items)}")

    # Flush all buffered upserts (entities + aliases) into their tables in a
    # single pd.concat per table. prune_stale below reads store.table(...)
    # directly, so this must happen before that block.
    queries.flush_pending(store)

    # Derive denormalized parent-walk caches now that all canonical_models
    # rows are present. `root_model_id` and `lineage_origin_org_id` are
    # computed from `parents` and need the full graph to be in place.
    lineage_counts = queries.derive_model_lineage_fields(store)
    typer.echo(
        f" derived: root_model_id={lineage_counts['root_set']}, "
        f"lineage_origin_org_id={lineage_counts['lineage_set']}, "
        f"open_weights_inherited={lineage_counts['open_weights_inherited']}, "
        f"release_date_from_id={lineage_counts['release_date_derived_from_id']}"
    )

    removed_entities = 0
    removed_aliases = 0
    if prune_stale:
        # Remove seed-originated entities and aliases that are no longer in the YAML.
        # Only touches rows that were created by seed (strategy == "seed"), never
        # sync-created aliases or auto-draft entities.
        for table, entity_type, yaml_ids, yaml_alias_keys in seed_snapshot:
            # Remove stale seed aliases for this entity type.
            aliases_df = store.table("aliases")
            seed_mask = (aliases_df["strategy"] == "seed") & (aliases_df["entity_type"] == entity_type)
            if seed_mask.any():
                # Start from "every seed alias is stale" and clear the flag
                # for each row whose key is still present in the YAML.
                stale_alias_mask = seed_mask.copy()
                for idx, row in aliases_df[seed_mask].iterrows():
                    sc = row.get("source_config")
                    if _is_na(sc):
                        sc = None
                    key = (row["raw_value"], row["entity_type"], row["canonical_id"], sc)
                    if key in yaml_alias_keys:
                        stale_alias_mask[idx] = False
                n_stale = stale_alias_mask.sum()
                if n_stale > 0:
                    store.set_table("aliases", aliases_df[~stale_alias_mask].reset_index(drop=True))
                    removed_aliases += int(n_stale)

            # Remove stale seed entities — only those with review_status "reviewed"
            # that are no longer in the YAML.
            entity_df = store.table(table)
            if len(entity_df) > 0:
                in_yaml = entity_df["id"].isin(yaml_ids)
                stale_entities = entity_df[~in_yaml & (entity_df["review_status"] == "reviewed")]
                # Only remove if every alias for this entity is also seed-originated,
                # meaning it wasn't referenced by sync data.
                current_aliases = store.table("aliases")
                for eid in stale_entities["id"]:
                    points_at_entity = (
                        (current_aliases["canonical_id"] == eid)
                        & (current_aliases["entity_type"] == entity_type)
                    )
                    entity_aliases = current_aliases[points_at_entity]
                    if len(entity_aliases) == 0 or (entity_aliases["strategy"] == "seed").all():
                        entity_df = entity_df[entity_df["id"] != eid]
                        # Also remove any remaining aliases pointing to it.
                        current_aliases = current_aliases[~points_at_entity]
                        removed_entities += 1
                # Written back inside the guard so `current_aliases` is
                # always bound when it is used.
                store.set_table(table, entity_df.reset_index(drop=True))
                store.set_table("aliases", current_aliases.reset_index(drop=True))

    typer.echo(f" aliases: {alias_count} added, {removed_aliases} removed")
    if removed_entities:
        typer.echo(f" stale entities removed: {removed_entities}")
    store.push_to_hub()
    typer.echo("Seed complete.")
# ------------------------------------------------------------------
# stats
# ------------------------------------------------------------------
@app.command()
def stats(
    local: bool = typer.Option(False, "--local", help="Read from fixtures/ instead of HF Hub"),
):
    """Print registry entity counts and pending review summary."""
    # The docstring above is left untouched because Typer shows it as the
    # command's help text.
    import os

    if local:
        os.environ["LOCAL_MODE"] = "true"
    store = _load_store()

    # (label, table) pairs for the entity tables that get a total/draft line.
    entity_tables = (
        ("models ", "canonical_models"),
        ("benchmarks", "canonical_benchmarks"),
        ("metrics ", "canonical_metrics"),
        ("harnesses ", "eval_harnesses"),
    )
    for label, table in entity_tables:
        frame = store.table(table)
        total = len(frame)
        # Tables without a review_status column report zero drafts.
        if "review_status" in frame.columns:
            draft = int((frame["review_status"] == "draft").sum())
        else:
            draft = 0
        typer.echo(f" {label} total={total} draft={draft}")

    alias_frame = store.table("aliases")
    if "status" in alias_frame.columns:
        uncertain = int((alias_frame["status"] == "uncertain").sum())
    else:
        uncertain = 0
    typer.echo(f"\n aliases total={len(alias_frame)} uncertain={uncertain}")

    # Row counts only for the remaining tables.
    results_total = len(store.table('eval_results'))
    typer.echo(f" eval_results total={results_total}")
    log_total = len(store.table('resolution_log'))
    typer.echo(f" resolution_log total={log_total}")
    runs_total = len(store.table('sync_runs'))
    typer.echo(f" sync_runs total={runs_total}")
# ------------------------------------------------------------------
# sync
# ------------------------------------------------------------------
@app.command()
def sync(
    config: Optional[str] = typer.Option(None, "--config", help="EEE config name"),
    all_configs: bool = typer.Option(False, "--all", help="Sync all EEE configs"),
    rerun: bool = typer.Option(False, "--rerun", help="Re-resolve all raw strings even if already aliased"),
    local: bool = typer.Option(False, "--local"),
):
    """
    Batch sync EEE config(s) → writes resolved results to eval_results table.
    Each result row is one (model × benchmark × metric) combination with resolved canonical IDs.
    """
    # The docstring above is kept as-is because Typer shows it as the
    # command's help text.
    #
    # Exit status: 1 when neither --config nor --all is given, and 1 when at
    # least one config failed to sync (after the tables have been persisted);
    # 0 otherwise. When both --config and --all are passed, --all wins.
    import os

    if local:
        os.environ["LOCAL_MODE"] = "true"
    if not config and not all_configs:
        typer.echo("Specify --config <name> or --all", err=True)
        raise typer.Exit(1)

    from eval_card_registry.services.ingestion import run_sync

    store = _load_store()
    if all_configs:
        # Imported only on the --all path: this module uses `datasets`
        # solely to enumerate the config names.
        import datasets as ds_lib

        configs_to_run: list[str] = ds_lib.get_dataset_config_names("evaleval/EEE_datastore")
    else:
        configs_to_run = [config]

    failed: list[str] = []
    for cfg in configs_to_run:
        typer.echo(f"Syncing {cfg}...")
        try:
            counts = run_sync(cfg, store, rerun=rerun)
        except Exception as e:
            # Best effort: one bad config must not stop the remaining ones.
            # The failure is reported here and reflected in the exit status.
            typer.echo(f" {cfg}: FAILED — {e}", err=True)
            failed.append(cfg)
        else:
            typer.echo(f" {cfg}: {counts}")

    # Persist whatever succeeded, even if some configs failed.
    typer.echo("Persisting tables...")
    store.push_to_hub()

    if failed:
        typer.echo(f"Done with {len(failed)} failed config(s): {', '.join(failed)}")
        # Non-zero exit so scripts and CI can detect a partial failure.
        raise typer.Exit(1)
    typer.echo("Done.")