""" OncoAgent LangGraph — SOTA Multi-Agent Orchestration Graph. Architecture synthesised from: - Claude Code: deterministic harness + sub-agent delegation - Hermes Agent: structured tool calling + persistent state - Corrective RAG: graded retrieval with query rewriting - Reflexion: generator ↔ critic loop with max iterations - Model Tiering: Qwen3.5-9B (fast) ↔ Qwen3.6-27B (deep reasoning) Topology: Router → Ingestion → Corrective RAG → Specialist ↔ Critic → HITL Gate → Formatter ↓ Fallback Conditional edges: - Router: routes "insufficient" directly to fallback - CRAG: routes insufficient docs to fallback - Critic: loops back to specialist (max 2) or to fallback - HITL: routes high-acuity to interrupt, others to formatter """ import logging from langgraph.graph import StateGraph, END from langgraph.checkpoint.memory import MemorySaver from .state import AgentState from .router import router_node from .nodes import data_ingestion_node from .corrective_rag import corrective_rag_node from .specialist import specialist_node from .critic import critic_node, MAX_CRITIC_ATTEMPTS from .formatter import formatter_node, fallback_node logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Conditional edge functions # --------------------------------------------------------------------------- def _route_after_router(state: AgentState) -> str: """Route based on the router's complexity classification. Returns: Node name to transition to. """ decision = state.get("routing_decision", "simple") if decision == "insufficient": logger.info("Router → Fallback (insufficient input)") return "fallback" # Both "simple" and "complex" proceed to ingestion return "ingestion" def _route_after_crag(state: AgentState) -> str: """Route based on CRAG retrieval results. If insufficient relevant documents were found (even after rewrites), route directly to fallback. Returns: Node name to transition to. """ graded_count = state.get("rag_grading_pass_count", 0) retrieval_count = state.get("rag_retrieval_count", 0) if retrieval_count == 0 and graded_count == 0: logger.info("CRAG → Fallback (no relevant documents)") return "fallback" return "specialist" def _route_after_critic(state: AgentState) -> str: """Route based on the critic's verdict and attempt count. - PASS → proceed to HITL gate - FAIL + attempts < max → loop back to specialist - FAIL + attempts >= max → fallback Returns: Node name to transition to. """ verdict = state.get("critic_verdict", "FAIL") attempts = state.get("critic_attempts", 0) if verdict == "PASS": logger.info("Critic → HITL Gate (PASS on attempt %d)", attempts) return "hitl_gate" if attempts >= MAX_CRITIC_ATTEMPTS: logger.warning( "Critic → Fallback (FAIL after %d/%d attempts)", attempts, MAX_CRITIC_ATTEMPTS, ) return "fallback" logger.info( "Critic → Specialist retry (FAIL, attempt %d/%d)", attempts, MAX_CRITIC_ATTEMPTS, ) return "specialist" def _route_after_hitl(state: AgentState) -> str: """Route based on acuity level and HITL requirements. For the hackathon, high-acuity cases are flagged but auto-proceed. In production, this would use LangGraph's interrupt() for real clinician approval. Returns: Node name to transition to. """ # For now, always proceed to formatter # In production: if hitl_required and not hitl_approved → interrupt return "formatter" # --------------------------------------------------------------------------- # HITL Gate Node # --------------------------------------------------------------------------- def hitl_gate_node(state: AgentState) -> dict: """Determine if the case requires Human-in-the-Loop approval. Acuity classification: - high: Stage IV + rare mutations → requires clinician review - medium: Stage III or complex → flagged but auto-proceeds - low: Standard cases → auto-proceeds Args: state: Current LangGraph state. Returns: State update with acuity_level, hitl_required, hitl_approved. """ entities = state.get("extracted_entities", {}) complexity = state.get("complexity_score", 0.0) stage = entities.get("stage", "Unknown").upper() # Determine acuity if "IV" in stage and complexity >= 0.6: acuity = "high" hitl_required = True elif "III" in stage or complexity >= 0.4: acuity = "medium" hitl_required = False else: acuity = "low" hitl_required = False logger.info( "HITL Gate: acuity=%s, hitl_required=%s, complexity=%.2f", acuity, hitl_required, complexity, ) return { "acuity_level": acuity, "hitl_required": hitl_required, "hitl_approved": not hitl_required, # Auto-approve non-HITL cases } # --------------------------------------------------------------------------- # Graph Builder # --------------------------------------------------------------------------- def build_oncoagent_graph() -> StateGraph: """Build the SOTA OncoAgent LangGraph state machine. Topology: START → router → (ingestion | fallback) ↓ corrective_rag → (specialist | fallback) ↓ specialist ↔ critic (max 2 loops) ↓ hitl_gate → formatter → END ↓ fallback → END Returns: Compiled LangGraph state machine. """ workflow = StateGraph(AgentState) # --- Define Nodes --- workflow.add_node("router", router_node) workflow.add_node("ingestion", data_ingestion_node) workflow.add_node("corrective_rag", corrective_rag_node) workflow.add_node("specialist", specialist_node) workflow.add_node("critic", critic_node) workflow.add_node("hitl_gate", hitl_gate_node) workflow.add_node("formatter", formatter_node) workflow.add_node("fallback", fallback_node) # --- Define Edges --- # Entry point workflow.set_entry_point("router") # Router → Ingestion or Fallback (conditional) workflow.add_conditional_edges( "router", _route_after_router, { "ingestion": "ingestion", "fallback": "fallback", }, ) # Ingestion → Corrective RAG (always) workflow.add_edge("ingestion", "corrective_rag") # Corrective RAG → Specialist or Fallback (conditional) workflow.add_conditional_edges( "corrective_rag", _route_after_crag, { "specialist": "specialist", "fallback": "fallback", }, ) # Specialist → Critic (always) workflow.add_edge("specialist", "critic") # Critic → HITL Gate, Specialist (retry), or Fallback (conditional) workflow.add_conditional_edges( "critic", _route_after_critic, { "hitl_gate": "hitl_gate", "specialist": "specialist", "fallback": "fallback", }, ) # HITL Gate → Formatter (conditional, future: interrupt for clinician) workflow.add_conditional_edges( "hitl_gate", _route_after_hitl, { "formatter": "formatter", }, ) # Terminal edges workflow.add_edge("formatter", END) workflow.add_edge("fallback", END) # Compile with recursion limit (Rule #20: strict limit for loops) memory = MemorySaver() compiled = workflow.compile( checkpointer=memory, ) logger.info("OncoAgent graph compiled successfully (8 nodes, SOTA topology).") return compiled