"""File-system backed storage for cases and their document metadata."""

import json
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional


class CaseManager:
    """Manage cases stored as directories under a base path.

    Each case lives in ``<base_path>/<case_id>/`` and is described by a
    ``metadata.json`` file; a ``documents`` sub-directory is created next to
    it. Every loaded or created case is also cached in ``self.cases``, keyed
    by case ID.
    """

    _METADATA_FILENAME = 'metadata.json'
    _ID_FORMAT = '%Y%m%d_%H%M%S'

    def __init__(self, base_path: str = "data/cases"):
        """Initialize CaseManager with a base directory to store cases.

        The directory is created if it does not exist, and any cases already
        stored in it are loaded into ``self.cases``.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.cases = {}
        self._load_cases()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _metadata_path(self, case_id: str) -> Path:
        """Return the path of the metadata file belonging to ``case_id``."""
        return self.base_path / case_id / self._METADATA_FILENAME

    def _save_case(self, case_id: str, case_data: Dict) -> None:
        """Persist ``case_data`` as the metadata of ``case_id``.

        The data is serialized before any file is opened and written to a
        temporary file that then replaces the real one, so a failure never
        leaves a truncated ``metadata.json`` behind.

        Raises:
            TypeError: If ``case_data`` is not JSON serializable.
            OSError: If the file cannot be written.
        """
        metadata_path = self._metadata_path(case_id)
        payload = json.dumps(case_data, indent=2)
        tmp_path = metadata_path.with_name(metadata_path.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, metadata_path)
        finally:
            # After a successful replace the temp file no longer exists;
            # this only cleans up after a failed write.
            if tmp_path.exists():
                tmp_path.unlink()

    def _reserve_case_dir(self, stamp: str) -> str:
        """Create a fresh case directory and return the ID it was created under.

        The timestamp alone is used when it is free; otherwise ``_1``, ``_2``,
        ... is appended. ``mkdir`` without ``exist_ok`` makes the reservation
        atomic, so an existing case directory is never reused.
        """
        suffix = 0
        while True:
            case_id = stamp if suffix == 0 else f"{stamp}_{suffix}"
            try:
                (self.base_path / case_id).mkdir(parents=True)
            except FileExistsError:
                suffix += 1
                continue
            return case_id

    @classmethod
    def _new_document_id(cls, documents: List[Dict]) -> str:
        """Return a timestamp-based document ID not yet used in ``documents``."""
        taken = {doc['id'] for doc in documents if 'id' in doc}
        stamp = datetime.now().strftime(cls._ID_FORMAT)
        candidate = stamp
        suffix = 0
        while candidate in taken:
            suffix += 1
            candidate = f"{stamp}_{suffix}"
        return candidate

    def _load_cases(self):
        """Load existing cases from storage.

        Directories are visited in sorted name order. A case whose metadata
        cannot be read or parsed is reported and skipped; the remaining cases
        are still loaded.
        """
        try:
            case_dirs = sorted(p for p in self.base_path.iterdir() if p.is_dir())
        except OSError as e:
            print(f"Error loading cases: {e}")
            self.cases = {}
            return

        for case_dir in case_dirs:
            metadata_file = case_dir / self._METADATA_FILENAME
            if not metadata_file.exists():
                continue
            try:
                with open(metadata_file, 'r', encoding='utf-8') as f:
                    case_data = json.load(f)
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
                print(f"Error loading case {case_dir.name}: {e}")
                continue
            if not isinstance(case_data, dict):
                print(f"Error loading case {case_dir.name}: metadata is not a JSON object")
                continue
            self.cases[case_dir.name] = case_data

    # ------------------------------------------------------------------
    # Case operations
    # ------------------------------------------------------------------
    def create_case(self, title: str, description: str, case_type: str) -> str:
        """Create a new case and save it to storage.

        Returns:
            The ID of the new case: a ``YYYYMMDD_HHMMSS`` timestamp, with a
            numeric suffix appended when that ID is already taken.
        """
        now = datetime.now()
        case_id = self._reserve_case_dir(now.strftime(self._ID_FORMAT))
        (self.base_path / case_id / 'documents').mkdir(exist_ok=True)

        # One timestamp for both fields so a new case starts with equal values.
        timestamp = now.isoformat()
        case_data = {
            'id': case_id,
            'title': title,
            'description': description,
            'case_type': case_type,
            'created_at': timestamp,
            'updated_at': timestamp,
            'status': 'active',
            'documents': [],
        }

        self._save_case(case_id, case_data)
        self.cases[case_id] = case_data
        return case_id

    def delete_case(self, case_id: str) -> bool:
        """Delete a case and all its associated data.

        Returns:
            True when the case was deleted.

        Raises:
            Exception: If the case is unknown or its directory cannot be
                removed. The underlying error is chained as ``__cause__``.
        """
        try:
            if case_id not in self.cases:
                raise ValueError(f"Case with ID {case_id} not found")

            case_path = self.base_path / case_id
            if case_path.exists():
                shutil.rmtree(case_path)

            del self.cases[case_id]
            return True
        except Exception as e:
            raise Exception(f"Error deleting case: {str(e)}") from e

    def get_all_cases(self) -> List[Dict]:
        """Get a list of all cases."""
        return list(self.cases.values())

    def get_case(self, case_id: str) -> Optional[Dict]:
        """Get details of a specific case, or None if the ID is unknown."""
        return self.cases.get(case_id)

    # ------------------------------------------------------------------
    # Document operations
    # ------------------------------------------------------------------
    def add_document(self, case_id: str, document_data: Dict):
        """Add a document to a case.

        ``document_data`` is updated in place: ``id`` and ``added_at`` are
        filled in when absent, and the same dict object is stored in the case.
        A generated ``id`` is unique among the case's documents. The in-memory
        case is only modified once the metadata has been saved.

        Raises:
            ValueError: If the case is unknown.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")

        case = self.cases[case_id]
        # Metadata loaded from disk may lack the key; list_documents tolerates
        # that, so adding a document must as well.
        documents = case.setdefault('documents', [])

        if 'id' not in document_data:
            document_data['id'] = self._new_document_id(documents)
        document_data['added_at'] = document_data.get('added_at', datetime.now().isoformat())

        timestamp = datetime.now().isoformat()
        pending = {**case, 'documents': documents + [document_data], 'updated_at': timestamp}
        self._save_case(case_id, pending)

        documents.append(document_data)
        case['updated_at'] = timestamp

    def list_documents(self, case_id: str) -> List[Dict]:
        """List all documents in a case.

        Raises:
            ValueError: If the case is unknown.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")
        return self.cases[case_id].get('documents', [])

    def remove_document(self, case_id: str, document_id: str) -> bool:
        """Remove a document from a case.

        The document is removed from the case metadata first; the file named
        by its ``file_path`` entry, if any, is deleted afterwards.

        Returns:
            False if the case or document is unknown, or if deleting the
            document's file failed (the metadata entry is already removed in
            that case). True otherwise, including when the document has no
            ``file_path`` or the file no longer exists.
        """
        case = self.cases.get(case_id)
        if case is None:
            return False

        documents = case.get('documents', [])

        def matches(doc: Dict) -> bool:
            return 'id' in doc and doc['id'] == document_id

        document = next((doc for doc in documents if matches(doc)), None)
        if document is None:
            return False

        remaining = [doc for doc in documents if not matches(doc)]
        timestamp = datetime.now().isoformat()
        self._save_case(case_id, {**case, 'documents': remaining, 'updated_at': timestamp})
        case['documents'] = remaining
        case['updated_at'] = timestamp

        # Documents are not required to reference a file on disk.
        file_path = document.get('file_path')
        if not file_path:
            return True

        try:
            doc_path = Path(file_path)
            if doc_path.exists():
                doc_path.unlink()
        except (OSError, TypeError) as e:
            print(f"Error removing document file: {e}")
            return False
        return True