"""
RAG Pipeline — Data Engineering Knowledge Assistant
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Strategy : PDF → chunked → HuggingFace MiniLM embeddings → ChromaDB (in-memory)
LLM      : Groq llama-3.1-8b-instant (sub-500ms response, free tier)
Compat   : Works standalone OR registered as an MLflow PyFunc model on Databricks
"""

from __future__ import annotations

import os
import uuid
from pathlib import Path
from typing import Dict, List


# ──────────────────────────────────────────────────────────────────────────────
# Core RAG class
# ──────────────────────────────────────────────────────────────────────────────
class DataEngineeringRAG:
    """
    Retrieval-Augmented Generation pipeline tuned for data-engineering content.

    Usage (standalone):
        rag = DataEngineeringRAG(pdf_path="knowledge/data_engineering_patterns.pdf",
                                 groq_api_key=os.environ["GROQ_API_KEY"])
        rag.initialize()
        print(rag.search("What is the Lambda architecture?"))

    Usage (Databricks):
        Register via mlflow.pyfunc.log_model — see databricks/agent_notebook.py
    """

    # Sentence-transformers model shared by the PDF index and the demo index.
    _EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self, pdf_path: str, groq_api_key: str):
        self.pdf_path = Path(pdf_path)
        # Stored for generation code; the retrieval code in this class never reads it.
        self.groq_api_key = groq_api_key
        self.vectorstore = None  # Chroma store, populated by initialize()
        self.retriever = None  # LangChain retriever over self.vectorstore
        self._doc_count = 0
        self._initialized = False

    # ── public ────────────────────────────────────────────────────────────────
    def initialize(self) -> None:
        """Load PDF → embed → store.

        Safe to call multiple times (idempotent): once a store has been built,
        later calls return immediately. When ``pdf_path`` does not exist, a
        small built-in demo corpus is indexed instead.
        """
        if self._initialized:
            return
        if self.pdf_path.exists():
            self._build_vectorstore()
        else:
            print(f"⚠️ PDF not found at '{self.pdf_path}' — running in demo mode.")
            self._demo_mode()
        self._initialized = True

    def search(self, query: str, k: int = 5) -> List[Dict]:
        """Return up to *k* chunks relevant to *query*, best match first.

        Each hit is a dict with keys ``content``, ``source``, ``page`` and
        ``score``. ``score`` is ``1 - distance`` rounded to 4 decimals; for
        stores built by this class (cosine space) that is cosine similarity.

        Returns ``[]`` when the store has not been built yet or ``k <= 0``.
        """
        if self.vectorstore is None or k <= 0:
            return []
        hits = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "source": doc.metadata.get("source", "pdf"),
                "page": doc.metadata.get("page", 0),
                # Cosine space: Chroma's distance is 1 - cos, so invert it.
                "score": round(1 - float(distance), 4),
            }
            for doc, distance in hits
        ]

    def get_retriever(self):
        """Return the LangChain retriever, or ``None`` before initialize()."""
        return self.retriever

    def get_doc_count(self) -> int:
        """Return the number of indexed chunks (0 before initialize())."""
        return self._doc_count

    # ── private ───────────────────────────────────────────────────────────────
    def _make_embeddings(self):
        """Create the CPU MiniLM embedder used by both the PDF and demo paths."""
        from langchain_community.embeddings import HuggingFaceEmbeddings

        # all-MiniLM-L6-v2 : 22 MB, CPU-friendly, strong semantic accuracy.
        # Unit-length vectors keep both indexes consistent for cosine search.
        return HuggingFaceEmbeddings(
            model_name=self._EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )

    @staticmethod
    def _make_store(documents: List, embeddings, name_prefix: str):
        """Build an in-memory Chroma collection over *documents* in cosine space."""
        from langchain_community.vectorstores import Chroma

        return Chroma.from_documents(
            documents=documents,
            embedding=embeddings,
            # Unique suffix: in-process Chroma clients may share one backing
            # store, so a fixed name could let another instance of this class
            # append its documents to the same collection.
            collection_name=f"{name_prefix}_{uuid.uuid4().hex[:8]}",
            # Default space is squared L2; cosine makes distance == 1 - cos,
            # which is what search() converts back into a similarity.
            collection_metadata={"hnsw:space": "cosine"},
        )

    def _build_vectorstore(self) -> None:
        """Load the PDF, split it into chunks, embed and index them."""
        from langchain_community.document_loaders import PyPDFLoader

        try:
            # LangChain >= 0.2 — split into dedicated package
            from langchain_text_splitters import RecursiveCharacterTextSplitter
        except ImportError:
            # LangChain < 0.2 fallback
            from langchain.text_splitter import RecursiveCharacterTextSplitter

        print(f"📚 Loading '{self.pdf_path.name}' …")
        documents = PyPDFLoader(str(self.pdf_path)).load()
        print(f"   → {len(documents)} pages loaded")

        # ── Chunk ──────────────────────────────────────────────────────────
        # Smaller chunks (800 chars) with generous overlap keep context intact
        # for technical patterns that often span several paragraphs. The final
        # "" separator lets the splitter break separator-free runs (long URLs,
        # identifiers) so no chunk exceeds chunk_size.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=160,
            separators=["\n\n", "\n", ". ", "! ", "? ", ", ", " ", ""],
        )
        chunks = splitter.split_documents(documents)
        print(f"   → {len(chunks)} chunks created")

        # ── Embed ──────────────────────────────────────────────────────────
        print("🔢 Embedding chunks (CPU, ~30–60 s on first run) …")
        embeddings = self._make_embeddings()

        # ── Store ──────────────────────────────────────────────────────────
        # Chroma in-memory — no disk I/O, works on HF Spaces free tier
        self.vectorstore = self._make_store(chunks, embeddings, "de_patterns")

        # MMR retriever: diversity + relevance
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.6},
        )
        self._doc_count = len(chunks)
        print(f"✅ Vector store ready — {self._doc_count} chunks indexed")

    def _demo_mode(self) -> None:
        """Lightweight fallback when PDF is missing (useful for CI / testing)."""
        try:
            from langchain_core.documents import Document
        except ImportError:
            from langchain.schema import Document

        demo_docs = [
            Document(
                page_content=(
                    "The Lambda Architecture splits processing into three layers: "
                    "batch, speed, and serving. The batch layer reprocesses all historical "
                    "data; the speed layer handles real-time incremental updates; the serving "
                    "layer merges both for query."
                ),
                metadata={"source": "demo", "page": 0},
            ),
            Document(
                page_content=(
                    "The Kappa Architecture simplifies Lambda by removing the batch layer. "
                    "All data flows through a single streaming path. Historical reprocessing "
                    "is done by replaying the event log."
                ),
                metadata={"source": "demo", "page": 1},
            ),
            Document(
                page_content=(
                    "A Data Lakehouse combines the flexibility of a data lake with the "
                    "structure and ACID guarantees of a data warehouse. Formats like Delta Lake, "
                    "Apache Iceberg, and Apache Hudi implement this pattern."
                ),
                metadata={"source": "demo", "page": 2},
            ),
        ]

        # Same embedder and cosine-space store as the PDF path, so scores from
        # search() mean the same thing in both modes.
        self.vectorstore = self._make_store(demo_docs, self._make_embeddings(), "de_demo")
        self.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3})
        self._doc_count = len(demo_docs)
        # Also set here so a direct call (e.g. from tests) leaves a consistent state.
        self._initialized = True
        print(f"✅ Demo mode active — {self._doc_count} built-in DE patterns loaded")