cryogenic22 committed on
Commit
90451e9
·
verified ·
1 Parent(s): dc3b7e9

Update utils/case_manager.py

Browse files
Files changed (1) hide show
  1. utils/case_manager.py +115 -318
utils/case_manager.py CHANGED
@@ -1,392 +1,189 @@
1
  import os
2
  import json
3
- import shutil
4
  from datetime import datetime
5
- from typing import List, Dict, Optional, Union
6
  from pathlib import Path
7
- import uuid
8
 
9
class CaseManager:
    """File-backed registry of cases and the documents attached to them.

    State is kept in two JSON indexes directly under ``base_path``:
    ``case_index.json`` (case records keyed by case ID) and
    ``document_index.json`` (document records keyed by document ID). Every
    case additionally owns the directory ``<base_path>/<case_id>/documents``.
    """

    # Keys that identify a record; update_case / update_document never
    # overwrite them because the indexes are keyed on these values.
    _CASE_LOCKED_KEYS = ('id', 'created_at', 'documents')
    _DOCUMENT_LOCKED_KEYS = ('id', 'case_id')

    def __init__(self, base_path: str = "data/cases"):
        """Initialize CaseManager with enhanced storage and indexing.

        Creates ``base_path`` if needed, loads both indexes and then drops
        index entries that no longer match what is on disk.
        """
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

        # Initialize indexes
        self.case_index_path = self.base_path / "case_index.json"
        self.document_index_path = self.base_path / "document_index.json"
        self.cases = {}
        self.document_index = {}

        self._load_indexes()
        self._verify_integrity()

    def _load_indexes(self):
        """Load case and document indexes with error handling.

        If either file is not valid JSON, both files are backed up and both
        in-memory indexes are reset to empty.
        """
        try:
            if self.case_index_path.exists():
                with open(self.case_index_path, 'r') as f:
                    self.cases = json.load(f)

            if self.document_index_path.exists():
                with open(self.document_index_path, 'r') as f:
                    self.document_index = json.load(f)
        except json.JSONDecodeError as e:
            print(f"Error loading indexes: {e}")
            self._backup_and_reset_indexes()

    def _backup_and_reset_indexes(self):
        """Create backup of corrupted indexes and reset."""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        for index_path in (self.case_index_path, self.document_index_path):
            if index_path.exists():
                backup_path = index_path.with_suffix(f'.backup_{timestamp}')
                shutil.copy2(index_path, backup_path)

        self.cases = {}
        self.document_index = {}
        self._save_indexes()

    def _verify_integrity(self):
        """Verify and repair case and document integrity.

        Cases whose directory is missing are removed from the index, and
        document references that are absent from the document index are
        dropped from their case. The repaired indexes are then saved.
        """
        # Iterate over a snapshot because entries may be deleted on the way.
        for case_id, case in list(self.cases.items()):
            case_path = self.base_path / case_id
            if not case_path.exists():
                print(f"Case directory missing for {case_id}, removing from index")
                del self.cases[case_id]
                continue

            # Verify and repair document references
            valid_docs = []
            for doc in case.get('documents', []):
                doc_id = doc.get('id')
                if doc_id in self.document_index:
                    valid_docs.append(doc)
                else:
                    print(f"Document {doc_id} missing from index, removing reference")

            case['documents'] = valid_docs

        self._save_indexes()

    def _save_indexes(self):
        """Save case and document indexes atomically.

        Both indexes are written to ``.tmp`` siblings first and only then
        moved over the real files, so a failed dump never truncates an
        existing index. Any exception is re-raised after cleanup.
        """
        temp_case_index = self.case_index_path.with_suffix('.tmp')
        temp_doc_index = self.document_index_path.with_suffix('.tmp')

        try:
            with open(temp_case_index, 'w') as f:
                json.dump(self.cases, f, indent=2)
            with open(temp_doc_index, 'w') as f:
                json.dump(self.document_index, f, indent=2)

            # Rename temporary files to actual files
            if temp_case_index.exists():
                temp_case_index.replace(self.case_index_path)
            if temp_doc_index.exists():
                temp_doc_index.replace(self.document_index_path)
        except Exception as e:
            print(f"Error saving indexes: {e}")
            # Clean up temporary files
            temp_case_index.unlink(missing_ok=True)
            temp_doc_index.unlink(missing_ok=True)
            raise

    def create_case(self, title: str, description: str, case_type: str,
                    tags: Optional[List[str]] = None, priority: str = "normal") -> str:
        """Create a new case with enhanced metadata.

        Returns:
            The generated case ID (a UUID4 string), which is also the name
            of the case directory.
        """
        case_id = str(uuid.uuid4())
        case_path = self.base_path / case_id

        case_path.mkdir(exist_ok=True)
        (case_path / 'documents').mkdir(exist_ok=True)

        now = datetime.now().isoformat()
        case_data = {
            'id': case_id,
            'title': title,
            'description': description,
            'case_type': case_type,
            'status': 'active',
            'priority': priority,
            'tags': tags or [],
            'created_at': now,
            'updated_at': now,
            'documents': [],
            'notes': [],
            'statistics': {
                'document_count': 0,
                'total_pages': 0,
                'last_activity': now
            }
        }

        self.cases[case_id] = case_data
        self._save_indexes()
        return case_id

    def add_document(self, case_id: str, document_data: Dict) -> str:
        """Add a document with enhanced metadata and validation.

        ``document_data`` is updated in place with the generated fields and
        stored in the document index; the case keeps a short reference
        (id, title, added_at).

        Returns:
            The generated document ID (a UUID4 string).

        Raises:
            ValueError: If ``case_id`` is not a known case.
        """
        case = self.cases.get(case_id)
        if not case:
            raise ValueError(f"Case with ID {case_id} not found.")

        nested = document_data.get('metadata', {})

        def pick(key, default):
            # A top-level value wins, then whatever the caller already put in
            # 'metadata'; previously a missing top-level value replaced the
            # nested one with the default.
            return document_data.get(key, nested.get(key, default))

        # Generate document ID and enhance metadata
        doc_id = str(uuid.uuid4())
        now = datetime.now().isoformat()
        document_data.update({
            'id': doc_id,
            'case_id': case_id,
            'added_at': now,
            'updated_at': now,
            'status': document_data.get('status', 'active'),
            'version': 1,
            'tags': document_data.get('tags', []),
            'metadata': {
                **nested,
                'file_type': pick('file_type', 'unknown'),
                'page_count': pick('page_count', 0),
                'word_count': pick('word_count', 0)
            }
        })

        # Update indexes
        self.document_index[doc_id] = document_data
        case['documents'].append({
            'id': doc_id,
            'title': document_data.get('title', 'Untitled'),
            'added_at': document_data['added_at']
        })

        # Update case statistics
        case['statistics']['document_count'] += 1
        case['statistics']['total_pages'] += document_data['metadata']['page_count']
        case['statistics']['last_activity'] = datetime.now().isoformat()
        case['updated_at'] = datetime.now().isoformat()

        self._save_indexes()
        return doc_id

    def update_document(self, doc_id: str, updates: Dict) -> Dict:
        """Update document metadata and content.

        The keys ``id`` and ``case_id`` in ``updates`` are ignored so the
        record cannot drift away from the index key or its owning case.

        Returns:
            The updated document record.

        Raises:
            ValueError: If ``doc_id`` is not in the document index.
        """
        if doc_id not in self.document_index:
            raise ValueError(f"Document with ID {doc_id} not found.")

        document = self.document_index[doc_id]
        document.update({
            key: value for key, value in updates.items()
            if key not in self._DOCUMENT_LOCKED_KEYS
        })
        document['updated_at'] = datetime.now().isoformat()
        document['version'] += 1

        # Update case statistics if needed
        case = self.cases.get(document['case_id'])
        if case:
            case['statistics']['last_activity'] = datetime.now().isoformat()
            case['updated_at'] = datetime.now().isoformat()

        self._save_indexes()
        return document

    def delete_document(self, doc_id: str) -> bool:
        """Delete a document and update case statistics.

        Returns:
            False when ``doc_id`` is unknown, True once the document has
            been removed from the indexes and from disk.
        """
        if doc_id not in self.document_index:
            return False

        document = self.document_index[doc_id]
        case_id = document['case_id']
        case = self.cases.get(case_id)

        if case:
            # Update case statistics
            case['statistics']['document_count'] -= 1
            case['statistics']['total_pages'] -= document['metadata']['page_count']
            case['statistics']['last_activity'] = datetime.now().isoformat()
            case['updated_at'] = datetime.now().isoformat()

            # Remove document reference from case
            case['documents'] = [doc for doc in case['documents'] if doc['id'] != doc_id]

        # Delete document from index
        del self.document_index[doc_id]

        # Delete document files if they exist; rmtree only works on
        # directories, so a document stored as a single file is unlinked.
        doc_path = self.base_path / case_id / 'documents' / doc_id
        if doc_path.is_dir():
            shutil.rmtree(doc_path)
        elif doc_path.exists():
            doc_path.unlink()

        self._save_indexes()
        return True

    def update_case(self, case_id: str, updates: Dict) -> Dict:
        """Update case metadata and properties.

        The keys ``id``, ``created_at`` and ``documents`` are ignored.

        Returns:
            The updated case record.

        Raises:
            ValueError: If ``case_id`` is not a known case.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")

        case = self.cases[case_id]
        for key, value in updates.items():
            if key not in self._CASE_LOCKED_KEYS:
                case[key] = value

        case['updated_at'] = datetime.now().isoformat()
        self._save_indexes()
        return case

    def delete_case(self, case_id: str) -> bool:
        """Delete a case and all associated documents.

        Returns:
            False when ``case_id`` is unknown, True after removal.
        """
        if case_id not in self.cases:
            return False

        case = self.cases[case_id]

        # Delete all associated documents. delete_document rebinds
        # case['documents'], so walk a snapshot of the references.
        for doc in list(case['documents']):
            self.delete_document(doc['id'])

        # Delete case directory
        case_path = self.base_path / case_id
        if case_path.exists():
            shutil.rmtree(case_path)

        # Remove case from index
        del self.cases[case_id]
        self._save_indexes()
        return True

    def search(self, query: str, filters: Dict = None) -> List[Dict]:
        """Enhanced search with filtering and sorting.

        Performs a case-insensitive substring match of ``query`` against
        case title, description, type and tags, and against the title and
        tags of each matching-filter case's documents.

        Returns:
            A list of ``{'type', 'data', 'relevance'}`` dicts (document hits
            also carry ``'case_id'``), highest relevance first.
        """
        results = []
        query = query.lower()

        for case in self.cases.values():
            # Apply filters if provided
            if filters and not self._matches_filters(case, filters):
                continue

            # Search in case metadata
            if (query in case['title'].lower() or
                    query in case['description'].lower() or
                    query in case['case_type'].lower() or
                    any(query in tag.lower() for tag in case.get('tags', []))):
                results.append({
                    'type': 'case',
                    'data': case,
                    'relevance': self._calculate_relevance(query, case)
                })

            # Search in documents
            for doc_ref in case['documents']:
                doc = self.document_index.get(doc_ref['id'])
                if not doc:
                    continue
                # add_document accepts untitled documents, so the title may
                # be absent from the stored record.
                title = str(doc.get('title') or '').lower()
                if (query in title or
                        any(query in tag.lower() for tag in doc.get('tags', []))):
                    results.append({
                        'type': 'document',
                        'data': doc,
                        'case_id': case['id'],
                        'relevance': self._calculate_relevance(query, doc)
                    })

        # Sort results by relevance
        results.sort(key=lambda x: x['relevance'], reverse=True)
        return results

    def _matches_filters(self, case: Dict, filters: Dict) -> bool:
        """Check if case matches all specified filters.

        ``date_range`` expects a dict with ``start`` and ``end`` values
        comparable to a ``datetime``; ``tags`` matches when any listed tag
        is on the case; any other key present on the case must be equal.
        """
        for key, value in filters.items():
            if key == 'date_range':
                case_date = datetime.fromisoformat(case['created_at'])
                if not (value['start'] <= case_date <= value['end']):
                    return False
            elif key == 'tags':
                if not any(tag in case['tags'] for tag in value):
                    return False
            elif key in case and case[key] != value:
                return False
        return True

    def _calculate_relevance(self, query: str, item: Dict) -> float:
        """Calculate search result relevance score.

        Scores 1.0 for a title match, 0.5 per matching tag, plus a recency
        bonus that decays linearly from 1.0 to 0 over 365 days.
        """
        score = 0.0

        # Title match
        if query in str(item.get('title') or '').lower():
            score += 1.0

        # Tag matches
        for tag in item.get('tags', []):
            if query in tag.lower():
                score += 0.5

        # Recent items get higher score. Case records carry 'created_at',
        # document records only carry 'added_at'.
        stamp = item.get('created_at') or item.get('added_at')
        if stamp:
            try:
                days_old = (datetime.now() - datetime.fromisoformat(stamp)).days
            except (TypeError, ValueError):
                # Unparseable or timezone-aware stamp: skip the bonus.
                days_old = None
            if days_old is not None:
                score += max(0, 1 - (days_old / 365))  # Decay over a year

        return score

    def get_case_statistics(self, case_id: str) -> Dict:
        """Get detailed statistics for a case.

        Returns:
            A copy of the case's stored statistics extended with
            ``document_types``, ``activity_timeline`` and
            ``tag_distribution``.

        Raises:
            ValueError: If ``case_id`` is not a known case.
        """
        case = self.cases.get(case_id)
        if not case:
            raise ValueError(f"Case with ID {case_id} not found.")

        stats = case['statistics'].copy()
        stats.update({
            'document_types': self._count_document_types(case),
            'activity_timeline': self._generate_activity_timeline(case),
            'tag_distribution': self._count_tags(case)
        })

        return stats

    def _indexed_documents(self, case: Dict) -> List[Dict]:
        """Return the indexed records for the documents a case references."""
        records = (self.document_index.get(ref['id']) for ref in case['documents'])
        return [doc for doc in records if doc]

    def _count_document_types(self, case: Dict) -> Dict:
        """Count documents by type in a case."""
        type_counts = {}
        for doc in self._indexed_documents(case):
            doc_type = doc['metadata']['file_type']
            type_counts[doc_type] = type_counts.get(doc_type, 0) + 1
        return type_counts

    def _generate_activity_timeline(self, case: Dict) -> List[Dict]:
        """Generate activity timeline for a case, oldest entry first."""
        # Add case creation
        timeline = [{
            'date': case['created_at'],
            'type': 'case_created',
            'description': f"Case '{case['title']}' created"
        }]

        # Add document activities
        for doc in self._indexed_documents(case):
            timeline.append({
                'date': doc['added_at'],
                'type': 'document_added',
                'description': f"Document '{doc.get('title', 'Untitled')}' added"
            })

        # Sort timeline by date (ISO strings sort chronologically)
        timeline.sort(key=lambda x: x['date'])
        return timeline

    def _count_tags(self, case: Dict) -> Dict:
        """Count tag occurrences across a case and its documents."""
        tag_counts = {}

        # Count case tags
        for tag in case['tags']:
            tag_counts[tag] = tag_counts.get(tag, 0) + 1

        # Count document tags
        for doc in self._indexed_documents(case):
            for tag in doc.get('tags', []):
                tag_counts[tag] = tag_counts.get(tag, 0) + 1

        return tag_counts
 
 
 
 
 
 
 
1
  import os
2
  import json
 
3
  from datetime import datetime
4
+ from typing import List, Dict, Optional
5
  from pathlib import Path
6
+ import shutil
7
 
8
class CaseManager:
    """Directory-per-case store.

    Each case lives in ``<base_path>/<case_id>/`` with a ``metadata.json``
    file holding the case record (including its list of document records)
    and a ``documents`` sub-directory. ``self.cases`` mirrors the metadata
    files in memory, keyed by case ID.
    """

    _METADATA_FILE = 'metadata.json'
    # update_case never overwrites these: 'id' must keep matching the case
    # directory name and 'created_at' is a historical fact.
    _LOCKED_KEYS = ('id', 'created_at')

    def __init__(self, base_path: str = "data/cases"):
        """Initialize CaseManager with a base directory to store cases."""
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.cases = {}
        self._load_cases()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _load_cases(self):
        """Load existing cases from storage.

        A case whose ``metadata.json`` cannot be read or parsed is reported
        and skipped; it no longer causes every other case to be discarded.
        """
        try:
            # Sorted so the in-memory order does not depend on the OS.
            case_dirs = sorted(p for p in self.base_path.iterdir() if p.is_dir())
        except OSError as e:
            print(f"Error loading cases: {e}")
            self.cases = {}
            return

        for case_dir in case_dirs:
            metadata_file = case_dir / self._METADATA_FILE
            if not metadata_file.exists():
                continue
            try:
                with open(metadata_file, 'r', encoding='utf-8') as f:
                    case_data = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers both invalid JSON and undecodable bytes.
                print(f"Error loading case {case_dir.name}: {e}")
                continue
            if isinstance(case_data, dict):
                self.cases[case_dir.name] = case_data
            else:
                print(f"Error loading case {case_dir.name}: metadata is not an object")

    def _save_case(self, case_id: str, case: Dict) -> None:
        """Write a case record to its metadata file without risking truncation.

        The record is serialized before anything touches the disk and is
        then moved into place from a temporary sibling file.
        """
        target = self.base_path / case_id / self._METADATA_FILE
        payload = json.dumps(case, indent=2)
        scratch = target.with_name(target.name + '.tmp')
        scratch.write_text(payload, encoding='utf-8')
        scratch.replace(target)

    @staticmethod
    def _timestamp_id(is_taken) -> str:
        """Return a ``YYYYmmdd_HHMMSS`` ID, suffixed ``_1``, ``_2``… if taken."""
        base = datetime.now().strftime('%Y%m%d_%H%M%S')
        candidate = base
        attempt = 0
        while is_taken(candidate):
            attempt += 1
            candidate = f"{base}_{attempt}"
        return candidate

    @staticmethod
    def _lowered(record, key: str) -> str:
        """Return ``record[key]`` lower-cased, or '' if missing or not text."""
        value = record.get(key) if isinstance(record, dict) else None
        return value.lower() if isinstance(value, str) else ''

    # ------------------------------------------------------------------
    # Cases
    # ------------------------------------------------------------------
    def create_case(self, title: str, description: str, case_type: str) -> str:
        """Create a new case and save it to storage.

        Returns:
            The new case ID. It is a second-resolution timestamp; when that
            ID already exists (in memory or as a directory) a numeric suffix
            is appended instead of overwriting the existing case.
        """
        case_id = self._timestamp_id(
            lambda cid: cid in self.cases or (self.base_path / cid).exists()
        )
        case_path = self.base_path / case_id

        # Create case directory structure
        case_path.mkdir(exist_ok=True)
        (case_path / 'documents').mkdir(exist_ok=True)

        # Prepare case data
        now = datetime.now().isoformat()
        case_data = {
            'id': case_id,
            'title': title,
            'description': description,
            'case_type': case_type,
            'created_at': now,
            'updated_at': now,
            'status': 'active',
            'documents': []
        }

        # Save case metadata
        self._save_case(case_id, case_data)

        self.cases[case_id] = case_data
        return case_id

    def get_all_cases(self) -> List[Dict]:
        """Get a list of all cases."""
        return list(self.cases.values())

    def get_case(self, case_id: str) -> Optional[Dict]:
        """Get details of a specific case, or None if it does not exist."""
        return self.cases.get(case_id)

    def update_case(self, case_id: str, updates: Dict) -> Optional[Dict]:
        """Update case details.

        ``id`` and ``created_at`` in ``updates`` are ignored. The new record
        is written to disk first; the in-memory record only changes once
        that write succeeded.

        Returns:
            The updated case record, or None if ``case_id`` is unknown.
        """
        if case_id not in self.cases:
            return None

        case = self.cases[case_id]
        accepted = {k: v for k, v in updates.items() if k not in self._LOCKED_KEYS}
        stamp = datetime.now().isoformat()

        # Save updated metadata
        self._save_case(case_id, {**case, **accepted, 'updated_at': stamp})

        case.update(accepted)
        case['updated_at'] = stamp
        return case

    def delete_case(self, case_id: str) -> bool:
        """Delete a case and all its files.

        Returns:
            False if the case is unknown or its directory could not be
            removed, True otherwise. A case whose directory is already gone
            is still dropped from memory.
        """
        if case_id not in self.cases:
            return False

        case_path = self.base_path / case_id
        try:
            # Remove case directory and all contents
            if case_path.exists():
                shutil.rmtree(case_path)
        except OSError as e:
            print(f"Error deleting case {case_id}: {e}")
            return False

        del self.cases[case_id]
        return True

    # ------------------------------------------------------------------
    # Documents
    # ------------------------------------------------------------------
    def add_document(self, case_id: str, document_data: Dict):
        """Add a document to a case.

        ``document_data`` is stored as-is and, as before, receives ``id``
        and ``added_at`` in place when the caller did not supply them. A
        generated ID is made unique within the case.

        Returns:
            The document's ID.

        Raises:
            ValueError: If ``case_id`` is not a known case.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")

        case = self.cases[case_id]
        existing = case.get('documents', [])

        # Update document metadata
        if 'id' not in document_data:
            taken = {doc.get('id') for doc in existing if isinstance(doc, dict)}
            document_data['id'] = self._timestamp_id(lambda did: did in taken)
        document_data.setdefault('added_at', datetime.now().isoformat())

        # Persist first so an unserializable document never reaches memory
        # or leaves a half-written metadata file behind.
        stamp = datetime.now().isoformat()
        self._save_case(case_id, {
            **case,
            'documents': [*existing, document_data],
            'updated_at': stamp,
        })

        # Add document to case
        case.setdefault('documents', []).append(document_data)
        case['updated_at'] = stamp
        return document_data['id']

    def remove_document(self, case_id: str, document_id: str) -> bool:
        """Remove a document from a case.

        The document record is dropped from the case metadata, then
        ``documents/<document_id>`` is deleted if it exists. Nothing on disk
        is touched unless that path lies strictly inside the case's
        ``documents`` directory.

        Returns:
            False if the case is unknown or file removal failed, True
            otherwise.
        """
        if case_id not in self.cases:
            return False

        case = self.cases[case_id]
        remaining = [
            doc for doc in case.get('documents', [])
            if doc.get('id') != document_id
        ]
        stamp = datetime.now().isoformat()

        # Save updated case metadata
        self._save_case(case_id, {**case, 'documents': remaining, 'updated_at': stamp})
        case['documents'] = remaining
        case['updated_at'] = stamp

        # Remove document files
        try:
            docs_dir = (self.base_path / case_id / 'documents').resolve()
            doc_path = (docs_dir / document_id).resolve()
            # '' / '.' resolve to docs_dir itself and '..' or absolute paths
            # escape it; none of those may be deleted.
            if docs_dir not in doc_path.parents:
                return True
            if doc_path.is_dir():
                shutil.rmtree(doc_path)
            elif doc_path.exists():
                doc_path.unlink()
            return True
        except (OSError, TypeError, ValueError) as e:
            print(f"Error removing document files: {e}")
            return False

    def list_documents(self, case_id: str) -> List[Dict]:
        """List all documents in a case.

        Raises:
            ValueError: If ``case_id`` is not a known case.
        """
        if case_id not in self.cases:
            raise ValueError(f"Case with ID {case_id} not found.")
        return self.cases[case_id].get('documents', [])

    def get_document(self, case_id: str, document_id: str) -> Optional[Dict]:
        """Get a specific document from a case, or None if not found."""
        if case_id not in self.cases:
            return None

        for doc in self.cases[case_id].get('documents', []):
            if doc.get('id') == document_id:
                return doc
        return None

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def search(self, query: str) -> List[Dict]:
        """Search for cases or documents.

        Case-insensitive substring match of ``query`` against each case's
        title, description and type, and against each document's title.
        Missing or non-text fields are treated as empty.

        Returns:
            A list of ``{'type': 'case', 'data': ...}`` and
            ``{'type': 'document', 'case_id': ..., 'data': ...}`` dicts.
        """
        results = []
        query = query.lower()

        for case in self.cases.values():
            # Search in case metadata
            if any(query in self._lowered(case, field)
                   for field in ('title', 'description', 'case_type')):
                results.append({
                    'type': 'case',
                    'data': case
                })

            # Search in documents
            for doc in case.get('documents', []):
                if query in self._lowered(doc, 'title'):
                    results.append({
                        'type': 'document',
                        'case_id': case.get('id'),
                        'data': doc
                    })

        return results

    def get_case_stats(self, case_id: str) -> Optional[Dict]:
        """Get statistics for a case, or None if the case is unknown."""
        if case_id not in self.cases:
            return None

        case = self.cases[case_id]
        return {
            'document_count': len(case.get('documents', [])),
            'created_at': case.get('created_at'),
            'last_updated': case.get('updated_at'),
            'status': case.get('status', 'active')
        }