""" regime.py — Market regime detection with ADX, volatility compression, distance-from-mean filter, and regime confidence scoring. Key fixes vs prior version: - STRUCTURE_LOOKBACK halved (10) to reduce entry lag - True ATR (not EWM-only) with percentile-based compression detection - ADX for objective trend strength (replaces pure HH/HL heuristic) - Regime confidence: composite of trend + structure + vol alignment - Distance-from-mean filter to avoid entering extended moves """ from typing import Dict, Any import numpy as np import pandas as pd from config import ( ATR_PERIOD, STRUCTURE_LOOKBACK, STRUCTURE_CONFIRM_BARS, VOLATILITY_EXPANSION_MULT, VOLATILITY_CONTRACTION_MULT, VOL_COMPRESSION_LOOKBACK, VOL_COMPRESSION_PERCENTILE, VOL_EXPANSION_CONFIRM_MULT, ADX_PERIOD, ADX_TREND_THRESHOLD, ADX_STRONG_THRESHOLD, DIST_FROM_MEAN_MA, DIST_FROM_MEAN_ATR_MAX, REGIME_CONFIDENCE_MIN, ) def compute_atr(df: pd.DataFrame, period: int = ATR_PERIOD) -> pd.Series: high, low, prev_close = df["high"], df["low"], df["close"].shift(1) tr = pd.concat( [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1, ).max(axis=1) # Use Wilder's smoothing (RMA) — matches TradingView / industry standard return tr.ewm(alpha=1.0 / period, adjust=False).mean() def compute_adx(df: pd.DataFrame, period: int = ADX_PERIOD) -> pd.DataFrame: """ Returns DataFrame with columns: adx, di_plus, di_minus. Uses Wilder smoothing throughout to match standard ADX definition. """ high, low, close = df["high"], df["low"], df["close"] prev_high = high.shift(1) prev_low = low.shift(1) prev_close = close.shift(1) dm_plus = (high - prev_high).clip(lower=0) dm_minus = (prev_low - low).clip(lower=0) # Zero out when the other direction is larger mask = dm_plus >= dm_minus dm_plus = dm_plus.where(mask, 0.0) dm_minus = dm_minus.where(~mask, 0.0) tr = pd.concat( [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1, ).max(axis=1) alpha = 1.0 / period atr_w = tr.ewm(alpha=alpha, adjust=False).mean() sdm_plus = dm_plus.ewm(alpha=alpha, adjust=False).mean() sdm_minus = dm_minus.ewm(alpha=alpha, adjust=False).mean() di_plus = 100 * sdm_plus / atr_w.replace(0, np.nan) di_minus = 100 * sdm_minus / atr_w.replace(0, np.nan) dx = 100 * (di_plus - di_minus).abs() / (di_plus + di_minus).replace(0, np.nan) adx = dx.ewm(alpha=alpha, adjust=False).mean() return pd.DataFrame({"adx": adx, "di_plus": di_plus, "di_minus": di_minus}) def compute_structure(df: pd.DataFrame, lookback: int = STRUCTURE_LOOKBACK) -> pd.Series: roll_high = df["high"].rolling(lookback).max() roll_low = df["low"].rolling(lookback).min() half = max(1, lookback // 2) prev_high = roll_high.shift(half) prev_low = roll_low.shift(half) hh = roll_high > prev_high hl = roll_low > prev_low lh = roll_high < prev_high ll = roll_low < prev_low structure = pd.Series(0, index=df.index) structure[hh & hl] = 1 structure[lh & ll] = -1 return structure def compute_volatility_compression( atr_series: pd.Series, lookback: int = VOL_COMPRESSION_LOOKBACK, percentile: float = VOL_COMPRESSION_PERCENTILE, ) -> pd.Series: """ Returns True where current ATR is below the Nth percentile of its recent history — i.e., volatility is compressed (coiled). """ rolling_pct = atr_series.rolling(lookback).quantile(percentile / 100.0) return atr_series < rolling_pct def compute_volatility_expanding_from_compression( atr_series: pd.Series, compressed_series: pd.Series, mult: float = VOL_EXPANSION_CONFIRM_MULT, lookback: int = 5, ) -> pd.Series: """ Returns True where ATR is now expanding (current > recent_min * mult) AND was compressed within the last `lookback` bars. Catches the precise moment of volatility breakout from a base. """ recent_min_atr = atr_series.rolling(lookback).min().shift(1) expanding = atr_series > recent_min_atr * mult was_compressed = compressed_series.shift(1).rolling(lookback).max().fillna(0) > 0 return expanding & was_compressed def compute_distance_from_mean( df: pd.DataFrame, atr_series: pd.Series, ma_period: int = DIST_FROM_MEAN_MA, atr_max: float = DIST_FROM_MEAN_ATR_MAX, ) -> pd.Series: """ Returns ATR-normalised distance of close from its SMA. Values > atr_max mean price is too extended for a fresh long entry. """ sma = df["close"].rolling(ma_period).mean() distance_atr = (df["close"] - sma) / atr_series.replace(0, np.nan) return distance_atr def classify_trend( structure_series: pd.Series, adx_df: pd.DataFrame, lookback: int = STRUCTURE_CONFIRM_BARS, ) -> str: recent_struct = structure_series.iloc[-lookback:] bullish = (recent_struct == 1).sum() bearish = (recent_struct == -1).sum() adx_val = float(adx_df["adx"].iloc[-1]) if not np.isnan(adx_df["adx"].iloc[-1]) else 0.0 di_plus = float(adx_df["di_plus"].iloc[-1]) if not np.isnan(adx_df["di_plus"].iloc[-1]) else 0.0 di_minus = float(adx_df["di_minus"].iloc[-1]) if not np.isnan(adx_df["di_minus"].iloc[-1]) else 0.0 adx_trending = adx_val >= ADX_TREND_THRESHOLD if adx_trending and di_plus > di_minus and bullish >= max(1, lookback // 2): return "bullish" if adx_trending and di_minus > di_plus and bearish >= max(1, lookback // 2): return "bearish" return "ranging" def compute_regime_confidence( trend: str, adx_val: float, structure: int, vol_expanding_from_base: bool, vol_ratio: float, dist_atr: float, ) -> float: """ Composite confidence [0, 1] requiring alignment across: - ADX trend strength - Price structure - Volatility expanding from compression - Price not extended Low confidence = system holds off even if other scores look good. """ score = 0.0 # ADX contribution (0 to 0.35) if adx_val >= ADX_STRONG_THRESHOLD: score += 0.35 elif adx_val >= ADX_TREND_THRESHOLD: score += 0.20 else: score += 0.05 # Structure alignment (0 to 0.25) if trend == "bullish" and structure == 1: score += 0.25 elif trend == "bearish" and structure == -1: score += 0.25 elif structure == 0: score += 0.10 else: score += 0.0 # Volatility expanding from base (0 to 0.25) if vol_expanding_from_base: score += 0.25 elif 1.0 < vol_ratio < VOLATILITY_EXPANSION_MULT: score += 0.10 else: score += 0.0 # Price not extended (0 to 0.15) abs_dist = abs(dist_atr) if not np.isnan(dist_atr) else 0.0 if abs_dist < 1.0: score += 0.15 elif abs_dist < DIST_FROM_MEAN_ATR_MAX: score += 0.07 else: score += 0.0 return float(np.clip(score, 0.0, 1.0)) def detect_regime(df: pd.DataFrame) -> Dict[str, Any]: atr_series = compute_atr(df, ATR_PERIOD) adx_df = compute_adx(df, ADX_PERIOD) structure_series = compute_structure(df, STRUCTURE_LOOKBACK) compressed_series = compute_volatility_compression(atr_series) expanding_from_base = compute_volatility_expanding_from_compression( atr_series, compressed_series ) dist_atr_series = compute_distance_from_mean(df, atr_series) last_atr = float(atr_series.iloc[-1]) last_close = float(df["close"].iloc[-1]) last_structure = int(structure_series.iloc[-1]) last_adx = float(adx_df["adx"].iloc[-1]) if not np.isnan(adx_df["adx"].iloc[-1]) else 0.0 last_di_plus = float(adx_df["di_plus"].iloc[-1]) if not np.isnan(adx_df["di_plus"].iloc[-1]) else 0.0 last_di_minus = float(adx_df["di_minus"].iloc[-1]) if not np.isnan(adx_df["di_minus"].iloc[-1]) else 0.0 last_compressed = bool(compressed_series.iloc[-1]) last_expanding_from_base = bool(expanding_from_base.iloc[-1]) last_dist_atr = float(dist_atr_series.iloc[-1]) if not np.isnan(dist_atr_series.iloc[-1]) else 0.0 atr_ma = atr_series.rolling(ATR_PERIOD * 2).mean() last_atr_ma = float(atr_ma.iloc[-1]) if not np.isnan(atr_ma.iloc[-1]) else last_atr vol_ratio = last_atr / last_atr_ma if last_atr_ma > 0 else 1.0 vol_expanding = vol_ratio > VOLATILITY_EXPANSION_MULT vol_contracting = vol_ratio < VOLATILITY_CONTRACTION_MULT atr_pct = last_atr / last_close if last_close > 0 else 0.0 trend = classify_trend(structure_series, adx_df, STRUCTURE_CONFIRM_BARS) price_too_extended_long = last_dist_atr > DIST_FROM_MEAN_ATR_MAX price_too_extended_short = last_dist_atr < -DIST_FROM_MEAN_ATR_MAX regime_confidence = compute_regime_confidence( trend=trend, adx_val=last_adx, structure=last_structure, vol_expanding_from_base=last_expanding_from_base, vol_ratio=vol_ratio, dist_atr=last_dist_atr, ) # Regime score: raw directional quality if trend == "bullish" and not vol_expanding: regime_score = 1.0 elif trend == "bullish" and vol_expanding: regime_score = 0.55 elif trend == "ranging": regime_score = 0.25 elif trend == "bearish" and not vol_expanding: regime_score = 0.15 else: regime_score = 0.05 if last_adx >= ADX_STRONG_THRESHOLD: regime_score = min(1.0, regime_score + 0.1) elif last_adx < ADX_TREND_THRESHOLD: regime_score = max(0.0, regime_score - 0.15) if last_structure == 1: regime_score = min(1.0, regime_score + 0.1) elif last_structure == -1: regime_score = max(0.0, regime_score - 0.1) atr_ma_20 = atr_series.rolling(20).mean().iloc[-1] atr_ma_50 = atr_series.rolling(50).mean().iloc[-1] if len(df) >= 50 else atr_ma_20 atr_trend_dir = "rising" if atr_ma_20 > atr_ma_50 else "falling" return { "atr": last_atr, "atr_pct": atr_pct, "atr_pct_pct": round(atr_pct * 100, 3), "structure": last_structure, "trend": trend, "vol_ratio": round(vol_ratio, 3), "vol_expanding": vol_expanding, "vol_contracting": vol_contracting, "vol_compressed": last_compressed, "vol_expanding_from_base": last_expanding_from_base, "adx": round(last_adx, 2), "di_plus": round(last_di_plus, 2), "di_minus": round(last_di_minus, 2), "dist_atr": round(last_dist_atr, 3), "price_extended_long": price_too_extended_long, "price_extended_short": price_too_extended_short, "regime_confidence": round(regime_confidence, 4), "regime_score": round(float(np.clip(regime_score, 0.0, 1.0)), 4), "atr_trend": atr_trend_dir, "atr_series": atr_series, "structure_series": structure_series, "adx_series": adx_df, "compressed_series": compressed_series, "dist_atr_series": dist_atr_series, }