Spaces:
Running
Running
| from pathlib import Path | |
| from datetime import datetime, timedelta, time | |
| from typing import Dict, Optional, List | |
| import pandas as pd | |
| from history_utils import get_stock_5m, get_option_5m | |
| from indicators import add_strategy_indicators | |
| from option_utils import select_nearest_option_contract | |
| # Reuse the exact same strategy helpers from your current engine | |
| from strategy_engine import ( | |
| _qualifies, | |
| _trigger_price, | |
| _breaks_trigger, | |
| _entry_price_from_option_candle, | |
| _stop_from_option_indication_candle, | |
| _scan_option_exit, | |
| ) | |
# -----------------------------
# Paths
# -----------------------------
BASE_DIR = Path(__file__).resolve().parent
UNIVERSE_PATH = BASE_DIR.joinpath("option_stock_universe.csv")
OUT_DIR = BASE_DIR.joinpath("outputs")
OUT_DIR.mkdir(exist_ok=True)

# -----------------------------
# Config
# -----------------------------
# Universe filter used by load_universe: "priority" keeps only rows with
# priority_rank == 1, "all" keeps every row.
MODE = "priority"
# Strategy variant forwarded to the strategy_engine helpers: "v1" or "v2".
VARIANT = "v2"
# Date range handed to get_trade_dates (both ends included).
START_DATE = "2026-01-01"
END_DATE = "2026-02-28"
# Cap on the number of symbols taken from the sorted universe;
# None scans every symbol in the chosen mode.
MAX_SYMBOLS = None
# Candles earlier than this time of day are not scanned for setups.
START_SCAN_TIME = time(hour=9, minute=30)
# -----------------------------
# Utility helpers
# -----------------------------
def _body_high(row: pd.Series) -> float:
    """Return the upper edge of the candle body (the greater of open and close)."""
    opening, closing = row["open"], row["close"]
    return float(closing if closing > opening else opening)
def _body_low(row: pd.Series) -> float:
    """Return the lower edge of the candle body (the smaller of open and close)."""
    opening, closing = row["open"], row["close"]
    return float(closing if closing < opening else opening)
def _candle_color(row: pd.Series) -> str:
    """Label a candle "GREEN" (close above open), "RED" (close below open) or "DOJI".

    Anything that is neither strictly up nor strictly down -- including rows
    where a NaN makes both comparisons false -- is reported as "DOJI".
    """
    closing, opening = row["close"], row["open"]
    if closing > opening:
        return "GREEN"
    return "RED" if closing < opening else "DOJI"
def _safe_row(x):
    """Reduce a `.loc` lookup result to a single row.

    A DataFrame (what `.loc` yields when the label is duplicated) is collapsed
    to its first row; any other value is returned unchanged.
    """
    return x.iloc[0] if isinstance(x, pd.DataFrame) else x
def _to_float(x, default=0.0):
    """Convert ``x`` to a float, using ``default`` for missing or unusable input.

    Args:
        x: Scalar to convert (number, numeric string, None/NaN, ...).
        default: Value returned when ``x`` is missing, cannot be converted,
            or converts to NaN.

    Returns:
        ``float(x)``, or ``default``. NaN is never returned unless ``default``
        itself is NaN.
    """
    try:
        if pd.isna(x):
            return default
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError: objects float() cannot handle; ValueError: unparsable
        # strings, or array-likes whose pd.isna result has no single truth
        # value; OverflowError: ints too large to represent as a float.
        return default
    # Strings such as "nan" are not flagged by pd.isna but float() turns them
    # into NaN; keep NaN out of the rounded/compared feature values.
    return default if pd.isna(value) else value
def _pct_change(a, b):
    """Return the fractional change ``(a - b) / b``.

    Both inputs go through _to_float (missing -> 0.0); a zero or missing
    base ``b`` yields 0.0 instead of dividing by zero.
    """
    current = _to_float(a, 0.0)
    base = _to_float(b, 0.0)
    return 0.0 if base == 0 else (current - base) / base
