# DanielKiani's picture
# Initial commit
# 7d2e753
import argparse
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
# Import all agent classes and the environment
from stable_baselines3 import PPO, SAC, TD3
from src.environment import PortfolioEnv
# --- Helper Functions ---
def evaluate_agent(env, model):
    """Roll a trained agent through one full episode and record its value curve.

    The environment is reset once, then stepped with the model's deterministic
    action until it reports either ``terminated`` or ``truncated``.

    Args:
        env: Environment exposing ``reset()``, ``step(action)``,
            ``initial_balance`` and a ``df`` whose index labels the steps.
        model: Object exposing ``predict(obs, deterministic=True)``.

    Returns:
        pandas Series of portfolio values, starting with ``env.initial_balance``
        and followed by ``info['portfolio_value']`` from every step, indexed by
        the leading entries of ``env.df.index``.
    """
    observation, _ = env.reset()
    history = [env.initial_balance]
    done = False
    while not done:
        action, _ = model.predict(observation, deterministic=True)
        observation, _reward, terminated, truncated, step_info = env.step(action)
        history.append(step_info['portfolio_value'])
        done = terminated or truncated
    # Label the curve with as many leading index entries as there are values.
    labels = env.df.index[:len(history)]
    return pd.Series(history, index=labels)
def buy_and_hold(df, initial_balance=10000):
    """Value an equally-weighted portfolio that is bought once and never rebalanced.

    Args:
        df: DataFrame of prices, one column per asset, one row per period.
        initial_balance: Cash split evenly across all columns at the first row.

    Returns:
        Series (indexed like ``df``) with the portfolio value at every row.
    """
    # Same cash amount goes into every asset at the first row's prices.
    cash_per_asset = initial_balance / len(df.columns)
    entry_prices = df.iloc[0]
    holdings = cash_per_asset / entry_prices
    # Row-wise price * shares, summed across assets.
    return df.dot(holdings)
def calculate_metrics(portfolio_values):
    """Calculate performance metrics from a portfolio value series.

    Args:
        portfolio_values: Non-empty pandas Series of portfolio values in
            chronological order. Subtracting its first index label from its
            last must yield an object with a ``.days`` attribute (e.g. a
            DatetimeIndex).

    Returns:
        dict of pre-formatted strings with the keys ``"Total Return"``,
        ``"CAGR"``, ``"Sharpe Ratio"`` and ``"Max Drawdown"``.
    """
    growth = portfolio_values.iloc[-1] / portfolio_values.iloc[0]
    total_return = growth - 1

    # Annualise over the calendar span covered by the index.
    num_years = (portfolio_values.index[-1] - portfolio_values.index[0]).days / 365.25
    cagr = growth ** (1 / num_years) - 1 if num_years > 0 else 0

    daily_returns = portfolio_values.pct_change().dropna()
    returns_std = daily_returns.std()
    # With fewer than two returns the sample std is NaN; `NaN != 0` is True,
    # so compare with `> 0` to make both NaN and zero volatility fall back to 0.
    if returns_std > 0:
        sharpe_ratio = np.sqrt(252) * (daily_returns.mean() / returns_std)
    else:
        sharpe_ratio = 0

    # Drawdown is measured against the running peak of the value series.
    rolling_max = portfolio_values.cummax()
    daily_drawdown = portfolio_values / rolling_max - 1.0
    max_drawdown = daily_drawdown.min()

    return {
        "Total Return": f"{total_return:.2%}",
        "CAGR": f"{cagr:.2%}",
        "Sharpe Ratio": f"{sharpe_ratio:.2f}",
        "Max Drawdown": f"{max_drawdown:.2%}",
    }
