# utils.py — training utilities: resume-state checkpointing, vectorized
# environment construction, and GAE computation.
import gymnasium as gym
from gymnasium.vector import AsyncVectorEnv
import numpy as np
import tensorflow as tf
import json
import os
import time
from reward_shaping import LunarLanderRewardShaping
from config import *
def save_resume_data(filepath, timesteps, episodes):
    """
    Persist the current training progress to a JSON file.

    Args:
        filepath: Destination path for the resume-state JSON.
        timesteps: Total environment steps completed so far.
        episodes: Total episodes completed so far.

    Failures are reported to stdout but never raised, so a checkpointing
    hiccup cannot kill a training run.
    """
    snapshot = {
        "timesteps": timesteps,
        "episode_count": episodes,
        "date": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    }
    try:
        with open(filepath, 'w') as handle:
            json.dump(snapshot, handle)
    except Exception as e:
        print(f"Error saving resume data to {filepath}: {e}")
def load_resume_data(filepath):
    """
    Load the last saved training state from a JSON file.

    Args:
        filepath: Path to the resume-state JSON written by save_resume_data.

    Returns:
        Tuple (timesteps, episode_count). Returns (0, 0) when the file is
        missing (fresh run) or unreadable (corrupt/partial write).
    """
    try:
        # EAFP: just open it — avoids the exists()/open race of the old
        # LBYL check and one extra stat() call.
        with open(filepath, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        # No checkpoint yet — start fresh silently, matching the original
        # behavior when os.path.exists() was False.
        return (0, 0)
    except Exception as e:
        print(f"Error loading resume data from {filepath}: {e}. Starting from scratch.")
        return (0, 0)
    return (data.get("timesteps", 0), data.get("episode_count", 0))
def make_env(env_id, seed, idx, **kwargs):
    """
    Build a thunk that constructs one wrapped, seeded environment instance.

    Args:
        env_id: Gymnasium environment id passed to gym.make.
        seed: Base random seed shared across the vectorized workers.
        idx: Worker index; offset onto the seed so each env differs.
        **kwargs: Extra keyword arguments forwarded to gym.make.

    Returns:
        A zero-argument callable that creates the environment, suitable
        for AsyncVectorEnv.
    """
    def _build():
        env = gym.make(env_id, **kwargs)
        # Reward shaping first, then episode-statistics tracking on top.
        env = LunarLanderRewardShaping(env)
        env = gym.wrappers.RecordEpisodeStatistics(env)
        worker_seed = seed + idx
        # NOTE(review): only the spaces are seeded here, not env.reset(seed=...)
        # — confirm the reset seed is supplied elsewhere if determinism matters.
        env.action_space.seed(worker_seed)
        env.observation_space.seed(worker_seed)
        return env
    return _build
def make_parallel_envs(env_id, num_envs, seed):
    """
    Construct num_envs uniquely-seeded environments inside an AsyncVectorEnv.

    Args:
        env_id: Gymnasium environment id.
        num_envs: Number of parallel worker environments.
        seed: Base seed; worker i gets seed + i via make_env.

    Returns:
        An AsyncVectorEnv running the environments in subprocesses.
    """
    return AsyncVectorEnv([make_env(env_id, seed, rank) for rank in range(num_envs)])
def calculate_gae(rewards, values, terminated, truncated, next_value, gamma=GAMMA, gae_lambda=GAE_LAMBDA):
    """
    Compute Generalized Advantage Estimation (GAE) advantages and returns.

    Args:
        rewards: Array of per-step rewards, time-major (step axis first).
        values: Value estimates V(s_t) aligned with rewards.
        terminated: Boolean array; True where the episode terminated at step t.
        truncated: Boolean array; True where the episode was truncated at step t.
        next_value: Bootstrap value estimate for the state after the last step.
        gamma: Discount factor (defaults to config GAMMA).
        gae_lambda: GAE smoothing parameter (defaults to config GAE_LAMBDA).

    Returns:
        Tuple (advantages, returns) where returns = advantages + values.
    """
    # Derive the horizon from the data rather than the N_STEPS global, so the
    # function cannot silently truncate (or index past) mismatched rollouts.
    n_steps = rewards.shape[0]
    advantages = np.zeros_like(rewards, dtype=np.float32)
    last_gae = 0.0
    for t in reversed(range(n_steps)):
        # 0.0 where the episode ended at step t, 1.0 otherwise.
        # NOTE(review): truncation is masked like termination here, which drops
        # the bootstrap value on time-limit cutoffs — confirm this is intended.
        not_done = 1.0 - (terminated[t] | truncated[t]).astype(np.float32)
        # Bootstrap with next_value at the rollout boundary, else V(s_{t+1}).
        next_values = next_value if t == n_steps - 1 else values[t + 1]
        delta = rewards[t] + gamma * next_values * not_done - values[t]
        last_gae = delta + gamma * gae_lambda * not_done * last_gae
        advantages[t] = last_gae
    returns = advantages + values
    return advantages, returns