# Provenance: Quant_Connect_JSON_analysis / risk_analysis.py (uploaded by RazHadas, commit 76317bb)
# -*- coding: utf-8 -*-
"""risk_analysis.py
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/10u2Di5_droisNYuq_KYAmdgVHixe6oVi
"""
# risk_analysis.py
# Functions for calculating risk metrics and correlations.
import pandas as pd
import numpy as np
import traceback
import plotly.graph_objects as go
from utils import create_empty_figure # Import helper
def get_drawdown_table(returns: pd.Series, top: int = 5) -> pd.DataFrame:
    """
    Calculate the top drawdown periods and statistics from a series of returns.

    A drawdown period starts when the compounded equity curve falls below its
    running high watermark and ends when it recovers to (or above) it.

    Args:
        returns: Series of daily returns with a DatetimeIndex.
        top: Number of top drawdowns (by magnitude) to return.

    Returns:
        DataFrame with columns 'Peak Date', 'Valley Date', 'End Date',
        'Duration (Days)', 'Max Drawdown (%)' — dates formatted as
        YYYY-MM-DD strings ('End Date' is NaN for a still-open drawdown)
        and the drawdown formatted as a percentage string.
        Returns an empty DataFrame if input is invalid or no drawdowns occur.
    """
    # Input validation: require a non-empty Series with a DatetimeIndex.
    if returns is None or not isinstance(returns, pd.Series) or returns.empty:
        return pd.DataFrame()
    if not isinstance(returns.index, pd.DatetimeIndex):
        return pd.DataFrame()

    # Coerce returns to numeric and drop anything unparseable.
    df = returns.to_frame(name='returns')
    df['returns'] = pd.to_numeric(df['returns'], errors='coerce')
    df.dropna(subset=['returns'], inplace=True)
    if df.empty:
        return pd.DataFrame()

    # Compounded equity curve, running maximum, and % decline from it.
    df['Cumulative'] = (1 + df['returns']).cumprod()
    df['HighWatermark'] = df['Cumulative'].cummax()
    df['Drawdown'] = (df['Cumulative'] / df['HighWatermark']) - 1

    def _duration_days(start_date, end_date):
        # Duration in business days; falls back to inclusive calendar days
        # if bdate_range fails (e.g. non-standard dates). NaN for bad dates.
        if pd.isna(start_date) or pd.isna(end_date):
            return np.nan
        try:
            return len(pd.bdate_range(start=start_date, end=end_date))
        except Exception:
            return (end_date - start_date).days + 1

    in_drawdown = False   # True while inside an open drawdown period
    periods = []          # Completed drawdown period dicts
    current_period = {}   # Details of the ongoing drawdown
    peak_idx = df.index[0]
    # Track the peak cumulative value as a scalar. The previous version
    # re-read it with df.loc[peak_idx, 'Cumulative'], which returns a Series
    # (and makes the boolean comparison raise) when the index contains
    # duplicate timestamps.
    peak_cum = -np.inf
    for idx, cum, dd in zip(df.index,
                            df['Cumulative'].to_numpy(),
                            df['Drawdown'].to_numpy()):
        # New high watermark: remember where the current peak is.
        if cum >= peak_cum:
            peak_cum = cum
            peak_idx = idx
        is_dd = dd < 0
        if not in_drawdown and is_dd:
            # Start of a new drawdown period.
            in_drawdown = True
            current_period = {
                'Peak Date': peak_idx,      # previous peak = start of drawdown
                'Valley Date': idx,         # deepest point so far
                'End Date': pd.NaT,         # recovery date, filled in on close
                'Max Drawdown (%)': dd,
                'Duration (Days)': 0,
            }
        elif in_drawdown:
            # Deeper valley reached: update the extremes.
            if dd < current_period['Max Drawdown (%)']:
                current_period['Valley Date'] = idx
                current_period['Max Drawdown (%)'] = dd
            # Recovered to the high watermark (drawdown no longer negative):
            # close the period.
            if not is_dd:
                in_drawdown = False
                current_period['End Date'] = idx
                current_period['Duration (Days)'] = _duration_days(
                    current_period['Peak Date'], idx)
                periods.append(current_period)
                current_period = {}

    # Series ended while still underwater: record the open period.
    # 'End Date' stays NaT because recovery never happened.
    if in_drawdown:
        current_period['Duration (Days)'] = _duration_days(
            current_period['Peak Date'], df.index[-1])
        periods.append(current_period)

    if not periods:
        return pd.DataFrame()

    # Keep the `top` most severe drawdowns (most negative first).
    drawdown_df = pd.DataFrame(periods)
    drawdown_df = drawdown_df.sort_values(by='Max Drawdown (%)', ascending=True).head(top)

    # Format for display: percentage string and YYYY-MM-DD dates
    # (NaT end dates become NaN after strftime).
    drawdown_df['Max Drawdown (%)'] = drawdown_df['Max Drawdown (%)'].map('{:.2%}'.format)
    for col in ['Peak Date', 'Valley Date', 'End Date']:
        if col in drawdown_df.columns:
            drawdown_df[col] = pd.to_datetime(drawdown_df[col]).dt.strftime('%Y-%m-%d')

    # Select and order the output columns, tolerating any that are missing.
    cols_to_select = ['Peak Date', 'Valley Date', 'End Date', 'Duration (Days)', 'Max Drawdown (%)']
    existing_cols = [col for col in cols_to_select if col in drawdown_df.columns]
    return drawdown_df[existing_cols]
