Spaces:
Paused
Paused
| import pandas as pd | |
| import numpy as np | |
| from collections import deque | |
| import logging | |
| import time | |
| import yaml | |
| logger = logging.getLogger(__name__) | |
class DataEngine:
    """In-memory store of recent prices/candles with cached indicator values.

    Per symbol it keeps bounded deques of prices and volumes; per
    ``"{symbol}_{interval}"`` key it keeps a bounded deque of candle dicts and
    a dict of cached indicator results. The indicator cache for a key is
    dropped whenever a new candle is appended for that key.
    """

    # Candle intervals for which buffers are created for every symbol.
    DEFAULT_INTERVALS = ("1", "5", "15")

    def __init__(self, settings_path="config/settings.yaml",
                 pairs_path="config/pairs.yaml"):
        """Load settings and the pair list from YAML and create the buffers.

        Args:
            settings_path: YAML file whose parsed content is stored in
                ``self.settings``.
            pairs_path: YAML file containing a top-level ``pairs`` sequence of
                symbols.

        Raises:
            OSError: if either file cannot be opened.
            KeyError: if the pairs file has no ``pairs`` entry.
        """
        # Context managers close the handles deterministically; the previous
        # bare open() calls leaked them until garbage collection.
        with open(settings_path, encoding="utf-8") as fh:
            self.settings = yaml.safe_load(fh)

        self.price_buffers = {}
        self.volume_buffers = {}
        self.candle_buffers = {}
        self.indicators_cache = {}
        self.max_price_buffer = 1000
        self.max_candle_buffer = 200

        with open(pairs_path, encoding="utf-8") as fh:
            pairs = yaml.safe_load(fh)["pairs"]
        for pair in pairs:
            self._init_buffers(pair)

    @staticmethod
    def _key(symbol, interval):
        """Return the buffer/cache key used for a symbol/interval pair."""
        return f"{symbol}_{interval}"

    def _init_buffers(self, symbol):
        """Create (or reset) all price, volume, candle and cache slots for *symbol*."""
        self.price_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        self.volume_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        for interval in self.DEFAULT_INTERVALS:
            key = self._key(symbol, interval)
            self.candle_buffers[key] = deque(maxlen=self.max_candle_buffer)
            self.indicators_cache[key] = {}

    def update_price(self, symbol, price, volume=0):
        """Append *price* (and *volume*, when positive) to the symbol's buffers.

        Raises:
            KeyError: if no buffers were initialised for *symbol*.
        """
        self.price_buffers[symbol].append(float(price))
        # Non-positive volumes are skipped, so the volume buffer may be
        # shorter than the price buffer.
        if volume > 0:
            self.volume_buffers[symbol].append(float(volume))

    def update_candle(self, symbol, interval, candle_data):
        """Normalise one candle, store it and invalidate cached indicators.

        Args:
            symbol: symbol the candle belongs to.
            interval: candle interval; combined with *symbol* into the key.
            candle_data: either a dict with ``open``/``high``/``low``/``close``
                (optional ``volume``, and ``timestamp`` or ``start`` in
                milliseconds), or a list/tuple ordered as
                ``[timestamp, open, high, low, close, volume]``.

        Any other type is logged as an error and ignored.

        Raises:
            KeyError: if no buffer exists for the symbol/interval pair, or a
                dict candle lacks one of the OHLC fields.
        """
        key = self._key(symbol, interval)

        if isinstance(candle_data, dict):
            # Fall back to 'start', then to the current time in milliseconds.
            timestamp = candle_data.get(
                'timestamp', candle_data.get('start', time.time() * 1000)
            )
            candle = {
                'timestamp': int(timestamp),
                'open': float(candle_data['open']),
                'high': float(candle_data['high']),
                'low': float(candle_data['low']),
                'close': float(candle_data['close']),
                'volume': float(candle_data.get('volume', 0)),
            }
        elif isinstance(candle_data, (list, tuple)):
            # Tuples are positional rows too, so they are accepted like lists.
            candle = {
                'timestamp': int(candle_data[0]),
                'open': float(candle_data[1]),
                'high': float(candle_data[2]),
                'low': float(candle_data[3]),
                'close': float(candle_data[4]),
                'volume': float(candle_data[5]),
            }
        else:
            logger.error("Invalid candle data format: %s", candle_data)
            return

        # NOTE(review): every call appends a new candle, even if its timestamp
        # equals the previous one -- confirm callers only pass closed candles.
        self.candle_buffers[key].append(candle)
        self.update_price(symbol, candle['close'], candle['volume'])
        # A new candle makes every cached indicator for this key stale.
        self.indicators_cache[key] = {}

    def get_prices(self, symbol, limit=100):
        """Return up to the last *limit* prices for *symbol* (``[]`` if unknown)."""
        return list(self.price_buffers.get(symbol, deque()))[-limit:]

    def get_candles(self, symbol, interval="1", limit=100):
        """Return up to the last *limit* candles as a DataFrame.

        The frame is indexed by ``timestamp`` converted from milliseconds to
        datetimes. An empty DataFrame is returned when there are no candles
        for the symbol/interval pair.
        """
        key = self._key(symbol, interval)
        candles = list(self.candle_buffers.get(key, deque()))[-limit:]
        if not candles:
            return pd.DataFrame()
        df = pd.DataFrame(candles)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df

    def calculate_ema(self, symbol, interval="1", period=9):
        """Return the latest EMA of closes, or ``None`` with fewer than *period* candles.

        The result is cached per key until the next candle arrives.
        """
        # setdefault keeps this safe for unknown keys, matching get_candles.
        cache = self.indicators_cache.setdefault(self._key(symbol, interval), {})
        cache_key = f"ema_{period}"
        if cache_key in cache:
            return cache[cache_key]

        df = self.get_candles(symbol, interval, limit=period * 2)
        if df.empty or len(df) < period:
            return None

        value = df['close'].ewm(span=period, adjust=False).mean().iloc[-1]
        cache[cache_key] = value
        return value

    def calculate_rsi(self, symbol, interval="1", period=14):
        """Return the latest RSI of closes, or ``None`` with fewer than ``period + 1`` candles.

        Average gain and loss are simple rolling means over *period* rows.
        The result is cached per key until the next candle arrives.
        """
        cache = self.indicators_cache.setdefault(self._key(symbol, interval), {})
        cache_key = f"rsi_{period}"
        if cache_key in cache:
            return cache[cache_key]

        df = self.get_candles(symbol, interval, limit=period * 3)
        if df.empty or len(df) < period + 1:
            return None

        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        value = (100 - (100 / (1 + rs))).iloc[-1]
        cache[cache_key] = value
        return value

    def calculate_adx(self, symbol, interval="1", period=14):
        """Return the latest ADX, or ``None`` when there are too few candles.

        DI+/DI- and the final ADX are both rolling means over *period* rows,
        so at least ``2 * period - 1`` candles are needed for a defined value.
        The result is cached per key until the next candle arrives.
        """
        cache = self.indicators_cache.setdefault(self._key(symbol, interval), {})
        cache_key = f"adx_{period}"
        if cache_key in cache:
            return cache[cache_key]

        df = self.get_candles(symbol, interval, limit=period * 3)
        # Two nested rolling(period) windows: the first defined ADX value sits
        # at row index 2 * period - 2. The old `period + 1` check let NaN be
        # returned and cached.
        if df.empty or len(df) < max(period + 1, 2 * period - 1):
            return None

        prev_close = df['close'].shift(1)
        df['hl'] = df['high'] - df['low']
        df['hc'] = np.abs(df['high'] - prev_close)
        df['lc'] = np.abs(df['low'] - prev_close)
        df['tr'] = df[['hl', 'hc', 'lc']].max(axis=1)

        up_move = df['high'] - df['high'].shift(1)
        down_move = df['low'].shift(1) - df['low']
        df['dm_plus'] = np.where(up_move > down_move, np.maximum(up_move, 0), 0)
        df['dm_minus'] = np.where(down_move > up_move, np.maximum(down_move, 0), 0)

        tr_mean = df['tr'].rolling(window=period).mean()
        df['di_plus'] = 100 * (df['dm_plus'].rolling(window=period).mean() / tr_mean)
        df['di_minus'] = 100 * (df['dm_minus'].rolling(window=period).mean() / tr_mean)
        df['dx'] = 100 * np.abs(df['di_plus'] - df['di_minus']) / (df['di_plus'] + df['di_minus'])

        value = df['dx'].rolling(window=period).mean().iloc[-1]
        cache[cache_key] = value
        return value

    def get_orderbook_imbalance(self, symbol, orderbook_data=None):
        """Return ``(bid_vol - ask_vol) / (bid_vol + ask_vol)`` over the first 10 levels.

        *orderbook_data* is a dict with ``'b'`` (bids) and ``'a'`` (asks)
        lists whose entries hold the size at index 1. Returns ``0.0`` when no
        orderbook is given or the total volume is zero. *symbol* is unused.
        """
        if orderbook_data is None:
            return 0.0
        bid_volume = sum(float(level[1]) for level in orderbook_data.get('b', [])[:10])
        ask_volume = sum(float(level[1]) for level in orderbook_data.get('a', [])[:10])
        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return 0.0
        return (bid_volume - ask_volume) / total_volume

    def get_spread(self, symbol, orderbook_data=None):
        """Return ``(ask - bid) / bid`` using the first bid and ask entries.

        Returns ``0.0`` when no orderbook is given, a side is empty, or the
        first bid price is zero. *symbol* is unused.
        """
        if orderbook_data is None:
            return 0.0
        bids = orderbook_data.get('b', [])
        asks = orderbook_data.get('a', [])
        if not bids or not asks:
            return 0.0
        # presumably both sides are sorted best-first -- verify against caller
        best_bid = float(bids[0][0])
        best_ask = float(asks[0][0])
        if best_bid == 0:
            # Same zero-guard as the other ratio helpers; avoids ZeroDivisionError.
            return 0.0
        return (best_ask - best_bid) / best_bid

    def detect_volume_spike(self, symbol, interval="1", threshold=2.0):
        """Return True if the last candle's volume exceeds *threshold* times the prior mean.

        Looks at up to the last 20 candles; returns False with fewer than 5
        candles or when the mean volume of the earlier candles is zero.
        """
        df = self.get_candles(symbol, interval, limit=20)
        if df.empty or len(df) < 5:
            return False
        current_volume = df['volume'].iloc[-1]
        avg_volume = df['volume'].iloc[:-1].mean()
        if avg_volume == 0:
            return False
        return (current_volume / avg_volume) > threshold

    def get_price_change_rate(self, symbol, periods=5):
        """Return the relative price change over the last *periods* price updates.

        Returns ``0.0`` when fewer than ``periods + 1`` prices are buffered or
        the reference price is zero.
        """
        prices = self.get_prices(symbol, limit=periods + 1)
        if len(prices) < periods + 1:
            return 0.0
        old_price, new_price = prices[0], prices[-1]
        if old_price == 0:
            return 0.0
        return (new_price - old_price) / old_price

    def clear_cache(self, symbol=None, interval=None):
        """Drop cached indicator values.

        With *symbol* and *interval*: only that key. With *symbol* only: every
        key starting with ``"{symbol}_"``. With neither: every key.
        """
        if symbol and interval:
            key = self._key(symbol, interval)
            if key in self.indicators_cache:
                self.indicators_cache[key] = {}
        elif symbol:
            prefix = f"{symbol}_"
            for key in self.indicators_cache:
                if key.startswith(prefix):
                    self.indicators_cache[key] = {}
        else:
            # Empty each slot but keep the keys, like the branches above;
            # rebinding to {} made later calculate_* lookups raise KeyError.
            self.indicators_cache = {key: {} for key in self.indicators_cache}

    def get_buffer_status(self):
        """Return buffer lengths and the names of cached indicators per key."""
        return {
            'price_buffers': {symbol: len(buffer) for symbol, buffer in self.price_buffers.items()},
            'candle_buffers': {key: len(buffer) for key, buffer in self.candle_buffers.items()},
            'indicators_cache': {key: list(cache.keys()) for key, cache in self.indicators_cache.items()},
        }