Spaces:
Sleeping
Sleeping
"""
Stochastic Oscillator calculation.

Provides Stochastic Oscillator calculation using TA-Lib as the primary library, with
pandas-ta and a manual NumPy implementation as fallbacks.
The Stochastic Oscillator compares a closing price to its price range over a given time period.
"""
| from typing import Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
class IndicatorCalculationError(Exception):
    """Error type raised when an indicator cannot be calculated."""
def calculate_stochastic(
    df: pd.DataFrame,
    k_period: int = 14,
    d_period: int = 3,
    smooth_k: int = 3,
    use_talib: bool = True,
) -> Tuple[pd.Series, pd.Series]:
    """
    Calculate Stochastic Oscillator (%K and %D).

    The Stochastic Oscillator consists of:
    - %K (Fast): ((Close - Lowest Low) / (Highest High - Lowest Low)) * 100
    - %D (Slow): Moving average of %K

    Traditional interpretation:
    - %K or %D > 80: Overbought
    - %K or %D < 20: Oversold
    - %K crosses above %D: Bullish signal
    - %K crosses below %D: Bearish signal

    Backends are tried in order: TA-Lib (when ``use_talib`` is true), then
    pandas-ta, then the manual NumPy implementation in this module. A backend
    that is missing or fails is logged and skipped, so the function still
    returns a result when neither optional library is installed.

    Args:
        df: DataFrame with numeric "high", "low" and "close" columns
        k_period: Period for %K calculation (default: 14)
        d_period: Period for %D moving average (default: 3)
        smooth_k: Smoothing period for %K (default: 3, use 1 for fast stochastic)
        use_talib: Whether to try TA-Lib first (default: True)

    Returns:
        Tuple of (%K, %D) as Series indexed like ``df`` and named
        ``STOCH_K_{k_period}`` and ``STOCH_D_{d_period}``

    Raises:
        IndicatorCalculationError: If a period is not positive, required
            columns are missing or non-numeric, there are fewer than
            ``k_period + d_period`` rows, or every backend fails
    """
    # Local import keeps the block self-contained without touching file-level imports.
    import logging

    logger = logging.getLogger(__name__)

    for label, value in (
        ("k_period", k_period),
        ("d_period", d_period),
        ("smooth_k", smooth_k),
    ):
        if value < 1:
            raise IndicatorCalculationError(
                f"{label} must be a positive integer, got {value}"
            )

    required_cols = ["high", "low", "close"]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise IndicatorCalculationError(f"Missing required columns: {missing_cols}")

    min_bars = k_period + d_period
    if len(df) < min_bars:
        raise IndicatorCalculationError(
            f"Insufficient data for Stochastic calculation (need {min_bars} bars, got {len(df)})"
        )

    # TA-Lib only accepts float64 ("double") arrays, so convert once up front.
    try:
        high = df["high"].to_numpy(dtype=float)
        low = df["low"].to_numpy(dtype=float)
        close = df["close"].to_numpy(dtype=float)
    except (TypeError, ValueError) as exc:
        raise IndicatorCalculationError(
            "Columns 'high', 'low' and 'close' must be numeric"
        ) from exc

    k_name = f"STOCH_K_{k_period}"
    d_name = f"STOCH_D_{d_period}"

    # Try TA-Lib first if requested
    if use_talib:
        try:
            import talib

            slowk, slowd = talib.STOCH(
                high,
                low,
                close,
                fastk_period=k_period,
                slowk_period=smooth_k,
                slowk_matype=0,  # SMA
                slowd_period=d_period,
                slowd_matype=0,  # SMA
            )
            return (
                pd.Series(slowk, index=df.index, name=k_name),
                pd.Series(slowd, index=df.index, name=d_name),
            )
        except ImportError:
            logger.debug("TA-Lib is not installed; falling back to pandas-ta")
        except Exception as exc:
            logger.warning("TA-Lib STOCH failed (%s); falling back to pandas-ta", exc)

    # Fall back to pandas-ta
    try:
        import pandas_ta as ta

        stoch_df = ta.stoch(
            df["high"],
            df["low"],
            df["close"],
            k=k_period,
            d=d_period,
            smooth_k=smooth_k,
        )
        if stoch_df is None or stoch_df.empty:
            logger.warning(
                "pandas_ta.stoch returned None or empty DataFrame; "
                "falling back to manual calculation"
            )
        else:
            # pandas_ta returns DataFrame with columns:
            # STOCHk_{k}_{d}_{smooth_k}, STOCHd_{k}_{d}_{smooth_k}
            k_col = f"STOCHk_{k_period}_{d_period}_{smooth_k}"
            d_col = f"STOCHd_{k_period}_{d_period}_{smooth_k}"
            # NOTE(review): some pandas-ta versions appear to trim the warm-up
            # rows; reindexing keeps the output aligned with df like the other
            # backends (it is a no-op when the index already matches).
            return (
                stoch_df[k_col].reindex(df.index).rename(k_name),
                stoch_df[d_col].reindex(df.index).rename(d_name),
            )
    except ImportError:
        logger.debug("pandas-ta is not installed; falling back to manual calculation")
    except Exception as exc:
        logger.warning(
            "pandas-ta stoch failed (%s); falling back to manual calculation", exc
        )

    # Manual Stochastic calculation as last resort
    try:
        k, d = _calculate_stochastic_manual(
            high, low, close, k_period, d_period, smooth_k
        )
    except Exception as exc:
        raise IndicatorCalculationError(
            f"Stochastic calculation failed: {exc}"
        ) from exc

    return (
        pd.Series(k, index=df.index, name=k_name),
        pd.Series(d, index=df.index, name=d_name),
    )
