JacobLinCool Codex committed on
Commit
9f8766d
·
verified ·
1 Parent(s): c810fc6

fix: heartbeat dashboard refresh locks

Browse files

Co-authored-by: Codex <noreply@openai.com>

Files changed (3) hide show
  1. README.md +1 -1
  2. app.py +21 -0
  3. tests/test_app.py +22 -0
README.md CHANGED
@@ -99,7 +99,7 @@ dashboard refresh if no refresh is already running. `ADVISOR_SCHEDULED_REFRESH=0
99
  `ADVISOR_REFRESH_INITIAL_DELAY_SECONDS`, and `ADVISOR_SCHEDULED_REFRESH_COMPUTE` tune the cadence and compute mode.
100
  Manual and scheduled refreshes both acquire `$ADVISOR_CACHE_DIR/refresh.lock` atomically before work starts, so multiple
101
  app processes do not analyze the same snapshot concurrently. Stale locks expire after `ADVISOR_REFRESH_LOCK_TTL_SECONDS`
102
- (default two hours).
103
 
104
  Set `ADVISOR_QUEST_ANALYZER_BACKEND=minicpm-transformers` for both local and deployed refresh runs. The local dashboard
105
  uses the same MiniCPM analyzer as the deployed Space; test doubles are only used inside pytest.
 
99
  `ADVISOR_REFRESH_INITIAL_DELAY_SECONDS`, and `ADVISOR_SCHEDULED_REFRESH_COMPUTE` tune the cadence and compute mode.
100
  Manual and scheduled refreshes both acquire `$ADVISOR_CACHE_DIR/refresh.lock` atomically before work starts, so multiple
101
  app processes do not analyze the same snapshot concurrently. Stale locks expire after `ADVISOR_REFRESH_LOCK_TTL_SECONDS`
102
+ (default two hours), and active jobs heartbeat the lock while they progress.
103
 
104
  Set `ADVISOR_QUEST_ANALYZER_BACKEND=minicpm-transformers` for both local and deployed refresh runs. The local dashboard
105
  uses the same MiniCPM analyzer as the deployed Space; test doubles are only used inside pytest.
app.py CHANGED
@@ -203,6 +203,7 @@ def _analyze_dashboard_quests(
203
  remaining_count=len(projects),
204
  last_project_id="",
205
  )
 
206
 
207
  for project in projects:
208
  lookup = read_quest_cache_entry(cache_dir, project, analyzer_fingerprint)
@@ -238,6 +239,7 @@ def _analyze_dashboard_quests(
238
  remaining_count=len(projects) - hit_count - analyzed_count,
239
  last_project_id=project.id,
240
  )
 
241
 
242
  for start in range(0, len(misses), batch_size):
243
  batch = misses[start : start + batch_size]
@@ -281,6 +283,7 @@ def _analyze_dashboard_quests(
281
  remaining_count=len(projects) - hit_count - analyzed_count,
282
  last_project_id=project.id,
283
  )
 
284
  validated = validate_matches_by_project(matches_by_project, projects, source=source)
285
  summary = {
286
  "project_count": len(projects),
@@ -456,6 +459,18 @@ def _release_refresh_lease(cache_dir: Path, run_id: str) -> None:
456
  print(f"[dashboard-refresh] released refresh lock run={run_id}", flush=True)
457
 
458
 
 
 
 
 
 
 
 
 
 
 
 
 
459
  def _read_refresh_lease(lock_path: Path) -> dict[str, Any] | None:
460
  try:
461
  payload = json.loads(lock_path.read_text(encoding="utf-8"))
@@ -531,6 +546,7 @@ def _run_refresh_job(run_id: str, cache_dir: Path, compute: str) -> None:
531
  compute=compute,
532
  )
533
  _set_refresh_state(stage="persisting")
 
534
  artifacts = persist_refresh_artifacts(
535
  cache_dir,
536
  run_id,
@@ -540,6 +556,7 @@ def _run_refresh_job(run_id: str, cache_dir: Path, compute: str) -> None:
540
  quest_analysis_payload=quest_analysis_payload,
541
  )
542
  _set_refresh_state(stage="swapping")
 
543
  _replace_runtime_from_files(artifacts.projects_path, artifacts.index_path, artifacts.dashboard)
544
  _release_refresh_lease(cache_dir, run_id)
545
  _set_refresh_state(
@@ -579,6 +596,7 @@ def _build_refresh_payloads(
579
 
580
  org = os.environ.get("ADVISOR_HF_ORG", DEFAULT_HF_ORG).strip() or DEFAULT_HF_ORG
581
  _set_refresh_state(stage="crawling")
 
582
  project_rows = sorted(crawl_projects(org), key=lambda project: project["id"].lower())
583
  projects_payload = {
584
  "generated_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
@@ -587,6 +605,7 @@ def _build_refresh_payloads(
587
  }
588
 
589
  _set_refresh_state(stage="embedding")
 
590
  with tempfile.TemporaryDirectory(prefix="advisor-refresh-") as directory:
591
  project_path = Path(directory) / "projects.json"
592
  project_path.write_text(json.dumps(projects_payload, ensure_ascii=False), encoding="utf-8")
@@ -608,6 +627,7 @@ def _build_refresh_payloads(
608
  )
609
 
610
  _set_refresh_state(stage="quest_analysis")
 
611
  quest_analysis = _analyze_dashboard_quests(
612
  [project.to_refresh_snapshot_dict() for project in projects],
613
  cache_dir=cache_dir,
@@ -615,6 +635,7 @@ def _build_refresh_payloads(
615
  run_id=run_id,
616
  )
617
  _set_refresh_state(stage="atlas")
 
618
  refreshed_dashboard = build_dashboard_payload(
619
  refreshed_index,
620
  quest_matches=quest_analysis["matches_by_project"],
 
203
  remaining_count=len(projects),
204
  last_project_id="",
205
  )
206
+ _refresh_lease_heartbeat(cache_dir, run_id)
207
 
208
  for project in projects:
209
  lookup = read_quest_cache_entry(cache_dir, project, analyzer_fingerprint)
 
239
  remaining_count=len(projects) - hit_count - analyzed_count,
240
  last_project_id=project.id,
241
  )
242
+ _refresh_lease_heartbeat(cache_dir, run_id)
243
 
244
  for start in range(0, len(misses), batch_size):
245
  batch = misses[start : start + batch_size]
 
283
  remaining_count=len(projects) - hit_count - analyzed_count,
284
  last_project_id=project.id,
285
  )
286
+ _refresh_lease_heartbeat(cache_dir, run_id)
287
  validated = validate_matches_by_project(matches_by_project, projects, source=source)
288
  summary = {
289
  "project_count": len(projects),
 
459
  print(f"[dashboard-refresh] released refresh lock run={run_id}", flush=True)
460
 
461
 
462
def _refresh_lease_heartbeat(cache_dir: Path, run_id: str) -> None:
    """Extend the refresh lease held by ``run_id`` so it is not treated as stale.

    Reads the lease stored at the refresh lock path for ``cache_dir``. When a
    lease is found and its ``run_id`` matches, the lease is rewritten with a new
    ``heartbeat_at`` timestamp (UTC, second precision) and an
    ``expires_at_epoch`` of "now + configured lock TTL". When no lease can be
    read, or the lease belongs to a different run, nothing is written.

    Args:
        cache_dir: Directory that contains the refresh lock file.
        run_id: Identifier of the refresh run that believes it owns the lease.

    Raises:
        OSError: If the temporary file cannot be written or swapped into place.
            The temporary file is removed (best effort) before re-raising.
    """
    lock_path = _refresh_lock_path(cache_dir)
    existing = _read_refresh_lease(lock_path)
    # Only the owning run may extend the lease; never touch someone else's lock.
    if existing is None or str(existing.get("run_id") or "") != run_id:
        return
    existing["heartbeat_at"] = datetime.now(timezone.utc).isoformat(timespec="seconds")
    existing["expires_at_epoch"] = time.time() + _refresh_lock_ttl_seconds()
    # Serialize before touching the filesystem so a serialization error cannot
    # leave a temporary file behind.
    serialized = json.dumps(existing, ensure_ascii=False) + "\n"
    tmp_path = lock_path.with_name(f".{REFRESH_LOCK_FILENAME}.{run_id}.heartbeat.tmp")
    try:
        # Write to a sibling temp file and swap it in with os.replace so readers
        # do not see a half-written lock file.
        tmp_path.write_text(serialized, encoding="utf-8")
        os.replace(tmp_path, lock_path)
    except OSError:
        # Do not leave an orphaned temp file in the cache directory. Cleanup is
        # best effort; the original error is what the caller needs to see.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
472
+
473
+
474
  def _read_refresh_lease(lock_path: Path) -> dict[str, Any] | None:
475
  try:
476
  payload = json.loads(lock_path.read_text(encoding="utf-8"))
 
546
  compute=compute,
547
  )
548
  _set_refresh_state(stage="persisting")
549
+ _refresh_lease_heartbeat(cache_dir, run_id)
550
  artifacts = persist_refresh_artifacts(
551
  cache_dir,
552
  run_id,
 
556
  quest_analysis_payload=quest_analysis_payload,
557
  )
558
  _set_refresh_state(stage="swapping")
559
+ _refresh_lease_heartbeat(cache_dir, run_id)
560
  _replace_runtime_from_files(artifacts.projects_path, artifacts.index_path, artifacts.dashboard)
561
  _release_refresh_lease(cache_dir, run_id)
562
  _set_refresh_state(
 
596
 
597
  org = os.environ.get("ADVISOR_HF_ORG", DEFAULT_HF_ORG).strip() or DEFAULT_HF_ORG
598
  _set_refresh_state(stage="crawling")
599
+ _refresh_lease_heartbeat(cache_dir, run_id)
600
  project_rows = sorted(crawl_projects(org), key=lambda project: project["id"].lower())
601
  projects_payload = {
602
  "generated_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
 
605
  }
606
 
607
  _set_refresh_state(stage="embedding")
608
+ _refresh_lease_heartbeat(cache_dir, run_id)
609
  with tempfile.TemporaryDirectory(prefix="advisor-refresh-") as directory:
610
  project_path = Path(directory) / "projects.json"
611
  project_path.write_text(json.dumps(projects_payload, ensure_ascii=False), encoding="utf-8")
 
627
  )
628
 
629
  _set_refresh_state(stage="quest_analysis")
630
+ _refresh_lease_heartbeat(cache_dir, run_id)
631
  quest_analysis = _analyze_dashboard_quests(
632
  [project.to_refresh_snapshot_dict() for project in projects],
633
  cache_dir=cache_dir,
 
635
  run_id=run_id,
636
  )
637
  _set_refresh_state(stage="atlas")
638
+ _refresh_lease_heartbeat(cache_dir, run_id)
639
  refreshed_dashboard = build_dashboard_payload(
640
  refreshed_index,
641
  quest_matches=quest_analysis["matches_by_project"],
tests/test_app.py CHANGED
@@ -205,6 +205,28 @@ def test_dashboard_refresh_rejects_existing_bucket_lock(monkeypatch, tmp_path) -
205
  raise AssertionError("dashboard refresh should honor an existing bucket lock")
206
 
207
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
208
  def test_dashboard_refresh_embedding_build_runs_in_subprocess(monkeypatch, tmp_path) -> None:
209
  project_path = tmp_path / "projects.json"
210
  index_path = tmp_path / "project_index.json"
 
205
  raise AssertionError("dashboard refresh should honor an existing bucket lock")
206
 
207
 
208
def test_dashboard_refresh_heartbeat_extends_bucket_lock(monkeypatch, tmp_path) -> None:
    """A heartbeat from the owning run pushes an already-expired lease into the future."""
    monkeypatch.setenv("ADVISOR_REFRESH_LOCK_TTL_SECONDS", "120")

    # Seed a lease that expired ten seconds ago and is owned by "heartbeat-run".
    stale_lease = {
        "run_id": "heartbeat-run",
        "owner": "test",
        "expires_at_epoch": time.time() - 10,
    }
    lease_file = tmp_path / "refresh.lock"
    lease_file.write_text(json.dumps(stale_lease), encoding="utf-8")

    app_module._refresh_lease_heartbeat(tmp_path, "heartbeat-run")

    refreshed = json.loads(lease_file.read_text(encoding="utf-8"))
    # Ownership is unchanged, expiry moved to roughly now + TTL, heartbeat stamped.
    assert refreshed["run_id"] == "heartbeat-run"
    assert refreshed["expires_at_epoch"] > time.time() + 100
    assert refreshed["heartbeat_at"]
228
+
229
+
230
  def test_dashboard_refresh_embedding_build_runs_in_subprocess(monkeypatch, tmp_path) -> None:
231
  project_path = tmp_path / "projects.json"
232
  index_path = tmp_path / "project_index.json"