# Rag / main.py
# Author: andrewammann — "Create main.py" (commit 06c5826, verified)
import streamlit as st
import json
import pandas as pd
from typing import Dict, List, Any
import os
from pdf_processor import PDFProcessor
from rag_system import RAGSystem
from export_utils import ExportUtils
from datetime import datetime
# Page configuration
st.set_page_config(
    page_title="Agentic PDF RAG System",
    page_icon="🧠",
    layout="wide"
)

# Seed session-state keys on first run so the rest of the app can assume
# they exist. Each starts as an empty list.
for _state_key in ('rag_results', 'query_history', 'ingested_documents'):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = []
def main():
    """Render the Streamlit UI: title, sidebar, the three workflow tabs, footer.

    Decomposed into per-section helpers so an early exit in one tab (missing
    API key, empty knowledge base) no longer aborts the whole page render —
    the original used `return` inside `with tabN:` blocks, which returned
    from main() and skipped the remaining tabs and the footer.
    """
    st.title("🧠 Agentic PDF RAG System")
    st.markdown("Upload PDFs to build your knowledge base, then ask questions to get AI-powered answers")

    # Initialize processors once per rerun. (The original also built an
    # ExportUtils instance that was never used; removed.)
    pdf_processor = PDFProcessor()
    rag_system = RAGSystem()

    # The API key gates both ingestion and querying; resolve it once here
    # and pass it down to the sections that need it.
    gemini_api_key = os.getenv("GEMINI_API_KEY")

    _render_sidebar(rag_system, gemini_api_key)

    # Main interface with tabs
    tab1, tab2, tab3 = st.tabs(["📤 Upload Documents", "❓ Ask Questions", "📊 Query History"])
    with tab1:
        _render_upload_tab(pdf_processor, rag_system, gemini_api_key, document_type_default="General")
    with tab2:
        _render_question_tab(rag_system, gemini_api_key)
    with tab3:
        _render_history_tab()

    # Footer — now rendered unconditionally.
    st.markdown("---")
    st.markdown("**🧠 Powered by Google Gemini 2.5 Flash & LangChain RAG** | Upload PDFs → Ask Questions → Get Intelligent Answers")


def _render_sidebar(rag_system, gemini_api_key):
    """Sidebar: API-key status, knowledge-base stats, and document management."""
    with st.sidebar:
        st.header("⚙️ System Configuration")

        # API Key status
        st.subheader("API Status")
        if gemini_api_key:
            st.success("✅ Google Gemini API key configured")
        else:
            st.error("❌ Google Gemini API key not found")
            st.info("Please set GEMINI_API_KEY environment variable")
        st.markdown("---")

        # Knowledge Base Stats — best-effort: a backend failure only shows a
        # warning, it must not take down the sidebar.
        st.subheader("📊 Knowledge Base Stats")
        try:
            stats = rag_system.get_vector_store_stats()
            if stats['status'] == 'active':
                st.metric("Total Documents", stats.get('total_documents', 0))
                st.metric("Total Chunks", stats.get('total_chunks', 0))
                st.success("Knowledge base is active")
            elif stats['status'] == 'empty':
                st.info("Knowledge base is empty")
                st.metric("Total Documents", 0)
                st.metric("Total Chunks", 0)
            else:
                st.error(f"Error: {stats.get('error', 'Unknown error')}")
        except Exception as e:
            st.warning(f"Could not load stats: {str(e)}")
        st.markdown("---")

        # Document Management
        st.subheader("📚 Document Management")
        # Two-click confirmation: the first click arms `confirm_clear`, the
        # second actually wipes the store.
        if st.button("🗑️ Clear Knowledge Base", type="secondary"):
            if st.session_state.get('confirm_clear', False):
                result = rag_system.clear_knowledge_base()
                if result['status'] == 'success':
                    st.session_state.ingested_documents = []
                    st.success("Knowledge base cleared!")
                    st.rerun()
                else:
                    st.error(f"Error clearing: {result['error']}")
                st.session_state.confirm_clear = False
            else:
                st.session_state.confirm_clear = True
                st.warning("Click again to confirm clearing all documents")

        # Document list
        documents = rag_system.get_document_list()
        if documents:
            st.write("**Ingested Documents:**")
            for doc in documents:
                with st.expander(f"📄 {doc['filename']}", expanded=False):
                    st.write(f"**Type:** {doc['document_type']}")
                    st.write(f"**Chunks:** {doc['chunks_created']}")
                    st.write(f"**Added:** {doc['ingestion_timestamp'][:10]}")  # just the date


