Spaces:
Build error
Build error
import base64
import html
import io
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd
import pypdf
import streamlit as st
from PIL import Image
class DocumentViewer:
    """Streamlit UI for browsing, inspecting and reprocessing a case's documents.

    UI state is kept in ``st.session_state`` under the keys
    ``current_document``, ``current_page`` and ``chunk_view``.
    """

    # NOTE(review): several emoji in the UI labels below appear mis-encoded
    # (e.g. "๐", "โถ๏ธ"). They are kept exactly as found because the intended
    # characters cannot all be recovered with certainty -- confirm and restore.

    # Longest title prefix shown on a document button before it is elided.
    _TITLE_PREVIEW_CHARS = 30

    # Upper bound on characters shown for plain-text documents.
    _MAX_CONTENT_CHARS = 10000

    # Written flush-left on purpose: Markdown treats lines indented by four
    # or more spaces as a code block, which would print the CSS instead of
    # applying it.
    _VIEWER_CSS = """
<style>
.document-list {
border-right: 1px solid #ddd;
padding-right: 1rem;
height: 100%;
}
.document-viewer {
padding: 1rem;
background: white;
border-radius: 5px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.chunk-card {
background: #f8f9fa;
border-left: 3px solid #2196F3;
padding: 1rem;
margin: 0.5rem 0;
border-radius: 0 5px 5px 0;
}
.document-controls {
display: flex;
justify-content: space-between;
align-items: center;
padding: 1rem;
background: #f8f9fa;
border-radius: 5px;
margin-bottom: 1rem;
}
.page-viewer {
text-align: center;
padding: 1rem;
background: #fff;
border: 1px solid #ddd;
border-radius: 5px;
}
.metadata-section {
background: #f8f9fa;
padding: 1rem;
border-radius: 5px;
margin-top: 1rem;
}
.highlight {
background-color: #fff3cd;
padding: 0.2rem;
border-radius: 3px;
}
</style>
"""

    def __init__(self, case_manager, document_processor):
        """Store the collaborators and seed the viewer's session-state defaults.

        Args:
            case_manager: Used by this class through ``get_case(case_id)``
                and ``update_document(doc_id, document)``.
            document_processor: Used by this class through
                ``process_and_tag_document(file_obj)``.
        """
        self.case_manager = case_manager
        self.document_processor = document_processor
        # Only seed keys that are absent so existing state survives reruns.
        defaults = (
            ('current_document', None),
            ('current_page', 1),
            ('chunk_view', False),
        )
        for key, default in defaults:
            if key not in st.session_state:
                st.session_state[key] = default

    def render(self, case_id: str):
        """Render the document list and, if one is selected, the viewer.

        Args:
            case_id: Identifier passed to ``case_manager.get_case``. An error
                is shown and nothing else is rendered when no case is found.
        """
        case = self.case_manager.get_case(case_id)
        if not case:
            st.error("Case not found")
            return

        self._add_viewer_styles()

        list_col, viewer_col = st.columns([1, 3])
        with list_col:
            self._render_document_list(case)
        with viewer_col:
            if st.session_state.current_document:
                self._render_document_viewer(st.session_state.current_document)

    def _add_viewer_styles(self):
        """Inject the viewer's CSS into the page."""
        st.markdown(self._VIEWER_CSS, unsafe_allow_html=True)

    def _filter_and_sort_documents(
        self, documents: List[Dict], search: str, sort_by: str
    ) -> List[Dict]:
        """Return a new, filtered and ordered list of documents.

        The input list is never modified, so the case's own document order
        is left untouched.

        Args:
            documents: Document dicts; each must have a ``'title'``.
            search: Case-insensitive substring to look for in titles. An
                empty value keeps every document.
            sort_by: One of the labels offered by the "Sort by" select box.
                Unknown labels leave the order unchanged.

        Returns:
            A new list containing the selected documents.
        """
        needle = search.lower() if search else ""
        if needle:
            selected = [d for d in documents if needle in d['title'].lower()]
        else:
            selected = list(documents)

        def _meta(doc: Dict) -> Dict:
            return doc.get('metadata') or {}

        if sort_by == "Recent first":
            # The leading flag keeps documents without a timestamp comparable
            # and places them after dated ones once the order is reversed.
            selected.sort(
                key=lambda d: (d.get('added_at') is not None, d.get('added_at')),
                reverse=True,
            )
        elif sort_by == "Name A-Z":
            selected.sort(key=lambda d: d['title'])
        elif sort_by == "Name Z-A":
            selected.sort(key=lambda d: d['title'], reverse=True)
        elif sort_by == "Size":
            # Largest first; documents with no recorded size count as 0.
            selected.sort(key=lambda d: _meta(d).get('file_size') or 0, reverse=True)
        elif sort_by == "Type":
            selected.sort(key=lambda d: str(_meta(d).get('file_type') or '').lower())
        return selected

    def _render_document_list(self, case: Dict):
        """Render the searchable, sortable list of the case's documents.

        Clicking a document stores it in ``st.session_state.current_document``
        and resets the page and chunk-view state.

        Args:
            case: Case dict; its ``'documents'`` entry is listed (a missing
                entry is treated as an empty list).
        """
        st.markdown("### ๐ Documents")

        search = st.text_input("๐ Search documents", key="doc_search")
        sort_by = st.selectbox(
            "Sort by",
            ["Recent first", "Name A-Z", "Name Z-A", "Size", "Type"],
            key="doc_sort"
        )

        docs = self._filter_and_sort_documents(
            case.get('documents') or [], search, sort_by
        )

        for doc in docs:
            title = doc['title']
            # Only add the ellipsis when the title was actually shortened.
            if len(title) > self._TITLE_PREVIEW_CHARS:
                label = title[:self._TITLE_PREVIEW_CHARS] + "..."
            else:
                label = title
            with st.container():
                if st.button(
                    f"๐ {label}",
                    key=f"select_{doc['id']}",
                    help=f"Click to view {title}"
                ):
                    st.session_state.current_document = doc
                    st.session_state.current_page = 1
                    st.session_state.chunk_view = False

    def _render_document_viewer(self, document: Dict):
        """Render the controls, the selected view mode and the metadata panel.

        Args:
            document: The document dict to display; must have a ``'title'``.
        """
        st.markdown(f"### ๐ {document['title']}")

        mode_col, download_col, reprocess_col = st.columns([2, 1, 1])
        with mode_col:
            view_mode = st.radio(
                "View Mode",
                ["Document", "Chunks", "Analysis"],
                horizontal=True,
                key="view_mode"
            )
        with download_col:
            if st.button("Download", key="download_doc"):
                self._handle_download(document)
        with reprocess_col:
            if st.button("Process Again", key="reprocess_doc"):
                self._reprocess_document(document)

        if view_mode == "Document":
            self._render_document_content(document)
        elif view_mode == "Chunks":
            self._render_chunk_view(document)
        else:
            self._render_analysis_view(document)

        with st.expander("๐ Document Metadata", expanded=False):
            self._render_metadata(document)

    def _render_document_content(self, document: Dict):
        """Render the document body, choosing a viewer by file type.

        PDFs and images get dedicated viewers; anything else is shown as
        read-only text truncated to ``_MAX_CONTENT_CHARS`` characters.

        Args:
            document: Document dict; reads ``'content'`` and
                ``metadata['file_type']``.
        """
        # ``or`` guards against keys that are present but set to None.
        content = document.get('content') or ''
        file_type = str((document.get('metadata') or {}).get('file_type') or '').lower()

        if file_type == 'pdf':
            self._render_pdf_viewer(document)
        elif file_type in ['png', 'jpg', 'jpeg']:
            self._render_image_viewer(document)
        else:
            limit = self._MAX_CONTENT_CHARS
            st.text_area(
                "Content",
                value=content[:limit] + ("..." if len(content) > limit else ""),
                height=400,
                disabled=True
            )

    def _render_image_viewer(self, document: Dict):
        """Display an image document from its ``'file_path'``.

        Args:
            document: Document dict; reads ``'file_path'`` and ``'title'``.
        """
        image_path = document.get('file_path')
        if not image_path or not Path(image_path).exists():
            st.error("Image file not found")
            return
        try:
            st.image(str(image_path), caption=document.get('title'))
        except Exception as e:
            st.error(f"Error displaying image: {str(e)}")

    def _render_pdf_viewer(self, document: Dict):
        """Render one page of a PDF as text, with page navigation.

        The page shown is ``st.session_state.current_page`` (1-based).

        Args:
            document: Document dict; reads ``'file_path'``.
        """
        pdf_path = document.get('file_path')
        if not pdf_path:
            st.error("PDF file path not found")
            return

        try:
            reader = pypdf.PdfReader(pdf_path)
            num_pages = len(reader.pages)
            if num_pages == 0:
                # A slider over an empty range cannot be built.
                st.warning("PDF has no pages")
                return

            # A stale page number (e.g. after the file changed) must be
            # pulled back into range before it is used as the slider value.
            st.session_state.current_page = min(
                max(st.session_state.current_page, 1), num_pages
            )

            prev_col, slider_col, next_col = st.columns([1, 2, 1])
            with prev_col:
                if st.button("โ๏ธ Previous") and st.session_state.current_page > 1:
                    st.session_state.current_page -= 1
            with slider_col:
                st.session_state.current_page = st.select_slider(
                    "Page",
                    range(1, num_pages + 1),
                    value=st.session_state.current_page
                )
            with next_col:
                if st.button("Next โถ๏ธ") and st.session_state.current_page < num_pages:
                    st.session_state.current_page += 1

            page = reader.pages[st.session_state.current_page - 1]
            text = page.extract_text() or ""
            st.text_area("Page Content", value=text, height=400, disabled=True)
        except Exception as e:
            st.error(f"Error displaying PDF: {str(e)}")

    def _chunk_card_html(self, number: int, chunk: Dict, search: str) -> str:
        """Build the HTML card for one chunk.

        All document-derived values are HTML-escaped because the card is
        rendered with ``unsafe_allow_html=True``.

        Args:
            number: 1-based chunk number shown in the heading.
            chunk: Chunk dict; reads ``'text'`` and the optional
                ``'start_idx'`` / ``'end_idx'``.
            search: Term to highlight in the chunk text; may be empty.

        Returns:
            The card markup as a single-line string.
        """
        text = chunk['text']
        start = html.escape(str(chunk.get('start_idx', 'N/A')))
        end = html.escape(str(chunk.get('end_idx', 'N/A')))
        return (
            '<div class="chunk-card">'
            f'<strong>Chunk {number}</strong>'
            f'<p>{self._highlight_text(text, search)}</p>'
            '<div style="font-size: 0.8em; color: #666;">'
            f'Characters: {len(text)} | Location: {start} - {end}'
            '</div>'
            '</div>'
        )

    def _render_chunk_view(self, document: Dict):
        """Render the document's chunks, optionally filtered by a search term.

        Args:
            document: Document dict; reads ``'chunks'``, a list of dicts
                that each have a ``'text'`` entry.
        """
        chunks = document.get('chunks', [])
        if not chunks:
            st.warning("No chunks available. Try reprocessing the document.")
            return

        search = st.text_input("๐ Search in chunks", key="chunk_search")
        needle = search.lower() if search else ""

        for idx, chunk in enumerate(chunks):
            if needle and needle not in chunk['text'].lower():
                continue
            with st.container():
                st.markdown(
                    self._chunk_card_html(idx + 1, chunk, search),
                    unsafe_allow_html=True
                )

    def _render_analysis_view(self, document: Dict):
        """Render summary metrics plus any entities and key phrases.

        Args:
            document: Document dict; reads ``'metadata'`` and ``'chunks'``.
        """
        metadata = document.get('metadata') or {}

        st.markdown("### ๐ Document Analysis")
        left, right = st.columns(2)
        with left:
            st.metric("Total Pages", metadata.get('page_count', 'N/A'))
            st.metric("Word Count", metadata.get('word_count', 'N/A'))
        with right:
            st.metric("Chunks", len(document.get('chunks', [])))
            st.metric("File Size", self._format_file_size(metadata.get('file_size', 0)))

        if 'entities' in metadata:
            st.markdown("### ๐ท๏ธ Named Entities")
            st.dataframe(pd.DataFrame(metadata['entities']))

        if 'key_phrases' in metadata:
            st.markdown("### ๐ Key Phrases")
            # str() keeps the join from failing on non-string phrases.
            st.write(", ".join(str(p) for p in (metadata['key_phrases'] or [])))

    def _render_metadata(self, document: Dict):
        """Render basic and processing metadata in two columns.

        Args:
            document: Document dict; reads ``'metadata'``, ``'added_at'``,
                ``'updated_at'`` and ``'chunks'``.
        """
        metadata = document.get('metadata') or {}
        # Reprocessing stores 'updated_at' on the document itself, so fall
        # back to it when the metadata carries no timestamp.
        updated_at = metadata.get('updated_at') or document.get('updated_at', 'Unknown')

        basic_col, processing_col = st.columns(2)
        with basic_col:
            st.markdown("**Basic Information**")
            st.write(f"Type: {metadata.get('file_type', 'Unknown')}")
            st.write(f"Added: {document.get('added_at', 'Unknown')}")
            st.write(f"Size: {self._format_file_size(metadata.get('file_size', 0))}")
        with processing_col:
            st.markdown("**Processing Information**")
            st.write(f"Chunks: {len(document.get('chunks', []))}")
            st.write(f"Processing Status: {metadata.get('processing_status', 'Unknown')}")
            st.write(f"Last Updated: {updated_at}")

    def _highlight_text(self, text: str, search: str) -> str:
        """Return ``text`` as HTML-safe markup with matches of ``search`` highlighted.

        Matching is case-insensitive and literal (no regex syntax), and the
        matched text keeps its original casing. Both the text and the matches
        are HTML-escaped.

        Args:
            text: Raw text to render.
            search: Term to highlight. When empty, the escaped text is
                returned without any highlighting.

        Returns:
            An HTML fragment.
        """
        if not search:
            return html.escape(text)

        pattern = re.compile(re.escape(search), re.IGNORECASE)
        pieces = []
        cursor = 0
        # Match on the raw text and escape piecewise, so escaping can never
        # split or alter a match.
        for match in pattern.finditer(text):
            pieces.append(html.escape(text[cursor:match.start()]))
            pieces.append(
                f'<span class="highlight">{html.escape(match.group(0))}</span>'
            )
            cursor = match.end()
        pieces.append(html.escape(text[cursor:]))
        return "".join(pieces)

    def _format_file_size(self, size_bytes: int) -> str:
        """Format a byte count with one decimal place and a unit (B to TB).

        Args:
            size_bytes: Size in bytes; ``None`` is treated as 0.

        Returns:
            A string such as ``"1.5 KB"``.
        """
        size = float(size_bytes or 0)
        for unit in ('B', 'KB', 'MB', 'GB'):
            if size < 1024:
                return f"{size:.1f} {unit}"
            size /= 1024
        return f"{size:.1f} TB"

    def _handle_download(self, document: Dict):
        """Read the document's file and offer it through a download button.

        Shows an error when the file path is missing, the file does not
        exist, or reading it fails.

        Args:
            document: Document dict; reads ``'file_path'``, ``'title'`` and
                ``metadata['mime_type']``.
        """
        try:
            file_path = document.get('file_path')
            if not file_path or not Path(file_path).exists():
                st.error("File not found")
                return

            data = Path(file_path).read_bytes()
            st.download_button(
                "Download File",
                data,
                file_name=document['title'],
                mime=(document.get('metadata') or {}).get(
                    'mime_type', 'application/octet-stream'
                )
            )
        except Exception as e:
            st.error(f"Error downloading file: {str(e)}")

    def _reprocess_document(self, document: Dict):
        """Re-run the document processor and store the refreshed document.

        On success the document dict is updated in place, passed to
        ``case_manager.update_document`` and the script is rerun.

        Args:
            document: Document dict; reads ``'file_path'`` and ``'id'``.
        """
        try:
            with st.spinner("Reprocessing document..."):
                file_path = document.get('file_path')
                if not file_path:
                    st.error("File not found")
                    return

                with open(file_path, 'rb') as f:
                    text, chunks, metadata = (
                        self.document_processor.process_and_tag_document(f)
                    )

                document.update({
                    'content': text,
                    'chunks': chunks,
                    # Freshly computed values override stale ones.
                    'metadata': {**(document.get('metadata') or {}), **metadata},
                    'updated_at': datetime.now().isoformat()
                })
                self.case_manager.update_document(document['id'], document)
                st.success("Document reprocessed successfully")
        except Exception as e:
            st.error(f"Error reprocessing document: {str(e)}")
            return

        # Prefer the current API and fall back for older Streamlit releases.
        # Called outside the try so the handler above can never intercept
        # Streamlit's rerun control flow.
        rerun = getattr(st, "rerun", None) or st.experimental_rerun
        rerun()

    def _reference_card_html(self, ref: Dict) -> str:
        """Build the HTML card for one chunk reference.

        All values are HTML-escaped because the card is rendered with
        ``unsafe_allow_html=True``.

        Args:
            ref: Reference dict; reads ``'title'``, ``'chunk_id'``,
                ``'snippet'`` and the optional ``'page'`` / ``'confidence'``.

        Returns:
            The card markup as a single-line string.
        """
        title = html.escape(str(ref['title']))
        chunk_id = html.escape(str(ref['chunk_id']))
        snippet = html.escape(str(ref['snippet']))
        page = html.escape(str(ref.get('page', 'N/A')))
        confidence = html.escape(str(ref.get('confidence', 'N/A')))
        return (
            '<div class="chunk-card">'
            f'<strong>{title}</strong> (Chunk {chunk_id})'
            f'<p>{snippet}</p>'
            '<div style="font-size: 0.8em; color: #666;">'
            f'Page: {page} | Confidence: {confidence}'
            '</div>'
            '</div>'
        )

    def render_chunk_references(self, references: List[Dict]):
        """Render a card for each referenced chunk.

        Args:
            references: Reference dicts, each with ``'title'``,
                ``'chunk_id'`` and ``'snippet'``, and optionally ``'page'``
                and ``'confidence'``.
        """
        st.markdown("### ๐ Referenced Chunks")
        for ref in references:
            with st.container():
                st.markdown(self._reference_card_html(ref), unsafe_allow_html=True)