File size: 22,894 Bytes
76317bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
# -*- coding: utf-8 -*-
"""risk_analysis.py

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/10u2Di5_droisNYuq_KYAmdgVHixe6oVi
"""

# risk_analysis.py
# Functions for calculating risk metrics and correlations.

import pandas as pd
import numpy as np
import traceback
import plotly.graph_objects as go
from utils import create_empty_figure # Import helper

def get_drawdown_table(returns: pd.Series, top: int = 5) -> pd.DataFrame:
    """
    Calculates drawdown periods and statistics from a series of returns.

    Args:
        returns: Series of daily returns with a DatetimeIndex.
        top: Number of top drawdowns (by magnitude) to return.

    Returns:
        DataFrame containing information about the top drawdown periods:
        'Peak Date', 'Valley Date', 'End Date', 'Duration (Days)', 'Max Drawdown (%)'.
        Returns an empty DataFrame if input is invalid or no drawdowns occur.
    """
    # Input validation: require a non-empty Series with a DatetimeIndex
    if returns is None or not isinstance(returns, pd.Series) or returns.empty:
        return pd.DataFrame()
    if not isinstance(returns.index, pd.DatetimeIndex):
        return pd.DataFrame()

    # Coerce to numeric and drop anything that is not a valid return
    df = returns.to_frame(name='returns')
    df['returns'] = pd.to_numeric(df['returns'], errors='coerce')
    df.dropna(subset=['returns'], inplace=True)
    if df.empty:
        return pd.DataFrame()

    # Compounded cumulative return, running peak, and percent decline from peak
    df['Cumulative'] = (1 + df['returns']).cumprod()
    df['HighWatermark'] = df['Cumulative'].cummax()
    df['Drawdown'] = (df['Cumulative'] / df['HighWatermark']) - 1

    def _duration_days(start_date, end_date):
        # Business-day count between two timestamps (inclusive); falls back to
        # calendar days if bdate_range cannot handle the inputs. NaN if either
        # date is missing.
        if pd.isna(start_date) or pd.isna(end_date):
            return np.nan
        try:
            return len(pd.bdate_range(start=start_date, end=end_date))
        except Exception:
            return (end_date - start_date).days + 1

    in_drawdown = False  # Flag to track if currently in a drawdown
    periods = []         # Completed drawdown period dictionaries
    current_period = {}  # Details of the ongoing drawdown
    # Track the peak as (index, value). Keeping the value as a scalar avoids a
    # per-row .loc lookup and, more importantly, avoids an ambiguous Series
    # comparison when the index contains duplicate timestamps (where
    # df.loc[dup_idx, col] returns a Series, not a scalar).
    peak_idx = df.index[0]
    peak_value = -np.inf

    for idx, row in df.iterrows():
        # A new high watermark moves the peak forward
        if row['Cumulative'] >= peak_value:
            peak_idx = idx
            peak_value = row['Cumulative']

        is_dd = row['Drawdown'] < 0  # Currently below the high watermark?

        if not in_drawdown and is_dd:
            # Start of a new drawdown period, anchored at the most recent peak
            in_drawdown = True
            current_period = {
                'Peak Date': peak_idx,               # Date the drawdown started
                'Valley Date': idx,                  # Deepest point so far
                'End Date': pd.NaT,                  # Recovery date (unknown yet)
                'Max Drawdown (%)': row['Drawdown'], # Deepest drawdown so far
                'Duration (Days)': 0                 # Filled in at period end
            }
        elif in_drawdown:
            # A lower low updates the valley
            if row['Drawdown'] < current_period['Max Drawdown (%)']:
                current_period['Valley Date'] = idx
                current_period['Max Drawdown (%)'] = row['Drawdown']

            if not is_dd:  # Recovered to (or above) the previous peak
                in_drawdown = False
                current_period['End Date'] = idx
                current_period['Duration (Days)'] = _duration_days(
                    current_period['Peak Date'], current_period['End Date'])
                periods.append(current_period)
                current_period = {}

    # Series ended while still underwater: record the open drawdown.
    # 'End Date' stays NaT because recovery has not happened by the last date.
    if in_drawdown:
        current_period['Duration (Days)'] = _duration_days(
            current_period['Peak Date'], df.index[-1])
        periods.append(current_period)

    if not periods:
        return pd.DataFrame()

    drawdown_df = pd.DataFrame(periods)

    # Most severe (most negative) drawdowns first, keep the top N
    drawdown_df = drawdown_df.sort_values(by='Max Drawdown (%)', ascending=True).head(top)

    # Format the Max Drawdown column as a percentage string
    drawdown_df['Max Drawdown (%)'] = drawdown_df['Max Drawdown (%)'].map('{:.2%}'.format)

    # Format date columns to YYYY-MM-DD strings for display
    for col in ['Peak Date', 'Valley Date', 'End Date']:
        if col in drawdown_df.columns:
            drawdown_df[col] = pd.to_datetime(drawdown_df[col]).dt.strftime('%Y-%m-%d')

    # Select and order columns for the final output table
    cols_to_select = ['Peak Date', 'Valley Date', 'End Date', 'Duration (Days)', 'Max Drawdown (%)']
    existing_cols = [col for col in cols_to_select if col in drawdown_df.columns]

    return drawdown_df[existing_cols]


