|
|
|
|
|
""" |
|
|
Complete Reinforcement Learning Implementation from Scratch |
|
|
Author: Claude + Stevan |
|
|
No external RL libraries - only numpy and standard library |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
import pickle |
|
|
import os |
|
|
import time |
|
|
import argparse |
|
|
from collections import deque |
|
|
from typing import Tuple, List, Dict, Optional, Union, Callable |
|
|
import struct |
|
|
import json |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GridWorld:
    """
    Custom GridWorld environment implemented from scratch.
    Agent navigates grid to reach goal while avoiding obstacles.

    The board is stored channels-first: a (4, height, width) float32 tensor
    with channel 0 = agent, 1 = goal, 2 = pit, 3 = wall (one-hot per channel).

    Modes:
        'static' - deterministic layout generated once in __init__ and
                   restored on every reset().
        'random' - fresh random layout drawn on each reset().

    Rewards: -1 per move, -10 for bumping the edge/wall or falling into the
    pit, +10 for reaching the goal.  Observations are the flattened board
    plus small uniform noise in [0, 0.01) for training stability.
    """

    # Cell-type codes (NOT board channel indices; channels are ordered
    # agent, goal, pit, wall — see _generate_grid).
    EMPTY = 0
    WALL = 1
    GOAL = 2
    PIT = 3
    AGENT = 4

    # Action encoding.
    UP = 0
    DOWN = 1
    LEFT = 2
    RIGHT = 3

    def __init__(
        self,
        width: int = 4,
        height: int = 4,
        mode: str = 'static',
        max_steps: int = 50,
        seed: Optional[int] = None
    ):
        """
        Args:
            width: number of columns.
            height: number of rows.
            mode: 'static' or 'random' (see class docstring).
            max_steps: step budget before the episode is truncated.
            seed: seeds layout generation and observation noise.
        """
        self.width = width
        self.height = height
        self.mode = mode
        self.max_steps = max_steps

        self.n_states = width * height * 4
        self.n_actions = 4
        # FIX: the board is channels-first (see _generate_grid / render_np);
        # this previously advertised (height, width, 4), which did not match
        # the actual observation layout.
        self.state_shape = (4, height, width)
        self.state_dim = self.n_states

        self.action_names = ['UP', 'DOWN', 'LEFT', 'RIGHT']
        # Action -> (row_delta, col_delta).
        self.action_deltas = {
            self.UP: (-1, 0),
            self.DOWN: (1, 0),
            self.LEFT: (0, -1),
            self.RIGHT: (0, 1)
        }

        self.rng = np.random.RandomState(seed)
        self.initial_seed = seed

        self.board = None        # (4, height, width) float32 one-hot channels
        self.agent_pos = None    # mutable [row, col]
        self.goal_pos = None     # (row, col)
        self.pit_pos = None      # (row, col) or None
        self.wall_pos = None     # (row, col) or None
        self.start_pos = None    # (row, col)
        self.step_count = 0
        self.total_reward = 0.0
        self.done = False

        # Generate the first layout and snapshot it so that 'static' mode can
        # restore the identical board on every reset.
        self._fixed_layout = None
        self._generate_grid()
        self._fixed_layout = self._save_layout()

    def _save_layout(self) -> Dict:
        """Snapshot the current layout (board copy plus entity positions)."""
        return {
            'board': self.board.copy(),
            'goal_pos': self.goal_pos,
            'pit_pos': self.pit_pos,
            'wall_pos': self.wall_pos,
            'start_pos': self.start_pos
        }

    def _restore_layout(self):
        """Restore the layout captured at construction time, if any."""
        if self._fixed_layout is not None:
            self.board = self._fixed_layout['board'].copy()
            self.goal_pos = self._fixed_layout['goal_pos']
            self.pit_pos = self._fixed_layout['pit_pos']
            self.wall_pos = self._fixed_layout['wall_pos']
            self.start_pos = self._fixed_layout['start_pos']

    def _generate_grid(self) -> None:
        """Build a fresh board; 'static' mode uses fixed entity positions."""
        self.board = np.zeros((4, self.height, self.width), dtype=np.float32)

        self.start_pos = (0, 0)
        self.agent_pos = list(self.start_pos)

        if self.mode == 'static':
            # Deterministic layout: goal bottom-right; pit and wall only when
            # the grid is large enough to place them sensibly.
            self.goal_pos = (self.height - 1, self.width - 1)
            self.pit_pos = (self.height - 1, 1) if self.width > 2 else None
            self.wall_pos = (1, 1) if self.width > 2 and self.height > 2 else None
        else:
            # Random placement over every cell except the start.
            available = []
            for i in range(self.height):
                for j in range(self.width):
                    if (i, j) != self.start_pos:
                        available.append((i, j))
            self.rng.shuffle(available)
            self.goal_pos = available[0]
            self.pit_pos = available[1] if len(available) > 1 else None
            self.wall_pos = available[2] if len(available) > 2 else None

        # Stamp one-hot markers into the per-entity channels.
        self.board[0, self.agent_pos[0], self.agent_pos[1]] = 1.0
        self.board[1, self.goal_pos[0], self.goal_pos[1]] = 1.0
        if self.pit_pos:
            self.board[2, self.pit_pos[0], self.pit_pos[1]] = 1.0
        if self.wall_pos:
            self.board[3, self.wall_pos[0], self.wall_pos[1]] = 1.0

    def reset(self, seed: Optional[int] = None) -> np.ndarray:
        """Start a new episode and return the initial observation.

        'static' mode restores the fixed layout; an explicit seed or
        'random' mode regenerates the grid.
        """
        if self.mode == 'static' and self._fixed_layout is not None:
            self._restore_layout()
        elif seed is not None or self.mode == 'random':
            if seed is not None:
                self.rng = np.random.RandomState(seed)
            self._generate_grid()
        else:
            self._restore_layout()

        # Put the agent back at the start and rebuild its channel.
        self.agent_pos = list(self.start_pos)
        self.board[0] = 0.0
        self.board[0, self.agent_pos[0], self.agent_pos[1]] = 1.0

        self.step_count = 0
        self.total_reward = 0.0
        self.done = False

        return self._get_state()

    def _get_state(self) -> np.ndarray:
        """Flattened board plus uniform noise in [0, 0.01).

        The noise is intentional (see class docstring) and consumes one
        rng.rand call per observation.
        """
        state = self.board.flatten().astype(np.float32)
        noise = self.rng.rand(len(state)).astype(np.float32) / 100.0
        return state + noise

    def render_np(self) -> np.ndarray:
        """Noise-free copy of the (4, height, width) board tensor."""
        return self.board.copy()

    def _is_valid_pos(self, pos: List[int]) -> bool:
        """True if pos is inside the grid and not the wall cell."""
        row, col = pos
        if row < 0 or row >= self.height:
            return False
        if col < 0 or col >= self.width:
            return False
        if self.wall_pos and (row, col) == self.wall_pos:
            return False
        return True

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """Advance one step; returns (state, reward, done, info).

        Invalid moves (edge or wall) cost -10 and leave the agent in place;
        otherwise moving costs -1, the pit ends the episode at -10, and the
        goal ends it at +10.  Hitting the step budget truncates the episode.
        """
        if self.done:
            # Calling step() on a finished episode is a no-op.
            return self._get_state(), 0.0, True, {'episode_ended': True}

        self.step_count += 1

        delta = self.action_deltas[action]
        new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]

        reward = -1.0
        done = False
        info = {}

        if not self._is_valid_pos(new_pos):
            reward = -10.0
            info['hit_wall'] = True
        else:
            # Move the agent's one-hot marker to the new cell.
            self.board[0, self.agent_pos[0], self.agent_pos[1]] = 0.0
            self.agent_pos = new_pos
            self.board[0, self.agent_pos[0], self.agent_pos[1]] = 1.0

            if tuple(self.agent_pos) == self.goal_pos:
                reward = 10.0
                done = True
                info['reached_goal'] = True
            elif self.pit_pos and tuple(self.agent_pos) == self.pit_pos:
                reward = -10.0
                done = True
                info['fell_in_pit'] = True

        if self.step_count >= self.max_steps:
            done = True
            info['max_steps_reached'] = True

        self.total_reward += reward
        self.done = done
        info['step'] = self.step_count
        info['total_reward'] = self.total_reward

        return self._get_state(), reward, done, info

    def render(self, mode: str = 'ascii') -> Optional[str]:
        """ASCII render; prints for mode='ascii' (returns None), otherwise
        returns the rendered string."""
        symbols = {
            'empty': '.',
            'agent': 'A',
            'goal': 'G',
            'pit': 'X',
            'wall': '#'
        }

        lines = []
        lines.append('=' * (self.width * 2 + 3))
        for row in range(self.height):
            line = '| '
            for col in range(self.width):
                # Draw the highest-priority entity occupying the cell.
                if self.board[0, row, col] == 1.0:
                    line += symbols['agent'] + ' '
                elif self.board[1, row, col] == 1.0:
                    line += symbols['goal'] + ' '
                elif self.board[2, row, col] == 1.0:
                    line += symbols['pit'] + ' '
                elif self.board[3, row, col] == 1.0:
                    line += symbols['wall'] + ' '
                else:
                    line += symbols['empty'] + ' '
            line += '|'
            lines.append(line)
        lines.append('=' * (self.width * 2 + 3))
        lines.append(f'Step: {self.step_count} | Reward: {self.total_reward:.2f}')

        output = '\n'.join(lines)

        if mode == 'ascii':
            print(output)
            return None
        elif mode == 'string':
            return output

        return output

    def get_valid_actions(self) -> List[int]:
        """Actions that lead to a legal cell; all actions if none do."""
        valid = []
        for action in range(self.n_actions):
            delta = self.action_deltas[action]
            new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
            if self._is_valid_pos(new_pos):
                valid.append(action)
        return valid if valid else list(range(self.n_actions))

    def clone(self) -> 'GridWorld':
        """Copy of the environment: board and RNG state are copied; position
        tuples are shared (safe, they are immutable)."""
        env = GridWorld.__new__(GridWorld)
        env.width = self.width
        env.height = self.height
        env.mode = self.mode
        env.max_steps = self.max_steps
        env.n_states = self.n_states
        env.n_actions = self.n_actions
        env.state_shape = self.state_shape
        env.state_dim = self.state_dim
        env.action_names = self.action_names
        env.action_deltas = self.action_deltas
        env.rng = np.random.RandomState()
        env.rng.set_state(self.rng.get_state())
        # FIX: initial_seed was not copied before, so clones raised
        # AttributeError when it was accessed.
        env.initial_seed = self.initial_seed
        env.board = self.board.copy()
        env.agent_pos = self.agent_pos.copy()
        env.goal_pos = self.goal_pos
        env.pit_pos = self.pit_pos
        env.wall_pos = self.wall_pos
        env.start_pos = self.start_pos
        env.step_count = self.step_count
        env.total_reward = self.total_reward
        env.done = self.done
        env._fixed_layout = self._fixed_layout.copy() if self._fixed_layout else None
        return env
|
|
|
|
|
|
|
|
class ContinuousCartPole:
    """
    CartPole environment with continuous state space.
    Implemented from scratch using physics equations.

    State vector: [cart position x, cart velocity, pole angle theta (rad),
    pole angular velocity].  Actions: 0 = push left, 1 = push right.
    Dynamics are integrated with explicit Euler at timestep dt.
    """

    def __init__(
        self,
        gravity: float = 9.8,
        cart_mass: float = 1.0,
        pole_mass: float = 0.1,
        pole_length: float = 0.5,
        force_mag: float = 10.0,
        dt: float = 0.02,
        max_steps: int = 500,
        seed: Optional[int] = None
    ):
        self.gravity = gravity
        self.cart_mass = cart_mass
        self.pole_mass = pole_mass
        self.pole_length = pole_length
        self.force_mag = force_mag
        self.dt = dt
        self.max_steps = max_steps

        # Derived constants used by the dynamics equations in step().
        self.total_mass = cart_mass + pole_mass
        self.pole_mass_length = pole_mass * pole_length

        # Episode terminates when |x| or |theta| exceeds these bounds.
        self.x_threshold = 2.4
        self.theta_threshold = 12 * np.pi / 180

        self.n_actions = 2
        self.state_dim = 4

        self.rng = np.random.RandomState(seed)
        self.state = None       # float32 array of length 4, set by reset()
        self.step_count = 0
        self.done = False

    def reset(self, seed: Optional[int] = None) -> np.ndarray:
        """Start a new episode; state is drawn uniformly from [-0.05, 0.05]^4."""
        if seed is not None:
            self.rng = np.random.RandomState(seed)

        self.state = self.rng.uniform(-0.05, 0.05, size=(4,)).astype(np.float32)
        self.step_count = 0
        self.done = False

        return self.state.copy()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """Advance the dynamics one Euler step; returns (state, reward, done, info)."""
        if self.done:
            # Stepping a finished episode is a no-op.
            return self.state.copy(), 0.0, True, {}

        x, x_dot, theta, theta_dot = self.state

        # Action 1 pushes right, action 0 pushes left with the same magnitude.
        force = self.force_mag if action == 1 else -self.force_mag

        cos_theta = np.cos(theta)
        sin_theta = np.sin(theta)

        # Intermediate term shared by both accelerations.
        temp = (force + self.pole_mass_length * theta_dot ** 2 * sin_theta) / self.total_mass

        # Pole angular acceleration.
        theta_acc = (self.gravity * sin_theta - cos_theta * temp) / (
            self.pole_length * (4.0 / 3.0 - self.pole_mass * cos_theta ** 2 / self.total_mass)
        )

        # Cart linear acceleration.
        x_acc = temp - self.pole_mass_length * theta_acc * cos_theta / self.total_mass

        # Explicit Euler integration (positions updated with the *old* rates).
        x = x + self.dt * x_dot
        x_dot = x_dot + self.dt * x_acc
        theta = theta + self.dt * theta_dot
        theta_dot = theta_dot + self.dt * theta_acc

        self.state = np.array([x, x_dot, theta, theta_dot], dtype=np.float32)
        self.step_count += 1

        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold
            or theta > self.theta_threshold
            or self.step_count >= self.max_steps
        )

        # NOTE(review): unlike the classic Gym CartPole (reward 1.0 on every
        # step including the failing one), a failure step here earns 0; only
        # time-limit termination is restored to 1.0 below — confirm intended.
        reward = 1.0 if not done else 0.0
        if self.step_count >= self.max_steps:
            reward = 1.0

        self.done = done

        info = {
            'step': self.step_count,
            'x': x,
            'theta': theta
        }

        return self.state.copy(), reward, done, info

    def render(self, mode: str = 'ascii') -> Optional[str]:
        """ASCII render of cart and pole; prints for mode='ascii', else returns str."""
        if self.state is None:
            return None

        x, _, theta, _ = self.state

        width = 60
        # Map x in [-x_threshold, x_threshold] onto screen columns, clamped
        # so the 3-char cart glyph stays on screen.
        cart_pos = int((x / self.x_threshold + 1) * width / 2)
        cart_pos = max(2, min(width - 3, cart_pos))

        pole_len = 4
        # NOTE(review): pole_dx / pole_dy are computed but never used below;
        # the per-row expected_dx recomputation supersedes them.
        pole_dx = int(pole_len * np.sin(theta))
        pole_dy = int(pole_len * np.cos(theta))

        lines = []
        lines.append('=' * width)

        # Rows run from the pole tip (negative) down to the cart (row 1).
        for row in range(-pole_len, 2):
            line = [' '] * width
            if row == 1:
                line[cart_pos-1:cart_pos+2] = ['[', 'C', ']']
            elif row == 0:
                line[cart_pos] = '|'
            else:
                # Horizontal offset of the pole at this height.
                expected_row = -row
                if 0 <= expected_row <= pole_len:
                    expected_dx = int(expected_row * np.sin(theta))
                    pole_x = cart_pos + expected_dx
                    if 0 <= pole_x < width:
                        line[pole_x] = '*'
            lines.append(''.join(line))

        lines.append('-' * width)
        lines.append(f'Step: {self.step_count} | x: {x:.2f} | theta: {np.degrees(theta):.1f}°')
        lines.append('=' * width)

        output = '\n'.join(lines)

        if mode == 'ascii':
            print(output)
            return None

        return output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Tensor:
    """Minimal ndarray wrapper that records gradient-tracking metadata.

    Stores only the pieces an autograd graph would need (gradient buffer,
    backward hook, predecessor set); it does not implement any operations
    itself.
    """

    def __init__(self, data: np.ndarray, requires_grad: bool = False):
        # Normalise everything to float32 so network math stays consistent.
        self.data = np.asarray(data, dtype=np.float32)
        self.requires_grad = requires_grad
        self.grad = None
        # Backward hook invoked during backprop; a no-op by default.
        self._backward = lambda: None
        # Parent tensors in the (unused) computation graph.
        self._prev = set()

    @property
    def shape(self):
        """Shape of the wrapped array."""
        return self.data.shape

    def zero_grad(self):
        """Discard any accumulated gradient."""
        self.grad = None
|
|
|
|
|
|
|
|
class LinearLayer:
    """Fully connected (dense) layer computing y = x @ W (+ b)."""

    def __init__(
        self,
        in_features: int,
        out_features: int,
        bias: bool = True,
        init_method: str = 'xavier'
    ):
        self.in_features = in_features
        self.out_features = out_features
        self.use_bias = bias

        # Weight initialisation; any unrecognised scheme falls back to zeros.
        if init_method == 'xavier':
            bound = np.sqrt(6.0 / (in_features + out_features))
            self.weights = np.random.uniform(-bound, bound, (in_features, out_features)).astype(np.float32)
        elif init_method == 'he':
            scale = np.sqrt(2.0 / in_features)
            self.weights = np.random.randn(in_features, out_features).astype(np.float32) * scale
        elif init_method == 'normal':
            self.weights = np.random.randn(in_features, out_features).astype(np.float32) * 0.01
        else:
            self.weights = np.zeros((in_features, out_features), dtype=np.float32)

        self.bias = np.zeros(out_features, dtype=np.float32) if bias else None

        # Gradient accumulators matching the parameter shapes.
        self.weight_grad = np.zeros_like(self.weights)
        self.bias_grad = np.zeros(out_features, dtype=np.float32) if bias else None

        # Input saved by forward() for use in backward().
        self._input_cache = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Affine transform; caches a copy of the input for backprop."""
        self._input_cache = x.copy()
        output = np.dot(x, self.weights)
        if self.use_bias:
            output += self.bias
        return output

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Fill parameter grads (averaged over the batch) and return dL/dx."""
        batch = grad_output.shape[0] if grad_output.ndim > 1 else 1

        # Promote 1-D vectors to row matrices so the matmuls line up.
        if self._input_cache.ndim == 1:
            self._input_cache = self._input_cache.reshape(1, -1)
        if grad_output.ndim == 1:
            grad_output = grad_output.reshape(1, -1)

        self.weight_grad[:] = np.dot(self._input_cache.T, grad_output) / batch

        if self.use_bias:
            self.bias_grad[:] = np.mean(grad_output, axis=0)

        return np.dot(grad_output, self.weights.T)

    def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
        """(parameter, gradient) pairs for the optimizer."""
        pairs = [(self.weights, self.weight_grad)]
        if self.use_bias:
            pairs.append((self.bias, self.bias_grad))
        return pairs

    def zero_grad(self):
        """Reset the gradient accumulators to zero."""
        self.weight_grad.fill(0)
        if self.bias_grad is not None:
            self.bias_grad.fill(0)
|
|
|
|
|
|
|
|
class ReLU:
    """Rectified Linear Unit activation: max(x, 0)."""

    def __init__(self):
        # Indicator of positive inputs, remembered for the backward pass.
        self._active = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Zero out non-positive entries."""
        self._active = (x > 0).astype(np.float32)
        return x * self._active

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Pass gradient only where the input was positive."""
        return grad_output * self._active

    def get_params(self) -> List:
        """No learnable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass
