# Legal_AI_Agent / utils / case_manager.py
# cryogenic22's picture
# Update utils/case_manager.py
# 1fdfeb1 verified
import os
import json
from datetime import datetime
from typing import List, Dict, Optional
from pathlib import Path
import shutil
class CaseManager:
    """Manage cases and their document metadata on the local filesystem.

    Every case owns a directory ``<base_path>/<case_id>/`` that contains a
    ``metadata.json`` file and a ``documents`` sub-directory.  The ``cases``
    attribute is an in-memory mirror of the metadata files, keyed by case ID.
    """

    # Timestamp layout used as the base of generated case/document IDs.
    _ID_FORMAT = '%Y%m%d_%H%M%S'
    # Name of the per-case metadata file.
    _METADATA_NAME = 'metadata.json'

    def __init__(self, base_path: str = "data/cases"):
        """Initialize CaseManager with a base directory to store cases.

        Args:
            base_path: Directory under which one sub-directory per case is
                kept. It is created (with parents) when missing.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.cases = {}
        self._load_cases()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _metadata_path(self, case_id: str) -> Path:
        """Return the path of the metadata file belonging to ``case_id``."""
        return self.base_path / case_id / self._METADATA_NAME

    def _write_metadata(self, case_id: str, case_data: Dict) -> None:
        """Serialise ``case_data`` to the case's metadata file atomically.

        The JSON is written to a sibling scratch file first and then moved
        over the real file, so a failure while serialising (for example a
        value that is not JSON serialisable) never truncates existing data.
        """
        target = self._metadata_path(case_id)
        scratch = target.with_name(target.name + '.tmp')
        try:
            with open(scratch, 'w', encoding='utf-8') as f:
                json.dump(case_data, f, indent=2)
            os.replace(scratch, target)
        finally:
            # Only present when the write or the move failed.
            if scratch.exists():
                scratch.unlink()

    def _unique_id(self, taken) -> str:
        """Return a timestamp ID that does not occur in ``taken``.

        The plain timestamp is used when free; otherwise ``_1``, ``_2``, ...
        is appended until an unused value is found.
        """
        base = datetime.now().strftime(self._ID_FORMAT)
        candidate = base
        suffix = 0
        while candidate in taken:
            suffix += 1
            candidate = f"{base}_{suffix}"
        return candidate

    def _load_cases(self):
        """Load existing cases from storage.

        A case whose metadata cannot be read or parsed is reported and
        skipped; it does not prevent the remaining cases from loading.
        """
        try:
            case_dirs = sorted(p for p in self.base_path.iterdir() if p.is_dir())
        except OSError as e:
            print(f"Error loading cases: {e}")
            self.cases = {}
            return

        for case_dir in case_dirs:
            metadata_file = case_dir / self._METADATA_NAME
            if not metadata_file.exists():
                continue
            try:
                with open(metadata_file, 'r', encoding='utf-8') as f:
                    case_data = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and decoding errors.
                print(f"Error loading case {case_dir.name}: {e}")
                continue
            if not isinstance(case_data, dict):
                print(f"Error loading case {case_dir.name}: metadata is not a JSON object")
                continue
            self.cases[case_dir.name] = case_data

    # ------------------------------------------------------------------
    # Case operations
    # ------------------------------------------------------------------
    def delete_case(self, case_id: str) -> bool:
        """Delete a case and all its associated data.

        Args:
            case_id: ID of the case to delete.

        Returns:
            True when the case was deleted.

        Raises:
            Exception: If the case is unknown or its directory cannot be
                removed; the underlying error is chained as ``__cause__``.
        """
        try:
            if case_id not in self.cases:
                raise ValueError(f"Case with ID {case_id} not found")
            case_path = self.base_path / case_id
            # Delete all documents in the case directory
            if case_path.exists():
                shutil.rmtree(case_path)
            # Remove from cases dictionary
            del self.cases[case_id]
            return True
        except Exception as e:
            raise Exception(f"Error deleting case: {str(e)}") from e

    def create_case(self, title: str, description: str, case_type: str) -> str:
        """Create a new case and save it to storage.

        Args:
            title: Title of the case.
            description: Description of the case.
            case_type: Type/category label stored with the case.

        Returns:
            The ID of the newly created case.
        """
        base = datetime.now().strftime(self._ID_FORMAT)
        suffix = 0
        while True:
            case_id = base if suffix == 0 else f"{base}_{suffix}"
            case_path = self.base_path / case_id
            if case_id not in self.cases:
                try:
                    # No exist_ok: an existing directory means the ID is
                    # taken and must not be overwritten.
                    case_path.mkdir(parents=True)
                    break
                except FileExistsError:
                    pass
            suffix += 1

        stamp = datetime.now().isoformat()
        case_data = {
            'id': case_id,
            'title': title,
            'description': description,
            'case_type': case_type,
            'created_at': stamp,
            'updated_at': stamp,
            'status': 'active',
            'documents': []
        }

        try:
            (case_path / 'documents').mkdir(exist_ok=True)
            self._write_metadata(case_id, case_data)
        except Exception:
            # The directory was created by this call, so it is safe to
            # remove it again rather than leave a half-initialised case.
            shutil.rmtree(case_path, ignore_errors=True)
            raise

        self.cases[case_id] = case_data
        return case_id

    def get_all_cases(self) -> List[Dict]:
        """Get a list of all cases."""
        return list(self.cases.values())

    def get_case(self, case_id: str) -> Optional[Dict]:
        """Get details of a specific case, or None when the ID is unknown."""
        return self.cases.get(case_id)

    # ------------------------------------------------------------------
    # Document operations
    # ------------------------------------------------------------------
    def add_document(self, case_id: str, document_data: Dict):
        """Add a document to a case.

        ``document_data`` is updated in place: an ``id`` and an ``added_at``
        timestamp are filled in when the caller did not provide them.

        Args:
            case_id: ID of the case receiving the document.
            document_data: Metadata describing the document.

        Raises:
            ValueError: If ``case_id`` is unknown.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")

        case = self.cases[case_id]
        current_docs = case.get('documents', [])

        # Update document metadata
        if 'id' not in document_data:
            # Generated IDs must be unique within the case, otherwise
            # removing one document would remove its same-second siblings.
            taken = [doc.get('id') for doc in current_docs]
            document_data['id'] = self._unique_id(taken)
        document_data['added_at'] = document_data.get('added_at', datetime.now().isoformat())

        stamp = datetime.now().isoformat()

        # Persist first, then commit to memory, so a failed write leaves
        # both the file and the in-memory case unchanged.
        pending = dict(case)
        pending['documents'] = list(current_docs) + [document_data]
        pending['updated_at'] = stamp
        self._write_metadata(case_id, pending)

        case.setdefault('documents', []).append(document_data)
        case['updated_at'] = stamp

    def list_documents(self, case_id: str) -> List[Dict]:
        """List all documents in a case.

        Raises:
            ValueError: If ``case_id`` is unknown.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")
        return self.cases[case_id].get('documents', [])

    def remove_document(self, case_id: str, document_id: str) -> bool:
        """Remove a document from a case.

        The document is removed from the case metadata first; afterwards the
        file referenced by its ``file_path`` entry (if any) is deleted.

        Args:
            case_id: ID of the case holding the document.
            document_id: ID of the document to remove.

        Returns:
            True when the document was removed. False when the case or the
            document is unknown, or when deleting the physical file failed
            (in that case the metadata entry has already been removed).
        """
        if case_id not in self.cases:
            return False

        case = self.cases[case_id]
        current_docs = case.get('documents', [])
        document = next((doc for doc in current_docs if doc.get('id') == document_id), None)
        if not document:
            return False

        # Remove from case metadata
        remaining = [doc for doc in current_docs if doc.get('id') != document_id]
        stamp = datetime.now().isoformat()
        pending = dict(case)
        pending['documents'] = remaining
        pending['updated_at'] = stamp
        self._write_metadata(case_id, pending)
        case['documents'] = remaining
        case['updated_at'] = stamp

        # Remove the physical file
        file_path = document.get('file_path')
        if not file_path:
            # Metadata-only document: nothing on disk to clean up.
            return True
        try:
            doc_path = Path(file_path)
            if doc_path.exists():
                doc_path.unlink()
            return True
        except (OSError, TypeError, ValueError) as e:
            print(f"Error removing document file: {e}")
            return False