File size: 2,001 Bytes
0827183
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
from unittest import TestCase
from uuid import UUID, uuid4

from botframework.streaming.payloads import StreamManager
from botframework.streaming.payloads.assemblers import PayloadStreamAssembler
from botframework.streaming.payloads.models import Header


class TestPayloadAssembler(TestCase):
    """Tests for ``PayloadStreamAssembler``.

    Covers the identifier passed to the constructor, the initial value of
    ``end``, access to the payload stream, and how ``on_receive`` and
    ``close`` affect ``end``.
    """

    def setUp(self):
        """Build a fresh identifier and assembler before every test."""
        self.identifier: UUID = uuid4()
        self.assembler = PayloadStreamAssembler(StreamManager(), self.identifier)

    def test_ctor_id(self):
        """The assembler exposes the identifier it was constructed with."""
        self.assertEqual(self.identifier, self.assembler.identifier)

    def test_ctor_end_false(self):
        """A newly constructed assembler reports ``end`` as false."""
        self.assertFalse(self.assembler.end)

    def test_get_stream(self):
        """``get_payload_as_stream`` returns something other than ``None``."""
        self.assertIsNotNone(self.assembler.get_payload_as_stream())

    def test_get_stream_does_not_make_new_each_time(self):
        """Repeated ``get_payload_as_stream`` calls hand back the same object."""
        first = self.assembler.get_payload_as_stream()
        second = self.assembler.get_payload_as_stream()
        # Identity, not equality: two distinct-but-equal streams would still
        # mean a new stream was created, which is what this test guards against.
        self.assertIs(first, second)

    def test_on_receive_sets_end(self):
        """Receiving a header whose ``end`` is true sets ``end`` on the assembler."""
        header = Header()
        header.end = True

        # The stream is requested before on_receive is called, as in the
        # scenario this test exercises.
        self.assembler.get_payload_as_stream()
        # Empty payload; 100 is presumably the declared content length
        # (TODO confirm against on_receive's signature).
        self.assembler.on_receive(header, [], 100)

        self.assertTrue(self.assembler.end)

    def test_close_does_not_set_end(self):
        """Closing the assembler leaves ``end`` false."""
        self.assembler.close()

        self.assertFalse(self.assembler.end)