# (Hugging Face file-viewer residue, kept as comments so the module parses)
# focustiki's picture · Update rag.py · 23527a9 verified
"""
RAG Pipeline — Data Engineering Knowledge Assistant
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Strategy : PDF → chunked → HuggingFace MiniLM embeddings → ChromaDB (in-memory)
LLM : Groq llama-3.1-8b-instant (sub-500ms response, free tier)
Compat : Works standalone OR registered as an MLflow PyFunc model on Databricks
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import List, Dict
# ──────────────────────────────────────────────────────────────────────────────
# Core RAG class
# ──────────────────────────────────────────────────────────────────────────────
class DataEngineeringRAG:
    """
    Retrieval-Augmented Generation pipeline tuned for data-engineering content.

    Usage (standalone):
        rag = DataEngineeringRAG(pdf_path="knowledge/data_engineering_patterns.pdf",
                                 groq_api_key=os.environ["GROQ_API_KEY"])
        rag.initialize()
        print(rag.search("What is the Lambda architecture?"))

    Usage (Databricks):
        Register via mlflow.pyfunc.log_model — see databricks/agent_notebook.py

    Both the PDF-backed store and the demo store are created in Chroma's
    cosine distance space, so the ``score`` returned by ``search``
    (``1 - distance``) is the cosine similarity between query and chunk.
    """

    def __init__(self, pdf_path: str, groq_api_key: str):
        self.pdf_path = Path(pdf_path)
        # Kept for callers; not used inside this class.
        self.groq_api_key = groq_api_key
        self.vectorstore = None  # Chroma store, populated by initialize()
        self.retriever = None    # LangChain retriever over self.vectorstore
        self._doc_count = 0      # number of chunks/documents indexed
        self._initialized = False

    # ── public ────────────────────────────────────────────────────────────────
    def initialize(self) -> None:
        """Load PDF → embed → store. Safe to call multiple times (idempotent).

        If ``pdf_path`` is not an existing regular file, a small built-in
        demo corpus is indexed instead.
        """
        if self._initialized:
            return
        # is_file() rather than exists(): a directory path would otherwise be
        # handed to PyPDFLoader and fail there instead of using demo mode.
        if not self.pdf_path.is_file():
            print(f"⚠️ PDF not found at '{self.pdf_path}' — running in demo mode.")
            self._demo_mode()
            return
        self._build_vectorstore()
        self._initialized = True

    def search(self, query: str, k: int = 5) -> List[Dict]:
        """Return up to *k* ranked chunks relevant to *query*.

        Each hit is a dict with keys:
          - ``content``: the chunk text
          - ``source``: metadata "source" (default ``"pdf"``)
          - ``page``: metadata "page" (default ``0``)
          - ``score``: ``1 - distance`` rounded to 4 places; for the cosine
            collections built by this class this is the cosine similarity.

        Returns an empty list when no vector store exists yet (``initialize``
        not called) or when ``k <= 0``.
        """
        if self.vectorstore is None or k <= 0:
            return []
        hits = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "source": doc.metadata.get("source", "pdf"),
                "page": doc.metadata.get("page", 0),
                # Cosine distance is 1 - cos, so this recovers cos similarity.
                "score": round(1 - float(distance), 4),
            }
            for doc, distance in hits
        ]

    def get_retriever(self):
        """Return the LangChain retriever (``None`` before ``initialize``)."""
        return self.retriever

    def get_doc_count(self) -> int:
        """Return how many chunks/documents are indexed (0 before ``initialize``)."""
        return self._doc_count

    # ── private ───────────────────────────────────────────────────────────────
    @staticmethod
    def _make_embeddings():
        """Build the MiniLM embedder shared by the PDF and demo stores."""
        from langchain_community.embeddings import HuggingFaceEmbeddings

        # all-MiniLM-L6-v2 : small, CPU-friendly sentence embedding model.
        return HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )

    def _build_vectorstore(self) -> None:
        """Load the PDF, split it into overlapping chunks and index them in Chroma.

        If the PDF yields no text chunks (e.g. an image-only scan), falls back
        to demo mode rather than handing Chroma an empty document list.
        """
        from langchain_community.document_loaders import PyPDFLoader
        try:
            # LangChain >= 0.2 — split into dedicated package
            from langchain_text_splitters import RecursiveCharacterTextSplitter
        except ImportError:
            # LangChain < 0.2 fallback
            from langchain.text_splitter import RecursiveCharacterTextSplitter
        from langchain_community.vectorstores import Chroma

        print(f"📚 Loading '{self.pdf_path.name}' …")
        documents = PyPDFLoader(str(self.pdf_path)).load()
        print(f" → {len(documents)} pages loaded")

        # ── Chunk ──────────────────────────────────────────────────────────
        # Smaller chunks (800 chars) with generous overlap keep context intact
        # for technical patterns that often span several paragraphs.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=160,
            separators=["\n\n", "\n", ". ", "! ", "? ", ", ", " "],
        )
        chunks = splitter.split_documents(documents)
        print(f" → {len(chunks)} chunks created")
        if not chunks:
            print(f"⚠️ No extractable text in '{self.pdf_path.name}' — running in demo mode.")
            self._demo_mode()
            return

        # ── Embed + Store ──────────────────────────────────────────────────
        print("🔒 Embedding chunks (CPU, ~30–60 s on first run) …")
        # Chroma in-memory — no disk I/O. The cosine space makes
        # `1 - distance` in search() a true cosine similarity.
        # NOTE(review): depending on the chromadb version, in-memory clients
        # may be shared process-wide, so a second instance reusing the
        # "de_patterns" name could append duplicate chunks — confirm.
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self._make_embeddings(),
            collection_name="de_patterns",
            collection_metadata={"hnsw:space": "cosine"},
        )
        # MMR retriever: diversity + relevance
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.6},
        )
        self._doc_count = len(chunks)
        print(f"✅ Vector store ready — {self._doc_count} chunks indexed")

    def _demo_mode(self) -> None:
        """Lightweight fallback when the PDF is unusable (useful for CI / testing).

        Indexes three built-in data-engineering snippets and marks the
        pipeline as initialized.
        """
        from langchain_community.vectorstores import Chroma
        try:
            from langchain_core.documents import Document
        except ImportError:
            from langchain.schema import Document

        demo_docs = [
            Document(
                page_content=(
                    "The Lambda Architecture splits processing into three layers: "
                    "batch, speed, and serving. The batch layer reprocesses all historical "
                    "data; the speed layer handles real-time incremental updates; the serving "
                    "layer merges both for query."
                ),
                metadata={"source": "demo", "page": 0},
            ),
            Document(
                page_content=(
                    "The Kappa Architecture simplifies Lambda by removing the batch layer. "
                    "All data flows through a single streaming path. Historical reprocessing "
                    "is done by replaying the event log."
                ),
                metadata={"source": "demo", "page": 1},
            ),
            Document(
                page_content=(
                    "A Data Lakehouse combines the flexibility of a data lake with the "
                    "structure and ACID guarantees of a data warehouse. Formats like Delta Lake, "
                    "Apache Iceberg, and Apache Hudi implement this pattern."
                ),
                metadata={"source": "demo", "page": 2},
            ),
        ]
        # Same embedder and distance space as the PDF path, so scores agree.
        self.vectorstore = Chroma.from_documents(
            demo_docs,
            embedding=self._make_embeddings(),
            collection_metadata={"hnsw:space": "cosine"},
        )
        self.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3})
        self._doc_count = len(demo_docs)
        self._initialized = True
        print(f"✅ Demo mode active — {self._doc_count} built-in DE patterns loaded")