"""Ingest OS-notes documents (PDF/TXT) into a Pinecone index.

Loads files from DATA_DIR, chunks them, embeds the chunks with a
multilingual E5 model, and upserts the vectors to Pinecone. Files whose
vectors are already present (matched via the ``source`` metadata field)
are skipped, so re-running the script is idempotent.
"""

import os
import sys

import torch
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

load_dotenv()

# ── Config ───────────────────────────────────────────────────────────────────
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX = os.getenv("PINECONE_INDEX", "study-saathi")
EMBEDDING_MODEL = "intfloat/multilingual-e5-large"
DATA_DIR = "data/os_notes"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 64
BATCH_SIZE = 32
DIMENSION = 1024  # embedding dimension used when creating the index
UPSERT_BATCH = 100  # vectors per Pinecone upsert call

# ── Device ───────────────────────────────────────────────────────────────────
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"[INFO] Using device: {device}")

# ── Load Embedding Model ─────────────────────────────────────────────────────
print("[INFO] Loading embedding model...")
embedder = SentenceTransformer(EMBEDDING_MODEL, device=device)

# ── Pinecone Setup ───────────────────────────────────────────────────────────
if not PINECONE_API_KEY:
    # Fail fast with a clear message instead of an opaque client error.
    print("[ERROR] PINECONE_API_KEY is not set.")
    sys.exit(1)

pc = Pinecone(api_key=PINECONE_API_KEY)

if PINECONE_INDEX not in [i.name for i in pc.list_indexes()]:
    print(f"[INFO] Creating Pinecone index: {PINECONE_INDEX}")
    pc.create_index(
        name=PINECONE_INDEX,
        dimension=DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

index = pc.Index(PINECONE_INDEX)


# ── Check if file already ingested ───────────────────────────────────────────
def is_already_ingested(filename: str) -> bool:
    """Return True if any vector with metadata ``source == filename`` exists.

    Runs a metadata-filtered query with a dummy zero vector; a single match
    means the file was ingested on a previous run and can be skipped.
    """
    # Use a dummy zero vector just to run a metadata filter query.
    dummy_vector = [0.0] * DIMENSION
    results = index.query(
        vector=dummy_vector,
        top_k=1,
        include_metadata=True,
        filter={"source": {"$eq": filename}},
    )
    return len(results["matches"]) > 0


# ── Load Documents ───────────────────────────────────────────────────────────
def load_documents(filepath: str, filename: str) -> list:
    """Load a ``.pdf`` or ``.txt`` file into LangChain documents.

    Each document is tagged with ``topic`` (filename stem) and ``source``
    (full filename) metadata so its chunks can be filtered per file later.
    Returns an empty list for unsupported extensions.
    """
    if filename.endswith(".pdf"):
        loader = PyPDFLoader(filepath)
    elif filename.endswith(".txt"):
        loader = TextLoader(filepath, encoding="utf-8")
    else:
        return []

    loaded = loader.load()
    topic = os.path.splitext(filename)[0]
    for doc in loaded:
        doc.metadata["topic"] = topic
        doc.metadata["source"] = filename

    print(f"[LOADED] {filename} — {len(loaded)} page(s)")
    return loaded


# ── Chunk Documents ──────────────────────────────────────────────────────────
def chunk_documents(docs: list) -> list:
    """Split documents into overlapping chunks sized for embedding."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )
    chunks = splitter.split_documents(docs)
    print(f"[INFO] Total chunks: {len(chunks)}")
    return chunks


# ── Embed & Upsert ───────────────────────────────────────────────────────────
def embed_and_upsert(chunks: list, filename: str) -> None:
    """Embed chunks and upsert them to Pinecone in batches.

    Vector ids are ``<file-stem>-chunk-<global index>``; the chunk text,
    topic, and source filename are stored as metadata alongside each vector.
    """
    # E5 models expect a "passage: " prefix on document-side inputs.
    texts = [f"passage: {chunk.page_content}" for chunk in chunks]
    file_stem = os.path.splitext(filename)[0]  # hoisted out of the loop

    print("[INFO] Generating embeddings...")
    all_vectors = []
    for i in tqdm(range(0, len(texts), BATCH_SIZE)):
        batch_texts = texts[i: i + BATCH_SIZE]
        batch_chunks = chunks[i: i + BATCH_SIZE]
        embeddings = embedder.encode(
            batch_texts,
            normalize_embeddings=True,
            show_progress_bar=False,
        )
        for j, (emb, chunk) in enumerate(zip(embeddings, batch_chunks)):
            all_vectors.append({
                "id": f"{file_stem}-chunk-{i + j}",
                "values": emb.tolist(),
                "metadata": {
                    "text": chunk.page_content,
                    "topic": chunk.metadata.get("topic", "unknown"),
                    "source": chunk.metadata.get("source", "unknown"),
                },
            })

    print("[INFO] Upserting to Pinecone...")
    for i in tqdm(range(0, len(all_vectors), UPSERT_BATCH)):
        index.upsert(vectors=all_vectors[i: i + UPSERT_BATCH])

    print(f"[DONE] Upserted {len(all_vectors)} chunks for '{filename}'.")


# ── Main ─────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    files = [f for f in os.listdir(DATA_DIR) if f.endswith((".pdf", ".txt"))]
    if not files:
        print("[ERROR] No files found in data/os_notes/")
        sys.exit(1)

    print(f"[INFO] Found {len(files)} file(s): {files}\n")

    for filename in files:
        filepath = os.path.join(DATA_DIR, filename)

        # ── SKIP CHECK ────────────────────────────────────────────────────
        if is_already_ingested(filename):
            print(f"[SKIP] '{filename}' already in Pinecone. Skipping...\n")
            continue

        print(f"[NEW] Processing '{filename}'...")
        docs = load_documents(filepath, filename)
        if not docs:
            print(f"[WARN] Could not load '{filename}'. Skipping.\n")
            continue

        chunks = chunk_documents(docs)
        embed_and_upsert(chunks, filename)
        print()

    print("[ALL DONE] Ingestion complete. Existing embeddings are untouched.")