"""Build the candidate model pool consumed by the recommendation web app.
The output is a single .npz that bundles, for every candidate model:
- model_name (str)
- size_id (int, bucket id matching the trained MLPMetric)
- family_id (int)
- popularity (int, HF downloads in the last 30d; 0 if unknown)
- hf_url (str, https://huggingface.co/<name> if name looks like a repo id)
Run from the project root:
python web/build_model_pool.py \
--data-dir data/unified_augmented \
--args checkpoint/mlp/unified_augmented/ablation_no_model_id_no_dataset_id/args.json \
--out web/assets/model_pool.npz
"""
from __future__ import annotations
import argparse
import json
import os
import numpy as np
# Fallback size-bucket edges used when the training args.json is unavailable.
# NOTE(review): presumably sizes in billions of parameters (matches the `_b`
# suffix used by get_size_b / sizes_b) — confirm against the checkpoint's
# "size_bucket" config, since bucket ids must align with the trained MLPMetric.
SIZE_EDGES_DEFAULT = [
0.001, 0.003, 0.01, 0.03, 0.06, 0.1, 0.15, 0.2, 0.3, 0.4,
0.5, 0.6, 0.8, 1, 3, 7, 14, 35, 70, 100, 1000,
]
def assign_size_bucket(size_b: float, size_edges: np.ndarray, unknown_id: int) -> int:
    """Map a raw model size to its size-bucket id.

    Values that cannot be coerced to a finite, non-zero float map to
    *unknown_id*; everything else maps to the right-sided ``searchsorted``
    index into *size_edges* (so a value above the last edge gets
    ``len(size_edges)``).
    """
    try:
        value = float(size_b)
    except (TypeError, ValueError):
        return unknown_id
    if value == 0.0 or not np.isfinite(value):
        return unknown_id
    bucket = np.searchsorted(size_edges, value, side="right")
    return int(bucket)
def get_size_b(profile_entry) -> float:
    """Extract the numeric model size from a per-model profile entry.

    Args:
        profile_entry: The per-model dict from model_profile.json, or
            None / any other value when the model has no profile.

    Returns:
        The size as a float, or NaN when the entry is not a dict, the size
        field is missing, the literal string "unknown", zero, or not
        coercible to a float.
    """
    if not isinstance(profile_entry, dict):
        return float("nan")
    size = profile_entry.get("size")
    # This check cannot raise, so it lives outside the try block.
    if isinstance(size, str) and size.strip().lower() == "unknown":
        return float("nan")
    try:
        x = float(size)
    except (TypeError, ValueError):
        # FIX: narrowed from a bare `except Exception`. float() only raises
        # these two for JSON-shaped values; a broad catch could mask real bugs.
        return float("nan")
    # Zero is treated as "size not recorded", same as unknown.
    return x if x != 0.0 else float("nan")
def hf_url_for(name: str) -> str:
    """Return the Hugging Face URL for *name*, or "" when the name does not
    look like a ``<org>/<repo>`` repo id."""
    if "/" not in name:
        return ""
    return f"https://huggingface.co/{name}"
