Tradcloneai / ml_engine /data_manager.py
Riy777's picture
Update ml_engine/data_manager.py
369a5e8 verified
# ============================================================
# 📂 ml_engine/data_manager.py
# (V68.5 - GEM-Architect: Full Integrity & Breadth Scanner)
# ============================================================
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
import time
from typing import List, Dict, Any, Optional
# محاولة استيراد حدود النظام
try:
from ml_engine.processor import SystemLimits
except ImportError:
SystemLimits = None
# تقليل ضوضاء السجلات
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor, r2_service=None):
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service
self.adaptive_hub_ref = None
# إعداد الاتصال بـ KuCoin
self.exchange = ccxt.kucoin({
'enableRateLimit': True,
'timeout': 60000,
'options': {'defaultType': 'spot'}
})
self.http_client = None
self.market_cache = {}
# القائمة السوداء للعملات غير المرغوبة (Leveraged/Stable/Fiat)
self.BLACKLIST_TOKENS = [
'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', '5S', '5L'
]
print(f"📦 [DataManager V68.5] Initialized (Full Integrity Mode).")
async def initialize(self):
"""تهيئة الاتصال والأسواق"""
print(" > [DataManager] Starting initialization...")
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
await self.load_contracts_from_r2()
async def _load_markets(self):
try:
if self.exchange:
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
print(f" > [DataManager] Markets loaded: {len(self.market_cache)}")
except Exception as e:
print(f" ⚠️ [DataManager] Market load warning: {e}")
async def close(self):
if self.http_client: await self.http_client.aclose()
if self.exchange: await self.exchange.close()
async def load_contracts_from_r2(self):
if not self.r2_service: return
try:
self.contracts_db = await self.r2_service.load_contracts_db_async()
except Exception: self.contracts_db = {}
def get_contracts_db(self) -> Dict[str, Any]:
return self.contracts_db
# ==================================================================
# 🌍 Global Market Validator V2 (Smart Breadth Scanner)
# ==================================================================
async def check_global_market_health(self) -> Dict[str, Any]:
"""
يفحص صحة السوق العامة باستخدام منطق مزدوج:
1. فحص سلامة BTC (تجنب الانهيارات).
2. فحص نشاط العملات البديلة (Altcoin Pulse).
"""
try:
# 1. جلب بيانات البيتكوين الأساسية
btc_ohlcv = await self.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=30)
if not btc_ohlcv: return {'is_safe': True, 'reason': 'No BTC Data - Bypassed'}
df = pd.DataFrame(btc_ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
current_close = df['c'].iloc[-1]
prev_close = df['c'].iloc[-2]
# --- [ CRITICAL CHECK ] ---
# إذا كان البيتكوين ينهار، نوقف كل شيء لأن السيولة ستجف
daily_change = (current_close - prev_close) / prev_close
if daily_change < -0.045: # السماح بمرونة أكبر قليلاً (-4.5%)
return {'is_safe': False, 'reason': f'🚨 BTC CRASHING ({daily_change*100:.2f}%)'}
# فحص المتوسطات (Trend Check)
sma20 = df['c'].rolling(20).mean().iloc[-1]
if current_close < sma20 * 0.92: # إذا كان السعر تحت المتوسط بـ 8% (سوق هابط عنيف)
return {'is_safe': False, 'reason': '📉 Deep Bear Market (Risk Off)'}
# --- [ ALTCOIN PULSE CHECK ] ---
# بدلاً من إيقاف السوق بسبب ضعف فوليوم البيتكوين، نفحص هل هناك عملات تتحرك؟
avg_vol = df['v'].rolling(7).mean().iloc[-1]
curr_vol = df['v'].iloc[-1]
btc_is_dead = curr_vol < (avg_vol * 0.4) # البيتكوين ميت
if btc_is_dead:
# 🕵️ فحص النبض: هل هناك عملات تنفصل عن البيتكوين؟
print(" ⚠️ [Validator] BTC Volume Low.. Scanning Altcoin Pulse...")
tickers = await self.exchange.fetch_tickers()
green_coins = 0
pump_coins = 0
total_checked = 0
# نفحص العملات ذات الفوليوم العالي فقط
for sym, data in tickers.items():
if not sym.endswith('/USDT'): continue
vol = float(data.get('quoteVolume') or 0)
if vol < 1000000: continue # تجاهل العملات الصغيرة جداً
change = float(data.get('percentage') or 0)
total_checked += 1
if change > 1.0: green_coins += 1
if change > 5.0: pump_coins += 1
# المنطق: إذا وجدنا أكثر من 3 عملات تضخ بقوة، أو 40% من السوق أخضر -> السوق يعمل
if pump_coins >= 3 or (total_checked > 0 and (green_coins / total_checked) > 0.4):
return {'is_safe': True, 'reason': f'✅ Decoupled Alts Active ({pump_coins} Pumping)'}
else:
return {'is_safe': False, 'reason': '💤 Dead Market (BTC & Alts Flat)'}
return {'is_safe': True, 'reason': '✅ Market Healthy'}
except Exception as e:
print(f"⚠️ [Market Validator] Error: {e}")
return {'is_safe': True, 'reason': 'Error Bypass'}
# ==================================================================
# 🧠 Layer 1: Classification (Relaxed Funnel)
# ==================================================================
async def layer1_rapid_screening(self, limit=300, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
self.adaptive_hub_ref = adaptive_hub_ref
print(f"🔍 [Layer 1] Screening Market (Smart Breadth)...")
# 0. فحص صحة السوق
market_health = await self.check_global_market_health()
if not market_health['is_safe']:
print(f"⛔ [Market Validator] Trading Halted: {market_health['reason']}")
return []
else:
print(f" 🌍 [Market Validator] Status: {market_health['reason']}")
# 1. فلتر السيولة الأساسي
initial_candidates = await self._stage0_universe_filter()
if not initial_candidates:
print("⚠️ [Layer 1] Stage 0 returned 0 candidates.")
return []
# 2. جلب البيانات الفنية (Batch Fetching)
# نأخذ أعلى 600 عملة من حيث الحجم لفحصها فنياً
top_candidates = initial_candidates[:600]
enriched_data = await self._fetch_technical_data_batch(top_candidates)
semi_final_list = []
# 3. التصنيف الفني
for item in enriched_data:
classification = self._classify_opportunity_type(item)
if classification['type'] != 'NONE':
# تشخيص الحالة (BULL, BEAR, RANGE)
regime_info = self._diagnose_asset_regime(item)
item['asset_regime'] = regime_info['regime']
item['asset_regime_conf'] = regime_info['conf']
item['strategy_type'] = classification['type']
item['l1_sort_score'] = classification['score']
item['strategy_tag'] = classification['type']
# إذا كان التشخيص العام "ميت" لكن العملة في حالة ضغط (Squeeze)، نمررها
if regime_info['regime'] == 'DEAD' and classification['type'] == 'MOMENTUM_LAUNCH':
if not classification.get('is_squeeze', False):
continue
semi_final_list.append(item)
# 4. فحص العمق وحقن الإعدادات
final_list = []
semi_final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
candidates_for_depth = semi_final_list[:limit] # نأخذ العدد المطلوب للفحص العميق
if candidates_for_depth:
print(f" 🛡️ [Layer 1.5] Checking Depth for {len(candidates_for_depth)} candidates...")
for item in candidates_for_depth:
# أ. فحص العمق (Depth Check)
if item['strategy_type'] in ['ACCUMULATION_SQUEEZE', 'SAFE_BOTTOM']:
try:
atr_val = item.get('atr_value', 0.0)
curr_price = item.get('current_price', 0.0)
if atr_val > 0 and curr_price > 0:
range_2h = atr_val * 2.0
ob_score = await self._check_ob_pressure(item['symbol'], curr_price, range_2h)
if ob_score > 0.6:
item['l1_sort_score'] += 0.15
item['note'] = f"Strong Depth Support ({ob_score:.2f})"
elif ob_score < 0.4:
item['l1_sort_score'] -= 0.10
except Exception: pass
# ب. حقن الإعدادات من AdaptiveHub
if self.adaptive_hub_ref:
coin_type = item.get('strategy_type', 'SAFE_BOTTOM')
dynamic_config = self.adaptive_hub_ref.get_coin_type_config(coin_type)
item['dynamic_limits'] = dynamic_config
final_list.append(item)
# الترتيب النهائي
final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
selection = final_list[:limit]
print(f"✅ [Layer 1] Passed {len(selection)} active candidates.")
return selection
# ==================================================================
# 🧱 Order Book Depth Scanner
# ==================================================================
async def _check_ob_pressure(self, symbol: str, current_price: float, price_range: float) -> float:
"""فحص ضغط الشراء مقابل البيع في عمق السوق"""
try:
ob = await self.exchange.fetch_order_book(symbol, limit=50)
bids = ob['bids']
asks = ob['asks']
min_price = current_price - price_range
max_price = current_price + price_range
support_vol = 0.0
resistance_vol = 0.0
for p, v in bids:
if p >= min_price: support_vol += v
else: break
for p, v in asks:
if p <= max_price: resistance_vol += v
else: break
if (support_vol + resistance_vol) == 0: return 0.5
return support_vol / (support_vol + resistance_vol)
except Exception:
return 0.5
# ==================================================================
# ⚖️ The Dual-Classifier Logic (RELAXED FUNNEL)
# ==================================================================
def _classify_opportunity_type(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""تصنيف العملة إلى نوع استراتيجية محدد"""
try:
df_1h = self._calc_indicators(data['ohlcv_1h_raw'])
curr = df_1h.iloc[-1]
data['atr_value'] = curr['atr']
except: return {'type': 'NONE', 'score': 0}
rsi = curr['rsi']
close = curr['close']
ema20 = curr['ema20']
ema50 = curr['ema50']
ema200 = curr['ema200'] if 'ema200' in curr else ema50
atr = curr['atr']
lower_bb = curr['lower_bb'] if 'lower_bb' in curr else (curr['ema20'] - (2*curr['atr']))
upper_bb = curr['upper_bb'] if 'upper_bb' in curr else (curr['ema20'] + (2*curr['atr']))
bb_width = (upper_bb - lower_bb) / curr['ema20'] if curr['ema20'] > 0 else 1.0
# 🔥 1. Dead Coin Filter (Relaxed to 0.3%)
volatility_pct = (atr / close) * 100 if close > 0 else 0
if volatility_pct < 0.3: return {'type': 'NONE', 'score': 0}
# 🛡️ TYPE 1: SAFE_BOTTOM (القيعان الآمنة)
# تشبع بيعي مع كسر للحد السفلي للبولنجر
if rsi < 55:
if close <= lower_bb * 1.08:
score = (60 - rsi) / 20.0
return {'type': 'SAFE_BOTTOM', 'score': min(score, 1.0)}
# 🔋 TYPE 2: ACCUMULATION_SQUEEZE (التجميع والضغط)
# RSI محايد، نطاق ضيق جداً (BB Width قليل)
elif 40 <= rsi <= 65:
if bb_width < 0.18:
score = 1.0 - (bb_width * 3.0)
return {'type': 'ACCUMULATION_SQUEEZE', 'score': max(score, 0.5), 'is_squeeze': True}
# 🚀 TYPE 3: MOMENTUM_LAUNCH (انطلاق الزخم)
# RSI قوي، السعر فوق المتوسطات، واقتراب من الحد العلوي
elif 50 < rsi < 85:
if close > ema50:
dist_to_upper = (upper_bb - close) / close
if dist_to_upper < 0.12: # قريب من الاختراق
score = rsi / 100.0
return {'type': 'MOMENTUM_LAUNCH', 'score': score}
# 🃏 Special Case: High Volatility Catch
if volatility_pct > 1.5:
return {'type': 'SAFE_BOTTOM', 'score': 0.4}
return {'type': 'NONE', 'score': 0}
# ==================================================================
# 🔍 Stage 0: Universe Filter
# ==================================================================
async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
"""جلب كل العملات وتصفيتها حسب الحجم"""
try:
MIN_VOLUME_THRESHOLD = 1000000.0 # 1 Million USDT
print(f" 🛡️ [Stage 0] Fetching Tickers (Min Vol: ${MIN_VOLUME_THRESHOLD:,.0f})...")
tickers = await self.exchange.fetch_tickers()
candidates = []
SOVEREIGN_COINS = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT']
reject_stats = {"volume": 0, "change": 0, "blacklist": 0}
for symbol, ticker in tickers.items():
if not symbol.endswith('/USDT'): continue
base_curr = symbol.split('/')[0]
if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
reject_stats["blacklist"] += 1
continue
base_vol = float(ticker.get('baseVolume') or 0.0)
last_price = float(ticker.get('last') or 0.0)
calc_quote_vol = base_vol * last_price
is_sovereign = symbol in SOVEREIGN_COINS
if not is_sovereign:
if calc_quote_vol < MIN_VOLUME_THRESHOLD:
reject_stats["volume"] += 1
continue
change_pct = ticker.get('percentage')
if change_pct is None: change_pct = 0.0
# استبعاد العملات التي تحركت بشكل جنوني (>35%) لتجنب القمم
if abs(change_pct) > 35.0:
reject_stats["change"] += 1
continue
candidates.append({
'symbol': symbol,
'quote_volume': calc_quote_vol,
'current_price': last_price,
'change_24h': change_pct
})
candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
print(f" ℹ️ [Stage 0] Ignored {reject_stats['volume']} low-vol coins.")
return candidates
except Exception as e:
print(f"❌ [L1 Error] Universe filter failed: {e}")
traceback.print_exc()
return []
# ------------------------------------------------------------------
# 🧭 The Diagnoser (Market Regime Detection)
# ------------------------------------------------------------------
def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
"""تشخيص حالة العملة الفردية (BULL/BEAR/RANGE/DEAD)"""
try:
if 'df_1h' not in item:
if 'ohlcv_1h_raw' in item:
item['df_1h'] = self._calc_indicators(item['ohlcv_1h_raw'])
else:
return {'regime': 'RANGE', 'conf': 0.0}
df = item['df_1h']
if df.empty: return {'regime': 'RANGE', 'conf': 0.0}
curr = df.iloc[-1]
price = curr['close']
ema20 = curr['ema20']
ema50 = curr['ema50']
rsi = curr['rsi']
atr = curr['atr']
atr_pct = (atr / price) * 100 if price > 0 else 0
regime = "RANGE"
conf = 0.5
if atr_pct < 0.4: return {'regime': 'DEAD', 'conf': 0.9}
if price > ema20 and ema20 > ema50 and rsi > 50:
regime = "BULL"
conf = 0.8 if rsi > 55 else 0.6
elif price < ema20 and ema20 < ema50 and rsi < 50:
regime = "BEAR"
conf = 0.8 if rsi < 45 else 0.6
return {'regime': regime, 'conf': conf}
except Exception: return {'regime': 'RANGE', 'conf': 0.0}
# ------------------------------------------------------------------
# Helpers & Indicators
# ------------------------------------------------------------------
async def _fetch_technical_data_batch(self, candidates):
"""جلب البيانات الفنية (1h, 15m) على دفعات"""
chunk_size = 10; results = []
for i in range(0, len(candidates), chunk_size):
chunk = candidates[i:i+chunk_size]
tasks = [self._fetch_single(c) for c in chunk]
res = await asyncio.gather(*tasks)
results.extend([r for r in res if r])
await asyncio.sleep(0.05) # Rate Limit Protection
return results
async def _fetch_single(self, c):
try:
h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=210)
m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=60)
if not h1 or not m15: return None
c['ohlcv'] = {'1h': h1, '15m': m15}
c['ohlcv_1h_raw'] = h1
c['ohlcv_15m_raw'] = m15
c['df_1h'] = self._calc_indicators(h1)
return c
except: return None
def _calc_indicators(self, ohlcv):
"""حساب المؤشرات يدوياً باستخدام Pandas"""
df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
# RSI
delta = df['c'].diff()
gain = (delta.where(delta>0, 0)).rolling(14).mean()
loss = (-delta.where(delta<0, 0)).rolling(14).mean()
rs = gain/loss
df['rsi'] = 100 - (100/(1+rs))
# EMAs
df['ema20'] = df['c'].ewm(span=20).mean()
df['ema50'] = df['c'].ewm(span=50).mean()
df['ema200'] = df['c'].ewm(span=200).mean()
# ATR
tr = np.maximum(df['h']-df['l'], np.maximum(abs(df['h']-df['c'].shift()), abs(df['l']-df['c'].shift())))
df['atr'] = tr.rolling(14).mean()
# Bollinger Bands
std = df['c'].rolling(20).std()
df['upper_bb'] = df['ema20'] + (2 * std)
df['lower_bb'] = df['ema20'] - (2 * std)
df.rename(columns={'o':'open', 'h':'high', 'l':'low', 'c':'close', 'v':'volume'}, inplace=True)
return df.fillna(0)
async def get_latest_price_async(self, symbol):
try: return float((await self.exchange.fetch_ticker(symbol))['last'])
except: return 0.0
async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
except: return []
async def get_order_book_snapshot(self, symbol, limit=20):
try: return await self.exchange.fetch_order_book(symbol, limit)
except: return {}