Syed Taha commited on
Commit
545cdfa
·
1 Parent(s): 2b4c601

refactor: update embed_and_upsert.py to use chunking strategy and improve logging

Browse files
Files changed (1) hide show
  1. pipeline/embed_and_upsert.py +83 -61
pipeline/embed_and_upsert.py CHANGED
@@ -5,10 +5,11 @@ Embeds all function chunks and upserts to ChromaDB (local) or Pinecone (cloud).
5
  Backend is controlled by config.yaml (vector_store.backend)
6
 
7
  Usage:
8
- python pipeline/embed_and_upsert.py # uses active_profile from config.yaml
9
- python pipeline/embed_and_upsert.py --profile A3 # override profile
10
- python pipeline/embed_and_upsert.py --backend pinecone
11
- python pipeline/embed_and_upsert.py --dry-run # embed only, don't upsert
 
12
  """
13
 
14
  import argparse
@@ -17,6 +18,7 @@ import logging
17
  import pickle
18
  import time
19
  from pathlib import Path
 
20
 
21
  import numpy as np
22
  import yaml
@@ -36,13 +38,6 @@ def load_config(path: str = "config.yaml") -> dict:
36
  with open(path) as f:
37
  return yaml.safe_load(f)
38
 
39
-
40
- def get_profile(cfg: dict, profile_name: str | None = None) -> dict:
41
- name = profile_name or cfg["active_profile"]
42
- profile = cfg["profiles"][name]
43
- log.info("Active profile: %s - %s", name, profile["description"])
44
- return profile, name
45
-
46
  # Text builder
47
  def build_text(chunk: dict, template: str) -> str:
48
  """
@@ -59,31 +54,26 @@ def build_text(chunk: dict, template: str) -> str:
59
  ).strip()
60
 
61
  # Data loading
62
- def load_chunks(cfg: dict, profile_name: str) -> list[dict]:
63
  """Load all chunks from JSONL files."""
64
- chunks_dir = Path(cfg["repos"]["chunks_dir"])
65
  repo_names = cfg["repos"]["names"]
66
 
67
- # For fixed/recursive chunking profiles, use different chunk files
68
- chunking_strategy = cfg["profiles"][profile_name].get("chunking", "function")
 
 
 
 
69
 
70
  all_chunks = []
71
  for repo in repo_names:
72
- # function-level chunks always come from {repo}.jsonl
73
- # fixed/recursive chunks come from {repo}_{strategy}.jsonl
74
- if chunking_strategy == "function":
75
- jsonl_path = chunks_dir / f"{repo}.jsonl"
76
- else:
77
- jsonl_path = chunks_dir / f"{repo}_{chunking_strategy}.jsonl"
78
 
79
  if not jsonl_path.exists():
80
- if chunking_strategy == "function":
81
- log.error("Missing: %s - run parse_functions.py first", jsonl_path)
82
- else:
83
- log.warning(
84
- "Missing: %s - run parse_chunks_fixed.py or parse_chunks_recursive.py",
85
- jsonl_path
86
- )
87
  continue
88
 
89
  count = 0