|
|
|
|
|
|
|
|
class LeakyReLU:
    """Leaky ReLU: identity for positive inputs, small slope otherwise."""

    def __init__(self, negative_slope: float = 0.01):
        self.negative_slope = negative_slope
        # Indicator of positive inputs, remembered for the backward pass.
        self._active = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Scale negative entries by the configured slope."""
        self._active = (x > 0).astype(np.float32)
        return np.where(x > 0, x, x * self.negative_slope)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Gradient is 1 on the positive side and `negative_slope` elsewhere."""
        local = np.where(self._active > 0, 1.0, self.negative_slope)
        return grad_output * local

    def get_params(self) -> List:
        """No learnable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass
|
|
|
|
|
|
|
|
class Sigmoid:
    """Logistic sigmoid activation: 1 / (1 + e^{-x})."""

    def __init__(self):
        # Last output, reused in backward (sigma' = sigma * (1 - sigma)).
        self._output = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Apply the sigmoid with input clipping to avoid exp overflow."""
        safe = np.clip(x, -500, 500)
        self._output = 1.0 / (1.0 + np.exp(-safe))
        return self._output

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Chain rule through sigma'(x) = sigma(x) * (1 - sigma(x))."""
        y = self._output
        return grad_output * y * (1.0 - y)

    def get_params(self) -> List:
        """No learnable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass
|
|
|
|
|
|
|
|
class Tanh:
    """Hyperbolic tangent activation."""

    def __init__(self):
        # Last output, reused in backward (tanh' = 1 - tanh^2).
        self._output = None

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Apply tanh element-wise."""
        self._output = np.tanh(x)
        return self._output

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Chain rule through tanh'(x) = 1 - tanh(x)^2."""
        y = self._output
        return grad_output * (1.0 - y ** 2)

    def get_params(self) -> List:
        """No learnable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass
|
|
|
|
|
|
|
|
class Softmax:
    """Softmax activation for probability outputs.

    forward() is the numerically stable softmax (max-shifted exponentials);
    backward() computes the exact Jacobian-vector product along `axis`.
    """

    def __init__(self, axis: int = -1):
        """
        Args:
            axis: axis along which probabilities are normalized.
        """
        self.axis = axis
        self._output = None  # probabilities from the last forward pass

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Return softmax(x) along self.axis (stable via max subtraction)."""
        x_max = np.max(x, axis=self.axis, keepdims=True)
        exp_x = np.exp(x - x_max)
        self._output = exp_x / np.sum(exp_x, axis=self.axis, keepdims=True)
        return self._output

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Exact softmax gradient.

        FIX: the previous implementation used the element-wise diagonal
        approximation g * s * (1 - s), dropping the off-diagonal Jacobian
        terms.  The correct Jacobian-vector product is
            dL/dx_i = s_i * (g_i - sum_j g_j * s_j).
        """
        s = self._output
        weighted_sum = np.sum(grad_output * s, axis=self.axis, keepdims=True)
        return s * (grad_output - weighted_sum)

    def get_params(self) -> List:
        """No learnable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass
|
|
|
|
|
|
|
|
class Dropout:
    """Inverted-dropout regularization layer.

    During training each activation is zeroed with probability ``p`` and the
    survivors are scaled by 1/(1-p), so the expected activation is unchanged
    and no rescaling is needed at inference.  In eval mode the layer is the
    identity.
    """

    def __init__(self, p: float = 0.5):
        """
        Args:
            p: drop probability; must satisfy 0 <= p < 1.

        Raises:
            ValueError: if p is outside [0, 1).  (FIX: p == 1.0 previously
            caused a silent divide-by-zero, producing NaN/inf outputs.)
        """
        if not 0.0 <= p < 1.0:
            raise ValueError(f'dropout probability must be in [0, 1), got {p}')
        self.p = p
        self._mask = None    # keep-mask from the last training forward pass
        self.training = True

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Drop units in training mode; identity in eval mode or when p == 0."""
        if not self.training or self.p == 0.0:
            return x

        self._mask = (np.random.random(x.shape) > self.p).astype(np.float32)
        return x * self._mask / (1.0 - self.p)

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Gradient flows only through the units kept in forward."""
        if not self.training or self.p == 0.0:
            return grad_output
        return grad_output * self._mask / (1.0 - self.p)

    def get_params(self) -> List:
        """No learnable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass
|
|
|
|
|
|
|
|
class BatchNorm1d:
    """Batch normalization for 1D inputs.

    Normalizes each feature over the batch dimension during training and
    maintains running statistics (exponential moving average weighted by
    `momentum`) for use at inference time.  A learnable scale (`gamma`) and
    shift (`beta`) are applied after normalization.
    """

    def __init__(self, num_features: int, eps: float = 1e-5, momentum: float = 0.1):
        self.num_features = num_features
        self.eps = eps            # added to variance for numerical stability
        self.momentum = momentum  # weight of the current batch in the running stats

        # Learnable affine parameters.
        self.gamma = np.ones(num_features, dtype=np.float32)
        self.beta = np.zeros(num_features, dtype=np.float32)

        # Running statistics used in eval mode.
        self.running_mean = np.zeros(num_features, dtype=np.float32)
        self.running_var = np.ones(num_features, dtype=np.float32)

        self.gamma_grad = np.zeros_like(self.gamma)
        self.beta_grad = np.zeros_like(self.beta)

        # (x, x_norm, mean, var) from the last training-mode forward pass.
        self._cache = None
        self.training = True

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Normalize x: batch statistics in training, running stats in eval."""
        if self.training:
            mean = np.mean(x, axis=0)
            var = np.var(x, axis=0)

            # Exponential moving average of the batch statistics.
            self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
            self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var

            x_norm = (x - mean) / np.sqrt(var + self.eps)
            self._cache = (x, x_norm, mean, var)
        else:
            x_norm = (x - self.running_mean) / np.sqrt(self.running_var + self.eps)

        return self.gamma * x_norm + self.beta

    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        """Backprop through the training-mode normalization.

        Uses the standard batch-norm gradient decomposition into the
        normalized-input, variance and mean terms.  Requires a prior
        training-mode forward pass (reads self._cache).
        """
        x, x_norm, mean, var = self._cache
        batch_size = x.shape[0]

        # Gradients of the affine parameters.
        self.gamma_grad = np.sum(grad_output * x_norm, axis=0)
        self.beta_grad = np.sum(grad_output, axis=0)

        dx_norm = grad_output * self.gamma
        # Chain through x_norm = (x - mean) * (var + eps)^{-1/2}.
        dvar = np.sum(dx_norm * (x - mean) * -0.5 * (var + self.eps) ** -1.5, axis=0)
        dmean = np.sum(dx_norm * -1 / np.sqrt(var + self.eps), axis=0)
        dmean += dvar * np.mean(-2 * (x - mean), axis=0)

        # Combine the three contributions to dL/dx.
        dx = dx_norm / np.sqrt(var + self.eps)
        dx += dvar * 2 * (x - mean) / batch_size
        dx += dmean / batch_size

        return dx

    def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
        """(parameter, gradient) pairs for the optimizer."""
        return [(self.gamma, self.gamma_grad), (self.beta, self.beta_grad)]

    def zero_grad(self):
        """Reset the affine-parameter gradients to zero."""
        self.gamma_grad.fill(0)
        self.beta_grad.fill(0)
