# -*- coding: utf-8 -*-
"""risk_analysis.py
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/10u2Di5_droisNYuq_KYAmdgVHixe6oVi
"""
# risk_analysis.py
# Functions for calculating risk metrics and correlations.
import pandas as pd
import numpy as np
import traceback
import plotly.graph_objects as go
from utils import create_empty_figure # Import helper
def _drawdown_duration(start_date, end_date):
    """Return the inclusive duration of a drawdown in business days.

    Falls back to inclusive calendar days when bdate_range fails
    (e.g. non-standard dates); returns NaN when either date is missing.
    """
    if pd.isna(start_date) or pd.isna(end_date):
        return np.nan
    try:
        return len(pd.bdate_range(start=start_date, end=end_date))
    except Exception:
        # Fallback: calendar days, inclusive of start/end day
        return (end_date - start_date).days + 1


def get_drawdown_table(returns: pd.Series, top: int = 5) -> pd.DataFrame:
    """
    Calculates drawdown periods and statistics from a series of returns.
    Args:
        returns: Series of daily returns with a DatetimeIndex.
        top: Number of top drawdowns (by magnitude) to return.
    Returns:
        DataFrame containing information about the top drawdown periods:
        'Peak Date', 'Valley Date', 'End Date', 'Duration (Days)', 'Max Drawdown (%)'.
        Returns an empty DataFrame if input is invalid or no drawdowns occur.
    """
    # Input validation: need a non-empty Series with a DatetimeIndex
    if returns is None or not isinstance(returns, pd.Series) or returns.empty:
        return pd.DataFrame()
    if not isinstance(returns.index, pd.DatetimeIndex):
        return pd.DataFrame()
    # Keep only valid numeric returns
    clean = pd.to_numeric(returns, errors='coerce').dropna()
    if clean.empty:
        return pd.DataFrame()
    # Compounded equity curve, running peak, and percentage decline from peak
    cumulative = (1 + clean).cumprod()
    high_watermark = cumulative.cummax()
    drawdown = (cumulative / high_watermark) - 1
    # Identify drawdown periods
    in_drawdown = False   # Flag to track if currently in a drawdown
    periods = []          # Completed drawdown period dictionaries
    current_period = {}   # Details of the ongoing drawdown
    peak_idx = clean.index[0]
    # Track the peak VALUE as a scalar: avoids a .loc lookup every row and
    # (unlike .loc) behaves correctly when the index contains duplicate labels.
    peak_value = cumulative.iloc[0]
    for idx, cum, dd in zip(cumulative.index, cumulative.to_numpy(),
                            drawdown.to_numpy()):
        # New high watermark: remember when it happened
        if cum >= peak_value:
            peak_idx, peak_value = idx, cum
        is_dd = dd < 0  # Currently below the high watermark?
        if not in_drawdown and is_dd:
            # Start of a new drawdown period, anchored at the most recent peak
            in_drawdown = True
            current_period = {
                'Peak Date': peak_idx,
                'Valley Date': idx,
                'End Date': pd.NaT,       # filled on recovery; NaT if never recovered
                'Max Drawdown (%)': dd,   # deepest drawdown so far (raw fraction)
                'Duration (Days)': 0
            }
        elif in_drawdown:
            # Deepen the valley if a lower point is reached
            if dd < current_period['Max Drawdown (%)']:
                current_period['Valley Date'] = idx
                current_period['Max Drawdown (%)'] = dd
            # Recovery (drawdown no longer negative) closes the period
            if not is_dd:
                in_drawdown = False
                current_period['End Date'] = idx
                current_period['Duration (Days)'] = _drawdown_duration(
                    current_period['Peak Date'], idx)
                periods.append(current_period)
                current_period = {}
    # Series ended while still in a drawdown: record it; 'End Date' stays NaT
    if in_drawdown:
        current_period['Duration (Days)'] = _drawdown_duration(
            current_period['Peak Date'], clean.index[-1])
        periods.append(current_period)
    if not periods:
        return pd.DataFrame()
    # Worst (most negative) drawdowns first, keep the top N
    drawdown_df = pd.DataFrame(periods)
    drawdown_df = drawdown_df.sort_values(by='Max Drawdown (%)', ascending=True).head(top)
    # Display formatting: percentage strings and YYYY-MM-DD date strings
    drawdown_df['Max Drawdown (%)'] = drawdown_df['Max Drawdown (%)'].map('{:.2%}'.format)
    for col in ['Peak Date', 'Valley Date', 'End Date']:
        if col in drawdown_df.columns:
            drawdown_df[col] = pd.to_datetime(drawdown_df[col]).dt.strftime('%Y-%m-%d')
    # Select and order columns for the final output table
    cols_to_select = ['Peak Date', 'Valley Date', 'End Date',
                      'Duration (Days)', 'Max Drawdown (%)']
    existing_cols = [col for col in cols_to_select if col in drawdown_df.columns]
    return drawdown_df[existing_cols]
