Legal_AI_Agent / utils /case_manager_interface.py
cryogenic22's picture
Update utils/case_manager_interface.py
4bc807f verified
import streamlit as st
from typing import Dict, List
from datetime import datetime
import os
class CaseManagerInterface:
    """Streamlit UI for creating cases and managing the documents in them.

    The class only renders widgets and forwards user actions to three
    collaborators supplied by the caller:

    * ``case_manager`` -- used here via ``create_case``, ``get_all_cases``,
      ``list_documents``, ``add_document``, ``remove_document`` and
      ``delete_case``.
    * ``vector_store`` -- used here via ``add_document`` and
      ``delete_document``.
    * ``document_processor`` -- used here via ``process_and_tag_document``,
      which is unpacked as ``(text, chunks, metadata)``.
    """

    def __init__(self, case_manager, vector_store, document_processor):
        """Store the collaborators; no rendering happens here."""
        self.case_manager = case_manager
        self.vector_store = vector_store
        self.document_processor = document_processor

    def render(self):
        """Render the page title and the three case-management tabs."""
        st.title("📁 Case Management")
        # Tab based interface for case management
        overview_tab, documents_tab, batch_tab = st.tabs(
            ["Case Overview", "Document Management", "Batch Operations"]
        )
        with overview_tab:
            self._render_case_overview()
        with documents_tab:
            self._render_document_manager()
        with batch_tab:
            self._render_batch_operations()

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _chunk_metadata(metadata: Dict, chunk: Dict, total_chunks: int) -> Dict:
        """Return a copy of ``metadata`` extended with per-chunk position info.

        ``chunk_index`` comes from the chunk's ``chunk_id`` (0 when absent);
        the input ``metadata`` dict is not modified.
        """
        return {
            **metadata,
            'chunk_index': chunk.get('chunk_id', 0),
            'total_chunks': total_chunks,
        }

    @staticmethod
    def _build_doc_record(title: str, text: str, chunks: List[Dict], metadata: Dict) -> Dict:
        """Build the document dict handed to ``case_manager.add_document``."""
        return {
            "id": metadata['doc_id'],
            "title": title,
            "text": text,
            "metadata": metadata,
            "chunks": chunks,
        }

    @staticmethod
    def _upload_signature(uploaded_file):
        """Return a value identifying one uploaded file across script reruns.

        Prefers the upload's ``file_id`` attribute when it has one and falls
        back to ``(name, size)`` otherwise.
        """
        file_id = getattr(uploaded_file, "file_id", None)
        if file_id is not None:
            return file_id
        return (uploaded_file.name, getattr(uploaded_file, "size", None))

    @staticmethod
    def _ingested_uploads(case_id):
        """Return the session-scoped set of upload signatures already ingested."""
        state_key = f"ingested_uploads_{case_id}"
        if state_key not in st.session_state:
            st.session_state[state_key] = set()
        return st.session_state[state_key]

    def _pending_uploads(self, case_id, uploaded_files):
        """Return the uploads that have not yet been ingested for this case.

        The uploader widget keeps returning the same files on every rerun, so
        without this filter each widget interaction would process and store
        every file again.
        """
        already_ingested = self._ingested_uploads(case_id)
        return [
            candidate
            for candidate in (uploaded_files or [])
            if self._upload_signature(candidate) not in already_ingested
        ]

    def _mark_ingested(self, case_id, uploaded_file):
        """Record that ``uploaded_file`` was stored successfully for the case."""
        self._ingested_uploads(case_id).add(self._upload_signature(uploaded_file))

    def _ingest_file(self, case_id, uploaded_file):
        """Process one upload, index its chunks and attach it to the case.

        Any exception raised by the processor, the vector store or the case
        manager propagates to the caller, which reports it in the UI.
        """
        text, chunks, metadata = self.document_processor.process_and_tag_document(uploaded_file)
        total_chunks = len(chunks)
        # Add chunks to vector store
        for chunk in chunks:
            self.vector_store.add_document(
                doc_id=metadata['doc_id'],
                text=chunk['text'],
                metadata=self._chunk_metadata(metadata, chunk, total_chunks),
            )
        # Add document to case
        self.case_manager.add_document(
            case_id,
            self._build_doc_record(uploaded_file.name, text, chunks, metadata),
        )

    @staticmethod
    def _confirm_action(pending_key: str, prompt: str) -> bool:
        """Show a Confirm/Cancel prompt and return True once Confirm is clicked.

        ``pending_key`` is the ``st.session_state`` entry that keeps the prompt
        visible across reruns; it is cleared on either choice. Cancelling
        triggers a rerun so the prompt disappears immediately.
        """
        st.warning(prompt)
        confirm_col, cancel_col = st.columns(2)
        confirmed = confirm_col.button("Confirm", key=f"{pending_key}_yes")
        cancelled = cancel_col.button("Cancel", key=f"{pending_key}_no")
        if confirmed or cancelled:
            st.session_state.pop(pending_key, None)
        if cancelled:
            st.rerun()
        return confirmed

    # ------------------------------------------------------------------
    # Case overview tab
    # ------------------------------------------------------------------

    def _render_case_overview(self):
        """Render the create-case form followed by the list of existing cases."""
        # Create new case section
        st.subheader("Create New Case")
        with st.form("create_case_form"):
            title = st.text_input("Case Title")
            description = st.text_area("Description")
            case_type = st.selectbox("Case Type", ["Judgment", "Contract", "MOU", "Will", "Other"])
            create_case = st.form_submit_button("Create Case")

        if create_case:
            if not title.strip():
                # Tell the user why nothing happened instead of ignoring the submit.
                st.warning("Please enter a case title.")
            else:
                try:
                    self.case_manager.create_case(title, description, case_type)
                except Exception as e:
                    st.error(f"Error creating case: {str(e)}")
                else:
                    st.success(f"Case '{title}' created successfully!")
                    st.rerun()

        # List existing cases
        st.subheader("Existing Cases")
        for case in self.case_manager.get_all_cases():
            self._render_case_row(case)

    def _render_case_row(self, case):
        """Render one case summary with a confirmed delete action."""
        case_id = case['id']
        pending_key = f"confirm_del_case_{case_id}"
        docs = self.case_manager.list_documents(case_id)

        with st.container():
            info_col, action_col = st.columns([4, 1])
            with info_col:
                st.markdown(f"### {case['title']}")
                st.markdown(f"**Type:** {case['case_type']}")
                st.markdown(f"**Created:** {case['created_at']}")
                st.markdown(f"**Description:** {case['description']}")
                # Show document count
                st.markdown(f"**Documents:** {len(docs)}")
            with action_col:
                if st.button("🗑️ Delete", key=f"del_case_{case_id}"):
                    # Only arm the confirmation; st.warning() by itself is not
                    # user input, so deleting here would skip confirmation.
                    st.session_state[pending_key] = True

            if st.session_state.get(pending_key) and self._confirm_action(
                pending_key,
                f"Delete {case['title']}? This will remove all associated documents.",
            ):
                try:
                    # Delete all documents from vector store
                    for doc in docs:
                        self.vector_store.delete_document(doc['id'])
                    # Delete case
                    self.case_manager.delete_case(case_id)
                except Exception as e:
                    st.error(f"Error: {str(e)}")
                else:
                    st.success("Case deleted!")
                    st.rerun()
            st.divider()

    # ------------------------------------------------------------------
    # Document management tab
    # ------------------------------------------------------------------

    def _render_document_manager(self):
        """Render document management interface with file preview."""
        # Case selection
        cases = self.case_manager.get_all_cases()
        if not cases:
            st.info("No cases found. Please create a case first.")
            return

        selected_case = st.selectbox(
            "Select Case",
            cases,
            format_func=lambda x: x['title']
        )
        if not selected_case:
            return
        case_id = selected_case['id']

        # Document upload section
        st.subheader("Upload Documents")
        uploaded_files = st.file_uploader(
            "Upload one or more documents",
            accept_multiple_files=True,
            key=f"upload_{case_id}"
        )
        for uploaded_file in self._pending_uploads(case_id, uploaded_files):
            with st.status(f"Processing {uploaded_file.name}..."):
                try:
                    self._ingest_file(case_id, uploaded_file)
                except Exception as e:
                    # Not marked as ingested, so it is retried on the next rerun.
                    st.error(f"Error processing {uploaded_file.name}: {str(e)}")
                else:
                    self._mark_ingested(case_id, uploaded_file)
                    st.success(f"Added: {uploaded_file.name}")

        # Fetch after ingesting so files uploaded in this run are listed too.
        documents = self.case_manager.list_documents(case_id)
        if not documents:
            st.info("No documents in this case yet.")
            return

        # Document list
        st.subheader("Documents")
        # Add tabs for different document views
        list_tab, grid_tab = st.tabs(["List View", "Grid View"])
        with list_tab:
            for doc in documents:
                self._render_document_row(case_id, doc)
        with grid_tab:
            self._render_document_grid(documents)

    def _render_document_row(self, case_id, doc):
        """Render one document in list view: preview, download, reprocess, delete."""
        doc_id = doc['id']
        pending_key = f"confirm_del_doc_{doc_id}"

        with st.container():
            # The expander header is the clickable title; it starts collapsed so
            # the preview is shown only when the user opens it.
            with st.expander(f"📄 {doc['title']}", expanded=False):
                # Add tabs for different preview modes
                content_tab, metadata_tab, chunks_tab = st.tabs(["Content", "Metadata", "Chunks"])
                with content_tab:
                    st.text_area("Document Content", doc.get('text', 'No content available'),
                                 height=300, key=f"content_{doc_id}")
                with metadata_tab:
                    st.json(doc.get('metadata', {}))
                with chunks_tab:
                    for idx, chunk in enumerate(doc.get('chunks', [])):
                        st.markdown(f"**Chunk {idx + 1}**")
                        st.text_area("Chunk Content",
                                     chunk.get('text', 'No content available'),
                                     height=100,
                                     key=f"chunk_{doc_id}_{idx}")

            download_col, reprocess_col, delete_col = st.columns([1, 1, 1])
            with download_col:
                # Rendered directly: a download button nested inside a regular
                # button needs two clicks and vanishes on the next rerun.
                try:
                    st.download_button(
                        "📥 Download",
                        doc.get('text') or '',
                        file_name=doc['title'],
                        mime='text/plain',
                        key=f"download_button_{doc_id}"
                    )
                except Exception as e:
                    st.error(f"Error downloading: {str(e)}")
            with reprocess_col:
                if st.button("🔄 Reprocess", key=f"reprocess_{doc_id}"):
                    # Reprocessing is not implemented yet; only a notice is shown.
                    st.info("Reprocessing document...")
            with delete_col:
                if st.button("🗑️ Delete", key=f"delete_{doc_id}"):
                    st.session_state[pending_key] = True

            if st.session_state.get(pending_key) and self._confirm_action(
                pending_key,
                "Are you sure you want to delete this document?",
            ):
                try:
                    self.vector_store.delete_document(doc_id)
                    self.case_manager.remove_document(case_id, doc_id)
                except Exception as e:
                    st.error(f"Error deleting: {str(e)}")
                else:
                    st.success(f"Deleted: {doc['title']}")
                    st.rerun()
            st.divider()

    def _render_document_grid(self, documents):
        """Render documents as a three-column grid with a collapsible preview."""
        grid_cols = st.columns(3)
        for idx, doc in enumerate(documents):
            with grid_cols[idx % 3]:
                st.markdown(f"### 📄 {doc['title']}")
                st.markdown(f"Added: {doc.get('added_at', 'Unknown')}")
                st.markdown(f"Size: {len(doc.get('text', ''))}")
                with st.expander("Preview"):
                    # Explicit key: every card uses the same "Content" label.
                    st.text_area("Content", doc.get('text', ''), height=200,
                                 key=f"grid_content_{doc['id']}")

    # ------------------------------------------------------------------
    # Batch operations tab
    # ------------------------------------------------------------------

    def _render_batch_operations(self):
        """Render the batch-upload tab with a progress bar and result summary."""
        st.subheader("Batch Operations")

        cases = self.case_manager.get_all_cases()
        if not cases:
            st.info("No cases found. Please create a case first.")
            return

        # Case selection for batch operations
        selected_case = st.selectbox(
            "Select Target Case",
            cases,
            format_func=lambda x: x['title'],
            key="batch_case"
        )
        if not selected_case:
            return
        case_id = selected_case["id"]

        # Batch upload
        st.markdown("### 📤 Batch Upload")
        uploaded_files = st.file_uploader(
            "Upload multiple documents",
            accept_multiple_files=True,
            # Per-case key so files left in the widget are not ingested into a
            # different case when the target selection changes.
            key=f"batch_upload_{case_id}"
        )
        pending = self._pending_uploads(case_id, uploaded_files)
        if not pending:
            return

        progress_bar = st.progress(0)
        status_text = st.empty()
        total = len(pending)
        succeeded = 0
        for position, file in enumerate(pending, start=1):
            status_text.text(f"Processing {file.name}...")
            try:
                self._ingest_file(case_id, file)
            except Exception as e:
                st.error(f"Error processing {file.name}: {str(e)}")
            else:
                self._mark_ingested(case_id, file)
                succeeded += 1
            # Advance for failures too so the bar always reaches 100%.
            progress_bar.progress(position / total)

        status_text.text("Batch upload complete!")
        st.success(f"Processed {succeeded} of {total} documents")