import streamlit as st
import json
import pandas as pd
from typing import Dict, List, Any
import os
from pdf_processor import PDFProcessor
from rag_system import RAGSystem
from export_utils import ExportUtils
from datetime import datetime
# Page configuration — must run before any other Streamlit call.
# NOTE(review): the page_icon literal appears mojibake-garbled in this
# copy of the file; preserved byte-for-byte.
st.set_page_config(
    page_title="Agentic PDF RAG System",
    page_icon="π§ ",
    layout="wide"
)

# Seed the session-state keys the app relies on, so downstream code can
# read them without existence checks. Runs on every rerun; only missing
# keys are initialized.
for _key in ('rag_results', 'query_history', 'ingested_documents'):
    if _key not in st.session_state:
        st.session_state[_key] = []
def main():
    """Render the full Streamlit UI: sidebar (API status, knowledge-base
    stats, document management) plus three tabs (upload, Q&A, history).

    NOTE(review): many emoji string literals in this file are
    mojibake-garbled (e.g. "π§"); they are preserved byte-for-byte except
    where the damage split a literal across lines (a syntax error), which
    is repaired with the most likely original character (✅) — confirm
    against the upstream source.
    """
    st.title("π§ Agentic PDF RAG System")
    st.markdown("Upload PDFs to build your knowledge base, then ask questions to get AI-powered answers")

    # Backend collaborators; constructed fresh on every Streamlit rerun.
    pdf_processor = PDFProcessor()
    rag_system = RAGSystem()
    export_utils = ExportUtils()  # constructed as in the original; not used below

    # Read once and share: the sidebar displays it, the tabs gate on it.
    gemini_api_key = os.getenv("GEMINI_API_KEY")

    _render_sidebar(rag_system, gemini_api_key)

    # Each tab is rendered by its own helper so an early exit inside one
    # tab (missing API key, empty knowledge base) no longer aborts main()
    # and suppresses the tabs after it — a bug in the previous version.
    tab1, tab2, tab3 = st.tabs(["π€ Upload Documents", "β Ask Questions", "π Query History"])
    with tab1:
        _render_upload_tab(pdf_processor, rag_system, gemini_api_key)
    with tab2:
        _render_query_tab(rag_system, gemini_api_key)
    with tab3:
        _render_history_tab()

    # Footer
    st.markdown("---")
    st.markdown("**π§ Powered by Google Gemini 2.5 Flash & LangChain RAG** | Upload PDFs β Ask Questions β Get Intelligent Answers")


def _render_sidebar(rag_system, gemini_api_key):
    """Sidebar: API-key status, knowledge-base metrics, and document management."""
    with st.sidebar:
        st.header("βοΈ System Configuration")

        # --- API key status ---
        st.subheader("API Status")
        if gemini_api_key:
            # Repaired from a line-broken literal; original emoji presumed ✅.
            st.success("✅ Google Gemini API key configured")
        else:
            st.error("β Google Gemini API key not found")
            st.info("Please set GEMINI_API_KEY environment variable")
        st.markdown("---")

        # --- Knowledge base stats ---
        st.subheader("π Knowledge Base Stats")
        try:
            stats = rag_system.get_vector_store_stats()
            if stats['status'] == 'active':
                st.metric("Total Documents", stats.get('total_documents', 0))
                st.metric("Total Chunks", stats.get('total_chunks', 0))
                st.success("Knowledge base is active")
            elif stats['status'] == 'empty':
                st.info("Knowledge base is empty")
                st.metric("Total Documents", 0)
                st.metric("Total Chunks", 0)
            else:
                st.error(f"Error: {stats.get('error', 'Unknown error')}")
        except Exception as e:
            # Best-effort display only; a stats failure must not break the UI.
            st.warning(f"Could not load stats: {str(e)}")
        st.markdown("---")

        # --- Document management ---
        st.subheader("π Document Management")
        # Two-click confirmation: the first click arms `confirm_clear`,
        # the second actually clears the knowledge base.
        if st.button("ποΈ Clear Knowledge Base", type="secondary"):
            if st.session_state.get('confirm_clear', False):
                result = rag_system.clear_knowledge_base()
                if result['status'] == 'success':
                    st.session_state.ingested_documents = []
                    st.success("Knowledge base cleared!")
                    st.rerun()
                else:
                    st.error(f"Error clearing: {result['error']}")
                st.session_state.confirm_clear = False
            else:
                st.session_state.confirm_clear = True
                st.warning("Click again to confirm clearing all documents")

        documents = rag_system.get_document_list()
        if documents:
            st.write("**Ingested Documents:**")
            for doc in documents:
                with st.expander(f"π {doc['filename']}", expanded=False):
                    st.write(f"**Type:** {doc['document_type']}")
                    st.write(f"**Chunks:** {doc['chunks_created']}")
                    st.write(f"**Added:** {doc['ingestion_timestamp'][:10]}")  # date portion only


