# =============================================================================
# RAG Pipeline — DevSecOps Knowledge Assistant
# =============================================================================
# Stack: LangChain + HuggingFace Embeddings + ChromaDB + vLLM
# =============================================================================

from dataclasses import dataclass
from typing import Optional

from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain_community.llms import VLLM
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter


@dataclass
class RAGConfig:
    """RAG pipeline configuration."""

    embedding_model: str = "BAAI/bge-large-en-v1.5"
    llm_model: str = "meta-llama/Llama-3.1-8B-Instruct"
    chunk_size: int = 512
    chunk_overlap: int = 64
    retriever_k: int = 4
    persist_dir: str = "/data/chromadb"
    device: str = "cuda"


class DevSecOpsRAG:
    """Retrieval-Augmented Generation pipeline for DevSecOps knowledge."""

    # DirectoryLoader's glob does not support brace expansion ("*.{md,txt}"),
    # so ingestion runs one glob pass per extension.
    SUPPORTED_EXTENSIONS = ("md", "txt", "rst", "py", "yaml", "yml", "json", "tf")

    def __init__(self, config: Optional[RAGConfig] = None):
        self.config = config or RAGConfig()

        # Normalized embeddings pair with the cosine HNSW space configured on
        # the Chroma collection below.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.config.embedding_model,
            model_kwargs={"device": self.config.device},
            encode_kwargs={"normalize_embeddings": True},
        )
        self.vectorstore = None

        # Engine-level options such as gpu_memory_utilization and max_model_len
        # are not top-level fields on LangChain's VLLM wrapper; they are
        # forwarded to the underlying vllm.LLM constructor via vllm_kwargs.
        self.llm = VLLM(
            model=self.config.llm_model,
            trust_remote_code=True,
            tensor_parallel_size=1,
            vllm_kwargs={
                "gpu_memory_utilization": 0.85,
                "max_model_len": 4096,
            },
        )

        # Split on Markdown headings first, then fall back to paragraph,
        # line, and word boundaries.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=["\n## ", "\n### ", "\n\n", "\n", " "],
        )

    def ingest_documents(self, source_path: str) -> int:
        """Load and index documents from a directory."""
        documents = []
        for ext in self.SUPPORTED_EXTENSIONS:
            loader = DirectoryLoader(
                source_path,
                glob=f"**/*.{ext}",
                loader_cls=TextLoader,
                show_progress=True,
            )
            documents.extend(loader.load())

        chunks = self.text_splitter.split_documents(documents)
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.config.persist_dir,
            collection_metadata={"hnsw:space": "cosine"},
        )
        # chromadb >= 0.4 writes to persist_directory automatically, so no
        # explicit persist() call is needed.
        return len(chunks)

    def query(self, question: str) -> dict:
        """Query the RAG pipeline with a question."""
        if self.vectorstore is None:
            # Reattach to the on-disk collection created by ingest_documents().
            self.vectorstore = Chroma(
                persist_directory=self.config.persist_dir,
                embedding_function=self.embeddings,
            )

        # MMR trades a little raw relevance for diversity across chunks.
        retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": self.config.retriever_k},
        )
        docs = retriever.invoke(question)
        context = "\n\n---\n\n".join(d.page_content for d in docs)

        prompt = f"""You are a DevSecOps expert assistant.
Answer the question based on the context below.
If the context doesn't contain enough information, say so clearly.
Always cite which document/section the answer comes from.

Context:
{context}

Question: {question}

Answer:"""

        response = self.llm.invoke(prompt)
        return {
            "question": question,
            "answer": response,
            "sources": [
                {"content": d.page_content[:200], "metadata": d.metadata}
                for d in docs
            ],
        }


if __name__ == "__main__":
    rag = DevSecOpsRAG()

    # Ingest platform documentation
    num_chunks = rag.ingest_documents("/app/devsecops-platform")
    print(f"Ingested {num_chunks} chunks")

    # Query
    result = rag.query("What security policies are enforced in the Kubernetes cluster?")
    print(f"Q: {result['question']}")
    print(f"A: {result['answer']}")
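
    # -- Illustrative extension (not in the original script): print the     --
    # -- retrieved sources so answers can be traced back to files. The      --
    # -- "source" key is the file path that DirectoryLoader/TextLoader      --
    # -- record in each document's metadata by default.                     --
    for i, src in enumerate(result["sources"], start=1):
        print(f"[{i}] {src['metadata'].get('source', 'unknown')}")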