import logging
import time
from collections import deque
from typing import Optional

import numpy as np
import pandas as pd
import yaml

logger = logging.getLogger(__name__)


class DataEngine:
    """In-memory store of recent prices/candles with cached indicators.

    Per symbol the engine keeps bounded deques of prices and volumes, and
    per ``"<symbol>_<interval>"`` key a bounded deque of candle dicts plus a
    small dict of already-computed indicator values. The indicator cache for
    a key is reset every time a new candle is stored under that key.

    NOTE(review): ``update_candle`` pushes the candle close into the shared
    per-symbol price buffer for *every* interval, so that buffer mixes closes
    from all intervals that are fed in - confirm this is intended.
    """

    # Intervals for which candle buffers are pre-allocated per symbol.
    CANDLE_INTERVALS = ("1", "5", "15")

    def __init__(
        self,
        settings_path: str = "config/settings.yaml",
        pairs_path: str = "config/pairs.yaml",
    ):
        """Load configuration and allocate buffers for every configured pair.

        Args:
            settings_path: YAML file whose parsed content is kept verbatim
                in ``self.settings``.
            pairs_path: YAML file with a top-level ``pairs`` sequence of
                symbols.

        Raises:
            OSError: if either file cannot be opened.
            KeyError: if the pairs file has no ``pairs`` entry.
        """
        self.settings = self._load_yaml(settings_path)
        self.price_buffers = {}
        self.volume_buffers = {}
        self.candle_buffers = {}
        self.indicators_cache = {}
        self.max_price_buffer = 1000
        self.max_candle_buffer = 200

        pairs = self._load_yaml(pairs_path)["pairs"]
        for pair in pairs or []:
            self._init_buffers(pair)

    @staticmethod
    def _load_yaml(path):
        """Parse a YAML file, closing the handle deterministically."""
        with open(path, encoding="utf-8") as handle:
            return yaml.safe_load(handle)

    @staticmethod
    def _candle_key(symbol, interval) -> str:
        """Build the key shared by ``candle_buffers`` and the cache."""
        return f"{symbol}_{interval}"

    def _init_buffers(self, symbol):
        """(Re)create empty price, volume and candle buffers for ``symbol``."""
        self.price_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        self.volume_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        for interval in self.CANDLE_INTERVALS:
            key = self._candle_key(symbol, interval)
            self.candle_buffers[key] = deque(maxlen=self.max_candle_buffer)
            self.indicators_cache[key] = {}

    def _ensure_buffers(self, symbol, interval=None):
        """Create any missing buffers without touching existing data.

        Lets the update methods accept symbols/intervals that were not
        listed in the pairs file instead of failing with ``KeyError``.
        """
        if symbol not in self.price_buffers:
            self.price_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        if symbol not in self.volume_buffers:
            self.volume_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        if interval is not None:
            key = self._candle_key(symbol, interval)
            if key not in self.candle_buffers:
                self.candle_buffers[key] = deque(maxlen=self.max_candle_buffer)
            self.indicators_cache.setdefault(key, {})

    @staticmethod
    def _parse_candle(candle_data) -> Optional[dict]:
        """Normalise a raw candle into a dict of numeric fields.

        Accepts either a mapping with ``open``/``high``/``low``/``close``
        (optional ``timestamp`` or ``start``, optional ``volume``) or a
        sequence ordered ``[timestamp, open, high, low, close, volume]``.

        Returns:
            The normalised candle, or ``None`` (after logging an error) when
            the payload has an unsupported type or is missing/invalid fields.
        """
        try:
            if isinstance(candle_data, dict):
                # Fall back to "now" in milliseconds when no time is supplied.
                timestamp = candle_data.get(
                    "timestamp", candle_data.get("start", time.time() * 1000)
                )
                return {
                    "timestamp": int(timestamp),
                    "open": float(candle_data["open"]),
                    "high": float(candle_data["high"]),
                    "low": float(candle_data["low"]),
                    "close": float(candle_data["close"]),
                    "volume": float(candle_data.get("volume", 0)),
                }
            if isinstance(candle_data, (list, tuple)):
                return {
                    "timestamp": int(candle_data[0]),
                    "open": float(candle_data[1]),
                    "high": float(candle_data[2]),
                    "low": float(candle_data[3]),
                    "close": float(candle_data[4]),
                    "volume": float(candle_data[5]),
                }
        except (KeyError, IndexError, TypeError, ValueError):
            # A truncated or non-numeric payload is dropped the same way an
            # unsupported type is, rather than crashing the caller.
            logger.error("Malformed candle data: %s", candle_data)
            return None

        logger.error("Invalid candle data format: %s", candle_data)
        return None

    @staticmethod
    def _store_indicator(cache, cache_key, value) -> Optional[float]:
        """Cache and return ``value``; undefined (NaN) results become None.

        NaN is deliberately not cached so the indicator is recomputed once
        more data is available.
        """
        if pd.isna(value):
            return None
        result = float(value)
        cache[cache_key] = result
        return result

    def update_price(self, symbol, price, volume=0):
        """Append a price (and a volume, if positive) for ``symbol``.

        Volumes that are zero or negative are skipped, so the volume buffer
        may be shorter than the price buffer.
        """
        self._ensure_buffers(symbol)
        self.price_buffers[symbol].append(float(price))
        if volume > 0:
            self.volume_buffers[symbol].append(float(volume))

    def update_candle(self, symbol, interval, candle_data):
        """Store a candle, forward its close/volume, and reset the cache.

        Invalid payloads are logged and ignored (see ``_parse_candle``).
        """
        candle = self._parse_candle(candle_data)
        if candle is None:
            return

        key = self._candle_key(symbol, interval)
        self._ensure_buffers(symbol, interval)
        self.candle_buffers[key].append(candle)
        self.update_price(symbol, candle["close"], candle["volume"])
        # Any cached indicator for this key is stale once a candle arrives.
        self.indicators_cache[key] = {}

    def get_prices(self, symbol, limit=100):
        """Return up to ``limit`` most recent prices (oldest first).

        Unknown symbols and non-positive limits yield an empty list.
        """
        # Guard explicitly: seq[-0:] would return the whole buffer.
        if limit <= 0:
            return []
        return list(self.price_buffers.get(symbol, ()))[-limit:]

    def get_candles(self, symbol, interval="1", limit=100):
        """Return up to ``limit`` most recent candles as a DataFrame.

        The frame is indexed by ``timestamp`` (parsed as epoch milliseconds)
        and is empty for unknown keys, empty buffers or non-positive limits.
        """
        if limit <= 0:
            return pd.DataFrame()
        key = self._candle_key(symbol, interval)
        candles = list(self.candle_buffers.get(key, ()))[-limit:]
        if not candles:
            return pd.DataFrame()

        df = pd.DataFrame(candles)
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
        df.set_index("timestamp", inplace=True)
        return df

    def calculate_ema(self, symbol, interval="1", period=9) -> Optional[float]:
        """Return the latest EMA of closes, or None if fewer than ``period``
        candles are buffered. The value is cached until the next candle."""
        key = self._candle_key(symbol, interval)
        cache = self.indicators_cache.setdefault(key, {})
        cache_key = f"ema_{period}"
        if cache_key in cache:
            return cache[cache_key]

        df = self.get_candles(symbol, interval, limit=period * 2)
        if df.empty or len(df) < period:
            return None

        ema = df["close"].ewm(span=period, adjust=False).mean()
        return self._store_indicator(cache, cache_key, ema.iloc[-1])

    def calculate_rsi(self, symbol, interval="1", period=14) -> Optional[float]:
        """Return the latest RSI based on simple rolling means of gains and
        losses, or None when there is too little data or the value is
        undefined (no gains and no losses in the window)."""
        key = self._candle_key(symbol, interval)
        cache = self.indicators_cache.setdefault(key, {})
        cache_key = f"rsi_{period}"
        if cache_key in cache:
            return cache[cache_key]

        df = self.get_candles(symbol, interval, limit=period * 3)
        if df.empty or len(df) < period + 1:
            return None

        delta = df["close"].diff()
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        # loss == 0 with gain > 0 gives rs = inf and therefore RSI = 100.
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return self._store_indicator(cache, cache_key, rsi.iloc[-1])

    def calculate_adx(self, symbol, interval="1", period=14) -> Optional[float]:
        """Return the latest ADX (rolling-mean smoothing), or None when it
        cannot be computed.

        DX needs ``period`` rows and ADX averages ``period`` DX values, so a
        buffer that passes the length check below can still be too short; in
        that case the NaN result is reported as None.
        """
        key = self._candle_key(symbol, interval)
        cache = self.indicators_cache.setdefault(key, {})
        cache_key = f"adx_{period}"
        if cache_key in cache:
            return cache[cache_key]

        df = self.get_candles(symbol, interval, limit=period * 3)
        if df.empty or len(df) < period + 1:
            return None

        prev_close = df["close"].shift(1)
        up_move = df["high"] - df["high"].shift(1)
        down_move = df["low"].shift(1) - df["low"]

        # True range: widest of high-low and the gaps to the previous close.
        df["hl"] = df["high"] - df["low"]
        df["hc"] = np.abs(df["high"] - prev_close)
        df["lc"] = np.abs(df["low"] - prev_close)
        df["tr"] = df[["hl", "hc", "lc"]].max(axis=1)

        # Directional movement: only the dominant, positive move counts.
        df["dm_plus"] = np.where(
            up_move > down_move, np.maximum(up_move, 0), 0
        )
        df["dm_minus"] = np.where(
            down_move > up_move, np.maximum(down_move, 0), 0
        )

        tr_mean = df["tr"].rolling(window=period).mean()
        df["di_plus"] = 100 * (df["dm_plus"].rolling(window=period).mean() / tr_mean)
        df["di_minus"] = 100 * (df["dm_minus"].rolling(window=period).mean() / tr_mean)
        df["dx"] = (
            100
            * np.abs(df["di_plus"] - df["di_minus"])
            / (df["di_plus"] + df["di_minus"])
        )
        adx = df["dx"].rolling(window=period).mean()
        return self._store_indicator(cache, cache_key, adx.iloc[-1])

    def get_orderbook_imbalance(self, symbol, orderbook_data=None):
        """Return (bid_vol - ask_vol) / total over the first 10 levels.

        ``orderbook_data`` is read via keys ``'b'`` and ``'a'``; element 1 of
        each level is summed as its size. Returns 0.0 when no book is given
        or the summed volume is zero. ``symbol`` is not used.
        """
        if orderbook_data is None:
            return 0.0

        bids = orderbook_data.get("b", [])
        asks = orderbook_data.get("a", [])
        bid_volume = sum(float(level[1]) for level in bids[:10])
        ask_volume = sum(float(level[1]) for level in asks[:10])
        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return 0.0
        return (bid_volume - ask_volume) / total_volume

    def get_spread(self, symbol, orderbook_data=None):
        """Return (ask - bid) / bid from the first bid and ask level.

        Returns 0.0 when the book is missing, one side is empty, or the bid
        price is zero. ``symbol`` is not used.
        NOTE(review): assumes index 0 of each side is the best level.
        """
        if orderbook_data is None:
            return 0.0

        bids = orderbook_data.get("b", [])
        asks = orderbook_data.get("a", [])
        if not bids or not asks:
            return 0.0

        best_bid = float(bids[0][0])
        best_ask = float(asks[0][0])
        # A zero bid would otherwise raise ZeroDivisionError.
        if best_bid == 0:
            return 0.0
        return (best_ask - best_bid) / best_bid

    def detect_volume_spike(self, symbol, interval="1", threshold=2.0) -> bool:
        """Return True if the latest candle volume exceeds ``threshold``
        times the mean volume of the preceding candles (up to 19).

        Returns False with fewer than 5 candles or a zero average.
        """
        df = self.get_candles(symbol, interval, limit=20)
        if df.empty or len(df) < 5:
            return False

        current_volume = df["volume"].iloc[-1]
        avg_volume = df["volume"].iloc[:-1].mean()
        if avg_volume == 0:
            return False
        # Cast so callers get a plain bool rather than numpy.bool_.
        return bool((current_volume / avg_volume) > threshold)

    def get_price_change_rate(self, symbol, periods=5):
        """Return the relative change between the latest price and the one
        ``periods`` entries earlier; 0.0 if data is insufficient or the
        older price is zero."""
        prices = self.get_prices(symbol, limit=periods + 1)
        if not prices or len(prices) < periods + 1:
            return 0.0

        old_price = prices[0]
        new_price = prices[-1]
        if old_price == 0:
            return 0.0
        return (new_price - old_price) / old_price

    def clear_cache(self, symbol=None, interval=None):
        """Empty cached indicators.

        With ``symbol`` and ``interval`` only that key is cleared; with just
        ``symbol`` every key starting with ``"<symbol>_"`` is cleared;
        with neither, every key is cleared. Keys are kept (reset to an empty
        dict) so later indicator lookups keep working.
        """
        if symbol and interval:
            key = self._candle_key(symbol, interval)
            if key in self.indicators_cache:
                self.indicators_cache[key] = {}
        elif symbol:
            prefix = f"{symbol}_"
            for key in list(self.indicators_cache):
                if key.startswith(prefix):
                    self.indicators_cache[key] = {}
        else:
            for key in list(self.indicators_cache):
                self.indicators_cache[key] = {}

    def get_buffer_status(self):
        """Return buffer lengths and the cached indicator names per key."""
        return {
            "price_buffers": {
                symbol: len(buffer)
                for symbol, buffer in self.price_buffers.items()
            },
            "candle_buffers": {
                key: len(buffer)
                for key, buffer in self.candle_buffers.items()
            },
            "indicators_cache": {
                key: list(cache)
                for key, cache in self.indicators_cache.items()
            },
        }