def _render_upload_tab(pdf_processor, rag_system, gemini_api_key, document_type_default="General"):
    """Upload tab: file picker, category select, and the ingestion workflow.

    `document_type_default` picks the pre-selected category (backward-
    compatible generalization; defaults to the original first entry).
    """
    st.header("📁 Document Upload & Ingestion")

    categories = ["General", "Research Paper", "Manual", "Report", "Book", "Article", "Other"]
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_files = st.file_uploader(
            "Choose PDF files to add to your knowledge base",
            type=['pdf'],
            accept_multiple_files=True,
            help="Upload PDFs to build your searchable knowledge base"
        )
    with col2:
        document_type = st.selectbox(
            "Document Category:",
            categories,
            index=categories.index(document_type_default) if document_type_default in categories else 0
        )

    if not uploaded_files:
        return
    st.success(f"Ready to process {len(uploaded_files)} file(s)")

    if not st.button("🔄 Ingest into Knowledge Base", type="primary", disabled=not gemini_api_key):
        return
    if not gemini_api_key:  # defensive: button is already disabled without a key
        st.error("Please configure Google Gemini API key to proceed")
        return

    progress_bar = st.progress(0)
    ingestion_results = []
    for i, uploaded_file in enumerate(uploaded_files):
        st.info(f"Processing: {uploaded_file.name}")
        try:
            # Extract text from PDF
            with st.spinner("Extracting text from PDF..."):
                text_content = pdf_processor.extract_text(uploaded_file)
            if not text_content.strip():
                st.warning(f"No text found in {uploaded_file.name}")
                continue
            metadata = pdf_processor.create_document_metadata(uploaded_file, document_type)
            # Ingest into RAG system
            with st.spinner("Creating embeddings and storing in knowledge base..."):
                result = rag_system.ingest_document(text_content, metadata)
            if result['status'] == 'success':
                st.success(f"✅ {uploaded_file.name} ingested successfully!")
                st.info(f"Created {result['chunks_created']} chunks")
                ingestion_results.append(result['document_info'])
            else:
                st.error(f"❌ Error ingesting {uploaded_file.name}: {result['error']}")
        except Exception as e:
            # One bad PDF must not abort the batch.
            st.error(f"Error processing {uploaded_file.name}: {str(e)}")
        progress_bar.progress((i + 1) / len(uploaded_files))

    st.session_state.ingested_documents.extend(ingestion_results)
    if ingestion_results:
        st.balloons()
        st.success("🎉 Document ingestion complete! You can now ask questions.")
        st.rerun()


def _render_question_tab(rag_system, gemini_api_key):
    """Questions tab: query box plus the RAG answer and optional sources."""
    st.header("❓ Ask Questions About Your Documents")

    if not gemini_api_key:
        st.warning("Please configure Google Gemini API key to ask questions")
        return
    stats = rag_system.get_vector_store_stats()
    if stats.get('total_documents', 0) == 0:
        st.info("👆 Upload some PDFs first to build your knowledge base, then come back here to ask questions!")
        return

    user_question = st.text_area(
        "What would you like to know about your documents?",
        placeholder="Example: What are the main findings in the research papers? Summarize the key points from the manual. What does the report say about performance metrics?",
        height=100
    )
    col1, col2 = st.columns([1, 4])
    with col1:
        ask_button = st.button("🔍 Get Answer", type="primary", disabled=not user_question.strip())
    with col2:
        include_sources = st.checkbox("Show source references", value=True)

    if not (ask_button and user_question.strip()):
        return

    with st.spinner("🧠 Thinking... Searching through your documents and generating answer..."):
        result = rag_system.query(user_question, return_source_docs=include_sources)

    if result['status'] != 'success':
        st.error(f"❌ {result['error']}")
        return

    st.markdown("### 🤖 Answer")
    st.markdown(result['answer'])

    if include_sources and result.get('sources'):
        st.markdown("### 📚 Sources")
        for i, source in enumerate(result['sources'], 1):
            with st.expander(f"Source {i}: {source['metadata'].get('filename', 'Unknown')}"):
                st.markdown("**Content snippet:**")
                st.text(source['content'])
                st.markdown("**Metadata:**")
                st.json(source['metadata'])

    # Save to history (rendered in the history tab and exportable there).
    st.session_state.query_history.append({
        'timestamp': datetime.now().isoformat(),
        'question': user_question,
        'answer': result['answer'],
        'sources_count': len(result.get('sources', []))
    })


def _render_history_tab():
    """History tab: past Q&A records plus JSON/CSV export downloads."""
    st.header("📊 Query History & Export")

    history = st.session_state.query_history
    if not history:
        st.info("No queries yet. Ask some questions about your documents to build up a history!")
        return

    # Newest first; keep the original 1-based "Query N" numbering.
    for i, query in enumerate(reversed(history), 1):
        with st.expander(f"Query {len(history) - i + 1}: {query['question'][:100]}..."):
            st.markdown(f"**Question:** {query['question']}")
            st.markdown(f"**Answer:** {query['answer']}")
            st.markdown(f"**Sources Referenced:** {query['sources_count']}")
            st.markdown(f"**Asked:** {query['timestamp'][:19]}")  # drop microseconds

    st.markdown("---")
    st.subheader("📤 Export Query History")
    # BUG FIX: the original nested st.download_button inside an
    # `if st.button(...)` handler, so the download link disappeared on the
    # rerun its own click triggered. Render the download buttons directly.
    export_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    col1, col2 = st.columns(2)
    with col1:
        st.download_button(
            label="📄 Export as JSON",
            data=json.dumps(history, indent=2),
            file_name=f"rag_query_history_{export_stamp}.json",
            mime="application/json"
        )
    with col2:
        try:
            # Simplified rows for CSV: answers truncated to 500 chars.
            csv_rows = [
                {
                    'timestamp': q['timestamp'],
                    'question': q['question'],
                    'answer': q['answer'][:500] + '...' if len(q['answer']) > 500 else q['answer'],
                    'sources_count': q['sources_count']
                }
                for q in history
            ]
            st.download_button(
                label="📊 Export as CSV",
                data=pd.DataFrame(csv_rows).to_csv(index=False),
                file_name=f"rag_query_history_{export_stamp}.csv",
                mime="text/csv"
            )
        except Exception as e:
            st.error(f"Error creating CSV: {str(e)}")
# Script entry point.
if __name__ == "__main__":
    main()