OncoAgent / agents /graph.py
MaximoLopezChenlo's picture
Upload folder using huggingface_hub
e1624f5 verified
"""
OncoAgent LangGraph β€” SOTA Multi-Agent Orchestration Graph.
Architecture synthesised from:
- Claude Code: deterministic harness + sub-agent delegation
- Hermes Agent: structured tool calling + persistent state
- Corrective RAG: graded retrieval with query rewriting
- Reflexion: generator ↔ critic loop with max iterations
- Model Tiering: Qwen3.5-9B (fast) ↔ Qwen3.6-27B (deep reasoning)
Topology:
Router β†’ Ingestion β†’ Corrective RAG β†’ Specialist ↔ Critic β†’ HITL Gate β†’ Formatter
↓
Fallback
Conditional edges:
- Router: routes "insufficient" directly to fallback
- CRAG: routes insufficient docs to fallback
- Critic: loops back to specialist (max 2) or to fallback
- HITL: routes high-acuity to interrupt, others to formatter
"""
import logging
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from .state import AgentState
from .router import router_node
from .nodes import data_ingestion_node
from .corrective_rag import corrective_rag_node
from .specialist import specialist_node
from .critic import critic_node, MAX_CRITIC_ATTEMPTS
from .formatter import formatter_node, fallback_node
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Conditional edge functions
# ---------------------------------------------------------------------------
def _route_after_router(state: AgentState) -> str:
"""Route based on the router's complexity classification.
Returns:
Node name to transition to.
"""
decision = state.get("routing_decision", "simple")
if decision == "insufficient":
logger.info("Router β†’ Fallback (insufficient input)")
return "fallback"
# Both "simple" and "complex" proceed to ingestion
return "ingestion"
def _route_after_crag(state: AgentState) -> str:
"""Route based on CRAG retrieval results.
If insufficient relevant documents were found (even after rewrites),
route directly to fallback.
Returns:
Node name to transition to.
"""
graded_count = state.get("rag_grading_pass_count", 0)
retrieval_count = state.get("rag_retrieval_count", 0)
if retrieval_count == 0 and graded_count == 0:
logger.info("CRAG β†’ Fallback (no relevant documents)")
return "fallback"
return "specialist"
def _route_after_critic(state: AgentState) -> str:
"""Route based on the critic's verdict and attempt count.
- PASS β†’ proceed to HITL gate
- FAIL + attempts < max β†’ loop back to specialist
- FAIL + attempts >= max β†’ fallback
Returns:
Node name to transition to.
"""
verdict = state.get("critic_verdict", "FAIL")
attempts = state.get("critic_attempts", 0)
if verdict == "PASS":
logger.info("Critic β†’ HITL Gate (PASS on attempt %d)", attempts)
return "hitl_gate"
if attempts >= MAX_CRITIC_ATTEMPTS:
logger.warning(
"Critic β†’ Fallback (FAIL after %d/%d attempts)",
attempts, MAX_CRITIC_ATTEMPTS,
)
return "fallback"
logger.info(
"Critic β†’ Specialist retry (FAIL, attempt %d/%d)",
attempts, MAX_CRITIC_ATTEMPTS,
)
return "specialist"
def _route_after_hitl(state: AgentState) -> str:
"""Route based on acuity level and HITL requirements.
For the hackathon, high-acuity cases are flagged but auto-proceed.
In production, this would use LangGraph's interrupt() for real
clinician approval.
Returns:
Node name to transition to.
"""
# For now, always proceed to formatter
# In production: if hitl_required and not hitl_approved β†’ interrupt
return "formatter"
# ---------------------------------------------------------------------------
# HITL Gate Node
# ---------------------------------------------------------------------------
def hitl_gate_node(state: AgentState) -> dict:
"""Determine if the case requires Human-in-the-Loop approval.
Acuity classification:
- high: Stage IV + rare mutations β†’ requires clinician review
- medium: Stage III or complex β†’ flagged but auto-proceeds
- low: Standard cases β†’ auto-proceeds
Args:
state: Current LangGraph state.
Returns:
State update with acuity_level, hitl_required, hitl_approved.
"""
entities = state.get("extracted_entities", {})
complexity = state.get("complexity_score", 0.0)
stage = entities.get("stage", "Unknown").upper()
# Determine acuity
if "IV" in stage and complexity >= 0.6:
acuity = "high"
hitl_required = True
elif "III" in stage or complexity >= 0.4:
acuity = "medium"
hitl_required = False
else:
acuity = "low"
hitl_required = False
logger.info(
"HITL Gate: acuity=%s, hitl_required=%s, complexity=%.2f",
acuity, hitl_required, complexity,
)
return {
"acuity_level": acuity,
"hitl_required": hitl_required,
"hitl_approved": not hitl_required, # Auto-approve non-HITL cases
}
# ---------------------------------------------------------------------------
# Graph Builder
# ---------------------------------------------------------------------------
def build_oncoagent_graph() -> StateGraph:
"""Build the SOTA OncoAgent LangGraph state machine.
Topology:
START β†’ router β†’ (ingestion | fallback)
↓
corrective_rag β†’ (specialist | fallback)
↓
specialist ↔ critic (max 2 loops)
↓
hitl_gate β†’ formatter β†’ END
↓
fallback β†’ END
Returns:
Compiled LangGraph state machine.
"""
workflow = StateGraph(AgentState)
# --- Define Nodes ---
workflow.add_node("router", router_node)
workflow.add_node("ingestion", data_ingestion_node)
workflow.add_node("corrective_rag", corrective_rag_node)
workflow.add_node("specialist", specialist_node)
workflow.add_node("critic", critic_node)
workflow.add_node("hitl_gate", hitl_gate_node)
workflow.add_node("formatter", formatter_node)
workflow.add_node("fallback", fallback_node)
# --- Define Edges ---
# Entry point
workflow.set_entry_point("router")
# Router β†’ Ingestion or Fallback (conditional)
workflow.add_conditional_edges(
"router",
_route_after_router,
{
"ingestion": "ingestion",
"fallback": "fallback",
},
)
# Ingestion β†’ Corrective RAG (always)
workflow.add_edge("ingestion", "corrective_rag")
# Corrective RAG β†’ Specialist or Fallback (conditional)
workflow.add_conditional_edges(
"corrective_rag",
_route_after_crag,
{
"specialist": "specialist",
"fallback": "fallback",
},
)
# Specialist β†’ Critic (always)
workflow.add_edge("specialist", "critic")
# Critic β†’ HITL Gate, Specialist (retry), or Fallback (conditional)
workflow.add_conditional_edges(
"critic",
_route_after_critic,
{
"hitl_gate": "hitl_gate",
"specialist": "specialist",
"fallback": "fallback",
},
)
# HITL Gate β†’ Formatter (conditional, future: interrupt for clinician)
workflow.add_conditional_edges(
"hitl_gate",
_route_after_hitl,
{
"formatter": "formatter",
},
)
# Terminal edges
workflow.add_edge("formatter", END)
workflow.add_edge("fallback", END)
# Compile with recursion limit (Rule #20: strict limit for loops)
memory = MemorySaver()
compiled = workflow.compile(
checkpointer=memory,
)
logger.info("OncoAgent graph compiled successfully (8 nodes, SOTA topology).")
return compiled