nse-bot-backend / dataset_generator.py
ash001's picture
Deploy from GitHub Actions to nse-bot-backend
789e5eb verified
from pathlib import Path
from datetime import datetime, timedelta, time
from typing import Dict, Optional, List
import pandas as pd
from history_utils import get_stock_5m, get_option_5m
from indicators import add_strategy_indicators
from option_utils import select_nearest_option_contract
# Reuse the exact same strategy helpers from your current engine
from strategy_engine import (
_qualifies,
_trigger_price,
_breaks_trigger,
_entry_price_from_option_candle,
_stop_from_option_indication_candle,
_scan_option_exit,
)
BASE_DIR = Path(__file__).resolve().parent
UNIVERSE_PATH = BASE_DIR / "option_stock_universe.csv"
OUT_DIR = BASE_DIR / "outputs"
OUT_DIR.mkdir(exist_ok=True)
# -----------------------------
# Config
# -----------------------------
MODE = "priority" # "priority" or "all"
VARIANT = "v2" # "v1" or "v2"
START_DATE = "2026-01-01"
END_DATE = "2026-02-28"
# Keep None to scan everything in the chosen mode
MAX_SYMBOLS = None
# Scan starts from 9:30
START_SCAN_TIME = time(9, 30)
# -----------------------------
# Utility helpers
# -----------------------------
def _body_high(row: pd.Series) -> float:
return float(max(row["open"], row["close"]))
def _body_low(row: pd.Series) -> float:
return float(min(row["open"], row["close"]))
def _candle_color(row: pd.Series) -> str:
if row["close"] > row["open"]:
return "GREEN"
if row["close"] < row["open"]:
return "RED"
return "DOJI"
def _safe_row(x):
if isinstance(x, pd.DataFrame):
return x.iloc[0]
return x
def _to_float(x, default=0.0):
try:
if pd.isna(x):
return default
return float(x)
except Exception:
return default
def _pct_change(a, b):
a = _to_float(a, 0.0)
b = _to_float(b, 0.0)
if b == 0:
return 0.0
return (a - b) / b
def _minutes_from_open(ts):
# Market open assumed 09:15 IST
return (pd.Timestamp(ts).hour * 60 + pd.Timestamp(ts).minute) - (9 * 60 + 15)
def _extract_candle_features(prefix: str, row: pd.Series) -> Dict:
open_ = _to_float(row["open"])
high = _to_float(row["high"])
low = _to_float(row["low"])
close = _to_float(row["close"])
volume = _to_float(row.get("volume", 0.0))
body_high = max(open_, close)
body_low = min(open_, close)
candle_range = high - low
body_size = abs(close - open_)
upper_wick = high - body_high
lower_wick = body_low - low
candle_dir = 1 if close > open_ else -1 if close < open_ else 0
return {
f"{prefix}_open": round(open_, 4),
f"{prefix}_high": round(high, 4),
f"{prefix}_low": round(low, 4),
f"{prefix}_close": round(close, 4),
f"{prefix}_volume": round(volume, 2),
f"{prefix}_body_high": round(body_high, 4),
f"{prefix}_body_low": round(body_low, 4),
f"{prefix}_range": round(candle_range, 4),
f"{prefix}_body_size": round(body_size, 4),
f"{prefix}_upper_wick": round(upper_wick, 4),
f"{prefix}_lower_wick": round(lower_wick, 4),
f"{prefix}_color": _candle_color(row),
f"{prefix}_dir": candle_dir,
}
def load_universe() -> pd.DataFrame:
df = pd.read_csv(UNIVERSE_PATH)
df["symbol"] = df["symbol"].astype(str).str.upper()
df["sector"] = df["sector"].fillna("").astype(str)
df["priority_rank"] = pd.to_numeric(df["priority_rank"], errors="coerce").fillna(999)
if MODE == "priority":
df = df[df["priority_rank"] == 1].copy()
elif MODE == "all":
df = df.copy()
else:
raise ValueError("MODE must be 'priority' or 'all'")
df = df.sort_values(["priority_rank", "symbol"]).reset_index(drop=True)
if MAX_SYMBOLS is not None:
df = df.head(MAX_SYMBOLS).copy()
return df
def get_trade_dates(start_date: str, end_date: str) -> List[pd.Timestamp]:
# Weekdays only. Exchange holidays will naturally produce no data and be skipped.
return list(pd.bdate_range(start=start_date, end=end_date))
# -----------------------------
# Dataset row builder
# -----------------------------
def _build_dataset_row(
stock_symbol: str,
sector: str,
direction: str,
variant: str,
contract: dict,
indication_row: pd.Series,
confirmation_row: pd.Series,
trigger_row: pd.Series,
indication_option_row: pd.Series,
trigger_option_row: pd.Series,
buy_price: float,
stop_loss: float,
target_1: float,
target_2: float,
trigger_level: float,
t1_time,
t2_time,
exit_status: str,
day_df: pd.DataFrame,
):
strike_value = contract.get("strike")
lot_size = int(contract["lot_size"]) if pd.notna(contract.get("lot_size")) else 1
option_symbol = str(contract["tradingsymbol"]).upper()
expiry = contract.get("expiry")
reward = buy_price - stop_loss
if reward <= 0:
return None
label_1to1 = int(pd.notna(t1_time))
label_1to2 = int(pd.notna(t2_time))
day_open = _to_float(day_df.iloc[0]["open"]) if not day_df.empty else 0.0
entry_underlying_price = _to_float(trigger_row["close"])
entry_option_price = _to_float(buy_price)
confirmation_ema9 = _to_float(confirmation_row["ema9"])
confirmation_ema21 = _to_float(confirmation_row["ema21"])
ema_gap = confirmation_ema9 - confirmation_ema21
ema_gap_pct = (ema_gap / confirmation_ema21) if confirmation_ema21 else 0.0
row = {
"trade_date": str(pd.Timestamp(trigger_row["timestamp"]).date()),
"weekday": pd.Timestamp(trigger_row["timestamp"]).day_name(),
"symbol": stock_symbol,
"sector": sector,
"direction": direction,
"call_put": direction,
"trade_side": f"{direction} {int(strike_value)}" if pd.notna(strike_value) else direction,
"strike": int(strike_value) if pd.notna(strike_value) else None,
"option_symbol": option_symbol,
"expiry": str(expiry) if pd.notna(expiry) else None,
"lot_size": lot_size,
"variant": variant,
"indication_time": indication_row["timestamp"],
"confirmation_time": confirmation_row["timestamp"],
"buy_time": trigger_row["timestamp"],
"minutes_from_open": _minutes_from_open(trigger_row["timestamp"]),
"entry_vs_day_open_pct": round(_pct_change(entry_underlying_price, day_open), 6),
"bb_phase_indication": indication_row.get("bb_phase"),
"bb_phase_confirmation": confirmation_row.get("bb_phase"),
"bb_change_up_confirmation": int(bool(confirmation_row.get("bb_change_up", False))),
"bb_change_down_confirmation": int(bool(confirmation_row.get("bb_change_down", False))),
"ema9_indication": round(_to_float(indication_row["ema9"]), 6),
"ema21_indication": round(_to_float(indication_row["ema21"]), 6),
"ema9_confirmation": round(confirmation_ema9, 6),
"ema21_confirmation": round(confirmation_ema21, 6),
"ema_gap_confirmation": round(ema_gap, 6),
"ema_gap_pct_confirmation": round(ema_gap_pct, 6),
"trigger_level": round(trigger_level, 4),
"underlying_entry_close": round(entry_underlying_price, 4),
"option_entry_price": round(entry_option_price, 4),
"stop_loss": round(stop_loss, 4),
"risk_per_unit": round(reward, 4),
"target_1": round(target_1, 4),
"target_2": round(target_2, 4),
"capital_per_lot": round(entry_option_price * lot_size, 2),
"stop_loss_amt_per_lot": round((entry_option_price - stop_loss) * lot_size, 2),
"label_1to1": label_1to1,
"label_1to2": label_1to2,
"exit_status": exit_status,
}
# Prior returns from confirmation context
row["ret_1_confirmation"] = round(_to_float(confirmation_row.get("ret_1", 0.0)), 6)
row["ret_3_confirmation"] = round(_to_float(confirmation_row.get("ret_3", 0.0)), 6)
# Underlying candle features
row.update(_extract_candle_features("underlying_indication", indication_row))
row.update(_extract_candle_features("underlying_confirmation", confirmation_row))
row.update(_extract_candle_features("underlying_trigger", trigger_row))
# Option candle features
row.update(_extract_candle_features("option_indication", indication_option_row))
row.update(_extract_candle_features("option_trigger", trigger_option_row))
# Direction-specific breakout strength
if direction == "CALL":
trigger_test_value = _body_high(trigger_row) if variant == "v2" else _to_float(trigger_row["high"])
breakout_distance = trigger_test_value - trigger_level
else:
trigger_test_value = _body_low(trigger_row) if variant == "v2" else _to_float(trigger_row["low"])
breakout_distance = trigger_level - trigger_test_value
row["breakout_distance"] = round(breakout_distance, 4)
row["breakout_distance_pct"] = round(breakout_distance / trigger_level, 6) if trigger_level else 0.0
return row
# -----------------------------
# Single-symbol single-day scan
# -----------------------------
def generate_rows_for_symbol_day(
stock_symbol: str,
sector: str,
trade_date: str,
variant: str = "v2",
option_cache: Optional[Dict[str, pd.DataFrame]] = None,
) -> pd.DataFrame:
if option_cache is None:
option_cache = {}
trade_day = pd.Timestamp(trade_date).date()
fetch_from_dt = datetime.combine(trade_day - timedelta(days=5), time(9, 15))
fetch_to_dt = datetime.combine(trade_day, time(15, 30))
stock_df = get_stock_5m(stock_symbol, fetch_from_dt, fetch_to_dt)
if stock_df.empty:
return pd.DataFrame()
stock_df = add_strategy_indicators(stock_df)
stock_df = stock_df.sort_values("timestamp").reset_index(drop=True)
# Extra context features available at entry time
stock_df["ret_1"] = stock_df["close"].pct_change(1)
stock_df["ret_3"] = stock_df["close"].pct_change(3)
day_df = stock_df[
(stock_df["timestamp"].dt.date == trade_day) &
(stock_df["timestamp"].dt.time >= START_SCAN_TIME)
].copy()
day_df = day_df.sort_values("timestamp").reset_index(drop=True)
if len(day_df) < 4:
return pd.DataFrame()
dataset_rows = []
i = 0
while i <= len(day_df) - 4:
row_1 = day_df.iloc[i]
row_2 = day_df.iloc[i + 1]
direction = None
if _qualifies(row_1, "CALL") and _qualifies(row_2, "CALL"):
direction = "CALL"
elif _qualifies(row_1, "PUT") and _qualifies(row_2, "PUT"):
direction = "PUT"
if direction is None:
i += 1
continue
indication_row = row_1
confirmation_row = row_2
trigger_level = _trigger_price(confirmation_row, direction, variant)
contract = select_nearest_option_contract(
underlying_symbol=stock_symbol,
direction=direction,
reference_price=trigger_level,
asof_timestamp=confirmation_row["timestamp"],
)
if contract is None:
i += 1
continue
option_symbol = str(contract["tradingsymbol"]).upper()
if option_symbol not in option_cache:
option_df = get_option_5m(option_symbol, fetch_from_dt, fetch_to_dt)
if option_df.empty:
i += 1
continue
option_df = option_df.sort_values("timestamp").reset_index(drop=True)
option_cache[option_symbol] = option_df
option_df = option_cache[option_symbol]
option_map = option_df.set_index("timestamp")
trigger_hit_idx = None
for idx in [i + 2, i + 3]:
if idx >= len(day_df):
break
test_row = day_df.iloc[idx]
if _breaks_trigger(test_row, direction, trigger_level, variant):
trigger_hit_idx = idx
break
if trigger_hit_idx is None:
i += 1
continue
trigger_row = day_df.iloc[trigger_hit_idx]
buy_time = trigger_row["timestamp"]
indication_time = indication_row["timestamp"]
if buy_time not in option_map.index or indication_time not in option_map.index:
i += 1
continue
trigger_option_row = _safe_row(option_map.loc[buy_time])
indication_option_row = _safe_row(option_map.loc[indication_time])
buy_price = _entry_price_from_option_candle(trigger_option_row, variant)
stop_loss = _stop_from_option_indication_candle(indication_option_row, variant)
if pd.isna(buy_price) or pd.isna(stop_loss):
i += 1
continue
buy_price = float(buy_price)
stop_loss = float(stop_loss)
if buy_price <= 0 or stop_loss <= 0 or stop_loss >= buy_price:
i += 1
continue
reward = buy_price - stop_loss
target_1 = buy_price + reward
target_2 = buy_price + (2 * reward)
t1_time, t2_time, exit_status, exit_time = _scan_option_exit(
option_df=option_df,
buy_time=buy_time,
stop_loss=stop_loss,
target_1=target_1,
target_2=target_2,
)
dataset_row = _build_dataset_row(
stock_symbol=stock_symbol,
sector=sector,
direction=direction,
variant=variant,
contract=contract,
indication_row=indication_row,
confirmation_row=confirmation_row,
trigger_row=trigger_row,
indication_option_row=indication_option_row,
trigger_option_row=trigger_option_row,
buy_price=buy_price,
stop_loss=stop_loss,
target_1=target_1,
target_2=target_2,
trigger_level=trigger_level,
t1_time=t1_time,
t2_time=t2_time,
exit_status=exit_status,
day_df=day_df,
)
if dataset_row is not None:
dataset_rows.append(dataset_row)
# Continue scanning after exit if possible, otherwise after trigger candle
next_idx = trigger_hit_idx + 1
if not pd.isna(exit_time):
later = day_df.index[day_df["timestamp"] > exit_time]
if len(later) > 0:
next_idx = int(later[0])
else:
next_idx = len(day_df)
i = max(next_idx, i + 1)
if not dataset_rows:
return pd.DataFrame()
return pd.DataFrame(dataset_rows)
# -----------------------------
# Main driver
# -----------------------------
def main():
universe = load_universe()
dates = get_trade_dates(START_DATE, END_DATE)
print(f"Mode: {MODE}")
print(f"Variant: {VARIANT}")
print(f"Symbols: {len(universe)}")
print(f"Dates: {len(dates)} ({START_DATE} to {END_DATE})")
all_frames = []
for dt in dates:
trade_date = str(dt.date())
print(f"\n=== {trade_date} ===")
option_cache = {}
for _, row in universe.iterrows():
symbol = row["symbol"]
sector = row["sector"]
try:
df = generate_rows_for_symbol_day(
stock_symbol=symbol,
sector=sector,
trade_date=trade_date,
variant=VARIANT,
option_cache=option_cache,
)
if not df.empty:
all_frames.append(df)
print(f"{symbol}: {len(df)} rows")
except Exception as e:
print(f"{symbol}: ERROR -> {e}")
if all_frames:
dataset = pd.concat(all_frames, ignore_index=True)
else:
dataset = pd.DataFrame()
out_path = OUT_DIR / f"ml_dataset_{MODE}_{VARIANT}_{START_DATE}_to_{END_DATE}.csv"
dataset.to_csv(out_path, index=False)
print(f"\nSaved dataset to: {out_path}")
print(f"Total rows: {len(dataset)}")
if not dataset.empty:
print("\nLabel summary:")
print(dataset[["label_1to1", "label_1to2"]].mean().rename("hit_rate"))
print("\nSample rows:")
print(dataset.head(10).to_string(index=False))
if __name__ == "__main__":
main()