File size: 8,130 Bytes
e1624f5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 | """
OncoAgent LangGraph — SOTA Multi-Agent Orchestration Graph.
Architecture synthesised from:
- Claude Code: deterministic harness + sub-agent delegation
- Hermes Agent: structured tool calling + persistent state
- Corrective RAG: graded retrieval with query rewriting
- Reflexion: generator ↔ critic loop with max iterations
- Model Tiering: Qwen3.5-9B (fast) → Qwen3.6-27B (deep reasoning)
Topology:
    Router → Ingestion → Corrective RAG → Specialist → Critic → HITL Gate → Formatter
                                   ↓
                               Fallback
    (Router, Corrective RAG and Critic can each divert to Fallback.)
Conditional edges:
- Router: routes "insufficient" directly to fallback
- CRAG: routes to fallback when no documents were retrieved and none passed grading
- Critic: loops back to specialist (max 2) or to fallback
- HITL: currently always routes to formatter (high-acuity cases are only flagged);
  routing high-acuity cases to a clinician interrupt is planned
"""
import logging
import re

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver

from .state import AgentState
from .router import router_node
from .nodes import data_ingestion_node
from .corrective_rag import corrective_rag_node
from .specialist import specialist_node
from .critic import critic_node, MAX_CRITIC_ATTEMPTS
from .formatter import formatter_node, fallback_node
# Module-level logger used by the routing functions, the HITL gate node and
# the graph builder in this module.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Conditional edge functions
# ---------------------------------------------------------------------------
def _route_after_router(state: AgentState) -> str:
    """Pick the node that follows the router.

    The router's classification is read from ``routing_decision`` (treated
    as ``"simple"`` when absent). Only ``"insufficient"`` short-circuits to
    the fallback node; every other label continues to data ingestion.

    Args:
        state: Current LangGraph state.

    Returns:
        ``"fallback"`` or ``"ingestion"``.
    """
    if state.get("routing_decision", "simple") != "insufficient":
        # "simple" and "complex" share the same downstream path.
        return "ingestion"
    logger.info("Router β Fallback (insufficient input)")
    return "fallback"
def _route_after_crag(state: AgentState) -> str:
    """Choose the next node once Corrective RAG has finished.

    The specialist runs whenever CRAG reports at least one retrieved document
    (``rag_retrieval_count``) or at least one document that passed grading
    (``rag_grading_pass_count``). The fallback is taken only when both
    counters are zero; a missing counter counts as zero.

    NOTE(review): documents that were retrieved but all rejected by grading
    (retrieval > 0, graded == 0) still reach the specialist. Confirm this is
    intended, since the fallback log message speaks of "no relevant documents".

    Args:
        state: Current LangGraph state.

    Returns:
        ``"specialist"`` or ``"fallback"``.
    """
    has_graded = state.get("rag_grading_pass_count", 0) != 0
    has_retrieved = state.get("rag_retrieval_count", 0) != 0
    if has_graded or has_retrieved:
        return "specialist"
    logger.info("CRAG β Fallback (no relevant documents)")
    return "fallback"
def _route_after_critic(state: AgentState) -> str:
    """Decide where to go after the critic has reviewed the specialist output.

    ``critic_verdict`` defaults to ``"FAIL"`` and ``critic_attempts`` to 0
    when absent. Any verdict other than ``"PASS"`` is treated as a failure.

    * ``"PASS"``                                  -> ``"hitl_gate"``
    * not PASS, attempts >= MAX_CRITIC_ATTEMPTS   -> ``"fallback"``
    * not PASS, attempts below the limit          -> ``"specialist"`` (retry)

    Args:
        state: Current LangGraph state.

    Returns:
        Name of the next node.
    """
    tries = state.get("critic_attempts", 0)
    verdict = state.get("critic_verdict", "FAIL")

    if verdict != "PASS":
        if tries >= MAX_CRITIC_ATTEMPTS:
            # Retry budget spent: stop looping and degrade gracefully.
            logger.warning(
                "Critic β Fallback (FAIL after %d/%d attempts)",
                tries, MAX_CRITIC_ATTEMPTS,
            )
            return "fallback"
        logger.info(
            "Critic β Specialist retry (FAIL, attempt %d/%d)",
            tries, MAX_CRITIC_ATTEMPTS,
        )
        return "specialist"

    logger.info("Critic β HITL Gate (PASS on attempt %d)", tries)
    return "hitl_gate"
def _route_after_hitl(state: AgentState) -> str:
    """Return the node that follows the HITL gate.

    The state is not consulted. Every case proceeds to the formatter
    (hackathon behaviour), including cases the gate marked ``hitl_required``.

    TODO: in production, pause for clinician approval via LangGraph's
    ``interrupt()`` when ``hitl_required`` is set and ``hitl_approved`` is not.

    Args:
        state: Current LangGraph state (currently unused).

    Returns:
        Always ``"formatter"``.
    """
    next_node = "formatter"
    return next_node
# ---------------------------------------------------------------------------
# HITL Gate Node
# ---------------------------------------------------------------------------
# Complexity thresholds used by hitl_gate_node to grade acuity.
_HIGH_ACUITY_MIN_COMPLEXITY = 0.6
_MEDIUM_ACUITY_MIN_COMPLEXITY = 0.4

# Stage tokens are matched as whole words so that ordinary words which merely
# contain the letters "IV" (e.g. "INVASIVE", "POSITIVE", "NOT GIVEN") are not
# read as Stage IV. Roman numerals may carry a sub-stage suffix such as "IIIA"
# or "IA2". Arabic digits are accepted only after the word "STAGE" or at the
# very start of the field (e.g. "STAGE 4B", "3A"), so embedded numbers such as
# tumour sizes are ignored.
_ROMAN_STAGE_RE = re.compile(r"\b(IV|III|II|I)[A-C]?[1-3]?\b")
_ARABIC_STAGE_RE = re.compile(r"(?:^\s*|\bSTAGE\s*)([0-4])[A-C]?[1-3]?\b")
_ROMAN_TO_INT = {"I": 1, "II": 2, "III": 3, "IV": 4}


