# tbs-dashboard / engine.py
# (Hugging Face file-viewer residue captured with this file: uploaded by
#  "iwilldoit" via huggingface_hub, commit 9df26df, 21.2 kB.)
"""
Backtest engine for Nifty Time-Based Short Straddle / Strangle strategy.
Refactored from tbs_straddle_backtest_multi.py for FastAPI backend.
All matplotlib/IPython dependencies removed — pure computation only.
"""
import pandas as pd
import numpy as np
import os
import re
import gc
# ─────────────────────────── CONFIG ───────────────────────────
# Clock times are "HH:MM" strings; the engine compares them against the
# `hhmm` column of the option-chain data.
OPEN_TIME = "09:30"    # minute whose straddle is taken as the day's opening straddle
ENTRY_TIME = "09:30"   # entry minute used by the single-entry framing
EXIT_TIME = "15:30"    # square-off minute used by the single-entry framing
FRICTION = 0.001  # 0.1 % per leg on entry + exit premium

# Strike offsets from the ATM strike at entry, listed as (name, CE offset, PE offset).
_MONEYNESS_OFFSETS = (
    ('ATM', 0, 0),
    ('OTM1', 50, -50),
    ('OTM2', 100, -100),
    ('ITM1', -50, 50),
    ('ITM2', -100, 100),
)
MONEYNESS = {
    name: {'ce_off': ce_off, 'pe_off': pe_off}
    for name, ce_off, pe_off in _MONEYNESS_OFFSETS
}

