Spaces:
Running
Running
| """ | |
| Mission M-E — End-to-End Agent Flow Tests. | |
| Tests the ADK execution path end-to-end: inbound message → runner → | |
| tool calls → event logging → status transitions. Uses mocked Runner to | |
| avoid real Gemini API calls. | |
| """ | |
| import pytest | |
| from datetime import datetime, timedelta | |
| from unittest.mock import AsyncMock, MagicMock, patch | |
| from uuid import uuid4 | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlmodel import select | |
| from app.core.adk.runner import run_for_contact | |
| from app.models.models import ( | |
| User, | |
| Workspace, | |
| WorkspaceMember, | |
| WorkspaceRole, | |
| Contact, | |
| ChannelIdentity, | |
| Conversation, | |
| ConversationStatus, | |
| ExecutionInstance, | |
| ExecutionStatus, | |
| ExecutionStepLog, | |
| Flow, | |
| FlowVersion, | |
| RuntimeEventLog, | |
| ) | |
| from app.core.security import get_password_hash | |
| # ── Fixtures ───────────────────────────────────────────────────────────── | |
async def _setup_workspace_with_contact(db: AsyncSession):
    """Seed the smallest object graph the end-to-end tests operate on.

    Adds and flushes, in dependency order: a user, a workspace the user
    owns, a contact with a WhatsApp channel identity, a bot-active
    conversation, a published flow with one empty version, and an
    execution instance in the RUNNING state.

    Args:
        db: Async session the rows are added to. Rows are only flushed,
            never committed, by this helper.

    Returns:
        Tuple ``(workspace, contact, conversation, execution_instance)``.
    """
    # Owner account; the random suffix goes into the e-mail address.
    owner = User(
        email=f"e2e_{uuid4().hex[:6]}@test.com",
        hashed_password=get_password_hash("pass"),
        full_name="E2E Test",
        is_active=True,
        email_verified_at=datetime.utcnow(),
    )
    db.add(owner)
    await db.flush()  # flush so owner.id is populated before it is referenced

    workspace = Workspace(name=f"E2E WS {uuid4().hex[:4]}")
    db.add(workspace)
    await db.flush()

    membership = WorkspaceMember(
        user_id=owner.id,
        workspace_id=workspace.id,
        role=WorkspaceRole.OWNER,
    )
    db.add(membership)

    lead = Contact(
        workspace_id=workspace.id,
        first_name="E2E",
        last_name="Lead",
        additional_metadata={},
    )
    db.add(lead)
    await db.flush()

    whatsapp_identity = ChannelIdentity(
        workspace_id=workspace.id,
        contact_id=lead.id,
        provider="whatsapp",
        provider_user_id=f"e2e_{uuid4().hex[:6]}",
    )
    thread = Conversation(
        workspace_id=workspace.id,
        contact_id=lead.id,
        status=ConversationStatus.BOT_ACTIVE,
    )
    published_flow = Flow(workspace_id=workspace.id, name="E2E Flow", status="published")
    # Same add order as before: identity, conversation, flow, then one flush.
    db.add_all([whatsapp_identity, thread, published_flow])
    await db.flush()

    version = FlowVersion(
        workspace_id=workspace.id,
        flow_id=published_flow.id,
        version_number=1,
        is_published=True,
        definition_json={"nodes": [], "edges": []},
    )
    db.add(version)
    await db.flush()

    execution = ExecutionInstance(
        workspace_id=workspace.id,
        flow_version_id=version.id,
        contact_id=lead.id,
        status=ExecutionStatus.RUNNING,
    )
    db.add(execution)
    await db.flush()

    return workspace, lead, thread, execution
def _compiled_prompt():
    """Return a mock standing in for a compiled workspace prompt.

    The mock carries a fixed ``system_instruction`` string and a freshly
    generated ``version_id`` UUID.
    """
    # Keyword arguments to MagicMock are set as attributes on the new mock.
    return MagicMock(
        system_instruction="You are a helpful sales assistant.",
        version_id=uuid4(),
    )