def _stage_group(stage_text: str) -> int:
    """Return the highest stage group (1-4) mentioned in upper-case ``stage_text``.

    Returns 0 when no stage token is found (e.g. "UNKNOWN") or only Stage 0 is
    given. Taking the maximum keeps ranges such as "III/IV" at the more severe
    group, as the previous substring check did.
    """
    groups = [_ROMAN_TO_INT[numeral] for numeral in _ROMAN_STAGE_RE.findall(stage_text)]
    groups.extend(int(digit) for digit in _ARABIC_STAGE_RE.findall(stage_text))
    return max(groups, default=0)


def hitl_gate_node(state: AgentState) -> dict:
    """Grade the case's acuity and decide whether clinician review is needed.

    The stage is parsed from ``extracted_entities["stage"]`` and the complexity
    is read from ``complexity_score``. Missing or ``None`` values fall back to
    stage "Unknown" and complexity 0.0.

    Acuity classification:
      - high:   Stage IV and complexity >= 0.6 -> requires clinician review
      - medium: Stage III or higher, or complexity >= 0.4 -> flagged, auto-proceeds
      - low:    everything else -> auto-proceeds

    NOTE(review): complexity_score presumably reflects factors such as rare
    mutations; confirm against the node that sets it.

    Args:
        state: Current LangGraph state.

    Returns:
        State update with ``acuity_level`` ("high" | "medium" | "low"),
        ``hitl_required``, and ``hitl_approved``, which is True exactly when
        review is not required.
    """
    # ``or`` guards against keys that are present but set to None.
    entities = state.get("extracted_entities") or {}
    complexity = float(state.get("complexity_score") or 0.0)
    stage_group = _stage_group(str(entities.get("stage") or "Unknown").upper())

    if stage_group >= 4 and complexity >= _HIGH_ACUITY_MIN_COMPLEXITY:
        acuity = "high"
        hitl_required = True
    elif stage_group >= 3 or complexity >= _MEDIUM_ACUITY_MIN_COMPLEXITY:
        # ">= 3" keeps low-complexity Stage IV cases at least "medium".
        acuity = "medium"
        hitl_required = False
    else:
        acuity = "low"
        hitl_required = False

    logger.info(
        "HITL Gate: acuity=%s, hitl_required=%s, complexity=%.2f",
        acuity, hitl_required, complexity,
    )
    return {
        "acuity_level": acuity,
        "hitl_required": hitl_required,
        "hitl_approved": not hitl_required,  # Auto-approve non-HITL cases
    }
# ---------------------------------------------------------------------------
# Graph Builder
# ---------------------------------------------------------------------------
def build_oncoagent_graph() -> StateGraph:
    """Assemble and compile the OncoAgent LangGraph state machine.

    Transitions (every path ends at END):

        router         --insufficient---------------> fallback
        router         --otherwise------------------> ingestion
        ingestion      -----------------------------> corrective_rag
        corrective_rag --nothing retrieved/graded---> fallback
        corrective_rag --otherwise------------------> specialist
        specialist     -----------------------------> critic
        critic         --PASS-----------------------> hitl_gate
        critic         --FAIL, retries left---------> specialist
        critic         --FAIL, retries exhausted----> fallback
        hitl_gate      -----------------------------> formatter
        formatter, fallback ------------------------> END

    Returns:
        The compiled graph, using an in-memory ``MemorySaver`` checkpointer.
        NOTE(review): the ``StateGraph`` return annotation does not match the
        value actually returned, which is the result of ``StateGraph.compile()``.
    """
    graph = StateGraph(AgentState)

    # Register the eight nodes in a fixed order.
    for node_name, node_fn in (
        ("router", router_node),
        ("ingestion", data_ingestion_node),
        ("corrective_rag", corrective_rag_node),
        ("specialist", specialist_node),
        ("critic", critic_node),
        ("hitl_gate", hitl_gate_node),
        ("formatter", formatter_node),
        ("fallback", fallback_node),
    ):
        graph.add_node(node_name, node_fn)

    graph.set_entry_point("router")

    def branch(source, chooser, *targets):
        # Each label returned by ``chooser`` maps to the node of the same name.
        graph.add_conditional_edges(source, chooser, {t: t for t in targets})

    branch("router", _route_after_router, "ingestion", "fallback")
    graph.add_edge("ingestion", "corrective_rag")
    branch("corrective_rag", _route_after_crag, "specialist", "fallback")
    graph.add_edge("specialist", "critic")
    branch("critic", _route_after_critic, "hitl_gate", "specialist", "fallback")
    # Single target for now; a clinician interrupt() branch is planned.
    branch("hitl_gate", _route_after_hitl, "formatter")

    for terminal in ("formatter", "fallback"):
        graph.add_edge(terminal, END)

    # NOTE(review): Rule #20 calls for a strict loop limit, but no recursion
    # limit is configured here. The critic retry loop is bounded by
    # MAX_CRITIC_ATTEMPTS in _route_after_critic, assuming critic_node
    # increments critic_attempts. If a LangGraph recursion_limit is wanted, it
    # presumably has to be supplied via the run config; confirm.
    compiled = graph.compile(checkpointer=MemorySaver())
    logger.info("OncoAgent graph compiled successfully (8 nodes, SOTA topology).")
    return compiled
|