AIDA / evals /test_supervisor_graph.py
destinyebuka's picture
new architecture
5782bb5
# TOMBSTONE: supervisor_graph.py was removed — routing is now plain Python in agent_hub.
# These tests are superseded by evals that exercise agent_hub._dispatch + _handle_handoff.
import pytest
# Module-level skip: the tests below target the removed supervisor_graph
# module, so the remainder of this file is intentionally never executed.
pytest.skip("supervisor_graph removed — tests are obsolete", allow_module_level=True)
"""
Supervisor subgraph eval — pure-function tests on the routing layer.
The agent nodes themselves call real specialist process() functions
which need DB + LLM, so we don't exercise them end-to-end here. We
test the deterministic pieces:
- Entry routing picks the right node from state.active_agent
- _next_command(no handoff) → Command(goto=END)
- _next_command(handoff to other agent) → Command(goto=NODE_X)
- _next_command drains state via consume_handoff (single-fire)
- Self-handoff is suppressed (no infinite loop)
- Hop counter caps at MAX_HOPS
- Pre-handoff context is forwarded
- get_supervisor_graph is a singleton
- InterruptKind enum + _coerce_resume edge cases
"""
import pytest
from langgraph.graph import END
from app.ai.agent.handoffs import (
HandoffTarget,
request_handoff,
)
from app.ai.agent.supervisor_graph import (
MAX_HOPS,
NODE_BROKER,
NODE_CONCIERGE,
NODE_GENERAL,
NODE_MATCHER,
_next_command,
_route_entry,
get_supervisor_graph,
)
from app.ai.agent.interrupts import (
InterruptKind,
InterruptPayload,
InterruptResume,
_coerce_resume,
)
from evals.harness import make_state
# ============================================================
# Entry routing
# ============================================================
@pytest.mark.parametrize("active_agent,expected_node", [
    ("general", NODE_GENERAL),
    ("concierge", NODE_CONCIERGE),
    ("broker", NODE_BROKER),
    ("matcher", NODE_MATCHER),
    (None, NODE_GENERAL),  # default fallback
    ("", NODE_GENERAL),  # empty string fallback
    ("unknown", NODE_GENERAL),  # garbage fallback (defensive)
])
def test_entry_routing(active_agent, expected_node):
    """Entry routing maps ``active_agent`` to its node.

    ``None``, the empty string, and an unrecognised name are all expected
    to fall back to the general node.
    """
    assert _route_entry(make_state(active_agent=active_agent)) == expected_node
# ============================================================
# _next_command — handoff translation
# ============================================================
def test_next_command_no_handoff_terminates():
    """With no handoff requested, the routing step must go to END."""
    idle_state = make_state(active_agent="general")
    destination = _next_command(idle_state, current_agent=HandoffTarget.GENERAL).goto
    assert destination == END
def test_next_command_handoff_to_concierge():
    """A general -> concierge handoff request routes to the concierge node."""
    conversation = make_state(active_agent="general")
    request_handoff(conversation, HandoffTarget.CONCIERGE, reason="user wants to book")

    destination = _next_command(conversation, current_agent=HandoffTarget.GENERAL).goto

    assert destination == NODE_CONCIERGE
def test_next_command_handoff_to_broker():
    """A concierge -> broker handoff request routes to the broker node."""
    conversation = make_state(active_agent="concierge")
    request_handoff(conversation, HandoffTarget.BROKER)

    destination = _next_command(conversation, current_agent=HandoffTarget.CONCIERGE).goto

    assert destination == NODE_BROKER
def test_next_command_drains_handoff_slot():
    """A handoff request is single-fire: a second routing pass goes to END."""
    conversation = make_state()
    request_handoff(conversation, HandoffTarget.MATCHER)

    # First pass is expected to consume the pending request; its result is
    # deliberately ignored here.
    _next_command(conversation, current_agent=HandoffTarget.GENERAL)

    # Nothing should be left to route on, so this pass must terminate.
    second_pass = _next_command(conversation, current_agent=HandoffTarget.GENERAL)
    assert second_pass.goto == END
def test_next_command_self_handoff_suppressed():
    """A handoff whose target equals the current agent must go to END."""
    conversation = make_state()
    same_agent = HandoffTarget.CONCIERGE
    request_handoff(conversation, same_agent)

    destination = _next_command(conversation, current_agent=same_agent).goto

    assert destination == END
def test_next_command_forwards_handoff_context_from_reason():
    """Without explicit context, the reason lands in ``pre_handoff_context``."""
    conversation = make_state()
    request_handoff(conversation, HandoffTarget.BROKER, reason="user wants viewing scheduled")

    _next_command(conversation, current_agent=HandoffTarget.GENERAL)

    forwarded = conversation.temp_data.get("pre_handoff_context")
    assert forwarded == "user wants viewing scheduled"
def test_next_command_forwards_explicit_context_over_reason():
    """When both are given, explicit context wins over the reason."""
    conversation = make_state()
    request_handoff(
        conversation,
        HandoffTarget.BROKER,
        reason="generic",
        context="user has been viewing listing-9 for 3 turns",
    )

    _next_command(conversation, current_agent=HandoffTarget.GENERAL)

    forwarded = conversation.temp_data.get("pre_handoff_context")
    assert forwarded == "user has been viewing listing-9 for 3 turns"
