# RAG-Insurance / chroma.py — Insurance RAG API, initial commit af37875 (author: mokhles)
# chroma.py (minimal, no visualization, WITH sentence-transformers, with .env)
import os
import warnings
from pathlib import Path
from typing import List, Dict
import pandas as pd # (currently unused but kept if you need it later)
from dotenv import load_dotenv
from llama_parse import LlamaParse
from llama_index.core.node_parser import SentenceSplitter
import chromadb
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
from openai import OpenAI
# LlamaParse drives async code internally; nest_asyncio lets that run even
# when an event loop is already active (e.g. inside a notebook).
import nest_asyncio
nest_asyncio.apply()

# NOTE(review): silences ALL warnings globally — consider narrowing the filter.
warnings.filterwarnings("ignore")

# ---------- LOAD .env ----------
load_dotenv()

# ---------- CONFIG ----------
# Central pipeline configuration; every stage below reads from this dict.
CONFIG = {
    "pdf_directory": r"C:\Users\Legion\Documents\Ominimo Job\Pdfs for RAG",
    "output_directory": "./output/",
    "llm_model": "gpt-4.1-mini",
    "chunk_size": 512,
    "chunk_overlap": 50,
    "top_k_retrieval": 3,
    # ✅ SentenceTransformer embedding model (384-D for MiniLM)
    # Must match your retrieval embedding model.
    "embedding_model": "all-MiniLM-L6-v2",
    # Optional: force device ("cpu" or "cuda")
    "embedding_device": os.getenv("EMB_DEVICE", "cpu"),
}

# Ensure the output directory (also the ChromaDB persistence root) exists.
Path(CONFIG["output_directory"]).mkdir(parents=True, exist_ok=True)

# ---------- OPENAI CLIENT (for summaries only) ----------
# Fail fast at import time if the key is missing rather than on first API call.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set in the environment or .env file.")
client = OpenAI(api_key=OPENAI_API_KEY)

# Per-PDF summaries keyed by filename; populated during parsing.
document_summaries: Dict[str, str] = {}
def summarize_document(text: str, client: OpenAI, model: str) -> str:
    """Return a concise OpenAI-generated summary of *text*.

    Only the first 4000 characters are sent, keeping the request within a
    predictable token budget. Used solely for per-document summaries.
    """
    system_message = {
        "role": "system",
        "content": (
            "You are a helpful assistant that creates concise "
            "summaries of documents."
        ),
    }
    user_message = {
        "role": "user",
        "content": (
            "Please provide a comprehensive summary of the "
            "following document:\n\n"
            f"{text[:4000]}"
        ),
    }
    completion = client.chat.completions.create(
        model=model,
        messages=[system_message, user_message],
        temperature=0.3,  # low temperature for stable, factual summaries
        max_tokens=500,
    )
    return completion.choices[0].message.content
# ---------- PDF PARSING ----------
def parse_pdfs_with_llamaparse(pdf_directory: str) -> List[Dict]:
    """Parse PDFs using LlamaParse with batch processing.

    Returns a list of dicts with keys "text", "source" (PDF filename) and
    "metadata". As a side effect, fills the module-level
    ``document_summaries`` dict with an OpenAI summary per PDF.

    Raises:
        RuntimeError: if LLAMA_CLOUD_API_KEY is not configured.
    """
    pdf_files = list(Path(pdf_directory).glob("*.pdf"))
    print(f"Found {len(pdf_files)} PDF files")
    llama_key = os.environ.get("LLAMA_CLOUD_API_KEY")
    if not llama_key:
        raise RuntimeError("LLAMA_CLOUD_API_KEY is not set in the environment or .env.")
    parser = LlamaParse(
        api_key=llama_key,
        result_type="markdown",
        verbose=True,
        language="en",
        num_workers=4,
    )
    all_documents: List[Dict] = []
    try:
        # Fast path: submit every PDF in a single batch request.
        print("\nParsing all PDFs in batch...")
        pdf_paths = [str(pdf) for pdf in pdf_files]
        documents_batch = parser.load_data(pdf_paths)
        print(f"✓ Successfully parsed {len(documents_batch)} document sections")
        # Re-associate the flat batch result with its source PDFs by walking a
        # single cursor (doc_index) through documents_batch.  Heuristic: when a
        # section carries file_path metadata it is matched by name; sections
        # without file_path are assumed to belong to the current PDF.
        # NOTE(review): this presumes batch output preserves input file order —
        # confirm against LlamaParse behavior.
        doc_index = 0
        for pdf_path in pdf_files:
            print(f"\nProcessing: {pdf_path.name}")
            pdf_docs = []
            while doc_index < len(documents_batch):
                doc = documents_batch[doc_index]
                if hasattr(doc, "metadata") and doc.metadata.get("file_path"):
                    if pdf_path.name in doc.metadata.get("file_path", ""):
                        pdf_docs.append(doc)
                        doc_index += 1
                    else:
                        # Section belongs to a different PDF: stop consuming
                        # and let the next pdf_path pick it up.
                        break
                else:
                    # No file_path metadata — attribute to the current PDF.
                    pdf_docs.append(doc)
                    doc_index += 1
                if doc_index >= len(documents_batch):
                    break
            if pdf_docs:
                full_text = " ".join([d.text for d in pdf_docs])
                summary = summarize_document(full_text, client, CONFIG["llm_model"])
                document_summaries[pdf_path.name] = summary
                print(f"Summary for {pdf_path.name}:")
                print(summary[:200] + "...\n")
                for d in pdf_docs:
                    all_documents.append(
                        {
                            "text": d.text,
                            "source": pdf_path.name,
                            "metadata": d.metadata if hasattr(d, "metadata") else {},
                        }
                    )
            else:
                print(f"Warning: No content extracted from {pdf_path.name}")
                document_summaries[pdf_path.name] = "No content extracted"
    except Exception as e:
        # Slow path: retry file-by-file, pausing between requests
        # (presumably to avoid API rate limits — TODO confirm).
        print(f"Batch processing failed: {str(e)}")
        print("\nFalling back to individual file processing with sleep delays...")
        import time
        for pdf_path in pdf_files:
            print(f"\nParsing: {pdf_path.name}")
            try:
                time.sleep(2)
                documents = parser.load_data(str(pdf_path))
                if documents:
                    full_text = " ".join([d.text for d in documents])
                    summary = summarize_document(full_text, client, CONFIG["llm_model"])
                    document_summaries[pdf_path.name] = summary
                    print(f"Summary for {pdf_path.name}:")
                    print(summary[:200] + "...\n")
                    for d in documents:
                        all_documents.append(
                            {
                                "text": d.text,
                                "source": pdf_path.name,
                                "metadata": d.metadata if hasattr(d, "metadata") else {},
                            }
                        )
                else:
                    print(f"Warning: No content extracted from {pdf_path.name}")
                    document_summaries[pdf_path.name] = "No content extracted"
            except Exception as e2:
                # One failing PDF must not abort the rest of the batch;
                # record the failure and keep going.
                print(f"Error parsing {pdf_path.name}: {str(e2)}")
                document_summaries[pdf_path.name] = f"Failed to parse: {str(e2)}"
                continue
    return all_documents
