# Pinecone_Qdrant / app.py
# Hugging Face Spaces page header (author: DarshanaD, commit 9bdea3f "Initial commit1"),
# commented out so the file parses as valid Python.
import streamlit as st
import boto3
import os
import tempfile
import sys
from typing import List, Dict, Any
from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import BedrockEmbeddings
from langchain.vectorstores import Pinecone as LangchainPinecone
from langchain.llms.bedrock import Bedrock
from langchain.chains import RetrievalQA
from langchain.schema import Document
from dotenv import load_dotenv
import warnings
warnings.filterwarnings("ignore")
# Debug: print the interpreter path and working directory — helps diagnose
# virtualenv / deployment mismatches (e.g. on Hugging Face Spaces).
print("Python executable:", sys.executable)
print("Current working directory:", os.getcwd())

# Availability flags for the optional vector-DB client libraries; the UI only
# enables a backend whose import probe below succeeded.
PINECONE_AVAILABLE = False
QDRANT_AVAILABLE = False

# Check for Pinecone (new-style client with ServerlessSpec)
try:
    from pinecone import Pinecone, ServerlessSpec
    import pinecone
    print("Pinecone version:", pinecone.__version__)
    print("Pinecone module location:", pinecone.__file__)
    PINECONE_AVAILABLE = True
except ImportError as e:
    print("Failed to import Pinecone:", str(e))

# Check for Qdrant (client library plus the LangChain vector-store wrapper —
# all three imports must succeed for the Qdrant backend to be usable)
try:
    from qdrant_client import QdrantClient
    from qdrant_client.models import Distance, VectorParams, PointStruct
    from langchain.vectorstores import Qdrant
    QDRANT_AVAILABLE = True
except ImportError as e:
    print("Failed to import Qdrant:", str(e))
