aristoteles78 commited on
Commit
19b064e
·
1 Parent(s): b7488d8

update worker: Entity Registry dedup, canonical_id in cards

Browse files
Files changed (1) hide show
  1. worker.py +164 -71
worker.py CHANGED
@@ -1,17 +1,20 @@
1
  """Background worker for benchmark card generation.
2
 
3
- Detects new benchmark folders in EEE_datastore, generates cards via
4
- process_single_benchmark(), and uploads them to evaleval/auto-benchmarkcards.
 
5
  """
6
 
7
  import json
8
  import logging
9
  import os
 
10
  import tempfile
11
  from datetime import datetime, timezone
12
  from pathlib import Path
13
  from typing import Any
14
 
 
15
  from huggingface_hub import HfApi, snapshot_download
16
 
17
  logger = logging.getLogger("worker")
@@ -20,6 +23,8 @@ EEE_REPO = "evaleval/EEE_datastore"
20
  CARDS_REPO = "evaleval/auto-benchmarkcards"
21
  MAX_BENCHMARKS_PER_JOB = 5
22
 
 
 
23
  # Persistent storage on HF Spaces (mounted volume).
24
  # Falls back to local /tmp for development.
25
  PERSISTENT_DIR = Path(os.environ.get("PERSISTENT_DIR", "/data"))
@@ -37,29 +42,68 @@ def load_state() -> dict:
37
 
38
 
39
  def save_state(state: dict) -> None:
40
- """Save persistent state to disk."""
41
  PERSISTENT_DIR.mkdir(parents=True, exist_ok=True)
42
- STATE_FILE.write_text(json.dumps(state, indent=2))
 
 
43
 
44
 
45
  def _normalize_name(name: str) -> str:
46
- """Normalize benchmark name for comparison (lowercase, hyphens to underscores)."""
47
  return name.lower().replace("-", "_").replace(" ", "_")
48
 
49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  def _extract_folders(file_list: list[str]) -> set[str]:
51
  """Extract unique top-level folder names under data/."""
52
  folders = set()
53
  for path in file_list:
54
  parts = path.split("/")
55
- # Only include entries that have files beneath them (depth > 2)
56
  if len(parts) >= 3 and parts[0] == "data":
57
  folders.add(parts[1])
58
  return folders
59
 
60
 
61
  def _get_existing_cards() -> set[str]:
62
- """List benchmark names that already have a card in the target dataset."""
63
  api = HfApi()
64
  try:
65
  files = api.list_repo_files(CARDS_REPO, repo_type="dataset")
@@ -70,16 +114,30 @@ def _get_existing_cards() -> set[str]:
70
  names = set()
71
  for f in files:
72
  if f.startswith("cards/") and f.endswith(".json"):
73
- # cards/mmlu.json -> mmlu
74
  names.add(f[len("cards/"):-len(".json")])
75
  return names
76
 
77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
  def detect_new_benchmarks() -> list[str]:
79
- """Find EEE folders that don't have a card yet.
80
 
81
- Compares EEE_datastore folders against existing cards in the
82
- target dataset, so we never regenerate what's already there.
83
  """
84
  api = HfApi()
85
  try:
@@ -91,12 +149,20 @@ def detect_new_benchmarks() -> list[str]:
91
  current_folders = _extract_folders(all_files)
92
  existing_cards = _get_existing_cards()
93
 
94
- # Normalize both sides for comparison (arc-agi == arc_agi)
95
- normalized_cards = {_normalize_name(c) for c in existing_cards}
 
 
 
 
 
 
 
96
  new_folders = sorted(
97
  f for f in current_folders
98
  if _normalize_name(f) not in normalized_cards
99
  )
 
100
  if not new_folders:
101
  logger.info("All %d folders already have cards", len(current_folders))
102
  return []
@@ -112,28 +178,23 @@ def detect_new_benchmarks() -> list[str]:
112
  return new_folders
113
 
114
 
