Spaces:
Build error
Build error
Update utils/case_manager.py
Browse files- utils/case_manager.py +23 -23
utils/case_manager.py
CHANGED
|
@@ -5,6 +5,7 @@ from typing import List, Dict, Optional
|
|
| 5 |
from pathlib import Path
|
| 6 |
import shutil
|
| 7 |
|
|
|
|
| 8 |
class CaseManager:
|
| 9 |
def __init__(self, base_path: str = "data/cases"):
|
| 10 |
"""Initialize CaseManager with a base directory to store cases."""
|
|
@@ -31,11 +32,11 @@ class CaseManager:
|
|
| 31 |
"""Create a new case and save it to storage."""
|
| 32 |
case_id = datetime.now().strftime('%Y%m%d_%H%M%S')
|
| 33 |
case_path = self.base_path / case_id
|
| 34 |
-
|
| 35 |
# Create case directory structure
|
| 36 |
case_path.mkdir(exist_ok=True)
|
| 37 |
(case_path / 'documents').mkdir(exist_ok=True)
|
| 38 |
-
|
| 39 |
# Prepare case data
|
| 40 |
case_data = {
|
| 41 |
'id': case_id,
|
|
@@ -47,11 +48,11 @@ class CaseManager:
|
|
| 47 |
'status': 'active',
|
| 48 |
'documents': []
|
| 49 |
}
|
| 50 |
-
|
| 51 |
# Save case metadata
|
| 52 |
with open(case_path / 'metadata.json', 'w') as f:
|
| 53 |
json.dump(case_data, f, indent=2)
|
| 54 |
-
|
| 55 |
self.cases[case_id] = case_data
|
| 56 |
return case_id
|
| 57 |
|
|
@@ -64,23 +65,22 @@ class CaseManager:
|
|
| 64 |
return self.cases.get(case_id)
|
| 65 |
|
| 66 |
def add_document(self, case_id: str, document_data: Dict):
|
| 67 |
-
|
| 68 |
-
|
| 69 |
-
|
| 70 |
-
|
| 71 |
-
# Update document metadata
|
| 72 |
-
document_data['id'] = document_data.get('id', datetime.now().strftime('%Y%m%d_%H%M%S'))
|
| 73 |
-
document_data['added_at'] = document_data.get('added_at', datetime.now().isoformat())
|
| 74 |
|
| 75 |
-
|
| 76 |
-
|
| 77 |
-
|
| 78 |
-
case['updated_at'] = datetime.now().isoformat()
|
| 79 |
|
| 80 |
-
|
| 81 |
-
|
| 82 |
-
|
|
|
|
| 83 |
|
|
|
|
|
|
|
|
|
|
| 84 |
|
| 85 |
def list_documents(self, case_id: str) -> List[Dict]:
|
| 86 |
"""List all documents in a case."""
|
|
@@ -92,21 +92,21 @@ class CaseManager:
|
|
| 92 |
"""Remove a document from a case."""
|
| 93 |
if case_id not in self.cases:
|
| 94 |
return False
|
| 95 |
-
|
| 96 |
case = self.cases[case_id]
|
| 97 |
document = next((doc for doc in case['documents'] if doc['id'] == document_id), None)
|
| 98 |
-
|
| 99 |
if not document:
|
| 100 |
return False
|
| 101 |
-
|
| 102 |
# Remove from case metadata
|
| 103 |
case['documents'] = [doc for doc in case['documents'] if doc['id'] != document_id]
|
| 104 |
case['updated_at'] = datetime.now().isoformat()
|
| 105 |
-
|
| 106 |
# Save updated case metadata
|
| 107 |
with open(self.base_path / case_id / 'metadata.json', 'w') as f:
|
| 108 |
json.dump(case, f, indent=2)
|
| 109 |
-
|
| 110 |
# Remove the physical file
|
| 111 |
try:
|
| 112 |
doc_path = Path(document['file_path'])
|
|
|
|
| 5 |
from pathlib import Path
|
| 6 |
import shutil
|
| 7 |
|
| 8 |
+
|
| 9 |
class CaseManager:
|
| 10 |
def __init__(self, base_path: str = "data/cases"):
|
| 11 |
"""Initialize CaseManager with a base directory to store cases."""
|
|
|
|
| 32 |
"""Create a new case and save it to storage."""
|
| 33 |
case_id = datetime.now().strftime('%Y%m%d_%H%M%S')
|
| 34 |
case_path = self.base_path / case_id
|
| 35 |
+
|
| 36 |
# Create case directory structure
|
| 37 |
case_path.mkdir(exist_ok=True)
|
| 38 |
(case_path / 'documents').mkdir(exist_ok=True)
|
| 39 |
+
|
| 40 |
# Prepare case data
|
| 41 |
case_data = {
|
| 42 |
'id': case_id,
|
|
|
|
| 48 |
'status': 'active',
|
| 49 |
'documents': []
|
| 50 |
}
|
| 51 |
+
|
| 52 |
# Save case metadata
|
| 53 |
with open(case_path / 'metadata.json', 'w') as f:
|
| 54 |
json.dump(case_data, f, indent=2)
|
| 55 |
+
|
| 56 |
self.cases[case_id] = case_data
|
| 57 |
return case_id
|
| 58 |
|
|
|
|
| 65 |
return self.cases.get(case_id)
|
| 66 |
|
| 67 |
def add_document(self, case_id: str, document_data: Dict):
|
| 68 |
+
"""Add a document to a case."""
|
| 69 |
+
if case_id not in self.cases:
|
| 70 |
+
raise ValueError(f"Case with ID {case_id} not found.")
|
|
|
|
|
|
|
|
|
|
|
|
|
| 71 |
|
| 72 |
+
# Update document metadata
|
| 73 |
+
document_data['id'] = document_data.get('id', datetime.now().strftime('%Y%m%d_%H%M%S'))
|
| 74 |
+
document_data['added_at'] = document_data.get('added_at', datetime.now().isoformat())
|
|
|
|
| 75 |
|
| 76 |
+
# Add document to case
|
| 77 |
+
case = self.cases[case_id]
|
| 78 |
+
case['documents'].append(document_data)
|
| 79 |
+
case['updated_at'] = datetime.now().isoformat()
|
| 80 |
|
| 81 |
+
# Save updated case metadata
|
| 82 |
+
with open(self.base_path / case_id / 'metadata.json', 'w') as f:
|
| 83 |
+
json.dump(case, f, indent=2)
|
| 84 |
|
| 85 |
def list_documents(self, case_id: str) -> List[Dict]:
|
| 86 |
"""List all documents in a case."""
|
|
|
|
| 92 |
"""Remove a document from a case."""
|
| 93 |
if case_id not in self.cases:
|
| 94 |
return False
|
| 95 |
+
|
| 96 |
case = self.cases[case_id]
|
| 97 |
document = next((doc for doc in case['documents'] if doc['id'] == document_id), None)
|
| 98 |
+
|
| 99 |
if not document:
|
| 100 |
return False
|
| 101 |
+
|
| 102 |
# Remove from case metadata
|
| 103 |
case['documents'] = [doc for doc in case['documents'] if doc['id'] != document_id]
|
| 104 |
case['updated_at'] = datetime.now().isoformat()
|
| 105 |
+
|
| 106 |
# Save updated case metadata
|
| 107 |
with open(self.base_path / case_id / 'metadata.json', 'w') as f:
|
| 108 |
json.dump(case, f, indent=2)
|
| 109 |
+
|
| 110 |
# Remove the physical file
|
| 111 |
try:
|
| 112 |
doc_path = Path(document['file_path'])
|