"""
volume_analysis.py — Volume & order flow with absorption detection,
multi-bar breakout confirmation, and fake breakout identification.
Key fixes vs prior version:
- Absorption detection: high-volume small-body bars at resistance = institutional selling
- Multi-bar breakout confirmation (BREAKOUT_CONFIRMATION_BARS) before firing signal
- ATR buffer on breakout level (price must exceed level by N*ATR, not just 1 tick)
- OBV slope computed over configurable window, normalized vs rolling stddev
- Climax threshold lowered (3.0x) and now triggers a hard absorption check
- Failed retest detection: breakout that closes back below the level = fake
"""
from typing import Dict, Any
import numpy as np
import pandas as pd
from config import (
VOLUME_MA_PERIOD,
VOLUME_SPIKE_MULT,
VOLUME_CLIMAX_MULT,
VOLUME_WEAK_THRESHOLD,
BREAKOUT_LOOKBACK,
BREAKOUT_ATR_BUFFER,
BREAKOUT_CONFIRMATION_BARS,
BREAKOUT_RETEST_BARS,
ABSORPTION_WICK_RATIO,
ABSORPTION_VOL_MULT,
ABSORPTION_BODY_RATIO,
OBV_SLOPE_BARS,
ATR_PERIOD,
)
def compute_volume_ma(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series:
    """Return the simple rolling mean of the ``volume`` column.

    Args:
        df: Frame containing a ``volume`` column.
        period: Rolling window length in bars.

    Returns:
        Series aligned to ``df``; entries are NaN until the window is full
        (pandas' default ``min_periods`` for a fixed-size window).
    """
    volume = df["volume"]
    return volume.rolling(window=period).mean()
def detect_spikes(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """Flag bars whose volume exceeds ``VOLUME_SPIKE_MULT`` times the volume MA.

    Args:
        df: Frame containing a ``volume`` column.
        vol_ma: Volume moving average aligned to ``df``.

    Returns:
        Boolean Series; bars where ``vol_ma`` is NaN compare as False.
    """
    spike_threshold = vol_ma * VOLUME_SPIKE_MULT
    return df["volume"] > spike_threshold
def detect_climax(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """Flag bars whose volume exceeds ``VOLUME_CLIMAX_MULT`` times the volume MA.

    Args:
        df: Frame containing a ``volume`` column.
        vol_ma: Volume moving average aligned to ``df``.

    Returns:
        Boolean Series; bars where ``vol_ma`` is NaN compare as False.
    """
    climax_threshold = vol_ma * VOLUME_CLIMAX_MULT
    return df["volume"] > climax_threshold
def detect_absorption(df: pd.DataFrame, vol_ma: pd.Series) -> pd.Series:
    """Flag high-volume bars with a small body, a long upper wick and a weak close.

    A bar is flagged only when all four conditions hold:
      - volume > ABSORPTION_VOL_MULT * vol_ma
      - |close - open| / (high - low) < ABSORPTION_BODY_RATIO
      - upper wick / (high - low) > ABSORPTION_WICK_RATIO
      - close lies below the midpoint of the bar's high-low range

    NOTE(review): this function looks at each bar's shape and volume only;
    it does not itself test proximity to recent highs, so any "at
    resistance" context must come from the caller.

    Args:
        df: Frame with ``open``, ``high``, ``low``, ``close``, ``volume`` columns.
        vol_ma: Volume moving average aligned to ``df``.

    Returns:
        Boolean Series. Zero-range bars and bars with a NaN ``vol_ma`` are
        never flagged, because their comparisons evaluate to False.
    """
    opens, closes = df["open"], df["close"]
    highs, lows = df["high"], df["low"]

    # A zero high-low range is turned into NaN so the ratios below become NaN
    # (and therefore compare False) instead of dividing by zero.
    span = (highs - lows).replace(0, np.nan)
    real_body = (closes - opens).abs()
    top_wick = highs - df[["close", "open"]].max(axis=1)

    heavy_volume = df["volume"] > vol_ma * ABSORPTION_VOL_MULT
    narrow_body = (real_body / span) < ABSORPTION_BODY_RATIO
    long_upper_wick = (top_wick / span) > ABSORPTION_WICK_RATIO
    weak_close = closes < (lows + span * 0.5)

    return heavy_volume & narrow_body & long_upper_wick & weak_close
def compute_obv(df: pd.DataFrame) -> pd.Series:
    """Return On-Balance Volume: the running sum of sign-weighted volume.

    Each bar contributes ``+volume`` when its close is above the previous
    close, ``-volume`` when below, and ``0`` when unchanged. The first bar
    has no previous close and contributes ``0``.

    Args:
        df: Frame with ``close`` and ``volume`` columns.

    Returns:
        Cumulative signed-volume Series aligned to ``df``.
    """
    close_change = df["close"].diff()
    # sign() leaves the leading NaN in place; treat "no prior close" as flat.
    step = np.sign(close_change).fillna(0)
    signed_volume = df["volume"] * step
    return signed_volume.cumsum()
def compute_obv_slope(obv: pd.Series, bars: int = OBV_SLOPE_BARS) -> pd.Series:
    """Return the rolling linear-fit slope of OBV divided by the window's std.

    For every full window of ``bars`` values a degree-1 least-squares line is
    fitted against the bar positions ``0..bars-1``; its slope is divided by
    the (population) standard deviation of the same window so the result is
    scale-free.

    Args:
        obv: On-Balance Volume series.
        bars: Window length in bars.

    Returns:
        Series aligned to ``obv``: NaN until the window is full, ``0.0`` for
        a window with zero standard deviation, otherwise slope / std.
    """
    bar_index = np.arange(bars)

    def _normalized_slope(values):
        # Defensive: rolling(bars) already withholds partial windows.
        if len(values) < bars:
            return np.nan
        gradient = np.polyfit(bar_index, values, 1)[0]
        spread = np.std(values)
        if spread > 0:
            return gradient / spread
        # Flat window: no dispersion to normalize by, report no slope.
        return 0.0

    return obv.rolling(bars).apply(_normalized_slope, raw=True)
def compute_delta_approx(df: pd.DataFrame) -> pd.Series:
    """Approximate per-bar buy-minus-sell volume from the candle shape.

    The signed body as a fraction of the high-low range is mapped linearly
    from [-1, 1] onto a buy share in [0, 1]; the remainder is the sell share.
    Bars with a zero range get a 50/50 split, i.e. a delta of zero.

    Args:
        df: Frame with ``open``, ``high``, ``low``, ``close``, ``volume`` columns.

    Returns:
        Series of ``volume * buy_share - volume * sell_share`` per bar.
    """
    signed_body = df["close"] - df["open"]
    # Zero range -> NaN so the division yields NaN, filled with 0.5 below.
    span = (df["high"] - df["low"]).replace(0, np.nan)
    position = signed_body / span
    buy_share = (position * 0.5 + 0.5).clip(0.0, 1.0).fillna(0.5)
    sell_share = 1 - buy_share
    volume = df["volume"]
    return volume * buy_share - volume * sell_share
def compute_vwap_deviation(df: pd.DataFrame, period: int = VOLUME_MA_PERIOD) -> pd.Series:
    """Return the close's distance from a rolling VWAP, in units of mean bar range.

    The VWAP is computed over a rolling ``period``-bar window from the
    typical price ``(high + low + close) / 3``. The distance is scaled by the
    ``ATR_PERIOD``-bar rolling mean of ``high - low`` (a simple range-based
    stand-in for ATR).

    Args:
        df: Frame with ``high``, ``low``, ``close``, ``volume`` columns.
        period: Rolling window length for the VWAP.

    Returns:
        Series aligned to ``df``; NaN while either window is not full or
        when the rolling volume or mean range is zero.
    """
    high, low, close = df["high"], df["low"], df["close"]
    volume = df["volume"]

    typical_price = (high + low + close) / 3
    rolling_pv = (typical_price * volume).rolling(period).sum()
    # Zero denominators become NaN to avoid inf results.
    rolling_volume = volume.rolling(period).sum().replace(0, np.nan)
    rolling_vwap = rolling_pv / rolling_volume

    mean_range = (high - low).rolling(ATR_PERIOD).mean().replace(0, np.nan)
    return (close - rolling_vwap) / mean_range
def compute_confirmed_breakout(
    df: pd.DataFrame,
    atr_series: pd.Series,
    vol_ma: pd.Series,
    lookback: int = BREAKOUT_LOOKBACK,
    confirm_bars: int = BREAKOUT_CONFIRMATION_BARS,
    atr_buffer: float = BREAKOUT_ATR_BUFFER,
) -> pd.Series:
    """Return a per-bar breakout signal: +1 bullish, -1 bearish, 0 none.

    A bar is marked only when, over the last ``confirm_bars`` bars
    (including the current one):
      1. every close cleared the reference level by at least
         ``atr_buffer * ATR`` (above the prior high, or below the prior low);
      2. at least one bar was a volume spike (see ``detect_spikes``);
      3. no bar was an absorption bar (see ``detect_absorption``).

    The reference level at a bar is the high/low extreme of the
    ``lookback``-bar window that ended ``lookback`` bars earlier
    (``rolling(lookback)`` followed by ``shift(lookback)``).

    Args:
        df: Frame with OHLCV columns.
        atr_series: ATR values aligned to ``df``.
        vol_ma: Volume moving average aligned to ``df``.
        lookback: Window length used to build the reference high/low.
        confirm_bars: Number of consecutive bars that must hold the level.
        atr_buffer: ATR multiple the close must exceed the level by.

    Returns:
        Integer Series indexed like ``df`` with values in {-1, 0, 1}.
    """
    close = df["close"]
    buffer = atr_series * atr_buffer

    resistance = df["high"].rolling(lookback).max().shift(lookback)
    support = df["low"].rolling(lookback).min().shift(lookback)

    def _every_bar(flags: pd.Series, warmup_value: int) -> pd.Series:
        # Rolling min over booleans is 1 only if all bars in the window are True.
        return flags.rolling(confirm_bars).min().fillna(warmup_value).astype(bool)

    def _any_bar(flags: pd.Series) -> pd.Series:
        # Rolling max over booleans is 1 if at least one bar in the window is True.
        return flags.rolling(confirm_bars).max().fillna(0).astype(bool)

    # Level cleared with the ATR buffer, held for the whole confirmation window.
    held_above = _every_bar(close > resistance + buffer, 0)
    held_below = _every_bar(close < support - buffer, 0)

    volume_confirmed = _any_bar(detect_spikes(df, vol_ma))
    # Warm-up bars (incomplete window) are treated as absorption-free.
    clean_window = _every_bar(~detect_absorption(df, vol_ma), 1)

    qualified = volume_confirmed & clean_window
    result = pd.Series(0, index=df.index)
    result.loc[held_above & qualified] = 1
    result.loc[held_below & qualified] = -1
    return result
def detect_failed_breakout(
    df: pd.DataFrame,
    breakout_series: pd.Series,
    atr_series: pd.Series,
    retest_bars: int = BREAKOUT_RETEST_BARS,
    lookback: int = BREAKOUT_LOOKBACK,
) -> pd.Series:
    """Flag bars where a recent confirmed breakout has closed back through its level.

    A bar is flagged when a breakout signal occurred in any of the previous
    ``retest_bars`` bars (the current bar excluded) and the current close is
    back below the reference high (bullish case) or back above the reference
    low (bearish case). The reference level is the ``lookback``-window
    extreme as it stood ``retest_bars`` bars ago.

    Args:
        df: Frame with ``high``, ``low``, ``close`` columns.
        breakout_series: Output of ``compute_confirmed_breakout`` (+1/-1/0).
        atr_series: Currently unused; kept so existing callers keep working.
        retest_bars: How many bars after a breakout a failure is looked for.
        lookback: Window length for the reference high/low. Pass the same
            value that was given to ``compute_confirmed_breakout`` so both
            functions judge the breakout against the same level. Defaults to
            ``BREAKOUT_LOOKBACK``, which was previously hard-coded here.

    Returns:
        Boolean Series, True where a prior breakout has since failed.
    """
    prior_high = df["high"].rolling(lookback).max().shift(lookback)
    prior_low = df["low"].rolling(lookback).min().shift(lookback)

    # shift(1) keeps the current bar's own signal out of the "recent" window.
    recent_signals = breakout_series.shift(1).rolling(retest_bars)
    had_bull_bo = recent_signals.max().fillna(0) > 0
    had_bear_bo = recent_signals.min().fillna(0) < 0

    # Failed: price closed back through the level the breakout had cleared.
    bull_failed = had_bull_bo & (df["close"] < prior_high.shift(retest_bars))
    bear_failed = had_bear_bo & (df["close"] > prior_low.shift(retest_bars))
    return bull_failed | bear_failed
def analyze_volume(df: pd.DataFrame, atr_series: pd.Series = None) -> Dict[str, Any]:
    """Run the full volume / order-flow analysis and summarize the latest bar.

    Args:
        df: Frame with ``open``, ``high``, ``low``, ``close``, ``volume``
            columns; must contain at least one row.
        atr_series: Optional ATR aligned to ``df``. When omitted, an
            exponentially smoothed true range (alpha = 1 / ATR_PERIOD) is
            computed from ``df``.

    Returns:
        Dict with scalar readings for the last bar (``vol_ratio``, ``spike``,
        ``climax``, ``absorption``, ``weak``, ``breakout``,
        ``failed_breakout``, ``recent_failed_count``, ``obv_slope_norm``,
        ``delta_sum_5``, ``delta_sign``, ``vwap_deviation``,
        ``volume_score`` in [0, 1]) plus the full ``spike_series``,
        ``climax_series``, ``absorption_series``, ``breakout_series`` and
        ``failed_breakout_series``.

        ``delta_sign`` is +1, -1, or 0 when the 5-bar delta sum is exactly
        zero. ``vol_ratio`` is the neutral value 1.0 when the volume MA is
        not yet available or not positive.
    """
    if atr_series is None:
        # Fallback: smoothed true range when the caller supplies no ATR.
        high, low, prev_close = df["high"], df["low"], df["close"].shift(1)
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr_series = true_range.ewm(alpha=1.0 / ATR_PERIOD, adjust=False).mean()

    def _last_or_zero(series: pd.Series) -> float:
        # Latest value of an indicator, or 0.0 while it is still warming up.
        value = series.iloc[-1]
        return 0.0 if np.isnan(value) else float(value)

    vol_ma = compute_volume_ma(df, VOLUME_MA_PERIOD)
    spike_series = detect_spikes(df, vol_ma)
    climax_series = detect_climax(df, vol_ma)
    absorption_series = detect_absorption(df, vol_ma)
    obv = compute_obv(df)
    obv_slope_series = compute_obv_slope(obv, OBV_SLOPE_BARS)
    delta = compute_delta_approx(df)
    vwap_dev = compute_vwap_deviation(df, VOLUME_MA_PERIOD)
    breakout_series = compute_confirmed_breakout(
        df, atr_series, vol_ma,
        lookback=BREAKOUT_LOOKBACK,
        confirm_bars=BREAKOUT_CONFIRMATION_BARS,
        atr_buffer=BREAKOUT_ATR_BUFFER,
    )
    failed_breakout_series = detect_failed_breakout(df, breakout_series, atr_series)

    last_vol = float(df["volume"].iloc[-1])
    last_vol_ma = float(vol_ma.iloc[-1])
    last_spike = bool(spike_series.iloc[-1])
    last_climax = bool(climax_series.iloc[-1])
    last_absorption = bool(absorption_series.iloc[-1])
    last_breakout = int(breakout_series.iloc[-1])
    last_failed_bo = bool(failed_breakout_series.iloc[-1])
    last_obv_slope = _last_or_zero(obv_slope_series)
    last_vwap_dev = _last_or_zero(vwap_dev)

    # Without a usable MA the ratio is unknown; report the neutral 1.0 rather
    # than dividing by a placeholder (which would return the raw volume and
    # falsely look like a huge volume expansion).
    if np.isnan(last_vol_ma) or last_vol_ma <= 0:
        vol_ratio = 1.0
    else:
        vol_ratio = last_vol / last_vol_ma
    weak_vol = vol_ratio < VOLUME_WEAK_THRESHOLD

    delta_5 = float(delta.iloc[-5:].sum())
    # A delta of exactly zero is neutral (0), not bearish.
    delta_sign = int(np.sign(delta_5))

    # Recent failed breakout count (last 10 bars): context for trust level.
    recent_failed = int(failed_breakout_series.iloc[-10:].sum())

    # Score construction: the first matching condition wins, so order matters.
    if last_absorption:
        # Absorption: heavy volume that failed to move price up.
        base_score = 0.15
    elif last_climax:
        base_score = 0.25
    elif last_breakout != 0 and not last_failed_bo:
        base_score = 1.0
    elif last_breakout != 0:
        # Breakout present but already flagged as failed.
        base_score = 0.20
    elif last_spike:
        base_score = 0.60
    elif vol_ratio >= 1.2:
        base_score = 0.45
    elif vol_ratio >= 0.8:
        base_score = 0.30
    else:
        base_score = 0.10

    # OBV slope bonus/penalty (normalized), capped at +/- 0.12.
    obv_bonus = float(np.clip(last_obv_slope * 0.08, -0.12, 0.12))

    # VWAP bonus only when price sits on the breakout's side of VWAP.
    on_side = (last_breakout == 1 and last_vwap_dev > 0) or (
        last_breakout == -1 and last_vwap_dev < 0
    )
    vwap_bonus = 0.05 if on_side else 0.0

    # Penalty for recent failed breakouts (trust decay), capped at 0.20.
    fake_penalty = min(0.20, recent_failed * 0.05)

    volume_score = float(
        np.clip(base_score + obv_bonus + vwap_bonus - fake_penalty, 0.0, 1.0)
    )

    return {
        "vol_ratio": round(vol_ratio, 3),
        "spike": last_spike,
        "climax": last_climax,
        "absorption": last_absorption,
        "weak": weak_vol,
        "breakout": last_breakout,
        "failed_breakout": last_failed_bo,
        "recent_failed_count": recent_failed,
        "obv_slope_norm": round(last_obv_slope, 4),
        "delta_sum_5": round(delta_5, 2),
        "delta_sign": delta_sign,
        "vwap_deviation": round(last_vwap_dev, 4),
        "volume_score": round(volume_score, 4),
        "spike_series": spike_series,
        "climax_series": climax_series,
        "absorption_series": absorption_series,
        "breakout_series": breakout_series,
        "failed_breakout_series": failed_breakout_series,
    }
|