import os
import pandas as pd
from typing import TypedDict, List, Annotated
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
from langchain_core.documents import Document
from dotenv import load_dotenv
import re
# Load environment variables
load_dotenv()
# Global Data Access
DATA_PATH = 'Data'
_vector_store = None
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
def get_cms_context():
    """Load the CMS rules and hospital claims datasets from DATA_PATH.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(rules, claims)`` read from
        ``cms_rules_2025.csv`` and ``claims.csv`` respectively.
    """
    rules_df = pd.read_csv(os.path.join(DATA_PATH, 'cms_rules_2025.csv'))
    claims_df = pd.read_csv(os.path.join(DATA_PATH, 'claims.csv'))
    return rules_df, claims_df
def get_vector_store():
    """Return the module-cached FAISS vector store of CMS rule documents.

    On first call, loads a persisted index from ``DATA_PATH/faiss_index`` if
    present; otherwise builds one Document per rule row from the CMS rules
    CSV, embeds them, and persists the index for later runs. Subsequent
    calls return the cached instance without touching disk.

    Returns:
        FAISS: the shared vector store (cached in module global ``_vector_store``).
    """
    global _vector_store
    index_path = os.path.join(DATA_PATH, 'faiss_index')
    if _vector_store is None:
        # Hoisted: the embeddings client was constructed identically in
        # both the load and build branches.
        embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
        if os.path.exists(index_path):
            # allow_dangerous_deserialization is required by FAISS.load_local
            # for its pickled docstore; acceptable only because we wrote this
            # index ourselves below.
            _vector_store = FAISS.load_local(index_path, embeddings, allow_dangerous_deserialization=True)
        else:
            rules, _ = get_cms_context()
            documents = []
            for _, row in rules.iterrows():
                content = (
                    f"Rule ID: {row['Rule_ID']}, Type: {row['Type']}, "
                    f"Target: {row['Target']}, Change: {row['Change']}, "
                    f"Impact Score: {row['Impact_Score']}. Description: {row['Description']}"
                )
                documents.append(Document(page_content=content, metadata={"rule_id": row['Rule_ID'], "target": row['Target']}))
            _vector_store = FAISS.from_documents(documents, embeddings)
            # Persist so later runs can load instead of re-embedding.
            _vector_store.save_local(index_path)
            print(f"FAISS index created and saved to {index_path}")
    return _vector_store
class AgentState(TypedDict):
    # Shared pipeline state: each graph node reads earlier fields and
    # returns a partial update writing its own output field.
    query: str  # original user question driving the run
    regulatory_insight: str  # written by regulatory_specialist
    impact_analysis: str  # written by finance_analyst
    workflow_action: str  # written by custom_cdi_agent
    cdm_patch: str  # New field for CDM Automation; written by cdm_specialist
    final_summary: str  # written by summarizer_agent
    messages: List[BaseMessage]  # conversation history (not read by the nodes in this file)
    context_rules: str  # raw rule text retrieved via vector search
    context_claims_summary: str  # aggregated claims stats computed by finance_analyst
# LLM Configuration
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY)
def regulatory_specialist(state: AgentState):
    """Agent that uses RAG (vector search) to surface relevant CMS rules.

    Retrieves the 3 rules most similar to the user's query, then asks the
    LLM for a 2-bullet regulatory insight grounded in that retrieved text.

    Returns:
        dict: partial state update with 'regulatory_insight' and 'context_rules'.
    """
    vector_store = get_vector_store()
    # Retrieve top 3 relevant rules
    docs = vector_store.similarity_search(state['query'], k=3)
    context = "\n\n".join([d.page_content for d in docs])
    # BUG FIX: the query and retrieved context were built but never placed
    # in the prompt, so the LLM previously answered with no grounding at all.
    prompt = f"""
    You are a CMS regulatory specialist.
    USER QUERY: {state['query']}
    RELEVANT CMS RULES:
    {context}
    Provide a 2-bullet point regulatory insight.
    - Focus ONLY on the most critical change.
    - Use clear, non-technical language.
    """
    response = llm.invoke(prompt)
    return {"regulatory_insight": response.content, "context_rules": context}
def finance_analyst(state: AgentState):
    """Agent that quantifies financial impact from the claims dataset.

    Filters claims to the service lines mentioned in the regulatory insight
    (falling back to a top-5 overview when none match), aggregates
    reimbursement sum/count per service line, and asks the LLM to summarize
    the financial risk.

    Returns:
        dict: partial state update with 'impact_analysis' and
        'context_claims_summary'.
    """
    _, claims = get_cms_context()
    insight = state['regulatory_insight'].lower()
    targets = ['Cardiology', 'Pulmonology', 'Orthopedics', 'Neurology', 'Surgery', 'Medicine', 'Oncology', 'Endocrinology', 'Gastroenterology']
    # Case-insensitive match of known service lines against the insight text.
    active_targets = [t for t in targets if t.lower() in insight]
    if active_targets:
        relevant_claims = claims[claims['Service_Line'].isin(active_targets)]
        claims_summary = relevant_claims.groupby('Service_Line')['Reimbursement'].agg(['sum', 'count']).to_string()
    else:
        claims_summary = claims.groupby('Service_Line')['Reimbursement'].agg(['sum', 'count']).head(5).to_string()
    # BUG FIX: the insight and the claims aggregates were computed but never
    # included in the prompt, so the LLM had no data to analyze.
    prompt = f"""
    You are a healthcare finance analyst.
    REGULATORY INSIGHT: {state['regulatory_insight']}
    CLAIMS DATA (reimbursement sum / count by service line):
    {claims_summary}
    Summarize the financial risk in 2 punchy bullet points. Focus on dollar values and percent shifts.
    """
    response = llm.invoke(prompt)
    return {"impact_analysis": response.content, "context_claims_summary": claims_summary}