def _minutes_from_open(ts):
    """Return whole minutes between 09:15 (assumed IST market open) and ts's time of day.

    Seconds are ignored; times before 09:15 give negative values.
    """
    stamp = pd.Timestamp(ts)
    market_open_minutes = 9 * 60 + 15
    return stamp.hour * 60 + stamp.minute - market_open_minutes
def _extract_candle_features(prefix: str, row: pd.Series) -> Dict:
    """Flatten one OHLCV candle into prefixed, rounded feature values.

    Prices go through _to_float (missing -> 0.0) and an absent "volume" entry
    counts as 0.0. Keys are ``f"{prefix}_<name>"`` in this fixed order: open,
    high, low, close, volume, body_high, body_low, range, body_size,
    upper_wick, lower_wick, color, dir.
    """
    op = _to_float(row["open"])
    hi = _to_float(row["high"])
    lo = _to_float(row["low"])
    cl = _to_float(row["close"])
    vol = _to_float(row.get("volume", 0.0))

    top = max(op, cl)
    bottom = min(op, cl)
    # +1 for an up candle, -1 for a down candle, 0 when open == close.
    direction = int(cl > op) - int(cl < op)

    features = {
        "open": round(op, 4),
        "high": round(hi, 4),
        "low": round(lo, 4),
        "close": round(cl, 4),
        "volume": round(vol, 2),
        "body_high": round(top, 4),
        "body_low": round(bottom, 4),
        "range": round(hi - lo, 4),
        "body_size": round(abs(cl - op), 4),
        "upper_wick": round(hi - top, 4),
        "lower_wick": round(bottom - lo, 4),
        "color": _candle_color(row),
        "dir": direction,
    }
    return {f"{prefix}_{name}": value for name, value in features.items()}
def load_universe() -> pd.DataFrame:
    """Read the option-stock universe CSV and filter/sort it per MODE and MAX_SYMBOLS.

    Symbols are upper-cased, missing sectors become "", and unparsable or
    missing priority ranks become 999. In "priority" mode only rows with
    priority_rank == 1 are kept. Rows are ordered by (priority_rank, symbol)
    and, when MAX_SYMBOLS is set, truncated to that many.

    Raises:
        ValueError: if MODE is neither "priority" nor "all".
    """
    universe = pd.read_csv(UNIVERSE_PATH)
    universe["symbol"] = universe["symbol"].astype(str).str.upper()
    universe["sector"] = universe["sector"].fillna("").astype(str)
    universe["priority_rank"] = pd.to_numeric(
        universe["priority_rank"], errors="coerce"
    ).fillna(999)

    if MODE not in ("priority", "all"):
        raise ValueError("MODE must be 'priority' or 'all'")
    if MODE == "priority":
        universe = universe[universe["priority_rank"] == 1]

    universe = universe.sort_values(["priority_rank", "symbol"]).reset_index(drop=True)
    if MAX_SYMBOLS is None:
        return universe
    return universe.head(MAX_SYMBOLS).copy()
def get_trade_dates(start_date: str, end_date: str) -> List[pd.Timestamp]:
    """Return every Monday-Friday date from start_date to end_date, both inclusive.

    Exchange holidays are not removed here; they are expected to produce no
    candle data downstream and so be skipped.
    """
    return pd.bdate_range(start=start_date, end=end_date).tolist()
# -----------------------------
# Dataset row builder
# -----------------------------
def _flag_to_int(value) -> int:
    """Map an indicator flag to 0/1, treating missing values (None/NaN/NA) as 0.

    A plain ``int(bool(value))`` is wrong here: ``bool(float("nan"))`` is True,
    so a missing flag would be reported as set (and ``bool(pd.NA)`` raises).
    """
    return 0 if pd.isna(value) else int(bool(value))


def _strike_value(value):
    """Return a strike as int when it is whole (1500.0 -> 1500), as float otherwise (112.5), or None if missing."""
    if pd.isna(value):
        return None
    as_float = float(value)
    return int(as_float) if as_float.is_integer() else as_float