|
|
|
|
|
|
|
|
class Sequential:
    """Sequential container that chains neural network layers."""

    def __init__(self, layers: List = None):
        self.layers = [] if layers is None else layers
        self.training = True

    def add(self, layer) -> 'Sequential':
        """Append a layer; returns self so calls can be chained."""
        self.layers.append(layer)
        return self

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Run x through every layer in order, syncing training flags."""
        for layer in self.layers:
            if hasattr(layer, 'training'):
                layer.training = self.training
            x = layer.forward(x)
        return x

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backpropagate grad through the layers in reverse order."""
        for layer in reversed(self.layers):
            grad = layer.backward(grad)
        return grad

    def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
        """Collect (parameter, gradient) pairs from every layer."""
        collected = []
        for layer in self.layers:
            collected.extend(layer.get_params())
        return collected

    def zero_grad(self):
        """Reset the gradients of every layer."""
        for layer in self.layers:
            layer.zero_grad()

    def train(self):
        """Switch the container and all stateful layers to training mode."""
        self._set_mode(True)

    def eval(self):
        """Switch the container and all stateful layers to inference mode."""
        self._set_mode(False)

    def _set_mode(self, flag: bool):
        # Shared implementation for train()/eval().
        self.training = flag
        for layer in self.layers:
            if hasattr(layer, 'training'):
                layer.training = flag

    def __call__(self, x: np.ndarray) -> np.ndarray:
        return self.forward(x)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MSELoss:
    """Mean Squared Error loss with 'mean'/'sum'/'none' reductions."""

    def __init__(self, reduction: str = 'mean'):
        self.reduction = reduction
        # Cached operands from the last forward pass, used by backward().
        self._pred = None
        self._target = None

    def forward(self, pred: np.ndarray, target: np.ndarray) -> float:
        """Compute (pred - target)^2 under the configured reduction."""
        self._pred = pred
        self._target = target

        squared = (pred - target) ** 2

        if self.reduction == 'mean':
            return float(np.mean(squared))
        if self.reduction == 'sum':
            return float(np.sum(squared))
        return squared

    def backward(self) -> np.ndarray:
        """Gradient of the loss w.r.t. pred: 2 * (pred - target), averaged
        over all elements for the 'mean' reduction."""
        grad = 2.0 * (self._pred - self._target)
        if self.reduction == 'mean':
            grad /= self._pred.size
        return grad

    def __call__(self, pred: np.ndarray, target: np.ndarray) -> float:
        return self.forward(pred, target)
|
|
|
|
|
|
|
|
class HuberLoss:
    """Huber (smooth L1) loss: quadratic within ±delta, linear outside."""

    def __init__(self, delta: float = 1.0, reduction: str = 'mean'):
        self.delta = delta
        self.reduction = reduction
        # Cached operands from the last forward pass, used by backward().
        self._pred = None
        self._target = None
        self._diff = None

    def forward(self, pred: np.ndarray, target: np.ndarray) -> float:
        """Compute the Huber loss under the configured reduction."""
        self._pred = pred
        self._target = target
        self._diff = pred - target

        abs_diff = np.abs(self._diff)

        # Split |diff| into the quadratic part (capped at delta) and the
        # remaining linear part.
        quad = np.minimum(abs_diff, self.delta)
        lin = abs_diff - quad
        loss = 0.5 * quad ** 2 + self.delta * lin

        if self.reduction == 'mean':
            return float(np.mean(loss))
        if self.reduction == 'sum':
            return float(np.sum(loss))
        return loss

    def backward(self) -> np.ndarray:
        """Piecewise gradient: diff inside the delta band, ±delta outside."""
        abs_diff = np.abs(self._diff)

        grad = np.where(
            abs_diff <= self.delta,
            self._diff,
            self.delta * np.sign(self._diff)
        )

        if self.reduction == 'mean':
            grad /= self._pred.size
        return grad

    def __call__(self, pred: np.ndarray, target: np.ndarray) -> float:
        return self.forward(pred, target)
|
|
|
|
|
|
|
|
class CrossEntropyLoss:
    """Cross entropy loss for classification; expects raw logits.

    Targets may be integer class labels (1-D) or one-hot/soft targets
    (same shape as the logits).
    """

    def __init__(self, reduction: str = 'mean'):
        self.reduction = reduction
        # Cached softmax probabilities and targets for backward().
        self._probs = None
        self._target = None

    def forward(self, logits: np.ndarray, target: np.ndarray) -> float:
        """Softmax + negative log-likelihood under the configured reduction."""
        # Numerically stable softmax over the last axis.
        shifted = logits - np.max(logits, axis=-1, keepdims=True)
        exp_shifted = np.exp(shifted)
        self._probs = exp_shifted / np.sum(exp_shifted, axis=-1, keepdims=True)

        self._target = target

        if target.ndim == 1:
            # Integer class labels: pick the probability of the true class.
            batch_size = logits.shape[0]
            log_probs = np.log(self._probs[np.arange(batch_size), target] + 1e-10)
        else:
            # One-hot / soft targets.
            log_probs = np.sum(target * np.log(self._probs + 1e-10), axis=-1)

        nll = -log_probs

        if self.reduction == 'mean':
            return float(np.mean(nll))
        if self.reduction == 'sum':
            return float(np.sum(nll))
        return nll

    def backward(self) -> np.ndarray:
        """Gradient w.r.t. logits: probs - one_hot(target)."""
        grad = self._probs.copy()

        if self._target.ndim == 1:
            grad[np.arange(grad.shape[0]), self._target] -= 1
        else:
            grad -= self._target

        if self.reduction == 'mean':
            grad /= grad.shape[0]
        return grad

    def __call__(self, logits: np.ndarray, target: np.ndarray) -> float:
        return self.forward(logits, target)
|
|
|
|
|
|
|
|
class SGD:
    """Stochastic Gradient Descent with optional momentum and L2 decay."""

    def __init__(
        self,
        params: List[Tuple[np.ndarray, np.ndarray]],
        lr: float = 0.01,
        momentum: float = 0.0,
        weight_decay: float = 0.0
    ):
        self.params = params
        self.lr = lr
        self.momentum = momentum
        self.weight_decay = weight_decay

        # One velocity buffer per parameter (only used when momentum > 0).
        self.velocity = [np.zeros_like(p) for p, _ in params]

    def step(self):
        """Apply one in-place gradient-descent update to every parameter."""
        for i, (param, grad) in enumerate(self.params):
            g = grad.copy()
            if self.weight_decay > 0:
                # L2 regularization folded into the gradient.
                g += self.weight_decay * param

            if self.momentum > 0:
                self.velocity[i] = self.momentum * self.velocity[i] + g
                param -= self.lr * self.velocity[i]
            else:
                param -= self.lr * g

    def zero_grad(self):
        """Reset every gradient buffer to zero."""
        for _, grad in self.params:
            grad.fill(0)
|
|
|
|
|
|
|
|
class Adam:
    """Adam optimizer with momentum and adaptive learning rates."""

    def __init__(
        self,
        params: List[Tuple[np.ndarray, np.ndarray]],
        lr: float = 0.001,
        beta1: float = 0.9,
        beta2: float = 0.999,
        eps: float = 1e-8,
        weight_decay: float = 0.0
    ):
        self.params = params
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.weight_decay = weight_decay

        # First/second moment estimates per parameter, plus a step counter
        # for bias correction.
        self.m = [np.zeros_like(p) for p, _ in params]
        self.v = [np.zeros_like(p) for p, _ in params]
        self.t = 0

    def step(self):
        """Apply one bias-corrected Adam update in place."""
        self.t += 1
        bias1 = 1 - self.beta1 ** self.t
        bias2 = 1 - self.beta2 ** self.t

        for i, (param, grad) in enumerate(self.params):
            g = grad.copy()
            if self.weight_decay > 0:
                # L2 regularization folded into the gradient.
                g += self.weight_decay * param

            # Exponential moving averages of the gradient and its square.
            self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
            self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * np.square(g)

            # Bias-corrected moment estimates.
            m_hat = self.m[i] / bias1
            v_hat = self.v[i] / bias2

            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)

    def zero_grad(self):
        """Reset every gradient buffer to zero."""
        for _, grad in self.params:
            grad.fill(0)
|
|
|
|
|
|
|
|
class RMSprop:
    """RMSprop optimizer."""

    def __init__(
        self,
        params: List[Tuple[np.ndarray, np.ndarray]],
        lr: float = 0.01,
        alpha: float = 0.99,
        eps: float = 1e-8,
        weight_decay: float = 0.0
    ):
        self.params = params
        self.lr = lr
        self.alpha = alpha  # decay rate of the squared-gradient average
        self.eps = eps
        self.weight_decay = weight_decay

        # Exponential moving average of squared gradients, per parameter.
        self.v = [np.zeros_like(p) for p, _ in params]

    def step(self):
        """Scale each update by the root of the running squared-grad average."""
        for i, (param, grad) in enumerate(self.params):
            g = grad.copy()
            if self.weight_decay > 0:
                # L2 regularization folded into the gradient.
                g += self.weight_decay * param

            self.v[i] = self.alpha * self.v[i] + (1 - self.alpha) * np.square(g)
            param -= self.lr * g / (np.sqrt(self.v[i]) + self.eps)

    def zero_grad(self):
        """Reset every gradient buffer to zero."""
        for _, grad in self.params:
            grad.fill(0)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ReplayBuffer:
    """Basic experience replay buffer.

    Fixed-capacity ring buffer storing transitions in preallocated arrays;
    sampling is uniform with replacement.
    """

    def __init__(self, capacity: int, state_dim: int, seed: Optional[int] = None):
        self.capacity = capacity
        self.state_dim = state_dim

        # Preallocated storage, overwritten in ring-buffer order.
        self.states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.actions = np.zeros(capacity, dtype=np.int64)
        self.rewards = np.zeros(capacity, dtype=np.float32)
        self.next_states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.dones = np.zeros(capacity, dtype=np.float32)

        self.position = 0  # next write slot
        self.size = 0      # number of valid transitions (<= capacity)

        self.rng = np.random.RandomState(seed)

    def push(
        self,
        state: np.ndarray,
        action: int,
        reward: float,
        next_state: np.ndarray,
        done: bool
    ):
        """Store one transition, overwriting the oldest slot when full."""
        slot = self.position
        self.states[slot] = state
        self.actions[slot] = action
        self.rewards[slot] = reward
        self.next_states[slot] = next_state
        self.dones[slot] = float(done)

        self.position = (slot + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size: int) -> Tuple[np.ndarray, ...]:
        """Draw a uniform batch (with replacement) of stored transitions."""
        indices = self.rng.randint(0, self.size, size=batch_size)

        return (
            self.states[indices],
            self.actions[indices],
            self.rewards[indices],
            self.next_states[indices],
            self.dones[indices]
        )

    def __len__(self) -> int:
        return self.size

    def is_ready(self, batch_size: int) -> bool:
        """True once at least batch_size transitions are stored."""
        return self.size >= batch_size
