# BlackBox/scripts/simple_chunk_upsert.py
# Anas Tabba
# Add simple baseline chunk/upsert script and fix semantic chunking perf
# 8988605
"""
Simple chunk & upsert script for baseline comparison.
Uses all-MiniLM-L6-v2 (fast, 384-dim) with minimal metadata.
Strategies: recursive, semantic
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
import numpy as np
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
BASE_DIR = Path(__file__).resolve().parents[1]
if str(BASE_DIR) not in sys.path:
sys.path.insert(0, str(BASE_DIR))
from src.data_prep.chunking import (
chunk_markdown_baseline_recursive,
chunk_markdown_baseline_semantic,
)
# Directory that load_and_chunk() globs for "*.md" input files.
MD_DIR = BASE_DIR / "dataset-pipeline" / "data" / "extracted" / "extracted"
# Directory where save_artifacts() writes the chunk JSON and embedding .npz files.
PROCESSED_DIR = BASE_DIR / "data" / "processed"
# Name of the Pinecone index that init_index() creates/opens and upserts go to.
INDEX_NAME = "ntsb-rag"
# Sentence-transformers model used by embed() (384-dim per the module docstring).
MODEL_NAME = "all-MiniLM-L6-v2"
# Number of vectors sent per index.upsert() call in upsert_vectors().
BATCH_SIZE = 100
def parse_args():
    """Parse the command line for this script.

    Returns:
        The argparse namespace with ``strategy`` ("recursive" or "semantic",
        mandatory) and ``reset_index`` (bool, False unless the flag is given).
    """
    cli = argparse.ArgumentParser(
        description="Simple chunk & upsert for baseline comparison.",
    )
    # The chunking strategy has no default on purpose: the caller must choose.
    cli.add_argument(
        "--strategy",
        choices=["recursive", "semantic"],
        required=True,
    )
    cli.add_argument(
        "--reset-index",
        action="store_true",
        help="Delete and recreate the Pinecone index.",
    )
    namespace = cli.parse_args()
    return namespace
def load_and_chunk(strategy: str) -> list[dict]:
    """Chunk every markdown file found directly under ``MD_DIR``.

    Args:
        strategy: "recursive" selects the recursive baseline chunker; any
            other value selects the semantic baseline chunker.

    Returns:
        All chunks from all files, concatenated in sorted-filename order.

    Raises:
        RuntimeError: If ``MD_DIR`` contains no ``*.md`` files.
    """
    sources = sorted(MD_DIR.glob("*.md"))
    if not sources:
        raise RuntimeError(f"No markdown files in {MD_DIR}")
    print(f"Found {len(sources)} markdown files")

    if strategy == "recursive":
        chunker = chunk_markdown_baseline_recursive
    else:
        chunker = chunk_markdown_baseline_semantic

    collected = []
    last = len(sources)
    for position, source in enumerate(sources, start=1):
        collected.extend(chunker(str(source)))
        # Report progress every 25 files and once more after the final file.
        if position == last or position % 25 == 0:
            print(f" Chunked {position}/{len(sources)} files -> {len(collected)} chunks", flush=True)
    return collected
def embed(chunks: list[dict], batch_size: int = 256) -> np.ndarray:
    """Encode the text of every chunk with the ``MODEL_NAME`` sentence-transformer.

    Texts are encoded in slices of ``batch_size`` so that progress, throughput
    and an ETA can be printed after each slice.

    Args:
        chunks: Chunk dicts; each must contain a "text" entry.
        batch_size: Number of texts passed to each ``model.encode`` call.

    Returns:
        A float32 array formed by concatenating the per-batch encodings along
        axis 0, in the same order as ``chunks``.

    Raises:
        ValueError: If ``batch_size`` is not positive or ``chunks`` is empty.
    """
    # Fail fast, before the (slow) model load. Without these guards an empty
    # input or a non-positive batch size only surfaces later as an opaque
    # ValueError from range() or np.concatenate().
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    if not chunks:
        raise ValueError("No chunks to embed: the chunk list is empty")

    print(f"Loading embedding model: {MODEL_NAME}")
    model = SentenceTransformer(MODEL_NAME)
    texts = [c["text"] for c in chunks]
    total = len(texts)
    print(f"Encoding {total} chunks (batch_size={batch_size})...")

    all_vecs = []
    start = time.time()
    for i in range(0, total, batch_size):
        batch = texts[i : i + batch_size]
        vecs = model.encode(batch, show_progress_bar=False)
        all_vecs.append(np.asarray(vecs, dtype=np.float32))

        done = min(i + batch_size, total)
        # Clamp elapsed time and rate away from zero so the ETA never divides by zero.
        elapsed = max(time.time() - start, 1e-6)
        rate = done / elapsed
        eta = int((total - done) / max(rate, 1e-6))
        print(f" Encoded {done}/{total} | {elapsed:.0f}s | {rate:.0f}/s | ETA: {eta}s", flush=True)

    embeddings = np.concatenate(all_vecs, axis=0)
    print(f"Embeddings shape: {embeddings.shape} | total: {time.time() - start:.0f}s")
    return embeddings
def save_artifacts(chunks: list[dict], embeddings: np.ndarray, strategy: str):
    """Write the chunks and their embeddings into ``PROCESSED_DIR``.

    Two files are produced, both named after ``strategy``:
    ``chunks_baseline_<strategy>.json`` (the chunk dicts, indented JSON) and
    ``embeddings_baseline_<strategy>.npz`` (compressed arrays ``chunk_ids``
    and ``embeddings``).

    Args:
        chunks: Chunk dicts; each must contain a "chunk_id" entry.
        embeddings: Embedding array stored under the ``embeddings`` key.
        strategy: Chunking strategy name used in the output file names.
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

    json_target = PROCESSED_DIR / f"chunks_baseline_{strategy}.json"
    npz_target = PROCESSED_DIR / f"embeddings_baseline_{strategy}.npz"

    with json_target.open("w", encoding="utf-8") as handle:
        json.dump(chunks, handle, indent=2)

    # Store the ids next to the vectors so rows can be matched back to chunks.
    ids = np.array([entry["chunk_id"] for entry in chunks])
    np.savez_compressed(npz_target, chunk_ids=ids, embeddings=embeddings)

    print(f"Saved: {json_target.name}, {npz_target.name}")