def _build_dataset_row(
    stock_symbol: str,
    sector: str,
    direction: str,
    variant: str,
    contract: dict,
    indication_row: pd.Series,
    confirmation_row: pd.Series,
    trigger_row: pd.Series,
    indication_option_row: pd.Series,
    trigger_option_row: pd.Series,
    buy_price: float,
    stop_loss: float,
    target_1: float,
    target_2: float,
    trigger_level: float,
    t1_time,
    t2_time,
    exit_status: str,
    day_df: pd.DataFrame,
):
    """Assemble one flat ML-dataset row (features + labels) for a triggered setup.

    Args:
        stock_symbol, sector: Underlying identification copied into the row.
        direction: "CALL" or "PUT"; selects the breakout-distance formula.
        variant: "v1" measures breakouts with the trigger candle's high/low,
            "v2" with its body high/low.
        contract: Option contract dict; reads "tradingsymbol", "lot_size",
            "strike" and "expiry".
        indication_row, confirmation_row, trigger_row: Underlying candles of
            the setup (the confirmation row also supplies ema/bb/ret features).
        indication_option_row, trigger_option_row: Matching option candles.
        buy_price, stop_loss, target_1, target_2: Option trade levels.
        trigger_level: Underlying level the trigger candle had to break.
        t1_time, t2_time: When target 1/2 was hit; missing means not hit and
            gives label 0.
        exit_status: Exit description copied into the row.
        day_df: Underlying candles of the session; only the first row's
            "open" is used, as the day-open reference.

    Returns:
        A dict of column -> value, or None when the stop is not below the
        entry (non-positive risk).
    """
    lot_size = int(contract["lot_size"]) if pd.notna(contract.get("lot_size")) else 1
    option_symbol = str(contract["tradingsymbol"]).upper()
    expiry = contract.get("expiry")

    # Per-unit risk: distance from the entry down to the stop.
    risk = buy_price - stop_loss
    if risk <= 0:
        return None

    # Keep fractional strikes (e.g. 112.5) instead of truncating them with int().
    strike = _strike_value(contract.get("strike"))

    label_1to1 = int(pd.notna(t1_time))
    label_1to2 = int(pd.notna(t2_time))

    day_open = _to_float(day_df.iloc[0]["open"]) if not day_df.empty else 0.0
    entry_underlying_price = _to_float(trigger_row["close"])
    entry_option_price = _to_float(buy_price)

    confirmation_ema9 = _to_float(confirmation_row["ema9"])
    confirmation_ema21 = _to_float(confirmation_row["ema21"])
    ema_gap = confirmation_ema9 - confirmation_ema21
    ema_gap_pct = (ema_gap / confirmation_ema21) if confirmation_ema21 else 0.0

    trigger_ts = pd.Timestamp(trigger_row["timestamp"])

    row = {
        "trade_date": str(trigger_ts.date()),
        "weekday": trigger_ts.day_name(),
        "symbol": stock_symbol,
        "sector": sector,
        "direction": direction,
        "call_put": direction,
        "trade_side": f"{direction} {strike}" if strike is not None else direction,
        "strike": strike,
        "option_symbol": option_symbol,
        "expiry": str(expiry) if pd.notna(expiry) else None,
        "lot_size": lot_size,
        "variant": variant,
        "indication_time": indication_row["timestamp"],
        "confirmation_time": confirmation_row["timestamp"],
        "buy_time": trigger_row["timestamp"],
        "minutes_from_open": _minutes_from_open(trigger_row["timestamp"]),
        "entry_vs_day_open_pct": round(_pct_change(entry_underlying_price, day_open), 6),
        "bb_phase_indication": indication_row.get("bb_phase"),
        "bb_phase_confirmation": confirmation_row.get("bb_phase"),
        # NaN-safe: a missing flag counts as "not set".
        "bb_change_up_confirmation": _flag_to_int(confirmation_row.get("bb_change_up", False)),
        "bb_change_down_confirmation": _flag_to_int(confirmation_row.get("bb_change_down", False)),
        "ema9_indication": round(_to_float(indication_row["ema9"]), 6),
        "ema21_indication": round(_to_float(indication_row["ema21"]), 6),
        "ema9_confirmation": round(confirmation_ema9, 6),
        "ema21_confirmation": round(confirmation_ema21, 6),
        "ema_gap_confirmation": round(ema_gap, 6),
        "ema_gap_pct_confirmation": round(ema_gap_pct, 6),
        "trigger_level": round(trigger_level, 4),
        "underlying_entry_close": round(entry_underlying_price, 4),
        "option_entry_price": round(entry_option_price, 4),
        "stop_loss": round(stop_loss, 4),
        "risk_per_unit": round(risk, 4),
        "target_1": round(target_1, 4),
        "target_2": round(target_2, 4),
        "capital_per_lot": round(entry_option_price * lot_size, 2),
        "stop_loss_amt_per_lot": round((entry_option_price - stop_loss) * lot_size, 2),
        "label_1to1": label_1to1,
        "label_1to2": label_1to2,
        "exit_status": exit_status,
    }

    # Prior returns from confirmation context
    row["ret_1_confirmation"] = round(_to_float(confirmation_row.get("ret_1", 0.0)), 6)
    row["ret_3_confirmation"] = round(_to_float(confirmation_row.get("ret_3", 0.0)), 6)

    # Underlying candle features
    row.update(_extract_candle_features("underlying_indication", indication_row))
    row.update(_extract_candle_features("underlying_confirmation", confirmation_row))
    row.update(_extract_candle_features("underlying_trigger", trigger_row))

    # Option candle features
    row.update(_extract_candle_features("option_indication", indication_option_row))
    row.update(_extract_candle_features("option_trigger", trigger_option_row))

    # Direction-specific breakout strength: how far past trigger_level the
    # trigger candle went (body edge for v2, wick extreme for v1).
    if direction == "CALL":
        trigger_test_value = _body_high(trigger_row) if variant == "v2" else _to_float(trigger_row["high"])
        breakout_distance = trigger_test_value - trigger_level
    else:
        trigger_test_value = _body_low(trigger_row) if variant == "v2" else _to_float(trigger_row["low"])
        breakout_distance = trigger_level - trigger_test_value

    row["breakout_distance"] = round(breakout_distance, 4)
    row["breakout_distance_pct"] = round(breakout_distance / trigger_level, 6) if trigger_level else 0.0
    return row
