# devsecops-platform/model/rag_pipeline.py
# =============================================================================
# RAG Pipeline — DevSecOps Knowledge Assistant
# =============================================================================
# Stack: LangChain + HuggingFace Embeddings + ChromaDB + vLLM
# =============================================================================
from dataclasses import dataclass
from typing import Optional
from langchain_community.document_loaders import (
    DirectoryLoader,
    GitLoader,
    TextLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.llms import VLLM

@dataclass
class RAGConfig:
    """RAG pipeline configuration."""

    embedding_model: str = "BAAI/bge-large-en-v1.5"
    llm_model: str = "meta-llama/Llama-3.1-8B-Instruct"
    chunk_size: int = 512
    chunk_overlap: int = 64
    retriever_k: int = 4
    persist_dir: str = "/data/chromadb"
    device: str = "cuda"
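
# Example override for CPU-only local development. The values below are
# illustrative assumptions (a smaller BGE checkpoint, CPU device), not
# project defaults:
#
#   config = RAGConfig(
#       embedding_model="BAAI/bge-small-en-v1.5",
#       device="cpu",
#       retriever_k=6,
#   )
#   rag = DevSecOpsRAG(config)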

class DevSecOpsRAG:
    """Retrieval-Augmented Generation pipeline for DevSecOps knowledge."""

    def __init__(self, config: Optional[RAGConfig] = None):
        self.config = config or RAGConfig()
        # BGE embeddings are normalized so cosine similarity in Chroma
        # (the "hnsw:space": "cosine" setting below) behaves as expected.
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.config.embedding_model,
            model_kwargs={"device": self.config.device},
            encode_kwargs={"normalize_embeddings": True},
        )
        self.vectorstore = None
        self.llm = VLLM(
            model=self.config.llm_model,
            trust_remote_code=True,
            tensor_parallel_size=1,
            # Engine-level options are not top-level fields on the
            # LangChain VLLM wrapper; they pass through vllm_kwargs.
            vllm_kwargs={
                "gpu_memory_utilization": 0.85,
                "max_model_len": 4096,
            },
        )
        # Markdown-aware splitting: prefer heading boundaries, then
        # paragraphs, then lines, before falling back to word breaks.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.config.chunk_size,
            chunk_overlap=self.config.chunk_overlap,
            separators=["\n## ", "\n### ", "\n\n", "\n", " "],
        )
    def ingest_documents(self, source_path: str) -> int:
        """Load and index documents from a directory."""
        # Path.glob (which DirectoryLoader uses) does not support brace
        # expansion like "*.{md,txt}", so iterate one pattern per extension.
        patterns = [
            "**/*.md", "**/*.txt", "**/*.rst", "**/*.py",
            "**/*.yaml", "**/*.yml", "**/*.json", "**/*.tf",
        ]
        documents = []
        for pattern in patterns:
            loader = DirectoryLoader(
                source_path,
                glob=pattern,
                loader_cls=TextLoader,
                loader_kwargs={"autodetect_encoding": True},
                show_progress=True,
            )
            documents.extend(loader.load())
        chunks = self.text_splitter.split_documents(documents)
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.config.persist_dir,
            collection_metadata={"hnsw:space": "cosine"},
        )
        # chromadb >= 0.4 persists automatically when persist_directory is
        # set, so no explicit persist() call is needed here.
        return len(chunks)
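
    # GitLoader is imported above but never wired in; the method below is a
    # minimal sketch of repository ingestion. The branch default and the
    # file_filter extension list are illustrative assumptions, not settled
    # project choices.
    def ingest_repository(self, repo_path: str, branch: str = "main") -> int:
        """Sketch: index a local Git checkout via GitLoader."""
        loader = GitLoader(
            repo_path=repo_path,
            branch=branch,
            file_filter=lambda path: path.endswith(
                (".md", ".txt", ".py", ".yaml", ".yml", ".tf")
            ),
        )
        chunks = self.text_splitter.split_documents(loader.load())
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.config.persist_dir,
            collection_metadata={"hnsw:space": "cosine"},
        )
        return len(chunks)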
    def query(self, question: str) -> dict:
        """Query the RAG pipeline with a question."""
        if not self.vectorstore:
            # Reattach to the persisted collection if nothing was ingested
            # in this process.
            self.vectorstore = Chroma(
                persist_directory=self.config.persist_dir,
                embedding_function=self.embeddings,
            )
        # MMR trades a little relevance for diversity across the k chunks.
        retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": self.config.retriever_k},
        )
        docs = retriever.invoke(question)
        context = "\n\n---\n\n".join(d.page_content for d in docs)
        prompt = f"""You are a DevSecOps expert assistant. Answer the question
based on the context below. If the context doesn't contain enough information,
say so clearly. Always cite which document/section the answer comes from.

Context:
{context}

Question: {question}

Answer:"""
        response = self.llm.invoke(prompt)
        return {
            "question": question,
            "answer": response,
            "sources": [
                {"content": d.page_content[:200], "metadata": d.metadata}
                for d in docs
            ],
        }

if __name__ == "__main__":
    rag = DevSecOpsRAG()

    # Ingest platform documentation
    num_chunks = rag.ingest_documents("/app/devsecops-platform")
    print(f"Ingested {num_chunks} chunks")

    # Query
    result = rag.query("What security policies are enforced in the Kubernetes cluster?")
    print(f"Q: {result['question']}")
    print(f"A: {result['answer']}")