# Stop-loss levels in percent of entry premium: 10..100 in steps of 10, then wider stops.
SL_LEVELS = [*range(10, 101, 10), 150, 200, 250, 300]
SL_LABELS = [f"{s}%" for s in SL_LEVELS]
ANNUALISE = np.sqrt(52)  # weekly expiries
# ─────────────────────────── ENGINE ───────────────────────────
class BacktestEngine:
    """Backtest engine for the expiry-day short straddle / strangle strategy.

    The option-chain parquet is read once in ``__init__`` and converted into
    per-day numpy panels (``self.PANEL``) plus an opening-straddle series
    (``self.open_strad``).  The public ``get_*`` / ``run_*`` methods only read
    those structures and return plain dict / list payloads.
    """

    # Basket labels, in the order they are reported by run_basket_backtest.
    _BASKETS = ('Morning', 'Mid-day', 'Afternoon')

    def __init__(self, data_path: str):
        """Load the parquet at *data_path* and build the in-memory panels.

        The frame must provide the columns read below and in
        ``_build_panels``: ``date``, ``hhmm``, ``dte``, ``atm``, ``straddle``
        and the ``ce_*`` / ``pe_*`` price columns.
        """
        chain = pd.read_parquet(data_path)
        chain['date_str'] = chain['date'].astype(str)
        chain['hhmm'] = chain['hhmm'].astype(str).str.strip()
        self.data_info = {
            'rows': int(chain.shape[0]),
            'cols': int(chain.shape[1]),
            'date_min': str(chain['date_str'].min()),
            'date_max': str(chain['date_str'].max()),
            'expiry_days': int(chain.loc[chain['dte'] == 0, 'date_str'].nunique()),
        }
        self._build_panels(chain)
        # The raw frame is not needed once the panels exist; release it.
        del chain
        gc.collect()

    # ── panel construction ────────────────────────────────────
    def _build_panels(self, chain):
        """Build ``self.open_strad`` and ``self.PANEL`` from the chain frame.

        Only rows with ``dte == 0`` are used.  ``self.open_strad`` holds one
        row per day that has a non-null straddle at ``OPEN_TIME``.
        ``self.PANEL`` maps each date string to a dict with:

        * ``idx``  – "HH:MM" -> row position on the 09:15..15:30 minute grid
        * ``atm``  – float32 array of the ``atm`` column on that grid
        * ``cols`` – float32 arrays for ``atm`` and every ``ce_*`` / ``pe_*`` column
        * ``open`` – the (forward-filled) straddle at ``OPEN_TIME`` or NaN
        """
        expiry = chain[chain['dte'] == 0]
        at_open = expiry[expiry['hhmm'] == OPEN_TIME]

        # opening straddle series
        self.open_strad = (
            at_open
            .dropna(subset=['straddle'])
            .assign(dt=lambda x: pd.to_datetime(x['date_str']))
            .sort_values('dt')[['dt', 'date_str', 'straddle', 'atm']]
            .reset_index(drop=True)
        )
        self.open_strad['year'] = self.open_strad['dt'].dt.year

        # per-day panels: keep only days that have a row at OPEN_TIME
        work = expiry[expiry['date_str'].isin(at_open['date_str'].unique())]
        work = work.dropna(subset=['atm', 'straddle'])
        # "HH:MM" strings compare correctly as text when zero-padded.
        work = work[(work['hhmm'] >= '09:15') & (work['hhmm'] <= '15:30')]

        time_range = pd.date_range('09:15', '15:30', freq='min').strftime('%H:%M').tolist()
        # Every day is reindexed onto the same grid, so one lookup table
        # serves all panels (it is only ever read).
        slot_of = {t: i for i, t in enumerate(time_range)}

        self.PANEL = {}
        for date_str, day in work.groupby('date_str'):
            grid = day.set_index('hhmm').reindex(time_range)
            fill_cols = [c for c in grid.columns
                         if c.startswith(('ce_', 'pe_')) or c in ('atm', 'straddle')]
            # Missing minutes carry the last seen value forward.
            grid[fill_cols] = grid[fill_cols].ffill()

            cols = {c: grid[c].to_numpy(dtype='float32')
                    for c in grid.columns
                    if c.startswith(('ce_', 'pe_')) or c == 'atm'}
            so = grid.loc[OPEN_TIME, 'straddle'] if OPEN_TIME in slot_of else np.nan
            self.PANEL[date_str] = {
                'idx': slot_of,
                'atm': cols['atm'],
                'cols': cols,
                'open': float(so) if pd.notna(so) else np.nan,
            }

        # cache for _col_name: (opt_type, delta) -> column name
        self._COLNAME = {}

    # ── helpers ───────────────────────────────────────────────
    def _col_name(self, opt_type: str, delta: int) -> str:
        """Return the panel column name for *opt_type* at strike offset *delta*.

        ``delta == 0`` -> ``<opt>_atm``; positive -> ``<opt>_p<delta>``;
        negative -> ``<opt>_m<abs(delta)>``.  Results are memoised in
        ``self._COLNAME`` because this is called once per minute per leg.
        """
        k = (opt_type, delta)
        name = self._COLNAME.get(k)
        if name is None:
            if delta == 0:
                name = f'{opt_type}_atm'
            elif delta > 0:
                name = f'{opt_type}_p{delta}'
            else:
                name = f'{opt_type}_m{abs(delta)}'
            self._COLNAME[k] = name
        return name

    @staticmethod
    def _gen_entries(start, end, step=15):
        """Return "HH:MM" strings from *start* to *end* inclusive, every *step* minutes.

        Raises:
            ValueError: if *step* is not positive (the walk would never end).
        """
        if step <= 0:
            raise ValueError(f"step must be a positive number of minutes, got {step!r}")
        out, t = [], pd.to_datetime(start, format='%H:%M')
        e = pd.to_datetime(end, format='%H:%M')
        while t <= e:
            out.append(t.strftime('%H:%M'))
            t += pd.Timedelta(minutes=step)
        return out

    @staticmethod
    def _sl_value(label):
        """Return the numeric part of a stop-loss label such as ``"30%"``."""
        return float(label.replace('%', ''))

    @staticmethod
    def _avg_pnl_by_basket_sl(master_df):
        """Average ``Total PnL`` per basket and stop-loss level.

        Returns ``{basket: {sl_label: mean_pnl}}`` for every basket in
        ``_BASKETS``; the inner dict is ordered by numeric stop-loss level
        (10%, 20%, … 100%, 150%, …), not by label text.  A basket with no rows
        maps to an empty dict.
        """
        avg_pnl = (master_df.groupby(['Basket', 'SL Level'])['Total PnL']
                   .mean().reset_index())
        out = {}
        for basket in BacktestEngine._BASKETS:
            bdata = avg_pnl[avg_pnl['Basket'] == basket]
            by_sl = dict(zip(bdata['SL Level'], bdata['Total PnL']))
            out[basket] = {
                sl: round(float(by_sl[sl]), 2)
                for sl in sorted(by_sl, key=BacktestEngine._sl_value)
            }
        return out

    def _walk_leg(self, opt_type, strike, entry_px, sl_trigger,
                  atm_arr, cols, ei, xi):
        """Follow one short leg from just after entry to the exit minute.

        The leg's fixed *strike* is located each minute relative to that
        minute's ATM (offset rounded to the nearest 50), since the panel
        columns are keyed by offset from the current ATM.  Minutes where the
        ATM, the column, or the price is missing are skipped.

        Returns ``(exit_price, sl_hit)``: the first price at or above
        *sl_trigger* with ``True``, otherwise the last price seen (or
        *entry_px* if none was seen) with ``False``.
        """
        last_px = entry_px
        for i in range(ei + 1, xi + 1):
            cav = atm_arr[i]
            if np.isnan(cav):
                continue
            delta = int(round((strike - int(cav)) / 50.0) * 50)
            arr = cols.get(self._col_name(opt_type, delta))
            if arr is None:
                continue
            v = arr[i]
            if np.isnan(v):
                continue
            last_px = v
            if v >= sl_trigger:
                return v, True
        return last_px, False

    # ── core backtest ─────────────────────────────────────────
    def _run_backtest(self, sl_pct, moneyness='ATM', min_straddle=0.0,
                      entry_time='09:30', exit_time='15:30'):
        """Simulate one short CE + short PE per day with a per-leg stop-loss.

        Args:
            sl_pct: stop-loss as a fraction of entry premium (0.3 = 30 %);
                a leg is closed when its price reaches ``entry * (1 + sl_pct)``.
            moneyness: key of ``MONEYNESS`` selecting the strike offsets.
            min_straddle: days whose opening straddle is NaN or below this
                value are skipped and counted.
            entry_time, exit_time: "HH:MM" entry and square-off minutes.

        Returns:
            ``(trades, skipped)`` – a DataFrame with one row per traded day and
            the number of days skipped by the opening-straddle filter.  Days
            dropped for missing entry data are not counted in ``skipped``.
        """
        offsets = MONEYNESS[moneyness]
        ce_off, pe_off = offsets['ce_off'], offsets['pe_off']
        ce_entry_col = self._col_name('ce', ce_off)
        pe_entry_col = self._col_name('pe', pe_off)
        sqoff_reason = f'{exit_time} Sqoff'

        all_trades, skipped = [], 0
        for date_str, p in self.PANEL.items():
            idx = p['idx']
            if entry_time not in idx or exit_time not in idx:
                continue
            op = p['open']
            if np.isnan(op) or op < min_straddle:
                skipped += 1
                continue

            ei, xi = idx[entry_time], idx[exit_time]
            atm_arr, cols = p['atm'], p['cols']
            ce_arr = cols.get(ce_entry_col)
            pe_arr = cols.get(pe_entry_col)
            if ce_arr is None or pe_arr is None:
                continue
            eav, ece, epe = atm_arr[ei], ce_arr[ei], pe_arr[ei]
            if np.isnan(eav) or np.isnan(ece) or np.isnan(epe):
                continue

            entry_atm = int(eav)
            ce_strike = entry_atm + ce_off
            pe_strike = entry_atm + pe_off

            # The two legs never influence each other, so each is walked
            # on its own.
            ce_exit, ce_hit = self._walk_leg(
                'ce', ce_strike, ece, ece * (1 + sl_pct), atm_arr, cols, ei, xi)
            pe_exit, pe_hit = self._walk_leg(
                'pe', pe_strike, epe, epe * (1 + sl_pct), atm_arr, cols, ei, xi)
            ce_reason = 'SL Hit' if ce_hit else sqoff_reason
            pe_reason = 'SL Hit' if pe_hit else sqoff_reason

            # Short premium: profit is entry minus exit, less friction
            # charged on both the entry and the exit premium.
            ce_cost = FRICTION * (ece + ce_exit)
            pe_cost = FRICTION * (epe + pe_exit)
            ce_pnl = (ece - ce_exit) - ce_cost
            pe_pnl = (epe - pe_exit) - pe_cost

            all_trades.append({
                'Date': date_str, 'Config': moneyness,
                'Open_Straddle': round(op, 2),
                'Entry_ATM': entry_atm,
                'CE_Strike': ce_strike, 'PE_Strike': pe_strike,
                'Entry_CE': round(ece, 2), 'Entry_PE': round(epe, 2),
                'Exit_CE': round(ce_exit, 2), 'Exit_PE': round(pe_exit, 2),
                'CE_Exit_Reason': ce_reason, 'PE_Exit_Reason': pe_reason,
                'CE_PnL_Net': round(ce_pnl, 2), 'PE_PnL_Net': round(pe_pnl, 2),
                'Day_PnL': round(ce_pnl + pe_pnl, 2),
            })
        return pd.DataFrame(all_trades), skipped

    # ── metrics ───────────────────────────────────────────────
    def _calc_metrics(self, trades_df, sl_label, config_key=None):
        """Summarise a trades frame into one leaderboard row.

        Args:
            trades_df: frame with a ``Day_PnL`` column (may be empty).
            sl_label: stop-loss label such as ``"30%"``; also used to derive
                the implied reward/risk ``1 / sl``.
            config_key: optional identifier placed first as ``'Config Key'``.

        Returns:
            dict of metrics; every metric is 0 when there are no trades.
        """
        total = len(trades_df)
        base = {
            'SL Level': sl_label, 'Trades': total, 'Win Rate': 0,
            'Implied RR': 0, 'Avg Winner': 0, 'Avg Loser': 0,
            'Realized RR': 0, 'RR Efficiency': 0, 'Sharpe': 0,
            'Profit Factor': 0, 'Max DD': 0, 'Total PnL': 0,
        }
        if config_key is not None:
            base = {'Config Key': config_key, **base}
        if total == 0:
            return base

        pnl = trades_df['Day_PnL'].values
        winners = pnl[pnl > 0]
        losers = pnl[pnl < 0]
        win_rate = len(winners) / total * 100
        avg_w = float(winners.mean()) if len(winners) else 0
        avg_l = float(losers.mean()) if len(losers) else 0
        realized_rr = avg_w / abs(avg_l) if avg_l != 0 else 0

        sl_num = self._sl_value(sl_label) / 100.0
        implied_rr = 1.0 / sl_num if sl_num > 0 else 0
        rr_eff = realized_rr / implied_rr if implied_rr > 0 else 0

        sd = pnl.std()
        sharpe = float(pnl.mean() / sd * ANNUALISE) if sd > 0 else 0

        # Drawdown is measured against the running peak of cumulative PnL;
        # the starting equity of 0 is not part of that peak.
        cum = np.cumsum(pnl)
        peak = np.maximum.accumulate(cum)
        max_dd = float((cum - peak).min())

        gw = float(winners.sum()) if len(winners) else 0
        # With no losing trades a 0.01 floor keeps the profit factor finite.
        gl = abs(float(losers.sum())) if len(losers) else 0.01

        base.update({
            'Win Rate': round(win_rate, 1),
            'Implied RR': round(implied_rr, 2),
            'Avg Winner': round(avg_w, 2),
            'Avg Loser': round(avg_l, 2),
            'Realized RR': round(realized_rr, 2),
            'RR Efficiency': round(rr_eff, 2),
            'Sharpe': round(sharpe, 2),
            'Profit Factor': round(gw / gl, 2),
            'Max DD': round(max_dd, 2),
            'Total PnL': round(float(pnl.sum()), 2),
        })
        return base

    # ═══════════════════ PUBLIC API ═════════════════════════
    def get_opening_straddle(self):
        """Return opening-straddle trend data for the chart.

        The payload holds the per-day dates and straddle values, an
        8-observation rolling median, per-year summary statistics
        (count / min / median / mean / max, rounded to 1 decimal) and the
        number of days.
        """
        os_data = self.open_strad.copy()
        rolling_med = os_data['straddle'].rolling(8, min_periods=1).median()
        yearly = (os_data.groupby('year')['straddle']
                  .agg(['count', 'min', 'median', 'mean', 'max']).round(1))
        yearly_stats = {
            int(yr): {
                'count': int(row['count']),
                'min': float(row['min']),
                'median': float(row['median']),
                'mean': float(row['mean']),
                'max': float(row['max']),
            }
            for yr, row in yearly.iterrows()
        }
        return {
            'dates': os_data['dt'].dt.strftime('%Y-%m-%d').tolist(),
            'straddles': [round(v, 2) for v in os_data['straddle'].tolist()],
            'rolling_median': [round(v, 2) for v in rolling_med.tolist()],
            'yearly_stats': yearly_stats,
            'total_days': len(os_data),
        }

    def run_single_entry(self, moneyness='ATM', min_straddle=0.0):
        """Framing 1 — single ENTRY_TIME → EXIT_TIME trade across all SL levels.

        Returns a dict with the run configuration, a leaderboard sorted by
        Sharpe (descending), total / yearly / quarterly PnL per stop-loss
        level, realized vs implied reward/risk, and the equity curve and
        trade log of the best-Sharpe stop-loss level.
        """
        all_results, skipped = {}, 0
        for sl in SL_LEVELS:
            trades, sk = self._run_backtest(
                sl / 100.0, moneyness, min_straddle,
                entry_time=ENTRY_TIME, exit_time=EXIT_TIME)
            all_results[f"{sl}%"] = trades
            # The skip count depends only on min_straddle, not on the SL level.
            skipped = sk

        # leaderboard
        leaderboard = [self._calc_metrics(t, lab) for lab, t in all_results.items()]
        leaderboard.sort(key=lambda x: x['Sharpe'], reverse=True)

        # PnL by SL
        sl_order = sorted(all_results.keys(), key=self._sl_value)
        pnl_by_sl = {
            k: round(float(all_results[k]['Day_PnL'].sum()), 2)
            if len(all_results[k]) > 0 else 0
            for k in sl_order
        }

        # RR data
        metric_map = {m['SL Level']: m for m in leaderboard}
        rr_data = {
            'sl_levels': sl_order,
            'realized': [metric_map.get(sl, {}).get('Realized RR', 0) for sl in sl_order],
            'implied': [metric_map.get(sl, {}).get('Implied RR', 0) for sl in sl_order],
        }

        # yearly and quarterly PnL
        yearly_pnl, quarterly_pnl = {}, {}
        for sl_label, t in all_results.items():
            if len(t) == 0:
                continue
            dates = pd.to_datetime(t['Date'])
            tt = t.assign(Year=dates.dt.year, Quarter=dates.dt.quarter)
            for yr, ydf in tt.groupby('Year'):
                yearly_pnl.setdefault(int(yr), {})[sl_label] = round(
                    float(ydf['Day_PnL'].sum()), 2)
            for (yr, q), qdf in tt.groupby(['Year', 'Quarter']):
                quarterly_pnl.setdefault(f"{int(yr)}-Q{int(q)}", {})[sl_label] = round(
                    float(qdf['Day_PnL'].sum()), 2)

        # equity curve and trade log (best Sharpe SL)
        best_sl = leaderboard[0]['SL Level'] if leaderboard else '30%'
        best_trades = all_results.get(best_sl, pd.DataFrame())
        equity_curve, trade_log = [], []
        if len(best_trades) > 0:
            cum_pnl = best_trades['Day_PnL'].cumsum()
            equity_curve = [
                {'date': d, 'pnl': round(float(p), 2)}
                for d, p in zip(best_trades['Date'], cum_pnl)
            ]
            # Cast numpy scalars to plain Python numbers for the API payload.
            for _, row in best_trades.iterrows():
                trade_log.append({
                    'Date': row['Date'],
                    'Entry_ATM': int(row['Entry_ATM']),
                    'CE_Strike': int(row['CE_Strike']),
                    'PE_Strike': int(row['PE_Strike']),
                    'Entry_CE': float(row['Entry_CE']),
                    'Entry_PE': float(row['Entry_PE']),
                    'Exit_CE': float(row['Exit_CE']),
                    'Exit_PE': float(row['Exit_PE']),
                    'CE_Exit_Reason': row['CE_Exit_Reason'],
                    'PE_Exit_Reason': row['PE_Exit_Reason'],
                    'Day_PnL': float(row['Day_PnL']),
                })

        cfg = MONEYNESS[moneyness]
        eligible = int((self.open_strad['straddle'] >= min_straddle).sum())
        return {
            'config': {
                'moneyness': moneyness,
                'ce_off': cfg['ce_off'],
                'pe_off': cfg['pe_off'],
                'min_straddle': min_straddle,
                'eligible_days': eligible,
                'total_days': len(self.open_strad),
                'skipped_days': skipped,
            },
            'leaderboard': leaderboard,
            'pnl_by_sl': pnl_by_sl,
            'rr_data': rr_data,
            'yearly_pnl': yearly_pnl,
            'quarterly_pnl': quarterly_pnl,
            'equity_curve': equity_curve,
            'best_sl': best_sl,
            'trade_log': trade_log,
        }

    def run_basket_backtest(self, moneyness='ATM', min_straddle=0.0,
                            basket_type='15min'):
        """Framing 2 (15-min) or Framing 3 (2-hour) basket analysis.

        ``basket_type == '15min'`` enters every 15 minutes inside each basket
        window and exits at 15:30.  Any other value selects the 2-hour framing:
        four entries near the start of each window, exiting two hours after
        the window's nominal start.

        Every (entry, exit) configuration is run against every SL level.  The
        result holds the top 10 rows by Sharpe and per-basket averages of
        total PnL, reward/risk and yearly PnL.
        """
        if basket_type == '15min':
            windows = [
                ('Morning', self._gen_entries('09:30', '11:15'), '15:30'),
                ('Mid-day', self._gen_entries('11:30', '13:15'), '15:30'),
                ('Afternoon', self._gen_entries('13:30', '15:15'), '15:30'),
            ]
        else:
            windows = [
                ('Morning', ['09:16', '09:20', '09:25', '09:30'], '11:30'),
                ('Mid-day', ['11:16', '11:20', '11:25', '11:30'], '13:30'),
                ('Afternoon', ['13:16', '13:20', '13:25', '13:30'], '15:30'),
            ]
        time_configs = [
            {'entry': entry, 'exit': exit_at, 'basket': basket}
            for basket, entries, exit_at in windows
            for entry in entries
        ]

        # run every config × SL
        all_flex, basket_of = {}, {}
        for c in time_configs:
            key = f"Basket: {c['basket']}, Entry: {c['entry']}, Exit: {c['exit']}"
            basket_of[key] = c['basket']
            all_flex[key] = {}
            for sl in SL_LEVELS:
                trades, _ = self._run_backtest(
                    sl / 100.0, moneyness=moneyness,
                    min_straddle=min_straddle,
                    entry_time=c['entry'], exit_time=c['exit'])
                all_flex[key][f"{sl}%"] = trades

        # master leaderboard
        rows = [self._calc_metrics(t, sl, cfg_key)
                for cfg_key, d in all_flex.items()
                for sl, t in d.items()]
        master_df = pd.DataFrame(rows)
        master_df[['Basket', 'Entry Time', 'Exit Time']] = (
            master_df['Config Key'].str.extract(
                r'Basket: ([\w-]+), Entry: (\d{2}:\d{2}), Exit: (\d{2}:\d{2})'))

        # top 10 by Sharpe
        top10 = (master_df.sort_values('Sharpe', ascending=False)
                 .head(10).drop(columns='Config Key').to_dict('records'))

        # avg PnL by basket & SL (inner dicts ordered by numeric SL level)
        basket_sl_pnl = self._avg_pnl_by_basket_sl(master_df)

        # RR by basket
        sl_order_all = [f"{s}%" for s in sorted(SL_LEVELS)]
        rr_by_basket = {}
        for basket in self._BASKETS:
            bdata = master_df[master_df['Basket'] == basket]
            avg_rr = (bdata.groupby('SL Level')
                      .agg(realized=('Realized RR', 'mean'),
                           implied=('Implied RR', 'mean'))
                      .reindex(sl_order_all))
            rr_by_basket[basket] = {
                'sl_levels': sl_order_all,
                'realized': [round(float(v), 2) if not np.isnan(v) else 0
                             for v in avg_rr['realized'].values],
                'implied': [round(float(v), 2) if not np.isnan(v) else 0
                            for v in avg_rr['implied'].values],
            }

        # yearly basket PnL: collect one yearly total per entry config …
        yearly_basket = {}
        for key, d in all_flex.items():
            basket = basket_of[key]
            for sl, t in d.items():
                if t.empty:
                    continue
                tt = t.assign(Year=pd.to_datetime(t['Date']).dt.year)
                for yr, pnl_val in tt.groupby('Year')['Day_PnL'].sum().items():
                    (yearly_basket.setdefault(int(yr), {})
                     .setdefault(basket, {})
                     .setdefault(sl, [])
                     .append(float(pnl_val)))
        # … then average across the entry configs of each basket
        for by_basket in yearly_basket.values():
            for by_sl in by_basket.values():
                for sl, vals in by_sl.items():
                    by_sl[sl] = round(sum(vals) / len(vals), 2)

        return {
            'type': basket_type,
            'total_configs': len(time_configs),
            'total_backtests': len(time_configs) * len(SL_LEVELS),
            'top10': top10,
            'avg_pnl_by_basket_sl': basket_sl_pnl,
            'rr_by_basket': rr_by_basket,
            'yearly_basket_pnl': yearly_basket,
        }