LeadPilot / backend /tests /test_multi_agent.py
Ashraf Al-Kassem
Mission M-B β€” Multi-Agent Orchestration
d5ba3bd
raw
history blame
7.91 kB
"""
Mission M-B β€” Multi-Agent Orchestration Tests.
Tests the 5-agent hierarchy: build_orchestrator(), sub-agent builders,
and the qualification tools (mark_qualified, mark_disqualified).
"""
import pytest
from datetime import datetime
from uuid import uuid4
from unittest.mock import AsyncMock, MagicMock, patch
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.models import (
User,
Workspace,
WorkspaceMember,
WorkspaceRole,
Contact,
ChannelIdentity,
Conversation,
ConversationStatus,
ExecutionInstance,
ExecutionStatus,
Flow,
FlowVersion,
)
from app.core.security import get_password_hash
from app.core.adk.agents.orchestrator import build_orchestrator
from app.core.adk.agents.qualification import build_qualification_agent
# ── Fixtures ──────────────────────────────────────────────────────────────
async def _setup_workspace_with_contact(
db: AsyncSession,
qualification_status: str | None = None,
zoho_lead_id: str | None = None,
) -> tuple:
"""Create minimal workspace + contact + conversation + execution instance."""
user = User(
email=f"mb_test_{uuid4().hex[:6]}@test.com",
hashed_password=get_password_hash("testpass"),
full_name="MB Test",
is_active=True,
email_verified_at=datetime.utcnow(),
)
db.add(user)
await db.flush()
ws = Workspace(name=f"MB WS {uuid4().hex[:4]}")
db.add(ws)
await db.flush()
db.add(WorkspaceMember(user_id=user.id, workspace_id=ws.id, role=WorkspaceRole.OWNER))
contact = Contact(
workspace_id=ws.id,
first_name="Test",
last_name="Lead",
additional_metadata={},
qualification_status=qualification_status,
zoho_lead_id=zoho_lead_id,
)
db.add(contact)
await db.flush()
identity = ChannelIdentity(
workspace_id=ws.id,
contact_id=contact.id,
provider="whatsapp",
provider_user_id="14155550001",
)
db.add(identity)
conversation = Conversation(
workspace_id=ws.id,
contact_id=contact.id,
status=ConversationStatus.BOT_ACTIVE,
)
db.add(conversation)
flow = Flow(workspace_id=ws.id, name="MB Test Flow", status="published")
db.add(flow)
await db.flush()
flow_version = FlowVersion(
workspace_id=ws.id,
flow_id=flow.id,
version_number=1,
is_published=True,
definition_json={"nodes": [], "edges": []},
)
db.add(flow_version)
await db.flush()
instance = ExecutionInstance(
workspace_id=ws.id,
flow_version_id=flow_version.id,
contact_id=contact.id,
status=ExecutionStatus.RUNNING,
)
db.add(instance)
await db.flush()
return ws, contact, conversation, instance
# ── Test 1: build_orchestrator produces agent with 4 AgentTools ───────────
@pytest.mark.asyncio
async def test_build_orchestrator_has_four_agent_tools(db_session: AsyncSession):
ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session)
compiled = MagicMock()
compiled.system_instruction = "You are a helpful sales assistant."
with patch(
"app.core.adk.agents.orchestrator.compile_workspace_prompt",
new=AsyncMock(return_value=compiled),
):
agent = await build_orchestrator(
workspace_id=ws.id,
contact_id=contact.id,
conversation_id=conversation.id,
session=db_session,
instance=instance,
)
assert agent.name == "orchestrator"
assert len(agent.tools) == 4
tool_names = {t.agent.name for t in agent.tools}
assert tool_names == {"qualification_agent", "crm_agent", "reply_agent", "handover_agent"}
# ── Test 2: Instruction reflects unqualified + CRM-unsynced state ─────────
@pytest.mark.asyncio
async def test_orchestrator_instruction_unqualified_state(db_session: AsyncSession):
ws, contact, conversation, instance = await _setup_workspace_with_contact(
db_session,
qualification_status=None,
zoho_lead_id=None,
)
compiled = MagicMock()
compiled.system_instruction = "Business context."
with patch(
"app.core.adk.agents.orchestrator.compile_workspace_prompt",
new=AsyncMock(return_value=compiled),
):
agent = await build_orchestrator(
workspace_id=ws.id,
contact_id=contact.id,
conversation_id=conversation.id,
session=db_session,
instance=instance,
)
assert "not yet qualified" in agent.instruction
assert "no" in agent.instruction # CRM synced: no
# ── Test 3: Instruction reflects qualified + CRM-synced state ─────────────
@pytest.mark.asyncio
async def test_orchestrator_instruction_qualified_synced_state(db_session: AsyncSession):
ws, contact, conversation, instance = await _setup_workspace_with_contact(
db_session,
qualification_status="qualified",
zoho_lead_id="ZL-999",
)
compiled = MagicMock()
compiled.system_instruction = "Business context."
with patch(
"app.core.adk.agents.orchestrator.compile_workspace_prompt",
new=AsyncMock(return_value=compiled),
):
agent = await build_orchestrator(
workspace_id=ws.id,
contact_id=contact.id,
conversation_id=conversation.id,
session=db_session,
instance=instance,
)
# Both "qualified" and "yes" should appear in the routing context section
assert "qualified" in agent.instruction
assert "yes" in agent.instruction # CRM synced: yes
# ── Test 4: mark_qualified tool updates Contact.qualification_status ───────
@pytest.mark.asyncio
async def test_mark_qualified_updates_contact(db_session: AsyncSession):
ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session)
assert contact.qualification_status is None
agent = build_qualification_agent(
qualification_questions=[{"label": "What is your budget?", "enabled": True, "order": 0}],
qualification_criteria=[{"label": "Budget > $1000", "is_enabled": True, "sort_order": 0}],
session=db_session,
instance=instance,
)
# Locate the mark_qualified FunctionTool and invoke its inner function directly
mark_qualified_tool = next(t for t in agent.tools if t.name == "mark_qualified")
result = await mark_qualified_tool.func(reason="Budget confirmed at $5000")
assert result["qualification_status"] == "qualified"
await db_session.refresh(contact)
assert contact.qualification_status == "qualified"
assert "qualified" in (contact.additional_metadata or {}).get("tags", [])
# ── Test 5: mark_disqualified tool updates Contact.qualification_status ────
@pytest.mark.asyncio
async def test_mark_disqualified_updates_contact(db_session: AsyncSession):
ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session)
assert contact.qualification_status is None
agent = build_qualification_agent(
qualification_questions=[],
qualification_criteria=[],
session=db_session,
instance=instance,
)
mark_disqualified_tool = next(t for t in agent.tools if t.name == "mark_disqualified")
result = await mark_disqualified_tool.func(reason="Budget too low")
assert result["qualification_status"] == "disqualified"
await db_session.refresh(contact)
assert contact.qualification_status == "disqualified"
assert "disqualified" in (contact.additional_metadata or {}).get("tags", [])