import argparse import os import logging from datetime import datetime, timezone # Disable Chroma telemetry to avoid opentelemetry compatibility errors during ingestion os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false") from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import Chroma from langchain_community.embeddings import OpenAIEmbeddings # KG integration: import unconditionally so errors propagate if dependencies missing from src.kg.extract import extract_triples_with_llm from src.kg.store import KGStore import uuid import json # Module logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def load_documents(data_dir: str): from pathlib import Path from langchain_community.document_loaders import CSVLoader, TextLoader docs = [] for path in Path(data_dir).rglob("*"): if not path.is_file(): continue suffix = path.suffix.lower() if suffix == ".txt": loader = TextLoader(str(path)) elif suffix == ".csv": loader = CSVLoader(file_path=str(path)) else: continue loaded = loader.load() docs.extend(loaded) logger.info(f"Loaded {len(docs)} documents from {data_dir}") logger.debug("Documents ingested: %s", [ (d.metadata or {}).get('source') for d in docs ]) return docs def ingest(data_dir: str, persist_dir: str, chunk_size: int, chunk_overlap: int, openai_api_key: str = None): logger.info("Starting ingest: data_dir=%s persist_dir=%s chunk_size=%s chunk_overlap=%s", data_dir, persist_dir, chunk_size, chunk_overlap) if not os.path.exists(data_dir): logger.error("Data directory does not exist: %s", data_dir) raise ValueError(f"Data directory does not exist: {data_dir}") docs = load_documents(data_dir) if not docs: logger.error("No documents found in %s", data_dir) raise ValueError(f"No .txt documents found in {data_dir}") splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, ) split_docs = splitter.split_documents(docs) logger.info("Split into %d chunks", len(split_docs)) # Ensure persist dir exists and add file handler to logger os.makedirs(persist_dir, exist_ok=True) # Add file handler for detailed logs in persist_dir/ingest.log try: fh = logging.FileHandler(os.path.join(persist_dir, 'ingest.log')) fh.setLevel(logging.DEBUG) fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')) # Avoid adding multiple file handlers on repeated calls if not any(isinstance(h, logging.FileHandler) and getattr(h, 'baseFilename', None) == fh.baseFilename for h in logger.handlers): logger.addHandler(fh) except Exception: logger.exception('Failed to add file handler for ingest log') # Prepare KG store and local chunk index chunks_index = {} kg_path = os.path.join(persist_dir, "kg_store.ttl") # Initialize embeddings; provide a clear error if OpenAI API key is missing try: logger.info('Initializing embeddings') # If an API key was provided on the CLI, inject it into the environment if openai_api_key: os.environ['OPENAI_API_KEY'] = openai_api_key logger.debug('Set OPENAI_API_KEY from CLI flag') embeddings = OpenAIEmbeddings() logger.info('Embeddings initialized') except Exception as e: logger.exception("Failed to initialize OpenAI embeddings. Ensure OPENAI_API_KEY is set in the environment or pass --openai-api-key.") raise # Initialize KG store unconditionally so errors are visible try: logger.info('Initializing KG store at %s', kg_path) kg = KGStore(path=kg_path) logger.info('KG store initialized') except Exception: logger.exception('Failed to initialize KGStore') # re-raise so caller sees the failure raise # Annotate chunks with stable chunk_id and optionally extract/link KG triples start_time = datetime.now(timezone.utc) logger.info('Beginning per-chunk processing at %s UTC', start_time.isoformat()) for i, d in enumerate(split_docs, start=1): print(i, d) meta = d.metadata or {} chunk_id = meta.get("chunk_id") or str(uuid.uuid4()) if not meta: d.metadata = {} d.metadata["chunk_id"] = chunk_id # Save minimal chunk index for runtime retrieval (text and source metadata) chunks_index[chunk_id] = { "text": getattr(d, "page_content", "") or getattr(d, "content", ""), "metadata": d.metadata, } # Log progress at intervals if i % 50 == 0 or i <= 5: logger.debug('Processing chunk %d/%d (id=%s)', i, len(split_docs), chunk_id) # Attempt to extract triples and link the chunk (errors during extraction are non-fatal) try: triples = extract_triples_with_llm(chunks_index[chunk_id]["text"], max_triples=4) if triples: logger.debug('Extracted %d triples for chunk %s', len(triples), chunk_id) for t in triples: try: kg.add_triple( t.get("subject"), t.get("predicate"), t.get("object"), provenance={"sentence": t.get("sentence"), "confidence": t.get("confidence")}, ) kg.link_chunk_to_entity( chunk_id, t.get("subject"), sentence=t.get("sentence"), confidence=t.get("confidence"), ) except Exception: logger.exception('Non-fatal error while adding triple or linking chunk %s', chunk_id) continue except Exception: # LLM extraction failed or not configured; skip KG extraction for this chunk logger.exception('KG extraction failed for chunk %s (continuing)', chunk_id) pass end_time = datetime.now(timezone.utc) logger.info('Finished per-chunk processing at %s UTC (duration %s)', end_time.isoformat(), end_time - start_time) # Persist Chroma vectorstore try: logger.info('Persisting Chroma vectorstore to %s', persist_dir) Chroma.from_documents( split_docs, embedding=embeddings, persist_directory=persist_dir, ) logger.info('Vectorstore built and persisted to %s', persist_dir) except Exception as e: import traceback, sys logger.exception('Chroma.from_documents failed to write the vectorstore:') # ensure the log is flushed to file for h in logger.handlers: try: h.flush() except Exception: pass sys.exit(1) # Persist chunks index for runtime (simple json mapping) try: idx_path = os.path.join(persist_dir, "chunks_index.json") with open(idx_path, "w", encoding="utf-8") as fh: json.dump(chunks_index, fh) logger.info('Wrote chunks_index.json (%d entries)', len(chunks_index)) except Exception: logger.exception('Failed to write chunks_index.json') # Persist KG try: kg.save() logger.info('KG persisted to %s', kg_path) except Exception: import traceback, sys logger.exception('Failed to persist KG to disk:') # ensure the log is flushed to file for h in logger.handlers: try: h.flush() except Exception: pass sys.exit(1) def main(): parser = argparse.ArgumentParser() parser.add_argument("--data-dir", type=str, default="./data") parser.add_argument("--persist-dir", type=str, default="./vectorstore") parser.add_argument("--chunk-size", type=int, default=200) parser.add_argument("--chunk-overlap", type=int, default=50) parser.add_argument("--openai-api-key", type=str, default=None, help="Optional OpenAI API key to use for embeddings (overrides env var)") args = parser.parse_args() ingest( data_dir=args.data_dir, persist_dir=args.persist_dir, chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap, openai_api_key=args.openai_api_key, ) if __name__ == "__main__": main()