# Page residue from the web file viewer: Spaces status "Sleeping"; file size 14,077 bytes.
# (The commit-hash and line-number gutter captured by the scrape is omitted here.)
# comorbidity_checker.py
import json
from typing import List, Dict
from json_repair import repair_json
from crewai import Agent, Task, Crew, Process
from crewai_tools import tool, SerperDevTool
from langchain_openai import ChatOpenAI
from embedding_manager import DirectoryEmbeddingManager
class ComorbidityCheckerAgent:
    """
    Two-step flow:
    1) Identify clinically significant comorbidities for the primary diagnosis (HCC-aware).
    2) Verify each comorbidity against the patient chart embeddings (top-15).

    Both steps are delegated to a single crewAI agent; each step runs as its own
    one-task crew and is expected to answer with a JSON object.
    """

    def __init__(self, pdf_dir_or_file: str, hcc_code: str, model_version: str, model: str = "gpt-4o"):
        """
        Build the embedding manager, the LLM, the chart-search tool and the agent.

        Args:
            pdf_dir_or_file: Path handed unchanged to ``DirectoryEmbeddingManager``.
            hcc_code: HCC code used in the identification prompt (whitespace-stripped).
            model_version: Risk-model version used in the prompt (stripped, upper-cased).
            model: Model name passed to ``ChatOpenAI``.
        """
        self.embed_manager = DirectoryEmbeddingManager(pdf_dir_or_file)
        self.llm = ChatOpenAI(model=model, temperature=0)
        self.hcc_code = hcc_code.strip()
        self.model_version = model_version.strip().upper()
        self.search_tool = SerperDevTool()  # available if you want to expand later

        # Defined inside __init__ so the tool closes over this instance's
        # embedding manager.
        @tool("patient_chart_search")
        def patient_chart_search(query: str) -> str:
            """
            Query persistent patient-chart embeddings.
            Returns the top-15 results concatenated with separators.
            """
            print(f"\n[TOOL LOG] Searching patient chart for: '{query}'")
            vectordb = self.embed_manager.get_or_create_embeddings()
            results = vectordb.similarity_search(query, k=15)
            return "\n---\n".join(res.page_content for res in results)

        self.patient_chart_search = patient_chart_search
        self.agent = Agent(
            role="Clinical Coding and Comorbidity Analyst",
            goal=(
                "Identify clinically significant comorbidities for a primary diagnosis relevant to HCC; "
                "verify presence in the patient's chart with embeddings."
            ),
            backstory=(
                "Expert risk-adjustment analyst who cross-references guidelines with chart evidence."
            ),
            tools=[self.patient_chart_search],
            verbose=True,
            memory=False,
            llm=self.llm,
        )

    @staticmethod
    def _output_text(crew_output: object) -> str:
        """
        Return the textual payload of a crew result.

        Plain strings are returned unchanged. For any other object a string
        ``raw`` attribute is preferred, falling back to ``str()``.
        NOTE(review): newer crewAI releases appear to return a ``CrewOutput``
        object exposing ``.raw`` rather than a plain string - confirm against
        the installed version.
        """
        if isinstance(crew_output, str):
            return crew_output
        raw = getattr(crew_output, "raw", None)
        return raw if isinstance(raw, str) else str(crew_output)

    @staticmethod
    def _is_yes(value: object) -> bool:
        """Return True only for a string equal to "yes" (case/whitespace-insensitive)."""
        return isinstance(value, str) and value.strip().lower() == "yes"

    def _parse_json_list(self, crew_output: object, key: str) -> List:
        """
        Extract the list stored under ``key`` from the agent's JSON answer.

        Returns an empty list when the output cannot be repaired/parsed, is not
        a JSON object, or does not hold a list under ``key``.
        """
        try:
            parsed = json.loads(repair_json(self._output_text(crew_output)))
        except Exception as exc:
            # Deliberately best-effort: malformed LLM output must not abort the
            # whole run, but the failure is reported instead of being hidden.
            print(f"[ERROR] Could not parse '{key}' from agent output: {exc}")
            return []
        value = parsed.get(key, []) if isinstance(parsed, dict) else []
        return value if isinstance(value, list) else []

    def _run_single_task(self, description: str, expected_output: str) -> object:
        """Run one task on ``self.agent`` in a sequential one-task crew and return the kickoff result."""
        task = Task(
            description=description,
            expected_output=expected_output,
            agent=self.agent,
            # NOTE(review): kept from the original call; confirm the installed
            # crewAI Task accepts `json_mode`.
            json_mode=True,
        )
        crew = Crew(agents=[self.agent], tasks=[task], process=Process.sequential)
        return crew.kickoff()

    def _check_comorbidities_for_one(self, diagnosis_entry: Dict) -> Dict:
        """
        Run the identify -> verify flow for one diagnosis entry.

        Args:
            diagnosis_entry: Dict with at least a ``"diagnosis"`` key.

        Returns:
            ``{"diagnosis": <primary diagnosis>, "comorbidities": [...]}``. The
            list is empty when nothing was identified or an answer could not be
            parsed.

        Raises:
            KeyError: If ``diagnosis_entry`` has no ``"diagnosis"`` key.
        """
        primary_diagnosis = diagnosis_entry["diagnosis"]
        final_result: Dict = {"diagnosis": primary_diagnosis, "comorbidities": []}

        # Task 1 - Identify comorbidities
        identified = self._run_single_task(
            description=(
                f"For primary diagnosis '{primary_diagnosis}', list common and clinically meaningful comorbidities "
                # The trailing space keeps the sentence separate from the
                # format instruction that follows.
                f"that matter for HCC {self.hcc_code} in {self.model_version}. "
                "Return STRICT JSON: {\"potential_comorbidities\": [\"...\"]}"
            ),
            expected_output="Strict JSON with key potential_comorbidities (list of strings).",
        )
        comorbidities = self._parse_json_list(identified, "potential_comorbidities")
        if not comorbidities:
            return final_result

        # Task 2 - Verify each comorbidity via patient_chart_search
        verified = self._run_single_task(
            description=(
                f"Primary diagnosis: '{primary_diagnosis}'. Potential comorbidities: {comorbidities}.\n"
                "For EACH comorbidity, call the patient_chart_search tool (top-15). "
                "Decide presence/absence using ONLY returned snippets.\n\n"
                "Return STRICT JSON:\n"
                "{ \"comorbidity_analysis\": [\n"
                " {\"condition\":\"...\",\"is_present\":true/false,\"context\":\"<combined snippets>\",\"rationale\":\"...\"},\n"
                " ... ] }"
            ),
            expected_output="Strict JSON with key comorbidity_analysis (list of objects).",
        )
        final_result["comorbidities"] = self._parse_json_list(verified, "comorbidity_analysis")
        return final_result

    def run(self, meat_validated_results: List[Dict]) -> List[Dict]:
        """
        Accepts entries that already passed MEAT (i.e., meat dict exists and has True somewhere).

        Entries whose ``"meat"`` dict contains a truthy value are replaced in
        the output by the ``{"diagnosis", "comorbidities"}`` result of the
        two-step check. All other entries are passed through as the same dict
        object; when either ``answer_explicit`` or ``answer_implicit`` is
        "yes", a status note is stored on the entry under ``"comorbidities"``.

        Args:
            meat_validated_results: Diagnosis entries from the earlier stages.

        Returns:
            One output dict per input entry, in input order.
        """
        out: List[Dict] = []
        for entry in meat_validated_results:
            meat = entry.get("meat", {})
            if isinstance(meat, dict) and any(meat.values()):
                print(f"[INFO] Checking structured comorbidities for: {entry['diagnosis']}")
                out.append(self._check_comorbidities_for_one(entry))
                continue

            # If earlier stages claim 'yes' but MEAT not met, pass through with a note.
            # _is_yes tolerates None / non-string answers instead of raising.
            if self._is_yes(entry.get("answer_explicit")) or self._is_yes(entry.get("answer_implicit")):
                entry["comorbidities"] = {
                    "status": "MEAT criteria not met; not proceeding with comorbidity analysis."
                }
            out.append(entry)
        return out
