import sys

import numpy as np
import pandas as pd
from scipy import stats

| | def calculate_rolling_metrics(input_file, output_file):
|
| | try:
|
| | print(f"Reading {input_file}...")
|
| | df = pd.read_csv(input_file)
|
| |
|
| |
|
| |
|
| | df['Time_deal'] = pd.to_datetime(df['Time_deal'])
|
| |
|
| |
|
| | df = df.sort_values('Time_deal').reset_index(drop=True)
|
| |
|
| |
|
| |
|
| |
|
| | df['Prev_Balance'] = df['Balance'] - df['Profit']
|
| |
|
| |
|
| | df['Trade_Return'] = np.where(
|
| | df['Prev_Balance'] > 0,
|
| | df['Profit'] / df['Prev_Balance'],
|
| | 0.0
|
| | )
|
| |
|
| |
|
| | metric_map = {
|
| | 'Sharpe': 'rolling_Sharpe_Ratio',
|
| | 'Sortino': 'rolling_Sortino_Ratio',
|
| | 'Calmar': 'rolling_Calmar_Ratio',
|
| | 'Stability': 'rolling_Stability',
|
| | 'Recovery Factor': 'rolling_Recovery_Factor',
|
| | 'omega ratio': 'rolling_Omega_Ratio',
|
| | 'skew': 'rolling_Skewness',
|
| | 'kurtosis': 'rolling_Kurtosis',
|
| | 'tail ratio': 'rolling_Tail_Ratio',
|
| | 'alpha': 'rolling_Alpha',
|
| | 'beta': 'rolling_Beta',
|
| | 'Common Sense Ratio': 'rolling_Common_Sense_Ratio',
|
| | 'volatility': 'rolling_Volatility',
|
| | 'Kelly Criterion': 'rolling_Kelly_Criterion',
|
| | 'System Quality Number (SQN)': 'rolling_SQN_System_Quality_Number',
|
| | 'K-Ratio': 'rolling_K_Ratio',
|
| | 'R-Squared': 'rolling_R_Squared',
|
| | 'CPC Index': 'rolling_CPC_Index',
|
| | 'VaR': 'rolling_VaR_Value_at_Risk',
|
| | 'CVaR': 'rolling_CVaR_Conditional_Value_at_Risk',
|
| | 'return standard deviation': 'rolling_Return_Standard_Deviation',
|
| | 'AHPR': 'rolling_AHPR_Average_Holding_Period_Return',
|
| | 'GHPR': 'rolling_GHPR_Geometric_Holding_Period_Return',
|
| | 'Drawdown duration': 'rolling_Drawdown_Duration',
|
| | 'Maximum drawdown duration': 'rolling_Max_Drawdown_Duration',
|
| | 'Time-weighted return (TWR)': 'rolling_TWR_Time_Weighted_Return',
|
| | 'Money-weighted return (MWR / IRR)': 'rolling_MWR_Money_Weighted_Return',
|
| | 'Ulcer Index': 'rolling_Ulcer_Index',
|
| | 'MAE': 'rolling_MAE_Max_Adverse_Excursion',
|
| | 'MFE': 'rolling_MFE_Max_Favorable_Excursion',
|
| | 'MAR Ratio': 'rolling_MAR_Ratio',
|
| | 'Information Ratio': 'rolling_Information_Ratio',
|
| | 'Treynor Ratio': 'rolling_Treynor_Ratio',
|
| | 'Tracking Error': 'rolling_Tracking_Error',
|
| | 'Active Share': 'rolling_Active_Share'
|
| | }
|
| |
|
| |
|
| | for col in metric_map.values():
|
| | df[col] = np.nan
|
| |
|
| |
|
| | print("Calculating metrics (this may take a moment)...")
|
| |
|
| |
|
| | returns = df['Trade_Return'].values
|
| | profits = df['Profit'].values
|
| | balances = df['Balance'].values
|
| | times = df['Time_deal'].values
|
| |
|
| | n = len(df)
|
| |
|
| |
|
| | max_dd_duration_sec = 0.0
|
| |
|
| | for i in range(n):
|
| |
|
| | hist_rets = returns[:i+1]
|
| | hist_profits = profits[:i+1]
|
| | hist_bal = balances[:i+1]
|
| | current_time = times[i]
|
| |
|
| |
|
| | if len(hist_rets) > 1:
|
| | mean_ret = np.mean(hist_rets)
|
| | std_ret = np.std(hist_rets, ddof=1)
|
| | else:
|
| | mean_ret = hist_rets[0]
|
| | std_ret = 0.0
|
| |
|
| |
|
| | df.at[i, metric_map['volatility']] = std_ret
|
| | df.at[i, metric_map['return standard deviation']] = std_ret
|
| |
|
| |
|
| | if std_ret > 1e-9:
|
| | df.at[i, metric_map['Sharpe']] = mean_ret / std_ret
|
| | else:
|
| | df.at[i, metric_map['Sharpe']] = 0.0
|
| |
|
| |
|
| | neg_rets = hist_rets[hist_rets < 0]
|
| | if len(neg_rets) > 1:
|
| | down_std = np.std(neg_rets, ddof=1)
|
| | if down_std > 1e-9:
|
| | df.at[i, metric_map['Sortino']] = mean_ret / down_std
|
| |
|
| |
|
| | if len(hist_rets) > 2:
|
| | df.at[i, metric_map['skew']] = stats.skew(hist_rets)
|
| | df.at[i, metric_map['kurtosis']] = stats.kurtosis(hist_rets)
|
| |
|
| |
|
| | cum_max = np.maximum.accumulate(hist_bal)
|
| | drawdowns = (cum_max - hist_bal) / cum_max
|
| | max_dd = np.max(drawdowns)
|
| |
|
| |
|
| | if len(drawdowns) > 0:
|
| | df.at[i, metric_map['Ulcer Index']] = np.sqrt(np.mean(drawdowns**2))
|
| |
|
| |
|
| |
|
| | elapsed_seconds = (current_time - times[0]).astype('timedelta64[s]').astype(float)
|
| | years = elapsed_seconds / (365 * 24 * 3600)
|
| |
|
| | total_return = (hist_bal[-1] / (hist_bal[0] - hist_profits[0])) - 1 if (hist_bal[0] - hist_profits[0]) > 0 else 0
|
| |
|
| | cagr = 0
|
| | if years > 0:
|
| |
|
| | if total_return > -1:
|
| | cagr = (1 + total_return) ** (1 / years) - 1
|
| |
|
| | if max_dd > 0:
|
| | df.at[i, metric_map['Calmar']] = cagr / max_dd
|
| | df.at[i, metric_map['MAR Ratio']] = cagr / max_dd
|
| |
|
| |
|
| | net_profit = np.sum(hist_profits)
|
| | dd_amounts = cum_max - hist_bal
|
| | max_dd_amt = np.max(dd_amounts)
|
| |
|
| | if max_dd_amt > 0:
|
| | df.at[i, metric_map['Recovery Factor']] = net_profit / max_dd_amt
|
| |
|
| |
|
| |
|
| | hwm_idx = np.argmax(hist_bal)
|
| | current_dd_duration = (current_time - times[hwm_idx]).astype('timedelta64[s]').astype(float)
|
| | df.at[i, metric_map['Drawdown duration']] = current_dd_duration
|
| |
|
| |
|
| | max_dd_duration_sec = max(max_dd_duration_sec, current_dd_duration)
|
| | df.at[i, metric_map['Maximum drawdown duration']] = max_dd_duration_sec
|
| |
|
| |
|
| | if len(hist_bal) > 2:
|
| | try:
|
| |
|
| | y = np.log(np.abs(hist_bal) + 1e-9)
|
| | x = np.arange(len(y))
|
| | slope, intercept, r_val, p_val, std_err = stats.linregress(x, y)
|
| | df.at[i, metric_map['Stability']] = r_val ** 2
|
| |
|
| |
|
| | slope_k, _, _, _, std_err_k = stats.linregress(x, hist_bal)
|
| | if std_err_k > 0:
|
| | df.at[i, metric_map['K-Ratio']] = slope_k / std_err_k
|
| | except:
|
| | pass
|
| |
|
| |
|
| | wins = hist_profits[hist_profits > 0]
|
| | losses = np.abs(hist_profits[hist_profits < 0])
|
| |
|
| |
|
| | if np.sum(losses) > 0:
|
| | df.at[i, metric_map['omega ratio']] = np.sum(wins) / np.sum(losses)
|
| |
|
| |
|
| | if len(wins) > 0 and len(losses) > 0:
|
| | win_rate = len(wins) / len(hist_profits)
|
| | avg_win = np.mean(wins)
|
| | avg_loss = np.mean(losses)
|
| | if avg_loss > 0:
|
| | b_ratio = avg_win / avg_loss
|
| |
|
| | df.at[i, metric_map['Kelly Criterion']] = win_rate - (1 - win_rate) / b_ratio
|
| |
|
| |
|
| | pf = np.sum(wins) / np.sum(losses)
|
| | df.at[i, metric_map['CPC Index']] = pf * win_rate * b_ratio
|
| |
|
| |
|
| | if len(hist_profits) > 1:
|
| | std_profit = np.std(hist_profits, ddof=1)
|
| | if std_profit > 0:
|
| | sqn = np.sqrt(len(hist_profits)) * np.mean(hist_profits) / std_profit
|
| | df.at[i, metric_map['System Quality Number (SQN)']] = sqn
|
| |
|
| |
|
| | if len(hist_rets) > 10:
|
| | t_95 = np.percentile(hist_rets, 95)
|
| | t_05 = np.abs(np.percentile(hist_rets, 5))
|
| | if t_05 > 0:
|
| | df.at[i, metric_map['tail ratio']] = t_95 / t_05
|
| |
|
| |
|
| | pf = np.sum(wins)/np.sum(losses) if np.sum(losses) > 0 else 0
|
| | df.at[i, metric_map['Common Sense Ratio']] = pf * (t_95 / t_05)
|
| |
|
| |
|
| | var_val = np.percentile(hist_rets, 5)
|
| | df.at[i, metric_map['VaR']] = var_val
|
| |
|
| |
|
| | cvar_vals = hist_rets[hist_rets <= var_val]
|
| | if len(cvar_vals) > 0:
|
| | df.at[i, metric_map['CVaR']] = np.mean(cvar_vals)
|
| |
|
| |
|
| | df.at[i, metric_map['AHPR']] = np.mean(1 + hist_rets)
|
| | if np.all((1 + hist_rets) > 0):
|
| | df.at[i, metric_map['GHPR']] = stats.gmean(1 + hist_rets)
|
| |
|
| | df.at[i, metric_map['Time-weighted return (TWR)']] = np.prod(1 + hist_rets) - 1
|
| |
|
| |
|
| |
|
| | df = df.drop(columns=['Prev_Balance', 'Trade_Return'])
|
| |
|
| | print(f"Saving to {output_file}...")
|
| | df.to_csv(output_file, index=False)
|
| | print("Done.")
|
| |
|
| | except FileNotFoundError:
|
| | print(f"Error: The file '{input_file}' was not found.")
|
| | except Exception as e:
|
| | print(f"An error occurred: {e}")
|
| |
|
if __name__ == "__main__":
    # Optional positional overrides: <script> [input_csv [output_csv]].
    # With no arguments the original hard-coded file names are used, so
    # running the script bare behaves exactly as before.
    default_paths = ['merged_extracted_orders_and_deals.csv', '7_layer_output.csv']
    cli_paths = sys.argv[1:3]
    input_path, output_path = cli_paths + default_paths[len(cli_paths):]
    calculate_rolling_metrics(input_path, output_path)