File size: 4,856 Bytes
349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 349ad65 7d2e753 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# src/environment.py (This is the CORRECT version for 8 features)
import gymnasium as gym
import numpy as np
import pandas as pd
from gymnasium import spaces
class PortfolioEnv(gym.Env):
    """
    A custom environment for portfolio management that includes macroeconomic data.

    Each observation is a flattened window of the last ``window_size`` rows of
    the asset-price and macro columns, every column divided by its value in the
    first row of the window. An action is a vector of ``n_assets + 1`` scores
    (the last entry is cash) that is turned into portfolio weights with a
    softmax. The reward is the log-return of the portfolio value over one step.
    """

    metadata = {'render_modes': ['human']}

    # Column names used when the caller does not supply its own.
    DEFAULT_ASSET_COLUMNS = ('AAPL', 'BTC-USD', 'MSFT', 'SPY', 'TLT')
    DEFAULT_MACRO_COLUMNS = ('Federal Funds Rate', 'CPI', 'VIX')

    # Small constant added to denominators to avoid division by zero.
    _EPS = 1e-8

    def __init__(self, df, window_size=30, initial_balance=10000,
                 transaction_cost_pct=0.001, asset_columns=None,
                 macro_columns=None):
        """
        Build the environment around a price/macro data frame.

        Args:
            df: Data frame holding one row per time step; it must contain every
                asset and macro column.
            window_size: Number of past rows included in each observation.
            initial_balance: Portfolio value at the start of every episode.
            transaction_cost_pct: Fraction of traded value charged as cost.
            asset_columns: Optional list of price columns. Defaults to
                ``DEFAULT_ASSET_COLUMNS``.
            macro_columns: Optional list of macro columns. Defaults to
                ``DEFAULT_MACRO_COLUMNS``.

        Raises:
            ValueError: If ``window_size`` is smaller than 1, or ``df`` has
                fewer than ``window_size + 2`` rows (the minimum needed to
                build one observation and take one step).
            KeyError: If ``df`` lacks one of the requested columns (raised by
                the column selection itself).
        """
        super().__init__()

        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        if len(df) < window_size + 2:
            raise ValueError(
                f"df has {len(df)} rows but at least {window_size + 2} are "
                f"required for window_size={window_size}"
            )

        # --- Data Handling ---
        self.df = df
        self.window_size = window_size
        self.initial_balance = initial_balance
        self.transaction_cost_pct = transaction_cost_pct

        # --- Asset and macro columns (defaults give 5 + 3 = 8 features) ---
        self.asset_columns = list(
            self.DEFAULT_ASSET_COLUMNS if asset_columns is None else asset_columns
        )
        self.macro_columns = list(
            self.DEFAULT_MACRO_COLUMNS if macro_columns is None else macro_columns
        )
        self.n_assets = len(self.asset_columns)
        self.n_macro_features = len(self.macro_columns)
        self.n_features_per_step = self.n_assets + self.n_macro_features

        # --- Action Space: one score per asset plus one for cash ---
        self.action_space = spaces.Box(
            low=-1, high=1, shape=(self.n_assets + 1,), dtype=np.float32
        )

        # --- Observation Space: flattened (window_size * features) vector ---
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(self.window_size * self.n_features_per_step,),
            dtype=np.float32
        )

        # --- Internal State ---
        self._current_step = 0
        self._portfolio_value = 0
        # Value left after transaction costs; initialised here so the
        # attribute exists before the first call to step().
        self._balance = 0
        self._weights = np.zeros(self.n_assets + 1)

        # Separate dataframes for prices and macro for easier handling
        self.price_df = self.df[self.asset_columns]
        self.macro_df = self.df[self.macro_columns]

    def reset(self, seed=None, options=None):
        """
        Start a new episode with the whole balance held in cash.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to seed the environment RNG.
            options: Accepted for compatibility with the Gymnasium
                ``Env.reset`` signature; currently ignored.

        Returns:
            Tuple ``(observation, info)``.
        """
        super().reset(seed=seed)

        # The first observation needs window_size rows of history.
        self._current_step = self.window_size
        self._portfolio_value = self.initial_balance
        self._balance = self.initial_balance
        self._weights = np.zeros(self.n_assets + 1)
        self._weights[-1] = 1.0  # 100% in cash

        return self._get_obs(), self._get_info()

    def step(self, action):
        """
        Rebalance to the weights implied by ``action`` and advance one row.

        Args:
            action: Array-like of ``n_assets + 1`` scores; a softmax maps it
                to non-negative weights that sum to one (last entry is cash).

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)`` where
            ``reward`` is the log-return of the portfolio value,
            ``terminated`` is set once the value falls to half of the initial
            balance or below, and ``truncated`` is set on the last data row.

        Raises:
            IndexError: If called when no further data row is available.
        """
        last_index = len(self.df) - 1
        if self._current_step >= last_index:
            # Fail before touching any state instead of deep inside iloc.
            raise IndexError(
                "step() called past the end of the data; call reset() first"
            )

        value_before = self._portfolio_value

        # Softmax in float64, shifted by the maximum for numerical stability
        # (the shift does not change the resulting weights).
        scores = np.asarray(action, dtype=np.float64)
        exp_scores = np.exp(scores - np.max(scores))
        target_weights = exp_scores / np.sum(exp_scores)

        # Cost is charged on the absolute value traded in the risky assets.
        trades = (target_weights[:-1] - self._weights[:-1]) * value_before
        transaction_costs = np.sum(np.abs(trades)) * self.transaction_cost_pct
        self._balance = value_before - transaction_costs

        # NOTE(review): weights are not re-marked to market after the price
        # move below, so the next step's trade size is measured against these
        # target weights rather than the drifted holdings - confirm that this
        # simplification is intended.
        self._weights = target_weights
        self._current_step += 1

        current_prices = self.price_df.iloc[self._current_step - 1].values
        next_prices = self.price_df.iloc[self._current_step].values
        price_ratio = next_prices / (current_prices + self._EPS)

        asset_values = self._weights[:-1] * self._balance * price_ratio
        cash_value = self._weights[-1] * self._balance
        new_portfolio_value = float(np.sum(asset_values) + cash_value)
        self._portfolio_value = new_portfolio_value

        reward = float(np.log(new_portfolio_value / (value_before + self._EPS)))
        terminated = bool(self._portfolio_value <= self.initial_balance * 0.5)
        truncated = bool(self._current_step >= last_index)

        return self._get_obs(), reward, terminated, truncated, self._get_info()

    def _get_obs(self):
        """
        Gets the observation for the current time step.

        This includes a window of prices AND a window of macro data, each
        column divided by its value in the first row of the window, flattened
        row by row into a float32 vector.
        """
        start = self._current_step - self.window_size
        stop = self._current_step
        price_window = self.price_df.iloc[start:stop].values
        macro_window = self.macro_df.iloc[start:stop].values

        # Normalize both windows relative to their first row
        normalized_price_window = price_window / (price_window[0] + self._EPS)
        normalized_macro_window = macro_window / (macro_window[0] + self._EPS)

        # Combine side by side, then flatten into a 1D vector
        observation_window = np.concatenate(
            [normalized_price_window, normalized_macro_window], axis=1
        )
        return observation_window.flatten().astype(np.float32)

    def _get_info(self):
        """Return the diagnostic dictionary for the current state."""
        return {
            'step': self._current_step,
            'portfolio_value': self._portfolio_value,
            # Copy so callers cannot alter the internal weight vector.
            'weights': self._weights.copy(),
        }

    def render(self, mode='human'):
        """Rendering is not implemented; this is a no-op."""
        pass

    def close(self):
        """No resources to release; this is a no-op."""
        pass