# --- Main Stress Test Function ---
def run_stress_test(datafile_path, ppo_path, sac_path, td3_path, output_path, expected_assets=5):
    """
    Loads data and models, runs evaluations, and plots the comparison.

    Args:
        datafile_path: CSV file with a ``Date`` column (used as the parsed
            date index) and one price column per asset.
        ppo_path: Path to the saved PPO model; skipped with a warning if missing.
        sac_path: Path to the saved SAC model; skipped with a warning if missing.
        td3_path: Path to the saved TD3 model; skipped with a warning if missing.
        output_path: Where the comparison plot is written. Missing parent
            directories are created.
        expected_assets: Number of asset columns the models expect. Defaults
            to 5, the value previously hard-coded here.

    Returns:
        None. Results are printed, the plot is saved to ``output_path`` and
        shown. The function returns early (without plotting) when the data
        file is missing or its column count differs from ``expected_assets``.
    """
    print(f"--- Running Stress Test on {datafile_path} ---")

    # 1. Load Data
    try:
        test_df = pd.read_csv(datafile_path, index_col='Date', parse_dates=True)
    except FileNotFoundError:
        print(f"❌ Error: Data file not found at {datafile_path}")
        return

    # Check for asset mismatch (e.g., 4 assets in 2008 data vs 5-asset models).
    # Per the original note, the standard models were trained on 5 assets
    # (e.g., shape = 30 * 5 = 150), so a different column count cannot be fed
    # to them. NOTE(review): this also skips the Buy and Hold baseline and the
    # plot, although the message only mentions agent evaluation - confirm intent.
    if test_df.shape[1] != expected_assets:
        print(f"⚠️ Warning: Models were trained on {expected_assets} assets, but this dataset has {test_df.shape[1]}.")
        print("Skipping agent evaluation for this dataset.")
        return

    # 2. Define Models to Evaluate
    models_to_evaluate = {
        "PPO Agent": (PPO, ppo_path),
        "SAC Agent": (SAC, sac_path),
        "TD3 Agent": (TD3, td3_path),
    }
    portfolio_values = {}
    metrics = {}

    # 3. Run Evaluations
    for name, (agent_type, model_path) in models_to_evaluate.items():
        if not os.path.exists(model_path):
            print(f"⚠️ Warning: Model file not found at {model_path}. Skipping.")
            continue
        print(f"--- Evaluating {name} ---")
        model = agent_type.load(model_path)
        # A fresh environment per agent so every run starts from the same state.
        env = PortfolioEnv(test_df)
        portfolio_values[name] = evaluate_agent(env, model)
        metrics[name] = calculate_metrics(portfolio_values[name])

    # Evaluate Buy and Hold Baseline
    print("\n--- Evaluating Buy and Hold Baseline ---")
    bnh_values = buy_and_hold(test_df)
    portfolio_values["Buy and Hold"] = bnh_values
    metrics["Buy and Hold"] = calculate_metrics(bnh_values)

    # 4. Display Results
    print("\n--- Stress Test Performance Metrics ---")
    metrics_df = pd.DataFrame(metrics)
    print(metrics_df)

    # 5. Plotting
    plt.style.use('seaborn-v0_8-darkgrid')
    fig, ax = plt.subplots(figsize=(14, 8))
    colors = {"PPO Agent": "red", "SAC Agent": "green", "TD3 Agent": "orange", "Buy and Hold": "blue"}
    for name, values in portfolio_values.items():
        ax.plot(values.index, values, label=name, color=colors.get(name, 'black'), linewidth=2)
    plot_title = f"Agent Stress Test: {os.path.basename(datafile_path).replace('.csv', '')}"
    ax.set_title(plot_title, fontsize=16)
    ax.set_xlabel('Date', fontsize=12)
    ax.set_ylabel('Portfolio Value ($)', fontsize=12)
    ax.legend(fontsize=12)
    # Show the y axis as whole dollars with thousands separators.
    formatter = FuncFormatter(lambda x, p: f'${x:,.0f}')
    ax.yaxis.set_major_formatter(formatter)
    plt.tight_layout()

    # savefig does not create directories; make sure the target one exists
    # (the default output lives under "results/").
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    plt.savefig(output_path)
    print(f"\n✅ Plot saved to {output_path}")
    plt.show()
if __name__ == '__main__':
    # Command-line entry point: every option is a string path with a default,
    # and all of them are forwarded to run_stress_test unchanged.
    cli = argparse.ArgumentParser(description="Run a stress test on trained RL portfolio agents.")
    path_options = (
        ("--datafile", "data/stress_test_2018.csv", "Path to the market data CSV file for the test."),
        ("--ppopath", "checkpoints/ppo_portfolio_model.zip", "Path to the trained PPO model."),
        ("--sacpath", "checkpoints/sac_portfolio_model.zip", "Path to the trained SAC model."),
        ("--td3path", "checkpoints/td3_portfolio_model.zip", "Path to the trained TD3 model."),
        ("--output", "results/stress_test_comparison.png", "Path to save the output plot."),
    )
    for flag, default_value, help_text in path_options:
        cli.add_argument(flag, type=str, default=default_value, help=help_text)
    opts = cli.parse_args()
    run_stress_test(
        datafile_path=opts.datafile,
        ppo_path=opts.ppopath,
        sac_path=opts.sacpath,
        td3_path=opts.td3path,
        output_path=opts.output,
    )