def _render_upload_tab(pdf_processor, rag_system, gemini_api_key):
    """Tab 1: upload PDF files and ingest them into the vector store."""
    st.header("π Document Upload & Ingestion")

    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_files = st.file_uploader(
            "Choose PDF files to add to your knowledge base",
            type=['pdf'],
            accept_multiple_files=True,
            help="Upload PDFs to build your searchable knowledge base"
        )
    with col2:
        document_type = st.selectbox(
            "Document Category:",
            ["General", "Research Paper", "Manual", "Report", "Book", "Article", "Other"]
        )

    if not uploaded_files:
        return
    st.success(f"Ready to process {len(uploaded_files)} file(s)")

    # Button is disabled without an API key; the explicit check is a
    # defensive second layer.
    if not st.button("π Ingest into Knowledge Base", type="primary", disabled=not gemini_api_key):
        return
    if not gemini_api_key:
        st.error("Please configure Google Gemini API key to proceed")
        return

    progress_bar = st.progress(0)
    ingestion_results = []
    for i, uploaded_file in enumerate(uploaded_files):
        st.info(f"Processing: {uploaded_file.name}")
        try:
            # Extract text from the PDF.
            with st.spinner("Extracting text from PDF..."):
                text_content = pdf_processor.extract_text(uploaded_file)
            if not text_content.strip():
                st.warning(f"No text found in {uploaded_file.name}")
                continue

            metadata = pdf_processor.create_document_metadata(uploaded_file, document_type)

            # Embed and store in the knowledge base.
            with st.spinner("Creating embeddings and storing in knowledge base..."):
                result = rag_system.ingest_document(text_content, metadata)

            if result['status'] == 'success':
                # Repaired from a line-broken literal; original emoji presumed ✅.
                st.success(f"✅ {uploaded_file.name} ingested successfully!")
                st.info(f"Created {result['chunks_created']} chunks")
                ingestion_results.append(result['document_info'])
            else:
                st.error(f"β Error ingesting {uploaded_file.name}: {result['error']}")
        except Exception as e:
            # Per-file isolation: one bad PDF must not stop the batch.
            st.error(f"Error processing {uploaded_file.name}: {str(e)}")
        progress_bar.progress((i + 1) / len(uploaded_files))

    st.session_state.ingested_documents.extend(ingestion_results)
    if ingestion_results:
        st.balloons()
        st.success("π Document ingestion complete! You can now ask questions.")
        st.rerun()


def _render_query_tab(rag_system, gemini_api_key):
    """Tab 2: ask a question and display the RAG answer with optional sources."""
    st.header("β Ask Questions About Your Documents")

    if not gemini_api_key:
        st.warning("Please configure Google Gemini API key to ask questions")
        return  # only skips this tab; later tabs still render (bug fix)

    # Require an ingested corpus before accepting questions.
    stats = rag_system.get_vector_store_stats()
    if stats.get('total_documents', 0) == 0:
        st.info("π Upload some PDFs first to build your knowledge base, then come back here to ask questions!")
        return

    user_question = st.text_area(
        "What would you like to know about your documents?",
        placeholder="Example: What are the main findings in the research papers? Summarize the key points from the manual. What does the report say about performance metrics?",
        height=100
    )
    col1, col2 = st.columns([1, 4])
    with col1:
        ask_button = st.button("π Get Answer", type="primary", disabled=not user_question.strip())
    with col2:
        include_sources = st.checkbox("Show source references", value=True)

    if not (ask_button and user_question.strip()):
        return

    with st.spinner("π§ Thinking... Searching through your documents and generating answer..."):
        result = rag_system.query(user_question, return_source_docs=include_sources)

    if result['status'] != 'success':
        st.error(f"β {result['error']}")
        return

    st.markdown("### π€ Answer")
    st.markdown(result['answer'])

    if include_sources and 'sources' in result and result['sources']:
        st.markdown("### π Sources")
        for i, source in enumerate(result['sources'], 1):
            with st.expander(f"Source {i}: {source['metadata'].get('filename', 'Unknown')}"):
                st.markdown("**Content snippet:**")
                st.text(source['content'])
                st.markdown("**Metadata:**")
                st.json(source['metadata'])

    # Record the exchange for the history tab.
    st.session_state.query_history.append({
        'timestamp': datetime.now().isoformat(),
        'question': user_question,
        'answer': result['answer'],
        'sources_count': len(result.get('sources', []))
    })


def _render_history_tab():
    """Tab 3: browse past Q&A exchanges and export them as JSON or CSV."""
    st.header("π Query History & Export")

    history = st.session_state.query_history
    if not history:
        st.info("No queries yet. Ask some questions about your documents to build up a history!")
        return

    # Newest first; labels count down so the newest query has the
    # highest number (matches the original numbering scheme).
    for i, query in enumerate(reversed(history), 1):
        with st.expander(f"Query {len(history) - i + 1}: {query['question'][:100]}..."):
            st.markdown(f"**Question:** {query['question']}")
            st.markdown(f"**Answer:** {query['answer']}")
            st.markdown(f"**Sources Referenced:** {query['sources_count']}")
            st.markdown(f"**Asked:** {query['timestamp'][:19]}")  # trim microseconds

    st.markdown("---")
    st.subheader("π€ Export Query History")
    # NOTE(review): st.download_button nested under st.button disappears on
    # the next rerun (known Streamlit pattern limitation); kept as-is to
    # preserve behavior.
    col1, col2 = st.columns(2)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    with col1:
        if st.button("π Export as JSON"):
            json_data = json.dumps(history, indent=2)
            st.download_button(
                label="Download JSON History",
                data=json_data,
                file_name=f"rag_query_history_{stamp}.json",
                mime="application/json"
            )
    with col2:
        if st.button("π Export as CSV"):
            try:
                # Flatten for CSV; long answers are truncated to 500 chars.
                csv_rows = [
                    {
                        'timestamp': q['timestamp'],
                        'question': q['question'],
                        'answer': q['answer'][:500] + '...' if len(q['answer']) > 500 else q['answer'],
                        'sources_count': q['sources_count'],
                    }
                    for q in history
                ]
                csv_string = pd.DataFrame(csv_rows).to_csv(index=False)
                st.download_button(
                    label="Download CSV History",
                    data=csv_string,
                    file_name=f"rag_query_history_{stamp}.csv",
                    mime="text/csv"
                )
            except Exception as e:
                st.error(f"Error creating CSV: {str(e)}")
# Script entry point: render the app (run via `streamlit run`).
# Trailing scraper artifact ("|") removed from the call line.
if __name__ == "__main__":
    main()