import streamlit as st
import json
import pandas as pd
from typing import Dict, List, Any
import os
from pdf_processor import PDFProcessor
from rag_system import RAGSystem
from export_utils import ExportUtils
from datetime import datetime

# Page configuration
st.set_page_config(
    page_title="Agentic PDF RAG System",
    page_icon="🧠",
    layout="wide"
)

# Initialize session state keys on first run so later code may assume they exist.
if 'rag_results' not in st.session_state:
    st.session_state.rag_results = []
if 'query_history' not in st.session_state:
    st.session_state.query_history = []
if 'ingested_documents' not in st.session_state:
    st.session_state.ingested_documents = []


def _render_sidebar(rag_system):
    """Render the sidebar: API-key status, knowledge-base stats, document list.

    Args:
        rag_system: The RAGSystem instance backing the knowledge base.

    Returns:
        The GEMINI_API_KEY value from the environment, or None when unset,
        so callers can enable/disable AI-dependent widgets.
    """
    with st.sidebar:
        st.header("⚙️ System Configuration")

        # API Key status
        st.subheader("API Status")
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if gemini_api_key:
            st.success("✅ Google Gemini API key configured")
        else:
            st.error("❌ Google Gemini API key not found")
            st.info("Please set GEMINI_API_KEY environment variable")

        st.markdown("---")

        # Knowledge Base Stats
        st.subheader("📊 Knowledge Base Stats")
        try:
            stats = rag_system.get_vector_store_stats()
            if stats['status'] == 'active':
                st.metric("Total Documents", stats.get('total_documents', 0))
                st.metric("Total Chunks", stats.get('total_chunks', 0))
                st.success("Knowledge base is active")
            elif stats['status'] == 'empty':
                st.info("Knowledge base is empty")
                st.metric("Total Documents", 0)
                st.metric("Total Chunks", 0)
            else:
                st.error(f"Error: {stats.get('error', 'Unknown error')}")
        except Exception as e:
            # Stats are informational only; never let them break the page.
            st.warning(f"Could not load stats: {str(e)}")

        st.markdown("---")

        # Document Management
        st.subheader("📚 Document Management")

        # Two-click confirmation: first click arms 'confirm_clear',
        # second click actually wipes the store.
        if st.button("🗑️ Clear Knowledge Base", type="secondary"):
            if st.session_state.get('confirm_clear', False):
                result = rag_system.clear_knowledge_base()
                # BUGFIX: disarm the confirmation flag *before* st.rerun() —
                # st.rerun() raises to restart the script, so any statement
                # placed after it never executes and the flag stayed armed.
                st.session_state.confirm_clear = False
                if result['status'] == 'success':
                    st.session_state.ingested_documents = []
                    st.success("Knowledge base cleared!")
                    st.rerun()
                else:
                    st.error(f"Error clearing: {result['error']}")
            else:
                st.session_state.confirm_clear = True
                st.warning("Click again to confirm clearing all documents")

        # Document list
        documents = rag_system.get_document_list()
        if documents:
            st.write("**Ingested Documents:**")
            for doc in documents:
                with st.expander(f"📄 {doc['filename']}", expanded=False):
                    st.write(f"**Type:** {doc['document_type']}")
                    st.write(f"**Chunks:** {doc['chunks_created']}")
                    st.write(f"**Added:** {doc['ingestion_timestamp'][:10]}")  # Just date

    return gemini_api_key


