import os
from datetime import datetime
from typing import Dict, List

import streamlit as st


class CaseManagerInterface:
    """Streamlit interface for creating cases and managing their documents.

    The class only orchestrates UI widgets; all persistence is delegated to
    the three collaborators passed to the constructor:

    * ``case_manager`` -- must provide ``create_case``, ``get_all_cases``,
      ``list_documents``, ``add_document``, ``remove_document`` and
      ``delete_case``.
    * ``vector_store`` -- must provide ``add_document`` and
      ``delete_document``.
    * ``document_processor`` -- must provide ``process_and_tag_document``,
      which is unpacked here as ``(text, chunks, metadata)``.
    """

    def __init__(self, case_manager, vector_store, document_processor):
        """Store the collaborators used by the render methods."""
        self.case_manager = case_manager
        self.vector_store = vector_store
        self.document_processor = document_processor

    def render(self) -> None:
        """Render the page title and the three management tabs."""
        st.title("📁 Case Management")

        overview_tab, documents_tab, batch_tab = st.tabs(
            ["Case Overview", "Document Management", "Batch Operations"]
        )
        with overview_tab:
            self._render_case_overview()
        with documents_tab:
            self._render_document_manager()
        with batch_tab:
            self._render_batch_operations()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _confirm_delete(self, button_key: str, message: str) -> bool:
        """Render a two-step delete control.

        A first click on the delete button only records a "pending" flag in
        ``st.session_state``; the warning plus Confirm/Cancel buttons are then
        shown on subsequent runs. Returns ``True`` only on the run in which
        the user clicks Confirm.

        Widget return values such as ``st.warning(...)`` cannot be used as a
        confirmation: they are element objects, not user answers.
        """
        pending_key = f"{button_key}_pending"

        if st.button("🗑️ Delete", key=button_key):
            st.session_state[pending_key] = True

        if not st.session_state.get(pending_key, False):
            return False

        st.warning(message)
        confirmed = st.button("Confirm", key=f"{button_key}_confirm")
        cancelled = st.button("Cancel", key=f"{button_key}_cancel")

        if confirmed or cancelled:
            st.session_state.pop(pending_key, None)
        if cancelled:
            # Rerun immediately so the confirmation prompt disappears.
            st.rerun()
        return confirmed

    @staticmethod
    def _upload_token(uploaded_file) -> str:
        """Return a string identifying one uploaded file within a session.

        Prefers the upload's ``file_id`` attribute when the installed
        Streamlit version exposes one, otherwise falls back to name + size.
        """
        file_id = getattr(uploaded_file, "file_id", None)
        if file_id:
            return str(file_id)
        return f"{uploaded_file.name}:{getattr(uploaded_file, 'size', '')}"

    @staticmethod
    def _processed_tokens(state_key: str) -> set:
        """Return the per-session set of already-ingested upload tokens."""
        if state_key not in st.session_state:
            st.session_state[state_key] = set()
        return st.session_state[state_key]

    def _ingest_file(self, case_id, uploaded_file) -> None:
        """Process one uploaded file and attach it to ``case_id``.

        Every chunk is added to the vector store with the document metadata
        extended by ``chunk_index`` and ``total_chunks``; the full document is
        then registered with the case manager. Any exception raised by the
        collaborators propagates to the caller.
        """
        text, chunks, metadata = self.document_processor.process_and_tag_document(
            uploaded_file
        )
        total_chunks = len(chunks)

        # NOTE(review): every chunk is stored under the same doc_id, exactly
        # as before -- confirm the vector store keys on more than doc_id.
        for position, chunk in enumerate(chunks):
            self.vector_store.add_document(
                doc_id=metadata['doc_id'],
                text=chunk['text'],
                metadata={
                    **metadata,
                    # Fall back to the position so chunks without an explicit
                    # id do not all end up with index 0.
                    'chunk_index': chunk.get('chunk_id', position),
                    'total_chunks': total_chunks,
                },
            )

        doc_data = {
            "id": metadata['doc_id'],
            "title": uploaded_file.name,
            "text": text,
            "metadata": metadata,
            "chunks": chunks,
        }
        self.case_manager.add_document(case_id, doc_data)

    # ------------------------------------------------------------------
    # Tab 1: case overview
    # ------------------------------------------------------------------

    def _render_case_overview(self) -> None:
        """Render the "create case" form followed by the list of cases."""
        st.subheader("Create New Case")
        with st.form("create_case_form"):
            title = st.text_input("Case Title")
            description = st.text_area("Description")
            case_type = st.selectbox(
                "Case Type", ["Judgment", "Contract", "MOU", "Will", "Other"]
            )
            create_case = st.form_submit_button("Create Case")

        if create_case:
            if not title or not title.strip():
                # Previously an empty title was ignored without any feedback.
                st.warning("Please enter a case title.")
            else:
                try:
                    self.case_manager.create_case(title, description, case_type)
                except Exception as e:
                    st.error(f"Error creating case: {str(e)}")
                else:
                    st.success(f"Case '{title}' created successfully!")
                    st.rerun()

        st.subheader("Existing Cases")
        cases: List[Dict] = self.case_manager.get_all_cases()

        for case in cases:
            case_id = case['id']
            docs = self.case_manager.list_documents(case_id)

            with st.container():
                info_col, action_col = st.columns([4, 1])

                with info_col:
                    st.markdown(f"### {case['title']}")
                    st.markdown(f"**Type:** {case['case_type']}")
                    st.markdown(f"**Created:** {case['created_at']}")
                    st.markdown(f"**Description:** {case['description']}")
                    st.markdown(f"**Documents:** {len(docs)}")

                with action_col:
                    confirmed = self._confirm_delete(
                        f"del_case_{case_id}",
                        f"Delete {case['title']}? "
                        "This will remove all associated documents.",
                    )
                    if confirmed:
                        try:
                            # Remove the case's documents from the vector
                            # store before removing the case itself.
                            for doc in docs:
                                self.vector_store.delete_document(doc['id'])
                            self.case_manager.delete_case(case_id)
                        except Exception as e:
                            st.error(f"Error: {str(e)}")
                        else:
                            st.success("Case deleted!")
                            st.rerun()

                st.divider()

    # ------------------------------------------------------------------
    # Tab 2: document management
    # ------------------------------------------------------------------

    def _render_document_manager(self) -> None:
        """Render document management interface with file preview."""
        cases: List[Dict] = self.case_manager.get_all_cases()
        if not cases:
            st.info("No cases found. Please create a case first.")
            return

        selected_case = st.selectbox(
            "Select Case",
            cases,
            format_func=lambda x: x['title'],
        )
        if not selected_case:
            return
        case_id = selected_case['id']

        st.subheader("Upload Documents")
        uploaded_files = st.file_uploader(
            "Upload one or more documents",
            accept_multiple_files=True,
            key=f"upload_{case_id}",
        )

        if uploaded_files:
            # The uploader keeps returning the same files on every rerun, so
            # remember what was already ingested to avoid duplicates.
            processed = self._processed_tokens(f"processed_uploads_{case_id}")
            for uploaded_file in uploaded_files:
                token = self._upload_token(uploaded_file)
                if token in processed:
                    continue
                with st.status(f"Processing {uploaded_file.name}..."):
                    try:
                        self._ingest_file(case_id, uploaded_file)
                    except Exception as e:
                        st.error(
                            f"Error processing {uploaded_file.name}: {str(e)}"
                        )
                    else:
                        processed.add(token)
                        st.success(f"Added: {uploaded_file.name}")

        # Fetched after the upload step so freshly added documents are listed.
        documents: List[Dict] = self.case_manager.list_documents(case_id)
        if not documents:
            st.info("No documents in this case yet.")
            return

        st.subheader("Documents")
        list_tab, grid_tab = st.tabs(["List View", "Grid View"])
        with list_tab:
            for doc in documents:
                self._render_document_row(case_id, doc)
        with grid_tab:
            self._render_document_grid(documents)

    def _render_document_row(self, case_id, doc: Dict) -> None:
        """Render one list-view entry: title, preview expander and actions."""
        doc_id = doc['id']

        with st.container():
            st.markdown(f"**📄 {doc['title']}**")

            # Collapsed by default; the user opens it to see the preview.
            with st.expander(f"Preview: {doc['title']}", expanded=False):
                content_tab, metadata_tab, chunks_tab = st.tabs(
                    ["Content", "Metadata", "Chunks"]
                )
                with content_tab:
                    st.text_area(
                        "Document Content",
                        doc.get('text', 'No content available'),
                        height=300,
                        key=f"content_{doc_id}",
                    )
                with metadata_tab:
                    st.json(doc.get('metadata', {}))
                with chunks_tab:
                    for idx, chunk in enumerate(doc.get('chunks', [])):
                        st.markdown(f"**Chunk {idx + 1}**")
                        st.text_area(
                            "Chunk Content",
                            chunk.get('text', 'No content available'),
                            height=100,
                            key=f"chunk_{doc_id}_{idx}",
                        )

            download_col, reprocess_col, delete_col = st.columns([1, 1, 1])

            with download_col:
                # A single download button; nesting it under st.button made
                # it vanish on the rerun triggered by the first click.
                st.download_button(
                    "📥 Download",
                    doc.get('text') or '',
                    file_name=doc['title'],
                    mime='text/plain',
                    key=f"download_{doc_id}",
                )

            with reprocess_col:
                if st.button("🔄 Reprocess", key=f"reprocess_{doc_id}"):
                    # TODO: reprocessing is not implemented yet; this only
                    # shows the placeholder message.
                    st.info("Reprocessing document...")

            with delete_col:
                confirmed = self._confirm_delete(
                    f"delete_{doc_id}",
                    "Are you sure you want to delete this document?",
                )
                if confirmed:
                    try:
                        self.vector_store.delete_document(doc_id)
                        self.case_manager.remove_document(case_id, doc_id)
                    except Exception as e:
                        st.error(f"Error deleting: {str(e)}")
                    else:
                        st.success(f"Deleted: {doc['title']}")
                        st.rerun()

            st.divider()

    def _render_document_grid(self, documents: List[Dict]) -> None:
        """Render documents as cards laid out across three columns."""
        cols = st.columns(3)
        for idx, doc in enumerate(documents):
            with cols[idx % 3]:
                st.markdown(f"### 📄 {doc['title']}")
                st.markdown(f"Added: {doc.get('added_at', 'Unknown')}")
                st.markdown(f"Size: {len(doc.get('text', ''))}")

                if st.button("Preview", key=f"grid_preview_{doc['id']}"):
                    with st.expander("Document Preview", expanded=True):
                        st.text_area(
                            "Content",
                            doc.get('text', ''),
                            height=200,
                            key=f"grid_content_{doc['id']}",
                        )

    # ------------------------------------------------------------------
    # Tab 3: batch operations
    # ------------------------------------------------------------------

    def _render_batch_operations(self) -> None:
        """Render the batch upload tab for a selected target case."""
        st.subheader("Batch Operations")

        cases: List[Dict] = self.case_manager.get_all_cases()
        if not cases:
            st.info("No cases found. Please create a case first.")
            return

        selected_case = st.selectbox(
            "Select Target Case",
            cases,
            format_func=lambda x: x['title'],
            key="batch_case",
        )
        if not selected_case:
            return
        case_id = selected_case['id']

        st.markdown("### 📤 Batch Upload")
        uploaded_files = st.file_uploader(
            "Upload multiple documents",
            accept_multiple_files=True,
            key="batch_upload",
        )
        if not uploaded_files:
            return

        # Skip files already ingested for this case in an earlier run.
        processed = self._processed_tokens(f"processed_batch_uploads_{case_id}")
        pending = [
            (self._upload_token(file), file)
            for file in uploaded_files
        ]
        pending = [(token, file) for token, file in pending if token not in processed]
        if not pending:
            st.info("All uploaded files have already been added to this case.")
            return

        progress_bar = st.progress(0)
        status_text = st.empty()
        succeeded = 0

        for position, (token, file) in enumerate(pending, start=1):
            status_text.text(f"Processing {file.name}...")
            try:
                self._ingest_file(case_id, file)
            except Exception as e:
                st.error(f"Error processing {file.name}: {str(e)}")
            else:
                processed.add(token)
                succeeded += 1
            # Advance the bar for failures too so it always reaches 100%.
            progress_bar.progress(position / len(pending))

        status_text.text("Batch upload complete!")
        failed = len(pending) - succeeded
        if succeeded:
            st.success(f"Processed {succeeded} documents")
        if failed:
            st.warning(f"{failed} documents failed to process")