| | import os
|
| | import pandas as pd
|
| | from typing import TypedDict, List, Annotated
|
| | from langgraph.graph import StateGraph, END
|
| | from langgraph.checkpoint.memory import MemorySaver
|
| | from langchain_openai import ChatOpenAI, OpenAIEmbeddings
|
| | from langchain_community.vectorstores import FAISS
|
| | from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
|
| | from langchain_core.documents import Document
|
| | from dotenv import load_dotenv
|
| | import re
|
| |
|
| |
|
# Load environment variables (e.g. OPENAI_API_KEY) from a local .env file.
load_dotenv()


# Directory holding the CSV inputs and the persisted FAISS index.
DATA_PATH = 'Data'
# Module-level cache for the FAISS vector store; built lazily by get_vector_store().
_vector_store = None
# Read the key once at import time; may be None if the .env/env var is absent.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
|
| |
|
def get_cms_context():
    """Load the CMS rules and claims datasets from DATA_PATH.

    Returns:
        tuple: ``(rules, claims)`` DataFrames read from
        ``cms_rules_2025.csv`` and ``claims.csv`` respectively.
    """
    rules_df = pd.read_csv(os.path.join(DATA_PATH, 'cms_rules_2025.csv'))
    claims_df = pd.read_csv(os.path.join(DATA_PATH, 'claims.csv'))
    return rules_df, claims_df
|
| |
|
def get_vector_store():
    """Return the process-wide FAISS vector store, creating it on first use.

    First call either loads a previously persisted index from
    ``DATA_PATH/faiss_index`` or builds one document per CMS rule row,
    embeds them, saves the index to disk, and caches the store at module
    level. Subsequent calls return the cached store immediately.
    """
    global _vector_store
    index_path = os.path.join(DATA_PATH, 'faiss_index')

    if _vector_store is not None:
        return _vector_store

    embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
    if os.path.exists(index_path):
        # Deserialization is flagged "dangerous" by langchain; trusted here
        # because this process wrote the index itself.
        _vector_store = FAISS.load_local(index_path, embeddings, allow_dangerous_deserialization=True)
        return _vector_store

    # Build one searchable Document per CMS rule row.
    rules, _ = get_cms_context()
    documents = [
        Document(
            page_content=(
                f"Rule ID: {row['Rule_ID']}, Type: {row['Type']}, Target: {row['Target']}, "
                f"Change: {row['Change']}, Impact Score: {row['Impact_Score']}. "
                f"Description: {row['Description']}"
            ),
            metadata={"rule_id": row['Rule_ID'], "target": row['Target']},
        )
        for _, row in rules.iterrows()
    ]

    _vector_store = FAISS.from_documents(documents, embeddings)
    _vector_store.save_local(index_path)
    print(f"FAISS index created and saved to {index_path}")
    return _vector_store
|
| |
|
class AgentState(TypedDict):
    """Shared state flowing through every node of the LangGraph pipeline."""
    # The user's original question.
    query: str
    # Written by regulatory_specialist: RAG-grounded rule summary.
    regulatory_insight: str
    # Written by finance_analyst: dollar-impact bullets.
    impact_analysis: str
    # Written by custom_cdi_agent: documentation-improvement actions.
    workflow_action: str
    # Written by cdm_specialist: chargemaster conflict alert or all-clear.
    cdm_patch: str
    # Written by summarizer_agent: board-level executive summary.
    final_summary: str
    # Conversation history; seeded empty by the entry script and not
    # appended to by any node in this file.
    messages: List[BaseMessage]
    # Raw rule documents retrieved for the query (audit/context trail).
    context_rules: str
    # Claims aggregation string produced by the finance analyst.
    context_claims_summary: str
