# ============================================================ # 🏛️ governance_engine.py (V38.0 - GEM-Architect: Context-Aware Weights) # ============================================================ # Description: # Evaluates trade quality using 156 INDICATORS. # Update V38.0: Dynamic Weighting based on Strategy Type (Bottom vs Momentum). # ============================================================ import numpy as np import pandas as pd try: import pandas_ta as ta except Exception as _e: ta = None from typing import Dict, Any, List class GovernanceEngine: def __init__(self): # ⚖️ Default Strategic Weights (For Normal/Range Operations) self.DEFAULT_WEIGHTS = { "order_book": 0.25, # 25% "market_structure": 0.20, # 20% "trend": 0.15, # 15% "momentum": 0.15, # 15% "volume": 0.10, # 10% "volatility": 0.05, # 5% "cycle_math": 0.10 # 10% } print("🏛️ [Governance Engine V38.0] Context-Aware Protocols Active.") async def evaluate_trade( self, symbol: str, ohlcv_data: Dict[str, Any], order_book: Dict[str, Any], strategy_type: str = "NORMAL", # ✅ New Parameter verbose: bool = True, include_details: bool = False, use_multi_timeframes: bool = False ) -> Dict[str, Any]: """ Main Execution Entry. Now adapts weights based on 'strategy_type' (SAFE_BOTTOM vs MOMENTUM_LAUNCH). """ try: if ta is None: return self._create_rejection('Missing dependency: pandas_ta') # 1) Data Prep if not isinstance(ohlcv_data, dict) or '15m' not in ohlcv_data: return self._create_rejection("No 15m Data") def _get_df(tf: str) -> Any: if tf not in ohlcv_data: return None df_tf = self._prepare_dataframe(ohlcv_data[tf]) if len(df_tf) < 60: return None return df_tf df15 = _get_df('15m') if df15 is None: return self._create_rejection("Insufficient Data Length (<60)") # optional timeframes (only used when enabled) df_map: Dict[str, pd.DataFrame] = {'15m': df15} if use_multi_timeframes: for tf in ('1h', '4h', '1d'): d = _get_df(tf) if d is not None: df_map[tf] = d if verbose: print(f"\n📝 [Gov Audit] Opening Session for {symbol} ({strategy_type})...") print("-" * 80) # 2) Calculate Domains details_pack = {} # only filled when include_details=True if not use_multi_timeframes: s_trend = self._calc_trend_domain(df15, verbose, include_details, details_pack) s_mom = self._calc_momentum_domain(df15, verbose, include_details, details_pack) s_vol = self._calc_volatility_domain(df15, verbose, include_details, details_pack) s_volu = self._calc_volume_domain(df15, verbose, include_details, details_pack) s_cycle = self._calc_cycle_math_domain(df15, verbose, include_details, details_pack) s_struct = self._calc_structure_domain(df15, verbose, include_details, details_pack) else: # Weighted by timeframe importance; only timeframes available are used tfw = {'15m': 0.50, '1h': 0.30, '4h': 0.20, '1d': 0.10} def _agg(fn, name: str) -> float: total_w = 0.0 acc = 0.0 per_tf = {} for tf, df_tf in df_map.items(): w = tfw.get(tf, 0.1) s = fn(df_tf, False, include_details, details_pack) # per-tf verbose off to avoid noise per_tf[tf] = float(s) acc += w * float(s) total_w += w if include_details: details_pack[f"{name}_per_tf"] = per_tf return (acc / total_w) if total_w > 0 else 0.0 s_trend = _agg(self._calc_trend_domain, "trend") s_mom = _agg(self._calc_momentum_domain, "momentum") s_vol = _agg(self._calc_volatility_domain, "volatility") s_volu = _agg(self._calc_volume_domain, "volume") s_cycle = _agg(self._calc_cycle_math_domain, "cycle_math") s_struct = _agg(self._calc_structure_domain, "structure") if verbose: print(f" 🧩 Multi-TF used: {', '.join(df_map.keys())}") s_ob = self._calc_orderbook_domain(order_book, verbose, include_details, details_pack) if verbose: print("-" * 80) # ============================================================ # ⚙️ DYNAMIC WEIGHT SELECTION # ============================================================ current_weights = self.DEFAULT_WEIGHTS.copy() if strategy_type == 'SAFE_BOTTOM': # للقاع: نغفر ضعف الترند، ونركز على الرياضيات (الانحراف) والتقلبات والبنية current_weights = { "order_book": 0.20, "market_structure": 0.20, # Hammer/Support important "trend": 0.05, # Trend is likely negative, ignore it mostly "momentum": 0.15, # Divergence matters "volume": 0.10, "volatility": 0.15, # Exhaustion/BB Squeeze "cycle_math": 0.15 # Mean Reversion / Z-Score } elif strategy_type == 'MOMENTUM_LAUNCH': # للانطلاق: الترند والزخم ودفتر الطلبات هم الملوك current_weights = { "order_book": 0.25, # Walls needed to push "market_structure": 0.15, "trend": 0.25, # MUST be uptrending "momentum": 0.20, # High RSI is good here "volume": 0.10, # Volume backing the move "volatility": 0.05, "cycle_math": 0.00 # Less relevant for breakout } # ============================================================ # 🛑 1. STRICT CONSENSUS CHECK (Veto Power) # All domains must be non-negative (>= 0). # Exception: For SAFE_BOTTOM, we tolerate negative Trend if other metrics are strong. # ============================================================ domain_scores = { "Trend": s_trend, "Momentum": s_mom, "Volatility": s_vol, "Volume": s_volu, "Math": s_cycle, "Structure": s_struct, "OrderBook": s_ob } veto_domains = [] for name, score in domain_scores.items(): if score < 0: # Special Exemption for Bottom Fishing if strategy_type == 'SAFE_BOTTOM' and name == 'Trend': continue veto_domains.append(name) if veto_domains: reason = f"Vetoed by negative domains: {', '.join(veto_domains)}" if verbose: print(f"⛔ [Governance VETO] {reason}") return self._create_rejection(reason) # 3) Weighted Aggregation using DYNAMIC weights raw_weighted_score = ( (s_trend * current_weights['trend']) + (s_mom * current_weights['momentum']) + (s_vol * current_weights['volatility']) + (s_volu * current_weights['volume']) + (s_cycle * current_weights['cycle_math']) + (s_struct * current_weights['market_structure']) + (s_ob * current_weights['order_book']) ) # 4) Final Scoring & Grading final_score = max(0.0, min(100.0, ((raw_weighted_score + 1) / 2) * 100)) # ============================================================ # 🛑 2. SCORE THRESHOLD CHECK (> 50%) # ============================================================ if final_score <= 50.0: if verbose: print(f"⛔ [Governance FAIL] Score {final_score:.2f}% is too low (Must be > 50%).") return self._create_rejection(f"Low Score: {final_score:.2f}% (Threshold > 50%)") grade = self._get_grade(final_score) result = { "governance_score": round(final_score, 2), "grade": grade, "components": { "trend": round(float(s_trend), 3), "momentum": round(float(s_mom), 3), "volatility": round(float(s_vol), 3), "volume": round(float(s_volu), 3), "cycle_math": round(float(s_cycle), 3), "structure": round(float(s_struct), 3), "order_book": round(float(s_ob), 3), }, "status": "APPROVED", } if include_details: result["details"] = details_pack result["timeframes_used"] = list(df_map.keys()) if use_multi_timeframes else ["15m"] return result except Exception as e: if verbose: print(f"❌ [Governance Critical Error] {e}") return self._create_rejection(f"Exception: {str(e)}") # ============================================================================== # 📈 DOMAIN 1: TREND (Fixed) # ============================================================================== def _calc_trend_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] try: c = df['close'] # 1. EMA 9 > 21 ema9 = ta.ema(c, 9); ema21 = ta.ema(c, 21) if self._valid(ema9) and self._valid(ema21) and ema9.iloc[-1] > ema21.iloc[-1]: points += 1; details.append("EMA9>21") # 2. EMA 21 > 50 ema50 = ta.ema(c, 50) if self._valid(ema21) and self._valid(ema50) and ema21.iloc[-1] > ema50.iloc[-1]: points += 1; details.append("EMA21>50") # 3. Price > EMA 200 ema200 = ta.ema(c, 200) if self._valid(ema200): if c.iloc[-1] > ema200.iloc[-1]: points += 2; details.append("Price>EMA200") else: points -= 2; details.append("Price st_line.iloc[-1]: points += 1; details.append("ST:Bull") else: points -= 1 # 5. Parabolic SAR psar = ta.psar(df['high'], df['low'], c) if self._valid(psar): # Handle both single series or dataframe return val = psar.iloc[-1] if isinstance(val, pd.Series): val = val.dropna().iloc[0] if not val.dropna().empty else 0 if val != 0: if val < c.iloc[-1]: points += 1; details.append("PSAR:Bull") else: points -= 1 # 6. ADX adx = ta.adx(df['high'], df['low'], c, length=14) if self._valid(adx): val = adx[adx.columns[0]].iloc[-1] dmp = adx[adx.columns[1]].iloc[-1] dmn = adx[adx.columns[2]].iloc[-1] if val > 25: if dmp > dmn: points += 1.5; details.append("ADX:StrongBull") else: points -= 1.5; details.append("ADX:StrongBear") else: details.append("ADX:Weak") # 7. Ichimoku ichi = ta.ichimoku(df['high'], df['low'], c) # Ichimoku returns a tuple of (DataFrame, DataFrame) if ichi is not None and isinstance(ichi, tuple) and self._valid(ichi[0]): span_a = ichi[0][ichi[0].columns[0]].iloc[-1] span_b = ichi[0][ichi[0].columns[1]].iloc[-1] if c.iloc[-1] > span_a and c.iloc[-1] > span_b: points += 1; details.append("Ichi:AboveCloud") # 8. Vortex vortex = ta.vortex(df['high'], df['low'], c) if self._valid(vortex): if vortex[vortex.columns[0]].iloc[-1] > vortex[vortex.columns[1]].iloc[-1]: points += 1; details.append("Vortex:Bull") # 9. Aroon aroon = ta.aroon(df['high'], df['low']) if self._valid(aroon): if aroon[aroon.columns[0]].iloc[-1] > 70: points += 1; details.append("Aroon:Up") elif aroon[aroon.columns[1]].iloc[-1] > 70: points -= 1; details.append("Aroon:Down") # 10. Slope slope = ta.slope(c, length=14) if self._valid(slope) and slope.iloc[-1] > 0: points += 1; details.append("Slope:Pos") # 11. KAMA kama = ta.kama(c, length=10) if self._valid(kama) and c.iloc[-1] > kama.iloc[-1]: points += 1; details.append("KAMA:Bull") # 12. TRIX trix = ta.trix(c, length=30) trix_val = self._safe_last(trix, col='trix') if np.isfinite(trix_val) and trix_val > 0: points += 1; details.append("TRIX:Bull") # 13. DPO dpo = ta.dpo(c, length=20) if self._valid(dpo) and dpo.iloc[-1] > 0: points += 1; details.append("DPO:Bull") # 14. SMA Cluster sma20 = ta.sma(c, 20); sma50 = ta.sma(c, 50) if self._valid(sma20) and self._valid(sma50) and sma20.iloc[-1] > sma50.iloc[-1]: points += 1; details.append("SMA20>50") # 15. ZigZag if df['high'].iloc[-1] > df['high'].iloc[-5]: points += 1; details.append("ZigZag:Up") # 16. MACD Slope macd = ta.macd(c) if self._valid(macd): ml = macd[macd.columns[0]] if ml.iloc[-1] > ml.iloc[-2]: points += 1; details.append("MACD_Slope:Up") # 17. Coppock coppock = ta.coppock(c) if self._valid(coppock) and coppock.iloc[-1] > 0: points += 0.5; details.append("Coppock:Bull") # 18. HMA hma = ta.hma(c, length=9) if self._valid(hma) and c.iloc[-1] > hma.iloc[-1]: points += 1; details.append("HMA:Bull") # 19. Donchian dc = ta.donchian(df['high'], df['low']) if self._valid(dc) and c.iloc[-1] > dc[dc.columns[1]].iloc[-1]: points += 1; details.append("Donchian:Upper") # 20. Keltner kc = ta.kc(df['high'], df['low'], c) if self._valid(kc) and c.iloc[-1] > kc[kc.columns[0]].iloc[-1]: points += 0.5; details.append("Keltner:Safe") except Exception as e: details.append(f"TrendErr:{str(e)[:15]}") norm_score = self._normalize(points, max_possible=22.0) if include_details and details_pack is not None: details_pack['trend'] = details if verbose: print(f" 📈 [TREND] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # 🚀 DOMAIN 2: MOMENTUM (Fixed) # ============================================================================== def _calc_momentum_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] try: c = df['close'] # 1. RSI rsi = ta.rsi(c, length=14) if self._valid(rsi): val = rsi.iloc[-1] if 50 < val < 70: points += 2; details.append(f"RSI:{val:.0f}") elif val > 70: points -= 1; details.append("RSI:OB") elif val < 30: points += 1; details.append("RSI:OS") # 2. MACD macd = ta.macd(c) if self._valid(macd): if macd[macd.columns[0]].iloc[-1] > macd[macd.columns[2]].iloc[-1]: points += 1.5; details.append("MACD:X_Bull") if macd[macd.columns[1]].iloc[-1] > 0: points += 1; details.append("MACD_Hist:Pos") # 4. Stochastic stoch = ta.stoch(df['high'], df['low'], c) if self._valid(stoch): k = stoch[stoch.columns[0]].iloc[-1] d = stoch[stoch.columns[1]].iloc[-1] if 20 < k < 80 and k > d: points += 1; details.append("Stoch:Bull") # 5. AO ao = ta.ao(df['high'], df['low']) if self._valid(ao) and ao.iloc[-1] > 0 and ao.iloc[-1] > ao.iloc[-2]: points += 1; details.append("AO:Rising") # 6. CCI cci = ta.cci(df['high'], df['low'], c) if self._valid(cci): val = cci.iloc[-1] if val > 100: points += 1; details.append("CCI:>100") elif val < -100: points -= 1 # 7. Williams %R willr = ta.willr(df['high'], df['low'], c) if self._valid(willr) and willr.iloc[-1] < -80: points += 1; details.append("WillR:OS") # 8. ROC roc = ta.roc(c, length=10) if self._valid(roc) and roc.iloc[-1] > 0: points += 1; details.append(f"ROC:{roc.iloc[-1]:.2f}") # 9. MOM mom = ta.mom(c, length=10) if self._valid(mom) and mom.iloc[-1] > 0: points += 1; details.append("MOM:Pos") # 10. PPO ppo = ta.ppo(c) if self._valid(ppo) and ppo[ppo.columns[0]].iloc[-1] > 0: points += 1; details.append("PPO:Pos") # 11. TSI tsi = ta.tsi(c) if self._valid(tsi) and tsi[tsi.columns[0]].iloc[-1] > tsi[tsi.columns[1]].iloc[-1]: points += 1; details.append("TSI:Bull") # 12. Fisher fish = ta.fisher(df['high'], df['low']) if self._valid(fish) and fish[fish.columns[0]].iloc[-1] > fish[fish.columns[1]].iloc[-1]: points += 1; details.append("Fisher:Bull") # 13. CMO cmo = ta.cmo(c, length=14) if self._valid(cmo) and cmo.iloc[-1] > 0: points += 1; details.append("CMO:Pos") # 14. Squeeze bb = ta.bbands(c, length=20) kc = ta.kc(df['high'], df['low'], c) if self._valid(bb) and self._valid(kc): if bb[bb.columns[0]].iloc[-1] < kc[kc.columns[0]].iloc[-1]: points += 1; details.append("SQZ:Active") # 15. UO uo = ta.uo(df['high'], df['low'], c) if self._valid(uo) and uo.iloc[-1] > 50: points += 0.5; details.append("UO:>50") # 16. KDJ (kdj returns df) kdj = ta.kdj(df['high'], df['low'], c) if self._valid(kdj) and kdj[kdj.columns[0]].iloc[-1] > kdj[kdj.columns[1]].iloc[-1]: points += 0.5; details.append("KDJ:Bull") # 17. StochRSI stochrsi = ta.stochrsi(c) if self._valid(stochrsi) and stochrsi[stochrsi.columns[0]].iloc[-1] < 20: points += 1; details.append("StochRSI:OS") # 18. Elder Ray ema13 = ta.ema(c, 13) if self._valid(ema13): bull_power = df['high'] - ema13 if bull_power.iloc[-1] > 0 and bull_power.iloc[-1] > bull_power.iloc[-2]: points += 1; details.append("BullPower:Rising") # 19. Streak if c.iloc[-1] > c.iloc[-2] and c.iloc[-2] > c.iloc[-3]: points += 0.5; details.append("Streak:Up") # 20. Bias ema20 = ta.ema(c, 20) if self._valid(ema20): bias = (c.iloc[-1] - ema20.iloc[-1]) / ema20.iloc[-1] if 0 < bias < 0.05: points += 1; details.append("Bias:Healthy") except Exception as e: details.append(f"MomErr:{str(e)[:10]}") norm_score = self._normalize(points, max_possible=20.0) if include_details and details_pack is not None: details_pack['momentum'] = details if verbose: print(f" 🚀 [MOMENTUM] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # 🌊 DOMAIN 3: VOLATILITY (Fixed) # ============================================================================== def _calc_volatility_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] try: # 1. Bollinger Bands (Bandwidth + %B) bb = ta.bbands(df['close'], length=20) if self._valid(bb): # pandas_ta names usually: BBL_, BBM_, BBU_, BBB_ (bandwidth), BBP_ (%B) bw_col = self._find_col(bb, ["bbb_", "bandwidth", "bbw"]) pb_col = self._find_col(bb, ["bbp_", "%b", "percentb", "pb"]) width = self._safe_last(bb, col=bw_col) if bw_col else np.nan pct_b = self._safe_last(bb, col=pb_col) if pb_col else np.nan # Bandwidth: smaller -> squeeze, larger -> expansion # Typical BBB values ~ 0.02 - 0.25 in many markets (depends on volatility) if np.isfinite(width): if width < 0.05: points -= 1; details.append("BBW:Squeeze") elif width > 0.18: points += 1; details.append("BBW:Expand") # %B: location within bands (0..1 typically) if np.isfinite(pct_b): if pct_b > 0.90: points += 0.5; details.append("BB%B:High") elif pct_b < 0.10: points -= 0.5; details.append("BB%B:Low") # 3. ATR atr = ta.atr(df['high'], df['low'], df['close'], length=14) if self._valid(atr) and atr.iloc[-1] > atr.iloc[-5]: points += 1; details.append("ATR:Rising") # 4. KC Break kc = ta.kc(df['high'], df['low'], df['close']) if self._valid(kc): kcu_col = self._find_col(kc, ['kcu_', 'upper']) or kc.columns[-1] if df['close'].iloc[-1] > kc[kcu_col].iloc[-1]: points += 2; details.append("KC:Breakout") # 5. Donchian dc = ta.donchian(df['high'], df['low']) if self._valid(dc): dcu_col = self._find_col(dc, ['dcu_', 'upper']) or dc.columns[-1] if df['high'].iloc[-1] >= dc[dcu_col].iloc[-2]: points += 1; details.append("DC:High") # 6. Mass Index mass = ta.massi(df['high'], df['low']) if self._valid(mass) and mass.iloc[-1] > 25: points -= 1; details.append("Mass:Risk") # 7. Chaikin Vol c_vol = ta.stdev(df['close'], 20) if self._valid(c_vol) and c_vol.iloc[-1] > c_vol.iloc[-10]: points += 1; details.append("Vol:Exp") # 8. Ulcer ui = ta.ui(df['close']) if self._valid(ui): val = ui.iloc[-1] if val < 2: points += 1; details.append("UI:Safe") else: points -= 1 # 9. NATR natr = ta.natr(df['high'], df['low'], df['close']) if self._valid(natr) and natr.iloc[-1] > 1.0: points += 1; details.append(f"NATR:{natr.iloc[-1]:.1f}") # 10. Gap if self._valid(atr): gap = abs(df['open'].iloc[-1] - df['close'].iloc[-2]) if gap > atr.iloc[-1] * 0.5: points += 1; details.append("Gap") # 11. Vol Ratio if self._valid(atr): vr = atr.iloc[-1] / atr.iloc[-20] if vr > 1.2: points += 1; details.append("VolRatio:High") # 12. RVI (Proxy) if self._valid(c_vol): std_rsi = ta.rsi(c_vol, length=14) if self._valid(std_rsi) and std_rsi.iloc[-1] > 50: points += 0.5 # 13. StdDev Channel mean = df['close'].rolling(20).mean() std = df['close'].rolling(20).std() z = (df['close'].iloc[-1] - mean.iloc[-1]) / std.iloc[-1] if abs(z) < 2: points += 0.5 # 14. ATS if self._valid(atr): ats = df['close'].iloc[-1] - (atr.iloc[-1] * 2) if df['close'].iloc[-1] > ats: points += 1 # 15. Chop chop = ta.chop(df['high'], df['low'], df['close']) if self._valid(chop): val = chop.iloc[-1] if val < 38.2: points += 1; details.append("Chop:Trend") elif val > 61.8: points -= 1; details.append("Chop:Range") # 16. KC Width if self._valid(kc): kw = kc[kc.columns[0]].iloc[-1] - kc[kc.columns[2]].iloc[-1] if kw > kw * 1.1: points += 0.5 # 17. Accel if df['close'].diff().iloc[-1] > df['close'].diff().iloc[-2]: points += 0.5 # 18. Efficiency denom = (df['high'].rolling(10).max() - df['low'].rolling(10).min()).iloc[-1] if denom > 0: eff = abs(df['close'].iloc[-1] - df['close'].iloc[-10]) / denom if eff > 0.5: points += 1; details.append("Eff:High") # 19. Gator if ta.ema(df['close'], 5).iloc[-1] > ta.ema(df['close'], 13).iloc[-1]: points += 0.5 # 20. Range if self._valid(atr): rng = df['high'].iloc[-1] - df['low'].iloc[-1] if rng > atr.iloc[-1]: points += 1 except Exception as e: details.append(f"VolErr:{str(e)[:10]}") norm_score = self._normalize(points, max_possible=18.0) if include_details and details_pack is not None: details_pack['volatility'] = details if verbose: print(f" 🌊 [VOLATILITY] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # ⛽ DOMAIN 4: VOLUME (Fixed) # ============================================================================== def _calc_volume_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] try: c = df['close']; v = df['volume'] # 1. OBV obv = ta.obv(c, v) if self._valid(obv) and obv.iloc[-1] > obv.iloc[-5]: points += 1.5; details.append("OBV:Up") # 2. CMF cmf = ta.cmf(df['high'], df['low'], c, v, length=20) if self._valid(cmf): val = cmf.iloc[-1] if val > 0.05: points += 2; details.append(f"CMF:{val:.2f}") elif val < -0.05: points -= 2 # 3. MFI mfi = ta.mfi(df['high'], df['low'], c, v, length=14) if self._valid(mfi): val = mfi.iloc[-1] if 50 < val < 80: points += 1; details.append(f"MFI:{val:.0f}") # 4. Vol > Avg vol_ma = v.rolling(20).mean().iloc[-1] if v.iloc[-1] > vol_ma: points += 1 # 5. Vol Spike if v.iloc[-1] > vol_ma * 1.5: points += 1; details.append("Vol:Spike") # 6. EOM eom = ta.eom(df['high'], df['low'], c, v) if self._valid(eom) and eom.iloc[-1] > 0: points += 1; details.append("EOM:Pos") # 7. VWAP vwap = ta.vwap(df['high'], df['low'], c, v) if self._valid(vwap) and c.iloc[-1] > vwap.iloc[-1]: points += 1; details.append("Price>VWAP") # 8. NVI nvi = ta.nvi(c, v) if self._valid(nvi) and nvi.iloc[-1] > nvi.iloc[-5]: points += 1; details.append("NVI:Smart") # 9. PVI pvi = ta.pvi(c, v) if self._valid(pvi) and pvi.iloc[-1] > pvi.iloc[-5]: points += 0.5 # 10. ADL adl = ta.ad(df['high'], df['low'], c, v) if self._valid(adl) and adl.iloc[-1] > adl.iloc[-2]: points += 1; details.append("ADL:Up") # 11. PVT pvt = ta.pvt(c, v) if self._valid(pvt) and pvt.iloc[-1] > pvt.iloc[-2]: points += 1 # 12. Vol Osc if v.rolling(5).mean().iloc[-1] > v.rolling(10).mean().iloc[-1]: points += 1 # 13. KVO kvo = ta.kvo(df['high'], df['low'], c, v) if self._valid(kvo) and kvo[kvo.columns[0]].iloc[-1] > 0: points += 1; details.append("KVO:Bull") # 14. Force fi = (c.diff() * v).rolling(13).mean() if fi.iloc[-1] > 0: points += 1 # 15. MFI (Bill Williams) if v.iloc[-1] > 0: my_mfi = (df['high'] - df['low']) / v if my_mfi.iloc[-1] > my_mfi.iloc[-2] and v.iloc[-1] > v.iloc[-2]: points += 1 # 16. Buying Climax if v.iloc[-1] > vol_ma * 3 and c.iloc[-1] > df['high'].iloc[-2]: points -= 1 # 17. RVOL if vol_ma > 0: rvol = v.iloc[-1] / vol_ma if rvol > 1.2: points += 1; details.append(f"RVOL:{rvol:.1f}") # 18. Delta delta = (c.iloc[-1] - df['open'].iloc[-1]) * v.iloc[-1] if delta > 0: points += 1 # 20. Low Vol Gap if self._valid(ta.atr(df['high'], df['low'], c)): if v.iloc[-1] < vol_ma * 0.5 and abs(c.diff().iloc[-1]) > ta.atr(df['high'], df['low'], c).iloc[-1]: points -= 1 except Exception as e: details.append(f"VolErr:{str(e)[:10]}") norm_score = self._normalize(points, max_possible=18.0) if include_details and details_pack is not None: details_pack['volume'] = details if verbose: print(f" ⛽ [VOLUME] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # 🔢 DOMAIN 5: CYCLE & MATH (Fixed) # ============================================================================== def _calc_cycle_math_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] try: c = df['close']; h = df['high']; l = df['low'] # 1. Pivot pp = (h.iloc[-2] + l.iloc[-2] + c.iloc[-2]) / 3 if c.iloc[-1] > pp: points += 1; details.append("AbovePP") # 2. R1 r1 = (2 * pp) - l.iloc[-2] if c.iloc[-1] > r1: points += 1; details.append("AboveR1") # 3. Fib 618 range_h = h.rolling(100).max().iloc[-1] range_l = l.rolling(100).min().iloc[-1] fib_618 = range_l + (range_h - range_l) * 0.618 if c.iloc[-1] > fib_618: points += 1; details.append("AboveFib") # 4. Z-Score zscore = ta.zscore(c, length=30) if self._valid(zscore): z = zscore.iloc[-1] if z < -2: points += 2; details.append("Z:OS") elif -1 < z < 1: points += 0.5; details.append("Z:Norm") # 5. Entropy entropy = ta.entropy(c, length=10) if self._valid(entropy) and entropy.iloc[-1] < 0.5: points += 1; details.append(f"Ent:{entropy.iloc[-1]:.2f}") # 6. Kurtosis kurt = c.rolling(30).kurt().iloc[-1] if kurt > 3: points -= 0.5 # 7. Skew skew = c.rolling(30).skew().iloc[-1] if skew > 0: points += 0.5; details.append("PosSkew") # 8. Variance var = ta.variance(c, length=20) if self._valid(var): points += 0 # 9. StdDev std = c.rolling(20).std().iloc[-1] if c.iloc[-1] > (c.rolling(20).mean().iloc[-1] + std): points += 0.5 # 10. LinReg linreg = ta.linreg(c, length=20) if self._valid(linreg) and c.iloc[-1] > linreg.iloc[-1]: points += 1; details.append("AboveLinReg") # 13. CG cg = ta.cg(c, length=10) if self._valid(cg) and c.diff().iloc[-1] > 0: points += 0.5 # 20. Mean Rev dist_mean = abs(c.iloc[-1] - c.rolling(50).mean().iloc[-1]) if dist_mean > std * 2: points -= 1 else: points += 0.5 except Exception as e: details.append(f"MathErr:{str(e)[:10]}") norm_score = self._normalize(points, max_possible=12.0) if include_details and details_pack is not None: details_pack['cycle_math'] = details if verbose: print(f" 🔢 [MATH] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # 🧱 DOMAIN 6: STRUCTURE (Fixed) # ============================================================================== def _calc_structure_domain(self, df: pd.DataFrame, verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] try: closes = df['close'].values; opens = df['open'].values highs = df['high'].values; lows = df['low'].values # 1. HH if highs[-1] > highs[-2] and highs[-2] > highs[-3]: points += 2; details.append("HH") # 2. HL if lows[-1] > lows[-2] and lows[-2] > lows[-3]: points += 2; details.append("HL") # 3. Engulfing if closes[-1] > opens[-1]: if closes[-1] > highs[-2] and opens[-1] < lows[-2]: points += 2; details.append("Engulfing") # 4. Hammer body = abs(closes[-1] - opens[-1]) lower_wick = min(closes[-1], opens[-1]) - lows[-1] if lower_wick > body * 2: points += 2; details.append("Hammer") # 5. BOS recent_high = np.max(highs[-11:-1]) if closes[-1] > recent_high: points += 2; details.append("BOS") # 6. FVG if len(closes) > 3 and lows[-1] > highs[-3] * 1.001: points += 1; details.append("FVG") # 7. Order Block if closes[-2] < opens[-2] and closes[-1] > opens[-1]: if (closes[-1] - opens[-1]) > (opens[-2] - closes[-2]) * 2: points += 1.5; details.append("OB") # 8. SFP if lows[-1] < lows[-2] and closes[-1] > lows[-2]: points += 2.5; details.append("SFP") # 9. Inside Bar if highs[-1] < highs[-2] and lows[-1] > lows[-2]: points -= 0.5; details.append("IB") # 10. Morning Star if closes[-3] < opens[-3] and abs(closes[-2]-opens[-2]) < body*0.5 and closes[-1] > opens[-1]: points += 2; details.append("MorningStar") # 14. Golden Cross Struct m50 = np.mean(closes[-50:]); m200 = np.mean(closes[-200:]) if len(closes)>200 else m50 if m50 > m200: points += 1 # 16. Impulse avg_body = np.mean([abs(c-o) for c,o in zip(closes[-10:], opens[-10:])]) if body > avg_body * 2: points += 1; details.append("Impulse") except Exception as e: details.append(f"PAErr:{str(e)[:10]}") norm_score = self._normalize(points, max_possible=18.0) if include_details and details_pack is not None: details_pack['structure'] = details if verbose: print(f" 🧱 [STRUCTURE] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # 📖 DOMAIN 7: ORDER BOOK (Already Safe, but kept consistent) # ============================================================================== def _calc_orderbook_domain(self, ob: Dict[str, Any], verbose: bool, include_details: bool = False, details_pack: Any = None) -> float: points = 0.0 details = [] if not ob or 'bids' not in ob or 'asks' not in ob: return 0.0 try: bids = np.array(ob['bids'], dtype=float) asks = np.array(ob['asks'], dtype=float) if len(bids) < 20 or len(asks) < 20: return 0.0 bid_vol = np.sum(bids[:20, 1]) ask_vol = np.sum(asks[:20, 1]) imbal = (bid_vol - ask_vol) / (bid_vol + ask_vol) points += imbal * 5; details.append(f"Imbal:{imbal:.2f}") avg_size = np.mean(bids[:50, 1]) if np.max(bids[:20, 1]) > avg_size * 5: points += 3; details.append("BidWall") if np.max(asks[:20, 1]) > avg_size * 5: points -= 3; details.append("AskWall") spread = (asks[0,0] - bids[0,0]) / bids[0,0] * 100 if spread < 0.05: points += 1; details.append("TightSpread") elif spread > 0.2: points -= 1; details.append("WideSpread") if bid_vol > ask_vol * 1.5: points += 2; details.append("Depth:Bull") if bids[0,1] > bids[1,1] and bids[1,1] > bids[2,1]: points += 1; details.append("Slope:Up") # Slippage / depth-to-move (normalized; avoids hard-coded thresholds) mid = (asks[0, 0] + bids[0, 0]) / 2.0 target_p = mid * 1.005 # ~0.5% up move vol_needed = 0.0 for p, s in asks: if p > target_p: break vol_needed += float(s) # Normalize by visible depth (top 20) visible_ask = float(np.sum(asks[:20, 1])) if len(asks) >= 20 else float(np.sum(asks[:, 1])) ratio = (vol_needed / visible_ask) if visible_ask > 0 else 0.0 # Higher ratio => more depth needed to move price => thicker book (safer entry) if ratio > 0.65: points += 1; details.append(f"ThickBook:{ratio:.2f}") elif ratio < 0.30: points -= 1; details.append(f"ThinBook:{ratio:.2f}") else: details.append(f"BookOK:{ratio:.2f}") # Best-level dominance (simple slope proxy) if bids[0, 1] > asks[0, 1] * 2: points += 1; details.append("TopBid>TopAsk*2") top_bid_notional = float(bids[0, 0] * bids[0, 1]) # Dynamic whale detection vs median level notional (top 20) level_notionals = (bids[:20, 0] * bids[:20, 1]).astype(float) med_notional = float(np.median(level_notionals)) if len(level_notionals) else 0.0 if med_notional > 0 and (top_bid_notional / med_notional) >= 8.0: points += 1; details.append(f"WhaleBid:{top_bid_notional/med_notional:.1f}x") except Exception as e: details.append("OBErr") norm_score = self._normalize(points, max_possible=15.0) if include_details and details_pack is not None: details_pack['order_book'] = details if verbose: print(f" 📖 [ORDERBOOK] Score: {norm_score:.2f} | {', '.join(details)}") return norm_score # ============================================================================== # 🔧 Utilities # ============================================================================== def _valid(self, item, col: Any = None) -> bool: """Return True if item has a finite last value (Series) or at least one finite last-row value (DataFrame). If col is provided and item is a DataFrame, checks that column's last value. """ if item is None: return False # pandas_ta sometimes returns tuples (e.g., ichimoku) if isinstance(item, tuple): # consider valid if any element is valid return any(self._valid(x, col=col) for x in item) try: if isinstance(item, pd.Series): if item.empty: return False v = item.iloc[-1] return pd.notna(v) and np.isfinite(v) if isinstance(item, pd.DataFrame): if item.empty: return False if col is not None: c = self._find_col(item, [col]) or (col if col in item.columns else None) if c is None: return False v = item[c].iloc[-1] return pd.notna(v) and np.isfinite(v) # any finite in last row last = item.iloc[-1] if isinstance(last, pd.Series): vals = last.values.astype(float, copy=False) return np.isfinite(vals).any() return False # scalars if isinstance(item, (int, float, np.number)): return np.isfinite(item) return True except Exception: return False def _find_col(self, df: pd.DataFrame, contains_any: List[str]) -> Any: """Find first column whose name contains any of the provided substrings (case-insensitive).""" if df is None or getattr(df, "empty", True): return None cols = list(df.columns) lowered = [str(c).lower() for c in cols] needles = [s.lower() for s in contains_any] for n in needles: for c, lc in zip(cols, lowered): if n in lc: return c return None def _safe_last(self, item, default=np.nan, col: Any = None) -> float: """Safely get last finite value from Series/DataFrame (optionally from matched column).""" if not self._valid(item, col=col): return float(default) try: if isinstance(item, pd.Series): return float(item.iloc[-1]) if isinstance(item, pd.DataFrame): if col is None: # pick first finite value in last row last = item.iloc[-1] for v in last.values: if pd.notna(v) and np.isfinite(v): return float(v) return float(default) c = self._find_col(item, [col]) or (col if col in item.columns else None) if c is None: return float(default) return float(item[c].iloc[-1]) if isinstance(item, (int, float, np.number)): return float(item) return float(default) except Exception: return float(default) def _normalize(self, value: float, max_possible: float) -> float: if max_possible == 0: return 0.0 return max(-1.0, min(1.0, value / max_possible)) def _prepare_dataframe(self, ohlcv: List) -> pd.DataFrame: df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') df.set_index('timestamp', inplace=True) cols = ['open', 'high', 'low', 'close', 'volume'] df[cols] = df[cols].astype(float) return df def _get_grade(self, score: float) -> str: if score >= 85: return "ULTRA" if score >= 70: return "STRONG" if score > 50: return "NORMAL" return "REJECT" def _create_rejection(self, reason: str): return { "governance_score": 0.0, "grade": "REJECT", "status": "REJECTED", "reason": reason, "components": {} }