# -----------------------------
# Single-symbol single-day scan
# -----------------------------
def _setup_direction(row_1: pd.Series, row_2: pd.Series) -> Optional[str]:
    """Return "CALL" or "PUT" when both consecutive candles qualify for that side (CALL checked first), else None."""
    for side in ("CALL", "PUT"):
        if _qualifies(row_1, side) and _qualifies(row_2, side):
            return side
    return None


def _find_trigger_index(
    day_df: pd.DataFrame,
    start_idx: int,
    direction: str,
    trigger_level: float,
    variant: str,
) -> Optional[int]:
    """Return the first of start_idx / start_idx + 1 whose candle breaks trigger_level, else None."""
    for idx in (start_idx, start_idx + 1):
        if idx >= len(day_df):
            break
        if _breaks_trigger(day_df.iloc[idx], direction, trigger_level, variant):
            return idx
    return None


def _load_option_frame(
    option_symbol: str,
    fetch_from_dt: datetime,
    fetch_to_dt: datetime,
    option_cache: Dict[str, pd.DataFrame],
) -> pd.DataFrame:
    """Return time-sorted 5m candles for option_symbol, fetching and caching them on first use.

    Empty results are returned but not cached, so a later setup on the same
    contract retries the fetch.
    """
    if option_symbol in option_cache:
        return option_cache[option_symbol]
    fetched = get_option_5m(option_symbol, fetch_from_dt, fetch_to_dt)
    if fetched.empty:
        return fetched
    fetched = fetched.sort_values("timestamp").reset_index(drop=True)
    option_cache[option_symbol] = fetched
    return fetched


def _resume_index(day_df: pd.DataFrame, trigger_idx: int, exit_time) -> int:
    """Return where scanning resumes: the first candle after exit_time, else the candle after the trigger.

    Relies on day_df having a 0..n-1 positional index.
    """
    if pd.isna(exit_time):
        return trigger_idx + 1
    later = day_df.index[day_df["timestamp"] > exit_time]
    return int(later[0]) if len(later) > 0 else len(day_df)