def _render_upload_tab(pdf_processor, rag_system, gemini_api_key):
    """Render the document upload & ingestion tab.

    Early returns here only skip the remainder of THIS tab; in the original
    monolithic main() they aborted the whole page, hiding the other tabs
    and the footer.
    """
    st.header("📁 Document Upload & Ingestion")

    # Document type selection
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_files = st.file_uploader(
            "Choose PDF files to add to your knowledge base",
            type=['pdf'],
            accept_multiple_files=True,
            help="Upload PDFs to build your searchable knowledge base"
        )
    with col2:
        document_type = st.selectbox(
            "Document Category:",
            ["General", "Research Paper", "Manual", "Report", "Book", "Article", "Other"]
        )

    if not uploaded_files:
        return

    st.success(f"Ready to process {len(uploaded_files)} file(s)")

    # Button is disabled without an API key; the inner check is belt-and-braces.
    if not st.button("🔄 Ingest into Knowledge Base", type="primary", disabled=not gemini_api_key):
        return
    if not gemini_api_key:
        st.error("Please configure Google Gemini API key to proceed")
        return

    progress_bar = st.progress(0)
    ingestion_results = []

    for i, uploaded_file in enumerate(uploaded_files):
        st.info(f"Processing: {uploaded_file.name}")
        try:
            # Extract text from PDF
            with st.spinner("Extracting text from PDF..."):
                text_content = pdf_processor.extract_text(uploaded_file)

            if not text_content.strip():
                st.warning(f"No text found in {uploaded_file.name}")
                # BUGFIX: advance the progress bar before skipping, otherwise
                # an empty PDF left the bar stalled at the previous value.
                progress_bar.progress((i + 1) / len(uploaded_files))
                continue

            # Create metadata
            metadata = pdf_processor.create_document_metadata(uploaded_file, document_type)

            # Ingest into RAG system
            with st.spinner("Creating embeddings and storing in knowledge base..."):
                result = rag_system.ingest_document(text_content, metadata)

            if result['status'] == 'success':
                st.success(f"✅ {uploaded_file.name} ingested successfully!")
                st.info(f"Created {result['chunks_created']} chunks")
                ingestion_results.append(result['document_info'])
            else:
                st.error(f"❌ Error ingesting {uploaded_file.name}: {result['error']}")
        except Exception as e:
            st.error(f"Error processing {uploaded_file.name}: {str(e)}")

        progress_bar.progress((i + 1) / len(uploaded_files))

    # Update session state
    st.session_state.ingested_documents.extend(ingestion_results)

    if ingestion_results:
        st.balloons()
        st.success("🎉 Document ingestion complete! You can now ask questions.")
        st.rerun()


def _render_query_tab(rag_system, gemini_api_key):
    """Render the question-answering tab: query box, answer, sources, history.

    Early returns skip only this tab's content (see _render_upload_tab note).
    """
    st.header("❓ Ask Questions About Your Documents")

    if not gemini_api_key:
        st.warning("Please configure Google Gemini API key to ask questions")
        return

    # Check if documents are available
    stats = rag_system.get_vector_store_stats()
    if stats.get('total_documents', 0) == 0:
        st.info("👆 Upload some PDFs first to build your knowledge base, then come back here to ask questions!")
        return

    # Query interface
    user_question = st.text_area(
        "What would you like to know about your documents?",
        placeholder="Example: What are the main findings in the research papers? Summarize the key points from the manual. What does the report say about performance metrics?",
        height=100
    )

    col1, col2 = st.columns([1, 4])
    with col1:
        ask_button = st.button("🔍 Get Answer", type="primary", disabled=not user_question.strip())
    with col2:
        include_sources = st.checkbox("Show source references", value=True)

    if not (ask_button and user_question.strip()):
        return

    with st.spinner("🧠 Thinking... Searching through your documents and generating answer..."):
        result = rag_system.query(user_question, return_source_docs=include_sources)

    if result['status'] == 'success':
        # Display answer
        st.markdown("### 🤖 Answer")
        st.markdown(result['answer'])

        # Display sources if available
        if include_sources and 'sources' in result and result['sources']:
            st.markdown("### 📚 Sources")
            for i, source in enumerate(result['sources'], 1):
                with st.expander(f"Source {i}: {source['metadata'].get('filename', 'Unknown')}"):
                    st.markdown("**Content snippet:**")
                    st.text(source['content'])
                    st.markdown("**Metadata:**")
                    st.json(source['metadata'])

        # Save to history
        query_record = {
            'timestamp': datetime.now().isoformat(),
            'question': user_question,
            'answer': result['answer'],
            'sources_count': len(result.get('sources', []))
        }
        st.session_state.query_history.append(query_record)
    else:
        st.error(f"❌ {result['error']}")


