Spaces:
Build error
Build error
| import os | |
| import json | |
| from datetime import datetime | |
| from typing import List, Dict, Optional | |
| from pathlib import Path | |
| import shutil | |
| class CaseManager: | |
| def __init__(self, base_path: str = "data/cases"): | |
| """Initialize CaseManager with a base directory to store cases.""" | |
| self.base_path = Path(base_path) | |
| self.base_path.mkdir(parents=True, exist_ok=True) | |
| self.cases = {} | |
| self._load_cases() | |
| def delete_case(self, case_id: str) -> bool: | |
| """Delete a case and all its associated data.""" | |
| try: | |
| if case_id not in self.cases: | |
| raise ValueError(f"Case with ID {case_id} not found") | |
| case_path = self.base_path / case_id | |
| # Delete all documents in the case directory | |
| if case_path.exists(): | |
| shutil.rmtree(case_path) | |
| # Remove from cases dictionary | |
| del self.cases[case_id] | |
| return True | |
| except Exception as e: | |
| raise Exception(f"Error deleting case: {str(e)}") | |
| def _load_cases(self): | |
| """Load existing cases from storage.""" | |
| try: | |
| for case_dir in self.base_path.iterdir(): | |
| if case_dir.is_dir(): | |
| metadata_file = case_dir / 'metadata.json' | |
| if metadata_file.exists(): | |
| with open(metadata_file, 'r') as f: | |
| case_data = json.load(f) | |
| self.cases[case_dir.name] = case_data | |
| except Exception as e: | |
| print(f"Error loading cases: {e}") | |
| self.cases = {} | |
| def create_case(self, title: str, description: str, case_type: str) -> str: | |
| """Create a new case and save it to storage.""" | |
| case_id = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| case_path = self.base_path / case_id | |
| # Create case directory structure | |
| case_path.mkdir(exist_ok=True) | |
| (case_path / 'documents').mkdir(exist_ok=True) | |
| # Prepare case data | |
| case_data = { | |
| 'id': case_id, | |
| 'title': title, | |
| 'description': description, | |
| 'case_type': case_type, | |
| 'created_at': datetime.now().isoformat(), | |
| 'updated_at': datetime.now().isoformat(), | |
| 'status': 'active', | |
| 'documents': [] | |
| } | |
| # Save case metadata | |
| with open(case_path / 'metadata.json', 'w') as f: | |
| json.dump(case_data, f, indent=2) | |
| self.cases[case_id] = case_data | |
| return case_id | |
| def get_all_cases(self) -> List[Dict]: | |
| """Get a list of all cases.""" | |
| return list(self.cases.values()) | |
| def get_case(self, case_id: str) -> Optional[Dict]: | |
| """Get details of a specific case.""" | |
| return self.cases.get(case_id) | |
| def add_document(self, case_id: str, document_data: Dict): | |
| """Add a document to a case.""" | |
| if case_id not in self.cases: | |
| raise ValueError(f"Case with ID {case_id} not found.") | |
| # Update document metadata | |
| document_data['id'] = document_data.get('id', datetime.now().strftime('%Y%m%d_%H%M%S')) | |
| document_data['added_at'] = document_data.get('added_at', datetime.now().isoformat()) | |
| # Add document to case | |
| case = self.cases[case_id] | |
| case['documents'].append(document_data) | |
| case['updated_at'] = datetime.now().isoformat() | |
| # Save updated case metadata | |
| with open(self.base_path / case_id / 'metadata.json', 'w') as f: | |
| json.dump(case, f, indent=2) | |
| def list_documents(self, case_id: str) -> List[Dict]: | |
| """List all documents in a case.""" | |
| if case_id not in self.cases: | |
| raise ValueError(f"Case with ID {case_id} not found.") | |
| return self.cases[case_id].get('documents', []) | |
| def remove_document(self, case_id: str, document_id: str) -> bool: | |
| """Remove a document from a case.""" | |
| if case_id not in self.cases: | |
| return False | |
| case = self.cases[case_id] | |
| document = next((doc for doc in case['documents'] if doc['id'] == document_id), None) | |
| if not document: | |
| return False | |
| # Remove from case metadata | |
| case['documents'] = [doc for doc in case['documents'] if doc['id'] != document_id] | |
| case['updated_at'] = datetime.now().isoformat() | |
| # Save updated case metadata | |
| with open(self.base_path / case_id / 'metadata.json', 'w') as f: | |
| json.dump(case, f, indent=2) | |
| # Remove the physical file | |
| try: | |
| doc_path = Path(document['file_path']) | |
| if doc_path.exists(): | |
| doc_path.unlink() | |
| return True | |
| except Exception as e: | |
| print(f"Error removing document file: {e}") | |
| return False | |