cryogenic22 commited on
Commit
a027a75
·
verified ·
1 Parent(s): 4a46b8c

Update utils/case_manager.py

Browse files
Files changed (1) hide show
  1. utils/case_manager.py +64 -73
utils/case_manager.py CHANGED
@@ -1,108 +1,99 @@
1
  import os
2
  import json
3
  from datetime import datetime
4
- from typing import Dict, List, Optional
5
- import streamlit as st
6
 
7
 
8
  class CaseManager:
9
  def __init__(self, base_path: str = "data/cases"):
 
 
 
10
  self.base_path = base_path
11
  os.makedirs(base_path, exist_ok=True)
12
  self._load_cases()
13
 
14
  def _load_cases(self):
15
  """Load existing cases from storage."""
16
- if "cases" not in st.session_state:
17
- st.session_state.cases = {}
18
- for case_id in os.listdir(self.base_path):
19
- case_path = os.path.join(self.base_path, case_id)
20
- if os.path.isdir(case_path):
21
- with open(os.path.join(case_path, "metadata.json"), "r") as f:
22
- st.session_state.cases[case_id] = json.load(f)
23
 
24
  def create_case(self, title: str, description: str, case_type: str) -> str:
25
- """Create a new case."""
 
 
26
  case_id = datetime.now().strftime('%Y%m%d_%H%M%S')
27
  case_path = os.path.join(self.base_path, case_id)
28
  os.makedirs(case_path, exist_ok=True)
29
- os.makedirs(os.path.join(case_path, "documents"), exist_ok=True)
30
 
31
  case_data = {
32
- "id": case_id,
33
- "title": title,
34
- "description": description,
35
- "case_type": case_type,
36
- "created_at": datetime.now().isoformat(),
37
- "documents": []
38
  }
39
 
40
- with open(os.path.join(case_path, "metadata.json"), "w") as f:
41
  json.dump(case_data, f)
42
-
43
- st.session_state.cases[case_id] = case_data
44
  return case_id
45
 
46
- def add_document(self, case_id: str, document_data: Dict) -> str:
47
- """Add a document to a case."""
48
- case = st.session_state.cases.get(case_id)
 
 
49
  if not case:
50
- raise ValueError(f"Case {case_id} not found")
51
 
52
- doc_id = datetime.now().strftime('%Y%m%d_%H%M%S')
53
- document_data["id"] = doc_id
54
- document_data["added_at"] = datetime.now().isoformat()
 
55
 
56
- # Save document to case directory
57
- case_path = os.path.join(self.base_path, case_id)
58
- doc_path = os.path.join(case_path, "documents", f"{doc_id}.json")
59
- with open(doc_path, "w") as f:
60
- json.dump(document_data, f)
61
 
62
- # Update case metadata
63
- case["documents"].append({"id": doc_id, "title": document_data["title"], "added_at": document_data["added_at"]})
64
- with open(os.path.join(case_path, "metadata.json"), "w") as f:
65
  json.dump(case, f)
66
 
67
- return doc_id
 
 
 
 
 
 
 
68
 
69
  def get_case(self, case_id: str) -> Optional[Dict]:
70
- """Get case details."""
71
- return st.session_state.cases.get(case_id)
 
 
72
 
73
  def get_all_cases(self) -> List[Dict]:
74
- """Get all cases."""
75
- return list(st.session_state.cases.values())
76
-
77
- def get_document(self, case_id: str, doc_id: str) -> Optional[Dict]:
78
- """Retrieve a specific document by ID."""
79
- case_path = os.path.join(self.base_path, case_id)
80
- doc_path = os.path.join(case_path, "documents", f"{doc_id}.json")
81
- if os.path.exists(doc_path):
82
- with open(doc_path, "r") as f:
83
- return json.load(f)
84
- return None
85
-
86
- def delete_document(self, case_id: str, doc_id: str):
87
- """Delete a specific document."""
88
- case = st.session_state.cases.get(case_id)
89
- if not case:
90
- raise ValueError(f"Case {case_id} not found")
91
-
92
- # Remove document file
93
- case_path = os.path.join(self.base_path, case_id)
94
- doc_path = os.path.join(case_path, "documents", f"{doc_id}.json")
95
- if os.path.exists(doc_path):
96
- os.remove(doc_path)
97
-
98
- # Update case metadata
99
- case["documents"] = [doc for doc in case["documents"] if doc["id"] != doc_id]
100
- with open(os.path.join(case_path, "metadata.json"), "w") as f:
101
- json.dump(case, f)
102
-
103
- def list_documents(self, case_id: str) -> List[Dict]:
104
- """List all documents for a case."""
105
- case = st.session_state.cases.get(case_id)
106
- if not case:
107
- return []
108
- return case["documents"]
 