|
|
|
|
|
|
|
|
class SumTree:
    """Sum tree data structure for efficient priority sampling.

    A complete binary tree stored flat in an array of size 2*capacity - 1:
    internal nodes occupy indices [0, capacity-1) and hold the sum of their
    subtree; leaves occupy [capacity-1, 2*capacity-1) and hold the raw
    priorities.  update/get_leaf run in O(log capacity).
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.tree = np.zeros(2 * capacity - 1, dtype=np.float64)
        self.data_pointer = 0

    def _propagate(self, idx: int, change: float):
        """Add `change` to every ancestor of node `idx` up to the root.

        FIX: previously recursive, which risked RecursionError for very
        deep trees and recursed infinitely for the degenerate capacity=1
        case; now iterative.
        """
        while idx != 0:
            idx = (idx - 1) // 2
            self.tree[idx] += change

    def _retrieve(self, idx: int, s: float) -> int:
        """Descend from `idx`, steering by cumulative priority mass `s`.

        Returns the index of the leaf whose priority interval contains s.
        (Iterative; see _propagate FIX note.)
        """
        while True:
            left = 2 * idx + 1
            if left >= len(self.tree):
                return idx
            if s <= self.tree[left]:
                idx = left
            else:
                s -= self.tree[left]
                idx = left + 1

    def total(self) -> float:
        """Sum of all priorities (the root value)."""
        return self.tree[0]

    def update(self, idx: int, priority: float):
        """Set the priority at tree index `idx` and repair the ancestors."""
        change = priority - self.tree[idx]
        self.tree[idx] = priority
        self._propagate(idx, change)

    def get_leaf(self, s: float) -> Tuple[int, float]:
        """Return (data_index, priority) for the leaf containing mass s,
        where s should lie in [0, total())."""
        idx = self._retrieve(0, s)
        data_idx = idx - self.capacity + 1
        return data_idx, self.tree[idx]
|
|
|
|
|
|
|
|
class PrioritizedReplayBuffer:
    """Prioritized Experience Replay buffer backed by a SumTree.

    Transitions are sampled with probability proportional to
    ``priority ** alpha`` where ``priority = |td_error| + epsilon``.
    The sampling bias is corrected by max-normalized importance-sampling
    weights whose exponent ``beta`` is annealed towards 1 on every call
    to :meth:`sample`.

    FIX: ``update_priorities`` previously stored the *alpha-exponentiated*
    priority into ``max_priority``; ``push`` then raised it to ``alpha``
    again, so new transitions were inserted with priority
    ``p ** (alpha ** 2)`` instead of ``p ** alpha``.  ``max_priority``
    now always tracks the raw (un-exponentiated) priority and the alpha
    exponent is applied exactly once, at tree-insertion time.
    """

    def __init__(
        self,
        capacity: int,
        state_dim: int,
        alpha: float = 0.6,
        beta: float = 0.4,
        beta_increment: float = 0.001,
        epsilon: float = 1e-6,
        seed: Optional[int] = None
    ):
        self.capacity = capacity
        self.state_dim = state_dim
        self.alpha = alpha                  # how strongly priorities skew sampling (0 = uniform)
        self.beta = beta                    # IS-weight exponent, annealed to 1.0
        self.beta_increment = beta_increment
        self.epsilon = epsilon              # keeps every priority strictly positive

        self.tree = SumTree(capacity)

        # Flat pre-allocated transition storage, indexed in lockstep with tree leaves.
        self.states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.actions = np.zeros(capacity, dtype=np.int64)
        self.rewards = np.zeros(capacity, dtype=np.float32)
        self.next_states = np.zeros((capacity, state_dim), dtype=np.float32)
        self.dones = np.zeros(capacity, dtype=np.float32)

        self.position = 0        # next write slot (circular)
        self.size = 0            # number of valid transitions stored
        self.max_priority = 1.0  # raw (pre-alpha) priority assigned to new transitions

        self.rng = np.random.RandomState(seed)

    def push(
        self,
        state: np.ndarray,
        action: int,
        reward: float,
        next_state: np.ndarray,
        done: bool
    ):
        """Store one transition at maximum priority so it is sampled at least once."""
        self.states[self.position] = state
        self.actions[self.position] = action
        self.rewards[self.position] = reward
        self.next_states[self.position] = next_state
        self.dones[self.position] = float(done)

        # Alpha is applied exactly once here; max_priority is kept raw.
        tree_idx = self.position + self.capacity - 1
        self.tree.update(tree_idx, self.max_priority ** self.alpha)

        self.position = (self.position + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size: int) -> Tuple[np.ndarray, ...]:
        """Stratified proportional sampling of a mini-batch.

        Returns (states, actions, rewards, next_states, dones, indices,
        weights).  ``indices`` must be fed back to :meth:`update_priorities`;
        ``weights`` are max-normalized importance-sampling corrections.
        """
        indices = np.zeros(batch_size, dtype=np.int64)
        priorities = np.zeros(batch_size, dtype=np.float64)

        # Stratify: draw once from each of batch_size equal-mass segments.
        segment = self.tree.total() / batch_size

        # Anneal beta towards 1 (full bias correction) over training.
        self.beta = min(1.0, self.beta + self.beta_increment)

        for i in range(batch_size):
            low = segment * i
            high = segment * (i + 1)
            s = self.rng.uniform(low, high)

            data_idx, priority = self.tree.get_leaf(s)
            indices[i] = data_idx
            priorities[i] = priority

        sampling_probs = priorities / self.tree.total()
        weights = (self.size * sampling_probs) ** (-self.beta)
        weights /= weights.max()            # normalize so corrections only scale down
        weights = weights.astype(np.float32)

        return (
            self.states[indices],
            self.actions[indices],
            self.rewards[indices],
            self.next_states[indices],
            self.dones[indices],
            indices,
            weights
        )

    def update_priorities(self, indices: np.ndarray, td_errors: np.ndarray):
        """Re-prioritize previously sampled transitions from fresh TD errors."""
        for idx, td_error in zip(indices, td_errors):
            # Raw priority; push() and the tree insertion below each apply
            # the alpha exponent exactly once.
            raw_priority = float(np.abs(td_error)) + self.epsilon
            self.max_priority = max(self.max_priority, raw_priority)

            tree_idx = idx + self.capacity - 1
            self.tree.update(tree_idx, raw_priority ** self.alpha)

    def __len__(self) -> int:
        """Return the number of transitions currently stored (<= capacity)."""
        return self.size

    def is_ready(self, batch_size: int) -> bool:
        """Return True once enough transitions are stored to sample a full batch."""
        return self.size >= batch_size
|
|
|
|
|
|
|
|
class NStepReplayBuffer:
    """Replay buffer that aggregates raw transitions into n-step returns.

    Incoming 1-step transitions are held in a sliding window (deque of
    maxlen ``n_steps``).  Once the window is full, one aggregated
    transition — discounted n-step return, the window's first
    state/action and last next-state/done — is written to an inner
    ReplayBuffer.  On episode termination the remaining partial windows
    are flushed with shortened horizons so tail transitions are not lost.

    FIX: when an episode terminated while the window was full, the head
    transition used to be emitted twice — once by the regular
    full-window path and again as the first iteration of the terminal
    flush.  The flush now drops the already-emitted head first.
    """

    def __init__(
        self,
        capacity: int,
        state_dim: int,
        n_steps: int = 3,
        gamma: float = 0.99,
        seed: Optional[int] = None
    ):
        self.capacity = capacity
        self.state_dim = state_dim
        self.n_steps = n_steps
        self.gamma = gamma

        # Aggregated transitions land here; sampling is delegated to it.
        self.main_buffer = ReplayBuffer(capacity, state_dim, seed)

        # Sliding window of the most recent raw transitions
        # (state, action, reward, next_state, done).
        self.n_step_buffer = deque(maxlen=n_steps)

        self.rng = np.random.RandomState(seed)

    def _emit(self, done_flag: bool):
        """Collapse the current window into one transition in main_buffer."""
        discounted_return = 0.0
        for i, transition in enumerate(self.n_step_buffer):
            discounted_return += (self.gamma ** i) * transition[2]

        first_state = self.n_step_buffer[0][0]
        first_action = self.n_step_buffer[0][1]
        last_next_state = self.n_step_buffer[-1][3]

        self.main_buffer.push(
            first_state,
            first_action,
            discounted_return,
            last_next_state,
            done_flag
        )

    def push(
        self,
        state: np.ndarray,
        action: int,
        reward: float,
        next_state: np.ndarray,
        done: bool
    ):
        """Append one raw transition; emit aggregated transitions as they mature."""
        self.n_step_buffer.append((state, action, reward, next_state, done))

        emitted_head = False
        if len(self.n_step_buffer) == self.n_steps:
            # Full window: emit the n-step transition anchored at the head,
            # carrying the done flag of the window's last transition.
            self._emit(self.n_step_buffer[-1][4])
            emitted_head = True

        if done:
            # FIX: the head was just emitted above when the window was full;
            # drop it so it is not stored a second time by the flush below.
            if emitted_head:
                self.n_step_buffer.popleft()

            # Flush shrinking partial windows (horizon n-1, n-2, ..., 1),
            # all terminal since the episode just ended.
            while len(self.n_step_buffer) > 0:
                self._emit(True)
                self.n_step_buffer.popleft()

    def sample(self, batch_size: int) -> Tuple[np.ndarray, ...]:
        """Uniformly sample a batch of aggregated n-step transitions."""
        return self.main_buffer.sample(batch_size)

    def __len__(self) -> int:
        """Return the number of aggregated transitions currently stored."""
        return len(self.main_buffer)

    def is_ready(self, batch_size: int) -> bool:
        """Return True once enough aggregated transitions exist for a batch."""
        return self.main_buffer.is_ready(batch_size)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EpsilonGreedy:
    """Epsilon-greedy action selection with an exponential or linear schedule."""

    def __init__(
        self,
        epsilon_start: float = 1.0,
        epsilon_end: float = 0.01,
        epsilon_decay: float = 0.995,
        decay_type: str = 'exponential',
        decay_steps: int = 10000,
        seed: Optional[int] = None
    ):
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay   # multiplicative factor (exponential mode)
        self.decay_type = decay_type
        self.decay_steps = decay_steps       # steps to reach epsilon_end (linear mode)

        self.epsilon = epsilon_start
        self.step_count = 0

        self.rng = np.random.RandomState(seed)

    def get_action(self, q_values: np.ndarray, valid_actions: List[int] = None) -> int:
        """Return a random action with probability epsilon, else the greedy argmax.

        When `valid_actions` is given, both the random and the greedy
        branch are restricted to that subset (invalid entries are masked
        to -inf before the argmax).
        """
        if self.rng.random() < self.epsilon:
            if valid_actions is None:
                return self.rng.randint(0, len(q_values))
            return self.rng.choice(valid_actions)

        if valid_actions is None:
            return int(np.argmax(q_values))

        masked = np.full(len(q_values), -np.inf)
        masked[valid_actions] = 0
        return int(np.argmax(q_values + masked))

    def decay(self):
        """Advance the schedule one step; epsilon never drops below epsilon_end."""
        self.step_count += 1

        if self.decay_type == 'exponential':
            proposed = self.epsilon * self.epsilon_decay
        elif self.decay_type == 'linear':
            progress = self.step_count / self.decay_steps
            proposed = self.epsilon_start - (self.epsilon_start - self.epsilon_end) * progress
        else:
            # Unknown schedule names leave epsilon untouched.
            return

        self.epsilon = max(self.epsilon_end, proposed)

    def reset(self):
        """Restart the schedule from epsilon_start."""
        self.epsilon = self.epsilon_start
        self.step_count = 0
|
|
|
|
|
|
|
|
class DQNNetwork:
    """Fully-connected network mapping a flat state vector to per-action Q-values.

    Hidden layers use He initialization (paired with ReLU-family
    activations); the linear output head uses Xavier initialization.
    """

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dims: List[int] = None,
        activation: str = 'relu'
    ):
        hidden_dims = [128, 128] if hidden_dims is None else hidden_dims

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.hidden_dims = hidden_dims

        # Unknown activation names silently fall back to ReLU.
        act_cls = {
            'relu': ReLU,
            'leaky_relu': LeakyReLU,
            'tanh': Tanh,
        }.get(activation, ReLU)

        stack = []
        in_dim = state_dim
        for out_dim in hidden_dims:
            stack.append(LinearLayer(in_dim, out_dim, init_method='he'))
            stack.append(act_cls())
            in_dim = out_dim
        stack.append(LinearLayer(in_dim, action_dim, init_method='xavier'))

        self.network = Sequential(stack)

    def forward(self, state: np.ndarray) -> np.ndarray:
        """Return Q-values of shape (batch, action_dim); a 1-D state is auto-batched."""
        batch = state.reshape(1, -1) if state.ndim == 1 else state
        return self.network.forward(batch)

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backpropagate dL/dQ through the network; returns grad w.r.t. input."""
        return self.network.backward(grad)

    def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
        """Return (parameter, gradient) array pairs for every trainable tensor."""
        return self.network.get_params()

    def zero_grad(self):
        """Reset all accumulated gradients to zero."""
        self.network.zero_grad()

    def copy_from(self, other: 'DQNNetwork'):
        """Hard-copy all parameters from `other` (target-network sync)."""
        for (dst, _), (src, _) in zip(self.get_params(), other.get_params()):
            dst[:] = src

    def soft_update(self, other: 'DQNNetwork', tau: float):
        """Polyak update: params <- tau * other + (1 - tau) * params."""
        for (dst, _), (src, _) in zip(self.get_params(), other.get_params()):
            dst[:] = tau * src + (1 - tau) * dst

    def __call__(self, state: np.ndarray) -> np.ndarray:
        return self.forward(state)
