""" Stochastic Oscillator calculation. Provides Stochastic Oscillator calculation using TA-Lib as primary library with pandas-ta fallback. The Stochastic Oscillator compares a closing price to its price range over a given time period. """ from typing import Optional, Tuple import numpy as np import pandas as pd class IndicatorCalculationError(Exception): """Raised when indicator calculation fails.""" pass def calculate_stochastic( df: pd.DataFrame, k_period: int = 14, d_period: int = 3, smooth_k: int = 3, use_talib: bool = True, ) -> Tuple[pd.Series, pd.Series]: """ Calculate Stochastic Oscillator (%K and %D). The Stochastic Oscillator consists of: - %K (Fast): ((Close - Lowest Low) / (Highest High - Lowest Low)) * 100 - %D (Slow): Moving average of %K Traditional interpretation: - %K or %D > 80: Overbought - %K or %D < 20: Oversold - %K crosses above %D: Bullish signal - %K crosses below %D: Bearish signal Args: df: DataFrame with OHLC data k_period: Period for %K calculation (default: 14) d_period: Period for %D moving average (default: 3) smooth_k: Smoothing period for %K (default: 3, use 1 for fast stochastic) use_talib: Whether to try TA-Lib first (default: True) Returns: Tuple of (%K, %D) as Series Raises: IndicatorCalculationError: If calculation fails """ required_cols = ["high", "low", "close"] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: raise IndicatorCalculationError(f"Missing required columns: {missing_cols}") min_bars = k_period + d_period if len(df) < min_bars: raise IndicatorCalculationError( f"Insufficient data for Stochastic calculation (need {min_bars} bars, got {len(df)})" ) high = df["high"].values low = df["low"].values close = df["close"].values # Try TA-Lib first if requested if use_talib: try: import talib slowk, slowd = talib.STOCH( high, low, close, fastk_period=k_period, slowk_period=smooth_k, slowk_matype=0, # SMA slowd_period=d_period, slowd_matype=0, # SMA ) return ( pd.Series(slowk, index=df.index, name=f"STOCH_K_{k_period}"), pd.Series(slowd, index=df.index, name=f"STOCH_D_{d_period}"), ) except ImportError: pass # Fall back to pandas-ta except Exception as e: pass # TA-Lib error, fall back # Fall back to pandas-ta try: import pandas_ta as ta stoch_df = ta.stoch( df["high"], df["low"], df["close"], k=k_period, d=d_period, smooth_k=smooth_k, ) if stoch_df is None or stoch_df.empty: raise IndicatorCalculationError( "pandas_ta.stoch returned None or empty DataFrame" ) # pandas_ta returns DataFrame with columns: STOCHk_{k}_{d}_{smooth_k}, STOCHd_{k}_{d}_{smooth_k} k_col = f"STOCHk_{k_period}_{d_period}_{smooth_k}" d_col = f"STOCHd_{k_period}_{d_period}_{smooth_k}" return ( stoch_df[k_col].rename(f"STOCH_K_{k_period}"), stoch_df[d_col].rename(f"STOCH_D_{d_period}"), ) except ImportError as e: raise IndicatorCalculationError( "Neither TA-Lib nor pandas-ta is available. Install one: " "pip install TA-Lib or pip install pandas-ta" ) from e except Exception as e: pass # Try manual calculation # Manual Stochastic calculation as last resort try: k, d = _calculate_stochastic_manual( high, low, close, k_period, d_period, smooth_k ) return ( pd.Series(k, index=df.index, name=f"STOCH_K_{k_period}"), pd.Series(d, index=df.index, name=f"STOCH_D_{d_period}"), ) except Exception as e: raise IndicatorCalculationError( f"Stochastic calculation failed: {str(e)}" ) from e def _calculate_stochastic_manual( high: np.ndarray, low: np.ndarray, close: np.ndarray, k_period: int, d_period: int, smooth_k: int, ) -> Tuple[np.ndarray, np.ndarray]: """ Manual Stochastic Oscillator calculation. Args: high: High prices array low: Low prices array close: Close prices array k_period: Period for %K d_period: Period for %D smooth_k: Smoothing for %K Returns: Tuple of (%K, %D) arrays """ n = len(close) fast_k = np.full(n, np.nan) # Calculate Fast %K for i in range(k_period - 1, n): period_high = np.max(high[i - k_period + 1 : i + 1]) period_low = np.min(low[i - k_period + 1 : i + 1]) if period_high == period_low: fast_k[i] = 50.0 # Avoid division by zero else: fast_k[i] = ((close[i] - period_low) / (period_high - period_low)) * 100.0 # Smooth %K if smooth_k > 1 if smooth_k > 1: slow_k = _simple_moving_average(fast_k, smooth_k) else: slow_k = fast_k # Calculate %D (SMA of %K) slow_d = _simple_moving_average(slow_k, d_period) return slow_k, slow_d def _simple_moving_average(values: np.ndarray, period: int) -> np.ndarray: """ Calculate Simple Moving Average. Args: values: Input array period: SMA period Returns: SMA array """ sma = np.full(len(values), np.nan) for i in range(period - 1, len(values)): if not np.isnan(values[i - period + 1 : i + 1]).any(): sma[i] = np.mean(values[i - period + 1 : i + 1]) return sma def interpret_stochastic( k_value: float, d_value: float, prev_k: Optional[float] = None, prev_d: Optional[float] = None, overbought: float = 80, oversold: float = 20, ) -> str: """ Interpret Stochastic Oscillator values. Args: k_value: Current %K value d_value: Current %D value prev_k: Previous %K value for crossover detection prev_d: Previous %D value for crossover detection overbought: Overbought threshold (default: 80) oversold: Oversold threshold (default: 20) Returns: Interpretation string """ if np.isnan(k_value) or np.isnan(d_value): return "Insufficient data" # Detect crossovers if prev_k is not None and prev_d is not None: if not np.isnan(prev_k) and not np.isnan(prev_d): # Bullish crossover: %K crosses above %D if prev_k < prev_d and k_value > d_value: if k_value < oversold: return f"Strong bullish signal (%K crossed above %D in oversold zone: {k_value:.2f})" else: return f"Bullish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})" # Bearish crossover: %K crosses below %D elif prev_k > prev_d and k_value < d_value: if k_value > overbought: return f"Strong bearish signal (%K crossed below %D in overbought zone: {k_value:.2f})" else: return f"Bearish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})" # General interpretation based on zones if k_value > overbought: return f"Overbought (%K: {k_value:.2f}, %D: {d_value:.2f})" elif k_value < oversold: return f"Oversold (%K: {k_value:.2f}, %D: {d_value:.2f})" else: trend = "bullish" if k_value > d_value else "bearish" return f"Neutral ({trend}, %K: {k_value:.2f}, %D: {d_value:.2f})" def find_stochastic_crossovers( k_series: pd.Series, d_series: pd.Series, ) -> dict: """ Find bullish and bearish Stochastic crossovers. Args: k_series: %K series d_series: %D series Returns: Dict with 'bullish' and 'bearish' crossover indices """ bullish = [] bearish = [] k = k_series.values d = d_series.values for i in range(1, len(k)): if ( not np.isnan(k[i]) and not np.isnan(d[i]) and not np.isnan(k[i - 1]) and not np.isnan(d[i - 1]) ): # Bullish crossover: %K crosses above %D if k[i - 1] < d[i - 1] and k[i] > d[i]: bullish.append(i) # Bearish crossover: %K crosses below %D elif k[i - 1] > d[i - 1] and k[i] < d[i]: bearish.append(i) return { "bullish": bullish, "bearish": bearish, } def find_stochastic_divergence( df: pd.DataFrame, k_series: pd.Series, window: int = 14, ) -> dict: """ Detect bullish and bearish Stochastic divergences. Args: df: OHLC DataFrame k_series: %K series window: Window for finding local extrema Returns: Dict with 'bullish' and 'bearish' divergence indices """ bullish = [] bearish = [] prices = df["close"].values k = k_series.values for i in range(window, len(df) - window): # Bullish divergence if prices[i] == np.min(prices[i - window : i + window]): for j in range(i - 2 * window, i - window): if j >= 0 and prices[j] == np.min(prices[j - window : j + window]): if not np.isnan(k[i]) and not np.isnan(k[j]): if prices[i] < prices[j] and k[i] > k[j]: bullish.append(i) break # Bearish divergence if prices[i] == np.max(prices[i - window : i + window]): for j in range(i - 2 * window, i - window): if j >= 0 and prices[j] == np.max(prices[j - window : j + window]): if not np.isnan(k[i]) and not np.isnan(k[j]): if prices[i] > prices[j] and k[i] < k[j]: bearish.append(i) break return { "bullish": bullish, "bearish": bearish, }