# demo/pages/1_🔬_Live_Processing.py — SPARKNET demo: live document processing page.
"""
Live Document Processing Demo - SPARKNET
Real-time document processing with integrated state management and auto-indexing.
"""
import streamlit as st
import sys
from pathlib import Path
import time
import io
import base64
from datetime import datetime
import hashlib
# Make the project root and demo directory importable (this page lives in demo/pages/).
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
sys.path.insert(0, str(PROJECT_ROOT / "demo"))
# Import state manager and RAG config
from state_manager import (
    get_state_manager,
    ProcessedDocument as StateDocument,
    generate_doc_id,
    render_global_status_bar,
)
from rag_config import (
    get_unified_rag_system,
    auto_index_processed_document,
    check_ollama,
)
# Page config must be the first Streamlit call on this page.
st.set_page_config(page_title="Live Processing - SPARKNET", page_icon="🔬", layout="wide")
# Authentication: gate the whole page behind the shared password check.
from auth import check_password, show_logout_button
if not check_password():
    st.stop()  # halt rendering for unauthenticated visitors
show_logout_button()
# Custom CSS for the stage/metric/nav cards rendered further down the page.
st.markdown("""
<style>
.stage-card {
background: linear-gradient(135deg, #1a1a2e 0%, #16213e 100%);
padding: 15px;
border-radius: 10px;
margin: 10px 0;
border-left: 4px solid #4ECDC4;
}
.stage-active {
border-left-color: #ffc107;
animation: pulse 1s infinite;
}
.stage-done {
border-left-color: #28a745;
}
.stage-error {
border-left-color: #dc3545;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.7; }
100% { opacity: 1; }
}
.metric-card {
background: #161b22;
border-radius: 8px;
padding: 12px;
text-align: center;
border: 1px solid #30363d;
}
.metric-value {
font-size: 24px;
font-weight: bold;
color: #4ECDC4;
}
.metric-label {
font-size: 11px;
color: #8b949e;
text-transform: uppercase;
}
.action-btn {
margin: 5px;
}
.nav-card {
background: #0d1117;
border-radius: 10px;
padding: 15px;
margin: 10px 0;
border: 1px solid #30363d;
cursor: pointer;
}
.nav-card:hover {
border-color: #4ECDC4;
}
</style>
""", unsafe_allow_html=True)
# Initialize the cross-page session state manager (shared with other demo pages).
state_manager = get_state_manager()
def process_document_actual(file_bytes: bytes, filename: str, options: dict) -> dict:
    """
    Process a document using the actual document processing pipeline.

    Args:
        file_bytes: Raw bytes of the uploaded document.
        filename: Original filename (its suffix selects the temp-file type).
        options: Processing options (ocr_engine, max_pages, enable_layout,
            preserve_tables, detect_headers, generate_embeddings).

    Returns:
        dict with keys: success, raw_text, chunks, ocr_regions,
        layout_regions, page_count, ocr_confidence, layout_confidence
        (plus "fallback_reason" when the simple-extraction fallback ran).

    Priority:
        1. Backend API (GPU server) - if configured
        2. Local processing - if dependencies available
        3. Fallback text extraction
    """
    import tempfile

    # First, try to use the backend API if configured.
    try:
        from backend_client import BackendClient, is_backend_configured
        if is_backend_configured():
            client = BackendClient()
            response = client.process_document(
                file_bytes=file_bytes,
                filename=filename,
                ocr_engine=options.get("ocr_engine", "paddleocr"),
                max_pages=options.get("max_pages", 10),
                enable_layout=options.get("enable_layout", True),
                preserve_tables=options.get("preserve_tables", True),
            )
            if response.success:
                return {
                    "success": True,
                    "raw_text": response.data.get("text", ""),
                    "chunks": response.data.get("chunks", []),
                    "ocr_regions": response.data.get("ocr_regions", []),
                    "layout_regions": response.data.get("layout_regions", []),
                    "page_count": response.data.get("page_count", 0),
                    "ocr_confidence": response.data.get("ocr_confidence", 0.0),
                    "layout_confidence": response.data.get("layout_confidence", 0.0),
                }
            # Backend responded but reported failure: fall through to local.
    except Exception:
        # Deliberate best-effort: backend client missing or unreachable
        # (e.g. on Streamlit Cloud) — continue with local processing.
        pass

    # Write the bytes to a temp file because the local processor takes a path.
    suffix = Path(filename).suffix
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
        tmp.write(file_bytes)
        tmp_path = tmp.name

    try:
        # Try to use the actual document processor locally.
        try:
            from src.document.pipeline.processor import (
                DocumentProcessor,
                PipelineConfig,
            )
            from src.document.ocr import OCRConfig
            from src.document.layout import LayoutConfig
            from src.document.chunking.chunker import ChunkerConfig

            # Configure chunking with table preservation options.
            chunker_config = ChunkerConfig(
                preserve_table_structure=options.get("preserve_tables", True),
                detect_table_headers=options.get("detect_headers", True),
                chunk_tables=True,
                chunk_figures=True,
                include_captions=True,
            )
            # Configure layout detection.
            layout_config = LayoutConfig(
                method="rule_based",
                detect_tables=True,
                detect_figures=True,
                detect_headers=True,
                detect_titles=True,
                detect_lists=True,
                min_confidence=0.3,  # Lower threshold to detect more regions
                heading_font_ratio=1.1,  # More sensitive heading detection
            )
            # Configure pipeline with all options.
            config = PipelineConfig(
                ocr=OCRConfig(engine=options.get("ocr_engine", "paddleocr")),
                layout=layout_config,
                chunking=chunker_config,
                max_pages=options.get("max_pages", 10),
                include_ocr_regions=True,
                include_layout_regions=options.get("enable_layout", True),
                generate_full_text=True,
            )
            processor = DocumentProcessor(config)
            processor.initialize()

            # Process the document.
            result = processor.process(tmp_path)

            # Convert processor objects into plain dicts for session state.
            chunks_list = [
                {
                    "chunk_id": chunk.chunk_id,
                    "text": chunk.text,
                    "page": chunk.page,
                    "chunk_type": chunk.chunk_type.value,
                    "confidence": chunk.confidence,
                    "bbox": chunk.bbox.to_xyxy() if chunk.bbox else None,
                }
                for chunk in result.chunks
            ]
            ocr_regions = [
                {
                    "text": region.text,
                    "confidence": region.confidence,
                    "page": region.page,
                    "bbox": region.bbox.to_xyxy() if region.bbox else None,
                }
                for region in result.ocr_regions
            ]
            layout_regions = [
                {
                    "id": region.id,
                    "type": region.type.value,
                    "confidence": region.confidence,
                    "page": region.page,
                    "bbox": region.bbox.to_xyxy() if region.bbox else None,
                }
                for region in result.layout_regions
            ]
            return {
                "success": True,
                "raw_text": result.full_text,
                "chunks": chunks_list,
                "ocr_regions": ocr_regions,
                "layout_regions": layout_regions,
                "page_count": result.metadata.num_pages,
                "ocr_confidence": result.metadata.ocr_confidence_avg or 0.0,
                "layout_confidence": result.metadata.layout_confidence_avg or 0.0,
            }
        except Exception as e:
            # Fallback: simple text extraction; surface the reason to the UI.
            return process_document_fallback(file_bytes, filename, options, str(e))
    finally:
        # Always remove the temp file, even when processing raised.
        Path(tmp_path).unlink(missing_ok=True)
