"""risk_analysis.py

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/10u2Di5_droisNYuq_KYAmdgVHixe6oVi
"""
|
|
| |
| |
|
|
| import pandas as pd |
| import numpy as np |
| import traceback |
| import plotly.graph_objects as go |
| from utils import create_empty_figure |
|
|
def get_drawdown_table(returns: pd.Series, top: int = 5) -> pd.DataFrame:
    """
    Calculates drawdown periods and statistics from a series of returns.

    Args:
        returns: Series of daily returns with a DatetimeIndex.
        top: Number of top drawdowns (by magnitude) to return.

    Returns:
        DataFrame containing information about the top drawdown periods:
        'Peak Date', 'Valley Date', 'End Date', 'Duration (Days)', 'Max Drawdown (%)'.
        Returns an empty DataFrame if input is invalid or no drawdowns occur.
    """
    # Guard clauses: wrong type, empty series, or non-datetime index.
    if returns is None or not isinstance(returns, pd.Series) or returns.empty:
        return pd.DataFrame()
    if not isinstance(returns.index, pd.DatetimeIndex):
        return pd.DataFrame()

    df = returns.to_frame(name='returns')

    # Coerce to numeric; drop anything unparseable.
    df['returns'] = pd.to_numeric(df['returns'], errors='coerce')
    df.dropna(subset=['returns'], inplace=True)
    if df.empty:
        return pd.DataFrame()

    # Equity curve, running peak, and drawdown relative to that peak.
    df['Cumulative'] = (1 + df['returns']).cumprod()
    df['HighWatermark'] = df['Cumulative'].cummax()
    df['Drawdown'] = (df['Cumulative'] / df['HighWatermark']) - 1

    in_drawdown = False
    periods = []
    current_period = {}
    peak_idx = df.index[0]
    # Hoisted peak value: avoids an O(n) .loc label lookup on every row.
    peak_value = df['Cumulative'].iloc[0]

    for idx, row in df.iterrows():
        # Track the most recent peak (new high watermark).
        if row['Cumulative'] >= peak_value:
            peak_idx = idx
            peak_value = row['Cumulative']

        is_dd = row['Drawdown'] < 0

        if not in_drawdown and is_dd:
            # A new drawdown period starts at the last recorded peak.
            in_drawdown = True
            current_period = {
                'Peak Date': peak_idx,
                'Valley Date': idx,
                'End Date': pd.NaT,
                'Max Drawdown (%)': row['Drawdown'],
                'Duration (Days)': 0
            }
        elif in_drawdown:
            # The deepest point seen so far defines the valley.
            if row['Drawdown'] < current_period['Max Drawdown (%)']:
                current_period['Valley Date'] = idx
                current_period['Max Drawdown (%)'] = row['Drawdown']

            # The drawdown ends once the curve recovers to the high watermark.
            if not is_dd:
                in_drawdown = False
                current_period['End Date'] = idx
                current_period['Duration (Days)'] = _drawdown_duration(
                    current_period['Peak Date'], idx)
                periods.append(current_period)
                current_period = {}

    # An unrecovered drawdown is still open at the last observation;
    # its 'End Date' stays NaT but duration runs to the final index value.
    if in_drawdown:
        current_period['Duration (Days)'] = _drawdown_duration(
            current_period['Peak Date'], df.index[-1])
        periods.append(current_period)

    if not periods:
        return pd.DataFrame()

    drawdown_df = pd.DataFrame(periods)

    # Most negative drawdowns first, then keep the top N.
    # (Sort happens on the raw float BEFORE string formatting.)
    drawdown_df = drawdown_df.sort_values(by='Max Drawdown (%)', ascending=True).head(top)

    drawdown_df['Max Drawdown (%)'] = drawdown_df['Max Drawdown (%)'].map('{:.2%}'.format)

    # Render dates as YYYY-MM-DD strings (NaT end dates become NaN).
    for col in ['Peak Date', 'Valley Date', 'End Date']:
        if col in drawdown_df.columns:
            drawdown_df[col] = pd.to_datetime(drawdown_df[col]).dt.strftime('%Y-%m-%d')

    cols_to_select = ['Peak Date', 'Valley Date', 'End Date', 'Duration (Days)', 'Max Drawdown (%)']
    existing_cols = [col for col in cols_to_select if col in drawdown_df.columns]

    return drawdown_df[existing_cols]


