|
|
import argparse |
|
|
import os |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
from matplotlib.ticker import FuncFormatter |
|
|
|
|
|
|
|
|
from stable_baselines3 import PPO, SAC, TD3 |
|
|
from src.environment import PortfolioEnv |
|
|
|
|
|
|
|
|
def evaluate_agent(env, model): |
|
|
"""Runs a trained agent on a given environment.""" |
|
|
obs, info = env.reset() |
|
|
terminated, truncated = False, False |
|
|
portfolio_values = [env.initial_balance] |
|
|
while not (terminated or truncated): |
|
|
action, _states = model.predict(obs, deterministic=True) |
|
|
obs, reward, terminated, truncated, info = env.step(action) |
|
|
portfolio_values.append(info['portfolio_value']) |
|
|
return pd.Series(portfolio_values, index=env.df.index[:len(portfolio_values)]) |
|
|
|
|
|
def buy_and_hold(df, initial_balance=10000): |
|
|
"""Simulates the Buy and Hold strategy.""" |
|
|
n_assets = len(df.columns) |
|
|
initial_investment_per_asset = initial_balance / n_assets |
|
|
initial_prices = df.iloc[0] |
|
|
shares = initial_investment_per_asset / initial_prices |
|
|
portfolio_values = df.dot(shares) |
|
|
return portfolio_values |
|
|
|
|
|
def calculate_metrics(portfolio_values): |
|
|
"""Calculates performance metrics from a portfolio value series.""" |
|
|
total_return = (portfolio_values.iloc[-1] / portfolio_values.iloc[0]) - 1 |
|
|
num_years = (portfolio_values.index[-1] - portfolio_values.index[0]).days / 365.25 |
|
|
cagr = (portfolio_values.iloc[-1] / portfolio_values.iloc[0]) ** (1/num_years) - 1 if num_years > 0 else 0 |
|
|
daily_returns = portfolio_values.pct_change().dropna() |
|
|
sharpe_ratio = np.sqrt(252) * (daily_returns.mean() / daily_returns.std()) if daily_returns.std() != 0 else 0 |
|
|
rolling_max = portfolio_values.cummax() |
|
|
daily_drawdown = portfolio_values / rolling_max - 1.0 |
|
|
max_drawdown = daily_drawdown.min() |
|
|
return { |
|
|
"Total Return": f"{total_return:.2%}", "CAGR": f"{cagr:.2%}", |
|
|
"Sharpe Ratio": f"{sharpe_ratio:.2f}", "Max Drawdown": f"{max_drawdown:.2%}" |
|
|
} |
|
|
|
|
|
|
|
|
def run_stress_test(datafile_path, ppo_path, sac_path, td3_path, output_path): |
|
|
""" |
|
|
Loads data and models, runs evaluations, and plots the comparison. |
|
|
""" |
|
|
print(f"--- Running Stress Test on {datafile_path} ---") |
|
|
|
|
|
|
|
|
try: |
|
|
test_df = pd.read_csv(datafile_path, index_col='Date', parse_dates=True) |
|
|
except FileNotFoundError: |
|
|
print(f"❌ Error: Data file not found at {datafile_path}") |
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
expected_assets = 5 |
|
|
if test_df.shape[1] != expected_assets: |
|
|
print(f"⚠️ Warning: Models were trained on {expected_assets} assets, but this dataset has {test_df.shape[1]}.") |
|
|
print("Skipping agent evaluation for this dataset.") |
|
|
return |
|
|
|
|
|
|
|
|
models_to_evaluate = { |
|
|
"PPO Agent": (PPO, ppo_path), |
|
|
"SAC Agent": (SAC, sac_path), |
|
|
"TD3 Agent": (TD3, td3_path) |
|
|
} |
|
|
|
|
|
portfolio_values = {} |
|
|
metrics = {} |
|
|
|
|
|
|
|
|
for name, (agent_type, model_path) in models_to_evaluate.items(): |
|
|
if os.path.exists(model_path): |
|
|
print(f"--- Evaluating {name} ---") |
|
|
model = agent_type.load(model_path) |
|
|
env = PortfolioEnv(test_df) |
|
|
portfolio_values[name] = evaluate_agent(env, model) |
|
|
metrics[name] = calculate_metrics(portfolio_values[name]) |
|
|
else: |
|
|
print(f"⚠️ Warning: Model file not found at {model_path}. Skipping.") |
|
|
|
|
|
|
|
|
print("\n--- Evaluating Buy and Hold Baseline ---") |
|
|
bnh_values = buy_and_hold(test_df) |
|
|
portfolio_values["Buy and Hold"] = bnh_values |
|
|
metrics["Buy and Hold"] = calculate_metrics(bnh_values) |
|
|
|
|
|
|
|
|
print("\n--- Stress Test Performance Metrics ---") |
|
|
metrics_df = pd.DataFrame(metrics) |
|
|
print(metrics_df) |
|
|
|
|
|
|
|
|
plt.style.use('seaborn-v0_8-darkgrid') |
|
|
fig, ax = plt.subplots(figsize=(14, 8)) |
|
|
|
|
|
colors = {"PPO Agent": "red", "SAC Agent": "green", "TD3 Agent": "orange", "Buy and Hold": "blue"} |
|
|
for name, values in portfolio_values.items(): |
|
|
ax.plot(values.index, values, label=name, color=colors.get(name, 'black'), linewidth=2) |
|
|
|
|
|
plot_title = f"Agent Stress Test: {os.path.basename(datafile_path).replace('.csv', '')}" |
|
|
ax.set_title(plot_title, fontsize=16) |
|
|
ax.set_xlabel('Date', fontsize=12) |
|
|
ax.set_ylabel('Portfolio Value ($)', fontsize=12) |
|
|
ax.legend(fontsize=12) |
|
|
|
|
|
formatter = FuncFormatter(lambda x, p: f'${x:,.0f}') |
|
|
ax.yaxis.set_major_formatter(formatter) |
|
|
|
|
|
plt.tight_layout() |
|
|
plt.savefig(output_path) |
|
|
print(f"\n✅ Plot saved to {output_path}") |
|
|
plt.show() |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
parser = argparse.ArgumentParser(description="Run a stress test on trained RL portfolio agents.") |
|
|
|
|
|
parser.add_argument("--datafile", type=str, default="data/stress_test_2018.csv", help="Path to the market data CSV file for the test.") |
|
|
parser.add_argument("--ppopath", type=str, default="checkpoints/ppo_portfolio_model.zip", help="Path to the trained PPO model.") |
|
|
parser.add_argument("--sacpath", type=str, default="checkpoints/sac_portfolio_model.zip", help="Path to the trained SAC model.") |
|
|
parser.add_argument("--td3path", type=str, default="checkpoints/td3_portfolio_model.zip", help="Path to the trained TD3 model.") |
|
|
parser.add_argument("--output", type=str, default="results/stress_test_comparison.png", help="Path to save the output plot.") |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
run_stress_test( |
|
|
datafile_path=args.datafile, |
|
|
ppo_path=args.ppopath, |
|
|
sac_path=args.sacpath, |
|
|
td3_path=args.td3path, |
|
|
output_path=args.output |
|
|
) |