yxc20098 committed on
Commit
8a0b07d
·
1 Parent(s): 385aa0a

collect_eval_data: cell-level retry on transient API errors

Browse files

The Phase 4 debug run lost 43/48 Kimi-K2.6 cells to Together
rate-limit 429s that the driver never retried (the inner
run_eval has provider-call retry, but a cell that exhausted
those retries was treated as 'done' by the driver). Likewise
the Qwen3.5-397B-A17B run lost ~30/48 cells at parallel=4 to
the same 429 pattern.

Add a cell-level retry loop wrapping _run_cell:

* New flags --cell-retries (max attempts per cell, default 3;
1 = no retry) and --cell-retry-base-delay (default 30s).
* After each cell, if jsonl is incomplete AND the stats.json's
episode notes match a transient-error pattern (429, 5xx,
TimeoutException, TransportError, ReadTimeout, ConnectError,
RemoteProtocolError, 'rate limit', 'Too many requests'),
sleep base * 2^(attempt-1) (capped at 300s) and re-run the
same cell.
* Permanent errors (e.g. 404 model_not_available, 401 auth)
are NEVER retried — they would just burn quota.

This is the driver-level outer retry; the inner provider call
keeps its own short backoff for transient HTTP errors mid-cell.
The two work together: provider-level rescues the API hiccup
within a turn, cell-level rescues the cell when the whole
subprocess's retry budget is exhausted on a long rate-limit
window.

Helpers tested with golden 429 and 404 stats.json fixtures.

Files changed (1) hide show
  1. scripts/collect_eval_data.py +92 -2
scripts/collect_eval_data.py CHANGED
@@ -244,6 +244,82 @@ def _pack_path_for(pack_id: str) -> Path:
244
  return PACKS_DIR / f"{pack_id}.yaml"
245
 
246
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
247
  def _run_cell(cell: dict, args, python_bin: str) -> dict:
248
  """Spawn one `python -m openra_bench.run_eval` for a single cell.
249
  Returns a result dict with rc / log_path / jsonl_path."""
@@ -367,6 +443,20 @@ def main(argv: list[str]) -> int:
367
  default=1,
368
  help="how many cell subprocesses to run at once",
369
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
370
  ap.add_argument(
371
  "--provider",
372
  default="together",
@@ -495,7 +585,7 @@ def main(argv: list[str]) -> int:
495
  started = time.time()
496
  if a.parallel_cells <= 1:
497
  for c in todo:
498
- r = _run_cell(c, a, a.python)
499
  results.append(r)
500
  completed += 1 if r["complete"] else 0
501
  fail += 0 if r["rc"] == 0 else 1
@@ -507,7 +597,7 @@ def main(argv: list[str]) -> int:
507
  )
508
  else:
509
  with ThreadPoolExecutor(max_workers=a.parallel_cells) as ex:
510
- futs = {ex.submit(_run_cell, c, a, a.python): c for c in todo}
511
  for fu in as_completed(futs):
512
  r = fu.result()
513
  results.append(r)
 
244
  return PACKS_DIR / f"{pack_id}.yaml"
245
 
246
 
247
# ── transient-error detection for cell-level retry ──────────────────
# A cell whose subprocess errored at the provider layer (429, 503,
# transport reset, read timeout) leaves an `episodes[*].notes` entry
# in the stats.json with the underlying exception text. We retry only
# on patterns known to be transient — 404 / 400 / auth errors are
# never retried (would just burn quota).
#
# Markers are matched case-insensitively. A marker made only of digits
# (e.g. "429") must not be embedded in a longer number, so a permanent
# error whose text happens to contain "14290" is not mistaken for a 429.
_TRANSIENT_NOTE_MARKERS = (
    "429",
    "500 from provider",
    "502 from provider",
    "503 from provider",
    "504 from provider",
    "Too many requests",
    "rate limit",
    "RuntimeError: 5",
    "TimeoutException",
    "ReadTimeout",
    "ConnectError",
    "TransportError",
    "RemoteProtocolError",
)


def _stats_path_for(cell: dict, args) -> Path:
    """Build the path `<args.output_dir>/.logs/<cell_id>.stats.json`."""
    return Path(args.output_dir) / ".logs" / f"{cell['cell_id']}.stats.json"


def _note_has_marker(notes: str, marker: str) -> bool:
    """Return True if `marker` occurs in `notes`, ignoring case.

    All-digit markers additionally require a non-digit (or string edge)
    on both sides so that status codes do not match inside larger numbers.
    """
    import re  # function-scope import: keeps this helper self-contained

    pattern = re.escape(marker)
    if marker.isdigit():
        pattern = rf"(?<!\d){pattern}(?!\d)"
    return re.search(pattern, notes, re.IGNORECASE) is not None