|
|
|
|
|
|
|
|
class DuelingDQNNetwork:
    """Dueling DQN: shared trunk feeding separate value and advantage streams.

    Q(s, a) = V(s) + A(s, a) - mean_a A(s, a)

    The mean-subtraction makes the V/A decomposition identifiable
    (Wang et al., 2016).
    """

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dims: List[int] = None
    ):
        if hidden_dims is None:
            hidden_dims = [128, 128]

        self.state_dim = state_dim
        self.action_dim = action_dim

        # Shared feature trunk (He init pairs with ReLU).
        layers = []
        prev_dim = state_dim
        for hidden_dim in hidden_dims:
            layers.append(LinearLayer(prev_dim, hidden_dim, init_method='he'))
            layers.append(ReLU())
            prev_dim = hidden_dim

        self.feature_network = Sequential(layers)

        # Scalar state-value head V(s).
        self.value_stream = Sequential([
            LinearLayer(prev_dim, 64, init_method='he'),
            ReLU(),
            LinearLayer(64, 1, init_method='xavier')
        ])

        # Per-action advantage head A(s, a).
        self.advantage_stream = Sequential([
            LinearLayer(prev_dim, 64, init_method='he'),
            ReLU(),
            LinearLayer(64, action_dim, init_method='xavier')
        ])

    def forward(self, state: np.ndarray) -> np.ndarray:
        """Return Q-values of shape (batch, action_dim); a 1-D state is auto-batched."""
        if state.ndim == 1:
            state = state.reshape(1, -1)

        features = self.feature_network.forward(state)

        value = self.value_stream.forward(features)
        advantage = self.advantage_stream.forward(features)

        # Identifiable dueling combination: center the advantages.
        q_values = value + (advantage - np.mean(advantage, axis=1, keepdims=True))

        return q_values

    def backward(self, grad: np.ndarray) -> np.ndarray:
        """Backpropagate dL/dQ through both streams and the shared trunk.

        V(s) broadcasts over actions in forward(), so the value-stream
        gradient is the action-sum of `grad`; the mean-subtraction makes
        the advantage-stream gradient mean-centered.
        (Removed an unused `batch_size` local from the original.)
        """
        grad_value = np.sum(grad, axis=1, keepdims=True)
        grad_advantage = grad - np.mean(grad, axis=1, keepdims=True)

        grad_features_v = self.value_stream.backward(grad_value)
        grad_features_a = self.advantage_stream.backward(grad_advantage)

        # Both streams consume the same trunk output, so their input
        # gradients add before flowing into the trunk.
        grad_features = grad_features_v + grad_features_a

        return self.feature_network.backward(grad_features)

    def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
        """(parameter, gradient) pairs from the trunk and both heads, in order."""
        params = []
        params.extend(self.feature_network.get_params())
        params.extend(self.value_stream.get_params())
        params.extend(self.advantage_stream.get_params())
        return params

    def zero_grad(self):
        """Reset accumulated gradients in the trunk and both heads."""
        self.feature_network.zero_grad()
        self.value_stream.zero_grad()
        self.advantage_stream.zero_grad()

    def copy_from(self, other: 'DuelingDQNNetwork'):
        """Hard-copy all parameters from `other` (target-network sync)."""
        for (p1, _), (p2, _) in zip(self.get_params(), other.get_params()):
            p1[:] = p2

    def soft_update(self, other: 'DuelingDQNNetwork', tau: float):
        """Polyak update: params <- tau * other + (1 - tau) * params."""
        for (p1, _), (p2, _) in zip(self.get_params(), other.get_params()):
            p1[:] = tau * p2 + (1 - tau) * p1

    def __call__(self, state: np.ndarray) -> np.ndarray:
        return self.forward(state)
