Spaces:
Sleeping
Sleeping
| # ============================================================================== | |
| # Personal Knowledge Navigator - No Cache Version | |
| # ============================================================================== | |
| # This Streamlit application loads a pre-built knowledge base and allows users | |
| # to query it without any caching mechanisms for maximum compatibility. | |
| import streamlit as st | |
| import faiss | |
| import numpy as np | |
| import pickle | |
| import os | |
| from typing import List, Optional, Tuple | |
| import json | |
| from datetime import datetime | |
| # Simple imports without cache configuration | |
| from sentence_transformers import SentenceTransformer | |
| import google.generativeai as genai | |
# --- Page Configuration ---
# Gather the page-level settings in one mapping and pass them to Streamlit
# in a single call.
_PAGE_SETTINGS = {
    "page_title": "π§ Knowledge Navigator",
    "page_icon": "π§ ",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# --- Custom CSS for Aesthetics ---
# Stylesheet for the HTML snippets rendered elsewhere in this file
# (header banner, cards, answer/source boxes, status boxes).
_CUSTOM_CSS = """
<style>
.main-header {
background: linear-gradient(90deg, #667eea 0%, #764ba2 100%);
padding: 2rem;
border-radius: 10px;
text-align: center;
color: white;
margin-bottom: 2rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.knowledge-card {
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
padding: 1.5rem;
border-radius: 10px;
border-left: 5px solid #667eea;
margin: 1rem 0;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.answer-box {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 1.5rem;
border-radius: 10px;
margin: 1rem 0;
box-shadow: 0 4px 8px rgba(0,0,0,0.15);
}
.source-box {
background: #f8f9fa;
border: 1px solid #e9ecef;
border-radius: 8px;
padding: 1rem;
margin: 0.5rem 0;
border-left: 4px solid #28a745;
}
.upload-zone {
border: 2px dashed #667eea;
border-radius: 10px;
padding: 2rem;
text-align: center;
background: linear-gradient(135deg, #f8f9ff 0%, #e6f3ff 100%);
margin: 1rem 0;
}
.stats-container {
display: flex;
justify-content: space-around;
margin: 1rem 0;
}
.stat-box {
background: white;
padding: 1rem;
border-radius: 8px;
text-align: center;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
border-top: 3px solid #667eea;
min-width: 120px;
}
.chat-container {
background: white;
border-radius: 10px;
padding: 1.5rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
margin: 1rem 0;
}
.sidebar-info {
background: linear-gradient(135deg, #ffecd2 0%, #fcb69f 100%);
padding: 1rem;
border-radius: 8px;
margin: 1rem 0;
}
.error-box {
background: #f8d7da;
color: #721c24;
padding: 1rem;
border-radius: 8px;
border-left: 4px solid #dc3545;
margin: 1rem 0;
}
.success-box {
background: #d4edda;
color: #155724;
padding: 1rem;
border-radius: 8px;
border-left: 4px solid #28a745;
margin: 1rem 0;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# --- Constants ---
# Sentence-transformer model used to embed queries.
DEFAULT_MODEL = "all-MiniLM-L6-v2"

# Directory and file names of the persisted knowledge base.
KNOWLEDGE_BASE_DIR = "knowledge_base"
INDEX_FILE = "faiss_index.index"
CHUNKS_FILE = "text_chunks.pkl"
METADATA_FILE = "metadata.json"

# Default number of chunks retrieved per question.
TOP_K_DEFAULT = 5
| # --- Session State Initialization --- | |
def init_session_state():
    """Seed ``st.session_state`` with a default for every key the app uses.

    Keys that already exist are left untouched, so values stored by an
    earlier run of the script are preserved.
    """
    defaults = (
        ('model_loaded', False),
        ('model', None),
        ('knowledge_base_loaded', False),
        ('index', None),
        ('text_chunks', None),
        ('metadata', {}),
    )
    for key, initial in defaults:
        if key not in st.session_state:
            st.session_state[key] = initial
| # --- Helper Functions --- | |
def load_embedding_model():
    """Return the sentence-transformer model, loading it on first use.

    The model is kept in ``st.session_state`` so later calls in the same
    session reuse it. On failure an error is shown in the UI, the
    ``model_loaded`` flag is cleared, and ``None`` is returned.
    """
    state = st.session_state

    # Reuse the instance stored by a previous successful load.
    stored = state.model if state.model_loaded else None
    if stored is not None:
        return stored

    try:
        with st.spinner("π€ Loading AI model (this may take a moment)..."):
            encoder = SentenceTransformer(DEFAULT_MODEL)
            state.model = encoder
            state.model_loaded = True
            return encoder
    except Exception as e:
        st.error(f"β Failed to load embedding model: {e}")
        state.model_loaded = False
        return None
def load_knowledge_base():
    """Load the pre-built knowledge base from disk, reusing the session copy.

    Returns:
        A ``(index, text_chunks, metadata)`` tuple. If the index or chunks
        file does not exist, or loading them raises, the result is
        ``(None, None, {})``. ``metadata`` is ``{}`` when the metadata file
        is absent or cannot be parsed.
    """
    state = st.session_state
    if state.knowledge_base_loaded:
        return state.index, state.text_chunks, state.metadata

    try:
        index_path = os.path.join(KNOWLEDGE_BASE_DIR, INDEX_FILE)
        chunks_path = os.path.join(KNOWLEDGE_BASE_DIR, CHUNKS_FILE)
        metadata_path = os.path.join(KNOWLEDGE_BASE_DIR, METADATA_FILE)

        # The index and the chunks are both required; metadata is optional.
        if not all(os.path.exists(p) for p in (index_path, chunks_path)):
            return None, None, {}

        with st.spinner("π Loading knowledge base..."):
            # Load FAISS index
            index = faiss.read_index(index_path)

            # Load text chunks
            # SECURITY: pickle.load can execute arbitrary code embedded in the
            # file, and this file can come from the upload form. Only load
            # knowledge bases from sources you trust.
            with open(chunks_path, 'rb') as f:
                text_chunks = pickle.load(f)

            # Load metadata if available. It is optional, so a broken file
            # must not prevent the index and chunks from being used.
            metadata = {}
            if os.path.exists(metadata_path):
                try:
                    # Explicit encoding: the default depends on the platform.
                    with open(metadata_path, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)
                except (OSError, ValueError) as e:
                    # ValueError covers JSON and Unicode decoding errors.
                    st.warning(f"Could not read metadata file, continuing without it: {e}")
                    metadata = {}

            # Store in session state
            state.index = index
            state.text_chunks = text_chunks
            state.metadata = metadata
            state.knowledge_base_loaded = True
            return index, text_chunks, metadata
    except Exception as e:
        st.error(f"β Error loading knowledge base: {e}")
        return None, None, {}
def _write_upload(upload, filename):
    """Write the full content of one uploaded file into the knowledge base dir."""
    # Rewind first: if the stream was already read (for example during an
    # earlier script run), read() would return b"" and an empty file would
    # silently replace the stored one.
    seekable = getattr(upload, 'seekable', None)
    if callable(seekable) and seekable():
        upload.seek(0)
    with open(os.path.join(KNOWLEDGE_BASE_DIR, filename), 'wb') as f:
        f.write(upload.read())


def save_uploaded_knowledge_base(index_file, chunks_file, metadata_file=None):
    """Save uploaded knowledge base files to the repository structure.

    Args:
        index_file: File-like object with the FAISS index; skipped if falsy.
        chunks_file: File-like object with the pickled chunks; skipped if falsy.
        metadata_file: Optional file-like object with the metadata JSON.

    Returns:
        ``True`` when every provided file was written, ``False`` if any step
        raised (the error is also shown in the UI).

    On success the cached knowledge base in ``st.session_state`` is cleared
    so the next ``load_knowledge_base`` call reads the new files.
    """
    try:
        os.makedirs(KNOWLEDGE_BASE_DIR, exist_ok=True)

        uploads = (
            (index_file, INDEX_FILE),
            (chunks_file, CHUNKS_FILE),
            (metadata_file, METADATA_FILE),
        )
        for upload, filename in uploads:
            if upload:
                _write_upload(upload, filename)

        # Reset session state to reload new knowledge base
        st.session_state.knowledge_base_loaded = False
        st.session_state.index = None
        st.session_state.text_chunks = None
        st.session_state.metadata = {}
        return True
    except Exception as e:
        st.error(f"β Error saving knowledge base: {e}")
        return False
def search_knowledge_base(query: str, model: SentenceTransformer,
                          index: faiss.Index, text_chunks: List[str],
                          k: int = TOP_K_DEFAULT) -> Tuple[List[str], List[float]]:
    """Search the knowledge base and return relevant chunks with scores.

    Args:
        query: The user's question.
        model: Embedding model used to encode ``query``.
        index: FAISS index to search.
        text_chunks: Chunk texts; a hit's index label selects from this list.
        k: Maximum number of results to return.

    Returns:
        ``(chunks, scores)`` as two parallel lists. Both are empty when there
        is nothing to search or when the search raises (the error is shown
        in the UI).
    """
    try:
        # Never ask for more neighbours than exist in the index or the chunk
        # list, and skip the search entirely when there is nothing to return.
        limit = min(k, len(text_chunks), index.ntotal)
        if limit <= 0:
            return [], []

        query_embedding = model.encode([query])
        query_embedding = np.array(query_embedding).astype('float32')
        # NOTE(review): normalising the query presumably matches how the index
        # vectors were built (cosine similarity) - confirm against the builder.
        faiss.normalize_L2(query_embedding)

        scores, indices = index.search(query_embedding, limit)

        retrieved_chunks = []
        chunk_scores = []
        for score, idx in zip(scores[0], indices[0]):
            idx = int(idx)
            # FAISS fills missing results with label -1. Without the lower
            # bound, -1 would index the LAST chunk and be reported as a hit.
            if 0 <= idx < len(text_chunks):
                retrieved_chunks.append(text_chunks[idx])
                chunk_scores.append(float(score))
        return retrieved_chunks, chunk_scores
    except Exception as e:
        st.error(f"β Search error: {e}")
        return [], []
def generate_answer(question: str, context: str, api_key: str,
                    model_name: str = 'gemini-pro') -> str:
    """Generate answer using Gemini API.

    Args:
        question: The user's question.
        context: Retrieved text the answer must be based on.
        api_key: Gemini API key used to configure the client.
        model_name: Name of the generative model to call. Defaults to the
            previously hard-coded value so existing callers are unaffected.

    Returns:
        The model's answer text. If configuring the client, calling the
        model, or reading ``response.text`` raises, an error message string
        is returned instead of raising.
    """
    try:
        genai.configure(api_key=api_key)
        prompt = f"""
You are an intelligent assistant with access to a curated knowledge base.
Answer the question based ONLY on the provided context. Be comprehensive yet concise.
If the answer isn't in the context, say "I couldn't find that information in the knowledge base."
CONTEXT:
{context}
QUESTION: {question}
ANSWER:
"""
        model = genai.GenerativeModel(model_name)
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        return f"β Error generating answer: {str(e)}"
| # --- Main Application --- | |
def _escape_html(text):
    """Return ``text`` escaped for embedding inside an HTML element."""
    import html  # local import so this block does not depend on file-level imports

    # Newlines become <br> so multi-line text stays inside the enclosing tag.
    return html.escape(str(text)).replace("\n", "<br>")


def _use_sample_question(sample_q):
    """Button callback: copy a sample question into the question input."""
    # Done in a callback because Streamlit raises if a widget's session-state
    # key is assigned after that widget was created in the current run.
    st.session_state.question_input = sample_q


def _render_sidebar(index, text_chunks, metadata):
    """Draw the sidebar and return ``(api_key, top_k, show_scores, show_sources)``."""
    with st.sidebar:
        st.markdown("""
<div class="sidebar-info">
<h3>π§ Configuration</h3>
</div>
""", unsafe_allow_html=True)

        # API Key Input
        api_key = st.text_input(
            "π Google Gemini API Key",
            type="password",
            help="Get your free API key from Google AI Studio"
        )
        if api_key:
            st.markdown('<div class="success-box">β API Key configured!</div>', unsafe_allow_html=True)
        st.divider()

        # Model Status
        st.markdown("### π€ AI Model Status")
        if st.session_state.model_loaded:
            st.markdown('<div class="success-box">β Model loaded and ready!</div>', unsafe_allow_html=True)
        else:
            st.markdown('<div class="error-box">β οΈ Model not loaded</div>', unsafe_allow_html=True)
            if st.button("π Load Model"):
                load_embedding_model()
                st.rerun()
        st.divider()

        # Knowledge Base Status
        st.markdown("### π Knowledge Base Status")
        if index is not None and text_chunks is not None:
            st.markdown('<div class="success-box">β Knowledge base loaded!</div>', unsafe_allow_html=True)
            # Display metadata if available
            if metadata:
                with st.expander("π Knowledge Base Info"):
                    st.json(metadata)
            # Stats
            st.markdown(f"""
<div class="knowledge-card">
<div class="stats-container">
<div class="stat-box">
<h4>{len(text_chunks)}</h4>
<p>Text Chunks</p>
</div>
<div class="stat-box">
<h4>{index.ntotal}</h4>
<p>Vectors</p>
</div>
</div>
</div>
""", unsafe_allow_html=True)
        else:
            st.markdown('<div class="error-box">β οΈ No knowledge base found</div>', unsafe_allow_html=True)
            st.info("π Upload your knowledge base files in the Upload tab")

        # Search Settings
        st.markdown("### βοΈ Search Settings")
        top_k = st.slider("Number of results", 3, 10, TOP_K_DEFAULT)
        show_scores = st.checkbox("Show relevance scores", True)
        show_sources = st.checkbox("Show source texts", True)
        st.divider()

        # Quick Actions
        if st.button("π Refresh All"):
            # Reset all session state
            for key in list(st.session_state.keys()):
                del st.session_state[key]
            st.rerun()

    return api_key, top_k, show_scores, show_sources


def _run_search(question, model, index, text_chunks, *, api_key, top_k,
                show_scores, show_sources):
    """Retrieve chunks for ``question``, generate an answer, and display both."""
    if not api_key:
        st.warning("β οΈ Please enter your Gemini API Key in the sidebar")
        return

    with st.spinner("π Searching knowledge base..."):
        retrieved_chunks, scores = search_knowledge_base(
            question, model, index, text_chunks, top_k
        )
    if not retrieved_chunks:
        st.warning("β No relevant information found")
        return

    # Generate answer
    with st.spinner("π€ Generating answer..."):
        context = "\n\n---\n\n".join(retrieved_chunks)
        answer = generate_answer(question, context, api_key)

    # Display answer. The text comes from the LLM, so it is escaped before
    # being placed into markup rendered with unsafe_allow_html.
    st.markdown(f"""
<div class="answer-box">
<h4>π― Answer:</h4>
<p>{_escape_html(answer)}</p>
</div>
""", unsafe_allow_html=True)

    # Display sources
    if show_sources:
        with st.expander(f"π Sources ({len(retrieved_chunks)} found)", expanded=True):
            for i, (chunk, score) in enumerate(zip(retrieved_chunks, scores)):
                score_text = f" (Score: {score:.3f})" if show_scores else ""
                # Chunk text comes from uploaded documents: escape it too.
                preview = _escape_html(chunk[:400])
                ellipsis = '...' if len(chunk) > 400 else ''
                st.markdown(f"""
<div class="source-box">
<h5>π Source {i+1}{score_text}</h5>
<p>{preview}{ellipsis}</p>
</div>
""", unsafe_allow_html=True)


def _render_ask_tab(model, index, text_chunks, metadata, *, api_key, top_k,
                    show_scores, show_sources):
    """Draw the question/answer tab."""
    if index is None or text_chunks is None:
        st.markdown("""
<div class="upload-zone">
<h3>π No Knowledge Base Found</h3>
<p>Please upload your knowledge base files in the "Upload Knowledge Base" tab</p>
<p>Or create one using our Google Colab notebook</p>
</div>
""", unsafe_allow_html=True)
        return
    if model is None:
        st.markdown("""
<div class="error-box">
<h4>β AI Model Not Ready</h4>
<p>Please wait for the model to load or click "Load Model" in the sidebar</p>
</div>
""", unsafe_allow_html=True)
        return

    st.markdown("""
<div class="chat-container">
<h3>π€ Ask me anything about your documents!</h3>
</div>
""", unsafe_allow_html=True)

    # Question input
    question = st.text_input(
        "Your question:",
        placeholder="What would you like to know?",
        key="question_input"
    )

    # Search button
    col1, col2, col3 = st.columns([2, 1, 2])
    with col2:
        search_clicked = st.button("π Search", type="primary", use_container_width=True)

    if search_clicked and question:
        _run_search(
            question, model, index, text_chunks,
            api_key=api_key, top_k=top_k,
            show_scores=show_scores, show_sources=show_sources,
        )

    # Sample questions
    sample_questions = metadata.get('sample_questions') if isinstance(metadata, dict) else None
    samples = list(sample_questions[:3]) if sample_questions else []
    if samples:  # st.columns() needs at least one column
        st.markdown("### π‘ Try these sample questions:")
        cols = st.columns(len(samples))
        for i, sample_q in enumerate(samples):
            with cols[i]:
                st.button(
                    f"π {sample_q[:30]}...",
                    key=f"sample_{i}",
                    on_click=_use_sample_question,
                    args=(sample_q,),
                )


def _render_upload_tab():
    """Draw the knowledge-base upload tab."""
    st.markdown("""
<div class="upload-zone">
<h3>π€ Upload Your Knowledge Base</h3>
<p>Upload the files generated from your Google Colab notebook</p>
</div>
""", unsafe_allow_html=True)
    st.info("""
**Required files:**
- `faiss_index.index` - The FAISS vector index
- `text_chunks.pkl` - The processed text chunks
- `metadata.json` - Optional metadata about your knowledge base
""")

    col1, col2 = st.columns(2)
    with col1:
        index_file = st.file_uploader(
            "π FAISS Index File",
            type=['index'],
            help="Upload the faiss_index.index file"
        )
    with col2:
        chunks_file = st.file_uploader(
            "π Text Chunks File",
            type=['pkl'],
            help="Upload the text_chunks.pkl file"
        )
    metadata_file = st.file_uploader(
        "π Metadata File (Optional)",
        type=['json'],
        help="Upload the metadata.json file if available"
    )

    if st.button("πΎ Save Knowledge Base", type="primary"):
        if not index_file or not chunks_file:
            st.error("β Please upload both the index and chunks files")
        else:
            with st.spinner("πΎ Saving knowledge base..."):
                success = save_uploaded_knowledge_base(index_file, chunks_file, metadata_file)
            if success:
                st.success("β Knowledge base saved successfully!")
                st.balloons()
                st.info("π Please refresh the page to load the new knowledge base!")
            else:
                st.error("β Failed to save knowledge base")

    # Instructions
    with st.expander("π How to create a knowledge base"):
        st.markdown("""
**Step 1:** Use our Google Colab notebook to process your documents
**Step 2:** The notebook will generate these files:
- `faiss_index.index` - Vector search index
- `text_chunks.pkl` - Processed text chunks
- `metadata.json` - Information about your knowledge base
**Step 3:** Upload these files using the form above
**Step 4:** Refresh the page and start asking questions!
[π Download Colab Template](https://colab.research.google.com/)
""")


def main():
    """Render the whole app: header, sidebar, and the two content tabs.

    Each tab is drawn by its own helper, so an early exit in the question
    tab (no knowledge base, no model, no API key) can no longer prevent the
    upload tab from being rendered.
    """
    # Initialize session state
    init_session_state()

    # Header
    st.markdown("""
<div class="main-header">
<h1>π§ Personal Knowledge Navigator</h1>
<p>Your AI-powered document search and Q&A assistant</p>
</div>
""", unsafe_allow_html=True)

    # Load models and knowledge base
    model = load_embedding_model()
    index, text_chunks, metadata = load_knowledge_base()

    # Sidebar Configuration
    api_key, top_k, show_scores, show_sources = _render_sidebar(index, text_chunks, metadata)

    # Main Content Tabs
    tab1, tab2 = st.tabs(["π¬ Ask Questions", "π€ Upload Knowledge Base"])
    with tab1:
        _render_ask_tab(
            model, index, text_chunks, metadata,
            api_key=api_key, top_k=top_k,
            show_scores=show_scores, show_sources=show_sources,
        )
    with tab2:
        _render_upload_tab()
# Entry point: build the UI only when this file is run as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()