# study-sathi/rag/ingest.py — OS-notes ingestion script
# (last change: updated 5th chapter of OS, commit f6bb754 by YousifCreates)
import os
import torch
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
load_dotenv()  # load PINECONE_API_KEY etc. from a local .env file, if present
# ── Config ───────────────────────────────────────────────────────────────────
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")  # required; no default
PINECONE_INDEX = os.getenv("PINECONE_INDEX", "study-saathi")  # target index name
EMBEDDING_MODEL = "intfloat/multilingual-e5-large"  # E5 family; expects "passage:"/"query:" prefixes
DATA_DIR = "data/os_notes"  # where the .pdf/.txt notes to ingest live
CHUNK_SIZE = 512  # characters per chunk
CHUNK_OVERLAP = 64  # characters shared between adjacent chunks
BATCH_SIZE = 32  # texts per embedding batch
DIMENSION = 1024  # vector size; must match the embedding model's output dim
# ── Device ───────────────────────────────────────────────────────────────────
device = "cuda" if torch.cuda.is_available() else "cpu"  # prefer GPU when available
print(f"[INFO] Using device: {device}")
# ── Load Embedding Model ──────────────────────────────────────────────────────
print("[INFO] Loading embedding model...")
embedder = SentenceTransformer(EMBEDDING_MODEL, device=device)
# ── Pinecone Setup ────────────────────────────────────────────────────────────
pc = Pinecone(api_key=PINECONE_API_KEY)
# Create the index only on first run; later runs reuse the existing index.
if PINECONE_INDEX not in [i.name for i in pc.list_indexes()]:
    print(f"[INFO] Creating Pinecone index: {PINECONE_INDEX}")
    pc.create_index(
        name=PINECONE_INDEX,
        dimension=DIMENSION,
        metric="cosine",  # embeddings are normalized, so cosine == dot product
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )
index = pc.Index(PINECONE_INDEX)
# ── Check if file already ingested ───────────────────────────────────────────
def is_already_ingested(filename: str) -> bool:
    """
    Return True if any vector in the index has metadata ``source == filename``.

    Pinecone requires a query vector even for a pure metadata lookup, so a
    zero vector is used as a stand-in; top_k=1 is enough since we only need
    to know whether at least one match exists.
    """
    # (removed an unused `topic` local that was computed but never read)
    dummy_vector = [0.0] * DIMENSION
    results = index.query(
        vector=dummy_vector,
        top_k=1,
        include_metadata=True,
        filter={"source": {"$eq": filename}}
    )
    return len(results["matches"]) > 0
# ── Load Documents ────────────────────────────────────────────────────────────
def load_documents(filepath: str, filename: str) -> list:
    """
    Load a .pdf or .txt file into LangChain documents, tagging each with
    ``topic`` (filename stem) and ``source`` (full filename) metadata.

    Returns an empty list for unsupported extensions.
    """
    if filename.endswith(".pdf"):
        loader = PyPDFLoader(filepath)
    elif filename.endswith(".txt"):
        loader = TextLoader(filepath, encoding="utf-8")
    else:
        return []
    loaded = loader.load()
    topic = os.path.splitext(filename)[0]  # e.g. "ch5" from "ch5.pdf"
    for doc in loaded:
        doc.metadata["topic"] = topic
        doc.metadata["source"] = filename
    # Fix: previously printed a literal "(unknown)" instead of the filename.
    print(f"[LOADED] {filename} — {len(loaded)} page(s)")
    return loaded
# ── Chunk Documents ───────────────────────────────────────────────────────────
def chunk_documents(docs: list) -> list:
    """Split loaded documents into overlapping character chunks for embedding."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )
    pieces = text_splitter.split_documents(docs)
    print(f"[INFO] Total chunks: {len(pieces)}")
    return pieces
# ── Embed & Upsert ────────────────────────────────────────────────────────────
def embed_and_upsert(chunks: list, filename: str):
    """
    Embed chunk texts with the E5 model and upsert them into Pinecone.

    E5 models expect a "passage: " prefix on documents (and "query: " on
    queries) for best retrieval quality, hence the prefixed texts below.
    """
    texts = [f"passage: {chunk.page_content}" for chunk in chunks]
    print("[INFO] Generating embeddings...")
    all_vectors = []
    stem = os.path.splitext(filename)[0]  # hoisted: invariant across all chunks
    for i in tqdm(range(0, len(texts), BATCH_SIZE)):
        batch_texts = texts[i: i + BATCH_SIZE]
        batch_chunks = chunks[i: i + BATCH_SIZE]
        embeddings = embedder.encode(
            batch_texts,
            normalize_embeddings=True,  # unit vectors — matches the cosine index metric
            show_progress_bar=False
        )
        for j, (emb, chunk) in enumerate(zip(embeddings, batch_chunks)):
            all_vectors.append({
                "id": f"{stem}-chunk-{i + j}",  # stable, file-scoped chunk id
                "values": emb.tolist(),
                "metadata": {
                    "text": chunk.page_content,
                    "topic": chunk.metadata.get("topic", "unknown"),
                    "source": chunk.metadata.get("source", "unknown"),
                }
            })
    print("[INFO] Upserting to Pinecone...")
    # Upsert in batches of 100 to stay well under Pinecone request-size limits.
    for i in tqdm(range(0, len(all_vectors), 100)):
        index.upsert(vectors=all_vectors[i: i + 100])
    # Fix: previously printed a literal "(unknown)" instead of the filename.
    print(f"[DONE] Upserted {len(all_vectors)} chunks for '{filename}'.")
# ── Main ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    files = [f for f in os.listdir(DATA_DIR) if f.endswith((".pdf", ".txt"))]
    if not files:
        print("[ERROR] No files found in data/os_notes/")
        # raise SystemExit instead of exit(): exit() is a site.py convenience
        # that is not guaranteed to exist (e.g. under `python -S`).
        raise SystemExit(1)
    print(f"[INFO] Found {len(files)} file(s): {files}\n")
    for filename in files:
        filepath = os.path.join(DATA_DIR, filename)
        # ── SKIP CHECK ────────────────────────────────────────────────────
        # Fix: the three messages below printed a literal "(unknown)"
        # instead of interpolating the filename.
        if is_already_ingested(filename):
            print(f"[SKIP] '{filename}' already in Pinecone. Skipping...\n")
            continue
        print(f"[NEW] Processing '{filename}'...")
        docs = load_documents(filepath, filename)
        if not docs:
            print(f"[WARN] Could not load '{filename}'. Skipping.\n")
            continue
        chunks = chunk_documents(docs)
        embed_and_upsert(chunks, filename)
        print()
    print("[ALL DONE] Ingestion complete. Existing embeddings are untouched.")