def custom_cdi_agent(state: AgentState):
    """Draft clinical-documentation-improvement (CDI) actions.

    Combines the upstream regulatory insight and financial impact into a
    single prompt and asks the LLM for two documentation-improvement bullets.

    Returns:
        dict: partial state update carrying 'workflow_action'.
    """
    cdi_prompt = f"""
    You are a CDI Lead.
    INSIGHT: {state['regulatory_insight']}
    IMPACT: {state['impact_analysis']}
    Give 2 bullet points for documentation improvement.
    """
    reply = llm.invoke(cdi_prompt)
    return {"workflow_action": reply.content}
def cdm_specialist(state: AgentState):
    """Specialist that flags CDM billing conflicts and proposes patches.

    Scans the raw user query for orthopedics/bundling/implant keywords;
    when one is present, emits the canned HCPCS C1713 conflict alert with a
    proposed auto-patch, otherwise reports no conflict. Purely rule-based —
    no LLM call.

    Returns:
        dict: partial state update carrying 'cdm_patch'.
    """
    query = state['query'].lower()
    # FIX: removed dead initial assignment ("CDM STATUS: Scanning...") that
    # was unconditionally overwritten by both branches below.
    if any(keyword in query for keyword in ("ortho", "bundle", "implant")):
        insight = """
    🚨 **CDM ALERT: HCPCS C1713 Conflict Detected**
    - **Regulatory Change**: CMS OPPS 2025 Packaged Status.
    - **Current Temple CDM**: Set to 'Pass-Through' ($7,000).
    - **Risk**: Without a 'Packaged' flag (APC 5114), this claim results in $0 reimbursement (100% denial).
    - **Patch**: Auto-update Status to 'Packaged' ($5,500 secondary reimbursement) to recover $5,500 per case.
    """
    else:
        insight = "CDM Status: No immediate billing conflicts detected for this query."
    return {"cdm_patch": insight}
def summarizer_agent(state: AgentState):
    """Final node: distill all agent outputs into a board-ready summary.

    Feeds every upstream field (regulatory, finance, workflow, CDM) into a
    single prompt and returns the LLM's short, bulleted executive summary.

    Returns:
        dict: partial state update carrying 'final_summary'.
    """
    summary_prompt = f"""
    You are a Healthcare AI Orchestrator providing an EXECUTIVE SUMMARY for a busy Hospital Board.
    Provide a SHORT, BULLETED summary.
    REGULATORY: {state['regulatory_insight']}
    FINANCE: {state['impact_analysis']}
    WORKFLOW: {state['workflow_action']}
    CDM AUTO-SYNC: {state['cdm_patch']}
    GUIDELINES:
    - Total length should be VERY short.
    - Use Bold headers for each point.
    - If CDM Auto-Sync detected a conflict, make it a PRIORITY bullet.
    - No introductory text like "Based on my findings...". Direct summary only.
    """
    result = llm.invoke(summary_prompt)
    return {"final_summary": result.content}
def build_robust_graph():
    """Assemble and compile the linear five-agent LangGraph pipeline.

    Node order: regulatory -> finance -> cdi -> cdm -> summarizer -> END.
    Compiled with an in-memory checkpointer so runs are resumable by
    thread_id.

    Returns:
        The compiled LangGraph application.
    """
    graph_def = StateGraph(AgentState)
    # Register nodes and chain them in pipeline order.
    pipeline = [
        ("regulatory", regulatory_specialist),
        ("finance", finance_analyst),
        ("cdi", custom_cdi_agent),
        ("cdm", cdm_specialist),
        ("summarizer", summarizer_agent),
    ]
    for node_name, node_fn in pipeline:
        graph_def.add_node(node_name, node_fn)
    graph_def.set_entry_point("regulatory")
    names = [node_name for node_name, _ in pipeline]
    for src, dst in zip(names, names[1:]):
        graph_def.add_edge(src, dst)
    graph_def.add_edge("summarizer", END)
    return graph_def.compile(checkpointer=MemorySaver())
# Function to save graph visualization
def save_graph_image(graph, filename="agent_graph.png"):
    """Render the compiled graph to a PNG via Mermaid (best-effort).

    Args:
        graph: a compiled LangGraph application exposing ``get_graph()``.
        filename: output path for the PNG.

    Returns:
        The filename on success, or None when rendering fails (the error is
        printed, not raised — visualization is non-critical).
    """
    try:
        graph.get_graph().draw_mermaid_png(output_file_path=filename)
    except Exception as exc:
        print(f"Graph visualization error: {exc}")
        return None
    return filename
if __name__ == '__main__':
    # Smoke-test the pipeline end to end with a sample query.
    graph = build_robust_graph()
    # CONSISTENCY FIX: seed every field declared in AgentState — 'cdm_patch'
    # and 'final_summary' were previously missing from the initial state.
    initial_state = {
        "query": "What are the cardiology weight shift impacts for 2025?",
        "messages": [],
        "regulatory_insight": "",
        "impact_analysis": "",
        "workflow_action": "",
        "cdm_patch": "",
        "final_summary": "",
        "context_rules": "",
        "context_claims_summary": ""
    }
    # thread_id keys the MemorySaver checkpoints for this run.
    config = {"configurable": {"thread_id": "test_thread"}}
    result = graph.invoke(initial_state, config)
    print(result['regulatory_insight'])