File size: 20,083 Bytes
b915464
6edef9f
b915464
9fdbf51
b915464
 
 
b5d9ab1
 
 
6edef9f
b915464
6edef9f
b915464
 
6edef9f
 
b915464
 
 
 
 
7438c43
9fdbf51
b915464
 
 
 
cfcb5ae
b915464
 
 
 
 
 
 
962ac80
 
b915464
 
 
 
 
 
962ac80
 
b915464
b5d9ab1
b915464
 
 
 
 
7438c43
b915464
 
 
 
b5d9ab1
b915464
 
 
 
 
 
 
 
 
 
 
 
 
b5d9ab1
b915464
 
9fdbf51
 
 
 
 
ebf6043
 
 
 
 
 
 
 
 
 
9fdbf51
ebf6043
 
 
 
 
 
 
 
 
 
 
 
 
9fdbf51
 
ebf6043
 
9fdbf51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ebf6043
 
 
 
 
 
 
9fdbf51
 
de92f5f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6edef9f
0ba0181
 
6edef9f
0ba0181
 
 
 
 
 
 
 
 
9fdbf51
de92f5f
9fdbf51
6edef9f
 
 
 
 
 
 
 
 
 
 
 
ebf6043
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9fdbf51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b915464
9fdbf51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6edef9f
9fdbf51
6edef9f
 
9fdbf51
6edef9f
9fdbf51
 
6edef9f
b915464
 
 
 
6edef9f
 
b915464
 
 
 
 
 
 
 
9fdbf51
b915464
6edef9f
 
b915464
 
 
 
 
 
9fdbf51
b915464
6edef9f
 
b915464
 
 
 
 
 
 
 
 
489ec8c
b915464
cfcb5ae
b915464
9fdbf51
b915464
 
9fdbf51
b915464
9fdbf51
b915464
 
 
 
 
 
 
 
9fdbf51
6edef9f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
# hedis_engine.py
# hedis_engine.py - Updated with Interactive Features
import os
import json
from crewai import Agent, Task, Crew, Process
from crewai_tools import tool
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI


