# -*- coding: utf-8 -*-
"""risk_analysis.py

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/10u2Di5_droisNYuq_KYAmdgVHixe6oVi
"""

# risk_analysis.py
# Functions for calculating risk metrics and correlations.

import pandas as pd
import numpy as np
import traceback
import plotly.graph_objects as go

from utils import create_empty_figure  # Import helper


def _ensure_utc(series: pd.Series) -> pd.Series:
    """Return *series* with a UTC-aware DatetimeIndex.

    Naive indices are localized to UTC; aware indices are converted to UTC
    (already-UTC indices are returned unchanged). Assumes the index is a
    DatetimeIndex — callers validate that before calling.
    """
    if series.index.tz is None:
        return series.tz_localize('UTC')
    # Compare via str(): tz objects (pytz.UTC, datetime.timezone.utc, zoneinfo)
    # never compare equal to the string 'UTC' directly.
    if str(series.index.tz) != 'UTC':
        return series.tz_convert('UTC')
    return series


def get_drawdown_table(returns: pd.Series, top: int = 5) -> pd.DataFrame:
    """
    Calculates drawdown periods and statistics from a series of returns.

    Args:
        returns: Series of daily returns with a DatetimeIndex.
        top: Number of top drawdowns (by magnitude) to return.

    Returns:
        DataFrame containing information about the top drawdown periods:
        'Peak Date', 'Valley Date', 'End Date', 'Duration (Days)',
        'Max Drawdown (%)'. Returns an empty DataFrame if input is invalid
        or no drawdowns occur.
    """
    # --- Input validation ---
    if returns is None or not isinstance(returns, pd.Series) or returns.empty:
        return pd.DataFrame()
    if not isinstance(returns.index, pd.DatetimeIndex):
        return pd.DataFrame()

    # Work on a DataFrame copy; coerce to numeric and drop anything invalid.
    df = returns.to_frame(name='returns')
    df['returns'] = pd.to_numeric(df['returns'], errors='coerce')
    df.dropna(subset=['returns'], inplace=True)
    if df.empty:
        return pd.DataFrame()

    # Compounded cumulative return, running high watermark, and percentage
    # decline from that watermark.
    df['Cumulative'] = (1 + df['returns']).cumprod()
    df['HighWatermark'] = df['Cumulative'].cummax()
    df['Drawdown'] = (df['Cumulative'] / df['HighWatermark']) - 1

    # --- Identify drawdown periods ---
    in_drawdown = False      # Flag to track if currently in a drawdown
    periods = []             # Completed drawdown period dictionaries
    current_period = {}      # Details of the ongoing drawdown
    peak_idx = df.index[0]   # Initialize peak index to the start

    for idx, row in df.iterrows():
        # Track the date of the most recent high watermark. Use .loc for a
        # safe index-based comparison (robust to duplicate index labels).
        if row['Cumulative'] >= df.loc[peak_idx, 'Cumulative']:
            peak_idx = idx

        is_dd = row['Drawdown'] < 0  # Currently below the high watermark?

        if not in_drawdown and is_dd:
            # Start of a new drawdown period.
            in_drawdown = True
            current_period = {
                'Peak Date': peak_idx,              # Previous peak (drawdown start)
                'Valley Date': idx,                 # Lowest point so far
                'End Date': pd.NaT,                 # Recovery date — unknown yet
                'Max Drawdown (%)': row['Drawdown'],
                'Duration (Days)': 0                # Filled in when the period closes
            }
        elif in_drawdown:
            # Deepen the valley if a lower point is reached.
            if row['Drawdown'] < current_period['Max Drawdown (%)']:
                current_period['Valley Date'] = idx
                current_period['Max Drawdown (%)'] = row['Drawdown']
            # Recovered when Drawdown is no longer negative.
            if not is_dd:
                in_drawdown = False
                current_period['End Date'] = idx
                current_period['Duration (Days)'] = _drawdown_duration(
                    current_period['Peak Date'], idx)
                periods.append(current_period)
                current_period = {}  # Reset for the next potential drawdown

    # Series ended while still in a drawdown: close it at the last date,
    # leaving 'End Date' as NaT because recovery never happened.
    if in_drawdown:
        current_period['Duration (Days)'] = _drawdown_duration(
            current_period['Peak Date'], df.index[-1])
        periods.append(current_period)

    if not periods:
        return pd.DataFrame()

    drawdown_df = pd.DataFrame(periods)
    # Most negative (deepest) drawdowns first; keep the top N.
    drawdown_df = drawdown_df.sort_values(by='Max Drawdown (%)', ascending=True).head(top)
    # Format the drawdown as a percentage string for display.
    drawdown_df['Max Drawdown (%)'] = drawdown_df['Max Drawdown (%)'].map('{:.2%}'.format)
    # Format date columns as YYYY-MM-DD strings (NaT becomes NaN).
    for col in ['Peak Date', 'Valley Date', 'End Date']:
        if col in drawdown_df.columns:
            drawdown_df[col] = pd.to_datetime(drawdown_df[col]).dt.strftime('%Y-%m-%d')

    # Select and order output columns, tolerating any that are absent.
    cols_to_select = ['Peak Date', 'Valley Date', 'End Date',
                      'Duration (Days)', 'Max Drawdown (%)']
    existing_cols = [col for col in cols_to_select if col in drawdown_df.columns]
    return drawdown_df[existing_cols]