|
| |
|
| |
|
# Shared LLM client used by every agent node; temperature=0 for reproducible output.
llm = ChatOpenAI(model="gpt-4o", temperature=0, api_key=OPENAI_API_KEY)
|
| |
|
def regulatory_specialist(state: AgentState):
    """Agent that uses RAG (Vector Search) to find relevant CMS rules.

    Retrieves the top-3 rule documents for the user's query, asks the LLM
    for a short insight grounded in them, and returns the state updates
    ``regulatory_insight`` and ``context_rules``.
    """
    vector_store = get_vector_store()

    # Pull the three most relevant rule documents for this query.
    docs = vector_store.similarity_search(state['query'], k=3)
    context = "\n\n".join([d.page_content for d in docs])

    # BUG FIX: the original prompt contained neither the query nor the
    # retrieved context, so the LLM answered with no grounding at all.
    prompt = f"""
    You are a CMS regulatory specialist.

    USER QUERY: {state['query']}

    RELEVANT CMS RULES:
    {context}

    Provide a 2-bullet point regulatory insight.
    - Focus ONLY on the most critical change.
    - Use clear, non-technical language.
    """
    response = llm.invoke(prompt)
    return {"regulatory_insight": response.content, "context_rules": context}
|
| |
|
def finance_analyst(state: AgentState):
    """Agent that quantifies impact using claims data.

    Matches service lines mentioned in the regulatory insight against the
    claims file, aggregates reimbursement by service line, and asks the LLM
    for a risk summary grounded in that data. Returns ``impact_analysis``
    and ``context_claims_summary`` state updates.
    """
    _, claims = get_cms_context()
    insight = state['regulatory_insight'].lower()
    # Service lines we know how to map from insight text to claims rows.
    targets = ['Cardiology', 'Pulmonology', 'Orthopedics', 'Neurology', 'Surgery', 'Medicine', 'Oncology', 'Endocrinology', 'Gastroenterology']
    active_targets = [t for t in targets if t.lower() in insight]

    if active_targets:
        relevant_claims = claims[claims['Service_Line'].isin(active_targets)]
        claims_summary = relevant_claims.groupby('Service_Line')['Reimbursement'].agg(['sum', 'count']).to_string()
    else:
        # No specific service line mentioned — fall back to a top-5 overview.
        claims_summary = claims.groupby('Service_Line')['Reimbursement'].agg(['sum', 'count']).head(5).to_string()

    # BUG FIX: the original prompt included no data at all, so the LLM had
    # nothing to quantify. Feed it the insight and the claims aggregation.
    prompt = f"""
    You are a healthcare finance analyst.

    REGULATORY INSIGHT:
    {state['regulatory_insight']}

    CLAIMS DATA (Reimbursement sum / claim count by Service_Line):
    {claims_summary}

    Summarize the financial risk in 2 punchy bullet points. Focus on dollar values and percent shifts.
    """
    response = llm.invoke(prompt)
    return {"impact_analysis": response.content, "context_claims_summary": claims_summary}
|
| |
|
def custom_cdi_agent(state: AgentState):
    """Agent that generates workflow and CDI actions."""
    # Both upstream findings are handed to the LLM so the CDI advice is
    # grounded in the regulatory and financial analysis.
    cdi_prompt = f"""
    You are a CDI Lead.
    INSIGHT: {state['regulatory_insight']}
    IMPACT: {state['impact_analysis']}

    Give 2 bullet points for documentation improvement.
    """
    llm_reply = llm.invoke(cdi_prompt)
    return {"workflow_action": llm_reply.content}
|
| |
|
def cdm_specialist(state: "AgentState"):
    """Specialist to identify CDM conflicts and propose patches.

    Pure keyword scan of the query — no LLM or I/O. Returns the
    ``cdm_patch`` state update: a detailed alert when an orthopedics /
    bundling / implant topic is detected, otherwise an all-clear message.
    """
    query = state['query'].lower()

    # FIX: the original first assigned a "Scanning..." placeholder that was
    # unconditionally overwritten by both branches below (dead code); removed.
    if any(keyword in query for keyword in ("ortho", "bundle", "implant")):
        insight = """
    🚨 **CDM ALERT: HCPCS C1713 Conflict Detected**
    - **Regulatory Change**: CMS OPPS 2025 Packaged Status.
    - **Current Temple CDM**: Set to 'Pass-Through' ($7,000).
    - **Risk**: Without a 'Packaged' flag (APC 5114), this claim results in $0 reimbursement (100% denial).
    - **Patch**: Auto-update Status to 'Packaged' ($5,500 secondary reimbursement) to recover $5,500 per case.
    """
    else:
        insight = "CDM Status: No immediate billing conflicts detected for this query."

    return {"cdm_patch": insight}
