""" DocMind — Grounded RAG Document Intelligence ============================================= Streamlit entry point: page layout, sidebar, routing, and main Q&A loop. """ import logging import sys import time from typing import Dict, List, Optional import streamlit as st # ── Setup logging ─────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(name)s | %(levelname)s | %(message)s", stream=sys.stdout, ) logger = logging.getLogger("docmind") # ── Page config (must be first st call) ───────────────────────────── st.set_page_config( page_title="DocMind — Grounded RAG", page_icon="🧠", layout="wide", initial_sidebar_state="expanded", ) # ── Imports (after st.set_page_config) ────────────────────────────── from config import CONFIG from pipeline.chunker import ( ChunkMetadata, parse_document, chunk_text, generate_doc_id, extract_section_titles, ) from pipeline.embedder import load_bge_model, embed_chunks, embed_query from pipeline.bm25_index import build_bm25_index, BM25Index from pipeline.qdrant_store import ( init_qdrant_client, init_collection, clear_collection, upsert_chunks, ) from pipeline.retriever import hybrid_retrieve from pipeline.router import route_query, IntentType from pipeline.llm import generate_answer from pipeline.attribution import parse_attributed_response, strip_unattributed from pipeline.grounding import grounding_gate, ConfidenceLevel from summarizer.quick_summary import generate_quick_summary from summarizer.structured_summary import generate_structured_summary from summarizer.key_points import extract_key_points from ui.styles import get_custom_css from ui.components import ( render_chat_message, render_empty_state, render_document_status, render_pipeline_progress, render_retrieval_stats, render_confidence_badge, render_grounding_bar, render_comparison_table, render_dashboard_metrics, render_keypoint_card, ) # ── Inject custom CSS ────────────────────────────────────────────── st.markdown(get_custom_css(), unsafe_allow_html=True) # ── Session state initialization ─────────────────────────────────── def init_session_state(): """Initialize all session state variables.""" defaults = { "chat_history": [], # List of {role, content, grounding, sources, stats} "chunk_store": {}, # chunk_id → ChunkMetadata "bm25_index": None, # BM25Index instance "doc_info": [], # List of {name, doc_id, chunk_count, page_count, color_idx} "all_chunks": [], # All chunks across all documents "doc_index_map": {}, # doc_id → color index "show_debug": False, # Toggle retrieval debug panel } for key, value in defaults.items(): if key not in st.session_state: st.session_state[key] = value init_session_state() # ── Document ingestion ───────────────────────────────────────────── def ingest_document(uploaded_file, doc_color_index: int) -> Optional[dict]: """ Full ingestion pipeline for a single document: Parse → Chunk → Embed → Index. Returns doc info dict or None on failure. """ filename = uploaded_file.name file_bytes = uploaded_file.read() # Validate file size max_bytes = CONFIG.ui.max_upload_mb * 1024 * 1024 if len(file_bytes) > max_bytes: st.error(f"❌ File too large: {len(file_bytes) / 1e6:.1f}MB (max {CONFIG.ui.max_upload_mb}MB)") return None doc_id = generate_doc_id(filename, file_bytes) # Check if already ingested if doc_id in st.session_state.doc_index_map: st.info(f"📄 '{filename}' is already loaded.") return None progress = st.progress(0, text="📄 Parsing document...") try: # Stage 1: Parse pages = parse_document(filename, file_bytes) progress.progress(25, text="✂️ Chunking text...") # Stage 2: Chunk chunks = chunk_text( pages, doc_id, filename, max_tokens=CONFIG.chunking.max_tokens, overlap_tokens=CONFIG.chunking.overlap_tokens, ) if not chunks: st.error("❌ No text content found in document.") progress.empty() return None progress.progress(50, text="🔢 Generating embeddings...") # Stage 3: Embed embed_model = load_bge_model() texts = [c.text for c in chunks] vectors = embed_chunks(embed_model, texts, batch_size=CONFIG.embedding.batch_size) progress.progress(75, text="📊 Indexing vectors...") # Stage 4: Index in Qdrant qdrant_client = init_qdrant_client() init_collection(qdrant_client) metadata = [ {"doc_id": c.doc_id, "doc_name": c.doc_name, "page_num": c.page_num} for c in chunks ] upsert_chunks( qdrant_client, chunk_ids=[c.chunk_id for c in chunks], vectors=vectors, metadata=metadata, ) # Store chunks in session state for chunk in chunks: st.session_state.chunk_store[chunk.chunk_id] = chunk st.session_state.all_chunks.extend(chunks) # Update BM25 index (rebuilt with all documents) all_chunk_ids = list(st.session_state.chunk_store.keys()) all_texts = [st.session_state.chunk_store[cid].text for cid in all_chunk_ids] st.session_state.bm25_index = build_bm25_index(all_chunk_ids, all_texts) # Track document st.session_state.doc_index_map[doc_id] = doc_color_index page_count = len(set(c.page_num for c in chunks)) doc_info = { "name": filename, "doc_id": doc_id, "chunk_count": len(chunks), "page_count": page_count, "color_idx": doc_color_index, } st.session_state.doc_info.append(doc_info) progress.progress(100, text="✅ Done!") time.sleep(0.5) progress.empty() st.success(f"✅ **{len(chunks)} chunks** indexed from **{page_count} pages** — {filename}") logger.info("Ingested '%s': %d chunks, %d pages", filename, len(chunks), page_count) return doc_info except Exception as e: progress.empty() st.error(f"❌ Ingestion failed: {e}") logger.exception("Ingestion error for '%s'", filename) return None # ── Q&A Pipeline ─────────────────────────────────────────────────── def run_qa_pipeline(query: str) -> dict: """ Execute the full 4-layer RAG pipeline: 1. Intent Router 2. Hybrid Retrieval 3. Attributed Generation 4. Grounding Gate Returns a dict with the answer and all metadata. """ result = { "answer": "", "grounding": None, "sources": [], "stats": None, "is_handoff": False, "is_refused": False, } # ── Layer 1: Intent Router ── intent = route_query(query) if intent.intent_type == IntentType.SENSITIVE: result["answer"] = intent.handoff_message result["is_handoff"] = True return result if intent.intent_type == IntentType.SUMMARY: st.info("Generating comprehensive summary...", icon="📊") result["answer"] = generate_structured_summary(st.session_state.all_chunks) result["is_handoff"] = True # Treat as handoff to bypass grounding UI return result if intent.intent_type == IntentType.KEY_POINTS: st.info("Extracting key points...", icon="🎯") kps = extract_key_points(st.session_state.all_chunks) md_bullets = "\n".join(f"- {kp.text} 📄 {kp.page_ref.replace('[PAGE ', 'p.').replace(']', '')}" for kp in kps) result["answer"] = md_bullets result["is_handoff"] = True return result # ── Layer 2: Hybrid Retrieval ── embed_model = load_bge_model() qdrant_client = init_qdrant_client() chunks, stats = hybrid_retrieve( query=query, embed_model=embed_model, bm25_index=st.session_state.bm25_index, qdrant_client=qdrant_client, chunk_store=st.session_state.chunk_store, ) result["stats"] = stats result["sources"] = chunks if not chunks: result["answer"] = ( "I couldn't find any relevant information in the uploaded documents " "for this query. Please try rephrasing or upload a relevant document." ) result["is_refused"] = True return result # ── Layer 3: Attributed Generation ── raw_answer = generate_answer(chunks, query) if raw_answer.strip() == "INSUFFICIENT_CONTEXT": result["answer"] = ( "The uploaded documents don't contain enough relevant information " "to answer this question confidently." ) result["is_refused"] = True return result # Parse attribution tags sentences = parse_attributed_response(raw_answer) valid_ids = set(st.session_state.chunk_store.keys()) sentences = strip_unattributed(sentences, valid_ids) if not sentences: result["answer"] = ( "I couldn't generate a verifiable answer from the documents. " "Try asking something more specific." ) result["is_refused"] = True return result # ── Layer 4: Grounding Gate ── grounding_result = grounding_gate(sentences, st.session_state.chunk_store) result["grounding"] = grounding_result if grounding_result.is_refused: result["answer"] = grounding_result.refusal_message result["is_refused"] = True else: # Build final answer from grounded sentences answer_parts = [] for sent in grounding_result.grounded_sentences: if sent.chunk_id in st.session_state.chunk_store: chunk = st.session_state.chunk_store[sent.chunk_id] doc_name = chunk.doc_name page = chunk.page_num citation = f'📄 p.{page}' answer_parts.append(f"{sent.text} {citation}") else: answer_parts.append(sent.text) result["answer"] = " ".join(answer_parts) return result # ── Chat export ──────────────────────────────────────────────────── def export_chat_as_text() -> str: """Export chat history as plain text.""" lines = ["DocMind — Chat Export", "=" * 40, ""] for msg in st.session_state.chat_history: role = "You" if msg["role"] == "user" else "DocMind" lines.append(f"{role}: {msg['content']}") if msg.get("grounding") and not msg["grounding"].is_refused: lines.append(f" [Grounding Score: {msg['grounding'].overall_score:.1%}]") lines.append("") return "\n".join(lines) # ── Sidebar ──────────────────────────────────────────────────────── with st.sidebar: # Branding st.markdown("""
""", unsafe_allow_html=True) st.markdown("---") # File upload st.markdown("### 📁 Upload Documents") uploaded_files = st.file_uploader( "Drop PDF, DOCX, or TXT files", type=["pdf", "docx", "txt"], accept_multiple_files=True, key="file_uploader", help=f"Max {CONFIG.ui.max_documents} documents, {CONFIG.ui.max_upload_mb}MB each", ) if uploaded_files: if len(uploaded_files) > CONFIG.ui.max_documents: st.warning(f"⚠️ Maximum {CONFIG.ui.max_documents} documents allowed.") uploaded_files = uploaded_files[:CONFIG.ui.max_documents] for idx, file in enumerate(uploaded_files): doc_id = generate_doc_id(file.name, file.read()) file.seek(0) # Reset after reading if doc_id not in st.session_state.doc_index_map: ingest_document(file, doc_color_index=idx) # Show loaded documents if st.session_state.doc_info: st.markdown("### 📄 Loaded Documents") for info in st.session_state.doc_info: render_document_status( info["name"], info["chunk_count"], info["page_count"], info["color_idx"], ) st.markdown("---") # Settings st.markdown("### ⚙️ Settings") st.session_state.show_debug = st.toggle( "Show retrieval debug", value=st.session_state.show_debug ) # Actions st.markdown("---") col1, col2 = st.columns(2) with col1: if st.button("🗑️ Clear Chat", use_container_width=True): st.session_state.chat_history = [] st.rerun() with col2: if st.session_state.chat_history: chat_text = export_chat_as_text() st.download_button( "💾 Export", data=chat_text, file_name="docmind_chat.txt", mime="text/plain", use_container_width=True, ) # Reset all if st.button("🔄 Reset Everything", use_container_width=True, type="secondary"): for key in list(st.session_state.keys()): del st.session_state[key] try: qdrant_client = init_qdrant_client() clear_collection(qdrant_client) except Exception: pass st.rerun() # ── Main area ────────────────────────────────────────────────────── # Main branding header st.markdown("""Enterprise-Grade Document Intelligence with Grounded RAG