def _monthly_returns_analysis(returns_series: pd.Series):
    """Compound daily returns into monthly returns and build display tables.

    Returns a tuple (heatmap_df, stats_df, status_msg):
        heatmap_df: Year x Month pivot of monthly returns (in percent),
                    empty if no monthly data.
        stats_df:   two-column Metric/Value summary, empty if no data.
        status_msg: one status fragment for the overall status string.
    """
    # Compounded monthly return: (1+r1)*(1+r2)*...*(1+rn) - 1 per month.
    # NOTE: 'M' (month-end) is deprecated in favor of 'ME' in pandas >= 2.2;
    # kept as 'M' for compatibility with older pandas.
    monthly_rets = returns_series.resample('M').apply(lambda x: (1 + x).prod() - 1)
    if monthly_rets.empty:
        return (pd.DataFrame(),
                pd.DataFrame(columns=['Metric', 'Value']),
                "Monthly stats skipped (no monthly data).")

    # Heatmap table: Year rows, Month columns (Jan..Dec, only those present).
    table = pd.DataFrame({'returns': monthly_rets})
    table['Year'] = table.index.year
    table['Month'] = table.index.strftime('%b')
    heatmap = table.pivot_table(index='Year', columns='Month', values='returns')
    month_order = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                   'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    heatmap = heatmap[[m for m in month_order if m in heatmap.columns]]
    heatmap.sort_index(ascending=True, inplace=True)

    # Summary statistics for the monthly returns.
    monthly_stats = {
        "Min": f"{monthly_rets.min():.2%}",
        "Max": f"{monthly_rets.max():.2%}",
        "Mean": f"{monthly_rets.mean():.2%}",
        "Positive Months": (monthly_rets > 0).sum(),
        "Negative Months": (monthly_rets <= 0).sum()
    }
    stats_df = pd.DataFrame(list(monthly_stats.items()), columns=['Metric', 'Value'])
    # Heatmap values are stored as percentages for plotting.
    return heatmap * 100, stats_df, "Monthly stats OK."


