Tradtesting / governance_engine.py
Riy777's picture
Update governance_engine.py
fd663b2 verified
# ============================================================
# ๐Ÿ›๏ธ governance_engine.py (V38.0 - GEM-Architect: Context-Aware Weights)
# ============================================================
# Description:
# Evaluates trade quality using 156 INDICATORS.
# Update V38.0: Dynamic Weighting based on Strategy Type (Bottom vs Momentum).
# ============================================================
import numpy as np
import pandas as pd
try:
import pandas_ta as ta
except Exception as _e:
ta = None
from typing import Dict, Any, List
class GovernanceEngine:
def __init__(self):
# โš–๏ธ Default Strategic Weights (For Normal/Range Operations)
self.DEFAULT_WEIGHTS = {
"order_book": 0.25, # 25%
"market_structure": 0.20, # 20%
"trend": 0.15, # 15%
"momentum": 0.15, # 15%
"volume": 0.10, # 10%
"volatility": 0.05, # 5%
"cycle_math": 0.10 # 10%
}
print("๐Ÿ›๏ธ [Governance Engine V38.0] Context-Aware Protocols Active.")
async def evaluate_trade(
self,
symbol: str,
ohlcv_data: Dict[str, Any],
order_book: Dict[str, Any],
strategy_type: str = "NORMAL", # โœ… New Parameter
verbose: bool = True,
include_details: bool = False,
use_multi_timeframes: bool = False
) -> Dict[str, Any]:
"""
Main Execution Entry.
Now adapts weights based on 'strategy_type' (SAFE_BOTTOM vs MOMENTUM_LAUNCH).
"""
try:
if ta is None:
return self._create_rejection('Missing dependency: pandas_ta')
# 1) Data Prep
if not isinstance(ohlcv_data, dict) or '15m' not in ohlcv_data:
return self._create_rejection("No 15m Data")
def _get_df(tf: str) -> Any:
if tf not in ohlcv_data:
return None
df_tf = self._prepare_dataframe(ohlcv_data[tf])
if len(df_tf) < 60:
return None
return df_tf
df15 = _get_df('15m')
if df15 is None:
return self._create_rejection("Insufficient Data Length (<60)")
# optional timeframes (only used when enabled)
df_map: Dict[str, pd.DataFrame] = {'15m': df15}
if use_multi_timeframes:
for tf in ('1h', '4h', '1d'):
d = _get_df(tf)
if d is not None:
df_map[tf] = d
if verbose:
print(f"\n๐Ÿ“ [Gov Audit] Opening Session for {symbol} ({strategy_type})...")
print("-" * 80)
# 2) Calculate Domains
details_pack = {} # only filled when include_details=True
if not use_multi_timeframes:
s_trend = self._calc_trend_domain(df15, verbose, include_details, details_pack)
s_mom = self._calc_momentum_domain(df15, verbose, include_details, details_pack)
s_vol = self._calc_volatility_domain(df15, verbose, include_details, details_pack)
s_volu = self._calc_volume_domain(df15, verbose, include_details, details_pack)
s_cycle = self._calc_cycle_math_domain(df15, verbose, include_details, details_pack)
s_struct = self._calc_structure_domain(df15, verbose, include_details, details_pack)
else:
# Weighted by timeframe importance; only timeframes available are used
tfw = {'15m': 0.50, '1h': 0.30, '4h': 0.20, '1d': 0.10}
def _agg(fn, name: str) -> float:
total_w = 0.0
acc = 0.0
per_tf = {}
for tf, df_tf in df_map.items():
w = tfw.get(tf, 0.1)
s = fn(df_tf, False, include_details, details_pack) # per-tf verbose off to avoid noise
per_tf[tf] = float(s)
acc += w * float(s)
total_w += w
if include_details:
details_pack[f"{name}_per_tf"] = per_tf
return (acc / total_w) if total_w > 0 else 0.0
s_trend = _agg(self._calc_trend_domain, "trend")
s_mom = _agg(self._calc_momentum_domain, "momentum")
s_vol = _agg(self._calc_volatility_domain, "volatility")
s_volu = _agg(self._calc_volume_domain, "volume")
s_cycle = _agg(self._calc_cycle_math_domain, "cycle_math")
s_struct = _agg(self._calc_structure_domain, "structure")
if verbose:
print(f" ๐Ÿงฉ Multi-TF used: {', '.join(df_map.keys())}")
s_ob = self._calc_orderbook_domain(order_book, verbose, include_details, details_pack)
if verbose:
print("-" * 80)
# ============================================================
# โš™๏ธ DYNAMIC WEIGHT SELECTION
# ============================================================
current_weights = self.DEFAULT_WEIGHTS.copy()
if strategy_type == 'SAFE_BOTTOM':
# ู„ู„ู‚ุงุน: ู†ุบูุฑ ุถุนู ุงู„ุชุฑู†ุฏุŒ ูˆู†ุฑูƒุฒ ุนู„ู‰ ุงู„ุฑูŠุงุถูŠุงุช (ุงู„ุงู†ุญุฑุงู) ูˆุงู„ุชู‚ู„ุจุงุช ูˆุงู„ุจู†ูŠุฉ
current_weights = {
"order_book": 0.20,
"market_structure": 0.20, # Hammer/Support important
"trend": 0.05, # Trend is likely negative, ignore it mostly
"momentum": 0.15, # Divergence matters
"volume": 0.10,
"volatility": 0.15, # Exhaustion/BB Squeeze
"cycle_math": 0.15 # Mean Reversion / Z-Score
}
elif strategy_type == 'MOMENTUM_LAUNCH':
# ู„ู„ุงู†ุทู„ุงู‚: ุงู„ุชุฑู†ุฏ ูˆุงู„ุฒุฎู… ูˆุฏูุชุฑ ุงู„ุทู„ุจุงุช ู‡ู… ุงู„ู…ู„ูˆูƒ
current_weights = {
"order_book": 0.25, # Walls needed to push
"market_structure": 0.15,
"trend": 0.25, # MUST be uptrending
"momentum": 0.20, # High RSI is good here
"volume": 0.10, # Volume backing the move
"volatility": 0.05,
"cycle_math": 0.00 # Less relevant for breakout
}
# ============================================================
# ๐Ÿ›‘ 1. STRICT CONSENSUS CHECK (Veto Power)
# All domains must be non-negative (>= 0).
# Exception: For SAFE_BOTTOM, we tolerate negative Trend if other metrics are strong.
# ============================================================
domain_scores = {
"Trend": s_trend,
"Momentum": s_mom,
"Volatility": s_vol,
"Volume": s_volu,
"Math": s_cycle,
"Structure": s_struct,
"OrderBook": s_ob
}
veto_domains = []
for name, score in domain_scores.items():
if score < 0:
# Special Exemption for Bottom Fishing
if strategy_type == 'SAFE_BOTTOM' and name == 'Trend':
continue
veto_domains.append(name)
if veto_domains:
reason = f"Vetoed by negative domains: {', '.join(veto_domains)}"
if verbose:
print(f"โ›” [Governance VETO] {reason}")
return self._create_rejection(reason)
# 3) Weighted Aggregation using DYNAMIC weights
raw_weighted_score = (
(s_trend * current_weights['trend']) +
(s_mom * current_weights['momentum']) +
(s_vol * current_weights['volatility']) +
(s_volu * current_weights['volume']) +
(s_cycle * current_weights['cycle_math']) +
(s_struct * current_weights['market_structure']) +
(s_ob * current_weights['order_book'])
)
# 4) Final Scoring & Grading
final_score = max(0.0, min(100.0, ((raw_weighted_score + 1) / 2) * 100))
# ============================================================
# ๐Ÿ›‘ 2. SCORE THRESHOLD CHECK (> 50%)
# ============================================================
if final_score <= 50.0:
if verbose:
print(f"โ›” [Governance FAIL] Score {final_score:.2f}% is too low (Must be > 50%).")
return self._create_rejection(f"Low Score: {final_score:.2f}% (Threshold > 50%)")
grade = self._get_grade(final_score)
result = {
"governance_score": round(final_score, 2),
"grade": grade,
"components": {
"trend": round(float(s_trend), 3),
"momentum": round(float(s_mom), 3),
"volatility": round(float(s_vol), 3),
"volume": round(float(s_volu), 3),
"cycle_math": round(float(s_cycle), 3),
"structure": round(float(s_struct), 3),
"order_book": round(float(s_ob), 3),
},
"status": "APPROVED",
}
if include_details:
result["details"] = details_pack
result["timeframes_used"] = list(df_map.keys()) if use_multi_timeframes else ["15m"]
return result
except Exception as e:
if verbose:
print(f"โŒ [Governance Critical Error] {e}")
return self._create_rejection(f"Exception: {str(e)}")
# ==============================================================================
# ๐Ÿ“ˆ DOMAIN 1: TREND (Fixed)
# ==============================================================================
def _calc_trend_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
try:
c = df['close']
# 1. EMA 9 > 21
ema9 = ta.ema(c, 9); ema21 = ta.ema(c, 21)
if self._valid(ema9) and self._valid(ema21) and ema9.iloc[-1] > ema21.iloc[-1]:
points += 1; details.append("EMA9>21")
# 2. EMA 21 > 50
ema50 = ta.ema(c, 50)
if self._valid(ema21) and self._valid(ema50) and ema21.iloc[-1] > ema50.iloc[-1]:
points += 1; details.append("EMA21>50")
# 3. Price > EMA 200
ema200 = ta.ema(c, 200)
if self._valid(ema200):
if c.iloc[-1] > ema200.iloc[-1]: points += 2; details.append("Price>EMA200")
else: points -= 2; details.append("Price<EMA200")
# 4. Supertrend
st = ta.supertrend(df['high'], df['low'], c, length=10, multiplier=3)
if self._valid(st):
# Supertrend returns [trend, direction, long, short], usually col 0 is trend line
st_line = st.iloc[:, 0]
if c.iloc[-1] > st_line.iloc[-1]: points += 1; details.append("ST:Bull")
else: points -= 1
# 5. Parabolic SAR
psar = ta.psar(df['high'], df['low'], c)
if self._valid(psar):
# Handle both single series or dataframe return
val = psar.iloc[-1]
if isinstance(val, pd.Series): val = val.dropna().iloc[0] if not val.dropna().empty else 0
if val != 0:
if val < c.iloc[-1]: points += 1; details.append("PSAR:Bull")
else: points -= 1
# 6. ADX
adx = ta.adx(df['high'], df['low'], c, length=14)
if self._valid(adx):
val = adx[adx.columns[0]].iloc[-1]
dmp = adx[adx.columns[1]].iloc[-1]
dmn = adx[adx.columns[2]].iloc[-1]
if val > 25:
if dmp > dmn: points += 1.5; details.append("ADX:StrongBull")
else: points -= 1.5; details.append("ADX:StrongBear")
else: details.append("ADX:Weak")
# 7. Ichimoku
ichi = ta.ichimoku(df['high'], df['low'], c)
# Ichimoku returns a tuple of (DataFrame, DataFrame)
if ichi is not None and isinstance(ichi, tuple) and self._valid(ichi[0]):
span_a = ichi[0][ichi[0].columns[0]].iloc[-1]
span_b = ichi[0][ichi[0].columns[1]].iloc[-1]
if c.iloc[-1] > span_a and c.iloc[-1] > span_b: points += 1; details.append("Ichi:AboveCloud")
# 8. Vortex
vortex = ta.vortex(df['high'], df['low'], c)
if self._valid(vortex):
if vortex[vortex.columns[0]].iloc[-1] > vortex[vortex.columns[1]].iloc[-1]:
points += 1; details.append("Vortex:Bull")
# 9. Aroon
aroon = ta.aroon(df['high'], df['low'])
if self._valid(aroon):
if aroon[aroon.columns[0]].iloc[-1] > 70: points += 1; details.append("Aroon:Up")
elif aroon[aroon.columns[1]].iloc[-1] > 70: points -= 1; details.append("Aroon:Down")
# 10. Slope
slope = ta.slope(c, length=14)
if self._valid(slope) and slope.iloc[-1] > 0: points += 1; details.append("Slope:Pos")
# 11. KAMA
kama = ta.kama(c, length=10)
if self._valid(kama) and c.iloc[-1] > kama.iloc[-1]: points += 1; details.append("KAMA:Bull")
# 12. TRIX
trix = ta.trix(c, length=30)
trix_val = self._safe_last(trix, col='trix')
if np.isfinite(trix_val) and trix_val > 0: points += 1; details.append("TRIX:Bull")
# 13. DPO
dpo = ta.dpo(c, length=20)
if self._valid(dpo) and dpo.iloc[-1] > 0: points += 1; details.append("DPO:Bull")
# 14. SMA Cluster
sma20 = ta.sma(c, 20); sma50 = ta.sma(c, 50)
if self._valid(sma20) and self._valid(sma50) and sma20.iloc[-1] > sma50.iloc[-1]:
points += 1; details.append("SMA20>50")
# 15. ZigZag
if df['high'].iloc[-1] > df['high'].iloc[-5]: points += 1; details.append("ZigZag:Up")
# 16. MACD Slope
macd = ta.macd(c)
if self._valid(macd):
ml = macd[macd.columns[0]]
if ml.iloc[-1] > ml.iloc[-2]: points += 1; details.append("MACD_Slope:Up")
# 17. Coppock
coppock = ta.coppock(c)
if self._valid(coppock) and coppock.iloc[-1] > 0: points += 0.5; details.append("Coppock:Bull")
# 18. HMA
hma = ta.hma(c, length=9)
if self._valid(hma) and c.iloc[-1] > hma.iloc[-1]: points += 1; details.append("HMA:Bull")
# 19. Donchian
dc = ta.donchian(df['high'], df['low'])
if self._valid(dc) and c.iloc[-1] > dc[dc.columns[1]].iloc[-1]:
points += 1; details.append("Donchian:Upper")
# 20. Keltner
kc = ta.kc(df['high'], df['low'], c)
if self._valid(kc) and c.iloc[-1] > kc[kc.columns[0]].iloc[-1]:
points += 0.5; details.append("Keltner:Safe")
except Exception as e: details.append(f"TrendErr:{str(e)[:15]}")
norm_score = self._normalize(points, max_possible=22.0)
if include_details and details_pack is not None:
details_pack['trend'] = details
if verbose: print(f" ๐Ÿ“ˆ [TREND] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# ๐Ÿš€ DOMAIN 2: MOMENTUM (Fixed)
# ==============================================================================
def _calc_momentum_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
try:
c = df['close']
# 1. RSI
rsi = ta.rsi(c, length=14)
if self._valid(rsi):
val = rsi.iloc[-1]
if 50 < val < 70: points += 2; details.append(f"RSI:{val:.0f}")
elif val > 70: points -= 1; details.append("RSI:OB")
elif val < 30: points += 1; details.append("RSI:OS")
# 2. MACD
macd = ta.macd(c)
if self._valid(macd):
if macd[macd.columns[0]].iloc[-1] > macd[macd.columns[2]].iloc[-1]:
points += 1.5; details.append("MACD:X_Bull")
if macd[macd.columns[1]].iloc[-1] > 0:
points += 1; details.append("MACD_Hist:Pos")
# 4. Stochastic
stoch = ta.stoch(df['high'], df['low'], c)
if self._valid(stoch):
k = stoch[stoch.columns[0]].iloc[-1]
d = stoch[stoch.columns[1]].iloc[-1]
if 20 < k < 80 and k > d: points += 1; details.append("Stoch:Bull")
# 5. AO
ao = ta.ao(df['high'], df['low'])
if self._valid(ao) and ao.iloc[-1] > 0 and ao.iloc[-1] > ao.iloc[-2]:
points += 1; details.append("AO:Rising")
# 6. CCI
cci = ta.cci(df['high'], df['low'], c)
if self._valid(cci):
val = cci.iloc[-1]
if val > 100: points += 1; details.append("CCI:>100")
elif val < -100: points -= 1
# 7. Williams %R
willr = ta.willr(df['high'], df['low'], c)
if self._valid(willr) and willr.iloc[-1] < -80:
points += 1; details.append("WillR:OS")
# 8. ROC
roc = ta.roc(c, length=10)
if self._valid(roc) and roc.iloc[-1] > 0:
points += 1; details.append(f"ROC:{roc.iloc[-1]:.2f}")
# 9. MOM
mom = ta.mom(c, length=10)
if self._valid(mom) and mom.iloc[-1] > 0:
points += 1; details.append("MOM:Pos")
# 10. PPO
ppo = ta.ppo(c)
if self._valid(ppo) and ppo[ppo.columns[0]].iloc[-1] > 0:
points += 1; details.append("PPO:Pos")
# 11. TSI
tsi = ta.tsi(c)
if self._valid(tsi) and tsi[tsi.columns[0]].iloc[-1] > tsi[tsi.columns[1]].iloc[-1]:
points += 1; details.append("TSI:Bull")
# 12. Fisher
fish = ta.fisher(df['high'], df['low'])
if self._valid(fish) and fish[fish.columns[0]].iloc[-1] > fish[fish.columns[1]].iloc[-1]:
points += 1; details.append("Fisher:Bull")
# 13. CMO
cmo = ta.cmo(c, length=14)
if self._valid(cmo) and cmo.iloc[-1] > 0:
points += 1; details.append("CMO:Pos")
# 14. Squeeze
bb = ta.bbands(c, length=20)
kc = ta.kc(df['high'], df['low'], c)
if self._valid(bb) and self._valid(kc):
if bb[bb.columns[0]].iloc[-1] < kc[kc.columns[0]].iloc[-1]:
points += 1; details.append("SQZ:Active")
# 15. UO
uo = ta.uo(df['high'], df['low'], c)
if self._valid(uo) and uo.iloc[-1] > 50:
points += 0.5; details.append("UO:>50")
# 16. KDJ (kdj returns df)
kdj = ta.kdj(df['high'], df['low'], c)
if self._valid(kdj) and kdj[kdj.columns[0]].iloc[-1] > kdj[kdj.columns[1]].iloc[-1]:
points += 0.5; details.append("KDJ:Bull")
# 17. StochRSI
stochrsi = ta.stochrsi(c)
if self._valid(stochrsi) and stochrsi[stochrsi.columns[0]].iloc[-1] < 20:
points += 1; details.append("StochRSI:OS")
# 18. Elder Ray
ema13 = ta.ema(c, 13)
if self._valid(ema13):
bull_power = df['high'] - ema13
if bull_power.iloc[-1] > 0 and bull_power.iloc[-1] > bull_power.iloc[-2]:
points += 1; details.append("BullPower:Rising")
# 19. Streak
if c.iloc[-1] > c.iloc[-2] and c.iloc[-2] > c.iloc[-3]:
points += 0.5; details.append("Streak:Up")
# 20. Bias
ema20 = ta.ema(c, 20)
if self._valid(ema20):
bias = (c.iloc[-1] - ema20.iloc[-1]) / ema20.iloc[-1]
if 0 < bias < 0.05: points += 1; details.append("Bias:Healthy")
except Exception as e: details.append(f"MomErr:{str(e)[:10]}")
norm_score = self._normalize(points, max_possible=20.0)
if include_details and details_pack is not None:
details_pack['momentum'] = details
if verbose: print(f" ๐Ÿš€ [MOMENTUM] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# ๐ŸŒŠ DOMAIN 3: VOLATILITY (Fixed)
# ==============================================================================
def _calc_volatility_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
try:
# 1. Bollinger Bands (Bandwidth + %B)
bb = ta.bbands(df['close'], length=20)
if self._valid(bb):
# pandas_ta names usually: BBL_, BBM_, BBU_, BBB_ (bandwidth), BBP_ (%B)
bw_col = self._find_col(bb, ["bbb_", "bandwidth", "bbw"])
pb_col = self._find_col(bb, ["bbp_", "%b", "percentb", "pb"])
width = self._safe_last(bb, col=bw_col) if bw_col else np.nan
pct_b = self._safe_last(bb, col=pb_col) if pb_col else np.nan
# Bandwidth: smaller -> squeeze, larger -> expansion
# Typical BBB values ~ 0.02 - 0.25 in many markets (depends on volatility)
if np.isfinite(width):
if width < 0.05:
points -= 1; details.append("BBW:Squeeze")
elif width > 0.18:
points += 1; details.append("BBW:Expand")
# %B: location within bands (0..1 typically)
if np.isfinite(pct_b):
if pct_b > 0.90:
points += 0.5; details.append("BB%B:High")
elif pct_b < 0.10:
points -= 0.5; details.append("BB%B:Low")
# 3. ATR
atr = ta.atr(df['high'], df['low'], df['close'], length=14)
if self._valid(atr) and atr.iloc[-1] > atr.iloc[-5]:
points += 1; details.append("ATR:Rising")
# 4. KC Break
kc = ta.kc(df['high'], df['low'], df['close'])
if self._valid(kc):
kcu_col = self._find_col(kc, ['kcu_', 'upper']) or kc.columns[-1]
if df['close'].iloc[-1] > kc[kcu_col].iloc[-1]:
points += 2; details.append("KC:Breakout")
# 5. Donchian
dc = ta.donchian(df['high'], df['low'])
if self._valid(dc):
dcu_col = self._find_col(dc, ['dcu_', 'upper']) or dc.columns[-1]
if df['high'].iloc[-1] >= dc[dcu_col].iloc[-2]:
points += 1; details.append("DC:High")
# 6. Mass Index
mass = ta.massi(df['high'], df['low'])
if self._valid(mass) and mass.iloc[-1] > 25:
points -= 1; details.append("Mass:Risk")
# 7. Chaikin Vol
c_vol = ta.stdev(df['close'], 20)
if self._valid(c_vol) and c_vol.iloc[-1] > c_vol.iloc[-10]:
points += 1; details.append("Vol:Exp")
# 8. Ulcer
ui = ta.ui(df['close'])
if self._valid(ui):
val = ui.iloc[-1]
if val < 2: points += 1; details.append("UI:Safe")
else: points -= 1
# 9. NATR
natr = ta.natr(df['high'], df['low'], df['close'])
if self._valid(natr) and natr.iloc[-1] > 1.0:
points += 1; details.append(f"NATR:{natr.iloc[-1]:.1f}")
# 10. Gap
if self._valid(atr):
gap = abs(df['open'].iloc[-1] - df['close'].iloc[-2])
if gap > atr.iloc[-1] * 0.5: points += 1; details.append("Gap")
# 11. Vol Ratio
if self._valid(atr):
vr = atr.iloc[-1] / atr.iloc[-20]
if vr > 1.2: points += 1; details.append("VolRatio:High")
# 12. RVI (Proxy)
if self._valid(c_vol):
std_rsi = ta.rsi(c_vol, length=14)
if self._valid(std_rsi) and std_rsi.iloc[-1] > 50: points += 0.5
# 13. StdDev Channel
mean = df['close'].rolling(20).mean()
std = df['close'].rolling(20).std()
z = (df['close'].iloc[-1] - mean.iloc[-1]) / std.iloc[-1]
if abs(z) < 2: points += 0.5
# 14. ATS
if self._valid(atr):
ats = df['close'].iloc[-1] - (atr.iloc[-1] * 2)
if df['close'].iloc[-1] > ats: points += 1
# 15. Chop
chop = ta.chop(df['high'], df['low'], df['close'])
if self._valid(chop):
val = chop.iloc[-1]
if val < 38.2: points += 1; details.append("Chop:Trend")
elif val > 61.8: points -= 1; details.append("Chop:Range")
# 16. KC Width
if self._valid(kc):
kw = kc[kc.columns[0]].iloc[-1] - kc[kc.columns[2]].iloc[-1]
if kw > kw * 1.1: points += 0.5
# 17. Accel
if df['close'].diff().iloc[-1] > df['close'].diff().iloc[-2]: points += 0.5
# 18. Efficiency
denom = (df['high'].rolling(10).max() - df['low'].rolling(10).min()).iloc[-1]
if denom > 0:
eff = abs(df['close'].iloc[-1] - df['close'].iloc[-10]) / denom
if eff > 0.5: points += 1; details.append("Eff:High")
# 19. Gator
if ta.ema(df['close'], 5).iloc[-1] > ta.ema(df['close'], 13).iloc[-1]: points += 0.5
# 20. Range
if self._valid(atr):
rng = df['high'].iloc[-1] - df['low'].iloc[-1]
if rng > atr.iloc[-1]: points += 1
except Exception as e: details.append(f"VolErr:{str(e)[:10]}")
norm_score = self._normalize(points, max_possible=18.0)
if include_details and details_pack is not None:
details_pack['volatility'] = details
if verbose: print(f" ๐ŸŒŠ [VOLATILITY] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# โ›ฝ DOMAIN 4: VOLUME (Fixed)
# ==============================================================================
def _calc_volume_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
try:
c = df['close']; v = df['volume']
# 1. OBV
obv = ta.obv(c, v)
if self._valid(obv) and obv.iloc[-1] > obv.iloc[-5]:
points += 1.5; details.append("OBV:Up")
# 2. CMF
cmf = ta.cmf(df['high'], df['low'], c, v, length=20)
if self._valid(cmf):
val = cmf.iloc[-1]
if val > 0.05: points += 2; details.append(f"CMF:{val:.2f}")
elif val < -0.05: points -= 2
# 3. MFI
mfi = ta.mfi(df['high'], df['low'], c, v, length=14)
if self._valid(mfi):
val = mfi.iloc[-1]
if 50 < val < 80: points += 1; details.append(f"MFI:{val:.0f}")
# 4. Vol > Avg
vol_ma = v.rolling(20).mean().iloc[-1]
if v.iloc[-1] > vol_ma: points += 1
# 5. Vol Spike
if v.iloc[-1] > vol_ma * 1.5: points += 1; details.append("Vol:Spike")
# 6. EOM
eom = ta.eom(df['high'], df['low'], c, v)
if self._valid(eom) and eom.iloc[-1] > 0: points += 1; details.append("EOM:Pos")
# 7. VWAP
vwap = ta.vwap(df['high'], df['low'], c, v)
if self._valid(vwap) and c.iloc[-1] > vwap.iloc[-1]: points += 1; details.append("Price>VWAP")
# 8. NVI
nvi = ta.nvi(c, v)
if self._valid(nvi) and nvi.iloc[-1] > nvi.iloc[-5]: points += 1; details.append("NVI:Smart")
# 9. PVI
pvi = ta.pvi(c, v)
if self._valid(pvi) and pvi.iloc[-1] > pvi.iloc[-5]: points += 0.5
# 10. ADL
adl = ta.ad(df['high'], df['low'], c, v)
if self._valid(adl) and adl.iloc[-1] > adl.iloc[-2]: points += 1; details.append("ADL:Up")
# 11. PVT
pvt = ta.pvt(c, v)
if self._valid(pvt) and pvt.iloc[-1] > pvt.iloc[-2]: points += 1
# 12. Vol Osc
if v.rolling(5).mean().iloc[-1] > v.rolling(10).mean().iloc[-1]: points += 1
# 13. KVO
kvo = ta.kvo(df['high'], df['low'], c, v)
if self._valid(kvo) and kvo[kvo.columns[0]].iloc[-1] > 0: points += 1; details.append("KVO:Bull")
# 14. Force
fi = (c.diff() * v).rolling(13).mean()
if fi.iloc[-1] > 0: points += 1
# 15. MFI (Bill Williams)
if v.iloc[-1] > 0:
my_mfi = (df['high'] - df['low']) / v
if my_mfi.iloc[-1] > my_mfi.iloc[-2] and v.iloc[-1] > v.iloc[-2]: points += 1
# 16. Buying Climax
if v.iloc[-1] > vol_ma * 3 and c.iloc[-1] > df['high'].iloc[-2]: points -= 1
# 17. RVOL
if vol_ma > 0:
rvol = v.iloc[-1] / vol_ma
if rvol > 1.2: points += 1; details.append(f"RVOL:{rvol:.1f}")
# 18. Delta
delta = (c.iloc[-1] - df['open'].iloc[-1]) * v.iloc[-1]
if delta > 0: points += 1
# 20. Low Vol Gap
if self._valid(ta.atr(df['high'], df['low'], c)):
if v.iloc[-1] < vol_ma * 0.5 and abs(c.diff().iloc[-1]) > ta.atr(df['high'], df['low'], c).iloc[-1]:
points -= 1
except Exception as e: details.append(f"VolErr:{str(e)[:10]}")
norm_score = self._normalize(points, max_possible=18.0)
if include_details and details_pack is not None:
details_pack['volume'] = details
if verbose: print(f" โ›ฝ [VOLUME] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# ๐Ÿ”ข DOMAIN 5: CYCLE & MATH (Fixed)
# ==============================================================================
def _calc_cycle_math_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
try:
c = df['close']; h = df['high']; l = df['low']
# 1. Pivot
pp = (h.iloc[-2] + l.iloc[-2] + c.iloc[-2]) / 3
if c.iloc[-1] > pp: points += 1; details.append("AbovePP")
# 2. R1
r1 = (2 * pp) - l.iloc[-2]
if c.iloc[-1] > r1: points += 1; details.append("AboveR1")
# 3. Fib 618
range_h = h.rolling(100).max().iloc[-1]
range_l = l.rolling(100).min().iloc[-1]
fib_618 = range_l + (range_h - range_l) * 0.618
if c.iloc[-1] > fib_618: points += 1; details.append("AboveFib")
# 4. Z-Score
zscore = ta.zscore(c, length=30)
if self._valid(zscore):
z = zscore.iloc[-1]
if z < -2: points += 2; details.append("Z:OS")
elif -1 < z < 1: points += 0.5; details.append("Z:Norm")
# 5. Entropy
entropy = ta.entropy(c, length=10)
if self._valid(entropy) and entropy.iloc[-1] < 0.5:
points += 1; details.append(f"Ent:{entropy.iloc[-1]:.2f}")
# 6. Kurtosis
kurt = c.rolling(30).kurt().iloc[-1]
if kurt > 3: points -= 0.5
# 7. Skew
skew = c.rolling(30).skew().iloc[-1]
if skew > 0: points += 0.5; details.append("PosSkew")
# 8. Variance
var = ta.variance(c, length=20)
if self._valid(var): points += 0
# 9. StdDev
std = c.rolling(20).std().iloc[-1]
if c.iloc[-1] > (c.rolling(20).mean().iloc[-1] + std): points += 0.5
# 10. LinReg
linreg = ta.linreg(c, length=20)
if self._valid(linreg) and c.iloc[-1] > linreg.iloc[-1]:
points += 1; details.append("AboveLinReg")
# 13. CG
cg = ta.cg(c, length=10)
if self._valid(cg) and c.diff().iloc[-1] > 0: points += 0.5
# 20. Mean Rev
dist_mean = abs(c.iloc[-1] - c.rolling(50).mean().iloc[-1])
if dist_mean > std * 2: points -= 1
else: points += 0.5
except Exception as e: details.append(f"MathErr:{str(e)[:10]}")
norm_score = self._normalize(points, max_possible=12.0)
if include_details and details_pack is not None:
details_pack['cycle_math'] = details
if verbose: print(f" ๐Ÿ”ข [MATH] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# ๐Ÿงฑ DOMAIN 6: STRUCTURE (Fixed)
# ==============================================================================
def _calc_structure_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
try:
closes = df['close'].values; opens = df['open'].values
highs = df['high'].values; lows = df['low'].values
# 1. HH
if highs[-1] > highs[-2] and highs[-2] > highs[-3]:
points += 2; details.append("HH")
# 2. HL
if lows[-1] > lows[-2] and lows[-2] > lows[-3]:
points += 2; details.append("HL")
# 3. Engulfing
if closes[-1] > opens[-1]:
if closes[-1] > highs[-2] and opens[-1] < lows[-2]:
points += 2; details.append("Engulfing")
# 4. Hammer
body = abs(closes[-1] - opens[-1])
lower_wick = min(closes[-1], opens[-1]) - lows[-1]
if lower_wick > body * 2:
points += 2; details.append("Hammer")
# 5. BOS
recent_high = np.max(highs[-11:-1])
if closes[-1] > recent_high: points += 2; details.append("BOS")
# 6. FVG
if len(closes) > 3 and lows[-1] > highs[-3] * 1.001:
points += 1; details.append("FVG")
# 7. Order Block
if closes[-2] < opens[-2] and closes[-1] > opens[-1]:
if (closes[-1] - opens[-1]) > (opens[-2] - closes[-2]) * 2:
points += 1.5; details.append("OB")
# 8. SFP
if lows[-1] < lows[-2] and closes[-1] > lows[-2]:
points += 2.5; details.append("SFP")
# 9. Inside Bar
if highs[-1] < highs[-2] and lows[-1] > lows[-2]:
points -= 0.5; details.append("IB")
# 10. Morning Star
if closes[-3] < opens[-3] and abs(closes[-2]-opens[-2]) < body*0.5 and closes[-1] > opens[-1]:
points += 2; details.append("MorningStar")
# 14. Golden Cross Struct
m50 = np.mean(closes[-50:]); m200 = np.mean(closes[-200:]) if len(closes)>200 else m50
if m50 > m200: points += 1
# 16. Impulse
avg_body = np.mean([abs(c-o) for c,o in zip(closes[-10:], opens[-10:])])
if body > avg_body * 2: points += 1; details.append("Impulse")
except Exception as e: details.append(f"PAErr:{str(e)[:10]}")
norm_score = self._normalize(points, max_possible=18.0)
if include_details and details_pack is not None:
details_pack['structure'] = details
if verbose: print(f" ๐Ÿงฑ [STRUCTURE] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# ๐Ÿ“– DOMAIN 7: ORDER BOOK (Already Safe, but kept consistent)
# ==============================================================================
def _calc_orderbook_domain(self, ob: Dict[str, Any], verbose: bool, include_details: bool = False, details_pack: Any = None) -> float:
points = 0.0
details = []
if not ob or 'bids' not in ob or 'asks' not in ob: return 0.0
try:
bids = np.array(ob['bids'], dtype=float)
asks = np.array(ob['asks'], dtype=float)
if len(bids) < 20 or len(asks) < 20: return 0.0
bid_vol = np.sum(bids[:20, 1])
ask_vol = np.sum(asks[:20, 1])
imbal = (bid_vol - ask_vol) / (bid_vol + ask_vol)
points += imbal * 5; details.append(f"Imbal:{imbal:.2f}")
avg_size = np.mean(bids[:50, 1])
if np.max(bids[:20, 1]) > avg_size * 5: points += 3; details.append("BidWall")
if np.max(asks[:20, 1]) > avg_size * 5: points -= 3; details.append("AskWall")
spread = (asks[0,0] - bids[0,0]) / bids[0,0] * 100
if spread < 0.05: points += 1; details.append("TightSpread")
elif spread > 0.2: points -= 1; details.append("WideSpread")
if bid_vol > ask_vol * 1.5: points += 2; details.append("Depth:Bull")
if bids[0,1] > bids[1,1] and bids[1,1] > bids[2,1]: points += 1; details.append("Slope:Up")
# Slippage / depth-to-move (normalized; avoids hard-coded thresholds)
mid = (asks[0, 0] + bids[0, 0]) / 2.0
target_p = mid * 1.005 # ~0.5% up move
vol_needed = 0.0
for p, s in asks:
if p > target_p:
break
vol_needed += float(s)
# Normalize by visible depth (top 20)
visible_ask = float(np.sum(asks[:20, 1])) if len(asks) >= 20 else float(np.sum(asks[:, 1]))
ratio = (vol_needed / visible_ask) if visible_ask > 0 else 0.0
# Higher ratio => more depth needed to move price => thicker book (safer entry)
if ratio > 0.65:
points += 1; details.append(f"ThickBook:{ratio:.2f}")
elif ratio < 0.30:
points -= 1; details.append(f"ThinBook:{ratio:.2f}")
else:
details.append(f"BookOK:{ratio:.2f}")
# Best-level dominance (simple slope proxy)
if bids[0, 1] > asks[0, 1] * 2:
points += 1; details.append("TopBid>TopAsk*2")
top_bid_notional = float(bids[0, 0] * bids[0, 1])
# Dynamic whale detection vs median level notional (top 20)
level_notionals = (bids[:20, 0] * bids[:20, 1]).astype(float)
med_notional = float(np.median(level_notionals)) if len(level_notionals) else 0.0
if med_notional > 0 and (top_bid_notional / med_notional) >= 8.0:
points += 1; details.append(f"WhaleBid:{top_bid_notional/med_notional:.1f}x")
except Exception as e: details.append("OBErr")
norm_score = self._normalize(points, max_possible=15.0)
if include_details and details_pack is not None:
details_pack['order_book'] = details
if verbose: print(f" ๐Ÿ“– [ORDERBOOK] Score: {norm_score:.2f} | {', '.join(details)}")
return norm_score
# ==============================================================================
# ๐Ÿ”ง Utilities
# ==============================================================================
def _valid(self, item, col: Any = None) -> bool:
"""Return True if item has a finite last value (Series) or at least one finite last-row value (DataFrame).
If col is provided and item is a DataFrame, checks that column's last value.
"""
if item is None:
return False
# pandas_ta sometimes returns tuples (e.g., ichimoku)
if isinstance(item, tuple):
# consider valid if any element is valid
return any(self._valid(x, col=col) for x in item)
try:
if isinstance(item, pd.Series):
if item.empty:
return False
v = item.iloc[-1]
return pd.notna(v) and np.isfinite(v)
if isinstance(item, pd.DataFrame):
if item.empty:
return False
if col is not None:
c = self._find_col(item, [col]) or (col if col in item.columns else None)
if c is None:
return False
v = item[c].iloc[-1]
return pd.notna(v) and np.isfinite(v)
# any finite in last row
last = item.iloc[-1]
if isinstance(last, pd.Series):
vals = last.values.astype(float, copy=False)
return np.isfinite(vals).any()
return False
# scalars
if isinstance(item, (int, float, np.number)):
return np.isfinite(item)
return True
except Exception:
return False
def _find_col(self, df: pd.DataFrame, contains_any: List[str]) -> Any:
"""Find first column whose name contains any of the provided substrings (case-insensitive)."""
if df is None or getattr(df, "empty", True):
return None
cols = list(df.columns)
lowered = [str(c).lower() for c in cols]
needles = [s.lower() for s in contains_any]
for n in needles:
for c, lc in zip(cols, lowered):
if n in lc:
return c
return None
def _safe_last(self, item, default=np.nan, col: Any = None) -> float:
"""Safely get last finite value from Series/DataFrame (optionally from matched column)."""
if not self._valid(item, col=col):
return float(default)
try:
if isinstance(item, pd.Series):
return float(item.iloc[-1])
if isinstance(item, pd.DataFrame):
if col is None:
# pick first finite value in last row
last = item.iloc[-1]
for v in last.values:
if pd.notna(v) and np.isfinite(v):
return float(v)
return float(default)
c = self._find_col(item, [col]) or (col if col in item.columns else None)
if c is None:
return float(default)
return float(item[c].iloc[-1])
if isinstance(item, (int, float, np.number)):
return float(item)
return float(default)
except Exception:
return float(default)
def _normalize(self, value: float, max_possible: float) -> float:
if max_possible == 0: return 0.0
return max(-1.0, min(1.0, value / max_possible))
def _prepare_dataframe(self, ohlcv: List) -> pd.DataFrame:
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
cols = ['open', 'high', 'low', 'close', 'volume']
df[cols] = df[cols].astype(float)
return df
def _get_grade(self, score: float) -> str:
if score >= 85: return "ULTRA"
if score >= 70: return "STRONG"
if score > 50: return "NORMAL"
return "REJECT"
def _create_rejection(self, reason: str):
return {
"governance_score": 0.0,
"grade": "REJECT",
"status": "REJECTED",
"reason": reason,
"components": {}
}