import argparse
import os
import logging
from datetime import datetime, timezone

# Disable Chroma telemetry to avoid opentelemetry compatibility errors during ingestion.
# Chroma reads the ANONYMIZED_TELEMETRY setting; CHROMA_TELEMETRY_ENABLED is set as well for safety.
os.environ.setdefault("ANONYMIZED_TELEMETRY", "false")
os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false")

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings

# KG integration: import unconditionally so errors propagate if dependencies missing
from src.kg.extract import extract_triples_with_llm
from src.kg.store import KGStore
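# Assumed interfaces, based on how these helpers are used below:
#   extract_triples_with_llm(text, max_triples=N) -> list of dicts carrying
#       "subject", "predicate", "object", "sentence", and "confidence" keys.
#   KGStore(path=...) exposing add_triple(subject, predicate, object, provenance=...),
#       link_chunk_to_entity(chunk_id, entity, sentence=..., confidence=...), and save().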

import uuid
import json
from typing import Optional

# Module logger; set to DEBUG so the file handler added during ingest can capture detailed records.
# Console verbosity is controlled by logging.basicConfig in main().
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

def load_documents(data_dir: str):
    from pathlib import Path
    from langchain_community.document_loaders import CSVLoader, TextLoader

    docs = []
    for path in Path(data_dir).rglob("*"):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix == ".txt":
            loader = TextLoader(str(path))
        elif suffix == ".csv":
            loader = CSVLoader(file_path=str(path))
        else:
            continue
        loaded = loader.load()
        docs.extend(loaded)

    logger.info(f"Loaded {len(docs)} documents from {data_dir}")
    logger.debug("Documents ingested: %s", [ (d.metadata or {}).get('source') for d in docs ])
    return docs


def ingest(data_dir: str, persist_dir: str, chunk_size: int, chunk_overlap: int, openai_api_key: Optional[str] = None):
    logger.info("Starting ingest: data_dir=%s persist_dir=%s chunk_size=%s chunk_overlap=%s", data_dir, persist_dir, chunk_size, chunk_overlap)

    if not os.path.exists(data_dir):
        logger.error("Data directory does not exist: %s", data_dir)
        raise ValueError(f"Data directory does not exist: {data_dir}")

    docs = load_documents(data_dir)

    if not docs:
        logger.error("No documents found in %s", data_dir)
        raise ValueError(f"No .txt documents found in {data_dir}")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_docs = splitter.split_documents(docs)
    logger.info("Split into %d chunks", len(split_docs))

    # Ensure persist dir exists and add file handler to logger
    os.makedirs(persist_dir, exist_ok=True)
    # Add file handler for detailed logs in persist_dir/ingest.log
    try:
        fh = logging.FileHandler(os.path.join(persist_dir, 'ingest.log'))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        # Avoid adding multiple file handlers on repeated calls
        if not any(isinstance(h, logging.FileHandler) and getattr(h, 'baseFilename', None) == fh.baseFilename for h in logger.handlers):
            logger.addHandler(fh)
    except Exception:
        logger.exception('Failed to add file handler for ingest log')

    # Prepare KG store and local chunk index
    chunks_index = {}
    kg_path = os.path.join(persist_dir, "kg_store.ttl")

    # Initialize embeddings; provide a clear error if OpenAI API key is missing
    try:
        logger.info('Initializing embeddings')
        # If an API key was provided on the CLI, inject it into the environment
        if openai_api_key:
            os.environ['OPENAI_API_KEY'] = openai_api_key
            logger.debug('Set OPENAI_API_KEY from CLI flag')
        embeddings = OpenAIEmbeddings()
        logger.info('Embeddings initialized')
    except Exception:
        logger.exception("Failed to initialize OpenAI embeddings. Ensure OPENAI_API_KEY is set in the environment or pass --openai-api-key.")
        raise

    # Initialize KG store unconditionally so errors are visible
    try:
        logger.info('Initializing KG store at %s', kg_path)
        kg = KGStore(path=kg_path)
        logger.info('KG store initialized')
    except Exception:
        logger.exception('Failed to initialize KGStore')
        # re-raise so caller sees the failure
        raise

    # Annotate chunks with stable chunk_id and optionally extract/link KG triples
    start_time = datetime.now(timezone.utc)
    logger.info('Beginning per-chunk processing at %s UTC', start_time.isoformat())
    for i, d in enumerate(split_docs, start=1):
        # Guard against documents whose metadata is None before writing into it
        if d.metadata is None:
            d.metadata = {}
        chunk_id = d.metadata.get("chunk_id") or str(uuid.uuid4())
        d.metadata["chunk_id"] = chunk_id

        # Save minimal chunk index for runtime retrieval (text and source metadata)
        chunks_index[chunk_id] = {
            "text": getattr(d, "page_content", "") or getattr(d, "content", ""),
            "metadata": d.metadata,
        }

        # Log progress at intervals
        if i % 50 == 0 or i <= 5:
            logger.debug('Processing chunk %d/%d (id=%s)', i, len(split_docs), chunk_id)

        # Attempt to extract triples and link the chunk (errors during extraction are non-fatal)
        try:
            triples = extract_triples_with_llm(chunks_index[chunk_id]["text"], max_triples=4) or []
            if triples:
                logger.debug('Extracted %d triples for chunk %s', len(triples), chunk_id)
            for t in triples:
                try:
                    kg.add_triple(
                        t.get("subject"),
                        t.get("predicate"),
                        t.get("object"),
                        provenance={"sentence": t.get("sentence"), "confidence": t.get("confidence")},
                    )
                    kg.link_chunk_to_entity(
                        chunk_id,
                        t.get("subject"),
                        sentence=t.get("sentence"),
                        confidence=t.get("confidence"),
                    )
                except Exception:
                    logger.exception('Non-fatal error while adding triple or linking chunk %s', chunk_id)
                    continue
        except Exception:
            # LLM extraction failed or not configured; skip KG extraction for this chunk
            logger.exception('KG extraction failed for chunk %s (continuing)', chunk_id)

    end_time = datetime.now(timezone.utc)
    logger.info('Finished per-chunk processing at %s UTC (duration %s)', end_time.isoformat(), end_time - start_time)

    # Persist Chroma vectorstore
    try:
        logger.info('Persisting Chroma vectorstore to %s', persist_dir)
        Chroma.from_documents(
            split_docs,
            embedding=embeddings,
            persist_directory=persist_dir,
        )
        logger.info('Vectorstore built and persisted to %s', persist_dir)
    except Exception:
        import sys
        logger.exception('Chroma.from_documents failed to write the vectorstore:')
        # ensure the log is flushed to file
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)

    # Persist chunks index for runtime (simple json mapping)
    try:
        idx_path = os.path.join(persist_dir, "chunks_index.json")
        with open(idx_path, "w", encoding="utf-8") as fh:
            json.dump(chunks_index, fh)
        logger.info('Wrote chunks_index.json (%d entries)', len(chunks_index))
    except Exception:
        logger.exception('Failed to write chunks_index.json')

    # Persist KG
    try:
        kg.save()
        logger.info('KG persisted to %s', kg_path)
    except Exception:
        import sys
        logger.exception('Failed to persist KG to disk:')
        # ensure the log is flushed to file
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)


def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", type=str, default="./data")
    parser.add_argument("--persist-dir", type=str, default="./vectorstore")
    parser.add_argument("--chunk-size", type=int, default=200)
    parser.add_argument("--chunk-overlap", type=int, default=50)
    parser.add_argument("--openai-api-key", type=str, default=None, help="Optional OpenAI API key to use for embeddings (overrides env var)")
    args = parser.parse_args()

    ingest(
        data_dir=args.data_dir,
        persist_dir=args.persist_dir,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap,
        openai_api_key=args.openai_api_key,
    )


if __name__ == "__main__":
    main()
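

# Example invocation (flags and defaults match the argparse setup above; the
# filename "ingest.py" is only a placeholder for wherever this module is saved):
#   python ingest.py --data-dir ./data --persist-dir ./vectorstore \
#       --chunk-size 200 --chunk-overlap 50
# The OpenAI key can come from the OPENAI_API_KEY environment variable or --openai-api-key.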