| | """ |
| | Unified State Manager for SPARKNET Demo |
| | |
| | Enhanced state management for cross-module communication (Phase 1B): |
| | - Document processing state tracking |
| | - Indexed documents registry |
| | - Cross-module event system (pub/sub) |
| | - Real-time status updates |
| | - Evidence highlighting synchronization |
| | - Document selection synchronization |
| | - Query/response sharing between modules |
| | """ |
| |
|
| | import streamlit as st |
| | from pathlib import Path |
| | from typing import Dict, List, Any, Optional, Callable, Set |
| | from dataclasses import dataclass, field |
| | from datetime import datetime |
| | from enum import Enum |
| | import hashlib |
| | import json |
| | import sys |
| | import time |
| | from threading import Lock |
| |
|
| | PROJECT_ROOT = Path(__file__).parent.parent |
| | sys.path.insert(0, str(PROJECT_ROOT)) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
class EventType(str, Enum):
    """Cross-module event types for synchronization.

    Subclassing ``str`` lets event types compare equal to their string
    values and serialize cleanly (e.g. into st.session_state or JSON).
    """

    # Document lifecycle
    DOCUMENT_SELECTED = "document_selected"
    DOCUMENT_PROCESSED = "document_processed"
    DOCUMENT_INDEXED = "document_indexed"
    DOCUMENT_REMOVED = "document_removed"

    # Navigation / evidence sync
    CHUNK_SELECTED = "chunk_selected"
    EVIDENCE_HIGHLIGHT = "evidence_highlight"
    PAGE_CHANGED = "page_changed"

    # RAG query lifecycle
    RAG_QUERY_STARTED = "rag_query_started"
    RAG_QUERY_COMPLETED = "rag_query_completed"

    # Processing / system status
    PROCESSING_STARTED = "processing_started"
    PROCESSING_COMPLETED = "processing_completed"
    SYSTEM_STATUS_CHANGED = "system_status_changed"
| |
|
| |
|
@dataclass
class Event:
    """Cross-module event for synchronization.

    Attributes:
        event_type: Category of the event (see EventType).
        source_module: Name of the module that published the event.
        payload: Arbitrary event data.
        timestamp: Creation time (wall clock).
        event_id: Short opaque identifier, 8 hex chars.
    """
    event_type: EventType
    source_module: str
    payload: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
    # Bug fix: time.time() has only ~microsecond textual resolution, so two
    # events created in the same instant shared an ID. time_ns() gives
    # nanosecond resolution, making collisions within a rerun unlikely.
    event_id: str = field(default_factory=lambda: hashlib.md5(
        f"{time.time_ns()}".encode()
    ).hexdigest()[:8])
| |
|
| |
|
@dataclass
class EvidenceHighlight:
    """A single piece of RAG evidence to be highlighted in a viewer.

    ``bbox`` is a 4-tuple region on ``page`` of document ``doc_id``;
    ``confidence`` is the retrieval score and ``source_query`` records
    which RAG query produced this highlight (if any).
    """
    doc_id: str
    chunk_id: str
    page: int
    bbox: tuple
    text_snippet: str
    confidence: float
    source_query: Optional[str] = None
    # Default is a soft amber — matches the demo's highlight palette.
    highlight_color: str = "#FFE082"
