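"""Ingest pipeline: load .txt/.csv documents, split them into chunks, extract
knowledge-graph triples per chunk, and persist a Chroma vectorstore, a chunk
index, and a KG store under the given persist directory."""
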
import argparse
import json
import logging
import os
import sys
import uuid
from datetime import datetime, timezone

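# Opt out of Chroma's anonymous telemetry before chromadb is imported below.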
os.environ.setdefault("CHROMA_TELEMETRY_ENABLED", "false")

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OpenAIEmbeddings

from src.kg.extract import extract_triples_with_llm
from src.kg.store import KGStore

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def load_documents(data_dir: str):
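    """Recursively load .txt and .csv files under data_dir; other file types are skipped."""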
    from pathlib import Path

    from langchain_community.document_loaders import CSVLoader, TextLoader

    docs = []
    for path in Path(data_dir).rglob("*"):
        if not path.is_file():
            continue
        suffix = path.suffix.lower()
        if suffix == ".txt":
            loader = TextLoader(str(path))
        elif suffix == ".csv":
            loader = CSVLoader(file_path=str(path))
        else:
            continue
        docs.extend(loader.load())

    logger.info("Loaded %d documents from %s", len(docs), data_dir)
    logger.debug("Documents ingested: %s", [(d.metadata or {}).get("source") for d in docs])
    return docs


def ingest(data_dir: str, persist_dir: str, chunk_size: int, chunk_overlap: int, openai_api_key: str | None = None):
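    """Ingest documents from data_dir into a Chroma vectorstore, a chunk index, and a KG store under persist_dir."""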
    logger.info(
        "Starting ingest: data_dir=%s persist_dir=%s chunk_size=%s chunk_overlap=%s",
        data_dir, persist_dir, chunk_size, chunk_overlap,
    )

    if not os.path.exists(data_dir):
        logger.error("Data directory does not exist: %s", data_dir)
        raise ValueError(f"Data directory does not exist: {data_dir}")

    docs = load_documents(data_dir)
    if not docs:
        logger.error("No documents found in %s", data_dir)
        raise ValueError(f"No .txt or .csv documents found in {data_dir}")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    split_docs = splitter.split_documents(docs)
    logger.info("Split into %d chunks", len(split_docs))

    os.makedirs(persist_dir, exist_ok=True)
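
    # Mirror logs to a file in the persist directory so each run can be inspected later.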
    try:
        fh = logging.FileHandler(os.path.join(persist_dir, "ingest.log"))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
        # Avoid stacking duplicate handlers when ingest() is called more than once.
        if not any(
            isinstance(h, logging.FileHandler) and getattr(h, "baseFilename", None) == fh.baseFilename
            for h in logger.handlers
        ):
            logger.addHandler(fh)
    except Exception:
        logger.exception("Failed to add file handler for ingest log")

    chunks_index = {}
    kg_path = os.path.join(persist_dir, "kg_store.ttl")

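    # Fail fast if embeddings cannot be constructed (e.g. missing OPENAI_API_KEY).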
    try:
        logger.info("Initializing embeddings")
        if openai_api_key:
            os.environ["OPENAI_API_KEY"] = openai_api_key
            logger.debug("Set OPENAI_API_KEY from CLI flag")
        embeddings = OpenAIEmbeddings()
        logger.info("Embeddings initialized")
    except Exception:
        logger.exception(
            "Failed to initialize OpenAI embeddings. Ensure OPENAI_API_KEY is set "
            "in the environment or pass --openai-api-key."
        )
        raise

    try:
        logger.info("Initializing KG store at %s", kg_path)
        kg = KGStore(path=kg_path)
        logger.info("KG store initialized")
    except Exception:
        logger.exception("Failed to initialize KGStore")
        raise

    start_time = datetime.now(timezone.utc)
    logger.info("Beginning per-chunk processing at %s UTC", start_time.isoformat())
    for i, d in enumerate(split_docs, start=1):
        meta = d.metadata or {}
        chunk_id = meta.get("chunk_id") or str(uuid.uuid4())
        if not meta:
            d.metadata = {}
        d.metadata["chunk_id"] = chunk_id

        chunks_index[chunk_id] = {
            "text": getattr(d, "page_content", "") or getattr(d, "content", ""),
            "metadata": d.metadata,
        }

        # Log a progress heartbeat for the first few chunks and every 50th thereafter.
        if i % 50 == 0 or i <= 5:
            logger.debug("Processing chunk %d/%d (id=%s)", i, len(split_docs), chunk_id)

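        # Triple extraction is best-effort: any failure is logged and the chunk is still indexed.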
        try:
            triples = extract_triples_with_llm(chunks_index[chunk_id]["text"], max_triples=4)
            if triples:
                logger.debug("Extracted %d triples for chunk %s", len(triples), chunk_id)
                for t in triples:
                    try:
                        kg.add_triple(
                            t.get("subject"),
                            t.get("predicate"),
                            t.get("object"),
                            provenance={"sentence": t.get("sentence"), "confidence": t.get("confidence")},
                        )
                        kg.link_chunk_to_entity(
                            chunk_id,
                            t.get("subject"),
                            sentence=t.get("sentence"),
                            confidence=t.get("confidence"),
                        )
                    except Exception:
                        logger.exception("Non-fatal error while adding triple or linking chunk %s", chunk_id)
                        continue
        except Exception:
            logger.exception("KG extraction failed for chunk %s (continuing)", chunk_id)

    end_time = datetime.now(timezone.utc)
    logger.info("Finished per-chunk processing at %s UTC (duration %s)", end_time.isoformat(), end_time - start_time)

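    # Build and persist the vectorstore; with recent chromadb (>= 0.4) writing to
    # persist_directory happens automatically, so no explicit persist() call is made.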
    try:
        logger.info("Persisting Chroma vectorstore to %s", persist_dir)
        Chroma.from_documents(
            split_docs,
            embedding=embeddings,
            persist_directory=persist_dir,
        )
        logger.info("Vectorstore built and persisted to %s", persist_dir)
    except Exception:
        logger.exception("Chroma.from_documents failed to write the vectorstore:")
        # Flush handlers so the traceback reaches ingest.log before the process exits.
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)

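    # Save the chunk index so retrieval code can map chunk ids back to their text.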
    try:
        idx_path = os.path.join(persist_dir, "chunks_index.json")
        with open(idx_path, "w", encoding="utf-8") as f:
            json.dump(chunks_index, f)
        logger.info("Wrote chunks_index.json (%d entries)", len(chunks_index))
    except Exception:
        logger.exception("Failed to write chunks_index.json")

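    # Finally persist the KG; like the vectorstore, a save failure here is treated as fatal.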
    try:
        kg.save()
        logger.info("KG persisted to %s", kg_path)
    except Exception:
        logger.exception("Failed to persist KG to disk:")
        for h in logger.handlers:
            try:
                h.flush()
            except Exception:
                pass
        sys.exit(1)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-dir", type=str, default="./data")
    parser.add_argument("--persist-dir", type=str, default="./vectorstore")
    parser.add_argument("--chunk-size", type=int, default=200)
    parser.add_argument("--chunk-overlap", type=int, default=50)
    parser.add_argument(
        "--openai-api-key",
        type=str,
        default=None,
        help="Optional OpenAI API key to use for embeddings (overrides env var)",
    )
    args = parser.parse_args()

    # Configure a root handler; without one, the module logger's INFO records never reach the console.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    ingest(
        data_dir=args.data_dir,
        persist_dir=args.persist_dir,
        chunk_size=args.chunk_size,
        chunk_overlap=args.chunk_overlap,
        openai_api_key=args.openai_api_key,
    )


if __name__ == "__main__":
    main()