def process_document_fallback(file_bytes: bytes, filename: str, options: dict, reason: str) -> dict:
    """
    Fallback document processing using simple text extraction.

    Used when neither the backend API nor the local pipeline is available.
    PDFs are read via PyMuPDF's embedded text layer (no OCR); .txt/.md files
    are decoded directly. The result matches process_document_actual's dict
    shape, with an extra "fallback_reason" explaining why the fallback ran.

    Args:
        file_bytes: Raw document bytes.
        filename: Original filename; its suffix selects the extraction path.
        options: Processing options; only "max_pages" is honored here.
        reason: Human-readable explanation of why the full pipeline failed.

    Returns:
        Result dict (success, raw_text, chunks, ..., fallback_reason).
    """
    text = ""
    page_count = 1
    suffix = Path(filename).suffix.lower()
    if suffix == ".pdf":
        # Extract the embedded text layer with PyMuPDF (no OCR performed).
        try:
            import fitz
            pdf_stream = io.BytesIO(file_bytes)
            doc = fitz.open(stream=pdf_stream, filetype="pdf")
            page_count = len(doc)
            max_pages = min(options.get("max_pages", 5), page_count)
            text_parts = []
            for page_num in range(max_pages):
                page = doc[page_num]
                text_parts.append(f"--- Page {page_num + 1} ---\n{page.get_text()}")
            text = "\n\n".join(text_parts)
            doc.close()
        except Exception as pdf_e:
            text = f"PDF extraction failed: {pdf_e}"
    elif suffix in [".txt", ".md"]:
        try:
            text = file_bytes.decode("utf-8")
        except UnicodeDecodeError:
            # Narrowed from a bare except: only decode errors are expected,
            # and latin-1 with errors="ignore" cannot itself fail.
            text = file_bytes.decode("latin-1", errors="ignore")
    else:
        text = f"Unsupported file type: {suffix}"

    # Simple fixed-size sliding-window chunking with overlap.
    chunk_size = 500
    overlap = 50
    chunks = []
    for i in range(0, len(text), chunk_size - overlap):
        chunk_text = text[i:i + chunk_size]
        if len(chunk_text.strip()) > 20:  # skip near-empty tail chunks
            chunks.append({
                "chunk_id": f"chunk_{len(chunks)}",
                "text": chunk_text,
                "page": 0,
                "chunk_type": "text",
                "confidence": 0.9,
                "bbox": None,
            })
    return {
        "success": True,
        "raw_text": text,
        "chunks": chunks,
        "ocr_regions": [],
        "layout_regions": [],
        "page_count": page_count,
        "ocr_confidence": 0.9,
        "layout_confidence": 0.0,
        "fallback_reason": reason,
    }
