# RAG-Chatbot-Agentic-AI — app/rag_pipeline.py
# Author: KUNAL SHAW
# Initial commit: RAG Chatbot for Agentic AI eBook with LangGraph, Pinecone, and Groq (f9c215a)
"""
rag_pipeline.py - LangGraph RAG Pipeline
This module implements the RAG pipeline using LangGraph for orchestration:
1. Receive user query
2. Embed query using sentence-transformers
3. Query Pinecone for top-k similar chunks
4. Generate answer using LLM (if available) or extractive fallback
5. Return structured response with answer, chunks, and confidence
The pipeline enforces strict grounding - answers must come from retrieved chunks only.
"""
import os
import json
from typing import List, Dict, Any, Optional, TypedDict

from dotenv import load_dotenv

# Load environment variables from a local .env file (API keys, etc.)
load_dotenv()

# Import LangGraph components; when langgraph is missing the pipeline is
# intended to run its nodes manually instead (see RAGPipeline.query).
try:
    from langgraph.graph import StateGraph, END
    LANGGRAPH_AVAILABLE = True
except ImportError:
    LANGGRAPH_AVAILABLE = False
    print("WARNING: langgraph not installed. Using simplified pipeline.")

# Import embedding model
from sentence_transformers import SentenceTransformer

# Import local modules
from app.vectorstore import PineconeVectorStore, LocalVectorStore, get_vector_store
from app.utils import compute_confidence, normalize_score, format_chunks_for_llm, load_chunks_from_jsonl

# Try to import OpenAI (optional LLM provider)
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

# Try to import Groq (free LLM alternative; preferred over OpenAI when present)
try:
    from groq import Groq
    GROQ_AVAILABLE = True
except ImportError:
    GROQ_AVAILABLE = False

