# Source: Trad / ml_engine/data_manager.py
# Last commit: "Update ml_engine/data_manager.py" by Riy777 (3f2975d, verified)
# File size: 17.6 kB
# ============================================================
# 📂 ml_engine/data_manager.py
# (V45.0 - GEM-Architect: Anti-FOMO Revival)
# ============================================================
import asyncio
import logging
import traceback
from typing import Any, Dict, List, Tuple

import ccxt.async_support as ccxt
import httpx
import numpy as np
import pandas as pd
import pandas_ta as ta  # some of its helper functions will be needed
# ✅ Import the dynamic "constitution" (kept so the rest of the system stays compatible)
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    # Fallback definition used when ml_engine.processor cannot be imported.
    class SystemLimits:
        L1_MIN_AFFINITY_SCORE = 15.0
        CURRENT_REGIME = "RANGE"

# Only let WARNING and above through from the httpx and ccxt loggers.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
    """
    DataManager V45.0 (Anti-FOMO Revival)
    - Restores the STRICT Logic Tree from V15.2.
    - Filters: 8% Max Pump, 12% Max Daily, RSI < 70 strict limit.
    - Targets: Clean Breakouts & Oversold Reversals ONLY.

    The class wraps an async ccxt KuCoin spot client. ``layer1_rapid_screening``
    is the main entry point: it filters the USDT universe by liquidity, pulls
    1h/15m candles for the most liquid symbols and classifies each one as
    BREAKOUT, REVERSAL or NONE via ``_apply_logic_tree``.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and create the (not yet connected) exchange client.

        Args:
            contracts_db: Initial contracts mapping; a falsy value becomes ``{}``.
            whale_monitor: Stored as-is; not used inside this class.
            r2_service: Optional object exposing ``load_contracts_db_async()``.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service

        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 60000,
            'options': {'defaultType': 'spot'}
        })

        self.http_client = None
        self.market_cache = {}

        # Blacklist.
        # NOTE(review): _stage0_universe_filter matches these as SUBSTRINGS of
        # the base currency, so e.g. 'UP' also excludes tickers such as JUP or
        # SUPER. Left unchanged because it defines the traded universe; confirm
        # whether suffix/exact matching was intended.
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
        ]

        print("📦 [DataManager V45.0] Anti-FOMO Shield Active.")

    async def initialize(self) -> None:
        """Create the HTTP client and load exchange markets (errors are printed, not raised)."""
        print(" > [DataManager] Starting initialization...")
        try:
            self.http_client = httpx.AsyncClient(timeout=30.0)
            await self._load_markets()
            print("✅ [DataManager] Ready (Logic: STRICT Anti-FOMO).")
        except Exception as e:
            print(f"❌ [DataManager] Init Error: {e}")
            traceback.print_exc()

    async def _load_markets(self) -> None:
        """Load exchange markets once and mirror them into ``market_cache``.

        Best-effort: a failure is reported and otherwise ignored so that
        initialization can continue.
        """
        if not self.exchange:
            return
        try:
            if not self.exchange.markets:
                await self.exchange.load_markets()
            self.market_cache = self.exchange.markets
        except Exception as e:
            # Previously swallowed silently; keep best-effort but make it visible.
            print(f"⚠️ [DataManager] load_markets failed: {e}")

    async def close(self) -> None:
        """Close the HTTP client and the exchange session.

        The exchange is closed even when closing the HTTP client raises.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            if self.exchange:
                await self.exchange.close()

    async def load_contracts_from_r2(self) -> None:
        """Refresh ``contracts_db`` from the R2 service; fall back to ``{}`` on failure."""
        if not self.r2_service:
            return
        try:
            # ``or {}`` keeps the "always a dict" guarantee that __init__ gives.
            self.contracts_db = await self.r2_service.load_contracts_db_async() or {}
        except Exception as e:
            print(f"⚠️ [DataManager] Failed to load contracts from R2: {e}")
            self.contracts_db = {}

    def get_contracts_db(self):
        """Return the current contracts mapping."""
        return self.contracts_db

    # ==================================================================
    # 🛡️ Layer 1: The Strict Logic Tree (From V15.2)
    # ==================================================================
    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """Screen the universe and return up to 25 breakouts followed by up to 15 reversals.

        Returns:
            A list of dicts with keys ``symbol``, ``quote_volume``,
            ``current_price``, ``type`` and ``l1_score``; empty when the
            universe filter yields nothing.
        """
        print(f"🔍 [L1 Anti-FOMO] Filtering Universe...")

        # 1. Stage 0: universe filter (high liquidity only).
        #    V15.2 required 1M USD of volume; kept as-is for strictness.
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print("⚠️ [Layer 1] Universe empty.")
            return []

        # 2. Fetch technical data for the 300 most liquid symbols (as in V15.2).
        top_liquid_candidates = initial_candidates[:300]
        print(f" -> Analyzing top {len(top_liquid_candidates)} liquid assets...")
        enriched_data = await self._fetch_technical_data_batch(top_liquid_candidates)

        # 3. Apply the strict decision tree.
        breakout_list = []
        reversal_list = []
        for item in enriched_data:
            classification = self._apply_logic_tree(item)
            kind = classification['type']
            if kind == 'BREAKOUT':
                bucket = breakout_list
            elif kind == 'REVERSAL':
                bucket = reversal_list
            else:
                continue
            item['l1_score'] = classification['score']
            item['type'] = kind
            bucket.append(item)

        print(f" -> [L1 Logic] Found: {len(breakout_list)} Breakouts, {len(reversal_list)} Reversals.")

        # 4. Final ordering and merge. Both lists are sorted by descending score
        #    (breakout score = volume ratio, reversal score = 100 - RSI).
        breakout_list.sort(key=lambda x: x['l1_score'], reverse=True)
        reversal_list.sort(key=lambda x: x['l1_score'], reverse=True)

        # Keep only the best of each (balanced mix).
        final_selection = breakout_list[:25] + reversal_list[:15]

        return [
            {
                'symbol': c['symbol'],
                'quote_volume': c.get('quote_volume', 0),
                'current_price': c.get('current_price', 0),
                'type': c.get('type', 'UNKNOWN'),
                'l1_score': c.get('l1_score', 0)
            }
            for c in final_selection
        ]

    # ==================================================================
    # 🔗 Bridge for Backtest Engine Compatibility (IMPORTANT)
    # ==================================================================
    def _calculate_structural_score(self, df: pd.DataFrame, symbol: str, regime: str) -> Tuple[float, List[str]]:
        """
        [Compatibility Wrapper]
        Kept so that the backtest engine (backtest_engine.py) keeps working.
        Converts backtest data into the format the V15.2 logic understands.

        Args:
            df: OHLCV frame; the code requires an index that ``resample`` accepts
                and that ``reset_index()`` exposes as a ``timestamp`` column
                (presumably a DatetimeIndex named ``timestamp`` of 15m candles —
                confirm against the caller).
            symbol: Unused here; part of the expected signature.
            regime: Unused here; part of the expected signature.

        Returns:
            ``(score, tags)``: breakout score scaled by 20 with ``["BREAKOUT"]``,
            raw reversal score with ``["REVERSAL"]``, ``(0.0, ["NONE"])`` when
            nothing matches, or ``(0.0, ["ERROR"])`` on any exception.
        """
        try:
            # Resample the input into 1h candles; the input itself serves as 15m.
            df_15m = df.copy()
            agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
            # Lower-case alias: upper-case 'H' is deprecated in recent pandas.
            df_1h = df.resample('1h').agg(agg_dict).dropna()

            # Convert to lists of rows, as the legacy code expects.
            columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
            ohlcv_1h = df_1h.reset_index()[columns].values.tolist()
            ohlcv_15m = df_15m.reset_index()[columns].values.tolist()

            dummy_data = {
                'ohlcv_1h': ohlcv_1h,
                'ohlcv_15m': ohlcv_15m,
                'change_24h': 0.0  # not reliably available in a partial backtest; skipped
            }

            res = self._apply_logic_tree(dummy_data)
            score = res.get('score', 0.0)

            # Rescale so the score is comparable for the backtest (roughly 20-80).
            if res['type'] == 'BREAKOUT':
                return score * 20.0, ["BREAKOUT"]  # breakout score is a small ratio; boost it
            elif res['type'] == 'REVERSAL':
                return score, ["REVERSAL"]  # reversal score is already on a 0-100 scale
            return 0.0, ["NONE"]
        except Exception:
            return 0.0, ["ERROR"]

    # ==================================================================
    # 🏗️ V15.2 Logic Core
    # ==================================================================
    def _apply_logic_tree(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify one symbol as BREAKOUT, REVERSAL or NONE.

        Args:
            data: Must contain ``ohlcv_1h`` and ``ohlcv_15m`` (lists of
                ``[timestamp, open, high, low, close, volume]`` rows); may
                contain ``change_24h`` in percent (missing/None counts as 0).

        Returns:
            ``{'type': 'BREAKOUT'|'REVERSAL', 'score': float}`` or
            ``{'type': 'NONE'}``.
        """
        try:
            df_1h = self._calc_indicators(data['ohlcv_1h'])
            df_15m = self._calc_indicators(data['ohlcv_15m'])
        except Exception:
            return {'type': 'NONE'}

        if df_1h.empty or df_15m.empty:
            return {'type': 'NONE'}

        curr_1h = df_1h.iloc[-1]
        curr_15m = df_15m.iloc[-1]

        # --- Stage 2: Anti-FOMO Filters (STRICT) ---
        try:
            # Price change over the last 4 hourly candles.
            if len(df_1h) >= 5:
                close_4h_ago = df_1h.iloc[-5]['close']
                change_4h = ((curr_1h['close'] - close_4h_ago) / close_4h_ago) * 100
            else:
                change_4h = 0.0
        except Exception:
            change_4h = 0.0

        # 1. Pump filter: no entry after a rise of more than 8% in 4 hours.
        if change_4h > 8.0:
            return {'type': 'NONE'}

        # 2. Daily move filter: nothing above 12% (stay away from wild coins).
        #    A missing or None value is treated as 0 instead of raising.
        change_24h = data.get('change_24h') or 0.0
        if change_24h > 12.0:
            return {'type': 'NONE'}

        # 3. Top filter: RSI above 70 is never allowed.
        if curr_1h['rsi'] > 70:
            return {'type': 'NONE'}

        # 4. Extension filter: price must not be stretched far above the EMA20
        #    (distance measured in ATR units).
        deviation = (curr_1h['close'] - curr_1h['ema20']) / curr_1h['atr'] if curr_1h['atr'] > 0 else 0
        if deviation > 1.8:
            return {'type': 'NONE'}

        # --- Stage 3: Setup Classification ---

        # === A. Breakout Logic ===
        # Uptrend structure, RSI with room to rise (45-68), 15m close at/above EMA20.
        bullish_structure = (curr_1h['ema20'] > curr_1h['ema50']) or (curr_1h['close'] > curr_1h['ema20'])
        if bullish_structure and 45 <= curr_1h['rsi'] <= 68 and curr_15m['close'] >= curr_15m['ema20']:
            # Volatility squeeze (calm before the storm): the current candle's
            # range must not exceed 1.8x the 10-candle average range.
            avg_range = (df_15m['high'] - df_15m['low']).rolling(10).mean().iloc[-1]
            current_range = curr_15m['high'] - curr_15m['low']
            if current_range <= avg_range * 1.8:
                vol_ma20 = df_15m['volume'].rolling(20).mean().iloc[-1]
                # Volume condition: current candle trades >= 1.5x the 20-candle mean.
                # vol_ma20 must be positive, otherwise a market with no volume at
                # all would satisfy ``0 >= 0`` and be reported as a breakout.
                if vol_ma20 > 0 and curr_15m['volume'] >= 1.5 * vol_ma20:
                    return {'type': 'BREAKOUT', 'score': curr_15m['volume'] / vol_ma20}

        # === B. Reversal Logic ===
        # Clearly oversold (RSI 20-40) after a recent drop of at least 2% in 4h.
        if 20 <= curr_1h['rsi'] <= 40 and change_4h <= -2.0:
            # Look for a rejection candle (hammer / green body) in the last 3 candles.
            for _, row in df_15m.iloc[-3:].iterrows():
                rng = row['high'] - row['low']
                if rng > 0:
                    is_green = row['close'] > row['open']
                    # Hammer: lower wick longer than 1.5x the body.
                    lower_wick = min(row['open'], row['close']) - row['low']
                    body = abs(row['close'] - row['open'])
                    hammer_shape = lower_wick > (body * 1.5)
                    if is_green or hammer_shape:
                        # The lower the RSI, the higher the reversal score.
                        return {'type': 'REVERSAL', 'score': 100 - curr_1h['rsi']}

        return {'type': 'NONE'}

    # ------------------------------------------------------------------
    # Manual Indicator Calculation (Pandas pure - Exactly like V15.2)
    # ------------------------------------------------------------------
    def _calc_indicators(self, ohlcv_list):
        """Build an OHLCV DataFrame with ``rsi``, ``ema20``, ``ema50`` and ``atr`` columns.

        RSI and ATR use simple 14-period rolling means. All NaN values
        (warm-up rows, 0/0 RSI) are replaced by 0 before returning.

        Returns:
            The enriched DataFrame, or an empty DataFrame for empty input.
        """
        if not ohlcv_list:
            return pd.DataFrame()

        df = pd.DataFrame(ohlcv_list, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

        # RSI
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        # EMA
        df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
        df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()

        # ATR: rolling mean of the true range.
        prev_close = df['close'].shift()
        high_low = df['high'] - df['low']
        high_close = np.abs(df['high'] - prev_close)
        low_close = np.abs(df['low'] - prev_close)
        true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        df['atr'] = true_range.rolling(14).mean()

        return df.fillna(0)

    # ==================================================================
    # 🌌 Stage 0: Universe Filter (V15.2 Logic)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """Return liquid ``*/USDT`` tickers sorted by descending quote volume.

        A ticker is kept when its base currency contains no blacklist token,
        its quote volume is at least 1,000,000 and its last price is at least
        0.0005. Any exception is printed and yields ``[]``.
        """
        try:
            tickers = await self.exchange.fetch_tickers()
            candidates = []

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'):
                    continue

                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
                    continue

                # Strict liquidity condition: 1M USD.
                quote_vol = ticker.get('quoteVolume')
                if not quote_vol or quote_vol < 1_000_000:
                    continue

                last_price = ticker.get('last')
                if not last_price or last_price < 0.0005:
                    continue

                candidates.append({
                    'symbol': symbol,
                    'quote_volume': quote_vol,
                    'current_price': last_price,
                    # The key may be present with value None; float(None) would
                    # raise and make the outer handler discard every candidate.
                    'change_24h': float(ticker.get('percentage') or 0.0)
                })

            # Preliminary ordering by volume.
            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            return candidates

        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            return []

    # ==================================================================
    # 🔄 Batch Fetching
    # ==================================================================
    async def _fetch_technical_data_batch(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Fetch candles for all candidates in concurrent chunks of 15, dropping failures."""
        chunk_size = 15
        results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i + chunk_size]
            chunk_results = await asyncio.gather(*(self._fetch_single_tech_data(c) for c in chunk))
            results.extend(r for r in chunk_results if r is not None)
            await asyncio.sleep(0.05)  # short pause between chunks
        return results

    async def _fetch_single_tech_data(self, candidate: Dict[str, Any]) -> Any:
        """Attach ``ohlcv_1h`` / ``ohlcv_15m`` to ``candidate`` (mutated in place).

        Returns:
            The same dict, or ``None`` when a fetch fails or either series has
            fewer than 55 candles.
        """
        symbol = candidate['symbol']
        try:
            # V15.2 requires 1H and 15M.
            ohlcv_1h = await self.exchange.fetch_ohlcv(symbol, '1h', limit=60)
            ohlcv_15m = await self.exchange.fetch_ohlcv(symbol, '15m', limit=60)

            if not ohlcv_1h or len(ohlcv_1h) < 55 or not ohlcv_15m or len(ohlcv_15m) < 55:
                return None

            candidate['ohlcv_1h'] = ohlcv_1h
            candidate['ohlcv_15m'] = ohlcv_15m
            return candidate
        except Exception:
            return None

    # ==================================================================
    # 🎯 Public Helpers
    # ==================================================================
    async def get_latest_price_async(self, symbol: str) -> float:
        """Return the last traded price of ``symbol``, or 0.0 on any error."""
        try:
            ticker = await self.exchange.fetch_ticker(symbol)
            return float(ticker['last'])
        except Exception:
            return 0.0

    async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
        """Return up to ``limit`` candles for ``symbol``, or ``[]`` on any error."""
        try:
            candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            return candles or []
        except Exception:
            return []

    async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
        """Return the order book for ``symbol``, or ``{}`` on any error."""
        try:
            ob = await self.exchange.fetch_order_book(symbol, limit)
            return ob
        except Exception:
            return {}