def calculate_manual_risk_stats(returns_series: pd.Series) -> dict:
    """
    Calculates various risk and performance metrics manually using pandas based on daily returns.

    Args:
        returns_series: A pandas Series of daily percentage returns with a DatetimeIndex.

    Returns:
        A dictionary containing:
        - monthly_returns_table_for_heatmap: DataFrame pivoted for monthly return heatmap (values as percentages).
        - monthly_perf_stats: DataFrame with summary stats for monthly returns.
        - rolling_vol_df: DataFrame containing rolling annualized volatility calculations (with 'Time' column).
        - rolling_vol_stats: DataFrame summarizing min/max/mean rolling volatility.
        - drawdown_table: DataFrame with top drawdown periods (from get_drawdown_table).
        - status: A string indicating the status of the analysis.
    """
    # Initialize results dictionary with default empty structures so callers
    # always receive the same keys, even when the analysis is skipped.
    analysis_results = {
        "monthly_returns_table_for_heatmap": pd.DataFrame(),
        "monthly_perf_stats": pd.DataFrame(columns=['Metric', 'Value']),
        "rolling_vol_df": pd.DataFrame(),
        "rolling_vol_stats": pd.DataFrame(columns=['Window', 'Min Vol', 'Max Vol', 'Mean Vol']),
        "drawdown_table": pd.DataFrame(),
        "status": "Analysis skipped." # Default status
    }

    # --- Input Validation ---
    # At least two observations are required for any meaningful statistic
    if returns_series is None or not isinstance(returns_series, pd.Series) or returns_series.empty or len(returns_series) < 2:
        analysis_results["status"] = "Analysis skipped: Insufficient/invalid returns data."
        return analysis_results
    if not isinstance(returns_series.index, pd.DatetimeIndex):
        analysis_results["status"] = "Analysis skipped: Returns index is not DatetimeIndex."
        return analysis_results

    try:
        status_parts = [] # To collect status messages for different parts

        # Ensure returns are numeric and index is UTC DatetimeIndex
        returns_series = pd.to_numeric(returns_series, errors='coerce').dropna()
        if returns_series.empty or len(returns_series) < 2:
             analysis_results["status"] = "Analysis skipped: No valid numeric returns after cleaning."
             return analysis_results

        # NOTE(review): comparing a tzinfo object to the string 'UTC' is always
        # unequal for non-string tz objects, so any tz-aware input is routed
        # through tz_convert('UTC') — a harmless no-op when already UTC.
        if returns_series.index.tz is None:
            returns_series = returns_series.tz_localize('UTC')
        elif returns_series.index.tz != 'UTC':
            returns_series = returns_series.tz_convert('UTC')

        # --- Monthly Returns Analysis ---
        # Resample daily returns to monthly, calculating compounded monthly return
        # The lambda function calculates (1+r1)*(1+r2)*...*(1+rn) - 1 for each month
        # NOTE(review): the 'M' (month-end) offset alias is deprecated in
        # pandas >= 2.2 in favour of 'ME'; confirm the target pandas version
        # before changing it.
        monthly_rets = returns_series.resample('M').apply(lambda x: (1 + x).prod() - 1)

        if not monthly_rets.empty:
            # Create table for heatmap: Year rows, Month columns
            monthly_ret_table_df = pd.DataFrame({'returns': monthly_rets})
            monthly_ret_table_df['Year'] = monthly_ret_table_df.index.year
            monthly_ret_table_df['Month'] = monthly_ret_table_df.index.strftime('%b') # Month abbreviation (Jan, Feb, ...)
            # Pivot the table
            monthly_heatmap_data = monthly_ret_table_df.pivot_table(index='Year', columns='Month', values='returns')
            # Order columns chronologically
            month_order = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
            present_months = [m for m in month_order if m in monthly_heatmap_data.columns]
            monthly_heatmap_data = monthly_heatmap_data[present_months]
            # Sort index (Year) ascending
            monthly_heatmap_data.sort_index(ascending=True, inplace=True)
            # Store as percentages for the heatmap plot
            analysis_results["monthly_returns_table_for_heatmap"] = monthly_heatmap_data * 100

            # Monthly Performance Statistics
            monthly_stats = {
                "Min": f"{monthly_rets.min():.2%}",
                "Max": f"{monthly_rets.max():.2%}",
                "Mean": f"{monthly_rets.mean():.2%}",
                "Positive Months": (monthly_rets > 0).sum(),
                "Negative Months": (monthly_rets <= 0).sum() # NOTE: a month of exactly 0% counts as negative here
            }
            analysis_results["monthly_perf_stats"] = pd.DataFrame(list(monthly_stats.items()), columns=['Metric', 'Value'])
            status_parts.append("Monthly stats OK.")
        else:
            status_parts.append("Monthly stats skipped (no monthly data).")


        # --- Rolling Volatility Analysis ---
        vol_df = pd.DataFrame(index=returns_series.index) # Initialize DF to store rolling vol results
        vol_stats_list = [] # List to store summary stats for each window
        # Define windows (label: number of trading days)
        windows = {'3M': 63, '6M': 126, '12M': 252}
        vol_calculated = False
        for label, window in windows.items():
            # Check if there's enough data for the window
            if len(returns_series) >= window:
                try:
                    # Calculate rolling standard deviation
                    # min_periods ensures calculation starts even if window isn't full yet (adjust as needed)
                    rolling_std = returns_series.rolling(window=window, min_periods=window // 2).std()
                    # Annualize the volatility (multiply by sqrt of trading days per year)
                    rolling_vol = rolling_std * np.sqrt(252)
                    # Store the result in the DataFrame
                    vol_df[f'vol_{label}'] = rolling_vol
                    # Calculate summary stats for this window's volatility
                    if not rolling_vol.dropna().empty: # Check if there are valid vol values
                        vol_stats_list.append({
                            "Window": label,
                            "Min Vol": f"{rolling_vol.min():.2%}",
                            "Max Vol": f"{rolling_vol.max():.2%}",
                            "Mean Vol": f"{rolling_vol.mean():.2%}"
                        })
                        vol_calculated = True
                except Exception as vol_e:
                    print(f"Error calculating rolling volatility for window {label}: {vol_e}")
                    status_parts.append(f"Rolling Vol ({label}) Error.")

        # Store the rolling volatility DataFrame (reset index to get 'Time' column for plotting)
        # NOTE: a DataFrame with an index but zero columns reports .empty == True,
        # so this is correctly skipped when no window produced a vol column.
        if not vol_df.empty:
            analysis_results["rolling_vol_df"] = vol_df.reset_index()

        # Store the summary statistics if any were calculated
        if vol_stats_list:
            analysis_results["rolling_vol_stats"] = pd.DataFrame(vol_stats_list)
            status_parts.append("Rolling Vol OK.")
        elif not vol_calculated and "Error" not in " ".join(status_parts): # If no vol calculated and no errors reported
             status_parts.append("Rolling Vol skipped (insufficient data for windows).")


        # --- Drawdown Table Calculation ---
        try:
            analysis_results["drawdown_table"] = get_drawdown_table(returns_series, top=5)
            if not analysis_results["drawdown_table"].empty:
                 status_parts.append("Drawdown Table OK.")
            else:
                 status_parts.append("Drawdown Table: No drawdowns found or error.")
        except Exception as dd_e:
             print(f"Error calculating drawdown table: {dd_e}")
             traceback.print_exc()
             status_parts.append("Drawdown Table Error.")


        # --- Final Status ---
        analysis_results["status"] = " ".join(status_parts) if status_parts else "Analysis completed (no specific issues)."

    except Exception as e:
        # Catch-all for any unexpected error during the entire analysis
        error_msg = f"Error during manual risk analysis: {e}"
        print(error_msg)
        traceback.print_exc()
        analysis_results["status"] = f"Manual risk analysis failed: {e}"

    return analysis_results


def calculate_correlation(all_results):
    """
    Calculates the correlation matrix for the daily returns of multiple strategies
    and optionally includes the benchmark.

    Args:
        all_results: A dictionary where keys are strategy filenames and values are
                     the result dictionaries obtained from process_single_file.
                     These results should contain 'equity_df' and optionally 'benchmark_df'.
                     May be None or empty, in which case the analysis is skipped.

    Returns:
        A tuple containing:
        - correlation_matrix: DataFrame of the Pearson correlation coefficients.
        - heatmap_fig: Plotly heatmap figure of the correlation matrix.
        - corr_status: String message indicating the status of the correlation calculation.
    """
    # Default outputs returned on any early exit
    default_corr_matrix = pd.DataFrame()
    default_heatmap = create_empty_figure("Correlation Heatmap (Insufficient Data)")
    corr_status = "Correlation analysis skipped."

    # Robustness: treat a None input exactly like an empty results dict
    all_results = all_results or {}

    equity_data_all = {}        # {filename: equity Series indexed by Time}
    benchmark_data = None       # First valid benchmark series found
    valid_strategies_count = 0  # Strategies with usable equity data

    # --- Extract Equity and Benchmark Data ---
    for filename, results in all_results.items():
        if results.get("error"):  # Skip files that had processing errors
            # Fix: interpolate the actual filename into the log message
            # (previously a literal placeholder was printed instead).
            print(f"Skipping {filename} for correlation due to processing error.")
            continue

        equity_df = results.get("equity_df")    # DataFrame with 'Time', 'Equity'
        bench_df = results.get("benchmark_df")  # DataFrame with 'Time', 'Benchmark'

        # Check for valid equity data: present, non-empty, correct columns, datetime Time
        if equity_df is not None and not equity_df.empty and \
           'Time' in equity_df.columns and 'Equity' in equity_df.columns and \
           pd.api.types.is_datetime64_any_dtype(equity_df['Time']):

            # Set 'Time' as index, select 'Equity', remove duplicate indices
            df_eq = equity_df.set_index('Time')['Equity']
            df_eq = df_eq[~df_eq.index.duplicated(keep='first')]

            # Ensure index is UTC so all series align on a common timezone
            if df_eq.index.tz is None:
                df_eq = df_eq.tz_localize('UTC')
            elif df_eq.index.tz != 'UTC':
                df_eq = df_eq.tz_convert('UTC')

            if not df_eq.empty:
                equity_data_all[filename] = df_eq
                valid_strategies_count += 1

                # Grab the benchmark data from the *first* strategy that has it
                if benchmark_data is None and bench_df is not None and not bench_df.empty and \
                   'Time' in bench_df.columns and 'Benchmark' in bench_df.columns and \
                   pd.api.types.is_datetime64_any_dtype(bench_df['Time']):

                    df_b = bench_df.set_index('Time')['Benchmark']
                    df_b = df_b[~df_b.index.duplicated(keep='first')]

                    # Ensure index is UTC
                    if df_b.index.tz is None:
                        df_b = df_b.tz_localize('UTC')
                    elif df_b.index.tz != 'UTC':
                        df_b = df_b.tz_convert('UTC')

                    if not df_b.empty:
                        benchmark_data = df_b
                        print(f"Using benchmark data from {filename} for correlation.")
        else:
            print(f"Skipping {filename} for correlation: Invalid or empty equity_df or Time column.")

    # --- Check if enough data for correlation ---
    # A correlation needs at least 2 columns: 2+ strategies, or 1 strategy + benchmark
    if valid_strategies_count == 0:
        corr_status = "Correlation skipped: No valid strategy equity data found."
        return default_corr_matrix, default_heatmap, corr_status
    if valid_strategies_count == 1 and benchmark_data is None:
        corr_status = "Correlation skipped: Only one strategy and no benchmark data."
        return default_corr_matrix, default_heatmap, corr_status


    # --- Combine Data and Calculate Returns ---
    # Outer join keeps every date observed by any series
    combined_equity = pd.concat(equity_data_all, axis=1, join='outer')

    # Add benchmark data if available (aligned on the DatetimeIndex)
    if benchmark_data is not None:
        combined_equity['Benchmark'] = benchmark_data

    # Sort by index (Time)
    combined_equity = combined_equity.sort_index()

    # Forward-fill to align series with different start/end dates
    combined_equity_filled = combined_equity.ffill()

    # Calculate daily percentage returns
    daily_returns = combined_equity_filled.pct_change()

    # Division by zero (e.g. a zero equity value) yields +/-inf; neutralize it
    daily_returns.replace([np.inf, -np.inf], np.nan, inplace=True)

    # Drop rows with any NaN (first row after pct_change, plus any remaining gaps)
    daily_returns.dropna(inplace=True)

    # Need at least 2 overlapping observations for a correlation
    if daily_returns.empty or len(daily_returns) < 2:
        corr_status = "Correlation skipped: Not enough overlapping daily data points after cleaning."
        return default_corr_matrix, default_heatmap, corr_status

    # --- Calculate Correlation Matrix ---
    try:
        correlation_matrix = daily_returns.corr(method='pearson')  # 'kendall'/'spearman' also possible
        corr_status = f"Correlation calculated for {valid_strategies_count} strategies"
        corr_status += " and Benchmark." if benchmark_data is not None else "."
    except Exception as corr_e:
        print(f"Error calculating correlation matrix: {corr_e}")
        traceback.print_exc()
        corr_status = f"Correlation calculation failed: {corr_e}"
        return default_corr_matrix, default_heatmap, corr_status


    # --- Generate Correlation Heatmap Figure ---
    heatmap_fig = create_empty_figure("Correlation Heatmap")  # Default empty
    try:
        heatmap_fig = go.Figure(data=go.Heatmap(
            z=correlation_matrix.values,
            x=correlation_matrix.columns,
            y=correlation_matrix.columns,
            colorscale='RdBu',  # Diverging scale suits the [-1, 1] range
            zmin=-1, zmax=1,    # Fixed scale limits for correlation
            colorbar=dict(title='Correlation')
        ))
        heatmap_fig.update_layout(
            title='Strategy (+Benchmark) Daily Return Correlation',
            xaxis_tickangle=-45,        # Angle labels for readability with many strategies
            yaxis_autorange='reversed'  # Conventional matrix orientation
        )

        # Annotate every cell with its correlation value
        for i in range(len(correlation_matrix.columns)):
            for j in range(len(correlation_matrix.columns)):
                corr_value = correlation_matrix.iloc[i, j]
                if pd.notna(corr_value):
                    # White text on strongly-coloured cells for contrast
                    text_color = "white" if abs(corr_value) > 0.5 else "black"
                    heatmap_fig.add_annotation(
                        x=correlation_matrix.columns[j],
                        y=correlation_matrix.columns[i],
                        text=f"{corr_value:.2f}",  # Two decimal places
                        showarrow=False,
                        font=dict(color=text_color)
                    )
    except Exception as e:
        print(f"Error creating correlation heatmap figure: {e}")
        traceback.print_exc()
        heatmap_fig = create_empty_figure("Error Creating Correlation Heatmap")  # Update title on error

    return correlation_matrix, heatmap_fig, corr_status