# ============================================================
# 🎯 ml_engine/sniper_engine.py
# (V2.3 - GEM-Architect: Dynamic Window Fix)
# ============================================================
# - Fixed: Rolling window error (min_periods > window).
# - Logic: Auto-adjusts min_periods to fit requested window size.
# - Integrity: Full ML & OB functionality restored.
# ============================================================

import os
import time
import traceback
from typing import Any, Dict, List, Optional

import lightgbm as lgb
import numpy as np
import pandas as pd
import pandas_ta as ta

# Number of fold models `SniperEngine.initialize` expects to find on disk.
N_SPLITS = 5

# Allow less data for flexibility (instead of 500).
LOOKBACK_WINDOW = 150


# ============================================================
# 🔧 1. Feature Engineering (Standard + Liquidity Proxies)
# ============================================================

def _z_score_rolling(x: pd.Series, w: int = 500) -> pd.Series:
    """Return the rolling z-score of *x* over a window of *w* rows.

    ``min_periods`` is capped at the window size so pandas never sees
    ``min_periods > window``: it is ``w`` for windows shorter than 100 rows
    and 100 otherwise (so a 500-row window starts producing values after
    100 rows). A zero rolling standard deviation is treated as missing, and
    every undefined z-score is returned as 0.
    """
    effective_min = min(w, 100)
    rolling_mean = x.rolling(w, min_periods=effective_min).mean()
    rolling_std = (
        x.rolling(w, min_periods=effective_min).std().replace(0, np.nan)
    )
    z = (x - rolling_mean) / rolling_std
    return z.fillna(0)


def _add_liquidity_proxies(df: pd.DataFrame) -> pd.DataFrame:
    """Compute advanced liquidity indicators on a copy of *df*.

    Reads the ``open``, ``high``, ``low``, ``close`` and ``volume`` columns.
    If the index is not already named ``datetime`` and a ``timestamp``
    column exists, the timestamps are parsed as epoch milliseconds and used
    as the index.

    Columns added: ``ret``, ``dollar_vol``, ``amihud``, ``roll_spread``,
    ``signed_vol``, ``ofi``, ``vpin``, ``rv_gk``, ``vwap``, ``vwap_dev`` and
    the composite ``L_score`` (sum of six rolling z-scores).

    NOTE(review): the numerics here are intentionally left untouched; the
    output is fed to pre-trained models, so presumably it has to match
    whatever the training pipeline computed - confirm before changing any
    formula or fill rule.
    """
    df_proxy = df.copy()

    if ('datetime' not in df_proxy.index.names
            and 'timestamp' in df_proxy.columns):
        df_proxy['datetime'] = pd.to_datetime(df_proxy['timestamp'], unit='ms')
        df_proxy = df_proxy.set_index('datetime')

    # Per-bar return and traded notional.
    df_proxy['ret'] = df_proxy['close'].pct_change().fillna(0)
    df_proxy['dollar_vol'] = df_proxy['close'] * df_proxy['volume']

    # |return| per unit of notional; bars with zero notional become +inf.
    df_proxy['amihud'] = (
        df_proxy['ret'].abs() / df_proxy['dollar_vol'].replace(0, np.nan)
    ).fillna(np.inf)

    # Spread proxy from the covariance of consecutive price changes
    # (reduced min_periods for reliability); only negative covariance
    # contributes, and leading gaps are back-filled.
    dp = df_proxy['close'].diff()
    roll_cov = dp.rolling(64, min_periods=20).cov(dp.shift(1))
    df_proxy['roll_spread'] = (2 * np.sqrt(np.maximum(0, -roll_cov))).bfill()

    # Volume signed by the direction of the close-to-close move.
    sign = np.sign(df_proxy['close'].diff()).fillna(0)
    df_proxy['signed_vol'] = sign * df_proxy['volume']
    df_proxy['ofi'] = df_proxy['signed_vol'].rolling(30).sum().fillna(0)

    # Absolute buy/sell volume imbalance as a share of total volume (60 bars).
    buy_vol = (sign > 0) * df_proxy['volume']
    sell_vol = (sign < 0) * df_proxy['volume']
    imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs()
    tot = df_proxy['volume'].rolling(60).sum()
    df_proxy['vpin'] = (imb / tot.replace(0, np.nan)).fillna(0)

    # Per-bar range-based variance term built from high/low and close/open.
    df_proxy['rv_gk'] = (
        (np.log(df_proxy['high'] / df_proxy['low']) ** 2) / 2
        - (2 * np.log(2) - 1)
        * (np.log(df_proxy['close'] / df_proxy['open']) ** 2)
    )

    # Rolling volume-weighted average price and the close's distance from it.
    vwap_window = 20
    df_proxy['vwap'] = (
        (df_proxy['close'] * df_proxy['volume']).rolling(vwap_window).sum()
        / df_proxy['volume'].rolling(vwap_window).sum()
    )
    df_proxy['vwap_dev'] = (df_proxy['close'] - df_proxy['vwap']).fillna(0)

    # Composite score: each component is z-scored with the default window,
    # with cost-like components negated before scoring.
    df_proxy['L_score'] = (
        _z_score_rolling(df_proxy['volume'])
        + _z_score_rolling(
            1 / df_proxy['amihud'].replace([np.inf, -np.inf], np.nan)
        )
        + _z_score_rolling(-df_proxy['roll_spread'])
        + _z_score_rolling(-df_proxy['rv_gk'].abs())
        + _z_score_rolling(-df_proxy['vwap_dev'].abs())
        + _z_score_rolling(df_proxy['ofi'])
    )
    return df_proxy


