File size: 9,940 Bytes
fe0625d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""
test_required.py โ€”โ€” ๅ››ไธชๆŒ‡ๅฎš้ชŒๆ”ถๆต‹่ฏ•็”จไพ‹

TC-R1  test_dimension_and_channels
TC-R2  test_map_connectivity
TC-R3  test_termination_and_truncation
TC-R4  test_seeding_reproducibility
"""

from __future__ import annotations

from collections import deque

import numpy as np
import pytest

from maze_env import MazeEnv


# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ่พ…ๅŠฉๅ‡ฝๆ•ฐ๏ผš็‹ฌ็ซ‹ BFS๏ผŒ็”จไบŽๅค–้ƒจ้ชŒ่ฏ่ฟทๅฎซ่ฟž้€šๆ€ง
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€

def _bfs_connected(wall_map: np.ndarray, start: tuple[int, int],
                   goal: tuple[int, int]) -> bool:
    """ๅฏน็ป™ๅฎš wall_map ๆ‰ง่กŒ BFS๏ผŒ่ฟ”ๅ›ž start ๅˆฐ goal ๆ˜ฏๅฆๅฏ่พพใ€‚

    Args:
        wall_map: shape (N, N) float32 ๆ•ฐ็ป„๏ผŒ1.0 ไปฃ่กจๅข™๏ผŒ0.0 ไปฃ่กจๅฏ้€š่กŒใ€‚
        start:    ่ตทๅง‹ๆ ผๅๆ ‡ (row, col)ใ€‚
        goal:     ็›ฎๆ ‡ๆ ผๅๆ ‡ (row, col)ใ€‚

    Returns:
        True ่กจ็คบๅฏ่พพ๏ผŒFalse ่กจ็คบไธๅฏ่พพใ€‚
    """
    N = wall_map.shape[0]
    visited: set[tuple[int, int]] = {start}
    queue: deque[tuple[int, int]] = deque([start])
    deltas = [(-1, 0), (1, 0), (0, -1), (0, 1)]
    while queue:
        r, c = queue.popleft()
        if (r, c) == goal:
            return True
        for dr, dc in deltas:
            nr, nc = r + dr, c + dc
            if (0 <= nr < N and 0 <= nc < N
                    and wall_map[nr, nc] == 0.0
                    and (nr, nc) not in visited):
                visited.add((nr, nc))
                queue.append((nr, nc))
    return (start == goal)


# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# TC-R1  test_dimension_and_channels
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€

class TestDimensionAndChannels:
    """TC-R1๏ผš10ร—10 ็Žฏๅขƒ็š„่ง‚ๆต‹ๅฝข็ŠถไธŽ้€š้“่ฏญไน‰้ชŒ่ฏใ€‚"""

    @pytest.mark.unit
    def test_dimension_and_channels(self) -> None:
        """ๅฎžไพ‹ๅŒ– 10ร—10 ็Žฏๅขƒ๏ผŒๆ–ญ่จ€ obs.shape==(3,10,10)๏ผŒ
        Agent ้€š้“ๅ’Œ็ปˆ็‚น้€š้“ๅ„่‡ช sum==1ใ€‚

        ่พ“ๅ…ฅ:  MazeEnv(grid_size=10, obstacle_density=0.3, seed=0).reset()
        ๆœŸๆœ›:
          obs.shape == (4, 10, 10)
          obs[1].sum() == 1.0   ๏ผˆAgent ้€š้“๏ผšๅ”ฏไธ€ๆฟ€ๆดปๆ ผ๏ผ‰
          obs[2].sum() == 1.0   ๏ผˆ็ปˆ็‚น้€š้“๏ผšๅ”ฏไธ€ๆฟ€ๆดปๆ ผ๏ผ‰
        ๅฎžๆต‹:  obs ๅ„็ปดๅบฆๅŠ้€š้“ sum
        """
        env = MazeEnv(grid_size=10, obstacle_density=0.3, seed=0)
        obs, _ = env.reset()

        assert obs.shape == (4, 10, 10), \
            f"obs.shape ๆœŸๆœ› (3,10,10)๏ผŒๅฎž้™… {obs.shape}"
        assert float(obs[1].sum()) == 1.0, \
            f"Agent ้€š้“ (obs[1]) ๅบ”ๆฐๅฅฝๆœ‰ 1 ไธชๆฟ€ๆดปๆ ผ๏ผŒๅฎž้™… sum={obs[1].sum()}"
        assert float(obs[2].sum()) == 1.0, \
            f"็ปˆ็‚น้€š้“ (obs[2]) ๅบ”ๆฐๅฅฝๆœ‰ 1 ไธชๆฟ€ๆดปๆ ผ๏ผŒๅฎž้™… sum={obs[2].sum()}"


# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# TC-R2  test_map_connectivity
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€

class TestMapConnectivity:
    """TC-R2๏ผš่ฟž็ปญ 100 ๆฌก reset๏ผŒๆฏๆฌก็”จ็‹ฌ็ซ‹ BFS ้ชŒ่ฏ่ตท็ปˆ็‚นๅฏ่พพใ€‚"""

    @pytest.mark.slow
    def test_map_connectivity(self) -> None:
        """ๅพช็Žฏ reset() 100 ๆฌก๏ผŒ็”จๅค–้ƒจ BFS ็‹ฌ็ซ‹้ชŒ่ฏๆฏๅผ ๅœฐๅ›พ็š„่ตท็ปˆ็‚น่ฟž้€šๆ€งใ€‚

        ่พ“ๅ…ฅ:  MazeEnv(grid_size=10, obstacle_density=0.45)๏ผŒreset() ร— 100
        ๆœŸๆœ›:  100% ่ฟž้€š๏ผˆไปปๆ„ไธ€ๆฌกไธ่ฟž้€šๅณๅคฑ่ดฅ๏ผ‰
        ๅฎžๆต‹:  ๅค–้ƒจ BFS ้ชŒ่ฏ wall_map๏ผˆobs[0]๏ผ‰๏ผŒstart=(1,1)๏ผŒgoal=(N-2,N-2)
        """
        N = 10
        start = (1, 1)
        goal  = (N - 2, N - 2)
        env = MazeEnv(grid_size=N, obstacle_density=0.45)

        for i in range(100):
            obs, info = env.reset()
            wall_map = obs[0]  # ้€š้“ 0 = ้™ๆ€ๅข™ๅฃๅœฐๅ›พ

            connected = _bfs_connected(wall_map, start, goal)
            assert connected, (
                f"็ฌฌ {i+1} ๆฌก reset๏ผšBFS ้ชŒ่ฏ่ตท็‚น {start} โ†’ ็ปˆ็‚น {goal} ไธ่ฟž้€š๏ผŒ"
                f"่ฏดๆ˜Ž MazeEnv ็š„่ฟž้€šๆ€ง่ฟ‡ๆปคๅ™จๆœช็”Ÿๆ•ˆใ€‚"
            )


# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# TC-R3  test_termination_and_truncation
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€

class TestTerminationAndTruncation:
    """TC-R3๏ผš้ชŒ่ฏ truncated๏ผˆๆญฅๆ•ฐ่€—ๅฐฝ๏ผ‰ไธŽ terminated๏ผˆๅˆฐ่พพ็ปˆ็‚น๏ผ‰็š„ไบ’ๆ–ฅ่ฏญไน‰ใ€‚"""

    @pytest.mark.integration
    def test_termination_and_truncation(self) -> None:
        """Part A๏ผš็–ฏ็‹‚ๆ’žๅข™็›ด่‡ณ max_steps=50๏ผŒๆ–ญ่จ€ truncated=True & terminated=Falseใ€‚
        Part B๏ผšๆ‰‹ๅŠจๅฐ† agent ็ฝฎไบŽ็ปˆ็‚น้™„่ฟ‘๏ผŒๆ‰ง่กŒๆœ€ๅŽไธ€ๆญฅ๏ผŒๆ–ญ่จ€ terminated=True & truncated=Falseใ€‚

        ่พ“ๅ…ฅ:
          Part A: MazeEnv(grid_size=6, obstacle_density=0.0, seed=0, max_steps=50)
                  ๅๅค step(0)๏ผˆๆŒ็ปญๆ’žไธŠ่พน็•Œๅข™๏ผ‰ร— 50
          Part B: ๅŒไธ€็Žฏๅขƒ reset()๏ผŒ้€š่ฟ‡ๅˆๆณ•็งปๅŠจๅผ•ๅฏผ agent ๅˆฐ็ปˆ็‚น (4,4)

        ๆœŸๆœ›:
          Part A: truncated is True, terminated is False
          Part B: terminated is True, truncated is False
        """
        # โ”€โ”€ Part A๏ผšๆญฅๆ•ฐ่€—ๅฐฝ โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
        env = MazeEnv(grid_size=6, obstacle_density=0.0, seed=0, max_steps=50)
        env.reset()

        terminated = truncated = False
        for _ in range(50):
            _, _, terminated, truncated, _ = env.step(0)  # ๆŒ็ปญๆ’ž่พน็•Œๅข™

        assert truncated  is True,  "ๆญฅๆ•ฐ่€—ๅฐฝ๏ผˆmax_steps=50๏ผ‰ๆ—ถ๏ผŒtruncated ๅบ”ไธบ True"
        assert terminated is False, "ๆญฅๆ•ฐ่€—ๅฐฝไฝ†ๆœชๅˆฐ็ปˆ็‚นๆ—ถ๏ผŒterminated ๅบ”ไธบ False"

        # โ”€โ”€ Part B๏ผšๆ‰‹ๅŠจๅผ•ๅฏผๅˆฐ็ปˆ็‚น โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
        env2 = MazeEnv(grid_size=6, obstacle_density=0.0, seed=0, max_steps=200)
        env2.reset()

        # ไปŽ (1,1) ๅณ็งป 3 ๆญฅๅˆฐ (1,4)๏ผŒๅ†ไธ‹็งป 3 ๆญฅๅˆฐ (4,4) = goal
        for _ in range(3):
            env2.step(3)   # ๅณ
        for _ in range(2):
            env2.step(1)   # ไธ‹
        _, _, terminated, truncated, info = env2.step(1)  # ๅˆฐ่พพ (4,4)

        assert terminated is True,  "ๅˆฐ่พพ็ปˆ็‚นๆ—ถ๏ผŒterminated ๅบ”ไธบ True"
        assert truncated  is False, "ๅˆฐ่พพ็ปˆ็‚นๆ—ถ๏ผŒtruncated ๅฟ…้กปไธบ False๏ผˆไธฅๆ ผไบ’ๆ–ฅ๏ผ‰"
        assert info["success"] is True, "ๅˆฐ่พพ็ปˆ็‚นๅŽ success ๅบ”ไธบ True"


# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# TC-R4  test_seeding_reproducibility
# โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€

class TestSeedingReproducibility:
    """TC-R4๏ผš็›ธๅŒ seed=42 ็š„ไธคไธช็‹ฌ็ซ‹ๅฎžไพ‹ไบง็”ŸๅฎŒๅ…จ็›ธๅŒ็š„ๅœฐๅ›พใ€่ตท็‚นใ€็ปˆ็‚นใ€‚"""

    @pytest.mark.unit
    def test_seeding_reproducibility(self) -> None:
        """็”จ็›ธๅŒ seed=42 ๅˆๅง‹ๅŒ–ไธคไธช็‹ฌ็ซ‹ MazeEnv ๅฎžไพ‹๏ผŒๅˆ†ๅˆซ reset()๏ผŒ
        ๆ–ญ่จ€ๅœฐๅ›พ็Ÿฉ้˜ตใ€agent ไฝ็ฝฎใ€goal ไฝ็ฝฎๅฎŒๅ…จไธ€่‡ดใ€‚

        ่พ“ๅ…ฅ:
          env_a = MazeEnv(grid_size=10, obstacle_density=0.3, seed=42)
          env_b = MazeEnv(grid_size=10, obstacle_density=0.3, seed=42)
          ๅ„่ฐƒ็”จ reset()

        ๆœŸๆœ›:
          np.array_equal(obs_a[0], obs_b[0])  โ€” ๅœฐๅ›พๅฎŒๅ…จไธ€่‡ด
          info_a["agent_pos"] == info_b["agent_pos"]
          info_a["goal_pos"]  == info_b["goal_pos"]
          np.array_equal(obs_a, obs_b)         โ€” ไธ‰้€š้“ๅ…จ้ƒจไธ€่‡ด
        """
        env_a = MazeEnv(grid_size=10, obstacle_density=0.3, seed=42)
        env_b = MazeEnv(grid_size=10, obstacle_density=0.3, seed=42)

        obs_a, info_a = env_a.reset()
        obs_b, info_b = env_b.reset()

        assert np.array_equal(obs_a[0], obs_b[0]), \
            "็›ธๅŒ seed ็š„ไธคไธชๅฎžไพ‹ๅบ”ไบง็”ŸๅฎŒๅ…จ็›ธๅŒ็š„ๅข™ๅฃๅœฐๅ›พ๏ผˆobs[0]๏ผ‰"
        assert info_a["agent_pos"] == info_b["agent_pos"], \
            "็›ธๅŒ seed ็š„ไธคไธชๅฎžไพ‹ๅบ”ไบง็”Ÿ็›ธๅŒ็š„่ตท็‚น"
        assert info_a["goal_pos"]  == info_b["goal_pos"], \
            "็›ธๅŒ seed ็š„ไธคไธชๅฎžไพ‹ๅบ”ไบง็”Ÿ็›ธๅŒ็š„็ปˆ็‚น"
        assert np.array_equal(obs_a, obs_b), \
            "็›ธๅŒ seed ็š„ไธคไธชๅฎžไพ‹ๆ•ดไฝ“่ง‚ๆต‹๏ผˆไธ‰้€š้“๏ผ‰ๅบ”ๅฎŒๅ…จไธ€่‡ด"