Riy777's picture
Update whale_monitor/core.py
2f19d31 verified
# whale_monitor/core.py
# (V3.3 - GEM-Architect: Enterprise Edition - Smart Web3 Limits)
# ูŠุชุถู…ู†: Web3 Engine, Smart Retry for Limits, Solscan Fixes, & Full Logic.
import os
import asyncio
import httpx
import json
import traceback
import time
from datetime import datetime, timedelta, timezone
from collections import defaultdict, deque
import logging
import ssl
from typing import List, Dict, Any
# Web3 Integration
from web3 import AsyncWeb3
# Local Imports
from .config import (
DEFAULT_WHALE_THRESHOLD_USD, TRANSFER_EVENT_SIGNATURE, NATIVE_COINS,
DEFAULT_EXCHANGE_ADDRESSES, COINGECKO_BASE_URL, COINGECKO_SYMBOL_MAPPING,
ERC20_ABI
)
from .rpc_manager import AdaptiveRpcManager
# ุชุนุทูŠู„ ุชุณุฌูŠู„ HTTP ุงู„ู…ุฒุนุฌ
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("web3").setLevel(logging.WARNING)
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None, r2_service=None):
print("๐Ÿ”„ [WhaleMonitor V3.3] Initializing Enterprise Engine...")
self.r2_service = r2_service
self.data_manager = None
self.rpc_manager: AdaptiveRpcManager = None
self.whale_threshold_usd = DEFAULT_WHALE_THRESHOLD_USD
self.contracts_db = {}
self._initialize_contracts_db(contracts_db or {})
self.address_labels = {}
self.address_categories = {'exchange': set(), 'cex': set(), 'dex': set(), 'bridge': set(), 'whale': set(), 'unknown': set()}
self._initialize_comprehensive_exchange_addresses()
self.token_price_cache = {}
self.token_decimals_cache = {}
if self.r2_service:
asyncio.create_task(self._load_contracts_from_r2())
print("โœ… [WhaleMonitor V3.3] System Ready.")
def set_rpc_manager(self, rpc_manager: AdaptiveRpcManager):
self.rpc_manager = rpc_manager
print("โœ… [WhaleMonitor] RPC Manager Linked.")
# ==============================================================================
# ๐Ÿ“š Initialization & Database Management
# ==============================================================================
def _initialize_contracts_db(self, initial_contracts):
for symbol, contract_data in initial_contracts.items():
symbol_lower = symbol.lower()
if isinstance(contract_data, dict) and 'address' in contract_data and 'network' in contract_data:
self.contracts_db[symbol_lower] = contract_data
elif isinstance(contract_data, str):
self.contracts_db[symbol_lower] = {
'address': contract_data,
'network': self._detect_network_from_address(contract_data)
}
def _detect_network_from_address(self, address):
if not isinstance(address, str): return 'ethereum'
address_lower = address.lower()
if address_lower.startswith('0x') and len(address_lower) == 42:
return 'ethereum'
if len(address) > 30 and not address.startswith('0x'):
return 'solana'
return 'ethereum'
def _initialize_comprehensive_exchange_addresses(self):
for category, addresses in DEFAULT_EXCHANGE_ADDRESSES.items():
for address in addresses:
if not isinstance(address, str): continue
addr_lower = address.lower()
self.address_labels[addr_lower] = category
self.address_categories['exchange'].add(addr_lower)
if category in ['uniswap', 'pancakeswap', 'sushiswap']:
self.address_categories['dex'].add(addr_lower)
elif 'wormhole' in category or 'bridge' in category:
self.address_categories['bridge'].add(addr_lower)
else:
self.address_categories['cex'].add(addr_lower)
async def _load_contracts_from_r2(self):
if not self.r2_service: return
try:
key = "contracts.json"
if hasattr(self.r2_service, 's3_client'):
response = self.r2_service.s3_client.get_object(Bucket="trading", Key=key)
contracts_data = json.loads(response['Body'].read())
for s, d in contracts_data.items():
if s.lower() not in self.contracts_db:
self.contracts_db[s.lower()] = d
except Exception: pass
# ==============================================================================
# ๐Ÿง  Core Analysis Logic
# ==============================================================================
async def get_symbol_whale_activity(self, symbol: str, known_price: float = 0.0) -> Dict[str, Any]:
try:
if not self.rpc_manager:
return self._create_error_response(symbol, "RPC Manager Not Injected")
self.rpc_manager.reset_session_stats()
base_symbol = symbol.split("/")[0].upper() if '/' in symbol else symbol.upper()
if base_symbol in NATIVE_COINS:
return self._create_native_coin_response(symbol)
contract_info = await self._find_contract_address_enhanced(symbol)
if not contract_info or not contract_info.get('address'):
return self._create_no_contract_response(symbol)
contract_address = contract_info['address']
network = contract_info['network']
current_price = known_price
if current_price <= 0:
current_price = await self._get_token_price(symbol)
if current_price <= 0:
return self._create_error_response(symbol, "Price unavailable")
decimals = await self._get_token_decimals(contract_address, network)
if decimals is None:
return self._create_error_response(symbol, f"Decimals missing on {network}")
# [CONFIG] Scan window
scan_hours = 4
all_transfers = await self._get_targeted_transfer_data(
contract_address, network, hours=scan_hours, price=current_price, decimals=decimals
)
if not all_transfers:
return self._create_no_transfers_response(symbol)
print(f"๐Ÿ“Š [WhaleMonitor] Analyzed {symbol}: {len(all_transfers)} transfers found ({scan_hours}h).")
analysis_windows = [
{'name': '1h', 'minutes': 60},
{'name': '4h', 'minutes': 240},
{'name': '24h', 'minutes': 1440}
]
multi_window_analysis = {}
current_time_utc = datetime.now(timezone.utc)
timestamp_cutoff_base = current_time_utc.timestamp()
for window in analysis_windows:
window_name = window['name']
cutoff_ts = timestamp_cutoff_base - (window['minutes'] * 60)
window_transfers = [
t for t in all_transfers
if float(t.get('timeStamp', 0)) >= cutoff_ts
]
analysis_result = self._analyze_transfer_list(
symbol=symbol,
transfers=window_transfers,
daily_volume_usd=0
)
multi_window_analysis[window_name] = analysis_result
if self.r2_service:
asyncio.create_task(self._save_learning_record(
symbol, current_price, multi_window_analysis, self.rpc_manager.get_session_stats()
))
short_term = multi_window_analysis.get('1h', {})
long_term = multi_window_analysis.get('4h', {})
signal = self._generate_enhanced_trading_signal(short_term)
llm_summary = self._create_enhanced_llm_summary(signal, short_term)
return {
'symbol': symbol,
'data_available': True,
'analysis_timestamp': current_time_utc.isoformat(),
'summary': {
'total_transfers_scanned': len(all_transfers),
'whale_count_1h': short_term.get('whale_transfers_count', 0),
'net_flow_1h': short_term.get('net_flow_usd', 0)
},
'exchange_flows': short_term,
'accumulation_analysis_24h': long_term,
'trading_signal': signal,
'llm_friendly_summary': llm_summary
}
except Exception as e:
# print(f"โŒ [WhaleMonitor] Critical Error {symbol}: {e}")
traceback.print_exc()
return self._create_error_response(symbol, str(e))
# ==============================================================================
# ๐Ÿ•ต๏ธโ€โ™‚๏ธ Data Fetching & Web3 Logic (Smart Limits)
# ==============================================================================
async def _get_targeted_transfer_data(self, contract_address: str, network: str, hours: int, price: float, decimals: int) -> List[Dict]:
all_transfers = []
# A. Solana Logic
if network == 'solana':
try:
transfers = await self._get_solscan_token_data(contract_address, hours, price)
if transfers: return transfers
except Exception as e:
print(f" โš ๏ธ [Solana] Solscan failed: {e}")
return []
# B. EVM Logic
# 1. Web3 Direct (with Smart Retry)
try:
print(f" โšก [Web3] Scanning {network} logs...")
web3_transfers = await self._get_web3_transfers(contract_address, network, hours, price, decimals)
if web3_transfers:
print(f" โœ… [Web3] Found {len(web3_transfers)} transfers.")
return web3_transfers
except Exception as e:
print(f" โš ๏ธ [Web3] Failed: {e}. Trying fallbacks...")
# 2. Moralis Fallback
try:
chain_id = DEFAULT_NETWORK_CONFIGS.get(network, {}).get('moralis_chain_id')
if chain_id:
print(f" ๐Ÿ›ก๏ธ [Moralis] Fallback scan...")
moralis_transfers = await self._get_moralis_token_data(contract_address, chain_id, hours, price, decimals)
if moralis_transfers: return moralis_transfers
except Exception: pass
# 3. Scanners Fallback
try:
print(f" ๐Ÿ” [Scanner] Fallback scan...")
scanner_transfers = await self._get_scanner_token_data(contract_address, network, hours, price, decimals)
if scanner_transfers: return scanner_transfers
except Exception: pass
return []
async def _get_web3_transfers(self, address: str, network: str, hours: int, price: float, decimals: int):
"""Web3 Fetcher with Limit Exceeded Handling"""
w3 = self.rpc_manager.get_web3(network)
if not w3: raise Exception(f"No Web3 provider for {network}")
latest = await w3.eth.block_number
block_time = 3 if network == 'bsc' else 12 if network == 'ethereum' else 2
# ู…ุญุงูˆู„ุฉ ุฃูˆู„ูŠุฉ ู„ูƒุงู…ู„ ุงู„ู…ุฏุฉ
blocks_back = int((hours * 3600) / block_time)
from_block = max(0, latest - blocks_back)
contract = w3.eth.contract(address=w3.to_checksum_address(address), abi=ERC20_ABI)
try:
# ุงู„ู…ุญุงูˆู„ุฉ ุงู„ุฃูˆู„ู‰
logs = await w3.eth.get_logs({
'fromBlock': from_block, 'toBlock': 'latest',
'address': w3.to_checksum_address(address),
'topics': [TRANSFER_EVENT_SIGNATURE]
})
except Exception as e:
# [SMART RETRY] ุฅุฐุง ูุดู„ ุจุณุจุจ ุงู„ุญุฏุŒ ู‚ู„ู„ ุงู„ู…ุฏุฉ ู„ู„ุฑุจุน (1 ุณุงุนุฉ)
err_str = str(e).lower()
if 'limit' in err_str or 'range' in err_str or 'exceeded' in err_str:
print(f" โš ๏ธ [Web3] Range too big. Retrying with 1 hour window...")
blocks_back = int((1 * 3600) / block_time) # 1 hour
from_block = max(0, latest - blocks_back)
logs = await w3.eth.get_logs({
'fromBlock': from_block, 'toBlock': 'latest',
'address': w3.to_checksum_address(address),
'topics': [TRANSFER_EVENT_SIGNATURE]
})
else:
raise e # ุฎุทุฃ ุขุฎุฑ ู„ุง ูŠู…ูƒู† ุงู„ุชุนุงู…ู„ ู…ุนู‡
transfers = []
for log in logs:
try:
if len(log['topics']) < 3: continue
val_hex = log['data'].hex()
val_int = int(val_hex, 16)
amount = val_int / (10 ** decimals)
val_usd = amount * price
# ุงุณุชุฎุฏุงู… ุญุฏ ุฃุฏู†ู‰ 20k ู„ู„ู…ุถุงุฑุจุฉ ุงู„ุณุฑูŠุนุฉ
if val_usd < 20000.0: continue
from_addr = '0x' + log['topics'][1].hex()[-40:]
to_addr = '0x' + log['topics'][2].hex()[-40:]
transfers.append({
'hash': log['transactionHash'].hex(),
'from': from_addr,
'to': to_addr,
'value_usd': val_usd,
'timeStamp': time.time(),
'network': network,
'source': 'web3'
})
except: continue
return transfers
async def _get_solscan_token_data(self, address: str, hours: int, price: float):
params = {"address": address, "limit": 50}
data = await self.rpc_manager.get_solscan_api("/v2.0/token/transfer", params)
if not data or not data.get('data'): return []
transfers = []
cutoff = time.time() - (hours * 3600)
for tx in data['data']:
try:
ts = tx.get('blockTime', 0)
if ts < cutoff: continue
amount_raw = int(tx.get('amount', 0))
dec = tx.get('decimals', 9)
val_usd = (amount_raw / (10**dec)) * price
if val_usd < 20000.0: continue
transfers.append({
'hash': tx.get('signature'),
'from': tx.get('from_address'),
'to': tx.get('to_address'),
'value_usd': val_usd,
'timeStamp': ts,
'network': 'solana',
'source': 'solscan'
})
except: continue
return transfers
async def _get_moralis_token_data(self, address, chain, hours, price, decimals):
params = {
"chain": chain, "contract_address": address,
"order": "DESC", "limit": 100
}
data = await self.rpc_manager.get_moralis_api(params)
if not data or not data.get('result'): return []
transfers = []
for tx in data['result']:
try:
val = int(tx['value']) / (10**decimals) * price
if val < 20000.0: continue
transfers.append({
'hash': tx['transaction_hash'],
'from': tx['from_address'],
'to': tx['to_address'],
'value_usd': val,
'timeStamp': datetime.fromisoformat(tx['block_timestamp'][:-1]).timestamp(),
'source': 'moralis'
})
except: continue
return transfers
async def _get_scanner_token_data(self, address, network, hours, price, decimals):
config = self.rpc_manager.get_explorer_config(network)
if not config: return []
params = {
"module": "account", "action": "tokentx",
"contractaddress": address, "page": 1, "offset": 100,
"sort": "desc", "apikey": config.get('api_key')
}
data = await self.rpc_manager.get_scanner_api(config['api_url'], params)
if not data or not data.get('result') or isinstance(data['result'], str): return []
transfers = []
for tx in data['result']:
try:
val = int(tx['value']) / (10**decimals) * price
if val < 20000.0: continue
transfers.append({
'hash': tx['hash'],
'from': tx['from'],
'to': tx['to'],
'value_usd': val,
'timeStamp': int(tx['timeStamp']),
'source': 'scanner'
})
except: continue
return transfers
# ==============================================================================
# ๐Ÿ•ต๏ธโ€โ™‚๏ธ Contract Finding & Search
# ==============================================================================
async def _find_contract_address_enhanced(self, symbol: str):
base_symbol = symbol.split('/')[0].lower()
if base_symbol in self.contracts_db: return self.contracts_db[base_symbol]
print(f" ๐Ÿ” Searching CoinGecko for {base_symbol}...")
try:
data = await self.rpc_manager.get_coingecko_api('/search', params={'query': base_symbol})
if not data or not data.get('coins'): return None
best_id = None
for coin in data['coins']:
if coin['symbol'].lower() == base_symbol:
best_id = coin['id']
break
if not best_id: best_id = data['coins'][0]['id']
details = await self.rpc_manager.get_coingecko_api(f'/coins/{best_id}', params={
"localization": "false", "tickers": "false", "market_data": "false",
"community_data": "false", "developer_data": "false"
})
if not details or 'platforms' not in details: return None
platforms = details['platforms']
priority_nets = ['binance-smart-chain', 'polygon-pos', 'ethereum', 'solana', 'avalanche']
mapping = {'binance-smart-chain': 'bsc', 'polygon-pos': 'polygon', 'ethereum': 'ethereum', 'solana': 'solana', 'avalanche': 'avalanche'}
for net_key in priority_nets:
if net_key in platforms and platforms[net_key]:
res = {'address': platforms[net_key], 'network': mapping[net_key]}
self.contracts_db[base_symbol] = res
return res
except Exception: pass
return None
# ==============================================================================
# ๐Ÿ“ Analysis Logic
# ==============================================================================
def _analyze_transfer_list(self, symbol: str, transfers: List[Dict], daily_volume_usd: float) -> Dict[str, Any]:
stats = {
'to_exchanges_usd': 0.0, 'from_exchanges_usd': 0.0,
'deposit_count': 0, 'withdrawal_count': 0,
'whale_transfers_count': 0, 'total_volume': 0.0
}
for tx in transfers:
val = tx.get('value_usd', 0)
stats['whale_transfers_count'] += 1
stats['total_volume'] += val
to_addr = tx.get('to', '').lower()
from_addr = tx.get('from', '').lower()
is_deposit = to_addr in self.address_categories['exchange']
is_withdrawal = from_addr in self.address_categories['exchange']
if is_deposit:
stats['to_exchanges_usd'] += val
stats['deposit_count'] += 1
if is_withdrawal:
stats['from_exchanges_usd'] += val
stats['withdrawal_count'] += 1
stats['net_flow_usd'] = stats['to_exchanges_usd'] - stats['from_exchanges_usd']
return stats
def _generate_enhanced_trading_signal(self, analysis: Dict) -> Dict:
net = analysis.get('net_flow_usd', 0)
whales = analysis.get('whale_transfers_count', 0)
if whales < 3: return {'action': 'HOLD', 'confidence': 0.1, 'reason': 'Low Activity'}
if net > 500_000: return {'action': 'SELL', 'confidence': 0.8, 'reason': 'Exchange Inflow'}
elif net < -500_000: return {'action': 'BUY', 'confidence': 0.8, 'reason': 'Exchange Outflow'}
return {'action': 'WATCH', 'confidence': 0.5, 'reason': 'Mixed Activity'}
def _create_enhanced_llm_summary(self, signal, analysis):
return {'summary': f"Whales: {analysis.get('whale_transfers_count')}", 'action': signal['action']}
# ==============================================================================
# โš™๏ธ Helpers
# ==============================================================================
async def _get_token_price(self, symbol):
try:
res = await self.rpc_manager.get_coingecko_api('/simple/price',
params={'ids': COINGECKO_SYMBOL_MAPPING.get(symbol.split('/')[0], symbol), 'vs_currencies': 'usd'})
if res: return list(res.values())[0]['usd']
except: return 0.0
async def _get_token_decimals(self, address, network):
key = f"{address}_{network}"
if key in self.token_decimals_cache: return self.token_decimals_cache[key]
try:
w3 = self.rpc_manager.get_web3(network)
if w3:
contract = w3.eth.contract(address=w3.to_checksum_address(address), abi=ERC20_ABI)
dec = await contract.functions.decimals().call()
self.token_decimals_cache[key] = dec
return dec
except: pass
if network == 'solana': return 9
return 18
async def _save_learning_record(self, symbol, price, analysis, api_stats):
if not self.r2_service: return
try:
record = {'id': f"{symbol}_{int(time.time())}", 'symbol': symbol, 'price': price, 'analysis': analysis, 'api_stats': api_stats, 'timestamp': datetime.now(timezone.utc).isoformat()}
if hasattr(self.r2_service, 'save_record'): await self.r2_service.save_record(record)
except: pass
def _create_error_response(self, symbol, err): return {'symbol': symbol, 'data_available': False, 'error': err, 'trading_signal': {}}
def _create_native_coin_response(self, symbol): return {'symbol': symbol, 'data_available': False, 'error': 'Native Coin', 'trading_signal': {}}
def _create_no_contract_response(self, symbol): return {'symbol': symbol, 'data_available': False, 'error': 'No Contract', 'trading_signal': {}}
def _create_no_transfers_response(self, symbol): return {'symbol': symbol, 'data_available': True, 'summary': {'total': 0}, 'trading_signal': {'action': 'HOLD'}}
async def cleanup(self): print("๐Ÿ›‘ [WhaleMonitor] Cleanup.")