Spaces:
Running
Running
# env.py
# Custom Gym-style trading environment for the Dueling DQN agent.
# State  : flattened lookback window of technical indicators + macro
# Actions: 0=CASH, 1..7 = ETF indices (maps to config.ACTIONS)
# Reward : excess return over T-bill - transaction cost on switches,
#          scaled by inverse realised volatility to penalise drawdowns
from typing import Optional

import numpy as np
import pandas as pd

import config
class ETFTradingEnv:
    """
    Single-asset-at-a-time ETF selection environment (Gym-style).

    The agent holds exactly one position per day: CASH (action 0) or one
    of the ETFs listed in ``config.ACTIONS`` (actions 1..n_actions-1).

    Parameters
    ----------
    feat_df : pd.DataFrame
        Full feature matrix, one row per trading day.
    price_df : pd.DataFrame
        ETF close prices aligned to ``feat_df``'s index.
    macro_df : pd.DataFrame
        Macro data (used for the TBILL_3M rate; forward-filled).
    start_idx : int
        First index an episode may start from.
    end_idx : Optional[int]
        Last index; episode ends when reached. Defaults to the final row.
    fee_pct : float
        One-way transaction cost as a fraction (e.g. 0.001 = 10 bps).
    lookback : int
        Number of past rows flattened into the state vector.
    tsl_pct : float
        Trailing stop-loss threshold as a fraction. Informational here —
        per the design it is applied post-signal in the backtest.
    """

    def __init__(self,
                 feat_df: pd.DataFrame,
                 price_df: pd.DataFrame,
                 macro_df: pd.DataFrame,
                 start_idx: int = 0,
                 end_idx: Optional[int] = None,  # FIX: was annotated `int` with None default
                 fee_pct: float = config.DEFAULT_FEE_BPS / 10_000,
                 lookback: int = config.LOOKBACK_WINDOW,
                 tsl_pct: float = config.DEFAULT_TSL_PCT / 100):
        # Align prices/macro to the feature rows BEFORE dropping the index,
        # so all three frames share identical positional indexing.
        self.feat_df = feat_df.reset_index(drop=True)
        self.price_df = price_df.reindex(feat_df.index).reset_index(drop=True)
        self.macro_df = macro_df.reindex(feat_df.index).ffill().reset_index(drop=True)
        self.fee_pct = fee_pct
        self.lookback = lookback
        self.tsl_pct = tsl_pct
        self.n_actions = config.N_ACTIONS
        self.actions = config.ACTIONS
        # Need at least `lookback` rows of history before the first state.
        self.start_idx = max(start_idx, lookback - 1)
        self.end_idx = end_idx if end_idx is not None else len(self.feat_df) - 1
        self.n_features = feat_df.shape[1] * lookback  # flattened window size
        self.reset()

    # ── Gym-style interface ──────────────────────────────────────────────────

    def reset(self) -> np.ndarray:
        """Start a new episode and return the initial state.

        The starting index is randomised within the first 50% of the
        window so the agent sees diverse sequences across episodes.
        """
        max_rand = self.start_idx + (self.end_idx - self.start_idx) // 2
        self.current_idx = int(np.random.randint(self.start_idx,
                                                 max(self.start_idx + 1, max_rand)))
        self.held_action = 0      # start in CASH
        self.peak_equity = 1.0
        self.equity = 1.0
        self.is_stopped_out = False
        return self._get_state()

    def step(self, action: int):
        """
        Execute one environment step.

        Parameters
        ----------
        action : int
            0 = CASH, 1..n_actions-1 = ETF index into ``config.ACTIONS``.

        Returns
        -------
        tuple
            ``(next_state, reward, done, info)`` where ``info`` carries
            ``day_ret``, ``equity``, ``action_name`` and ``tsl_triggered``.

        Raises
        ------
        ValueError
            If ``action`` is outside ``[0, n_actions)``.
        """
        # FIX: raise instead of assert — asserts are stripped under `python -O`,
        # which would silently allow out-of-range actions.
        if not 0 <= action < self.n_actions:
            raise ValueError(f"action {action} out of range [0, {self.n_actions})")
        prev_idx = self.current_idx
        self.current_idx += 1
        done = (self.current_idx >= self.end_idx)

        # ── Transaction cost on switch ────────────────────────────────────
        switched = (action != self.held_action)
        t_cost = self.fee_pct if switched else 0.0
        if switched:
            self.held_action = action
            self.peak_equity = self.equity  # reset peak on new position
            # FIX: a stop-out belongs to the position that triggered it;
            # clear the stale flag when a new position is opened (the peak
            # is reset on the line above, so the flag must be reset too).
            self.is_stopped_out = False

        # ── Daily return of the chosen action ─────────────────────────────
        if action == 0:  # CASH — earn the T-bill rate
            tbill_rate = self._get_tbill(prev_idx)
            day_ret = tbill_rate / 252
        else:
            etf = self.actions[action]
            if etf in self.price_df.columns:
                p0 = self.price_df[etf].iloc[prev_idx]
                p1 = self.price_df[etf].iloc[self.current_idx]
                day_ret = (p1 / (p0 + 1e-9)) - 1.0  # epsilon guards zero price
            else:
                day_ret = 0.0  # unknown ticker column: treat as flat
        day_ret -= t_cost
        self.equity *= (1.0 + day_ret)

        # ── Trailing stop-loss bookkeeping (reported via info) ────────────
        if action != 0:
            if self.equity > self.peak_equity:
                self.peak_equity = self.equity
            if self.equity < self.peak_equity * (1 - self.tsl_pct):
                self.is_stopped_out = True

        # ── Risk-adjusted reward ──────────────────────────────────────────
        # Vol is clamped so CASH (vol ~0.005) doesn't get ~30x amplification.
        tbill_daily = self._get_tbill(prev_idx) / 252
        excess_ret = day_ret - tbill_daily
        vol_21d = self._get_vol(action, prev_idx)
        vol_scale = float(np.clip(vol_21d, config.REWARD_VOL_MIN, config.REWARD_VOL_MAX))
        reward = excess_ret / vol_scale
        # Small bonus when an ETF (not CASH) beats the T-bill, to discourage
        # the policy from collapsing into permanent CASH.
        if action != 0 and excess_ret > 0:
            reward *= config.REWARD_ETF_BONUS

        next_state = self._get_state()
        info = dict(day_ret=day_ret, equity=self.equity,
                    action_name=self.actions[action],
                    tsl_triggered=self.is_stopped_out)
        return next_state, reward, done, info

    def observation_size(self) -> int:
        """Length of the state vector: flattened window + one-hot position."""
        return self.n_features + self.n_actions

    # ── Internal helpers ─────────────────────────────────────────────────────

    def _get_state(self) -> np.ndarray:
        """Return flattened lookback window + one-hot current position."""
        start = self.current_idx - self.lookback + 1
        end = self.current_idx + 1
        window = self.feat_df.iloc[start:end].values.astype(np.float32)
        if len(window) < self.lookback:
            # Defensive: left-pad with zeros if history is shorter than lookback.
            pad = np.zeros((self.lookback - len(window), self.feat_df.shape[1]),
                           dtype=np.float32)
            window = np.vstack([pad, window])
        # One-hot of the current holding, so the agent knows what it holds
        # (and hence whether a new choice incurs a switching cost).
        position = np.zeros(self.n_actions, dtype=np.float32)
        position[self.held_action] = 1.0
        return np.concatenate([window.flatten(), position])

    def _get_tbill(self, idx: int) -> float:
        """Annualised T-bill rate at ``idx`` as a fraction (e.g. 0.036)."""
        if "macro_TBILL_3M" in self.macro_df.columns:
            val = self.macro_df["macro_TBILL_3M"].iloc[idx]
            if not np.isnan(val):
                return float(val) / 100.0  # series is quoted in percent
        return 0.036  # fallback 3.6%

    def _get_vol(self, action: int, idx: int) -> float:
        """21-day annualised vol of the chosen asset, for reward scaling."""
        if action == 0:
            return 0.005  # CASH ~ zero vol
        etf = self.actions[action]
        vol_col = f"{etf}_Vol21d"
        if vol_col in self.feat_df.columns:
            val = self.feat_df[vol_col].iloc[idx]
            if not np.isnan(val) and val > 0:
                return float(val)
        return 0.15  # fallback 15% vol