115
- def _download_folder(folder_name: str) -> Path:
116
- """Download a single EEE folder to a temp directory."""
117
- target_dir = tempfile.mkdtemp(prefix=f"eee_{folder_name}_")
118
- logger.info("Downloading EEE folder '%s' to %s", folder_name, target_dir)
119
-
120
- snapshot_download(
121
- repo_id=EEE_REPO,
122
- repo_type="dataset",
123
- local_dir=target_dir,
124
- allow_patterns=[f"data/{folder_name}/**/*.json"],
125
- )
126
-
127
- data_path = Path(target_dir) / "data"
128
- return data_path
129
-
130
-
131
- def _upload_card(card: dict, benchmark_name: str) -> bool:
132
  """Upload a generated card to the auto-benchmarkcards dataset."""
133
  api = HfApi()
134
- safe_name = benchmark_name.lower().replace("-", "_").replace(" ", "_").replace("/", "_")
 
 
 
 
 
 
135
  remote_path = f"cards/{safe_name}.json"
136
 
 
 
 
 
 
137
  try:
138
  with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
139
  json.dump(card, f, indent=2)
@@ -160,19 +221,29 @@ def _upload_card(card: dict, benchmark_name: str) -> bool:
160
 
161
 
162
  def process_new_benchmarks(new_folders: list[str]) -> None:
163
- """Generate and upload cards for all benchmarks in the new folders.
164
 
165
- This runs in a background thread, called from app.py.
 
166
  """
167
  from auto_benchmarkcard.tools.eee.eee_tool import (
168
  scan_eee_folder,
169
  eee_to_pipeline_inputs,
170
  )
171
  from auto_benchmarkcard.eee_workflow import process_single_benchmark
172
- from auto_benchmarkcard.workflow import setup_logging_suppression
173
 
174
  setup_logging_suppression(debug_mode=False)
175
 
 
 
 
 
 
 
 
 
 
176
  state = load_state()