def get_page_images(file_bytes: bytes, filename: str, max_pages: int = 5) -> list:
    """
    Extract page images from a PDF for visualization.

    Renders up to ``max_pages`` pages at 100 DPI with PyMuPDF and returns
    them as base64-encoded PNGs. Non-PDF files yield an empty list, as do
    PDFs when PyMuPDF is unavailable or rendering fails (best-effort).

    Args:
        file_bytes: Raw document bytes.
        filename: Original filename; only a ".pdf" suffix is rendered.
        max_pages: Maximum number of pages to render.

    Returns:
        List of dicts: {"page": int, "data": base64 str, "width": int, "height": int}.
    """
    images = []
    suffix = Path(filename).suffix.lower()
    if suffix == ".pdf":
        try:
            import fitz
            pdf_stream = io.BytesIO(file_bytes)
            doc = fitz.open(stream=pdf_stream, filetype="pdf")
            page_count = min(len(doc), max_pages)
            for page_num in range(page_count):
                page = doc[page_num]
                pix = page.get_pixmap(dpi=100)
                img_bytes = pix.tobytes("png")
                images.append({
                    "page": page_num,
                    "data": base64.b64encode(img_bytes).decode(),
                    "width": pix.width,
                    "height": pix.height,
                })
            doc.close()
        except Exception:
            # Narrowed from a bare except. Page images are optional eye candy,
            # so any failure (missing fitz, corrupt PDF) just yields no images.
            pass
    return images
# Header
st.markdown("# 🔬 Live Document Processing")
st.markdown("Process documents in real-time with auto-indexing to RAG")
# Global status bar shared across demo pages.
render_global_status_bar()
st.markdown("---")
# Main content: upload controls on the left, system status on the right.
col_upload, col_status = st.columns([2, 1])
with col_upload:
    st.markdown("### 📤 Upload Document")
    uploaded_file = st.file_uploader(
        "Choose a document",
        type=["pdf", "txt", "md"],
        help="Upload PDF, TXT, or MD files for processing"
    )
    # Or select from sample PDFs bundled in the repo's Dataset/ directory.
    docs_path = PROJECT_ROOT / "Dataset"
    existing_docs = sorted([f.name for f in docs_path.glob("*.pdf")]) if docs_path.exists() else []
    if existing_docs:
        st.markdown("**Or select from samples:**")
        # NOTE: selected_sample only exists when samples are present; the
        # consumer below guards on existing_docs before reading it.
        selected_sample = st.selectbox("Sample documents", ["-- Select --"] + existing_docs)