# Module-level cache: model name -> loaded SentenceTransformer instance,
# so repeated RAGPipeline construction reuses already-loaded weights.
_EMBEDDING_MODEL_CACHE: Dict[str, SentenceTransformer] = {}
def get_embedding_model(model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
    """Return the embedding model for *model_name*, loading it at most once.

    Loaded models are memoized in the module-level _EMBEDDING_MODEL_CACHE so
    that constructing several pipelines does not re-initialize the weights.
    """
    try:
        return _EMBEDDING_MODEL_CACHE[model_name]
    except KeyError:
        print(f"Loading embedding model: {model_name}")
        # Pin to CPU explicitly to avoid meta tensor issues
        model = SentenceTransformer(model_name, device='cpu')
        _EMBEDDING_MODEL_CACHE[model_name] = model
        return model
# ============================================================================
# LangGraph State Definition
# ============================================================================
class RAGState(TypedDict):
    """State object passed through the RAG pipeline.

    Each pipeline node reads from this state and returns a partial dict of
    updated keys, which is merged back into the state between nodes.
    """

    query: str                               # the raw user question
    query_embedding: Optional[List[float]]   # query vector, set by the embed node
    retrieved_chunks: List[Dict]             # chunks returned by the vector store
    raw_scores: List[float]                  # raw similarity scores for those chunks
    confidence: float                        # aggregate score (see compute_confidence)
    final_answer: str                        # answer text returned to the caller
    use_llm: bool                            # whether LLM generation should be attempted
    error: Optional[str]                     # message from a failed node, if any
# ============================================================================
# Pipeline Nodes (Functions)
# ============================================================================
class RAGPipeline:
    """
    RAG Pipeline implementation using LangGraph.

    The pipeline has the following stages:
        1. embed_query          - Convert query to vector
        2. retrieve_chunks      - Get relevant chunks from the vector store
        3. calculate_confidence - Calculate confidence score from retrieval scores
        4. generate_answer      - Use LLM (Groq/OpenAI) or extractive fallback

    Grounding: the system prompt restricts the LLM to the retrieved excerpts;
    when no LLM key is configured the pipeline returns the raw top passages.
    """

    # System prompt for LLM - VERY IMPORTANT for grounding
    SYSTEM_PROMPT = """You are an assistant answering questions based on provided document excerpts from an eBook about Agentic AI.
IMPORTANT INSTRUCTIONS:
1. Synthesize information from ALL the provided excerpts to give a comprehensive answer.
2. The excerpts may contain relevant information even if they don't directly state the answer - look for definitions, explanations, and examples.
3. Combine information from multiple excerpts when helpful.
4. Cite page numbers when referencing specific information.
5. Do NOT add any information that is not in the excerpts.
6. If absolutely no relevant information exists in ANY excerpt, only then say: "I could not find a supported answer in the document."
Be helpful and thorough - users want complete answers based on the document content."""

    def __init__(
        self,
        pinecone_api_key: Optional[str] = None,
        index_name: str = "agentic-ai-ebook",
        namespace: str = "agentic-ai",
        openai_api_key: Optional[str] = None,
        groq_api_key: Optional[str] = None,
        embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
        top_k: int = 6,
        local_only: bool = False,
        chunks_file: Optional[str] = None
    ):
        """
        Initialize the RAG pipeline.

        Args:
            pinecone_api_key: Pinecone API key (or set PINECONE_API_KEY env var)
            index_name: Name of Pinecone index
            namespace: Pinecone namespace
            openai_api_key: OpenAI API key for LLM (optional)
            groq_api_key: Groq API key for LLM (optional, free alternative)
            embedding_model_name: Name of sentence-transformers embedding model
            top_k: Default number of chunks to retrieve per query
            local_only: Use local vector store instead of Pinecone
            chunks_file: Path to chunks.jsonl for full-text chunk lookup
        """
        self.top_k = top_k
        self.local_only = local_only

        # Load embedding model (cached module-wide to avoid reloading)
        self.embedding_model = get_embedding_model(embedding_model_name)
        self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension()

        # Initialize vector store
        if local_only:
            self.vector_store = LocalVectorStore(dimension=self.embedding_dim)
            # Derive the vectors file from the chunks file location; falls back
            # to ./data/vectors.json when no chunks_file is given
            vectors_file = chunks_file.replace('chunks.jsonl', 'vectors.json') if chunks_file else './data/vectors.json'
            if os.path.exists(vectors_file):
                self.vector_store.load_from_file(vectors_file)
        else:
            api_key = pinecone_api_key or os.getenv("PINECONE_API_KEY")
            self.vector_store = PineconeVectorStore(
                api_key=api_key,
                index_name=index_name,
                namespace=namespace,
                dimension=self.embedding_dim
            )
            # Connect to existing index (presumably created by the ingest
            # script - TODO confirm against ingest.py)
            if self.vector_store.pc:
                self.vector_store.index = self.vector_store.pc.Index(index_name)

        # Load chunk id -> full text map so retrieval can return whole chunks
        if chunks_file and os.path.exists(chunks_file):
            self.vector_store.load_chunks_map(chunks_file)
        elif os.path.exists('./data/chunks.jsonl'):
            self.vector_store.load_chunks_map('./data/chunks.jsonl')

        # Initialize LLM client - prefer Groq (free), then OpenAI
        self.openai_client = None
        self.groq_client = None
        self.llm_provider = None
        # Try Groq first (it's free!)
        groq_key = groq_api_key or os.getenv("GROQ_API_KEY")
        if groq_key and GROQ_AVAILABLE:
            self.groq_client = Groq(api_key=groq_key)
            self.llm_provider = "groq"
            print("Groq client initialized - will use Groq LLM for answer generation (FREE!)")
        else:
            # Fall back to OpenAI
            openai_key = openai_api_key or os.getenv("OPENAI_API_KEY")
            if openai_key and OPENAI_AVAILABLE:
                self.openai_client = OpenAI(api_key=openai_key)
                self.llm_provider = "openai"
                print("OpenAI client initialized - will use OpenAI LLM for answer generation")
            else:
                print("No LLM key - will use extractive answer fallback")

        # Build the LangGraph pipeline; None triggers the manual fallback
        # path in query()
        self.graph = self._build_graph() if LANGGRAPH_AVAILABLE else None

    def _build_graph(self) -> "StateGraph":
        """Build and compile the LangGraph state machine."""
        # NOTE: the return annotation is a string on purpose. StateGraph is
        # undefined when langgraph is not installed, and an unquoted
        # annotation is evaluated at class-definition time, which would raise
        # NameError and defeat the LANGGRAPH_AVAILABLE import fallback.
        workflow = StateGraph(RAGState)
        # Nodes
        workflow.add_node("embed_query", self._embed_query_node)
        workflow.add_node("retrieve_chunks", self._retrieve_chunks_node)
        workflow.add_node("calculate_confidence", self._calculate_confidence_node)
        workflow.add_node("generate_answer", self._generate_answer_node)
        # Linear flow: embed -> retrieve -> confidence -> answer
        workflow.set_entry_point("embed_query")
        workflow.add_edge("embed_query", "retrieve_chunks")
        workflow.add_edge("retrieve_chunks", "calculate_confidence")
        workflow.add_edge("calculate_confidence", "generate_answer")
        workflow.add_edge("generate_answer", END)
        return workflow.compile()

    def _embed_query_node(self, state: "RAGState") -> Dict:
        """Embed the user query with the sentence-transformers model."""
        try:
            embedding = self.embedding_model.encode(state["query"], convert_to_numpy=True)
            return {"query_embedding": embedding.tolist()}
        except Exception as e:
            # Surface the failure via state; downstream nodes degrade gracefully
            return {"error": f"Embedding failed: {str(e)}"}

    def _retrieve_chunks_node(self, state: "RAGState") -> Dict:
        """Retrieve the top-k most similar chunks from the vector store."""
        query_embedding = state.get("query_embedding")
        if not query_embedding:
            return {"error": "No query embedding available"}
        try:
            results = self.vector_store.query_top_k(
                query_vector=query_embedding,
                k=self.top_k
            )
            # Split each hit into a display chunk (rounded score) and the raw
            # score used for confidence computation
            retrieved_chunks = []
            raw_scores = []
            for result in results:
                retrieved_chunks.append({
                    "id": result["id"],
                    "page": result["page"],
                    "text": result["text"],
                    "score": round(result["score"], 4)
                })
                raw_scores.append(result["score"])
            return {
                "retrieved_chunks": retrieved_chunks,
                "raw_scores": raw_scores
            }
        except Exception as e:
            return {"error": f"Retrieval failed: {str(e)}"}

    def _calculate_confidence_node(self, state: "RAGState") -> Dict:
        """Calculate confidence from the raw retrieval scores (max method)."""
        raw_scores = state.get("raw_scores", [])
        if not raw_scores:
            # Nothing retrieved -> no confidence
            return {"confidence": 0.0}
        confidence = compute_confidence(raw_scores, method="max")
        return {"confidence": confidence}

    def _generate_answer_node(self, state: "RAGState") -> Dict:
        """Produce the final answer via LLM, or extractively as a fallback."""
        query = state["query"]
        chunks = state.get("retrieved_chunks", [])
        use_llm = state.get("use_llm", True)
        if not chunks:
            return {
                "final_answer": "I could not find any relevant information in the document."
            }
        # Format chunks into the excerpt context handed to the LLM
        context = format_chunks_for_llm(chunks)
        if use_llm and (self.groq_client or self.openai_client):
            try:
                return {"final_answer": self._generate_with_llm(query, context)}
            except Exception as e:
                # Deliberate best-effort: log and fall through to extraction
                print(f"LLM generation failed: {e}, falling back to extractive")
        # Extractive fallback - return the most relevant chunks verbatim
        return {"final_answer": self._generate_extractive_answer(query, chunks)}

    def _generate_with_llm(self, query: str, context: str) -> str:
        """
        Generate an answer using an LLM (Groq preferred, then OpenAI).

        The prompt strictly instructs the model to only use provided context.

        Raises:
            RuntimeError: if neither LLM client is configured (caught by the
                except-Exception fallback in _generate_answer_node).
        """
        # Construct the user message with context; the string is written at
        # column 0 so no stray indentation leaks into the prompt
        user_message = f"""===BEGIN EXCERPTS===
{context}
===END EXCERPTS===
Question: {query}
Answer the question using ONLY the information from the excerpts above. If the answer is not in the excerpts, say "I could not find a supported answer in the document."
"""
        # Use Groq if available (it's free!)
        if self.groq_client:
            response = self.groq_client.chat.completions.create(
                model="llama-3.1-8b-instant",  # Fast and free model
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_message}
                ],
                temperature=0.1,  # Low temperature for more factual responses
                max_tokens=500
            )
            return response.choices[0].message.content
        # Fall back to OpenAI
        if self.openai_client:
            response = self.openai_client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_message}
                ],
                temperature=0.1,
                max_tokens=500
            )
            return response.choices[0].message.content
        # More specific than the previous bare Exception
        raise RuntimeError("No LLM client available")

    def _generate_extractive_answer(self, query: str, chunks: List[Dict]) -> str:
        """
        Generate an extractive answer by returning the most relevant chunks.

        This is the fallback when no LLM is available (or LLM generation
        failed). Only the top 3 chunks are shown, each truncated to 500 chars.
        """
        answer_parts = [
            "**Answer based on document excerpts:**\n",
            "*(No LLM available - showing relevant passages from the document)*\n"
        ]
        # Limit to the top 3 chunks for readability
        top_chunks = chunks[:3]
        for i, chunk in enumerate(top_chunks, 1):
            page = chunk.get("page", "unknown")
            text = chunk.get("text", "")
            score = chunk.get("score", 0)
            # Truncate very long chunks
            if len(text) > 500:
                text = text[:500] + "..."
            answer_parts.append(f"\n**[Excerpt {i}, Page {page}]** (relevance: {score:.2f})")
            answer_parts.append(f"\n{text}\n")
        return "".join(answer_parts)

    def query(
        self,
        user_query: str,
        top_k: Optional[int] = None,
        use_llm: bool = True
    ) -> Dict[str, Any]:
        """
        Run a query through the RAG pipeline.

        Args:
            user_query: The user's question
            top_k: Number of chunks to retrieve for THIS call only (the
                   previous implementation permanently overwrote the default)
            use_llm: Whether to use LLM for generation

        Returns:
            Dict with final_answer, retrieved_chunks, confidence, and an
            "error" key when a stage failed.
        """
        # Apply a per-call top_k override without leaking it into later calls;
        # the retrieval node reads self.top_k, so set it and restore after.
        saved_top_k = self.top_k
        if top_k is not None:
            self.top_k = top_k
        try:
            initial_state: RAGState = {
                "query": user_query,
                "query_embedding": None,
                "retrieved_chunks": [],
                "raw_scores": [],
                "confidence": 0.0,
                "final_answer": "",
                "use_llm": use_llm and (self.groq_client is not None or self.openai_client is not None),
                "error": None
            }
            if self.graph:
                # LangGraph orchestrates the node sequence
                final_state = self.graph.invoke(initial_state)
            else:
                # Fallback: run the nodes manually in pipeline order
                final_state = initial_state
                for node in (
                    self._embed_query_node,
                    self._retrieve_chunks_node,
                    self._calculate_confidence_node,
                    self._generate_answer_node,
                ):
                    final_state.update(node(final_state))
        finally:
            self.top_k = saved_top_k

        # Format response
        response = {
            "final_answer": final_state.get("final_answer", ""),
            "retrieved_chunks": final_state.get("retrieved_chunks", []),
            "confidence": final_state.get("confidence", 0.0)
        }
        if final_state.get("error"):
            response["error"] = final_state["error"]
        return response
