abjasrees commited on
Commit
9fdbf51
·
verified ·
1 Parent(s): 489ec8c

Update hedis_engine.py

Browse files
Files changed (1) hide show
  1. hedis_engine.py +276 -23
hedis_engine.py CHANGED
@@ -1,18 +1,16 @@
1
  # hedis_engine.py
 
2
  import os
 
3
  from crewai import Agent, Task, Crew, Process
4
  from crewai_tools import tool
5
  from langchain_community.vectorstores import Chroma
6
-
7
  from langchain_openai import ChatOpenAI
8
 
9
 
10
-
11
-
12
-
13
- class HedisComplianceEngine:
14
  """
15
- Crew that: (1) states official HEDIS measure criteria,
16
  (2) extracts patient evidence from embeddings, and
17
  (3) produces a markdown analysis report.
18
  """
@@ -21,6 +19,7 @@ class HedisComplianceEngine:
21
  self.measure_code = measure_code
22
  self.measurement_year = measurement_year
23
  self.model = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=5096)
 
24
 
25
  @tool("patient_chart_search")
26
  def patient_chart_search(query: str) -> str:
@@ -70,16 +69,248 @@ class HedisComplianceEngine:
70
  llm=self.model,
71
  )
72
 
73
- # Tasks
74
- self.hedis_task = Task(
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  description=(
76
- "Provide the official criteria for HEDIS measure {measure_code}, including:\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  "- Required tests or procedures\n"
78
  "- Diagnosis/procedure codes\n"
79
  "- Required medications\n"
80
  "- Inclusion and exclusion criteria\n"
81
- "- Timeframes for each requirement (e.g., within 10 years of {measurement_year})\n\n"
82
- "You MUST compute and include absolute date ranges using {measurement_year}.\n"
83
  "Return JSON with fields: 'measure','required_tests','required_medications',"
84
  "'codes','timeframes','inclusions','exclusions'."
85
  ),
@@ -88,20 +319,20 @@ class HedisComplianceEngine:
88
  async_execution=True,
89
  )
90
 
91
- self.patient_task = Task(
92
  description=(
93
- "Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
94
- "medications, and dates relevant to {measure_code}. Only include measure related info from the patient charts"
95
  ),
96
  expected_output="Structured list of patient evidence with dates.",
97
  agent=self.patient_analyzer,
98
  async_execution=True,
99
  )
100
 
101
- self.chart_analysis_task = Task(
102
  description=(
103
- "Create a comprehensive markdown report analyzing the patient chart for HEDIS {measure_code} "
104
- "for measurement year {measurement_year}.\n\n"
105
  "Sections:\n"
106
  "### Diagnoses Found\n"
107
  "### Procedures Found\n"
@@ -115,20 +346,42 @@ class HedisComplianceEngine:
115
  ),
116
  expected_output="A thorough markdown report of HEDIS findings.",
117
  agent=self.hedis_chart_reporter,
118
- context=[self.hedis_task, self.patient_task],
119
  )
120
 
121
- self.crew = Crew(
122
  agents=[self.hedis_expert, self.patient_analyzer, self.hedis_chart_reporter],
123
- tasks=[self.hedis_task, self.patient_task, self.chart_analysis_task],
124
  process=Process.sequential,
125
  verbose=True,
126
  )
127
 
128
- def run(self) -> str:
129
  inputs = {
130
  "measure_code": self.measure_code,
131
  "measurement_year": self.measurement_year,
132
  }
133
- result = self.crew.kickoff(inputs=inputs)
134
- return str(result)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # hedis_engine.py
2
+ # interactive_hedis_engine.py
3
  import os
4
+ import json
5
  from crewai import Agent, Task, Crew, Process
6
  from crewai_tools import tool
7
  from langchain_community.vectorstores import Chroma
 
8
  from langchain_openai import ChatOpenAI
9
 
10
 
11
+ class InteractiveHedisComplianceEngine:
 
 
 
12
  """
13
+ Interactive Crew that: (1) states official HEDIS measure criteria and gets user feedback,
14
  (2) extracts patient evidence from embeddings, and
15
  (3) produces a markdown analysis report.
16
  """
 
19
  self.measure_code = measure_code
20
  self.measurement_year = measurement_year
