VortexedSquirrel committed on
Commit
fe115bf
·
verified ·
1 Parent(s): ba9aef4

Upload src/tetris_env/server/game_engine.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. src/tetris_env/server/game_engine.py +297 -0
src/tetris_env/server/game_engine.py ADDED
@@ -0,0 +1,297 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Tetris Environment for OpenEnv.
3
+ Full game logic with combo scoring reward system.
4
+ """
5
+
6
+ import random
7
+ import copy
8
+ from typing import Optional
9
+
10
# Standard Tetris pieces in their spawn orientation, as row-major grids
# (1 = filled cell, 0 = empty). Other orientations are produced at runtime
# by the rotation helpers. Key order matters: pieces are drawn with
# ``rng.choice(list(PIECES.keys()))``, so reordering changes seeded games.
PIECES: dict[str, list[list[int]]] = {
    "I": [[1, 1, 1, 1]],
    "O": [[1, 1],
          [1, 1]],
    "T": [[0, 1, 0],
          [1, 1, 1]],
    "S": [[0, 1, 1],
          [1, 1, 0]],
    "Z": [[1, 1, 0],
          [0, 1, 1]],
    "L": [[1, 0],
          [1, 0],
          [1, 1]],
    "J": [[0, 1],
          [0, 1],
          [1, 1]],
}

# Board dimensions in cells.
BOARD_WIDTH = 10
BOARD_HEIGHT = 20

# Combo scoring: more lines cleared at once = disproportionately higher reward
LINE_REWARDS: dict[int, int] = {
    1: 100,
    2: 300,
    3: 700,
    4: 1500,  # "Tetris!" — the dream
}

# Reward shaping terms applied by TetrisEnv.step.
STEP_PENALTY = -1          # added on every step taken
HEIGHT_PENALTY_MULT = -2   # multiplied by the tallest column height
HOLE_PENALTY_MULT = -5     # multiplied by the number of holes
GAME_OVER_PENALTY = -500   # added once when the game ends
44
+
45
+
46
def rotate_cw(piece: list[list[int]]) -> list[list[int]]:
    """Rotate a piece grid 90 degrees clockwise.

    Args:
        piece: Rectangular row-major grid of cells (all rows the same length).

    Returns:
        A newly built grid whose row count equals the input's column count
        and vice versa. The input is not modified. An empty grid yields an
        empty grid.
    """
    # Reading each column from the bottom row upwards yields the rows of the
    # clockwise-rotated grid.
    return [list(column) for column in zip(*reversed(piece))]
55
+
56
+
57
def rotate_ccw(piece: list[list[int]]) -> list[list[int]]:
    """Rotate a piece grid 90 degrees counter-clockwise.

    Args:
        piece: Rectangular row-major grid of cells (all rows the same length).

    Returns:
        A newly built grid whose row count equals the input's column count
        and vice versa. The input is not modified. An empty grid yields an
        empty grid.
    """
    # The transpose lists columns left-to-right; the counter-clockwise turn
    # needs them right-to-left, so reverse the order of the transposed rows.
    columns = [list(column) for column in zip(*piece)]
    columns.reverse()
    return columns
66
+
67
+
68
class TetrisEnv:
    """Tetris game engine with a shaped per-step reward.

    State kept on the instance:
        board: BOARD_HEIGHT x BOARD_WIDTH grid; 0 = empty, 1 = locked cell.
            Row 0 is the top of the board.
        current_piece / current_piece_name: grid and key of the falling piece.
        current_x / current_y: board coordinates of the piece grid's top-left.
        next_piece / next_piece_name: the piece that will spawn next.
        score: sum of line-clear rewards earned so far.
        total_lines: number of lines cleared so far.
        steps: number of actions processed by ``step``.
        done: True once a newly spawned piece does not fit.
    """

    def __init__(self, seed: Optional[int] = None):
        """Create an environment and start a fresh game.

        Args:
            seed: Seed for the private random generator that picks pieces.
        """
        self.rng = random.Random(seed)
        self.reset_state()

    def reset_state(self, seed: Optional[int] = None):
        """Clear the board and counters and deal the first two pieces.

        Args:
            seed: When given, the random generator is replaced by a new one
                seeded with this value; otherwise the existing generator
                keeps its current state.
        """
        if seed is not None:
            self.rng = random.Random(seed)
        self.board = [[0] * BOARD_WIDTH for _ in range(BOARD_HEIGHT)]
        self.score = 0
        self.total_lines = 0
        self.steps = 0
        self.done = False
        self.current_piece = None
        self.current_piece_name = ""
        self.current_x = 0
        self.current_y = 0
        self.next_piece_name = ""
        self.next_piece = None
        # The first call only fills the "next" slot (current stays None);
        # the second promotes it to the current piece and draws a new next.
        self._spawn_next()
        self._spawn_next()

    def _spawn_next(self):
        """Move next piece to current, generate new next piece.

        The promoted piece is placed on the top row, horizontally centred.
        If it does not fit there, ``done`` is set to True.
        """
        self.current_piece = self.next_piece
        self.current_piece_name = self.next_piece_name
        self.next_piece_name = self.rng.choice(list(PIECES.keys()))
        # Copy so rotations never alias the shared PIECES templates.
        self.next_piece = copy.deepcopy(PIECES[self.next_piece_name])

        if self.current_piece is None:
            return

        piece_width = len(self.current_piece[0])
        self.current_x = BOARD_WIDTH // 2 - piece_width // 2
        self.current_y = 0

        if not self._is_valid_position(self.current_piece, self.current_x, self.current_y):
            self.done = True

    def _is_valid_position(self, piece: list[list[int]], x: int, y: int) -> bool:
        """Check if piece at (x, y) doesn't collide with board or walls.

        Args:
            piece: Piece grid to test.
            x: Board column of the grid's left edge.
            y: Board row of the grid's top edge.

        Returns:
            True when every filled cell of ``piece`` lands inside the board
            on an empty cell; False otherwise.
        """
        for row_idx, row in enumerate(piece):
            for col_idx, cell in enumerate(row):
                if cell == 0:
                    continue
                board_x = x + col_idx
                board_y = y + row_idx
                if not 0 <= board_x < BOARD_WIDTH:
                    return False
                if not 0 <= board_y < BOARD_HEIGHT:
                    return False
                if self.board[board_y][board_x] != 0:
                    return False
        return True

    def _lock_piece(self):
        """Lock current piece into the board.

        Filled cells that fall outside the board are skipped.
        """
        for row_idx, row in enumerate(self.current_piece):
            for col_idx, cell in enumerate(row):
                if not cell:
                    continue
                bx = self.current_x + col_idx
                by = self.current_y + row_idx
                if 0 <= by < BOARD_HEIGHT and 0 <= bx < BOARD_WIDTH:
                    self.board[by][bx] = 1

    def _clear_lines(self) -> int:
        """Clear completed lines. Returns number of lines cleared.

        Rows above a cleared line shift down; empty rows are added at the
        top so the board keeps BOARD_HEIGHT rows. ``total_lines`` is updated.
        """
        kept = [row for row in self.board if not all(cell == 1 for cell in row)]
        lines_cleared = len(self.board) - len(kept)

        # Add empty rows at the top
        padding = [[0] * BOARD_WIDTH for _ in range(BOARD_HEIGHT - len(kept))]
        self.board = padding + kept

        self.total_lines += lines_cleared
        return lines_cleared

    def _count_holes(self) -> int:
        """Count holes: empty cells with at least one filled cell above them."""
        holes = 0
        for col in range(BOARD_WIDTH):
            found_block = False
            # Scan top to bottom; every empty cell met after the first
            # filled cell in this column is a hole.
            for row in range(BOARD_HEIGHT):
                if self.board[row][col] == 1:
                    found_block = True
                elif found_block and self.board[row][col] == 0:
                    holes += 1
        return holes

    def _max_height(self) -> int:
        """Height of the tallest column (0 for an empty board)."""
        for row in range(BOARD_HEIGHT):
            # The first row from the top that holds any locked cell fixes
            # the height of the tallest column.
            if any(cell == 1 for cell in self.board[row]):
                return BOARD_HEIGHT - row
        return 0

    def _drop_piece(self):
        """Hard drop: move piece down until it can't go further."""
        while self._is_valid_position(self.current_piece, self.current_x, self.current_y + 1):
            self.current_y += 1

    def _settle_piece(self) -> int:
        """Lock the current piece, clear lines, score them and spawn the next.

        Returns:
            The line-clear reward earned by this lock (0 when no line was
            cleared). The same amount is added to ``score``.
        """
        self._lock_piece()
        lines = self._clear_lines()
        line_reward = 0
        if lines > 0:
            line_reward = LINE_REWARDS.get(lines, lines * 400)
            self.score += line_reward
        self._spawn_next()
        return line_reward

    def get_board_with_piece(self) -> list[list[int]]:
        """Return board with current piece overlaid (for observation).

        Returns:
            A copy of the board in which the falling piece's cells are
            marked 2. The piece is omitted once the game is over.
        """
        display = [row[:] for row in self.board]
        if self.current_piece and not self.done:
            for row_idx, row in enumerate(self.current_piece):
                for col_idx, cell in enumerate(row):
                    if not cell:
                        continue
                    bx = self.current_x + col_idx
                    by = self.current_y + row_idx
                    if 0 <= by < BOARD_HEIGHT and 0 <= bx < BOARD_WIDTH:
                        display[by][bx] = 2  # 2 = current piece
        return display

    def board_to_text(self) -> str:
        """Render board as text for LLM observation.

        Empty cells are ".", locked cells "#", the falling piece "@"; the
        grid is framed with "+", "-" and "|".
        """
        display = self.get_board_with_piece()
        symbols = {0: ".", 1: "#", 2: "@"}
        border = "+" + "-" * BOARD_WIDTH + "+"
        lines = [border]
        for row in display:
            lines.append("|" + "".join(symbols[c] for c in row) + "|")
        lines.append(border)
        return "\n".join(lines)

    def piece_to_text(self, piece: list[list[int]]) -> str:
        """Render a piece as text ("#" for filled cells, "." for empty)."""
        return "\n".join("".join("#" if c else "." for c in row) for row in piece)

    def step(self, action: str) -> dict:
        """
        Execute one action. Valid actions:
        - "left": move piece left
        - "right": move piece right
        - "rotate_cw": rotate clockwise
        - "rotate_ccw": rotate counter-clockwise
        - "drop": hard drop and lock
        - "down": soft drop one row
        - "noop": do nothing (piece falls one row)

        The action is stripped and lower-cased first. A move or rotation
        that would collide is ignored. After every action except "drop",
        gravity moves the piece one more row, or locks it if it cannot
        move; "down" therefore descends up to two rows in one step. Any
        unrecognised action string behaves like "noop".

        Reward is STEP_PENALTY, plus the line-clear reward if the piece
        locked, plus the height and hole penalties for the resulting board,
        plus GAME_OVER_PENALTY if this step ended the game. Calling ``step``
        after the game is over changes nothing and returns reward 0.

        Returns dict with: observation, reward, done, info
        """
        if self.done:
            return self._make_result(0)

        self.steps += 1
        reward = STEP_PENALTY  # base penalty per step

        action = action.strip().lower()

        if action == "left":
            if self._is_valid_position(self.current_piece, self.current_x - 1, self.current_y):
                self.current_x -= 1
        elif action == "right":
            if self._is_valid_position(self.current_piece, self.current_x + 1, self.current_y):
                self.current_x += 1
        elif action == "rotate_cw":
            rotated = rotate_cw(self.current_piece)
            if self._is_valid_position(rotated, self.current_x, self.current_y):
                self.current_piece = rotated
        elif action == "rotate_ccw":
            rotated = rotate_ccw(self.current_piece)
            if self._is_valid_position(rotated, self.current_x, self.current_y):
                self.current_piece = rotated
        elif action == "down":
            if self._is_valid_position(self.current_piece, self.current_x, self.current_y + 1):
                self.current_y += 1

        if action == "drop":
            # Drop action: fall as far as possible, then lock immediately.
            self._drop_piece()
            reward += self._settle_piece()
        elif self._is_valid_position(self.current_piece, self.current_x, self.current_y + 1):
            # After action: gravity moves the piece down one row.
            self.current_y += 1
        else:
            # Can't move down — lock piece
            reward += self._settle_piece()

        # Penalties for board state
        reward += HEIGHT_PENALTY_MULT * self._max_height()
        reward += HOLE_PENALTY_MULT * self._count_holes()

        if self.done:
            reward += GAME_OVER_PENALTY

        return self._make_result(reward)

    def _make_result(self, reward: float) -> dict:
        """Build the observation/result dict.

        Args:
            reward: Reward to report under the "reward" key.

        Returns:
            Dict holding the rendered board, current and next piece names
            and shapes, piece position, score, total lines, step count,
            max height, hole count, the reward and the done flag.
        """
        return {
            "board": self.board_to_text(),
            "current_piece": self.current_piece_name,
            "current_piece_shape": self.piece_to_text(self.current_piece) if self.current_piece else "",
            "next_piece": self.next_piece_name,
            "next_piece_shape": self.piece_to_text(self.next_piece) if self.next_piece else "",
            "piece_x": self.current_x,
            "piece_y": self.current_y,
            "score": self.score,
            "total_lines": self.total_lines,
            "steps": self.steps,
            "max_height": self._max_height(),
            "holes": self._count_holes(),
            "reward": reward,
            "done": self.done,
        }

    def reset(self, seed: Optional[int] = None) -> dict:
        """Reset the environment. Returns initial observation.

        Args:
            seed: Optional seed forwarded to ``reset_state``.
        """
        self.reset_state(seed)
        return self._make_result(0)