Spaces:
Build error
Build error
Validify-testbot-1
/
botbuilder-python
/libraries
/botbuilder-applicationinsights
/tests
/test_telemetry_waterfall.py
| # Copyright (c) Microsoft Corporation. All rights reserved. | |
| # Licensed under the MIT License. | |
| from unittest.mock import create_autospec, MagicMock | |
| from typing import Dict | |
| import aiounittest | |
| from botbuilder.core.adapters import TestAdapter, TestFlow | |
| from botbuilder.schema import Activity | |
| from botbuilder.core import ( | |
| ConversationState, | |
| MemoryStorage, | |
| TurnContext, | |
| NullTelemetryClient, | |
| ) | |
| from botbuilder.dialogs import ( | |
| Dialog, | |
| DialogInstance, | |
| DialogReason, | |
| DialogSet, | |
| WaterfallDialog, | |
| DialogTurnResult, | |
| DialogTurnStatus, | |
| ) | |
| BEGIN_MESSAGE = Activity() | |
| BEGIN_MESSAGE.text = "begin" | |
| BEGIN_MESSAGE.type = "message" | |
| MOCK_TELEMETRY = "botbuilder.applicationinsights.ApplicationInsightsTelemetryClient" | |
| class TelemetryWaterfallTests(aiounittest.AsyncTestCase): | |
| def test_none_telemetry_client(self): | |
| # arrange | |
| dialog = WaterfallDialog("myId") | |
| # act | |
| dialog.telemetry_client = None | |
| # assert | |
| self.assertEqual(type(dialog.telemetry_client), NullTelemetryClient) | |
| async def test_execute_sequence_waterfall_steps(self): | |
| # arrange | |
| # Create new ConversationState with MemoryStorage and register the state as middleware. | |
| convo_state = ConversationState(MemoryStorage()) | |
| telemetry = MagicMock(name=MOCK_TELEMETRY) | |
| # Create a DialogState property, DialogSet and register the WaterfallDialog. | |
| dialog_state = convo_state.create_property("dialogState") | |
| dialogs = DialogSet(dialog_state) | |
| async def step1(step) -> DialogTurnResult: | |
| await step.context.send_activity("bot responding.") | |
| return Dialog.end_of_turn | |
| async def step2(step) -> DialogTurnResult: | |
| await step.context.send_activity("ending WaterfallDialog.") | |
| return Dialog.end_of_turn | |
| # act | |
| my_dialog = WaterfallDialog("test", [step1, step2]) | |
| my_dialog.telemetry_client = telemetry | |
| dialogs.add(my_dialog) | |
| # Initialize TestAdapter | |
| async def exec_test(turn_context: TurnContext) -> None: | |
| dialog_context = await dialogs.create_context(turn_context) | |
| results = await dialog_context.continue_dialog() | |
| if results.status == DialogTurnStatus.Empty: | |
| await dialog_context.begin_dialog("test") | |
| else: | |
| if results.status == DialogTurnStatus.Complete: | |
| await turn_context.send_activity(results.result) | |
| await convo_state.save_changes(turn_context) | |
| adapt = TestAdapter(exec_test) | |
| test_flow = TestFlow(None, adapt) | |
| tf2 = await test_flow.send(BEGIN_MESSAGE) | |
| tf3 = await tf2.assert_reply("bot responding.") | |
| tf4 = await tf3.send("continue") | |
| await tf4.assert_reply("ending WaterfallDialog.") | |
| # assert | |
| telemetry_calls = [ | |
| ("WaterfallStart", {"DialogId": "test"}), | |
| ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}), | |
| ("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}), | |
| ] | |
| self.assert_telemetry_calls(telemetry, telemetry_calls) | |
| async def test_ensure_end_dialog_called(self): | |
| # arrange | |
| # Create new ConversationState with MemoryStorage and register the state as middleware. | |
| convo_state = ConversationState(MemoryStorage()) | |
| telemetry = MagicMock(name=MOCK_TELEMETRY) | |
| # Create a DialogState property, DialogSet and register the WaterfallDialog. | |
| dialog_state = convo_state.create_property("dialogState") | |
| dialogs = DialogSet(dialog_state) | |
| async def step1(step) -> DialogTurnResult: | |
| await step.context.send_activity("step1 response") | |
| return Dialog.end_of_turn | |
| async def step2(step) -> DialogTurnResult: | |
| await step.context.send_activity("step2 response") | |
| return Dialog.end_of_turn | |
| # act | |
| my_dialog = WaterfallDialog("test", [step1, step2]) | |
| my_dialog.telemetry_client = telemetry | |
| dialogs.add(my_dialog) | |
| # Initialize TestAdapter | |
| async def exec_test(turn_context: TurnContext) -> None: | |
| dialog_context = await dialogs.create_context(turn_context) | |
| await dialog_context.continue_dialog() | |
| if not turn_context.responded: | |
| await dialog_context.begin_dialog("test", None) | |
| await convo_state.save_changes(turn_context) | |
| adapt = TestAdapter(exec_test) | |
| test_flow = TestFlow(None, adapt) | |
| tf2 = await test_flow.send(BEGIN_MESSAGE) | |
| tf3 = await tf2.assert_reply("step1 response") | |
| tf4 = await tf3.send("continue") | |
| tf5 = await tf4.assert_reply("step2 response") | |
| await tf5.send( | |
| "Should hit end of steps - this will restart the dialog and trigger COMPLETE event" | |
| ) | |
| # assert | |
| telemetry_calls = [ | |
| ("WaterfallStart", {"DialogId": "test"}), | |
| ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}), | |
| ("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}), | |
| ("WaterfallComplete", {"DialogId": "test"}), | |
| ("WaterfallStart", {"DialogId": "test"}), | |
| ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}), | |
| ] | |
| self.assert_telemetry_calls(telemetry, telemetry_calls) | |
| async def test_cancelling_waterfall_telemetry(self): | |
| # Arrange | |
| dialog_id = "waterfall" | |
| index = 0 | |
| guid = "(guid)" | |
| async def my_waterfall_step(step) -> DialogTurnResult: | |
| await step.context.send_activity("step1 response") | |
| return Dialog.end_of_turn | |
| dialog = WaterfallDialog(dialog_id, [my_waterfall_step]) | |
| telemetry_client = create_autospec(NullTelemetryClient) | |
| dialog.telemetry_client = telemetry_client | |
| dialog_instance = DialogInstance() | |
| dialog_instance.id = dialog_id | |
| dialog_instance.state = {"instanceId": guid, "stepIndex": index} | |
| # Act | |
| await dialog.end_dialog( | |
| TurnContext(TestAdapter(), Activity()), | |
| dialog_instance, | |
| DialogReason.CancelCalled, | |
| ) | |
| # Assert | |
| telemetry_props = telemetry_client.track_event.call_args_list[0][0][1] | |
| self.assertEqual(3, len(telemetry_props)) | |
| self.assertEqual(dialog_id, telemetry_props["DialogId"]) | |
| self.assertEqual(my_waterfall_step.__qualname__, telemetry_props["StepName"]) | |
| self.assertEqual(guid, telemetry_props["InstanceId"]) | |
| telemetry_client.track_event.assert_called_once() | |
| def assert_telemetry_call( | |
| self, telemetry_mock, index: int, event_name: str, props: Dict[str, str] | |
| ) -> None: | |
| # pylint: disable=unused-variable | |
| args, kwargs = telemetry_mock.track_event.call_args_list[index] | |
| self.assertEqual(args[0], event_name) | |
| for key, val in props.items(): | |
| self.assertTrue( | |
| key in args[1], | |
| msg=f"Could not find value {key} in {args[1]} for index {index}", | |
| ) | |
| self.assertTrue(isinstance(args[1], dict)) | |
| self.assertTrue(val == args[1][key]) | |
| def assert_telemetry_calls(self, telemetry_mock, calls) -> None: | |
| index = 0 | |
| for event_name, props in calls: | |
| self.assert_telemetry_call(telemetry_mock, index, event_name, props) | |
| index += 1 | |
| if index != len(telemetry_mock.track_event.call_args_list): | |
| self.assertTrue( # pylint: disable=redundant-unittest-assert | |
| False, | |
| f"Found {len(telemetry_mock.track_event.call_args_list)} calls, testing for {index + 1}", | |
| ) | |