#!/usr/bin/env python3
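"""Ingest processed legal text files into a Chroma vector store.

Reads pre-processed .txt files from data/processed (or src/data/processed),
splits them into overlapping, section-tagged chunks, embeds them with a
HuggingFace model, and persists the result to a local Chroma collection
named "legal_documents".
"""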
import os
import sys
import shutil
import re
from pathlib import Path
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain.docstore.document import Document


def extract_section_reference(text: str) -> str:
    """Return the first Section/Article/Part/Chapter reference found in text."""
    patterns = [
        r"(Section\s+\d+[A-Za-z0-9\-]*)",
        r"(Article\s+\d+[A-Za-z0-9\-]*)",
        r"(Part\s+[IVXLC]+)",
        r"(Chapter\s+\d+)",
    ]
    for p in patterns:
        m = re.search(p, text, re.IGNORECASE)
        if m:
            return m.group(1).strip()
    return "Unknown Section"


def _discover_processed_dirs(project_root: Path):
    """Return the processed-text directories that actually exist in the repo."""
    candidates = [
        project_root / "data" / "processed",
        project_root / "src" / "data" / "processed",
    ]
    return [p for p in candidates if p.exists()]


def main():
    print("=== INGEST: Section-aware build (Spaces-friendly) ===")
    project_root = Path(__file__).resolve().parent.parent
    print(f"[dbg] project_root: {project_root}")
    load_dotenv()

    processed_dirs = _discover_processed_dirs(project_root)
    if not processed_dirs:
        print("ERROR: No processed directories found.")
        print("Expected one of: ./data/processed or ./src/data/processed")
        sys.exit(1)

    # Collect every processed .txt file across the discovered directories.
    text_files = []
    for d in processed_dirs:
        text_files += list(d.glob("*.txt"))
    text_files = sorted(text_files)
    if not text_files:
        print("ERROR: No .txt files found in processed directories.")
        print("Make sure you committed your processed text files to the repo.")
        sys.exit(1)

    print(f"Found {len(text_files)} processed files:")
    for f in text_files:
        try:
            rel = f.relative_to(project_root)
        except ValueError:
            rel = f
        print(" -", rel)
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=150,
        # "" is a last-resort separator so over-long runs without whitespace still split.
        separators=["\n\n", "\n", ". ", " ", ""],
    )
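    # With chunk_size=800 and chunk_overlap=150, adjacent chunks share ~150
    # characters of context, so a passage that straddles a chunk boundary still
    # appears intact in at least one chunk at retrieval time.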
    docs = []
    for tf in text_files:
        try:
            content = tf.read_text(encoding="utf-8")
        except Exception as e:
            print(f"[warn] Could not read {tf}: {e}")
            continue
        if not content.strip():
            print(f"[warn] Empty file, skipping: {tf}")
            continue

        chunks = splitter.split_text(content)
        base = tf.stem
        # Derive a human-readable source name by stripping the "_text" suffix.
        source_pdfish = base.replace("_text", "").replace("_TXT", "")

        # Classify the document from its filename so retrieval can filter by type.
        lowname = tf.name.lower()
        if "constitution" in lowname:
            doc_type = "constitution"
        elif "labour" in lowname:
            doc_type = "labour_law"
        elif "fccpa" in lowname:
            doc_type = "consumer_protection"
        elif "data_protection" in lowname or "ndpr" in lowname:
            doc_type = "data_protection"
        else:
            doc_type = "general"

        for i, ch in enumerate(chunks):
            ch = ch.strip()
            if len(ch) < 25:
                # Skip fragments too short to carry a meaningful legal statement.
                continue
            section = extract_section_reference(ch)
            try:
                file_path = str(tf.relative_to(project_root))
            except ValueError:
                file_path = str(tf)
            docs.append(
                Document(
                    page_content=ch,
                    metadata={
                        "document_type": doc_type,
                        "section": section,
                        "source": source_pdfish,
                        "chunk_index": i,
                        "total_chunks": len(chunks),
                        "file_path": file_path,
                        "content_length": len(ch),
                    },
                )
            )

    if not docs:
        print("ERROR: No chunks prepared. Check your .txt content.")
        sys.exit(1)
    print(f"Prepared {len(docs)} chunks total.")

    print("Initializing embeddings...")
    embed = HuggingFaceEmbeddings(
        model_name="BAAI/bge-small-en",
        model_kwargs={"device": "cpu"},
    )
    # Embed a throwaway query to confirm the model loaded and report its dimension.
    test = embed.embed_query("hello")
    print(f"[dbg] embedding dim: {len(test)}")

    # Rebuild the vector store from scratch so stale chunks never linger.
    persist_dir = Path(os.getenv("VECTOR_DB_DIR", "vector_db"))
    if persist_dir.exists():
        shutil.rmtree(persist_dir)
        print("[dbg] removed existing vector_db")
    persist_dir.mkdir(parents=True, exist_ok=True)
print(f"Building Chroma at: {persist_dir}")
vectordb = Chroma.from_documents(
documents=docs,
embedding=embed,
persist_directory=str(persist_dir),
collection_name="legal_documents",
)
count = vectordb._collection.count()
print(f"✅ Ingestion complete. Stored {count} chunks in 'legal_documents'.")
if count == 0:
print("ERROR: Zero chunks after build. Investigate your input files.")
sys.exit(1)
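    # Optional smoke test (illustrative query, not tied to the corpus):
    #   hits = vectordb.similarity_search("freedom of expression", k=2)
    #   print(hits[0].metadata)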


if __name__ == "__main__":
    main()