# hedis_engine.py - Updated with Interactive Features
# Provenance: "Update hedis_engine.py" by abjasrees, commit 6edef9f (verified).
import os
import json
from crewai import Agent, Task, Crew, Process
from crewai_tools import tool
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI
class HedisComplianceEngine:
    """
    Crew that: (1) states official HEDIS measure criteria,
    (2) extracts patient evidence from embeddings, and
    (3) produces a markdown analysis report.
    Now includes interactive feedback capabilities.

    Entry points:
        run()             -- non-interactive, three-task pipeline.
        run_interactive() -- console loop that lets the user review and
                             refine the criteria before chart analysis.
    """

    # (criteria key, console heading) pairs printed by display_criteria, in order.
    _DISPLAY_SECTIONS = (
        ("required_tests", "\n🔬 Required Tests/Procedures:"),
        ("required_medications", "\n💊 Required Medications:"),
        ("codes", "\n🏷️ Relevant Codes:"),
        ("timeframes", "\n⏰ Timeframes:"),
        ("inclusions", "\n✅ Inclusion Criteria:"),
        ("exclusions", "\n❌ Exclusion Criteria:"),
    )

    def __init__(self, vectordb: "Chroma", measure_code: str, measurement_year: int):
        """
        Build the LLM client and the three agents used by every run mode.

        Args:
            vectordb: Vector store queried through ``similarity_search`` for
                patient chart passages.
            measure_code: HEDIS measure identifier interpolated into prompts.
            measurement_year: Year the expert is told to compute date ranges for.
        """
        self.vectordb = vectordb
        self.measure_code = measure_code
        self.measurement_year = measurement_year
        self.model = ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=5096)
        self.hedis_criteria = None  # Store approved criteria

        # Defined as a closure so the tool can reach self.vectordb.
        @tool("patient_chart_search")
        def patient_chart_search(query: str) -> str:
            """Return top patient chart passages relevant to the query."""
            results = self.vectordb.similarity_search(query, k=15)
            return "\n".join(res.page_content for res in results)

        # 1) HEDIS measure expert (external knowledge via model)
        self.hedis_expert = Agent(
            role="HEDIS Measure Expert",
            goal=(
                f"Provide accurate, official details for HEDIS measure {measure_code} "
                "including required tests, codes, timeframes, and exclusions. "
                "ALWAYS respond with valid JSON format only."
            ),
            verbose=True,
            memory=False,
            tools=[],  # design choice: no web/search tools here; rely on model knowledge
            backstory=(
                "Healthcare quality expert with deep knowledge of NCQA HEDIS specifications. "
                "Must compute concrete absolute date ranges given the measurement year. "
                "CRITICAL: Always respond with properly formatted JSON without any markdown or extra text."
            ),
            llm=self.model,
        )

        # 2) Patient chart analyzer (RAG over embeddings)
        self.patient_analyzer = Agent(
            role="Patient Chart Analyzer",
            goal=f"Extract tests, diagnoses, procedures, meds & dates relevant to HEDIS {measure_code} only. Don't include anything which is not related to this measure.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory="Specialist in retrieving structured clinical data from patient chart embeddings.",
            llm=self.model,
        )

        # 3) Reporter who composes a clean markdown report
        self.hedis_chart_reporter = Agent(
            role="HEDIS Chart Findings Reporter",
            goal=f"Create a comprehensive markdown report of findings for HEDIS {measure_code}.",
            verbose=True,
            memory=False,
            tools=[patient_chart_search],
            backstory=(
                "Clinical documentation specialist who organizes evidence and highlights "
                "relevant data for quality measure analysis."
            ),
            llm=self.model,
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _kickoff_inputs(self) -> dict:
        """Return the inputs mapping passed to every ``Crew.kickoff`` call."""
        return {
            "measure_code": self.measure_code,
            "measurement_year": self.measurement_year,
        }

    def _build_criteria_crew(self, description: str, expected_output: str):
        """Create a single-task crew that asks the HEDIS expert for criteria."""
        criteria_task = Task(
            description=description,
            expected_output=expected_output,
            agent=self.hedis_expert,
        )
        return Crew(
            agents=[self.hedis_expert],
            tasks=[criteria_task],
            process=Process.sequential,
            verbose=True,
        )

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Remove a surrounding Markdown code fence (with optional ``json`` tag)."""
        cleaned = text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned[3:]
            # Language tag is matched case-insensitively (```json, ```JSON, ...).
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        return cleaned.strip()

    @classmethod
    def _parse_criteria_json(cls, raw: str) -> dict:
        """
        Parse raw model output into a criteria dict.

        Tolerates Markdown code fences and a JSON document that was itself
        wrapped in quotes (either a properly escaped JSON string literal or
        bare quotes around an object).

        Raises:
            json.JSONDecodeError: if the text is not JSON or does not decode
                to a JSON object.
        """
        text = cls._strip_code_fences(raw)
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            if not (len(text) >= 2 and text.startswith('"') and text.endswith('"')):
                raise
            # Quoted but not a valid JSON string: unquote manually and retry.
            inner = text[1:-1].replace('\\"', '"').replace("\\n", "\n")
            parsed = json.loads(inner)
        if isinstance(parsed, str):
            # A JSON string literal that contains the real document.
            parsed = json.loads(cls._strip_code_fences(parsed))
        if not isinstance(parsed, dict):
            raise json.JSONDecodeError("Expected a JSON object", text, 0)
        return parsed

    # ------------------------------------------------------------------
    # Criteria retrieval
    # ------------------------------------------------------------------
    def get_hedis_criteria_with_feedback(self, suggestions: str = "") -> dict:
        """
        Ask the HEDIS expert for the measure criteria and parse them as JSON.

        This method does not prompt the user itself; ``run_interactive`` shows
        the result and feeds any user suggestions back in via *suggestions*.

        Args:
            suggestions: Free-text change requests appended to the prompt.

        Returns:
            The parsed criteria dict. On failure, a dict with ``raw_output``
            and ``parse_error`` keys instead (never ``None``).
        """
        base_description = (
            f"Provide the official criteria for HEDIS measure {self.measure_code}, including:\n"
            "- Required tests or procedures\n"
            "- Diagnosis/procedure codes\n"
            "- Required medications\n"
            "- Inclusion and exclusion criteria\n"
            f"- Timeframes for each requirement (compute absolute date ranges using {self.measurement_year})\n"
            f"You MUST compute and include absolute date ranges using {self.measurement_year}."
        )
        if suggestions:
            base_description += f"\n\nIncorporate these suggestions: {suggestions}"

        # NOTE(review): the doubled braces are kept exactly as in the original
        # text; presumably they guard against placeholder interpolation of the
        # task description at kickoff -- confirm against the CrewAI version used.
        formatting_requirements = (
            "\nCRITICAL FORMATTING REQUIREMENTS:\n"
            "- Return ONLY valid JSON without any markdown formatting, code blocks, or extra text\n"
            "- Do not wrap the JSON in code blocks\n"
            "- Do not add any explanatory text before or after the JSON\n"
            "- The response must start with {{ and end with }}\n"
            "- Ensure all JSON keys and string values are properly quoted\n"
            "- Required JSON fields: measure, required_tests, required_medications, codes, timeframes, inclusions, exclusions"
        )
        full_description = base_description + formatting_requirements

        # Temporary crew just for getting criteria
        criteria_crew = self._build_criteria_crew(
            full_description,
            "Valid JSON object with required fields (no markdown, no code blocks)",
        )

        try:
            result = criteria_crew.kickoff(inputs=self._kickoff_inputs())
            print(f"Raw crew result type: {type(result)}")
            print(f"Raw crew result: {repr(str(result)[:200])}...")  # Show first 200 chars
        except Exception as e:
            print(f"Crew execution error: {e}")
            return {"raw_output": f"Crew execution failed: {str(e)}", "parse_error": str(e)}

        try:
            return self._parse_criteria_json(str(result))
        except json.JSONDecodeError as e:
            print("Primary method failed, trying simplified approach...")
            try:
                return self.get_hedis_criteria_simple(suggestions)
            except Exception as fallback_error:
                print(f"Fallback method also failed: {fallback_error}")
                return {"raw_output": str(result), "parse_error": f"Primary: {str(e)}, Fallback: {str(fallback_error)}"}
        except Exception as e:
            print(f"Unexpected error: {e}")
            return {"raw_output": str(result), "parse_error": str(e)}

    def get_hedis_criteria_simple(self, suggestions: str = "") -> dict:
        """
        Simplified method to get HEDIS criteria with minimal string formatting.

        Used as the fallback when the primary prompt's output cannot be parsed.

        Returns:
            The parsed criteria dict, or a dict with ``raw_output`` and
            ``parse_error`` keys if anything goes wrong. Never raises for
            ordinary ``Exception`` subclasses.
        """
        result = None  # stays None if the crew fails before producing output
        try:
            # Create a simple, direct prompt
            prompt_parts = [
                f"Provide HEDIS measure {self.measure_code} criteria for year {self.measurement_year}.",
                "Include: required tests, medications, codes, timeframes, inclusions, exclusions.",
            ]
            if suggestions:
                prompt_parts.append(f"Suggestions: {suggestions}")
            prompt_parts.append("Respond with valid JSON only. No markdown, no code blocks, just JSON.")

            criteria_crew = self._build_criteria_crew(
                " ".join(prompt_parts),
                "JSON object with measure criteria",
            )
            result = criteria_crew.kickoff(inputs=self._kickoff_inputs())
            return self._parse_criteria_json(str(result))
        except Exception as e:
            print(f"Simple method error: {e}")
            return {
                "raw_output": str(result) if result is not None else "No result",
                "parse_error": str(e),
            }

    # ------------------------------------------------------------------
    # Console interaction
    # ------------------------------------------------------------------
    def display_criteria(self, criteria: dict) -> None:
        """
        Display the HEDIS criteria in a user-friendly format.

        If *criteria* carries a ``raw_output`` key (the error shape returned by
        the criteria getters) only that raw text is printed. Otherwise each
        known section present in the dict is printed: list values as bullets,
        dict values as ``key: value`` lines, anything else verbatim.
        """
        banner = "=" * 60
        print(f"\n{banner}")
        print(f"HEDIS MEASURE {self.measure_code} CRITERIA")
        print(f"Measurement Year: {self.measurement_year}")
        print(banner)

        if "raw_output" in criteria:
            print(criteria["raw_output"])
            return

        if "measure" in criteria:
            print(f"\n📋 Measure: {criteria['measure']}")

        for key, heading in self._DISPLAY_SECTIONS:
            if key not in criteria:
                continue
            print(heading)
            value = criteria[key]
            list_section = key not in ("codes", "timeframes")
            if list_section and isinstance(value, list):
                for item in value:
                    print(f" • {item}")
            elif not list_section and isinstance(value, dict):
                for label, detail in value.items():
                    print(f" {label}: {detail}")
            else:
                print(f" {value}")

    def get_user_feedback(self) -> tuple[str, str]:
        """
        Get user feedback on the displayed criteria.

        Returns:
            ``(action, suggestions)`` where action is ``'continue'``,
            ``'modify'`` or ``'exit'``. *suggestions* is non-empty only for
            ``'modify'``. A closed stdin (EOF) is treated as ``'exit'``.
        """
        banner = "=" * 60
        print(f"\n{banner}")
        print("REVIEW THE CRITERIA ABOVE")
        print(banner)
        print("Options:")
        print("• Press ENTER to continue with these criteria")
        print("• Type suggestions to modify the criteria")
        print("• Type 'exit' to stop the analysis")
        print(banner)

        try:
            user_input = input("\nYour choice: ").strip()
        except EOFError:
            # No interactive stdin available: stop instead of crashing.
            return 'exit', ''

        if user_input.lower() == 'exit':
            return 'exit', ''
        if user_input == '':
            return 'continue', ''
        return 'modify', user_input

    # ------------------------------------------------------------------
    # Run modes
    # ------------------------------------------------------------------
    def run_interactive(self) -> str:
        """
        Run the interactive HEDIS compliance analysis with user feedback loop.

        Repeatedly fetches and shows the criteria until the user approves them
        (then runs the full analysis) or exits.

        Returns:
            The analysis report text, or ``"Analysis cancelled by user."``.
        """
        print(f"Starting Interactive HEDIS Analysis for measure {self.measure_code}")
        suggestions = ""
        while True:
            # Get criteria from expert
            print(f"\n🔍 Getting HEDIS criteria for {self.measure_code}...")
            criteria = self.get_hedis_criteria_with_feedback(suggestions)

            # Display criteria to user
            self.display_criteria(criteria)
            if "parse_error" in criteria:
                # Make sure the user knows they would be approving an error payload.
                print("\n⚠️ The criteria could not be parsed as JSON; review the raw output above before approving.")

            # Get user feedback
            action, user_suggestions = self.get_user_feedback()
            if action == 'exit':
                print("\n❌ Analysis cancelled by user.")
                return "Analysis cancelled by user."
            if action == 'continue':
                print("\n✅ Criteria approved! Proceeding with chart analysis...")
                self.hedis_criteria = criteria
                break
            if action == 'modify':
                print("\n🔄 Modifying criteria based on your suggestions...")
                suggestions = user_suggestions

        # Now proceed with the rest of the analysis using approved criteria
        return self.run_full_analysis()

    def run_full_analysis(self) -> str:
        """
        Run the full analysis with approved criteria.

        Embeds ``self.hedis_criteria`` (serialized as JSON) into both the
        evidence-extraction task and the reporting task.

        Returns:
            The crew result converted to ``str``.
        """
        criteria_json = json.dumps(self.hedis_criteria, indent=2)

        # Create tasks for patient analysis and reporting
        patient_task = Task(
            description=(
                f"Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                f"medications, and dates relevant to {self.measure_code}. Only include measure related info from the patient charts. "
                f"Use these approved criteria as reference: {criteria_json}"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )
        chart_analysis_task = Task(
            description=(
                f"Create a comprehensive markdown report analyzing the patient chart for HEDIS {self.measure_code} "
                f"for measurement year {self.measurement_year}.\n\n"
                "Sections:\n"
                "### HEDIS Measure Criteria (Approved)\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Compliance Summary\n"
                f"Use these approved criteria as reference: {criteria_json}\n"
                "Make sure only measure related information is included."
            ),
            expected_output="A thorough markdown report of HEDIS findings with compliance assessment.",
            agent=self.hedis_chart_reporter,
            context=[patient_task],
        )

        # Create crew for full analysis
        analysis_crew = Crew(
            agents=[self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )

        # Execute full analysis
        print("\n🔄 Running patient chart analysis...")
        result = analysis_crew.kickoff(inputs=self._kickoff_inputs())
        print("\n✅ Analysis complete!")
        return str(result)

    def run(self) -> str:
        """
        Entry point - runs original non-interactive analysis for app compatibility.
        Use run_interactive() for interactive mode.

        The task descriptions here are plain strings containing
        ``{measure_code}`` / ``{measurement_year}`` placeholders; the values
        are supplied through the ``inputs`` passed to ``kickoff``.

        Returns:
            The crew result converted to ``str``.
        """
        # Original tasks
        hedis_task = Task(
            description=(
                "Provide the official criteria for HEDIS measure {measure_code}, including:\n"
                "- Required tests or procedures\n"
                "- Diagnosis/procedure codes\n"
                "- Required medications\n"
                "- Inclusion and exclusion criteria\n"
                "- Timeframes for each requirement (e.g., within 10 years of {measurement_year})\n\n"
                "You MUST compute and include absolute date ranges using {measurement_year}.\n"
                "Return JSON with fields: 'measure','required_tests','required_medications',"
                "'codes','timeframes','inclusions','exclusions'."
            ),
            expected_output="Structured JSON with absolute date ranges.",
            agent=self.hedis_expert,
            async_execution=True,
        )
        patient_task = Task(
            description=(
                "Using the patient_chart_search tool, extract only the tests, diagnoses, procedures, "
                "medications, and dates relevant to {measure_code}. Only include measure related info from the patient charts"
            ),
            expected_output="Structured list of patient evidence with dates.",
            agent=self.patient_analyzer,
            async_execution=True,
        )
        chart_analysis_task = Task(
            description=(
                "Create a comprehensive markdown report analyzing the patient chart for HEDIS {measure_code} "
                "for measurement year {measurement_year}.\n\n"
                "Sections:\n"
                "### Diagnoses Found\n"
                "### Procedures Found\n"
                "### Laboratory Tests\n"
                "### Medications\n"
                "### Vital Signs\n"
                "### Exclusions Found\n"
                "### Missing Information\n"
                "### Summary\n"
                "Make sure only measure related information is there."
            ),
            expected_output="A thorough markdown report of HEDIS findings.",
            agent=self.hedis_chart_reporter,
            context=[hedis_task, patient_task],
        )
        crew = Crew(
            agents=[self.hedis_expert, self.patient_analyzer, self.hedis_chart_reporter],
            tasks=[hedis_task, patient_task, chart_analysis_task],
            process=Process.sequential,
            verbose=True,
        )
        result = crew.kickoff(inputs=self._kickoff_inputs())
        return str(result)