def _calculate_stochastic_manual(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    k_period: int,
    d_period: int,
    smooth_k: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute the Stochastic Oscillator with plain NumPy.

    Raw %K is evaluated over a trailing window of ``k_period`` bars; positions
    without a full window stay NaN. %K is then smoothed with a simple moving
    average when ``smooth_k`` exceeds 1, and %D is a simple moving average of
    the resulting %K.

    Args:
        high: High prices array
        low: Low prices array
        close: Close prices array
        k_period: Look-back length for %K
        d_period: Averaging length for %D
        smooth_k: Averaging length applied to raw %K (values <= 1 skip smoothing)

    Returns:
        Tuple of (%K, %D) arrays, each as long as ``close``
    """
    total = len(close)
    raw_k = np.full(total, np.nan)

    # Each window covers [start, stop) and its value is stored at the last bar.
    for stop in range(k_period, total + 1):
        start = stop - k_period
        last = stop - 1
        top = np.max(high[start:stop])
        bottom = np.min(low[start:stop])
        if top == bottom:
            # Flat range: use the midpoint instead of dividing by zero.
            raw_k[last] = 50.0
            continue
        raw_k[last] = ((close[last] - bottom) / (top - bottom)) * 100.0

    smoothed_k = _simple_moving_average(raw_k, smooth_k) if smooth_k > 1 else raw_k
    percent_d = _simple_moving_average(smoothed_k, d_period)
    return smoothed_k, percent_d
def _simple_moving_average(values: np.ndarray, period: int) -> np.ndarray:
    """
    Compute a trailing simple moving average.

    A position receives a value only when its full ``period``-long window is
    available and contains no NaN; every other position is left as NaN.

    Args:
        values: Input array
        period: Number of trailing elements averaged per output position

    Returns:
        Array of the same length as ``values`` holding the averages
    """
    count = len(values)
    averaged = np.full(count, np.nan)
    for stop in range(period, count + 1):
        window = values[stop - period : stop]
        if np.isnan(window).any():
            continue
        averaged[stop - 1] = np.mean(window)
    return averaged
def interpret_stochastic(
    k_value: float,
    d_value: float,
    prev_k: Optional[float] = None,
    prev_d: Optional[float] = None,
    overbought: float = 80,
    oversold: float = 20,
) -> str:
    """
    Describe the current Stochastic Oscillator reading in words.

    A %K/%D crossover (detected only when both previous values are supplied
    and are not NaN) takes precedence over the zone-based description. Zone
    checks look at %K alone.

    Args:
        k_value: Current %K value
        d_value: Current %D value
        prev_k: Previous %K value for crossover detection
        prev_d: Previous %D value for crossover detection
        overbought: Overbought threshold (default: 80)
        oversold: Oversold threshold (default: 20)

    Returns:
        Interpretation string, or "Insufficient data" when %K or %D is NaN
    """
    if np.isnan(k_value) or np.isnan(d_value):
        return "Insufficient data"

    have_previous = (
        prev_k is not None
        and prev_d is not None
        and not np.isnan(prev_k)
        and not np.isnan(prev_d)
    )

    if have_previous:
        # %K moved from below %D to above it.
        if prev_k < prev_d and k_value > d_value:
            if k_value < oversold:
                return f"Strong bullish signal (%K crossed above %D in oversold zone: {k_value:.2f})"
            return f"Bullish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})"

        # %K moved from above %D to below it.
        if prev_k > prev_d and k_value < d_value:
            if k_value > overbought:
                return f"Strong bearish signal (%K crossed below %D in overbought zone: {k_value:.2f})"
            return f"Bearish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})"

    # No crossover reported: fall back to the zone %K currently sits in.
    if k_value > overbought:
        return f"Overbought (%K: {k_value:.2f}, %D: {d_value:.2f})"
    if k_value < oversold:
        return f"Oversold (%K: {k_value:.2f}, %D: {d_value:.2f})"

    if k_value > d_value:
        trend = "bullish"
    else:
        trend = "bearish"
    return f"Neutral ({trend}, %K: {k_value:.2f}, %D: {d_value:.2f})"
def find_stochastic_crossovers(
    k_series: pd.Series,
    d_series: pd.Series,
) -> dict:
    """
    Locate the bars where %K crosses %D.

    A bullish crossover is a bar where %K was below %D on the previous bar and
    is above it now; a bearish crossover is the mirror case. Bars where any of
    the four values involved is NaN are skipped.

    Args:
        k_series: %K series
        d_series: %D series

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer positions
        (offsets into the series, not index labels)
    """
    k_vals = k_series.values
    d_vals = d_series.values
    crossings = {"bullish": [], "bearish": []}

    for pos in range(1, len(k_vals)):
        before = pos - 1
        if (
            np.isnan(k_vals[pos])
            or np.isnan(d_vals[pos])
            or np.isnan(k_vals[before])
            or np.isnan(d_vals[before])
        ):
            continue

        if k_vals[before] < d_vals[before] and k_vals[pos] > d_vals[pos]:
            # %K moved from below %D to above it.
            crossings["bullish"].append(pos)
        elif k_vals[before] > d_vals[before] and k_vals[pos] < d_vals[pos]:
            # %K moved from above %D to below it.
            crossings["bearish"].append(pos)

    return crossings
def find_stochastic_divergence(
    df: pd.DataFrame,
    k_series: pd.Series,
    window: int = 14,
) -> dict:
    """
    Detect bullish and bearish Stochastic divergences.

    A bar ``i`` is a local low (high) when its close equals the minimum
    (maximum) close over positions ``[i - window, i + window)``. For each such
    bar, earlier bars ``j`` in ``[i - 2 * window, i - window)`` that are
    themselves local lows (highs) are scanned in order:

    - Bullish divergence: price makes a lower low while %K makes a higher low.
    - Bearish divergence: price makes a higher high while %K makes a lower high.

    Scanning stops at the first earlier bar that confirms the divergence, so
    each position is reported at most once per list. Pairs where %K is NaN at
    either bar are ignored.

    Args:
        df: OHLC DataFrame; only the "close" column is read
        k_series: %K series, positionally aligned with ``df``
        window: Window for finding local extrema (must be >= 1)

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer positions

    Raises:
        ValueError: If ``window`` is smaller than 1
    """
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window}")

    prices = df["close"].values
    k = k_series.values

    def _is_local_extremum(pos: int, reducer) -> bool:
        """Return True if prices[pos] equals reducer() over its neighbourhood."""
        # Clamp the start: a negative start would wrap around and yield an
        # empty slice, making np.min / np.max raise ValueError.
        start = max(0, pos - window)
        return prices[pos] == reducer(prices[start : pos + window])

    bullish = []
    bearish = []

    for i in range(window, len(prices) - window):
        # Earlier candidate bars; positions before the start of the data are dropped.
        earlier = range(max(0, i - 2 * window), i - window)

        # Bullish divergence
        if _is_local_extremum(i, np.min):
            for j in earlier:
                if not _is_local_extremum(j, np.min):
                    continue
                if np.isnan(k[i]) or np.isnan(k[j]):
                    continue
                if prices[i] < prices[j] and k[i] > k[j]:
                    bullish.append(i)
                    break

        # Bearish divergence
        if _is_local_extremum(i, np.max):
            for j in earlier:
                if not _is_local_extremum(j, np.max):
                    continue
                if np.isnan(k[i]) or np.isnan(k[j]):
                    continue
                if prices[i] > prices[j] and k[i] < k[j]:
                    bearish.append(i)
                    break

    return {
        "bullish": bullish,
        "bearish": bearish,
    }