# ---------- CHUNKING ----------
def chunk_documents(
    documents: List[Dict],
    chunk_size: int = 512,
    chunk_overlap: int = 50,
) -> List[Dict]:
    """Split each parsed document into overlapping, sentence-aware chunks.

    Every chunk carries a globally unique sequential id ("chunk_<n>") plus the
    source filename and metadata of the document it came from.
    """
    splitter = SentenceSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    result: List[Dict] = []
    next_id = 0
    for doc in documents:
        for piece in splitter.split_text(doc["text"]):
            result.append(
                {
                    "chunk_id": f"chunk_{next_id}",
                    "text": piece,
                    "source": doc["source"],
                    "metadata": doc["metadata"],
                }
            )
            next_id += 1
    return result
# ---------- CHROMA (SBERT EMBEDDINGS, 384-D) ----------
def create_chromadb_collection(
    chunks: List[Dict],
    collection_name: str = "rag_documents",
) -> chromadb.Collection:
    """Build a persistent ChromaDB collection from *chunks*.

    Embeddings are computed by a SentenceTransformer model (384-D MiniLM).
    Any pre-existing collection with the same name is dropped first so stale
    vectors from a different embedding model cannot linger.
    """
    embedder = SentenceTransformerEmbeddingFunction(
        model_name=CONFIG["embedding_model"],
        device=CONFIG["embedding_device"],
    )
    client_db = chromadb.PersistentClient(
        path=os.path.join(CONFIG["output_directory"], "chromadb")
    )
    # Best-effort drop of the old collection (it may simply not exist yet).
    try:
        client_db.delete_collection(collection_name)
        print(f"Deleted existing collection: {collection_name}")
    except Exception:
        pass
    collection = client_db.create_collection(
        name=collection_name,
        metadata={
            "description": "RAG document chunks",
            "embedding_model": CONFIG["embedding_model"],
            "embedding_dim": 384,  # MiniLM dim
        },
        embedding_function=embedder,
    )
    # One pass over the chunks builds the three parallel lists Chroma expects.
    ids: List[str] = []
    documents: List[str] = []
    metadatas: List[Dict] = []
    for chunk in chunks:
        ids.append(chunk["chunk_id"])
        documents.append(chunk["text"])
        metadatas.append({"source": chunk["source"], **(chunk["metadata"] or {})})
    # Insert in batches of 100 to keep individual add() calls small.
    batch_size = 100
    total_batches = (len(ids) - 1) // batch_size + 1
    for start in range(0, len(ids), batch_size):
        stop = min(start + batch_size, len(ids))
        collection.add(
            ids=ids[start:stop],
            documents=documents[start:stop],
            metadatas=metadatas[start:stop],
        )
        print(
            f"Added batch {start // batch_size + 1}/"
            f"{total_batches}"
        )
    print(f"✓ ChromaDB collection created with {len(ids)} chunks")
    return collection
# ---------- MAIN ----------
def main():
    """Run the full ingestion pipeline: parse → summarize → chunk → index."""
    print("✓ Starting pipeline with .env configuration (SentenceTransformer embeddings)")
    print("Starting PDF parsing...")
    parsed_documents = parse_pdfs_with_llamaparse(CONFIG["pdf_directory"])
    print(f"\n✓ Parsed {len(parsed_documents)} document sections from PDFs")

    chunks = chunk_documents(
        parsed_documents,
        CONFIG["chunk_size"],
        CONFIG["chunk_overlap"],
    )
    print(f"✓ Created {len(chunks)} chunks")
    if chunks:
        print("\nSample chunk:")
        print(chunks[0])

    create_chromadb_collection(chunks)
    print("ChromaDB collection ready for querying.")


if __name__ == "__main__":
    main()