demo2 / streamlit_app.py
Dinesh310's picture
Update streamlit_app.py
c00effc verified
raw
history blame
7.61 kB
"""
Streamlit UI for Agentic RAG System
- Default URL ingestion
- Sidebar PDF upload
- Incremental indexing
- Question answering with sources
"""
import streamlit as st
from pathlib import Path
import sys
import time
import os
# -------------------------------------------------
# Path setup
# -------------------------------------------------
# Streamlit re-executes this script from top to bottom on every user
# interaction inside one long-lived Python process. An unconditional
# ``sys.path.append`` would therefore add one more duplicate entry per rerun,
# so the app directory is registered only if it is not already present.
_APP_DIR = str(Path(__file__).parent)
if _APP_DIR not in sys.path:
    sys.path.append(_APP_DIR)
# -------------------------------------------------
# Project imports
# -------------------------------------------------
from src.config.config import Config
from src.document_ingestion.document_processor import DocumentProcessor
from src.vectorstore.vectorstore import VectorStore
from src.graph_builder.graph_builder import GraphBuilder
# -------------------------------------------------
# Page configuration
# -------------------------------------------------
# Browser-tab title, favicon and layout for the app.
# The title and icon were previously stored as mojibake (the UTF-8 bytes of
# the emoji decoded with the wrong codec), which showed up as garbage
# characters; the intended emoji are written out directly here.
st.set_page_config(
    page_title="🤖 Agentic RAG Search",
    page_icon="🔍",
    layout="centered",
)
# -------------------------------------------------
# Simple CSS
# -------------------------------------------------
# Stylesheet injected into the page: every Streamlit button becomes
# full-width with a green background and bold white text.
_BUTTON_STYLE = """
<style>
.stButton > button {
width: 100%;
background-color: #4CAF50;
color: white;
font-weight: bold;
}
</style>
"""

# Raw HTML must be explicitly allowed, otherwise the <style> tag is escaped.
st.markdown(_BUTTON_STYLE, unsafe_allow_html=True)
# -------------------------------------------------
# Session state initialization
# -------------------------------------------------
def init_session_state():
    """Seed ``st.session_state`` with the keys this app reads.

    Keys that already exist are left untouched, so values survive reruns.
    """
    # Built on every call so each session receives its own fresh list objects.
    defaults = (
        ("rag_system", None),
        ("initialized", False),
        ("history", []),
        ("processed_files", []),
    )
    for key, value in defaults:
        if key not in st.session_state:
            st.session_state[key] = value
# -------------------------------------------------
# RAG system initialization (cached)
# -------------------------------------------------
@st.cache_resource
def _build_rag_components():
    """Build the RAG components from the default URLs (cached on success).

    Any exception is allowed to propagate. Because the function then never
    returns, there is no value for ``st.cache_resource`` to store, so a
    failed build is attempted again on the next call.

    Returns:
        A tuple ``(graph_builder, vector_store, doc_processor, n_documents)``
        where ``n_documents`` is ``len()`` of what ``process_urls`` returned.
    """
    llm = Config.get_llm()
    doc_processor = DocumentProcessor(
        chunk_size=Config.CHUNK_SIZE,
        chunk_overlap=Config.CHUNK_OVERLAP,
    )
    vector_store = VectorStore()

    # Load and index the default URLs.
    documents = doc_processor.process_urls(Config.DEFAULT_URLS)
    vector_store.create_vectorstore(documents)

    # Build the agentic graph on top of the freshly created retriever.
    graph_builder = GraphBuilder(
        retriever=vector_store.get_retriever(),
        llm=llm,
    )
    graph_builder.build()

    return graph_builder, vector_store, doc_processor, len(documents)


def initialize_rag():
    """Initialize the RAG system from the default URLs.

    A successful build runs only once thanks to the cached builder. The
    error handling deliberately lives outside the cached function: the
    previous version caught the exception inside it, so the failure tuple
    itself was cached and a transient error (for example a network problem
    while fetching the URLs) was never retried.

    Returns:
        ``(graph_builder, vector_store, doc_processor, n_documents)`` on
        success, or ``(None, None, None, 0)`` after reporting the error in
        the UI when initialization fails.
    """
    try:
        return _build_rag_components()
    except Exception as e:  # UI boundary: report the problem, keep the page alive
        st.error(f"Initialization failed: {str(e)}")
        return None, None, None, 0


# Keep the cache-control hook reachable under the public name, as it was when
# ``initialize_rag`` itself carried the decorator.
initialize_rag.clear = _build_rag_components.clear
# -------------------------------------------------
# Main app
# -------------------------------------------------
def _preview(text, limit):
    """Return ``text`` cut to ``limit`` characters, adding "..." only if cut."""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def _ensure_rag_ready():
    """Load the RAG system into session state once; return True when usable."""
    if st.session_state.initialized:
        return True

    with st.spinner("Loading RAG system..."):
        rag_system, vector_store, doc_processor, num_chunks = initialize_rag()

    if not rag_system:
        # initialize_rag() has already reported the failure in the UI.
        return False

    st.session_state.rag_system = rag_system
    st.session_state.vector_store = vector_store
    st.session_state.doc_processor = doc_processor
    st.session_state.initialized = True
    st.success(f"✅ System ready! ({num_chunks} chunks indexed)")
    return True


