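"""Ingest documents into a Chroma vectorstore and a knowledge graph.

Loads .txt and .csv files from a data directory, splits them into chunks,
embeds the chunks with OpenAI, extracts KG triples per chunk, and persists
the vectorstore, a chunks_index.json mapping, and the KG store (kg_store.ttl)
under the persist directory.
"""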
import argparse
import os
import logging
from datetime import datetime, timezone
# Disable Chroma telemetry to avoid opentelemetry compatibility errors during ingestion
os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false")
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings
# KG integration: import unconditionally so errors propagate if dependencies are missing
from src.kg.extract import extract_triples_with_llm
from src.kg.store import KGStore
import json
import sys
import uuid
# Module logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def load_documents(data_dir: str):
    from pathlib import Path
    from langchain_community.document_loaders import CSVLoader, TextLoader

    docs = []
    for path in Path(data_dir).rglob("*"):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix == ".txt":
            loader = TextLoader(str(path))
        elif suffix == ".csv":
            loader = CSVLoader(file_path=str(path))
        else:
            continue
        docs.extend(loader.load())
    logger.info("Loaded %d documents from %s", len(docs), data_dir)
    logger.debug("Documents ingested: %s", [(d.metadata or {}).get("source") for d in docs])
    return docs

def ingest(data_dir: str, persist_dir: str, chunk_size: int, chunk_overlap: int, openai_api_key: str = None):
    logger.info(
        "Starting ingest: data_dir=%s persist_dir=%s chunk_size=%s chunk_overlap=%s",
        data_dir, persist_dir, chunk_size, chunk_overlap,
    )
    if not os.path.exists(data_dir):
        logger.error("Data directory does not exist: %s", data_dir)
        raise ValueError(f"Data directory does not exist: {data_dir}")
    docs = load_documents(data_dir)
    if not docs:
        logger.error("No documents found in %s", data_dir)
        raise ValueError(f"No .txt or .csv documents found in {data_dir}")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_docs = splitter.split_documents(docs)
    logger.info("Split into %d chunks", len(split_docs))

    # Ensure the persist dir exists and write detailed logs to persist_dir/ingest.log
    os.makedirs(persist_dir, exist_ok=True)
    try:
        fh = logging.FileHandler(os.path.join(persist_dir, "ingest.log"))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        # Avoid adding duplicate file handlers on repeated calls
        if not any(
            isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", None) == fh.baseFilename
            for h in logger.handlers
        ):
            logger.addHandler(fh)
    except Exception:
        logger.exception("Failed to add file handler for ingest log")
    # Prepare the KG store path and a local chunk index
    chunks_index = {}
    kg_path = os.path.join(persist_dir, "kg_store.ttl")

    # Initialize embeddings; fail with a clear error if the OpenAI API key is missing
    try:
        logger.info("Initializing embeddings")
        # If an API key was provided on the CLI, inject it into the environment
        if openai_api_key:
            os.environ["OPENAI_API_KEY"] = openai_api_key
            logger.debug("Set OPENAI_API_KEY from CLI flag")
        embeddings = OpenAIEmbeddings()
        logger.info("Embeddings initialized")
    except Exception:
        logger.exception(
            "Failed to initialize OpenAI embeddings. Ensure OPENAI_API_KEY is set "
            "in the environment or pass --openai-api-key."
        )
        raise

    # Initialize the KG store unconditionally so errors are visible
    try:
        logger.info("Initializing KG store at %s", kg_path)
        kg = KGStore(path=kg_path)
        logger.info("KG store initialized")
    except Exception:
        logger.exception("Failed to initialize KGStore")
        raise  # re-raise so the caller sees the failure
    # Annotate each chunk with a stable chunk_id and extract/link KG triples
    start_time = datetime.now(timezone.utc)
    logger.info("Beginning per-chunk processing at %s UTC", start_time.isoformat())
    for i, d in enumerate(split_docs, start=1):
        if d.metadata is None:
            d.metadata = {}
        chunk_id = d.metadata.get("chunk_id") or str(uuid.uuid4())
        d.metadata["chunk_id"] = chunk_id
        # Save a minimal chunk index for runtime retrieval (text and source metadata)
        chunks_index[chunk_id] = {
            "text": getattr(d, "page_content", "") or getattr(d, "content", ""),
            "metadata": d.metadata,
        }
        # Log progress at intervals
        if i % 50 == 0 or i <= 5:
            logger.debug("Processing chunk %d/%d (id=%s)", i, len(split_docs), chunk_id)
        # Attempt to extract triples and link the chunk; extraction errors are
        # non-fatal. Each triple is expected to be a dict with "subject",
        # "predicate", and "object" keys plus optional "sentence"/"confidence".
        try:
            triples = extract_triples_with_llm(chunks_index[chunk_id]["text"], max_triples=4)
            if triples:
                logger.debug("Extracted %d triples for chunk %s", len(triples), chunk_id)
                for t in triples:
                    try:
                        kg.add_triple(
                            t.get("subject"),
                            t.get("predicate"),
                            t.get("object"),
                            provenance={"sentence": t.get("sentence"), "confidence": t.get("confidence")},
                        )
                        kg.link_chunk_to_entity(
                            chunk_id,
                            t.get("subject"),
                            sentence=t.get("sentence"),
                            confidence=t.get("confidence"),
                        )
                    except Exception:
                        logger.exception("Non-fatal error while adding triple or linking chunk %s", chunk_id)
                        continue
        except Exception:
            # LLM extraction failed or is not configured; skip KG extraction for this chunk
            logger.exception("KG extraction failed for chunk %s (continuing)", chunk_id)
    end_time = datetime.now(timezone.utc)
    logger.info(
        "Finished per-chunk processing at %s UTC (duration %s)",
        end_time.isoformat(), end_time - start_time,
    )
    # Persist the Chroma vectorstore
    try:
        logger.info("Persisting Chroma vectorstore to %s", persist_dir)
        Chroma.from_documents(
            split_docs,
            embedding=embeddings,
            persist_directory=persist_dir,
        )
        logger.info("Vectorstore built and persisted to %s", persist_dir)
    except Exception:
        logger.exception("Chroma.from_documents failed to write the vectorstore")
        # Ensure the log is flushed to file before exiting
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)
    # Persist the chunk index for runtime use (a simple JSON mapping)
    try:
        idx_path = os.path.join(persist_dir, "chunks_index.json")
        with open(idx_path, "w", encoding="utf-8") as out:  # "out" avoids shadowing the log handler "fh"
            json.dump(chunks_index, out)
        logger.info("Wrote chunks_index.json (%d entries)", len(chunks_index))
    except Exception:
        logger.exception("Failed to write chunks_index.json")
    # Persist the KG
    try:
        kg.save()
        logger.info("KG persisted to %s", kg_path)
    except Exception:
        logger.exception("Failed to persist KG to disk")
        # Ensure the log is flushed to file before exiting
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", type=str, default="./data")
    parser.add_argument("--persist-dir", type=str, default="./vectorstore")
    parser.add_argument("--chunk-size", type=int, default=200)
    parser.add_argument("--chunk-overlap", type=int, default=50)
    parser.add_argument(
        "--openai-api-key",
        type=str,
        default=None,
        help="Optional OpenAI API key to use for embeddings (overrides the env var)",
    )
    args = parser.parse_args()
    ingest(
        data_dir=args.data_dir,
        persist_dir=args.persist_dir,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap,
        openai_api_key=args.openai_api_key,
    )

if __name__ == "__main__":
    main()
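
# Example invocation (assuming this module is saved as ingest.py; the flags
# and defaults match the argparse setup above):
#   python ingest.py --data-dir ./data --persist-dir ./vectorstore --chunk-size 200 --chunk-overlap 50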