Spaces:
Sleeping
Sleeping
File size: 6,315 Bytes
6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 9659593 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 696f787 aefac4f 696f787 6dc9d46 aefac4f 9659593 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 696f787 6dc9d46 9659593 6dc9d46 9659593 6dc9d46 696f787 9659593 6dc9d46 9659593 6dc9d46 9659593 696f787 6dc9d46 9659593 6dc9d46 696f787 6dc9d46 696f787 9659593 6dc9d46 9659593 6dc9d46 aefac4f 9659593 696f787 aefac4f 6dc9d46 aefac4f 6dc9d46 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | """
MediGuard AI RAG-Helper
Main LangGraph Workflow - Clinical Insight Guild Orchestration
"""
from langgraph.graph import END, StateGraph
from src.pdf_processor import get_all_retrievers
from src.state import GuildState
class ClinicalInsightGuild:
"""
Main workflow orchestrator for MediGuard AI RAG-Helper.
Coordinates all specialist agents in the Clinical Insight Guild.
"""
def __init__(self):
"""Initialize the guild with all specialist agents"""
print("\n" + "=" * 70)
print("INITIALIZING: Clinical Insight Guild")
print("=" * 70)
# Load retrievers
print("\nLoading RAG retrievers...")
retrievers = get_all_retrievers()
# Import and initialize all agents
from src.agents.biomarker_analyzer import biomarker_analyzer_agent
from src.agents.biomarker_linker import create_biomarker_linker_agent
from src.agents.clinical_guidelines import create_clinical_guidelines_agent
from src.agents.confidence_assessor import confidence_assessor_agent
from src.agents.disease_explainer import create_disease_explainer_agent
from src.agents.response_synthesizer import response_synthesizer_agent
self.biomarker_analyzer = biomarker_analyzer_agent
self.disease_explainer = create_disease_explainer_agent(retrievers["disease_explainer"])
self.biomarker_linker = create_biomarker_linker_agent(retrievers["biomarker_linker"])
self.clinical_guidelines = create_clinical_guidelines_agent(retrievers["clinical_guidelines"])
self.confidence_assessor = confidence_assessor_agent
self.response_synthesizer = response_synthesizer_agent
print("All agents initialized successfully")
# Build workflow graph
self.workflow = self._build_workflow()
print("Workflow graph compiled")
print("=" * 70 + "\n")
def _build_workflow(self):
"""
Build the LangGraph workflow.
Execution flow:
1. Biomarker Analyzer (validates all biomarkers)
2. Parallel execution:
- Disease Explainer (RAG for pathophysiology)
- Biomarker-Disease Linker (connects values to prediction)
- Clinical Guidelines (RAG for recommendations)
3. Confidence Assessor (evaluates reliability)
4. Response Synthesizer (compiles final output)
"""
# Create state graph
workflow = StateGraph(GuildState)
# Add all agent nodes
workflow.add_node("biomarker_analyzer", self.biomarker_analyzer.analyze)
workflow.add_node("disease_explainer", self.disease_explainer.explain)
workflow.add_node("biomarker_linker", self.biomarker_linker.link)
workflow.add_node("clinical_guidelines", self.clinical_guidelines.recommend)
workflow.add_node("confidence_assessor", self.confidence_assessor.assess)
workflow.add_node("response_synthesizer", self.response_synthesizer.synthesize)
# Define execution flow
# Start -> Biomarker Analyzer
workflow.set_entry_point("biomarker_analyzer")
# Biomarker Analyzer -> Parallel specialists
workflow.add_edge("biomarker_analyzer", "disease_explainer")
workflow.add_edge("biomarker_analyzer", "biomarker_linker")
workflow.add_edge("biomarker_analyzer", "clinical_guidelines")
# All parallel specialists -> Confidence Assessor
workflow.add_edge("disease_explainer", "confidence_assessor")
workflow.add_edge("biomarker_linker", "confidence_assessor")
workflow.add_edge("clinical_guidelines", "confidence_assessor")
# Confidence Assessor -> Response Synthesizer
workflow.add_edge("confidence_assessor", "response_synthesizer")
# Response Synthesizer -> END
workflow.add_edge("response_synthesizer", END)
# Compile workflow (returns CompiledGraph with invoke method)
return workflow.compile()
def run(self, patient_input) -> dict:
"""
Execute the complete Clinical Insight Guild workflow.
Args:
patient_input: PatientInput object with biomarkers and ML prediction
Returns:
Complete structured response dictionary
"""
from datetime import datetime
from src.config import BASELINE_SOP
print("\n" + "=" * 70)
print("STARTING: Clinical Insight Guild Workflow")
print("=" * 70)
print(f"Patient: {patient_input.patient_context.get('patient_id', 'Unknown')}")
print(f"Predicted Disease: {patient_input.model_prediction['disease']}")
print(f"Model Confidence: {patient_input.model_prediction['confidence']:.1%}")
print("=" * 70 + "\n")
# Initialize state from PatientInput
initial_state: GuildState = {
"patient_biomarkers": patient_input.biomarkers,
"model_prediction": patient_input.model_prediction,
"patient_context": patient_input.patient_context,
"plan": None,
"sop": BASELINE_SOP,
"agent_outputs": [],
"biomarker_flags": [],
"safety_alerts": [],
"final_response": None,
"biomarker_analysis": None,
"processing_timestamp": datetime.now().isoformat(),
"sop_version": "Baseline",
}
# Run workflow
final_state = self.workflow.invoke(initial_state)
print("\n" + "=" * 70)
print("COMPLETED: Clinical Insight Guild Workflow")
print("=" * 70)
print(f"Total Agents Executed: {len(final_state.get('agent_outputs', []))}")
print("Workflow execution successful")
print("=" * 70 + "\n")
# Return full state so callers can access agent_outputs,
# biomarker_flags, safety_alerts, and final_response
return dict(final_state)
def create_guild() -> ClinicalInsightGuild:
"""Factory function to create and initialize the Clinical Insight Guild"""
return ClinicalInsightGuild()
if __name__ == "__main__":
# Test workflow initialization
print("Testing Clinical Insight Guild initialization...")
guild = create_guild()
print("\nGuild initialization successful!")
print("Ready to process patient inputs.")
|