# Source: Tradi / ML.py (uploaded by Riy777)
# Commit: 106c666 "Upload 4 files" (verified)
import os
import asyncio
import httpx
import json
from datetime import datetime
import ccxt.pro as ccxt
from ccxt.base.errors import RateLimitExceeded, DDoSProtection
import pandas as pd
import pandas_ta as ta
import time
# Placeholder for future on-chain/sentiment data fetching
def get_on_chain_data(symbol):
    """Return on-chain metrics for *symbol*.

    Placeholder: only logs the request and always returns an empty dict.
    """
    notice = f"🚧 Placeholder: Fetching on-chain data for {symbol}..."
    print(notice)
    empty_result = {}
    return empty_result
def get_sentiment_data(symbol):
    """Return sentiment metrics for *symbol*.

    Placeholder: only logs the request and always returns an empty dict.
    """
    notice = f"🚧 Placeholder: Fetching sentiment data for {symbol}..."
    print(notice)
    empty_result = {}
    return empty_result
class DataManager:
"""Manages all data fetching and processing."""
def __init__(self, contracts_db):
self.contracts_db = contracts_db
self.exchange = ccxt.kucoin()
self.exchange.rateLimit = 1000 # Adjusted rateLimit to 1 second
self._whale_data_cache = {}
self.http_client = None
async def initialize(self):
"""Initialize async HTTP client."""
self.http_client = httpx.AsyncClient(timeout=30.0)
async def close(self):
"""Close connections properly."""
if self.http_client:
await self.http_client.aclose()
await self.exchange.close()
async def get_market_context_async(self):
"""Fetches and compiles data on market health from various sources."""
try:
if not self.http_client:
await self.initialize()
top_coins_response = await self.http_client.get(
'https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd'
)
top_coins_data = top_coins_response.json()
fng_response = await self.http_client.get('https://api.alternative.me/fng/')
fng_data = fng_response.json()
market_context = {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': top_coins_data.get('bitcoin', {}).get('usd'),
'ethereum_price_usd': top_coins_data.get('ethereum', {}).get('usd'),
'fear_and_greed_index': fng_data['data'][0]['value'] if fng_data.get('data') else None
}
fng_value = int(market_context['fear_and_greed_index']) if market_context['fear_and_greed_index'] else 50
if fng_value >= 70:
market_context['sentiment'] = 'EXTREME_GREED'
elif fng_value >= 50:
market_context['sentiment'] = 'GREED'
elif fng_value > 30:
market_context['sentiment'] = 'NEUTRAL'
elif fng_value >= 15:
market_context['sentiment'] = 'FEAR'
else:
market_context['sentiment'] = 'EXTREME_FEAR'
btc_price = market_context.get('bitcoin_price_usd', 0)
if btc_price > 60000:
market_context['btc_sentiment'] = 'BULLISH'
elif btc_price < 55000:
market_context['btc_sentiment'] = 'BEARISH'
else:
market_context['btc_sentiment'] = 'NEUTRAL'
print("📊 Market context fetched successfully.")
return market_context
except httpx.TimeoutException:
print("⏱️ Timeout fetching market context")
return None
except Exception as e:
print(f"❌ Failed to get market context: {e}")
return None
async def get_top_symbols(self, n):
"""Fetches the top N symbols by volume."""
try:
async with asyncio.timeout(60):
markets = await self.exchange.fetch_tickers()
usdt_markets = {symbol: data for symbol, data in markets.items() if '/USDT' in symbol}
sorted_by_volume = sorted(usdt_markets.items(), key=lambda x: x[1]['quoteVolume'], reverse=True)
top_symbols = [symbol for symbol, data in sorted_by_volume[:n]]
print(f"✔️ Found {len(top_symbols)} top symbols from KuCoin.")
return top_symbols
except asyncio.TimeoutError:
print("⏱️ Timeout fetching top symbols")
return []
except (RateLimitExceeded, DDoSProtection) as e:
print(f"❌ Rate limit exceeded while fetching symbols: {e}")
await asyncio.sleep(60)
return []
except Exception as e:
print(f"❌ Failed to fetch top symbols: {e}")
return []
async def fetch_ohlcv_with_retry(self, symbol, timeframe, limit, retries=3):
"""Fetch OHLCV data with timeout and retry logic."""
for i in range(retries):
try:
async with asyncio.timeout(30):
return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=500)
except asyncio.TimeoutError:
print(f"⏱️ Timeout for {symbol} on {timeframe}. Retry {i+1}/{retries}")
if i < retries - 1:
await asyncio.sleep(5)
except (RateLimitExceeded, DDoSProtection) as e:
print(f"⚠️ Rate limit for {symbol} on {timeframe}. Waiting 90s...")
await asyncio.sleep(90)
if i < retries - 1:
continue
except Exception as e:
print(f"❌ Failed to fetch {symbol} on {timeframe}: {e}")
return None
return None
async def get_fast_pass_data_async(self, symbols):
"""Fetches OHLCV data with concurrent limiting and progress tracking."""
timeframes = ['1h', '4h', '1d', '1w']
data = []
total = len(symbols)
completed = 0
success = 0
failed = 0
semaphore = asyncio.Semaphore(4) # Increased concurrency to 4
async def fetch_symbol_data(symbol, index):
nonlocal completed, success, failed
async with semaphore:
try:
print(f"⏳ [{index+1}/{total}] Fetching {symbol}...")
ohlcv_data = {}
timeframes_fetched = 0
for timeframe in timeframes:
candles = await self.fetch_ohlcv_with_retry(symbol, timeframe, limit=500)
if candles:
ohlcv_data[timeframe] = candles
timeframes_fetched += 1
else:
ohlcv_data[timeframe] = []
# Add a short delay to mitigate rate limits
await asyncio.sleep(0.25)
completed += 1
if any(ohlcv_data.values()):
success += 1
print(f"✅ [{index+1}/{total}] {symbol} - {timeframes_fetched}/4 timeframes | Progress: {completed}/{total} ({int(completed/total*100)}%)")
return {
'symbol': symbol,
'ohlcv': ohlcv_data,
}
else:
failed += 1
print(f"⚠️ [{index+1}/{total}] {symbol} - No data | Progress: {completed}/{total} ({int(completed/total*100)}%)")
return None
except Exception as e:
completed += 1
failed += 1
print(f"❌ [{index+1}/{total}] {symbol} - Error: {str(e)[:50]} | Progress: {completed}/{total} ({int(completed/total*100)}%)")
return None
print(f"\n{'='*70}")
print(f"📊 Starting data fetch for {total} symbols")
print(f"{'='*70}\n")
tasks = [fetch_symbol_data(symbol, i) for i, symbol in enumerate(symbols)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"❌ Exception in fetch task: {result}")
elif result is not None:
data.append(result)
print(f"\n{'='*70}")
print(f"✅ Data fetching complete!")
print(f" Total: {total} | Success: {success} | Failed: {failed}")
print(f" Success Rate: {int(success/total*100)}%")
print(f"{'='*70}\n")
return data
async def fetch_and_update_contracts_db_async(self):
"""Fetches the contracts DB from R2 or initializes an empty one."""
print("💾 Using local contracts database.")
return {}
class FeatureProcessor:
    """Processes raw data into tradable features and scores opportunities."""

    def __init__(self, market_context):
        # Presumably the dict built by the market-context fetcher, or None.
        # Only its 'btc_sentiment' entry is read here.
        self.market_context = market_context

    def _calculate_indicators(self, df):
        """Return the latest (rsi, macd, macd_signal) values for *df*.

        Uses RSI(14) and MACD(12, 26, 9) on df['close'] via pandas_ta. Always
        returns a 3-tuple; it is (None, None, None) when there are fewer than
        20 rows or an indicator could not be computed. *df* is not modified.
        """
        if len(df) < 20:
            # Must match the 3-value unpacking in process_and_score_symbol.
            return None, None, None
        rsi = ta.rsi(df['close'], length=14)
        macd = ta.macd(df['close'])
        if rsi is None or macd is None:
            # pandas_ta can return None when the series is too short for the
            # indicator (MACD needs at least the slow length, 26).
            return None, None, None
        # pandas_ta columns: MACD_* is the MACD line, MACDs_* the signal line,
        # MACDh_* the histogram. Previously the histogram was reported as
        # 'macd' and the MACD line as 'macd_signal'.
        return (
            rsi.iloc[-1],
            macd['MACD_12_26_9'].iloc[-1],
            macd['MACDs_12_26_9'].iloc[-1],
        )

    def _calculate_liquidity_score(self, df_1h):
        """Return the mean of volume * close over *df_1h* (0 if empty).

        *df_1h* is not modified.
        """
        if df_1h.empty:
            return 0
        return (df_1h['volume'] * df_1h['close']).mean()

    def _calculate_fib_levels(self, df_1d):
        """Return Fibonacci retracement levels over the last 50 rows of *df_1d*.

        Levels are measured down from the highest 'high' toward the lowest
        'low' in that window. Returns {} when fewer than 50 rows are available.
        """
        if len(df_1d) < 50:
            return {}
        recent_high = df_1d['high'].iloc[-50:].max()
        recent_low = df_1d['low'].iloc[-50:].min()
        diff = recent_high - recent_low
        return {
            "0.0%": recent_high,
            "23.6%": recent_high - 0.236 * diff,
            "38.2%": recent_high - 0.382 * diff,
            "50.0%": recent_high - 0.50 * diff,
            "61.8%": recent_high - 0.618 * diff,
            "78.6%": recent_high - 0.786 * diff,
            "100.0%": recent_low
        }

    def process_and_score_symbol(self, raw_data):
        """Process raw market data, calculate features, and assign a final score.

        Args:
            raw_data: dict with 'symbol' and 'ohlcv' ({timeframe: candles}).
                The '1h' and '1d' lists must be non-empty, and each candle is
                [time, open, high, low, close, volume].

        Returns:
            dict with 'symbol', 'current_price', 'features',
            'fibonacci_levels' and 'final_score', or None when data is
            missing/insufficient or processing fails.
        """
        symbol = raw_data['symbol']
        ohlcv = raw_data['ohlcv']
        if not ohlcv.get('1d') or not ohlcv.get('1h'):
            return None

        columns = ['time', 'open', 'high', 'low', 'close', 'volume']
        numeric_columns = columns[1:]
        try:
            df_1h = pd.DataFrame(ohlcv['1h'], columns=columns)
            df_1d = pd.DataFrame(ohlcv['1d'], columns=columns)
            df_1h[numeric_columns] = df_1h[numeric_columns].astype(float)
            df_1d[numeric_columns] = df_1d[numeric_columns].astype(float)

            rsi, macd, macd_signal = self._calculate_indicators(df_1h)
            if rsi is None:
                return None

            liquidity_score = self._calculate_liquidity_score(df_1h)
            avg_daily_volume = df_1d['volume'].mean()
            fib_levels = self._calculate_fib_levels(df_1d)

            # --- Scoring Logic ---
            score = 0.5  # Base score
            if rsi < 50:
                score += 0.15
            # MACD line above a positive signal line.
            if macd > 0 and macd_signal > 0 and macd > macd_signal:
                score += 0.25
            # Simple bonus for bullish momentum (last hourly close went up).
            if df_1h['close'].iloc[-1] > df_1h['close'].iloc[-2]:
                score += 0.1
            # Additional bonus for RSI extremes on either side.
            if rsi > 70 or rsi < 30:
                score += 0.2
            if liquidity_score > 100000:
                score += 0.2
            if avg_daily_volume > 500000:
                score += 0.2

            if self.market_context:
                # .get(): a context without 'btc_sentiment' used to raise
                # KeyError and drop every symbol.
                btc_sentiment = self.market_context.get('btc_sentiment')
                if btc_sentiment == 'BULLISH':
                    score *= 1.1
                elif btc_sentiment == 'BEARISH':
                    score *= 0.9

            return {
                'symbol': symbol,
                'current_price': float(df_1h['close'].iloc[-1]),
                'features': {
                    'rsi': float(rsi),
                    'macd': float(macd),
                    'macd_signal': float(macd_signal),
                    'liquidity_score': float(liquidity_score),
                    'avg_daily_volume': float(avg_daily_volume)
                },
                'fibonacci_levels': fib_levels,
                'final_score': score
            }
        except KeyError as e:
            print(f"⚠️ Missing key in data for {raw_data.get('symbol', 'unknown')}: {e}")
            return None
        except Exception as e:
            print(f"❌ Failed to process {raw_data.get('symbol', 'unknown')}: {e}")
            return None

    def filter_top_candidates(self, candidates, n=10):
        """Return the *n* non-None candidates with the highest 'final_score'."""
        valid_candidates = [c for c in candidates if c is not None]
        return sorted(valid_candidates, key=lambda x: x['final_score'], reverse=True)[:n]