# ===============================================
# SCHOLAR LENS - RAG STREAMLIT APP
# ===============================================

# ===============================================
# IMPORTS & CONFIGURATION
# ===============================================
import streamlit as st  # Main web app framework
import os  # For environment variables
import pypdf  # For PDF text extraction
import numpy as np  # For numerical operations
import chromadb  # Vector database for storing embeddings
from sentence_transformers import SentenceTransformer  # For creating text embeddings
import google.generativeai as genai  # For Gemini LLM API
from typing import List, Dict, Any, Optional  # Type hints for better code clarity
import re  # For text processing
# EXPLANATION
# This section imports all the libraries we need. Streamlit creates our web interface,
# pypdf reads PDF files, sentence-transformers creates embeddings (numerical representations of text),
# ChromaDB stores and searches these embeddings, and google.generativeai connects to Gemini AI.
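# EXAMPLE (illustrative; package versions not pinned)
# The third-party imports above roughly correspond to this install command:
#   pip install streamlit pypdf numpy chromadb sentence-transformers google-generativeai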
# ===============================================
# CONFIGURABLE CONSTANTS
# ===============================================
SIMILARITY_THRESHOLD = 0.25  # Minimum similarity score to consider a chunk relevant
TOP_K_CHUNKS = 3  # Number of most relevant chunks to retrieve
CHUNK_SIZE = 300  # Target number of words per text chunk
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"  # Free embedding model
# EXPLANATION
# These are settings you can easily change. SIMILARITY_THRESHOLD controls how relevant
# a piece of text must be to your question. Lower values (like 0.1) are more lenient,
# higher values (like 0.5) are stricter. TOP_K_CHUNKS is how many pieces of text
# the app will consider when answering your question.
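# EXAMPLE (illustrative sketch, not called by the app)
# How SIMILARITY_THRESHOLD behaves: with made-up similarity scores, a lower threshold keeps
# more (possibly noisier) chunks, while a higher threshold keeps only the closest matches.
def _demo_threshold_effect():
    scored_chunks = [("chunk A", 0.62), ("chunk B", 0.31), ("chunk C", 0.12)]  # (text, similarity)
    for threshold in (0.1, 0.25, 0.5):
        kept = [text for text, score in scored_chunks if score >= threshold]
        print(f"threshold={threshold}: kept {kept}")  # 0.1 keeps all three, 0.5 keeps only chunk A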
# ===============================================
# PDF EXTRACTION FUNCTION
# ===============================================
def extract_text_from_pdf(pdf_file) -> Dict[str, Any]:
    """Extract text from an uploaded PDF file with page numbers."""
    try:
        pdf_reader = pypdf.PdfReader(pdf_file)  # Create PDF reader object
        pages_text = []  # List to store text from each page
        for page_num, page in enumerate(pdf_reader.pages):  # Loop through each page
            page_text = page.extract_text()  # Extract text from current page
            if page_text.strip():  # Only add non-empty pages
                pages_text.append({
                    'page_number': page_num + 1,  # Page numbers start from 1
                    'text': page_text.strip()  # Remove extra whitespace
                })
        return {
            'success': True,  # Indicate successful extraction
            'pages': pages_text,  # List of page dictionaries
            'total_pages': len(pages_text)  # Total number of pages processed
        }
    except Exception as e:  # Handle any errors during PDF processing
        return {
            'success': False,  # Indicate failure
            'error': str(e)  # Store error message
        }
# EXPLANATION
# This function takes a PDF file and converts it into text that our program can understand.
# It goes through each page one by one, extracts the text, and remembers which page
# each piece of text came from. This is important for citing sources later.
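# EXAMPLE (illustrative sketch, not called by the app)
# Exercising extract_text_from_pdf() outside Streamlit. The filename "paper.pdf" is only a
# placeholder; pypdf accepts any binary file-like object, which is why the same function
# also works on Streamlit's uploaded files.
def _demo_extract_text():
    with open("paper.pdf", "rb") as f:  # Hypothetical local PDF
        result = extract_text_from_pdf(f)
    if result['success']:
        print(f"Pages with text: {result['total_pages']}")
        print(result['pages'][0]['text'][:200])  # Preview the start of the first page
    else:
        print(f"Extraction failed: {result['error']}")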
# ===============================================
# CHUNKING FUNCTION
# ===============================================
def create_chunks(pages_text: List[Dict]) -> List[Dict]:
    """Split text into smaller chunks while preserving page information."""
    chunks = []  # List to store all text chunks
    chunk_id = 0  # Unique identifier for each chunk
    for page_data in pages_text:  # Process each page
        page_num = page_data['page_number']  # Get page number
        text = page_data['text']  # Get page text
        words = text.split()  # Split text into individual words
        # Create chunks of approximately CHUNK_SIZE words
        for i in range(0, len(words), CHUNK_SIZE):  # Step through words in chunks
            chunk_words = words[i:i + CHUNK_SIZE]  # Get next group of words
            chunk_text = ' '.join(chunk_words)  # Join words back into text
            if len(chunk_words) > 20:  # Only keep substantial chunks (more than 20 words)
                chunks.append({
                    'id': chunk_id,  # Unique chunk identifier
                    'text': chunk_text,  # The actual text content
                    'page_number': page_num,  # Which page this came from
                    'word_count': len(chunk_words)  # How many words in this chunk
                })
                chunk_id += 1  # Increment for next chunk
    return chunks  # Return list of all chunks
# EXPLANATION
# This function breaks down long pages of text into smaller, manageable pieces called chunks.
# Think of it like cutting a long article into paragraphs. Each chunk remembers which page
# it came from. This helps the AI find relevant information more accurately and cite sources properly.
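# EXAMPLE (illustrative sketch, not called by the app)
# create_chunks() on a made-up 700-word page with CHUNK_SIZE = 300: the page becomes two
# 300-word chunks plus one 100-word chunk, all tagged with page_number 1.
def _demo_chunking():
    fake_pages = [{'page_number': 1, 'text': ' '.join(['word'] * 700)}]
    for chunk in create_chunks(fake_pages):
        print(chunk['id'], chunk['page_number'], chunk['word_count'])  # ids 0/1/2, page 1, 300/300/100 words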
# ===============================================
# EMBEDDING LOADING FUNCTION
# ===============================================
@st.cache_resource  # Cache the model so it only loads once
def load_embedding_model():
    """Load the sentence transformer model for creating embeddings."""
    try:
        model = SentenceTransformer(EMBEDDING_MODEL)  # Load the embedding model
        return model  # Return loaded model
    except Exception as e:  # Handle loading errors
        st.error(f"Failed to load embedding model: {e}")  # Show error to user
        return None  # Return None to indicate failure
# EXPLANATION
# This function loads the AI model that converts text into numbers (embeddings).
# These numbers capture the meaning of the text, allowing the computer to understand
# which pieces of text are similar to your question. The @st.cache_resource decorator
# ensures this model only loads once, making the app faster.
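# EXAMPLE (illustrative sketch, not called by the app)
# What embeddings buy us: sentences with similar meaning get vectors with high cosine
# similarity, while unrelated sentences score much lower. The sentences below are made up.
def _demo_embeddings():
    model = load_embedding_model()
    sentences = ["The cat sat on the mat.",
                 "A kitten rested on the rug.",
                 "Quarterly revenue grew by twelve percent."]
    vectors = model.encode(sentences)  # One vector per sentence

    def cosine(a, b):
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    print("related:  ", cosine(vectors[0], vectors[1]))  # Typically high
    print("unrelated:", cosine(vectors[0], vectors[2]))  # Typically much lower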
# ===============================================
# VECTOR DATABASE CREATION & QUERY FUNCTIONS
# ===============================================
def create_vector_database(chunks: List[Dict], embedding_model) -> Optional[Any]:
    """Create ChromaDB vector database with embeddings."""
    try:
        client = chromadb.Client()  # Create in-memory ChromaDB client
        # Drop any existing collection so reprocessing a PDF starts fresh
        try:
            client.delete_collection("pdf_chunks")
        except Exception:
            pass
        collection = client.create_collection(
            "pdf_chunks",
            metadata={"hnsw:space": "cosine"}  # Use cosine distance so similarity = 1 - distance
        )
        texts = [chunk['text'] for chunk in chunks]  # Extract text from each chunk
        embeddings = embedding_model.encode(texts).tolist()  # Convert texts to embeddings
        # Add chunks to database with embeddings and metadata
        collection.add(
            embeddings=embeddings,  # The numerical representations
            documents=texts,  # The actual text content
            metadatas=[{  # Additional information about each chunk
                'page_number': chunk['page_number'],
                'chunk_id': chunk['id'],
                'word_count': chunk['word_count']
            } for chunk in chunks],
            ids=[str(chunk['id']) for chunk in chunks]  # Unique identifiers
        )
        return collection  # Return the created database
    except Exception as e:  # Handle database creation errors
        st.error(f"Failed to create vector database: {e}")  # Show error to user
        return None  # Return None to indicate failure
def query_vector_database(collection, query: str, embedding_model, k: int = TOP_K_CHUNKS) -> List[Dict]:
    """Query the vector database for relevant chunks."""
    try:
        query_embedding = embedding_model.encode([query]).tolist()  # Convert question to embedding
        results = collection.query(  # Search the database
            query_embeddings=query_embedding,
            n_results=k  # Get top k most similar chunks
        )
        relevant_chunks = []  # List to store results
        # Process each result
        for i in range(len(results['documents'][0])):  # Loop through returned documents
            distance = results['distances'][0][i]  # Get cosine distance
            similarity = max(0, 1 - distance)  # Convert distance to similarity score
            # Only include chunks that meet our similarity threshold
            if similarity >= SIMILARITY_THRESHOLD:
                relevant_chunks.append({
                    'text': results['documents'][0][i],  # The chunk text
                    'page_number': results['metadatas'][0][i]['page_number'],  # Source page
                    'similarity': similarity,  # How relevant this chunk is
                    'chunk_id': results['metadatas'][0][i]['chunk_id']  # Unique identifier
                })
        return relevant_chunks  # Return list of relevant chunks
    except Exception as e:  # Handle query errors
        st.error(f"Failed to query database: {e}")  # Show error to user
        return []  # Return empty list
# EXPLANATION
# These functions create and search our vector database. The database stores the meaning
# of each text chunk as numbers. When you ask a question, it converts your question to
# numbers and finds chunks with similar numbers (similar meaning). Because the collection
# uses cosine distance, the similarity score is simply 1 minus the reported distance, and
# the similarity threshold determines how closely related the text must be to your question.
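# EXAMPLE (illustrative sketch, not called by the app)
# Indexing two toy chunks and querying them end to end: the chunk about embeddings should
# come back above the threshold, while the unrelated invoice chunk scores much lower or is
# filtered out entirely.
def _demo_vector_search():
    model = load_embedding_model()
    toy_chunks = [
        {'id': 0, 'text': 'Embeddings map text to vectors that capture its meaning.', 'page_number': 1, 'word_count': 9},
        {'id': 1, 'text': 'The invoice is due at the end of the month.', 'page_number': 2, 'word_count': 10},
    ]
    collection = create_vector_database(toy_chunks, model)
    for hit in query_vector_database(collection, "What do embeddings represent?", model, k=2):
        print(f"Page {hit['page_number']} (similarity {hit['similarity']:.2f}): {hit['text']}")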
# ===============================================
# LLM WRAPPER FOR GEMINI
# ===============================================
def setup_gemini() -> bool:
    """Configure the Gemini API if a key is available."""
    try:
        # Try to get API key from Streamlit secrets first
        api_key = st.secrets.get("GEMINI_API_KEY")  # Check secrets
        if not api_key:  # If not in secrets, try environment variable
            api_key = os.getenv("GEMINI_API_KEY")  # Check environment
        if api_key:  # If we found an API key
            genai.configure(api_key=api_key)  # Configure Gemini with the key
            return True  # Indicate success
        else:
            return False  # No API key found
    except Exception as e:  # Handle setup errors
        st.error(f"Failed to setup Gemini: {e}")  # Show error to user
        return False  # Indicate failure
def generate_answer_with_gemini(query: str, relevant_chunks: List[Dict]) -> str:
    """Generate an answer using Gemini with the retrieved chunks as context."""
    try:
        # Create context from relevant chunks
        context_parts = []  # List to build context
        for chunk in relevant_chunks:  # Add each relevant chunk to context
            context_parts.append(f"[Page {chunk['page_number']}]: {chunk['text']}")
        context = "\n\n".join(context_parts)  # Join all context parts
        # Create prompt for Gemini
        prompt = f"""Based ONLY on the following context from a PDF document, answer the user's question.

Context:
{context}

Question: {query}

Instructions:
- Answer using ONLY the information provided in the context above
- If the context does not contain enough information to answer the question, reply exactly: ❌ Insufficient evidence
- Always include page citations in your answer using the format [Page X]
- Be accurate and concise
- Do not add information not present in the context

Answer:"""
        model = genai.GenerativeModel('gemini-pro')  # Create Gemini model instance
        response = model.generate_content(  # Generate response
            prompt,
            generation_config=genai.types.GenerationConfig(
                temperature=0.1,  # Low temperature for consistent, factual responses
                max_output_tokens=500  # Limit response length
            )
        )
        return response.text  # Return the generated answer
    except Exception as e:  # Handle generation errors
        return f"Error generating answer: {str(e)}"  # Return error message
# EXPLANATION
# These functions handle the AI that generates answers. Gemini reads your question and
# the relevant chunks we found, then creates an answer based only on that information.
# The low temperature setting makes the AI more factual and less creative, which is
# important for accurate answers.
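# EXAMPLE (illustrative sketch, not called by the app)
# Calling the Gemini path directly, outside Streamlit. This assumes a valid GEMINI_API_KEY
# is set in the environment; the retrieved chunk below is made up.
def _demo_gemini_answer():
    genai.configure(api_key=os.getenv("GEMINI_API_KEY"))  # Same key the app reads from secrets or env
    fake_chunks = [{'page_number': 3, 'chunk_id': 7, 'similarity': 0.71,
                    'text': 'The study reports a 14% reduction in error rate after fine-tuning.'}]
    print(generate_answer_with_gemini("How much did the error rate drop?", fake_chunks))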
# ===============================================
# ANSWER GENERATION FUNCTION
# ===============================================
def generate_answer(query: str, relevant_chunks: List[Dict]) -> str:
    """Main function to generate answers using the available LLM."""
    if not relevant_chunks:  # If no relevant chunks found
        return "❌ Insufficient evidence"  # Return standard message
    # Try Gemini if available
    if setup_gemini():  # Check if Gemini is configured
        return generate_answer_with_gemini(query, relevant_chunks)  # Use Gemini
    else:
        # Fallback response when no LLM is available
        return "❌ No LLM configured. Please add GEMINI_API_KEY to your secrets."
# EXPLANATION
# This is the main function that decides which AI to use for generating answers.
# It first checks if we found any relevant information, then tries to use Gemini.
# If no AI is available, it tells you to add an API key.
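# EXAMPLE (illustrative sketch, not called by the app)
# The two guard rails in generate_answer(): with no retrieved chunks it refuses to answer,
# and with chunks but no configured API key it asks you to add one instead of guessing.
def _demo_answer_fallbacks():
    print(generate_answer("What is the main topic?", []))  # -> "❌ Insufficient evidence"
    fake_chunks = [{'page_number': 1, 'chunk_id': 0, 'similarity': 0.5, 'text': 'Some context.'}]
    print(generate_answer("What does the context say?", fake_chunks))  # -> Gemini answer, or the "No LLM configured" message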
# ===============================================
# STREAMLIT UI
# ===============================================
def main():
    """Main Streamlit application."""
    # Page configuration
    st.set_page_config(  # Configure the web page
        page_title="Scholar Lens",  # Browser tab title
        page_icon="📚",  # Browser tab icon
        layout="wide"  # Use full width of browser
    )
    # Header
    st.title("📚 Scholar Lens")  # Main app title
    st.markdown("**AI-Powered Document Q&A System**")  # Subtitle
    st.markdown("Upload a PDF and ask questions about its content!")  # Description
    # Initialize session state for storing data
    if 'vector_db' not in st.session_state:  # Check if database exists in session
        st.session_state.vector_db = None  # Initialize as None
    if 'embedding_model' not in st.session_state:  # Check if model exists in session
        st.session_state.embedding_model = None  # Initialize as None

    # Load embedding model
    if st.session_state.embedding_model is None:  # If model not loaded
        with st.spinner("Loading embedding model..."):  # Show loading spinner
            st.session_state.embedding_model = load_embedding_model()  # Load model

    # File upload section
    st.header("📄 Upload Your PDF")  # Section header
    uploaded_file = st.file_uploader(  # File upload widget
        "Choose a PDF file",
        type="pdf",  # Only allow PDF files
        help="Upload a PDF document to analyze"  # Help text
    )
    # Process uploaded file
    if uploaded_file is not None:  # If user uploaded a file
        if st.button("🔄 Process PDF"):  # Process button
            with st.spinner("Processing PDF..."):  # Show processing spinner
                # Extract text from PDF
                pdf_result = extract_text_from_pdf(uploaded_file)  # Extract text
                if pdf_result['success']:  # If extraction successful
                    st.success(f"✅ Successfully processed {pdf_result['total_pages']} pages")  # Show success
                    # Create chunks
                    chunks = create_chunks(pdf_result['pages'])  # Split text into chunks
                    st.info(f"📝 Created {len(chunks)} text chunks")  # Show chunk count
                    # Create vector database
                    if st.session_state.embedding_model:  # If embedding model is loaded
                        st.session_state.vector_db = create_vector_database(  # Create database
                            chunks, st.session_state.embedding_model
                        )
                        if st.session_state.vector_db:  # If database created successfully
                            st.success("✅ Vector database created successfully!")  # Show success
                        else:
                            st.error("❌ Failed to create vector database")  # Show error
                    else:
                        st.error("❌ Embedding model not available")  # Show model error
                else:
                    st.error(f"❌ Failed to process PDF: {pdf_result['error']}")  # Show extraction error
    # Question answering section
    if st.session_state.vector_db is not None:  # If database is ready
        st.header("❓ Ask Questions")  # Section header
        # Question input
        question = st.text_input(  # Text input for questions
            "Enter your question:",
            placeholder="What is the main topic of this document?",  # Placeholder text
            help="Ask specific questions about the content of your PDF"  # Help text
        )
        # Answer generation
        if st.button("🔍 Ask") and question.strip():  # Ask button and non-empty question
            with st.spinner("Finding answer..."):  # Show searching spinner
                # Query vector database
                relevant_chunks = query_vector_database(  # Search for relevant chunks
                    st.session_state.vector_db,
                    question,
                    st.session_state.embedding_model
                )
                if relevant_chunks:  # If relevant chunks found
                    # Generate answer
                    answer = generate_answer(question, relevant_chunks)  # Get AI answer
                    # Display results
                    st.subheader("💬 Answer")  # Answer section header
                    st.write(answer)  # Display the answer
                    # Display source chunks
                    st.subheader("📖 Sources")  # Sources section header
                    for i, chunk in enumerate(relevant_chunks):  # Loop through sources
                        with st.expander(f"Source {i+1} - Page {chunk['page_number']} (Similarity: {chunk['similarity']:.2f})"):  # Expandable source
                            st.write(chunk['text'])  # Display chunk text
                else:
                    st.warning("❌ Insufficient evidence")  # No relevant chunks found
    else:
        st.info("📄 Please upload and process a PDF to start asking questions")  # Instruction message
    # Configuration section in sidebar
    with st.sidebar:  # Sidebar section
        st.header("⚙️ Configuration")  # Sidebar header
        st.write(f"**Similarity Threshold:** {SIMILARITY_THRESHOLD}")  # Display current threshold
        st.write(f"**Top K Chunks:** {TOP_K_CHUNKS}")  # Display current top k
        st.write(f"**Chunk Size:** {CHUNK_SIZE} words")  # Display chunk size
        st.markdown("---")  # Horizontal line separator
        st.markdown("**How to adjust settings:**")  # Instructions header
        st.markdown("- Edit constants at the top of `app.py`")  # Instruction 1
        st.markdown("- Lower threshold = more lenient matching")  # Instruction 2
        st.markdown("- Higher threshold = stricter matching")  # Instruction 3
        # Debug section
        st.markdown("---")  # Horizontal line separator
        st.header("🔧 Debug Info")  # Debug section header
        # Check if secrets are accessible
        try:
            if hasattr(st, 'secrets'):  # Check if secrets object exists
                available_secrets = list(st.secrets.keys()) if st.secrets else []  # Get secret keys
                st.write(f"**Available secrets:** {len(available_secrets)}")  # Show count
                if 'GEMINI_API_KEY' in available_secrets:  # Check if our key exists
                    st.success("✅ GEMINI_API_KEY found in secrets")  # Success message
                else:
                    st.error("❌ GEMINI_API_KEY not found in secrets")  # Error message
                    st.write(f"Available keys: {available_secrets}")  # Show what's available
            else:
                st.warning("⚠️ st.secrets not accessible")  # Warning message
        except Exception as e:
            st.error(f"Debug error: {e}")  # Show debug errors
    # Professional footer
    st.markdown("""
    <div class="footer">
        <p style="margin: 0; font-size: 1rem;">
            Made with ❤️ using Streamlit & Gemini | © 2025 Anaa Jafar
        </p>
    </div>
    """, unsafe_allow_html=True)  # Display centered professional footer
# EXPLANATION
# This is the main user interface of our app. It creates the web page with sections for
# uploading PDFs, processing them, asking questions, and showing answers, all organized
# in a logical flow from upload to Q&A. The sidebar shows the current settings plus debug
# information about the API key. The custom HTML footer is the only styling applied in
# code; further polish (a centered layout, gradient header, chat-style answer containers,
# and color-coded status messages) can be layered on with custom CSS without touching the
# backend logic.
# ===============================================
# RUN THE APPLICATION
# ===============================================
if __name__ == "__main__":  # Only run if this file is executed directly
    main()  # Start the Streamlit app
# EXPLANATION
# This final section starts our app when the Python file is run. It's like pressing
# the "start" button for our Scholar Lens application: launch it with `streamlit run app.py`.