# PDF-Chatbot-RAG / src/rag_pipelline.py
import os
import re
import time
import random
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from langchain.chat_models import init_chat_model # new universal model initializer
from langchain.agents import create_agent # replaces AgentExecutor
from langchain.tools import tool # tool decorator
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
load_dotenv()  # loads GOOGLE_API_KEY (and any other secrets) from .env

# Constants
CHUNK_SIZE = 2500           # characters per chunk
CHUNK_OVERLAP = 300         # character overlap between consecutive chunks
EMBED_DELAY_SECONDS = 1.5   # delay between embedding calls (~40 requests/min)
TPM_SAFE_THRESHOLD = 27000  # pause before the 30K tokens-per-minute limit
MAX_RETRIES = 3             # retries per chunk on rate-limit errors
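
# Rough budget arithmetic behind the constants above (a sketch, based on the
# free-tier limits described in create_vector_store below):
#   60 s / EMBED_DELAY_SECONDS = 60 / 1.5 = 40 requests/min, under the 100 RPM cap.
#   At ~1 token per 4 characters, a 2,500-char chunk is ~625 tokens, so
#   40 chunks/min is ~25,000 tokens/min, under the 27,000 TPM_SAFE_THRESHOLD.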

# Extract text from the PDF
def extract_text_from_pdf(pdf_file):
    """
    Read each page of the PDF and extract its raw text.
    Some pages may return None, so we fall back to an empty string for safety.
    """
    pdf_reader = PdfReader(pdf_file)
    text = ""
    for page in pdf_reader.pages:
        text += page.extract_text() or ""
    return text

# Split the text into chunks
def split_text_into_chunks(raw_text):
    """
    Split the text into chunks of CHUNK_SIZE characters with CHUNK_OVERLAP
    characters of overlap, to avoid hitting the embedding token limits.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len,
    )
    chunks = text_splitter.split_text(raw_text)
    return chunks

# Create vector store
def create_vector_store(text_chunks):
    """
    Embed chunks with Gemini and store them in FAISS.
    Respects all three free-tier limits:
        RPM = 100 → actual ~40 RPM (1.5 s delay between calls)
        TPM = 30K → pauses at the 27K tokens/min threshold
        RPD = 1K  → chunk_size=2500 minimises daily calls
    On 429: reads Google's retry delay from the error, then waits plus jitter.
    On RPD exhaustion: raises a clear message immediately.
    """
    embeddings = GoogleGenerativeAIEmbeddings(
        model="models/gemini-embedding-001"
    )
    total = len(text_chunks)
    est_min = (total * EMBED_DELAY_SECONDS) / 60
    print(f"\nEmbedding {total} chunks — est. {est_min:.1f} min")
    print(f"RPD usage: {total}/1,000 daily quota\n")

    vector_store = None
    tokens_this_minute = 0
    minute_start = time.time()

    for idx, chunk in enumerate(text_chunks):
        chunk_tokens = max(1, len(chunk) // 4)  # 1 token ≈ 4 chars

        # --- TPM guard: pause if approaching 30K tokens/min ---
        elapsed = time.time() - minute_start
        if elapsed < 60 and (tokens_this_minute + chunk_tokens) > TPM_SAFE_THRESHOLD:
            wait = 60 - elapsed + 2
            print(f"  TPM guard — {tokens_this_minute:,} tokens sent. Waiting {wait:.0f}s...")
            time.sleep(wait)
            tokens_this_minute = 0
            minute_start = time.time()
        if time.time() - minute_start >= 60:
            tokens_this_minute = 0
            minute_start = time.time()

        # --- Embed with retry ---
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                if vector_store is None:
                    vector_store = FAISS.from_texts(texts=[chunk], embedding=embeddings)
                else:
                    vector_store.add_texts(texts=[chunk])
                tokens_this_minute += chunk_tokens
                if (idx + 1) % 10 == 0 or (idx + 1) == total:
                    print(f"  ✓ {idx + 1}/{total} chunks | ~{tokens_this_minute:,} tokens this min")
                break
            except Exception as e:
                err = str(e)
                if "429" in err or "RESOURCE_EXHAUSTED" in err:
                    # Give up on the final attempt instead of sleeping first
                    if attempt == MAX_RETRIES:
                        raise Exception(
                            f"Chunk {idx + 1} failed after {MAX_RETRIES} retries. "
                            f"Daily quota may be exhausted — try again tomorrow."
                        ) from e
                    match = re.search(r"retry[^\d]*(\d+\.?\d*)\s*s", err, re.IGNORECASE)
                    g_wait = float(match.group(1)) if match else 30
                    wait = g_wait + random.uniform(1, 3)
                    print(f"  429 on chunk {idx + 1} (attempt {attempt}/{MAX_RETRIES}). Waiting {wait:.0f}s...")
                    time.sleep(wait)
                    tokens_this_minute = 0
                    minute_start = time.time()
                elif "per day" in err.lower():
                    raise Exception(
                        "Daily RPD quota (1,000) exhausted. "
                        "Resets at midnight Pacific Time. Try again tomorrow."
                    ) from e
                else:
                    raise

        time.sleep(EMBED_DELAY_SECONDS)

    print(f"\n✅ Done — {total} chunks stored in FAISS.\n")
    return vector_store
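
# Optional persistence sketch (not part of the original flow; the folder name
# "faiss_index" is an assumption). Saving the index avoids re-embedding the
# same PDF on every run, which also spares the 1,000-call daily quota.
def save_vector_store(vector_store, folder="faiss_index"):
    vector_store.save_local(folder)

def load_vector_store(folder="faiss_index"):
    embeddings = GoogleGenerativeAIEmbeddings(model="models/gemini-embedding-001")
    # FAISS indexes are pickled on disk, so loading requires an explicit opt-in
    return FAISS.load_local(folder, embeddings, allow_dangerous_deserialization=True)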

# Build RAG agent
def create_rag_agent(vector_store):
    model = init_chat_model(
        "google_genai:gemini-2.5-flash",
        temperature=0
    )

    @tool(response_format="content_and_artifact")
    def retrieve_context(query: str):
        """
        Search the PDF for context relevant to the query and return the
        matching chunks.
        """
        retrieved_docs = vector_store.similarity_search(query, k=3)
        # Format docs as readable text for the LLM
        serialized = "\n\n".join(
            f"[Chunk {i + 1}]:\n{doc.page_content}"
            for i, doc in enumerate(retrieved_docs)
        )
        return serialized, retrieved_docs  # content for the LLM, raw docs for the UI

    tools = [retrieve_context]
    system_prompt = (
        "You are a helpful assistant that answers questions about an uploaded PDF document. "
        "You have access to a retrieval tool that searches the PDF content for context relevant to the question. "
        "Always use the retrieval tool to find relevant information before answering the question. "
        "If the document does not contain the answer, say so clearly. "
        "Keep your answers concise, accurate, and grounded in the document content."
    )
    agent = create_agent(model, tools, system_prompt=system_prompt)
    return agent
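
# Direct invocation sketch (get_answer below wraps this with history handling
# and source extraction; "agent" is the object returned above):
#   result = agent.invoke({"messages": [HumanMessage(content="What is chapter 2 about?")]})
#   print(result["messages"][-1].content)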

# Get the answer with conversation history
def get_answer(agent, user_question, chat_history):
    # Convert history dicts to LangChain message objects
    messages = []
    for msg in chat_history:
        if msg["role"] == "user":
            messages.append(HumanMessage(content=msg["content"]))
        elif msg["role"] == "assistant":
            messages.append(AIMessage(content=msg["content"]))

    # Append the current question
    messages.append(HumanMessage(content=user_question))

    source_docs = []
    final_answer = ""

    # Stream through all agent steps
    for step in agent.stream({"messages": messages}, stream_mode="values"):
        last_message = step["messages"][-1]

        # Collect source docs from the tool message
        if isinstance(last_message, ToolMessage):
            if hasattr(last_message, "artifact") and isinstance(last_message.artifact, list):
                source_docs = last_message.artifact

        # Extract the final answer from the AIMessage only
        if isinstance(last_message, AIMessage):
            content = last_message.content
            # Handle string content (common case)
            if isinstance(content, str) and content.strip():
                final_answer = content
            # Handle a list of content blocks
            elif isinstance(content, list):
                text_parts = [
                    block.get("text", "") if isinstance(block, dict) else str(block)
                    for block in content
                ]
                assembled = " ".join(part for part in text_parts if part.strip())
                if assembled:
                    final_answer = assembled

    # Fallback if no answer was captured
    if not final_answer:
        final_answer = "I was unable to generate a response. Please try rephrasing your question."

    return final_answer, source_docs
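
if __name__ == "__main__":
    # Minimal end-to-end sketch of the pipeline (assumptions: a local
    # "sample.pdf" exists and GOOGLE_API_KEY is set in .env; the file name
    # and the question are placeholders, not part of the app).
    raw_text = extract_text_from_pdf("sample.pdf")
    chunks = split_text_into_chunks(raw_text)
    store = create_vector_store(chunks)
    rag_agent = create_rag_agent(store)
    answer, sources = get_answer(rag_agent, "What is this document about?", chat_history=[])
    print(answer)
    print(f"{len(sources)} source chunks retrieved")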