File size: 8,016 Bytes
0827183
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

from unittest.mock import create_autospec, MagicMock
from typing import Dict
import aiounittest
from botbuilder.core.adapters import TestAdapter, TestFlow
from botbuilder.schema import Activity
from botbuilder.core import (
    ConversationState,
    MemoryStorage,
    TurnContext,
    NullTelemetryClient,
)
from botbuilder.dialogs import (
    Dialog,
    DialogInstance,
    DialogReason,
    DialogSet,
    WaterfallDialog,
    DialogTurnResult,
    DialogTurnStatus,
)

# Canned "begin" message activity sent as the first user turn in the flow tests.
BEGIN_MESSAGE = Activity(type="message", text="begin")

# Name given to the MagicMock instances that stand in for a telemetry client.
MOCK_TELEMETRY = "botbuilder.applicationinsights.ApplicationInsightsTelemetryClient"


class TelemetryWaterfallTests(aiounittest.AsyncTestCase):
    """Tests for the telemetry events tracked by ``WaterfallDialog``.

    The flow tests drive a two-step waterfall through a ``TestAdapter`` and
    then compare the ordered ``track_event`` calls recorded on a mock
    telemetry client against the expected event names and properties.
    """

    def test_none_telemetry_client(self):
        """Assigning ``None`` as telemetry client yields a ``NullTelemetryClient``."""
        # arrange
        dialog = WaterfallDialog("myId")
        # act
        dialog.telemetry_client = None
        # assert
        self.assertIsInstance(dialog.telemetry_client, NullTelemetryClient)

    async def test_execute_sequence_waterfall_steps(self):
        """Running both steps tracks WaterfallStart followed by one WaterfallStep each."""
        # arrange

        # Create a new ConversationState backed by MemoryStorage.
        convo_state = ConversationState(MemoryStorage())
        telemetry = MagicMock(name=MOCK_TELEMETRY)

        # Create a DialogState property, DialogSet and register the WaterfallDialog.
        dialog_state = convo_state.create_property("dialogState")
        dialogs = DialogSet(dialog_state)

        async def step1(step) -> DialogTurnResult:
            await step.context.send_activity("bot responding.")
            return Dialog.end_of_turn

        async def step2(step) -> DialogTurnResult:
            await step.context.send_activity("ending WaterfallDialog.")
            return Dialog.end_of_turn

        # act

        my_dialog = WaterfallDialog("test", [step1, step2])
        my_dialog.telemetry_client = telemetry
        dialogs.add(my_dialog)

        # Turn handler run by the TestAdapter: continue the active dialog, or
        # begin "test" when the dialog stack is empty.
        async def exec_test(turn_context: TurnContext) -> None:
            dialog_context = await dialogs.create_context(turn_context)
            results = await dialog_context.continue_dialog()
            if results.status == DialogTurnStatus.Empty:
                await dialog_context.begin_dialog("test")
            elif results.status == DialogTurnStatus.Complete:
                await turn_context.send_activity(results.result)

            await convo_state.save_changes(turn_context)

        adapt = TestAdapter(exec_test)

        test_flow = TestFlow(None, adapt)
        tf2 = await test_flow.send(BEGIN_MESSAGE)
        tf3 = await tf2.assert_reply("bot responding.")
        tf4 = await tf3.send("continue")
        await tf4.assert_reply("ending WaterfallDialog.")

        # assert
        telemetry_calls = [
            ("WaterfallStart", {"DialogId": "test"}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}),
        ]
        self.assert_telemetry_calls(telemetry, telemetry_calls)

    async def test_ensure_end_dialog_called(self):
        """Running past the last step tracks WaterfallComplete, then a fresh start."""
        # arrange

        # Create a new ConversationState backed by MemoryStorage.
        convo_state = ConversationState(MemoryStorage())
        telemetry = MagicMock(name=MOCK_TELEMETRY)

        # Create a DialogState property, DialogSet and register the WaterfallDialog.
        dialog_state = convo_state.create_property("dialogState")
        dialogs = DialogSet(dialog_state)

        async def step1(step) -> DialogTurnResult:
            await step.context.send_activity("step1 response")
            return Dialog.end_of_turn

        async def step2(step) -> DialogTurnResult:
            await step.context.send_activity("step2 response")
            return Dialog.end_of_turn

        # act

        my_dialog = WaterfallDialog("test", [step1, step2])
        my_dialog.telemetry_client = telemetry
        dialogs.add(my_dialog)

        # Turn handler run by the TestAdapter: continue the active dialog and
        # (re)start "test" whenever that produced no response this turn.
        async def exec_test(turn_context: TurnContext) -> None:
            dialog_context = await dialogs.create_context(turn_context)
            await dialog_context.continue_dialog()
            if not turn_context.responded:
                await dialog_context.begin_dialog("test", None)
            await convo_state.save_changes(turn_context)

        adapt = TestAdapter(exec_test)

        test_flow = TestFlow(None, adapt)
        tf2 = await test_flow.send(BEGIN_MESSAGE)
        tf3 = await tf2.assert_reply("step1 response")
        tf4 = await tf3.send("continue")
        tf5 = await tf4.assert_reply("step2 response")
        await tf5.send(
            "Should hit end of steps - this will restart the dialog and trigger COMPLETE event"
        )
        # assert
        telemetry_calls = [
            ("WaterfallStart", {"DialogId": "test"}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step2.__qualname__}),
            ("WaterfallComplete", {"DialogId": "test"}),
            ("WaterfallStart", {"DialogId": "test"}),
            ("WaterfallStep", {"DialogId": "test", "StepName": step1.__qualname__}),
        ]
        self.assert_telemetry_calls(telemetry, telemetry_calls)

    async def test_cancelling_waterfall_telemetry(self):
        """Ending with ``CancelCalled`` tracks one event carrying three properties."""
        # Arrange
        dialog_id = "waterfall"
        index = 0
        guid = "(guid)"

        async def my_waterfall_step(step) -> DialogTurnResult:
            await step.context.send_activity("step1 response")
            return Dialog.end_of_turn

        dialog = WaterfallDialog(dialog_id, [my_waterfall_step])

        telemetry_client = create_autospec(NullTelemetryClient)
        dialog.telemetry_client = telemetry_client

        dialog_instance = DialogInstance()
        dialog_instance.id = dialog_id
        dialog_instance.state = {"instanceId": guid, "stepIndex": index}

        # Act
        await dialog.end_dialog(
            TurnContext(TestAdapter(), Activity()),
            dialog_instance,
            DialogReason.CancelCalled,
        )

        # Assert
        # Check the call count first so a missing call fails with a clear
        # assertion message rather than an IndexError on the lookup below.
        telemetry_client.track_event.assert_called_once()
        telemetry_props = telemetry_client.track_event.call_args_list[0][0][1]

        self.assertEqual(3, len(telemetry_props))
        self.assertEqual(dialog_id, telemetry_props["DialogId"])
        self.assertEqual(my_waterfall_step.__qualname__, telemetry_props["StepName"])
        self.assertEqual(guid, telemetry_props["InstanceId"])

    def assert_telemetry_call(
        self, telemetry_mock, index: int, event_name: str, props: Dict[str, str]
    ) -> None:
        """Assert that the ``index``-th ``track_event`` call matches expectations.

        :param telemetry_mock: mock whose ``track_event.call_args_list`` is inspected.
        :param index: zero-based position of the call to check.
        :param event_name: expected first positional argument of the call.
        :param props: key/value pairs that must be present in the second
            positional argument; extra keys in the recorded call are allowed.
        """
        recorded_calls = telemetry_mock.track_event.call_args_list
        self.assertLess(
            index,
            len(recorded_calls),
            msg=(
                f"Expected a track_event call at index {index}, "
                f"but only {len(recorded_calls)} were recorded"
            ),
        )
        args, _ = recorded_calls[index]
        self.assertEqual(args[0], event_name)

        if not props:
            # Nothing to compare; the recorded call may carry no properties.
            return

        actual_props = args[1]
        self.assertIsInstance(actual_props, dict)
        for key, val in props.items():
            self.assertIn(
                key,
                actual_props,
                msg=f"Could not find value {key} in {actual_props} for index {index}",
            )
            self.assertEqual(val, actual_props[key])

    def assert_telemetry_calls(self, telemetry_mock, calls) -> None:
        """Assert that ``track_event`` was called exactly as listed in ``calls``.

        :param telemetry_mock: mock whose ``track_event`` calls are inspected.
        :param calls: iterable of ``(event_name, props)`` pairs in expected order.
        """
        expected_calls = list(calls)
        for index, (event_name, props) in enumerate(expected_calls):
            self.assert_telemetry_call(telemetry_mock, index, event_name, props)

        # Every expected call matched; also make sure no extra events were tracked.
        actual_count = len(telemetry_mock.track_event.call_args_list)
        self.assertEqual(
            len(expected_calls),
            actual_count,
            msg=f"Found {actual_count} calls, testing for {len(expected_calls)}",
        )