|
|
|
|
|
|
|
|
import gymnasium as gym |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from gymnasium import spaces |
|
|
|
|
|
class PortfolioEnv(gym.Env): |
|
|
""" |
|
|
A custom environment for portfolio management that includes macroeconomic data. |
|
|
""" |
|
|
metadata = {'render_modes': ['human']} |
|
|
|
|
|
def __init__(self, df, window_size=30, initial_balance=10000, transaction_cost_pct=0.001): |
|
|
super(PortfolioEnv, self).__init__() |
|
|
|
|
|
|
|
|
self.df = df |
|
|
self.window_size = window_size |
|
|
self.initial_balance = initial_balance |
|
|
self.transaction_cost_pct = transaction_cost_pct |
|
|
|
|
|
|
|
|
self.asset_columns = ['AAPL', 'BTC-USD', 'MSFT', 'SPY', 'TLT'] |
|
|
self.macro_columns = ['Federal Funds Rate', 'CPI', 'VIX'] |
|
|
|
|
|
self.n_assets = len(self.asset_columns) |
|
|
self.n_macro_features = len(self.macro_columns) |
|
|
|
|
|
|
|
|
self.n_features_per_step = self.n_assets + self.n_macro_features |
|
|
|
|
|
|
|
|
self.action_space = spaces.Box( |
|
|
low=-1, high=1, shape=(self.n_assets + 1,), dtype=np.float32 |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
self.observation_space = spaces.Box( |
|
|
low=-np.inf, high=np.inf, |
|
|
shape=(self.window_size * self.n_features_per_step,), |
|
|
dtype=np.float32 |
|
|
) |
|
|
|
|
|
|
|
|
self._current_step = 0 |
|
|
self._portfolio_value = 0 |
|
|
self._weights = np.zeros(self.n_assets + 1) |
|
|
|
|
|
|
|
|
self.price_df = self.df[self.asset_columns] |
|
|
self.macro_df = self.df[self.macro_columns] |
|
|
|
|
|
def reset(self, seed=None): |
|
|
super().reset(seed=seed) |
|
|
self._current_step = self.window_size |
|
|
self._portfolio_value = self.initial_balance |
|
|
|
|
|
self._weights = np.zeros(self.n_assets + 1) |
|
|
self._weights[-1] = 1.0 |
|
|
|
|
|
observation = self._get_obs() |
|
|
info = self._get_info() |
|
|
return observation, info |
|
|
|
|
|
def step(self, action): |
|
|
current_portfolio_value = self._portfolio_value |
|
|
|
|
|
target_weights = np.exp(action) / np.sum(np.exp(action)) |
|
|
|
|
|
current_asset_values = self._weights[:-1] * current_portfolio_value |
|
|
target_asset_values = target_weights[:-1] * current_portfolio_value |
|
|
trades = target_asset_values - current_asset_values |
|
|
transaction_costs = np.sum(np.abs(trades)) * self.transaction_cost_pct |
|
|
|
|
|
self._balance = current_portfolio_value - transaction_costs |
|
|
self._weights = target_weights |
|
|
|
|
|
self._current_step += 1 |
|
|
|
|
|
current_prices = self.price_df.iloc[self._current_step - 1].values |
|
|
next_prices = self.price_df.iloc[self._current_step].values |
|
|
|
|
|
price_ratio = next_prices / (current_prices + 1e-8) |
|
|
|
|
|
asset_values_after_price_change = (self._weights[:-1] * self._balance) * price_ratio |
|
|
new_portfolio_value = np.sum(asset_values_after_price_change) + (self._weights[-1] * self._balance) |
|
|
self._portfolio_value = new_portfolio_value |
|
|
|
|
|
reward = np.log(new_portfolio_value / (current_portfolio_value + 1e-8)) |
|
|
|
|
|
terminated = bool(self._portfolio_value <= self.initial_balance * 0.5) |
|
|
truncated = self._current_step >= len(self.df) - 1 |
|
|
|
|
|
observation = self._get_obs() |
|
|
info = self._get_info() |
|
|
|
|
|
return observation, reward, terminated, truncated, info |
|
|
|
|
|
def _get_obs(self): |
|
|
""" |
|
|
Gets the observation for the current time step. |
|
|
This includes a window of prices AND a window of macro data. |
|
|
""" |
|
|
price_window = self.price_df.iloc[self._current_step - self.window_size : self._current_step].values |
|
|
macro_window = self.macro_df.iloc[self._current_step - self.window_size : self._current_step].values |
|
|
|
|
|
|
|
|
normalized_price_window = price_window / (price_window[0] + 1e-8) |
|
|
|
|
|
|
|
|
normalized_macro_window = macro_window / (macro_window[0] + 1e-8) |
|
|
|
|
|
|
|
|
observation_window = np.concatenate([normalized_price_window, normalized_macro_window], axis=1) |
|
|
|
|
|
|
|
|
return observation_window.flatten().astype(np.float32) |
|
|
|
|
|
def _get_info(self): |
|
|
return { |
|
|
'step': self._current_step, |
|
|
'portfolio_value': self._portfolio_value, |
|
|
'weights': self._weights, |
|
|
} |
|
|
|
|
|
def render(self, mode='human'): |
|
|
pass |
|
|
|
|
|
def close(self): |
|
|
pass |