# tetris-env/src/tetris_env/server/game_engine.py
# OutOfMystic's picture
# v0.6.0: make height_breach_penalty configurable per TetrisEnv instance
# 86f507b
"""
Tetris Environment for OpenEnv.
Full game logic with combo scoring reward system.
"""
__version__ = "0.6.0" # configurable height_breach_penalty per instance
import random
import copy
from typing import Optional
# The seven tetrominoes, each stored in a single base orientation as a
# row-major grid (1 = filled cell, 0 = empty). Other orientations are produced
# at runtime by the rotation helpers. Key order matters: pieces are drawn with
# rng.choice over this key list, so reordering would change seeded sequences.
PIECES = {
    "I": [
        [1, 1, 1, 1],
    ],
    "O": [
        [1, 1],
        [1, 1],
    ],
    "T": [
        [0, 1, 0],
        [1, 1, 1],
    ],
    "S": [
        [0, 1, 1],
        [1, 1, 0],
    ],
    "Z": [
        [1, 1, 0],
        [0, 1, 1],
    ],
    "L": [
        [1, 0],
        [1, 0],
        [1, 1],
    ],
    "J": [
        [0, 1],
        [0, 1],
        [1, 1],
    ],
}

# Playfield dimensions in cells.
BOARD_WIDTH = 10
BOARD_HEIGHT = 20

# Reward by number of lines cleared with one piece. The scale is deliberately
# super-linear so that clearing several lines at once pays far more than
# clearing them one at a time.
LINE_REWARDS = {
    1: 100,
    2: 300,
    3: 700,
    4: 1500,  # "Tetris!" — the dream
}

# Reward-shaping terms.
STEP_PENALTY = -0.1            # added to the reward of every step taken
HOLE_PENALTY_MULT = -5         # multiplied by the number of newly created holes
GAME_OVER_PENALTY = -50        # added on the step that ends the game
HEIGHT_BREACH_THRESHOLD = 4    # stack height that is never penalized
HEIGHT_BREACH_PENALTY = -50    # per level above threshold, decays with pieces_locked
def rotate_cw(piece: list[list[int]]) -> list[list[int]]:
    """Return a copy of ``piece`` rotated 90 degrees clockwise.

    ``piece`` is a rectangular row-major grid. The result has its dimensions
    swapped (an R x C grid becomes C x R) and is built from fresh lists, so
    the input is never modified. An empty grid rotates to an empty grid.
    """
    # Reading the rows bottom-to-top and transposing puts the bottom row into
    # the first column, which is exactly a clockwise quarter turn.
    return [list(column) for column in zip(*reversed(piece))]
def rotate_ccw(piece: list[list[int]]) -> list[list[int]]:
    """Return a copy of ``piece`` rotated 90 degrees counter-clockwise.

    ``piece`` is a rectangular row-major grid. The result has its dimensions
    swapped (an R x C grid becomes C x R) and is built from fresh lists, so
    the input is never modified. An empty grid rotates to an empty grid.
    """
    # Transposing and then flipping the row order puts the last column into
    # the first row, which is exactly a counter-clockwise quarter turn.
    transposed = [list(column) for column in zip(*piece)]
    transposed.reverse()
    return transposed