1
  import os
2
  import json
3
  from datetime import datetime
4
+ from typing import List, Dict, Optional
 
5
 
6
 
7
  class CaseManager:
8
  def __init__(self, base_path: str = "data/cases"):
9
+ """
10
+ Initialize CaseManager with a base directory to store cases.
11
+ """
12
  self.base_path = base_path
13
  os.makedirs(base_path, exist_ok=True)
14
  self._load_cases()
15
 
16
  def _load_cases(self):
17
  """Load existing cases from storage."""
18
+ self.cases = {}
19
+ for case_id in os.listdir(self.base_path):
20
+ case_path = os.path.join(self.base_path, case_id)
21
+ if os.path.isdir(case_path):
22
+ with open(os.path.join(case_path, 'metadata.json'), 'r') as f:
23
+ self.cases[case_id] = json.load(f)
 
24
 
25
  def create_case(self, title: str, description: str, case_type: str) -> str:
26
+ """
27
+ Create a new case and save it to storage.
28
+ """
29
  case_id = datetime.now().strftime('%Y%m%d_%H%M%S')
30
  case_path = os.path.join(self.base_path, case_id)
31
  os.makedirs(case_path, exist_ok=True)
32
+ os.makedirs(os.path.join(case_path, 'documents'), exist_ok=True)
33
 
34
  case_data = {
35
+ 'id': case_id,
36
+ 'title': title,
37
+ 'description': description,
38
+ 'case_type': case_type,
39
+ 'created_at': datetime.now().isoformat(),
40
+ 'documents': []
41
  }
42
 
43
+ with open(os.path.join(case_path, 'metadata.json'), 'w') as f:
44
  json.dump(case_data, f)
45
+ self.cases[case_id] = case_data
 
46
  return case_id
47
 
48
+ def add_document(self, case_id: str, document_data: Dict):
49
+ """
50
+ Add a document to a case and save it to storage.
51
+ """
52
+ case = self.cases.get(case_id)
53
  if not case:
54
+ raise ValueError(f"Case with ID {case_id} not found.")
55
 
56
+ # Ensure the document has required fields
57
+ document_data['id'] = document_data.get('id', datetime.now().strftime('%Y%m%d_%H%M%S'))
58
+ document_data['added_at'] = document_data.get('added_at', datetime.now().isoformat())
59
+ document_data['text'] = document_data.get('text', '') # Ensure text is always present
60
 
61
+ case['documents'].append(document_data)
 
 
 
 
62
 
63
+ case_path = os.path.join(self.base_path, case_id)
64
+ with open(os.path.join(case_path, 'metadata.json'), 'w') as f:
 
65
  json.dump(case, f)
66
 
67
+ def list_documents(self, case_id: str) -> List[Dict]:
68
+ """
69
+ List all documents for a given case.
70
+ """
71
+ case = self.cases.get(case_id)
72
+ if not case:
73
+ raise ValueError(f"Case with ID {case_id} not found.")
74
+ return case.get('documents', [])
75
 
76
  def get_case(self, case_id: str) -> Optional[Dict]:
77
+ """
78
+ Get details of a specific case by ID.
79
+ """
80
+ return self.cases.get(case_id)
81
 
82
  def get_all_cases(self) -> List[Dict]:
83
+ """
84
+ Get a list of all cases.
85
+ """
86
+ return list(self.cases.values())
87
+
88
+ def search(self, query: str) -> List[Dict]:
89
+ """
90
+ Search for cases or documents by title or type.
91
+ """
92
+ results = []
93
+ for case in self.cases.values():
94
+ if query.lower() in case['title'].lower() or query.lower() in case['case_type'].lower():
95
+ results.append(case)
96
+ for document in case['documents']:
97
+ if query.lower() in document.get('title', '').lower():
98
+ results.append(document)
99
+ return results