import logging
from typing import Optional, List, Dict, Any

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class TechnicalIndicators:
    """Collection of stateless technical-analysis helpers.

    Every indicator is a ``@staticmethod`` that takes pandas Series and
    returns either a Series or a dict of named results. Rolling-window
    indicators return NaN until a full window of observations is available.
    """

    @staticmethod
    def sma(data: pd.Series, period: int) -> pd.Series:
        """Return the simple moving average of *data* over *period* rows."""
        return data.rolling(window=period).mean()

    @staticmethod
    def ema(data: pd.Series, period: int) -> pd.Series:
        """Return the exponential moving average with ``span=period``.

        Uses the recursive form (``adjust=False``), so the first output
        equals the first input value.
        """
        return data.ewm(span=period, adjust=False).mean()

    @staticmethod
    def rsi(data: pd.Series, period: int = 14) -> pd.Series:
        """Return the Relative Strength Index of *data*.

        Average gains and losses are simple rolling means over *period*
        rows (not Wilder's smoothing). A window without any loss evaluates
        to 100; a window with neither gains nor losses yields NaN (0 / 0).
        """
        delta = data.diff()
        avg_gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        avg_loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        relative_strength = avg_gain / avg_loss
        return 100 - (100 / (1 + relative_strength))

    @staticmethod
    def macd(
        data: pd.Series,
        fast_period: int = 12,
        slow_period: int = 26,
        signal_period: int = 9,
    ) -> Dict[str, pd.Series]:
        """Return the MACD line, its signal line and their difference.

        Returns:
            Dict with keys ``'macd'`` (fast EMA minus slow EMA),
            ``'signal'`` (EMA of the MACD line) and ``'histogram'``
            (MACD minus signal).
        """
        fast_ema = TechnicalIndicators.ema(data, fast_period)
        slow_ema = TechnicalIndicators.ema(data, slow_period)
        macd_line = fast_ema - slow_ema
        signal_line = TechnicalIndicators.ema(macd_line, signal_period)
        return {
            'macd': macd_line,
            'signal': signal_line,
            'histogram': macd_line - signal_line,
        }

    @staticmethod
    def bollinger_bands(
        data: pd.Series, period: int = 20, std_dev: float = 2.0
    ) -> Dict[str, pd.Series]:
        """Return Bollinger Bands around the *period*-row SMA.

        Returns:
            Dict with keys ``'upper'``, ``'middle'`` and ``'lower'``; the
            outer bands sit *std_dev* rolling standard deviations away
            from the middle band.
        """
        middle = TechnicalIndicators.sma(data, period)
        offset = data.rolling(window=period).std() * std_dev
        return {
            'upper': middle + offset,
            'middle': middle,
            'lower': middle - offset,
        }

    @staticmethod
    def _true_range(
        high: pd.Series, low: pd.Series, close: pd.Series
    ) -> pd.Series:
        """Return the per-row true range shared by ``atr`` and ``adx``."""
        prev_close = close.shift(1)
        candidates = pd.concat(
            [high - low, np.abs(high - prev_close), np.abs(low - prev_close)],
            axis=1,
        )
        # max() skips NaN, so the first row (no previous close) is high - low.
        return candidates.max(axis=1)

    @staticmethod
    def adx(
        high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14
    ) -> pd.Series:
        """Return the Average Directional Index.

        Directional movement and true range are smoothed with simple
        rolling means over *period* rows (not Wilder's smoothing). The
        result keeps the index of the inputs.
        """
        true_range = TechnicalIndicators._true_range(high, low, close)

        up_move = high.diff()
        down_move = -low.diff()
        # Series.where keeps the caller's index, so the smoothed series
        # below stay aligned with the true range for any index type.
        # Comparisons against the leading NaN are False, giving 0 there.
        dm_plus = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
        dm_minus = down_move.where(
            (down_move > up_move) & (down_move > 0), 0.0
        )

        tr_smooth = true_range.rolling(window=period).mean()
        di_plus = 100 * (dm_plus.rolling(window=period).mean() / tr_smooth)
        di_minus = 100 * (dm_minus.rolling(window=period).mean() / tr_smooth)

        dx = 100 * np.abs(di_plus - di_minus) / (di_plus + di_minus)
        return dx.rolling(window=period).mean()

    @staticmethod
    def stochastic_oscillator(
        high: pd.Series,
        low: pd.Series,
        close: pd.Series,
        k_period: int = 14,
        d_period: int = 3,
    ) -> Dict[str, pd.Series]:
        """Return the stochastic oscillator.

        Returns:
            Dict with keys ``'k'`` (position of the close inside the
            *k_period* high/low range, scaled to 0-100) and ``'d'`` (the
            *d_period* SMA of ``'k'``). Rows where the range is zero are
            non-finite.
        """
        lowest_low = low.rolling(window=k_period).min()
        highest_high = high.rolling(window=k_period).max()
        k_percent = 100 * ((close - lowest_low) / (highest_high - lowest_low))
        return {
            'k': k_percent,
            'd': k_percent.rolling(window=d_period).mean(),
        }

    @staticmethod
    def williams_r(
        high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14
    ) -> pd.Series:
        """Return Williams %R over *period* rows.

        Rows where the high/low range is zero are non-finite.
        """
        highest_high = high.rolling(window=period).max()
        lowest_low = low.rolling(window=period).min()
        return -100 * ((highest_high - close) / (highest_high - lowest_low))

    @staticmethod
    def cci(
        high: pd.Series, low: pd.Series, close: pd.Series, period: int = 20
    ) -> pd.Series:
        """Return the Commodity Channel Index.

        Computed as the typical price's deviation from its SMA, divided by
        0.015 times the rolling mean absolute deviation.
        """
        def _mean_abs_dev(window: np.ndarray) -> float:
            return np.mean(np.abs(window - window.mean()))

        typical_price = (high + low + close) / 3
        sma_tp = TechnicalIndicators.sma(typical_price, period)
        # raw=True hands each window over as an ndarray, which avoids
        # building a Series per window; incomplete windows are still NaN.
        mean_deviation = typical_price.rolling(window=period).apply(
            _mean_abs_dev, raw=True
        )
        return (typical_price - sma_tp) / (0.015 * mean_deviation)

    @staticmethod
    def atr(
        high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14
    ) -> pd.Series:
        """Return the Average True Range as a simple rolling mean."""
        true_range = TechnicalIndicators._true_range(high, low, close)
        return true_range.rolling(window=period).mean()

    @staticmethod
    def ichimoku_cloud(
        high: pd.Series,
        low: pd.Series,
        conversion_period: int = 9,
        base_period: int = 26,
        span_b_period: int = 52,
        lagging_period: int = 26,
        close: Optional[pd.Series] = None,
    ) -> Dict[str, pd.Series]:
        """Return the five Ichimoku lines.

        Args:
            high: High prices.
            low: Low prices.
            conversion_period: Window of the conversion line.
            base_period: Window of the base line.
            span_b_period: Window of leading span B.
            lagging_period: Number of rows the leading spans are shifted
                forward and the lagging span is shifted back.
            close: Close prices used for the lagging span. When omitted the
                lagging span is an all-NaN series on ``high``'s index and a
                warning is logged, because it cannot be derived from
                high/low alone.

        Returns:
            Dict with keys ``'conversion_line'``, ``'base_line'``,
            ``'leading_span_a'``, ``'leading_span_b'`` and
            ``'lagging_span'``.
        """
        def _midpoint(window: int) -> pd.Series:
            highest = high.rolling(window=window).max()
            lowest = low.rolling(window=window).min()
            return (highest + lowest) / 2

        conversion_line = _midpoint(conversion_period)
        base_line = _midpoint(base_period)
        leading_span_a = ((conversion_line + base_line) / 2).shift(
            lagging_period
        )
        leading_span_b = _midpoint(span_b_period).shift(lagging_period)

        if close is None:
            logger.warning(
                "ichimoku_cloud called without close prices; "
                "lagging_span will be NaN"
            )
            lagging_span = pd.Series(np.nan, index=high.index)
        else:
            lagging_span = close.shift(-lagging_period)

        return {
            'conversion_line': conversion_line,
            'base_line': base_line,
            'leading_span_a': leading_span_a,
            'leading_span_b': leading_span_b,
            'lagging_span': lagging_span,
        }

    @staticmethod
    def fibonacci_retracements(
        high: pd.Series, low: pd.Series
    ) -> Dict[str, float]:
        """Return retracement levels between the overall high and low.

        Keys are the ratio as a string (``'0.0'`` is the maximum of *high*,
        ``'1.0'`` the minimum of *low*).
        """
        max_high = high.max()
        min_low = low.min()
        span = max_high - min_low
        return {
            '0.0': max_high,
            '0.236': max_high - 0.236 * span,
            '0.382': max_high - 0.382 * span,
            '0.5': max_high - 0.5 * span,
            '0.618': max_high - 0.618 * span,
            '0.786': max_high - 0.786 * span,
            '1.0': min_low,
        }

    @staticmethod
    def detect_crossover(
        fast_series: pd.Series, slow_series: pd.Series
    ) -> Dict[str, bool]:
        """Report whether *fast_series* crossed *slow_series* on the last row.

        Returns:
            Dict with plain ``bool`` values under ``'crossed_above'`` and
            ``'crossed_below'``; both are False for empty input.
        """
        prev_fast = fast_series.shift(1)
        prev_slow = slow_series.shift(1)
        crossed_above = (prev_fast <= prev_slow) & (fast_series > slow_series)
        crossed_below = (prev_fast >= prev_slow) & (fast_series < slow_series)
        # bool() turns numpy.bool_ into a builtin bool, matching the
        # annotation and keeping the result JSON-serialisable.
        return {
            'crossed_above': (
                bool(crossed_above.iloc[-1])
                if not crossed_above.empty else False
            ),
            'crossed_below': (
                bool(crossed_below.iloc[-1])
                if not crossed_below.empty else False
            ),
        }

    @staticmethod
    def volume_profile(
        volume: pd.Series,
        price: pd.Series,
        bins: int = 50,
        value_area_pct: float = 0.7,
    ) -> Dict[str, Any]:
        """Bucket traded volume by price and derive POC and value area.

        Args:
            volume: Traded volume per row; must share ``price``'s index.
            price: Price per row.
            bins: Number of evenly spaced bin *edges* between the minimum
                and maximum price, so the profile has ``bins - 1`` levels.
            value_area_pct: Fraction of total volume the value area must
                contain.

        Returns:
            Dict with ``'volume_profile'`` (list of ``{'price_level',
            'volume'}`` dicts, where ``price_level`` is the bin midpoint),
            ``'poc'`` (midpoint of the highest-volume bin) and
            ``'value_area_high'`` / ``'value_area_low'`` (extreme midpoints
            of the highest-volume bins that together reach
            *value_area_pct* of total volume). When no bin can be built
            (``bins < 2``, empty or all-NaN prices) the profile is empty
            and the other fields are None; the value-area fields are also
            None when there is no positive volume.
        """
        price_min, price_max = price.min(), price.max()
        if bins < 2 or pd.isna(price_min) or pd.isna(price_max):
            return {
                'volume_profile': [],
                'poc': None,
                'value_area_high': None,
                'value_area_low': None,
            }

        edges = np.linspace(price_min, price_max, bins)
        last_bin = len(edges) - 2
        profile = []
        for i, (lower, upper) in enumerate(zip(edges[:-1], edges[1:])):
            # The last bin is closed on the right so that volume traded at
            # the maximum price is counted.
            below_upper = (price <= upper) if i == last_bin else (price < upper)
            in_bin = (price >= lower) & below_upper
            profile.append({
                'price_level': (lower + upper) / 2,
                'volume': volume[in_bin].sum(),
            })

        poc = max(profile, key=lambda level: level['volume'])['price_level']

        total_volume = sum(level['volume'] for level in profile)
        threshold = total_volume * value_area_pct
        by_volume = sorted(
            profile, key=lambda level: level['volume'], reverse=True
        )

        cumulative_volume = 0
        value_area_prices = []
        for level in by_volume:
            if level['volume'] <= 0:
                break  # sorted descending: nothing left but empty bins
            value_area_prices.append(level['price_level'])
            cumulative_volume += level['volume']
            if cumulative_volume >= threshold:
                break

        value_area_high = max(value_area_prices) if value_area_prices else None
        value_area_low = min(value_area_prices) if value_area_prices else None

        return {
            'volume_profile': profile,
            'poc': poc,
            'value_area_high': value_area_high,
            'value_area_low': value_area_low,
        }

    @staticmethod
    def order_flow_imbalance(
        orderbook: Dict, depth: int = 10
    ) -> Dict[str, float]:
        """Measure bid/ask volume imbalance at the top of an order book.

        Args:
            orderbook: Mapping with ``'b'`` (bids) and ``'a'`` (asks); each
                entry is indexable with the size at position 1 and must be
                convertible with ``float``.
            depth: Number of leading entries per side to include.

        Returns:
            Dict with ``'imbalance'`` ((bid - ask) / total),
            ``'bid_pressure'`` and ``'ask_pressure'`` (each side's share of
            the total). All three are 0.0 when the book is missing a side
            or carries no volume.
        """
        neutral = {'imbalance': 0.0, 'bid_pressure': 0.0, 'ask_pressure': 0.0}
        if not orderbook or 'b' not in orderbook or 'a' not in orderbook:
            return neutral

        bid_volume = sum(float(bid[1]) for bid in orderbook['b'][:depth])
        ask_volume = sum(float(ask[1]) for ask in orderbook['a'][:depth])
        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return neutral

        return {
            'imbalance': (bid_volume - ask_volume) / total_volume,
            'bid_pressure': bid_volume / total_volume,
            'ask_pressure': ask_volume / total_volume,
        }