class ManagedVectorRAGChatbot:
    """RAG chatbot that embeds PDFs with AWS Bedrock (Titan embeddings) and
    retrieves from a managed vector database (Pinecone or Qdrant), answering
    questions with a Bedrock-hosted Claude model via a RetrievalQA chain.

    All public methods report failure via (False, message) tuples or an error
    string in the response dict — they do not raise to the UI layer.
    """

    def __init__(self):
        """Load environment variables and eagerly initialize Bedrock.

        The outcome is recorded in ``initialization_success`` /
        ``initialization_message`` for the UI to display.
        """
        self.embeddings = None        # BedrockEmbeddings, set by initialize_bedrock()
        self.vectorstore = None       # LangChain vector store, set after PDF ingest
        self.qa_chain = None          # RetrievalQA chain, set by _initialize_qa_chain()
        self.bedrock_client = None    # boto3 'bedrock-runtime' client
        self.documents_processed = False
        self.session = None           # boto3 Session holding AWS credentials
        self.vector_db_type = None    # "pinecone" | "qdrant" | None
        # Vector DB clients
        self.pinecone_client = None
        self.qdrant_client = None
        # Load environment variables at initialization
        self._load_env_vars()
        # Automatically initialize Bedrock on instantiation
        self.initialization_success, self.initialization_message = self.initialize_bedrock()

    @staticmethod
    def _mask_secret(value):
        """Return a log-safe representation of a credential.

        The original debug output printed raw API keys/secrets to stdout,
        which leaks credentials into hosted logs. Only the first 4 chars
        and the length are shown.
        """
        if not value:
            return "<not set>"
        return f"{value[:4]}***({len(value)} chars)"

    def _load_env_vars(self):
        """Load environment variables from a .env file with debug output.

        Tries the current working directory first, then the script's own
        directory (CWD can differ when deployed); falls back to whatever is
        already in the process environment.
        """
        print("Attempting to load .env file...")
        # Try to load .env from the current working directory
        env_loaded = load_dotenv(override=True)
        if not env_loaded:
            # Fallback: Try the script's directory
            script_dir = os.path.dirname(os.path.abspath(__file__))
            env_path = os.path.join(script_dir, ".env")
            print(f"Trying to load .env from: {env_path}")
            env_loaded = load_dotenv(env_path, override=True)
        if not env_loaded:
            print("Warning: .env file not found. Using system environment variables if set.")
        else:
            print(".env file loaded successfully")

    def _setup_aws_client(self):
        """Create a boto3 session and Bedrock runtime client from env credentials.

        Returns:
            (bool, str): success flag and human-readable status message.
        """
        try:
            # Get AWS credentials from environment variables
            aws_access_key = os.environ.get("AWS_ACCESS_KEY_ID")
            aws_secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
            aws_region = os.environ.get("AWS_REGION", "us-east-1")
            # Debug: key id is masked — never log raw credentials
            print("AWS_ACCESS_KEY_ID:", self._mask_secret(aws_access_key))
            print("AWS_REGION:", aws_region)
            # Check if credentials are provided
            if not aws_access_key or not aws_secret_key:
                raise ValueError("AWS_ACCESS_KEY_ID or AWS_SECRET_ACCESS_KEY not found in environment")
            # Create a boto3 session with the credentials
            self.session = boto3.Session(
                aws_access_key_id=aws_access_key,
                aws_secret_access_key=aws_secret_key,
                region_name=aws_region
            )
            # Create a Bedrock client
            self.bedrock_client = self.session.client(
                service_name='bedrock-runtime'
            )
            return True, "AWS Bedrock client initialized successfully!"
        except Exception as e:
            return False, f"Error setting up AWS client: {str(e)}"

    def initialize_bedrock(self):
        """Initialize the AWS Bedrock client and Titan embeddings.

        Returns:
            (bool, str): success flag and status message.
        """
        try:
            # Set up AWS client using credentials from environment
            success, message = self._setup_aws_client()
            if not success:
                return False, message
            # Initialize Bedrock embeddings
            self.embeddings = BedrockEmbeddings(
                client=self.bedrock_client,
                model_id="amazon.titan-embed-text-v1"
            )
            return True, "Bedrock initialized successfully!"
        except Exception as e:
            return False, f"Error initializing Bedrock: {str(e)}"

    def initialize_pinecone(self):
        """Initialize Pinecone, creating the serverless index if missing.

        Returns:
            (bool, str): success flag and status message.
        """
        if not PINECONE_AVAILABLE:
            # No exception is in flight here, so reporting sys.exc_info() (as the
            # old code did) would just print "None" — use a plain message.
            return False, "Pinecone library not installed. Run: pip install pinecone"
        try:
            api_key = os.environ.get("PINECONE_API_KEY")
            index_name = os.environ.get("PINECONE_INDEX_NAME", "rag-chatbot-index")
            # Debug: API key is masked — never log raw credentials
            print("PINECONE_API_KEY:", self._mask_secret(api_key))
            print("PINECONE_INDEX_NAME:", index_name)
            if not api_key:
                return False, "PINECONE_API_KEY not found in environment"
            # Initialize Pinecone with new client
            pc = Pinecone(api_key=api_key)
            # Check if index exists, create if not
            existing_indexes = [index.name for index in pc.list_indexes()]
            if index_name not in existing_indexes:
                # Create index with dimension 1536 (Titan embeddings dimension)
                pc.create_index(
                    name=index_name,
                    dimension=1536,
                    metric="cosine",
                    spec=ServerlessSpec(
                        cloud="aws",
                        region="us-east-1"
                    )
                )
                st.info(f"Created new Pinecone index: {index_name}")
            # Connect to index
            index = pc.Index(index_name)
            self.pinecone_client = index
            self.vector_db_type = "pinecone"
            return True, f"Pinecone initialized successfully with index: {index_name}"
        except Exception as e:
            return False, f"Error initializing Pinecone: {str(e)}"

    def initialize_qdrant(self):
        """Initialize the Qdrant client and verify connectivity.

        Returns:
            (bool, str): success flag and status message.
        """
        if not QDRANT_AVAILABLE:
            return False, "QDrant library not installed. Run: pip install qdrant-client"
        try:
            url = os.environ.get("QDRANT_URL")
            api_key = os.environ.get("QDRANT_API_KEY")
            # Debug: API key is masked — never log raw credentials
            print("QDRANT_URL:", url)
            print("QDRANT_API_KEY:", self._mask_secret(api_key))
            if not url:
                return False, "QDRANT_URL not found in environment"
            if not api_key:
                return False, "QDRANT_API_KEY not found in environment"
            # Initialize QDrant client
            self.qdrant_client = QdrantClient(
                url=url,
                api_key=api_key,
            )
            # Round-trip to the server so connection failures surface now,
            # not later during document ingestion
            collections = self.qdrant_client.get_collections()
            self.vector_db_type = "qdrant"
            return True, f"QDrant initialized successfully. Found {len(collections.collections)} collections."
        except Exception as e:
            return False, f"Error initializing QDrant: {str(e)}"

    def _load_and_split_pdf(self, pdf_file):
        """Write the uploaded PDF to a temp file, load it, and split into chunks.

        Shared by both process_pdf_with_* methods (the logic was duplicated).
        The temp file is always removed, even if loading/splitting raises —
        the original code leaked it on the failure path.

        Args:
            pdf_file: uploaded file object exposing getvalue() (Streamlit upload).
        Returns:
            list of LangChain Documents, ~1000 chars each with 200-char overlap.
        """
        tmp_file_path = None
        try:
            # Save uploaded file temporarily (PyPDFLoader needs a real path)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                tmp_file.write(pdf_file.getvalue())
                tmp_file_path = tmp_file.name
            # Load PDF
            loader = PyPDFLoader(tmp_file_path)
            documents = loader.load()
            # Split documents into chunks
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                length_function=len,
            )
            return text_splitter.split_documents(documents)
        finally:
            # Clean up the temp file on both success and failure
            if tmp_file_path and os.path.exists(tmp_file_path):
                os.unlink(tmp_file_path)

    def process_pdf_with_pinecone(self, pdf_file) -> tuple[bool, str]:
        """Ingest a PDF into Pinecone and build the QA chain.

        Args:
            pdf_file: uploaded file object exposing getvalue().
        Returns:
            (bool, str): success flag and status message.
        """
        try:
            texts = self._load_and_split_pdf(pdf_file)
            # Create Pinecone vector store (embeds chunks and upserts them)
            index_name = os.environ.get("PINECONE_INDEX_NAME", "rag-chatbot-index")
            self.vectorstore = LangchainPinecone.from_documents(
                documents=texts,
                embedding=self.embeddings,
                index_name=index_name,
                namespace=None
            )
            # Initialize QA chain
            self._initialize_qa_chain()
            self.documents_processed = True
            return True, f"PDF processed successfully with Pinecone! Created {len(texts)} text chunks."
        except Exception as e:
            return False, f"Error processing PDF with Pinecone: {str(e)}"

    def process_pdf_with_qdrant(self, pdf_file) -> tuple[bool, str]:
        """Ingest a PDF into Qdrant (recreating the collection) and build the QA chain.

        Args:
            pdf_file: uploaded file object exposing getvalue().
        Returns:
            (bool, str): success flag and status message.
        """
        try:
            texts = self._load_and_split_pdf(pdf_file)
            # Create QDrant vector store
            collection_name = "rag_documents"
            self.vectorstore = Qdrant.from_documents(
                documents=texts,
                embedding=self.embeddings,
                url=os.environ.get("QDRANT_URL"),
                api_key=os.environ.get("QDRANT_API_KEY"),
                collection_name=collection_name,
                force_recreate=True,  # wipes any previous document's vectors
            )
            # Initialize QA chain
            self._initialize_qa_chain()
            self.documents_processed = True
            return True, f"PDF processed successfully with QDrant! Created {len(texts)} text chunks."
        except Exception as e:
            return False, f"Error processing PDF with QDrant: {str(e)}"

    def _initialize_qa_chain(self):
        """Build the RetrievalQA chain: Claude v2 over the top-3 retrieved chunks."""
        # Initialize LLM
        llm = Bedrock(
            client=self.bedrock_client,
            model_id="anthropic.claude-v2",
            model_kwargs={
                "max_tokens_to_sample": 512,
                "temperature": 0.1,  # low temperature: grounded, factual answers
                "top_p": 0.9,
            }
        )
        # Create QA chain; "stuff" concatenates retrieved chunks into one prompt
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 3}
            ),
            return_source_documents=True
        )

    def query(self, question: str) -> Dict[str, Any]:
        """Answer a question against the ingested document.

        Args:
            question: natural-language question.
        Returns:
            dict with "answer" (str) and "sources" (list of dicts holding a
            200-char content preview and the chunk metadata). Errors are
            reported in "answer" rather than raised.
        """
        if not self.qa_chain:
            return {
                "answer": "Please upload and process a PDF file first.",
                "sources": []
            }
        try:
            # Get response from QA chain
            response = self.qa_chain({"query": question})
            # Extract source information
            sources = []
            if "source_documents" in response:
                for doc in response["source_documents"]:
                    sources.append({
                        "content": doc.page_content[:200] + "...",
                        "metadata": doc.metadata
                    })
            return {
                "answer": response["result"],
                "sources": sources
            }
        except Exception as e:
            return {
                "answer": f"Error processing query: {str(e)}",
                "sources": []
            }