def generate_rows_for_symbol_day(
    stock_symbol: str,
    sector: str,
    trade_date: str,
    variant: str = "v2",
    option_cache: Optional[Dict[str, pd.DataFrame]] = None,
) -> pd.DataFrame:
    """Scan one symbol's 5-minute candles on trade_date and return one dataset row per completed setup.

    A setup is two consecutive candles that both qualify for the same side,
    followed by a trigger break on one of the next two candles. For each
    setup, the nearest option contract is looked up. Entry and stop come from
    that contract's candles, targets sit at 1R and 2R, and
    _scan_option_exit's target-hit times become the labels. After a trade the
    scan resumes after the exit candle (or after the trigger candle when no
    exit time is reported).

    Args:
        stock_symbol: Underlying symbol to scan.
        sector: Copied into every row.
        trade_date: Date to scan (anything pd.Timestamp accepts).
        variant: Strategy variant forwarded to the strategy helpers.
        option_cache: Optional option-symbol -> sorted candles cache shared
            across calls. Keys are option symbols only while the fetch window
            depends on trade_date, so share a cache only between calls for
            the same trade_date.

    Returns:
        DataFrame of dataset rows; empty when there is no data or no usable setup.
    """
    if option_cache is None:
        option_cache = {}

    trade_day = pd.Timestamp(trade_date).date()
    # Presumably fetched from a few days back so indicators have history
    # before the session starts.
    fetch_from_dt = datetime.combine(trade_day - timedelta(days=5), time(9, 15))
    fetch_to_dt = datetime.combine(trade_day, time(15, 30))

    stock_df = get_stock_5m(stock_symbol, fetch_from_dt, fetch_to_dt)
    if stock_df.empty:
        return pd.DataFrame()

    stock_df = add_strategy_indicators(stock_df)
    stock_df = stock_df.sort_values("timestamp").reset_index(drop=True)

    # Extra context features available at entry time
    stock_df["ret_1"] = stock_df["close"].pct_change(1)
    stock_df["ret_3"] = stock_df["close"].pct_change(3)

    # All candles of the trade day: used as the day-open reference, so
    # entry_vs_day_open_pct is measured from the session's first candle
    # rather than from the first *scanned* candle at START_SCAN_TIME.
    session_df = stock_df[stock_df["timestamp"].dt.date == trade_day].reset_index(drop=True)
    # Candles scanned for setups; the 0..n-1 index is relied on by _resume_index.
    day_df = session_df[session_df["timestamp"].dt.time >= START_SCAN_TIME].reset_index(drop=True)
    if len(day_df) < 4:
        return pd.DataFrame()

    dataset_rows = []
    # set_index("timestamp") once per option symbol instead of once per setup.
    indexed_options: Dict[str, pd.DataFrame] = {}

    i = 0
    while i <= len(day_df) - 4:
        indication_row = day_df.iloc[i]
        confirmation_row = day_df.iloc[i + 1]

        direction = _setup_direction(indication_row, confirmation_row)
        if direction is None:
            i += 1
            continue

        trigger_level = _trigger_price(confirmation_row, direction, variant)
        contract = select_nearest_option_contract(
            underlying_symbol=stock_symbol,
            direction=direction,
            reference_price=trigger_level,
            asof_timestamp=confirmation_row["timestamp"],
        )
        if contract is None:
            i += 1
            continue

        option_symbol = str(contract["tradingsymbol"]).upper()
        option_df = _load_option_frame(option_symbol, fetch_from_dt, fetch_to_dt, option_cache)
        if option_df.empty:
            i += 1
            continue
        option_map = indexed_options.get(option_symbol)
        if option_map is None:
            option_map = option_df.set_index("timestamp")
            indexed_options[option_symbol] = option_map

        trigger_hit_idx = _find_trigger_index(day_df, i + 2, direction, trigger_level, variant)
        if trigger_hit_idx is None:
            i += 1
            continue

        trigger_row = day_df.iloc[trigger_hit_idx]
        buy_time = trigger_row["timestamp"]
        indication_time = indication_row["timestamp"]
        if buy_time not in option_map.index or indication_time not in option_map.index:
            i += 1
            continue

        trigger_option_row = _safe_row(option_map.loc[buy_time])
        indication_option_row = _safe_row(option_map.loc[indication_time])

        buy_price = _entry_price_from_option_candle(trigger_option_row, variant)
        stop_loss = _stop_from_option_indication_candle(indication_option_row, variant)
        if pd.isna(buy_price) or pd.isna(stop_loss):
            i += 1
            continue
        buy_price = float(buy_price)
        stop_loss = float(stop_loss)
        if buy_price <= 0 or stop_loss <= 0 or stop_loss >= buy_price:
            i += 1
            continue

        # Targets at 1R and 2R, where R is the entry-to-stop distance.
        risk = buy_price - stop_loss
        target_1 = buy_price + risk
        target_2 = buy_price + 2 * risk

        t1_time, t2_time, exit_status, exit_time = _scan_option_exit(
            option_df=option_df,
            buy_time=buy_time,
            stop_loss=stop_loss,
            target_1=target_1,
            target_2=target_2,
        )

        dataset_row = _build_dataset_row(
            stock_symbol=stock_symbol,
            sector=sector,
            direction=direction,
            variant=variant,
            contract=contract,
            indication_row=indication_row,
            confirmation_row=confirmation_row,
            trigger_row=trigger_row,
            indication_option_row=indication_option_row,
            trigger_option_row=trigger_option_row,
            buy_price=buy_price,
            stop_loss=stop_loss,
            target_1=target_1,
            target_2=target_2,
            trigger_level=trigger_level,
            t1_time=t1_time,
            t2_time=t2_time,
            exit_status=exit_status,
            day_df=session_df,
        )
        if dataset_row is not None:
            dataset_rows.append(dataset_row)

        # Never move backwards, even if the exit lookup points earlier.
        i = max(_resume_index(day_df, trigger_hit_idx, exit_time), i + 1)

    return pd.DataFrame(dataset_rows) if dataset_rows else pd.DataFrame()
