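"""Ingest local .txt/.csv documents into a Chroma vectorstore with KG extraction.

The script chunks documents with RecursiveCharacterTextSplitter, embeds them with
OpenAI embeddings, attempts to extract knowledge-graph triples per chunk via an LLM,
and persists three artifacts under --persist-dir: the Chroma store, kg_store.ttl,
and chunks_index.json.

Usage (assuming this file is saved as ingest.py; the filename is not fixed by the code):
    python ingest.py --data-dir ./data --persist-dir ./vectorstore --chunk-size 200 --chunk-overlap 50
"""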
import argparse
import os
import logging
from datetime import datetime, timezone
# Disable Chroma telemetry to avoid opentelemetry compatibility errors during ingestion
os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false")
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
# KG integration: import unconditionally so errors propagate if dependencies missing
from src.kg.extract import extract_triples_with_llm
from src.kg.store import KGStore
import uuid
import json
# Module logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def load_documents(data_dir: str):
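    """Recursively collect .txt and .csv files under data_dir and load them into
    LangChain Document objects; files with other extensions are skipped."""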
    from pathlib import Path
    from langchain_community.document_loaders import CSVLoader, TextLoader

    docs = []
    for path in Path(data_dir).rglob("*"):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix == ".txt":
            loader = TextLoader(str(path))
        elif suffix == ".csv":
            loader = CSVLoader(file_path=str(path))
        else:
            continue
        docs.extend(loader.load())
    logger.info("Loaded %d documents from %s", len(docs), data_dir)
    logger.debug("Documents ingested: %s", [(d.metadata or {}).get("source") for d in docs])
    return docs


def ingest(data_dir: str, persist_dir: str, chunk_size: int, chunk_overlap: int, openai_api_key: str = None):
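    """Chunk the documents in data_dir, extract KG triples per chunk, and persist
    the Chroma vectorstore, KG store, and chunk index under persist_dir.

    If openai_api_key is given it is written to the OPENAI_API_KEY environment
    variable before embeddings are initialized. Raises ValueError when data_dir
    is missing or contains no loadable documents; exits the process with status 1
    if persisting the vectorstore or the KG fails.
    """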
logger.info("Starting ingest: data_dir=%s persist_dir=%s chunk_size=%s chunk_overlap=%s", data_dir, persist_dir, chunk_size, chunk_overlap)
if not os.path.exists(data_dir):
logger.error("Data directory does not exist: %s", data_dir)
raise ValueError(f"Data directory does not exist: {data_dir}")
docs = load_documents(data_dir)
if not docs:
logger.error("No documents found in %s", data_dir)
raise ValueError(f"No .txt documents found in {data_dir}")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_docs = splitter.split_documents(docs)
    logger.info("Split into %d chunks", len(split_docs))
    # Ensure the persist dir exists and add a file handler for detailed logs in persist_dir/ingest.log
    os.makedirs(persist_dir, exist_ok=True)
    try:
        fh = logging.FileHandler(os.path.join(persist_dir, 'ingest.log'))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        # Avoid adding multiple file handlers on repeated calls
        if not any(isinstance(h, logging.FileHandler) and getattr(h, 'baseFilename', None) == fh.baseFilename for h in logger.handlers):
            logger.addHandler(fh)
    except Exception:
        logger.exception('Failed to add file handler for ingest log')
    # Prepare the KG store path and local chunk index
    chunks_index = {}
    kg_path = os.path.join(persist_dir, "kg_store.ttl")
    # Initialize embeddings; provide a clear error if the OpenAI API key is missing
    try:
        logger.info('Initializing embeddings')
        # If an API key was provided on the CLI, inject it into the environment
        if openai_api_key:
            os.environ['OPENAI_API_KEY'] = openai_api_key
            logger.debug('Set OPENAI_API_KEY from CLI flag')
        embeddings = OpenAIEmbeddings()
        logger.info('Embeddings initialized')
    except Exception:
        logger.exception("Failed to initialize OpenAI embeddings. Ensure OPENAI_API_KEY is set in the environment or pass --openai-api-key.")
        raise
    # Initialize the KG store unconditionally so errors are visible
    try:
        logger.info('Initializing KG store at %s', kg_path)
        kg = KGStore(path=kg_path)
        logger.info('KG store initialized')
    except Exception:
        logger.exception('Failed to initialize KGStore')
        # re-raise so the caller sees the failure
        raise
    # Annotate each chunk with a stable chunk_id and extract/link KG triples
    start_time = datetime.now(timezone.utc)
    logger.info('Beginning per-chunk processing at %s UTC', start_time.isoformat())
    for i, d in enumerate(split_docs, start=1):
        meta = d.metadata or {}
        chunk_id = meta.get("chunk_id") or str(uuid.uuid4())
        if not meta:
            d.metadata = {}
        d.metadata["chunk_id"] = chunk_id
        # Save a minimal chunk index for runtime retrieval (text and source metadata)
        chunks_index[chunk_id] = {
            "text": getattr(d, "page_content", "") or getattr(d, "content", ""),
            "metadata": d.metadata,
        }
        # Log progress at intervals
        if i % 50 == 0 or i <= 5:
            logger.debug('Processing chunk %d/%d (id=%s)', i, len(split_docs), chunk_id)
        # Attempt to extract triples and link the chunk (errors during extraction are non-fatal)
        try:
            triples = extract_triples_with_llm(chunks_index[chunk_id]["text"], max_triples=4)
            if triples:
                logger.debug('Extracted %d triples for chunk %s', len(triples), chunk_id)
                for t in triples:
                    try:
                        kg.add_triple(
                            t.get("subject"),
                            t.get("predicate"),
                            t.get("object"),
                            provenance={"sentence": t.get("sentence"), "confidence": t.get("confidence")},
                        )
                        kg.link_chunk_to_entity(
                            chunk_id,
                            t.get("subject"),
                            sentence=t.get("sentence"),
                            confidence=t.get("confidence"),
                        )
                    except Exception:
                        logger.exception('Non-fatal error while adding triple or linking chunk %s', chunk_id)
                        continue
        except Exception:
            # LLM extraction failed or is not configured; skip KG extraction for this chunk
            logger.exception('KG extraction failed for chunk %s (continuing)', chunk_id)
    end_time = datetime.now(timezone.utc)
    logger.info('Finished per-chunk processing at %s UTC (duration %s)', end_time.isoformat(), end_time - start_time)
    # Persist the Chroma vectorstore
    try:
        logger.info('Persisting Chroma vectorstore to %s', persist_dir)
        Chroma.from_documents(
            split_docs,
            embedding=embeddings,
            persist_directory=persist_dir,
        )
        logger.info('Vectorstore built and persisted to %s', persist_dir)
    except Exception:
        import sys
        logger.exception('Chroma.from_documents failed to write the vectorstore:')
        # ensure the log is flushed to file before exiting
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)
    # Persist the chunks index for runtime use (a simple JSON mapping)
    try:
        idx_path = os.path.join(persist_dir, "chunks_index.json")
        with open(idx_path, "w", encoding="utf-8") as f:
            json.dump(chunks_index, f)
        logger.info('Wrote chunks_index.json (%d entries)', len(chunks_index))
    except Exception:
        logger.exception('Failed to write chunks_index.json')
    # Persist the KG
    try:
        kg.save()
        logger.info('KG persisted to %s', kg_path)
    except Exception:
        import sys
        logger.exception('Failed to persist KG to disk:')
        # ensure the log is flushed to file before exiting
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)


def main():
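    """Parse command-line arguments and run the ingest pipeline."""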
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", type=str, default="./data")
    parser.add_argument("--persist-dir", type=str, default="./vectorstore")
    parser.add_argument("--chunk-size", type=int, default=200)
    parser.add_argument("--chunk-overlap", type=int, default=50)
    parser.add_argument("--openai-api-key", type=str, default=None, help="Optional OpenAI API key to use for embeddings (overrides the env var)")
    args = parser.parse_args()
    ingest(
        data_dir=args.data_dir,
        persist_dir=args.persist_dir,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap,
        openai_api_key=args.openai_api_key,
    )


if __name__ == "__main__":
    main()