def main(argv=None):
    """Build the candidate model pool and save it as a single .npz archive.

    Reads the ID maps (model2id / model2family / family2id) from --data-dir,
    merges model profiles and popularity counts (with optional fallback from
    --profile-dir), assigns each model a size bucket aligned with the trained
    checkpoint's config, and writes the arrays consumed by the web app.

    Args:
        argv: Optional argument list passed to argparse (defaults to sys.argv).
    """
    p = argparse.ArgumentParser()
    p.add_argument("--data-dir", default="data/unified_augmented")
    p.add_argument(
        "--args",
        default="checkpoint/mlp/unified_augmented/ablation_no_model_id_no_dataset_id/args.json",
        help="Path to the training args.json — used to read size_bucket so bucket ids align with the checkpoint.",
    )
    p.add_argument(
        "--profile-dir",
        default=None,
        help=(
            "Optional fallback directory to read model_profile.json / "
            "model_popularity.json from when --data-dir lacks them (e.g. "
            "v2 deployment data only ships ID maps)."
        ),
    )
    p.add_argument("--out", default="web/assets/model_pool.npz")
    p.add_argument(
        "--min-popularity",
        type=int,
        default=0,
        help="Drop candidate models with HF download count below this. 0 keeps all.",
    )
    args = p.parse_args(argv)

    # FIX: os.makedirs("") raises FileNotFoundError when --out has no
    # directory component; only create the directory when there is one.
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(os.path.join(args.data_dir, "model2id.json")) as f:
        model2id = json.load(f)
    with open(os.path.join(args.data_dir, "model2family.json")) as f:
        model2family = json.load(f)
    with open(os.path.join(args.data_dir, "family2id.json")) as f:
        family2id = json.load(f)

    def _read_profile_files(d):
        # Best-effort read of model_profile.json / model_popularity.json from
        # directory `d`; missing files simply yield empty dicts.
        prof = {}
        pop = {}
        prof_path = os.path.join(d, "model_profile.json")
        pop_path = os.path.join(d, "model_popularity.json")
        if os.path.exists(prof_path):
            with open(prof_path) as f:
                prof = json.load(f)
        if os.path.exists(pop_path):
            # FIX: was `json.load(open(pop_path))`, which leaks the handle;
            # use a context manager so the file is closed deterministically.
            with open(pop_path) as f:
                pop_doc = json.load(f)
            # The popularity file is either {"models": {...}} or a flat map.
            models_field = pop_doc.get("models", pop_doc)
            for name, entry in models_field.items():
                if isinstance(entry, dict):
                    pop[name] = int(entry.get("downloads", 0) or 0)
                else:
                    try:
                        pop[name] = int(entry)
                    except (TypeError, ValueError):
                        # FIX: narrowed from bare `except Exception`.
                        pop[name] = 0
        return prof, pop

    model_profile, pop_map = _read_profile_files(args.data_dir)
    if args.profile_dir:
        fb_prof, fb_pop = _read_profile_files(args.profile_dir)
        # Fill in any gaps from the fallback dir (e.g. v1 profile for v2 names).
        for k, v in fb_prof.items():
            model_profile.setdefault(k, v)
        for k, v in fb_pop.items():
            pop_map.setdefault(k, v)

    # Bucket edges must match the checkpoint's training config so the size
    # bucket ids line up with the trained MLPMetric embedding.
    if os.path.exists(args.args):
        # FIX: was `json.load(open(args.args))` — leaked the file handle.
        with open(args.args) as f:
            train_args = json.load(f)
        size_edges = np.array(train_args.get("size_bucket", SIZE_EDGES_DEFAULT), dtype=float)
    else:
        size_edges = np.array(SIZE_EDGES_DEFAULT, dtype=float)
    unknown_size_id = len(size_edges) + 1
    unknown_family_id = family2id.get("unknown", len(family2id) - 1)

    names = []
    size_ids = []
    sizes_b = []
    family_ids = []
    popularities = []
    urls = []
    dropped_pop = 0
    for name in model2id.keys():
        pop = pop_map.get(name, 0)
        if pop < args.min_popularity:
            dropped_pop += 1
            continue
        size_b = get_size_b(model_profile.get(name))
        sid = assign_size_bucket(size_b, size_edges, unknown_size_id)
        fam = model2family.get(name, "unknown")
        fid = family2id.get(fam, unknown_family_id)
        names.append(name)
        size_ids.append(sid)
        sizes_b.append(size_b)  # NaN means unknown
        family_ids.append(fid)
        popularities.append(pop)
        urls.append(hf_url_for(name))

    names_arr = np.array(names, dtype=object)
    size_arr = np.array(size_ids, dtype=np.int64)
    sizes_b_arr = np.array(sizes_b, dtype=np.float32)
    fam_arr = np.array(family_ids, dtype=np.int64)
    pop_arr = np.array(popularities, dtype=np.int64)
    url_arr = np.array(urls, dtype=object)
    np.savez(
        args.out,
        names=names_arr,
        size_ids=size_arr,
        sizes_b=sizes_b_arr,
        family_ids=fam_arr,
        popularities=pop_arr,
        urls=url_arr,
    )
    print(f"Wrote {len(names)} models to {args.out} (dropped {dropped_pop} below min-popularity={args.min_popularity})")
    print(f" unique families: {len(set(family_ids))}, unique size buckets: {len(set(size_ids))}")
    print(f" models with HF URL: {sum(1 for u in urls if u)} / {len(urls)}")


if __name__ == "__main__":
    main()