def _history_row_for_csv(query):
    """Flatten one query-history record for CSV export, truncating answers
    longer than 500 characters."""
    answer = query['answer']
    return {
        'timestamp': query['timestamp'],
        'question': query['question'],
        'answer': answer[:500] + '...' if len(answer) > 500 else answer,
        'sources_count': query['sources_count']
    }


def _render_history_tab():
    """Render the query-history tab with JSON/CSV export."""
    st.header("📊 Query History & Export")

    history = st.session_state.query_history
    if not history:
        st.info("No queries yet. Ask some questions about your documents to build up a history!")
        return

    # Display query history, newest first, numbered by original order.
    for i, query in enumerate(reversed(history), 1):
        with st.expander(f"Query {len(history) - i + 1}: {query['question'][:100]}..."):
            st.markdown(f"**Question:** {query['question']}")
            st.markdown(f"**Answer:** {query['answer']}")
            st.markdown(f"**Sources Referenced:** {query['sources_count']}")
            st.markdown(f"**Asked:** {query['timestamp'][:19]}")  # Remove microseconds

    # Export functionality
    st.markdown("---")
    st.subheader("📤 Export Query History")

    # BUGFIX: st.download_button is itself a button. Nesting it inside an
    # `if st.button(...)` meant clicking the download link triggered a rerun
    # in which the outer button was no longer pressed, so the link vanished
    # before the browser could fetch the file. Render the download buttons
    # directly instead (one click, no outer trigger button).
    col1, col2 = st.columns(2)
    with col1:
        json_data = json.dumps(history, indent=2)
        st.download_button(
            label="Download JSON History",
            data=json_data,
            file_name=f"rag_query_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
            mime="application/json"
        )
    with col2:
        try:
            # Create simplified data for CSV
            df = pd.DataFrame([_history_row_for_csv(q) for q in history])
            csv_string = df.to_csv(index=False)
            st.download_button(
                label="Download CSV History",
                data=csv_string,
                file_name=f"rag_query_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                mime="text/csv"
            )
        except Exception as e:
            st.error(f"Error creating CSV: {str(e)}")


def main():
    """Top-level page: title, sidebar, the three workflow tabs, footer.

    Decomposed into per-tab helpers so that an early return in one tab
    (missing API key, empty knowledge base) no longer aborts the whole
    render — in the original, `return` inside a `with tabN:` block exited
    main() and silently hid every later tab and the footer.
    """
    st.title("🧠 Agentic PDF RAG System")
    st.markdown("Upload PDFs to build your knowledge base, then ask questions to get AI-powered answers")

    # Initialize processors
    pdf_processor = PDFProcessor()
    rag_system = RAGSystem()
    # NOTE(review): export_utils is never used below; retained in case
    # ExportUtils() construction has side effects — confirm and remove.
    export_utils = ExportUtils()

    gemini_api_key = _render_sidebar(rag_system)

    # Main interface with tabs
    tab1, tab2, tab3 = st.tabs(["📤 Upload Documents", "❓ Ask Questions", "📊 Query History"])
    with tab1:
        _render_upload_tab(pdf_processor, rag_system, gemini_api_key)
    with tab2:
        _render_query_tab(rag_system, gemini_api_key)
    with tab3:
        _render_history_tab()

    # Footer — now always rendered regardless of per-tab early exits.
    st.markdown("---")
    st.markdown("**🧠 Powered by Google Gemini 2.5 Flash & LangChain RAG** | Upload PDFs → Ask Questions → Get Intelligent Answers")


if __name__ == "__main__":
    main()