class HedisComplianceEngine:
    """
    Crew that: (1) states official HEDIS measure criteria,
    (2) extracts patient evidence from embeddings, and
    (3) produces a markdown analysis report.
    
    Now includes interactive feedback capabilities.
    """
    def __init__(self, vectordb: Chroma, measure_code: str, measurement_year: int):
        self.vectordb = vectordb
        self.measure_code = measure_code
        self.measurement_year = measurement_year
        self.model = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=5096)
        self.hedis_criteria = None  # Store approved criteria

        @tool("patient_chart_search")
        def patient_chart_search(query: str) -> str:
            """Return top patient chart passages relevant to the query."""
            results = self.vectordb.similarity_search(query, k=15)
            return "\n".join([res.page_content for res in results])

        # 1) HEDIS measure expert (external knowledge via model)
        self.hedis_expert = Agent(
            role="HEDIS Measure Expert",
            goal=(
                f"Provide accurate, official details for HEDIS measure {measure_code} "
                "including required tests, codes, timeframes, and exclusions. "
                "ALWAYS respond with valid JSON format only."
            ),
            verbose=True,
            memory=False,
            tools=[],  # design choice: no web/search tools here; rely on model knowledge
            backstory=(
                "Healthcare quality expert with deep knowledge of NCQA HEDIS specifications. "
                "Must compute concrete absolute date ranges given the measurement year. "
                "CRITICAL: Always respond with properly formatted JSON without any markdown or extra text."
            ),
            llm=self.model,
        )

        # 2) Patient chart analyzer (RAG over embeddings)
        self.patient_analyzer = Agent(
            role="Patient Chart Analyzer",
            goal=f"Extract tests, diagnoses, procedures, meds & dates relevant to HEDIS {measure_code} only. Don't include anything which is not related to this measure.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory="Specialist in retrieving structured clinical data from patient chart embeddings.",
            llm=self.model,
        )

        # 3) Reporter who composes a clean markdown report
        self.hedis_chart_reporter = Agent(
            role="HEDIS Chart Findings Reporter",
            goal=f"Create a comprehensive markdown report of findings for HEDIS {measure_code}.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory=(
                "Clinical documentation specialist who organizes evidence and highlights "
                "relevant data for quality measure analysis."
            ),
            llm=self.model,
        )

    def get_hedis_criteria_with_feedback(self, suggestions: str = "") -> dict:
        """
        Get HEDIS criteria from expert and present to user for feedback.
        Returns approved criteria or None if user exits.
        """
        # Build the task description carefully to avoid string formatting issues
        base_description = f"""Provide the official criteria for HEDIS measure {self.measure_code}, including:
- Required tests or procedures
- Diagnosis/procedure codes  
- Required medications
- Inclusion and exclusion criteria
- Timeframes for each requirement (compute absolute date ranges using {self.measurement_year})

You MUST compute and include absolute date ranges using {self.measurement_year}."""

        if suggestions:
            base_description += f"\n\nIncorporate these suggestions: {suggestions}"

        formatting_requirements = """

CRITICAL FORMATTING REQUIREMENTS:
- Return ONLY valid JSON without any markdown formatting, code blocks, or extra text
- Do not wrap the JSON in code blocks
- Do not add any explanatory text before or after the JSON
- The response must start with {{ and end with }}
- Ensure all JSON keys and string values are properly quoted
- Required JSON fields: measure, required_tests, required_medications, codes, timeframes, inclusions, exclusions"""

        full_description = base_description + formatting_requirements

        hedis_task = Task(
            description=full_description,
            expected_output="Valid JSON object with required fields (no markdown, no code blocks)",
            agent=self.hedis_expert,
        )

        # Create temporary crew just for getting criteria
        criteria_crew = Crew(
            agents=[self.hedis_expert],
            tasks=[hedis_task],
            process=Process.sequential,
            verbose=True,
        )

        # Execute to get criteria
        inputs = {
            "measure_code": self.measure_code,
            "measurement_year": self.measurement_year,
        }
        
        try:
            result = criteria_crew.kickoff(inputs=inputs)
            print(f"Raw crew result type: {type(result)}")
            print(f"Raw crew result: {repr(str(result)[:200])}...")  # Show first 200 chars
        except Exception as e:
            print(f"Crew execution error: {e}")
            return {"raw_output": f"Crew execution failed: {str(e)}", "parse_error": str(e)}
        
        try:
            # Clean the result string - remove markdown code blocks and extra whitespace
            result_str = str(result).strip()
            
            # Remove markdown code blocks if present
            if result_str.startswith("```json"):
                result_str = result_str[7:]  # Remove ```json
            elif result_str.startswith("```"):
                result_str = result_str[3:]   # Remove ```
            
            if result_str.endswith("```"):
                result_str = result_str[:-3]  # Remove closing ```
            
            # Remove any leading/trailing whitespace and newlines
            result_str = result_str.strip()
            
            # Handle quoted JSON strings
            if len(result_str) >= 2 and result_str.startswith('"') and result_str.endswith('"'):
                try:
                    # Try to unquote and parse
                    unquoted = result_str[1:-1]
                    # Handle escaped quotes in the JSON
                    unquoted = unquoted.replace('\\"', '"').replace('\\n', '\n')
                    test_parse = json.loads(unquoted)
                    result_str = unquoted
                except:
                    # If unquoting fails, keep original
                    pass
            
            # Try to parse as JSON
            criteria_json = json.loads(result_str)
            return criteria_json
            
        except json.JSONDecodeError as e:
            print(f"Primary method failed, trying simplified approach...")
            try:
                return self.get_hedis_criteria_simple(suggestions)
            except Exception as fallback_error:
                print(f"Fallback method also failed: {fallback_error}")
                return {"raw_output": str(result), "parse_error": f"Primary: {str(e)}, Fallback: {str(fallback_error)}"}
        except Exception as e:
            print(f"Unexpected error: {e}")
            return {"raw_output": str(result) if 'result' in locals() else "No result", "parse_error": str(e)}

    def get_hedis_criteria_simple(self, suggestions: str = "") -> dict:
        """
        Simplified method to get HEDIS criteria with minimal string formatting.
        """
        try:
            # Create a simple, direct prompt
            prompt_parts = [
                f"Provide HEDIS measure {self.measure_code} criteria for year {self.measurement_year}.",
                "Include: required tests, medications, codes, timeframes, inclusions, exclusions.",
            ]
            
            if suggestions:
                prompt_parts.append(f"Suggestions: {suggestions}")
            
            prompt_parts.append("Respond with valid JSON only. No markdown, no code blocks, just JSON.")
            
            simple_description = " ".join(prompt_parts)
            
            hedis_task = Task(
                description=simple_description,
                expected_output="JSON object with measure criteria",
                agent=self.hedis_expert,
            )

            # Create temporary crew
            criteria_crew = Crew(
                agents=[self.hedis_expert],
                tasks=[hedis_task],
                process=Process.sequential,
                verbose=True,
            )

            # Execute
            inputs = {"measure_code": self.measure_code, "measurement_year": self.measurement_year}
            result = criteria_crew.kickoff(inputs=inputs)
            
            # Try to parse the result
            result_str = str(result).strip()
            
            # Basic cleaning
            if result_str.startswith("```"):
                lines = result_str.split('\n')
                # Remove first and last lines if they contain ```
                if lines[0].strip().startswith('```'):
                    lines = lines[1:]
                if lines and lines[-1].strip() == '```':
                    lines = lines[:-1]
                result_str = '\n'.join(lines).strip()
            
            return json.loads(result_str)
            
        except Exception as e:
            print(f"Simple method error: {e}")
            return {"raw_output": str(result) if 'result' in locals() else "No result", "parse_error": str(e)}

    def display_criteria(self, criteria: dict) -> None:
        """Display the HEDIS criteria in a user-friendly format."""
        print(f"\n{'='*60}")
        print(f"HEDIS MEASURE {self.measure_code} CRITERIA")
        print(f"Measurement Year: {self.measurement_year}")
        print(f"{'='*60}")
        
        if "raw_output" in criteria:
            print(criteria["raw_output"])
        else:
            if "measure" in criteria:
                print(f"\n📋 Measure: {criteria['measure']}")
            
            if "required_tests" in criteria:
                print(f"\n🔬 Required Tests/Procedures:")
                if isinstance(criteria['required_tests'], list):
                    for test in criteria['required_tests']:
                        print(f"  • {test}")
                else:
                    print(f"  {criteria['required_tests']}")
            
            if "required_medications" in criteria:
                print(f"\n💊 Required Medications:")
                if isinstance(criteria['required_medications'], list):
                    for med in criteria['required_medications']:
                        print(f"  • {med}")
                else:
                    print(f"  {criteria['required_medications']}")
            
            if "codes" in criteria:
                print(f"\n🏷️  Relevant Codes:")
                if isinstance(criteria['codes'], dict):
                    for code_type, codes in criteria['codes'].items():
                        print(f"  {code_type}: {codes}")
                else:
                    print(f"  {criteria['codes']}")
            
            if "timeframes" in criteria:
                print(f"\n⏰ Timeframes:")
                if isinstance(criteria['timeframes'], dict):
                    for requirement, timeframe in criteria['timeframes'].items():
                        print(f"  {requirement}: {timeframe}")
                else:
                    print(f"  {criteria['timeframes']}")
            
            if "inclusions" in criteria:
                print(f"\n✅ Inclusion Criteria:")
                if isinstance(criteria['inclusions'], list):
                    for inclusion in criteria['inclusions']:
                        print(f"  • {inclusion}")
                else:
                    print(f"  {criteria['inclusions']}")
            
            if "exclusions" in criteria:
                print(f"\n❌ Exclusion Criteria:")
                if isinstance(criteria['exclusions'], list):
                    for exclusion in criteria['exclusions']:
                        print(f"  • {exclusion}")
                else:
                    print(f"  {criteria['exclusions']}")

    def get_user_feedback(self) -> tuple[str, str]:
        """
        Get user feedback on the displayed criteria.
        Returns (action, suggestions) where action is 'continue', 'modify', or 'exit'
        """
        print(f"\n{'='*60}")
        print("REVIEW THE CRITERIA ABOVE")
        print(f"{'='*60}")
        print("Options:")
        print("• Press ENTER to continue with these criteria")
        print("• Type suggestions to modify the criteria")
        print("• Type 'exit' to stop the analysis")
        print(f"{'='*60}")
        
        user_input = input("\nYour choice: ").strip()
        
        if user_input.lower() == 'exit':
            return 'exit', ''
        elif user_input == '':
            return 'continue', ''
        else:
            return 'modify', user_input

    def run_interactive(self) -> str:
        """
        Run the interactive HEDIS compliance analysis with user feedback loop.
        """
        print(f"Starting Interactive HEDIS Analysis for measure {self.measure_code}")
        
        suggestions = ""
        while True:
            # Get criteria from expert
            print(f"\n🔍 Getting HEDIS criteria for {self.measure_code}...")
            criteria = self.get_hedis_criteria_with_feedback(suggestions)
            
            # Display criteria to user
            self.display_criteria(criteria)
            
            # Get user feedback
            action, user_suggestions = self.get_user_feedback()
            
            if action == 'exit':
                print("\n❌ Analysis cancelled by user.")
                return "Analysis cancelled by user."
            elif action == 'continue':
                print("\n✅ Criteria approved! Proceeding with chart analysis...")
                self.hedis_criteria = criteria
                break
            elif action == 'modify':
                print(f"\n🔄 Modifying criteria based on your suggestions...")
                suggestions = user_suggestions
                continue
        
        # Now proceed with the rest of the analysis using approved criteria
        return self.run_full_analysis()

    def run_full_analysis(self) -> str:
        """
        Run the full analysis with approved criteria.
        """
        # Create tasks for patient analysis and reporting
        patient_task = Task(
            description=(
                f"Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                f"medications, and dates relevant to {self.measure_code}. Only include measure related info from the patient charts. "
                f"Use these approved criteria as reference: {json.dumps(self.hedis_criteria, indent=2)}"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )

        chart_analysis_task = Task(
            description=(
                f"Create a comprehensive markdown report analyzing the patient chart for HEDIS {self.measure_code} "
                f"for measurement year {self.measurement_year}.\n\n"
                "Sections:\n"
                "### HEDIS Measure Criteria (Approved)\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Compliance Summary\n"
                f"Use these approved criteria as reference: {json.dumps(self.hedis_criteria, indent=2)}\n"
                "Make sure only measure related information is included."
            ),
            expected_output="A thorough markdown report of HEDIS findings with compliance assessment.",
            agent=self.hedis_chart_reporter,
            context=[patient_task],
        )

        # Create crew for full analysis
        analysis_crew = Crew(
            agents=[self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        # Execute full analysis
        inputs = {
            "measure_code": self.measure_code,
            "measurement_year": self.measurement_year,
        }
        
        print(f"\n🔄 Running patient chart analysis...")
        result = analysis_crew.kickoff(inputs=inputs)
        
        print(f"\n✅ Analysis complete!")
        return str(result)

    def run(self) -> str:
        """
        Entry point - runs original non-interactive analysis for app compatibility.
        Use run_interactive() for interactive mode.
        """
        # Original tasks
        hedis_task = Task(
            description=(
                "Provide the official criteria for HEDIS measure {measure_code}, including:\n"
                "- Required tests or procedures\n"
                "- Diagnosis/procedure codes\n"
                "- Required medications\n"
                "- Inclusion and exclusion criteria\n"
                "- Timeframes for each requirement (e.g., within 10 years of {measurement_year})\n\n"
                "You MUST compute and include absolute date ranges using {measurement_year}.\n"
                "Return JSON with fields: 'measure','required_tests','required_medications',"
                "'codes','timeframes','inclusions','exclusions'."
            ),
            expected_output="Structured JSON with absolute date ranges.",
            agent=self.hedis_expert,
            async_execution=True,
        )

        patient_task = Task(
            description=(
                "Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                "medications, and dates relevant to {measure_code}. Only include measure related info from the patient charts"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )

        chart_analysis_task = Task(
            description=(
                "Create a comprehensive markdown report analyzing the patient chart for HEDIS {measure_code} "
                "for measurement year {measurement_year}.\n\n"
                "Sections:\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Summary\n"
                "Make sure only measure related information is there."
            ),
            expected_output="A thorough markdown report of HEDIS findings.",
            agent=self.hedis_chart_reporter,
            context=[hedis_task, patient_task],
        )

        crew = Crew(
            agents=[self.hedis_expert, self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[hedis_task, patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        inputs = {
            "measure_code": self.measure_code,
            "measurement_year": self.measurement_year,
        }
        result = crew.kickoff(inputs=inputs)
        return str(result)