def _index_new_pdfs(uploaded_files):
    """Index only the uploaded PDFs that this session has not indexed yet."""
    already_indexed = set(st.session_state.processed_files)
    # The uploader returns the full current selection on every rerun.
    # Re-adding files that are already in the vector store would duplicate
    # their chunks, so only names not seen before are processed.
    pending = [f for f in uploaded_files if f.name not in already_indexed]
    if not pending:
        return

    with st.spinner("Analyzing uploaded PDFs..."):
        temp_dir = "temp"
        os.makedirs(temp_dir, exist_ok=True)

        paths = []
        for uploaded in pending:
            # The file name is supplied by the client: keep only its final
            # component so it cannot point outside ``temp_dir``.
            safe_name = os.path.basename(uploaded.name.replace("\\", "/"))
            path = os.path.join(temp_dir, safe_name)
            with open(path, "wb") as out:
                out.write(uploaded.getbuffer())
            paths.append(path)

        # Process the PDFs and add them to the existing vector store.
        documents = st.session_state.doc_processor.process_pdfs(paths)
        st.session_state.vector_store.add_documents(documents)

        # Remember everything indexed so far in this session.
        st.session_state.processed_files = sorted(
            already_indexed | {f.name for f in pending}
        )

    st.sidebar.success("📚 PDFs indexed successfully!")


def _answer_question(question):
    """Run ``question`` through the RAG system, record it and render the result."""
    with st.spinner("Searching..."):
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments.
        started = time.perf_counter()
        result = st.session_state.rag_system.run(question)
        elapsed_time = time.perf_counter() - started

    # Save history
    st.session_state.history.append(
        {
            "question": question,
            "answer": result["answer"],
            "time": elapsed_time,
        }
    )

    # Display answer
    st.markdown("### 💡 Answer")
    st.success(result["answer"])

    # Show retrieved documents
    with st.expander("📄 Source Documents"):
        for i, doc in enumerate(result["retrieved_docs"], 1):
            st.text_area(
                f"Document {i}",
                # presumably page_content is a str -- confirm against the retriever
                _preview(doc.page_content, 300),
                height=100,
                disabled=True,
            )

    st.caption(f"⏱️ Response time: {elapsed_time:.2f} seconds")


def _render_history():
    """Show the three most recent question/answer pairs, newest first."""
    if not st.session_state.history:
        return

    st.markdown("---")
    st.markdown("### 📜 Recent Searches")
    for item in reversed(st.session_state.history[-3:]):
        st.markdown(f"**Q:** {item['question']}")
        st.markdown(f"**A:** {_preview(item['answer'], 200)}")
        st.caption(f"Time: {item['time']:.2f}s")


def main():
    """Render the app: initialization, PDF upload sidebar, search form, history.

    User-facing emoji that were stored as mojibake are written as the real
    characters. When the RAG system failed to initialize, uploads and
    questions are answered with an explicit message rather than failing on
    session-state attributes that were never set.
    """
    init_session_state()

    # -------------------------------
    # Title
    # -------------------------------
    st.title("🔍 Agentic RAG Document Search")
    st.markdown("Ask questions over default docs or uploaded PDFs")

    # -------------------------------
    # Initialize RAG system
    # -------------------------------
    rag_ready = _ensure_rag_ready()

    # -------------------------------------------------
    # Sidebar: PDF Upload
    # -------------------------------------------------
    st.sidebar.header("📄 Upload Project PDFs")
    uploaded_files = st.sidebar.file_uploader(
        "Upload PDF documents",
        type="pdf",
        accept_multiple_files=True,
    )
    if uploaded_files:
        if rag_ready:
            _index_new_pdfs(uploaded_files)
        else:
            # Without a successful init there is no doc_processor or
            # vector_store in session state to index into.
            st.sidebar.warning(
                "RAG system is not initialized; uploaded PDFs were not indexed."
            )

    st.markdown("---")

    # -------------------------------------------------
    # Query input
    # -------------------------------------------------
    with st.form("search_form"):
        question = st.text_input(
            "Enter your question:",
            placeholder="Ask something about the documents...",
        )
        submit = st.form_submit_button("🔍 Search")

    # -------------------------------------------------
    # Query processing
    # -------------------------------------------------
    if submit and question:
        if st.session_state.rag_system:
            _answer_question(question)
        else:
            st.error("RAG system is not initialized; cannot answer questions.")

    # -------------------------------------------------
    # Search history
    # -------------------------------------------------
    _render_history()


# -------------------------------------------------
# Entry point
# -------------------------------------------------
if __name__ == "__main__":
    main()