File size: 13,318 Bytes
106c666
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import os
import asyncio
import httpx
import json
from datetime import datetime
import ccxt.pro as ccxt
from ccxt.base.errors import RateLimitExceeded, DDoSProtection
import pandas as pd
import pandas_ta as ta
import time

# Placeholder for future on-chain/sentiment data fetching
def get_on_chain_data(symbol):
    print(f"🚧 Placeholder: Fetching on-chain data for {symbol}...")
    return {}

def get_sentiment_data(symbol):
    print(f"🚧 Placeholder: Fetching sentiment data for {symbol}...")
    return {}

class DataManager:
    """Manages all data fetching and processing."""
    def __init__(self, contracts_db):
        self.contracts_db = contracts_db
        self.exchange = ccxt.kucoin()
        self.exchange.rateLimit = 1000  # Adjusted rateLimit to 1 second
        self._whale_data_cache = {}
        self.http_client = None

    async def initialize(self):
        """Initialize async HTTP client."""
        self.http_client = httpx.AsyncClient(timeout=30.0)

    async def close(self):
        """Close connections properly."""
        if self.http_client:
            await self.http_client.aclose()
        await self.exchange.close()

    async def get_market_context_async(self):
        """Fetches and compiles data on market health from various sources."""
        try:
            if not self.http_client:
                await self.initialize()
            top_coins_response = await self.http_client.get(
                'https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd'
            )
            top_coins_data = top_coins_response.json()
            fng_response = await self.http_client.get('https://api.alternative.me/fng/')
            fng_data = fng_response.json()
            market_context = {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': top_coins_data.get('bitcoin', {}).get('usd'),
                'ethereum_price_usd': top_coins_data.get('ethereum', {}).get('usd'),
                'fear_and_greed_index': fng_data['data'][0]['value'] if fng_data.get('data') else None
            }
            fng_value = int(market_context['fear_and_greed_index']) if market_context['fear_and_greed_index'] else 50
            if fng_value >= 70:
                market_context['sentiment'] = 'EXTREME_GREED'
            elif fng_value >= 50:
                market_context['sentiment'] = 'GREED'
            elif fng_value > 30:
                market_context['sentiment'] = 'NEUTRAL'
            elif fng_value >= 15:
                market_context['sentiment'] = 'FEAR'
            else:
                market_context['sentiment'] = 'EXTREME_FEAR'
            btc_price = market_context.get('bitcoin_price_usd', 0)
            if btc_price > 60000:
                market_context['btc_sentiment'] = 'BULLISH'
            elif btc_price < 55000:
                market_context['btc_sentiment'] = 'BEARISH'
            else:
                market_context['btc_sentiment'] = 'NEUTRAL'
            print("πŸ“Š Market context fetched successfully.")
            return market_context
        except httpx.TimeoutException:
            print("⏱️ Timeout fetching market context")
            return None
        except Exception as e:
            print(f"❌ Failed to get market context: {e}")
            return None

    async def get_top_symbols(self, n):
        """Fetches the top N symbols by volume."""
        try:
            async with asyncio.timeout(60):
                markets = await self.exchange.fetch_tickers()
                usdt_markets = {symbol: data for symbol, data in markets.items() if '/USDT' in symbol}
                sorted_by_volume = sorted(usdt_markets.items(), key=lambda x: x[1]['quoteVolume'], reverse=True)
                top_symbols = [symbol for symbol, data in sorted_by_volume[:n]]
                print(f"βœ”οΈ Found {len(top_symbols)} top symbols from KuCoin.")
                return top_symbols
        except asyncio.TimeoutError:
            print("⏱️ Timeout fetching top symbols")
            return []
        except (RateLimitExceeded, DDoSProtection) as e:
            print(f"❌ Rate limit exceeded while fetching symbols: {e}")
            await asyncio.sleep(60)
            return []
        except Exception as e:
            print(f"❌ Failed to fetch top symbols: {e}")
            return []
    
    async def fetch_ohlcv_with_retry(self, symbol, timeframe, limit, retries=3):
        """Fetch OHLCV data with timeout and retry logic."""
        for i in range(retries):
            try:
                async with asyncio.timeout(30):
                    return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=500)
            except asyncio.TimeoutError:
                print(f"⏱️ Timeout for {symbol} on {timeframe}. Retry {i+1}/{retries}")
                if i < retries - 1:
                    await asyncio.sleep(5)
            except (RateLimitExceeded, DDoSProtection) as e:
                print(f"⚠️ Rate limit for {symbol} on {timeframe}. Waiting 90s...")
                await asyncio.sleep(90)
                if i < retries - 1:
                    continue
            except Exception as e:
                print(f"❌ Failed to fetch {symbol} on {timeframe}: {e}")
                return None
        return None

    async def get_fast_pass_data_async(self, symbols):
        """Fetches OHLCV data with concurrent limiting and progress tracking."""
        timeframes = ['1h', '4h', '1d', '1w']
        data = []
        total = len(symbols)
        completed = 0
        success = 0
        failed = 0
        semaphore = asyncio.Semaphore(4) # Increased concurrency to 4
        async def fetch_symbol_data(symbol, index):
            nonlocal completed, success, failed
            async with semaphore:
                try:
                    print(f"⏳ [{index+1}/{total}] Fetching {symbol}...")
                    ohlcv_data = {}
                    timeframes_fetched = 0
                    for timeframe in timeframes:
                        candles = await self.fetch_ohlcv_with_retry(symbol, timeframe, limit=500)
                        if candles:
                            ohlcv_data[timeframe] = candles
                            timeframes_fetched += 1
                        else:
                            ohlcv_data[timeframe] = []
                        # Add a short delay to mitigate rate limits
                        await asyncio.sleep(0.25)
                    completed += 1
                    if any(ohlcv_data.values()):
                        success += 1
                        print(f"βœ… [{index+1}/{total}] {symbol} - {timeframes_fetched}/4 timeframes | Progress: {completed}/{total} ({int(completed/total*100)}%)")
                        return {
                            'symbol': symbol,
                            'ohlcv': ohlcv_data,
                        }
                    else:
                        failed += 1
                        print(f"⚠️ [{index+1}/{total}] {symbol} - No data | Progress: {completed}/{total} ({int(completed/total*100)}%)")
                        return None
                except Exception as e:
                    completed += 1
                    failed += 1
                    print(f"❌ [{index+1}/{total}] {symbol} - Error: {str(e)[:50]} | Progress: {completed}/{total} ({int(completed/total*100)}%)")
                    return None
        print(f"\n{'='*70}")
        print(f"πŸ“Š Starting data fetch for {total} symbols")
        print(f"{'='*70}\n")
        tasks = [fetch_symbol_data(symbol, i) for i, symbol in enumerate(symbols)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                print(f"❌ Exception in fetch task: {result}")
            elif result is not None:
                data.append(result)
        print(f"\n{'='*70}")
        print(f"βœ… Data fetching complete!")
        print(f"   Total: {total} | Success: {success} | Failed: {failed}")
        print(f"   Success Rate: {int(success/total*100)}%")
        print(f"{'='*70}\n")
        return data

    async def fetch_and_update_contracts_db_async(self):
        """Fetches the contracts DB from R2 or initializes an empty one."""
        print("πŸ’Ύ Using local contracts database.")
        return {}

class FeatureProcessor:
    """Processes raw data into tradable features and scores opportunities."""
    def __init__(self, market_context):
        self.market_context = market_context

    def _calculate_indicators(self, df):
        """Calculates key technical indicators for a given DataFrame."""
        if len(df) < 20:
            return None, None
        df['rsi'] = ta.rsi(df['close'], length=14)
        macd = ta.macd(df['close'])
        df['macd'] = macd['MACDh_12_26_9']
        df['macd_signal'] = macd['MACD_12_26_9']
        return df['rsi'].iloc[-1], df['macd'].iloc[-1], df['macd_signal'].iloc[-1]

    def _calculate_liquidity_score(self, df_1h):
        """Calculates a simple liquidity score based on volume and price."""
        if df_1h.empty:
            return 0
        df_1h['dollar_volume'] = df_1h['volume'] * df_1h['close']
        avg_dollar_volume = df_1h['dollar_volume'].mean()
        return avg_dollar_volume

    def _calculate_fib_levels(self, df_1d):
        """Calculates Fibonacci retracement levels based on a price swing."""
        if len(df_1d) < 50:
            return {}
        
        # Find the highest high and lowest low in the last 50 days
        recent_high = df_1d['high'].iloc[-50:].max()
        recent_low = df_1d['low'].iloc[-50:].min()
        
        diff = recent_high - recent_low
        
        fib_levels = {
            "0.0%": recent_high,
            "23.6%": recent_high - 0.236 * diff,
            "38.2%": recent_high - 0.382 * diff,
            "50.0%": recent_high - 0.50 * diff,
            "61.8%": recent_high - 0.618 * diff,
            "78.6%": recent_high - 0.786 * diff,
            "100.0%": recent_low
        }
        
        return fib_levels

    def process_and_score_symbol(self, raw_data):
        """
        Processes raw market data, calculates features, and assigns a final score.
        """
        symbol = raw_data['symbol']
        ohlcv = raw_data['ohlcv']
        
        if not ohlcv.get('1d') or not ohlcv.get('1h'):
            return None
        
        try:
            df_1h = pd.DataFrame(ohlcv['1h'], columns=['time', 'open', 'high', 'low', 'close', 'volume'])
            df_1d = pd.DataFrame(ohlcv['1d'], columns=['time', 'open', 'high', 'low', 'close', 'volume'])
            
            df_1h[['open', 'high', 'low', 'close', 'volume']] = df_1h[['open', 'high', 'low', 'close', 'volume']].astype(float)
            df_1d[['open', 'high', 'low', 'close', 'volume']] = df_1d[['open', 'high', 'low', 'close', 'volume']].astype(float)

            rsi, macd, macd_signal = self._calculate_indicators(df_1h)
            if rsi is None:
                return None
            
            liquidity_score = self._calculate_liquidity_score(df_1h)
            avg_daily_volume = df_1d['volume'].mean()
            fib_levels = self._calculate_fib_levels(df_1d)

            # --- Scoring Logic ---
            score = 0.5 # Base score
            if rsi < 50:
                score += 0.15
            if macd > 0 and macd_signal > 0 and macd > macd_signal:
                score += 0.25
            
            # Simple bonus for bullish momentum
            if df_1h['close'].iloc[-1] > df_1h['close'].iloc[-2]:
                score += 0.1
            
            # Additional scores based on conditions
            if rsi > 70 or rsi < 30:
                score += 0.2
            if liquidity_score > 100000:
                score += 0.2
            if avg_daily_volume > 500000:
                score += 0.2
            
            if self.market_context:
                if self.market_context['btc_sentiment'] == 'BULLISH':
                    score *= 1.1
                elif self.market_context['btc_sentiment'] == 'BEARISH':
                    score *= 0.9

            return {
                'symbol': symbol,
                'current_price': df_1h['close'].iloc[-1],
                'features': {
                    'rsi': float(rsi),
                    'macd': float(macd),
                    'macd_signal': float(macd_signal),
                    'liquidity_score': float(liquidity_score),
                    'avg_daily_volume': float(avg_daily_volume)
                },
                'fibonacci_levels': fib_levels,
                'final_score': score
            }
        except KeyError as e:
            print(f"⚠️ Missing key in data for {raw_data.get('symbol', 'unknown')}: {e}")
            return None
        except Exception as e:
            print(f"❌ Failed to process {raw_data.get('symbol', 'unknown')}: {e}")
            return None

    def filter_top_candidates(self, candidates, n=10):
        """Filters the top N candidates based on their final score."""
        valid_candidates = [c for c in candidates if c is not None]
        return sorted(valid_candidates, key=lambda x: x['final_score'], reverse=True)[:n]