"""
Simple chunk & upsert script for baseline comparison.

Uses all-MiniLM-L6-v2 (fast, 384-dim) with minimal metadata.
Strategies: recursive, semantic
"""

import argparse
import json
import os
import sys
import time
from pathlib import Path

import numpy as np
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer

BASE_DIR = Path(__file__).resolve().parents[1]
# Make the project root importable so the ``src.*`` import below resolves
# when this file is run directly as a script.
if str(BASE_DIR) not in sys.path:
    sys.path.insert(0, str(BASE_DIR))

from src.data_prep.chunking import (  # noqa: E402  (requires the sys.path tweak above)
    chunk_markdown_baseline_recursive,
    chunk_markdown_baseline_semantic,
)

MD_DIR = BASE_DIR / "dataset-pipeline" / "data" / "extracted" / "extracted"
PROCESSED_DIR = BASE_DIR / "data" / "processed"
INDEX_NAME = "ntsb-rag"
MODEL_NAME = "all-MiniLM-L6-v2"
BATCH_SIZE = 100  # vectors per Pinecone upsert request
METADATA_TEXT_LIMIT = 1000  # characters of chunk text kept in vector metadata


def parse_args():
    """Parse command-line options.

    Returns:
        argparse.Namespace with ``strategy`` ("recursive" or "semantic")
        and ``reset_index`` (bool).
    """
    parser = argparse.ArgumentParser(description="Simple chunk & upsert for baseline comparison.")
    parser.add_argument("--strategy", choices=["recursive", "semantic"], required=True)
    parser.add_argument("--reset-index", action="store_true", help="Delete and recreate the Pinecone index.")
    return parser.parse_args()


def load_and_chunk(strategy: str) -> list[dict]:
    """Chunk every ``*.md`` file in ``MD_DIR`` with the selected baseline strategy.

    Args:
        strategy: "recursive" selects ``chunk_markdown_baseline_recursive``;
            any other value selects ``chunk_markdown_baseline_semantic``.

    Returns:
        All chunk dicts from all files, concatenated in sorted file order.

    Raises:
        RuntimeError: if ``MD_DIR`` contains no markdown files, or if chunking
            produced no chunks at all.
    """
    md_files = sorted(MD_DIR.glob("*.md"))
    if not md_files:
        raise RuntimeError(f"No markdown files in {MD_DIR}")
    print(f"Found {len(md_files)} markdown files")

    chunk_fn = chunk_markdown_baseline_recursive if strategy == "recursive" else chunk_markdown_baseline_semantic
    all_chunks = []
    for idx, md_file in enumerate(md_files, 1):
        all_chunks.extend(chunk_fn(str(md_file)))
        if idx % 25 == 0 or idx == len(md_files):
            print(f" Chunked {idx}/{len(md_files)} files -> {len(all_chunks)} chunks", flush=True)

    if not all_chunks:
        # Fail here with an actionable message; otherwise the pipeline crashes
        # later inside np.concatenate / embeddings.shape[1].
        raise RuntimeError(f"Chunking produced no chunks from {len(md_files)} files in {MD_DIR}")
    return all_chunks


def embed(chunks: list[dict], batch_size: int = 256) -> np.ndarray:
    """Encode each chunk's ``"text"`` with ``SentenceTransformer(MODEL_NAME)``.

    Args:
        chunks: Chunk dicts; each must have a ``"text"`` key.
        batch_size: Number of texts passed to ``model.encode`` per call.

    Returns:
        float32 array with one row per chunk, built by concatenating the
        per-batch outputs in order.

    Raises:
        ValueError: if ``chunks`` is empty or ``batch_size`` is not positive.
    """
    if not chunks:
        raise ValueError("No chunks to embed")
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    print(f"Loading embedding model: {MODEL_NAME}")
    model = SentenceTransformer(MODEL_NAME)
    texts = [c["text"] for c in chunks]
    total = len(texts)
    print(f"Encoding {total} chunks (batch_size={batch_size})...")

    all_vecs = []
    start = time.time()
    for i in range(0, total, batch_size):
        batch = texts[i : i + batch_size]
        vecs = model.encode(batch, show_progress_bar=False)
        all_vecs.append(np.asarray(vecs, dtype=np.float32))
        done = min(i + batch_size, total)
        elapsed = max(time.time() - start, 1e-6)  # avoid division by zero on the first batch
        rate = done / elapsed
        eta = int((total - done) / max(rate, 1e-6))
        print(f" Encoded {done}/{total} | {elapsed:.0f}s | {rate:.0f}/s | ETA: {eta}s", flush=True)

    embeddings = np.concatenate(all_vecs, axis=0)
    print(f"Embeddings shape: {embeddings.shape} | total: {time.time() - start:.0f}s")
    return embeddings