| |
|
| |
|
@dataclass
class ProcessedDocument:
    """A processed document together with everything extracted from it.

    The first five fields are required and come straight from the ingestion
    pipeline; the rest default to "freshly processed, not yet indexed"
    bookkeeping values.
    """
    doc_id: str
    filename: str
    file_type: str
    raw_text: str
    chunks: List[Dict[str, Any]]
    page_count: int = 1
    page_images: List[bytes] = field(default_factory=list)
    ocr_regions: List[Dict[str, Any]] = field(default_factory=list)
    layout_data: Dict[str, Any] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)
    indexed: bool = False
    indexed_chunks: int = 0
    processing_time: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Return a lightweight, JSON-serializable summary.

        Heavy payloads (raw text, page images, chunk bodies) are reduced
        to counts/lengths so the summary is cheap to display or log.
        """
        summary: Dict[str, Any] = {
            "doc_id": self.doc_id,
            "filename": self.filename,
            "file_type": self.file_type,
        }
        summary["text_length"] = len(self.raw_text)
        summary["chunk_count"] = len(self.chunks)
        summary["page_count"] = self.page_count
        summary["ocr_region_count"] = len(self.ocr_regions)
        summary["indexed"] = self.indexed
        summary["indexed_chunks"] = self.indexed_chunks
        summary["processing_time"] = self.processing_time
        summary["created_at"] = self.created_at.isoformat()
        return summary
| |
|
| |
|
@dataclass
class ProcessingStatus:
    """Mutable progress record for one document's processing run.

    ``stage`` is a free-form pipeline stage name (e.g. "loading",
    "complete", "error"); ``progress`` is in [0.0, 1.0]. ``completed_at``
    and ``error`` stay None until the run finishes or fails.
    """
    doc_id: str
    stage: str
    progress: float
    message: str
    started_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
| |
|
| |
|
class UnifiedStateManager:
    """
    Central state manager for the SPARKNET demo.

    All shared state lives in ``st.session_state["unified_state"]`` so it
    survives Streamlit reruns and is visible to every module.

    Enhanced with Phase 1B features:
    - Document processing state tracking
    - Indexed documents registry
    - Cross-module event system (pub/sub)
    - Real-time status updates
    - Evidence highlighting sync
    - Query/response sharing
    """

    def __init__(self):
        self._ensure_session_state()
        # Handlers live on the instance (not in session state) because
        # subscribing modules re-register their callables on every rerun.
        self._event_handlers: Dict[EventType, List[Callable]] = {}

    # ------------------------------------------------------------------
    # Session-state bootstrap
    # ------------------------------------------------------------------

    @staticmethod
    def _initial_state() -> Dict[str, Any]:
        """Build a fresh unified-state dict.

        Single source of truth for the state layout, shared by first-time
        initialization and reset() (previously duplicated, risking drift).
        """
        return {
            "documents": {},
            "processing_status": {},
            "indexed_doc_ids": set(),
            "active_doc_id": None,
            "active_page": 0,
            "active_chunk_id": None,
            "notifications": [],
            "rag_ready": False,
            "total_indexed_chunks": 0,
            "last_update": datetime.now().isoformat(),
            # Phase 1B cross-module sync fields
            "event_queue": [],
            "evidence_highlights": [],
            "last_rag_query": None,
            "last_rag_response": None,
            "selected_sources": [],
            "module_states": {},
            "sync_version": 0,
        }

    def _ensure_session_state(self):
        """Initialize session state if it does not exist yet."""
        if "unified_state" not in st.session_state:
            st.session_state.unified_state = self._initial_state()

    @property
    def state(self) -> Dict:
        """The unified state dict (auto-initialized on first access)."""
        self._ensure_session_state()
        return st.session_state.unified_state

    # ------------------------------------------------------------------
    # Document registry
    # ------------------------------------------------------------------

    def add_document(self, doc: ProcessedDocument) -> str:
        """Add a processed document to the state and return its ID."""
        self.state["documents"][doc.doc_id] = doc
        self._notify(f"Document '{doc.filename}' added", "info")
        self._update_timestamp()
        return doc.doc_id

    def get_document(self, doc_id: str) -> Optional[ProcessedDocument]:
        """Get a document by ID, or None if unknown."""
        return self.state["documents"].get(doc_id)

    def get_all_documents(self) -> List[ProcessedDocument]:
        """Get all documents."""
        return list(self.state["documents"].values())

    def get_indexed_documents(self) -> List[ProcessedDocument]:
        """Get only the documents that have been indexed into RAG."""
        return [d for d in self.state["documents"].values() if d.indexed]

    def remove_document(self, doc_id: str):
        """Remove a document from state, keeping counters consistent."""
        if doc_id in self.state["documents"]:
            doc = self.state["documents"].pop(doc_id)
            self.state["indexed_doc_ids"].discard(doc_id)
            if doc.indexed:
                # Bug fix: the global chunk counter was previously left
                # inflated after removing an indexed document.
                self.state["total_indexed_chunks"] = max(
                    0, self.state["total_indexed_chunks"] - doc.indexed_chunks
                )
            self._notify(f"Document '{doc.filename}' removed", "warning")
            self._update_timestamp()

    def set_active_document(self, doc_id: Optional[str]):
        """Set the currently active document (None clears the selection)."""
        self.state["active_doc_id"] = doc_id
        self._update_timestamp()

    def get_active_document(self) -> Optional[ProcessedDocument]:
        """Get the currently active document, or None."""
        if self.state["active_doc_id"]:
            return self.get_document(self.state["active_doc_id"])
        return None

    # ------------------------------------------------------------------
    # Processing status
    # ------------------------------------------------------------------

    def start_processing(self, doc_id: str, filename: str):
        """Create a processing-status entry for a document.

        Bug fix: the loading message now includes the filename (the
        parameter was previously unused).
        """
        status = ProcessingStatus(
            doc_id=doc_id,
            stage="loading",
            progress=0.0,
            message=f"Loading {filename}...",
        )
        self.state["processing_status"][doc_id] = status
        self._update_timestamp()

    def update_processing(self, doc_id: str, stage: str, progress: float, message: str):
        """Update processing status for a known document (no-op otherwise)."""
        if doc_id in self.state["processing_status"]:
            status = self.state["processing_status"][doc_id]
            status.stage = stage
            status.progress = progress
            status.message = message
            self._update_timestamp()

    def complete_processing(self, doc_id: str, success: bool = True, error: Optional[str] = None):
        """Mark processing as complete (or failed) and emit a notification."""
        if doc_id in self.state["processing_status"]:
            status = self.state["processing_status"][doc_id]
            status.stage = "complete" if success else "error"
            # On failure, keep whatever progress was reached.
            status.progress = 1.0 if success else status.progress
            status.completed_at = datetime.now()
            status.error = error
            status.message = "Processing complete!" if success else f"Error: {error}"

            if success:
                self._notify("Document processed successfully!", "success")
            else:
                self._notify(f"Processing failed: {error}", "error")

            self._update_timestamp()

    def get_processing_status(self, doc_id: str) -> Optional[ProcessingStatus]:
        """Get processing status for a document, or None if never started."""
        return self.state["processing_status"].get(doc_id)

    def is_processing(self, doc_id: str) -> bool:
        """True while a document is in any non-terminal processing stage."""
        status = self.get_processing_status(doc_id)
        return status is not None and status.stage not in ["complete", "error"]

    # ------------------------------------------------------------------
    # Index registry
    # ------------------------------------------------------------------

    def mark_indexed(self, doc_id: str, chunk_count: int):
        """Mark a document as indexed to RAG.

        Bug fix: re-indexing the same document now adjusts the global
        chunk counter by the delta instead of double-counting.
        """
        if doc_id in self.state["documents"]:
            doc = self.state["documents"][doc_id]
            self.state["total_indexed_chunks"] += chunk_count - doc.indexed_chunks
            doc.indexed = True
            doc.indexed_chunks = chunk_count
            self.state["indexed_doc_ids"].add(doc_id)
            self._notify(f"Indexed {chunk_count} chunks from '{doc.filename}'", "success")
            self._update_timestamp()

    def is_indexed(self, doc_id: str) -> bool:
        """Check if a document is indexed."""
        return doc_id in self.state["indexed_doc_ids"]

    def get_total_indexed_chunks(self) -> int:
        """Get total number of indexed chunks across all documents."""
        return self.state["total_indexed_chunks"]

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    def _notify(self, message: str, level: str = "info"):
        """Append a notification, keeping only the most recent 50."""
        self.state["notifications"].append({
            "message": message,
            "level": level,
            "timestamp": datetime.now().isoformat(),
        })

        if len(self.state["notifications"]) > 50:
            self.state["notifications"] = self.state["notifications"][-50:]

    def get_notifications(self, limit: int = 10) -> List[Dict]:
        """Get the most recent notifications (oldest first within the slice)."""
        return self.state["notifications"][-limit:]

    def clear_notifications(self):
        """Clear all notifications."""
        self.state["notifications"] = []

    # ------------------------------------------------------------------
    # RAG readiness
    # ------------------------------------------------------------------

    def set_rag_ready(self, ready: bool):
        """Set RAG system ready status."""
        self.state["rag_ready"] = ready
        self._update_timestamp()

    def is_rag_ready(self) -> bool:
        """Check if RAG is ready."""
        return self.state["rag_ready"]

    # ------------------------------------------------------------------
    # Bookkeeping
    # ------------------------------------------------------------------

    def _update_timestamp(self):
        """Record the change time and bump the sync version so modules
        can cheaply detect that state changed since their last check."""
        self.state["last_update"] = datetime.now().isoformat()
        self.state["sync_version"] += 1

    def get_summary(self) -> Dict[str, Any]:
        """Get a cheap snapshot of current state for status displays."""
        return {
            "total_documents": len(self.state["documents"]),
            "indexed_documents": len(self.state["indexed_doc_ids"]),
            "total_indexed_chunks": self.state["total_indexed_chunks"],
            "active_doc_id": self.state["active_doc_id"],
            "active_page": self.state.get("active_page", 0),
            "rag_ready": self.state["rag_ready"],
            "last_update": self.state["last_update"],
            "sync_version": self.state.get("sync_version", 0),
            "processing_count": sum(
                1 for s in self.state["processing_status"].values()
                if s.stage not in ["complete", "error"]
            ),
            "evidence_count": len(self.state.get("evidence_highlights", [])),
        }

    def reset(self):
        """Reset all state to a pristine snapshot."""
        st.session_state.unified_state = self._initial_state()

    # ------------------------------------------------------------------
    # Event system (pub/sub)
    # ------------------------------------------------------------------

    def publish_event(
        self,
        event_type: EventType,
        source_module: str,
        payload: Dict[str, Any]
    ) -> Event:
        """
        Publish an event for cross-module synchronization.

        The event is queued (bounded to the last 100) and dispatched
        synchronously to any handlers registered via subscribe().
        Handler exceptions are converted to error notifications so one
        bad handler cannot break publishing.

        Args:
            event_type: Type of event
            source_module: Name of module publishing the event
            payload: Event data

        Returns:
            The created Event object
        """
        event = Event(
            event_type=event_type,
            source_module=source_module,
            payload=payload
        )

        self.state["event_queue"].append(event)

        if len(self.state["event_queue"]) > 100:
            self.state["event_queue"] = self.state["event_queue"][-100:]

        for handler in self._event_handlers.get(event_type, []):
            try:
                handler(event)
            except Exception as e:
                self._notify(f"Event handler error: {e}", "error")

        self._update_timestamp()
        return event

    def subscribe(self, event_type: EventType, handler: Callable[[Event], None]):
        """
        Subscribe to an event type.

        Args:
            event_type: Type of event to subscribe to
            handler: Callback function to handle the event
        """
        self._event_handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: EventType, handler: Callable[[Event], None]):
        """Remove every registration of *handler* for *event_type*."""
        if event_type in self._event_handlers:
            self._event_handlers[event_type] = [
                h for h in self._event_handlers[event_type] if h != handler
            ]

    def get_recent_events(
        self,
        event_type: Optional[EventType] = None,
        limit: int = 10
    ) -> List[Event]:
        """Get recent events, optionally filtered by type."""
        events = self.state.get("event_queue", [])

        if event_type:
            events = [e for e in events if e.event_type == event_type]

        return events[-limit:]

    # ------------------------------------------------------------------
    # Evidence highlighting
    # ------------------------------------------------------------------

    def add_evidence_highlight(self, highlight: EvidenceHighlight):
        """
        Add an evidence highlight for cross-module visualization.

        Used when RAG finds relevant evidence that should be displayed
        in the Document Viewer or Evidence Viewer. Also broadcasts an
        EVIDENCE_HIGHLIGHT event with a truncated snippet.
        """
        self.state["evidence_highlights"].append(highlight)

        self.publish_event(
            EventType.EVIDENCE_HIGHLIGHT,
            source_module="rag",
            payload={
                "doc_id": highlight.doc_id,
                "chunk_id": highlight.chunk_id,
                "page": highlight.page,
                "bbox": highlight.bbox,
                "text_snippet": highlight.text_snippet[:100],
            }
        )

        self._update_timestamp()

    def clear_evidence_highlights(self, doc_id: Optional[str] = None):
        """Clear evidence highlights, optionally for a specific document."""
        if doc_id:
            self.state["evidence_highlights"] = [
                h for h in self.state["evidence_highlights"]
                if h.doc_id != doc_id
            ]
        else:
            self.state["evidence_highlights"] = []

        self._update_timestamp()

    def get_evidence_highlights(
        self,
        doc_id: Optional[str] = None,
        page: Optional[int] = None
    ) -> List[EvidenceHighlight]:
        """Get evidence highlights, optionally filtered by doc_id and page."""
        highlights = self.state.get("evidence_highlights", [])

        if doc_id:
            highlights = [h for h in highlights if h.doc_id == doc_id]

        if page is not None:
            highlights = [h for h in highlights if h.page == page]

        return highlights

    # ------------------------------------------------------------------
    # Navigation sync
    # ------------------------------------------------------------------

    def select_page(self, page: int, source_module: str = "unknown"):
        """
        Set the active page and notify other modules.

        Used for synchronized scrolling between Document Viewer and
        Evidence Viewer. Publishes PAGE_CHANGED only on actual change.
        """
        old_page = self.state.get("active_page", 0)
        self.state["active_page"] = page

        if old_page != page:
            self.publish_event(
                EventType.PAGE_CHANGED,
                source_module=source_module,
                payload={"page": page, "previous_page": old_page}
            )

    def get_active_page(self) -> int:
        """Get the currently active page (0-based)."""
        return self.state.get("active_page", 0)

    def select_chunk(
        self,
        chunk_id: str,
        doc_id: str,
        source_module: str = "unknown"
    ):
        """
        Select a chunk and navigate to its location.

        Looks the chunk up in the document's chunk list, jumps to its
        page, and publishes CHUNK_SELECTED so viewers can scroll to it.
        """
        self.state["active_chunk_id"] = chunk_id

        doc = self.get_document(doc_id)
        if doc:
            for chunk in doc.chunks:
                if chunk.get("chunk_id") == chunk_id:
                    page = chunk.get("page", 0)
                    self.select_page(page, source_module)

                    self.publish_event(
                        EventType.CHUNK_SELECTED,
                        source_module=source_module,
                        payload={
                            "chunk_id": chunk_id,
                            "doc_id": doc_id,
                            "page": page,
                            "bbox": chunk.get("bbox"),
                        }
                    )
                    break

    def get_active_chunk_id(self) -> Optional[str]:
        """Get the currently selected chunk ID."""
        return self.state.get("active_chunk_id")

    # ------------------------------------------------------------------
    # RAG query/response sharing
    # ------------------------------------------------------------------

    def store_rag_query(
        self,
        query: str,
        response: Dict[str, Any],
        sources: List[Dict[str, Any]]
    ):
        """
        Store the last RAG query and response for cross-module access.

        Allows Evidence Viewer to display sources from Interactive RAG.
        Replaces all existing evidence highlights with highlights derived
        from the new sources, then publishes RAG_QUERY_COMPLETED.
        """
        self.state["last_rag_query"] = query
        self.state["last_rag_response"] = response
        self.state["selected_sources"] = sources

        self.clear_evidence_highlights()

        for source in sources:
            # Only sources with full location info can be highlighted.
            if all(k in source for k in ["doc_id", "chunk_id", "page"]):
                bbox = source.get("bbox", (0, 0, 1, 1))
                if isinstance(bbox, dict):
                    # Normalize dict-style boxes to the tuple form viewers expect.
                    bbox = (bbox.get("x_min", 0), bbox.get("y_min", 0),
                            bbox.get("x_max", 1), bbox.get("y_max", 1))

                highlight = EvidenceHighlight(
                    doc_id=source["doc_id"],
                    chunk_id=source["chunk_id"],
                    page=source["page"],
                    bbox=bbox,
                    text_snippet=source.get("text", "")[:200],
                    confidence=source.get("score", 0.0),
                    source_query=query,
                )
                self.add_evidence_highlight(highlight)

        self.publish_event(
            EventType.RAG_QUERY_COMPLETED,
            source_module="rag",
            payload={
                "query": query,
                "source_count": len(sources),
                "response_length": len(str(response)),
            }
        )

        self._update_timestamp()

    def get_last_rag_query(self) -> Optional[str]:
        """Get the last RAG query."""
        return self.state.get("last_rag_query")

    def get_last_rag_response(self) -> Optional[Dict[str, Any]]:
        """Get the last RAG response."""
        return self.state.get("last_rag_response")

    def get_selected_sources(self) -> List[Dict[str, Any]]:
        """Get the sources from the last RAG query."""
        return self.state.get("selected_sources", [])

    # ------------------------------------------------------------------
    # Per-module custom state
    # ------------------------------------------------------------------

    def set_module_state(self, module_name: str, state: Dict[str, Any]):
        """
        Store custom state for a specific module.

        Allows modules to persist their own state across reruns; an
        "updated_at" timestamp is stamped onto every write.
        """
        self.state["module_states"][module_name] = {
            **state,
            "updated_at": datetime.now().isoformat()
        }

    def get_module_state(self, module_name: str) -> Dict[str, Any]:
        """Get custom state for a specific module (empty dict if none)."""
        return self.state.get("module_states", {}).get(module_name, {})

    def get_sync_version(self) -> int:
        """
        Get the current sync version.

        Modules can use this to detect if state has changed since last check.
        """
        return self.state.get("sync_version", 0)
| |
|
| |
|
def generate_doc_id(filename: str, content_hash: Optional[str] = None) -> str:
    """Generate a unique 12-character hex document ID.

    Bug fix: the filename is now actually part of the hashed base (it was
    previously ignored), so different files uploaded in the same second
    no longer collide.

    Args:
        filename: Name of the uploaded file.
        content_hash: Optional content digest; its first 8 chars are mixed
            in so re-uploads of distinct content get distinct IDs.

    Returns:
        First 12 hex chars of an MD5 over filename + timestamp (+ hash).
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    base = f"{filename}_{timestamp}"
    if content_hash:
        base = f"{base}_{content_hash[:8]}"
    return hashlib.md5(base.encode()).hexdigest()[:12]