def _is_transient_failure(stats_path: Path) -> bool:
    """Report whether the stats.json at `stats_path` shows a retryable error.

    Only the notes of the last entry in `episodes` are inspected. Returns
    False when the file is missing or unreadable, is not valid UTF-8 JSON,
    does not have the expected object/list shape, has no episodes or no
    notes, or when none of `_TRANSIENT_NOTE_MARKERS` occurs in the notes
    (e.g. a 404 model_not_available).
    """
    try:
        data = json.loads(stats_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return False
    if not isinstance(data, dict):
        return False
    episodes = data.get("episodes") or []
    if not isinstance(episodes, list) or not episodes:
        return False
    last_episode = episodes[-1]
    if not isinstance(last_episode, dict):
        return False
    raw_notes = last_episode.get("notes") or []
    if not isinstance(raw_notes, (list, tuple)):
        # A bare string (or other scalar) is one note; iterating a string
        # would split it into characters and no marker could ever match.
        raw_notes = [raw_notes]
    notes = " ".join(str(n) for n in raw_notes)
    if not notes:
        return False
    return any(_note_has_marker(notes, m) for m in _TRANSIENT_NOTE_MARKERS)
290
+
291
+
292
def _retry_delay(base: float, attempt: int, cap: float = 300.0) -> float:
    """Return the back-off, in seconds, to wait after 1-based `attempt`.

    Computes base * 2**(attempt - 1) and clamps the result to [0, cap].
    The lower clamp keeps a negative base from reaching time.sleep(),
    which raises ValueError on negative input.
    """
    # Bound the exponent so an absurd attempt count cannot overflow a float.
    exponent = min(max(attempt - 1, 0), 62)
    return max(0.0, min(cap, base * (2.0 ** exponent)))


def _run_cell_with_retry(cell: dict, args, python_bin: str) -> dict:
    """Wrap _run_cell with bounded exponential back-off on transient failures.

    The cell is re-run only while all of the following hold:
      (a) the result's `complete` flag is false,
      (b) fewer than `args.cell_retries` attempts have been made
          (values below 1 are treated as 1, i.e. no retry), and
      (c) the cell's stats.json carries a transient-error note.
    Any other outcome (success, or a permanent error like 404) is
    returned immediately.

    Returns the result dict of the last `_run_cell` call, with an added
    `attempts` key holding the number of attempts made.
    """
    max_attempts = max(1, int(args.cell_retries))
    base = float(args.cell_retry_base_delay)
    attempt = 0
    while True:
        attempt += 1
        r = _run_cell(cell, args, python_bin)
        r["attempts"] = attempt
        if r["complete"] or attempt >= max_attempts:
            return r
        if not _is_transient_failure(_stats_path_for(cell, args)):
            return r  # permanent error — don't burn quota
        # 5 min hard cap (the _retry_delay default); long enough for
        # Together rate windows.
        delay = _retry_delay(base, attempt)
        print(
            f" ↻ retry {attempt + 1}/{max_attempts} for {cell['cell_id']} "
            f"after {delay:.0f}s (transient failure)",
            flush=True,
        )
        time.sleep(delay)
321
+
322
+
323
  def _run_cell(cell: dict, args, python_bin: str) -> dict:
324
  """Spawn one `python -m openra_bench.run_eval` for a single cell.
325
  Returns a result dict with rc / log_path / jsonl_path."""
 
443
  default=1,
444
  help="how many cell subprocesses to run at once",
445
  )
446
+ ap.add_argument(
447
+ "--cell-retries",
448
+ type=int,
449
+ default=3,
450
+ help="max attempts per cell on transient failure (429/5xx/"
451
+ "timeout/transport). 1 = no retry. Default: 3",
452
+ )
453
+ ap.add_argument(
454
+ "--cell-retry-base-delay",
455
+ type=float,
456
+ default=30.0,
457
+ help="base seconds before first retry; doubles each attempt, "
458
+ "capped at 300s. Default: 30",
459
+ )
460
  ap.add_argument(
461
  "--provider",
462
  default="together",
 
585
  started = time.time()
586
  if a.parallel_cells <= 1:
587
  for c in todo:
588
+ r = _run_cell_with_retry(c, a, a.python)
589
  results.append(r)
590
  completed += 1 if r["complete"] else 0
591
  fail += 0 if r["rc"] == 0 else 1
 
597
  )
598
  else:
599
  with ThreadPoolExecutor(max_workers=a.parallel_cells) as ex:
600
+ futs = {ex.submit(_run_cell_with_retry, c, a, a.python): c for c in todo}
601
  for fu in as_completed(futs):
602
  r = fu.result()
603
  results.append(r)