Spaces:
Running
Running
| """ | |
| Mission M-B β Multi-Agent Orchestration Tests. | |
| Tests the 5-agent hierarchy: build_orchestrator(), sub-agent builders, | |
| and the qualification tools (mark_qualified, mark_disqualified). | |
| """ | |
| import pytest | |
| from datetime import datetime | |
| from uuid import uuid4 | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from app.models.models import ( | |
| User, | |
| Workspace, | |
| WorkspaceMember, | |
| WorkspaceRole, | |
| Contact, | |
| ChannelIdentity, | |
| Conversation, | |
| ConversationStatus, | |
| ExecutionInstance, | |
| ExecutionStatus, | |
| Flow, | |
| FlowVersion, | |
| ) | |
| from app.core.security import get_password_hash | |
| from app.core.adk.agents.orchestrator import build_orchestrator | |
| from app.core.adk.agents.qualification import build_qualification_agent | |
| # ββ Fixtures ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| async def _setup_workspace_with_contact( | |
| db: AsyncSession, | |
| qualification_status: str | None = None, | |
| zoho_lead_id: str | None = None, | |
| ) -> tuple: | |
| """Create minimal workspace + contact + conversation + execution instance.""" | |
| user = User( | |
| email=f"mb_test_{uuid4().hex[:6]}@test.com", | |
| hashed_password=get_password_hash("testpass"), | |
| full_name="MB Test", | |
| is_active=True, | |
| email_verified_at=datetime.utcnow(), | |
| ) | |
| db.add(user) | |
| await db.flush() | |
| ws = Workspace(name=f"MB WS {uuid4().hex[:4]}") | |
| db.add(ws) | |
| await db.flush() | |
| db.add(WorkspaceMember(user_id=user.id, workspace_id=ws.id, role=WorkspaceRole.OWNER)) | |
| contact = Contact( | |
| workspace_id=ws.id, | |
| first_name="Test", | |
| last_name="Lead", | |
| additional_metadata={}, | |
| qualification_status=qualification_status, | |
| zoho_lead_id=zoho_lead_id, | |
| ) | |
| db.add(contact) | |
| await db.flush() | |
| identity = ChannelIdentity( | |
| workspace_id=ws.id, | |
| contact_id=contact.id, | |
| provider="whatsapp", | |
| provider_user_id="14155550001", | |
| ) | |
| db.add(identity) | |
| conversation = Conversation( | |
| workspace_id=ws.id, | |
| contact_id=contact.id, | |
| status=ConversationStatus.BOT_ACTIVE, | |
| ) | |
| db.add(conversation) | |
| flow = Flow(workspace_id=ws.id, name="MB Test Flow", status="published") | |
| db.add(flow) | |
| await db.flush() | |
| flow_version = FlowVersion( | |
| workspace_id=ws.id, | |
| flow_id=flow.id, | |
| version_number=1, | |
| is_published=True, | |
| definition_json={"nodes": [], "edges": []}, | |
| ) | |
| db.add(flow_version) | |
| await db.flush() | |
| instance = ExecutionInstance( | |
| workspace_id=ws.id, | |
| flow_version_id=flow_version.id, | |
| contact_id=contact.id, | |
| status=ExecutionStatus.RUNNING, | |
| ) | |
| db.add(instance) | |
| await db.flush() | |
| return ws, contact, conversation, instance | |
| # ββ Test 1: build_orchestrator produces agent with 4 AgentTools βββββββββββ | |
| async def test_build_orchestrator_has_four_agent_tools(db_session: AsyncSession): | |
| ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session) | |
| compiled = MagicMock() | |
| compiled.system_instruction = "You are a helpful sales assistant." | |
| with patch( | |
| "app.core.adk.agents.orchestrator.compile_workspace_prompt", | |
| new=AsyncMock(return_value=compiled), | |
| ): | |
| agent = await build_orchestrator( | |
| workspace_id=ws.id, | |
| contact_id=contact.id, | |
| conversation_id=conversation.id, | |
| session=db_session, | |
| instance=instance, | |
| ) | |
| assert agent.name == "orchestrator" | |
| assert len(agent.tools) == 4 | |
| tool_names = {t.agent.name for t in agent.tools} | |
| assert tool_names == {"qualification_agent", "crm_agent", "reply_agent", "handover_agent"} | |
| # ββ Test 2: Instruction reflects unqualified + CRM-unsynced state βββββββββ | |
| async def test_orchestrator_instruction_unqualified_state(db_session: AsyncSession): | |
| ws, contact, conversation, instance = await _setup_workspace_with_contact( | |
| db_session, | |
| qualification_status=None, | |
| zoho_lead_id=None, | |
| ) | |
| compiled = MagicMock() | |
| compiled.system_instruction = "Business context." | |
| with patch( | |
| "app.core.adk.agents.orchestrator.compile_workspace_prompt", | |
| new=AsyncMock(return_value=compiled), | |
| ): | |
| agent = await build_orchestrator( | |
| workspace_id=ws.id, | |
| contact_id=contact.id, | |
| conversation_id=conversation.id, | |
| session=db_session, | |
| instance=instance, | |
| ) | |
| assert "not yet qualified" in agent.instruction | |
| assert "no" in agent.instruction # CRM synced: no | |
| # ββ Test 3: Instruction reflects qualified + CRM-synced state βββββββββββββ | |
| async def test_orchestrator_instruction_qualified_synced_state(db_session: AsyncSession): | |
| ws, contact, conversation, instance = await _setup_workspace_with_contact( | |
| db_session, | |
| qualification_status="qualified", | |
| zoho_lead_id="ZL-999", | |
| ) | |
| compiled = MagicMock() | |
| compiled.system_instruction = "Business context." | |
| with patch( | |
| "app.core.adk.agents.orchestrator.compile_workspace_prompt", | |
| new=AsyncMock(return_value=compiled), | |
| ): | |
| agent = await build_orchestrator( | |
| workspace_id=ws.id, | |
| contact_id=contact.id, | |
| conversation_id=conversation.id, | |
| session=db_session, | |
| instance=instance, | |
| ) | |
| # Both "qualified" and "yes" should appear in the routing context section | |
| assert "qualified" in agent.instruction | |
| assert "yes" in agent.instruction # CRM synced: yes | |
| # ββ Test 4: mark_qualified tool updates Contact.qualification_status βββββββ | |
| async def test_mark_qualified_updates_contact(db_session: AsyncSession): | |
| ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session) | |
| assert contact.qualification_status is None | |
| agent = build_qualification_agent( | |
| qualification_questions=[{"label": "What is your budget?", "enabled": True, "order": 0}], | |
| qualification_criteria=[{"label": "Budget > $1000", "is_enabled": True, "sort_order": 0}], | |
| session=db_session, | |
| instance=instance, | |
| ) | |
| # Locate the mark_qualified FunctionTool and invoke its inner function directly | |
| mark_qualified_tool = next(t for t in agent.tools if t.name == "mark_qualified") | |
| result = await mark_qualified_tool.func(reason="Budget confirmed at $5000") | |
| assert result["qualification_status"] == "qualified" | |
| await db_session.refresh(contact) | |
| assert contact.qualification_status == "qualified" | |
| assert "qualified" in (contact.additional_metadata or {}).get("tags", []) | |
| # ββ Test 5: mark_disqualified tool updates Contact.qualification_status ββββ | |
| async def test_mark_disqualified_updates_contact(db_session: AsyncSession): | |
| ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session) | |
| assert contact.qualification_status is None | |
| agent = build_qualification_agent( | |
| qualification_questions=[], | |
| qualification_criteria=[], | |
| session=db_session, | |
| instance=instance, | |
| ) | |
| mark_disqualified_tool = next(t for t in agent.tools if t.name == "mark_disqualified") | |
| result = await mark_disqualified_tool.func(reason="Budget too low") | |
| assert result["qualification_status"] == "disqualified" | |
| await db_session.refresh(contact) | |
| assert contact.qualification_status == "disqualified" | |
| assert "disqualified" in (contact.additional_metadata or {}).get("tags", []) | |