Spaces:
Sleeping
Sleeping
"""
RAG Pipeline — Data Engineering Knowledge Assistant
───────────────────────────────────────────────────
Strategy : PDF → chunked → HuggingFace MiniLM embeddings → ChromaDB (in-memory)
LLM      : Groq llama-3.1-8b-instant (sub-500ms response, free tier)
Compat   : Works standalone OR registered as an MLflow PyFunc model on Databricks
"""
| from __future__ import annotations | |
| import os | |
| from pathlib import Path | |
| from typing import List, Dict | |
# ──────────────────────────────────────────────────────────────────────────────
# Core RAG class
# ──────────────────────────────────────────────────────────────────────────────
class DataEngineeringRAG:
    """
    Retrieval-Augmented Generation pipeline tuned for data-engineering content.

    Usage (standalone):
        rag = DataEngineeringRAG(pdf_path="knowledge/data_engineering_patterns.pdf",
                                 groq_api_key=os.environ["GROQ_API_KEY"])
        rag.initialize()
        print(rag.search("What is the Lambda architecture?"))

    Usage (Databricks):
        Register via mlflow.pyfunc.log_model — see databricks/agent_notebook.py
    """

    # Sentence-transformers model shared by the PDF index and the demo index,
    # so both produce embeddings in the same space with the same normalization.
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

    def __init__(self, pdf_path: str, groq_api_key: str):
        """Store configuration only; no I/O happens until initialize().

        Args:
            pdf_path: Path to the knowledge-base PDF.
            groq_api_key: Groq API key. Stored on the instance but not read by
                the methods shown here (presumably consumed by an LLM
                component elsewhere — confirm).
        """
        self.pdf_path = Path(pdf_path)
        self.groq_api_key = groq_api_key
        self.vectorstore = None    # Chroma store, populated by initialize()
        self.retriever = None      # LangChain retriever over self.vectorstore
        self._doc_count = 0        # number of indexed chunks / demo documents
        self._initialized = False  # set once an index has been built

    # ── public ────────────────────────────────────────────────────────────────

    def initialize(self) -> None:
        """Load PDF → embed → store. Safe to call multiple times (idempotent).

        If the PDF does not exist, a small built-in demo corpus is indexed
        instead.
        """
        if self._initialized:
            return
        if not self.pdf_path.exists():
            print(f"⚠️ PDF not found at '{self.pdf_path}' — running in demo mode.")
            self._demo_mode()
            return
        self._build_vectorstore()
        self._initialized = True

    def search(self, query: str, k: int = 5) -> List[Dict]:
        """Return up to *k* ranked chunks relevant to *query*.

        Each hit is a dict with ``content``, ``source`` (default ``"pdf"``),
        ``page`` (default ``0``) and ``score`` (cosine similarity rounded to
        4 places; higher means more similar).

        Returns an empty list before initialize() has built an index, or when
        *k* is not positive.
        """
        if self.vectorstore is None or k <= 0:
            return []
        docs_scores = self.vectorstore.similarity_search_with_score(query, k=k)
        return [
            {
                "content": doc.page_content,
                "source": doc.metadata.get("source", "pdf"),
                "page": doc.metadata.get("page", 0),
                "score": self._distance_to_similarity(score),
            }
            for doc, score in docs_scores
        ]

    def get_retriever(self):
        """Return the LangChain retriever, or None before initialize()."""
        return self.retriever

    def get_doc_count(self) -> int:
        """Return the number of indexed chunks (0 before initialize())."""
        return self._doc_count

    # ── private ───────────────────────────────────────────────────────────────

    @staticmethod
    def _distance_to_similarity(distance: float) -> float:
        """Convert a Chroma distance into a cosine similarity, rounded to 4 places.

        Both indexes are created with Chroma's default ``l2`` space, which
        reports *squared* Euclidean distance. For unit-length embeddings
        ``d = ||a - b||² = 2 - 2·cos(a, b)``, hence ``cos = 1 - d / 2``.
        """
        return round(1.0 - float(distance) / 2.0, 4)

    def _make_embeddings(self):
        """Build the CPU embedding model used by both index builders."""
        from langchain_community.embeddings import HuggingFaceEmbeddings

        return HuggingFaceEmbeddings(
            model_name=self.EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
            # Unit-length vectors are what _distance_to_similarity() assumes.
            encode_kwargs={"normalize_embeddings": True},
        )

    def _build_vectorstore(self) -> None:
        """Load the PDF, chunk it, embed the chunks and index them in Chroma.

        If the PDF yields no text chunks (e.g. an image-only PDF), fall back to
        demo mode rather than handing Chroma an empty document list.
        """
        from langchain_community.document_loaders import PyPDFLoader
        try:
            # LangChain >= 0.2 — split into dedicated package
            from langchain_text_splitters import RecursiveCharacterTextSplitter
        except ImportError:
            # LangChain < 0.2 fallback
            from langchain.text_splitter import RecursiveCharacterTextSplitter
        from langchain_community.vectorstores import Chroma

        print(f"📄 Loading '{self.pdf_path.name}' …")
        documents = PyPDFLoader(str(self.pdf_path)).load()
        print(f"   → {len(documents)} pages loaded")

        # ── Chunk ─────────────────────────────────────────────────────────────
        # Smaller chunks (800 chars) with generous overlap keep context intact
        # for technical patterns that often span several paragraphs.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=160,
            separators=["\n\n", "\n", ". ", "! ", "? ", ", ", " "],
        )
        chunks = splitter.split_documents(documents)
        print(f"   → {len(chunks)} chunks created")

        if not chunks:
            print(f"⚠️ No extractable text in '{self.pdf_path.name}' — running in demo mode.")
            self._demo_mode()
            return

        # ── Embed ─────────────────────────────────────────────────────────────
        print("🔢 Embedding chunks (CPU, ~30–60 s on first run) …")
        embeddings = self._make_embeddings()

        # ── Store ─────────────────────────────────────────────────────────────
        # Chroma in-memory — no disk I/O, works on HF Spaces free tier.
        # NOTE(review): with no client/persist_directory, recent chromadb
        # versions may share one in-memory system per process, so a second
        # instance reusing "de_patterns" could see duplicated chunks — confirm.
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=embeddings,
            collection_name="de_patterns",
        )
        # MMR retriever: diversity + relevance
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.6},
        )
        self._doc_count = len(chunks)
        print(f"✅ Vector store ready — {self._doc_count} chunks indexed")

    def _demo_mode(self) -> None:
        """Index a few built-in DE pattern notes (fallback for CI / testing)."""
        from langchain_community.vectorstores import Chroma
        try:
            from langchain_core.documents import Document
        except ImportError:
            from langchain.schema import Document

        demo_docs = [
            Document(
                page_content=(
                    "The Lambda Architecture splits processing into three layers: "
                    "batch, speed, and serving. The batch layer reprocesses all historical "
                    "data; the speed layer handles real-time incremental updates; the serving "
                    "layer merges both for query."
                ),
                metadata={"source": "demo", "page": 0},
            ),
            Document(
                page_content=(
                    "The Kappa Architecture simplifies Lambda by removing the batch layer. "
                    "All data flows through a single streaming path. Historical reprocessing "
                    "is done by replaying the event log."
                ),
                metadata={"source": "demo", "page": 1},
            ),
            Document(
                page_content=(
                    "A Data Lakehouse combines the flexibility of a data lake with the "
                    "structure and ACID guarantees of a data warehouse. Formats like Delta Lake, "
                    "Apache Iceberg, and Apache Hudi implement this pattern."
                ),
                metadata={"source": "demo", "page": 2},
            ),
        ]
        # Same embedding configuration as the PDF path, so scores are comparable.
        self.vectorstore = Chroma.from_documents(demo_docs, embedding=self._make_embeddings())
        self.retriever = self.vectorstore.as_retriever(search_kwargs={"k": 3})
        self._doc_count = len(demo_docs)
        self._initialized = True
        print(f"✅ Demo mode active — {self._doc_count} built-in DE patterns loaded")