import base64
import html
import io
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
import pypdf
import streamlit as st
from PIL import Image


class DocumentViewer:
    """Streamlit component that lists a case's documents and displays one.

    The viewer keeps three keys in ``st.session_state``:
    ``current_document`` (the selected document dict or ``None``),
    ``current_page`` (1-based page number used by the PDF view) and
    ``chunk_view`` (a boolean flag reset whenever a document is selected).
    """

    # Titles longer than this are shortened on the selection buttons.
    _TITLE_PREVIEW_CHARS = 30
    # Maximum number of characters shown in the plain-text content view.
    _CONTENT_PREVIEW_CHARS = 10000

    def __init__(self, case_manager, document_processor):
        """Store collaborators and seed the viewer's session state.

        Args:
            case_manager: Object providing ``get_case(case_id)`` and
                ``update_document(document_id, document)``.
            document_processor: Object providing
                ``process_and_tag_document(file_obj)``.
        """
        self.case_manager = case_manager
        self.document_processor = document_processor

        # Initialize session state for viewer (only on the first run).
        if 'current_document' not in st.session_state:
            st.session_state.current_document = None
        if 'current_page' not in st.session_state:
            st.session_state.current_page = 1
        if 'chunk_view' not in st.session_state:
            st.session_state.chunk_view = False

    def render(self, case_id: str):
        """Render the document list and, if one is selected, the viewer.

        Args:
            case_id: Identifier passed to ``case_manager.get_case``. When the
                lookup returns a falsy value an error is shown and nothing
                else is rendered.
        """
        case = self.case_manager.get_case(case_id)
        if not case:
            st.error("Case not found")
            return

        self._add_viewer_styles()

        # Narrow column for the list, wide column for the viewer.
        list_col, viewer_col = st.columns([1, 3])
        with list_col:
            self._render_document_list(case)
        with viewer_col:
            if st.session_state.current_document:
                self._render_document_viewer(st.session_state.current_document)

    def _add_viewer_styles(self):
        """Emit the viewer's custom markup via ``st.markdown``."""
        st.markdown(""" """, unsafe_allow_html=True)

    def _render_document_list(self, case: Dict):
        """Render the searchable, sortable list of the case's documents.

        Clicking a document button stores that document in
        ``st.session_state.current_document`` and resets the page and
        chunk-view state.

        Args:
            case: Case mapping; its ``'documents'`` entry is read as a list
                of document dicts with at least ``'id'`` and ``'title'``.
        """
        st.markdown("### 📑 Documents")

        search = st.text_input("🔍 Search documents", key="doc_search")
        sort_by = st.selectbox(
            "Sort by",
            ["Recent first", "Name A-Z", "Name Z-A", "Size", "Type"],
            key="doc_sort"
        )

        # Work on a copy: sorting must not reorder the case's own list.
        docs = list(case.get('documents') or [])
        if search:
            needle = search.lower()
            docs = [d for d in docs if needle in d['title'].lower()]

        def _meta(doc: Dict) -> Dict:
            return doc.get('metadata') or {}

        if sort_by == "Recent first":
            docs.sort(key=lambda d: d.get('added_at') or '', reverse=True)
        elif sort_by == "Name A-Z":
            docs.sort(key=lambda d: d['title'].casefold())
        elif sort_by == "Name Z-A":
            docs.sort(key=lambda d: d['title'].casefold(), reverse=True)
        elif sort_by == "Size":
            # Largest documents first; documents without a size sort last.
            docs.sort(key=lambda d: _meta(d).get('file_size') or 0, reverse=True)
        elif sort_by == "Type":
            docs.sort(key=lambda d: str(_meta(d).get('file_type') or '').lower())

        limit = self._TITLE_PREVIEW_CHARS
        for doc in docs:
            title = doc['title']
            # Only add an ellipsis when the title was actually shortened.
            label = f"{title[:limit]}..." if len(title) > limit else title
            with st.container():
                if st.button(
                    f"📄 {label}",
                    key=f"select_{doc['id']}",
                    help=f"Click to view {title}"
                ):
                    st.session_state.current_document = doc
                    st.session_state.current_page = 1
                    st.session_state.chunk_view = False

    def _render_document_viewer(self, document: Dict):
        """Render the title, controls, selected view mode and metadata.

        Args:
            document: Document dict; ``'title'`` is required, other keys are
                read by the helper methods this one delegates to.
        """
        st.markdown(f"### 📄 {document['title']}")

        mode_col, download_col, reprocess_col = st.columns([2, 1, 1])
        with mode_col:
            view_mode = st.radio(
                "View Mode",
                ["Document", "Chunks", "Analysis"],
                horizontal=True,
                key="view_mode"
            )
        with download_col:
            if st.button("Download", key="download_doc"):
                self._handle_download(document)
        with reprocess_col:
            if st.button("Process Again", key="reprocess_doc"):
                self._reprocess_document(document)

        if view_mode == "Document":
            self._render_document_content(document)
        elif view_mode == "Chunks":
            self._render_chunk_view(document)
        else:
            self._render_analysis_view(document)

        with st.expander("📋 Document Metadata", expanded=False):
            self._render_metadata(document)

    def _render_document_content(self, document: Dict):
        """Render the document body using a viewer chosen by file type.

        PDFs go to the paged PDF viewer, PNG/JPEG files to the image viewer,
        and everything else is shown as (truncated) plain text.

        Args:
            document: Document dict; reads ``'content'`` and
                ``metadata['file_type']``.
        """
        content = document.get('content') or ''
        metadata = document.get('metadata') or {}
        # Accept both "pdf" and ".pdf" style values.
        file_type = str(metadata.get('file_type') or '').lower().lstrip('.')

        if file_type == 'pdf':
            self._render_pdf_viewer(document)
        elif file_type in ('png', 'jpg', 'jpeg'):
            self._render_image_viewer(document)
        else:
            limit = self._CONTENT_PREVIEW_CHARS
            st.text_area(
                "Content",
                value=content[:limit] + ("..." if len(content) > limit else ""),
                height=400,
                disabled=True
            )

    def _render_image_viewer(self, document: Dict):
        """Display an image document loaded from its ``'file_path'``.

        Args:
            document: Document dict; reads ``'file_path'`` and ``'title'``.
        """
        image_path = document.get('file_path')
        if not image_path or not Path(image_path).exists():
            st.error("Image file not found")
            return

        try:
            # Keep the file open until Streamlit has consumed the image.
            with Image.open(image_path) as image:
                st.image(image, caption=document.get('title'))
        except Exception as e:
            st.error(f"Error displaying image: {str(e)}")

    def _render_pdf_viewer(self, document: Dict):
        """Render one page of a PDF as text, with page navigation controls.

        The page number lives in ``st.session_state.current_page`` (1-based).

        Args:
            document: Document dict; reads ``'file_path'``.
        """
        pdf_path = document.get('file_path')
        if not pdf_path:
            st.error("PDF file path not found")
            return

        try:
            reader = pypdf.PdfReader(pdf_path)
            num_pages = len(reader.pages)
            if num_pages == 0:
                # An empty option list would make the page slider fail.
                st.warning("This PDF contains no pages.")
                return

            # A stale page number must never index past the last page.
            st.session_state.current_page = min(
                max(int(st.session_state.current_page), 1), num_pages
            )

            prev_col, slider_col, next_col = st.columns([1, 2, 1])
            with prev_col:
                if st.button("◀️ Previous") and st.session_state.current_page > 1:
                    st.session_state.current_page -= 1
            with slider_col:
                st.session_state.current_page = st.select_slider(
                    "Page",
                    range(1, num_pages + 1),
                    value=st.session_state.current_page
                )
            with next_col:
                if st.button("Next ▶️") and st.session_state.current_page < num_pages:
                    st.session_state.current_page += 1

            page = reader.pages[st.session_state.current_page - 1]
            text = page.extract_text() or ""
            st.text_area("Page Content", value=text, height=400, disabled=True)
        except Exception as e:
            st.error(f"Error displaying PDF: {str(e)}")

    def _render_chunk_view(self, document: Dict):
        """Render the document's chunks, optionally filtered by a search term.

        Chunk text is HTML-escaped before display because it is rendered
        with ``unsafe_allow_html=True``.

        Args:
            document: Document dict; reads ``'chunks'``, a list of dicts with
                ``'text'`` and optional ``'start_idx'`` / ``'end_idx'``.
        """
        chunks = document.get('chunks', [])
        if not chunks:
            st.warning("No chunks available. Try reprocessing the document.")
            return

        search = st.text_input("🔍 Search in chunks", key="chunk_search")
        needle = search.lower() if search else ""

        for idx, chunk in enumerate(chunks):
            chunk_text = chunk.get('text') or ''
            if needle and needle not in chunk_text.lower():
                continue
            start = self._escape(chunk.get('start_idx', 'N/A'))
            end = self._escape(chunk.get('end_idx', 'N/A'))
            with st.container():
                st.markdown(
                    f"\nChunk {idx + 1}\n\n"
                    f"{self._highlight_text(chunk_text, search)}\n\n"
                    f"Characters: {len(chunk_text)} | Location: {start} - {end}\n",
                    unsafe_allow_html=True
                )

    def _render_analysis_view(self, document: Dict):
        """Render summary metrics plus entities and key phrases if present.

        Args:
            document: Document dict; reads ``'metadata'`` (``page_count``,
                ``word_count``, ``file_size``, ``entities``, ``key_phrases``)
                and ``'chunks'``.
        """
        metadata = document.get('metadata') or {}

        st.markdown("### 📊 Document Analysis")
        left, right = st.columns(2)
        with left:
            st.metric("Total Pages", metadata.get('page_count', 'N/A'))
            st.metric("Word Count", metadata.get('word_count', 'N/A'))
        with right:
            st.metric("Chunks", len(document.get('chunks', [])))
            st.metric("File Size", self._format_file_size(metadata.get('file_size', 0)))

        if 'entities' in metadata:
            st.markdown("### 🏷️ Named Entities")
            st.dataframe(pd.DataFrame(metadata['entities']))

        if 'key_phrases' in metadata:
            st.markdown("### 🔑 Key Phrases")
            # str() guards against non-string phrase entries.
            st.write(", ".join(str(p) for p in metadata['key_phrases']))

    def _render_metadata(self, document: Dict):
        """Render basic and processing metadata in two columns.

        Args:
            document: Document dict; reads ``'metadata'``, ``'added_at'``,
                ``'updated_at'`` and ``'chunks'``.
        """
        metadata = document.get('metadata') or {}
        # Reprocessing stores 'updated_at' on the document itself, so fall
        # back to it when the metadata dict has no such entry.
        updated_at = metadata.get('updated_at', document.get('updated_at', 'Unknown'))

        basic_col, processing_col = st.columns(2)
        with basic_col:
            st.markdown("**Basic Information**")
            st.write(f"Type: {metadata.get('file_type', 'Unknown')}")
            st.write(f"Added: {document.get('added_at', 'Unknown')}")
            st.write(f"Size: {self._format_file_size(metadata.get('file_size', 0))}")
        with processing_col:
            st.markdown("**Processing Information**")
            st.write(f"Chunks: {len(document.get('chunks', []))}")
            st.write(f"Processing Status: {metadata.get('processing_status', 'Unknown')}")
            st.write(f"Last Updated: {updated_at}")

    @staticmethod
    def _escape(value) -> str:
        """Return ``value`` as text that is safe to embed in HTML."""
        return html.escape(str(value))

    def _highlight_text(self, text: str, search: str) -> str:
        """Return ``text`` as HTML with matches of ``search`` in ``<mark>``.

        Matching is case-insensitive (like the chunk filter) and literal.
        All text, matched or not, is HTML-escaped, so the result is safe to
        render with ``unsafe_allow_html=True``.

        Args:
            text: Raw text to display.
            search: Term to highlight; when empty, only escaping is applied.

        Returns:
            The escaped text with each match wrapped in ``<mark>`` tags.
        """
        if not search:
            return html.escape(text)

        pattern = re.compile(re.escape(search), re.IGNORECASE)
        pieces = []
        cursor = 0
        # Escape each segment separately so a match can never split an
        # HTML entity produced by the escaping itself.
        for match in pattern.finditer(text):
            pieces.append(html.escape(text[cursor:match.start()]))
            pieces.append(f"<mark>{html.escape(match.group(0))}</mark>")
            cursor = match.end()
        pieces.append(html.escape(text[cursor:]))
        return "".join(pieces)

    def _format_file_size(self, size_bytes: int) -> str:
        """Format a byte count with one decimal and a B/KB/MB/GB/TB unit.

        Args:
            size_bytes: Size in bytes; ``None`` is treated as 0.

        Returns:
            For example ``"512.0 B"`` or ``"1.5 KB"``.
        """
        size = float(size_bytes or 0)
        for unit in ('B', 'KB', 'MB', 'GB'):
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

    def _handle_download(self, document: Dict):
        """Offer the document's file through a Streamlit download button.

        Args:
            document: Document dict; reads ``'file_path'``, ``'title'`` and
                ``metadata['mime_type']``.
        """
        try:
            file_path = document.get('file_path')
            if file_path and Path(file_path).exists():
                data = Path(file_path).read_bytes()
                st.download_button(
                    "Download File",
                    data,
                    file_name=document['title'],
                    mime=(document.get('metadata') or {}).get(
                        'mime_type', 'application/octet-stream'
                    )
                )
            else:
                st.error("File not found")
        except Exception as e:
            st.error(f"Error downloading file: {str(e)}")

    def _reprocess_document(self, document: Dict):
        """Re-run the document processor and persist the refreshed document.

        On success the document dict is updated in place (``content``,
        ``chunks``, merged ``metadata``, ``updated_at``), saved through
        ``case_manager.update_document`` and the script is rerun.

        Args:
            document: Document dict; reads ``'file_path'`` and ``'id'``.
        """
        file_path = document.get('file_path')
        if not file_path:
            st.error("File not found")
            return

        try:
            with st.spinner("Reprocessing document..."):
                with open(file_path, 'rb') as f:
                    text, chunks, metadata = (
                        self.document_processor.process_and_tag_document(f)
                    )

                document.update({
                    'content': text,
                    'chunks': chunks,
                    'metadata': {**(document.get('metadata') or {}), **(metadata or {})},
                    'updated_at': datetime.now().isoformat()
                })
                self.case_manager.update_document(document['id'], document)
        except Exception as e:
            st.error(f"Error reprocessing document: {str(e)}")
            return

        st.success("Document reprocessed successfully")
        # Rerun outside the try block so the rerun signal is never mistaken
        # for a processing error. st.experimental_rerun is only a fallback
        # for Streamlit versions that predate st.rerun.
        rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
        rerun()

    def render_chunk_references(self, references: List[Dict]):
        """Render a list of referenced chunks.

        Reference fields are HTML-escaped before display because they are
        rendered with ``unsafe_allow_html=True``.

        Args:
            references: Dicts with ``'title'``, ``'chunk_id'`` and
                ``'snippet'``, plus optional ``'page'`` and ``'confidence'``.
        """
        st.markdown("### 📎 Referenced Chunks")

        for ref in references:
            title = self._escape(ref['title'])
            chunk_id = self._escape(ref['chunk_id'])
            snippet = self._escape(ref['snippet'])
            page = self._escape(ref.get('page', 'N/A'))
            confidence = self._escape(ref.get('confidence', 'N/A'))
            with st.container():
                st.markdown(
                    f"\n{title} (Chunk {chunk_id})\n\n"
                    f"{snippet}\n\n"
                    f"Page: {page} | Confidence: {confidence}\n",
                    unsafe_allow_html=True
                )