|
|
|
|
|
|
|
|
class DQNAgent:
    """Complete DQN Agent with vanilla, double, and dueling variants.

    Optional extensions selected by constructor flags: Double-DQN target
    selection, dueling architecture, prioritized replay (PER), and
    n-step returns.

    FIX: train_step's forward-cache save/restore assumed the online
    network always exposes a single `.network` Sequential.
    DuelingDQNNetwork does not (it has feature_network / value_stream /
    advantage_stream), so any agent built with use_dueling=True crashed
    with AttributeError on the first training step.  The caches are now
    collected from whichever sub-networks the architecture provides.
    """

    def __init__(
        self,
        state_dim: int,
        action_dim: int,
        hidden_dims: List[int] = None,
        lr: float = 0.001,
        gamma: float = 0.99,
        buffer_size: int = 100000,
        batch_size: int = 64,
        target_update_freq: int = 100,
        tau: float = 0.005,
        use_double: bool = True,
        use_dueling: bool = False,
        use_per: bool = False,
        n_steps: int = 1,
        epsilon_start: float = 1.0,
        epsilon_end: float = 0.01,
        epsilon_decay: float = 0.995,
        seed: Optional[int] = None
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq
        self.tau = tau
        self.use_double = use_double
        self.use_dueling = use_dueling
        self.use_per = use_per
        self.n_steps = n_steps
        # Bootstrap discount for n-step targets: gamma ** n.
        self.gamma_n = gamma ** n_steps

        if use_dueling:
            self.q_network = DuelingDQNNetwork(state_dim, action_dim, hidden_dims)
            self.target_network = DuelingDQNNetwork(state_dim, action_dim, hidden_dims)
        else:
            self.q_network = DQNNetwork(state_dim, action_dim, hidden_dims)
            self.target_network = DQNNetwork(state_dim, action_dim, hidden_dims)

        # Start the target network as an exact copy of the online network.
        self.target_network.copy_from(self.q_network)

        self.optimizer = Adam(self.q_network.get_params(), lr=lr)
        # NOTE: constructed for API parity but not used by train_step(),
        # which computes a weighted-MSE loss/gradient directly.
        self.loss_fn = HuberLoss()

        # PER takes precedence over n-step when both are requested.
        if use_per:
            self.buffer = PrioritizedReplayBuffer(buffer_size, state_dim, seed=seed)
        elif n_steps > 1:
            self.buffer = NStepReplayBuffer(buffer_size, state_dim, n_steps, gamma, seed)
        else:
            self.buffer = ReplayBuffer(buffer_size, state_dim, seed)

        self.exploration = EpsilonGreedy(
            epsilon_start, epsilon_end, epsilon_decay,
            decay_type='exponential', seed=seed
        )

        self.train_steps = 0
        self.episodes = 0

        self.metrics = {
            'losses': [],
            'q_values': [],
            'episode_rewards': [],
            'episode_lengths': [],
            'epsilon': []
        }

    def _cached_layers(self) -> List:
        """Layers of the online network whose forward caches must be
        preserved across the extra forward passes in train_step.

        DQNNetwork wraps one Sequential (`.network`); DuelingDQNNetwork
        has three separate Sequentials.  (This is the fix for the
        use_dueling=True AttributeError.)
        """
        if hasattr(self.q_network, 'network'):
            subnets = [self.q_network.network]
        else:
            subnets = [
                self.q_network.feature_network,
                self.q_network.value_stream,
                self.q_network.advantage_stream,
            ]
        layers = []
        for net in subnets:
            layers.extend(net.layers)
        return layers

    def select_action(self, state: np.ndarray, training: bool = True) -> int:
        """Epsilon-greedy action during training, purely greedy otherwise."""
        q_values = self.q_network(state).flatten()

        if training:
            return self.exploration.get_action(q_values)
        return int(np.argmax(q_values))

    def store_transition(
        self,
        state: np.ndarray,
        action: int,
        reward: float,
        next_state: np.ndarray,
        done: bool
    ):
        """Forward one transition to whichever replay buffer is configured."""
        self.buffer.push(state, action, reward, next_state, done)

    def train_step(self) -> Optional[float]:
        """Run one gradient step on a sampled mini-batch.

        Returns the scalar (weighted-MSE) loss, or None while the buffer
        does not yet hold a full batch.
        """
        if not self.buffer.is_ready(self.batch_size):
            return None

        if self.use_per:
            states, actions, rewards, next_states, dones, indices, weights = self.buffer.sample(self.batch_size)
        else:
            states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
            weights = np.ones(self.batch_size, dtype=np.float32)

        # Online forward pass over the sampled states.  The layers' internal
        # caches from THIS pass are what backward() must differentiate.
        current_q_all = self.q_network(states)
        current_q = current_q_all[np.arange(self.batch_size), actions]

        # Save the online network's forward caches: the Double-DQN pass over
        # next_states below would overwrite them before backward() runs.
        saved_caches = []
        for layer in self._cached_layers():
            for attr in ('_input_cache', '_mask', '_output'):
                value = getattr(layer, attr, None)
                if value is not None:
                    saved_caches.append((layer, attr, value.copy()))

        with np.errstate(all='ignore'):
            next_q_target = self.target_network(next_states)

            if self.use_double:
                # Double DQN: online net selects actions, target net evaluates them.
                next_q_online = self.q_network(next_states)
                best_actions = np.argmax(next_q_online, axis=1)
                next_q_max = next_q_target[np.arange(self.batch_size), best_actions]
            else:
                next_q_max = np.max(next_q_target, axis=1)

        # Restore the caches so backward() differentiates the `states` pass.
        for layer, attr, cache in saved_caches:
            setattr(layer, attr, cache)

        # n-step transitions already contain the summed discounted reward;
        # the bootstrap term therefore uses gamma ** n.
        gamma = self.gamma_n if self.n_steps > 1 else self.gamma
        target_q = rewards + gamma * next_q_max * (1 - dones)

        td_errors = current_q - target_q

        if self.use_per:
            self.buffer.update_priorities(indices, td_errors)

        # Importance weights are applied once, PER-style, to both the loss
        # and the gradient.
        weighted_td_errors = td_errors * weights
        loss = np.mean(weighted_td_errors ** 2)

        self.q_network.zero_grad()

        # dL/dQ is nonzero only at the taken actions.
        grad = np.zeros_like(current_q_all)
        grad[np.arange(self.batch_size), actions] = 2 * weighted_td_errors / self.batch_size

        self.q_network.backward(grad)

        self.optimizer.step()

        self.train_steps += 1

        # Target sync every target_update_freq steps: Polyak when tau < 1,
        # hard copy otherwise.
        if self.train_steps % self.target_update_freq == 0:
            if self.tau < 1.0:
                self.target_network.soft_update(self.q_network, self.tau)
            else:
                self.target_network.copy_from(self.q_network)

        self.exploration.decay()

        self.metrics['losses'].append(loss)
        self.metrics['q_values'].append(float(np.mean(current_q)))
        self.metrics['epsilon'].append(self.exploration.epsilon)

        return loss

    def end_episode(self, total_reward: float, episode_length: int):
        """Record per-episode bookkeeping metrics."""
        self.episodes += 1
        self.metrics['episode_rewards'].append(total_reward)
        self.metrics['episode_lengths'].append(episode_length)

    def save(self, filepath: str):
        """Pickle network parameters, exploration state, metrics, and config."""
        state = {
            'q_network_params': [(p.copy(), g.copy()) for p, g in self.q_network.get_params()],
            'target_network_params': [(p.copy(), g.copy()) for p, g in self.target_network.get_params()],
            'train_steps': self.train_steps,
            'episodes': self.episodes,
            'epsilon': self.exploration.epsilon,
            'metrics': self.metrics,
            'config': {
                'state_dim': self.state_dim,
                'action_dim': self.action_dim,
                'gamma': self.gamma,
                'batch_size': self.batch_size,
                'use_double': self.use_double,
                'use_dueling': self.use_dueling,
                'use_per': self.use_per,
                'n_steps': self.n_steps
            }
        }

        with open(filepath, 'wb') as f:
            pickle.dump(state, f)

    def load(self, filepath: str):
        """Restore a checkpoint written by save().

        SECURITY: pickle.load executes arbitrary code from the file —
        only load checkpoints you created yourself.
        """
        with open(filepath, 'rb') as f:
            state = pickle.load(f)

        for (p, g), (saved_p, saved_g) in zip(self.q_network.get_params(), state['q_network_params']):
            p[:] = saved_p
            g[:] = saved_g

        for (p, g), (saved_p, saved_g) in zip(self.target_network.get_params(), state['target_network_params']):
            p[:] = saved_p
            g[:] = saved_g

        self.train_steps = state['train_steps']
        self.episodes = state['episodes']
        self.exploration.epsilon = state['epsilon']
        self.metrics = state['metrics']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Trainer:
    """Episode-driven training loop with periodic logging, greedy
    evaluation, checkpointing, and optional early stopping on a
    moving-average reward threshold."""

    def __init__(
        self,
        agent: DQNAgent,
        env,
        eval_env=None,
        log_interval: int = 100,
        eval_interval: int = 1000,
        eval_episodes: int = 10,
        save_interval: int = 5000,
        checkpoint_dir: str = './checkpoints',
        early_stop_reward: float = None,
        early_stop_window: int = 100
    ):
        self.agent = agent
        self.env = env
        # Evaluate on the training env unless a dedicated one is supplied.
        self.eval_env = env if eval_env is None else eval_env
        self.log_interval = log_interval
        self.eval_interval = eval_interval
        self.eval_episodes = eval_episodes
        self.save_interval = save_interval
        self.checkpoint_dir = checkpoint_dir
        self.early_stop_reward = early_stop_reward
        self.early_stop_window = early_stop_window

        os.makedirs(checkpoint_dir, exist_ok=True)

        # Per-episode series; eval_* grow only on evaluation episodes.
        self.training_history = {key: [] for key in (
            'episode', 'reward', 'length', 'loss',
            'epsilon', 'eval_reward', 'eval_length')}

    def train(self, num_episodes: int) -> Dict:
        """Run the interaction/learning loop for up to num_episodes.

        Returns the training-history dict (also retained on self).
        """
        wall_start = time.time()
        global_steps = 0
        best_eval = float('-inf')

        reward_window = deque(maxlen=self.early_stop_window)

        for episode in range(num_episodes):
            obs = self.env.reset()
            ep_reward = 0.0
            ep_length = 0
            ep_losses = []
            finished = False

            while not finished:
                action = self.agent.select_action(obs, training=True)
                next_obs, reward, finished, info = self.env.step(action)

                self.agent.store_transition(obs, action, reward, next_obs, finished)

                step_loss = self.agent.train_step()
                if step_loss is not None:
                    ep_losses.append(step_loss)

                obs = next_obs
                ep_reward += reward
                ep_length += 1
                global_steps += 1

            self.agent.end_episode(ep_reward, ep_length)
            reward_window.append(ep_reward)

            self.training_history['episode'].append(episode)
            self.training_history['reward'].append(ep_reward)
            self.training_history['length'].append(ep_length)
            self.training_history['loss'].append(np.mean(ep_losses) if ep_losses else 0)
            self.training_history['epsilon'].append(self.agent.exploration.epsilon)

            if episode % self.log_interval == 0:
                avg_reward = np.mean(list(reward_window))
                avg_loss = np.mean(ep_losses) if ep_losses else 0
                elapsed = time.time() - wall_start

                print(f"Episode {episode:5d} | "
                      f"Reward: {ep_reward:7.2f} | "
                      f"Avg100: {avg_reward:7.2f} | "
                      f"Loss: {avg_loss:.4f} | "
                      f"Eps: {self.agent.exploration.epsilon:.3f} | "
                      f"Steps: {global_steps:7d} | "
                      f"Time: {elapsed:.1f}s")

            if episode % self.eval_interval == 0 and episode > 0:
                eval_reward, eval_length = self.evaluate()
                self.training_history['eval_reward'].append(eval_reward)
                self.training_history['eval_length'].append(eval_length)

                print(f"  [EVAL] Avg Reward: {eval_reward:.2f} | Avg Length: {eval_length:.1f}")

                # Keep the best-so-far model by greedy evaluation reward.
                if eval_reward > best_eval:
                    best_eval = eval_reward
                    self.agent.save(os.path.join(self.checkpoint_dir, 'best_model.pkl'))

            if episode % self.save_interval == 0 and episode > 0:
                self.agent.save(os.path.join(self.checkpoint_dir, f'checkpoint_{episode}.pkl'))

            if self.early_stop_reward is not None:
                if len(reward_window) >= self.early_stop_window:
                    if np.mean(reward_window) >= self.early_stop_reward:
                        print(f"Early stopping: reached target reward {self.early_stop_reward}")
                        break

        self.agent.save(os.path.join(self.checkpoint_dir, 'final_model.pkl'))

        return self.training_history

    def evaluate(self) -> Tuple[float, float]:
        """Run eval_episodes greedy (no-exploration) episodes.

        Returns (mean episode reward, mean episode length).
        """
        collected_rewards = []
        collected_lengths = []

        for _ in range(self.eval_episodes):
            obs = self.eval_env.reset()
            ep_reward = 0.0
            ep_length = 0
            finished = False

            while not finished:
                action = self.agent.select_action(obs, training=False)
                obs, reward, finished, info = self.eval_env.step(action)

                ep_reward += reward
                ep_length += 1

            collected_rewards.append(ep_reward)
            collected_lengths.append(ep_length)

        return np.mean(collected_rewards), np.mean(collected_lengths)

    def save_history(self, filepath: str):
        """Write the training history to a JSON file."""
        with open(filepath, 'w') as f:
            json.dump(self.training_history, f, indent=2)

    def load_history(self, filepath: str):
        """Replace the in-memory training history with one read from JSON."""
        with open(filepath, 'r') as f:
            self.training_history = json.load(f)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Visualizer:
    """ASCII visualization utilities for training metrics and agent behavior.

    All methods render to a text string, write it under `save_dir`, and
    return the rendered text.
    """

    def __init__(self, save_dir: str = './plots'):
        self.save_dir = save_dir
        os.makedirs(save_dir, exist_ok=True)

    def plot_training_curves(
        self,
        history: Dict,
        filename: str = 'training_curves.txt'
    ) -> str:
        """Render reward/loss/epsilon curves plus summary statistics."""
        output_lines = []
        output_lines.append("=" * 80)
        output_lines.append("TRAINING CURVES (ASCII)")
        output_lines.append("=" * 80)

        output_lines.append("\nREWARD OVER EPISODES:")
        output_lines.append("-" * 60)
        rewards = history.get('reward', [])
        if rewards:
            self._ascii_plot(rewards, output_lines, width=60, height=15)

        output_lines.append("\nLOSS OVER EPISODES:")
        output_lines.append("-" * 60)
        losses = history.get('loss', [])
        if losses:
            self._ascii_plot(losses, output_lines, width=60, height=15)

        output_lines.append("\nEPSILON DECAY:")
        output_lines.append("-" * 60)
        epsilon = history.get('epsilon', [])
        if epsilon:
            self._ascii_plot(epsilon, output_lines, width=60, height=10)

        output_lines.append("\nSTATISTICS:")
        output_lines.append("-" * 60)
        if rewards:
            output_lines.append(f"  Total Episodes: {len(rewards)}")
            output_lines.append(f"  Max Reward: {max(rewards):.2f}")
            output_lines.append(f"  Min Reward: {min(rewards):.2f}")
            output_lines.append(f"  Mean Reward: {np.mean(rewards):.2f}")
            output_lines.append(f"  Std Reward: {np.std(rewards):.2f}")
            output_lines.append(f"  Final Avg (last 100): {np.mean(rewards[-100:]):.2f}")

        output = '\n'.join(output_lines)

        filepath = os.path.join(self.save_dir, filename)
        with open(filepath, 'w') as f:
            f.write(output)

        return output

    def _ascii_plot(
        self,
        data: List[float],
        output_lines: List[str],
        width: int = 60,
        height: int = 15
    ):
        """Append a star-plot of `data` (bucket-averaged to `width` columns)."""
        if not data:
            output_lines.append("  No data to plot")
            return

        data = np.array(data)

        if len(data) > width:
            # Bucket-average so the series fits in `width` columns.
            step = len(data) // width
            data = [np.mean(data[i:i+step]) for i in range(0, len(data), step)][:width]
            data = np.array(data)

        min_val = np.min(data)
        max_val = np.max(data)

        # Avoid zero division when the series is flat.
        if max_val == min_val:
            max_val = min_val + 1

        normalized = ((data - min_val) / (max_val - min_val) * (height - 1)).astype(int)

        grid = [[' ' for _ in range(len(data))] for _ in range(height)]

        for x, y in enumerate(normalized):
            grid[height - 1 - y][x] = '*'

        output_lines.append(f"  {max_val:10.3f} |")
        for row in grid:
            output_lines.append(f"             |{''.join(row)}")
        output_lines.append(f"  {min_val:10.3f} |{'_' * len(data)}")
        # FIX: clamp the pad width — for series shorter than 6 points the
        # original computed a negative repeat count.
        output_lines.append(f"             0{' ' * max(len(data) - 6, 0)}{len(data)}")

    def plot_q_values_heatmap(
        self,
        agent: DQNAgent,
        env,
        filename: str = 'q_values.txt'
    ) -> str:
        """Render one shaded Q-value grid per action for a grid-shaped env."""
        output_lines = []
        output_lines.append("=" * 80)
        output_lines.append("Q-VALUES HEATMAP")
        output_lines.append("=" * 80)

        if not hasattr(env, 'height') or not hasattr(env, 'width'):
            output_lines.append("Environment doesn't support grid visualization")
            return '\n'.join(output_lines)

        action_names = ['UP', 'DOWN', 'LEFT', 'RIGHT']

        # NOTE(review): the probe state below is (height * width)-dimensional,
        # but GridWorld advertises state_dim = width * height * 4 — confirm the
        # agent's expected state encoding before trusting this plot.
        for action_idx, action_name in enumerate(action_names):
            output_lines.append(f"\nQ-VALUES FOR ACTION: {action_name}")
            output_lines.append("-" * 40)

            q_grid = np.zeros((env.height, env.width))

            for row in range(env.height):
                for col in range(env.width):
                    state = np.zeros((env.height, env.width), dtype=np.float32)
                    state[row, col] = 4   # agent marker value
                    state_flat = state.flatten()

                    q_values = agent.q_network(state_flat).flatten()
                    q_grid[row, col] = q_values[action_idx]

            min_q = np.min(q_grid)
            max_q = np.max(q_grid)

            # Shading ramp from low to high Q.
            symbols = ' ░▒▓█'

            for row in range(env.height):
                line = "  "
                for col in range(env.width):
                    if max_q != min_q:
                        normalized = (q_grid[row, col] - min_q) / (max_q - min_q)
                    else:
                        normalized = 0.5
                    idx = min(int(normalized * (len(symbols) - 1)), len(symbols) - 1)
                    line += symbols[idx] + ' '
                output_lines.append(line)

            output_lines.append(f"  Min: {min_q:.3f} | Max: {max_q:.3f}")

        output = '\n'.join(output_lines)

        filepath = os.path.join(self.save_dir, filename)
        with open(filepath, 'w') as f:
            f.write(output)

        return output

    def record_episode(
        self,
        agent: DQNAgent,
        env,
        filename: str = 'episode_recording.txt'
    ) -> str:
        """Play one greedy episode (capped at 100 steps) and log every step."""
        output_lines = []
        output_lines.append("=" * 80)
        output_lines.append("EPISODE RECORDING")
        output_lines.append("=" * 80)

        state = env.reset()
        done = False
        step = 0
        total_reward = 0.0
        # FIX: pre-initialize so the summary below cannot hit a NameError
        # when the loop body never executes.
        info = {}

        while not done and step < 100:
            output_lines.append(f"\n--- Step {step} ---")

            render = env.render(mode='string')
            if render:
                output_lines.append(render)

            # Greedy action straight from the online network (no exploration).
            q_values = agent.q_network(state).flatten()
            action = int(np.argmax(q_values))

            output_lines.append(f"Q-values: {q_values}")
            output_lines.append(f"Action: {env.action_names[action] if hasattr(env, 'action_names') else action}")

            next_state, reward, done, info = env.step(action)
            total_reward += reward

            output_lines.append(f"Reward: {reward:.2f} | Total: {total_reward:.2f}")

            state = next_state
            step += 1

        output_lines.append(f"\n{'=' * 40}")
        output_lines.append(f"EPISODE COMPLETE")
        output_lines.append(f"Total Steps: {step}")
        output_lines.append(f"Total Reward: {total_reward:.2f}")
        output_lines.append(f"Final Info: {info}")

        output = '\n'.join(output_lines)

        filepath = os.path.join(self.save_dir, filename)
        with open(filepath, 'w') as f:
            f.write(output)

        return output
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HyperparameterSearch: |
|
|
"""Grid and random search for hyperparameter tuning.""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
env_class, |
|
|
env_kwargs: Dict, |
|
|
param_grid: Dict, |
|
|
n_episodes: int = 100, |
|
|
eval_episodes: int = 10, |
|
|
n_trials: int = 10, |
|
|
seed: int = 42 |
|
|
): |
|
|
self.env_class = env_class |
|
|
self.env_kwargs = env_kwargs |
|
|
self.param_grid = param_grid |
|
|
self.n_episodes = n_episodes |
|
|
self.eval_episodes = eval_episodes |
|
|
self.n_trials = n_trials |
|
|
self.seed = seed |
|
|
|
|
|
self.results = [] |
|
|
self.best_params = None |
|
|
self.best_score = float('-inf') |
|
|
|
|
|
def _sample_params(self) -> Dict: |
|
|
params = {} |
|
|
for key, values in self.param_grid.items(): |
|
|
if isinstance(values, list): |
|
|
params[key] = np.random.choice(values) |
|
|
elif isinstance(values, tuple) and len(values) == 2: |
|
|
low, high = values |
|
|
if isinstance(low, float): |
|
|
params[key] = np.random.uniform(low, high) |
|
|
else: |
|
|
params[key] = np.random.randint(low, high + 1) |
|
|
else: |
|
|
params[key] = values |
|
|
return params |
|
|
|
|
|
def run_trial(self, params: Dict) -> float: |
|
|
np.random.seed(self.seed) |
|
|
|
|
|
env = self.env_class(**self.env_kwargs) |
|
|
eval_env = self.env_class(**self.env_kwargs) |
|
|
|
|
|
state_dim = env.n_states if hasattr(env, 'n_states') else env.state_dim |
|
|
action_dim = env.n_actions |
|
|
|
|
|
agent = DQNAgent( |
|
|
state_dim=state_dim, |
|
|
action_dim=action_dim, |
|
|
hidden_dims=params.get('hidden_dims', [64, 64]), |
|
|
lr=params.get('lr', 0.001), |
|
|
gamma=params.get('gamma', 0.99), |
|
|
buffer_size=params.get('buffer_size', 10000), |
|
|
batch_size=params.get('batch_size', 32), |
|
|
target_update_freq=params.get('target_update_freq', 100), |
|
|
use_double=params.get('use_double', True), |
|
|
use_dueling=params.get('use_dueling', False), |
|
|
epsilon_start=params.get('epsilon_start', 1.0), |
|
|
epsilon_end=params.get('epsilon_end', 0.01), |
|
|
epsilon_decay=params.get('epsilon_decay', 0.995), |
|
|
seed=self.seed |
|
|
) |
|
|
|
|
|
trainer = Trainer( |
|
|
agent, env, eval_env, |
|
|
log_interval=self.n_episodes + 1, |
|
|
eval_interval=self.n_episodes + 1, |
|
|
checkpoint_dir='/tmp/hp_search' |
|
|
) |
|
|
|
|
|
trainer.train(self.n_episodes) |
|
|
|
|
|
eval_reward, _ = trainer.evaluate() |
|
|
|
|
|
return eval_reward |
|
|
|
|
|
def search(self, method: str = 'random') -> Dict:
    """Run the hyperparameter search and return the best configuration.

    Args:
        method: Search strategy label; only echoed in the log (sampling
            is always random via ``_sample_params``).

    Returns:
        Dict with keys 'best_params', 'best_score' and 'all_results'.
    """
    banner = "=" * 60
    print(f"Starting hyperparameter search ({method})")
    print(banner)

    for trial_idx in range(self.n_trials):
        candidate = self._sample_params()

        print(f"\nTrial {trial_idx + 1}/{self.n_trials}")
        print(f"Params: {candidate}")

        try:
            trial_score = self.run_trial(candidate)
        except Exception as err:
            # Record the failure with a -inf score and keep searching.
            print(f"Trial failed: {err}")
            self.results.append({
                'params': candidate,
                'score': float('-inf'),
                'error': str(err)
            })
            continue

        self.results.append({'params': candidate, 'score': trial_score})
        print(f"Score: {trial_score:.2f}")

        if trial_score > self.best_score:
            self.best_score = trial_score
            self.best_params = candidate.copy()
            print("  ** New best! **")

    print("\n" + banner)
    print("SEARCH COMPLETE")
    print(f"Best Score: {self.best_score:.2f}")
    print(f"Best Params: {self.best_params}")

    return {
        'best_params': self.best_params,
        'best_score': self.best_score,
        'all_results': self.results
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_default_config() -> Dict:
    """Build the default configuration: env, agent, training and seed sections."""
    env_cfg = {
        'type': 'gridworld',
        'width': 4,
        'height': 4,
        'mode': 'static',
        'max_steps': 50,
    }
    agent_cfg = {
        'hidden_dims': [150, 100],
        'lr': 0.001,
        'gamma': 0.9,
        'buffer_size': 1000,
        'batch_size': 200,
        'target_update_freq': 500,
        'tau': 1.0,
        'use_double': True,
        'use_dueling': False,
        'use_per': False,
        'n_steps': 1,
        'epsilon_start': 1.0,
        'epsilon_end': 0.1,
        'epsilon_decay': 0.9999,
    }
    training_cfg = {
        'num_episodes': 5000,
        'log_interval': 500,
        'eval_interval': 1000,
        'eval_episodes': 100,
        'save_interval': 1000,
        'checkpoint_dir': './checkpoints',
        'early_stop_reward': None,
        'early_stop_window': 100,
    }
    return {
        'env': env_cfg,
        'agent': agent_cfg,
        'training': training_cfg,
        'seed': 42,
    }
|
|
|
|
|
|
|
|
def create_env(config: Dict):
    """Instantiate the environment described by ``config['env']``.

    Raises:
        ValueError: if ``config['env']['type']`` is neither 'gridworld'
            nor 'cartpole'.
    """
    env_section = config['env']
    kind = env_section['type']
    seed = config.get('seed', None)

    if kind == 'gridworld':
        return GridWorld(
            width=env_section['width'],
            height=env_section['height'],
            mode=env_section.get('mode', 'static'),
            max_steps=env_section['max_steps'],
            seed=seed,
        )
    if kind == 'cartpole':
        return ContinuousCartPole(
            max_steps=env_section.get('max_steps', 500),
            seed=seed,
        )
    raise ValueError(f"Unknown environment type: {kind}")
|
|
|
|
|
|
|
|
def create_agent(config: Dict, state_dim: int, action_dim: int) -> DQNAgent:
    """Construct a DQNAgent from the 'agent' section of ``config``.

    Args:
        config: Full configuration dict (reads config['agent'] and config['seed']).
        state_dim: Dimensionality of the environment's state vector.
        action_dim: Number of discrete actions.
    """
    spec = config['agent']
    # Every agent option is named identically in the config and in the
    # DQNAgent constructor, so forward them as keyword arguments.
    option_keys = (
        'hidden_dims', 'lr', 'gamma', 'buffer_size', 'batch_size',
        'target_update_freq', 'tau', 'use_double', 'use_dueling',
        'use_per', 'n_steps', 'epsilon_start', 'epsilon_end',
        'epsilon_decay',
    )
    kwargs = {key: spec[key] for key in option_keys}
    return DQNAgent(
        state_dim=state_dim,
        action_dim=action_dim,
        seed=config.get('seed', None),
        **kwargs,
    )
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, build env/agent, train, and report.

    Side effects: writes checkpoints and a training-history JSON to the
    checkpoint directory; prints progress to stdout.
    """
    parser = argparse.ArgumentParser(description='Complete RL Training Script')

    parser.add_argument('--env', type=str, default='gridworld',
                        choices=['gridworld', 'cartpole'],
                        help='Environment type')
    parser.add_argument('--episodes', type=int, default=5000,
                        help='Number of training episodes')
    parser.add_argument('--lr', type=float, default=0.001,
                        help='Learning rate')
    parser.add_argument('--gamma', type=float, default=0.9,
                        help='Discount factor')
    parser.add_argument('--batch-size', type=int, default=200,
                        help='Batch size')
    parser.add_argument('--buffer-size', type=int, default=1000,
                        help='Replay buffer size')
    parser.add_argument('--hidden-dims', type=int, nargs='+', default=[150, 100],
                        help='Hidden layer dimensions')
    # FIX: '--double' was store_true with default=True, so Double DQN could
    # never be turned OFF from the command line. Keep the default (on) for
    # backward compatibility and add '--no-double' to disable it.
    parser.add_argument('--double', action='store_true', default=True,
                        help='Use Double DQN')
    parser.add_argument('--no-double', dest='double', action='store_false',
                        help='Disable Double DQN')
    parser.add_argument('--dueling', action='store_true', default=False,
                        help='Use Dueling DQN')
    parser.add_argument('--per', action='store_true', default=False,
                        help='Use Prioritized Experience Replay')
    parser.add_argument('--n-steps', type=int, default=1,
                        help='N-step returns')
    parser.add_argument('--seed', type=int, default=42,
                        help='Random seed')
    parser.add_argument('--checkpoint-dir', type=str, default='./checkpoints',
                        help='Checkpoint directory')
    parser.add_argument('--load', type=str, default=None,
                        help='Load model from path')
    parser.add_argument('--eval-only', action='store_true',
                        help='Only run evaluation')
    parser.add_argument('--visualize', action='store_true',
                        help='Generate visualizations after training')

    args = parser.parse_args()

    np.random.seed(args.seed)

    # Fold CLI overrides into the default config.
    config = create_default_config()
    config['env']['type'] = args.env
    config['agent']['lr'] = args.lr
    config['agent']['gamma'] = args.gamma
    config['agent']['batch_size'] = args.batch_size
    config['agent']['buffer_size'] = args.buffer_size
    config['agent']['hidden_dims'] = args.hidden_dims
    config['agent']['use_double'] = args.double
    config['agent']['use_dueling'] = args.dueling
    config['agent']['use_per'] = args.per
    config['agent']['n_steps'] = args.n_steps
    config['training']['num_episodes'] = args.episodes
    config['training']['checkpoint_dir'] = args.checkpoint_dir
    config['seed'] = args.seed

    print("=" * 60)
    print("REINFORCEMENT LEARNING TRAINING")
    print("=" * 60)
    print(f"Environment: {args.env}")
    print(f"Episodes: {args.episodes}")
    print(f"Learning Rate: {args.lr}")
    print(f"Gamma: {args.gamma}")
    print(f"Double DQN: {args.double}")
    print(f"Dueling DQN: {args.dueling}")
    print(f"PER: {args.per}")
    print(f"N-Steps: {args.n_steps}")
    print("=" * 60)

    # Separate training and evaluation environments.
    env = create_env(config)
    eval_env = create_env(config)

    state_dim = env.state_dim
    action_dim = env.n_actions

    print(f"State Dim: {state_dim}")
    print(f"Action Dim: {action_dim}")
    print("=" * 60)

    agent = create_agent(config, state_dim, action_dim)

    if args.load:
        print(f"Loading model from: {args.load}")
        agent.load(args.load)

    if args.eval_only:
        # Evaluation-only path: no training, no checkpointing.
        print("Running evaluation only...")
        trainer = Trainer(agent, env, eval_env, checkpoint_dir=args.checkpoint_dir)
        eval_reward, eval_length = trainer.evaluate()
        print(f"Evaluation Results:")
        print(f"  Avg Reward: {eval_reward:.2f}")
        print(f"  Avg Length: {eval_length:.1f}")
        return

    trainer = Trainer(
        agent, env, eval_env,
        log_interval=config['training']['log_interval'],
        eval_interval=config['training']['eval_interval'],
        eval_episodes=config['training']['eval_episodes'],
        save_interval=config['training']['save_interval'],
        checkpoint_dir=config['training']['checkpoint_dir'],
        early_stop_reward=config['training']['early_stop_reward'],
        early_stop_window=config['training']['early_stop_window']
    )

    print("\nStarting training...")
    history = trainer.train(config['training']['num_episodes'])

    trainer.save_history(os.path.join(args.checkpoint_dir, 'training_history.json'))

    if args.visualize:
        print("\nGenerating visualizations...")
        viz = Visualizer(save_dir=args.checkpoint_dir)

        training_curves = viz.plot_training_curves(history)
        print(training_curves)

        # Q-value heatmap only makes sense for the grid layout.
        if args.env == 'gridworld':
            q_heatmap = viz.plot_q_values_heatmap(agent, env)
            print(q_heatmap)

        episode_recording = viz.record_episode(agent, eval_env)
        print(episode_recording)

    print("\n" + "=" * 60)
    print("TRAINING COMPLETE")
    print("=" * 60)

    final_eval_reward, final_eval_length = trainer.evaluate()
    print(f"Final Evaluation:")
    print(f"  Avg Reward: {final_eval_reward:.2f}")
    print(f"  Avg Length: {final_eval_length:.1f}")

    if history['reward']:
        print(f"\nTraining Statistics:")
        print(f"  Total Episodes: {len(history['reward'])}")
        print(f"  Best Reward: {max(history['reward']):.2f}")
        print(f"  Final Avg (last 100): {np.mean(history['reward'][-100:]):.2f}")

    print(f"\nCheckpoints saved to: {args.checkpoint_dir}")
|
|
|
|
|
|
|
|
# Script entry point: run the DQN training CLI defined in main().
if __name__ == '__main__':
    main()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PPOBuffer:
    """Fixed-size trajectory buffer for PPO with GAE-Lambda advantages.

    Stores exactly one epoch of transitions. ``finish_path`` must be
    called at every episode boundary (or at the epoch cutoff with a
    bootstrap value), and ``get`` once the buffer is full.
    """

    def __init__(self, state_dim: int, size: int, gamma: float = 0.99, lam: float = 0.95):
        """
        Args:
            state_dim: Dimensionality of the flattened state vector.
            size: Number of transitions per epoch (buffer capacity).
            gamma: Discount factor.
            lam: GAE lambda (bias/variance trade-off).
        """
        self.states = np.zeros((size, state_dim), dtype=np.float32)
        self.actions = np.zeros(size, dtype=np.int32)
        self.rewards = np.zeros(size, dtype=np.float32)
        self.values = np.zeros(size, dtype=np.float32)
        self.log_probs = np.zeros(size, dtype=np.float32)
        self.advantages = np.zeros(size, dtype=np.float32)
        self.returns = np.zeros(size, dtype=np.float32)

        self.gamma = gamma
        self.lam = lam
        self.ptr = 0          # next write index
        self.path_start = 0   # index where the current episode began
        self.max_size = size

    def store(self, state, action, reward, value, log_prob):
        """Append one transition; asserts the buffer is not already full."""
        assert self.ptr < self.max_size
        self.states[self.ptr] = state
        self.actions[self.ptr] = action
        self.rewards[self.ptr] = reward
        self.values[self.ptr] = value
        self.log_probs[self.ptr] = log_prob
        self.ptr += 1

    def finish_path(self, last_value: float = 0):
        """Compute GAE advantages and discounted returns for the finished path.

        Args:
            last_value: Bootstrap value V(s) for the state following the
                last stored transition; 0 for a terminal state.
        """
        path_slice = slice(self.path_start, self.ptr)
        rewards = np.append(self.rewards[path_slice], last_value)
        values = np.append(self.values[path_slice], last_value)

        # TD residuals: delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
        deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1]
        self.advantages[path_slice] = self._discount_cumsum(deltas, self.gamma * self.lam)
        self.returns[path_slice] = self._discount_cumsum(rewards[:-1], self.gamma)

        self.path_start = self.ptr

    def _discount_cumsum(self, x, discount):
        """Right-to-left discounted cumulative sum of ``x``."""
        n = len(x)
        out = np.zeros(n, dtype=np.float32)
        if n == 0:
            # FIX: guard the empty path (e.g. finish_path called twice in
            # a row); the original indexed x[-1] and raised IndexError.
            return out
        out[-1] = x[-1]
        for i in range(n - 2, -1, -1):
            out[i] = x[i] + discount * out[i + 1]
        return out

    def get(self):
        """Return the full epoch batch and reset the buffer for reuse.

        Advantages are normalized to zero mean / unit std for stable
        policy updates. Returns copies of the internal arrays so that
        later ``store`` calls cannot mutate data the learner is still
        consuming.
        """
        assert self.ptr == self.max_size
        self.ptr = 0
        self.path_start = 0

        adv_mean = np.mean(self.advantages)
        adv_std = np.std(self.advantages) + 1e-8
        self.advantages = (self.advantages - adv_mean) / adv_std

        # FIX: the original returned references to the internal arrays,
        # which are overwritten in place once the buffer refills.
        return {
            'states': self.states.copy(),
            'actions': self.actions.copy(),
            'returns': self.returns.copy(),
            'advantages': self.advantages.copy(),
            'log_probs': self.log_probs.copy()
        }
