File size: 4,319 Bytes
0827183
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import json

import aiounittest

from botbuilder.schema import Activity
from botframework.streaming import ReceiveRequest, StreamingRequest
from botframework.streaming.payloads import ResponseMessageStream


class TestRequests(aiounittest.AsyncTestCase):
    async def test_receive_request_empty_streams(self):
        sut = ReceiveRequest()

        self.assertIsNotNone(sut.streams)
        self.assertEqual(0, len(sut.streams))

    async def test_receive_request_null_properties(self):
        sut = ReceiveRequest()

        self.assertIsNone(sut.verb)
        self.assertIsNone(sut.path)

    async def test_streaming_request_null_properties(self):
        sut = StreamingRequest()

        self.assertIsNone(sut.verb)
        self.assertIsNone(sut.path)

    async def test_streaming_request_add_stream_null_throws(self):
        sut = StreamingRequest()

        with self.assertRaises(TypeError):
            sut.add_stream(None)

    async def test_streaming_request_add_stream_success(self):
        sut = StreamingRequest()
        content = "hi"

        sut.add_stream(content)

        self.assertIsNotNone(sut.streams)
        self.assertEqual(1, len(sut.streams))
        self.assertEqual(content, sut.streams[0].content)

    async def test_streaming_request_add_stream_existing_list_success(self):
        sut = StreamingRequest()
        content = "hi"
        content_2 = "hello"

        sut.streams = [ResponseMessageStream(content=content_2)]

        sut.add_stream(content)

        self.assertIsNotNone(sut.streams)
        self.assertEqual(2, len(sut.streams))
        self.assertEqual(content_2, sut.streams[0].content)
        self.assertEqual(content, sut.streams[1].content)

    async def test_streaming_request_create_get_success(self):
        sut = StreamingRequest.create_get()

        self.assertEqual(StreamingRequest.GET, sut.verb)
        self.assertIsNone(sut.path)
        self.assertIsNone(sut.streams)

    async def test_streaming_request_create_post_success(self):
        sut = StreamingRequest.create_post()

        self.assertEqual(StreamingRequest.POST, sut.verb)
        self.assertIsNone(sut.path)
        self.assertIsNone(sut.streams)

    async def test_streaming_request_create_delete_success(self):
        sut = StreamingRequest.create_delete()

        self.assertEqual(StreamingRequest.DELETE, sut.verb)
        self.assertIsNone(sut.path)
        self.assertIsNone(sut.streams)

    async def test_streaming_request_create_put_success(self):
        sut = StreamingRequest.create_put()

        self.assertEqual(StreamingRequest.PUT, sut.verb)
        self.assertIsNone(sut.path)
        self.assertIsNone(sut.streams)

    async def test_streaming_request_create_with_body_success(self):
        content = "hi"
        sut = StreamingRequest.create_request(StreamingRequest.POST, "123", content)

        self.assertEqual(StreamingRequest.POST, sut.verb)
        self.assertEqual("123", sut.path)
        self.assertIsNotNone(sut.streams)
        self.assertEqual(1, len(sut.streams))
        self.assertEqual(content, sut.streams[0].content)

    async def test_streaming_request_set_body_string_success(self):
        sut = StreamingRequest()

        sut.set_body("123")

        self.assertIsNotNone(sut.streams)
        self.assertEqual(1, len(sut.streams))
        self.assertIsInstance(sut.streams[0].content, list)
        self.assertIsInstance(sut.streams[0].content[0], int)
        self.assertEqual("123", bytes(sut.streams[0].content).decode("utf-8-sig"))

    async def test_streaming_request_set_body_none_does_not_throw(self):
        sut = StreamingRequest()

        sut.set_body(None)

    async def test_streaming_request_set_body_success(self):
        sut = StreamingRequest()
        activity = Activity(text="hi", type="message")

        sut.set_body(activity)

        self.assertIsNotNone(sut.streams)
        self.assertEqual(1, len(sut.streams))
        self.assertIsInstance(sut.streams[0].content, list)
        self.assertIsInstance(sut.streams[0].content[0], int)

        assert_activity = Activity.deserialize(
            json.loads(bytes(sut.streams[0].content).decode("utf-8-sig"))
        )

        self.assertEqual(activity.text, assert_activity.text)
        self.assertEqual(activity.type, assert_activity.type)