def _add_standard_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add return, momentum, volatility and candle-shape columns to a copy.

    Reads ``open``-less OHLCV input (``high``, ``low``, ``close``,
    ``volume``) and adds: ``return_1m``/``3m``/``5m``/``15m``, ``rsi_14``,
    ``ema_9_slope``, ``ema_21_dist``, ``atr`` (length 100),
    ``vol_zscore_50``, ``candle_range`` and ``close_pos_in_range``.
    """
    df_feat = df.copy()

    df_feat['return_1m'] = df_feat['close'].pct_change(1)
    df_feat['return_3m'] = df_feat['close'].pct_change(3)
    df_feat['return_5m'] = df_feat['close'].pct_change(5)
    df_feat['return_15m'] = df_feat['close'].pct_change(15)

    df_feat['rsi_14'] = ta.rsi(df_feat['close'], length=14)

    ema_9 = ta.ema(df_feat['close'], length=9)
    ema_21 = ta.ema(df_feat['close'], length=21)

    # The EMA calls are checked for None; a constant 0 is used in that case.
    if ema_9 is not None:
        df_feat['ema_9_slope'] = (ema_9 - ema_9.shift(1)) / ema_9.shift(1)
    else:
        df_feat['ema_9_slope'] = 0

    if ema_21 is not None:
        df_feat['ema_21_dist'] = (df_feat['close'] - ema_21) / ema_21
    else:
        df_feat['ema_21_dist'] = 0

    df_feat['atr'] = ta.atr(
        df_feat['high'], df_feat['low'], df_feat['close'], length=100
    )

    # A 50-row window used to clash with min_periods=100; the helper now
    # caps min_periods at the window size.
    df_feat['vol_zscore_50'] = _z_score_rolling(df_feat['volume'], w=50)

    df_feat['candle_range'] = df_feat['high'] - df_feat['low']
    # Zero-range candles give NaN here instead of dividing by zero.
    df_feat['close_pos_in_range'] = (
        (df_feat['close'] - df_feat['low'])
        / df_feat['candle_range'].replace(0, np.nan)
    )
    return df_feat


# ============================================================
# 🎯 2. SniperEngine Class (Robust)
# ============================================================

class SniperEngine:
    """Entry-signal engine blending a LightGBM ensemble with order-book scoring.

    The ML score is the mean prediction of every loaded fold model on the
    latest feature row; the order-book score is a depth-weighted bid share.
    The two are combined with ``weight_ml`` / ``weight_ob`` and compared
    against ``entry_threshold``. The order book can veto an entry outright
    (empty book, wide spread, dominant sell wall, or a parsing error).
    """

    def __init__(self, models_dir: str):
        """Store the model directory and set default thresholds.

        No files are read here; call :meth:`initialize` to load the models.
        """
        self.models_dir = models_dir
        self.models: List[lgb.Booster] = []
        self.feature_names: List[str] = []

        # --- Configurable Thresholds ---
        # Minimum blended score required to emit BUY.
        self.entry_threshold = 0.40
        # Largest single near ask / total ask volume at which a wall vetoes.
        self.wall_ratio_limit = 0.40
        # Blend weights for the ML score and the order-book score.
        self.weight_ml = 0.60
        self.weight_ob = 0.40
        # Level i of the book is weighted by 1 / (1 + ob_depth_decay * i).
        self.ob_depth_decay = 0.15
        # Only asks priced within best_ask * (1 + max_wall_dist) count as walls.
        self.max_wall_dist = 0.005
        # Relative bid/ask spread above which the book is vetoed.
        self.max_spread_pct = 0.002
        # NOTE(review): not read anywhere in this file.
        self.spoof_patience = 0

        self.initialized = False
        # NOTE(review): stored but not read in this file; the signal check
        # uses a literal minimum of 100 rows instead.
        self.LOOKBACK_WINDOW = LOOKBACK_WINDOW
        # Maximum number of book levels scored per side.
        self.ORDER_BOOK_DEPTH = 20
        # symbol -> {'last_check': epoch seconds, 'count': consecutive walls}.
        # NOTE(review): 'count' is maintained but never consulted for a veto.
        self._wall_cache = {}

        print("🎯 [SniperEngine V2.3] Dynamic Window Logic Loaded.")

    def configure_settings(self,
                           threshold: float,
                           wall_ratio: float,
                           w_ml: float = 0.60,
                           w_ob: float = 0.40,
                           max_wall_dist: float = 0.005,
                           max_spread: float = 0.002):
        """Override the entry threshold, wall limit, weights and OB limits.

        Args:
            threshold: New ``entry_threshold``.
            wall_ratio: New ``wall_ratio_limit``.
            w_ml: Weight of the ML score in the blended score.
            w_ob: Weight of the order-book score in the blended score.
            max_wall_dist: Relative distance above the best ask within which
                an ask level can count as a sell wall.
            max_spread: Maximum tolerated relative bid/ask spread.
        """
        self.entry_threshold = threshold
        self.wall_ratio_limit = wall_ratio
        self.weight_ml = w_ml
        self.weight_ob = w_ob
        self.max_wall_dist = max_wall_dist
        self.max_spread_pct = max_spread

    async def initialize(self):
        """Load every ``lgbm_guard_v3_fold_*`` model found in ``models_dir``.

        Models are loaded into a fresh list that replaces ``self.models``
        only after all files loaded, so calling this method again does not
        duplicate the ensemble and a failed load never leaves a partial one.
        Finding fewer than ``N_SPLITS`` files is reported but not fatal.
        Any exception is printed with its traceback and leaves the engine
        uninitialized.
        """
        print(f"🎯 [SniperEngine] Loading models from {self.models_dir}...")
        try:
            model_files = [
                f for f in os.listdir(self.models_dir)
                if f.startswith('lgbm_guard_v3_fold_')
            ]
            if len(model_files) < N_SPLITS:
                print(f"❌ [SniperEngine] Error: Found {len(model_files)} models, need {N_SPLITS}.")

            loaded: List[lgb.Booster] = []
            for f in sorted(model_files):
                model_path = os.path.join(self.models_dir, f)
                loaded.append(lgb.Booster(model_file=model_path))
            self.models = loaded

            if self.models:
                self.feature_names = self.models[0].feature_name()

            # NOTE(review): the engine is marked ready even when no model
            # was found; the signal check then keeps the neutral ML score
            # of 0.5 - confirm this best-effort mode is intended.
            self.initialized = True
            print(f"✅ [SniperEngine] Active. WallLimit: {self.wall_ratio_limit}, MaxDist: {self.max_wall_dist*100}%")
        except Exception as e:
            print(f"❌ [SniperEngine] Init failed: {e}")
            traceback.print_exc()
            self.initialized = False

    def _calculate_features_live(self, df_1m: pd.DataFrame) -> pd.DataFrame:
        """Build the full feature frame for live 1-minute candles.

        Runs the standard features, then the liquidity proxies, and converts
        infinities to NaN. Returns an empty DataFrame if anything raises.
        """
        try:
            df_with_std_feats = _add_standard_features(df_1m)
            df_with_all_feats = _add_liquidity_proxies(df_with_std_feats)
            df_final = df_with_all_feats.replace([np.inf, -np.inf], np.nan)
            return df_final
        except Exception as e:
            print(f"❌ [SniperEngine] Feature calc error: {e}")
            return pd.DataFrame()

    # ==========================================================================
    # 📊 3. Smart Order Book Logic (OKX Safe)
    # ==========================================================================

    @staticmethod
    def _ob_veto(reason: str) -> Dict[str, Any]:
        """Return a vetoing order-book result carrying the full key set.

        Every veto path shares this shape so callers can read the same keys
        as on the scored path without guarding against missing ones.
        """
        return {
            'score': 0.0,
            'imbalance': 0.0,
            'wall_ratio': 0.0,
            'veto': True,
            'spread_ok': False,
            'reason': reason,
        }

    @staticmethod
    def _best_ask_price(order_book: Optional[Dict[str, Any]]) -> float:
        """Return the best ask price of *order_book*, or 0.0 if unavailable.

        A missing, empty or malformed ``asks`` list yields 0.0 instead of
        raising, so a book the scorer already vetoed cannot crash the caller.
        """
        if not order_book:
            return 0.0
        asks = order_book.get('asks')
        if not asks:
            return 0.0
        try:
            return float(asks[0][0])
        except (IndexError, KeyError, TypeError, ValueError):
            return 0.0

    def _score_order_book(self,
                          order_book: Dict[str, Any],
                          symbol: Optional[str] = None) -> Dict[str, Any]:
        """Score an order-book snapshot and decide whether it vetoes entry.

        Args:
            order_book: Mapping with ``bids`` and ``asks``; each level is
                indexed as ``[price, volume, ...]``.
            symbol: Optional key used to track repeated sell-wall sightings
                in ``_wall_cache``.

        Returns:
            A dict with ``score`` and ``imbalance`` (depth-weighted bid share
            of volume, 0.5 when there is no volume), ``wall_ratio`` (largest
            near ask / total ask volume), ``veto``, ``spread_ok`` (True only
            when scoring ran to completion) and ``reason``. Veto results for
            an empty book, a wide spread or any parsing error carry zeros.
        """
        try:
            bids = order_book.get('bids', [])
            asks = order_book.get('asks', [])

            if not bids or not asks:
                return self._ob_veto('Empty OB')

            best_bid = float(bids[0][0])
            best_ask = float(asks[0][0])

            # A zero best bid raises here and is reported as an OB error.
            spread_pct = (best_ask - best_bid) / best_bid
            if spread_pct > self.max_spread_pct:
                return self._ob_veto(f"Wide Spread ({spread_pct:.2%})")

            w_bid_vol = 0.0
            w_ask_vol = 0.0
            total_raw_ask_vol = 0.0

            # Score the same number of levels on both sides; deeper levels
            # count less.
            depth = min(len(bids), len(asks), self.ORDER_BOOK_DEPTH)
            for i, (bid, ask) in enumerate(zip(bids[:depth], asks[:depth])):
                weight = 1.0 / (1.0 + (self.ob_depth_decay * i))
                bid_vol = float(bid[1])
                ask_vol = float(ask[1])
                w_bid_vol += bid_vol * weight
                w_ask_vol += ask_vol * weight
                total_raw_ask_vol += ask_vol

            total_w_vol = w_bid_vol + w_ask_vol
            weighted_imbalance = (
                w_bid_vol / total_w_vol if total_w_vol > 0 else 0.5
            )

            # Largest single ask sitting close enough to the best ask.
            max_valid_wall = 0.0
            limit_price = best_ask * (1 + self.max_wall_dist)
            for item in asks[:depth]:
                p = float(item[0])
                v = float(item[1])
                if p <= limit_price and v > max_valid_wall:
                    max_valid_wall = v

            wall_ratio = (
                max_valid_wall / total_raw_ask_vol
                if total_raw_ask_vol > 0 else 0
            )

            veto_wall = False
            veto_reason = "OK"

            if wall_ratio >= self.wall_ratio_limit:
                veto_wall = True
                veto_reason = f"Sell Wall ({wall_ratio:.2f})"
                if symbol:
                    # Sightings more than 5 s apart restart the streak.
                    curr_time = time.time()
                    cache = self._wall_cache.get(
                        symbol, {'last_check': 0, 'count': 0}
                    )
                    if curr_time - cache['last_check'] > 5.0:
                        cache['count'] = 1
                    else:
                        cache['count'] += 1
                    cache['last_check'] = curr_time
                    self._wall_cache[symbol] = cache
            elif symbol and symbol in self._wall_cache:
                # No wall this time: clear the streak for this symbol.
                self._wall_cache[symbol]['count'] = 0

            return {
                'score': float(weighted_imbalance),
                'imbalance': float(weighted_imbalance),
                'wall_ratio': float(wall_ratio),
                'veto': veto_wall,
                'spread_ok': True,
                'reason': veto_reason,
            }

        except Exception as e:
            # Deliberate boundary: any malformed book becomes a veto rather
            # than an exception in the trading loop.
            return self._ob_veto(f"OB Error: {e}")

    # ==========================================================================
    # 🎯 4. Main Signal Check (Fixed Logging)
    # ==========================================================================

    async def check_entry_signal_async(
            self,
            ohlcv_1m_data: List[List],
            order_book_data: Optional[Dict[str, Any]] = None,
            symbol: Optional[str] = None) -> Dict[str, Any]:
        """Combine the ML and order-book scores into a BUY / WAIT decision.

        Args:
            ohlcv_1m_data: Rows of ``[timestamp, open, high, low, close,
                volume]``. Fewer than 100 rows (or ``None``) skips the ML
                step and keeps the neutral score of 0.5.
            order_book_data: Optional snapshot with ``bids`` / ``asks``.
                When absent, a neutral, non-vetoing order-book result is
                used.
            symbol: Optional symbol forwarded to the order-book scorer.

        Returns:
            ``{'signal': 'WAIT', 'reason': 'Not initialized'}`` before
            :meth:`initialize` succeeds; otherwise a dict with ``signal``,
            ``confidence_prob``, ``ml_score``, ``ob_score``, ``entry_price``
            (best ask, or 0.0 when unavailable) and ``reason``.
        """
        if not self.initialized:
            return {'signal': 'WAIT', 'reason': 'Not initialized'}

        ml_score = 0.5
        ml_reason = "No Data"

        # Relaxed check: allow partial data (min 100) instead of strict 500.
        has_enough_rows = (
            ohlcv_1m_data is not None and len(ohlcv_1m_data) >= 100
        )
        if has_enough_rows and self.models:
            try:
                price_cols = ['open', 'high', 'low', 'close', 'volume']
                df = pd.DataFrame(
                    ohlcv_1m_data, columns=['timestamp'] + price_cols
                )
                df[price_cols] = df[price_cols].astype(float)

                df_features = self._calculate_features_live(df)
                if not df_features.empty:
                    # Score only the most recent bar; missing values -> 0.
                    X_live = (
                        df_features.iloc[-1:][self.feature_names].fillna(0)
                    )
                    preds = [m.predict(X_live)[0] for m in self.models]
                    ml_score = float(np.mean(preds))
                    ml_reason = f"ML:{ml_score:.2f}"
            except Exception as e:
                print(f"❌ [Sniper] ML Error: {e}")
                ml_reason = "ML Err"

        ob_res = {
            'score': 0.5, 'imbalance': 0.5, 'veto': False, 'reason': 'No OB'
        }
        if order_book_data:
            ob_res = self._score_order_book(order_book_data, symbol=symbol)

        ob_str = f"OB:{ob_res['score']:.2f}"

        if ob_res.get('veto', False):
            final_score = 0.0
            signal = 'WAIT'
            reason_str = f"⛔ {ob_res['reason']} | {ml_reason} | {ob_str}"
        else:
            final_score = (
                (ml_score * self.weight_ml)
                + (ob_res['score'] * self.weight_ob)
            )
            if final_score >= self.entry_threshold:
                signal = 'BUY'
                reason_str = f"✅ GO: {final_score:.2f} | {ml_reason} | {ob_str}"
            else:
                signal = 'WAIT'
                reason_str = f"📉 Low Score: {final_score:.2f} | {ml_reason} | {ob_str}"

        return {
            'signal': signal,
            'confidence_prob': final_score,
            'ml_score': ml_score,
            'ob_score': ob_res['score'],
            # Read defensively: a malformed book is already vetoed above and
            # must not raise while building the response.
            'entry_price': self._best_ask_price(order_book_data),
            'reason': reason_str,
        }