# agentic-corrective-rag / ingestion.py
# 3v324v23's picture
# Auto deploy backend
# 3c72c9d
import os, pickle, hashlib
from pathlib import Path
import numpy as np
import chromadb
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
from langchain.text_splitter import RecursiveCharacterTextSplitter
from config import (
DOCS_DIR, CHROMA_PATH, CHROMA_COLLECTION,
BM25_PATH, EMBEDDER_NAME, CHUNK_SIZE, CHUNK_OVERLAP
)
# ── helpers ───────────────────────────────────────────
def read_pdf_text(fpath):
    """Extract the plain text of every page of a PDF.

    Args:
        fpath: Path of the PDF file; passed straight to ``fitz.open``.

    Returns:
        The text of all pages joined with newlines, with leading and
        trailing whitespace stripped.
    """
    # Imported inside the function, so PyMuPDF is only loaded when a PDF is
    # actually read.
    import fitz

    # The context manager closes the document (and its file handle) even if
    # text extraction fails part-way through.
    with fitz.open(fpath) as doc:
        return "\n".join(page.get_text() for page in doc).strip()
def clean_text(text):
    """Collapse every run of whitespace in *text* into a single space.

    Leading and trailing whitespace disappears as a side effect of
    splitting on whitespace and re-joining the pieces.
    """
    words = text.split()
    return " ".join(words)
def doc_hash(text: str) -> str:
    """Return a short content fingerprint for *text*.

    The value is the first 16 hex characters of the SHA-256 digest of the
    UTF-8 encoded text.
    """
    digest = hashlib.sha256(text.encode("utf-8"))
    return digest.hexdigest()[:16]
# ── loading ───────────────────────────────────────────
def load_documents():
    """Load and normalise every ``.txt`` and ``.pdf`` file in ``DOCS_DIR``.

    The directory is created if it does not exist. Text files are read as
    UTF-8 and PDFs go through ``read_pdf_text``; both are then passed through
    ``clean_text``. Files that fail to load, or that contain no text after
    cleaning, are reported and skipped.

    Returns:
        A ``(docs, filenames)`` pair of equal-length lists: the cleaned text
        of each document and the matching file name. All ``.txt`` files come
        first, then all ``.pdf`` files, each group in sorted path order.

    Raises:
        FileNotFoundError: If no document with usable text was loaded.
    """
    docs_path = Path(DOCS_DIR)
    # parents=True so a nested DOCS_DIR whose parent does not exist yet is
    # created instead of raising.
    docs_path.mkdir(parents=True, exist_ok=True)

    readers = (
        ("txt", lambda p: p.read_text(encoding="utf-8")),
        ("pdf", read_pdf_text),
    )

    docs, filenames = [], []
    for kind, reader in readers:
        # glob() order is filesystem-dependent; sort for reproducible runs.
        for fpath in sorted(docs_path.glob(f"*.{kind}")):
            try:
                text = clean_text(reader(fpath))
            except Exception as e:
                # Best effort: one unreadable file must not abort ingestion.
                print(f" Skipped {fpath.name}: {e}")
                continue
            if not text:
                # An empty document yields no chunks, so leave it out.
                print(f" Skipped {fpath.name}: no text content")
                continue
            docs.append(text)
            filenames.append(fpath.name)
            print(f" Loaded {kind}: {fpath.name}")

    if not docs:
        raise FileNotFoundError(
            f"No .txt or .pdf files found in '{DOCS_DIR}'."
        )
    print(f"\nLoaded {len(docs)} document(s)")
    return docs, filenames
