Spaces:
Sleeping
Sleeping
File size: 9,290 Bytes
c64c726 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
from typing import Tuple, Union
import numpy as np
import pygame
from PIL import Image
import cv2
import os
from datetime import datetime
from csgo.action_processing import CSGOAction
from .dataset_env import DatasetEnv
from .play_env import PlayEnv
class Game:
def __init__(
self,
play_env: Union[PlayEnv, DatasetEnv],
size: Tuple[int, int],
mouse_multiplier: int,
fps: int,
verbose: bool,
) -> None:
self.env = play_env
self.height, self.width = size
self.mouse_multiplier = mouse_multiplier
self.fps = fps
self.verbose = verbose
self.env.print_controls()
print("\nControls:\n")
print(" m : switch control (human/replay)") # Not for main as Game can use either PlayEnv or DatasetEnv
print(" . : pause/unpause")
print(" e : step-by-step (when paused)")
print(" ⏎ : reset env")
print("Esc : quit")
print("\n")
input("Press enter to start")
def run(self) -> None:
pygame.init()
header_height = 150 if self.verbose else 0
header_width = 540
font_size = 16
screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN)
pygame.mouse.set_visible(False)
pygame.event.set_grab(True)
clock = pygame.time.Clock()
font = pygame.font.SysFont("mono", font_size)
x_center, y_center = screen.get_rect().center
x_header = x_center - header_width // 2
y_header = y_center - self.height // 2 - header_height - 10
header_rect = pygame.Rect(x_header, y_header, header_width, header_height)
def clear_header():
pygame.draw.rect(screen, pygame.Color("black"), header_rect)
pygame.draw.rect(screen, pygame.Color("white"), header_rect, 1)
def draw_text(text, idx_line, idx_column, num_cols):
x_pos = 5 + idx_column * int(header_width // num_cols)
y_pos = 5 + idx_line * font_size
assert (0 <= x_pos <= header_width) and (0 <= y_pos <= header_height)
screen.blit(font.render(text, True, pygame.Color("white")), (x_header + x_pos, y_header + y_pos))
def draw_obs(obs, obs_low_res=None):
assert obs.ndim == 4 and obs.size(0) == 1
img = Image.fromarray(obs[0].add(1).div(2).mul(255).byte().permute(1, 2, 0).cpu().numpy())
pygame_image = np.array(img.resize((self.width, self.height), resample=Image.BICUBIC)).transpose((1, 0, 2))
surface = pygame.surfarray.make_surface(pygame_image)
screen.blit(surface, (x_center - self.width // 2, y_center - self.height // 2))
if obs_low_res is not None:
assert obs_low_res.ndim == 4 and obs_low_res.size(0) == 1
img = Image.fromarray(obs_low_res[0].add(1).div(2).mul(255).byte().permute(1, 2, 0).cpu().numpy())
h = self.height * obs_low_res.size(2) // obs.size(2)
w = self.width * obs_low_res.size(3) // obs.size(3)
pygame_image = np.array(img.resize((w, h), resample=Image.BICUBIC)).transpose((1, 0, 2))
surface = pygame.surfarray.make_surface(pygame_image)
screen.blit(surface, (x_header + header_width - w - 5, y_header + 5 + font_size))
# screen.blit(surface, (x_center - w // 2, y_center + self.height // 2))
# Return the main pygame_image for video recording
return pygame_image.transpose((1, 0, 2))
def reset():
nonlocal obs, info, do_reset, ep_return, ep_length, keys_pressed, l_click, r_click
obs, info = self.env.reset()
pygame.event.clear()
do_reset = False
ep_return = 0
ep_length = 0
keys_pressed = []
l_click = r_click = False
obs, info, do_reset, ep_return, ep_length, keys_pressed, l_click, r_click = (None,) * 8
reset()
do_wait = False
should_stop = False
frame_list = []
# Initialize video recording
video_dir = "/home/alienware3/Documents/diamond/videos"
os.makedirs(video_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
video_filename = os.path.join(video_dir, f"game_recording_{timestamp}.mp4")
# Video writer will be initialized when we get the first frame
video_writer = None
recording_fps = self.fps
while not should_stop:
do_one_step = False
mouse_x, mouse_y = 0, 0
pygame.event.pump()
for event in pygame.event.get():
if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE):
should_stop = True
if event.type == pygame.MOUSEMOTION:
mouse_x, mouse_y = event.rel
mouse_x *= self.mouse_multiplier
mouse_y *= self.mouse_multiplier
if event.type == pygame.MOUSEBUTTONDOWN:
if event.button == 1:
l_click = True
if event.button == 3:
r_click = True
elif event.type == pygame.MOUSEBUTTONUP:
if event.button == 1:
l_click = False
if event.button == 3:
r_click = False
if event.type == pygame.KEYDOWN:
keys_pressed.append(event.key)
elif event.type == pygame.KEYUP and event.key in keys_pressed:
keys_pressed.remove(event.key)
if event.type != pygame.KEYDOWN:
continue
if event.key == pygame.K_RETURN:
do_reset = True
if event.key == pygame.K_PERIOD:
do_wait = not do_wait
print("Game paused." if do_wait else "Game resumed.")
if event.key == pygame.K_e:
do_one_step = True
if event.key == pygame.K_m:
do_reset = self.env.next_mode()
if event.key == pygame.K_UP:
do_reset = self.env.next_axis_1()
if event.key == pygame.K_DOWN:
do_reset = self.env.prev_axis_1()
if event.key == pygame.K_RIGHT:
do_reset = self.env.next_axis_2()
if event.key == pygame.K_LEFT:
do_reset = self.env.prev_axis_2()
if do_reset:
reset()
if do_wait and not do_one_step:
continue
csgo_action = CSGOAction(keys_pressed, mouse_x, mouse_y, l_click, r_click)
next_obs, rew, end, trunc, info = self.env.step(csgo_action)
ep_return += rew.item()
ep_length += 1
if self.verbose and info is not None:
clear_header()
assert isinstance(info, dict) and "header" in info
header = info["header"]
num_cols = len(header)
for j, col in enumerate(header):
for i, row in enumerate(col):
draw_text(row, idx_line=i, idx_column=j, num_cols=num_cols)
draw_low_res = self.verbose and "obs_low_res" in info and self.width == 280
if draw_low_res:
current_frame = draw_obs(obs, info["obs_low_res"])
draw_text(" Pre-upsampling:", 0, 2, 3)
else:
current_frame = draw_obs(obs, None)
# Initialize video writer with first frame dimensions
if video_writer is None and current_frame is not None:
h, w, c = current_frame.shape
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(video_filename, fourcc, recording_fps, (w, h))
print(f"Started recording video: {video_filename}")
print(f"Video dimensions: {w}x{h}, FPS: {recording_fps}")
# Save frame to video
if video_writer is not None and current_frame is not None:
# Convert RGB to BGR for OpenCV
frame_bgr = cv2.cvtColor(current_frame.astype(np.uint8), cv2.COLOR_RGB2BGR)
# save image to folder
frame_dir = "/home/alienware3/Documents/diamond/frames"
os.makedirs(frame_dir, exist_ok=True)
cv2.imwrite(os.path.join(frame_dir, f"frame_{len(frame_list)}.png"), frame_bgr)
video_writer.write(frame_bgr)
frame_list.append(current_frame)
pygame.display.flip() # update screen
clock.tick(self.fps) # ensures game maintains the given frame rate
if end or trunc:
reset()
else:
obs = next_obs
# Finalize video recording
if video_writer is not None:
video_writer.release()
print(f"Video saved successfully: {video_filename}")
print(f"Total frames recorded: {len(frame_list)}")
pygame.quit() |