#!/usr/bin/env python3
"""
Complete Reinforcement Learning Implementation from Scratch
Author: Claude + Stevan
No external RL libraries - only numpy and standard library
"""
import numpy as np
import pickle
import os
import time
import argparse
from collections import deque
from typing import Tuple, List, Dict, Optional, Union, Callable
import struct
import json
# =============================================================================
# SECTION 1: CUSTOM ENVIRONMENTS
# =============================================================================
class GridWorld:
"""
Custom GridWorld environment implemented from scratch.
Agent navigates grid to reach goal while avoiding obstacles.
FIXED: Now uses deterministic grid layout that persists across resets.
State representation includes noise for training stability.
Proper reward shaping: -1 per move, -10 pit/wall, +10 goal.
"""
EMPTY = 0
WALL = 1
GOAL = 2
PIT = 3
AGENT = 4
UP = 0
DOWN = 1
LEFT = 2
RIGHT = 3
def __init__(
self,
width: int = 4,
height: int = 4,
mode: str = 'static',
max_steps: int = 50,
seed: Optional[int] = None
):
self.width = width
self.height = height
self.mode = mode
self.max_steps = max_steps
self.n_states = width * height * 4
self.n_actions = 4
        self.state_shape = (4, height, width)  # channels: agent, goal, pit, wall
self.state_dim = self.n_states
self.action_names = ['UP', 'DOWN', 'LEFT', 'RIGHT']
self.action_deltas = {
self.UP: (-1, 0),
self.DOWN: (1, 0),
self.LEFT: (0, -1),
self.RIGHT: (0, 1)
}
self.rng = np.random.RandomState(seed)
self.initial_seed = seed
self.board = None
self.agent_pos = None
self.goal_pos = None
self.pit_pos = None
self.wall_pos = None
self.start_pos = None
self.step_count = 0
self.total_reward = 0.0
self.done = False
self._fixed_layout = None
self._generate_grid()
self._fixed_layout = self._save_layout()
def _save_layout(self) -> Dict:
return {
'board': self.board.copy(),
'goal_pos': self.goal_pos,
'pit_pos': self.pit_pos,
'wall_pos': self.wall_pos,
'start_pos': self.start_pos
}
def _restore_layout(self):
if self._fixed_layout is not None:
self.board = self._fixed_layout['board'].copy()
self.goal_pos = self._fixed_layout['goal_pos']
self.pit_pos = self._fixed_layout['pit_pos']
self.wall_pos = self._fixed_layout['wall_pos']
self.start_pos = self._fixed_layout['start_pos']
def _generate_grid(self) -> None:
self.board = np.zeros((4, self.height, self.width), dtype=np.float32)
self.start_pos = (0, 0)
self.agent_pos = list(self.start_pos)
if self.mode == 'static':
self.goal_pos = (self.height - 1, self.width - 1)
self.pit_pos = (self.height - 1, 1) if self.width > 2 else None
self.wall_pos = (1, 1) if self.width > 2 and self.height > 2 else None
else:
available = []
for i in range(self.height):
for j in range(self.width):
if (i, j) != self.start_pos:
available.append((i, j))
self.rng.shuffle(available)
self.goal_pos = available[0]
self.pit_pos = available[1] if len(available) > 1 else None
self.wall_pos = available[2] if len(available) > 2 else None
self.board[0, self.agent_pos[0], self.agent_pos[1]] = 1.0
self.board[1, self.goal_pos[0], self.goal_pos[1]] = 1.0
if self.pit_pos:
self.board[2, self.pit_pos[0], self.pit_pos[1]] = 1.0
if self.wall_pos:
self.board[3, self.wall_pos[0], self.wall_pos[1]] = 1.0
def reset(self, seed: Optional[int] = None) -> np.ndarray:
if self.mode == 'static' and self._fixed_layout is not None:
self._restore_layout()
elif seed is not None or self.mode == 'random':
if seed is not None:
self.rng = np.random.RandomState(seed)
self._generate_grid()
else:
self._restore_layout()
self.agent_pos = list(self.start_pos)
self.board[0] = 0.0
self.board[0, self.agent_pos[0], self.agent_pos[1]] = 1.0
self.step_count = 0
self.total_reward = 0.0
self.done = False
return self._get_state()
def _get_state(self) -> np.ndarray:
state = self.board.flatten().astype(np.float32)
noise = self.rng.rand(len(state)).astype(np.float32) / 100.0
return state + noise
def render_np(self) -> np.ndarray:
return self.board.copy()
def _is_valid_pos(self, pos: List[int]) -> bool:
row, col = pos
if row < 0 or row >= self.height:
return False
if col < 0 or col >= self.width:
return False
if self.wall_pos and (row, col) == self.wall_pos:
return False
return True
def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
if self.done:
return self._get_state(), 0.0, True, {'episode_ended': True}
self.step_count += 1
delta = self.action_deltas[action]
new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
reward = -1.0
done = False
info = {}
if not self._is_valid_pos(new_pos):
reward = -10.0
info['hit_wall'] = True
else:
self.board[0, self.agent_pos[0], self.agent_pos[1]] = 0.0
self.agent_pos = new_pos
self.board[0, self.agent_pos[0], self.agent_pos[1]] = 1.0
if tuple(self.agent_pos) == self.goal_pos:
reward = 10.0
done = True
info['reached_goal'] = True
elif self.pit_pos and tuple(self.agent_pos) == self.pit_pos:
reward = -10.0
done = True
info['fell_in_pit'] = True
if self.step_count >= self.max_steps:
done = True
info['max_steps_reached'] = True
self.total_reward += reward
self.done = done
info['step'] = self.step_count
info['total_reward'] = self.total_reward
return self._get_state(), reward, done, info
def render(self, mode: str = 'ascii') -> Optional[str]:
symbols = {
'empty': '.',
'agent': 'A',
'goal': 'G',
'pit': 'X',
'wall': '#'
}
lines = []
lines.append('=' * (self.width * 2 + 3))
for row in range(self.height):
line = '| '
for col in range(self.width):
if self.board[0, row, col] == 1.0:
line += symbols['agent'] + ' '
elif self.board[1, row, col] == 1.0:
line += symbols['goal'] + ' '
elif self.board[2, row, col] == 1.0:
line += symbols['pit'] + ' '
elif self.board[3, row, col] == 1.0:
line += symbols['wall'] + ' '
else:
line += symbols['empty'] + ' '
line += '|'
lines.append(line)
lines.append('=' * (self.width * 2 + 3))
lines.append(f'Step: {self.step_count} | Reward: {self.total_reward:.2f}')
output = '\n'.join(lines)
if mode == 'ascii':
print(output)
return None
elif mode == 'string':
return output
return output
def get_valid_actions(self) -> List[int]:
valid = []
for action in range(self.n_actions):
delta = self.action_deltas[action]
new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
if self._is_valid_pos(new_pos):
valid.append(action)
return valid if valid else list(range(self.n_actions))
def clone(self) -> 'GridWorld':
env = GridWorld.__new__(GridWorld)
env.width = self.width
env.height = self.height
env.mode = self.mode
env.max_steps = self.max_steps
env.n_states = self.n_states
env.n_actions = self.n_actions
env.state_shape = self.state_shape
env.state_dim = self.state_dim
env.action_names = self.action_names
env.action_deltas = self.action_deltas
env.rng = np.random.RandomState()
env.rng.set_state(self.rng.get_state())
env.board = self.board.copy()
env.agent_pos = self.agent_pos.copy()
env.goal_pos = self.goal_pos
env.pit_pos = self.pit_pos
env.wall_pos = self.wall_pos
env.start_pos = self.start_pos
env.step_count = self.step_count
env.total_reward = self.total_reward
env.done = self.done
env._fixed_layout = self._fixed_layout.copy() if self._fixed_layout else None
return env
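# Illustrative usage sketch (a hypothetical helper, not part of the original
# file): a random-policy rollout on the static layout, exercising the
# reset/step contract defined above. Episodes always terminate via the goal,
# the pit, or max_steps.
def _demo_gridworld_rollout(seed: int = 0):
    env = GridWorld(width=4, height=4, mode='static', seed=seed)
    state = env.reset()
    done, info = False, {}
    while not done:
        action = np.random.choice(env.get_valid_actions())
        state, reward, done, info = env.step(action)
    return env.total_reward, info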
class ContinuousCartPole:
"""
CartPole environment with continuous state space.
Implemented from scratch using physics equations.
"""
def __init__(
self,
gravity: float = 9.8,
cart_mass: float = 1.0,
pole_mass: float = 0.1,
pole_length: float = 0.5,
force_mag: float = 10.0,
dt: float = 0.02,
max_steps: int = 500,
seed: Optional[int] = None
):
self.gravity = gravity
self.cart_mass = cart_mass
self.pole_mass = pole_mass
self.pole_length = pole_length
self.force_mag = force_mag
self.dt = dt
self.max_steps = max_steps
self.total_mass = cart_mass + pole_mass
self.pole_mass_length = pole_mass * pole_length
self.x_threshold = 2.4
self.theta_threshold = 12 * np.pi / 180
self.n_actions = 2
self.state_dim = 4
self.rng = np.random.RandomState(seed)
self.state = None
self.step_count = 0
self.done = False
def reset(self, seed: Optional[int] = None) -> np.ndarray:
if seed is not None:
self.rng = np.random.RandomState(seed)
self.state = self.rng.uniform(-0.05, 0.05, size=(4,)).astype(np.float32)
self.step_count = 0
self.done = False
return self.state.copy()
def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
if self.done:
return self.state.copy(), 0.0, True, {}
x, x_dot, theta, theta_dot = self.state
force = self.force_mag if action == 1 else -self.force_mag
cos_theta = np.cos(theta)
sin_theta = np.sin(theta)
temp = (force + self.pole_mass_length * theta_dot ** 2 * sin_theta) / self.total_mass
theta_acc = (self.gravity * sin_theta - cos_theta * temp) / (
self.pole_length * (4.0 / 3.0 - self.pole_mass * cos_theta ** 2 / self.total_mass)
)
x_acc = temp - self.pole_mass_length * theta_acc * cos_theta / self.total_mass
x = x + self.dt * x_dot
x_dot = x_dot + self.dt * x_acc
theta = theta + self.dt * theta_dot
theta_dot = theta_dot + self.dt * theta_acc
self.state = np.array([x, x_dot, theta, theta_dot], dtype=np.float32)
self.step_count += 1
done = bool(
x < -self.x_threshold
or x > self.x_threshold
or theta < -self.theta_threshold
or theta > self.theta_threshold
or self.step_count >= self.max_steps
)
        # +1 per surviving step; a terminal failure step earns 0, but reaching
        # the time limit still counts as a successful step
        reward = 0.0 if done else 1.0
        if self.step_count >= self.max_steps:
            reward = 1.0
self.done = done
info = {
'step': self.step_count,
'x': x,
'theta': theta
}
return self.state.copy(), reward, done, info
def render(self, mode: str = 'ascii') -> Optional[str]:
if self.state is None:
return None
x, _, theta, _ = self.state
width = 60
cart_pos = int((x / self.x_threshold + 1) * width / 2)
cart_pos = max(2, min(width - 3, cart_pos))
pole_len = 4
lines = []
lines.append('=' * width)
for row in range(-pole_len, 2):
line = [' '] * width
if row == 1:
line[cart_pos-1:cart_pos+2] = ['[', 'C', ']']
elif row == 0:
line[cart_pos] = '|'
else:
expected_row = -row
if 0 <= expected_row <= pole_len:
expected_dx = int(expected_row * np.sin(theta))
pole_x = cart_pos + expected_dx
if 0 <= pole_x < width:
line[pole_x] = '*'
lines.append(''.join(line))
lines.append('-' * width)
lines.append(f'Step: {self.step_count} | x: {x:.2f} | theta: {np.degrees(theta):.1f}°')
lines.append('=' * width)
output = '\n'.join(lines)
if mode == 'ascii':
print(output)
return None
return output
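# Illustrative sketch (hypothetical helper, not in the original file): a single
# random rollout of the dynamics above. With random actions the pole usually
# leaves the 12-degree band within a few dozen steps.
def _demo_cartpole_rollout(seed: int = 0):
    env = ContinuousCartPole(seed=seed)
    env.reset()
    steps, done = 0, False
    while not done:
        _, _, done, _ = env.step(env.rng.randint(0, env.n_actions))
        steps += 1
    return steps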
# =============================================================================
# SECTION 2: NEURAL NETWORK COMPONENTS
# =============================================================================
class Tensor:
"""Simple tensor wrapper for automatic gradient tracking."""
def __init__(self, data: np.ndarray, requires_grad: bool = False):
self.data = np.asarray(data, dtype=np.float32)
self.requires_grad = requires_grad
self.grad = None
self._backward = lambda: None
self._prev = set()
@property
def shape(self):
return self.data.shape
def zero_grad(self):
self.grad = None
class LinearLayer:
"""Fully connected layer with weights and biases."""
def __init__(
self,
in_features: int,
out_features: int,
bias: bool = True,
init_method: str = 'xavier'
):
self.in_features = in_features
self.out_features = out_features
self.use_bias = bias
if init_method == 'xavier':
limit = np.sqrt(6.0 / (in_features + out_features))
self.weights = np.random.uniform(-limit, limit, (in_features, out_features)).astype(np.float32)
elif init_method == 'he':
std = np.sqrt(2.0 / in_features)
self.weights = np.random.randn(in_features, out_features).astype(np.float32) * std
elif init_method == 'normal':
self.weights = np.random.randn(in_features, out_features).astype(np.float32) * 0.01
else:
self.weights = np.zeros((in_features, out_features), dtype=np.float32)
if bias:
self.bias = np.zeros(out_features, dtype=np.float32)
else:
self.bias = None
self.weight_grad = np.zeros_like(self.weights)
self.bias_grad = np.zeros(out_features, dtype=np.float32) if bias else None
self._input_cache = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._input_cache = x.copy()
output = np.dot(x, self.weights)
if self.use_bias:
output += self.bias
return output
    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        if self._input_cache.ndim == 1:
            self._input_cache = self._input_cache.reshape(1, -1)
        if grad_output.ndim == 1:
            grad_output = grad_output.reshape(1, -1)
        # IN-PLACE update to preserve the array reference held by the optimizer.
        # No batch-size division here: the incoming gradient is already averaged
        # over the batch by the loss function (or by the caller's scaling).
        self.weight_grad[:] = np.dot(self._input_cache.T, grad_output)
        if self.use_bias:
            self.bias_grad[:] = np.sum(grad_output, axis=0)
        grad_input = np.dot(grad_output, self.weights.T)
        return grad_input
def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
params = [(self.weights, self.weight_grad)]
if self.use_bias:
params.append((self.bias, self.bias_grad))
return params
def zero_grad(self):
self.weight_grad.fill(0)
if self.bias_grad is not None:
self.bias_grad.fill(0)
class ReLU:
"""Rectified Linear Unit activation."""
def __init__(self):
self._mask = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._mask = (x > 0).astype(np.float32)
return x * self._mask
def backward(self, grad_output: np.ndarray) -> np.ndarray:
return grad_output * self._mask
def get_params(self) -> List:
return []
def zero_grad(self):
pass
class LeakyReLU:
"""Leaky ReLU activation."""
def __init__(self, negative_slope: float = 0.01):
self.negative_slope = negative_slope
self._mask = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._mask = (x > 0).astype(np.float32)
return np.where(x > 0, x, x * self.negative_slope)
def backward(self, grad_output: np.ndarray) -> np.ndarray:
return grad_output * np.where(self._mask > 0, 1.0, self.negative_slope)
def get_params(self) -> List:
return []
def zero_grad(self):
pass
class Sigmoid:
"""Sigmoid activation function."""
def __init__(self):
self._output = None
def forward(self, x: np.ndarray) -> np.ndarray:
        x = np.clip(x, -60, 60)  # keep exp(-x) safely inside float32 range
self._output = 1.0 / (1.0 + np.exp(-x))
return self._output
def backward(self, grad_output: np.ndarray) -> np.ndarray:
return grad_output * self._output * (1.0 - self._output)
def get_params(self) -> List:
return []
def zero_grad(self):
pass
class Tanh:
"""Hyperbolic tangent activation."""
def __init__(self):
self._output = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._output = np.tanh(x)
return self._output
def backward(self, grad_output: np.ndarray) -> np.ndarray:
return grad_output * (1.0 - self._output ** 2)
def get_params(self) -> List:
return []
def zero_grad(self):
pass
class Softmax:
"""Softmax activation for probability outputs."""
def __init__(self, axis: int = -1):
self.axis = axis
self._output = None
def forward(self, x: np.ndarray) -> np.ndarray:
x_max = np.max(x, axis=self.axis, keepdims=True)
exp_x = np.exp(x - x_max)
self._output = exp_x / np.sum(exp_x, axis=self.axis, keepdims=True)
return self._output
    def backward(self, grad_output: np.ndarray) -> np.ndarray:
        # Exact softmax Jacobian-vector product along the softmax axis:
        # ds_i/dx_j = s_i * (delta_ij - s_j)  =>  JVP = s * (g - sum(g * s))
        s = self._output
        return s * (grad_output - np.sum(grad_output * s, axis=self.axis, keepdims=True))
def get_params(self) -> List:
return []
def zero_grad(self):
pass
class Dropout:
"""Dropout regularization layer."""
def __init__(self, p: float = 0.5):
self.p = p
self._mask = None
self.training = True
def forward(self, x: np.ndarray) -> np.ndarray:
if not self.training:
return x
self._mask = (np.random.random(x.shape) > self.p).astype(np.float32)
return x * self._mask / (1.0 - self.p)
def backward(self, grad_output: np.ndarray) -> np.ndarray:
if not self.training:
return grad_output
return grad_output * self._mask / (1.0 - self.p)
def get_params(self) -> List:
return []
def zero_grad(self):
pass
class BatchNorm1d:
"""Batch normalization for 1D inputs."""
def __init__(self, num_features: int, eps: float = 1e-5, momentum: float = 0.1):
self.num_features = num_features
self.eps = eps
self.momentum = momentum
self.gamma = np.ones(num_features, dtype=np.float32)
self.beta = np.zeros(num_features, dtype=np.float32)
self.running_mean = np.zeros(num_features, dtype=np.float32)
self.running_var = np.ones(num_features, dtype=np.float32)
self.gamma_grad = np.zeros_like(self.gamma)
self.beta_grad = np.zeros_like(self.beta)
self._cache = None
self.training = True
def forward(self, x: np.ndarray) -> np.ndarray:
if self.training:
mean = np.mean(x, axis=0)
var = np.var(x, axis=0)
self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mean
self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var
x_norm = (x - mean) / np.sqrt(var + self.eps)
self._cache = (x, x_norm, mean, var)
else:
x_norm = (x - self.running_mean) / np.sqrt(self.running_var + self.eps)
return self.gamma * x_norm + self.beta
def backward(self, grad_output: np.ndarray) -> np.ndarray:
x, x_norm, mean, var = self._cache
batch_size = x.shape[0]
        # In-place so the optimizer's stored (param, grad) references stay valid
        self.gamma_grad[:] = np.sum(grad_output * x_norm, axis=0)
        self.beta_grad[:] = np.sum(grad_output, axis=0)
dx_norm = grad_output * self.gamma
dvar = np.sum(dx_norm * (x - mean) * -0.5 * (var + self.eps) ** -1.5, axis=0)
dmean = np.sum(dx_norm * -1 / np.sqrt(var + self.eps), axis=0)
dmean += dvar * np.mean(-2 * (x - mean), axis=0)
dx = dx_norm / np.sqrt(var + self.eps)
dx += dvar * 2 * (x - mean) / batch_size
dx += dmean / batch_size
return dx
def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
return [(self.gamma, self.gamma_grad), (self.beta, self.beta_grad)]
def zero_grad(self):
self.gamma_grad.fill(0)
self.beta_grad.fill(0)
class Sequential:
"""Sequential container for neural network layers."""
def __init__(self, layers: List = None):
self.layers = layers if layers is not None else []
self.training = True
def add(self, layer) -> 'Sequential':
self.layers.append(layer)
return self
def forward(self, x: np.ndarray) -> np.ndarray:
for layer in self.layers:
if hasattr(layer, 'training'):
layer.training = self.training
x = layer.forward(x)
return x
def backward(self, grad: np.ndarray) -> np.ndarray:
for layer in reversed(self.layers):
grad = layer.backward(grad)
return grad
def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
params = []
for layer in self.layers:
params.extend(layer.get_params())
return params
def zero_grad(self):
for layer in self.layers:
layer.zero_grad()
def train(self):
self.training = True
for layer in self.layers:
if hasattr(layer, 'training'):
layer.training = True
def eval(self):
self.training = False
for layer in self.layers:
if hasattr(layer, 'training'):
layer.training = False
def __call__(self, x: np.ndarray) -> np.ndarray:
return self.forward(x)
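# Illustrative sketch (hypothetical helper, not original code): wire a small
# MLP from the layers above and push a gradient through it. Feeding
# d(mean(y))/dy = 1/y.size into backward() fills every (param, grad) pair
# returned by get_params().
def _demo_sequential_backward(seed: int = 0):
    np.random.seed(seed)
    net = Sequential([
        LinearLayer(3, 8, init_method='he'),
        ReLU(),
        LinearLayer(8, 2, init_method='xavier'),
    ])
    x = np.random.randn(4, 3).astype(np.float32)
    y = net(x)
    net.zero_grad()
    net.backward(np.full_like(y, 1.0 / y.size))  # gradient of mean(y)
    return [(p.shape, g.shape) for p, g in net.get_params()]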
# =============================================================================
# SECTION 3: LOSS FUNCTIONS AND OPTIMIZERS
# =============================================================================
class MSELoss:
"""Mean Squared Error loss."""
def __init__(self, reduction: str = 'mean'):
self.reduction = reduction
self._pred = None
self._target = None
def forward(self, pred: np.ndarray, target: np.ndarray) -> float:
self._pred = pred
self._target = target
diff = pred - target
loss = diff ** 2
if self.reduction == 'mean':
return float(np.mean(loss))
elif self.reduction == 'sum':
return float(np.sum(loss))
else:
return loss
def backward(self) -> np.ndarray:
grad = 2.0 * (self._pred - self._target)
if self.reduction == 'mean':
grad /= self._pred.size
return grad
def __call__(self, pred: np.ndarray, target: np.ndarray) -> float:
return self.forward(pred, target)
class HuberLoss:
"""Huber loss (smooth L1 loss)."""
def __init__(self, delta: float = 1.0, reduction: str = 'mean'):
self.delta = delta
self.reduction = reduction
self._pred = None
self._target = None
self._diff = None
def forward(self, pred: np.ndarray, target: np.ndarray) -> float:
self._pred = pred
self._target = target
self._diff = pred - target
abs_diff = np.abs(self._diff)
quadratic = np.minimum(abs_diff, self.delta)
linear = abs_diff - quadratic
loss = 0.5 * quadratic ** 2 + self.delta * linear
if self.reduction == 'mean':
return float(np.mean(loss))
elif self.reduction == 'sum':
return float(np.sum(loss))
else:
return loss
def backward(self) -> np.ndarray:
abs_diff = np.abs(self._diff)
grad = np.where(
abs_diff <= self.delta,
self._diff,
self.delta * np.sign(self._diff)
)
if self.reduction == 'mean':
grad /= self._pred.size
return grad
def __call__(self, pred: np.ndarray, target: np.ndarray) -> float:
return self.forward(pred, target)
class CrossEntropyLoss:
"""Cross entropy loss for classification."""
def __init__(self, reduction: str = 'mean'):
self.reduction = reduction
self._probs = None
self._target = None
def forward(self, logits: np.ndarray, target: np.ndarray) -> float:
max_logits = np.max(logits, axis=-1, keepdims=True)
exp_logits = np.exp(logits - max_logits)
self._probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
self._target = target
if target.ndim == 1:
batch_size = logits.shape[0]
log_probs = np.log(self._probs[np.arange(batch_size), target] + 1e-10)
else:
log_probs = np.sum(target * np.log(self._probs + 1e-10), axis=-1)
loss = -log_probs
if self.reduction == 'mean':
return float(np.mean(loss))
elif self.reduction == 'sum':
return float(np.sum(loss))
else:
return loss
def backward(self) -> np.ndarray:
grad = self._probs.copy()
if self._target.ndim == 1:
batch_size = grad.shape[0]
grad[np.arange(batch_size), self._target] -= 1
else:
grad -= self._target
if self.reduction == 'mean':
grad /= grad.shape[0]
return grad
def __call__(self, logits: np.ndarray, target: np.ndarray) -> float:
return self.forward(logits, target)
class SGD:
"""Stochastic Gradient Descent optimizer."""
def __init__(
self,
params: List[Tuple[np.ndarray, np.ndarray]],
lr: float = 0.01,
momentum: float = 0.0,
weight_decay: float = 0.0
):
self.params = params
self.lr = lr
self.momentum = momentum
self.weight_decay = weight_decay
self.velocity = [np.zeros_like(p[0]) for p in params]
def step(self):
for i, (param, grad) in enumerate(self.params):
g = grad.copy()
if self.weight_decay > 0:
g = g + self.weight_decay * param
if self.momentum > 0:
self.velocity[i] = self.momentum * self.velocity[i] + g
param[:] = param - self.lr * self.velocity[i]
else:
param[:] = param - self.lr * g
def zero_grad(self):
for _, grad in self.params:
grad.fill(0)
class Adam:
"""Adam optimizer with momentum and adaptive learning rates."""
def __init__(
self,
params: List[Tuple[np.ndarray, np.ndarray]],
lr: float = 0.001,
beta1: float = 0.9,
beta2: float = 0.999,
eps: float = 1e-8,
weight_decay: float = 0.0
):
self.params = params
self.lr = lr
self.beta1 = beta1
self.beta2 = beta2
self.eps = eps
self.weight_decay = weight_decay
self.m = [np.zeros_like(p[0]) for p in params]
self.v = [np.zeros_like(p[0]) for p in params]
self.t = 0
def step(self):
self.t += 1
for i, (param, grad) in enumerate(self.params):
g = grad.copy()
if self.weight_decay > 0:
g = g + self.weight_decay * param
self.m[i] = self.beta1 * self.m[i] + (1 - self.beta1) * g
self.v[i] = self.beta2 * self.v[i] + (1 - self.beta2) * (g ** 2)
m_hat = self.m[i] / (1 - self.beta1 ** self.t)
v_hat = self.v[i] / (1 - self.beta2 ** self.t)
update = self.lr * m_hat / (np.sqrt(v_hat) + self.eps)
param[:] = param - update
def zero_grad(self):
for _, grad in self.params:
grad.fill(0)
class RMSprop:
"""RMSprop optimizer."""
def __init__(
self,
params: List[Tuple[np.ndarray, np.ndarray]],
lr: float = 0.01,
alpha: float = 0.99,
eps: float = 1e-8,
weight_decay: float = 0.0
):
self.params = params
self.lr = lr
self.alpha = alpha
self.eps = eps
self.weight_decay = weight_decay
self.v = [np.zeros_like(p[0]) for p in params]
def step(self):
for i, (param, grad) in enumerate(self.params):
g = grad.copy()
if self.weight_decay > 0:
g = g + self.weight_decay * param
self.v[i] = self.alpha * self.v[i] + (1 - self.alpha) * (g ** 2)
param[:] = param - self.lr * g / (np.sqrt(self.v[i]) + self.eps)
def zero_grad(self):
for _, grad in self.params:
grad.fill(0)
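# Illustrative end-to-end sketch (hypothetical helper, assumed usage): fit a
# one-layer net to a linear target with MSELoss + Adam, showing how the loss
# gradient flows through Sequential.backward into the optimizer's
# (param, grad) pairs. The returned loss should be close to zero.
def _demo_fit_linear(steps: int = 200):
    rng = np.random.RandomState(0)
    x = rng.randn(64, 2).astype(np.float32)
    y = x @ np.array([[1.5], [-0.5]], dtype=np.float32)
    net = Sequential([LinearLayer(2, 1)])
    loss_fn = MSELoss()
    opt = Adam(net.get_params(), lr=0.05)
    loss = None
    for _ in range(steps):
        pred = net(x)
        loss = loss_fn(pred, y)
        net.zero_grad()
        net.backward(loss_fn.backward())
        opt.step()
    return loss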
# =============================================================================
# SECTION 4: REPLAY BUFFERS
# =============================================================================
class ReplayBuffer:
"""Basic experience replay buffer."""
def __init__(self, capacity: int, state_dim: int, seed: Optional[int] = None):
self.capacity = capacity
self.state_dim = state_dim
self.states = np.zeros((capacity, state_dim), dtype=np.float32)
self.actions = np.zeros(capacity, dtype=np.int64)
self.rewards = np.zeros(capacity, dtype=np.float32)
self.next_states = np.zeros((capacity, state_dim), dtype=np.float32)
self.dones = np.zeros(capacity, dtype=np.float32)
self.position = 0
self.size = 0
self.rng = np.random.RandomState(seed)
def push(
self,
state: np.ndarray,
action: int,
reward: float,
next_state: np.ndarray,
done: bool
):
self.states[self.position] = state
self.actions[self.position] = action
self.rewards[self.position] = reward
self.next_states[self.position] = next_state
self.dones[self.position] = float(done)
self.position = (self.position + 1) % self.capacity
self.size = min(self.size + 1, self.capacity)
def sample(self, batch_size: int) -> Tuple[np.ndarray, ...]:
indices = self.rng.randint(0, self.size, size=batch_size)
return (
self.states[indices],
self.actions[indices],
self.rewards[indices],
self.next_states[indices],
self.dones[indices]
)
def __len__(self) -> int:
return self.size
def is_ready(self, batch_size: int) -> bool:
return self.size >= batch_size
class SumTree:
"""Sum tree data structure for efficient priority sampling."""
def __init__(self, capacity: int):
self.capacity = capacity
self.tree = np.zeros(2 * capacity - 1, dtype=np.float64)
def _propagate(self, idx: int, change: float):
parent = (idx - 1) // 2
self.tree[parent] += change
if parent != 0:
self._propagate(parent, change)
def _retrieve(self, idx: int, s: float) -> int:
left = 2 * idx + 1
right = left + 1
if left >= len(self.tree):
return idx
if s <= self.tree[left]:
return self._retrieve(left, s)
else:
return self._retrieve(right, s - self.tree[left])
def total(self) -> float:
return self.tree[0]
def update(self, idx: int, priority: float):
change = priority - self.tree[idx]
self.tree[idx] = priority
self._propagate(idx, change)
def get_leaf(self, s: float) -> Tuple[int, float]:
idx = self._retrieve(0, s)
data_idx = idx - self.capacity + 1
return data_idx, self.tree[idx]
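# Worked example (hypothetical helper, not original): with leaf priorities
# [1, 2, 3, 4] the tree total is 10 and the cumulative bins are roughly
# [0,1), [1,3), [3,6), [6,10), so a query value of 4.5 lands in the third
# leaf -- sampling is proportional to priority.
def _demo_sumtree():
    tree = SumTree(capacity=4)
    for i, p in enumerate([1.0, 2.0, 3.0, 4.0]):
        tree.update(i + tree.capacity - 1, p)
    data_idx, priority = tree.get_leaf(4.5)
    return tree.total(), data_idx, priority  # (10.0, 2, 3.0)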
class PrioritizedReplayBuffer:
"""Prioritized Experience Replay buffer using sum tree."""
def __init__(
self,
capacity: int,
state_dim: int,
alpha: float = 0.6,
beta: float = 0.4,
beta_increment: float = 0.001,
epsilon: float = 1e-6,
seed: Optional[int] = None
):
self.capacity = capacity
self.state_dim = state_dim
self.alpha = alpha
self.beta = beta
self.beta_increment = beta_increment
self.epsilon = epsilon
self.tree = SumTree(capacity)
self.states = np.zeros((capacity, state_dim), dtype=np.float32)
self.actions = np.zeros(capacity, dtype=np.int64)
self.rewards = np.zeros(capacity, dtype=np.float32)
self.next_states = np.zeros((capacity, state_dim), dtype=np.float32)
self.dones = np.zeros(capacity, dtype=np.float32)
self.position = 0
self.size = 0
self.max_priority = 1.0
self.rng = np.random.RandomState(seed)
def push(
self,
state: np.ndarray,
action: int,
reward: float,
next_state: np.ndarray,
done: bool
):
self.states[self.position] = state
self.actions[self.position] = action
self.rewards[self.position] = reward
self.next_states[self.position] = next_state
self.dones[self.position] = float(done)
        tree_idx = self.position + self.capacity - 1
        # max_priority already has alpha applied (see update_priorities),
        # so it is stored in the tree directly
        self.tree.update(tree_idx, self.max_priority)
self.position = (self.position + 1) % self.capacity
self.size = min(self.size + 1, self.capacity)
def sample(self, batch_size: int) -> Tuple[np.ndarray, ...]:
indices = np.zeros(batch_size, dtype=np.int64)
priorities = np.zeros(batch_size, dtype=np.float64)
segment = self.tree.total() / batch_size
self.beta = min(1.0, self.beta + self.beta_increment)
for i in range(batch_size):
a = segment * i
b = segment * (i + 1)
s = self.rng.uniform(a, b)
data_idx, priority = self.tree.get_leaf(s)
indices[i] = data_idx
priorities[i] = priority
sampling_probs = priorities / self.tree.total()
weights = (self.size * sampling_probs) ** (-self.beta)
weights /= weights.max()
weights = weights.astype(np.float32)
return (
self.states[indices],
self.actions[indices],
self.rewards[indices],
self.next_states[indices],
self.dones[indices],
indices,
weights
)
def update_priorities(self, indices: np.ndarray, td_errors: np.ndarray):
for idx, td_error in zip(indices, td_errors):
priority = (np.abs(td_error) + self.epsilon) ** self.alpha
self.max_priority = max(self.max_priority, priority)
tree_idx = idx + self.capacity - 1
self.tree.update(tree_idx, priority)
def __len__(self) -> int:
return self.size
def is_ready(self, batch_size: int) -> bool:
return self.size >= batch_size
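# Illustrative sketch (hypothetical helper, assumed usage): the sampled batch
# carries importance-sampling weights w_i = (N * P(i))**(-beta), normalized by
# the max, which the DQN update uses to correct for non-uniform sampling.
# While all priorities are still equal, every weight is exactly 1.0.
def _demo_per(state_dim: int = 4):
    buf = PrioritizedReplayBuffer(capacity=8, state_dim=state_dim, seed=0)
    for t in range(8):
        s = np.full(state_dim, float(t), dtype=np.float32)
        buf.push(s, 0, float(t), s, False)
    states, actions, rewards, next_states, dones, idx, w = buf.sample(4)
    buf.update_priorities(idx, rewards - rewards.mean())
    return w  # array of ones on the first sample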
class NStepReplayBuffer:
"""N-step returns replay buffer."""
def __init__(
self,
capacity: int,
state_dim: int,
n_steps: int = 3,
gamma: float = 0.99,
seed: Optional[int] = None
):
self.capacity = capacity
self.state_dim = state_dim
self.n_steps = n_steps
self.gamma = gamma
self.main_buffer = ReplayBuffer(capacity, state_dim, seed)
self.n_step_buffer = deque(maxlen=n_steps)
self.rng = np.random.RandomState(seed)
def push(
self,
state: np.ndarray,
action: int,
reward: float,
next_state: np.ndarray,
done: bool
):
self.n_step_buffer.append((state, action, reward, next_state, done))
if len(self.n_step_buffer) == self.n_steps:
n_step_return = 0.0
for i in range(self.n_steps):
n_step_return += (self.gamma ** i) * self.n_step_buffer[i][2]
first_state = self.n_step_buffer[0][0]
first_action = self.n_step_buffer[0][1]
last_next_state = self.n_step_buffer[-1][3]
last_done = self.n_step_buffer[-1][4]
self.main_buffer.push(
first_state,
first_action,
n_step_return,
last_next_state,
last_done
)
        if done:
            # the full n-step transition (if any) was pushed above already;
            # drop it so the flush below doesn't store it twice
            if len(self.n_step_buffer) == self.n_steps:
                self.n_step_buffer.popleft()
            while len(self.n_step_buffer) > 0:
                n = len(self.n_step_buffer)
                n_step_return = 0.0
                for i in range(n):
                    n_step_return += (self.gamma ** i) * self.n_step_buffer[i][2]
                first_state = self.n_step_buffer[0][0]
                first_action = self.n_step_buffer[0][1]
                last_next_state = self.n_step_buffer[-1][3]
                self.main_buffer.push(
                    first_state,
                    first_action,
                    n_step_return,
                    last_next_state,
                    True
                )
                self.n_step_buffer.popleft()
def sample(self, batch_size: int) -> Tuple[np.ndarray, ...]:
return self.main_buffer.sample(batch_size)
def __len__(self) -> int:
return len(self.main_buffer)
def is_ready(self, batch_size: int) -> bool:
return self.main_buffer.is_ready(batch_size)
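# Worked example (hypothetical helper, not original): with gamma = 0.99 and
# rewards [1, 0, 2], push() stores the 3-step return
#   R = 1 + 0.99 * 0 + 0.99**2 * 2 = 2.9602
# for the first transition, paired with the state three steps ahead.
def _demo_nstep(state_dim: int = 2):
    buf = NStepReplayBuffer(capacity=16, state_dim=state_dim, n_steps=3, gamma=0.99)
    s = np.zeros(state_dim, dtype=np.float32)
    for r in [1.0, 0.0, 2.0]:
        buf.push(s, 0, r, s, False)
    return buf.main_buffer.rewards[0]  # ~2.9602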
# =============================================================================
# SECTION 5: DQN AGENTS
# =============================================================================
class EpsilonGreedy:
"""Epsilon-greedy exploration strategy with decay."""
def __init__(
self,
epsilon_start: float = 1.0,
epsilon_end: float = 0.01,
epsilon_decay: float = 0.995,
decay_type: str = 'exponential',
decay_steps: int = 10000,
seed: Optional[int] = None
):
self.epsilon_start = epsilon_start
self.epsilon_end = epsilon_end
self.epsilon_decay = epsilon_decay
self.decay_type = decay_type
self.decay_steps = decay_steps
self.epsilon = epsilon_start
self.step_count = 0
self.rng = np.random.RandomState(seed)
def get_action(self, q_values: np.ndarray, valid_actions: List[int] = None) -> int:
if self.rng.random() < self.epsilon:
            if valid_actions is not None:
                return int(self.rng.choice(valid_actions))
            else:
                return int(self.rng.randint(0, len(q_values)))
else:
if valid_actions is not None:
mask = np.full(len(q_values), -np.inf)
mask[valid_actions] = 0
return int(np.argmax(q_values + mask))
else:
return int(np.argmax(q_values))
def decay(self):
self.step_count += 1
if self.decay_type == 'exponential':
self.epsilon = max(
self.epsilon_end,
self.epsilon * self.epsilon_decay
)
elif self.decay_type == 'linear':
self.epsilon = max(
self.epsilon_end,
self.epsilon_start - (self.epsilon_start - self.epsilon_end) * (self.step_count / self.decay_steps)
)
def reset(self):
self.epsilon = self.epsilon_start
self.step_count = 0
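# Illustrative sketch (hypothetical helper, not original code): exponential
# decay multiplies epsilon by epsilon_decay on every decay() call, so with the
# defaults epsilon ~= 0.995**k after k calls, hitting the 0.01 floor around
# k ~= 920.
def _demo_epsilon_schedule(k: int = 500):
    expl = EpsilonGreedy(epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995)
    for _ in range(k):
        expl.decay()
    return expl.epsilon  # ~= max(0.01, 0.995 ** k)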
class DQNNetwork:
"""Neural network for DQN Q-value estimation."""
def __init__(
self,
state_dim: int,
action_dim: int,
hidden_dims: List[int] = None,
activation: str = 'relu'
):
if hidden_dims is None:
hidden_dims = [128, 128]
self.state_dim = state_dim
self.action_dim = action_dim
self.hidden_dims = hidden_dims
if activation == 'relu':
activation_class = ReLU
elif activation == 'leaky_relu':
activation_class = LeakyReLU
elif activation == 'tanh':
activation_class = Tanh
else:
activation_class = ReLU
layers = []
prev_dim = state_dim
for hidden_dim in hidden_dims:
layers.append(LinearLayer(prev_dim, hidden_dim, init_method='he'))
layers.append(activation_class())
prev_dim = hidden_dim
layers.append(LinearLayer(prev_dim, action_dim, init_method='xavier'))
self.network = Sequential(layers)
def forward(self, state: np.ndarray) -> np.ndarray:
if state.ndim == 1:
state = state.reshape(1, -1)
return self.network.forward(state)
def backward(self, grad: np.ndarray) -> np.ndarray:
return self.network.backward(grad)
def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
return self.network.get_params()
def zero_grad(self):
self.network.zero_grad()
def copy_from(self, other: 'DQNNetwork'):
for (p1, _), (p2, _) in zip(self.get_params(), other.get_params()):
p1[:] = p2
def soft_update(self, other: 'DQNNetwork', tau: float):
for (p1, _), (p2, _) in zip(self.get_params(), other.get_params()):
p1[:] = tau * p2 + (1 - tau) * p1
def __call__(self, state: np.ndarray) -> np.ndarray:
return self.forward(state)
class DuelingDQNNetwork:
"""Dueling DQN network architecture."""
def __init__(
self,
state_dim: int,
action_dim: int,
hidden_dims: List[int] = None
):
if hidden_dims is None:
hidden_dims = [128, 128]
self.state_dim = state_dim
self.action_dim = action_dim
layers = []
prev_dim = state_dim
for hidden_dim in hidden_dims:
layers.append(LinearLayer(prev_dim, hidden_dim, init_method='he'))
layers.append(ReLU())
prev_dim = hidden_dim
self.feature_network = Sequential(layers)
self.value_stream = Sequential([
LinearLayer(prev_dim, 64, init_method='he'),
ReLU(),
LinearLayer(64, 1, init_method='xavier')
])
self.advantage_stream = Sequential([
LinearLayer(prev_dim, 64, init_method='he'),
ReLU(),
LinearLayer(64, action_dim, init_method='xavier')
])
def forward(self, state: np.ndarray) -> np.ndarray:
if state.ndim == 1:
state = state.reshape(1, -1)
features = self.feature_network.forward(state)
value = self.value_stream.forward(features)
advantage = self.advantage_stream.forward(features)
q_values = value + (advantage - np.mean(advantage, axis=1, keepdims=True))
return q_values
def backward(self, grad: np.ndarray) -> np.ndarray:
grad_value = np.sum(grad, axis=1, keepdims=True)
grad_advantage = grad - np.mean(grad, axis=1, keepdims=True)
grad_features_v = self.value_stream.backward(grad_value)
grad_features_a = self.advantage_stream.backward(grad_advantage)
grad_features = grad_features_v + grad_features_a
return self.feature_network.backward(grad_features)
def get_params(self) -> List[Tuple[np.ndarray, np.ndarray]]:
params = []
params.extend(self.feature_network.get_params())
params.extend(self.value_stream.get_params())
params.extend(self.advantage_stream.get_params())
return params
def zero_grad(self):
self.feature_network.zero_grad()
self.value_stream.zero_grad()
self.advantage_stream.zero_grad()
def copy_from(self, other: 'DuelingDQNNetwork'):
for (p1, _), (p2, _) in zip(self.get_params(), other.get_params()):
p1[:] = p2
def soft_update(self, other: 'DuelingDQNNetwork', tau: float):
for (p1, _), (p2, _) in zip(self.get_params(), other.get_params()):
p1[:] = tau * p2 + (1 - tau) * p1
def __call__(self, state: np.ndarray) -> np.ndarray:
return self.forward(state)
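# Illustrative check (hypothetical helper, not original): the aggregation
# Q(s, a) = V(s) + A(s, a) - mean_a A(s, a) forces the advantages to be
# zero-mean, so averaging Q over actions recovers V(s) exactly.
def _demo_dueling_identity(state_dim: int = 6, action_dim: int = 3):
    net = DuelingDQNNetwork(state_dim, action_dim)
    x = np.random.randn(5, state_dim).astype(np.float32)
    q = net(x)
    v = net.value_stream.forward(net.feature_network.forward(x))
    return np.allclose(q.mean(axis=1, keepdims=True), v, atol=1e-5)  # True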
class DQNAgent:
"""Complete DQN Agent with vanilla, double, and dueling variants."""
def __init__(
self,
state_dim: int,
action_dim: int,
hidden_dims: List[int] = None,
lr: float = 0.001,
gamma: float = 0.99,
buffer_size: int = 100000,
batch_size: int = 64,
target_update_freq: int = 100,
tau: float = 0.005,
use_double: bool = True,
use_dueling: bool = False,
use_per: bool = False,
n_steps: int = 1,
epsilon_start: float = 1.0,
epsilon_end: float = 0.01,
epsilon_decay: float = 0.995,
seed: Optional[int] = None
):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.batch_size = batch_size
self.target_update_freq = target_update_freq
self.tau = tau
self.use_double = use_double
self.use_dueling = use_dueling
self.use_per = use_per
self.n_steps = n_steps
self.gamma_n = gamma ** n_steps
if use_dueling:
self.q_network = DuelingDQNNetwork(state_dim, action_dim, hidden_dims)
self.target_network = DuelingDQNNetwork(state_dim, action_dim, hidden_dims)
else:
self.q_network = DQNNetwork(state_dim, action_dim, hidden_dims)
self.target_network = DQNNetwork(state_dim, action_dim, hidden_dims)
self.target_network.copy_from(self.q_network)
self.optimizer = Adam(self.q_network.get_params(), lr=lr)
self.loss_fn = HuberLoss()
if use_per:
self.buffer = PrioritizedReplayBuffer(buffer_size, state_dim, seed=seed)
elif n_steps > 1:
self.buffer = NStepReplayBuffer(buffer_size, state_dim, n_steps, gamma, seed)
else:
self.buffer = ReplayBuffer(buffer_size, state_dim, seed)
self.exploration = EpsilonGreedy(
epsilon_start, epsilon_end, epsilon_decay,
decay_type='exponential', seed=seed
)
self.train_steps = 0
self.episodes = 0
self.metrics = {
'losses': [],
'q_values': [],
'episode_rewards': [],
'episode_lengths': [],
'epsilon': []
}
def select_action(self, state: np.ndarray, training: bool = True) -> int:
q_values = self.q_network(state).flatten()
if training:
action = self.exploration.get_action(q_values)
else:
action = int(np.argmax(q_values))
return action
def store_transition(
self,
state: np.ndarray,
action: int,
reward: float,
next_state: np.ndarray,
done: bool
):
self.buffer.push(state, action, reward, next_state, done)
def train_step(self) -> Optional[float]:
if not self.buffer.is_ready(self.batch_size):
return None
if self.use_per:
states, actions, rewards, next_states, dones, indices, weights = self.buffer.sample(self.batch_size)
else:
states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
weights = np.ones(self.batch_size, dtype=np.float32)
# Forward pass for current states
current_q_all = self.q_network(states)
current_q = current_q_all[np.arange(self.batch_size), actions]
        # IMPORTANT: save the online network's forward caches before any other
        # forward pass (the Double DQN pass below would overwrite them and
        # corrupt the backward pass)
        if self.use_dueling:
            online_layers = (self.q_network.feature_network.layers
                             + self.q_network.value_stream.layers
                             + self.q_network.advantage_stream.layers)
        else:
            online_layers = self.q_network.network.layers
        saved_caches = []
        for layer in online_layers:
            for attr in ('_input_cache', '_mask', '_output'):
                cache = getattr(layer, attr, None)
                if cache is not None:
                    saved_caches.append((layer, attr, cache.copy()))
        with np.errstate(all='ignore'):
            next_q_target = self.target_network(next_states)
            if self.use_double:
                next_q_online = self.q_network(next_states)
                best_actions = np.argmax(next_q_online, axis=1)
                next_q_max = next_q_target[np.arange(self.batch_size), best_actions]
            else:
                next_q_max = np.max(next_q_target, axis=1)
        # Restore the saved caches so backward() uses the current-state pass
        for layer, attr, cache in saved_caches:
            setattr(layer, attr, cache)
gamma = self.gamma_n if self.n_steps > 1 else self.gamma
target_q = rewards + gamma * next_q_max * (1 - dones)
td_errors = current_q - target_q
if self.use_per:
self.buffer.update_priorities(indices, td_errors)
        # Importance-weighted MSE: loss = mean(w * td^2), whose gradient w.r.t.
        # the taken-action Q-values is 2 * w * td / batch_size
        loss = float(np.mean(weights * td_errors ** 2))
        self.q_network.zero_grad()
        grad = np.zeros_like(current_q_all)
        grad[np.arange(self.batch_size), actions] = 2 * weights * td_errors / self.batch_size
        self.q_network.backward(grad)
self.optimizer.step()
self.train_steps += 1
if self.train_steps % self.target_update_freq == 0:
if self.tau < 1.0:
self.target_network.soft_update(self.q_network, self.tau)
else:
self.target_network.copy_from(self.q_network)
self.exploration.decay()
self.metrics['losses'].append(loss)
self.metrics['q_values'].append(float(np.mean(current_q)))
self.metrics['epsilon'].append(self.exploration.epsilon)
return loss
def end_episode(self, total_reward: float, episode_length: int):
self.episodes += 1
self.metrics['episode_rewards'].append(total_reward)
self.metrics['episode_lengths'].append(episode_length)
def save(self, filepath: str):
state = {
'q_network_params': [(p.copy(), g.copy()) for p, g in self.q_network.get_params()],
'target_network_params': [(p.copy(), g.copy()) for p, g in self.target_network.get_params()],
'train_steps': self.train_steps,
'episodes': self.episodes,
'epsilon': self.exploration.epsilon,
'metrics': self.metrics,
'config': {
'state_dim': self.state_dim,
'action_dim': self.action_dim,
'gamma': self.gamma,
'batch_size': self.batch_size,
'use_double': self.use_double,
'use_dueling': self.use_dueling,
'use_per': self.use_per,
'n_steps': self.n_steps
}
}
with open(filepath, 'wb') as f:
pickle.dump(state, f)
def load(self, filepath: str):
with open(filepath, 'rb') as f:
state = pickle.load(f)
for (p, g), (saved_p, saved_g) in zip(self.q_network.get_params(), state['q_network_params']):
p[:] = saved_p
g[:] = saved_g
for (p, g), (saved_p, saved_g) in zip(self.target_network.get_params(), state['target_network_params']):
p[:] = saved_p
g[:] = saved_g
self.train_steps = state['train_steps']
self.episodes = state['episodes']
self.exploration.epsilon = state['epsilon']
self.metrics = state['metrics']
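# Minimal usage sketch (hypothetical helper; the Trainer below wraps the same
# loop with logging, evaluation, and checkpointing): interact, store the
# transition, and call train_step() once per environment step.
def _demo_dqn_loop(episodes: int = 5):
    env = GridWorld(seed=0)
    agent = DQNAgent(env.state_dim, env.n_actions, hidden_dims=[32, 32],
                     buffer_size=1000, batch_size=32, seed=0)
    for _ in range(episodes):
        state, done, total = env.reset(), False, 0.0
        while not done:
            action = agent.select_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.store_transition(state, action, reward, next_state, done)
            agent.train_step()  # no-op until the buffer holds a full batch
            state, total = next_state, total + reward
        agent.end_episode(total, env.step_count)
    return agent.metrics['episode_rewards']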
# =============================================================================
# SECTION 6: TRAINING LOOP
# =============================================================================
class Trainer:
"""Complete training loop with logging and checkpointing."""
def __init__(
self,
agent: DQNAgent,
env,
eval_env=None,
log_interval: int = 100,
eval_interval: int = 1000,
eval_episodes: int = 10,
save_interval: int = 5000,
checkpoint_dir: str = './checkpoints',
early_stop_reward: float = None,
early_stop_window: int = 100
):
self.agent = agent
self.env = env
self.eval_env = eval_env if eval_env is not None else env
self.log_interval = log_interval
self.eval_interval = eval_interval
self.eval_episodes = eval_episodes
self.save_interval = save_interval
self.checkpoint_dir = checkpoint_dir
self.early_stop_reward = early_stop_reward
self.early_stop_window = early_stop_window
os.makedirs(checkpoint_dir, exist_ok=True)
self.training_history = {
'episode': [],
'reward': [],
'length': [],
'loss': [],
'epsilon': [],
'eval_reward': [],
'eval_length': []
}
def train(self, num_episodes: int) -> Dict:
start_time = time.time()
total_steps = 0
best_eval_reward = float('-inf')
recent_rewards = deque(maxlen=self.early_stop_window)
for episode in range(num_episodes):
state = self.env.reset()
episode_reward = 0.0
episode_length = 0
episode_losses = []
done = False
while not done:
action = self.agent.select_action(state, training=True)
next_state, reward, done, info = self.env.step(action)
self.agent.store_transition(state, action, reward, next_state, done)
loss = self.agent.train_step()
if loss is not None:
episode_losses.append(loss)
state = next_state
episode_reward += reward
episode_length += 1
total_steps += 1
self.agent.end_episode(episode_reward, episode_length)
recent_rewards.append(episode_reward)
self.training_history['episode'].append(episode)
self.training_history['reward'].append(episode_reward)
self.training_history['length'].append(episode_length)
            self.training_history['loss'].append(float(np.mean(episode_losses)) if episode_losses else 0.0)
self.training_history['epsilon'].append(self.agent.exploration.epsilon)
if episode % self.log_interval == 0:
avg_reward = np.mean(list(recent_rewards))
avg_loss = np.mean(episode_losses) if episode_losses else 0
elapsed = time.time() - start_time
print(f"Episode {episode:5d} | "
f"Reward: {episode_reward:7.2f} | "
f"Avg100: {avg_reward:7.2f} | "
f"Loss: {avg_loss:.4f} | "
f"Eps: {self.agent.exploration.epsilon:.3f} | "
f"Steps: {total_steps:7d} | "
f"Time: {elapsed:.1f}s")
if episode % self.eval_interval == 0 and episode > 0:
eval_reward, eval_length = self.evaluate()
self.training_history['eval_reward'].append(eval_reward)
self.training_history['eval_length'].append(eval_length)
print(f" [EVAL] Avg Reward: {eval_reward:.2f} | Avg Length: {eval_length:.1f}")
if eval_reward > best_eval_reward:
best_eval_reward = eval_reward
self.agent.save(os.path.join(self.checkpoint_dir, 'best_model.pkl'))
if episode % self.save_interval == 0 and episode > 0:
self.agent.save(os.path.join(self.checkpoint_dir, f'checkpoint_{episode}.pkl'))
if self.early_stop_reward is not None:
if len(recent_rewards) >= self.early_stop_window:
if np.mean(recent_rewards) >= self.early_stop_reward:
print(f"Early stopping: reached target reward {self.early_stop_reward}")
break
self.agent.save(os.path.join(self.checkpoint_dir, 'final_model.pkl'))
return self.training_history
def evaluate(self) -> Tuple[float, float]:
total_rewards = []
total_lengths = []
for _ in range(self.eval_episodes):
state = self.eval_env.reset()
episode_reward = 0.0
episode_length = 0
done = False
while not done:
action = self.agent.select_action(state, training=False)
next_state, reward, done, info = self.eval_env.step(action)
state = next_state
episode_reward += reward
episode_length += 1
total_rewards.append(episode_reward)
total_lengths.append(episode_length)
        return float(np.mean(total_rewards)), float(np.mean(total_lengths))
def save_history(self, filepath: str):
with open(filepath, 'w') as f:
json.dump(self.training_history, f, indent=2)
def load_history(self, filepath: str):
with open(filepath, 'r') as f:
self.training_history = json.load(f)
# =============================================================================
# SECTION 7: VISUALIZATION
# =============================================================================
class Visualizer:
"""Visualization utilities for training metrics and agent behavior."""
def __init__(self, save_dir: str = './plots'):
self.save_dir = save_dir
os.makedirs(save_dir, exist_ok=True)
def plot_training_curves(
self,
history: Dict,
filename: str = 'training_curves.txt'
) -> str:
output_lines = []
output_lines.append("=" * 80)
output_lines.append("TRAINING CURVES (ASCII)")
output_lines.append("=" * 80)
output_lines.append("\nREWARD OVER EPISODES:")
output_lines.append("-" * 60)
rewards = history.get('reward', [])
if rewards:
self._ascii_plot(rewards, output_lines, width=60, height=15)
output_lines.append("\nLOSS OVER EPISODES:")
output_lines.append("-" * 60)
losses = history.get('loss', [])
if losses:
self._ascii_plot(losses, output_lines, width=60, height=15)
output_lines.append("\nEPSILON DECAY:")
output_lines.append("-" * 60)
epsilon = history.get('epsilon', [])
if epsilon:
self._ascii_plot(epsilon, output_lines, width=60, height=10)
output_lines.append("\nSTATISTICS:")
output_lines.append("-" * 60)
if rewards:
output_lines.append(f" Total Episodes: {len(rewards)}")
output_lines.append(f" Max Reward: {max(rewards):.2f}")
output_lines.append(f" Min Reward: {min(rewards):.2f}")
output_lines.append(f" Mean Reward: {np.mean(rewards):.2f}")
output_lines.append(f" Std Reward: {np.std(rewards):.2f}")
output_lines.append(f" Final Avg (last 100): {np.mean(rewards[-100:]):.2f}")
output = '\n'.join(output_lines)
filepath = os.path.join(self.save_dir, filename)
with open(filepath, 'w') as f:
f.write(output)
return output
def _ascii_plot(
self,
data: List[float],
output_lines: List[str],
width: int = 60,
height: int = 15
):
if not data:
output_lines.append(" No data to plot")
return
data = np.array(data)
if len(data) > width:
step = len(data) // width
data = [np.mean(data[i:i+step]) for i in range(0, len(data), step)][:width]
data = np.array(data)
min_val = np.min(data)
max_val = np.max(data)
if max_val == min_val:
max_val = min_val + 1
normalized = ((data - min_val) / (max_val - min_val) * (height - 1)).astype(int)
grid = [[' ' for _ in range(len(data))] for _ in range(height)]
for x, y in enumerate(normalized):
grid[height - 1 - y][x] = '*'
output_lines.append(f" {max_val:10.3f} |")
for row in grid:
output_lines.append(f" |{''.join(row)}")
output_lines.append(f" {min_val:10.3f} |{'_' * len(data)}")
output_lines.append(f" 0{' ' * (len(data) - 6)}{len(data)}")
def plot_q_values_heatmap(
self,
agent: DQNAgent,
env,
filename: str = 'q_values.txt'
) -> str:
output_lines = []
output_lines.append("=" * 80)
output_lines.append("Q-VALUES HEATMAP")
output_lines.append("=" * 80)
if not hasattr(env, 'height') or not hasattr(env, 'width'):
output_lines.append("Environment doesn't support grid visualization")
return '\n'.join(output_lines)
action_names = ['UP', 'DOWN', 'LEFT', 'RIGHT']
for action_idx, action_name in enumerate(action_names):
output_lines.append(f"\nQ-VALUES FOR ACTION: {action_name}")
output_lines.append("-" * 40)
            q_grid = np.zeros((env.height, env.width))
            for row in range(env.height):
                for col in range(env.width):
                    # build a 4-channel observation matching GridWorld's board
                    # (agent, goal, pit, wall) with the agent placed at (row, col)
                    state = np.zeros((4, env.height, env.width), dtype=np.float32)
                    state[0, row, col] = 1.0
                    state[1, env.goal_pos[0], env.goal_pos[1]] = 1.0
                    if env.pit_pos:
                        state[2, env.pit_pos[0], env.pit_pos[1]] = 1.0
                    if env.wall_pos:
                        state[3, env.wall_pos[0], env.wall_pos[1]] = 1.0
                    q_values = agent.q_network(state.flatten()).flatten()
                    q_grid[row, col] = q_values[action_idx]
min_q = np.min(q_grid)
max_q = np.max(q_grid)
symbols = ' ░▒▓█'
for row in range(env.height):
line = " "
for col in range(env.width):
if max_q != min_q:
normalized = (q_grid[row, col] - min_q) / (max_q - min_q)
else:
normalized = 0.5
idx = min(int(normalized * (len(symbols) - 1)), len(symbols) - 1)
line += symbols[idx] + ' '
output_lines.append(line)
output_lines.append(f" Min: {min_q:.3f} | Max: {max_q:.3f}")
output = '\n'.join(output_lines)
filepath = os.path.join(self.save_dir, filename)
with open(filepath, 'w') as f:
f.write(output)
return output
def record_episode(
self,
agent: DQNAgent,
env,
filename: str = 'episode_recording.txt'
) -> str:
output_lines = []
output_lines.append("=" * 80)
output_lines.append("EPISODE RECORDING")
output_lines.append("=" * 80)
state = env.reset()
done = False
step = 0
total_reward = 0.0
while not done and step < 100:
output_lines.append(f"\n--- Step {step} ---")
render = env.render(mode='string')
if render:
output_lines.append(render)
q_values = agent.q_network(state).flatten()
action = int(np.argmax(q_values))
output_lines.append(f"Q-values: {q_values}")
output_lines.append(f"Action: {env.action_names[action] if hasattr(env, 'action_names') else action}")
next_state, reward, done, info = env.step(action)
total_reward += reward
output_lines.append(f"Reward: {reward:.2f} | Total: {total_reward:.2f}")
state = next_state
step += 1
output_lines.append(f"\n{'=' * 40}")
        output_lines.append("EPISODE COMPLETE")
output_lines.append(f"Total Steps: {step}")
output_lines.append(f"Total Reward: {total_reward:.2f}")
output_lines.append(f"Final Info: {info}")
output = '\n'.join(output_lines)
filepath = os.path.join(self.save_dir, filename)
with open(filepath, 'w') as f:
f.write(output)
return output
# =============================================================================
# SECTION 8: HYPERPARAMETER TUNING
# =============================================================================
class HyperparameterSearch:
"""Grid and random search for hyperparameter tuning."""
def __init__(
self,
env_class,
env_kwargs: Dict,
param_grid: Dict,
n_episodes: int = 100,
eval_episodes: int = 10,
n_trials: int = 10,
seed: int = 42
):
self.env_class = env_class
self.env_kwargs = env_kwargs
self.param_grid = param_grid
self.n_episodes = n_episodes
self.eval_episodes = eval_episodes
self.n_trials = n_trials
self.seed = seed
self.results = []
self.best_params = None
self.best_score = float('-inf')
    def _sample_params(self) -> Dict:
        params = {}
        for key, values in self.param_grid.items():
            if isinstance(values, list):
                # index manually so list-valued options (e.g. hidden_dims given
                # as a list of candidate lists) are sampled correctly
                params[key] = values[np.random.randint(len(values))]
            elif isinstance(values, tuple) and len(values) == 2:
                low, high = values
                if isinstance(low, float):
                    params[key] = np.random.uniform(low, high)
                else:
                    params[key] = np.random.randint(low, high + 1)
            else:
                params[key] = values
        return params
def run_trial(self, params: Dict) -> float:
np.random.seed(self.seed)
env = self.env_class(**self.env_kwargs)
eval_env = self.env_class(**self.env_kwargs)
state_dim = env.n_states if hasattr(env, 'n_states') else env.state_dim
action_dim = env.n_actions
agent = DQNAgent(
state_dim=state_dim,
action_dim=action_dim,
hidden_dims=params.get('hidden_dims', [64, 64]),
lr=params.get('lr', 0.001),
gamma=params.get('gamma', 0.99),
buffer_size=params.get('buffer_size', 10000),
batch_size=params.get('batch_size', 32),
target_update_freq=params.get('target_update_freq', 100),
use_double=params.get('use_double', True),
use_dueling=params.get('use_dueling', False),
epsilon_start=params.get('epsilon_start', 1.0),
epsilon_end=params.get('epsilon_end', 0.01),
epsilon_decay=params.get('epsilon_decay', 0.995),
seed=self.seed
)
trainer = Trainer(
agent, env, eval_env,
log_interval=self.n_episodes + 1,
eval_interval=self.n_episodes + 1,
checkpoint_dir='/tmp/hp_search'
)
trainer.train(self.n_episodes)
eval_reward, _ = trainer.evaluate()
return eval_reward
def search(self, method: str = 'random') -> Dict:
print(f"Starting hyperparameter search ({method})")
print("=" * 60)
for trial in range(self.n_trials):
params = self._sample_params()
print(f"\nTrial {trial + 1}/{self.n_trials}")
print(f"Params: {params}")
try:
score = self.run_trial(params)
self.results.append({
'params': params,
'score': score
})
print(f"Score: {score:.2f}")
if score > self.best_score:
self.best_score = score
self.best_params = params.copy()
print(f" ** New best! **")
except Exception as e:
print(f"Trial failed: {e}")
self.results.append({
'params': params,
'score': float('-inf'),
'error': str(e)
})
print("\n" + "=" * 60)
print("SEARCH COMPLETE")
print(f"Best Score: {self.best_score:.2f}")
print(f"Best Params: {self.best_params}")
return {
'best_params': self.best_params,
'best_score': self.best_score,
'all_results': self.results
}
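# Illustrative param_grid sketch (hypothetical helper, assumed format matching
# _sample_params above): lists are sampled uniformly, two-element (low, high)
# tuples are drawn as a uniform float or integer range, and anything else is
# passed through as a fixed value.
def _demo_hp_search():
    search = HyperparameterSearch(
        env_class=GridWorld,
        env_kwargs={'width': 4, 'height': 4, 'mode': 'static'},
        param_grid={
            'lr': (0.0001, 0.01),          # float range
            'batch_size': [32, 64, 128],   # categorical
            'gamma': 0.99,                 # fixed value
        },
        n_episodes=50,
        n_trials=3,
    )
    return search.search(method='random')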
# =============================================================================
# SECTION 9: MAIN ENTRY POINT
# =============================================================================
def create_default_config() -> Dict:
return {
'env': {
'type': 'gridworld',
'width': 4,
'height': 4,
'mode': 'static',
'max_steps': 50
},
'agent': {
'hidden_dims': [150, 100],
'lr': 0.001,
'gamma': 0.9,
'buffer_size': 1000,
'batch_size': 200,
'target_update_freq': 500,
'tau': 1.0,
'use_double': True,
'use_dueling': False,
'use_per': False,
'n_steps': 1,
'epsilon_start': 1.0,
'epsilon_end': 0.1,
'epsilon_decay': 0.9999
},
'training': {
'num_episodes': 5000,
'log_interval': 500,
'eval_interval': 1000,
'eval_episodes': 100,
'save_interval': 1000,
'checkpoint_dir': './checkpoints',
'early_stop_reward': None,
'early_stop_window': 100
},
'seed': 42
}
def create_env(config: Dict):
env_type = config['env']['type']
if env_type == 'gridworld':
return GridWorld(
width=config['env']['width'],
height=config['env']['height'],
mode=config['env'].get('mode', 'static'),
max_steps=config['env']['max_steps'],
seed=config.get('seed', None)
)
elif env_type == 'cartpole':
return ContinuousCartPole(
max_steps=config['env'].get('max_steps', 500),
seed=config.get('seed', None)
)
else:
raise ValueError(f"Unknown environment type: {env_type}")
def create_agent(config: Dict, state_dim: int, action_dim: int) -> DQNAgent:
agent_config = config['agent']
return DQNAgent(
state_dim=state_dim,
action_dim=action_dim,
hidden_dims=agent_config['hidden_dims'],
lr=agent_config['lr'],
gamma=agent_config['gamma'],
buffer_size=agent_config['buffer_size'],
batch_size=agent_config['batch_size'],
target_update_freq=agent_config['target_update_freq'],
tau=agent_config['tau'],
use_double=agent_config['use_double'],
use_dueling=agent_config['use_dueling'],
use_per=agent_config['use_per'],
n_steps=agent_config['n_steps'],
epsilon_start=agent_config['epsilon_start'],
epsilon_end=agent_config['epsilon_end'],
epsilon_decay=agent_config['epsilon_decay'],
seed=config.get('seed', None)
)
def main():
parser = argparse.ArgumentParser(description='Complete RL Training Script')
parser.add_argument('--env', type=str, default='gridworld',
choices=['gridworld', 'cartpole'],
help='Environment type')
parser.add_argument('--episodes', type=int, default=5000,
help='Number of training episodes')
parser.add_argument('--lr', type=float, default=0.001,
help='Learning rate')
parser.add_argument('--gamma', type=float, default=0.9,
help='Discount factor')
parser.add_argument('--batch-size', type=int, default=200,
help='Batch size')
parser.add_argument('--buffer-size', type=int, default=1000,
help='Replay buffer size')
parser.add_argument('--hidden-dims', type=int, nargs='+', default=[150, 100],
help='Hidden layer dimensions')
    parser.add_argument('--double', dest='double', action='store_true', default=True,
                        help='Use Double DQN (default: enabled)')
    parser.add_argument('--no-double', dest='double', action='store_false',
                        help='Disable Double DQN')
parser.add_argument('--dueling', action='store_true', default=False,
help='Use Dueling DQN')
parser.add_argument('--per', action='store_true', default=False,
help='Use Prioritized Experience Replay')
parser.add_argument('--n-steps', type=int, default=1,
help='N-step returns')
parser.add_argument('--seed', type=int, default=42,
help='Random seed')
parser.add_argument('--checkpoint-dir', type=str, default='./checkpoints',
help='Checkpoint directory')
parser.add_argument('--load', type=str, default=None,
help='Load model from path')
parser.add_argument('--eval-only', action='store_true',
help='Only run evaluation')
parser.add_argument('--visualize', action='store_true',
help='Generate visualizations after training')
args = parser.parse_args()
np.random.seed(args.seed)
config = create_default_config()
config['env']['type'] = args.env
config['agent']['lr'] = args.lr
config['agent']['gamma'] = args.gamma
config['agent']['batch_size'] = args.batch_size
config['agent']['buffer_size'] = args.buffer_size
config['agent']['hidden_dims'] = args.hidden_dims
config['agent']['use_double'] = args.double
config['agent']['use_dueling'] = args.dueling
config['agent']['use_per'] = args.per
config['agent']['n_steps'] = args.n_steps
config['training']['num_episodes'] = args.episodes
config['training']['checkpoint_dir'] = args.checkpoint_dir
config['seed'] = args.seed
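# Example invocation (flags as defined above; values are illustrative):
#   python rl_complete.py --env gridworld --episodes 2000 --dueling --per --visualize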
print("=" * 60)
print("REINFORCEMENT LEARNING TRAINING")
print("=" * 60)
print(f"Environment: {args.env}")
print(f"Episodes: {args.episodes}")
print(f"Learning Rate: {args.lr}")
print(f"Gamma: {args.gamma}")
print(f"Double DQN: {args.double}")
print(f"Dueling DQN: {args.dueling}")
print(f"PER: {args.per}")
print(f"N-Steps: {args.n_steps}")
print("=" * 60)
env = create_env(config)
eval_env = create_env(config)
state_dim = env.state_dim
action_dim = env.n_actions
print(f"State Dim: {state_dim}")
print(f"Action Dim: {action_dim}")
print("=" * 60)
agent = create_agent(config, state_dim, action_dim)
if args.load:
print(f"Loading model from: {args.load}")
agent.load(args.load)
if args.eval_only:
print("Running evaluation only...")
trainer = Trainer(agent, env, eval_env, checkpoint_dir=args.checkpoint_dir)
eval_reward, eval_length = trainer.evaluate()
print(f"Evaluation Results:")
print(f" Avg Reward: {eval_reward:.2f}")
print(f" Avg Length: {eval_length:.1f}")
return
trainer = Trainer(
agent, env, eval_env,
log_interval=config['training']['log_interval'],
eval_interval=config['training']['eval_interval'],
eval_episodes=config['training']['eval_episodes'],
save_interval=config['training']['save_interval'],
checkpoint_dir=config['training']['checkpoint_dir'],
early_stop_reward=config['training']['early_stop_reward'],
early_stop_window=config['training']['early_stop_window']
)
print("\nStarting training...")
history = trainer.train(config['training']['num_episodes'])
trainer.save_history(os.path.join(args.checkpoint_dir, 'training_history.json'))
if args.visualize:
print("\nGenerating visualizations...")
viz = Visualizer(save_dir=args.checkpoint_dir)
training_curves = viz.plot_training_curves(history)
print(training_curves)
if args.env == 'gridworld':
q_heatmap = viz.plot_q_values_heatmap(agent, env)
print(q_heatmap)
episode_recording = viz.record_episode(agent, eval_env)
print(episode_recording)
print("\n" + "=" * 60)
print("TRAINING COMPLETE")
print("=" * 60)
final_eval_reward, final_eval_length = trainer.evaluate()
print(f"Final Evaluation:")
print(f" Avg Reward: {final_eval_reward:.2f}")
print(f" Avg Length: {final_eval_length:.1f}")
if history['reward']:
print(f"\nTraining Statistics:")
print(f" Total Episodes: {len(history['reward'])}")
print(f" Best Reward: {max(history['reward']):.2f}")
print(f" Final Avg (last 100): {np.mean(history['reward'][-100:]):.2f}")
print(f"\nCheckpoints saved to: {args.checkpoint_dir}")
# (script entry point is at the bottom of the file, after the PPO section,
# so that everything is defined before main() runs)
# =============================================================================
# SECTION 10: PPO - PROXIMAL POLICY OPTIMIZATION (Lines 2430+)
# =============================================================================
class PPOBuffer:
"""GAE buffer za PPO"""
def __init__(self, state_dim: int, size: int, gamma: float = 0.99, lam: float = 0.95):
self.states = np.zeros((size, state_dim), dtype=np.float32)
self.actions = np.zeros(size, dtype=np.int32)
self.rewards = np.zeros(size, dtype=np.float32)
self.values = np.zeros(size, dtype=np.float32)
self.log_probs = np.zeros(size, dtype=np.float32)
self.advantages = np.zeros(size, dtype=np.float32)
self.returns = np.zeros(size, dtype=np.float32)
self.gamma = gamma
self.lam = lam
self.ptr = 0
self.path_start = 0
self.max_size = size
def store(self, state, action, reward, value, log_prob):
assert self.ptr < self.max_size
self.states[self.ptr] = state
self.actions[self.ptr] = action
self.rewards[self.ptr] = reward
self.values[self.ptr] = value
self.log_probs[self.ptr] = log_prob
self.ptr += 1
def finish_path(self, last_value: float = 0):
"""Compute GAE advantages"""
path_slice = slice(self.path_start, self.ptr)
rewards = np.append(self.rewards[path_slice], last_value)
values = np.append(self.values[path_slice], last_value)
# GAE-Lambda
deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1]
self.advantages[path_slice] = self._discount_cumsum(deltas, self.gamma * self.lam)
self.returns[path_slice] = self._discount_cumsum(rewards[:-1], self.gamma)
self.path_start = self.ptr
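# GAE recap (what finish_path computes): with TD residuals
#   delta_t = r_t + gamma * V(s_{t+1}) - V(s_t)
# the advantage estimate is the discounted sum
#   A_t = sum_{l>=0} (gamma * lam)^l * delta_{t+l}
# while the returns are the plain discounted reward-to-go used as value targets.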
def _discount_cumsum(self, x, discount):
    """Right-to-left discounted cumulative sum: out[i] = x[i] + discount * out[i+1].
    e.g. _discount_cumsum([1, 1, 1], 0.5) -> [1.75, 1.5, 1.0]
    """
    n = len(x)
    out = np.zeros(n, dtype=np.float32)
    if n == 0:
        return out  # guard: an empty path segment yields an empty result
    out[-1] = x[-1]
    for i in range(n - 2, -1, -1):
        out[i] = x[i] + discount * out[i + 1]
    return out
def get(self):
assert self.ptr == self.max_size
self.ptr = 0
self.path_start = 0
# Normalize advantages
adv_mean = np.mean(self.advantages)
adv_std = np.std(self.advantages) + 1e-8
self.advantages = (self.advantages - adv_mean) / adv_std
return {
'states': self.states,
'actions': self.actions,
'returns': self.returns,
'advantages': self.advantages,
'log_probs': self.log_probs
}
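# Usage sketch for PPOBuffer (a hypothetical standalone example): store one
# transition per step, call finish_path() at trajectory boundaries, and call
# get() only once the buffer is exactly full (get() asserts this).
#   buf = PPOBuffer(state_dim=4, size=8)
#   for _ in range(8):
#       buf.store(np.zeros(4, dtype=np.float32), 0, 1.0, 0.5, -1.4)
#   buf.finish_path(last_value=0.0)
#   batch = buf.get()   # dict of arrays; advantages come back normalized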
class ActorCritic:
"""Actor-Critic za PPO - čist numpy"""
def __init__(self, state_dim: int, action_dim: int, hidden_dims: List[int] = [64, 64], lr: float = 3e-4):
self.state_dim = state_dim
self.action_dim = action_dim
self.lr = lr
# Shared layers
dims = [state_dim] + hidden_dims
self.shared_weights = []
self.shared_biases = []
for i in range(len(dims) - 1):
w = np.random.randn(dims[i], dims[i + 1]).astype(np.float32) * np.sqrt(2.0 / dims[i])
b = np.zeros(dims[i + 1], dtype=np.float32)
self.shared_weights.append(w)
self.shared_biases.append(b)
# Actor head (policy)
self.actor_w = np.random.randn(hidden_dims[-1], action_dim).astype(np.float32) * 0.01
self.actor_b = np.zeros(action_dim, dtype=np.float32)
# Critic head (value)
self.critic_w = np.random.randn(hidden_dims[-1], 1).astype(np.float32) * 1.0
self.critic_b = np.zeros(1, dtype=np.float32)
# Adam state
self._init_adam()
def _init_adam(self):
self.t = 0
self.m = {}
self.v = {}
all_params = self.shared_weights + self.shared_biases + [self.actor_w, self.actor_b, self.critic_w, self.critic_b]
for i, p in enumerate(all_params):
self.m[i] = np.zeros_like(p)
self.v[i] = np.zeros_like(p)
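# The Adam buffers above are allocated but never consumed: the finite-
# difference update in PPOAgent applies plain SGD. A minimal sketch of the
# Adam step they appear intended for (method name is illustrative; assumes
# the caller increments self.t once per optimization step, so self.t >= 1):
def _adam_step(self, idx: int, param: np.ndarray, grad: np.ndarray,
               beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-8):
    self.m[idx] = beta1 * self.m[idx] + (1 - beta1) * grad
    self.v[idx] = beta2 * self.v[idx] + (1 - beta2) * grad ** 2
    m_hat = self.m[idx] / (1 - beta1 ** self.t)   # bias correction
    v_hat = self.v[idx] / (1 - beta2 ** self.t)
    param -= self.lr * m_hat / (np.sqrt(v_hat) + eps)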
def forward(self, state: np.ndarray):
"""Forward pass"""
x = state
self.activations = [x]
for w, b in zip(self.shared_weights, self.shared_biases):
x = np.tanh(x @ w + b)
self.activations.append(x)
# Actor output (logits)
logits = x @ self.actor_w + self.actor_b
# Critic output (value)
value = (x @ self.critic_w + self.critic_b).squeeze()
return logits, value
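# Note: self.activations is cached as if for manual backprop, but nothing in
# this file reads it; it is left in place for anyone extending the class with
# analytic gradients.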
def get_action(self, state: np.ndarray, deterministic: bool = False):
"""Sample action from policy"""
logits, value = self.forward(state)
# Softmax
logits_max = np.max(logits, axis=-1, keepdims=True)
exp_logits = np.exp(logits - logits_max)
probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
if deterministic:
action = np.argmax(probs, axis=-1)
else:
if probs.ndim == 1:
action = np.random.choice(self.action_dim, p=probs)
else:
action = np.array([np.random.choice(self.action_dim, p=p) for p in probs])
# Log probability
log_prob = np.log(probs[action] + 1e-8) if probs.ndim == 1 else np.log(probs[np.arange(len(action)), action] + 1e-8)
return action, value, log_prob
def evaluate_actions(self, states: np.ndarray, actions: np.ndarray):
"""Evaluate log probs and values for given states/actions"""
logits, values = self.forward(states)
# Softmax
logits_max = np.max(logits, axis=-1, keepdims=True)
exp_logits = np.exp(logits - logits_max)
probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
# Log probs for taken actions
log_probs = np.log(probs[np.arange(len(actions)), actions] + 1e-8)
# Entropy
entropy = -np.sum(probs * np.log(probs + 1e-8), axis=-1).mean()
return log_probs, values, entropy
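# Quick sanity check (hypothetical): softmax rows sum to 1 and the entropy of
# a discrete policy is bounded by log(action_dim).
#   ac = ActorCritic(state_dim=4, action_dim=2)
#   lp, v, ent = ac.evaluate_actions(
#       np.random.randn(8, 4).astype(np.float32), np.zeros(8, dtype=np.int32))
#   assert ent <= np.log(2) + 1e-6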
class PPOAgent:
"""Proximal Policy Optimization Agent"""
def __init__(
self,
state_dim: int,
action_dim: int,
hidden_dims: List[int] = [64, 64],
lr: float = 3e-4,
gamma: float = 0.99,
lam: float = 0.95,
clip_ratio: float = 0.2,
target_kl: float = 0.01,
train_iters: int = 80,
value_coef: float = 0.5,
entropy_coef: float = 0.01,
max_grad_norm: float = 0.5,
seed: Optional[int] = None
):
if seed is not None:
np.random.seed(seed)
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.lam = lam
self.clip_ratio = clip_ratio
self.target_kl = target_kl
self.train_iters = train_iters
self.value_coef = value_coef
self.entropy_coef = entropy_coef
self.max_grad_norm = max_grad_norm
self.actor_critic = ActorCritic(state_dim, action_dim, hidden_dims, lr)
def get_action(self, state: np.ndarray, deterministic: bool = False):
return self.actor_critic.get_action(state, deterministic)
def update(self, buffer_data: Dict) -> Dict:
"""PPO update"""
states = buffer_data['states']
actions = buffer_data['actions']
old_log_probs = buffer_data['log_probs']
advantages = buffer_data['advantages']
returns = buffer_data['returns']
total_loss = 0
policy_loss = 0
value_loss = 0
for i in range(self.train_iters):
log_probs, values, entropy = self.actor_critic.evaluate_actions(states, actions)
# Policy loss (PPO clip)
ratio = np.exp(log_probs - old_log_probs)
clip_adv = np.clip(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages
policy_loss = -np.mean(np.minimum(ratio * advantages, clip_adv))
# Value loss
value_loss = np.mean((values - returns) ** 2)
# Total loss
loss = policy_loss + self.value_coef * value_loss - self.entropy_coef * entropy
total_loss = loss
# Approximate KL divergence for early stopping; the loss is recorded first so
# an immediate stop still reports a real value instead of the 0 initializer
approx_kl = np.mean(old_log_probs - log_probs)
if approx_kl > 1.5 * self.target_kl:
    break
# Gradient update (simplified - full backprop would need more code)
# For now using finite differences approximation
self._update_params(states, actions, advantages, returns, old_log_probs)
return {
'loss': total_loss,
'policy_loss': policy_loss,
'value_loss': value_loss,
'entropy': entropy,
'kl': approx_kl
}
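# PPO clipped surrogate used above: with probability ratio
#   r_t(theta) = pi_theta(a_t | s_t) / pi_theta_old(a_t | s_t),
# the objective is
#   L_clip = E_t[ min(r_t * A_t, clip(r_t, 1 - eps, 1 + eps) * A_t) ].
# policy_loss is its negation (we minimize), and the loop stops early once the
# approximate KL exceeds 1.5 * target_kl.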
def _update_params(self, states, actions, advantages, returns, old_log_probs, eps=1e-4):
    """Simplified parameter update using sampled central finite differences.

    A zeroth-order approximation, not true backprop: for every parameter
    array (shared layers plus the actor and critic heads), a few randomly
    chosen entries are perturbed to estimate partial derivatives.
    """
    lr = self.actor_critic.lr
    ac = self.actor_critic
    all_params = ac.shared_weights + ac.shared_biases + \
        [ac.actor_w, ac.actor_b, ac.critic_w, ac.critic_b]
    for w in all_params:
        grad = np.zeros_like(w)
        flat_w = w.reshape(-1)   # views: writes propagate back to w
        flat_g = grad.reshape(-1)
        # Sampled gradient estimation (far cheaper than full finite differences)
        for _ in range(min(10, flat_w.size)):
            k = np.random.randint(flat_w.size)
            flat_w[k] += eps
            loss_plus = self._compute_loss(states, actions, advantages, returns, old_log_probs)
            flat_w[k] -= 2 * eps
            loss_minus = self._compute_loss(states, actions, advantages, returns, old_log_probs)
            flat_w[k] += eps
            flat_g[k] = (loss_plus - loss_minus) / (2 * eps)
        # Gradient clipping
        grad_norm = np.linalg.norm(grad)
        if grad_norm > self.max_grad_norm:
            grad = grad * self.max_grad_norm / grad_norm
        w -= lr * grad
def _compute_loss(self, states, actions, advantages, returns, old_log_probs):
log_probs, values, entropy = self.actor_critic.evaluate_actions(states, actions)
ratio = np.exp(log_probs - old_log_probs)
clip_adv = np.clip(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages
policy_loss = -np.mean(np.minimum(ratio * advantages, clip_adv))
value_loss = np.mean((values - returns) ** 2)
return policy_loss + self.value_coef * value_loss - self.entropy_coef * entropy
def save(self, path: str):
data = {
'shared_weights': self.actor_critic.shared_weights,
'shared_biases': self.actor_critic.shared_biases,
'actor_w': self.actor_critic.actor_w,
'actor_b': self.actor_critic.actor_b,
'critic_w': self.actor_critic.critic_w,
'critic_b': self.actor_critic.critic_b
}
with open(path, 'wb') as f:
pickle.dump(data, f)
def load(self, path: str):
with open(path, 'rb') as f:
data = pickle.load(f)
self.actor_critic.shared_weights = data['shared_weights']
self.actor_critic.shared_biases = data['shared_biases']
self.actor_critic.actor_w = data['actor_w']
self.actor_critic.actor_b = data['actor_b']
self.actor_critic.critic_w = data['critic_w']
self.actor_critic.critic_b = data['critic_b']
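# Note: save()/load() round-trip only the network parameters; the Adam
# buffers (t, m, v) are not serialized, so a reloaded agent keeps whatever
# optimizer state the live object already had.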
def train_ppo(env, agent: PPOAgent, num_episodes: int = 1000, steps_per_epoch: int = 4000):
"""PPO Training Loop"""
buffer = PPOBuffer(agent.state_dim, steps_per_epoch, agent.gamma, agent.lam)
state = env.reset()
episode_reward = 0
episode_length = 0
episode_rewards = []
print("\n" + "=" * 60)
print("PPO TRAINING")
print("=" * 60)
for epoch in range(num_episodes // 10):  # each epoch collects steps_per_epoch transitions
for t in range(steps_per_epoch):
action, value, log_prob = agent.get_action(state)
next_state, reward, done, info = env.step(action)
episode_reward += reward
episode_length += 1
buffer.store(state, action, reward, value, log_prob)
state = next_state
epoch_ended = t == steps_per_epoch - 1
if done or epoch_ended:
if epoch_ended and not done:
_, last_value, _ = agent.get_action(state)
else:
last_value = 0
buffer.finish_path(last_value)
if done:
episode_rewards.append(episode_reward)
episode_reward = 0
episode_length = 0
state = env.reset()
# Update
data = buffer.get()
update_info = agent.update(data)
avg_reward = np.mean(episode_rewards[-10:]) if episode_rewards else 0
print(f"Epoch {epoch:4d} | Avg Reward: {avg_reward:8.2f} | Loss: {update_info['loss']:.4f} | KL: {update_info['kl']:.4f}")
return episode_rewards
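# Example usage (a minimal sketch; PPO is not reachable from the argparse CLI):
#   env = GridWorld(width=4, height=4, mode='static', max_steps=50, seed=42)
#   agent = PPOAgent(state_dim=env.state_dim, action_dim=env.n_actions, seed=42)
#   rewards = train_ppo(env, agent, num_episodes=500, steps_per_epoch=1000)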
print("\n✅ PPO Implementation Added!")
print("Run with: python rl_complete.py --env gridworld --ppo")