Deploy Bot
Deploy Trading Analysis Platform to HuggingFace Spaces
a1bf219
"""
Stochastic Oscillator calculation.
Provides Stochastic Oscillator calculation using TA-Lib as primary library with pandas-ta fallback.
The Stochastic Oscillator compares a closing price to its price range over a given time period.
"""
from typing import Optional, Tuple
import numpy as np
import pandas as pd
class IndicatorCalculationError(Exception):
"""Raised when indicator calculation fails."""
pass
def calculate_stochastic(
df: pd.DataFrame,
k_period: int = 14,
d_period: int = 3,
smooth_k: int = 3,
use_talib: bool = True,
) -> Tuple[pd.Series, pd.Series]:
"""
Calculate Stochastic Oscillator (%K and %D).
The Stochastic Oscillator consists of:
- %K (Fast): ((Close - Lowest Low) / (Highest High - Lowest Low)) * 100
- %D (Slow): Moving average of %K
Traditional interpretation:
- %K or %D > 80: Overbought
- %K or %D < 20: Oversold
- %K crosses above %D: Bullish signal
- %K crosses below %D: Bearish signal
Args:
df: DataFrame with OHLC data
k_period: Period for %K calculation (default: 14)
d_period: Period for %D moving average (default: 3)
smooth_k: Smoothing period for %K (default: 3, use 1 for fast stochastic)
use_talib: Whether to try TA-Lib first (default: True)
Returns:
Tuple of (%K, %D) as Series
Raises:
IndicatorCalculationError: If calculation fails
"""
required_cols = ["high", "low", "close"]
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
raise IndicatorCalculationError(f"Missing required columns: {missing_cols}")
min_bars = k_period + d_period
if len(df) < min_bars:
raise IndicatorCalculationError(
f"Insufficient data for Stochastic calculation (need {min_bars} bars, got {len(df)})"
)
high = df["high"].values
low = df["low"].values
close = df["close"].values
# Try TA-Lib first if requested
if use_talib:
try:
import talib
slowk, slowd = talib.STOCH(
high,
low,
close,
fastk_period=k_period,
slowk_period=smooth_k,
slowk_matype=0, # SMA
slowd_period=d_period,
slowd_matype=0, # SMA
)
return (
pd.Series(slowk, index=df.index, name=f"STOCH_K_{k_period}"),
pd.Series(slowd, index=df.index, name=f"STOCH_D_{d_period}"),
)
except ImportError:
pass # Fall back to pandas-ta
except Exception as e:
pass # TA-Lib error, fall back
# Fall back to pandas-ta
try:
import pandas_ta as ta
stoch_df = ta.stoch(
df["high"],
df["low"],
df["close"],
k=k_period,
d=d_period,
smooth_k=smooth_k,
)
if stoch_df is None or stoch_df.empty:
raise IndicatorCalculationError(
"pandas_ta.stoch returned None or empty DataFrame"
)
# pandas_ta returns DataFrame with columns: STOCHk_{k}_{d}_{smooth_k}, STOCHd_{k}_{d}_{smooth_k}
k_col = f"STOCHk_{k_period}_{d_period}_{smooth_k}"
d_col = f"STOCHd_{k_period}_{d_period}_{smooth_k}"
return (
stoch_df[k_col].rename(f"STOCH_K_{k_period}"),
stoch_df[d_col].rename(f"STOCH_D_{d_period}"),
)
except ImportError as e:
raise IndicatorCalculationError(
"Neither TA-Lib nor pandas-ta is available. Install one: "
"pip install TA-Lib or pip install pandas-ta"
) from e
except Exception as e:
pass # Try manual calculation
# Manual Stochastic calculation as last resort
try:
k, d = _calculate_stochastic_manual(
high, low, close, k_period, d_period, smooth_k
)
return (
pd.Series(k, index=df.index, name=f"STOCH_K_{k_period}"),
pd.Series(d, index=df.index, name=f"STOCH_D_{d_period}"),
)
except Exception as e:
raise IndicatorCalculationError(
f"Stochastic calculation failed: {str(e)}"
) from e
def _calculate_stochastic_manual(
high: np.ndarray,
low: np.ndarray,
close: np.ndarray,
k_period: int,
d_period: int,
smooth_k: int,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Manual Stochastic Oscillator calculation.
Args:
high: High prices array
low: Low prices array
close: Close prices array
k_period: Period for %K
d_period: Period for %D
smooth_k: Smoothing for %K
Returns:
Tuple of (%K, %D) arrays
"""
n = len(close)
fast_k = np.full(n, np.nan)
# Calculate Fast %K
for i in range(k_period - 1, n):
period_high = np.max(high[i - k_period + 1 : i + 1])
period_low = np.min(low[i - k_period + 1 : i + 1])
if period_high == period_low:
fast_k[i] = 50.0 # Avoid division by zero
else:
fast_k[i] = ((close[i] - period_low) / (period_high - period_low)) * 100.0
# Smooth %K if smooth_k > 1
if smooth_k > 1:
slow_k = _simple_moving_average(fast_k, smooth_k)
else:
slow_k = fast_k
# Calculate %D (SMA of %K)
slow_d = _simple_moving_average(slow_k, d_period)
return slow_k, slow_d
def _simple_moving_average(values: np.ndarray, period: int) -> np.ndarray:
"""
Calculate Simple Moving Average.
Args:
values: Input array
period: SMA period
Returns:
SMA array
"""
sma = np.full(len(values), np.nan)
for i in range(period - 1, len(values)):
if not np.isnan(values[i - period + 1 : i + 1]).any():
sma[i] = np.mean(values[i - period + 1 : i + 1])
return sma
def interpret_stochastic(
k_value: float,
d_value: float,
prev_k: Optional[float] = None,
prev_d: Optional[float] = None,
overbought: float = 80,
oversold: float = 20,
) -> str:
"""
Interpret Stochastic Oscillator values.
Args:
k_value: Current %K value
d_value: Current %D value
prev_k: Previous %K value for crossover detection
prev_d: Previous %D value for crossover detection
overbought: Overbought threshold (default: 80)
oversold: Oversold threshold (default: 20)
Returns:
Interpretation string
"""
if np.isnan(k_value) or np.isnan(d_value):
return "Insufficient data"
# Detect crossovers
if prev_k is not None and prev_d is not None:
if not np.isnan(prev_k) and not np.isnan(prev_d):
# Bullish crossover: %K crosses above %D
if prev_k < prev_d and k_value > d_value:
if k_value < oversold:
return f"Strong bullish signal (%K crossed above %D in oversold zone: {k_value:.2f})"
else:
return f"Bullish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})"
# Bearish crossover: %K crosses below %D
elif prev_k > prev_d and k_value < d_value:
if k_value > overbought:
return f"Strong bearish signal (%K crossed below %D in overbought zone: {k_value:.2f})"
else:
return f"Bearish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})"
# General interpretation based on zones
if k_value > overbought:
return f"Overbought (%K: {k_value:.2f}, %D: {d_value:.2f})"
elif k_value < oversold:
return f"Oversold (%K: {k_value:.2f}, %D: {d_value:.2f})"
else:
trend = "bullish" if k_value > d_value else "bearish"
return f"Neutral ({trend}, %K: {k_value:.2f}, %D: {d_value:.2f})"
def find_stochastic_crossovers(
k_series: pd.Series,
d_series: pd.Series,
) -> dict:
"""
Find bullish and bearish Stochastic crossovers.
Args:
k_series: %K series
d_series: %D series
Returns:
Dict with 'bullish' and 'bearish' crossover indices
"""
bullish = []
bearish = []
k = k_series.values
d = d_series.values
for i in range(1, len(k)):
if (
not np.isnan(k[i])
and not np.isnan(d[i])
and not np.isnan(k[i - 1])
and not np.isnan(d[i - 1])
):
# Bullish crossover: %K crosses above %D
if k[i - 1] < d[i - 1] and k[i] > d[i]:
bullish.append(i)
# Bearish crossover: %K crosses below %D
elif k[i - 1] > d[i - 1] and k[i] < d[i]:
bearish.append(i)
return {
"bullish": bullish,
"bearish": bearish,
}
def find_stochastic_divergence(
df: pd.DataFrame,
k_series: pd.Series,
window: int = 14,
) -> dict:
"""
Detect bullish and bearish Stochastic divergences.
Args:
df: OHLC DataFrame
k_series: %K series
window: Window for finding local extrema
Returns:
Dict with 'bullish' and 'bearish' divergence indices
"""
bullish = []
bearish = []
prices = df["close"].values
k = k_series.values
for i in range(window, len(df) - window):
# Bullish divergence
if prices[i] == np.min(prices[i - window : i + window]):
for j in range(i - 2 * window, i - window):
if j >= 0 and prices[j] == np.min(prices[j - window : j + window]):
if not np.isnan(k[i]) and not np.isnan(k[j]):
if prices[i] < prices[j] and k[i] > k[j]:
bullish.append(i)
break
# Bearish divergence
if prices[i] == np.max(prices[i - window : i + window]):
for j in range(i - 2 * window, i - window):
if j >= 0 and prices[j] == np.max(prices[j - window : j + window]):
if not np.isnan(k[i]) and not np.isnan(k[j]):
if prices[i] > prices[j] and k[i] < k[j]:
bearish.append(i)
break
return {
"bullish": bullish,
"bearish": bearish,
}