def _drawdown_duration(start_date, end_date):
    """Inclusive business-day count between two dates; calendar-day fallback."""
    if pd.isna(start_date) or pd.isna(end_date):
        return np.nan
    try:
        return len(pd.bdate_range(start=start_date, end=end_date))
    except Exception:
        return (end_date - start_date).days + 1
|
|
|
|
def calculate_manual_risk_stats(returns_series):
    """
    Calculates various risk and performance metrics manually using pandas based on daily returns.

    Args:
        returns_series: A pandas Series of daily percentage returns with a DatetimeIndex.

    Returns:
        A dictionary containing:
            - monthly_returns_table_for_heatmap: DataFrame pivoted for monthly return heatmap (values as percentages).
            - monthly_perf_stats: DataFrame with summary stats for monthly returns.
            - rolling_vol_df: DataFrame containing rolling annualized volatility calculations (with 'Time' column).
            - rolling_vol_stats: DataFrame summarizing min/max/mean rolling volatility.
            - drawdown_table: DataFrame with top drawdown periods (from get_drawdown_table).
            - status: A string indicating the status of the analysis.
    """
    # Defaults returned whenever a step is skipped or fails.
    analysis_results = {
        "monthly_returns_table_for_heatmap": pd.DataFrame(),
        "monthly_perf_stats": pd.DataFrame(columns=['Metric', 'Value']),
        "rolling_vol_df": pd.DataFrame(),
        "rolling_vol_stats": pd.DataFrame(columns=['Window', 'Min Vol', 'Max Vol', 'Mean Vol']),
        "drawdown_table": pd.DataFrame(),
        "status": "Analysis skipped."
    }

    # Input validation: need at least two daily observations on a DatetimeIndex.
    if returns_series is None or not isinstance(returns_series, pd.Series) or returns_series.empty or len(returns_series) < 2:
        analysis_results["status"] = "Analysis skipped: Insufficient/invalid returns data."
        return analysis_results
    if not isinstance(returns_series.index, pd.DatetimeIndex):
        analysis_results["status"] = "Analysis skipped: Returns index is not DatetimeIndex."
        return analysis_results

    try:
        status_parts = []

        # Coerce to numeric and drop anything unparseable.
        returns_series = pd.to_numeric(returns_series, errors='coerce').dropna()
        if returns_series.empty or len(returns_series) < 2:
            analysis_results["status"] = "Analysis skipped: No valid numeric returns after cleaning."
            return analysis_results

        # Normalize the index to UTC so resampling is consistent.
        if returns_series.index.tz is None:
            returns_series = returns_series.tz_localize('UTC')
        elif str(returns_series.index.tz) != 'UTC':
            # BUG FIX: comparing the tzinfo OBJECT to the string 'UTC' was always
            # True, forcing a redundant tz_convert on already-UTC data. Comparing
            # the tz's string form makes the condition mean what it says.
            returns_series = returns_series.tz_convert('UTC')

        # ---- Monthly performance --------------------------------------------
        # Compound daily returns into calendar-month returns.
        # NOTE(review): the 'M' alias is deprecated in pandas >= 2.2 in favor of
        # 'ME'; kept as-is for compatibility with older pandas — confirm the
        # deployed pandas version before switching.
        monthly_rets = returns_series.resample('M').apply(lambda x: (1 + x).prod() - 1)

        if not monthly_rets.empty:
            monthly_ret_table_df = pd.DataFrame({'returns': monthly_rets})
            monthly_ret_table_df['Year'] = monthly_ret_table_df.index.year
            monthly_ret_table_df['Month'] = monthly_ret_table_df.index.strftime('%b')

            # Year x Month pivot for the heatmap, columns in calendar order.
            monthly_heatmap_data = monthly_ret_table_df.pivot_table(index='Year', columns='Month', values='returns')
            month_order = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
            present_months = [m for m in month_order if m in monthly_heatmap_data.columns]
            monthly_heatmap_data = monthly_heatmap_data[present_months]
            monthly_heatmap_data.sort_index(ascending=True, inplace=True)

            # Heatmap values are expressed as percentages.
            analysis_results["monthly_returns_table_for_heatmap"] = monthly_heatmap_data * 100

            monthly_stats = {
                "Min": f"{monthly_rets.min():.2%}",
                "Max": f"{monthly_rets.max():.2%}",
                "Mean": f"{monthly_rets.mean():.2%}",
                "Positive Months": (monthly_rets > 0).sum(),
                "Negative Months": (monthly_rets <= 0).sum()
            }
            analysis_results["monthly_perf_stats"] = pd.DataFrame(list(monthly_stats.items()), columns=['Metric', 'Value'])
            status_parts.append("Monthly stats OK.")
        else:
            status_parts.append("Monthly stats skipped (no monthly data).")

        # ---- Rolling annualized volatility ----------------------------------
        vol_df = pd.DataFrame(index=returns_series.index)
        vol_stats_list = []
        # Window lengths in trading days.
        windows = {'3M': 63, '6M': 126, '12M': 252}
        vol_calculated = False
        for label, window in windows.items():
            # Only compute a window when there is enough history for it.
            if len(returns_series) >= window:
                try:
                    # Allow partially-filled windows once half the window is present.
                    rolling_std = returns_series.rolling(window=window, min_periods=window // 2).std()
                    # Annualize assuming 252 trading days.
                    rolling_vol = rolling_std * np.sqrt(252)
                    vol_df[f'vol_{label}'] = rolling_vol

                    if not rolling_vol.dropna().empty:
                        vol_stats_list.append({
                            "Window": label,
                            "Min Vol": f"{rolling_vol.min():.2%}",
                            "Max Vol": f"{rolling_vol.max():.2%}",
                            "Mean Vol": f"{rolling_vol.mean():.2%}"
                        })
                        vol_calculated = True
                except Exception as vol_e:
                    print(f"Error calculating rolling volatility for window {label}: {vol_e}")
                    status_parts.append(f"Rolling Vol ({label}) Error.")

        # Expose the timestamps as a 'Time' column for plotting.
        if not vol_df.empty:
            analysis_results["rolling_vol_df"] = vol_df.reset_index()

        if vol_stats_list:
            analysis_results["rolling_vol_stats"] = pd.DataFrame(vol_stats_list)
            status_parts.append("Rolling Vol OK.")
        elif not vol_calculated and "Error" not in " ".join(status_parts):
            status_parts.append("Rolling Vol skipped (insufficient data for windows).")

        # ---- Drawdowns -------------------------------------------------------
        try:
            analysis_results["drawdown_table"] = get_drawdown_table(returns_series, top=5)
            if not analysis_results["drawdown_table"].empty:
                status_parts.append("Drawdown Table OK.")
            else:
                status_parts.append("Drawdown Table: No drawdowns found or error.")
        except Exception as dd_e:
            print(f"Error calculating drawdown table: {dd_e}")
            traceback.print_exc()
            status_parts.append("Drawdown Table Error.")

        analysis_results["status"] = " ".join(status_parts) if status_parts else "Analysis completed (no specific issues)."

    except Exception as e:
        # Any unexpected failure is reported via the status field rather than raised.
        error_msg = f"Error during manual risk analysis: {e}"
        print(error_msg)
        traceback.print_exc()
        analysis_results["status"] = f"Manual risk analysis failed: {e}"

    return analysis_results
|
|
|
|
def calculate_correlation(all_results):
    """
    Calculates the correlation matrix for the daily returns of multiple strategies
    and optionally includes the benchmark.

    Args:
        all_results: A dictionary where keys are strategy filenames and values are
                     the result dictionaries obtained from process_single_file.
                     These results should contain 'equity_df' and optionally 'benchmark_df'.

    Returns:
        A tuple containing:
            - correlation_matrix: DataFrame of the Pearson correlation coefficients.
            - heatmap_fig: Plotly heatmap figure of the correlation matrix.
            - corr_status: String message indicating the status of the correlation calculation.
    """
    default_corr_matrix = pd.DataFrame()
    default_heatmap = create_empty_figure("Correlation Heatmap (Insufficient Data)")
    corr_status = "Correlation analysis skipped."

    equity_data_all = {}
    benchmark_data = None
    valid_strategies_count = 0

    # Collect one equity series per strategy, plus the first usable benchmark.
    for filename, results in all_results.items():
        if results.get("error"):
            # BUG FIX: these log messages had lost their {filename} placeholder
            # and printed the literal "(unknown)", making them useless.
            print(f"Skipping {filename} for correlation due to processing error.")
            continue

        equity_df = results.get("equity_df")
        bench_df = results.get("benchmark_df")

        # Require a non-empty frame with datetime 'Time' and an 'Equity' column.
        if equity_df is not None and not equity_df.empty and \
           'Time' in equity_df.columns and 'Equity' in equity_df.columns and \
           pd.api.types.is_datetime64_any_dtype(equity_df['Time']):

            df_eq = equity_df.set_index('Time')['Equity']
            df_eq = df_eq[~df_eq.index.duplicated(keep='first')]

            # Normalize to UTC. Compare str(tz), not the tzinfo object, which
            # never equals the string 'UTC'.
            if df_eq.index.tz is None:
                df_eq = df_eq.tz_localize('UTC')
            elif str(df_eq.index.tz) != 'UTC':
                df_eq = df_eq.tz_convert('UTC')

            if not df_eq.empty:
                equity_data_all[filename] = df_eq
                valid_strategies_count += 1

            # Keep the first valid benchmark found across the result set.
            if benchmark_data is None and bench_df is not None and not bench_df.empty and \
               'Time' in bench_df.columns and 'Benchmark' in bench_df.columns and \
               pd.api.types.is_datetime64_any_dtype(bench_df['Time']):

                df_b = bench_df.set_index('Time')['Benchmark']
                df_b = df_b[~df_b.index.duplicated(keep='first')]

                if df_b.index.tz is None:
                    df_b = df_b.tz_localize('UTC')
                elif str(df_b.index.tz) != 'UTC':
                    df_b = df_b.tz_convert('UTC')

                if not df_b.empty:
                    benchmark_data = df_b
                    print(f"Using benchmark data from {filename} for correlation.")
        else:
            print(f"Skipping {filename} for correlation: Invalid or empty equity_df or Time column.")

    # Need at least two series (two strategies, or one strategy + benchmark).
    if valid_strategies_count == 0:
        corr_status = "Correlation skipped: No valid strategy equity data found."
        return default_corr_matrix, default_heatmap, corr_status
    if valid_strategies_count == 1 and benchmark_data is None:
        corr_status = "Correlation skipped: Only one strategy and no benchmark data."
        return default_corr_matrix, default_heatmap, corr_status

    # Outer-join all equity curves on their (UTC) timestamps.
    combined_equity = pd.concat(equity_data_all, axis=1, join='outer')

    if benchmark_data is not None:
        combined_equity['Benchmark'] = benchmark_data

    combined_equity = combined_equity.sort_index()

    # Forward-fill so non-trading gaps don't create spurious NaN returns.
    combined_equity_filled = combined_equity.ffill()

    daily_returns = combined_equity_filled.pct_change()

    # A flat-then-jump equity curve can produce inf returns; treat as missing.
    daily_returns.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Correlate only on dates where every series has a return.
    daily_returns.dropna(inplace=True)

    if daily_returns.empty or len(daily_returns) < 2:
        corr_status = "Correlation skipped: Not enough overlapping daily data points after cleaning."
        return default_corr_matrix, default_heatmap, corr_status

    try:
        correlation_matrix = daily_returns.corr(method='pearson')
        corr_status = f"Correlation calculated for {valid_strategies_count} strategies"
        if benchmark_data is not None:
            corr_status += " and Benchmark."
        else:
            corr_status += "."
    except Exception as corr_e:
        print(f"Error calculating correlation matrix: {corr_e}")
        traceback.print_exc()
        corr_status = f"Correlation calculation failed: {corr_e}"
        return default_corr_matrix, default_heatmap, corr_status

    # Build the annotated heatmap; fall back to an empty figure on failure.
    heatmap_fig = create_empty_figure("Correlation Heatmap")
    try:
        heatmap_fig = go.Figure(data=go.Heatmap(
            z=correlation_matrix.values,
            x=correlation_matrix.columns,
            y=correlation_matrix.columns,
            colorscale='RdBu',
            zmin=-1, zmax=1,
            colorbar=dict(title='Correlation')
        ))
        heatmap_fig.update_layout(
            title='Strategy (+Benchmark) Daily Return Correlation',
            xaxis_tickangle=-45,
            yaxis_autorange='reversed'
        )

        # Overlay each cell's value; white text on strongly-colored cells.
        for i in range(len(correlation_matrix.columns)):
            for j in range(len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if pd.notna(corr_value):
                    text_color = "white" if abs(corr_value) > 0.5 else "black"
                    heatmap_fig.add_annotation(
                        x=correlation_matrix.columns[j],
                        y=correlation_matrix.columns[i],
                        text=f"{corr_value:.2f}",
                        showarrow=False,
                        font=dict(color=text_color)
                    )
    except Exception as e:
        print(f"Error creating correlation heatmap figure: {e}")
        traceback.print_exc()
        heatmap_fig = create_empty_figure("Error Creating Correlation Heatmap")

    return correlation_matrix, heatmap_fig, corr_status