21
  self.model = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=5096)
22
+ self.hedis_criteria = None # Store approved criteria
23
 
24
  @tool("patient_chart_search")
25
  def patient_chart_search(query: str) -> str:
 
69
  llm=self.model,
70
  )
71
 
72
+ def get_hedis_criteria_with_feedback(self, suggestions: str = "") -> dict:
73
+ """
74
+ Get HEDIS criteria from expert and present to user for feedback.
75
+ Returns approved criteria or None if user exits.
76
+ """
77
+ # Create task for getting HEDIS criteria
78
+ hedis_task_description = (
79
+ f"Provide the official criteria for HEDIS measure {self.measure_code}, including:\n"
80
+ "- Required tests or procedures\n"
81
+ "- Diagnosis/procedure codes\n"
82
+ "- Required medications\n"
83
+ "- Inclusion and exclusion criteria\n"
84
+ f"- Timeframes for each requirement (e.g., within X years of {self.measurement_year})\n\n"
85
+ f"You MUST compute and include absolute date ranges using {self.measurement_year}.\n"
86
+ )
87
+
88
+ if suggestions:
89
+ hedis_task_description += f"\n\nIncorporate these suggestions: {suggestions}\n"
90
+
91
+ hedis_task_description += (
92
+ "Return JSON with fields: 'measure','required_tests','required_medications',"
93
+ "'codes','timeframes','inclusions','exclusions'."
94
+ )
95
+
96
+ hedis_task = Task(
97
+ description=hedis_task_description,
98
+ expected_output="Structured JSON with absolute date ranges.",
99
+ agent=self.hedis_expert,
100
+ )
101
+
102
+ # Create temporary crew just for getting criteria
103
+ criteria_crew = Crew(
104
+ agents=[self.hedis_expert],
105
+ tasks=[hedis_task],
106
+ process=Process.sequential,
107
+ verbose=True,
108
+ )
109
+
110
+ # Execute to get criteria
111
+ inputs = {
112
+ "measure_code": self.measure_code,
113
+ "measurement_year": self.measurement_year,
114
+ }
115
+
116
+ result = criteria_crew.kickoff(inputs=inputs)
117
+
118
+ try:
119
+ # Try to parse as JSON
120
+ criteria_json = json.loads(str(result))
121
+ return criteria_json
122
+ except json.JSONDecodeError:
123
+ # If not valid JSON, return as string
124
+ return {"raw_output": str(result)}
125
+
126
+ def display_criteria(self, criteria: dict) -> None:
127
+ """Display the HEDIS criteria in a user-friendly format."""
128
+ print(f"\n{'='*60}")
129
+ print(f"HEDIS MEASURE {self.measure_code} CRITERIA")
130
+ print(f"Measurement Year: {self.measurement_year}")
131
+ print(f"{'='*60}")
132
+
133
+ if "raw_output" in criteria:
134
+ print(criteria["raw_output"])
135
+ else:
136
+ if "measure" in criteria:
137
+ print(f"\n📋 Measure: {criteria['measure']}")
138
+
139
+ if "required_tests" in criteria:
140
+ print(f"\n🔬 Required Tests/Procedures:")
141
+ if isinstance(criteria['required_tests'], list):
142
+ for test in criteria['required_tests']:
143
+ print(f" • {test}")
144
+ else:
145
+ print(f" {criteria['required_tests']}")
146
+
147
+ if "required_medications" in criteria:
148
+ print(f"\n💊 Required Medications:")
149
+ if isinstance(criteria['required_medications'], list):
150
+ for med in criteria['required_medications']:
151
+ print(f" • {med}")
152
+ else:
153
+ print(f" {criteria['required_medications']}")
154
+
155
+ if "codes" in criteria:
156
+ print(f"\n🏷️ Relevant Codes:")
157
+ if isinstance(criteria['codes'], dict):
158
+ for code_type, codes in criteria['codes'].items():
159
+ print(f" {code_type}: {codes}")
160
+ else:
161
+ print(f" {criteria['codes']}")
162
+
163
+ if "timeframes" in criteria:
164
+ print(f"\n⏰ Timeframes:")
165
+ if isinstance(criteria['timeframes'], dict):
166
+ for requirement, timeframe in criteria['timeframes'].items():
167
+ print(f" {requirement}: {timeframe}")
168
+ else:
169
+ print(f" {criteria['timeframes']}")
170
+
171
+ if "inclusions" in criteria:
172
+ print(f"\n✅ Inclusion Criteria:")
173
+ if isinstance(criteria['inclusions'], list):
174
+ for inclusion in criteria['inclusions']:
175
+ print(f" • {inclusion}")
176
+ else:
177
+ print(f" {criteria['inclusions']}")
178
+
179
+ if "exclusions" in criteria:
180
+ print(f"\n❌ Exclusion Criteria:")
181
+ if isinstance(criteria['exclusions'], list):
182
+ for exclusion in criteria['exclusions']:
183
+ print(f" • {exclusion}")
184
+ else:
185
+ print(f" {criteria['exclusions']}")
186
+
187
+ def get_user_feedback(self) -> tuple[str, str]:
188
+ """
189
+ Get user feedback on the displayed criteria.
190
+ Returns (action, suggestions) where action is 'continue', 'modify', or 'exit'
191
+ """
192
+ print(f"\n{'='*60}")
193
+ print("REVIEW THE CRITERIA ABOVE")
194
+ print(f"{'='*60}")
195
+ print("Options:")
196
+ print("• Press ENTER to continue with these criteria")
197
+ print("• Type suggestions to modify the criteria")
198
+ print("• Type 'exit' to stop the analysis")
199
+ print(f"{'='*60}")
200
+
201
+ user_input = input("\nYour choice: ").strip()
202
+
203
+ if user_input.lower() == 'exit':
204
+ return 'exit', ''
205
+ elif user_input == '':
206
+ return 'continue', ''
207
+ else:
208
+ return 'modify', user_input
209
+
210
+ def run_interactive(self) -> str:
211
+ """
212
+ Run the interactive HEDIS compliance analysis with user feedback loop.
213
+ """
214
+ print(f"Starting Interactive HEDIS Analysis for measure {self.measure_code}")
215
+
216
+ suggestions = ""
217
+ while True:
218
+ # Get criteria from expert
219
+ print(f"\n🔍 Getting HEDIS criteria for {self.measure_code}...")
220
+ criteria = self.get_hedis_criteria_with_feedback(suggestions)
221
+
222
+ # Display criteria to user
223
+ self.display_criteria(criteria)
224
+
225
+ # Get user feedback
226
+ action, user_suggestions = self.get_user_feedback()
227
+
228
+ if action == 'exit':
229
+ print("\n❌ Analysis cancelled by user.")
230
+ return "Analysis cancelled by user."
231
+ elif action == 'continue':
232
+ print("\n✅ Criteria approved! Proceeding with chart analysis...")
233
+ self.hedis_criteria = criteria
234
+ break
235
+ elif action == 'modify':
236
+ print(f"\n🔄 Modifying criteria based on your suggestions...")
237
+ suggestions = user_suggestions
238
+ continue
239
+
240
+ # Now proceed with the rest of the analysis using approved criteria
241
+ return self.run_full_analysis()
242
+
243
+ def run_full_analysis(self) -> str:
244
+ """
245
+ Run the full analysis with approved criteria.
246
+ """
247
+ # Create tasks for patient analysis and reporting
248
+ patient_task = Task(
249
+ description=(
250
+ f"Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
251
+ f"medications, and dates relevant to {self.measure_code}. Only include measure related info from the patient charts. "
252
+ f"Use these approved criteria as reference: {json.dumps(self.hedis_criteria, indent=2)}"
253
+ ),
254
+ expected_output="Structured list of patient evidence with dates.",
255
+ agent=self.patient_analyzer,
256
+ async_execution=True,
257
+ )
258
+
259
+ chart_analysis_task = Task(
260
  description=(
261
+ f"Create a comprehensive markdown report analyzing the patient chart for HEDIS {self.measure_code} "
262
+ f"for measurement year {self.measurement_year}.\n\n"
263
+ "Sections:\n"
264
+ "### HEDIS Measure Criteria (Approved)\n"
265
+ "### Diagnoses Found\n"
266
+ "### Procedures Found\n"
267
+ "### Laboratory Tests\n"
268
+ "### Medications\n"
269
+ "### Vital Signs\n"
270
+ "### Exclusions Found\n"
271
+ "### Missing Information\n"
272
+ "### Compliance Summary\n"
273
+ f"Use these approved criteria as reference: {json.dumps(self.hedis_criteria, indent=2)}\n"
274
+ "Make sure only measure related information is included."
275
+ ),
276
+ expected_output="A thorough markdown report of HEDIS findings with compliance assessment.",
277
+ agent=self.hedis_chart_reporter,
278
+ context=[patient_task],
279
+ )
280
+
281
+ # Create crew for full analysis
282
+ analysis_crew = Crew(
283
+ agents=[self.patient_analyzer, self.hedis_chart_reporter],
284
+ tasks=[patient_task, chart_analysis_task],
285
+ process=Process.sequential,
286
+ verbose=True,
287
+ )
288
+
289
+ # Execute full analysis
290
+ inputs = {
291
+ "measure_code": self.measure_code,
292
+ "measurement_year": self.measurement_year,
293
+ }
294
+
295
+ print(f"\n🔄 Running patient chart analysis...")
296
+ result = analysis_crew.kickoff(inputs=inputs)
297
+
298
+ print(f"\n✅ Analysis complete!")
299
+ return str(result)
300
+
301
+ def run_non_interactive(self) -> str:
302
+ """
303
+ Run the original non-interactive analysis (for backwards compatibility).
304
+ """
305
+ hedis_task = Task(
306
+ description=(
307
+ f"Provide the official criteria for HEDIS measure {self.measure_code}, including:\n"
308
  "- Required tests or procedures\n"
309
  "- Diagnosis/procedure codes\n"
310
  "- Required medications\n"
311
  "- Inclusion and exclusion criteria\n"
312
+ f"- Timeframes for each requirement (e.g., within X years of {self.measurement_year})\n\n"
313
+ f"You MUST compute and include absolute date ranges using {self.measurement_year}.\n"
314
  "Return JSON with fields: 'measure','required_tests','required_medications',"
315
  "'codes','timeframes','inclusions','exclusions'."
316
  ),
 
319
  async_execution=True,
320
  )