| # ── E2E Test 1: tool call produces step log + runtime events ───────────── | |
async def test_run_for_contact_tool_call_produces_step_log_and_events(
    db_session: AsyncSession,
):
    """
    A turn whose single event carries one ``send_reply`` function call is
    expected to leave behind an ExecutionStepLog row, the three runtime
    events (turn started / step completed / turn completed), and a
    COMPLETED execution instance.
    """
    ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session)

    # `name` is assigned after construction: MagicMock(name=...) would name
    # the mock itself instead of creating a `.name` attribute.
    tool_call = MagicMock()
    tool_call.name = "send_reply"
    tool_call.args = {"message_content": "Hello there!"}

    adk_event = MagicMock()
    adk_event.get_function_calls.return_value = [tool_call]

    async def _stream_one_event(**kwargs):
        yield adk_event

    session_service = MagicMock(
        get_session=AsyncMock(return_value=None),
        create_session=AsyncMock(return_value=MagicMock()),
        append_event=AsyncMock(),
        update_session=AsyncMock(),
    )

    prompt_patch = patch(
        "app.core.adk.agents.orchestrator.compile_workspace_prompt",
        new=AsyncMock(return_value=_compiled_prompt()),
    )
    runner_patch = patch(
        "google.adk.runners.Runner.run_async", side_effect=_stream_one_event
    )
    service_patch = patch(
        "app.core.adk.runner.LeadPilotSessionService", return_value=session_service
    )
    with prompt_patch, runner_patch, service_patch:
        await run_for_contact(
            workspace_id=ws.id,
            contact_id=contact.id,
            conversation_id=conversation.id,
            inbound_message="Hi",
            execution_instance=instance,
            session=db_session,
        )

    # A step log row exists for the tool call.
    step_query = select(ExecutionStepLog).where(
        ExecutionStepLog.execution_instance_id == instance.id
    )
    first_step = (await db_session.execute(step_query)).scalars().first()
    assert first_step is not None
    assert first_step.output_data == {"tool": "send_reply"}

    # Runtime events tied to this instance include start, step and completion.
    event_query = select(RuntimeEventLog).where(
        RuntimeEventLog.related_ids["execution_instance_id"].as_string()
        == str(instance.id)
    )
    logged_events = (await db_session.execute(event_query)).scalars().all()
    event_types = {entry.event_type for entry in logged_events}
    for expected_type in (
        "runtime.adk_turn_started",
        "runtime.step_completed",
        "runtime.adk_turn_completed",
    ):
        assert expected_type in event_types

    # The instance ends the turn as COMPLETED.
    await db_session.refresh(instance)
    assert instance.status == ExecutionStatus.COMPLETED
| # ── E2E Test 2: exception marks instance FAILED ────────────────────────── | |
async def test_run_for_contact_exception_marks_instance_failed(
    db_session: AsyncSession,
):
    """
    If the patched runner raises RuntimeError, ``run_for_contact`` is
    expected to propagate it and leave the ExecutionInstance FAILED.
    """
    ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session)

    async def _explode(**kwargs):
        raise RuntimeError("Gemini API unavailable")
        # Unreachable, but the `yield` is what makes this an async generator.
        yield  # pragma: no cover

    session_service = MagicMock(
        get_session=AsyncMock(return_value=None),
        create_session=AsyncMock(return_value=MagicMock()),
        append_event=AsyncMock(),
        update_session=AsyncMock(),
    )

    prompt_patch = patch(
        "app.core.adk.agents.orchestrator.compile_workspace_prompt",
        new=AsyncMock(return_value=_compiled_prompt()),
    )
    runner_patch = patch("google.adk.runners.Runner.run_async", side_effect=_explode)
    service_patch = patch(
        "app.core.adk.runner.LeadPilotSessionService", return_value=session_service
    )
    # pytest.raises is innermost, so it sees the error before the patches unwind.
    with (
        prompt_patch,
        runner_patch,
        service_patch,
        pytest.raises(RuntimeError, match="Gemini API unavailable"),
    ):
        await run_for_contact(
            workspace_id=ws.id,
            contact_id=contact.id,
            conversation_id=conversation.id,
            inbound_message="Hi",
            execution_instance=instance,
            session=db_session,
        )

    await db_session.refresh(instance)
    assert instance.status == ExecutionStatus.FAILED
| # ── E2E Test 3: WAIT_DELAY resume cycle ────────────────────────────────── | |
async def test_wait_delay_instance_resumes_and_completes(db_session: AsyncSession):
    """
    Simulate the WAIT_DELAY resume cycle:
    WAITING (resume_at elapsed) → mark RUNNING → run_for_contact → COMPLETED.

    This mirrors what resume_waiting_instances_task does, tested directly
    against run_for_contact to avoid Celery engine-session isolation issues.
    """
    ws, contact, conversation, instance = await _setup_workspace_with_contact(db_session)

    # Park the instance in WAITING with a resume time five minutes in the past.
    five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
    instance.status = ExecutionStatus.WAITING
    instance.resume_at = five_minutes_ago
    db_session.add(instance)
    await db_session.flush()

    # Do what the task does before handing over: flip the instance to RUNNING.
    instance.status = ExecutionStatus.RUNNING
    db_session.add(instance)
    await db_session.flush()

    # One event with no function calls.
    quiet_event = MagicMock()
    quiet_event.get_function_calls.return_value = []

    async def _stream_one_event(**kwargs):
        yield quiet_event

    session_service = MagicMock(
        get_session=AsyncMock(return_value=None),
        create_session=AsyncMock(return_value=MagicMock()),
        append_event=AsyncMock(),
        update_session=AsyncMock(),
    )

    prompt_patch = patch(
        "app.core.adk.agents.orchestrator.compile_workspace_prompt",
        new=AsyncMock(return_value=_compiled_prompt()),
    )
    runner_patch = patch(
        "google.adk.runners.Runner.run_async", side_effect=_stream_one_event
    )
    service_patch = patch(
        "app.core.adk.runner.LeadPilotSessionService", return_value=session_service
    )
    with prompt_patch, runner_patch, service_patch:
        await run_for_contact(
            workspace_id=ws.id,
            contact_id=contact.id,
            conversation_id=conversation.id,
            inbound_message="[Resumed after delay]",
            execution_instance=instance,
            session=db_session,
        )

    await db_session.refresh(instance)
    assert instance.status == ExecutionStatus.COMPLETED