# import os
# import json
# import pandas as pd
# from PyPDF2 import PdfReader
# from json_repair import repair_json
# from typing import List, Dict, Any, Optional
# from crewai import Agent, Task, Crew, Process
# from crewai_tools import SerperDevTool,tool
# from langchain_openai import ChatOpenAI
# from langchain_community.vectorstores import Chroma
# from embedding_manager import DirectoryEmbeddingManager
# SEED_SOURCES = [
# "https://www.cms.gov/medicare/payment/medicare-advantage-rates-statistics/risk-adjustment",
# "https://www.cms.gov/data-research/monitoring-programs/medicare-risk-adjustment-data-validation-program",
# "https://www.cms.gov/files/document/fy-2024-icd-10-cm-coding-guidelines-updated-02/01/2024.pdf",
# "https://www.aapc.com/blog/41212-include-meat-in-your-risk-adjustment-documentation/",
# ]
# class ComorbidityCheckerAgent:
# """
# Uses a two-step AI agent process to first identify potential comorbidities for a
# MEAT-validated diagnosis and then verifies each one against the patient chart context.
# """
# def __init__(self, pdf_dir: str, hcc_code: str, model_version: str):
# self.embed_manager = DirectoryEmbeddingManager(pdf_dir)
# self.llm = ChatOpenAI(model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o"), temperature=0)
# self.hcc_code = hcc_code.strip()
# self.model_version = model_version.strip().upper()
# self.search_tool = SerperDevTool()
# #self.search_tool = SerperDevTool(seed_sources=SEED_SOURCES)
# @tool("patient_chart_search")
# def patient_chart_search(query: str) -> str:
# """
# Search the patient chart embeddings and return all top 15 results as a single string.
# Each result is preserved individually and then combined at the end.
# """
# print(f"\n[TOOL LOG] Searching patient chart for: '{query}'")
# vectordb = self.embed_manager.get_or_create_embeddings()
# results = vectordb.similarity_search(query, k=15)
# # Keep all 15 results separate internally
# all_results = [res.page_content for res in results]
# # Combine into a single string for output (same format as before)
# combined_results = "\n---\n".join(all_results)
# return combined_results
# # Register the agent with the tool
# self.agent = Agent(
# role="Clinical Coding and Comorbidity Analyst",
# goal=(
# "First, identify all clinically significant comorbidities for a given primary diagnosis, "
# "focusing on those relevant to HCC risk adjustment. Second, verify the presence of "
# "these comorbidities in a patient's chart and present the findings in a structured JSON format."
# ),
# backstory=(
# "You are an expert clinical coding analyst specializing in risk adjustment and Hierarchical Condition Categories (HCC). "
# "Your primary skill is to research disease patterns and then meticulously cross-reference them with patient records embeddings "
# "to ensure accurate documentation and coding. You provide clear, evidence-based findings."
# ),
# tools=[patient_chart_search],
# verbose=True,
# memory=False,
# llm=self.llm,
# )
# def check_comorbidities_for_one(self, diagnosis_entry: Dict) -> Dict:
# """
# Orchestrates the two-task process for a single primary diagnosis.
# """
# primary_diagnosis = diagnosis_entry["diagnosis"]
# final_result = {"diagnosis": primary_diagnosis, "comorbidities": []}
# # --- Task 1: Identify Potential Comorbidities ---
# identify_task = Task(
# description=(
# f"For the primary diagnosis of '{primary_diagnosis}', generate a full list of common and clinically "
# f"significant comorbidities. Focus on conditions relevant for HCC {self.hcc_code} risk adjustment "
# f"in the {self.model_version} model. Use your search tool for research if needed."
# ),
# expected_output=(
# "A JSON object with a single key 'potential_comorbidities' containing a list of strings. "
# "Example: {\"potential_comorbidities\": [\"Hypertension\", \"Diabetes Mellitus Type 2\"]}"
# ),
# agent=self.agent,
# json_mode=True
# )
# print(f"\n[TASK 1] Identifying potential comorbidities for '{primary_diagnosis}'...")
# crew = Crew(agents=[self.agent], tasks=[identify_task], process=Process.sequential)
# result = crew.kickoff()
# try:
# comorbidities_to_check = json.loads(repair_json(result)).get("potential_comorbidities", [])
# if not comorbidities_to_check:
# print("[INFO] No potential comorbidities were identified by the agent.")
# return final_result
# print(f"[INFO] Identified potential comorbidities: {comorbidities_to_check}")
# except (json.JSONDecodeError, TypeError):
# print("[ERROR] Failed to decode the list of potential comorbidities. Aborting.")
# return final_result
# # --- Task 2: Verify Each Comorbidity in the Chart ---
# verify_task = Task(
# description=(
# f"The patient has a primary diagnosis of '{primary_diagnosis}'.\n"
# f"A list of potential comorbidities has been identified: {comorbidities_to_check}.\n\n"
# "For EACH comorbidity, you MUST use the `patient_chart_search` tool, which queries the persistent "
# "embedding database of the patient's chart. **Use all 15 retrieved results individually** to "
# "determine presence or absence of each comorbidity.\n\n"
# "After reviewing all results, construct a final JSON object with a single key 'comorbidity_analysis'. "
# "Ensure there is one object for EACH comorbidity from the initial list. The 'context' field should "
# "combine all relevant evidence snippets into a single string."
# ),
# expected_output=(
# "A final JSON object with the key 'comorbidity_analysis'. This key should contain a list "
# "where each item has the structure: \n"
# '{\n'
# ' "condition": "<name of comorbidity>",\n'
# ' "is_present": true/false,\n'
# ' "context": "<Use all 15 retrieved results individually and combined them according to comorbidity>",\n'
# ' "rationale": "<one-line explanation of your finding>"\n'
# '}'
# ),
# agent=self.agent,
# json_mode=True
# )
# print(f"\n[TASK 2] Verifying identified comorbidities in the patient chart...")
# crew = Crew(agents=[self.agent], tasks=[verify_task], process=Process.sequential)
# result = crew.kickoff()
# try:
# analysis = json.loads(repair_json(result))
# final_result["comorbidities"] = analysis.get("comorbidity_analysis", [])
# except (json.JSONDecodeError, TypeError):
# print(f"[ERROR] Failed to decode the final comorbidity analysis for '{primary_diagnosis}'.")
# final_result["comorbidities"] = []
# return final_result
# def run(self, meat_validated_results: List[Dict]) -> List[Dict]:
# """
# Main execution loop. It iterates through diagnoses that have met MEAT criteria
# and runs the comorbidity check for each.
# """
# final_results = []
# for entry in meat_validated_results:
# meat_criteria = entry.get("meat", {})
# if isinstance(meat_criteria, dict) and any(meat_criteria.values()):
# print(f"\n[INFO] Checking for structured comorbidities for: {entry['diagnosis']}")
# entry_with_comorbidities = self.check_comorbidities_for_one(entry)
# final_results.append(entry_with_comorbidities)
# print(f"[COMORBIDITIES CHECKED] Analysis complete for {entry['diagnosis']}.")
# else:
# if entry.get("answer", "").lower() == "yes":
# entry["comorbidities"] = {
# "status": "MEAT criteria not met; not proceeding with comorbidity analysis."
# }
# final_results.append(entry)
# return final_results
|