File size: 3,177 Bytes
33f5651
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import uuid
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv

from chunker import chunk_text
from llm import LLMProvider
from pinecone_client import PineconeClient

# pypdf is optional: when it is missing, PdfReader is set to None and
# read_pdf raises a clear RuntimeError at call time instead of the whole
# module failing with an ImportError.
try:
    from pypdf import PdfReader
except Exception:  # pragma: no cover
    PdfReader = None

# Load variables from a local .env file into the process environment so the
# DATA_DIR / CHUNK_SIZE / CHUNK_OVERLAP lookups below can see them.
load_dotenv()


def read_txt(path: str) -> str:
    """Return the full contents of a UTF-8 text file, ignoring undecodable bytes."""
    with open(path, "r", encoding="utf-8", errors="ignore") as handle:
        contents = handle.read()
    return contents


def read_pdf(path: str) -> str:
    """Extract and join the text of every page in a PDF, one page per line.

    Raises:
        RuntimeError: if the optional pypdf dependency is not installed.
    """
    if PdfReader is None:
        raise RuntimeError("pypdf is not installed. Please install pypdf to read PDFs.")
    # extract_text() may return None for image-only pages; substitute "".
    pages = PdfReader(path).pages
    return "\n".join(page.extract_text() or "" for page in pages)


def load_documents(data_dir: str) -> List[Dict[str, Any]]:
    """Recursively collect readable documents under *data_dir*.

    Supported extensions are .txt/.md/.log (plain text) and .pdf. Returns a
    list of ``{"path": ..., "text": ...}`` dicts; files that fail to read are
    skipped with a printed warning, and empty/whitespace-only files are dropped.
    """
    text_extensions = {".txt", ".md", ".log"}
    collected: List[Dict[str, Any]] = []
    for folder, _, filenames in os.walk(data_dir):
        for filename in filenames:
            full_path = os.path.join(folder, filename)
            suffix = os.path.splitext(filename)[1].lower()
            if suffix in text_extensions:
                reader = read_txt
            elif suffix == ".pdf":
                reader = read_pdf
            else:
                continue  # unsupported file type
            try:
                content = reader(full_path)
            except Exception as e:  # skip problematic files
                print(f"[warn] Failed to read {full_path}: {e}")
                continue
            if content and content.strip():
                collected.append({"path": full_path, "text": content})
    return collected


def ingest(
    data_dir: Optional[str] = None,
    chunk_size: Optional[int] = None,
    chunk_overlap: Optional[int] = None,
) -> int:
    """Load documents from *data_dir*, chunk them, embed them, and upsert to Pinecone.

    Args:
        data_dir: Directory to scan; defaults to the DATA_DIR env var or "./data".
        chunk_size: Characters per chunk; defaults to the CHUNK_SIZE env var or 800.
        chunk_overlap: Overlap between chunks; defaults to CHUNK_OVERLAP or 120.

    Returns:
        The total number of chunks ingested (0 when no documents were found).
    """
    # Use `is None` checks rather than `or` so an explicit 0 passed by a
    # caller is not silently replaced by the environment defaults.
    if data_dir is None:
        data_dir = os.getenv("DATA_DIR", "./data")
    chunk_size = int(os.getenv("CHUNK_SIZE", 800)) if chunk_size is None else int(chunk_size)
    chunk_overlap = int(os.getenv("CHUNK_OVERLAP", 120)) if chunk_overlap is None else int(chunk_overlap)

    os.makedirs(data_dir, exist_ok=True)

    docs = load_documents(data_dir)
    if not docs:
        print(f"No documents found in {data_dir}")
        return 0

    llm = LLMProvider()
    pc = PineconeClient()

    # Ensure index exists based on embedding dimension: probe once with a
    # throwaway embedding to learn the vector dimension.
    test_vec = llm.embed_texts(["dimension probe"])[0]
    pc.ensure_index(dimension=len(test_vec))

    total_chunks = 0
    batch: List[Dict[str, Any]] = []

    for doc in docs:
        path = doc["path"]
        chunks = chunk_text(doc["text"], chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        if not chunks:
            # Nothing to embed for this document; also avoids calling the
            # embedding API with an empty list.
            continue
        embeddings = llm.embed_texts(chunks)
        for i, (text, vec) in enumerate(zip(chunks, embeddings)):
            total_chunks += 1
            item = {
                "id": str(uuid.uuid4()),
                "values": vec,
                "metadata": {
                    "text": text,
                    "source": path,
                    "chunk": i,
                },
            }
            batch.append(item)
            # Upsert in batches of 100 to keep individual requests small.
            if len(batch) >= 100:
                pc.upsert_embeddings(batch)
                batch = []
    if batch:
        pc.upsert_embeddings(batch)

    print(f"Ingested {total_chunks} chunks from {len(docs)} documents.")
    return total_chunks


# Script entry point: run a full ingestion using environment-driven defaults.
if __name__ == "__main__":
    ingest()