JacobLinCool committed on
Commit
b7d5967
·
verified ·
1 Parent(s): 4791c0a

fix: harden dashboard refresh embedding

Browse files

Sync GitHub commit a6a05cd with bucket-backed HF cache, embedding refresh timeout, and streamed progress logs.

Files changed (4) hide show
  1. README.md +4 -1
  2. app.py +80 -4
  3. scripts/build_project_index.py +11 -1
  4. tests/test_app.py +23 -11
README.md CHANGED
@@ -220,6 +220,7 @@ ADVISOR_ADAPTER_REVISION=25de69bcde397e1bcdd852923b56a42f10222650
220
  ADVISOR_QUEST_ANALYZER_BACKEND=minicpm-transformers
221
  ADVISOR_QUEST_ADAPTER_ID=artifacts/quest-lora
222
  ADVISOR_CACHE_DIR=/data/advisor-cache
 
223
  ADVISOR_EMBEDDING_MODEL_REPO=ggml-org/embeddinggemma-300m-qat-q8_0-GGUF
224
  ADVISOR_EMBEDDING_MODEL_FILE=embeddinggemma-300m-qat-Q8_0.gguf
225
  ADVISOR_ASR_MODEL_ID=nvidia/nemotron-speech-streaming-en-0.6b
@@ -238,7 +239,9 @@ The retrieval query embedder downloads the GGUF model through `huggingface_hub`
238
  `ADVISOR_EMBEDDING_MODEL_PATH` points to a local file. `/api/transcribe` uses the same ZeroGPU wrapper for Nemotron ASR.
239
  On macOS local runs, the app automatically runs llama.cpp query embedding in a worker process so the MiniCPM PyTorch
240
  runtime and llama.cpp do not load conflicting OpenMP runtimes in the same Python process. Dashboard refresh also builds
241
- the GGUF embedding index in a subprocess before returning to the app process for MiniCPM quest analysis.
 
 
242
 
243
  ## Test
244
 
 
220
  ADVISOR_QUEST_ANALYZER_BACKEND=minicpm-transformers
221
  ADVISOR_QUEST_ADAPTER_ID=artifacts/quest-lora
222
  ADVISOR_CACHE_DIR=/data/advisor-cache
223
+ ADVISOR_REFRESH_EMBEDDING_TIMEOUT_SECONDS=1800
224
  ADVISOR_EMBEDDING_MODEL_REPO=ggml-org/embeddinggemma-300m-qat-q8_0-GGUF
225
  ADVISOR_EMBEDDING_MODEL_FILE=embeddinggemma-300m-qat-Q8_0.gguf
226
  ADVISOR_ASR_MODEL_ID=nvidia/nemotron-speech-streaming-en-0.6b
 
239
  `ADVISOR_EMBEDDING_MODEL_PATH` points to a local file. `/api/transcribe` uses the same ZeroGPU wrapper for Nemotron ASR.
240
  On macOS local runs, the app automatically runs llama.cpp query embedding in a worker process so the MiniCPM PyTorch
241
  runtime and llama.cpp do not load conflicting OpenMP runtimes in the same Python process. Dashboard refresh also builds
242
+ the GGUF embedding index in a subprocess before returning to the app process for MiniCPM quest analysis. When
243
+ `ADVISOR_CACHE_DIR` is set and `HF_HOME` is not, the refresh subprocess stores Hugging Face downloads under
244
+ `$ADVISOR_CACHE_DIR/huggingface` so the mounted bucket keeps the embedding model cache across refreshes and restarts.
245
 
246
  ## Test
247
 
app.py CHANGED
@@ -4,10 +4,12 @@ from datetime import datetime, timezone
4
  import json
5
  import os
6
  from pathlib import Path
 
7
  import subprocess
8
  import sys
9
  import tempfile
10
  from threading import Lock, Thread
 
11
  from typing import Any, Iterator
12
  from uuid import uuid4
13
 
@@ -60,6 +62,8 @@ PROFILE_FIELDS = ["skills", "time", "preferences", "constraints"]
60
  MAX_AUDIO_UPLOAD_BYTES = 25 * 1024 * 1024
61
  AUDIO_UPLOAD_SUFFIXES = {".aac", ".aif", ".aiff", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".webm"}
62
  DEFAULT_HF_ORG = "build-small-hackathon"
 
 
