""" Backtest engine for Nifty Time-Based Short Straddle / Strangle strategy. Refactored from tbs_straddle_backtest_multi.py for FastAPI backend. All matplotlib/IPython dependencies removed — pure computation only. """ import pandas as pd import numpy as np import os import re import gc # ─────────────────────────── CONFIG ─────────────────────────── OPEN_TIME = "09:30" ENTRY_TIME = "09:30" EXIT_TIME = "15:30" FRICTION = 0.001 # 0.1 % per leg on entry + exit premium MONEYNESS = { 'ATM': {'ce_off': 0, 'pe_off': 0}, 'OTM1': {'ce_off': +50, 'pe_off': -50}, 'OTM2': {'ce_off': +100, 'pe_off': -100}, 'ITM1': {'ce_off': -50, 'pe_off': +50}, 'ITM2': {'ce_off': -100, 'pe_off': +100}, } SL_LEVELS = list(range(10, 101, 10)) + [150, 200, 250, 300] SL_LABELS = [f"{s}%" for s in SL_LEVELS] ANNUALISE = np.sqrt(52) # weekly expiries # ─────────────────────────── ENGINE ─────────────────────────── class BacktestEngine: """Loads the parquet once, builds numpy panel, exposes backtest methods.""" def __init__(self, data_path: str): chain = pd.read_parquet(data_path) chain['date_str'] = chain['date'].astype(str) chain['hhmm'] = chain['hhmm'].astype(str).str.strip() self.data_info = { 'rows': int(chain.shape[0]), 'cols': int(chain.shape[1]), 'date_min': str(chain['date_str'].min()), 'date_max': str(chain['date_str'].max()), 'expiry_days': int(chain.loc[chain['dte'] == 0, 'date_str'].nunique()), } self._build_panels(chain) del chain gc.collect() # ── panel construction ──────────────────────────────────── def _build_panels(self, chain): d0 = chain[chain['dte'] == 0].copy() # opening straddle series self.open_strad = ( d0[d0['hhmm'] == OPEN_TIME] .dropna(subset=['straddle']) .assign(dt=lambda x: pd.to_datetime(x['date_str'])) .sort_values('dt')[['dt', 'date_str', 'straddle', 'atm']] .reset_index(drop=True) ) self.open_strad['year'] = self.open_strad['dt'].dt.year # per-day panels work = chain[chain['dte'] == 0].copy() dates_with_open = work[work['hhmm'] == OPEN_TIME]['date_str'].unique() work = work[work['date_str'].isin(dates_with_open)].copy() work = work.dropna(subset=['atm', 'straddle']) work = work[(work['hhmm'] >= '09:15') & (work['hhmm'] <= '15:30')].copy() time_range = pd.date_range('09:15', '15:30', freq='min').strftime('%H:%M').tolist() grouped = {} for date_str, g in work.groupby('date_str'): rg = g.set_index('hhmm').reindex(time_range) fcols = [c for c in rg.columns if c.startswith(('ce_', 'pe_')) or c in ('atm', 'straddle')] rg[fcols] = rg[fcols].ffill() if not rg.empty: grouped[date_str] = rg self.PANEL = {} for ds, df in grouped.items(): idx = {t: i for i, t in enumerate(df.index)} cols = {c: df[c].to_numpy(dtype='float32') for c in df.columns if c.startswith(('ce_', 'pe_')) or c == 'atm'} so = df.loc[OPEN_TIME, 'straddle'] if OPEN_TIME in idx else np.nan self.PANEL[ds] = { 'idx': idx, 'atm': cols['atm'], 'cols': cols, 'open': float(so) if pd.notna(so) else np.nan, } self._COLNAME = {} # ── helpers ─────────────────────────────────────────────── def _col_name(self, opt_type: str, delta: int) -> str: k = (opt_type, delta) if k not in self._COLNAME: if delta == 0: self._COLNAME[k] = f'{opt_type}_atm' elif delta > 0: self._COLNAME[k] = f'{opt_type}_p{delta}' else: self._COLNAME[k] = f'{opt_type}_m{abs(delta)}' return self._COLNAME[k] @staticmethod def _gen_entries(start, end, step=15): out, t = [], pd.to_datetime(start, format='%H:%M') e = pd.to_datetime(end, format='%H:%M') while t <= e: out.append(t.strftime('%H:%M')) t += pd.Timedelta(minutes=step) return out # ── core backtest (exact replica of original logic) ─────── def _run_backtest(self, sl_pct, moneyness='ATM', min_straddle=0.0, entry_time='09:30', exit_time='15:30'): ce_off = MONEYNESS[moneyness]['ce_off'] pe_off = MONEYNESS[moneyness]['pe_off'] ce_entry_col = self._col_name('ce', ce_off) pe_entry_col = self._col_name('pe', pe_off) all_trades, skipped = [], 0 for date_str, p in self.PANEL.items(): idx = p['idx'] if entry_time not in idx or exit_time not in idx: continue op = p['open'] if np.isnan(op) or op < min_straddle: skipped += 1 continue ei, xi = idx[entry_time], idx[exit_time] atm_arr, cols = p['atm'], p['cols'] ce_arr = cols.get(ce_entry_col) pe_arr = cols.get(pe_entry_col) if ce_arr is None or pe_arr is None: continue eav, ece, epe = atm_arr[ei], ce_arr[ei], pe_arr[ei] if np.isnan(eav) or np.isnan(ece) or np.isnan(epe): continue entry_atm = int(eav) ce_strike = entry_atm + ce_off pe_strike = entry_atm + pe_off ce_sl_t = ece * (1 + sl_pct) pe_sl_t = epe * (1 + sl_pct) ce_active = pe_active = True ce_exit = ece; pe_exit = epe last_ce = ece; last_pe = epe ce_reason = pe_reason = f'{exit_time} Sqoff' for i in range(ei + 1, xi + 1): if not ce_active and not pe_active: break cav = atm_arr[i] if np.isnan(cav): continue curr_atm = int(cav) if ce_active: delta = int(round((ce_strike - curr_atm) / 50.0) * 50) arr = cols.get(self._col_name('ce', delta)) if arr is not None: v = arr[i] if not np.isnan(v): last_ce = v if v >= ce_sl_t: ce_active = False; ce_exit = v; ce_reason = 'SL Hit' if pe_active: delta = int(round((pe_strike - curr_atm) / 50.0) * 50) arr = cols.get(self._col_name('pe', delta)) if arr is not None: v = arr[i] if not np.isnan(v): last_pe = v if v >= pe_sl_t: pe_active = False; pe_exit = v; pe_reason = 'SL Hit' if ce_active: ce_exit = last_ce if pe_active: pe_exit = last_pe ce_cost = FRICTION * (ece + ce_exit) pe_cost = FRICTION * (epe + pe_exit) ce_pnl = (ece - ce_exit) - ce_cost pe_pnl = (epe - pe_exit) - pe_cost all_trades.append({ 'Date': date_str, 'Config': moneyness, 'Open_Straddle': round(op, 2), 'Entry_ATM': entry_atm, 'CE_Strike': ce_strike, 'PE_Strike': pe_strike, 'Entry_CE': round(ece, 2), 'Entry_PE': round(epe, 2), 'Exit_CE': round(ce_exit, 2), 'Exit_PE': round(pe_exit, 2), 'CE_Exit_Reason': ce_reason, 'PE_Exit_Reason': pe_reason, 'CE_PnL_Net': round(ce_pnl, 2), 'PE_PnL_Net': round(pe_pnl, 2), 'Day_PnL': round(ce_pnl + pe_pnl, 2), }) return pd.DataFrame(all_trades), skipped # ── metrics (exact replica) ─────────────────────────────── def _calc_metrics(self, trades_df, sl_label, config_key=None): total = len(trades_df) base = { 'SL Level': sl_label, 'Trades': total, 'Win Rate': 0, 'Implied RR': 0, 'Avg Winner': 0, 'Avg Loser': 0, 'Realized RR': 0, 'RR Efficiency': 0, 'Sharpe': 0, 'Profit Factor': 0, 'Max DD': 0, 'Total PnL': 0, } if config_key is not None: base = {'Config Key': config_key, **base} if total == 0: return base pnl = trades_df['Day_PnL'].values winners = pnl[pnl > 0] losers = pnl[pnl < 0] win_rate = len(winners) / total * 100 avg_w = float(winners.mean()) if len(winners) else 0 avg_l = float(losers.mean()) if len(losers) else 0 realized_rr = avg_w / abs(avg_l) if avg_l != 0 else 0 sl_num = float(sl_label.replace('%', '')) / 100.0 implied_rr = 1.0 / sl_num if sl_num > 0 else 0 rr_eff = realized_rr / implied_rr if implied_rr > 0 else 0 sharpe = float(pnl.mean() / pnl.std() * ANNUALISE) if pnl.std() > 0 else 0 cum = np.cumsum(pnl) peak = np.maximum.accumulate(cum) max_dd = float((cum - peak).min()) gw = float(winners.sum()) if len(winners) else 0 gl = abs(float(losers.sum())) if len(losers) else 0.01 base.update({ 'Win Rate': round(win_rate, 1), 'Implied RR': round(implied_rr, 2), 'Avg Winner': round(avg_w, 2), 'Avg Loser': round(avg_l, 2), 'Realized RR': round(realized_rr, 2), 'RR Efficiency': round(rr_eff, 2), 'Sharpe': round(sharpe, 2), 'Profit Factor': round(gw / gl, 2), 'Max DD': round(max_dd, 2), 'Total PnL': round(float(pnl.sum()), 2), }) return base # ═══════════════════ PUBLIC API ═════════════════════════ def get_opening_straddle(self): """Return opening-straddle trend data for the chart.""" os_data = self.open_strad.copy() rolling_med = os_data['straddle'].rolling(8, min_periods=1).median() yearly = (os_data.groupby('year')['straddle'] .agg(['count', 'min', 'median', 'mean', 'max']).round(1)) yearly_stats = {} for yr, row in yearly.iterrows(): yrg = os_data[os_data['year'] == yr] yearly_stats[int(yr)] = { 'count': int(row['count']), 'min': float(row['min']), 'median': float(row['median']), 'mean': float(row['mean']), 'max': float(row['max']), } return { 'dates': os_data['dt'].dt.strftime('%Y-%m-%d').tolist(), 'straddles': [round(v, 2) for v in os_data['straddle'].tolist()], 'rolling_median': [round(v, 2) for v in rolling_med.tolist()], 'yearly_stats': yearly_stats, 'total_days': len(os_data), } def run_single_entry(self, moneyness='ATM', min_straddle=0.0): """Framing 1 — single 09:30 → 15:30 entry across all SL levels.""" all_results, skipped = {}, 0 for sl in SL_LEVELS: trades, sk = self._run_backtest( sl / 100.0, moneyness, min_straddle, entry_time=ENTRY_TIME, exit_time=EXIT_TIME) all_results[f"{sl}%"] = trades skipped = sk # leaderboard leaderboard = [self._calc_metrics(t, lab) for lab, t in all_results.items()] leaderboard.sort(key=lambda x: x['Sharpe'], reverse=True) # PnL by SL sl_order = sorted(all_results.keys(), key=lambda x: float(x.replace('%', ''))) pnl_by_sl = {k: round(float(all_results[k]['Day_PnL'].sum()), 2) if len(all_results[k]) > 0 else 0 for k in sl_order} # RR data rr_data = {'sl_levels': sl_order, 'realized': [], 'implied': []} metric_map = {m['SL Level']: m for m in leaderboard} for sl in sl_order: m = metric_map.get(sl, {}) rr_data['realized'].append(m.get('Realized RR', 0)) rr_data['implied'].append(m.get('Implied RR', 0)) # yearly PnL yearly_pnl = {} for sl_label, t in all_results.items(): if len(t) == 0: continue tt = t.copy() tt['Year'] = pd.to_datetime(tt['Date']).dt.year for yr, ydf in tt.groupby('Year'): yr = int(yr) if yr not in yearly_pnl: yearly_pnl[yr] = {} yearly_pnl[yr][sl_label] = round(float(ydf['Day_PnL'].sum()), 2) # quarterly PnL quarterly_pnl = {} for sl_label, t in all_results.items(): if len(t) == 0: continue tt = t.copy() tt['Date'] = pd.to_datetime(tt['Date']) tt['Year'] = tt['Date'].dt.year tt['Quarter'] = tt['Date'].dt.quarter for (yr, q), qdf in tt.groupby(['Year', 'Quarter']): key = f"{int(yr)}-Q{int(q)}" if key not in quarterly_pnl: quarterly_pnl[key] = {} quarterly_pnl[key][sl_label] = round(float(qdf['Day_PnL'].sum()), 2) # equity curve (best Sharpe SL) best_sl = leaderboard[0]['SL Level'] if leaderboard else '30%' best_trades = all_results.get(best_sl, pd.DataFrame()) equity_curve = [] if len(best_trades) > 0: cum_pnl = best_trades['Day_PnL'].cumsum() equity_curve = [ {'date': d, 'pnl': round(float(p), 2)} for d, p in zip(best_trades['Date'], cum_pnl) ] # trade log for best SL (limited) trade_log = [] if len(best_trades) > 0: for _, row in best_trades.iterrows(): trade_log.append({ 'Date': row['Date'], 'Entry_ATM': int(row['Entry_ATM']), 'CE_Strike': int(row['CE_Strike']), 'PE_Strike': int(row['PE_Strike']), 'Entry_CE': float(row['Entry_CE']), 'Entry_PE': float(row['Entry_PE']), 'Exit_CE': float(row['Exit_CE']), 'Exit_PE': float(row['Exit_PE']), 'CE_Exit_Reason': row['CE_Exit_Reason'], 'PE_Exit_Reason': row['PE_Exit_Reason'], 'Day_PnL': float(row['Day_PnL']), }) cfg = MONEYNESS[moneyness] eligible = int((self.open_strad['straddle'] >= min_straddle).sum()) return { 'config': { 'moneyness': moneyness, 'ce_off': cfg['ce_off'], 'pe_off': cfg['pe_off'], 'min_straddle': min_straddle, 'eligible_days': eligible, 'total_days': len(self.open_strad), 'skipped_days': skipped, }, 'leaderboard': leaderboard, 'pnl_by_sl': pnl_by_sl, 'rr_data': rr_data, 'yearly_pnl': yearly_pnl, 'quarterly_pnl': quarterly_pnl, 'equity_curve': equity_curve, 'best_sl': best_sl, 'trade_log': trade_log, } def run_basket_backtest(self, moneyness='ATM', min_straddle=0.0, basket_type='15min'): """Framing 2 (15-min) or Framing 3 (2-hour) basket analysis.""" if basket_type == '15min': time_configs = [] for e in self._gen_entries('09:30', '11:15'): time_configs.append({'entry': e, 'exit': '15:30', 'basket': 'Morning'}) for e in self._gen_entries('11:30', '13:15'): time_configs.append({'entry': e, 'exit': '15:30', 'basket': 'Mid-day'}) for e in self._gen_entries('13:30', '15:15'): time_configs.append({'entry': e, 'exit': '15:30', 'basket': 'Afternoon'}) else: time_configs = [] for e in ['09:16', '09:20', '09:25', '09:30']: time_configs.append({'entry': e, 'exit': '11:30', 'basket': 'Morning'}) for e in ['11:16', '11:20', '11:25', '11:30']: time_configs.append({'entry': e, 'exit': '13:30', 'basket': 'Mid-day'}) for e in ['13:16', '13:20', '13:25', '13:30']: time_configs.append({'entry': e, 'exit': '15:30', 'basket': 'Afternoon'}) # run every config × SL all_flex = {} for c in time_configs: key = f"Basket: {c['basket']}, Entry: {c['entry']}, Exit: {c['exit']}" all_flex[key] = {} for sl in SL_LEVELS: trades, _ = self._run_backtest( sl / 100.0, moneyness=moneyness, min_straddle=min_straddle, entry_time=c['entry'], exit_time=c['exit']) all_flex[key][f"{sl}%"] = trades # master leaderboard rows = [] for cfg_key, d in all_flex.items(): for sl, t in d.items(): rows.append(self._calc_metrics(t, sl, cfg_key)) master_df = pd.DataFrame(rows) master_df[['Basket', 'Entry Time', 'Exit Time']] = ( master_df['Config Key'].str.extract( r'Basket: ([\w-]+), Entry: (\d{2}:\d{2}), Exit: (\d{2}:\d{2})')) # top 10 by Sharpe top10 = (master_df.sort_values('Sharpe', ascending=False) .head(10).drop(columns='Config Key').to_dict('records')) # avg PnL by basket & SL avg_pnl = (master_df.groupby(['Basket', 'SL Level'])['Total PnL'] .mean().reset_index()) basket_sl_pnl = {} for basket in ['Morning', 'Mid-day', 'Afternoon']: bdata = avg_pnl[avg_pnl['Basket'] == basket] sl_order = sorted(bdata['SL Level'].unique(), key=lambda x: float(x.replace('%', ''))) basket_sl_pnl[basket] = { row['SL Level']: round(float(row['Total PnL']), 2) for _, row in bdata.iterrows() } # RR by basket sl_order_all = [f"{s}%" for s in sorted(SL_LEVELS)] rr_by_basket = {} for basket in ['Morning', 'Mid-day', 'Afternoon']: bdata = master_df[master_df['Basket'] == basket] avg_rr = (bdata.groupby('SL Level') .agg(realized=('Realized RR', 'mean'), implied=('Implied RR', 'mean')) .reindex(sl_order_all)) rr_by_basket[basket] = { 'sl_levels': sl_order_all, 'realized': [round(float(v), 2) if not np.isnan(v) else 0 for v in avg_rr['realized'].values], 'implied': [round(float(v), 2) if not np.isnan(v) else 0 for v in avg_rr['implied'].values], } # yearly basket PnL yearly_basket = {} for key, d in all_flex.items(): bm = re.search(r'Basket: ([\w-]+)', key) basket = bm.group(1) if bm else 'Unknown' for sl, t in d.items(): if t.empty: continue tt = t.copy() tt['Year'] = pd.to_datetime(tt['Date']).dt.year for yr, pnl_val in tt.groupby('Year')['Day_PnL'].sum().items(): yr = int(yr) yearly_basket.setdefault(yr, {}).setdefault(basket, {}).setdefault(sl, []).append(float(pnl_val)) # average the lists for yr in yearly_basket: for basket in yearly_basket[yr]: for sl in yearly_basket[yr][basket]: vals = yearly_basket[yr][basket][sl] yearly_basket[yr][basket][sl] = round(sum(vals) / len(vals), 2) return { 'type': basket_type, 'total_configs': len(time_configs), 'total_backtests': len(time_configs) * len(SL_LEVELS), 'top10': top10, 'avg_pnl_by_basket_sl': basket_sl_pnl, 'rr_by_basket': rr_by_basket, 'yearly_basket_pnl': yearly_basket, }