Irminsul / ingest.py
MukulRay's picture
chore: code cleanup, add .env.example, update README
c8b552c
"""
ingest.py — Load .txt/.md documents from a directory, split them into
overlapping word chunks, embed the chunks, and upsert them into Pinecone.

Usage:
    python ingest.py --dir ./docs
    python ingest.py --dir ./docs --chunk-size 400 --chunk-overlap 50
"""
import os
import uuid
import argparse
import logging
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
from pinecone import Pinecone, ServerlessSpec
from embedder import embed_texts
# Configure the root logger at import time with INFO as the threshold, and
# create this module's own named logger for the progress messages below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pinecone settings are read from the process environment; load_dotenv() has
# already been called above, before these lookups run.
# PINECONE_API_KEY is None when the variable is unset.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# Name of the target index; falls back to "llmops-rag" when the variable is unset.
PINECONE_INDEX = os.getenv("PINECONE_INDEX", "llmops-rag")
# Dimension passed to Pinecone when the index is created.
# NOTE(review): presumably must match the vector length returned by
# embed_texts() — confirm against embedder.py.
EMBED_DIM = 384 # all-MiniLM-L6-v2 output dim
def chunk_text(text: str, chunk_size: int = 400, overlap: int = 50) -> list[str]:
    """Split *text* into overlapping chunks of whitespace-delimited words.

    Naive word-level chunker. Replace with a sentence splitter if needed.

    Args:
        text: Raw document text. It is split on any run of whitespace.
        chunk_size: Maximum number of words per chunk. Must be positive.
        overlap: Number of words shared by consecutive chunks. Must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in document order, each re-joined with single spaces.
        Empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    # A step of zero or less would never advance through the text (the
    # previous implementation looped forever in that case).
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got "
            f"overlap={overlap}, chunk_size={chunk_size}"
        )

    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start : start + chunk_size]))
        # Once a window reaches the last word, any further window would only
        # repeat words already covered by this one, so stop here.
        if start + chunk_size >= len(words):
            break
    return chunks
def load_documents(directory: str) -> list[dict]:
    """Load .txt and .md files recursively from *directory*.

    Args:
        directory: Root directory to search, including all subdirectories.

    Returns:
        A list of ``{"source": <path as str>, "text": <stripped content>}``
        dicts, ordered by path. Files that are empty after stripping
        whitespace are skipped.
    """
    wanted_suffixes = {".txt", ".md"}
    docs = []
    # Sort so the document order does not depend on filesystem listing order.
    for path in sorted(Path(directory).rglob("*")):
        # Compare suffixes case-insensitively so e.g. README.MD is picked up.
        if path.suffix.lower() not in wanted_suffixes:
            continue
        # A directory can carry a matching suffix too (e.g. "notes.md/");
        # reading it as text would raise, so only regular files are read.
        if not path.is_file():
            continue
        # Bytes that are not valid UTF-8 are dropped rather than raising.
        text = path.read_text(encoding="utf-8", errors="ignore").strip()
        if text:
            docs.append({"source": str(path), "text": text})
    logger.info(f"Loaded {len(docs)} documents from {directory}")
    return docs
def ensure_index(pc: Pinecone):
    """Create the configured Pinecone index unless one with that name exists.

    Args:
        pc: Pinecone client used to list the existing indexes and, when
            PINECONE_INDEX is not among them, to create it.
    """
    already_present = any(
        entry.name == PINECONE_INDEX for entry in pc.list_indexes()
    )
    if already_present:
        logger.info(f"Index '{PINECONE_INDEX}' already exists.")
        return

    logger.info(f"Creating index '{PINECONE_INDEX}'...")
    # Serverless index on AWS us-east-1, cosine metric, EMBED_DIM dimensions.
    serverless = ServerlessSpec(cloud="aws", region="us-east-1")
    pc.create_index(
        name=PINECONE_INDEX,
        dimension=EMBED_DIM,
        metric="cosine",
        spec=serverless,
    )
    logger.info("Index created.")
def ingest_documents(directory: str, chunk_size: int = 400, chunk_overlap: int = 50) -> int:
    """Chunk, embed, and upsert every document under *directory* into Pinecone.

    Args:
        directory: Root directory passed to load_documents().
        chunk_size: Words per chunk, forwarded to chunk_text().
        chunk_overlap: Words shared by consecutive chunks, forwarded to
            chunk_text().

    Returns:
        The number of vectors upserted, or 0 when no documents were found.

    Raises:
        EnvironmentError: If PINECONE_API_KEY is not set.
    """
    if not PINECONE_API_KEY:
        raise EnvironmentError("PINECONE_API_KEY not set")

    pc = Pinecone(api_key=PINECONE_API_KEY)
    ensure_index(pc)
    index = pc.Index(PINECONE_INDEX)

    docs = load_documents(directory)
    if not docs:
        logger.warning("No documents found. Nothing ingested.")
        return 0

    all_chunks, all_meta, all_ids = [], [], []
    for doc in docs:
        chunks = chunk_text(doc["text"], chunk_size, chunk_overlap)
        for position, chunk in enumerate(chunks):
            all_chunks.append(chunk)
            all_meta.append({"source": doc["source"], "text": chunk})
            # Derive the vector ID from the source path and chunk position so
            # that ingesting the same file again produces the same IDs.
            # Random IDs made every re-run add a second copy of each chunk;
            # per the Pinecone docs, upserting an existing ID overwrites it.
            chunk_key = f"{doc['source']}#{position}"
            all_ids.append(str(uuid.uuid5(uuid.NAMESPACE_URL, chunk_key)))

    logger.info(f"Embedding {len(all_chunks)} chunks...")
    # NOTE(review): assumes embed_texts returns one vector per input chunk, in
    # the same order and indexable by position — confirm against embedder.py.
    vectors = embed_texts(all_chunks)

    # Upsert in batches of 100
    BATCH = 100
    total = 0
    for i in range(0, len(all_chunks), BATCH):
        batch_vectors = [
            (all_ids[j], vectors[j], all_meta[j])
            for j in range(i, min(i + BATCH, len(all_chunks)))
        ]
        index.upsert(vectors=batch_vectors)
        total += len(batch_vectors)
        logger.info(f" Upserted {total}/{len(all_chunks)}")

    logger.info(f"Done. {total} vectors in Pinecone index '{PINECONE_INDEX}'.")
    return total
if __name__ == "__main__":
    # Command-line entry point: parse the directory and chunking options,
    # then run a single ingestion pass with them.
    cli = argparse.ArgumentParser()
    cli.add_argument("--dir", default="./docs", help="Directory containing .txt/.md files")
    for flag, fallback in (("--chunk-size", 400), ("--chunk-overlap", 50)):
        cli.add_argument(flag, type=int, default=fallback)
    options = cli.parse_args()
    ingest_documents(
        directory=options.dir,
        chunk_size=options.chunk_size,
        chunk_overlap=options.chunk_overlap,
    )