|
|
|
|
|
|
|
|
class ActorCritic:
    """Actor-critic network for PPO, implemented in pure numpy.

    A shared tanh MLP trunk feeds two linear heads: the actor (action
    logits) and the critic (scalar state value).
    """

    def __init__(self, state_dim: int, action_dim: int,
                 hidden_dims: Optional[List[int]] = None, lr: float = 3e-4):
        """
        Args:
            state_dim: Input feature dimension.
            action_dim: Number of discrete actions.
            hidden_dims: Sizes of the shared hidden layers; defaults to
                [64, 64]. (FIX: was a mutable default argument.)
            lr: Learning rate consumed by the external updater.
        """
        if hidden_dims is None:
            hidden_dims = [64, 64]

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = lr

        # Shared trunk: He-style init scaled by sqrt(2/fan_in), zero biases.
        dims = [state_dim] + list(hidden_dims)
        self.shared_weights = []
        self.shared_biases = []
        for i in range(len(dims) - 1):
            w = np.random.randn(dims[i], dims[i + 1]).astype(np.float32) * np.sqrt(2.0 / dims[i])
            b = np.zeros(dims[i + 1], dtype=np.float32)
            self.shared_weights.append(w)
            self.shared_biases.append(b)

        # Actor head: tiny init keeps the initial policy near-uniform.
        self.actor_w = np.random.randn(hidden_dims[-1], action_dim).astype(np.float32) * 0.01
        self.actor_b = np.zeros(action_dim, dtype=np.float32)

        # Critic head.
        self.critic_w = np.random.randn(hidden_dims[-1], 1).astype(np.float32) * 1.0
        self.critic_b = np.zeros(1, dtype=np.float32)

        self._init_adam()

    def _init_adam(self):
        """Allocate Adam moment buffers, keyed by parameter position.

        NOTE(review): nothing in the visible code reads these buffers —
        the updater uses plain finite-difference steps; kept for
        compatibility with possible external callers.
        """
        self.t = 0
        self.m = {}
        self.v = {}

        all_params = self.shared_weights + self.shared_biases + [self.actor_w, self.actor_b, self.critic_w, self.critic_b]
        for i, p in enumerate(all_params):
            self.m[i] = np.zeros_like(p)
            self.v[i] = np.zeros_like(p)

    def forward(self, state: np.ndarray):
        """Forward pass; returns (action logits, state value).

        Side effect: caches layer activations in ``self.activations``
        (NOTE(review): not consumed anywhere in the visible code).
        """
        x = state
        self.activations = [x]

        for w, b in zip(self.shared_weights, self.shared_biases):
            x = np.tanh(x @ w + b)
            self.activations.append(x)

        logits = x @ self.actor_w + self.actor_b
        # squeeze() collapses the trailing singleton; a single state
        # yields a 0-d array, a batch yields shape (batch,).
        value = (x @ self.critic_w + self.critic_b).squeeze()

        return logits, value

    def get_action(self, state: np.ndarray, deterministic: bool = False):
        """Sample (or argmax) an action from the current policy.

        Returns:
            (action, value estimate, log-probability of the action).
        """
        logits, value = self.forward(state)

        # Numerically stable softmax.
        logits_max = np.max(logits, axis=-1, keepdims=True)
        exp_logits = np.exp(logits - logits_max)
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)

        if deterministic:
            action = np.argmax(probs, axis=-1)
        else:
            if probs.ndim == 1:
                action = np.random.choice(self.action_dim, p=probs)
            else:
                # Batched input: sample each row independently.
                action = np.array([np.random.choice(self.action_dim, p=p) for p in probs])

        # 1e-8 guards log(0) for near-deterministic policies.
        log_prob = np.log(probs[action] + 1e-8) if probs.ndim == 1 else np.log(probs[np.arange(len(action)), action] + 1e-8)

        return action, value, log_prob

    def evaluate_actions(self, states: np.ndarray, actions: np.ndarray):
        """Evaluate log probs, values, and mean entropy for a batch.

        Args:
            states: Batch of states, shape (batch, state_dim).
            actions: Integer actions taken, shape (batch,).

        Returns:
            (log_probs, values, scalar mean entropy).
        """
        logits, values = self.forward(states)

        # Numerically stable softmax.
        logits_max = np.max(logits, axis=-1, keepdims=True)
        exp_logits = np.exp(logits - logits_max)
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)

        log_probs = np.log(probs[np.arange(len(actions)), actions] + 1e-8)

        entropy = -np.sum(probs * np.log(probs + 1e-8), axis=-1).mean()

        return log_probs, values, entropy