def init_index(dimension: int, reset: bool):
    """Open the ``INDEX_NAME`` Pinecone index, creating or recreating it as needed.

    The API key is read from the ``PINECONE_API_KEY`` environment variable after
    loading ``BASE_DIR / ".env"``.

    Args:
        dimension: Dimension of the vectors that will be upserted.
        reset: If True and the index already exists, delete it and create it again.

    Returns:
        The index handle returned by ``pc.Index(INDEX_NAME)``.

    Raises:
        RuntimeError: If ``PINECONE_API_KEY`` is not set, or if the index already
            exists (and is not being reset) with a different dimension.
    """
    load_dotenv(BASE_DIR / ".env")
    api_key = os.environ.get("PINECONE_API_KEY")
    if not api_key:
        raise RuntimeError("PINECONE_API_KEY not set")

    pc = Pinecone(api_key=api_key)
    existing = {idx.name for idx in pc.list_indexes()}

    if reset and INDEX_NAME in existing:
        print(f"Deleting index '{INDEX_NAME}'...")
        pc.delete_index(INDEX_NAME)
        existing.discard(INDEX_NAME)
        print("Waiting 30s for index deletion to propagate...")
        time.sleep(30)

    if INDEX_NAME not in existing:
        print(f"Creating index '{INDEX_NAME}' (dim={dimension})...")
        pc.create_index(
            name=INDEX_NAME,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
    else:
        # Reusing an index built for a different embedding model would otherwise
        # only fail later, during upsert, with a less helpful API error.
        existing_dim = pc.describe_index(INDEX_NAME).dimension
        if existing_dim is not None and int(existing_dim) != dimension:
            raise RuntimeError(
                f"Index '{INDEX_NAME}' has dimension {existing_dim} but the embeddings "
                f"have dimension {dimension}; re-run with --reset-index to recreate it."
            )

    return pc.Index(INDEX_NAME)
def upsert_vectors(index, chunks: list[dict], embeddings: np.ndarray, strategy: str):
    """Upsert one vector per chunk into ``index`` in batches of ``BATCH_SIZE``.

    Each vector uses the chunk's "chunk_id" as its id and carries a small
    metadata dict (report/aircraft fields, the strategy name, and the first
    1000 characters of the chunk text).

    Args:
        index: Object exposing ``upsert(vectors=...)`` (the Pinecone index handle).
        chunks: Chunk dicts; each must contain "chunk_id" and "text".
        embeddings: One embedding row per chunk, in the same order as ``chunks``.
        strategy: Chunking strategy name, stored in every vector's metadata.

    Raises:
        ValueError: If ``chunks`` and ``embeddings`` differ in length.
    """
    total = len(chunks)
    # zip() would silently drop the unmatched tail, leaving the index incomplete.
    if len(embeddings) != total:
        raise ValueError(
            f"Got {total} chunks but {len(embeddings)} embeddings; they must match one-to-one"
        )

    print(f"Upserting {total} vectors (batch={BATCH_SIZE})...")
    for start in range(0, total, BATCH_SIZE):
        done = min(start + BATCH_SIZE, total)
        # Build the payload per batch so that only one batch of Python float
        # lists is alive at a time instead of the whole corpus.
        batch = [
            {
                "id": chunk["chunk_id"],
                "values": emb.tolist(),
                "metadata": {
                    "report_id": chunk.get("report_id", ""),
                    "ntsb_no": chunk.get("ntsb_no", ""),
                    "event_date": chunk.get("event_date", "unknown"),
                    "make": chunk.get("make", "unknown"),
                    "model": chunk.get("model", "unknown"),
                    "strategy": strategy,
                    "text": chunk["text"][:1000],  # store truncated text for retrieval display
                },
            }
            for chunk, emb in zip(chunks[start:done], embeddings[start:done])
        ]
        index.upsert(vectors=batch)
        print(f" Upserted {done}/{total}", flush=True)
def main():
    """Run the baseline pipeline: chunk, embed, save locally, upsert to Pinecone."""
    cli = parse_args()
    strategy = cli.strategy

    rule = "=" * 60
    print(f"\n{rule}")
    print(f" Strategy: {strategy} | Model: {MODEL_NAME}")
    print(f"{rule}\n")

    # Stage 1: split the markdown reports into chunks.
    chunks = load_and_chunk(strategy)

    # Stage 2: turn the chunk texts into vectors.
    vectors = embed(chunks)

    # Stage 3: keep a local copy of both before touching the remote index.
    save_artifacts(chunks, vectors, strategy)

    # Stage 4: open/create the index sized to the embeddings and push everything.
    embedding_dim = int(vectors.shape[1])
    pinecone_index = init_index(dimension=embedding_dim, reset=cli.reset_index)
    upsert_vectors(pinecone_index, chunks, vectors, strategy)

    print("\nIndex stats:")
    print(pinecone_index.describe_index_stats())
    print("\nDone!")


if __name__ == "__main__":
    main()