| |
|
| |
|
def get_state_manager() -> UnifiedStateManager:
    """Return the session-scoped UnifiedStateManager, creating it lazily."""
    key = "state_manager_instance"
    if key not in st.session_state:
        st.session_state[key] = UnifiedStateManager()
    return st.session_state[key]
| |
|
| |
|
| | |
| |
|
def render_global_status_bar():
    """Render a global status bar showing system state.

    Six columns: Ollama connectivity, RAG readiness, active LLM model,
    document count, indexed-chunk count, and processing activity.
    Backend probing degrades gracefully if rag_config is unavailable.
    """
    manager = get_state_manager()
    summary = manager.get_summary()

    try:
        from rag_config import get_unified_rag_system, check_ollama
        rag_system = get_unified_rag_system()
        ollama_ok, models = check_ollama()
        rag_status = rag_system["status"]
        llm_model = rag_system.get("llm_model", "N/A")
    except Exception:
        # Bug fix: was a bare `except:`, which also swallowed SystemExit
        # and KeyboardInterrupt. Fall back to an "offline" display.
        ollama_ok = False
        rag_status = "error"
        llm_model = "N/A"
        models = []

    cols = st.columns(6)

    with cols[0]:
        if ollama_ok:
            st.success(f"Ollama ({len(models)})")
        else:
            st.error("Ollama Offline")

    with cols[1]:
        if rag_status == "ready":
            st.success("RAG Ready")
        else:
            st.error("RAG Error")

    with cols[2]:
        # Show just the model family, e.g. "llama3" from "llama3:8b".
        st.info(f"{llm_model.split(':')[0]}")

    with cols[3]:
        st.info(f"{summary['total_documents']} Docs")

    with cols[4]:
        if summary['indexed_documents'] > 0:
            st.success(f"{summary['total_indexed_chunks']} Chunks")
        else:
            st.warning("0 Chunks")

    with cols[5]:
        if summary['processing_count'] > 0:
            st.warning("Processing...")
        else:
            st.info("Idle")