|
|
|
|
|
|
|
|
class PPOAgent: |
|
|
"""Proximal Policy Optimization Agent""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
state_dim: int, |
|
|
action_dim: int, |
|
|
hidden_dims: List[int] = [64, 64], |
|
|
lr: float = 3e-4, |
|
|
gamma: float = 0.99, |
|
|
lam: float = 0.95, |
|
|
clip_ratio: float = 0.2, |
|
|
target_kl: float = 0.01, |
|
|
train_iters: int = 80, |
|
|
value_coef: float = 0.5, |
|
|
entropy_coef: float = 0.01, |
|
|
max_grad_norm: float = 0.5, |
|
|
seed: int = None |
|
|
): |
|
|
if seed is not None: |
|
|
np.random.seed(seed) |
|
|
|
|
|
self.state_dim = state_dim |
|
|
self.action_dim = action_dim |
|
|
self.gamma = gamma |
|
|
self.lam = lam |
|
|
self.clip_ratio = clip_ratio |
|
|
self.target_kl = target_kl |
|
|
self.train_iters = train_iters |
|
|
self.value_coef = value_coef |
|
|
self.entropy_coef = entropy_coef |
|
|
self.max_grad_norm = max_grad_norm |
|
|
|
|
|
self.actor_critic = ActorCritic(state_dim, action_dim, hidden_dims, lr) |
|
|
|
|
|
def get_action(self, state: np.ndarray, deterministic: bool = False): |
|
|
return self.actor_critic.get_action(state, deterministic) |
|
|
|
|
|
def update(self, buffer_data: Dict) -> Dict: |
|
|
"""PPO update""" |
|
|
states = buffer_data['states'] |
|
|
actions = buffer_data['actions'] |
|
|
old_log_probs = buffer_data['log_probs'] |
|
|
advantages = buffer_data['advantages'] |
|
|
returns = buffer_data['returns'] |
|
|
|
|
|
total_loss = 0 |
|
|
policy_loss = 0 |
|
|
value_loss = 0 |
|
|
|
|
|
for i in range(self.train_iters): |
|
|
log_probs, values, entropy = self.actor_critic.evaluate_actions(states, actions) |
|
|
|
|
|
|
|
|
ratio = np.exp(log_probs - old_log_probs) |
|
|
clip_adv = np.clip(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages |
|
|
policy_loss = -np.mean(np.minimum(ratio * advantages, clip_adv)) |
|
|
|
|
|
|
|
|
value_loss = np.mean((values - returns) ** 2) |
|
|
|
|
|
|
|
|
loss = policy_loss + self.value_coef * value_loss - self.entropy_coef * entropy |
|
|
|
|
|
|
|
|
approx_kl = np.mean(old_log_probs - log_probs) |
|
|
if approx_kl > 1.5 * self.target_kl: |
|
|
break |
|
|
|
|
|
total_loss = loss |
|
|
|
|
|
|
|
|
|
|
|
self._update_params(states, actions, advantages, returns, old_log_probs) |
|
|
|
|
|
return { |
|
|
'loss': total_loss, |
|
|
'policy_loss': policy_loss, |
|
|
'value_loss': value_loss, |
|
|
'entropy': entropy, |
|
|
'kl': approx_kl |
|
|
} |
|
|
|
|
|
def _update_params(self, states, actions, advantages, returns, old_log_probs, eps=1e-4): |
|
|
"""Simplified parameter update using numerical gradients""" |
|
|
lr = self.actor_critic.lr |
|
|
|
|
|
|
|
|
for idx, w in enumerate(self.actor_critic.shared_weights): |
|
|
grad = np.zeros_like(w) |
|
|
|
|
|
for _ in range(min(10, w.size)): |
|
|
i, j = np.random.randint(0, w.shape[0]), np.random.randint(0, w.shape[1]) |
|
|
w[i, j] += eps |
|
|
loss_plus = self._compute_loss(states, actions, advantages, returns, old_log_probs) |
|
|
w[i, j] -= 2 * eps |
|
|
loss_minus = self._compute_loss(states, actions, advantages, returns, old_log_probs) |
|
|
w[i, j] += eps |
|
|
grad[i, j] = (loss_plus - loss_minus) / (2 * eps) |
|
|
|
|
|
|
|
|
grad_norm = np.linalg.norm(grad) |
|
|
if grad_norm > self.max_grad_norm: |
|
|
grad = grad * self.max_grad_norm / grad_norm |
|
|
|
|
|
w -= lr * grad |
|
|
|
|
|
def _compute_loss(self, states, actions, advantages, returns, old_log_probs): |
|
|
log_probs, values, entropy = self.actor_critic.evaluate_actions(states, actions) |
|
|
ratio = np.exp(log_probs - old_log_probs) |
|
|
clip_adv = np.clip(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages |
|
|
policy_loss = -np.mean(np.minimum(ratio * advantages, clip_adv)) |
|
|
value_loss = np.mean((values - returns) ** 2) |
|
|
return policy_loss + self.value_coef * value_loss - self.entropy_coef * entropy |
|
|
|
|
|
def save(self, path: str): |
|
|
data = { |
|
|
'shared_weights': self.actor_critic.shared_weights, |
|
|
'shared_biases': self.actor_critic.shared_biases, |
|
|
'actor_w': self.actor_critic.actor_w, |
|
|
'actor_b': self.actor_critic.actor_b, |
|
|
'critic_w': self.actor_critic.critic_w, |
|
|
'critic_b': self.actor_critic.critic_b |
|
|
} |
|
|
with open(path, 'wb') as f: |
|
|
pickle.dump(data, f) |
|
|
|
|
|
def load(self, path: str): |
|
|
with open(path, 'rb') as f: |
|
|
data = pickle.load(f) |
|
|
self.actor_critic.shared_weights = data['shared_weights'] |
|
|
self.actor_critic.shared_biases = data['shared_biases'] |
|
|
self.actor_critic.actor_w = data['actor_w'] |
|
|
self.actor_critic.actor_b = data['actor_b'] |
|
|
self.actor_critic.critic_w = data['critic_w'] |
|
|
self.actor_critic.critic_b = data['critic_b'] |
|
|
|
|
|
|
|
|
def train_ppo(env, agent: PPOAgent, num_episodes: int = 1000, steps_per_epoch: int = 4000):
    """PPO training loop.

    Each epoch collects ``steps_per_epoch`` transitions into a GAE
    buffer, then performs one PPO update. ``num_episodes // 10`` epochs
    are run in total.

    Returns:
        List of total rewards for every completed episode.
    """
    buffer = PPOBuffer(agent.state_dim, steps_per_epoch, agent.gamma, agent.lam)

    state = env.reset()
    episode_reward = 0
    episode_length = 0
    episode_rewards = []

    print("\n" + "=" * 60)
    print("PPO TRAINING")
    print("=" * 60)

    n_epochs = num_episodes // 10
    for epoch in range(n_epochs):
        for step in range(steps_per_epoch):
            action, value, log_prob = agent.get_action(state)
            next_state, reward, done, info = env.step(action)

            episode_reward += reward
            episode_length += 1

            buffer.store(state, action, reward, value, log_prob)
            state = next_state

            last_step = (step == steps_per_epoch - 1)
            if not (done or last_step):
                continue

            # When the epoch cuts an episode short, bootstrap the tail
            # with the critic's value estimate; terminal states use 0.
            if last_step and not done:
                _, boot_value, _ = agent.get_action(state)
            else:
                boot_value = 0
            buffer.finish_path(boot_value)

            if done:
                episode_rewards.append(episode_reward)
                episode_reward = 0
                episode_length = 0
                state = env.reset()

        data = buffer.get()
        update_info = agent.update(data)

        avg_reward = np.mean(episode_rewards[-10:]) if episode_rewards else 0
        print(f"Epoch {epoch:4d} | Avg Reward: {avg_reward:8.2f} | Loss: {update_info['loss']:.4f} | KL: {update_info['kl']:.4f}")

    return episode_rewards
|
|
|
|
|
|
|
|
print("\n✅ PPO Implementation Added!") |
|
|
print("Run with: python rl_complete.py --env gridworld --ppo") |
|
|
|