# ── Train / Val / Test splitter ───────────────────────────────────────────────
def make_splits(feat_df: pd.DataFrame,
                price_df: pd.DataFrame,
                macro_df: pd.DataFrame,
                start_year: int,
                fee_pct: float = config.DEFAULT_FEE_BPS / 10_000,
                lookback: int = config.LOOKBACK_WINDOW):
    """
    Build chronological train/val/test environments.

    Rows before ``start_year`` are discarded; the remainder is split
    80/10/10 (per ``config.TRAIN_SPLIT`` / ``config.VAL_SPLIT``) into
    three ``ETFTradingEnv`` instances, returned in that order.
    """
    # Restrict the feature matrix to the requested date range.
    feat_sub = feat_df[feat_df.index.year >= start_year].copy()
    n_total = len(feat_sub)
    n_train = int(n_total * config.TRAIN_SPLIT)
    n_val = int(n_total * config.VAL_SPLIT)

    # Contiguous (lo, hi) row ranges for each split; prices/macro are passed
    # in full — the env realigns them to its feature slice on construction.
    bounds = [(0, n_train),
              (n_train, n_train + n_val),
              (n_train + n_val, n_total)]
    envs = [ETFTradingEnv(feat_sub.iloc[lo:hi],
                          price_df, macro_df,
                          fee_pct=fee_pct, lookback=lookback)
            for lo, hi in bounds]
    return envs[0], envs[1], envs[2]