|
| |
|
def summarizer_agent(state: AgentState):
    """Final node that creates a concise, GPT-style summarized answer."""
    # Collect every upstream agent's output into one board-facing prompt.
    board_prompt = f"""
    You are a Healthcare AI Orchestrator providing an EXECUTIVE SUMMARY for a busy Hospital Board.
    Provide a SHORT, BULLETED summary.

    REGULATORY: {state['regulatory_insight']}
    FINANCE: {state['impact_analysis']}
    WORKFLOW: {state['workflow_action']}
    CDM AUTO-SYNC: {state['cdm_patch']}

    GUIDELINES:
    - Total length should be VERY short.
    - Use Bold headers for each point.
    - If CDM Auto-Sync detected a conflict, make it a PRIORITY bullet.
    - No introductory text like "Based on my findings...". Direct summary only.
    """
    summary_reply = llm.invoke(board_prompt)
    return {"final_summary": summary_reply.content}
|
| |
|
def build_robust_graph():
    """Compile the linear multi-agent pipeline into a runnable graph.

    Node order: regulatory -> finance -> cdi -> cdm -> summarizer -> END.
    The graph is checkpointed with an in-memory saver so runs can be keyed
    and resumed by ``thread_id``.
    """
    workflow = StateGraph(AgentState)

    # Register each agent function under its node name (insertion order
    # mirrors execution order for readability).
    agents = {
        "regulatory": regulatory_specialist,
        "finance": finance_analyst,
        "cdi": custom_cdi_agent,
        "cdm": cdm_specialist,
        "summarizer": summarizer_agent,
    }
    for node_name, agent_fn in agents.items():
        workflow.add_node(node_name, agent_fn)

    # Wire the nodes into a straight line, then terminate.
    workflow.set_entry_point("regulatory")
    pipeline = ["regulatory", "finance", "cdi", "cdm", "summarizer"]
    for upstream, downstream in zip(pipeline, pipeline[1:]):
        workflow.add_edge(upstream, downstream)
    workflow.add_edge("summarizer", END)

    return workflow.compile(checkpointer=MemorySaver())
|
| |
|
| |
|
def save_graph_image(graph, filename="agent_graph.png"):
    """Render the compiled graph as a Mermaid PNG; return the path or None.

    Visualization is best-effort: any failure (missing renderer, invalid
    graph object) is printed and swallowed so callers can proceed without
    an image.
    """
    try:
        graph.get_graph().draw_mermaid_png(output_file_path=filename)
    except Exception as err:
        print(f"Graph visualization error: {err}")
        return None
    return filename
|
| |
|
if __name__ == '__main__':
    graph = build_robust_graph()
    # Seed state for a smoke-test run; the agent nodes fill in the analysis
    # fields as the graph executes.
    # NOTE(review): 'cdm_patch' and 'final_summary' are omitted here even
    # though AgentState declares them — the cdm/summarizer nodes supply
    # them; confirm downstream code tolerates the partial initial state.
    initial_state = {
        "query": "What are the cardiology weight shift impacts for 2025?",
        "messages": [],
        "regulatory_insight": "",
        "impact_analysis": "",
        "workflow_action": "",
        "context_rules": "",
        "context_claims_summary": "",
    }
    # thread_id keys the MemorySaver checkpoint for this run.
    run_config = {"configurable": {"thread_id": "test_thread"}}
    final_state = graph.invoke(initial_state, run_config)
    print(final_state['regulatory_insight'])
|
| |
|