P2SAMAPA's picture
[auto] Deploy to HF Space from GitHub
a47c2f1 verified
# env.py
# Custom Gym-style trading environment for the Dueling DQN agent.
# State : flattened lookback window of technical indicators + macro
# Actions: 0=CASH, 1..7 = ETF indices (maps to config.ACTIONS)
# Reward : excess return over T-bill - transaction cost on switches,
# scaled by inverse realised volatility to penalise drawdowns
import numpy as np
import pandas as pd
import config
class ETFTradingEnv:
"""
Single-asset-at-a-time ETF selection environment.
Parameters
----------
feat_df : pd.DataFrame β€” full feature matrix (one row per trading day)
price_df : pd.DataFrame β€” ETF close prices aligned to feat_df index
macro_df : pd.DataFrame β€” macro data (for TBILL_3M fallback)
start_idx : int β€” first index to start an episode from
end_idx : int β€” last index (exclusive)
fee_pct : float β€” one-way transaction cost as fraction (e.g. 0.001 = 10bps)
lookback : int β€” window size fed as state
tsl_pct : float β€” trailing stop loss % (applied post-signal in backtest)
"""
def __init__(self,
feat_df: pd.DataFrame,
price_df: pd.DataFrame,
macro_df: pd.DataFrame,
start_idx: int = 0,
end_idx: int = None,
fee_pct: float = config.DEFAULT_FEE_BPS / 10_000,
lookback: int = config.LOOKBACK_WINDOW,
tsl_pct: float = config.DEFAULT_TSL_PCT / 100):
self.feat_df = feat_df.reset_index(drop=True)
self.price_df = price_df.reindex(feat_df.index).reset_index(drop=True)
self.macro_df = macro_df.reindex(feat_df.index).ffill().reset_index(drop=True)
self.fee_pct = fee_pct
self.lookback = lookback
self.tsl_pct = tsl_pct
self.n_actions = config.N_ACTIONS
self.actions = config.ACTIONS
self.start_idx = max(start_idx, lookback - 1)
self.end_idx = end_idx if end_idx is not None else len(self.feat_df) - 1
self.n_features = feat_df.shape[1] * lookback # flattened state size
self.reset()
# ── Gym-style interface ───────────────────────────────────────────────────
def reset(self) -> np.ndarray:
# FIX: randomise start within first 50% of window so agent sees diverse sequences
max_rand = self.start_idx + (self.end_idx - self.start_idx) // 2
self.current_idx = int(np.random.randint(self.start_idx, max(self.start_idx + 1, max_rand)))
self.held_action = 0 # start in CASH
self.peak_equity = 1.0
self.equity = 1.0
self.is_stopped_out = False
return self._get_state()
def step(self, action: int):
"""
Execute one step.
Returns (next_state, reward, done, info)
"""
assert 0 <= action < self.n_actions
prev_idx = self.current_idx
self.current_idx += 1
done = (self.current_idx >= self.end_idx)
# ── Transaction cost on switch ────────────────────────────────────────
switched = (action != self.held_action)
t_cost = self.fee_pct if switched else 0.0
if switched:
self.held_action = action
self.peak_equity = self.equity # reset peak on new position
# ── Daily return of chosen action ─────────────────────────────────────
if action == 0: # CASH β€” earn T-bill
tbill_rate = self._get_tbill(prev_idx)
day_ret = tbill_rate / 252
else:
etf = self.actions[action]
if etf in self.price_df.columns:
p0 = self.price_df[etf].iloc[prev_idx]
p1 = self.price_df[etf].iloc[self.current_idx]
day_ret = (p1 / (p0 + 1e-9)) - 1.0
else:
day_ret = 0.0
day_ret -= t_cost
self.equity *= (1.0 + day_ret)
# ── Trailing stop-loss check ──────────────────────────────────────────
if action != 0:
if self.equity > self.peak_equity:
self.peak_equity = self.equity
if self.equity < self.peak_equity * (1 - self.tsl_pct):
self.is_stopped_out = True
# ── Risk-adjusted reward ──────────────────────────────────────────────
# FIX: clamp vol so CASH (vol~0.005) doesn't get 30x reward amplification
tbill_daily = self._get_tbill(prev_idx) / 252
excess_ret = day_ret - tbill_daily
vol_21d = self._get_vol(action, prev_idx)
vol_scale = float(np.clip(vol_21d, config.REWARD_VOL_MIN, config.REWARD_VOL_MAX))
reward = excess_ret / vol_scale
# FIX: small bonus when ETF (not CASH) beats T-bill β€” discourage CASH collapse
if action != 0 and excess_ret > 0:
reward *= config.REWARD_ETF_BONUS
next_state = self._get_state()
info = dict(day_ret=day_ret, equity=self.equity,
action_name=self.actions[action],
tsl_triggered=self.is_stopped_out)
return next_state, reward, done, info
@property
def observation_size(self) -> int:
# FIX: +n_actions for one-hot position encoding appended to state
return self.n_features + self.n_actions
# ── Internal helpers ──────────────────────────────────────────────────────
def _get_state(self) -> np.ndarray:
"""Return flattened lookback window + one-hot current position."""
start = self.current_idx - self.lookback + 1
end = self.current_idx + 1
window = self.feat_df.iloc[start:end].values.astype(np.float32)
if len(window) < self.lookback:
pad = np.zeros((self.lookback - len(window), self.feat_df.shape[1]),
dtype=np.float32)
window = np.vstack([pad, window])
# FIX: append one-hot current position so agent knows what it is holding
position = np.zeros(self.n_actions, dtype=np.float32)
position[self.held_action] = 1.0
return np.concatenate([window.flatten(), position])
def _get_tbill(self, idx: int) -> float:
"""Annual T-bill rate at given index (fraction)."""
if "macro_TBILL_3M" in self.macro_df.columns:
val = self.macro_df["macro_TBILL_3M"].iloc[idx]
if not np.isnan(val):
return float(val) / 100.0
return 0.036 # fallback 3.6%
def _get_vol(self, action: int, idx: int) -> float:
"""21d annualised vol for scaling reward."""
if action == 0:
return 0.005 # CASH ~ zero vol
etf = self.actions[action]
vol_col = f"{etf}_Vol21d"
if vol_col in self.feat_df.columns:
val = self.feat_df[vol_col].iloc[idx]
if not np.isnan(val) and val > 0:
return float(val)
return 0.15 # fallback 15% vol
# ── Train / Val / Test splitter ───────────────────────────────────────────────
def make_splits(feat_df: pd.DataFrame,
price_df: pd.DataFrame,
macro_df: pd.DataFrame,
start_year: int,
fee_pct: float = config.DEFAULT_FEE_BPS / 10_000,
lookback: int = config.LOOKBACK_WINDOW):
"""
Returns three ETFTradingEnv instances: train, val, test.
Split is 80/10/10 of the date range from start_year onwards.
"""
# Filter by start year
mask = feat_df.index.year >= start_year
feat_sub = feat_df[mask].copy()
n = len(feat_sub)
n_train = int(n * config.TRAIN_SPLIT)
n_val = int(n * config.VAL_SPLIT)
train_env = ETFTradingEnv(feat_sub.iloc[:n_train],
price_df, macro_df,
fee_pct=fee_pct, lookback=lookback)
val_env = ETFTradingEnv(feat_sub.iloc[n_train : n_train + n_val],
price_df, macro_df,
fee_pct=fee_pct, lookback=lookback)
test_env = ETFTradingEnv(feat_sub.iloc[n_train + n_val:],
price_df, macro_df,
fee_pct=fee_pct, lookback=lookback)
return train_env, val_env, test_env