def main():
    """Streamlit entry point: sidebar for configuration and PDF upload,
    main area for the chat interface and a live system-status panel."""
    st.set_page_config(
        page_title="Enhanced RAG Chatbot with Managed Vector DBs",
        page_icon="🚀",
        layout="wide"
    )
    st.title("🚀 Enhanced RAG Chatbot with Managed Vector Databases")
    st.markdown("Upload a PDF document and chat with it using Amazon Bedrock + Pinecone/QDrant!")
    # Initialize session state — persists across Streamlit reruns, so the
    # chatbot (and its Bedrock client) is constructed only once per session
    if "chatbot" not in st.session_state:
        st.session_state.chatbot = ManagedVectorRAGChatbot()
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "bedrock_initialized" not in st.session_state:
        st.session_state.bedrock_initialized = st.session_state.chatbot.initialization_success
    if "vector_db_initialized" not in st.session_state:
        st.session_state.vector_db_initialized = False
    if "vector_db_type" not in st.session_state:
        st.session_state.vector_db_type = None
    # Sidebar for configuration and file upload
    with st.sidebar:
        st.header("🔧 Configuration")
        # Display Bedrock initialization status
        if st.session_state.bedrock_initialized:
            st.success("✅ AWS Bedrock initialized")
        else:
            st.error("❌ " + st.session_state.chatbot.initialization_message)
        st.markdown("---")
        # Vector Database Selection (disabled until Bedrock is up)
        st.header("🗃️ Vector Database")
        vector_db_choice = st.selectbox(
            "Choose Vector Database:",
            ["Select...", "Pinecone", "QDrant"],
            disabled=not st.session_state.bedrock_initialized
        )
        if vector_db_choice != "Select..." and st.session_state.bedrock_initialized:
            if st.button(f"Initialize {vector_db_choice}"):
                with st.spinner(f"Initializing {vector_db_choice}..."):
                    if vector_db_choice == "Pinecone":
                        success, message = st.session_state.chatbot.initialize_pinecone()
                    else:  # QDrant
                        success, message = st.session_state.chatbot.initialize_qdrant()
                    if success:
                        st.success(message)
                        st.session_state.vector_db_initialized = True
                        st.session_state.vector_db_type = vector_db_choice.lower()
                    else:
                        st.error(message)
        if st.session_state.vector_db_initialized:
            st.success(f"✅ {vector_db_choice} initialized")
        st.markdown("---")
        # File upload (disabled until a vector DB is initialized)
        st.header("📄 Document Upload")
        uploaded_file = st.file_uploader(
            "Choose a PDF file",
            type="pdf",
            disabled=not st.session_state.vector_db_initialized
        )
        if uploaded_file and st.session_state.vector_db_initialized:
            if st.button("Process PDF"):
                with st.spinner("Processing PDF..."):
                    # Dispatch to the backend chosen at initialization time
                    if st.session_state.vector_db_type == "pinecone":
                        success, message = st.session_state.chatbot.process_pdf_with_pinecone(uploaded_file)
                    else:  # qdrant
                        success, message = st.session_state.chatbot.process_pdf_with_qdrant(uploaded_file)
                    if success:
                        st.success(message)
                    else:
                        st.error(message)
    # Main chat interface
    col1, col2 = st.columns([2, 1])
    with col1:
        st.header("💬 Chat Interface")
        # Display chat messages (replayed from history on every rerun)
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])
        # Chat input; guard clauses halt the rerun early if prerequisites are missing
        if prompt := st.chat_input("Ask a question about your document..."):
            if not st.session_state.bedrock_initialized:
                st.error("Bedrock initialization failed. Please check your environment variables and try again.")
                st.stop()
            if not st.session_state.vector_db_initialized:
                st.error("Please initialize a vector database first!")
                st.stop()
            if not st.session_state.chatbot.documents_processed:
                st.error("Please upload and process a PDF document first!")
                st.stop()
            # Add user message to chat history
            st.session_state.messages.append({"role": "user", "content": prompt})
            with st.chat_message("user"):
                st.markdown(prompt)
            # Get bot response
            with st.chat_message("assistant"):
                with st.spinner("Thinking..."):
                    response = st.session_state.chatbot.query(prompt)
                    st.markdown(response["answer"])
            # Add assistant response to chat history
            st.session_state.messages.append({
                "role": "assistant",
                "content": response["answer"]
            })
    with col2:
        st.header("📊 System Status")
        # Status indicators
        if st.session_state.bedrock_initialized:
            st.success("✅ Bedrock Ready")
        else:
            st.error("❌ Bedrock Failed")
        if st.session_state.vector_db_initialized:
            st.success(f"✅ {st.session_state.vector_db_type.title()} Ready")
        else:
            st.warning("⚠️ No Vector DB")
        if st.session_state.chatbot.documents_processed:
            st.success("✅ Document Processed")
        else:
            st.info("📄 No Document")
        if st.button("🗑️ Clear Chat"):
            st.session_state.messages = []
            st.rerun()
        st.markdown("---")
        # Comparison info
        with st.expander("🔍 Vector DB Comparison"):
            st.markdown("""
            **Pinecone:**
            - ✅ Managed service, fully hosted
            - ✅ Excellent performance & scaling
            - ✅ Great documentation
            - ⚠️ Pricing can add up
            **QDrant:**
            - ✅ Open source option
            - ✅ Good performance
            - ✅ More cost-effective
            - ✅ Self-hosting available
            """)
        st.markdown("---")
        st.markdown("""
        ### 🚀 How to use:
        1. Ensure AWS & vector DB credentials are set (in .env or environment)
        2. Choose Pinecone or QDrant
        3. Initialize your chosen vector database
        4. Upload a PDF file
        5. Process the PDF
        6. Start chatting!
        ### ✨ Enhanced Features:
        - Choose between Pinecone & QDrant
        - Managed vector database hosting
        - Better scalability & performance
        - Production-ready architecture
        """)


if __name__ == "__main__":
    main()