def test_next_command_updates_active_agent_to_target():
    """After a handoff, the target is recorded on the state and in ``temp_data``."""
    conversation = make_state(active_agent="general")
    request_handoff(conversation, HandoffTarget.MATCHER)

    _next_command(conversation, current_agent=HandoffTarget.GENERAL)

    # Both copies of the active-agent marker are checked.
    assert conversation.active_agent == "matcher"
    assert conversation.temp_data.get("active_agent") == "matcher"
def test_next_command_increments_hop_counter():
    """One handoff on a fresh state leaves ``_supervisor_hops`` at 1."""
    conversation = make_state()
    request_handoff(conversation, HandoffTarget.BROKER)

    _next_command(conversation, current_agent=HandoffTarget.GENERAL)

    hop_count = conversation.temp_data.get("_supervisor_hops")
    assert hop_count == 1
def test_next_command_caps_at_max_hops():
    """With the hop counter already at MAX_HOPS, a requested handoff goes to END."""
    exhausted_budget = {"_supervisor_hops": MAX_HOPS}
    conversation = make_state(temp_data=exhausted_budget)
    request_handoff(conversation, HandoffTarget.MATCHER)

    destination = _next_command(conversation, current_agent=HandoffTarget.GENERAL).goto

    # The cap takes precedence over the pending handoff request.
    assert destination == END
# ============================================================
# get_supervisor_graph — singleton
# ============================================================
def test_get_supervisor_graph_is_singleton():
    """Two calls to the accessor must return the very same object."""
    first = get_supervisor_graph()
    second = get_supervisor_graph()
    assert first is second
def test_supervisor_graph_has_all_agent_nodes():
    """The four agent node names must all appear in the graph's ``nodes``."""
    graph = get_supervisor_graph()

    # Node names are read from the graph's ``.nodes`` mapping; if the
    # attribute is missing, fall back to an empty set so the assertions
    # below fail with a readable message.
    if hasattr(graph, "nodes"):
        nodes = set(graph.nodes.keys())
    else:
        nodes = set()

    # Extra entries in ``nodes`` are tolerated; only the presence of our
    # four agent nodes is checked.
    for required in (NODE_GENERAL, NODE_CONCIERGE, NODE_BROKER, NODE_MATCHER):
        assert required in nodes, f"missing node {required!r} in graph: {nodes}"
# ============================================================
# Interrupt helpers — type coercion
# ============================================================
def _payload(kind=InterruptKind.AWAIT_PAYMENT) -> InterruptPayload:
    """Build a fixed interrupt payload of the given kind for the coercion tests.

    The id is always ``"test-id"``, the prompt ``"ready?"``, and the data
    an empty dict; only ``kind`` varies.
    """
    fields = {
        "kind": kind,
        "interrupt_id": "test-id",
        "prompt": "ready?",
        "data": {},
    }
    return InterruptPayload(**fields)
def test_coerce_resume_passthrough_for_typed_resume():
    """An ``InterruptResume`` instance is returned as the same object."""
    already_typed = InterruptResume(
        kind=InterruptKind.AWAIT_PAYMENT,
        interrupt_id="abc",
        accepted=True,
        data={"txn": "ok"},
    )

    coerced = _coerce_resume(already_typed, _payload())

    # Identity, not just equality: no copy should be made.
    assert coerced is already_typed
def test_coerce_resume_from_bool_true():
    """Bare ``True`` becomes an accepted resume with the payload's kind and id."""
    pending = _payload()

    resume = _coerce_resume(True, pending)

    assert resume.accepted is True
    assert resume.kind == InterruptKind.AWAIT_PAYMENT
    assert resume.interrupt_id == "test-id"
def test_coerce_resume_from_bool_false():
    """Bare ``False`` becomes a resume whose ``accepted`` is ``False``."""
    resume = _coerce_resume(False, _payload())
    assert resume.accepted is False
def test_coerce_resume_from_dict():
    """A fully populated dict is coerced field-for-field into a resume."""
    wire_form = {
        "kind": "await_review",
        "interrupt_id": "xyz",
        "accepted": True,
        "data": {"stars": 5},
    }
    pending = _payload(InterruptKind.AWAIT_REVIEW)

    resume = _coerce_resume(wire_form, pending)

    assert resume.kind == InterruptKind.AWAIT_REVIEW
    assert resume.interrupt_id == "xyz"
    assert resume.accepted is True
    assert resume.data == {"stars": 5}
def test_coerce_resume_dict_inherits_payload_id_when_absent():
    """A dict lacking id and kind takes both from the payload."""
    sparse = {"accepted": True}

    resume = _coerce_resume(sparse, _payload())

    assert resume.interrupt_id == "test-id"
    assert resume.kind == InterruptKind.AWAIT_PAYMENT
def test_coerce_resume_unknown_type_treated_as_decline():
    """An unsupported resume value (here an int) yields ``accepted is False``."""
    unsupported_value = 12345
    resume = _coerce_resume(unsupported_value, _payload())
    assert resume.accepted is False
def test_interrupt_kind_values_are_unique():
    """No two ``InterruptKind`` members may share a value."""
    all_values = [member.value for member in InterruptKind]
    distinct_values = set(all_values)
    # Any duplicate value would make the set smaller than the list.
    assert len(all_values) == len(distinct_values)