Spaces:
Sleeping
Sleeping
| """ | |
| Backtest engine for Nifty Time-Based Short Straddle / Strangle strategy. | |
| Refactored from tbs_straddle_backtest_multi.py for FastAPI backend. | |
| All matplotlib/IPython dependencies removed — pure computation only. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import os | |
| import re | |
| import gc | |
# ─────────────────────────── CONFIG ───────────────────────────
# Session times, written as zero-padded "HH:MM" strings so that plain string
# comparison orders them chronologically.
OPEN_TIME = "09:30"
ENTRY_TIME = "09:30"
EXIT_TIME = "15:30"

# 0.1 % per leg, charged on entry premium + exit premium.
FRICTION = 0.001

# Offsets (index points) added to the entry ATM strike to pick the call leg
# (ce_off) and the put leg (pe_off) for each moneyness preset.
MONEYNESS = {
    'ATM': dict(ce_off=0, pe_off=0),
    'OTM1': dict(ce_off=+50, pe_off=-50),
    'OTM2': dict(ce_off=+100, pe_off=-100),
    'ITM1': dict(ce_off=-50, pe_off=+50),
    'ITM2': dict(ce_off=-100, pe_off=+100),
}

# Stop-loss levels in percent of entry premium: 10..100 in steps of 10,
# followed by four wider levels.
SL_LEVELS = [*range(10, 101, 10), 150, 200, 250, 300]
SL_LABELS = [f"{level}%" for level in SL_LEVELS]

# Sharpe annualisation factor: weekly expiries -> 52 observations per year.
ANNUALISE = np.sqrt(52)
| # ─────────────────────────── ENGINE ─────────────────────────── | |
class BacktestEngine:
    """Loads the parquet once, builds numpy panel, exposes backtest methods.

    The input frame must provide the columns this class reads: ``date``,
    ``hhmm``, ``dte``, ``atm``, ``straddle`` and per-strike premium columns
    whose names start with ``ce_`` / ``pe_`` (looked up as ``ce_atm``,
    ``ce_p50``, ``pe_m100``, ... — see ``_col_name``).  Only rows with
    ``dte == 0`` take part in the backtests.

    Attributes set by the constructor:
        data_info:  summary of the raw frame (row/column counts, date range,
                    number of distinct ``dte == 0`` dates).
        open_strad: one row per day with the straddle value at ``OPEN_TIME``.
        PANEL:      ``{date_str: {'idx', 'atm', 'cols', 'open'}}`` minute
                    panels stored as float32 numpy arrays.
    """

    def __init__(self, data_path: str):
        """Read the parquet file at *data_path* and precompute the panels."""
        chain = pd.read_parquet(data_path)
        chain['date_str'] = chain['date'].astype(str)
        chain['hhmm'] = chain['hhmm'].astype(str).str.strip()
        self.data_info = {
            'rows': int(chain.shape[0]),
            'cols': int(chain.shape[1]),
            'date_min': str(chain['date_str'].min()),
            'date_max': str(chain['date_str'].max()),
            'expiry_days': int(chain.loc[chain['dte'] == 0, 'date_str'].nunique()),
        }
        self._build_panels(chain)
        # The raw frame is no longer needed once the panels exist.
        del chain
        gc.collect()

    # ── panel construction ────────────────────────────────────
    def _build_panels(self, chain):
        """Build ``open_strad`` and the per-day numpy ``PANEL`` from *chain*.

        Each day with a row at ``OPEN_TIME`` is reindexed onto a full
        09:15-15:30 minute grid; premium, ATM and straddle columns are
        forward-filled so gaps carry the last seen value.
        """
        d0 = chain[chain['dte'] == 0].copy()

        # opening straddle series
        self.open_strad = (
            d0[d0['hhmm'] == OPEN_TIME]
            .dropna(subset=['straddle'])
            .assign(dt=lambda x: pd.to_datetime(x['date_str']))
            .sort_values('dt')[['dt', 'date_str', 'straddle', 'atm']]
            .reset_index(drop=True)
        )
        self.open_strad['year'] = self.open_strad['dt'].dt.year

        # per-day panels: keep only days that have an OPEN_TIME row at all
        dates_with_open = d0.loc[d0['hhmm'] == OPEN_TIME, 'date_str'].unique()
        work = d0[d0['date_str'].isin(dates_with_open)]
        work = work.dropna(subset=['atm', 'straddle'])
        # 'hhmm' is a zero-padded string, so lexical comparison is chronological.
        work = work[(work['hhmm'] >= '09:15') & (work['hhmm'] <= '15:30')]
        time_range = (pd.date_range('09:15', '15:30', freq='min')
                      .strftime('%H:%M').tolist())

        self.PANEL = {}
        for date_str, day in work.groupby('date_str'):
            grid = day.set_index('hhmm').reindex(time_range)
            fill_cols = [c for c in grid.columns
                         if c.startswith(('ce_', 'pe_')) or c in ('atm', 'straddle')]
            grid[fill_cols] = grid[fill_cols].ffill()

            idx = {t: i for i, t in enumerate(grid.index)}
            cols = {c: grid[c].to_numpy(dtype='float32')
                    for c in grid.columns
                    if c.startswith(('ce_', 'pe_')) or c == 'atm'}
            # Opening straddle is read after the forward fill.
            so = grid.loc[OPEN_TIME, 'straddle'] if OPEN_TIME in idx else np.nan
            self.PANEL[date_str] = {
                'idx': idx,
                'atm': cols['atm'],
                'cols': cols,
                'open': float(so) if pd.notna(so) else np.nan,
            }

        # cache for _col_name: (opt_type, delta) -> column name
        self._COLNAME = {}

    # ── helpers ───────────────────────────────────────────────
    def _col_name(self, opt_type: str, delta: int) -> str:
        """Return the premium column for *opt_type* at offset *delta* from ATM.

        ``delta == 0`` -> ``'<opt>_atm'``, positive -> ``'<opt>_p<delta>'``,
        negative -> ``'<opt>_m<abs(delta)>'``.  Results are memoised.
        """
        k = (opt_type, delta)
        if k not in self._COLNAME:
            if delta == 0:
                self._COLNAME[k] = f'{opt_type}_atm'
            elif delta > 0:
                self._COLNAME[k] = f'{opt_type}_p{delta}'
            else:
                self._COLNAME[k] = f'{opt_type}_m{abs(delta)}'
        return self._COLNAME[k]

    @staticmethod
    def _gen_entries(start, end, step=15):
        """Return "HH:MM" strings from *start* to *end* inclusive, *step* minutes apart.

        Declared static because it takes no ``self``; without the decorator a
        call through an instance (``self._gen_entries(a, b)``) would bind the
        instance to *start* and fail.
        """
        out, t = [], pd.to_datetime(start, format='%H:%M')
        e = pd.to_datetime(end, format='%H:%M')
        while t <= e:
            out.append(t.strftime('%H:%M'))
            t += pd.Timedelta(minutes=step)
        return out

    # ── core backtest (exact replica of original logic) ───────
    def _run_backtest(self, sl_pct, moneyness='ATM', min_straddle=0.0,
                      entry_time='09:30', exit_time='15:30'):
        """Simulate one short call + short put per day with per-leg stop-loss.

        Args:
            sl_pct: stop-loss as a fraction of entry premium (0.3 == 30 %);
                a leg is closed once its premium reaches ``entry * (1 + sl_pct)``.
            moneyness: key into ``MONEYNESS`` selecting the strike offsets.
            min_straddle: days whose opening straddle is NaN or below this
                value are skipped (and counted).
            entry_time / exit_time: "HH:MM" bars for entry and square-off.

        Returns:
            ``(trades_df, skipped)`` — one row per traded day, and the number
            of days rejected by the opening-straddle filter.
        """
        ce_off = MONEYNESS[moneyness]['ce_off']
        pe_off = MONEYNESS[moneyness]['pe_off']
        ce_entry_col = self._col_name('ce', ce_off)
        pe_entry_col = self._col_name('pe', pe_off)
        all_trades, skipped = [], 0

        for date_str, p in self.PANEL.items():
            idx = p['idx']
            if entry_time not in idx or exit_time not in idx:
                continue
            op = p['open']
            if np.isnan(op) or op < min_straddle:
                skipped += 1
                continue

            ei, xi = idx[entry_time], idx[exit_time]
            atm_arr, cols = p['atm'], p['cols']
            ce_arr = cols.get(ce_entry_col)
            pe_arr = cols.get(pe_entry_col)
            if ce_arr is None or pe_arr is None:
                continue
            eav, ece, epe = atm_arr[ei], ce_arr[ei], pe_arr[ei]
            if np.isnan(eav) or np.isnan(ece) or np.isnan(epe):
                continue

            entry_atm = int(eav)
            ce_strike = entry_atm + ce_off
            pe_strike = entry_atm + pe_off
            ce_sl_t = ece * (1 + sl_pct)
            pe_sl_t = epe * (1 + sl_pct)
            ce_active = pe_active = True
            ce_exit = ece
            pe_exit = epe
            last_ce = ece
            last_pe = epe
            ce_reason = pe_reason = f'{exit_time} Sqoff'

            # Walk the bars after entry up to and including the exit bar.
            for i in range(ei + 1, xi + 1):
                if not ce_active and not pe_active:
                    break
                cav = atm_arr[i]
                if np.isnan(cav):
                    continue
                curr_atm = int(cav)
                if ce_active:
                    # Columns are relative to the *current* ATM, so locate the
                    # fixed entry strike by its offset from it (50-point grid).
                    delta = int(round((ce_strike - curr_atm) / 50.0) * 50)
                    arr = cols.get(self._col_name('ce', delta))
                    if arr is not None:
                        v = arr[i]
                        if not np.isnan(v):
                            last_ce = v
                            if v >= ce_sl_t:
                                ce_active = False
                                ce_exit = v
                                ce_reason = 'SL Hit'
                if pe_active:
                    delta = int(round((pe_strike - curr_atm) / 50.0) * 50)
                    arr = cols.get(self._col_name('pe', delta))
                    if arr is not None:
                        v = arr[i]
                        if not np.isnan(v):
                            last_pe = v
                            if v >= pe_sl_t:
                                pe_active = False
                                pe_exit = v
                                pe_reason = 'SL Hit'

            # Legs still open at square-off exit at their last observed price.
            if ce_active:
                ce_exit = last_ce
            if pe_active:
                pe_exit = last_pe

            ce_cost = FRICTION * (ece + ce_exit)
            pe_cost = FRICTION * (epe + pe_exit)
            ce_pnl = (ece - ce_exit) - ce_cost
            pe_pnl = (epe - pe_exit) - pe_cost
            all_trades.append({
                'Date': date_str, 'Config': moneyness,
                'Open_Straddle': round(op, 2),
                'Entry_ATM': entry_atm,
                'CE_Strike': ce_strike, 'PE_Strike': pe_strike,
                'Entry_CE': round(ece, 2), 'Entry_PE': round(epe, 2),
                'Exit_CE': round(ce_exit, 2), 'Exit_PE': round(pe_exit, 2),
                'CE_Exit_Reason': ce_reason, 'PE_Exit_Reason': pe_reason,
                'CE_PnL_Net': round(ce_pnl, 2), 'PE_PnL_Net': round(pe_pnl, 2),
                'Day_PnL': round(ce_pnl + pe_pnl, 2),
            })
        return pd.DataFrame(all_trades), skipped

    # ── metrics (exact replica) ───────────────────────────────
    def _calc_metrics(self, trades_df, sl_label, config_key=None):
        """Summarise a trade table into one leaderboard row.

        Args:
            trades_df: output of ``_run_backtest`` (needs ``Day_PnL`` unless empty).
            sl_label: stop-loss label such as ``'30%'``; also used to derive
                the implied reward/risk ``1 / (sl / 100)``.
            config_key: optional identifier stored under ``'Config Key'``.

        Returns:
            Dict of metrics; all numeric fields are 0 when there are no trades.
        """
        total = len(trades_df)
        base = {
            'SL Level': sl_label, 'Trades': total, 'Win Rate': 0,
            'Implied RR': 0, 'Avg Winner': 0, 'Avg Loser': 0,
            'Realized RR': 0, 'RR Efficiency': 0, 'Sharpe': 0,
            'Profit Factor': 0, 'Max DD': 0, 'Total PnL': 0,
        }
        if config_key is not None:
            base = {'Config Key': config_key, **base}
        if total == 0:
            return base

        pnl = trades_df['Day_PnL'].values
        winners = pnl[pnl > 0]
        losers = pnl[pnl < 0]
        win_rate = len(winners) / total * 100
        avg_w = float(winners.mean()) if len(winners) else 0
        avg_l = float(losers.mean()) if len(losers) else 0
        realized_rr = avg_w / abs(avg_l) if avg_l != 0 else 0
        sl_num = float(sl_label.replace('%', '')) / 100.0
        implied_rr = 1.0 / sl_num if sl_num > 0 else 0
        rr_eff = realized_rr / implied_rr if implied_rr > 0 else 0
        sharpe = float(pnl.mean() / pnl.std() * ANNUALISE) if pnl.std() > 0 else 0

        # Drawdown is measured against the running peak of cumulative PnL.
        cum = np.cumsum(pnl)
        peak = np.maximum.accumulate(cum)
        max_dd = float((cum - peak).min())

        gw = float(winners.sum()) if len(winners) else 0
        # With no losing day, 0.01 stands in for gross loss to avoid dividing by 0.
        gl = abs(float(losers.sum())) if len(losers) else 0.01
        base.update({
            'Win Rate': round(win_rate, 1),
            'Implied RR': round(implied_rr, 2),
            'Avg Winner': round(avg_w, 2),
            'Avg Loser': round(avg_l, 2),
            'Realized RR': round(realized_rr, 2),
            'RR Efficiency': round(rr_eff, 2),
            'Sharpe': round(sharpe, 2),
            'Profit Factor': round(gw / gl, 2),
            'Max DD': round(max_dd, 2),
            'Total PnL': round(float(pnl.sum()), 2),
        })
        return base

    # ═══════════════════ PUBLIC API ═════════════════════════
    def get_opening_straddle(self):
        """Return opening-straddle trend data for the chart.

        Includes the per-day series, an 8-observation rolling median and
        per-year count/min/median/mean/max (rounded to 1 decimal).
        """
        os_data = self.open_strad.copy()
        rolling_med = os_data['straddle'].rolling(8, min_periods=1).median()
        yearly = (os_data.groupby('year')['straddle']
                  .agg(['count', 'min', 'median', 'mean', 'max']).round(1))
        yearly_stats = {
            int(yr): {
                'count': int(row['count']),
                'min': float(row['min']),
                'median': float(row['median']),
                'mean': float(row['mean']),
                'max': float(row['max']),
            }
            for yr, row in yearly.iterrows()
        }
        return {
            'dates': os_data['dt'].dt.strftime('%Y-%m-%d').tolist(),
            'straddles': [round(v, 2) for v in os_data['straddle'].tolist()],
            'rolling_median': [round(v, 2) for v in rolling_med.tolist()],
            'yearly_stats': yearly_stats,
            'total_days': len(os_data),
        }

    def run_single_entry(self, moneyness='ATM', min_straddle=0.0):
        """Framing 1 — single 09:30 → 15:30 entry across all SL levels.

        Returns a dict with the run configuration, a Sharpe-sorted
        leaderboard, total/yearly/quarterly PnL per stop-loss level, and the
        equity curve plus trade log of the best-Sharpe level.
        """
        all_results, skipped = {}, 0
        for sl in SL_LEVELS:
            trades, sk = self._run_backtest(
                sl / 100.0, moneyness, min_straddle,
                entry_time=ENTRY_TIME, exit_time=EXIT_TIME)
            all_results[f"{sl}%"] = trades
            # The opening-straddle filter does not depend on the SL level,
            # so every iteration reports the same count.
            skipped = sk

        # leaderboard
        leaderboard = [self._calc_metrics(t, lab) for lab, t in all_results.items()]
        leaderboard.sort(key=lambda x: x['Sharpe'], reverse=True)

        # PnL by SL (an empty trade table has no 'Day_PnL' column, hence the guard)
        sl_order = sorted(all_results.keys(),
                          key=lambda x: float(x.replace('%', '')))
        pnl_by_sl = {k: round(float(all_results[k]['Day_PnL'].sum()), 2)
                     if len(all_results[k]) > 0 else 0
                     for k in sl_order}

        # RR data
        rr_data = {'sl_levels': sl_order, 'realized': [], 'implied': []}
        metric_map = {m['SL Level']: m for m in leaderboard}
        for sl in sl_order:
            m = metric_map.get(sl, {})
            rr_data['realized'].append(m.get('Realized RR', 0))
            rr_data['implied'].append(m.get('Implied RR', 0))

        # yearly PnL
        yearly_pnl = {}
        for sl_label, t in all_results.items():
            if len(t) == 0:
                continue
            tt = t.copy()
            tt['Year'] = pd.to_datetime(tt['Date']).dt.year
            for yr, ydf in tt.groupby('Year'):
                yearly_pnl.setdefault(int(yr), {})[sl_label] = round(
                    float(ydf['Day_PnL'].sum()), 2)

        # quarterly PnL
        quarterly_pnl = {}
        for sl_label, t in all_results.items():
            if len(t) == 0:
                continue
            tt = t.copy()
            tt['Date'] = pd.to_datetime(tt['Date'])
            tt['Year'] = tt['Date'].dt.year
            tt['Quarter'] = tt['Date'].dt.quarter
            for (yr, q), qdf in tt.groupby(['Year', 'Quarter']):
                key = f"{int(yr)}-Q{int(q)}"
                quarterly_pnl.setdefault(key, {})[sl_label] = round(
                    float(qdf['Day_PnL'].sum()), 2)

        # equity curve (best Sharpe SL)
        best_sl = leaderboard[0]['SL Level'] if leaderboard else '30%'
        best_trades = all_results.get(best_sl, pd.DataFrame())
        equity_curve = []
        if len(best_trades) > 0:
            cum_pnl = best_trades['Day_PnL'].cumsum()
            equity_curve = [
                {'date': d, 'pnl': round(float(p), 2)}
                for d, p in zip(best_trades['Date'], cum_pnl)
            ]

        # trade log for best SL, converted to plain Python scalars
        trade_log = []
        if len(best_trades) > 0:
            for _, row in best_trades.iterrows():
                trade_log.append({
                    'Date': row['Date'],
                    'Entry_ATM': int(row['Entry_ATM']),
                    'CE_Strike': int(row['CE_Strike']),
                    'PE_Strike': int(row['PE_Strike']),
                    'Entry_CE': float(row['Entry_CE']),
                    'Entry_PE': float(row['Entry_PE']),
                    'Exit_CE': float(row['Exit_CE']),
                    'Exit_PE': float(row['Exit_PE']),
                    'CE_Exit_Reason': row['CE_Exit_Reason'],
                    'PE_Exit_Reason': row['PE_Exit_Reason'],
                    'Day_PnL': float(row['Day_PnL']),
                })

        cfg = MONEYNESS[moneyness]
        eligible = int((self.open_strad['straddle'] >= min_straddle).sum())
        return {
            'config': {
                'moneyness': moneyness,
                'ce_off': cfg['ce_off'],
                'pe_off': cfg['pe_off'],
                'min_straddle': min_straddle,
                'eligible_days': eligible,
                'total_days': len(self.open_strad),
                'skipped_days': skipped,
            },
            'leaderboard': leaderboard,
            'pnl_by_sl': pnl_by_sl,
            'rr_data': rr_data,
            'yearly_pnl': yearly_pnl,
            'quarterly_pnl': quarterly_pnl,
            'equity_curve': equity_curve,
            'best_sl': best_sl,
            'trade_log': trade_log,
        }

    def run_basket_backtest(self, moneyness='ATM', min_straddle=0.0,
                            basket_type='15min'):
        """Framing 2 (15-min) or Framing 3 (2-hour) basket analysis.

        ``basket_type == '15min'`` tests entries every 15 minutes within each
        basket, all exiting at 15:30; any other value selects the 2-hour
        framing (four entries per basket, exit at the end of the basket).
        Every (entry, exit) pair is run against every level in ``SL_LEVELS``.
        """
        baskets = ['Morning', 'Mid-day', 'Afternoon']
        if basket_type == '15min':
            windows = [
                ('Morning', self._gen_entries('09:30', '11:15'), '15:30'),
                ('Mid-day', self._gen_entries('11:30', '13:15'), '15:30'),
                ('Afternoon', self._gen_entries('13:30', '15:15'), '15:30'),
            ]
        else:
            windows = [
                ('Morning', ['09:16', '09:20', '09:25', '09:30'], '11:30'),
                ('Mid-day', ['11:16', '11:20', '11:25', '11:30'], '13:30'),
                ('Afternoon', ['13:16', '13:20', '13:25', '13:30'], '15:30'),
            ]
        time_configs = []
        for basket, entries, exit_at in windows:
            for e in entries:
                time_configs.append({'entry': e, 'exit': exit_at, 'basket': basket})

        # run every config × SL
        all_flex = {}
        basket_of = {}
        for c in time_configs:
            key = f"Basket: {c['basket']}, Entry: {c['entry']}, Exit: {c['exit']}"
            basket_of[key] = c['basket']
            all_flex[key] = {}
            for sl in SL_LEVELS:
                trades, _ = self._run_backtest(
                    sl / 100.0, moneyness=moneyness,
                    min_straddle=min_straddle,
                    entry_time=c['entry'], exit_time=c['exit'])
                all_flex[key][f"{sl}%"] = trades

        # master leaderboard
        rows = []
        for cfg_key, d in all_flex.items():
            for sl, t in d.items():
                rows.append(self._calc_metrics(t, sl, cfg_key))
        master_df = pd.DataFrame(rows)
        master_df[['Basket', 'Entry Time', 'Exit Time']] = (
            master_df['Config Key'].str.extract(
                r'Basket: ([\w-]+), Entry: (\d{2}:\d{2}), Exit: (\d{2}:\d{2})'))

        # top 10 by Sharpe
        top10 = (master_df.sort_values('Sharpe', ascending=False)
                 .head(10).drop(columns='Config Key').to_dict('records'))

        # avg PnL by basket & SL, emitted in numeric SL order (groupby alone
        # would order the labels as strings: "10%", "100%", "150%", "20%", ...)
        avg_pnl = (master_df.groupby(['Basket', 'SL Level'])['Total PnL']
                   .mean().reset_index())
        basket_sl_pnl = {}
        for basket in baskets:
            bdata = avg_pnl[avg_pnl['Basket'] == basket]
            by_sl = dict(zip(bdata['SL Level'], bdata['Total PnL']))
            sl_order = sorted(by_sl, key=lambda x: float(x.replace('%', '')))
            basket_sl_pnl[basket] = {
                sl: round(float(by_sl[sl]), 2) for sl in sl_order
            }

        # RR by basket
        sl_order_all = [f"{s}%" for s in sorted(SL_LEVELS)]
        rr_by_basket = {}
        for basket in baskets:
            bdata = master_df[master_df['Basket'] == basket]
            avg_rr = (bdata.groupby('SL Level')
                      .agg(realized=('Realized RR', 'mean'),
                           implied=('Implied RR', 'mean'))
                      .reindex(sl_order_all))
            rr_by_basket[basket] = {
                'sl_levels': sl_order_all,
                'realized': [round(float(v), 2) if not np.isnan(v) else 0
                             for v in avg_rr['realized'].values],
                'implied': [round(float(v), 2) if not np.isnan(v) else 0
                            for v in avg_rr['implied'].values],
            }

        # yearly basket PnL: collect one yearly total per entry config ...
        yearly_basket = {}
        for key, d in all_flex.items():
            basket = basket_of[key]
            for sl, t in d.items():
                if t.empty:
                    continue
                tt = t.copy()
                tt['Year'] = pd.to_datetime(tt['Date']).dt.year
                for yr, pnl_val in tt.groupby('Year')['Day_PnL'].sum().items():
                    (yearly_basket.setdefault(int(yr), {})
                     .setdefault(basket, {})
                     .setdefault(sl, [])
                     .append(float(pnl_val)))
        # ... then average across the configs of each basket
        for per_basket in yearly_basket.values():
            for per_sl in per_basket.values():
                for sl, vals in per_sl.items():
                    per_sl[sl] = round(sum(vals) / len(vals), 2)

        return {
            'type': basket_type,
            'total_configs': len(time_configs),
            'total_backtests': len(time_configs) * len(SL_LEVELS),
            'top10': top10,
            'avg_pnl_by_basket_sl': basket_sl_pnl,
            'rr_by_basket': rr_by_basket,
            'yearly_basket_pnl': yearly_basket,
        }