def save_artifacts(chunks: list[dict], embeddings: np.ndarray, strategy: str):
    """Write chunks and embeddings for ``strategy`` into ``PROCESSED_DIR``.

    Produces ``chunks_baseline_<strategy>.json`` (the chunk dicts) and
    ``embeddings_baseline_<strategy>.npz`` (compressed arrays ``chunk_ids``
    and ``embeddings``, row-aligned with ``chunks``).
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    chunks_path = PROCESSED_DIR / f"chunks_baseline_{strategy}.json"
    emb_path = PROCESSED_DIR / f"embeddings_baseline_{strategy}.npz"

    with chunks_path.open("w", encoding="utf-8") as f:
        json.dump(chunks, f, indent=2)
    np.savez_compressed(emb_path, chunk_ids=np.array([c["chunk_id"] for c in chunks]), embeddings=embeddings)
    print(f"Saved: {chunks_path.name}, {emb_path.name}")


def init_index(dimension: int, reset: bool):
    """Return a handle to the ``INDEX_NAME`` Pinecone index, creating it if needed.

    Args:
        dimension: Vector dimension the index must have (used on creation and
            checked against an existing index).
        reset: If True and the index exists, delete it, wait 30s, then recreate it.

    Raises:
        RuntimeError: if ``PINECONE_API_KEY`` is not set (after loading
            ``BASE_DIR/.env``), or if an existing index's dimension differs
            from ``dimension``.
    """
    load_dotenv(BASE_DIR / ".env")
    api_key = os.environ.get("PINECONE_API_KEY")
    if not api_key:
        raise RuntimeError("PINECONE_API_KEY not set")

    pc = Pinecone(api_key=api_key)
    existing = {idx.name for idx in pc.list_indexes()}

    if reset and INDEX_NAME in existing:
        print(f"Deleting index '{INDEX_NAME}'...")
        pc.delete_index(INDEX_NAME)
        existing.discard(INDEX_NAME)
        print("Waiting 30s for index deletion to propagate...")
        time.sleep(30)

    if INDEX_NAME in existing:
        # The index name is shared, so it may have been built with a different
        # embedding model. Fail early with a hint instead of at upsert time.
        existing_dim = getattr(pc.describe_index(INDEX_NAME), "dimension", None)
        if existing_dim is not None and int(existing_dim) != dimension:
            raise RuntimeError(
                f"Index '{INDEX_NAME}' has dimension {existing_dim}, but {MODEL_NAME} produces "
                f"{dimension}-dim vectors. Re-run with --reset-index to recreate it."
            )
    else:
        print(f"Creating index '{INDEX_NAME}' (dim={dimension})...")
        pc.create_index(
            name=INDEX_NAME,
            dimension=dimension,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
    return pc.Index(INDEX_NAME)


def _meta(chunk: dict, key: str, default: str):
    """Return ``chunk[key]``, or ``default`` when the key is missing OR None.

    Pinecone does not accept null metadata values, so a present-but-None
    field must be replaced just like a missing one.
    """
    value = chunk.get(key)
    return default if value is None else value


def _to_vector(chunk: dict, emb: np.ndarray, strategy: str) -> dict:
    """Build one Pinecone upsert record (id, values, metadata) for a chunk."""
    return {
        "id": chunk["chunk_id"],
        "values": emb.tolist(),
        "metadata": {
            "report_id": _meta(chunk, "report_id", ""),
            "ntsb_no": _meta(chunk, "ntsb_no", ""),
            "event_date": _meta(chunk, "event_date", "unknown"),
            "make": _meta(chunk, "make", "unknown"),
            "model": _meta(chunk, "model", "unknown"),
            "strategy": strategy,
            # store truncated text for retrieval display
            "text": chunk["text"][:METADATA_TEXT_LIMIT],
        },
    }


def upsert_vectors(index, chunks: list[dict], embeddings: np.ndarray, strategy: str):
    """Upsert one vector per chunk into ``index`` in batches of ``BATCH_SIZE``.

    Records are built one batch at a time so the full list of Python float
    payloads is never held in memory at once.

    Raises:
        ValueError: if ``chunks`` and ``embeddings`` have different lengths
            (``zip`` would otherwise silently drop the excess).
    """
    if len(chunks) != len(embeddings):
        raise ValueError(f"chunks ({len(chunks)}) and embeddings ({len(embeddings)}) length mismatch")

    total = len(chunks)
    print(f"Upserting {total} vectors (batch={BATCH_SIZE})...")
    for i in range(0, total, BATCH_SIZE):
        stop = min(i + BATCH_SIZE, total)
        batch = [
            _to_vector(chunk, emb, strategy)
            for chunk, emb in zip(chunks[i:stop], embeddings[i:stop])
        ]
        index.upsert(vectors=batch)
        print(f" Upserted {stop}/{total}", flush=True)


def main():
    """Run the pipeline: chunk -> embed -> save locally -> upsert to Pinecone."""
    args = parse_args()
    strategy = args.strategy

    print(f"\n{'='*60}")
    print(f" Strategy: {strategy} | Model: {MODEL_NAME}")
    print(f"{'='*60}\n")

    # 1. Chunk
    chunks = load_and_chunk(strategy)

    # 2. Embed
    embeddings = embed(chunks)

    # 3. Save locally (before touching Pinecone, so the work survives index errors)
    save_artifacts(chunks, embeddings, strategy)

    # 4. Upsert to Pinecone
    index = init_index(dimension=int(embeddings.shape[1]), reset=args.reset_index)
    upsert_vectors(index, chunks, embeddings, strategy)

    print("\nIndex stats:")
    print(index.describe_index_stats())
    print("\nDone!")


if __name__ == "__main__":
    main()