# hedis_engine.py - Updated with Interactive Features
"""CrewAI-based HEDIS compliance analysis over an embedded patient chart."""

import json
import os
import re

from crewai import Agent, Crew, Process, Task
from crewai_tools import tool
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI

# Horizontal rule used by the console output.
_RULE = "=" * 60

# Matches a response wrapped in a Markdown code fence, with an optional
# language tag after the opening fence and an optional closing fence.
_CODE_FENCE_RE = re.compile(r"^```[A-Za-z0-9_-]*\s*(.*?)\s*(?:```)?$", re.DOTALL)


def _strip_code_fence(text: str) -> str:
    """Return *text* stripped of whitespace and of a surrounding code fence."""
    text = text.strip()
    match = _CODE_FENCE_RE.match(text)
    return match.group(1).strip() if match else text


def _parse_json_object(raw: str) -> dict:
    """Parse a model response into a JSON object.

    Tolerates the deviations models commonly produce despite being told to
    return bare JSON: Markdown code fences, prose before/after the object,
    and a JSON document that was itself encoded as a JSON string.

    Args:
        raw: The raw text returned by the model.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If no JSON object can be recovered. ``json.JSONDecodeError``
            is a subclass of ``ValueError``, so callers can catch either.
    """
    text = _strip_code_fence(raw)
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span so that explanatory text (or
        # stray quotes) around the object does not defeat parsing.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            raise
        parsed = json.loads(text[start:end + 1])

    if isinstance(parsed, str):
        # Double-encoded payload: the JSON object arrived as a JSON string.
        return _parse_json_object(parsed)
    if not isinstance(parsed, dict):
        raise ValueError(f"Expected a JSON object, got {type(parsed).__name__}")
    return parsed


class HedisComplianceEngine:
    """
    Crew that:
      (1) states official HEDIS measure criteria,
      (2) extracts patient evidence from embeddings, and
      (3) produces a markdown analysis report.

    Now includes interactive feedback capabilities: ``run_interactive()`` lets
    the user review and refine the criteria before the chart is analyzed,
    while ``run()`` keeps the original non-interactive pipeline.
    """

    def __init__(self, vectordb: Chroma, measure_code: str, measurement_year: int):
        """Build the three agents used by every analysis mode.

        Args:
            vectordb: Vector store holding the patient chart embeddings.
            measure_code: HEDIS measure identifier inserted into the prompts.
            measurement_year: Measurement year inserted into the prompts.
        """
        self.vectordb = vectordb
        self.measure_code = measure_code
        self.measurement_year = measurement_year
        self.model = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=5096)
        self.hedis_criteria = None  # Store approved criteria

        # Defined as a closure so the tool can reach this instance's store.
        @tool("patient_chart_search")
        def patient_chart_search(query: str) -> str:
            """Return top patient chart passages relevant to the query."""
            results = self.vectordb.similarity_search(query, k=15)
            return "\n".join(res.page_content for res in results)

        # 1) HEDIS measure expert (external knowledge via model)
        self.hedis_expert = Agent(
            role="HEDIS Measure Expert",
            goal=(
                f"Provide accurate, official details for HEDIS measure {measure_code} "
                "including required tests, codes, timeframes, and exclusions. "
                "ALWAYS respond with valid JSON format only."
            ),
            verbose=True,
            memory=False,
            tools=[],  # design choice: no web/search tools here; rely on model knowledge
            backstory=(
                "Healthcare quality expert with deep knowledge of NCQA HEDIS specifications. "
                "Must compute concrete absolute date ranges given the measurement year. "
                "CRITICAL: Always respond with properly formatted JSON without any markdown or extra text."
            ),
            llm=self.model,
        )

        # 2) Patient chart analyzer (RAG over embeddings)
        self.patient_analyzer = Agent(
            role="Patient Chart Analyzer",
            goal=(
                f"Extract tests, diagnoses, procedures, meds & dates relevant to HEDIS {measure_code} only. "
                "Don't include anything which is not related to this measure."
            ),
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory="Specialist in retrieving structured clinical data from patient chart embeddings.",
            llm=self.model,
        )

        # 3) Reporter who composes a clean markdown report
        self.hedis_chart_reporter = Agent(
            role="HEDIS Chart Findings Reporter",
            goal=f"Create a comprehensive markdown report of findings for HEDIS {measure_code}.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory=(
                "Clinical documentation specialist who organizes evidence and highlights "
                "relevant data for quality measure analysis."
            ),
            llm=self.model,
        )

    def _run_criteria_task(self, description: str, expected_output: str):
        """Run a single-task crew with the HEDIS expert and return its result.

        The description is already fully rendered, so no ``inputs`` are passed
        to ``kickoff``: template interpolation would otherwise be applied to
        text that may legitimately contain braces (JSON, user suggestions).
        """
        task = Task(
            description=description,
            expected_output=expected_output,
            agent=self.hedis_expert,
        )
        crew = Crew(
            agents=[self.hedis_expert],
            tasks=[task],
            process=Process.sequential,
            verbose=True,
        )
        return crew.kickoff()

    def get_hedis_criteria_with_feedback(self, suggestions: str = "") -> dict:
        """
        Ask the HEDIS expert for the measure criteria as a JSON object.

        Args:
            suggestions: Optional user feedback to incorporate into the criteria.

        Returns:
            The parsed criteria. If the response cannot be parsed, the
            simplified prompt of ``get_hedis_criteria_simple`` is tried; if
            that fails too (or the crew itself fails) a dict with
            ``raw_output`` and ``parse_error`` keys is returned instead.
            This method never returns ``None`` and never prompts the user.
        """
        base_description = f"""Provide the official criteria for HEDIS measure {self.measure_code}, including:
- Required tests or procedures
- Diagnosis/procedure codes
- Required medications
- Inclusion and exclusion criteria
- Timeframes for each requirement (compute absolute date ranges using {self.measurement_year})

You MUST compute and include absolute date ranges using {self.measurement_year}."""

        if suggestions:
            base_description += f"\n\nIncorporate these suggestions: {suggestions}"

        # Single braces on purpose: this text is sent verbatim (no template
        # interpolation), so doubled braces would reach the model literally.
        formatting_requirements = """

CRITICAL FORMATTING REQUIREMENTS:
- Return ONLY valid JSON without any markdown formatting, code blocks, or extra text
- Do not wrap the JSON in code blocks
- Do not add any explanatory text before or after the JSON
- The response must start with { and end with }
- Ensure all JSON keys and string values are properly quoted
- Required JSON fields: measure, required_tests, required_medications, codes, timeframes, inclusions, exclusions"""

        try:
            result = self._run_criteria_task(
                base_description + formatting_requirements,
                "Valid JSON object with required fields (no markdown, no code blocks)",
            )
        except Exception as e:
            # Boundary around the external crew/LLM call: report, don't crash.
            print(f"Crew execution error: {e}")
            return {"raw_output": f"Crew execution failed: {str(e)}", "parse_error": str(e)}

        print(f"Raw crew result type: {type(result)}")
        print(f"Raw crew result: {repr(str(result)[:200])}...")  # Show first 200 chars

        try:
            return _parse_json_object(str(result))
        except ValueError as primary_error:
            print("Primary method failed, trying simplified approach...")
            fallback = self.get_hedis_criteria_simple(suggestions)
            if "parse_error" in fallback:
                # Keep both failure reasons so the user can see why neither
                # attempt produced usable JSON.
                print(f"Fallback method also failed: {fallback['parse_error']}")
                fallback["parse_error"] = (
                    f"Primary: {primary_error}, Fallback: {fallback['parse_error']}"
                )
            return fallback

    def get_hedis_criteria_simple(self, suggestions: str = "") -> dict:
        """
        Simplified method to get HEDIS criteria with a short, direct prompt.

        Args:
            suggestions: Optional user feedback to incorporate into the criteria.

        Returns:
            The parsed criteria, or a dict with ``raw_output`` and
            ``parse_error`` keys if the crew fails or its output is not a
            JSON object. This method does not raise.
        """
        result = None
        try:
            prompt_parts = [
                f"Provide HEDIS measure {self.measure_code} criteria for year {self.measurement_year}.",
                "Include: required tests, medications, codes, timeframes, inclusions, exclusions.",
            ]
            if suggestions:
                prompt_parts.append(f"Suggestions: {suggestions}")
            prompt_parts.append("Respond with valid JSON only. No markdown, no code blocks, just JSON.")

            result = self._run_criteria_task(
                " ".join(prompt_parts),
                "JSON object with measure criteria",
            )
            return _parse_json_object(str(result))
        except Exception as e:
            # Boundary around the external crew/LLM call and the parse step.
            print(f"Simple method error: {e}")
            return {
                "raw_output": str(result) if result is not None else "No result",
                "parse_error": str(e),
            }

    @staticmethod
    def _print_section(title: str, value) -> None:
        """Print one criteria section: lists as bullets, dicts as key/value rows."""
        print(f"\n{title}")
        if isinstance(value, list):
            for item in value:
                print(f"   • {item}")
        elif isinstance(value, dict):
            for key, entry in value.items():
                print(f"   {key}: {entry}")
        else:
            print(f"   {value}")

    def display_criteria(self, criteria: dict) -> None:
        """Display the HEDIS criteria in a user-friendly format.

        Args:
            criteria: Either parsed criteria or the ``raw_output`` /
                ``parse_error`` dict produced when parsing failed.
        """
        print(f"\n{_RULE}")
        print(f"HEDIS MEASURE {self.measure_code} CRITERIA")
        print(f"Measurement Year: {self.measurement_year}")
        print(_RULE)

        if "raw_output" in criteria:
            if "parse_error" in criteria:
                # Make it obvious that what follows is unparsed model text.
                print(f"\n⚠️ Criteria could not be parsed as JSON: {criteria['parse_error']}")
            print(criteria["raw_output"])
            return

        if "measure" in criteria:
            print(f"\n📋 Measure: {criteria['measure']}")

        sections = (
            ("required_tests", "🔬 Required Tests/Procedures:"),
            ("required_medications", "💊 Required Medications:"),
            ("codes", "🏷️ Relevant Codes:"),
            ("timeframes", "⏰ Timeframes:"),
            ("inclusions", "✅ Inclusion Criteria:"),
            ("exclusions", "❌ Exclusion Criteria:"),
        )
        for key, title in sections:
            if key in criteria:
                self._print_section(title, criteria[key])

    def get_user_feedback(self) -> tuple[str, str]:
        """
        Get user feedback on the displayed criteria.

        Returns:
            ``(action, suggestions)`` where action is ``'continue'``,
            ``'modify'``, or ``'exit'``. ``suggestions`` is only non-empty for
            ``'modify'``. End of input (e.g. a closed stdin) counts as
            ``'exit'``.
        """
        print(f"\n{_RULE}")
        print("REVIEW THE CRITERIA ABOVE")
        print(_RULE)
        print("Options:")
        print("• Press ENTER to continue with these criteria")
        print("• Type suggestions to modify the criteria")
        print("• Type 'exit' to stop the analysis")
        print(_RULE)

        try:
            user_input = input("\nYour choice: ").strip()
        except EOFError:
            # No interactive stdin available; stop rather than loop or crash.
            return 'exit', ''

        if user_input.lower() == 'exit':
            return 'exit', ''
        if not user_input:
            return 'continue', ''
        return 'modify', user_input

    def run_interactive(self) -> str:
        """
        Run the interactive HEDIS compliance analysis with user feedback loop.

        Returns:
            The markdown report from ``run_full_analysis``, or the string
            ``"Analysis cancelled by user."`` if the user exits.
        """
        print(f"Starting Interactive HEDIS Analysis for measure {self.measure_code}")

        suggestions = ""
        while True:
            print(f"\n🔍 Getting HEDIS criteria for {self.measure_code}...")
            criteria = self.get_hedis_criteria_with_feedback(suggestions)

            self.display_criteria(criteria)
            action, user_suggestions = self.get_user_feedback()

            if action == 'exit':
                print("\n❌ Analysis cancelled by user.")
                return "Analysis cancelled by user."
            if action == 'continue':
                print("\n✅ Criteria approved! Proceeding with chart analysis...")
                self.hedis_criteria = criteria
                break

            # action == 'modify': ask the expert again with the user's input.
            print("\n🔄 Modifying criteria based on your suggestions...")
            suggestions = user_suggestions

        # Now proceed with the rest of the analysis using approved criteria
        return self.run_full_analysis()

    def run_full_analysis(self) -> str:
        """
        Run the chart extraction and reporting tasks with the approved criteria.

        If no criteria have been approved yet (the method was called directly
        rather than through ``run_interactive``), they are fetched from the
        expert first so the prompts never embed ``null`` criteria.

        Returns:
            The final crew output (the markdown report) as a string.
        """
        if self.hedis_criteria is None:
            self.hedis_criteria = self.get_hedis_criteria_with_feedback()
        criteria_json = json.dumps(self.hedis_criteria, indent=2)

        patient_task = Task(
            description=(
                f"Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                f"medications, and dates relevant to {self.measure_code}. Only include measure related info from the patient charts. "
                f"Use these approved criteria as reference: {criteria_json}"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )

        chart_analysis_task = Task(
            description=(
                f"Create a comprehensive markdown report analyzing the patient chart for HEDIS {self.measure_code} "
                f"for measurement year {self.measurement_year}.\n\n"
                "Sections:\n"
                "### HEDIS Measure Criteria (Approved)\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Compliance Summary\n"
                f"Use these approved criteria as reference: {criteria_json}\n"
                "Make sure only measure related information is included."
            ),
            expected_output="A thorough markdown report of HEDIS findings with compliance assessment.",
            agent=self.hedis_chart_reporter,
            context=[patient_task],
        )

        analysis_crew = Crew(
            agents=[self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        print("\n🔄 Running patient chart analysis...")
        # No `inputs`: the descriptions above are already rendered and embed
        # JSON, whose braces must not be treated as template placeholders.
        result = analysis_crew.kickoff()
        print("\n✅ Analysis complete!")

        return str(result)

    def run(self) -> str:
        """
        Entry point - runs original non-interactive analysis for app compatibility.
        Use run_interactive() for interactive mode.

        The task descriptions here are templates: ``{measure_code}`` and
        ``{measurement_year}`` are filled in from the ``inputs`` passed to
        ``kickoff``.

        Returns:
            The final crew output (the markdown report) as a string.
        """
        hedis_task = Task(
            description=(
                "Provide the official criteria for HEDIS measure {measure_code}, including:\n"
                "- Required tests or procedures\n"
                "- Diagnosis/procedure codes\n"
                "- Required medications\n"
                "- Inclusion and exclusion criteria\n"
                "- Timeframes for each requirement (e.g., within 10 years of {measurement_year})\n\n"
                "You MUST compute and include absolute date ranges using {measurement_year}.\n"
                "Return JSON with fields: 'measure','required_tests','required_medications',"
                "'codes','timeframes','inclusions','exclusions'."
            ),
            expected_output="Structured JSON with absolute date ranges.",
            agent=self.hedis_expert,
            async_execution=True,
        )

        patient_task = Task(
            description=(
                "Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                "medications, and dates relevant to {measure_code}. Only include measure related info from the patient charts"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )

        chart_analysis_task = Task(
            description=(
                "Create a comprehensive markdown report analyzing the patient chart for HEDIS {measure_code} "
                "for measurement year {measurement_year}.\n\n"
                "Sections:\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Summary\n"
                "Make sure only measure related information is there."
            ),
            expected_output="A thorough markdown report of HEDIS findings.",
            agent=self.hedis_chart_reporter,
            context=[hedis_task, patient_task],
        )

        crew = Crew(
            agents=[self.hedis_expert, self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[hedis_task, patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        inputs = {
            "measure_code": self.measure_code,
            "measurement_year": self.measurement_year,
        }
        result = crew.kickoff(inputs=inputs)
        return str(result)