@@ -133,15 +123,12 @@ def embed_chunks(
133
  Embed all chunks.
134
  Returns: (ids, texts, embeddings_as_list)
135
  """
136
- log.info("Loading embedding model: %s", model_name)
137
  model = SentenceTransformer(model_name)
138
 
139
  ids = [c["chunk_id"] for c in chunks]
140
  texts = [build_text(c, template) for c in chunks]
141
 
142
- log.info("Embedding %d chunks in batches of %d (CPU - this will take a while)...",
143
- len(chunks), batch_size)
144
- log.info("Estimated time: ~%.0f minutes", len(chunks) / batch_size / 3)
145
 
146
  t0 = time.time()
147
  embeddings = model.encode(
@@ -153,11 +140,9 @@ def embed_chunks(
153
  )
154
  duration = time.time() - t0
155
 
156
- log.info(
157
- "- Embedded %d chunks in %.1fs (%.0f chunks/sec)",
158
- len(chunks), duration, len(chunks) / duration
159
- )
160
- log.info("Embedding matrix shape: %s", embeddings.shape)
161
 
162
  return ids, texts, embeddings
163
 
@@ -169,15 +154,14 @@ def upsert_chroma(
169
  texts: list[str],
170
  embeddings: np.ndarray,
171
  cfg: dict,
172
- profile_name: str,
173
  ) -> None:
174
  import chromadb
175
 
176
  chroma_cfg = cfg["vector_store"]["chroma"]
177
  persist_dir = chroma_cfg["persist_directory"]
178
  # One collection per chunking strategy
179
- chunking = cfg["profiles"][profile_name].get("chunking", "function") or "none"
180
- collection_name = chroma_cfg["collection_name"].format(chunking=chunking)
181
 
182
  log.info("Connecting to ChromaDB at: %s", persist_dir)
183
  client = chromadb.PersistentClient(path=persist_dir)
@@ -198,7 +182,7 @@ def upsert_chroma(
198
  total = len(chunks)
199
  log.info("Upserting %d vectors to collection '%s'...", total, collection_name)
200
 
201
- for i in tqdm(range(0, total, BATCH), desc="Upserting", unit="batch"):
202
  batch_ids = ids[i:i+BATCH]
203
  batch_texts = texts[i:i+BATCH]
204
  batch_embeddings = embeddings[i:i+BATCH].tolist()
@@ -254,7 +238,7 @@ def upsert_pinecone(
254
  total = len(chunks)
255
  log.info("Upserting %d vectors to Pinecone index '%s'...", total, index_name)
256
 
257
- for i in tqdm(range(0, total, BATCH), desc="Upserting", unit="batch"):
258
  batch_ids = ids[i:i+BATCH]
259
  batch_embeddings = embeddings[i:i+BATCH].tolist()
260
  batch_chunks = chunks[i:i+BATCH]
@@ -289,13 +273,13 @@ def save_embeddings_cache(
289
  ids: list[str],
290
  texts: list[str],
291
  embeddings: np.ndarray,
292
- profile_name: str,
293
  ) -> None:
294
  """Save embeddings + chunk data to disk so we don't re-embed during eval."""
295
  cache_dir = Path("data/embeddings")
296
  cache_dir.mkdir(parents=True, exist_ok=True)
297
 
298
- cache_path = cache_dir / f"embeddings_{profile_name}.pkl"
299
  payload = {
300
  "ids": ids,
301
  "texts": texts,
@@ -309,11 +293,57 @@ def save_embeddings_cache(
309
  log.info("- Saved embedding cache: %s (%.1f MB)", cache_path, size_mb)
310
 
311
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
312
  # Main
313
  def main() -> None:
314
  parser = argparse.ArgumentParser(description="Embed chunks and upsert to vector store")
315
- parser.add_argument("--profile", type=str, default=None,
316
- help="Ablation profile to use (A1–A5 or baseline). Default: from config.yaml")
 
 
 
 
 
317
  parser.add_argument("--backend", type=str, default=None,
318
  choices=["chroma", "pinecone"],
319
  help="Vector store backend. Default: from config.yaml")
@@ -324,29 +354,24 @@ def main() -> None:
324
  args = parser.parse_args()
325
 
326
  # Load config + env
327
- from dotenv import load_dotenv
328
  load_dotenv()
329
 
330
  cfg = load_config()
331
- profile, profile_name = get_profile(cfg, args.profile)
332
-
333
- if profile.get("chunking") is None and profile_name != "baseline":
334
- log.error("Profile '%s' has no chunking strategy - nothing to embed.", profile_name)
335
- return
336
 
337
  backend = args.backend or cfg["vector_store"]["backend"]
338
  embed_cfg = cfg["embedding"]
339
 
340
  log.info("=" * 65)
341
  log.info("Substrate - Embed & Upsert")
342
- log.info("Profile : %s", profile_name)
343
  log.info("Backend : %s", backend)
344
  log.info("Model : %s", embed_cfg["model"])
345
  log.info("=" * 65)
346
 
347
  # 1. Load chunks
348
- log.info("\nLoading chunks...")
349
- chunks = load_chunks(cfg, profile_name)
350
  if not chunks:
351
  log.error("No chunks loaded. Aborting.")
352
  return
@@ -366,7 +391,7 @@ def main() -> None:
366
 
367
  # 4. Save cache
368
  if not args.no_cache:
369
- save_embeddings_cache(chunks, ids, texts, embeddings, profile_name)
370
 
371
  if args.dry_run:
372
  log.info("Dry run - skipping upsert.")
@@ -374,13 +399,10 @@ def main() -> None:
374
 
375
  # 5. Upsert
376
  if backend == "chroma":
377
- upsert_chroma(chunks, ids, texts, embeddings, cfg, profile_name)
 
378
  elif backend == "pinecone":
379
  upsert_pinecone(chunks, ids, embeddings, cfg)
380
 
381
- log.info("")
382
- log.info("Done. Next step: python pipeline/build_bm25.py")
383
-
384
-
385
  if __name__ == "__main__":
386
  main()
 
5
  Backend is controlled by config.yaml (vector_store.backend)
6
 
7
  Usage:
8
+ python pipeline/embed_and_upsert.py --strategy function
9
+ python pipeline/embed_and_upsert.py --strategy fixed
10
+ python pipeline/embed_and_upsert.py --strategy recursive
11
+ python pipeline/embed_and_upsert.py --strategy fixed --backend pinecone
12
+ python pipeline/embed_and_upsert.py --strategy function --dry-run
13
  """
14
 
15
  import argparse
 
18
  import pickle
19
  import time
20
  from pathlib import Path
21
+ from dotenv import load_dotenv
22
 
23
  import numpy as np
24
  import yaml
 
38
  with open(path) as f:
39
  return yaml.safe_load(f)
40
 
 
 
 
 
 
 
 
41
  # Text builder
42
  def build_text(chunk: dict, template: str) -> str:
43
  """
 
54
  ).strip()
55
 
56
  # Data loading
57
+ def load_chunks(cfg: dict, chunking_strategy: str) -> list[dict]:
58
  """Load all chunks from JSONL files."""
 
59
  repo_names = cfg["repos"]["names"]
60
 
61
+ # Directory depends on strategy:
62
+ # function -> data/chunks_function/{repo}.jsonl
63
+ # fixed -> data/chunks_fixed/{repo}.jsonl
64
+ # recursive -> data/chunks_recursive/{repo}.jsonl
65
+ chunks_dir_template = cfg["repos"]["chunks_dir"]
66
+ chunks_dir = Path(chunks_dir_template.format(chunking=chunking_strategy))
67
 
68
  all_chunks = []
69
  for repo in repo_names:
70
+ jsonl_path = chunks_dir / f"{repo}.jsonl"
 
 
 
 
 
71
 
72
  if not jsonl_path.exists():
73
+ log.warning(
74
+ "Missing: %s - run parse_chunks.py --strategy %s first",
75
+ jsonl_path, chunking_strategy or "function"
76
+ )
 
 
 
77
  continue
78
 
79
  count = 0
 
123
  Embed all chunks.
124
  Returns: (ids, texts, embeddings_as_list)
125
  """
 
126
  model = SentenceTransformer(model_name)
127
 
128
  ids = [c["chunk_id"] for c in chunks]
129
  texts = [build_text(c, template) for c in chunks]
130
 
131
+ log.info("Embedding %d chunks with batch size %d...", len(chunks), batch_size)
 
 
132
 
133
  t0 = time.time()
134
  embeddings = model.encode(
 
140
  )
141
  duration = time.time() - t0
142
 
143
+ throughput = len(chunks) / duration
144
+ log.info("Embedded %d chunks in %.1fs (%.0f chunks/sec)", len(chunks), duration, throughput)
145
+ log.info(" Embedding matrix shape: %s", embeddings.shape)
 
 
146
 
147
  return ids, texts, embeddings
148
 
 
154
  texts: list[str],
155
  embeddings: np.ndarray,
156
  cfg: dict,
157
+ chunking_strategy: str,
158
  ) -> None:
159
  import chromadb
160
 
161
  chroma_cfg = cfg["vector_store"]["chroma"]
162
  persist_dir = chroma_cfg["persist_directory"]
163
  # One collection per chunking strategy
164
+ collection_name = chroma_cfg["collection_name"].format(chunking=chunking_strategy)
 
165
 
166
  log.info("Connecting to ChromaDB at: %s", persist_dir)
167
  client = chromadb.PersistentClient(path=persist_dir)
 
182
  total = len(chunks)
183
  log.info("Upserting %d vectors to collection '%s'...", total, collection_name)
184
 
185
+ for i in tqdm(range(0, total, BATCH), desc="Upserting", unit="batch", leave=False):
186
  batch_ids = ids[i:i+BATCH]
187
  batch_texts = texts[i:i+BATCH]
188
  batch_embeddings = embeddings[i:i+BATCH].tolist()
 
238
  total = len(chunks)
239
  log.info("Upserting %d vectors to Pinecone index '%s'...", total, index_name)
240
 
241
+ for i in tqdm(range(0, total, BATCH), desc="Upserting", unit="batch", leave=False):
242
  batch_ids = ids[i:i+BATCH]
243
  batch_embeddings = embeddings[i:i+BATCH].tolist()
244
  batch_chunks = chunks[i:i+BATCH]
 
273
  ids: list[str],
274
  texts: list[str],
275
  embeddings: np.ndarray,
276
+ chunking_strategy: str,
277
  ) -> None:
278
  """Save embeddings + chunk data to disk so we don't re-embed during eval."""
279
  cache_dir = Path("data/embeddings")
280
  cache_dir.mkdir(parents=True, exist_ok=True)
281
 
282
+ cache_path = cache_dir / f"embeddings_{chunking_strategy}.pkl"
283
  payload = {
284
  "ids": ids,
285
  "texts": texts,
 
293
  log.info("- Saved embedding cache: %s (%.1f MB)", cache_path, size_mb)
294
 
295
 
296
+ # Sanity check (query from database)
297
+ def sanity_check_chroma(
298
+ cfg: dict,
299
+ chunking_strategy: str,
300
+ model_name: str,
301
+ ) -> None:
302
+ """Query ChromaDB collection to verify data was upserted correctly."""
303
+ import chromadb
304
+
305
+ chroma_cfg = cfg["vector_store"]["chroma"]
306
+ persist_dir = chroma_cfg["persist_directory"]
307
+ collection_name = chroma_cfg["collection_name"].format(chunking=chunking_strategy)
308
+
309
+ log.info("Sanity check - querying ChromaDB collection '%s':", collection_name)
310
+
311
+ client = chromadb.PersistentClient(path=persist_dir)
312
+ collection = client.get_collection(name=collection_name)
313
+
314
+ # Embed query
315
+ model = SentenceTransformer(model_name)
316
+ query_text = "numpy dtype float64"
317
+ query_emb = model.encode(query_text, normalize_embeddings=True, show_progress_bar=False, convert_to_numpy=True)
318
+
319
+ # Query collection
320
+ results = collection.query(
321
+ query_embeddings=[query_emb.tolist()],
322
+ n_results=5,
323
+ )
324
+
325
+ if results and results["ids"] and len(results["ids"]) > 0:
326
+ for i, (id_, dist) in enumerate(zip(results["ids"][0], results["distances"][0])):
327
+ meta = results["metadatas"][0][i] if results["metadatas"] else {}
328
+ log.info(
329
+ " [%.3f] %s::%s::%s",
330
+ 1 - dist, # chromadb returns distance, convert to similarity
331
+ meta.get("repo", "?"),
332
+ meta.get("filepath", "?"),
333
+ meta.get("function_name", "?")
334
+ )
335
+
336
+
337
  # Main
338
  def main() -> None:
339
  parser = argparse.ArgumentParser(description="Embed chunks and upsert to vector store")
340
+ parser.add_argument(
341
+ "--strategy",
342
+ type=str,
343
+ choices=["function", "fixed", "recursive"],
344
+ required=True,
345
+ help="Chunking strategy (required)",
346
+ )
347
  parser.add_argument("--backend", type=str, default=None,
348
  choices=["chroma", "pinecone"],
349
  help="Vector store backend. Default: from config.yaml")
 
354
  args = parser.parse_args()
355
 
356
  # Load config + env
 
357
  load_dotenv()
358
 
359
  cfg = load_config()
360
+ chunking_strategy = args.strategy
 
 
 
 
361
 
362
  backend = args.backend or cfg["vector_store"]["backend"]
363
  embed_cfg = cfg["embedding"]
364
 
365
  log.info("=" * 65)
366
  log.info("Substrate - Embed & Upsert")
367
+ log.info("Strategy : %s", chunking_strategy)
368
  log.info("Backend : %s", backend)
369
  log.info("Model : %s", embed_cfg["model"])
370
  log.info("=" * 65)
371
 
372
  # 1. Load chunks
373
+ log.info("Loading chunks...")
374
+ chunks = load_chunks(cfg, chunking_strategy)
375
  if not chunks:
376
  log.error("No chunks loaded. Aborting.")
377
  return
 
391
 
392
  # 4. Save cache
393
  if not args.no_cache:
394
+ save_embeddings_cache(chunks, ids, texts, embeddings, chunking_strategy)
395
 
396
  if args.dry_run:
397
  log.info("Dry run - skipping upsert.")
 
399
 
400
  # 5. Upsert
401
  if backend == "chroma":
402
+ upsert_chroma(chunks, ids, texts, embeddings, cfg, chunking_strategy)
403
+ sanity_check_chroma(cfg, chunking_strategy, embed_cfg["model"])
404
  elif backend == "pinecone":
405
  upsert_pinecone(chunks, ids, embeddings, cfg)
406
 
 
 
 
 
407
  if __name__ == "__main__":
408
  main()