File size: 3,674 Bytes
ef5f450
 
 
 
 
 
 
 
 
 
 
 
 
c8b552c
ef5f450
c8b552c
ef5f450
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""
ingest.py — Load documents from a directory, chunk them, embed them, push to Pinecone.

Usage:
    python ingest.py --dir ./docs
    python ingest.py --dir ./docs --chunk-size 400 --chunk-overlap 50
"""

import os
import uuid
import argparse
import logging
from pathlib import Path

from dotenv import load_dotenv 

load_dotenv()                  

from pinecone import Pinecone, ServerlessSpec
from embedder import embed_texts

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX   = os.getenv("PINECONE_INDEX", "llmops-rag")
EMBED_DIM        = 384   # all-MiniLM-L6-v2 output dim


def chunk_text(text: str, chunk_size: int = 400, overlap: int = 50) -> list[str]:
    """Naive character-level chunker. Replace with sentence splitter if needed."""
    words = text.split()
    chunks, i = [], 0
    while i < len(words):
        chunk = " ".join(words[i : i + chunk_size])
        chunks.append(chunk)
        i += chunk_size - overlap
    return chunks


def load_documents(directory: str) -> list[dict]:
    """Load .txt and .md files recursively. Returns list of {source, text}."""
    docs = []
    for path in Path(directory).rglob("*"):
        if path.suffix in {".txt", ".md"}:
            text = path.read_text(encoding="utf-8", errors="ignore").strip()
            if text:
                docs.append({"source": str(path), "text": text})
    logger.info(f"Loaded {len(docs)} documents from {directory}")
    return docs


def ensure_index(pc: Pinecone):
    existing = [idx.name for idx in pc.list_indexes()]
    if PINECONE_INDEX not in existing:
        logger.info(f"Creating index '{PINECONE_INDEX}'...")
        pc.create_index(
            name=PINECONE_INDEX,
            dimension=EMBED_DIM,
            metric="cosine",
            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
        )
        logger.info("Index created.")
    else:
        logger.info(f"Index '{PINECONE_INDEX}' already exists.")


def ingest_documents(directory: str, chunk_size: int = 400, chunk_overlap: int = 50) -> int:
    if not PINECONE_API_KEY:
        raise EnvironmentError("PINECONE_API_KEY not set")

    pc = Pinecone(api_key=PINECONE_API_KEY)
    ensure_index(pc)
    index = pc.Index(PINECONE_INDEX)

    docs = load_documents(directory)
    if not docs:
        logger.warning("No documents found. Nothing ingested.")
        return 0

    all_chunks, all_meta = [], []
    for doc in docs:
        for chunk in chunk_text(doc["text"], chunk_size, chunk_overlap):
            all_chunks.append(chunk)
            all_meta.append({"source": doc["source"], "text": chunk})

    logger.info(f"Embedding {len(all_chunks)} chunks...")
    vectors = embed_texts(all_chunks)

    # Upsert in batches of 100
    BATCH = 100
    total = 0
    for i in range(0, len(all_chunks), BATCH):
        batch_vectors = [
            (str(uuid.uuid4()), vectors[j], all_meta[j])
            for j in range(i, min(i + BATCH, len(all_chunks)))
        ]
        index.upsert(vectors=batch_vectors)
        total += len(batch_vectors)
        logger.info(f"  Upserted {total}/{len(all_chunks)}")

    logger.info(f"Done. {total} vectors in Pinecone index '{PINECONE_INDEX}'.")
    return total


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--dir", default="./docs", help="Directory containing .txt/.md files")
    parser.add_argument("--chunk-size", type=int, default=400)
    parser.add_argument("--chunk-overlap", type=int, default=50)
    args = parser.parse_args()
    ingest_documents(args.dir, args.chunk_size, args.chunk_overlap)