# ============================================================================
# Convenience function for simple usage
# ============================================================================
def create_rag_pipeline(
    pinecone_api_key: Optional[str] = None,
    openai_api_key: Optional[str] = None,
    index_name: str = "agentic-ai-ebook",
    local_only: bool = False
) -> RAGPipeline:
    """
    Factory function to create a RAG pipeline.

    Args:
        pinecone_api_key: Pinecone API key
        openai_api_key: OpenAI API key
        index_name: Name of Pinecone index
        local_only: Use local storage instead of Pinecone

    Returns:
        Configured RAGPipeline instance
    """
    # Collect the supported options, then hand them to the constructor
    options = dict(
        pinecone_api_key=pinecone_api_key,
        openai_api_key=openai_api_key,
        index_name=index_name,
        local_only=local_only,
    )
    return RAGPipeline(**options)
# ============================================================================
# Main - Quick test
# ============================================================================
if __name__ == "__main__":
    # Smoke test: build a pipeline and run a single query end-to-end.
    print("Testing RAG Pipeline...")
    print("=" * 60)

    # Fall back to the local vector store when no Pinecone key is configured
    local_mode = not os.getenv("PINECONE_API_KEY")
    if local_mode:
        print("No PINECONE_API_KEY found, using local mode")
        print("Make sure you have run ingest.py with --local-only first!")

    rag = RAGPipeline(local_only=local_mode)

    test_query = "What is agentic AI?"
    print(f"\nTest query: {test_query}")
    print("-" * 40)

    answer = rag.query(test_query)
    print(f"\nFinal Answer:")
    print(answer["final_answer"])
    print(f"\nConfidence: {answer['confidence']}")
    print(f"\nRetrieved Chunks: {len(answer['retrieved_chunks'])}")
    for hit in answer["retrieved_chunks"]:
        print(f" - {hit['id']} (page {hit['page']}, score: {hit['score']})")

    print("\n" + "=" * 60)
    print("Pipeline test complete!")