# scalperBot / core / data_engine.py
# Author: nexusbert ("Upload 36 files", commit 96e0cc2, verified)
import pandas as pd
import numpy as np
from collections import deque
import logging
import time
import yaml
logger = logging.getLogger(__name__)


class DataEngine:
    """In-memory market-data store: rolling tick/candle buffers plus memoised indicators.

    Tick prices and volumes are buffered per symbol; OHLCV candles and indicator
    caches are keyed by ``"{symbol}_{interval}"``. A key's indicator cache is
    reset every time a candle for that key is updated.
    """

    # Candle intervals pre-allocated for each configured pair.
    DEFAULT_INTERVALS = ("1", "5", "15")

    def __init__(self, settings_path="config/settings.yaml", pairs_path="config/pairs.yaml"):
        """Load YAML config and pre-allocate buffers for every configured pair.

        Args:
            settings_path: YAML file whose parsed content is stored on ``self.settings``.
            pairs_path: YAML file with a top-level ``pairs`` list of symbols.
        """
        self.settings = self._load_yaml(settings_path)
        self.price_buffers = {}
        self.volume_buffers = {}
        self.candle_buffers = {}
        self.indicators_cache = {}
        self.max_price_buffer = 1000
        self.max_candle_buffer = 200
        pairs = self._load_yaml(pairs_path)["pairs"]
        for pair in pairs or ():
            self._init_buffers(pair)

    @staticmethod
    def _load_yaml(path):
        """Parse *path* with ``yaml.safe_load``, closing the file handle afterwards."""
        with open(path, encoding="utf-8") as fh:
            return yaml.safe_load(fh)

    @staticmethod
    def _key(symbol, interval):
        """Return the candle-buffer / indicator-cache key for a symbol and interval."""
        return f"{symbol}_{interval}"

    def _init_buffers(self, symbol):
        """(Re)create empty buffers for *symbol*, discarding any existing data.

        Creates the tick price/volume buffers plus a candle buffer and indicator
        cache for each interval in ``DEFAULT_INTERVALS``.
        """
        self.price_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        self.volume_buffers[symbol] = deque(maxlen=self.max_price_buffer)
        for interval in self.DEFAULT_INTERVALS:
            key = self._key(symbol, interval)
            self.candle_buffers[key] = deque(maxlen=self.max_candle_buffer)
            self.indicators_cache[key] = {}

    def _ensure_tick_buffers(self, symbol):
        """Create the tick price/volume buffers for *symbol* if they do not exist yet."""
        for buffers in (self.price_buffers, self.volume_buffers):
            if symbol not in buffers:
                buffers[symbol] = deque(maxlen=self.max_price_buffer)

    def update_price(self, symbol, price, volume=0):
        """Append a tick *price* for *symbol*, plus *volume* when it is positive.

        Buffers for symbols that were not configured at start-up are created on
        first use. Because non-positive volumes are skipped, ``volume_buffers``
        is not index-aligned with ``price_buffers``.
        """
        self._ensure_tick_buffers(symbol)
        self.price_buffers[symbol].append(float(price))
        # Convert before comparing so numeric strings work just like for `price`.
        volume = float(volume)
        if volume > 0:
            self.volume_buffers[symbol].append(volume)

    @staticmethod
    def _parse_candle(candle_data):
        """Normalise a raw candle into a dict with timestamp/open/high/low/close/volume.

        Accepts a dict (``timestamp`` falling back to ``start``, then to the current
        time in ms; ``volume`` defaulting to 0) or a list/tuple laid out as
        ``[timestamp, open, high, low, close, volume, ...]``. Returns None, after
        logging an error, for any other type or for missing/unparseable fields.
        """
        try:
            if isinstance(candle_data, dict):
                return {
                    'timestamp': int(candle_data.get('timestamp', candle_data.get('start', time.time() * 1000))),
                    'open': float(candle_data['open']),
                    'high': float(candle_data['high']),
                    'low': float(candle_data['low']),
                    'close': float(candle_data['close']),
                    'volume': float(candle_data.get('volume', 0)),
                }
            if isinstance(candle_data, (list, tuple)):
                return {
                    'timestamp': int(candle_data[0]),
                    'open': float(candle_data[1]),
                    'high': float(candle_data[2]),
                    'low': float(candle_data[3]),
                    'close': float(candle_data[4]),
                    'volume': float(candle_data[5]),
                }
        except (KeyError, IndexError, TypeError, ValueError):
            logger.error("Malformed candle data: %s", candle_data)
            return None
        logger.error("Invalid candle data format: %s", candle_data)
        return None

    def update_candle(self, symbol, interval, candle_data):
        """Store a candle for *symbol*/*interval* and invalidate that key's indicators.

        A candle whose timestamp equals the newest buffered candle's replaces it
        (e.g. repeated updates of a still-forming bar) instead of being appended
        as a duplicate bar. The candle's close and volume are also forwarded to
        ``update_price``. Invalid payloads are logged and ignored.
        """
        candle = self._parse_candle(candle_data)
        if candle is None:
            return
        key = self._key(symbol, interval)
        if key not in self.candle_buffers:
            self.candle_buffers[key] = deque(maxlen=self.max_candle_buffer)
        buffer = self.candle_buffers[key]
        if buffer and buffer[-1]['timestamp'] == candle['timestamp']:
            buffer[-1] = candle
        else:
            buffer.append(candle)
        self.update_price(symbol, candle['close'], candle['volume'])
        self.indicators_cache[key] = {}

    @staticmethod
    def _tail(buffer, limit):
        """Return the last *limit* items of *buffer* as a list; [] when limit <= 0."""
        # Guard needed: a slice of [-0:] would return the whole buffer.
        if limit <= 0:
            return []
        return list(buffer)[-limit:]

    def get_prices(self, symbol, limit=100):
        """Return up to the last *limit* tick prices for *symbol*, oldest first."""
        return self._tail(self.price_buffers.get(symbol, ()), limit)

    def get_candles(self, symbol, interval="1", limit=100):
        """Return up to the last *limit* candles as a DataFrame indexed by timestamp.

        The millisecond timestamps are converted to a datetime index; the
        remaining columns are open/high/low/close/volume. Returns an empty
        DataFrame when there are no candles for the key.
        """
        candles = self._tail(self.candle_buffers.get(self._key(symbol, interval), ()), limit)
        if not candles:
            return pd.DataFrame()
        df = pd.DataFrame(candles)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df.set_index('timestamp')

    def _cached_indicator(self, symbol, interval, cache_key, compute):
        """Return the memoised indicator *cache_key*, computing it on a miss.

        ``compute`` returns None when there is not enough data. None is never
        cached, so the indicator is retried on the next call.
        """
        key = self._key(symbol, interval)
        cached = self.indicators_cache.get(key, {})
        if cache_key in cached:
            return cached[cache_key]
        value = compute()
        if value is not None:
            # setdefault: the key may be missing for symbols/intervals created lazily.
            self.indicators_cache.setdefault(key, {})[cache_key] = value
        return value

    def calculate_ema(self, symbol, interval="1", period=9):
        """Return the latest EMA(period) of closes, or None with fewer than *period* candles.

        Only the last ``2 * period`` candles are used; with ``adjust=False`` the
        recursion is seeded from the oldest of those closes.
        """
        return self._cached_indicator(
            symbol, interval, f"ema_{period}",
            lambda: self._ema(self.get_candles(symbol, interval, limit=period * 2), period),
        )

    @staticmethod
    def _ema(df, period):
        """Return the EMA(period) of the last close in *df*, or None if data is insufficient."""
        if df.empty or len(df) < period:
            return None
        return df['close'].ewm(span=period, adjust=False).mean().iloc[-1]

    def calculate_rsi(self, symbol, interval="1", period=14):
        """Return the latest RSI(period), or None with fewer than ``period + 1`` candles."""
        return self._cached_indicator(
            symbol, interval, f"rsi_{period}",
            lambda: self._rsi(self.get_candles(symbol, interval, limit=period * 3), period),
        )

    @staticmethod
    def _rsi(df, period):
        """RSI of the last close in *df*, using simple rolling means of gains and losses."""
        if df.empty or len(df) < period + 1:
            return None
        delta = df['close'].diff()
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rsi = 100 - (100 / (1 + gain / loss))
        # A window with no movement at all is 0/0; report a neutral 50 instead of NaN.
        rsi = rsi.mask((gain == 0) & (loss == 0), 50.0)
        return rsi.iloc[-1]

    def calculate_adx(self, symbol, interval="1", period=14):
        """Return the latest ADX(period), or None when there are too few candles for it."""
        return self._cached_indicator(
            symbol, interval, f"adx_{period}",
            lambda: self._adx(self.get_candles(symbol, interval, limit=period * 3), period),
        )

    @staticmethod
    def _adx(df, period):
        """ADX of the last row of *df*, using simple rolling means (not Wilder smoothing)."""
        # DI needs `period` rows and ADX then averages `period` DX values, so the
        # first defined ADX sits at row 2 * period - 2; fewer rows would give NaN.
        if df.empty or len(df) < max(period + 1, 2 * period - 1):
            return None
        high, low, close = df['high'], df['low'], df['close']
        prev_close = close.shift(1)
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1
        ).max(axis=1)
        up_move = high - high.shift(1)
        down_move = low.shift(1) - low
        dm_plus = pd.Series(np.where(up_move > down_move, np.maximum(up_move, 0), 0), index=df.index)
        dm_minus = pd.Series(np.where(down_move > up_move, np.maximum(down_move, 0), 0), index=df.index)
        tr_mean = true_range.rolling(window=period).mean()
        # A perfectly flat window has zero true range; treat its DIs as 0 instead of 0/0.
        di_plus = (100 * (dm_plus.rolling(window=period).mean() / tr_mean)).mask(tr_mean == 0, 0.0)
        di_minus = (100 * (dm_minus.rolling(window=period).mean() / tr_mean)).mask(tr_mean == 0, 0.0)
        di_sum = di_plus + di_minus
        dx = (100 * (di_plus - di_minus).abs() / di_sum).mask(di_sum == 0, 0.0)
        return dx.rolling(window=period).mean().iloc[-1]

    def get_orderbook_imbalance(self, symbol, orderbook_data=None, depth=10):
        """Return (bid size - ask size) / total size over the top *depth* levels.

        *orderbook_data* holds ``'b'`` (bids) and ``'a'`` (asks) lists whose
        levels are indexed as ``[price, size]``. Returns 0.0 when there is no
        book or no size. *symbol* is accepted for interface symmetry and unused.
        """
        if orderbook_data is None:
            return 0.0
        bid_volume = sum(float(level[1]) for level in orderbook_data.get('b', [])[:depth])
        ask_volume = sum(float(level[1]) for level in orderbook_data.get('a', [])[:depth])
        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return 0.0
        return (bid_volume - ask_volume) / total_volume

    def get_spread(self, symbol, orderbook_data=None):
        """Return (best ask - best bid) / best bid, or 0.0 when it cannot be computed.

        Uses the first level of the ``'b'`` and ``'a'`` lists as the best prices.
        Returns 0.0 for a missing or one-sided book or a non-positive best bid.
        """
        if orderbook_data is None:
            return 0.0
        bids = orderbook_data.get('b', [])
        asks = orderbook_data.get('a', [])
        if not bids or not asks:
            return 0.0
        best_bid = float(bids[0][0])
        best_ask = float(asks[0][0])
        if best_bid <= 0:
            return 0.0
        return (best_ask - best_bid) / best_bid

    def detect_volume_spike(self, symbol, interval="1", threshold=2.0, lookback=20):
        """Return True if the newest candle's volume exceeds *threshold* x the prior average.

        The average is taken over the preceding candles within the last
        *lookback* candles. Returns False with fewer than 5 candles or a zero
        average.
        """
        df = self.get_candles(symbol, interval, limit=lookback)
        if df.empty or len(df) < 5:
            return False
        volumes = df['volume']
        avg_volume = volumes.iloc[:-1].mean()
        if avg_volume == 0:
            return False
        return bool(volumes.iloc[-1] / avg_volume > threshold)

    def get_price_change_rate(self, symbol, periods=5):
        """Return the fractional price change over the last *periods* ticks.

        Returns 0.0 when *periods* <= 0, when there is not enough history, or
        when the reference price is 0.
        """
        if periods <= 0:
            return 0.0
        prices = self.get_prices(symbol, limit=periods + 1)
        if len(prices) < periods + 1:
            return 0.0
        old_price, new_price = prices[0], prices[-1]
        if old_price == 0:
            return 0.0
        return (new_price - old_price) / old_price

    def clear_cache(self, symbol=None, interval=None):
        """Empty memoised indicators, optionally filtered by *symbol* and/or *interval*.

        With no filters every cache is emptied. Caches are emptied in place, so
        their keys survive and later ``calculate_*`` calls keep working.
        """
        for key, cache in self.indicators_cache.items():
            # rpartition: symbols may themselves contain "_" (e.g. "BTC_USDT").
            key_symbol, _, key_interval = key.rpartition("_")
            if symbol and key_symbol != symbol:
                continue
            if interval and key_interval != f"{interval}":
                continue
            cache.clear()

    def get_buffer_status(self):
        """Return buffer lengths and cached indicator names, keyed by buffer/cache key."""
        return {
            'price_buffers': {symbol: len(buffer) for symbol, buffer in self.price_buffers.items()},
            'volume_buffers': {symbol: len(buffer) for symbol, buffer in self.volume_buffers.items()},
            'candle_buffers': {key: len(buffer) for key, buffer in self.candle_buffers.items()},
            'indicators_cache': {key: list(cache.keys()) for key, cache in self.indicators_cache.items()},
        }