321
 
322
+ patient_task = Task(
323
  description=(
324
+ f"Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
325
+ f"medications, and dates relevant to {self.measure_code}. Only include measure related info from the patient charts"
326
  ),
327
  expected_output="Structured list of patient evidence with dates.",
328
  agent=self.patient_analyzer,
329
  async_execution=True,
330
  )
331
 
332
+ chart_analysis_task = Task(
333
  description=(
334
+ f"Create a comprehensive markdown report analyzing the patient chart for HEDIS {self.measure_code} "
335
+ f"for measurement year {self.measurement_year}.\n\n"
336
  "Sections:\n"
337
  "### Diagnoses Found\n"
338
  "### Procedures Found\n"
 
346
  ),
347
  expected_output="A thorough markdown report of HEDIS findings.",
348
  agent=self.hedis_chart_reporter,
349
+ context=[hedis_task, patient_task],
350
  )
351
 
352
+ crew = Crew(
353
  agents=[self.hedis_expert, self.patient_analyzer, self.hedis_chart_reporter],
354
+ tasks=[hedis_task, patient_task, chart_analysis_task],
355
  process=Process.sequential,
356
  verbose=True,
357
  )
358
 
 
359
  inputs = {
360
  "measure_code": self.measure_code,
361
  "measurement_year": self.measurement_year,
362
  }
363
+ result = crew.kickoff(inputs=inputs)
364
+ return str(result)
365
+
366
+ def run(self) -> str:
367
+ """
368
+ Entry point - runs non-interactive analysis by default for app compatibility.
369
+ Use run_interactive() for interactive mode.
370
+ """
371
+ return self.run_non_interactive()
372
+
373
+
374
+ # Usage example:
375
+ """
376
+ # Initialize the engine
377
+ vectordb = your_chroma_db # Your Chroma vector database
378
+ engine = InteractiveHedisComplianceEngine(
379
+ vectordb=vectordb,
380
+ measure_code="CDC-A1C",
381
+ measurement_year=2024
382
+ )
383
+
384
+ # Run interactive analysis
385
+ result = engine.run()
386
+ print(result)
387
+ """