def _rolling_volatility_analysis(returns_series: pd.Series):
    """Compute rolling annualized volatility over 3M/6M/12M windows.

    Returns a tuple (vol_df, stats_df, status_msgs):
        vol_df:      DataFrame of rolling vol columns ('vol_3M', ...),
                     indexed like returns_series; empty if nothing computed.
        stats_df:    Window/Min/Max/Mean summary, empty if nothing computed.
        status_msgs: list of status fragments (errors and/or OK/skipped).
    """
    vol_df = pd.DataFrame(index=returns_series.index)
    vol_stats_list = []
    status_msgs = []
    windows = {'3M': 63, '6M': 126, '12M': 252}  # label -> trading days
    vol_calculated = False
    for label, window in windows.items():
        # Skip windows longer than the available history.
        if len(returns_series) < window:
            continue
        try:
            # min_periods lets the series start before the window is full.
            rolling_std = returns_series.rolling(window=window, min_periods=window // 2).std()
            # Annualize: multiply by sqrt of trading days per year.
            rolling_vol = rolling_std * np.sqrt(252)
            vol_df[f'vol_{label}'] = rolling_vol
            if not rolling_vol.dropna().empty:
                vol_stats_list.append({
                    "Window": label,
                    "Min Vol": f"{rolling_vol.min():.2%}",
                    "Max Vol": f"{rolling_vol.max():.2%}",
                    "Mean Vol": f"{rolling_vol.mean():.2%}"
                })
                vol_calculated = True
        except Exception as vol_e:
            print(f"Error calculating rolling volatility for window {label}: {vol_e}")
            status_msgs.append(f"Rolling Vol ({label}) Error.")

    if vol_stats_list:
        stats_df = pd.DataFrame(vol_stats_list)
        status_msgs.append("Rolling Vol OK.")
    else:
        stats_df = pd.DataFrame(columns=['Window', 'Min Vol', 'Max Vol', 'Mean Vol'])
        # Only report "skipped" when nothing computed AND no error was logged.
        if not vol_calculated and not any("Error" in m for m in status_msgs):
            status_msgs.append("Rolling Vol skipped (insufficient data for windows).")
    return vol_df, stats_df, status_msgs


def calculate_manual_risk_stats(returns_series):
    """
    Calculate various risk and performance metrics manually using pandas
    based on daily returns.

    Args:
        returns_series: A pandas Series of daily percentage returns with a
            DatetimeIndex (naive indexes are localized to UTC).

    Returns:
        A dictionary containing:
        - monthly_returns_table_for_heatmap: DataFrame pivoted for the monthly
          return heatmap (values as percentages).
        - monthly_perf_stats: DataFrame with summary stats for monthly returns.
        - rolling_vol_df: DataFrame of rolling annualized volatility
          (index reset so 'Time' becomes a column for plotting).
        - rolling_vol_stats: DataFrame summarizing min/max/mean rolling vol.
        - drawdown_table: DataFrame with top drawdown periods
          (from get_drawdown_table).
        - status: A string indicating the status of the analysis.
    """
    # Defaults returned on any early exit or failure.
    analysis_results = {
        "monthly_returns_table_for_heatmap": pd.DataFrame(),
        "monthly_perf_stats": pd.DataFrame(columns=['Metric', 'Value']),
        "rolling_vol_df": pd.DataFrame(),
        "rolling_vol_stats": pd.DataFrame(columns=['Window', 'Min Vol', 'Max Vol', 'Mean Vol']),
        "drawdown_table": pd.DataFrame(),
        "status": "Analysis skipped."
    }

    # --- Input Validation ---
    if returns_series is None or not isinstance(returns_series, pd.Series) \
            or returns_series.empty or len(returns_series) < 2:
        analysis_results["status"] = "Analysis skipped: Insufficient/invalid returns data."
        return analysis_results
    if not isinstance(returns_series.index, pd.DatetimeIndex):
        analysis_results["status"] = "Analysis skipped: Returns index is not DatetimeIndex."
        return analysis_results

    try:
        status_parts = []  # Collected status fragments for each sub-analysis.

        # Clean: keep only valid numeric returns.
        returns_series = pd.to_numeric(returns_series, errors='coerce').dropna()
        if returns_series.empty or len(returns_series) < 2:
            analysis_results["status"] = "Analysis skipped: No valid numeric returns after cleaning."
            return analysis_results

        # Normalize the index to UTC. NOTE: the previous version compared the
        # tz object directly to the string 'UTC' (always unequal), forcing a
        # redundant tz_convert on every aware index; comparing str(tz) fixes it.
        if returns_series.index.tz is None:
            returns_series = returns_series.tz_localize('UTC')
        elif str(returns_series.index.tz) != 'UTC':
            returns_series = returns_series.tz_convert('UTC')

        # --- Monthly Returns Analysis ---
        heatmap_df, monthly_stats_df, monthly_msg = _monthly_returns_analysis(returns_series)
        if not heatmap_df.empty:
            analysis_results["monthly_returns_table_for_heatmap"] = heatmap_df
        if not monthly_stats_df.empty:
            analysis_results["monthly_perf_stats"] = monthly_stats_df
        status_parts.append(monthly_msg)

        # --- Rolling Volatility Analysis ---
        vol_df, vol_stats_df, vol_msgs = _rolling_volatility_analysis(returns_series)
        if not vol_df.empty:
            # Reset index so 'Time' is a plain column for plotting.
            analysis_results["rolling_vol_df"] = vol_df.reset_index()
        if not vol_stats_df.empty:
            analysis_results["rolling_vol_stats"] = vol_stats_df
        status_parts.extend(vol_msgs)

        # --- Drawdown Table Calculation ---
        try:
            analysis_results["drawdown_table"] = get_drawdown_table(returns_series, top=5)
            if not analysis_results["drawdown_table"].empty:
                status_parts.append("Drawdown Table OK.")
            else:
                status_parts.append("Drawdown Table: No drawdowns found or error.")
        except Exception as dd_e:
            print(f"Error calculating drawdown table: {dd_e}")
            traceback.print_exc()
            status_parts.append("Drawdown Table Error.")

        # --- Final Status ---
        analysis_results["status"] = " ".join(status_parts) if status_parts \
            else "Analysis completed (no specific issues)."
    except Exception as e:
        # Catch-all: report the failure but still return the defaults.
        error_msg = f"Error during manual risk analysis: {e}"
        print(error_msg)
        traceback.print_exc()
        analysis_results["status"] = f"Manual risk analysis failed: {e}"
    return analysis_results
def _extract_utc_series(df, value_col: str):
    """Extract df[value_col] indexed by a UTC 'Time' index, or None.

    Validates that df is a non-empty DataFrame with datetime 'Time' and the
    requested value column, drops duplicate timestamps (keeping the first),
    and localizes/converts the index to UTC. Returns None when the frame
    does not have the required shape or ends up empty.
    """
    if df is None or df.empty or 'Time' not in df.columns \
            or value_col not in df.columns \
            or not pd.api.types.is_datetime64_any_dtype(df['Time']):
        return None
    s = df.set_index('Time')[value_col]
    s = s[~s.index.duplicated(keep='first')]
    # Normalize to UTC (str() comparison: tz objects never equal the
    # string 'UTC' directly).
    if s.index.tz is None:
        s = s.tz_localize('UTC')
    elif str(s.index.tz) != 'UTC':
        s = s.tz_convert('UTC')
    return s if not s.empty else None


def calculate_correlation(all_results):
    """
    Calculate the correlation matrix for the daily returns of multiple
    strategies and optionally include the benchmark.

    Args:
        all_results: A dictionary where keys are strategy filenames and values
            are the result dictionaries obtained from process_single_file.
            These results should contain 'equity_df' and optionally
            'benchmark_df' (each with 'Time' plus 'Equity'/'Benchmark').

    Returns:
        A tuple containing:
        - correlation_matrix: DataFrame of Pearson correlation coefficients.
        - heatmap_fig: Plotly heatmap figure of the correlation matrix.
        - corr_status: status message for the correlation calculation.
    """
    # Default outputs for the early-exit paths.
    default_corr_matrix = pd.DataFrame()
    default_heatmap = create_empty_figure("Correlation Heatmap (Insufficient Data)")
    corr_status = "Correlation analysis skipped."

    equity_data_all = {}         # {filename: equity Series (UTC index)}
    benchmark_data = None        # First valid benchmark series found
    valid_strategies_count = 0

    # --- Extract Equity and Benchmark Data ---
    for filename, results in all_results.items():
        if results.get("error"):  # Skip files that had processing errors
            # Fixed: these messages previously printed a literal placeholder
            # instead of the strategy filename.
            print(f"Skipping {filename} for correlation due to processing error.")
            continue
        df_eq = _extract_utc_series(results.get("equity_df"), 'Equity')
        if df_eq is None:
            print(f"Skipping {filename} for correlation: Invalid or empty equity_df or Time column.")
            continue
        equity_data_all[filename] = df_eq
        valid_strategies_count += 1
        # Grab the benchmark from the *first* strategy that has one.
        if benchmark_data is None:
            df_b = _extract_utc_series(results.get("benchmark_df"), 'Benchmark')
            if df_b is not None:
                benchmark_data = df_b
                print(f"Using benchmark data from {filename} for correlation.")

    # --- Check if enough data for correlation ---
    # Need >= 2 series overall: either 2+ strategies, or 1 strategy + benchmark.
    if valid_strategies_count == 0:
        corr_status = "Correlation skipped: No valid strategy equity data found."
        return default_corr_matrix, default_heatmap, corr_status
    if valid_strategies_count == 1 and benchmark_data is None:
        corr_status = "Correlation skipped: Only one strategy and no benchmark data."
        return default_corr_matrix, default_heatmap, corr_status

    # --- Combine Data and Calculate Returns ---
    # Outer join keeps every date across all strategies.
    combined_equity = pd.concat(equity_data_all, axis=1, join='outer')
    if benchmark_data is not None:
        combined_equity['Benchmark'] = benchmark_data
    combined_equity = combined_equity.sort_index()
    # Forward-fill to align different start/end dates before differencing.
    combined_equity_filled = combined_equity.ffill()
    daily_returns = combined_equity_filled.pct_change()
    # Division by zero (price was 0) can yield +/-inf: treat as missing.
    daily_returns.replace([np.inf, -np.inf], np.nan, inplace=True)
    # Drop rows with any NaN (first pct_change row, non-overlapping dates).
    daily_returns.dropna(inplace=True)

    if daily_returns.empty or len(daily_returns) < 2:
        corr_status = "Correlation skipped: Not enough overlapping daily data points after cleaning."
        return default_corr_matrix, default_heatmap, corr_status

    # --- Calculate Correlation Matrix ---
    try:
        correlation_matrix = daily_returns.corr(method='pearson')
        corr_status = f"Correlation calculated for {valid_strategies_count} strategies"
        corr_status += " and Benchmark." if benchmark_data is not None else "."
    except Exception as corr_e:
        print(f"Error calculating correlation matrix: {corr_e}")
        traceback.print_exc()
        corr_status = f"Correlation calculation failed: {corr_e}"
        return default_corr_matrix, default_heatmap, corr_status

    # --- Generate Correlation Heatmap Figure ---
    heatmap_fig = create_empty_figure("Correlation Heatmap")  # Fallback
    try:
        heatmap_fig = go.Figure(data=go.Heatmap(
            z=correlation_matrix.values,
            x=correlation_matrix.columns,
            y=correlation_matrix.columns,
            colorscale='RdBu',  # Diverging scale suits correlation values
            zmin=-1, zmax=1,
            colorbar=dict(title='Correlation')
        ))
        heatmap_fig.update_layout(
            title='Strategy (+Benchmark) Daily Return Correlation',
            xaxis_tickangle=-45,            # readable labels with many strategies
            yaxis_autorange='reversed'      # conventional matrix orientation
        )
        # Annotate each cell with its correlation value.
        for i in range(len(correlation_matrix.columns)):
            for j in range(len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if pd.notna(corr_value):
                    # White text on strongly-colored cells for contrast.
                    text_color = "white" if abs(corr_value) > 0.5 else "black"
                    heatmap_fig.add_annotation(
                        x=correlation_matrix.columns[j],
                        y=correlation_matrix.columns[i],
                        text=f"{corr_value:.2f}",
                        showarrow=False,
                        font=dict(color=text_color)
                    )
    except Exception as e:
        print(f"Error creating correlation heatmap figure: {e}")
        traceback.print_exc()
        heatmap_fig = create_empty_figure("Error Creating Correlation Heatmap")
    return correlation_matrix, heatmap_fig, corr_status