# ── chunking ──────────────────────────────────────────
def semantic_chunk(docs, filenames):
    """Split each document into overlapping chunks and track their origin.

    Args:
        docs: Document texts.
        filenames: File name for each entry of ``docs``, in the same order.

    Returns:
        A ``(all_chunks, all_sources)`` pair of equal-length lists, where
        ``all_sources[i]`` is the file name that ``all_chunks[i]`` came from.
        Both lists are empty when no chunks were produced.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", ". ", " "],
    )

    all_chunks, all_sources = [], []
    for doc, fname in zip(docs, filenames):
        chunks = splitter.split_text(doc)
        all_chunks.extend(chunks)
        all_sources.extend([fname] * len(chunks))

    # Guard the average: with no chunks the division would raise
    # ZeroDivisionError.
    avg = sum(map(len, all_chunks)) // len(all_chunks) if all_chunks else 0
    print(f"Created {len(all_chunks)} chunks (avg {avg} chars)")
    return all_chunks, all_sources
# ── indexing ──────────────────────────────────────────
def build_and_save_indexes(chunks, sources, model=None):
    """Add new chunks to ChromaDB and rebuild the pickled BM25 index.

    Chunk IDs are derived from ``doc_hash`` of the chunk text, so a chunk
    whose text is already stored, or that repeats earlier in this call, is
    skipped together with its source. Only chunks that are actually new are
    embedded. The BM25 index is then rebuilt from everything stored in the
    collection and written to ``BM25_PATH``.

    Args:
        chunks: Chunk texts to index.
        sources: Source file name for each entry of ``chunks``.
        model: Optional embedder exposing ``encode``. When ``None``, a
            ``SentenceTransformer(EMBEDDER_NAME)`` is created, but only if
            there is something new to embed.
    """
    # ── ChromaDB ──
    client = chromadb.PersistentClient(path=CHROMA_PATH)
    collection = client.get_or_create_collection(
        name=CHROMA_COLLECTION,
        metadata={"hnsw:space": "cosine"},
    )

    # Only the IDs are needed for de-duplication; include=[] avoids loading
    # every stored document and metadata record.
    seen_ids = set(collection.get(include=[])["ids"])

    new_chunks, new_ids, new_meta = [], [], []
    for chunk, src in zip(chunks, sources):
        chunk_id = f"doc_{doc_hash(chunk)}"
        if chunk_id in seen_ids:
            continue
        # Record the ID immediately so a repeat of the same text later in
        # this batch is not queued twice under one ID.
        seen_ids.add(chunk_id)
        new_chunks.append(chunk)
        new_ids.append(chunk_id)
        new_meta.append({"source": src})

    if new_chunks:
        if model is None:
            model = SentenceTransformer(EMBEDDER_NAME)
        print("\nBuilding embeddings...")
        # Embed only what will be stored, not chunks that are already indexed.
        new_embeddings = model.encode(
            new_chunks, show_progress_bar=True, batch_size=32
        ).tolist()
        collection.add(
            documents=new_chunks,
            embeddings=new_embeddings,
            ids=new_ids,
            metadatas=new_meta,
        )
        print(f"Added {len(new_chunks)} new chunks to ChromaDB")
    else:
        print("No new chunks — all already indexed")

    # ── BM25 (full rebuild, cheap) ──
    # One fetch returns documents and metadata together, so the two lists
    # come from the same result.
    stored = collection.get(include=["documents", "metadatas"])
    all_chunks_in_db = stored["documents"]
    all_sources_in_db = [m["source"] for m in stored["metadatas"]]

    if not all_chunks_in_db:
        # NOTE(review): BM25Okapi appears to divide by the corpus size, so an
        # empty corpus is skipped rather than crashing. An existing file at
        # BM25_PATH is left untouched in that case.
        print("No chunks in ChromaDB — BM25 index not written")
        return

    tokenized = [c.lower().split() for c in all_chunks_in_db]
    bm25_index = BM25Okapi(tokenized)
    with open(BM25_PATH, "wb") as f:
        pickle.dump(
            {
                "bm25": bm25_index,
                "chunks": all_chunks_in_db,
                "sources": all_sources_in_db,
            },
            f,
        )
    print(f"BM25 saved — {len(all_chunks_in_db)} total chunks")
# ── entry point ───────────────────────────────────────
def run_ingestion(model=None):
    """Run the whole pipeline: load documents, chunk them, build the indexes.

    Args:
        model: Optional embedder forwarded to ``build_and_save_indexes``.
    """
    print("=== Starting ingestion ===\n")
    loaded = load_documents()
    chunks, sources = semantic_chunk(*loaded)
    build_and_save_indexes(chunks, sources, model=model)
    print("\n=== Ingestion complete ===")
# Run the ingestion pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    run_ingestion()