177
  job_record: dict[str, Any] = {
178
  "started_at": datetime.now(timezone.utc).isoformat(),
@@ -183,8 +254,10 @@ def process_new_benchmarks(new_folders: list[str]) -> None:
183
  for folder_name in new_folders:
184
  logger.info("Processing folder: %s", folder_name)
185
 
 
186
  try:
187
  data_path = _download_folder(folder_name)
 
188
  except Exception:
189
  logger.exception("Failed to download folder %s", folder_name)
190
  job_record["results"].append({
@@ -192,56 +265,76 @@ def process_new_benchmarks(new_folders: list[str]) -> None:
192
  })
193
  continue
194
 
195
- # Scan the downloaded folder
196
  try:
197
  scan_result = scan_eee_folder(str(data_path))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
  except Exception:
199
- logger.exception("Failed to scan folder %s", folder_name)
200
  job_record["results"].append({
201
  "folder": folder_name, "status": "scan_failed",
202
  })
203
- continue
204
 
205
- # Process individual benchmarks
206
- for name, bench in sorted(scan_result.benchmarks.items()):
207
- inputs = eee_to_pipeline_inputs(bench)
208
-
209
- if not inputs.get("hf_repo"):
210
- logger.warning("Skipping %s: no HF repo", name)
211
- job_record["results"].append({
212
- "folder": folder_name, "benchmark": name, "status": "no_hf_repo",
213
- })
214
- continue
215
-
216
- card = process_single_benchmark(
217
- benchmark_name=name,
218
- pipeline_inputs=inputs,
219
- base_output_path=str(PERSISTENT_DIR / "output"),
220
- debug=False,
221
- )
222
-
223
- if card:
224
- uploaded = _upload_card(card, name)
225
- job_record["results"].append({
226
- "folder": folder_name, "benchmark": name,
227
- "status": "uploaded" if uploaded else "upload_failed",
228
- })
229
- else:
230
- job_record["results"].append({
231
- "folder": folder_name, "benchmark": name, "status": "generation_failed",
232
- })
233
-
234
- # Mark folder as known
235
  if folder_name not in state["known_folders"]:
236
  state["known_folders"].append(folder_name)
237
 
238
  job_record["completed_at"] = datetime.now(timezone.utc).isoformat()
239
 
240
- # Summarize
241
  results = job_record["results"]
242
- uploaded = sum(1 for r in results if r["status"] == "uploaded")
243
- failed = sum(1 for r in results if r["status"] not in ("uploaded", "no_hf_repo"))
244
- skipped = sum(1 for r in results if r["status"] == "no_hf_repo")
245
  logger.info("Job complete: %d uploaded, %d failed, %d skipped", uploaded, failed, skipped)
246
 
247
  state["jobs"].append(job_record)
 
1
  """Background worker for benchmark card generation.
2
 
3
+ Detects new benchmarks in EEE_datastore by scanning folders for actual
4
+ benchmark names, resolves them via the Entity Registry for canonical IDs,
5
+ and skips benchmarks that already have cards in evaleval/auto-benchmarkcards.
6
  """
7
 
8
  import json
9
  import logging
10
  import os
11
+ import shutil
12
  import tempfile
13
  from datetime import datetime, timezone
14
  from pathlib import Path
15
  from typing import Any
16
 
17
+ import requests
18
  from huggingface_hub import HfApi, snapshot_download
19
 
20
  logger = logging.getLogger("worker")
 
23
  CARDS_REPO = "evaleval/auto-benchmarkcards"
24
  MAX_BENCHMARKS_PER_JOB = 5
25
 
26
+ ENTITY_REGISTRY_URL = "https://evaleval-entity-registry.hf.space/api/v1"
27
+
28
  # Persistent storage on HF Spaces (mounted volume).
29
  # Falls back to local /tmp for development.
30
  PERSISTENT_DIR = Path(os.environ.get("PERSISTENT_DIR", "/data"))
 
42
 
43
 
44
  def save_state(state: dict) -> None:
45
+ """Save persistent state to disk (atomic write via temp + rename)."""
46
  PERSISTENT_DIR.mkdir(parents=True, exist_ok=True)
47
+ tmp = STATE_FILE.with_suffix(".tmp")
48
+ tmp.write_text(json.dumps(state, indent=2))
49
+ tmp.rename(STATE_FILE)
50
 
51
 
52
  def _normalize_name(name: str) -> str:
53
+ """Local fallback normalization (lowercase, collapse separators to underscore)."""
54
  return name.lower().replace("-", "_").replace(" ", "_")
55
 
56
 
57
+ def _resolve_names(names: list[str]) -> dict[str, str]:
58
+ """Resolve benchmark names to canonical IDs via Entity Registry.
59
+
60
+ Returns a mapping {raw_name: canonical_id}. Names the registry doesn't
61
+ recognize get a locally-normalized fallback ID so dedup still works.
62
+ """
63
+ resolved = {}
64
+
65
+ if not names:
66
+ return resolved
67
+
68
+ try:
69
+ resp = requests.post(
70
+ f"{ENTITY_REGISTRY_URL}/resolve/batch",
71
+ json=[{"raw_value": n, "entity_type": "benchmark"} for n in names],
72
+ timeout=15,
73
+ )
74
+ resp.raise_for_status()
75
+ results = resp.json()
76
+
77
+ # Response is a list in the same order as the input
78
+ for name, item in zip(names, results):
79
+ canonical = item.get("canonical_id")
80
+ resolved[name] = canonical if canonical else _normalize_name(name)
81
+
82
+ registry_hits = sum(1 for item in results if item.get("canonical_id"))
83
+ logger.info(
84
+ "Entity Registry resolved %d/%d names",
85
+ registry_hits, len(names),
86
+ )
87
+ except Exception:
88
+ logger.warning("Entity Registry unreachable, falling back to local normalization")
89
+ for n in names:
90
+ resolved[n] = _normalize_name(n)
91
+
92
+ return resolved
93
+
94
+
95
  def _extract_folders(file_list: list[str]) -> set[str]:
96
  """Extract unique top-level folder names under data/."""
97
  folders = set()
98
  for path in file_list:
99
  parts = path.split("/")
 
100
  if len(parts) >= 3 and parts[0] == "data":
101
  folders.add(parts[1])
102
  return folders
103
 
104
 
105
  def _get_existing_cards() -> set[str]:
106
+ """List card names already in the target dataset (without path/extension)."""
107
  api = HfApi()
108
  try:
109
  files = api.list_repo_files(CARDS_REPO, repo_type="dataset")
 
114
  names = set()
115
  for f in files:
116
  if f.startswith("cards/") and f.endswith(".json"):
 
117
  names.add(f[len("cards/"):-len(".json")])
118
  return names
119
 
120
 
121
+ def _download_folder(folder_name: str) -> Path:
122
+ """Download a single EEE folder to a temp directory."""
123
+ target_dir = tempfile.mkdtemp(prefix=f"eee_{folder_name}_")
124
+ logger.info("Downloading EEE folder '%s' to %s", folder_name, target_dir)
125
+
126
+ snapshot_download(
127
+ repo_id=EEE_REPO,
128
+ repo_type="dataset",
129
+ local_dir=target_dir,
130
+ allow_patterns=[f"data/{folder_name}/**/*.json"],
131
+ )
132
+
133
+ return Path(target_dir) / "data"
134
+
135
+
136
  def detect_new_benchmarks() -> list[str]:
137
+ """Find EEE folders that contain benchmarks without cards.
138
 
139
+ Returns folder names that have at least one benchmark not yet in the
140
+ cards dataset. The actual per-benchmark filtering happens during processing.
141
  """
142
  api = HfApi()
143
  try:
 
149
  current_folders = _extract_folders(all_files)
150
  existing_cards = _get_existing_cards()
151
 
152
+ # Normalize existing card names for comparison (match both hyphen and underscore)
153
+ normalized_cards = set()
154
+ for c in existing_cards:
155
+ normalized_cards.add(c)
156
+ normalized_cards.add(c.replace("-", "_"))
157
+ normalized_cards.add(c.replace("_", "-"))
158
+
159
+ # A folder is "new" if its normalized name doesn't match any card.
160
+ # This is a coarse filter — per-benchmark dedup happens in process_new_benchmarks.
161
  new_folders = sorted(
162
  f for f in current_folders
163
  if _normalize_name(f) not in normalized_cards
164
  )
165
+
166
  if not new_folders:
167
  logger.info("All %d folders already have cards", len(current_folders))
168
  return []
 
178
  return new_folders
179
 
180
 
181
+ def _upload_card(card: dict, benchmark_name: str, canonical_id: str | None = None) -> bool:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  """Upload a generated card to the auto-benchmarkcards dataset."""
183
  api = HfApi()
184
+
185
+ # Use canonical ID for filename when available, fall back to local normalization
186
+ if canonical_id:
187
+ safe_name = canonical_id
188
+ else:
189
+ safe_name = _normalize_name(benchmark_name).replace("/", "_")
190
+
191
  remote_path = f"cards/{safe_name}.json"
192
 
193
+ # Embed canonical ID in the card so it's self-consistent with the filename
194
+ if canonical_id:
195
+ inner = card.get("benchmark_card", card)
196
+ inner.setdefault("card_info", {})["canonical_id"] = canonical_id
197
+
198
  try:
199
  with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
200
  json.dump(card, f, indent=2)
 
221
 
222
 
223
  def process_new_benchmarks(new_folders: list[str]) -> None:
224
+ """Generate and upload cards for benchmarks that don't have one yet.
225
 
226
+ Downloads each folder, scans for benchmarks, resolves names via the
227
+ Entity Registry, and skips any benchmark that already has a card.
228
  """
229
  from auto_benchmarkcard.tools.eee.eee_tool import (
230
  scan_eee_folder,
231
  eee_to_pipeline_inputs,
232
  )
233
  from auto_benchmarkcard.eee_workflow import process_single_benchmark
234
+ from auto_benchmarkcard.logging_setup import setup_logging_suppression
235
 
236
  setup_logging_suppression(debug_mode=False)
237
 
238
+ # Fetch existing cards once for the whole job.
239
+ # Normalize to both hyphen and underscore forms so we match old and new naming.
240
+ existing_cards = _get_existing_cards()
241
+ existing_normalized = set()
242
+ for c in existing_cards:
243
+ existing_normalized.add(c)
244
+ existing_normalized.add(c.replace("-", "_"))
245
+ existing_normalized.add(c.replace("_", "-"))
246
+
247
  state = load_state()
248
  job_record: dict[str, Any] = {
249
  "started_at": datetime.now(timezone.utc).isoformat(),
 
254
  for folder_name in new_folders:
255
  logger.info("Processing folder: %s", folder_name)
256
 
257
+ tmp_root = None
258
  try:
259
  data_path = _download_folder(folder_name)
260
+ tmp_root = data_path.parent
261
  except Exception:
262
  logger.exception("Failed to download folder %s", folder_name)
263
  job_record["results"].append({
 
265
  })
266
  continue
267
 
 
268
  try:
269
  scan_result = scan_eee_folder(str(data_path))
270
+
271
+ # Resolve all benchmark names in this folder at once
272
+ bench_names = list(scan_result.benchmarks.keys())
273
+ resolved = _resolve_names(bench_names)
274
+
275
+ for name, bench in sorted(scan_result.benchmarks.items()):
276
+ canonical = resolved.get(name, _normalize_name(name))
277
+
278
+ # Per-benchmark dedup: skip if a card already exists
279
+ if canonical in existing_normalized:
280
+ logger.info("Skipping %s: card already exists (canonical: %s)", name, canonical)
281
+ job_record["results"].append({
282
+ "folder": folder_name, "benchmark": name,
283
+ "canonical_id": canonical, "status": "already_exists",
284
+ })
285
+ continue
286
+
287
+ inputs = eee_to_pipeline_inputs(bench)
288
+
289
+ if not inputs.get("hf_repo"):
290
+ logger.warning("Skipping %s: no HF repo", name)
291
+ job_record["results"].append({
292
+ "folder": folder_name, "benchmark": name, "status": "no_hf_repo",
293
+ })
294
+ continue
295
+
296
+ card = process_single_benchmark(
297
+ benchmark_name=name,
298
+ pipeline_inputs=inputs,
299
+ base_output_path=str(PERSISTENT_DIR / "output"),
300
+ debug=False,
301
+ )
302
+
303
+ if card:
304
+ uploaded = _upload_card(card, name, canonical_id=canonical)
305
+ job_record["results"].append({
306
+ "folder": folder_name, "benchmark": name,
307
+ "canonical_id": canonical,
308
+ "status": "uploaded" if uploaded else "upload_failed",
309
+ })
310
+ # Add to existing set so later benchmarks in same job are deduped
311
+ if uploaded:
312
+ existing_normalized.add(canonical)
313
+ existing_normalized.add(canonical.replace("-", "_"))
314
+ else:
315
+ job_record["results"].append({
316
+ "folder": folder_name, "benchmark": name, "status": "generation_failed",
317
+ })
318
+
319
  except Exception:
320
+ logger.exception("Failed to process folder %s", folder_name)
321
  job_record["results"].append({
322
  "folder": folder_name, "status": "scan_failed",
323
  })
 
324
 
325
+ finally:
326
+ if tmp_root and tmp_root.exists():
327
+ shutil.rmtree(tmp_root, ignore_errors=True)
328
+
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
329
  if folder_name not in state["known_folders"]:
330
  state["known_folders"].append(folder_name)
331
 
332
  job_record["completed_at"] = datetime.now(timezone.utc).isoformat()
333
 
 
334
  results = job_record["results"]
335
+ uploaded = sum(1 for r in results if r.get("status") == "uploaded")
336
+ failed = sum(1 for r in results if r.get("status") not in ("uploaded", "no_hf_repo", "already_exists"))
337
+ skipped = sum(1 for r in results if r.get("status") in ("no_hf_repo", "already_exists"))
338
  logger.info("Job complete: %d uploaded, %d failed, %d skipped", uploaded, failed, skipped)
339
 
340
  state["jobs"].append(job_record)