# NOTE(review): removed export artifacts that preceded the code
# ("Spaces:" header and two "Build error" build-log lines) — they are
# residue from the hosting platform, not part of the program.
# Standard library
import json
import os
from datetime import datetime
from typing import Any, Dict, List

# Third-party
import pandas as pd
import streamlit as st

# Local application modules
from export_utils import ExportUtils
from pdf_processor import PDFProcessor
from rag_system import RAGSystem
# Page configuration — must run before any other st.* call.
# page_icon restored from mojibake ("π§ " was UTF-8 🧠 mis-decoded as ISO-8859-7).
st.set_page_config(
    page_title="Agentic PDF RAG System",
    page_icon="🧠",
    layout="wide"
)

# Initialize session state so values survive Streamlit reruns.
if 'rag_results' not in st.session_state:
    # Reserved for per-query RAG payloads (not yet written anywhere visible here).
    st.session_state.rag_results = []
if 'query_history' not in st.session_state:
    # List of {'timestamp', 'question', 'answer', 'sources_count'} dicts (see tab 2).
    st.session_state.query_history = []
if 'ingested_documents' not in st.session_state:
    # 'document_info' dicts returned by RAGSystem.ingest_document (see tab 1).
    st.session_state.ingested_documents = []
def main():
    """Render the Streamlit UI: sidebar status/management plus three tabs
    (upload & ingest, ask questions, query history & export).

    Relies on the module-level session-state initialisation and the
    GEMINI_API_KEY environment variable; all heavy lifting is delegated to
    PDFProcessor and RAGSystem.

    NOTE(review): emoji in user-facing strings were restored from UTF-8
    mojibake (bytes read as ISO-8859-7); where the trailing bytes were lost
    ("π"), the exact glyph is a best-effort choice — confirm against the
    original design.
    """
    st.title("🧠 Agentic PDF RAG System")
    st.markdown("Upload PDFs to build your knowledge base, then ask questions to get AI-powered answers")

    # Initialize processors. Fresh instances each rerun; RAGSystem presumably
    # reconnects to a persistent vector store — TODO confirm.
    pdf_processor = PDFProcessor()
    rag_system = RAGSystem()
    export_utils = ExportUtils()  # NOTE(review): unused in this function; kept for parity

    # ---------------- Sidebar: configuration and status ----------------
    with st.sidebar:
        st.header("⚙️ System Configuration")

        # API key status
        st.subheader("API Status")
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if gemini_api_key:
            st.success("✅ Google Gemini API key configured")
        else:
            st.error("❌ Google Gemini API key not found")
            st.info("Please set GEMINI_API_KEY environment variable")

        st.markdown("---")

        # Knowledge-base statistics
        st.subheader("📊 Knowledge Base Stats")
        try:
            stats = rag_system.get_vector_store_stats()
            if stats['status'] == 'active':
                st.metric("Total Documents", stats.get('total_documents', 0))
                st.metric("Total Chunks", stats.get('total_chunks', 0))
                st.success("Knowledge base is active")
            elif stats['status'] == 'empty':
                st.info("Knowledge base is empty")
                st.metric("Total Documents", 0)
                st.metric("Total Chunks", 0)
            else:
                st.error(f"Error: {stats.get('error', 'Unknown error')}")
        except Exception as e:
            # Stats are informational only — never let them break the page.
            st.warning(f"Could not load stats: {str(e)}")

        st.markdown("---")

        # Document management
        st.subheader("📁 Document Management")

        # Two-click confirmation: the first click arms 'confirm_clear',
        # the second actually wipes the knowledge base.
        if st.button("🗑️ Clear Knowledge Base", type="secondary"):
            if st.session_state.get('confirm_clear', False):
                result = rag_system.clear_knowledge_base()
                if result['status'] == 'success':
                    st.session_state.ingested_documents = []
                    st.success("Knowledge base cleared!")
                    st.rerun()
                else:
                    st.error(f"Error clearing: {result['error']}")
                st.session_state.confirm_clear = False
            else:
                st.session_state.confirm_clear = True
                st.warning("Click again to confirm clearing all documents")

        # List of already-ingested documents
        documents = rag_system.get_document_list()
        if documents:
            st.write("**Ingested Documents:**")
            for doc in documents:
                with st.expander(f"📄 {doc['filename']}", expanded=False):
                    st.write(f"**Type:** {doc['document_type']}")
                    st.write(f"**Chunks:** {doc['chunks_created']}")
                    st.write(f"**Added:** {doc['ingestion_timestamp'][:10]}")  # just the date part

    # ---------------- Main interface: three tabs ----------------
    tab1, tab2, tab3 = st.tabs(["📤 Upload Documents", "❓ Ask Questions", "📊 Query History"])

    # --- Tab 1: upload & ingest PDFs ---
    with tab1:
        st.header("📁 Document Upload & Ingestion")

        col1, col2 = st.columns([2, 1])
        with col1:
            uploaded_files = st.file_uploader(
                "Choose PDF files to add to your knowledge base",
                type=['pdf'],
                accept_multiple_files=True,
                help="Upload PDFs to build your searchable knowledge base"
            )
        with col2:
            document_type = st.selectbox(
                "Document Category:",
                ["General", "Research Paper", "Manual", "Report", "Book", "Article", "Other"]
            )

        if uploaded_files:
            st.success(f"Ready to process {len(uploaded_files)} file(s)")

            # Button is disabled without an API key; the inner check is a
            # belt-and-braces guard in case the disabled state is bypassed.
            if st.button("🚀 Ingest into Knowledge Base", type="primary", disabled=not gemini_api_key):
                if not gemini_api_key:
                    st.error("Please configure Google Gemini API key to proceed")
                    return

                progress_bar = st.progress(0)
                ingestion_results = []

                for i, uploaded_file in enumerate(uploaded_files):
                    st.info(f"Processing: {uploaded_file.name}")
                    try:
                        # Extract text from the PDF
                        with st.spinner("Extracting text from PDF..."):
                            text_content = pdf_processor.extract_text(uploaded_file)

                        if not text_content.strip():
                            # NOTE(review): 'continue' skips the progress-bar
                            # update below for empty files (original behavior).
                            st.warning(f"No text found in {uploaded_file.name}")
                            continue

                        metadata = pdf_processor.create_document_metadata(uploaded_file, document_type)

                        # Ingest into the RAG system (embeddings + vector store)
                        with st.spinner("Creating embeddings and storing in knowledge base..."):
                            result = rag_system.ingest_document(text_content, metadata)

                        if result['status'] == 'success':
                            st.success(f"✅ {uploaded_file.name} ingested successfully!")
                            st.info(f"Created {result['chunks_created']} chunks")
                            ingestion_results.append(result['document_info'])
                        else:
                            st.error(f"❌ Error ingesting {uploaded_file.name}: {result['error']}")
                    except Exception as e:
                        # Keep going with the remaining files on a per-file failure.
                        st.error(f"Error processing {uploaded_file.name}: {str(e)}")

                    progress_bar.progress((i + 1) / len(uploaded_files))

                # Record what was ingested this run.
                st.session_state.ingested_documents.extend(ingestion_results)
                if ingestion_results:
                    st.balloons()
                    st.success("🎉 Document ingestion complete! You can now ask questions.")
                    st.rerun()

    # --- Tab 2: query the knowledge base ---
    with tab2:
        st.header("❓ Ask Questions About Your Documents")

        if not gemini_api_key:
            # NOTE(review): returning here exits main(), so tab 3 and the
            # footer are not rendered on this rerun (original behavior).
            st.warning("Please configure Google Gemini API key to ask questions")
            return

        # Require at least one ingested document before accepting questions.
        stats = rag_system.get_vector_store_stats()
        if stats.get('total_documents', 0) == 0:
            st.info("📄 Upload some PDFs first to build your knowledge base, then come back here to ask questions!")
            return

        user_question = st.text_area(
            "What would you like to know about your documents?",
            placeholder="Example: What are the main findings in the research papers? Summarize the key points from the manual. What does the report say about performance metrics?",
            height=100
        )

        col1, col2 = st.columns([1, 4])
        with col1:
            ask_button = st.button("🔍 Get Answer", type="primary", disabled=not user_question.strip())
        with col2:
            include_sources = st.checkbox("Show source references", value=True)

        if ask_button and user_question.strip():
            with st.spinner("🧠 Thinking... Searching through your documents and generating answer..."):
                result = rag_system.query(user_question, return_source_docs=include_sources)

            if result['status'] == 'success':
                # Display the generated answer
                st.markdown("### 🤖 Answer")
                st.markdown(result['answer'])

                # Display retrieved source chunks if requested and present
                if include_sources and 'sources' in result and result['sources']:
                    st.markdown("### 📚 Sources")
                    for i, source in enumerate(result['sources'], 1):
                        with st.expander(f"Source {i}: {source['metadata'].get('filename', 'Unknown')}"):
                            st.markdown("**Content snippet:**")
                            st.text(source['content'])
                            st.markdown("**Metadata:**")
                            st.json(source['metadata'])

                # Save the exchange into session history (consumed by tab 3).
                query_record = {
                    'timestamp': datetime.now().isoformat(),
                    'question': user_question,
                    'answer': result['answer'],
                    'sources_count': len(result.get('sources', []))
                }
                st.session_state.query_history.append(query_record)
            else:
                st.error(f"❌ {result['error']}")

    # --- Tab 3: query history and export ---
    with tab3:
        st.header("📊 Query History & Export")

        if st.session_state.query_history:
            # Newest first; the expander label recomputes the original 1-based index.
            for i, query in enumerate(reversed(st.session_state.query_history), 1):
                with st.expander(f"Query {len(st.session_state.query_history) - i + 1}: {query['question'][:100]}..."):
                    st.markdown(f"**Question:** {query['question']}")
                    st.markdown(f"**Answer:** {query['answer']}")
                    st.markdown(f"**Sources Referenced:** {query['sources_count']}")
                    st.markdown(f"**Asked:** {query['timestamp'][:19]}")  # trim microseconds

            # Export functionality
            st.markdown("---")
            st.subheader("📤 Export Query History")

            col1, col2 = st.columns(2)
            with col1:
                if st.button("📄 Export as JSON"):
                    json_data = json.dumps(st.session_state.query_history, indent=2)
                    st.download_button(
                        label="Download JSON History",
                        data=json_data,
                        file_name=f"rag_query_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                        mime="application/json"
                    )
            with col2:
                if st.button("📊 Export as CSV"):
                    try:
                        # Flatten to simple rows; answers are truncated to 500 chars.
                        csv_data = []
                        for query in st.session_state.query_history:
                            csv_data.append({
                                'timestamp': query['timestamp'],
                                'question': query['question'],
                                'answer': query['answer'][:500] + '...' if len(query['answer']) > 500 else query['answer'],
                                'sources_count': query['sources_count']
                            })
                        df = pd.DataFrame(csv_data)
                        csv_string = df.to_csv(index=False)
                        st.download_button(
                            label="Download CSV History",
                            data=csv_string,
                            file_name=f"rag_query_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                            mime="text/csv"
                        )
                    except Exception as e:
                        st.error(f"Error creating CSV: {str(e)}")
        else:
            st.info("No queries yet. Ask some questions about your documents to build up a history!")

    # Footer
    st.markdown("---")
    st.markdown("**🧠 Powered by Google Gemini 2.5 Flash & LangChain RAG** | Upload PDFs → Ask Questions → Get Intelligent Answers")
# Script entry point: run the Streamlit app when executed directly.
if __name__ == "__main__":
    main()