| |
|
| |
|
def render_notifications():
    """Render the five most recent notifications, newest first.

    Each notification's level picks the matching Streamlit alert widget;
    unknown levels fall back to st.info.
    """
    manager = get_state_manager()
    widget_for_level = {
        "success": st.success,
        "error": st.error,
        "warning": st.warning,
    }
    for notif in reversed(manager.get_notifications(5)):
        render = widget_for_level.get(notif["level"], st.info)
        render(notif["message"])
| |
|
| |
|
| | |
| |
|
def render_evidence_panel():
    """
    Render a panel showing current evidence highlights.

    Can be used in any module to show sources from RAG queries. Each
    source gets an expander with its metadata and a button that jumps
    the Document Viewer to the highlighted chunk.
    """
    manager = get_state_manager()
    highlights = manager.get_evidence_highlights()

    if not highlights:
        st.info("No evidence highlights. Run a RAG query to see sources.")
        return

    st.subheader(f"Evidence Sources ({len(highlights)})")

    for idx, hl in enumerate(highlights):
        # Pages are stored 0-based; display 1-based with the retrieval score.
        label = f"Source {idx+1}: Page {hl.page + 1} ({hl.confidence:.0%})"
        with st.expander(label):
            st.markdown(f"**Document:** {hl.doc_id}")
            st.markdown(f"**Text:** {hl.text_snippet}")

            if hl.source_query:
                st.markdown(f"**Query:** _{hl.source_query}_")

            if st.button(f"View in Document", key=f"view_source_{idx}"):
                manager.set_active_document(hl.doc_id)
                manager.select_page(hl.page, "evidence_panel")
                manager.select_chunk(hl.chunk_id, hl.doc_id, "evidence_panel")
                st.rerun()