def _drawdown_duration(start_date, end_date):
    """Duration of a drawdown in business days (inclusive), falling back to
    calendar days if ``bdate_range`` fails; NaN if either date is missing."""
    if pd.notna(start_date) and pd.notna(end_date):
        try:
            return len(pd.bdate_range(start=start_date, end=end_date))
        except Exception:
            # Non-standard dates: fall back to inclusive calendar days.
            return (end_date - start_date).days + 1
    return np.nan


def calculate_manual_risk_stats(returns_series):
    """
    Calculates various risk and performance metrics manually using pandas
    based on daily returns.

    Args:
        returns_series: A pandas Series of daily percentage returns with a
            DatetimeIndex.

    Returns:
        A dictionary containing:
        - monthly_returns_table_for_heatmap: DataFrame pivoted for monthly
          return heatmap (values as percentages).
        - monthly_perf_stats: DataFrame with summary stats for monthly returns.
        - rolling_vol_df: DataFrame containing rolling annualized volatility
          calculations (with 'Time' column).
        - rolling_vol_stats: DataFrame summarizing min/max/mean rolling
          volatility.
        - drawdown_table: DataFrame with top drawdown periods (from
          get_drawdown_table).
        - status: A string indicating the status of the analysis.
    """
    # Initialize results dictionary with default empty structures so every
    # key is present even when a sub-analysis is skipped.
    analysis_results = {
        "monthly_returns_table_for_heatmap": pd.DataFrame(),
        "monthly_perf_stats": pd.DataFrame(columns=['Metric', 'Value']),
        "rolling_vol_df": pd.DataFrame(),
        "rolling_vol_stats": pd.DataFrame(columns=['Window', 'Min Vol', 'Max Vol', 'Mean Vol']),
        "drawdown_table": pd.DataFrame(),
        "status": "Analysis skipped."  # Default status
    }

    # --- Input Validation ---
    if returns_series is None or not isinstance(returns_series, pd.Series) \
            or returns_series.empty or len(returns_series) < 2:
        analysis_results["status"] = "Analysis skipped: Insufficient/invalid returns data."
        return analysis_results
    if not isinstance(returns_series.index, pd.DatetimeIndex):
        analysis_results["status"] = "Analysis skipped: Returns index is not DatetimeIndex."
        return analysis_results

    try:
        status_parts = []  # Collect status messages for each sub-analysis

        # Ensure returns are numeric and the index is a UTC DatetimeIndex.
        returns_series = pd.to_numeric(returns_series, errors='coerce').dropna()
        if returns_series.empty or len(returns_series) < 2:
            analysis_results["status"] = "Analysis skipped: No valid numeric returns after cleaning."
            return analysis_results
        returns_series = _ensure_utc(returns_series)

        # --- Monthly Returns Analysis ---
        # Resample daily returns to monthly, compounding within each month:
        # (1+r1)*(1+r2)*...*(1+rn) - 1.
        # NOTE: the 'M' (month-end) alias is deprecated in pandas >= 2.2 in
        # favor of 'ME'; kept for compatibility with older pandas.
        monthly_rets = returns_series.resample('M').apply(lambda x: (1 + x).prod() - 1)

        if not monthly_rets.empty:
            # Build Year x Month table for the heatmap.
            monthly_ret_table_df = pd.DataFrame({'returns': monthly_rets})
            monthly_ret_table_df['Year'] = monthly_ret_table_df.index.year
            monthly_ret_table_df['Month'] = monthly_ret_table_df.index.strftime('%b')  # Jan, Feb, ...
            monthly_heatmap_data = monthly_ret_table_df.pivot_table(
                index='Year', columns='Month', values='returns')

            # Order columns chronologically, keeping only months present.
            month_order = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                           'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
            present_months = [m for m in month_order if m in monthly_heatmap_data.columns]
            monthly_heatmap_data = monthly_heatmap_data[present_months]
            monthly_heatmap_data.sort_index(ascending=True, inplace=True)

            # Store as percentages for the heatmap plot.
            analysis_results["monthly_returns_table_for_heatmap"] = monthly_heatmap_data * 100

            # Monthly performance statistics.
            monthly_stats = {
                "Min": f"{monthly_rets.min():.2%}",
                "Max": f"{monthly_rets.max():.2%}",
                "Mean": f"{monthly_rets.mean():.2%}",
                "Positive Months": (monthly_rets > 0).sum(),
                "Negative Months": (monthly_rets <= 0).sum()
            }
            analysis_results["monthly_perf_stats"] = pd.DataFrame(
                list(monthly_stats.items()), columns=['Metric', 'Value'])
            status_parts.append("Monthly stats OK.")
        else:
            status_parts.append("Monthly stats skipped (no monthly data).")

        # --- Rolling Volatility Analysis ---
        vol_df = pd.DataFrame(index=returns_series.index)  # Rolling vol results
        vol_stats_list = []  # Summary stats for each window

        # Windows (label: number of trading days).
        windows = {'3M': 63, '6M': 126, '12M': 252}
        vol_calculated = False

        for label, window in windows.items():
            # Skip windows longer than the available history.
            if len(returns_series) >= window:
                try:
                    # Rolling std; min_periods lets the calculation start
                    # before the window is completely full.
                    rolling_std = returns_series.rolling(
                        window=window, min_periods=window // 2).std()
                    # Annualize (sqrt of trading days per year).
                    rolling_vol = rolling_std * np.sqrt(252)
                    vol_df[f'vol_{label}'] = rolling_vol

                    if not rolling_vol.dropna().empty:  # Any valid vol values?
                        vol_stats_list.append({
                            "Window": label,
                            "Min Vol": f"{rolling_vol.min():.2%}",
                            "Max Vol": f"{rolling_vol.max():.2%}",
                            "Mean Vol": f"{rolling_vol.mean():.2%}"
                        })
                        vol_calculated = True
                except Exception as vol_e:
                    print(f"Error calculating rolling volatility for window {label}: {vol_e}")
                    status_parts.append(f"Rolling Vol ({label}) Error.")

        # Store the rolling volatility DataFrame with a 'Time' column for
        # plotting. If the source index was unnamed, reset_index() would
        # produce a column named 'index' — rename it to honor the contract.
        if not vol_df.empty:
            vol_out = vol_df.reset_index()
            if 'Time' not in vol_out.columns:
                vol_out = vol_out.rename(columns={vol_out.columns[0]: 'Time'})
            analysis_results["rolling_vol_df"] = vol_out

        if vol_stats_list:
            analysis_results["rolling_vol_stats"] = pd.DataFrame(vol_stats_list)
            status_parts.append("Rolling Vol OK.")
        elif not vol_calculated and "Error" not in " ".join(status_parts):
            # No vol calculated and no errors reported.
            status_parts.append("Rolling Vol skipped (insufficient data for windows).")

        # --- Drawdown Table Calculation ---
        try:
            analysis_results["drawdown_table"] = get_drawdown_table(returns_series, top=5)
            if not analysis_results["drawdown_table"].empty:
                status_parts.append("Drawdown Table OK.")
            else:
                status_parts.append("Drawdown Table: No drawdowns found or error.")
        except Exception as dd_e:
            print(f"Error calculating drawdown table: {dd_e}")
            traceback.print_exc()
            status_parts.append("Drawdown Table Error.")

        # --- Final Status ---
        analysis_results["status"] = " ".join(status_parts) if status_parts \
            else "Analysis completed (no specific issues)."

    except Exception as e:
        # Catch-all for any unexpected error during the entire analysis.
        error_msg = f"Error during manual risk analysis: {e}"
        print(error_msg)
        traceback.print_exc()
        analysis_results["status"] = f"Manual risk analysis failed: {e}"

    return analysis_results


def calculate_correlation(all_results):
    """
    Calculates the correlation matrix for the daily returns of multiple
    strategies and optionally includes the benchmark.

    Args:
        all_results: A dictionary where keys are strategy filenames and
            values are the result dictionaries obtained from
            process_single_file. These results should contain 'equity_df'
            and optionally 'benchmark_df'.

    Returns:
        A tuple containing:
        - correlation_matrix: DataFrame of the Pearson correlation coefficients.
        - heatmap_fig: Plotly heatmap figure of the correlation matrix.
        - corr_status: String message indicating the status of the
          correlation calculation.
    """
    # Default outputs
    default_corr_matrix = pd.DataFrame()
    default_heatmap = create_empty_figure("Correlation Heatmap (Insufficient Data)")
    corr_status = "Correlation analysis skipped."

    equity_data_all = {}          # {filename: equity Series}
    benchmark_data = None         # First valid benchmark series found
    valid_strategies_count = 0    # Strategies with valid equity data

    # --- Extract Equity and Benchmark Data ---
    for filename, results in all_results.items():
        if results.get("error"):  # Skip files that had processing errors
            print(f"Skipping {filename} for correlation due to processing error.")
            continue

        equity_df = results.get("equity_df")      # DataFrame with 'Time', 'Equity'
        bench_df = results.get("benchmark_df")    # DataFrame with 'Time', 'Benchmark'

        # Check for valid equity data.
        if equity_df is not None and not equity_df.empty and \
                'Time' in equity_df.columns and 'Equity' in equity_df.columns and \
                pd.api.types.is_datetime64_any_dtype(equity_df['Time']):
            # Index by 'Time', select 'Equity', drop duplicate timestamps.
            df_eq = equity_df.set_index('Time')['Equity']
            df_eq = df_eq[~df_eq.index.duplicated(keep='first')]
            df_eq = _ensure_utc(df_eq)
            if not df_eq.empty:
                equity_data_all[filename] = df_eq
                valid_strategies_count += 1

            # Grab benchmark data from the *first* strategy that has it.
            if benchmark_data is None and bench_df is not None and not bench_df.empty and \
                    'Time' in bench_df.columns and 'Benchmark' in bench_df.columns and \
                    pd.api.types.is_datetime64_any_dtype(bench_df['Time']):
                df_b = bench_df.set_index('Time')['Benchmark']
                df_b = df_b[~df_b.index.duplicated(keep='first')]
                df_b = _ensure_utc(df_b)
                if not df_b.empty:
                    benchmark_data = df_b
                    print(f"Using benchmark data from {filename} for correlation.")
        else:
            print(f"Skipping {filename} for correlation: Invalid or empty equity_df or Time column.")

    # --- Check if enough data for correlation ---
    # Need at least 1 strategy (against the benchmark) or 2 strategies.
    if valid_strategies_count == 0:
        corr_status = "Correlation skipped: No valid strategy equity data found."
        return default_corr_matrix, default_heatmap, corr_status
    if valid_strategies_count == 1 and benchmark_data is None:
        corr_status = "Correlation skipped: Only one strategy and no benchmark data."
        # Returning empty is safer than a 1x1 self-correlation.
        return default_corr_matrix, default_heatmap, corr_status

    # --- Combine Data and Calculate Returns ---
    # Outer join keeps all dates across strategies.
    combined_equity = pd.concat(equity_data_all, axis=1, join='outer')
    if benchmark_data is not None:
        combined_equity['Benchmark'] = benchmark_data

    combined_equity = combined_equity.sort_index()
    # Forward-fill to align different start/end dates. Consider bfill or
    # interpolation if ffill isn't appropriate for the data at hand.
    combined_equity_filled = combined_equity.ffill()

    # Daily percentage returns; neutralize infs from division by zero
    # (e.g. a price of 0), then drop rows with any NaN so only fully
    # overlapping dates remain.
    daily_returns = combined_equity_filled.pct_change()
    daily_returns.replace([np.inf, -np.inf], np.nan, inplace=True)
    daily_returns.dropna(inplace=True)

    if daily_returns.empty or len(daily_returns) < 2:
        corr_status = "Correlation skipped: Not enough overlapping daily data points after cleaning."
        return default_corr_matrix, default_heatmap, corr_status

    # --- Calculate Correlation Matrix ---
    try:
        # Method could also be 'kendall' or 'spearman'.
        correlation_matrix = daily_returns.corr(method='pearson')
        corr_status = f"Correlation calculated for {valid_strategies_count} strategies"
        corr_status += " and Benchmark." if benchmark_data is not None else "."
    except Exception as corr_e:
        print(f"Error calculating correlation matrix: {corr_e}")
        traceback.print_exc()
        corr_status = f"Correlation calculation failed: {corr_e}"
        return default_corr_matrix, default_heatmap, corr_status

    # --- Generate Correlation Heatmap Figure ---
    heatmap_fig = create_empty_figure("Correlation Heatmap")  # Default empty
    try:
        heatmap_fig = go.Figure(data=go.Heatmap(
            z=correlation_matrix.values,
            x=correlation_matrix.columns,
            y=correlation_matrix.columns,
            colorscale='RdBu',   # Diverging scale suits correlation data
            zmin=-1, zmax=1,     # Fix scale to the correlation range
            colorbar=dict(title='Correlation')
        ))
        heatmap_fig.update_layout(
            title='Strategy (+Benchmark) Daily Return Correlation',
            xaxis_tickangle=-45,        # Angle labels for readability
            yaxis_autorange='reversed'  # Conventional matrix orientation
        )
        # Annotate each cell with its correlation value.
        for i in range(len(correlation_matrix.columns)):
            for j in range(len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if pd.notna(corr_value):
                    # White text on intense cells for contrast.
                    text_color = "white" if abs(corr_value) > 0.5 else "black"
                    heatmap_fig.add_annotation(
                        x=correlation_matrix.columns[j],
                        y=correlation_matrix.columns[i],
                        text=f"{corr_value:.2f}",
                        showarrow=False,
                        font=dict(color=text_color)
                    )
    except Exception as e:
        print(f"Error creating correlation heatmap figure: {e}")
        traceback.print_exc()
        heatmap_fig = create_empty_figure("Error Creating Correlation Heatmap")

    return correlation_matrix, heatmap_fig, corr_status