docmind / app.py
AI Engineer
Initial commit for DocMind
6cca5b1
Raw
History Blame Contribute Delete
22 kB
"""
DocMind — Grounded RAG Document Intelligence
=============================================
Streamlit entry point: page layout, sidebar, routing, and main Q&A loop.
"""
import logging
import sys
import time
from typing import Dict, List, Optional
import streamlit as st
# ── Setup logging ───────────────────────────────────────────────────
# INFO-and-above records go to stdout as "time | logger | level | message".
_LOG_FORMAT = "%(asctime)s | %(name)s | %(levelname)s | %(message)s"
logging.basicConfig(stream=sys.stdout, format=_LOG_FORMAT, level=logging.INFO)

# Application-wide logger used by the ingestion and Q&A code below.
logger = logging.getLogger("docmind")
# ── Page config (must be first st call) ─────────────────────────────
# Collected in one mapping so the page settings can be read at a glance.
_PAGE_SETTINGS = {
    "page_title": "DocMind — Grounded RAG",
    "page_icon": "🧠",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# ── Imports (after st.set_page_config) ──────────────────────────────
from config import CONFIG
from pipeline.chunker import (
ChunkMetadata, parse_document, chunk_text, generate_doc_id,
extract_section_titles,
)
from pipeline.embedder import load_bge_model, embed_chunks, embed_query
from pipeline.bm25_index import build_bm25_index, BM25Index
from pipeline.qdrant_store import (
init_qdrant_client, init_collection, clear_collection, upsert_chunks,
)
from pipeline.retriever import hybrid_retrieve
from pipeline.router import route_query, IntentType
from pipeline.llm import generate_answer
from pipeline.attribution import parse_attributed_response, strip_unattributed
from pipeline.grounding import grounding_gate, ConfidenceLevel
from summarizer.quick_summary import generate_quick_summary
from summarizer.structured_summary import generate_structured_summary
from summarizer.key_points import extract_key_points
from ui.styles import get_custom_css
from ui.components import (
render_chat_message, render_empty_state, render_document_status,
render_pipeline_progress, render_retrieval_stats, render_confidence_badge,
render_grounding_bar, render_comparison_table, render_dashboard_metrics,
render_keypoint_card,
)
# ── Inject custom CSS ──────────────────────────────────────────────
# The markup from ui.styles is passed through as raw HTML, which is why
# unsafe_allow_html has to be enabled for this call.
_custom_css = get_custom_css()
st.markdown(_custom_css, unsafe_allow_html=True)
# ── Session state initialization ───────────────────────────────────
def init_session_state():
    """Seed every session-state key this app relies on, leaving existing values alone."""
    initial_values = dict(
        chat_history=[],      # List of {role, content, grounding, sources, stats}
        chunk_store={},       # chunk_id → ChunkMetadata
        bm25_index=None,      # BM25Index instance
        doc_info=[],          # List of {name, doc_id, chunk_count, page_count, color_idx}
        all_chunks=[],        # All chunks across all documents
        doc_index_map={},     # doc_id → color index
        show_debug=False,     # Toggle retrieval debug panel
    )
    for name, initial in initial_values.items():
        # Only fill in what is missing so reruns keep the accumulated state.
        if name in st.session_state:
            continue
        st.session_state[name] = initial


init_session_state()
# ── Document ingestion ─────────────────────────────────────────────
def ingest_document(uploaded_file, doc_color_index: int) -> Optional[dict]:
    """
    Full ingestion pipeline for a single document:
    Parse → Chunk → Embed → Index.

    Session state is only updated once every fallible step (including the
    BM25 rebuild) has succeeded, so a failure part-way through cannot leave
    chunks registered for a document that is not tracked in
    ``doc_index_map``.

    Args:
        uploaded_file: Uploaded file object exposing ``.name`` and ``.read()``.
        doc_color_index: Color index stored with the document for the UI.

    Returns:
        Doc info dict (name, doc_id, chunk_count, page_count, color_idx),
        or None if the file is too large, already loaded, empty, or
        ingestion failed.
    """
    filename = uploaded_file.name
    file_bytes = uploaded_file.read()

    # Validate file size
    max_bytes = CONFIG.ui.max_upload_mb * 1024 * 1024
    if len(file_bytes) > max_bytes:
        st.error(f"❌ File too large: {len(file_bytes) / 1e6:.1f}MB (max {CONFIG.ui.max_upload_mb}MB)")
        return None

    doc_id = generate_doc_id(filename, file_bytes)

    # Check if already ingested
    if doc_id in st.session_state.doc_index_map:
        st.info(f"📄 '{filename}' is already loaded.")
        return None

    progress = st.progress(0, text="📄 Parsing document...")
    try:
        # Stage 1: Parse
        pages = parse_document(filename, file_bytes)
        progress.progress(25, text="✂️ Chunking text...")

        # Stage 2: Chunk
        chunks = chunk_text(
            pages, doc_id, filename,
            max_tokens=CONFIG.chunking.max_tokens,
            overlap_tokens=CONFIG.chunking.overlap_tokens,
        )
        if not chunks:
            st.error("❌ No text content found in document.")
            progress.empty()
            return None
        progress.progress(50, text="🔢 Generating embeddings...")

        # Stage 3: Embed
        embed_model = load_bge_model()
        texts = [c.text for c in chunks]
        vectors = embed_chunks(embed_model, texts, batch_size=CONFIG.embedding.batch_size)
        progress.progress(75, text="📊 Indexing vectors...")

        # Stage 4: Index in Qdrant
        qdrant_client = init_qdrant_client()
        init_collection(qdrant_client)
        metadata = [
            {"doc_id": c.doc_id, "doc_name": c.doc_name, "page_num": c.page_num}
            for c in chunks
        ]
        upsert_chunks(
            qdrant_client,
            chunk_ids=[c.chunk_id for c in chunks],
            vectors=vectors,
            metadata=metadata,
        )

        # Rebuild the BM25 index over existing + new chunks BEFORE touching
        # session state: if the rebuild raises, nothing has been committed.
        new_entries = {chunk.chunk_id: chunk for chunk in chunks}
        merged_store = {**st.session_state.chunk_store, **new_entries}
        all_chunk_ids = list(merged_store)
        all_texts = [merged_store[cid].text for cid in all_chunk_ids]
        bm25_index = build_bm25_index(all_chunk_ids, all_texts)

        # Commit: from here on only plain in-memory assignments remain.
        st.session_state.chunk_store.update(new_entries)
        st.session_state.all_chunks.extend(chunks)
        st.session_state.bm25_index = bm25_index

        # Track document
        st.session_state.doc_index_map[doc_id] = doc_color_index
        page_count = len({c.page_num for c in chunks})
        doc_info = {
            "name": filename,
            "doc_id": doc_id,
            "chunk_count": len(chunks),
            "page_count": page_count,
            "color_idx": doc_color_index,
        }
        st.session_state.doc_info.append(doc_info)

        progress.progress(100, text="✅ Done!")
        time.sleep(0.5)  # keep the completed bar visible briefly before clearing it
        progress.empty()
        st.success(f"✅ **{len(chunks)} chunks** indexed from **{page_count} pages** — {filename}")
        logger.info("Ingested '%s': %d chunks, %d pages", filename, len(chunks), page_count)
        return doc_info
    except Exception as e:
        # UI boundary: surface the failure to the user and keep the traceback in the log.
        progress.empty()
        st.error(f"❌ Ingestion failed: {e}")
        logger.exception("Ingestion error for '%s'", filename)
        return None
# ── Q&A Pipeline ───────────────────────────────────────────────────
def run_qa_pipeline(query: str) -> dict:
    """
    Execute the full 4-layer RAG pipeline:
        1. Intent Router
        2. Hybrid Retrieval
        3. Attributed Generation
        4. Grounding Gate

    Sensitive, summary, and key-point intents return right after routing
    and are flagged ``is_handoff`` so the grounding UI is bypassed.

    Args:
        query: The user's question.

    Returns:
        Dict with keys ``answer`` (str; grounded answers contain
        citation-badge HTML), ``grounding`` (grounding gate result or None),
        ``sources`` (retrieved chunks), ``stats`` (retrieval stats or None),
        ``is_handoff`` and ``is_refused`` (bools).
    """
    import html  # local import: only needed to escape the citation markup

    result = {
        "answer": "",
        "grounding": None,
        "sources": [],
        "stats": None,
        "is_handoff": False,
        "is_refused": False,
    }

    # ── Layer 1: Intent Router ──
    intent = route_query(query)
    if intent.intent_type == IntentType.SENSITIVE:
        result["answer"] = intent.handoff_message
        result["is_handoff"] = True
        return result

    if intent.intent_type == IntentType.SUMMARY:
        st.info("Generating comprehensive summary...", icon="📊")
        result["answer"] = generate_structured_summary(st.session_state.all_chunks)
        result["is_handoff"] = True  # Treat as handoff to bypass grounding UI
        return result

    if intent.intent_type == IntentType.KEY_POINTS:
        st.info("Extracting key points...", icon="🎯")
        # Pass the embedding model exactly like the Summaries tab does so
        # both entry points extract key points the same way.
        kps = extract_key_points(
            st.session_state.all_chunks,
            embed_model=load_bge_model(),
        )
        bullets = []
        for kp in kps:
            if kp.page_ref:
                page_label = kp.page_ref.replace("[PAGE ", "p.").replace("]", "")
                bullets.append(f"- {kp.text} 📄 {page_label}")
            else:
                # A key point without a page reference is still shown,
                # just without the page marker.
                bullets.append(f"- {kp.text}")
        result["answer"] = "\n".join(bullets)
        result["is_handoff"] = True
        return result

    # ── Layer 2: Hybrid Retrieval ──
    embed_model = load_bge_model()
    qdrant_client = init_qdrant_client()
    chunks, stats = hybrid_retrieve(
        query=query,
        embed_model=embed_model,
        bm25_index=st.session_state.bm25_index,
        qdrant_client=qdrant_client,
        chunk_store=st.session_state.chunk_store,
    )
    result["stats"] = stats
    result["sources"] = chunks
    if not chunks:
        result["answer"] = (
            "I couldn't find any relevant information in the uploaded documents "
            "for this query. Please try rephrasing or upload a relevant document."
        )
        result["is_refused"] = True
        return result

    # ── Layer 3: Attributed Generation ──
    raw_answer = generate_answer(chunks, query)
    if raw_answer.strip() == "INSUFFICIENT_CONTEXT":
        result["answer"] = (
            "The uploaded documents don't contain enough relevant information "
            "to answer this question confidently."
        )
        result["is_refused"] = True
        return result

    # Parse attribution tags
    sentences = parse_attributed_response(raw_answer)
    valid_ids = set(st.session_state.chunk_store.keys())
    sentences = strip_unattributed(sentences, valid_ids)
    if not sentences:
        result["answer"] = (
            "I couldn't generate a verifiable answer from the documents. "
            "Try asking something more specific."
        )
        result["is_refused"] = True
        return result

    # ── Layer 4: Grounding Gate ──
    grounding_result = grounding_gate(sentences, st.session_state.chunk_store)
    result["grounding"] = grounding_result
    if grounding_result.is_refused:
        result["answer"] = grounding_result.refusal_message
        result["is_refused"] = True
        return result

    # Build final answer from grounded sentences
    answer_parts = []
    for sent in grounding_result.grounded_sentences:
        chunk = st.session_state.chunk_store.get(sent.chunk_id)
        if chunk is None:
            answer_parts.append(sent.text)
            continue
        # doc_name comes from the uploaded filename (user-controlled), so it
        # must be escaped before being placed inside an HTML attribute.
        title = html.escape(f"{chunk.doc_name} (Page {chunk.page_num})", quote=True)
        page_label = html.escape(str(chunk.page_num))
        citation = f'<span class="citation-badge" title="{title}">📄 p.{page_label}</span>'
        # NOTE(review): sent.text is model output and is inserted unescaped;
        # confirm how render_chat_message treats HTML inside the answer.
        answer_parts.append(f"{sent.text} {citation}")
    result["answer"] = " ".join(answer_parts)
    return result
# ── Chat export ────────────────────────────────────────────────────
def export_chat_as_text() -> str:
    """
    Export chat history as plain text.

    Grounded bot answers embed citation badges as
    ``<span class="citation-badge" ...>`` HTML; those spans are reduced to
    their visible label so the export stays free of markup.

    Returns:
        The transcript, one "You:" / "DocMind:" entry per message. Bot
        messages with a non-refused grounding result are followed by their
        grounding score.
    """
    import re  # local import: only the export needs to strip citation markup

    citation_span = re.compile(r'<span class="citation-badge"[^>]*>(.*?)</span>')

    lines = ["DocMind — Chat Export", "=" * 40, ""]
    for msg in st.session_state.chat_history:
        is_user = msg["role"] == "user"
        role = "You" if is_user else "DocMind"
        content = msg["content"]
        # User text is exported verbatim; only bot answers carry badge markup.
        if not is_user and isinstance(content, str):
            content = citation_span.sub(r"\1", content)
        lines.append(f"{role}: {content}")
        grounding = msg.get("grounding")
        if grounding and not grounding.is_refused:
            lines.append(f" [Grounding Score: {grounding.overall_score:.1%}]")
        lines.append("")
    return "\n".join(lines)
# ── Sidebar ────────────────────────────────────────────────────────
# Sidebar layout: branding, document upload/ingestion, loaded-document list,
# settings, and chat/reset actions.
with st.sidebar:
    # Branding
    st.markdown("""
<div class="sidebar-logo">
<h1>🧠 DocMind</h1>
</div>
<div class="sidebar-tagline">Grounded RAG Document Intelligence</div>
""", unsafe_allow_html=True)
    st.markdown("---")

    # File upload
    st.markdown("### 📁 Upload Documents")
    uploaded_files = st.file_uploader(
        "Drop PDF, DOCX, or TXT files",
        type=["pdf", "docx", "txt"],
        accept_multiple_files=True,
        key="file_uploader",
        help=f"Max {CONFIG.ui.max_documents} documents, {CONFIG.ui.max_upload_mb}MB each",
    )
    if uploaded_files:
        if len(uploaded_files) > CONFIG.ui.max_documents:
            st.warning(f"⚠️ Maximum {CONFIG.ui.max_documents} documents allowed.")
            uploaded_files = uploaded_files[:CONFIG.ui.max_documents]
        for uploaded in uploaded_files:
            doc_id = generate_doc_id(uploaded.name, uploaded.read())
            uploaded.seek(0)  # Reset after reading
            if doc_id not in st.session_state.doc_index_map:
                # Derive the color index from the number of documents already
                # registered rather than the position in the uploader list:
                # the list can change between reruns, which would hand the
                # same index to two different loaded documents.
                ingest_document(
                    uploaded,
                    doc_color_index=len(st.session_state.doc_index_map),
                )

    # Show loaded documents
    if st.session_state.doc_info:
        st.markdown("### 📄 Loaded Documents")
        for info in st.session_state.doc_info:
            render_document_status(
                info["name"], info["chunk_count"],
                info["page_count"], info["color_idx"],
            )
    st.markdown("---")

    # Settings
    st.markdown("### ⚙️ Settings")
    st.session_state.show_debug = st.toggle(
        "Show retrieval debug", value=st.session_state.show_debug
    )

    # Actions
    st.markdown("---")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🗑️ Clear Chat", use_container_width=True):
            st.session_state.chat_history = []
            st.rerun()
    with col2:
        if st.session_state.chat_history:
            chat_text = export_chat_as_text()
            st.download_button(
                "💾 Export",
                data=chat_text,
                file_name="docmind_chat.txt",
                mime="text/plain",
                use_container_width=True,
            )

    # Reset all
    if st.button("🔄 Reset Everything", use_container_width=True, type="secondary"):
        for key in list(st.session_state.keys()):
            del st.session_state[key]
        try:
            qdrant_client = init_qdrant_client()
            clear_collection(qdrant_client)
        except Exception:
            # Best-effort cleanup: the reset must still complete, but a
            # failure to clear the vector store should be visible in the log.
            logger.warning("Could not clear the Qdrant collection during reset", exc_info=True)
        st.rerun()
# ── Main area ──────────────────────────────────────────────────────
# Main branding header (raw HTML, rendered with unsafe_allow_html)
_HEADER_HTML = """
<div style="text-align: center; margin-bottom: 2rem; padding-top: 1rem;">
<h1 style="font-size: 2.8rem; font-weight: 700; margin-bottom: 0.2rem; background: linear-gradient(135deg, #818CF8, #C084FC); -webkit-background-clip: text; -webkit-text-fill-color: transparent;">🧠 DocMind</h1>
<p style="color: #94A3B8; font-size: 1.05rem; letter-spacing: 0.02em;">Enterprise-Grade Document Intelligence with Grounded RAG</p>
</div>
"""
st.markdown(_HEADER_HTML, unsafe_allow_html=True)

# Tabs
if not st.session_state.doc_info:
    # Nothing loaded yet: a single plain container stands in for the Q&A
    # tab, and the other two tabs are disabled with None.
    tab_qa, tab_summary, tab_compare = st.container(), None, None
else:
    # Render top-level metrics dashboard, then the three working tabs.
    total_docs = len(st.session_state.doc_info)
    total_chunks = len(st.session_state.all_chunks)
    render_dashboard_metrics(total_docs, total_chunks)
    _tab_labels = ["💬 Q&A", "📊 Summaries", "📑 Compare Documents"]
    tab_qa, tab_summary, tab_compare = st.tabs(_tab_labels)
# ── Q&A Tab ────────────────────────────────────────────────────────
# Replays the stored conversation, then handles at most one new query per
# script run: run the pipeline, store and render the reply, trim the history.
with tab_qa:
    if not st.session_state.doc_info:
        render_empty_state()
    else:
        # Display chat history
        for msg in st.session_state.chat_history:
            render_chat_message(
                role=msg["role"],
                content=msg["content"],
                grounding_result=msg.get("grounding"),
                sources=msg.get("sources"),
                doc_index_map=st.session_state.doc_index_map,
            )
            if msg["role"] == "bot" and msg.get("stats") and st.session_state.show_debug:
                render_retrieval_stats(msg["stats"])

        # Query input
        query = st.chat_input("Ask a question about your documents...")
        if query:
            # Add user message to history
            st.session_state.chat_history.append({
                "role": "user",
                "content": query,
            })
            render_chat_message("user", query)

            # Run pipeline
            with st.spinner("🔍 Searching documents & verifying claims..."):
                try:
                    result = run_qa_pipeline(query)
                except Exception as e:
                    # UI boundary: show the error in the chat, but also keep
                    # the traceback in the log so the failure can be diagnosed.
                    logger.exception("Q&A pipeline failed")
                    result = {
                        "answer": f"🚨 **System Error:** {str(e)}",
                        "is_refused": True,
                        "grounding": None,
                        "sources": [],
                        "stats": None,
                    }

            # Add bot response to history
            bot_msg = {
                "role": "bot",
                "content": result["answer"],
                "grounding": result.get("grounding"),
                "sources": result.get("sources", []),
                "stats": result.get("stats"),
            }
            st.session_state.chat_history.append(bot_msg)

            # Render bot response
            render_chat_message(
                role="bot",
                content=result["answer"],
                grounding_result=result.get("grounding"),
                sources=result.get("sources"),
                doc_index_map=st.session_state.doc_index_map,
            )
            if result.get("stats") and st.session_state.show_debug:
                render_retrieval_stats(result["stats"])

            # Trim history
            max_history = CONFIG.ui.max_chat_history * 2  # pairs
            if len(st.session_state.chat_history) > max_history:
                st.session_state.chat_history = st.session_state.chat_history[-max_history:]
# ── Summary Tab ────────────────────────────────────────────────────
# Three on-demand generators, each behind its own button; every one of
# them works on all chunks currently held in the session.
if tab_summary:
    with tab_summary:
        if st.session_state.doc_info:
            quick_tab, structured_tab, keypoints_tab = st.tabs([
                "📝 Quick Summary", "📑 Structured Summary", "🎯 Key Points"
            ])

            with quick_tab:
                quick_clicked = st.button(
                    "Generate Quick Summary", key="btn_quick", type="primary"
                )
                if quick_clicked:
                    with st.spinner("Generating summary..."):
                        summary = generate_quick_summary(st.session_state.all_chunks)
                    st.markdown(summary)

            with structured_tab:
                structured_clicked = st.button(
                    "Generate Structured Summary", key="btn_structured", type="primary"
                )
                if structured_clicked:
                    with st.spinner("Generating structured summary..."):
                        summary = generate_structured_summary(st.session_state.all_chunks)
                    st.markdown(summary)

            with keypoints_tab:
                keypoints_clicked = st.button(
                    "Extract Key Points", key="btn_keypoints", type="primary"
                )
                if keypoints_clicked:
                    with st.spinner("Extracting key points..."):
                        embed_model = load_bge_model()
                        points = extract_key_points(
                            st.session_state.all_chunks,
                            embed_model=embed_model,
                        )
                    if not points:
                        st.info("No key points could be extracted.")
                    else:
                        for pt in points:
                            # Points without a page reference get an empty label.
                            if pt.page_ref:
                                page_ref = f"PAGE: {pt.page_ref}"
                            else:
                                page_ref = ""
                            render_keypoint_card(pt.text, page_ref)
        else:
            st.info("Upload a document first to generate summaries.")
# ── Compare Tab ────────────────────────────────────────────────────
# Builds a comparison prompt from the leading chunks of every loaded
# document and renders the model's reply as markdown.
if tab_compare:
    with tab_compare:
        if len(st.session_state.doc_info) >= 2:
            st.markdown("### 📑 Document Comparison")
            doc_names = [d["name"] for d in st.session_state.doc_info]
            versus_label = " vs ".join(doc_names)
            st.markdown(f"Comparing: **{versus_label}**")

            if st.button("🔄 Generate Comparison", key="btn_compare", type="primary"):
                with st.spinner("Comparing documents..."):
                    # Build comparison prompt
                    doc_summaries = []
                    for info in st.session_state.doc_info:
                        own_texts = [
                            c.text
                            for c in st.session_state.all_chunks
                            if c.doc_id == info["doc_id"]
                        ]
                        # Use first 3 chunks from each doc
                        excerpt = "\n".join(own_texts[:3])
                        doc_summaries.append(f"**{info['name']}**:\n{excerpt}")
                    combined = "\n\n---\n\n".join(doc_summaries)

                    column_list = ", ".join(doc_names)
                    prompt = (
                        "Compare these documents in a structured table format. "
                        f"Include columns for: Aspect, {column_list}"
                        ". Cover the main topics, scope, key differences, and similarities."
                    )

                    # Imported only when a comparison is actually requested.
                    from pipeline.llm import generate_summary
                    comparison = generate_summary(combined, prompt)
                st.markdown(comparison)
        else:
            st.info("Upload at least 2 documents to enable comparison.")