Tradtesting / ml_engine /patterns.py
Riy777's picture
Update ml_engine/patterns.py
ed354c8 verified
# ml_engine/patterns.py
# (V30.0 - GEM-Architect: Config-Injectable Edition)
import os
import gc
import logging
import numpy as np
import pandas as pd
import pandas_ta as ta
import xgboost as xgb
import warnings
# إخماد تحذيرات المكتبات للحفاظ على نظافة السجلات
warnings.filterwarnings("ignore", category=UserWarning)
# إعداد التسجيل
logging.basicConfig(level=logging.INFO, format='%(asctime)s - [PatternEngine] - %(message)s')
logger = logging.getLogger(__name__)
try:
from hurst import compute_Hc
HURST_AVAILABLE = True
except ImportError:
HURST_AVAILABLE = False
logger.warning("⚠️ مكتبة 'hurst' غير موجودة. سيتم استخدام القيمة الافتراضية.")
# ==============================================================================
# 🛠️ INTERNAL HELPER FUNCTIONS (Essential for Feature Engineering)
# ==============================================================================
def _zv(x):
"""حساب Z-Score الآمن (يتجنب القسمة على صفر)"""
with np.errstate(divide='ignore', invalid='ignore'):
x = np.asarray(x, dtype="float32")
m = np.nanmean(x)
s = np.nanstd(x) + 1e-9
x_norm = (x - m) / s
return np.nan_to_num(x_norm, nan=0.0).astype("float32")
def _ema_np_safe(x, n):
"""حساب المتوسط المتحرك الأسي (EMA) بشكل سريع باستخدام Numpy"""
x = np.asarray(x, dtype="float32")
k = 2.0 / (n + 1.0)
out = np.empty_like(x)
out[0] = x[0] if not np.isnan(x[0]) else 0.0
for i in range(1, len(x)):
val = x[i] if not np.isnan(x[i]) else out[i-1]
out[i] = out[i-1] + k * (val - out[i-1])
return out
def _mc_simple_fast(closes_np: np.ndarray, target_profit=0.005):
"""نسخة سريعة من محاكاة مونت كارلو للميزات الإحصائية"""
try:
if len(closes_np) < 30: return 0.5, 0.0
c = closes_np
cur = float(c[-1])
if cur <= 0: return 0.5, 0.0
lr = np.diff(np.log1p(c))
lr = lr[np.isfinite(lr)]
if len(lr) < 20: return 0.5, 0.0
mu = np.mean(lr)
sigma = np.std(lr)
if sigma < 1e-9: return 0.5, 0.0
n_sims = 500
drift = (mu - 0.5 * sigma**2)
diffusion = sigma * np.random.standard_t(df=10, size=n_sims)
sim_prices = cur * np.exp(drift + diffusion)
var95 = np.percentile(sim_prices, 5)
var95_pct = (cur - var95) / (cur + 1e-9)
prob_gain = np.mean(sim_prices >= cur * (1 + target_profit))
return float(prob_gain), float(var95_pct)
except Exception:
return 0.5, 0.0
def _transform_candles_for_ml(df_window: pd.DataFrame):
"""
تحويل نافذة من الشموع (200 شمعة) إلى متجه ميزات جاهز لنموذج ML.
"""
try:
if len(df_window) < 200:
return None
df = df_window.iloc[-200:].copy()
o = df["open"].to_numpy(dtype="float32")
h = df["high"].to_numpy(dtype="float32")
l = df["low"].to_numpy(dtype="float32")
c = df["close"].to_numpy(dtype="float32")
v = df["volume"].to_numpy(dtype="float32")
# 1. Basic Features
base = np.stack([o, h, l, c, v], axis=1)
base_z = _zv(base)
# 2. Extra Features
lr = np.zeros_like(c); lr[1:] = np.diff(np.log1p(c))
rng = (h - l) / (c + 1e-9)
extra = np.stack([lr, rng], axis=1)
extra_z = _zv(extra)
# 3. Technical Indicators
ema9 = _ema_np_safe(c, 9)
ema21 = _ema_np_safe(c, 21)
ema50 = _ema_np_safe(c, 50)
ema200 = _ema_np_safe(c, 200)
slope21 = np.concatenate([[0.0], np.diff(ema21)])
slope50 = np.concatenate([[0.0], np.diff(ema50)])
try: rsi = ta.rsi(pd.Series(c), length=14).fillna(50).to_numpy(dtype="float32")
except: rsi = np.full_like(c, 50.0, dtype="float32")
try:
macd_data = ta.macd(pd.Series(c), fast=12, slow=26, signal=9)
macd_line = macd_data.iloc[:, 0].fillna(0).to_numpy(dtype="float32")
macd_hist = macd_data.iloc[:, 2].fillna(0).to_numpy(dtype="float32")
except:
macd_line = np.zeros_like(c); macd_hist = np.zeros_like(c)
try: atr = ta.atr(pd.Series(h), pd.Series(l), pd.Series(c), length=14).fillna(0).to_numpy(dtype="float32")
except: atr = np.zeros_like(c)
try:
bb = ta.bbands(pd.Series(c), length=20, std=2)
bb_p = ((c - bb.iloc[:, 0]) / (bb.iloc[:, 2] - bb.iloc[:, 0] + 1e-9)).fillna(0.5).to_numpy(dtype="float32")
except: bb_p = np.full_like(c, 0.5)
try: obv = ta.obv(pd.Series(c), pd.Series(v)).fillna(0).to_numpy(dtype="float32")
except: obv = np.zeros_like(c)
indicators = np.stack([ema9, ema21, ema50, ema200, slope21, slope50, rsi, macd_line, macd_hist, atr / (c + 1e-9), bb_p, obv], axis=1)
indicators_z = _zv(indicators)
# 4. Flatten
X_seq = np.concatenate([base_z, extra_z, indicators_z], axis=1)
X_seq_flat = X_seq.reshape(1, -1)
# 5. Static Features
try: mc_p, mc_var = _mc_simple_fast(c[-100:])
except: mc_p, mc_var = 0.5, 0.0
hurst_val = 0.5
if HURST_AVAILABLE:
try: hurst_val = compute_Hc(c[-100:], kind='price', simplified=True)[0]
except: pass
X_stat = np.array([[mc_p, mc_var, hurst_val]], dtype="float32")
# 6. Final Merge
X_final = np.concatenate([X_seq_flat, X_stat], axis=1)
X_final = np.nan_to_num(X_final, nan=0.0, posinf=0.0, neginf=0.0)
return X_final
except Exception:
return None
# ==============================================================================
# 🤖 CHART PATTERN ANALYZER CLASS
# ==============================================================================
class ChartPatternAnalyzer:
def __init__(self, models_dir="ml_models/xgboost_pattern2"):
"""
تهيئة محرك الأنماط الموحد (Unified Pattern Engine).
"""
self.models_dir = models_dir
self.models = {}
# ✅ القيم الافتراضية (Placeholder)
# سيتم الكتابة عليها بواسطة Processor عند التشغيل
self.timeframe_weights = {'15m': 0.40, '1h': 0.30, '5m': 0.20, '4h': 0.10, '1d': 0.00}
self.thresh_bullish = 0.60
self.thresh_bearish = 0.40
self.supported_timeframes = list(self.timeframe_weights.keys())
self.initialized = False
def configure_thresholds(self, weights: dict, bull_thresh: float, bear_thresh: float):
"""
دالة استقبال الإعدادات من المعالج المركزي (Processor Injection).
"""
self.timeframe_weights = weights
self.thresh_bullish = bull_thresh
self.thresh_bearish = bear_thresh
self.supported_timeframes = list(weights.keys())
logger.info(f"🔧 [PatternEngine] Config Injected: Bull > {self.thresh_bullish}, Weights set.")
async def initialize(self):
"""تحميل نماذج XGBoost"""
if self.initialized: return True
logger.info(f"⚡ [PatternEngine] Loading models from {self.models_dir}...")
if not os.path.exists(self.models_dir):
logger.error(f"❌ Models directory not found: {self.models_dir}")
return False
loaded_count = 0
# تحميل النماذج بناءً على الأطر الزمنية المدعومة (التي تم تكوينها)
for tf in self.supported_timeframes:
model_path = os.path.join(self.models_dir, f"xgb_{tf}.json")
if os.path.exists(model_path):
try:
model = xgb.Booster()
model.load_model(model_path)
self.models[tf] = model
loaded_count += 1
except Exception as e:
logger.error(f" ❌ Failed to load {tf}: {e}")
if loaded_count > 0:
self.initialized = True
logger.info(f"✅ [PatternEngine] Initialized with {loaded_count} models.")
return True
return False
async def detect_chart_patterns(self, ohlcv_data: dict) -> dict:
"""تحليل الأنماط لكافة الأطر الزمنية المتوفرة"""
if not self.initialized:
return self._get_empty_result("Not initialized")
details = {}
weighted_score_sum = 0.0
total_weight_used = 0.0
for tf, model in self.models.items():
candles = ohlcv_data.get(tf)
# نحتاج 200 شمعة على الأقل للتحويل
if candles and len(candles) >= 200:
try:
df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
# استخدام الدالة الداخلية المدمجة
X_features = _transform_candles_for_ml(df)
if X_features is not None:
dtest = xgb.DMatrix(X_features)
prob_up = model.predict(dtest)[0]
details[tf] = float(prob_up)
# ✅ استخدام الوزن الديناميكي
weight = self.timeframe_weights.get(tf, 0.0)
if weight > 0:
weighted_score_sum += prob_up * weight
total_weight_used += weight
except Exception:
details[tf] = None
else:
details[tf] = None
final_score = 0.0
if total_weight_used > 0:
final_score = weighted_score_sum / total_weight_used
# ✅ استخدام العتبات الديناميكية للتصنيف
pattern_text = "Neutral"
if final_score >= self.thresh_bullish:
pattern_text = "Bullish Signal"
elif final_score <= self.thresh_bearish:
pattern_text = "Bearish Signal"
return {
'pattern_detected': pattern_text,
'pattern_confidence': float(final_score),
'details': details
}
def _get_empty_result(self, reason=""):
return {'pattern_detected': 'Neutral', 'pattern_confidence': 0.0, 'details': {'error': reason}}
def clear_memory(self):
"""تنظيف الذاكرة"""
self.models.clear()
self.initialized = False
gc.collect()
logger.info("🧹 [PatternEngine] Memory cleared.")