class TetrisEnv:
    """Tetris game state plus a shaped, per-step reward.

    The board is a BOARD_HEIGHT x BOARD_WIDTH grid of ints (0 = empty,
    1 = locked block), with row 0 at the top. One piece is in play at a time
    and the following piece is known in advance. Every call to ``step``
    applies one action, then gravity, and returns an observation dict that
    includes the reward for that step.

    Args:
        seed: Seed for the piece generator; ``None`` seeds from system entropy.
        height_breach_penalty: Base (negative) reward applied per newly
            reached stack level above HEIGHT_BREACH_THRESHOLD. The effective
            value shrinks toward zero as pieces are locked.
    """

    # How much the per-level height-breach penalty shrinks toward zero for
    # every piece that has been locked so far.
    HEIGHT_BREACH_DECAY_PER_PIECE = 5

    def __init__(self, seed: Optional[int] = None, height_breach_penalty: float = HEIGHT_BREACH_PENALTY):
        self.rng = random.Random(seed)
        self.height_breach_penalty = height_breach_penalty
        self.reset_state()

    def reset_state(self, seed: Optional[int] = None):
        """Start a fresh game in place, optionally re-seeding the generator."""
        if seed is not None:
            self.rng = random.Random(seed)
        self.board = [[0] * BOARD_WIDTH for _ in range(BOARD_HEIGHT)]
        self.score = 0
        self.total_lines = 0
        self.steps = 0
        self.done = False
        self.current_piece = None
        self.current_piece_name = ""
        self.current_x = 0
        self.current_y = 0
        self.next_piece_name = ""
        self.next_piece = None
        # Tallest stack height already charged a breach penalty; heights up to
        # the threshold are free.
        self.max_penalized_height = HEIGHT_BREACH_THRESHOLD
        self.pieces_locked = 0
        # The first call only fills the "next" slot; the second promotes it to
        # the current piece and draws a new "next".
        self._spawn_next()
        self._spawn_next()

    def _spawn_next(self):
        """Promote the next piece to current and draw a new next piece.

        The promoted piece is placed at the top, horizontally centred. If it
        already collides with locked blocks, the game is marked as done.
        """
        self.current_piece = self.next_piece
        self.current_piece_name = self.next_piece_name
        self.next_piece_name = self.rng.choice(list(PIECES.keys()))
        # Copy the template so nothing held by the game aliases PIECES.
        self.next_piece = [row[:] for row in PIECES[self.next_piece_name]]
        if self.current_piece is None:
            return
        piece_width = len(self.current_piece[0])
        self.current_x = BOARD_WIDTH // 2 - piece_width // 2
        self.current_y = 0
        if not self._is_valid_position(self.current_piece, self.current_x, self.current_y):
            self.done = True

    def _is_valid_position(self, piece: list[list[int]], x: int, y: int) -> bool:
        """Return True if ``piece`` at (x, y) is inside the board and overlaps no block."""
        for row_idx, row in enumerate(piece):
            for col_idx, cell in enumerate(row):
                if cell == 0:
                    continue
                board_x = x + col_idx
                board_y = y + row_idx
                if not 0 <= board_x < BOARD_WIDTH:
                    return False
                if not 0 <= board_y < BOARD_HEIGHT:
                    return False
                if self.board[board_y][board_x] != 0:
                    return False
        return True

    def _try_move(self, dx: int, dy: int) -> bool:
        """Shift the current piece by (dx, dy) if the target is free; report success."""
        if self._is_valid_position(self.current_piece, self.current_x + dx, self.current_y + dy):
            self.current_x += dx
            self.current_y += dy
            return True
        return False

    def _try_rotate(self, rotate) -> bool:
        """Replace the current piece with ``rotate(piece)`` if it fits in place.

        No wall kick is attempted: a rotation that would collide is ignored.
        """
        rotated = rotate(self.current_piece)
        if self._is_valid_position(rotated, self.current_x, self.current_y):
            self.current_piece = rotated
            return True
        return False

    def _lock_piece(self):
        """Write the current piece's filled cells into the board."""
        for row_idx, row in enumerate(self.current_piece):
            for col_idx, cell in enumerate(row):
                if not cell:
                    continue
                bx = self.current_x + col_idx
                by = self.current_y + row_idx
                if 0 <= by < BOARD_HEIGHT and 0 <= bx < BOARD_WIDTH:
                    self.board[by][bx] = 1

    def _clear_lines(self) -> int:
        """Remove every full row, pad the top with empty rows, return the count."""
        remaining = [row for row in self.board if not all(cell == 1 for cell in row)]
        lines_cleared = len(self.board) - len(remaining)
        padding = [[0] * BOARD_WIDTH for _ in range(BOARD_HEIGHT - len(remaining))]
        self.board = padding + remaining
        self.total_lines += lines_cleared
        return lines_cleared

    def _lock_and_advance(self) -> int:
        """Lock the current piece, clear lines, update the score, spawn the next piece.

        Returns:
            The line-clear reward earned by this lock (0 if no line was completed).
        """
        self._lock_piece()
        self.pieces_locked += 1
        lines = self._clear_lines()
        line_reward = LINE_REWARDS.get(lines, lines * 400) if lines > 0 else 0
        self.score += line_reward
        self._spawn_next()
        return line_reward

    def _count_holes(self) -> int:
        """Count empty cells that have at least one locked block above them."""
        holes = 0
        for col in range(BOARD_WIDTH):
            found_block = False
            for row in range(BOARD_HEIGHT):
                if self.board[row][col] == 1:
                    found_block = True
                elif found_block and self.board[row][col] == 0:
                    holes += 1
        return holes

    def _max_height(self) -> int:
        """Return the height of the tallest column (0 for an empty board)."""
        for row in range(BOARD_HEIGHT):
            if any(cell == 1 for cell in self.board[row]):
                return BOARD_HEIGHT - row
        return 0

    def _drop_piece(self):
        """Hard drop: move the current piece down until it can go no further."""
        while self._try_move(0, 1):
            pass

    def get_board_with_piece(self) -> list[list[int]]:
        """Return a copy of the board with the current piece drawn as 2s.

        The piece is omitted once the game is over.
        """
        display = [row[:] for row in self.board]
        if self.current_piece and not self.done:
            for row_idx, row in enumerate(self.current_piece):
                for col_idx, cell in enumerate(row):
                    if not cell:
                        continue
                    bx = self.current_x + col_idx
                    by = self.current_y + row_idx
                    if 0 <= by < BOARD_HEIGHT and 0 <= bx < BOARD_WIDTH:
                        display[by][bx] = 2  # 2 = current piece
        return display

    def board_to_text(self) -> str:
        """Render the board with a border: '.' empty, '#' locked, '@' current piece."""
        symbols = {0: ".", 1: "#", 2: "@"}
        border = "+" + "-" * BOARD_WIDTH + "+"
        lines = [border]
        for row in self.get_board_with_piece():
            lines.append("|" + "".join(symbols[c] for c in row) + "|")
        lines.append(border)
        return "\n".join(lines)

    def piece_to_text(self, piece: list[list[int]]) -> str:
        """Render a piece grid as text: '#' filled, '.' empty, one line per row."""
        return "\n".join("".join("#" if c else "." for c in row) for row in piece)

    def step(self, action: str) -> dict:
        """Apply one action followed by gravity and return the resulting observation.

        Valid actions (surrounding whitespace and case are ignored):
        - "left": move piece left
        - "right": move piece right
        - "rotate_cw": rotate clockwise
        - "rotate_ccw": rotate counter-clockwise
        - "drop": hard drop and lock
        - "down": soft drop one row (in addition to the gravity row)
        - "noop": do nothing (piece falls one row)

        A move or rotation that would collide is ignored. Any unrecognized
        action behaves like "noop". After every action except "drop" the
        piece falls one row, or locks if it cannot fall.

        Reward for the step is the sum of: STEP_PENALTY, the line-clear
        reward, HOLE_PENALTY_MULT per newly created hole, a one-time penalty
        per newly reached height level above the threshold, and
        GAME_OVER_PENALTY if this step ended the game.

        Returns:
            The dict built by ``_make_result`` (board text, piece info,
            counters, "reward" and "done"). If the game is already over, the
            state is left untouched and the reward is 0.
        """
        if self.done:
            return self._make_result(0)

        self.steps += 1
        reward = STEP_PENALTY  # base penalty per step
        holes_before = self._count_holes()

        action = action.strip().lower()
        if action == "left":
            self._try_move(-1, 0)
        elif action == "right":
            self._try_move(1, 0)
        elif action == "rotate_cw":
            self._try_rotate(rotate_cw)
        elif action == "rotate_ccw":
            self._try_rotate(rotate_ccw)
        elif action == "drop":
            self._drop_piece()
        elif action == "down":
            self._try_move(0, 1)
        # "noop" and unrecognized actions: only gravity below applies.

        # A hard drop locks immediately; otherwise gravity pulls the piece one
        # row and it locks only when it cannot fall any further.
        if action == "drop" or not self._try_move(0, 1):
            reward += self._lock_and_advance()

        # Penalty only for NEW holes created by this step
        new_holes = self._count_holes() - holes_before
        if new_holes > 0:
            reward += HOLE_PENALTY_MULT * new_holes

        # One-time penalty for each height level newly reached above the
        # highest level already penalized. The per-level amount moves toward
        # zero by HEIGHT_BREACH_DECAY_PER_PIECE for every locked piece;
        # pieces_locked already includes a piece locked during this step. With
        # the defaults: 1 piece locked -> -45, 9 -> -5, 10 or more -> 0.
        current_height = self._max_height()
        if current_height > self.max_penalized_height:
            penalty_per_level = min(
                0,
                self.height_breach_penalty
                + self.HEIGHT_BREACH_DECAY_PER_PIECE * self.pieces_locked,
            )
            if penalty_per_level < 0:
                new_levels = current_height - self.max_penalized_height
                reward += penalty_per_level * new_levels
                self.max_penalized_height = current_height

        if self.done:
            reward += GAME_OVER_PENALTY
        return self._make_result(reward)

    def _make_result(self, reward: float) -> dict:
        """Build the observation/result dict for the current state and ``reward``."""
        return {
            "board": self.board_to_text(),
            "current_piece": self.current_piece_name,
            "current_piece_shape": self.piece_to_text(self.current_piece) if self.current_piece else "",
            "next_piece": self.next_piece_name,
            "next_piece_shape": self.piece_to_text(self.next_piece) if self.next_piece else "",
            "piece_x": self.current_x,
            "piece_y": self.current_y,
            "score": self.score,
            "total_lines": self.total_lines,
            "steps": self.steps,
            "max_height": self._max_height(),
            "holes": self._count_holes(),
            "reward": reward,
            "done": self.done,
        }

    def reset(self, seed: Optional[int] = None) -> dict:
        """Reset the environment and return the initial observation (reward 0)."""
        self.reset_state(seed)
        return self._make_result(0)