with col_status:
    st.markdown("### 📊 System Status")
    ollama_ok, models = check_ollama()
    rag_system = get_unified_rag_system()
    rag_mode = rag_system.get("mode", "error")
    # Check for cloud providers (optional: older rag_config may lack this symbol).
    try:
        from rag_config import check_cloud_providers
        cloud_providers = check_cloud_providers()
    except:
        cloud_providers = {}
    status_cols = st.columns(2)
    with status_cols[0]:
        if ollama_ok:
            st.success(f"Ollama ({len(models)})")
        elif cloud_providers:
            st.info(f"Cloud ({len(cloud_providers)})")
        else:
            st.warning("Demo Mode")
    with status_cols[1]:
        if rag_system["status"] == "ready":
            st.success("RAG Ready")
        elif rag_mode == "cloud":
            st.info("Cloud LLM")
        elif rag_mode == "demo":
            st.warning("Demo Mode")
        else:
            st.error("RAG Error")
    # State summary
    summary = state_manager.get_summary()
    st.metric("Processed Docs", summary["total_documents"])
    # Show different metrics based on mode
    if rag_mode == "cloud":
        st.metric("Cloud Providers", len(cloud_providers))
        st.caption("RAG indexing requires Ollama")
    else:
        st.metric("Indexed Chunks", summary["total_indexed_chunks"])
st.markdown("---")
# Processing Options
st.markdown("### ⚙️ Processing Options")
opt_cols = st.columns(4)
with opt_cols[0]:
    ocr_engine = st.radio("OCR Engine", ["paddleocr", "tesseract"], horizontal=True,
                          help="PaddleOCR is faster and more accurate for most documents")
with opt_cols[1]:
    max_pages = st.slider("Max pages", 1, 50, 10, help="Maximum number of pages to process")
with opt_cols[2]:
    enable_layout = st.checkbox("Layout detection", value=True,
                                help="Detect tables, figures, headings and other layout elements")
with opt_cols[3]:
    auto_index = st.checkbox("Auto-index to RAG", value=True,
                             help="Automatically index processed documents for RAG queries")
# Advanced options (collapsed by default)
with st.expander("🔧 Advanced Options", expanded=False):
    adv_cols = st.columns(3)
    with adv_cols[0]:
        preserve_tables = st.checkbox("Preserve table structure", value=True,
                                      help="Convert tables to markdown format with structure")
    with adv_cols[1]:
        detect_headers = st.checkbox("Detect table headers", value=True,
                                     help="Automatically identify header rows in tables")
    with adv_cols[2]:
        generate_embeddings = st.checkbox("Generate embeddings", value=True,
                                          help="Create embeddings for semantic search")
# Determine what to process: an uploaded file takes precedence over a sample.
file_to_process = None
file_bytes = None
filename = None
if uploaded_file is not None:
    file_bytes = uploaded_file.read()
    filename = uploaded_file.name
    file_to_process = "upload"
elif existing_docs and selected_sample != "-- Select --":
    file_path = docs_path / selected_sample
    file_bytes = file_path.read_bytes()
    filename = selected_sample
    file_to_process = "sample"
