scalperBot / core /indicators.py
nexusbert's picture
Upload 36 files
96e0cc2 verified
import pandas as pd
import numpy as np
from typing import Optional, List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class TechnicalIndicators:
def __init__(self):
pass
@staticmethod
def sma(data: pd.Series, period: int) -> pd.Series:
return data.rolling(window=period).mean()
@staticmethod
def ema(data: pd.Series, period: int) -> pd.Series:
return data.ewm(span=period, adjust=False).mean()
@staticmethod
def rsi(data: pd.Series, period: int = 14) -> pd.Series:
delta = data.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
@staticmethod
def macd(data: pd.Series, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9) -> Dict[str, pd.Series]:
fast_ema = TechnicalIndicators.ema(data, fast_period)
slow_ema = TechnicalIndicators.ema(data, slow_period)
macd_line = fast_ema - slow_ema
signal_line = TechnicalIndicators.ema(macd_line, signal_period)
histogram = macd_line - signal_line
return {
'macd': macd_line,
'signal': signal_line,
'histogram': histogram
}
@staticmethod
def bollinger_bands(data: pd.Series, period: int = 20, std_dev: float = 2.0) -> Dict[str, pd.Series]:
sma = TechnicalIndicators.sma(data, period)
std = data.rolling(window=period).std()
upper_band = sma + (std * std_dev)
lower_band = sma - (std * std_dev)
return {
'upper': upper_band,
'middle': sma,
'lower': lower_band
}
@staticmethod
def adx(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
hl = high - low
hc = np.abs(high - close.shift(1))
lc = np.abs(low - close.shift(1))
tr = pd.concat([hl, hc, lc], axis=1).max(axis=1)
dm_plus = np.where((high - high.shift(1)) > (low.shift(1) - low),
np.maximum(high - high.shift(1), 0), 0)
dm_minus = np.where((low.shift(1) - low) > (high - high.shift(1)),
np.maximum(low.shift(1) - low), 0)
tr_smooth = tr.rolling(window=period).mean()
dm_plus_smooth = pd.Series(dm_plus).rolling(window=period).mean()
dm_minus_smooth = pd.Series(dm_minus).rolling(window=period).mean()
di_plus = 100 * (dm_plus_smooth / tr_smooth)
di_minus = 100 * (dm_minus_smooth / tr_smooth)
dx = 100 * np.abs(di_plus - di_minus) / (di_plus + di_minus)
adx = dx.rolling(window=period).mean()
return adx
@staticmethod
def stochastic_oscillator(high: pd.Series, low: pd.Series, close: pd.Series,
k_period: int = 14, d_period: int = 3) -> Dict[str, pd.Series]:
lowest_low = low.rolling(window=k_period).min()
highest_high = high.rolling(window=k_period).max()
k_percent = 100 * ((close - lowest_low) / (highest_high - lowest_low))
d_percent = k_percent.rolling(window=d_period).mean()
return {
'k': k_percent,
'd': d_percent
}
@staticmethod
def williams_r(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
highest_high = high.rolling(window=period).max()
lowest_low = low.rolling(window=period).min()
williams_r = -100 * ((highest_high - close) / (highest_high - lowest_low))
return williams_r
@staticmethod
def cci(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 20) -> pd.Series:
typical_price = (high + low + close) / 3
sma_tp = TechnicalIndicators.sma(typical_price, period)
mad = lambda x: np.mean(np.abs(x - x.mean()))
mean_deviation = typical_price.rolling(window=period).apply(mad, raw=False)
cci = (typical_price - sma_tp) / (0.015 * mean_deviation)
return cci
@staticmethod
def atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
hl = high - low
hc = np.abs(high - close.shift(1))
lc = np.abs(low - close.shift(1))
tr = pd.concat([hl, hc, lc], axis=1).max(axis=1)
atr = tr.rolling(window=period).mean()
return atr
@staticmethod
def ichimoku_cloud(high: pd.Series, low: pd.Series, conversion_period: int = 9,
base_period: int = 26, span_b_period: int = 52,
lagging_period: int = 26) -> Dict[str, pd.Series]:
conversion_line = (high.rolling(window=conversion_period).max() +
low.rolling(window=conversion_period).min()) / 2
base_line = (high.rolling(window=base_period).max() +
low.rolling(window=base_period).min()) / 2
leading_span_a = ((conversion_line + base_line) / 2).shift(lagging_period)
leading_span_b = ((high.rolling(window=span_b_period).max() +
low.rolling(window=span_b_period).min()) / 2).shift(lagging_period)
lagging_span = close.shift(-lagging_period)
return {
'conversion_line': conversion_line,
'base_line': base_line,
'leading_span_a': leading_span_a,
'leading_span_b': leading_span_b,
'lagging_span': lagging_span
}
@staticmethod
def fibonacci_retracements(high: pd.Series, low: pd.Series) -> Dict[str, float]:
max_high = high.max()
min_low = low.min()
diff = max_high - min_low
levels = {
'0.0': max_high,
'0.236': max_high - 0.236 * diff,
'0.382': max_high - 0.382 * diff,
'0.5': max_high - 0.5 * diff,
'0.618': max_high - 0.618 * diff,
'0.786': max_high - 0.786 * diff,
'1.0': min_low
}
return levels
@staticmethod
def detect_crossover(fast_series: pd.Series, slow_series: pd.Series) -> Dict[str, bool]:
crossed_above = (fast_series.shift(1) <= slow_series.shift(1)) & (fast_series > slow_series)
crossed_below = (fast_series.shift(1) >= slow_series.shift(1)) & (fast_series < slow_series)
return {
'crossed_above': crossed_above.iloc[-1] if not crossed_above.empty else False,
'crossed_below': crossed_below.iloc[-1] if not crossed_below.empty else False
}
@staticmethod
def volume_profile(volume: pd.Series, price: pd.Series, bins: int = 50) -> Dict[str, Any]:
price_min, price_max = price.min(), price.max()
price_bins = np.linspace(price_min, price_max, bins)
volume_profile = []
for i in range(len(price_bins) - 1):
mask = (price >= price_bins[i]) & (price < price_bins[i + 1])
bin_volume = volume[mask].sum()
volume_profile.append({
'price_level': (price_bins[i] + price_bins[i + 1]) / 2,
'volume': bin_volume
})
poc = max(volume_profile, key=lambda x: x['volume'])['price_level']
total_volume = sum(x['volume'] for x in volume_profile)
sorted_profile = sorted(volume_profile, key=lambda x: x['volume'], reverse=True)
cumulative_volume = 0
value_area_high = None
value_area_low = None
for level in sorted_profile:
cumulative_volume += level['volume']
if cumulative_volume >= total_volume * 0.7:
break
in_value_area = sorted([x for x in volume_profile if x['volume'] > 0],
key=lambda x: x['volume'], reverse=True)[:len([x for x in sorted_profile if cumulative_volume >= total_volume * 0.7])]
if in_value_area:
prices_in_va = [x['price_level'] for x in in_value_area]
value_area_high = max(prices_in_va)
value_area_low = min(prices_in_va)
return {
'volume_profile': volume_profile,
'poc': poc,
'value_area_high': value_area_high,
'value_area_low': value_area_low
}
@staticmethod
def order_flow_imbalance(orderbook: Dict) -> Dict[str, float]:
if not orderbook or 'b' not in orderbook or 'a' not in orderbook:
return {'imbalance': 0.0, 'bid_pressure': 0.0, 'ask_pressure': 0.0}
bids = orderbook['b'][:10]
asks = orderbook['a'][:10]
bid_volume = sum(float(bid[1]) for bid in bids)
ask_volume = sum(float(ask[1]) for ask in asks)
total_volume = bid_volume + ask_volume
if total_volume == 0:
return {'imbalance': 0.0, 'bid_pressure': 0.0, 'ask_pressure': 0.0}
imbalance = (bid_volume - ask_volume) / total_volume
bid_pressure = bid_volume / total_volume
ask_pressure = ask_volume / total_volume
return {
'imbalance': imbalance,
'bid_pressure': bid_pressure,
'ask_pressure': ask_pressure
}