# Source: Tradtesting / ml_engine/data_manager.py
# Commit b3b9df0 (verified) by Riy777: "Update ml_engine/data_manager.py"
# File size: 13.2 kB
# ============================================================
# 📂 ml_engine/data_manager.py
# (V60.4 - GEM-Architect: Regime Gating + Robust Data)
# ============================================================
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
from typing import List, Dict, Any
# Try to import the system limits; fall back to None when the processor
# module is not importable so this module can still be loaded on its own.
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# Reduce log noise from the HTTP and exchange client libraries.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
    """Market-data access and Layer-1 candidate screening.

    Wraps an async ccxt KuCoin spot client. The screening pipeline
    (`layer1_rapid_screening`) filters USDT tickers by liquidity and 24h
    change, fetches 1h/15m candles, classifies each asset as BREAKOUT or
    REVERSAL, diagnoses its regime, and drops breakouts in RANGE/DEAD regimes.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and create the exchange client.

        Args:
            contracts_db: Initial contracts mapping; falsy values become `{}`.
            whale_monitor: Stored as-is; not used inside this class.
            r2_service: Optional service exposing `load_contracts_db_async()`.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None

        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 60000,
            'options': {'defaultType': 'spot'}
        })

        # Created lazily in initialize().
        self.http_client = None
        self.market_cache = {}

        # Blacklist: fragments matched against the base currency of a symbol.
        # NOTE(review): _stage0_universe_filter matches these as substrings,
        # so e.g. 'UP' also rejects 'JUP' and 'PAX' rejects 'PAXG'. Confirm
        # whether exact/suffix matching is intended before tightening.
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L'
        ]

        print("📦 [DataManager V60.4] Regime Gating (Range Protection) Active.")

    async def initialize(self):
        """Create the HTTP client, load exchange markets, then load contracts."""
        print(" > [DataManager] Starting initialization...")
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        await self.load_contracts_from_r2()

    async def _load_markets(self):
        """Best-effort load of exchange markets into `market_cache`."""
        if not self.exchange:
            return
        try:
            await self.exchange.load_markets()
            self.market_cache = self.exchange.markets
        except Exception as e:
            # Stay best-effort, but do not hide the failure.
            print(f"⚠️ [DataManager] Failed to load markets: {e}")

    async def close(self):
        """Close the HTTP client and the exchange session.

        The exchange is closed even if closing the HTTP client raises, so a
        failure in one does not leak the other.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            if self.exchange:
                await self.exchange.close()

    async def load_contracts_from_r2(self):
        """Reload `contracts_db` from the R2 service; reset to `{}` on failure.

        Does nothing when no `r2_service` was provided.
        """
        if not self.r2_service:
            return
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
        except Exception as e:
            print(f"⚠️ [DataManager] Failed to load contracts from R2: {e}")
            self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """Return the current contracts mapping."""
        return self.contracts_db

    # ==================================================================
    # 🧠 Layer 1: Screening + Diagnosis + Regime Gating
    # ==================================================================
    async def layer1_rapid_screening(self, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        """Run the Layer-1 pipeline and return up to 50 ranked candidates.

        Args:
            adaptive_hub_ref: Optional object exposing
                `get_regime_config(regime)`; when given, its result is stored
                on each candidate under `dynamic_limits`.

        Returns:
            Candidate dicts sorted by `l1_sort_score` descending, each
            carrying `asset_regime`, `asset_regime_conf`, `l1_sort_score`
            and `strategy_tag`. Empty list when Stage 0 yields nothing.
        """
        self.adaptive_hub_ref = adaptive_hub_ref
        print("🔍 [Layer 1] Screening with Regime Gating...")

        # 1. Liquidity filter.
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print("⚠️ [Layer 1] Stage 0 returned 0 candidates.")
            return []

        # 2. Fetch technical data for the 300 most liquid candidates.
        top_candidates = initial_candidates[:300]
        enriched_data = await self._fetch_technical_data_batch(top_candidates)

        final_list = []
        for item in enriched_data:
            # 3. Technical classification (Breakout vs Reversal).
            #    This also stores item['df_1h'], which the diagnoser reads.
            classification = self._apply_strict_logic_tree(item)
            if classification['type'] == 'NONE':
                continue

            # 4. Diagnosis.
            regime_info = self._diagnose_asset_regime(item)
            current_regime = regime_info['regime']

            # 5. Regime gating (trap protection): when the market is ranging
            #    (RANGE) or dead (DEAD), breakouts are blocked because they
            #    tend to fail in those conditions and become bull traps.
            if current_regime in ('RANGE', 'DEAD') and classification['type'] == 'BREAKOUT':
                continue  # skipped silently (protection)

            # Passed the gate: annotate the candidate.
            item['asset_regime'] = current_regime
            item['asset_regime_conf'] = regime_info['conf']

            # Inject regime-specific thresholds.
            if self.adaptive_hub_ref:
                item['dynamic_limits'] = self.adaptive_hub_ref.get_regime_config(current_regime)

            item['l1_sort_score'] = classification['score']
            item['strategy_tag'] = classification['type']
            final_list.append(item)

        # 6. Final ranking.
        final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        selection = final_list[:50]
        print(f"✅ [Layer 1] Passed {len(selection)} candidates (Safe Strategies Only).")
        return selection

    # ==================================================================
    # 🔍 Stage 0: Universe Filter (Robust USD Calc)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """Filter exchange tickers down to liquid, non-blacklisted USDT pairs.

        Returns:
            Dicts with `symbol`, `quote_volume`, `current_price` and
            `change_24h`, sorted by `quote_volume` descending. Returns `[]`
            if anything raises (the error and traceback are printed).
        """
        try:
            print(" 🛡️ [Stage 0] Fetching Tickers...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []

            # Sovereign list: these symbols bypass the liquidity filter.
            SOVEREIGN_COINS = {'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT'}
            reject_stats = {"volume": 0, "change": 0, "blacklist": 0}
            debug_printed = False

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'):
                    continue

                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
                    reject_stats["blacklist"] += 1
                    continue

                # Compute quote volume manually (base volume * last price)
                # instead of relying on a quote-volume field from the ticker.
                base_vol = float(ticker.get('baseVolume') or 0.0)
                last_price = float(ticker.get('last') or 0.0)
                calc_quote_vol = base_vol * last_price

                is_sovereign = symbol in SOVEREIGN_COINS

                # Print the BTC sanity check once. Exact match, so pairs such
                # as 'WBTC/USDT' are not reported as BTC.
                if symbol == "BTC/USDT" and not debug_printed:
                    print(f" 🐛 [DEBUG] BTC Vol: ${calc_quote_vol:,.0f}")
                    debug_printed = True

                # Liquidity filter (500k) - bypassed for sovereign coins.
                if not is_sovereign and calc_quote_vol < 500000:
                    reject_stats["volume"] += 1
                    continue

                # 24h change filter (20%); a missing value counts as 0.
                change_pct = ticker.get('percentage')
                if change_pct is None:
                    change_pct = 0.0
                if abs(change_pct) > 20.0:
                    reject_stats["change"] += 1
                    continue

                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': last_price,
                    'change_24h': change_pct
                })

            print(f" 📊 [Filter Stats] Total: {len(tickers)} | Passed: {len(candidates)}")
            print(f" ❌ Rejected: Vol < 500k ({reject_stats['volume']}) | Change > 20% ({reject_stats['change']})")

            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            return candidates
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # 🧭 The Diagnoser
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Classify the last row of `item['df_1h']` as BULL/BEAR/RANGE/DEAD.

        Returns:
            `{'regime': str, 'conf': float}`. Falls back to
            `{'regime': 'RANGE', 'conf': 0.0}` when `df_1h` is missing or any
            error occurs while reading it.
        """
        fallback = {'regime': 'RANGE', 'conf': 0.0}
        try:
            if 'df_1h' not in item:
                return fallback

            curr = item['df_1h'].iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']
            atr = curr['atr']

            # ATR as a percentage of price; below 0.5% the asset is "dead".
            atr_pct = (atr / price) * 100 if price > 0 else 0
            if atr_pct < 0.5:
                return {'regime': 'DEAD', 'conf': 0.9}

            regime = "RANGE"
            conf = 0.5
            if price > ema20 and ema20 > ema50 and rsi > 50:
                regime = "BULL"
                conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50 and rsi < 50:
                regime = "BEAR"
                conf = 0.8 if rsi < 45 else 0.6

            return {'regime': regime, 'conf': conf}
        except Exception:
            return fallback

    # ------------------------------------------------------------------
    # 🛡️ The Logic Tree (Anti-FOMO Tuned)
    # ------------------------------------------------------------------
    def _apply_strict_logic_tree(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify a candidate as BREAKOUT, REVERSAL or NONE.

        Reads `data['ohlcv_1h_raw']` and `data['ohlcv_15m_raw']`, and stores
        the computed 1h indicator frame back on `data['df_1h']`.

        Returns:
            `{'type': 'BREAKOUT'|'REVERSAL'|'NONE', 'score': number}`.
            Missing/invalid/empty candle data yields type `'NONE'`.
        """
        rejected = {'type': 'NONE', 'score': 0}

        try:
            df_1h = self._calc_indicators(data['ohlcv_1h_raw'])
            df_15m = self._calc_indicators(data['ohlcv_15m_raw'])
            data['df_1h'] = df_1h
        except Exception:
            return rejected

        # Without at least one candle per timeframe there is nothing to read.
        if df_1h.empty or df_15m.empty:
            return rejected

        curr_1h = df_1h.iloc[-1]
        curr_15m = df_15m.iloc[-1]

        # Percent change versus the close 4 bars back; 0.0 when the history
        # is too short to look that far.
        change_4h = 0.0
        if len(df_1h) >= 5:
            close_4h_ago = df_1h.iloc[-5]['close']
            change_4h = ((curr_1h['close'] - close_4h_ago) / close_4h_ago) * 100

        # Gates: reject assets that already ran, are overbought, or are
        # stretched too far above the EMA20 (measured in ATRs).
        if change_4h > 12.0:
            return rejected
        if curr_1h['rsi'] > 75:
            return rejected
        dev = (curr_1h['close'] - curr_1h['ema20']) / curr_1h['atr'] if curr_1h['atr'] > 0 else 0
        if dev > 2.2:
            return rejected

        # A. Breakout: bullish structure, RSI in [45, 75], and a 15m volume
        #    spike of at least 1.2x the 20-bar average.
        is_bullish = (curr_1h['ema20'] > curr_1h['ema50']) or (curr_1h['close'] > curr_1h['ema20'])
        if is_bullish and (45 <= curr_1h['rsi'] <= 75):
            vol_ma = df_15m['volume'].rolling(20).mean().iloc[-1]
            # Require a positive average: with zero volume, 0 >= 1.2 * 0 would
            # otherwise pass and tag a dead asset as a breakout. A NaN average
            # (fewer than 20 bars) also fails this check.
            if vol_ma > 0 and curr_15m['volume'] >= 1.2 * vol_ma:
                return {'type': 'BREAKOUT', 'score': curr_15m['volume'] / vol_ma}

        # B. Reversal: oversold RSI after a drop of at least 2% over 4 bars.
        if 20 <= curr_1h['rsi'] <= 40 and change_4h <= -2.0:
            return {'type': 'REVERSAL', 'score': (100 - curr_1h['rsi'])}

        return rejected

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    async def _fetch_technical_data_batch(self, candidates):
        """Fetch candles for `candidates` in chunks of 10 concurrent requests.

        Returns the candidates whose fetch succeeded, in input order.
        """
        chunk_size = 10
        results = []
        for start in range(0, len(candidates), chunk_size):
            chunk = candidates[start:start + chunk_size]
            fetched = await asyncio.gather(*(self._fetch_single(c) for c in chunk))
            results.extend(r for r in fetched if r)
            # Short pause between chunks.
            await asyncio.sleep(0.05)
        return results

    async def _fetch_single(self, c):
        """Attach 60 candles of 1h and 15m OHLCV to candidate `c`.

        Returns `c` (mutated in place) on success, or `None` when either
        timeframe is empty or the fetch fails.
        """
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=60)
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=60)
            if not h1 or not m15:
                return None
            c['ohlcv'] = {'1h': h1, '15m': m15}
            c['ohlcv_1h_raw'] = h1
            c['ohlcv_15m_raw'] = m15
            return c
        except Exception:
            # `Exception` (not a bare except) so task cancellation propagates.
            return None

    def _calc_indicators(self, ohlcv):
        """Build an indicator DataFrame from 6-column OHLCV rows.

        Args:
            ohlcv: Rows of `[ts, open, high, low, close, volume]`.

        Returns:
            DataFrame with columns `ts, open, high, low, close, volume, rsi,
            ema20, ema50, atr`; NaNs (warm-up periods) are replaced by 0.
        """
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])

        # RSI from 14-bar simple rolling means of gains and losses.
        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()

        # True range: the largest of high-low and the two gaps to the
        # previous close; ATR is its 14-bar simple rolling mean.
        prev_close = df['c'].shift()
        tr = np.maximum(
            df['h'] - df['l'],
            np.maximum(abs(df['h'] - prev_close), abs(df['l'] - prev_close))
        )
        df['atr'] = tr.rolling(14).mean()

        df.rename(columns={'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume'}, inplace=True)
        return df.fillna(0)

    async def get_latest_price_async(self, symbol):
        """Return the last traded price for `symbol`, or 0.0 on failure."""
        try:
            ticker = await self.exchange.fetch_ticker(symbol)
            return float(ticker['last'])
        except Exception:
            return 0.0

    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        """Return OHLCV rows for `symbol`, or `[]` on failure."""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            return []

    async def get_order_book_snapshot(self, symbol, limit=20):
        """Return the order book for `symbol`, or `{}` on failure."""
        try:
            return await self.exchange.fetch_order_book(symbol, limit)
        except Exception:
            return {}