63
  REFRESH_STAGE_LABELS = {
64
  "crawling": "Fetching public Spaces",
65
  "embedding": "Rebuilding the embedding index",
@@ -283,10 +287,7 @@ def _build_refresh_index_payload(project_path: Path, index_path: Path) -> dict[s
283
  if n_threads:
284
  command.extend(["--n-threads", n_threads])
285
 
286
- completed = subprocess.run(command, cwd=ROOT, capture_output=True, text=True, check=False)
287
- if completed.returncode != 0:
288
- detail = "\n".join(part for part in (completed.stdout.strip(), completed.stderr.strip()) if part)
289
- raise RuntimeError(f"refresh embedding index build failed with exit code {completed.returncode}: {detail}")
290
  try:
291
  payload = json.loads(index_path.read_text(encoding="utf-8"))
292
  except (OSError, json.JSONDecodeError) as error:
@@ -296,6 +297,81 @@ def _build_refresh_index_payload(project_path: Path, index_path: Path) -> dict[s
296
  return payload
297
 
298
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
299
  def _replace_runtime_from_files(projects_path: Path, index_path: Path, refreshed_dashboard: dict[str, Any]) -> None:
300
  global index, engine, _cpu_engine, dashboard_payload
301
  new_index = ProjectIndex.from_files(projects_path, index_path)
 
4
  import json
5
  import os
6
  from pathlib import Path
7
+ import selectors
8
  import subprocess
9
  import sys
10
  import tempfile
11
  from threading import Lock, Thread
12
+ import time
13
  from typing import Any, Iterator
14
  from uuid import uuid4
15
 
 
62
  MAX_AUDIO_UPLOAD_BYTES = 25 * 1024 * 1024
63
  AUDIO_UPLOAD_SUFFIXES = {".aac", ".aif", ".aiff", ".flac", ".m4a", ".mp3", ".oga", ".ogg", ".opus", ".wav", ".webm"}
64
  DEFAULT_HF_ORG = "build-small-hackathon"
65
+ DEFAULT_REFRESH_EMBEDDING_TIMEOUT_SECONDS = 1800
66
+ REFRESH_SUBPROCESS_LOG_TAIL_LINES = 80
67
  REFRESH_STAGE_LABELS = {
68
  "crawling": "Fetching public Spaces",
69
  "embedding": "Rebuilding the embedding index",
 
287
  if n_threads:
288
  command.extend(["--n-threads", n_threads])
289
 
290
+ _run_refresh_index_command(command)
 
 
 
291
  try:
292
  payload = json.loads(index_path.read_text(encoding="utf-8"))
293
  except (OSError, json.JSONDecodeError) as error:
 
297
  return payload
298
 
299
 
300
def _run_refresh_index_command(command: list[str]) -> None:
    """Run the embedding index build command, streaming its output as it arrives.

    The child is started with ``cwd=ROOT`` and the environment returned by
    ``_refresh_subprocess_env()``; its stderr is merged into stdout. Each output
    line is passed to ``_record_refresh_subprocess_line`` so it is logged and
    kept in a bounded tail used for error messages.

    Args:
        command: Argument vector of the build command to execute.

    Raises:
        RuntimeError: If the command runs longer than the configured timeout,
            or exits with a non-zero status. The message ends with the
            retained tail of the command output.
    """
    timeout_seconds = _refresh_embedding_timeout_seconds()
    output_tail: list[str] = []
    process = subprocess.Popen(
        command,
        cwd=ROOT,
        env=_refresh_subprocess_env(),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    )
    stdout = process.stdout
    assert stdout is not None  # guaranteed by stdout=subprocess.PIPE

    def _drain_output() -> None:
        # A dedicated reader decouples the deadline from the child's output:
        # a partial line (no trailing newline) or a silent child can no longer
        # stall the timeout, and lines already sitting in the text buffer are
        # always consumed instead of being dropped from the error tail.
        for line in stdout:
            _record_refresh_subprocess_line(output_tail, line)

    reader = Thread(target=_drain_output, name="refresh-embedding-output", daemon=True)
    reader.start()
    timed_out = False
    try:
        process.wait(timeout=timeout_seconds)
    except subprocess.TimeoutExpired:
        timed_out = True
    finally:
        # Runs on timeout and on unexpected interruption alike, so the child
        # never outlives this call.
        if process.poll() is None:
            process.kill()
            try:
                process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Do not let a slow reap mask the error reported below.
                pass
        # Give the reader a moment to flush the remaining output. If something
        # still holds the pipe open, leave the daemon thread and the pipe alone
        # rather than closing a stream another thread is reading from.
        reader.join(timeout=5)
        if not reader.is_alive():
            stdout.close()

    # Snapshot the tail so a still-running reader cannot change it mid-format.
    tail = _format_output_tail(list(output_tail))
    if timed_out:
        raise RuntimeError(
            "refresh embedding index build timed out "
            f"after {timeout_seconds} seconds. Last output:\n{tail}"
        )
    if process.returncode != 0:
        raise RuntimeError(
            "refresh embedding index build failed "
            f"with exit code {process.returncode}. Last output:\n{tail}"
        )
339
+
340
+
341
+ def _refresh_subprocess_env() -> dict[str, str]:
342
+ env = os.environ.copy()
343
+ if not env.get("HF_HOME"):
344
+ cache_dir = cache_dir_from_env()
345
+ if cache_dir is not None:
346
+ hf_home = cache_dir / "huggingface"
347
+ hf_home.mkdir(parents=True, exist_ok=True)
348
+ env["HF_HOME"] = str(hf_home)
349
+ return env
350
+
351
+
352
+ def _refresh_embedding_timeout_seconds() -> int:
353
+ raw = os.environ.get("ADVISOR_REFRESH_EMBEDDING_TIMEOUT_SECONDS", "").strip()
354
+ if not raw:
355
+ return DEFAULT_REFRESH_EMBEDDING_TIMEOUT_SECONDS
356
+ timeout = int(raw)
357
+ if timeout <= 0:
358
+ raise RuntimeError("ADVISOR_REFRESH_EMBEDDING_TIMEOUT_SECONDS must be a positive integer.")
359
+ return timeout
360
+
361
+
362
+ def _record_refresh_subprocess_line(output_tail: list[str], raw_line: str) -> None:
363
+ line = raw_line.rstrip()
364
+ if not line:
365
+ return
366
+ print(f"[dashboard-refresh embedding] {line}", flush=True)
367
+ output_tail.append(line)
368
+ del output_tail[:-REFRESH_SUBPROCESS_LOG_TAIL_LINES]
369
+
370
+
371
+ def _format_output_tail(output_tail: list[str]) -> str:
372
+ return "\n".join(output_tail) if output_tail else "(no output)"
373
+
374
+
375
  def _replace_runtime_from_files(projects_path: Path, index_path: Path, refreshed_dashboard: dict[str, Any]) -> None:
376
  global index, engine, _cpu_engine, dashboard_payload
377
  new_index = ProjectIndex.from_files(projects_path, index_path)
scripts/build_project_index.py CHANGED
@@ -68,6 +68,7 @@ def build_payload(
68
  ) -> dict:
69
  data = json.loads(project_path.read_text(encoding="utf-8"))
70
  projects = [Project.from_dict(item) for item in data["projects"]]
 
71
  embedder = LlamaCppEmbedder(
72
  model_repo=model_repo,
73
  model_file=model_file,
@@ -76,7 +77,16 @@ def build_payload(
76
  n_threads=n_threads,
77
  verbose=False,
78
  )
79
- embeddings = [embedder.embed(project.searchable_text) for project in projects]
 
 
 
 
 
 
 
 
 
80
  metadata = {
81
  "model_repo": model_repo,
82
  "model_file": model_file,
 
68
  ) -> dict:
69
  data = json.loads(project_path.read_text(encoding="utf-8"))
70
  projects = [Project.from_dict(item) for item in data["projects"]]
71
+ print(f"loaded {len(projects)} projects from {project_path}", flush=True)
72
  embedder = LlamaCppEmbedder(
73
  model_repo=model_repo,
74
  model_file=model_file,
 
77
  n_threads=n_threads,
78
  verbose=False,
79
  )
80
+ print(
81
+ "embedding projects with "
82
+ f"{model_repo}/{model_file}; first vector may download and load the GGUF model",
83
+ flush=True,
84
+ )
85
+ embeddings = []
86
+ for index, project in enumerate(projects, start=1):
87
+ embeddings.append(embedder.embed(project.searchable_text))
88
+ if index == 1 or index % 10 == 0 or index == len(projects):
89
+ print(f"embedded {index}/{len(projects)} projects", flush=True)
90
  metadata = {
91
  "model_repo": model_repo,
92
  "model_file": model_file,
tests/test_app.py CHANGED
@@ -173,25 +173,16 @@ def test_dashboard_refresh_embedding_build_runs_in_subprocess(monkeypatch, tmp_p
173
  monkeypatch.setenv("ADVISOR_EMBEDDING_MODEL_PATH", "/tmp/model.gguf")
174
  captured = {}
175
 
176
- def fake_run(command, *, cwd, capture_output, text, check):
177
  captured["command"] = command
178
- captured["cwd"] = cwd
179
- captured["capture_output"] = capture_output
180
- captured["text"] = text
181
- captured["check"] = check
182
  index_path.write_text(json.dumps({"schema": "ok"}), encoding="utf-8")
183
- return app_module.subprocess.CompletedProcess(command, 0, "wrote index", "")
184
 
185
- monkeypatch.setattr(app_module.subprocess, "run", fake_run)
186
 
187
  payload = app_module._build_refresh_index_payload(project_path, index_path)
188
 
189
  command = captured["command"]
190
  assert payload == {"schema": "ok"}
191
- assert captured["cwd"] == app_module.ROOT
192
- assert captured["capture_output"] is True
193
- assert captured["text"] is True
194
- assert captured["check"] is False
195
  assert command[1].endswith("scripts/build_project_index.py")
196
  assert command[command.index("--model-repo") + 1] == "test/repo"
197
  assert command[command.index("--model-file") + 1] == "model.gguf"
@@ -200,6 +191,27 @@ def test_dashboard_refresh_embedding_build_runs_in_subprocess(monkeypatch, tmp_p
200
  assert command[command.index("--builder") + 1] == "app.py:/api/dashboard/refresh"
201
 
202
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
  def test_dashboard_refresh_persists_and_swaps_latest(monkeypatch, tmp_path) -> None:
204
  monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
205
  _reset_refresh_state()
 
173
  monkeypatch.setenv("ADVISOR_EMBEDDING_MODEL_PATH", "/tmp/model.gguf")
174
  captured = {}
175
 
176
+ def fake_run_refresh_index_command(command):
177
  captured["command"] = command
 
 
 
 
178
  index_path.write_text(json.dumps({"schema": "ok"}), encoding="utf-8")
 
179
 
180
+ monkeypatch.setattr(app_module, "_run_refresh_index_command", fake_run_refresh_index_command)
181
 
182
  payload = app_module._build_refresh_index_payload(project_path, index_path)
183
 
184
  command = captured["command"]
185
  assert payload == {"schema": "ok"}
 
 
 
 
186
  assert command[1].endswith("scripts/build_project_index.py")
187
  assert command[command.index("--model-repo") + 1] == "test/repo"
188
  assert command[command.index("--model-file") + 1] == "model.gguf"
 
191
  assert command[command.index("--builder") + 1] == "app.py:/api/dashboard/refresh"
192
 
193
 
194
def test_refresh_subprocess_env_uses_cache_dir_for_hf_home(monkeypatch, tmp_path) -> None:
    """With ADVISOR_CACHE_DIR set and HF_HOME unset, HF_HOME points at <cache>/huggingface."""
    expected_hf_home = tmp_path / "huggingface"
    monkeypatch.delenv("HF_HOME", raising=False)
    monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))

    child_env = app_module._refresh_subprocess_env()

    # The directory must exist on disk, not merely be named in the environment.
    assert expected_hf_home.is_dir()
    assert child_env["HF_HOME"] == str(expected_hf_home)
202
+
203
+
204
def test_refresh_embedding_timeout_rejects_non_positive_env(monkeypatch) -> None:
    """A timeout of 0 is rejected with a RuntimeError naming the constraint."""
    monkeypatch.setenv("ADVISOR_REFRESH_EMBEDDING_TIMEOUT_SECONDS", "0")

    raised: RuntimeError | None = None
    try:
        app_module._refresh_embedding_timeout_seconds()
    except RuntimeError as error:
        raised = error

    if raised is None:
        raise AssertionError("non-positive refresh embedding timeout should fail")
    assert "must be a positive integer" in str(raised)
213
+
214
+
215
  def test_dashboard_refresh_persists_and_swaps_latest(monkeypatch, tmp_path) -> None:
216
  monkeypatch.setenv("ADVISOR_CACHE_DIR", str(tmp_path))
217
  _reset_refresh_state()