Tradtesting / ml_engine /sniper_engine.py
Riy777's picture
Update ml_engine/sniper_engine.py
76c05b8
raw
history blame
10.7 kB
# ============================================================
# 🎯 ml_engine/sniper_engine.py (V1.1 - L2 Entry Sniper)
# (معدل ليقرأ النماذج من نفس مجلد Guard V2)
# ============================================================
import os
import sys
import numpy as np
import pandas as pd
import pandas_ta as ta
import lightgbm as lgb
import joblib
import asyncio
import traceback
from typing import List, Dict, Any
# --- [ 💡 💡 💡 ] ---
# [ 🚀 🚀 🚀 ] العتبة الافتراضية (بناءً على طلبك 0.60)
DEFAULT_SNIPER_THRESHOLD = 0.60
# [ 🚀 🚀 🚀 ]
# --- [ 💡 💡 💡 ] ---
N_SPLITS = 5
LOOKBACK_WINDOW = 500 # (الحد الأدنى للشموع 1m لحساب Z-Score (w=500))
# ============================================================
# 🔧 1. دوال هندسة الميزات (مطابقة 100% للمرحلة 2.ب)
# ============================================================
def _z_score_rolling(x, w=500):
"""حساب Z-Score المتدحرج (آمن من القسمة على صفر)"""
r = x.rolling(w).mean()
s = x.rolling(w).std().replace(0, np.nan)
z = (x - r) / s
return z.fillna(0)
def _add_liquidity_proxies(df):
"""
إضافة بدائل السيولة وتدفق الطلب المتقدمة.
"""
df_proxy = df.copy()
if 'datetime' not in df_proxy.index:
if 'timestamp' in df_proxy.columns:
df_proxy['datetime'] = pd.to_datetime(df_proxy['timestamp'], unit='ms')
df_proxy = df_proxy.set_index('datetime')
else:
print("❌ [SniperEngine] خطأ في بدائل السيولة: المؤشر الزمني مفقود.")
return df_proxy
df_proxy['ret'] = df_proxy['close'].pct_change().fillna(0)
df_proxy['dollar_vol'] = df_proxy['close'] * df_proxy['volume']
df_proxy['amihud'] = (df_proxy['ret'].abs() / df_proxy['dollar_vol'].replace(0, np.nan)).fillna(np.inf)
dp = df_proxy['close'].diff()
roll_cov = dp.rolling(64).cov(dp.shift(1))
df_proxy['roll_spread'] = (2 * np.sqrt(np.maximum(0, -roll_cov))).bfill()
sign = np.sign(df_proxy['close'].diff()).fillna(0)
df_proxy['signed_vol'] = sign * df_proxy['volume']
df_proxy['ofi'] = df_proxy['signed_vol'].rolling(30).sum().fillna(0)
buy_vol = (sign > 0) * df_proxy['volume']
sell_vol = (sign < 0) * df_proxy['volume']
imb = (buy_vol.rolling(60).sum() - sell_vol.rolling(60).sum()).abs()
tot = df_proxy['volume'].rolling(60).sum()
df_proxy['vpin'] = (imb / tot.replace(0, np.nan)).fillna(0)
df_proxy['rv_gk'] = (np.log(df_proxy['high'] / df_proxy['low'])**2) / 2 - \
(2 * np.log(2) - 1) * (np.log(df_proxy['close'] / df_proxy['open'])**2)
vwap_window = 20
df_proxy['vwap'] = (df_proxy['close'] * df_proxy['volume']).rolling(vwap_window).sum() / \
df_proxy['volume'].rolling(vwap_window).sum()
df_proxy['vwap_dev'] = (df_proxy['close'] - df_proxy['vwap']).fillna(0)
df_proxy['L_score'] = (
_z_score_rolling(df_proxy['volume']) +
_z_score_rolling(1 / df_proxy['amihud'].replace(np.inf, np.nan)) +
_z_score_rolling(-df_proxy['roll_spread']) +
_z_score_rolling(-df_proxy['rv_gk'].abs()) +
_z_score_rolling(-df_proxy['vwap_dev'].abs()) +
_z_score_rolling(df_proxy['ofi'])
)
return df_proxy
def _add_standard_features(df):
"""إضافة الميزات القياسية (عوائد، زخم، حجم)"""
df_feat = df.copy()
df_feat['return_1m'] = df_feat['close'].pct_change(1)
df_feat['return_3m'] = df_feat['close'].pct_change(3)
df_feat['return_5m'] = df_feat['close'].pct_change(5)
df_feat['return_15m'] = df_feat['close'].pct_change(15)
df_feat['rsi_14'] = ta.rsi(df_feat['close'], length=14)
ema_9 = ta.ema(df_feat['close'], length=9)
ema_21 = ta.ema(df_feat['close'], length=21)
df_feat['ema_9_slope'] = (ema_9 - ema_9.shift(1)) / ema_9.shift(1)
df_feat['ema_21_dist'] = (df_feat['close'] - ema_21) / ema_21
df_feat['atr'] = ta.atr(df_feat['high'], df_feat['low'], df_feat['close'], length=100)
df_feat['vol_zscore_50'] = _z_score_rolling(df_feat['volume'], w=50)
df_feat['candle_range'] = df_feat['high'] - df_feat['low']
df_feat['close_pos_in_range'] = (df_feat['close'] - df_feat['low']) / (df_feat['candle_range'].replace(0, np.nan))
return df_feat
# ============================================================
# 🎯 2. كلاس المحرك الرئيسي (SniperEngine V1)
# ============================================================
class SniperEngine:
# [ 🚀 🚀 🚀 ]
# [ 💡 💡 💡 ] التعديل: تغيير __init__ ليطابق GuardEngine
def __init__(self, models_dir: str):
"""
تهيئة محرك قناص الدخول V1 (L2 Sniper).
Args:
models_dir: المسار المباشر للمجلد (e.g., "ml_models/guard_v2")
"""
self.models_dir = models_dir
# [ 🚀 🚀 🚀 ]
self.models: List[lgb.Booster] = []
self.feature_names: List[str] = []
self.threshold = DEFAULT_SNIPER_THRESHOLD
self.initialized = False
# (جعل LOOKBACK_WINDOW متاحاً للكود الخارجي)
self.LOOKBACK_WINDOW = LOOKBACK_WINDOW
print("🎯 [SniperEngine V1] تم الإنشاء. جاهز للتهيئة.")
async def initialize(self):
"""
تحميل النماذج الخمسة (Ensemble) وقائمة الميزات.
"""
print(f"🎯 [SniperEngine V1] جاري التهيئة من {self.models_dir}...")
try:
# (سيبحث الآن داخل 'ml_models/guard_v2/' عن هذه الملفات)
model_files = [f for f in os.listdir(self.models_dir) if f.startswith('lgbm_guard_v3_fold_')]
if len(model_files) < N_SPLITS:
print(f"❌ [SniperEngine V1] خطأ فادح: تم العثور على {len(model_files)} نماذج فقط، مطلوب {N_SPLITS}.")
print(f" -> (تأكد من وجود ملفات 'lgbm_guard_v3...' داخل {self.models_dir})")
return
for f in sorted(model_files):
model_path = os.path.join(self.models_dir, f)
self.models.append(lgb.Booster(model_file=model_path))
self.feature_names = self.models[0].feature_name()
self.initialized = True
print(f"✅ [SniperEngine V1] تم تحميل {len(self.models)} نماذج قنص بنجاح.")
print(f" -> تم تحديد {len(self.feature_names)} ميزة مطلوبة.")
print(f" -> تم ضبط عتبة الدخول الافتراضية على: {self.threshold * 100:.1f}%")
except Exception as e:
print(f"❌ [SniperEngine V1] فشل التهيئة: {e}")
traceback.print_exc()
self.initialized = False
def set_entry_threshold(self, new_threshold: float):
"""
السماح بتغيير العتبة أثناء التشغيل.
"""
if 0.30 <= new_threshold <= 0.90: # (توسيع النطاق)
print(f"🎯 [SniperEngine V1] تم تحديث العتبة من {self.threshold} إلى {new_threshold}")
self.threshold = new_threshold
else:
print(f"⚠️ [SniperEngine V1] تم تجاهل العتبة (خارج النطاق): {new_threshold}")
def _calculate_features_live(self, df_1m: pd.DataFrame) -> pd.DataFrame:
"""
الدالة الخاصة لتطبيق خط أنابيب الميزات الكامل.
"""
try:
df_with_std_feats = _add_standard_features(df_1m)
df_with_all_feats = _add_liquidity_proxies(df_with_std_feats)
df_final = df_with_all_feats.replace([np.inf, -np.inf], np.nan)
return df_final
except Exception as e:
print(f"❌ [SniperEngine V1] فشل حساب الميزات: {e}")
return pd.DataFrame()
async def check_entry_signal_async(self, ohlcv_1m_data: List[List]) -> Dict[str, Any]:
"""
الدالة الرئيسية: التحقق من إشارة الدخول لأحدث شمعة.
Args:
ohlcv_1m_data: قائمة بالشموع (آخر 500+ شمعة 1m)
"""
if not self.initialized:
return {'signal': 'WAIT', 'reason': 'Sniper Engine not initialized'}
if len(ohlcv_1m_data) < self.LOOKBACK_WINDOW:
return {'signal': 'WAIT', 'reason': f'Insufficient 1m data ({len(ohlcv_1m_data)} < {self.LOOKBACK_WINDOW})'}
try:
df = pd.DataFrame(ohlcv_1m_data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df[['open', 'high', 'low', 'close', 'volume']] = df[['open', 'high', 'low', 'close', 'volume']].astype(float)
df_features = self._calculate_features_live(df)
if df_features.empty:
return {'signal': 'WAIT', 'reason': 'Feature calculation failed'}
latest_features_row = df_features.iloc[-1:]
X_live = latest_features_row[self.feature_names].fillna(0)
all_probs = []
for model in self.models:
all_probs.append(model.predict(X_live))
stacked_probs = np.stack(all_probs)
mean_probs = np.mean(stacked_probs, axis=0)
avg_prob_1 = mean_probs[0][1]
if avg_prob_1 >= self.threshold:
# (طباعة مخففة، لأنها قد تتكرر كثيراً في وضع L2)
# print(f"🔥 [Sniper V1] إشارة شراء! (الثقة: {avg_prob_1*100:.2f}% > {self.threshold*100:.2f}%)")
return {
'signal': 'BUY',
'confidence_prob': float(avg_prob_1),
'threshold': self.threshold
}
else:
return {
'signal': 'WAIT',
'reason': 'Sniper confidence below threshold',
'confidence_prob': float(avg_prob_1),
'threshold': self.threshold
}
except Exception as e:
print(f"❌ [SniperEngine V1] خطأ فادح في التحقق من الإشارة: {e}")
traceback.print_exc()
return {'signal': 'WAIT', 'reason': f'Exception: {e}'}