# -----------------------------
# Main driver
# -----------------------------
def main():
    """Build the ML dataset for every (trade date, universe symbol) pair and save it as CSV.

    Per-symbol failures are printed and skipped so one bad symbol does not
    abort the run. A label hit-rate summary and sample rows are printed when
    the dataset is non-empty.
    """
    universe = load_universe()
    dates = get_trade_dates(START_DATE, END_DATE)

    for header_line in (
        f"Mode: {MODE}",
        f"Variant: {VARIANT}",
        f"Symbols: {len(universe)}",
        f"Dates: {len(dates)} ({START_DATE} to {END_DATE})",
    ):
        print(header_line)

    collected = []
    for day in dates:
        trade_date = str(day.date())
        print(f"\n=== {trade_date} ===")
        # Option candles are fetched for a window tied to trade_date and cached
        # by option symbol only, so the cache must not outlive one date.
        day_option_cache = {}
        for symbol, sector in zip(universe["symbol"], universe["sector"]):
            try:
                frame = generate_rows_for_symbol_day(
                    stock_symbol=symbol,
                    sector=sector,
                    trade_date=trade_date,
                    variant=VARIANT,
                    option_cache=day_option_cache,
                )
                if frame.empty:
                    continue
                collected.append(frame)
                print(f"{symbol}: {len(frame)} rows")
            except Exception as exc:
                print(f"{symbol}: ERROR -> {exc}")

    dataset = pd.concat(collected, ignore_index=True) if collected else pd.DataFrame()

    out_path = OUT_DIR / f"ml_dataset_{MODE}_{VARIANT}_{START_DATE}_to_{END_DATE}.csv"
    dataset.to_csv(out_path, index=False)
    print(f"\nSaved dataset to: {out_path}")
    print(f"Total rows: {len(dataset)}")

    if dataset.empty:
        return
    print("\nLabel summary:")
    print(dataset[["label_1to1", "label_1to2"]].mean().rename("hit_rate"))
    print("\nSample rows:")
    print(dataset.head(10).to_string(index=False))
# Build the dataset only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()