def calculate_manual_risk_stats(returns_series):
    """
    Calculates various risk and performance metrics manually using pandas based on daily returns.
    Args:
        returns_series: A pandas Series of daily percentage returns with a DatetimeIndex.
            A naive index is localized to UTC; other timezones are converted to UTC.
    Returns:
        A dictionary containing:
        - monthly_returns_table_for_heatmap: DataFrame pivoted Year x Month for the
          monthly return heatmap (values expressed as percentages, i.e. x100).
        - monthly_perf_stats: DataFrame with summary stats for monthly returns.
        - rolling_vol_df: DataFrame of rolling annualized volatility (with 'Time' column).
        - rolling_vol_stats: DataFrame summarizing min/max/mean rolling volatility.
        - drawdown_table: DataFrame with top drawdown periods (from get_drawdown_table).
        - status: A string indicating the status of the analysis.
    """
    # Initialize results dictionary with default empty structures
    analysis_results = {
        "monthly_returns_table_for_heatmap": pd.DataFrame(),
        "monthly_perf_stats": pd.DataFrame(columns=['Metric', 'Value']),
        "rolling_vol_df": pd.DataFrame(),
        "rolling_vol_stats": pd.DataFrame(columns=['Window', 'Min Vol', 'Max Vol', 'Mean Vol']),
        "drawdown_table": pd.DataFrame(),
        "status": "Analysis skipped."  # Default status
    }
    # --- Input Validation ---
    if returns_series is None or not isinstance(returns_series, pd.Series) or returns_series.empty or len(returns_series) < 2:
        analysis_results["status"] = "Analysis skipped: Insufficient/invalid returns data."
        return analysis_results
    if not isinstance(returns_series.index, pd.DatetimeIndex):
        analysis_results["status"] = "Analysis skipped: Returns index is not DatetimeIndex."
        return analysis_results
    try:
        status_parts = []  # Collects status messages for each analysis section
        # Ensure returns are numeric and the index is a UTC DatetimeIndex
        returns_series = pd.to_numeric(returns_series, errors='coerce').dropna()
        if returns_series.empty or len(returns_series) < 2:
            analysis_results["status"] = "Analysis skipped: No valid numeric returns after cleaning."
            return analysis_results
        if returns_series.index.tz is None:
            returns_series = returns_series.tz_localize('UTC')
        elif str(returns_series.index.tz) != 'UTC':
            # Compare via str(): a tzinfo object never equals the string 'UTC',
            # so the naive `tz != 'UTC'` check forced a redundant conversion.
            returns_series = returns_series.tz_convert('UTC')
        # --- Monthly Returns Analysis ---
        # Compound daily returns into monthly returns: (1+r1)*...*(1+rn) - 1.
        # 'ME' is the month-end alias on pandas >= 2.2 (where 'M' is deprecated);
        # fall back to 'M' on older versions that reject 'ME'.
        _compound = lambda x: (1 + x).prod() - 1
        try:
            monthly_rets = returns_series.resample('ME').apply(_compound)
        except ValueError:
            monthly_rets = returns_series.resample('M').apply(_compound)
        if not monthly_rets.empty:
            # Build the heatmap table: Year rows, Month columns
            monthly_ret_table_df = pd.DataFrame({'returns': monthly_rets})
            monthly_ret_table_df['Year'] = monthly_ret_table_df.index.year
            monthly_ret_table_df['Month'] = monthly_ret_table_df.index.strftime('%b')  # Jan, Feb, ...
            monthly_heatmap_data = monthly_ret_table_df.pivot_table(index='Year', columns='Month', values='returns')
            # Order month columns chronologically (pivot sorts them alphabetically)
            month_order = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
            present_months = [m for m in month_order if m in monthly_heatmap_data.columns]
            monthly_heatmap_data = monthly_heatmap_data[present_months]
            monthly_heatmap_data.sort_index(ascending=True, inplace=True)
            # Store as percentages for the heatmap plot
            analysis_results["monthly_returns_table_for_heatmap"] = monthly_heatmap_data * 100
            # Monthly performance summary statistics
            monthly_stats = {
                "Min": f"{monthly_rets.min():.2%}",
                "Max": f"{monthly_rets.max():.2%}",
                "Mean": f"{monthly_rets.mean():.2%}",
                "Positive Months": (monthly_rets > 0).sum(),
                "Negative Months": (monthly_rets <= 0).sum()
            }
            analysis_results["monthly_perf_stats"] = pd.DataFrame(list(monthly_stats.items()), columns=['Metric', 'Value'])
            status_parts.append("Monthly stats OK.")
        else:
            status_parts.append("Monthly stats skipped (no monthly data).")
        # --- Rolling Volatility Analysis ---
        vol_df = pd.DataFrame(index=returns_series.index)  # One column per window
        vol_stats_list = []  # Summary stats per window
        # Window label -> number of trading days
        windows = {'3M': 63, '6M': 126, '12M': 252}
        vol_calculated = False
        for label, window in windows.items():
            # Only compute when there is at least one full window of data
            if len(returns_series) >= window:
                try:
                    # Rolling std of daily returns; min_periods allows values
                    # before the window is completely full
                    rolling_std = returns_series.rolling(window=window, min_periods=window // 2).std()
                    # Annualize: multiply by sqrt of trading days per year
                    rolling_vol = rolling_std * np.sqrt(252)
                    vol_df[f'vol_{label}'] = rolling_vol
                    if not rolling_vol.dropna().empty:  # Any valid values?
                        vol_stats_list.append({
                            "Window": label,
                            "Min Vol": f"{rolling_vol.min():.2%}",
                            "Max Vol": f"{rolling_vol.max():.2%}",
                            "Mean Vol": f"{rolling_vol.mean():.2%}"
                        })
                        vol_calculated = True
                except Exception as vol_e:
                    print(f"Error calculating rolling volatility for window {label}: {vol_e}")
                    status_parts.append(f"Rolling Vol ({label}) Error.")
        # Expose a 'Time' column for plotting. rename_axis ensures the reset
        # column is named 'Time' as documented, regardless of the input index
        # name (a plain reset_index would otherwise yield 'index').
        if not vol_df.empty:
            analysis_results["rolling_vol_df"] = vol_df.rename_axis('Time').reset_index()
        if vol_stats_list:
            analysis_results["rolling_vol_stats"] = pd.DataFrame(vol_stats_list)
            status_parts.append("Rolling Vol OK.")
        elif not vol_calculated and "Error" not in " ".join(status_parts):
            status_parts.append("Rolling Vol skipped (insufficient data for windows).")
        # --- Drawdown Table Calculation ---
        try:
            analysis_results["drawdown_table"] = get_drawdown_table(returns_series, top=5)
            if not analysis_results["drawdown_table"].empty:
                status_parts.append("Drawdown Table OK.")
            else:
                status_parts.append("Drawdown Table: No drawdowns found or error.")
        except Exception as dd_e:
            print(f"Error calculating drawdown table: {dd_e}")
            traceback.print_exc()
            status_parts.append("Drawdown Table Error.")
        # --- Final Status ---
        analysis_results["status"] = " ".join(status_parts) if status_parts else "Analysis completed (no specific issues)."
    except Exception as e:
        # Catch-all for any unexpected error during the entire analysis
        error_msg = f"Error during manual risk analysis: {e}"
        print(error_msg)
        traceback.print_exc()
        analysis_results["status"] = f"Manual risk analysis failed: {e}"
    return analysis_results
def calculate_correlation(all_results):
    """
    Calculates the correlation matrix for the daily returns of multiple strategies
    and optionally includes the benchmark.
    Args:
        all_results: A dictionary where keys are strategy filenames and values are
            the result dictionaries obtained from process_single_file.
            These results should contain 'equity_df' (columns 'Time', 'Equity')
            and optionally 'benchmark_df' (columns 'Time', 'Benchmark').
    Returns:
        A tuple containing:
        - correlation_matrix: DataFrame of the Pearson correlation coefficients.
        - heatmap_fig: Plotly heatmap figure of the correlation matrix.
        - corr_status: String message indicating the status of the correlation calculation.
    """
    # Default outputs used on any early exit
    default_corr_matrix = pd.DataFrame()
    default_heatmap = create_empty_figure("Correlation Heatmap (Insufficient Data)")
    corr_status = "Correlation analysis skipped."
    equity_data_all = {}        # {filename: equity Series indexed by Time}
    benchmark_data = None       # First valid benchmark series found
    valid_strategies_count = 0  # Number of strategies with valid equity data
    # --- Extract Equity and Benchmark Data ---
    for filename, results in all_results.items():
        if results.get("error"):  # Skip files that had processing errors
            # NOTE: interpolate the filename so the log is actionable
            print(f"Skipping {filename} for correlation due to processing error.")
            continue
        equity_df = results.get("equity_df")      # DataFrame with 'Time', 'Equity'
        bench_df = results.get("benchmark_df")    # DataFrame with 'Time', 'Benchmark'
        # Check for valid equity data
        if equity_df is not None and not equity_df.empty and \
           'Time' in equity_df.columns and 'Equity' in equity_df.columns and \
           pd.api.types.is_datetime64_any_dtype(equity_df['Time']):
            # Set 'Time' as index, select 'Equity', remove duplicate indices
            df_eq = equity_df.set_index('Time')['Equity']
            df_eq = df_eq[~df_eq.index.duplicated(keep='first')]
            # Normalize the index to UTC (str() comparison: a tzinfo object
            # never equals the string 'UTC')
            if df_eq.index.tz is None: df_eq = df_eq.tz_localize('UTC')
            elif str(df_eq.index.tz) != 'UTC': df_eq = df_eq.tz_convert('UTC')
            if not df_eq.empty:
                equity_data_all[filename] = df_eq
                valid_strategies_count += 1
            # Take the benchmark data from the *first* strategy that has it
            if benchmark_data is None and bench_df is not None and not bench_df.empty and \
               'Time' in bench_df.columns and 'Benchmark' in bench_df.columns and \
               pd.api.types.is_datetime64_any_dtype(bench_df['Time']):
                df_b = bench_df.set_index('Time')['Benchmark']
                df_b = df_b[~df_b.index.duplicated(keep='first')]
                # Normalize the index to UTC
                if df_b.index.tz is None: df_b = df_b.tz_localize('UTC')
                elif str(df_b.index.tz) != 'UTC': df_b = df_b.tz_convert('UTC')
                if not df_b.empty:
                    benchmark_data = df_b
                    print(f"Using benchmark data from {filename} for correlation.")
        else:
            print(f"Skipping {filename} for correlation: Invalid or empty equity_df or Time column.")
    # --- Check if enough data for correlation ---
    # Need at least two series overall: either 2+ strategies, or 1 strategy + benchmark
    if valid_strategies_count == 0:
        corr_status = "Correlation skipped: No valid strategy equity data found."
        return default_corr_matrix, default_heatmap, corr_status
    if valid_strategies_count == 1 and benchmark_data is None:
        corr_status = "Correlation skipped: Only one strategy and no benchmark data."
        return default_corr_matrix, default_heatmap, corr_status
    # --- Combine Data and Calculate Returns ---
    # Outer join keeps all dates across strategies with different histories
    combined_equity = pd.concat(equity_data_all, axis=1, join='outer')
    if benchmark_data is not None:
        combined_equity['Benchmark'] = benchmark_data
    combined_equity = combined_equity.sort_index()
    # Forward-fill to align series with different start/end dates
    combined_equity_filled = combined_equity.ffill()
    # Daily percentage returns
    daily_returns = combined_equity_filled.pct_change()
    # Replace infinities from division by zero (e.g. price was 0) with NaN
    daily_returns.replace([np.inf, -np.inf], np.nan, inplace=True)
    # Drop rows with any NaN (first row after pct_change, unaligned dates, ...)
    daily_returns.dropna(inplace=True)
    # Require enough overlapping data after cleaning
    if daily_returns.empty or len(daily_returns) < 2:
        corr_status = "Correlation skipped: Not enough overlapping daily data points after cleaning."
        return default_corr_matrix, default_heatmap, corr_status
    # --- Calculate Correlation Matrix ---
    try:
        correlation_matrix = daily_returns.corr(method='pearson')  # or 'kendall'/'spearman'
        corr_status = f"Correlation calculated for {valid_strategies_count} strategies"
        if benchmark_data is not None:
            corr_status += " and Benchmark."
        else:
            corr_status += "."
    except Exception as corr_e:
        print(f"Error calculating correlation matrix: {corr_e}")
        traceback.print_exc()
        corr_status = f"Correlation calculation failed: {corr_e}"
        return default_corr_matrix, default_heatmap, corr_status
    # --- Generate Correlation Heatmap Figure ---
    heatmap_fig = create_empty_figure("Correlation Heatmap")  # Default empty
    try:
        heatmap_fig = go.Figure(data=go.Heatmap(
            z=correlation_matrix.values,
            x=correlation_matrix.columns,
            y=correlation_matrix.columns,
            colorscale='RdBu',   # Diverging scale suits correlation values
            zmin=-1, zmax=1,     # Pin the scale to the full correlation range
            colorbar=dict(title='Correlation')
        ))
        heatmap_fig.update_layout(
            title='Strategy (+Benchmark) Daily Return Correlation',
            xaxis_tickangle=-45,         # Readable labels with many strategies
            yaxis_autorange='reversed'   # Conventional matrix orientation
        )
        # Annotate each cell with its correlation value
        for i in range(len(correlation_matrix.columns)):
            for j in range(len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if pd.notna(corr_value):
                    # White text on strongly-colored cells for contrast
                    text_color = "white" if abs(corr_value) > 0.5 else "black"
                    heatmap_fig.add_annotation(
                        x=correlation_matrix.columns[j],
                        y=correlation_matrix.columns[i],
                        text=f"{corr_value:.2f}",
                        showarrow=False,
                        font=dict(color=text_color)
                    )
    except Exception as e:
        print(f"Error creating correlation heatmap figure: {e}")
        traceback.print_exc()
        heatmap_fig = create_empty_figure("Error Creating Correlation Heatmap")
    return correlation_matrix, heatmap_fig, corr_status