| |
| |
| import os |
| import json |
| from crewai import Agent, Task, Crew, Process |
| from crewai_tools import tool |
| from langchain_community.vectorstores import Chroma |
| from langchain_openai import ChatOpenAI |
|
|
|
|
class HedisComplianceEngine:
    """Crew-based HEDIS chart review for one measure and measurement year.

    The engine wires three agents together:

    1. a measure expert that states the criteria for the measure as JSON,
    2. a chart analyzer that pulls patient evidence from the vector store, and
    3. a reporter that produces a markdown analysis report.

    ``run()`` executes all three in one non-interactive crew.
    ``run_interactive()`` first loops on the criteria with console feedback
    from the user, then runs the chart analysis with the approved criteria.
    """

    # (criteria key, heading, container type rendered item-by-item) in the
    # order display_criteria prints them.
    _CRITERIA_SECTIONS = (
        ("required_tests", "🔬 Required Tests/Procedures:", list),
        ("required_medications", "💊 Required Medications:", list),
        ("codes", "🏷️ Relevant Codes:", dict),
        ("timeframes", "⏰ Timeframes:", dict),
        ("inclusions", "✅ Inclusion Criteria:", list),
        ("exclusions", "❌ Exclusion Criteria:", list),
    )

    def __init__(self, vectordb: "Chroma", measure_code: str, measurement_year: int):
        """Create the LLM, the chart-search tool and the three agents.

        Args:
            vectordb: Vector store queried with ``similarity_search`` for
                patient chart passages.
            measure_code: HEDIS measure identifier interpolated into prompts.
            measurement_year: Year interpolated into prompts for date ranges.
        """
        self.vectordb = vectordb
        self.measure_code = measure_code
        self.measurement_year = measurement_year
        self.model = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=5096)
        # Set by run_interactive() once the user approves the criteria.
        self.hedis_criteria = None

        # Defined as a closure so the tool can reach this instance's store.
        @tool("patient_chart_search")
        def patient_chart_search(query: str) -> str:
            """Return top patient chart passages relevant to the query."""
            results = self.vectordb.similarity_search(query, k=15)
            return "\n".join(res.page_content for res in results)

        self.hedis_expert = Agent(
            role="HEDIS Measure Expert",
            goal=(
                f"Provide accurate, official details for HEDIS measure {measure_code} "
                "including required tests, codes, timeframes, and exclusions. "
                "ALWAYS respond with valid JSON format only."
            ),
            verbose=True,
            memory=False,
            tools=[],
            backstory=(
                "Healthcare quality expert with deep knowledge of NCQA HEDIS specifications. "
                "Must compute concrete absolute date ranges given the measurement year. "
                "CRITICAL: Always respond with properly formatted JSON without any markdown or extra text."
            ),
            llm=self.model,
        )

        self.patient_analyzer = Agent(
            role="Patient Chart Analyzer",
            goal=f"Extract tests, diagnoses, procedures, meds & dates relevant to HEDIS {measure_code} only. Don't include anything which is not related to this measure.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory="Specialist in retrieving structured clinical data from patient chart embeddings.",
            llm=self.model,
        )

        self.hedis_chart_reporter = Agent(
            role="HEDIS Chart Findings Reporter",
            goal=f"Create a comprehensive markdown report of findings for HEDIS {measure_code}.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory=(
                "Clinical documentation specialist who organizes evidence and highlights "
                "relevant data for quality measure analysis."
            ),
            llm=self.model,
        )

    def _kickoff_inputs(self) -> dict:
        """Return the interpolation inputs handed to every ``Crew.kickoff``."""
        return {
            "measure_code": self.measure_code,
            "measurement_year": self.measurement_year,
        }

    def _build_criteria_crew(self, description: str, expected_output: str):
        """Build a one-task sequential crew run by the HEDIS expert agent."""
        criteria_task = Task(
            description=description,
            expected_output=expected_output,
            agent=self.hedis_expert,
        )
        return Crew(
            agents=[self.hedis_expert],
            tasks=[criteria_task],
            process=Process.sequential,
            verbose=True,
        )

    @staticmethod
    def _parse_json_output(raw: str) -> dict:
        """Parse model output into a criteria dict.

        Tolerates a surrounding markdown code fence (with or without a
        language tag) and a payload that was wrapped in an extra pair of
        double quotes.

        Args:
            raw: Text produced by the crew.

        Returns:
            The decoded JSON object.

        Raises:
            ValueError: If the text is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass) or
                decodes to something other than a JSON object.
        """
        text = raw.strip()

        if text.startswith("```"):
            text = text[3:]
            tag, newline, remainder = text.partition("\n")
            if newline and tag.strip().isalnum():
                # Fence line carries only a language tag: drop the line.
                text = remainder
            elif text[:4].lower() == "json":
                # Payload starts on the fence line itself.
                text = text[4:]
        text = text.strip()
        if text.endswith("```"):
            text = text[:-3]
        text = text.strip()

        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            is_quoted = len(text) >= 2 and text.startswith('"') and text.endswith('"')
            if not is_quoted:
                raise
            # Loosely quoted payload that is not a valid JSON string literal:
            # strip the outer quotes and undo the two common escapes by hand.
            loosened = text[1:-1].replace('\\"', '"').replace("\\n", "\n")
            parsed = json.loads(loosened)

        if isinstance(parsed, str):
            # Double-encoded payload: the JSON document was itself a string.
            parsed = json.loads(parsed)

        if not isinstance(parsed, dict):
            raise ValueError(
                f"Expected a JSON object, got {type(parsed).__name__}"
            )
        return parsed

    def get_hedis_criteria_with_feedback(self, suggestions: str = "") -> dict:
        """Ask the HEDIS expert agent for the measure criteria as JSON.

        If the response cannot be parsed, ``get_hedis_criteria_simple`` is
        tried once as a fallback.

        Args:
            suggestions: Optional free-text modifications appended to the
                prompt.

        Returns:
            The parsed criteria dict, or a dict with ``raw_output`` and
            ``parse_error`` keys when the crew fails or parsing fails.
        """
        base_description = (
            f"Provide the official criteria for HEDIS measure {self.measure_code}, including:\n"
            "- Required tests or procedures\n"
            "- Diagnosis/procedure codes\n"
            "- Required medications\n"
            "- Inclusion and exclusion criteria\n"
            f"- Timeframes for each requirement (compute absolute date ranges using {self.measurement_year})\n"
            "\n"
            f"You MUST compute and include absolute date ranges using {self.measurement_year}."
        )

        if suggestions:
            base_description += f"\n\nIncorporate these suggestions: {suggestions}"

        # NOTE(review): the doubled braces below are kept exactly as written;
        # presumably they are escapes for CrewAI's input interpolation -
        # confirm against the installed CrewAI version.
        formatting_requirements = (
            "\n\n"
            "CRITICAL FORMATTING REQUIREMENTS:\n"
            "- Return ONLY valid JSON without any markdown formatting, code blocks, or extra text\n"
            "- Do not wrap the JSON in code blocks\n"
            "- Do not add any explanatory text before or after the JSON\n"
            "- The response must start with {{ and end with }}\n"
            "- Ensure all JSON keys and string values are properly quoted\n"
            "- Required JSON fields: measure, required_tests, required_medications, codes, timeframes, inclusions, exclusions"
        )

        criteria_crew = self._build_criteria_crew(
            base_description + formatting_requirements,
            "Valid JSON object with required fields (no markdown, no code blocks)",
        )

        try:
            result = criteria_crew.kickoff(inputs=self._kickoff_inputs())
            print(f"Raw crew result type: {type(result)}")
            print(f"Raw crew result: {repr(str(result)[:200])}...")
        except Exception as e:
            print(f"Crew execution error: {e}")
            return {"raw_output": f"Crew execution failed: {str(e)}", "parse_error": str(e)}

        try:
            return self._parse_json_output(str(result))
        except ValueError as e:
            print("Primary method failed, trying simplified approach...")
            try:
                return self.get_hedis_criteria_simple(suggestions)
            except Exception as fallback_error:
                print(f"Fallback method also failed: {fallback_error}")
                return {
                    "raw_output": str(result),
                    "parse_error": f"Primary: {str(e)}, Fallback: {str(fallback_error)}",
                }
        except Exception as e:
            print(f"Unexpected error: {e}")
            return {"raw_output": str(result), "parse_error": str(e)}

    def get_hedis_criteria_simple(self, suggestions: str = "") -> dict:
        """Ask for the measure criteria with a short single-paragraph prompt.

        Args:
            suggestions: Optional free-text modifications appended to the
                prompt.

        Returns:
            The parsed criteria dict, or a dict with ``raw_output`` and
            ``parse_error`` keys if anything fails.
        """
        result = None
        try:
            prompt_parts = [
                f"Provide HEDIS measure {self.measure_code} criteria for year {self.measurement_year}.",
                "Include: required tests, medications, codes, timeframes, inclusions, exclusions.",
            ]
            if suggestions:
                prompt_parts.append(f"Suggestions: {suggestions}")
            prompt_parts.append("Respond with valid JSON only. No markdown, no code blocks, just JSON.")

            criteria_crew = self._build_criteria_crew(
                " ".join(prompt_parts),
                "JSON object with measure criteria",
            )
            result = criteria_crew.kickoff(inputs=self._kickoff_inputs())
            return self._parse_json_output(str(result))
        except Exception as e:
            print(f"Simple method error: {e}")
            return {
                "raw_output": str(result) if result is not None else "No result",
                "parse_error": str(e),
            }

    def display_criteria(self, criteria: dict) -> None:
        """Print the criteria to stdout in a sectioned, human-readable layout.

        A dict carrying ``raw_output`` (the error shape returned by the
        criteria getters) is printed verbatim instead of being sectioned.

        Args:
            criteria: Criteria dict as returned by the criteria getters.
        """
        banner = "=" * 60
        print(f"\n{banner}")
        print(f"HEDIS MEASURE {self.measure_code} CRITERIA")
        print(f"Measurement Year: {self.measurement_year}")
        print(banner)

        if "raw_output" in criteria:
            print(criteria["raw_output"])
            return

        if "measure" in criteria:
            print(f"\n📋 Measure: {criteria['measure']}")

        for key, heading, container in self._CRITERIA_SECTIONS:
            if key not in criteria:
                continue
            print(f"\n{heading}")
            value = criteria[key]
            if container is list and isinstance(value, list):
                for item in value:
                    print(f" • {item}")
            elif container is dict and isinstance(value, dict):
                for label, detail in value.items():
                    print(f" {label}: {detail}")
            else:
                # Unexpected shape: print the value as-is.
                print(f" {value}")

    def get_user_feedback(self) -> tuple[str, str]:
        """Prompt on the console for a decision about the displayed criteria.

        Returns:
            ``(action, suggestions)`` where ``action`` is ``'continue'``
            (empty input), ``'exit'`` (the word "exit", any case) or
            ``'modify'`` (anything else, returned as ``suggestions``).
        """
        banner = "=" * 60
        print(f"\n{banner}")
        print("REVIEW THE CRITERIA ABOVE")
        print(banner)
        print("Options:")
        print("• Press ENTER to continue with these criteria")
        print("• Type suggestions to modify the criteria")
        print("• Type 'exit' to stop the analysis")
        print(banner)

        user_input = input("\nYour choice: ").strip()

        if not user_input:
            return 'continue', ''
        if user_input.lower() == 'exit':
            return 'exit', ''
        return 'modify', user_input

    def run_interactive(self) -> str:
        """Loop on the criteria with user feedback, then run the analysis.

        Returns:
            The analysis report text, or ``"Analysis cancelled by user."``
            when the user exits during criteria review.
        """
        print(f"Starting Interactive HEDIS Analysis for measure {self.measure_code}")

        suggestions = ""
        while True:
            print(f"\n🔍 Getting HEDIS criteria for {self.measure_code}...")
            criteria = self.get_hedis_criteria_with_feedback(suggestions)
            self.display_criteria(criteria)

            action, user_suggestions = self.get_user_feedback()

            if action == 'exit':
                print("\n❌ Analysis cancelled by user.")
                return "Analysis cancelled by user."
            if action == 'continue':
                print("\n✅ Criteria approved! Proceeding with chart analysis...")
                self.hedis_criteria = criteria
                break
            if action == 'modify':
                print("\n🔄 Modifying criteria based on your suggestions...")
                # Only the latest suggestions are sent on the next attempt.
                suggestions = user_suggestions

        return self.run_full_analysis()

    def run_full_analysis(self) -> str:
        """Run chart extraction and reporting against ``self.hedis_criteria``.

        Returns:
            The crew's final output converted to ``str``.
        """
        criteria_json = json.dumps(self.hedis_criteria, indent=2)

        # NOTE(review): both descriptions embed raw JSON (with braces) while
        # kickoff() is also given interpolation inputs. If the installed
        # CrewAI version interpolates with str.format, those braces would
        # raise - confirm against the CrewAI version in use.
        patient_task = Task(
            description=(
                f"Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                f"medications, and dates relevant to {self.measure_code}. Only include measure related info from the patient charts. "
                f"Use these approved criteria as reference: {criteria_json}"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )

        chart_analysis_task = Task(
            description=(
                f"Create a comprehensive markdown report analyzing the patient chart for HEDIS {self.measure_code} "
                f"for measurement year {self.measurement_year}.\n\n"
                "Sections:\n"
                "### HEDIS Measure Criteria (Approved)\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Compliance Summary\n"
                f"Use these approved criteria as reference: {criteria_json}\n"
                "Make sure only measure related information is included."
            ),
            expected_output="A thorough markdown report of HEDIS findings with compliance assessment.",
            agent=self.hedis_chart_reporter,
            context=[patient_task],
        )

        analysis_crew = Crew(
            agents=[self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        print("\n🔄 Running patient chart analysis...")
        result = analysis_crew.kickoff(inputs=self._kickoff_inputs())

        print("\n✅ Analysis complete!")
        return str(result)

    def run(self) -> str:
        """Run the original non-interactive three-task analysis.

        Kept as the entry point for app compatibility; use
        ``run_interactive()`` for the feedback-driven flow. The task
        descriptions use ``{measure_code}`` / ``{measurement_year}``
        placeholders that are filled from the inputs passed to ``kickoff``.

        Returns:
            The crew's final output converted to ``str``.
        """
        hedis_task = Task(
            description=(
                "Provide the official criteria for HEDIS measure {measure_code}, including:\n"
                "- Required tests or procedures\n"
                "- Diagnosis/procedure codes\n"
                "- Required medications\n"
                "- Inclusion and exclusion criteria\n"
                "- Timeframes for each requirement (e.g., within 10 years of {measurement_year})\n\n"
                "You MUST compute and include absolute date ranges using {measurement_year}.\n"
                "Return JSON with fields: 'measure','required_tests','required_medications',"
                "'codes','timeframes','inclusions','exclusions'."
            ),
            expected_output="Structured JSON with absolute date ranges.",
            agent=self.hedis_expert,
            async_execution=True,
        )

        patient_task = Task(
            description=(
                "Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                "medications, and dates relevant to {measure_code}. Only include measure related info from the patient charts"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )

        chart_analysis_task = Task(
            description=(
                "Create a comprehensive markdown report analyzing the patient chart for HEDIS {measure_code} "
                "for measurement year {measurement_year}.\n\n"
                "Sections:\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Summary\n"
                "Make sure only measure related information is there."
            ),
            expected_output="A thorough markdown report of HEDIS findings.",
            agent=self.hedis_chart_reporter,
            context=[hedis_task, patient_task],
        )

        crew = Crew(
            agents=[self.hedis_expert, self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[hedis_task, patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        result = crew.kickoff(inputs=self._kickoff_inputs())
        return str(result)