# Process button — the whole pipeline runs inside this branch on click.
if file_to_process and st.button("🚀 Start Processing", type="primary", use_container_width=True):
    # Document ID derived from filename + MD5 of the first 1KB of content.
    content_hash = hashlib.md5(file_bytes[:1000]).hexdigest()[:8]
    doc_id = generate_doc_id(filename, content_hash)
    # Start processing in state manager
    state_manager.start_processing(doc_id, filename)
    # Pipeline stage labels for the progress UI.
    stages = [
        ("loading", "📄 Loading Document", "Reading and preparing document..."),
        ("ocr", f"🔍 {ocr_engine.upper()} Extraction", "Extracting text from document..."),
        ("layout", "📐 Layout Detection", "Identifying document structure..."),
        ("chunking", "✂️ Semantic Chunking", "Creating meaningful text chunks..."),
        ("indexing", "📚 RAG Indexing", "Adding to vector store..."),
    ]
    # Progress container
    progress_container = st.container()
    results_container = st.container()
    with progress_container:
        progress_bar = st.progress(0)
        status_text = st.empty()
        # Metrics row
        metric_cols = st.columns(5)
        metric_placeholders = {
            "pages": metric_cols[0].empty(),
            "ocr_regions": metric_cols[1].empty(),
            "layout_regions": metric_cols[2].empty(),
            "chunks": metric_cols[3].empty(),
            "confidence": metric_cols[4].empty(),
        }
        processing_start = time.time()
        processing_result = None
        error_msg = None
        try:
            # Stage 1: Loading
            status_text.markdown("**📄 Loading document...**")
            state_manager.update_processing(doc_id, "loading", 0.1, "Loading document...")
            progress_bar.progress(10)
            time.sleep(0.3)  # brief pause so the stage is visible in the UI
            # Get page images for visualization
            page_images = get_page_images(file_bytes, filename, max_pages)
            metric_placeholders["pages"].metric("Pages", len(page_images) if page_images else "N/A")
            # Stage 2-3: OCR + Layout
            status_text.markdown(f"**🔍 Running {ocr_engine.upper()}...**")
            state_manager.update_processing(doc_id, "ocr", 0.3, f"Running {ocr_engine}...")
            progress_bar.progress(30)
            # Actual processing with all options
            options = {
                "ocr_engine": ocr_engine,
                "max_pages": max_pages,
                "enable_layout": enable_layout,
                "preserve_tables": preserve_tables,
                "detect_headers": detect_headers,
                "generate_embeddings": generate_embeddings,
            }
            processing_result = process_document_actual(file_bytes, filename, options)
            # Update metrics
            metric_placeholders["pages"].metric("Pages", processing_result.get("page_count", 0))
            metric_placeholders["ocr_regions"].metric("OCR Regions", len(processing_result.get("ocr_regions", [])))
            status_text.markdown("**📐 Layout detection...**")
            state_manager.update_processing(doc_id, "layout", 0.5, "Detecting layout...")
            progress_bar.progress(50)
            time.sleep(0.2)
            metric_placeholders["layout_regions"].metric("Layout Regions", len(processing_result.get("layout_regions", [])))
            # Stage 4: Chunking
            status_text.markdown("**✂️ Creating chunks...**")
            state_manager.update_processing(doc_id, "chunking", 0.7, "Creating chunks...")
            progress_bar.progress(70)
            time.sleep(0.2)
            chunks = processing_result.get("chunks", [])
            metric_placeholders["chunks"].metric("Chunks", len(chunks))
            metric_placeholders["confidence"].metric(
                "Confidence",
                f"{processing_result.get('ocr_confidence', 0) * 100:.0f}%"
            )
            # Stage 5: RAG Indexing (only when enabled and chunks exist)
            indexed_count = 0
            if auto_index and chunks:
                if rag_system["status"] == "ready":
                    status_text.markdown("**📚 Indexing to RAG...**")
                    state_manager.update_processing(doc_id, "indexing", 0.9, "Indexing to RAG...")
                    progress_bar.progress(90)
                    # Auto-index
                    index_result = auto_index_processed_document(
                        doc_id=doc_id,
                        text=processing_result.get("raw_text", ""),
                        chunks=chunks,
                        metadata={"filename": filename, "source": file_to_process}
                    )
                    if index_result["success"]:
                        indexed_count = index_result["num_chunks"]
                        state_manager.mark_indexed(doc_id, indexed_count)
                elif rag_mode == "cloud":
                    status_text.markdown("**☁️ Cloud mode - skipping RAG indexing...**")
                    state_manager.update_processing(doc_id, "indexing", 0.9, "Cloud mode - no indexing")
                    progress_bar.progress(90)
                    # In cloud mode, document is processed but not indexed
                    # Users can still query documents via cloud LLM
            # Complete
            progress_bar.progress(100)
            processing_time = time.time() - processing_start
            # Persist the processed document into cross-page session state.
            state_doc = StateDocument(
                doc_id=doc_id,
                filename=filename,
                file_type=Path(filename).suffix[1:].upper(),
                raw_text=processing_result.get("raw_text", ""),
                chunks=chunks,
                page_count=processing_result.get("page_count", 1),
                page_images=[img["data"] for img in page_images],
                ocr_regions=processing_result.get("ocr_regions", []),
                layout_data={"regions": processing_result.get("layout_regions", [])},
                indexed=indexed_count > 0,
                indexed_chunks=indexed_count,
                processing_time=processing_time,
            )
            state_manager.add_document(state_doc)
            state_manager.complete_processing(doc_id, success=True)
            state_manager.set_active_document(doc_id)
            status_text.success(f"✅ Processing complete in {processing_time:.2f}s!")
        except Exception as e:
            error_msg = str(e)
            state_manager.complete_processing(doc_id, success=False, error=error_msg)
            status_text.error(f"❌ Processing failed: {error_msg}")
    # Results — only rendered on success. NOTE(review): chunks, indexed_count
    # and page_images are bound inside the try above; if an exception fired
    # after processing_result was assigned but before those bindings, this
    # branch could hit a NameError — confirm the intended ordering.
    if processing_result and processing_result.get("success"):
        with results_container:
            st.markdown("---")
            st.markdown("### 📋 Processing Results")
            # Summary cards
            sum_cols = st.columns(5)
            sum_cols[0].markdown(f"""
<div class="metric-card">
<div class="metric-value">{processing_result.get('page_count', 0)}</div>
<div class="metric-label">Pages</div>
</div>
""", unsafe_allow_html=True)
            sum_cols[1].markdown(f"""
<div class="metric-card">
<div class="metric-value">{len(processing_result.get('ocr_regions', []))}</div>
<div class="metric-label">OCR Regions</div>
</div>
""", unsafe_allow_html=True)
            sum_cols[2].markdown(f"""
<div class="metric-card">
<div class="metric-value">{len(processing_result.get('layout_regions', []))}</div>
<div class="metric-label">Layout Regions</div>
</div>
""", unsafe_allow_html=True)
            sum_cols[3].markdown(f"""
<div class="metric-card">
<div class="metric-value">{len(chunks)}</div>
<div class="metric-label">Chunks</div>
</div>
""", unsafe_allow_html=True)
            sum_cols[4].markdown(f"""
<div class="metric-card">
<div class="metric-value">{indexed_count}</div>
<div class="metric-label">Indexed</div>
</div>
""", unsafe_allow_html=True)
            # Show fallback warning prominently if fallback was used
            if processing_result.get("fallback_reason"):
                st.error(f"⚠️ **Fallback Mode**: Document processor failed, using simple text extraction. Layout detection unavailable. Reason: {processing_result['fallback_reason']}")
            # Tabs for detailed results
            tab_text, tab_chunks, tab_layout, tab_pages = st.tabs([
                "📝 Extracted Text",
                "📦 Chunks",
                "🗺️ Layout",
                "📄 Pages"
            ])
            with tab_text:
                # Cap the preview at 5000 characters to keep the widget snappy.
                text_preview = processing_result.get("raw_text", "")[:5000]
                if len(processing_result.get("raw_text", "")) > 5000:
                    text_preview += "\n\n... [truncated] ..."
                st.text_area("Full Text", text_preview, height=400)
                if processing_result.get("fallback_reason"):
                    st.warning(f"Using fallback extraction: {processing_result['fallback_reason']}")
            with tab_chunks:
                # Show at most the first 20 chunks, color-coded by confidence.
                for i, chunk in enumerate(chunks[:20]):
                    chunk_type = chunk.get("chunk_type", "text")
                    conf = chunk.get("confidence", 0)
                    color = "#4ECDC4" if conf > 0.8 else "#ffc107" if conf > 0.6 else "#dc3545"
                    with st.expander(f"[{i+1}] {chunk_type.upper()} - {chunk.get('text', '')[:50]}..."):
                        col1, col2, col3 = st.columns([2, 1, 1])
                        col1.markdown(f"**Chunk ID:** `{chunk.get('chunk_id', 'N/A')}`")
                        col2.markdown(f"**Page:** {chunk.get('page', 0) + 1}")
                        col3.markdown(f"**Confidence:** <span style='color:{color}'>{conf:.0%}</span>", unsafe_allow_html=True)
                        st.code(chunk.get("text", ""), language=None)
                if len(chunks) > 20:
                    st.info(f"Showing 20 of {len(chunks)} chunks")
            with tab_layout:
                layout_regions = processing_result.get("layout_regions", [])
                if layout_regions:
                    # Group by type
                    by_type = {}
                    for r in layout_regions:
                        t = r.get("type", "unknown")
                        by_type[t] = by_type.get(t, 0) + 1
                    st.markdown("**Detected Region Types:**")
                    type_cols = st.columns(min(len(by_type), 6))
                    for i, (rtype, count) in enumerate(by_type.items()):
                        type_cols[i % 6].metric(rtype.title(), count)
                    st.markdown("**Regions:**")
                    for r in layout_regions[:15]:
                        conf = r.get("confidence", 0)
                        color = "#4ECDC4" if conf > 0.8 else "#ffc107" if conf > 0.6 else "#dc3545"
                        st.markdown(f"- **{r.get('type', 'unknown').upper()}** (page {r.get('page', 0) + 1}) - Confidence: <span style='color:{color}'>{conf:.0%}</span>", unsafe_allow_html=True)
                else:
                    # Provide helpful message based on cause
                    if processing_result.get("fallback_reason"):
                        st.warning("Layout detection unavailable - document processor is using fallback mode. Check the error message above.")
                    elif not enable_layout:
                        st.info("Layout detection is disabled. Enable it in the options above.")
                    else:
                        st.info("No layout regions detected. The document may have minimal structure or the OCR results didn't contain enough text patterns for layout analysis.")
            with tab_pages:
                if page_images:
                    for img_data in page_images:
                        st.markdown(f"**Page {img_data['page'] + 1}** ({img_data['width']}x{img_data['height']})")
                        st.image(
                            f"data:image/png;base64,{img_data['data']}",
                            use_container_width=True
                        )
                else:
                    st.info("Page images not available")
            # Navigation to other modules
            st.markdown("---")
            st.markdown("### 🔗 Continue With This Document")
            nav_cols = st.columns(3)
            with nav_cols[0]:
                st.markdown("""
<div class="nav-card">
<h4>💬 Interactive RAG</h4>
<p style="color: #8b949e;">Ask questions about this document using the RAG system.</p>
</div>
""", unsafe_allow_html=True)
                if st.button("Go to Interactive RAG", key="nav_rag", use_container_width=True):
                    st.switch_page("pages/2_💬_Interactive_RAG.py")
            with nav_cols[1]:
                st.markdown("""
<div class="nav-card">
<h4>📄 Document Viewer</h4>
<p style="color: #8b949e;">View chunks, layout, and visual annotations.</p>
</div>
""", unsafe_allow_html=True)
                if st.button("Go to Document Viewer", key="nav_viewer", use_container_width=True):
                    st.switch_page("pages/5_📄_Document_Viewer.py")
            with nav_cols[2]:
                st.markdown("""
<div class="nav-card">
<h4>🎯 Evidence Viewer</h4>
<p style="color: #8b949e;">Inspect OCR regions and evidence grounding.</p>
</div>
""", unsafe_allow_html=True)
                if st.button("Go to Evidence Viewer", key="nav_evidence", use_container_width=True):
                    st.switch_page("pages/4_🎯_Evidence_Viewer.py")
# Show recent processed documents
st.markdown("---")
st.markdown("### 📚 Recently Processed")
all_docs = state_manager.get_all_documents()
if all_docs:
    # Newest first, capped at the last five documents.
    for doc in reversed(all_docs[-5:]):
        col1, col2, col3, col4 = st.columns([3, 1, 1, 1])
        col1.markdown(f"**{doc.filename}** (`{doc.doc_id[:8]}...`)")
        col2.markdown(f"📄 {doc.page_count} pages")
        col3.markdown(f"📦 {len(doc.chunks)} chunks")
        if doc.indexed:
            col4.success(f"✓ Indexed ({doc.indexed_chunks})")
        else:
            col4.warning("Not indexed")
else:
    st.info("No documents processed yet. Upload or select a document above.")