| |
|
| |
|
def render_sync_status():
    """Render a collapsible sync-status indicator for debugging.

    Shows the sync version, active document/page, evidence count, last
    update time, and the five most recent events (newest first).
    """
    manager = get_state_manager()
    summary = manager.get_summary()

    with st.expander("Sync Status", expanded=False):
        debug_view = {
            "sync_version": summary["sync_version"],
            "active_doc": summary["active_doc_id"],
            "active_page": summary["active_page"],
            "evidence_count": summary["evidence_count"],
            "last_update": summary["last_update"],
        }
        st.json(debug_view)

        recent = manager.get_recent_events(limit=5)
        if recent:
            st.subheader("Recent Events")
            for evt in reversed(recent):
                st.text(f"{evt.event_type.value}: {evt.source_module}")
| |
|
| |
|
def render_document_selector():
    """
    Render a document selector that syncs with the state manager.

    Returns:
        The selected document ID, or None when no documents exist.
    """
    manager = get_state_manager()
    documents = manager.get_all_documents()

    if not documents:
        st.info("No documents uploaded. Upload a document to get started.")
        return None

    active_doc_id = manager.state.get("active_doc_id")

    # Map each doc_id to its display label; insertion order drives the list.
    labels = {
        doc.doc_id: f"{doc.filename} ({doc.indexed_chunks} chunks)"
        for doc in documents
    }
    doc_ids = list(labels)

    # Preselect the active document; fall back to the first entry.
    try:
        default_index = doc_ids.index(active_doc_id)
    except ValueError:
        default_index = 0

    selected_id = st.selectbox(
        "Select Document",
        options=doc_ids,
        format_func=lambda doc_id: labels[doc_id],
        index=default_index,
        key="global_doc_selector",
    )

    # Propagate a changed selection to state and broadcast it.
    if selected_id != active_doc_id:
        manager.set_active_document(selected_id)
        manager.publish_event(
            EventType.DOCUMENT_SELECTED,
            source_module="selector",
            payload={"doc_id": selected_id},
        )

    return selected_id
| |
|
| |
|
def create_sync_callback(module_name: str) -> Callable:
    """
    Create a rerun callback for a module.

    The returned handler sets a per-module session-state flag whenever an
    event arrives from a *different* module, so a module never triggers a
    rerun in response to its own events.
    """
    flag_key = f"_{module_name}_needs_rerun"

    def callback(event: Event):
        if event.source_module != module_name:
            st.session_state[flag_key] = True

    return callback
| |
|