Spaces:
Paused
Paused
| import pandas as pd | |
| import numpy as np | |
| from typing import Optional, List, Dict, Any | |
| import logging | |
| logger = logging.getLogger(__name__) | |
class TechnicalIndicators:
    """Namespace of stateless technical-analysis indicator helpers.

    Every indicator is a static method, so it can be called either on the
    class (``TechnicalIndicators.sma(...)``) or on an instance.
    """

    def __init__(self):
        pass

    @staticmethod
    def sma(data: pd.Series, period: int) -> pd.Series:
        """Return the simple moving average of ``data`` over ``period`` rows."""
        return data.rolling(window=period).mean()

    @staticmethod
    def ema(data: pd.Series, period: int) -> pd.Series:
        """Return the exponential moving average (``span=period``, recursive form)."""
        return data.ewm(span=period, adjust=False).mean()

    @staticmethod
    def rsi(data: pd.Series, period: int = 14) -> pd.Series:
        """Return the Relative Strength Index of ``data``.

        Average gains and losses are simple rolling means over ``period``
        rows of the positive and negative one-step differences.
        """
        delta = data.diff()
        avg_gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        avg_loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    @staticmethod
    def macd(data: pd.Series, fast_period: int = 12, slow_period: int = 26,
             signal_period: int = 9) -> Dict[str, pd.Series]:
        """Return the MACD line, its signal line and their difference.

        Returns:
            Dict with keys ``'macd'`` (fast EMA minus slow EMA), ``'signal'``
            (EMA of the MACD line) and ``'histogram'`` (MACD minus signal).
        """
        fast_ema = TechnicalIndicators.ema(data, fast_period)
        slow_ema = TechnicalIndicators.ema(data, slow_period)
        macd_line = fast_ema - slow_ema
        signal_line = TechnicalIndicators.ema(macd_line, signal_period)
        return {
            'macd': macd_line,
            'signal': signal_line,
            'histogram': macd_line - signal_line,
        }

    @staticmethod
    def bollinger_bands(data: pd.Series, period: int = 20,
                        std_dev: float = 2.0) -> Dict[str, pd.Series]:
        """Return Bollinger Bands around the ``period``-row SMA.

        Returns:
            Dict with keys ``'upper'``, ``'middle'`` and ``'lower'``; the outer
            bands sit ``std_dev`` rolling standard deviations from the middle.
        """
        middle = TechnicalIndicators.sma(data, period)
        offset = data.rolling(window=period).std() * std_dev
        return {
            'upper': middle + offset,
            'middle': middle,
            'lower': middle - offset,
        }

    @staticmethod
    def adx(high: pd.Series, low: pd.Series, close: pd.Series,
            period: int = 14) -> pd.Series:
        """Return the Average Directional Index.

        True range and directional movement are smoothed with simple rolling
        means over ``period`` rows; the result keeps the index of ``high``.
        """
        prev_close = close.shift(1)
        true_range = pd.concat(
            [high - low, np.abs(high - prev_close), np.abs(low - prev_close)],
            axis=1,
        ).max(axis=1)

        up_move = high - high.shift(1)
        down_move = low.shift(1) - low

        # np.where returns a bare ndarray; re-attach the input index so the
        # divisions below align with true_range on non-default indexes.
        dm_plus = pd.Series(
            np.where(up_move > down_move, np.maximum(up_move, 0), 0),
            index=high.index,
        )
        dm_minus = pd.Series(
            np.where(down_move > up_move, np.maximum(down_move, 0), 0),
            index=high.index,
        )

        tr_smooth = true_range.rolling(window=period).mean()
        di_plus = 100 * (dm_plus.rolling(window=period).mean() / tr_smooth)
        di_minus = 100 * (dm_minus.rolling(window=period).mean() / tr_smooth)

        dx = 100 * np.abs(di_plus - di_minus) / (di_plus + di_minus)
        return dx.rolling(window=period).mean()

    @staticmethod
    def stochastic_oscillator(high: pd.Series, low: pd.Series, close: pd.Series,
                              k_period: int = 14,
                              d_period: int = 3) -> Dict[str, pd.Series]:
        """Return the stochastic oscillator.

        Returns:
            Dict with ``'k'`` (position of ``close`` inside the rolling
            high/low range, scaled to 0..100) and ``'d'`` (rolling mean of
            ``'k'`` over ``d_period`` rows).
        """
        lowest_low = low.rolling(window=k_period).min()
        highest_high = high.rolling(window=k_period).max()
        k_percent = 100 * ((close - lowest_low) / (highest_high - lowest_low))
        return {
            'k': k_percent,
            'd': k_percent.rolling(window=d_period).mean(),
        }

    @staticmethod
    def williams_r(high: pd.Series, low: pd.Series, close: pd.Series,
                   period: int = 14) -> pd.Series:
        """Return Williams %R over ``period`` rows (values in -100..0)."""
        highest_high = high.rolling(window=period).max()
        lowest_low = low.rolling(window=period).min()
        return -100 * ((highest_high - close) / (highest_high - lowest_low))

    @staticmethod
    def cci(high: pd.Series, low: pd.Series, close: pd.Series,
            period: int = 20) -> pd.Series:
        """Return the Commodity Channel Index of the typical price."""
        typical_price = (high + low + close) / 3
        sma_tp = TechnicalIndicators.sma(typical_price, period)

        def mean_abs_deviation(window):
            return np.mean(np.abs(window - window.mean()))

        mean_deviation = typical_price.rolling(window=period).apply(
            mean_abs_deviation, raw=False
        )
        return (typical_price - sma_tp) / (0.015 * mean_deviation)

    @staticmethod
    def atr(high: pd.Series, low: pd.Series, close: pd.Series,
            period: int = 14) -> pd.Series:
        """Return the Average True Range as a simple rolling mean."""
        prev_close = close.shift(1)
        true_range = pd.concat(
            [high - low, np.abs(high - prev_close), np.abs(low - prev_close)],
            axis=1,
        ).max(axis=1)
        return true_range.rolling(window=period).mean()

    @staticmethod
    def ichimoku_cloud(high: pd.Series, low: pd.Series, conversion_period: int = 9,
                       base_period: int = 26, span_b_period: int = 52,
                       lagging_period: int = 26,
                       close: Optional[pd.Series] = None) -> Dict[str, pd.Series]:
        """Return the Ichimoku cloud components.

        Args:
            high: High prices.
            low: Low prices.
            conversion_period: Window of the conversion line.
            base_period: Window of the base line.
            span_b_period: Window of leading span B.
            lagging_period: Rows by which the leading spans are shifted
                forward and the lagging span is shifted backward.
            close: Close prices used for the lagging span. Optional so that
                existing ``(high, low, ...)`` call sites keep working; when
                omitted the lagging span is all-NaN on the index of ``high``.

        Returns:
            Dict with keys ``'conversion_line'``, ``'base_line'``,
            ``'leading_span_a'``, ``'leading_span_b'`` and ``'lagging_span'``.
        """
        def midpoint(window: int) -> pd.Series:
            return (high.rolling(window=window).max()
                    + low.rolling(window=window).min()) / 2

        conversion_line = midpoint(conversion_period)
        base_line = midpoint(base_period)
        leading_span_a = ((conversion_line + base_line) / 2).shift(lagging_period)
        leading_span_b = midpoint(span_b_period).shift(lagging_period)

        if close is None:
            lagging_span = pd.Series(np.nan, index=high.index, dtype=float)
        else:
            lagging_span = close.shift(-lagging_period)

        return {
            'conversion_line': conversion_line,
            'base_line': base_line,
            'leading_span_a': leading_span_a,
            'leading_span_b': leading_span_b,
            'lagging_span': lagging_span,
        }

    @staticmethod
    def fibonacci_retracements(high: pd.Series, low: pd.Series) -> Dict[str, float]:
        """Return retracement price levels between the overall high and low.

        Keys are the ratio as a string (``'0.0'`` .. ``'1.0'``); ``'0.0'`` maps
        to the maximum of ``high`` and ``'1.0'`` to the minimum of ``low``.
        """
        max_high = high.max()
        min_low = low.min()
        diff = max_high - min_low
        return {
            '0.0': max_high,
            '0.236': max_high - 0.236 * diff,
            '0.382': max_high - 0.382 * diff,
            '0.5': max_high - 0.5 * diff,
            '0.618': max_high - 0.618 * diff,
            '0.786': max_high - 0.786 * diff,
            '1.0': min_low,
        }

    @staticmethod
    def detect_crossover(fast_series: pd.Series,
                         slow_series: pd.Series) -> Dict[str, bool]:
        """Report whether ``fast_series`` crossed ``slow_series`` on the last row.

        Returns:
            Dict with boolean ``'crossed_above'`` and ``'crossed_below'``;
            both are ``False`` for empty input.
        """
        prev_fast = fast_series.shift(1)
        prev_slow = slow_series.shift(1)
        crossed_above = (prev_fast <= prev_slow) & (fast_series > slow_series)
        crossed_below = (prev_fast >= prev_slow) & (fast_series < slow_series)
        return {
            'crossed_above': bool(crossed_above.iloc[-1]) if not crossed_above.empty else False,
            'crossed_below': bool(crossed_below.iloc[-1]) if not crossed_below.empty else False,
        }

    @staticmethod
    def volume_profile(volume: pd.Series, price: pd.Series,
                       bins: int = 50) -> Dict[str, Any]:
        """Bucket traded volume by price and derive POC and value area.

        Args:
            volume: Traded volume per row.
            price: Price per row, on the same index as ``volume``.
            bins: Number of evenly spaced bin *edges* between the minimum and
                maximum price, which yields ``bins - 1`` price levels.

        Returns:
            Dict with ``'volume_profile'`` (list of ``{'price_level',
            'volume'}`` dicts, one per bin, keyed by the bin midpoint),
            ``'poc'`` (midpoint of the highest-volume bin) and
            ``'value_area_high'`` / ``'value_area_low'`` (extreme midpoints of
            the highest-volume bins that together reach 70% of total volume,
            or ``None`` when total volume is zero).

        Raises:
            ValueError: If ``bins`` is smaller than 2, so no bin can be formed.
        """
        if bins < 2:
            raise ValueError("bins must be at least 2 to form a price bin")

        edges = np.linspace(price.min(), price.max(), bins)
        last_bin = len(edges) - 2

        profile = []
        for i, (lower, upper) in enumerate(zip(edges[:-1], edges[1:])):
            # The final bin is closed on the right so that volume traded at
            # the maximum price is not dropped.
            if i == last_bin:
                mask = (price >= lower) & (price <= upper)
            else:
                mask = (price >= lower) & (price < upper)
            profile.append({
                'price_level': (lower + upper) / 2,
                'volume': volume[mask].sum(),
            })

        poc = max(profile, key=lambda level: level['volume'])['price_level']
        total_volume = sum(level['volume'] for level in profile)

        value_area_high = None
        value_area_low = None
        if total_volume > 0:
            target = total_volume * 0.7
            accumulated = 0
            value_area_prices = []
            # Take bins from highest volume down until 70% of volume is covered.
            for level in sorted(profile, key=lambda lv: lv['volume'], reverse=True):
                accumulated += level['volume']
                value_area_prices.append(level['price_level'])
                if accumulated >= target:
                    break
            value_area_high = max(value_area_prices)
            value_area_low = min(value_area_prices)

        return {
            'volume_profile': profile,
            'poc': poc,
            'value_area_high': value_area_high,
            'value_area_low': value_area_low,
        }

    @staticmethod
    def order_flow_imbalance(orderbook: Dict) -> Dict[str, float]:
        """Measure bid/ask size imbalance over the first ten book levels.

        Args:
            orderbook: Mapping with ``'b'`` (bids) and ``'a'`` (asks), each a
                sequence of entries whose element at index 1 is the size.

        Returns:
            Dict with ``'imbalance'`` ((bid - ask) / total), ``'bid_pressure'``
            and ``'ask_pressure'`` (each side's share of total size). All three
            are ``0.0`` when the book is missing a side or has no size.
        """
        neutral = {'imbalance': 0.0, 'bid_pressure': 0.0, 'ask_pressure': 0.0}
        if not orderbook or 'b' not in orderbook or 'a' not in orderbook:
            return neutral

        bid_volume = sum(float(level[1]) for level in orderbook['b'][:10])
        ask_volume = sum(float(level[1]) for level in orderbook['a'][:10])
        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return neutral

        return {
            'imbalance': (bid_volume - ask_volume) / total_volume,
            'bid_pressure': bid_volume / total_volume,
            'ask_pressure': ask_volume / total_volume,
        }