# P2-ETF-DQN-ENGINE / agent.py
# (Hugging Face Space page header, kept as a comment so the module parses:
#  uploaded by P2SAMAPA — "[auto] Deploy to HF Space from GitHub", commit a5f4aab, verified)
# agent.py
# Dueling DQN implementation (PyTorch).
#
# Architecture follows:
# Wang et al. (2016) "Dueling Network Architectures for Deep Reinforcement Learning"
# Applied to ETF selection as recommended by:
# Yasin & Gill (2024) "RL Framework for Quantitative Trading" arXiv:2411.07585
#
# Key design choices:
# - MLP policy (paper showed MLP > LSTM for daily ETF data)
# - Separate Value and Advantage streams (Dueling β€” better for multi-action spaces)
# - Experience replay buffer (100k transitions)
# - Soft (Polyak) target network update applied every TARGET_UPDATE_FREQ steps
# - epsilon-greedy exploration: 1.0 β†’ 0.05 over first 50% of training
import os
import random
from collections import deque
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import config
# ── Dueling DQN Network ───────────────────────────────────────────────────────
class DuelingDQN(nn.Module):
    """Dueling Q-network (Wang et al., 2016).

    A shared MLP trunk feeds two heads:
      * a state-value head producing the scalar V(s), and
      * an advantage head producing one A(s, a) per action.
    They are recombined as

        Q(s, a) = V(s) + A(s, a) - mean_a A(s, a)

    Subtracting the per-state mean advantage makes the V/A decomposition
    identifiable: neither stream can absorb an arbitrary offset from the other.
    """

    def __init__(self, state_size: int, n_actions: int,
                 hidden: int = config.HIDDEN_UNITS):
        super().__init__()
        half = hidden // 2
        # Shared feature extractor: two LayerNorm'd ReLU layers.
        self.trunk = nn.Sequential(
            nn.Linear(state_size, hidden), nn.LayerNorm(hidden), nn.ReLU(),
            nn.Linear(hidden, hidden), nn.LayerNorm(hidden), nn.ReLU(),
        )
        # V(s): one scalar per state.
        self.value_stream = nn.Sequential(
            nn.Linear(hidden, half), nn.ReLU(), nn.Linear(half, 1),
        )
        # A(s, a): one value per action.
        self.advantage_stream = nn.Sequential(
            nn.Linear(hidden, half), nn.ReLU(), nn.Linear(half, n_actions),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return Q-values of shape (batch, n_actions)."""
        phi = self.trunk(x)
        v = self.value_stream(phi)         # (batch, 1), broadcasts over actions
        adv = self.advantage_stream(phi)   # (batch, n_actions)
        return v + adv - adv.mean(dim=1, keepdim=True)
# ── Replay Buffer ─────────────────────────────────────────────────────────────
class ReplayBuffer:
    """Fixed-capacity FIFO store of (state, action, reward, next_state, done)
    transitions with uniform random sampling."""

    def __init__(self, capacity: int = config.REPLAY_BUFFER_SIZE):
        # deque silently evicts the oldest transition once full.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Coerce each field to a canonical type and append one transition."""
        transition = (
            np.array(state, dtype=np.float32),
            int(action),
            float(reward),
            np.array(next_state, dtype=np.float32),
            bool(done),
        )
        self.buffer.append(transition)

    def sample(self, batch_size: int):
        """Draw `batch_size` transitions uniformly without replacement and
        return them column-wise as five stacked numpy arrays."""
        drawn = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*drawn)
        return (
            np.array(states),
            np.array(actions),
            np.array(rewards),
            np.array(next_states),
            np.array(dones, dtype=np.float32),
        )

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)
# ── DQN Agent ─────────────────────────────────────────────────────────────────
class DQNAgent:
    """Double/Dueling DQN agent with uniform experience replay.

    Maintains two DuelingDQN networks: an *online* net that is trained every
    `learn()` call, and a *target* net that tracks it via a soft Polyak blend
    applied every `target_update` environment steps. Exploration follows a
    linear epsilon schedule from `eps_start` to `eps_end` over the first
    `eps_decay_frac` fraction of `total_steps`.
    """

    def __init__(self, state_size: int,
                 n_actions: int = config.N_ACTIONS,
                 lr: float = config.LEARNING_RATE,
                 gamma: float = config.GAMMA,
                 eps_start: float = config.EPSILON_START,
                 eps_end: float = config.EPSILON_END,
                 eps_decay_frac: float = config.EPSILON_DECAY_FRAC,
                 buffer_size: int = config.REPLAY_BUFFER_SIZE,
                 batch_size: int = config.BATCH_SIZE,
                 target_update: int = config.TARGET_UPDATE_FREQ,
                 total_steps: int = 100_000):
        self.n_actions = n_actions
        self.gamma = gamma
        self.batch_size = batch_size
        self.target_update = target_update
        self.steps_done = 0
        # Linear epsilon schedule parameters.
        self.eps_start = eps_start
        self.eps_end = eps_end
        self.eps_decay_steps = int(total_steps * eps_decay_frac)
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Online (trained) and target (slowly-tracking) networks start identical.
        self.online_net = DuelingDQN(state_size, n_actions).to(self.device)
        self.target_net = DuelingDQN(state_size, n_actions).to(self.device)
        self.target_net.load_state_dict(self.online_net.state_dict())
        self.target_net.eval()  # target net is never trained directly
        self.optimizer = optim.Adam(self.online_net.parameters(), lr=lr)
        self.buffer = ReplayBuffer(buffer_size)
        self.loss_fn = nn.SmoothL1Loss()  # Huber loss — robust to outlier TD errors

    # ── Epsilon ───────────────────────────────────────────────────────────────
    @property
    def epsilon(self) -> float:
        """Current exploration rate under the linear decay schedule.

        `+ 1` in the denominator avoids division by zero when
        eps_decay_steps == 0.
        """
        progress = min(1.0, self.steps_done / (self.eps_decay_steps + 1))
        return self.eps_end + (self.eps_start - self.eps_end) * (1.0 - progress)

    # ── Action selection ──────────────────────────────────────────────────────
    def select_action(self, state: np.ndarray, greedy: bool = False) -> int:
        """Epsilon-greedy action index; pass greedy=True for pure exploitation."""
        if not greedy and random.random() < self.epsilon:
            return random.randrange(self.n_actions)
        with torch.no_grad():
            s = torch.as_tensor(state, dtype=torch.float32,
                                device=self.device).unsqueeze(0)
            return int(self.online_net(s).argmax(dim=1).item())

    def q_values(self, state: np.ndarray) -> np.ndarray:
        """Return raw Q-values for all actions (for UI display)."""
        with torch.no_grad():
            s = torch.as_tensor(state, dtype=torch.float32,
                                device=self.device).unsqueeze(0)
            return self.online_net(s).cpu().numpy().flatten()

    # ── Learning ──────────────────────────────────────────────────────────────
    def push(self, state, action, reward, next_state, done):
        """Store one transition, advance the step counter, and periodically
        blend the online weights into the target network."""
        self.buffer.push(state, action, reward, next_state, done)
        self.steps_done += 1
        if self.steps_done % self.target_update == 0:
            self._update_target()

    def learn(self) -> float:
        """Run one gradient step on a uniformly sampled minibatch.

        Returns the scalar Huber loss, or 0.0 if the replay buffer does not
        yet hold enough transitions.
        """
        # FIX: also require at least batch_size transitions — otherwise
        # random.sample() raises ValueError when MIN_REPLAY_SIZE < batch_size.
        if len(self.buffer) < max(config.MIN_REPLAY_SIZE, self.batch_size):
            return 0.0
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)
        states = torch.as_tensor(states, dtype=torch.float32, device=self.device)
        actions = torch.as_tensor(actions, dtype=torch.int64, device=self.device)
        rewards = torch.as_tensor(rewards, dtype=torch.float32, device=self.device)
        next_states = torch.as_tensor(next_states, dtype=torch.float32, device=self.device)
        dones = torch.as_tensor(dones, dtype=torch.float32, device=self.device)
        # Q(s, a) for the actions actually taken.
        current_q = self.online_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        # Double DQN target: online net selects argmax action, target net evaluates it.
        with torch.no_grad():
            next_actions = self.online_net(next_states).argmax(dim=1)
            next_q = self.target_net(next_states).gather(
                1, next_actions.unsqueeze(1)).squeeze(1)
            # (1 - dones) zeroes the bootstrap term at episode boundaries.
            target_q = rewards + self.gamma * next_q * (1 - dones)
        loss = self.loss_fn(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        # Clip total gradient norm to keep individual updates bounded.
        nn.utils.clip_grad_norm_(self.online_net.parameters(), 10.0)
        self.optimizer.step()
        return float(loss.item())

    def _update_target(self):
        # Soft Polyak update: target <- TAU * online + (1 - TAU) * target.
        # Much more stable than a hard copy.
        for p_on, p_tgt in zip(self.online_net.parameters(),
                               self.target_net.parameters()):
            p_tgt.data.copy_(
                config.TAU * p_on.data + (1.0 - config.TAU) * p_tgt.data
            )

    # ── Persistence ───────────────────────────────────────────────────────────
    def save(self, path: str):
        """Checkpoint both networks, the optimizer, and the step counter."""
        directory = os.path.dirname(path) or "."
        os.makedirs(directory, exist_ok=True)
        torch.save({
            "online_net": self.online_net.state_dict(),
            "target_net": self.target_net.state_dict(),
            "optimizer": self.optimizer.state_dict(),
            "steps_done": self.steps_done,
        }, path)

    def load(self, path: str):
        """Restore a checkpoint written by save().

        Raises FileNotFoundError if `path` does not exist. Both networks are
        left in eval mode (checkpoints are loaded for inference/evaluation).
        """
        if not os.path.exists(path):
            raise FileNotFoundError(f"No weights at {path}")
        ckpt = torch.load(path, map_location=self.device)
        self.online_net.load_state_dict(ckpt["online_net"])
        self.target_net.load_state_dict(ckpt["target_net"])
        self.optimizer.load_state_dict(ckpt["optimizer"])
        self.steps_done = ckpt.get("steps_done", 0)
        self.online_net.eval()
        self.target_net.eval()