File size: 10,272 Bytes
a1bf219
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
"""
Stochastic Oscillator calculation.

Provides Stochastic Oscillator calculation using TA-Lib as the primary library, with pandas-ta and a
built-in NumPy implementation as fallbacks.
The Stochastic Oscillator compares a closing price to its price range over a given time period.
"""

from typing import Optional, Tuple

import numpy as np
import pandas as pd


class IndicatorCalculationError(Exception):
    """Signal that an indicator could not be computed from the given input."""


def calculate_stochastic(
    df: pd.DataFrame,
    k_period: int = 14,
    d_period: int = 3,
    smooth_k: int = 3,
    use_talib: bool = True,
) -> Tuple[pd.Series, pd.Series]:
    """
    Calculate Stochastic Oscillator (%K and %D).

    The Stochastic Oscillator consists of:
    - %K (Fast): ((Close - Lowest Low) / (Highest High - Lowest Low)) * 100
    - %D (Slow): Moving average of %K

    Traditional interpretation:
    - %K or %D > 80: Overbought
    - %K or %D < 20: Oversold
    - %K crosses above %D: Bullish signal
    - %K crosses below %D: Bearish signal

    Backends are tried in order and the first one that succeeds wins:
    1. TA-Lib (only when ``use_talib`` is True)
    2. pandas-ta
    3. The built-in NumPy implementation in this module

    A backend that is not installed, or that raises, is skipped and the
    reason is logged, so missing optional dependencies never prevent a result.

    Args:
        df: DataFrame with "high", "low" and "close" columns
        k_period: Period for %K calculation (default: 14)
        d_period: Period for %D moving average (default: 3)
        smooth_k: Smoothing period for %K (default: 3, use 1 for fast stochastic)
        use_talib: Whether to try TA-Lib first (default: True)

    Returns:
        Tuple of (%K, %D) as Series named ``STOCH_K_{k_period}`` and
        ``STOCH_D_{d_period}``

    Raises:
        IndicatorCalculationError: If a required column is missing, a period
            is not positive, there are fewer than ``k_period + d_period``
            rows, or every backend fails
    """
    import logging

    logger = logging.getLogger(__name__)

    required_cols = ["high", "low", "close"]
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise IndicatorCalculationError(f"Missing required columns: {missing_cols}")

    # Reject non-positive periods up front instead of letting every backend
    # fail (or return meaningless output) in its own way.
    for label, value in (
        ("k_period", k_period),
        ("d_period", d_period),
        ("smooth_k", smooth_k),
    ):
        if value < 1:
            raise IndicatorCalculationError(
                f"{label} must be a positive integer, got {value}"
            )

    # NOTE: this floor does not include smooth_k. With the built-in
    # implementation the first non-NaN %D needs
    # k_period + smooth_k + d_period - 2 bars, so inputs right at the floor
    # can still yield an all-NaN %D.
    min_bars = k_period + d_period
    if len(df) < min_bars:
        raise IndicatorCalculationError(
            f"Insufficient data for Stochastic calculation (need {min_bars} bars, got {len(df)})"
        )

    high = df["high"].values
    low = df["low"].values
    close = df["close"].values

    # Try TA-Lib first if requested
    if use_talib:
        try:
            import talib

            # TA-Lib only accepts float64 ("double") arrays; coerce so that
            # integer-typed price columns do not force a needless fallback.
            slowk, slowd = talib.STOCH(
                np.asarray(high, dtype=float),
                np.asarray(low, dtype=float),
                np.asarray(close, dtype=float),
                fastk_period=k_period,
                slowk_period=smooth_k,
                slowk_matype=0,  # SMA
                slowd_period=d_period,
                slowd_matype=0,  # SMA
            )
            return (
                pd.Series(slowk, index=df.index, name=f"STOCH_K_{k_period}"),
                pd.Series(slowd, index=df.index, name=f"STOCH_D_{d_period}"),
            )
        except ImportError:
            logger.debug("TA-Lib is not installed; trying pandas-ta")
        except Exception as exc:
            # Deliberate best-effort: any TA-Lib failure moves on to the
            # next backend, but the reason is recorded.
            logger.warning("TA-Lib STOCH failed (%s); trying pandas-ta", exc)

    # Fall back to pandas-ta
    try:
        import pandas_ta as ta

        stoch_df = ta.stoch(
            df["high"],
            df["low"],
            df["close"],
            k=k_period,
            d=d_period,
            smooth_k=smooth_k,
        )
        if stoch_df is not None and not stoch_df.empty:
            # pandas_ta returns DataFrame with columns:
            # STOCHk_{k}_{d}_{smooth_k}, STOCHd_{k}_{d}_{smooth_k}
            k_col = f"STOCHk_{k_period}_{d_period}_{smooth_k}"
            d_col = f"STOCHd_{k_period}_{d_period}_{smooth_k}"
            return (
                stoch_df[k_col].rename(f"STOCH_K_{k_period}"),
                stoch_df[d_col].rename(f"STOCH_D_{d_period}"),
            )
        logger.warning(
            "pandas_ta.stoch returned None or empty DataFrame; "
            "using built-in calculation"
        )
    except ImportError:
        # The built-in implementation needs only NumPy, so a missing (or
        # un-importable) pandas-ta is not fatal.
        logger.debug("pandas-ta is not available; using built-in calculation")
    except Exception as exc:
        logger.warning(
            "pandas-ta stoch failed (%s); using built-in calculation", exc
        )

    # Manual Stochastic calculation as last resort
    try:
        k, d = _calculate_stochastic_manual(
            high, low, close, k_period, d_period, smooth_k
        )
        return (
            pd.Series(k, index=df.index, name=f"STOCH_K_{k_period}"),
            pd.Series(d, index=df.index, name=f"STOCH_D_{d_period}"),
        )
    except Exception as e:
        raise IndicatorCalculationError(
            f"Stochastic calculation failed: {str(e)}"
        ) from e


def _calculate_stochastic_manual(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    k_period: int,
    d_period: int,
    smooth_k: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute the Stochastic Oscillator with plain NumPy.

    Args:
        high: High prices array
        low: Low prices array
        close: Close prices array
        k_period: Look-back length for the raw %K
        d_period: SMA length used to derive %D from %K
        smooth_k: SMA length applied to the raw %K (values <= 1 skip smoothing)

    Returns:
        Tuple of (%K, %D) arrays, NaN wherever a full window is not available
    """
    total = len(close)
    raw_k = np.full(total, np.nan)

    # Raw %K: where the close sits inside the look-back high/low range.
    for end in range(k_period - 1, total):
        lookback = slice(end - k_period + 1, end + 1)
        top = np.max(high[lookback])
        bottom = np.min(low[lookback])

        if top != bottom:
            raw_k[end] = ((close[end] - bottom) / (top - bottom)) * 100.0
        else:
            # Flat range: report the midpoint rather than dividing by zero.
            raw_k[end] = 50.0

    # Optional smoothing of %K, then %D as an SMA of the resulting %K.
    smoothed_k = _simple_moving_average(raw_k, smooth_k) if smooth_k > 1 else raw_k
    signal_d = _simple_moving_average(smoothed_k, d_period)

    return smoothed_k, signal_d


def _simple_moving_average(values: np.ndarray, period: int) -> np.ndarray:
    """
    Compute a trailing Simple Moving Average.

    Args:
        values: Input array
        period: Number of trailing elements averaged for each output element

    Returns:
        Array of the same length as ``values``; an element is NaN when fewer
        than ``period`` inputs precede it or when its window contains a NaN
    """
    size = len(values)
    averaged = np.full(size, np.nan)

    # ``stop`` is the exclusive end of each window; the result is stored at
    # the window's last element.
    for stop in range(period, size + 1):
        chunk = values[stop - period : stop]
        if np.isnan(chunk).any():
            continue
        averaged[stop - 1] = np.mean(chunk)

    return averaged


def interpret_stochastic(
    k_value: float,
    d_value: float,
    prev_k: Optional[float] = None,
    prev_d: Optional[float] = None,
    overbought: float = 80,
    oversold: float = 20,
) -> str:
    """
    Turn Stochastic Oscillator readings into a human-readable verdict.

    A %K/%D crossover (detectable only when both previous values are given
    and are not NaN) takes precedence over the plain zone classification.

    Args:
        k_value: Current %K value
        d_value: Current %D value
        prev_k: Previous %K value for crossover detection
        prev_d: Previous %D value for crossover detection
        overbought: Overbought threshold (default: 80)
        oversold: Oversold threshold (default: 20)

    Returns:
        Interpretation string
    """
    if np.isnan(k_value) or np.isnan(d_value):
        return "Insufficient data"

    previous_known = (
        prev_k is not None
        and prev_d is not None
        and not np.isnan(prev_k)
        and not np.isnan(prev_d)
    )

    if previous_known:
        # %K moved from below %D to above it.
        if prev_k < prev_d and k_value > d_value:
            if k_value < oversold:
                return f"Strong bullish signal (%K crossed above %D in oversold zone: {k_value:.2f})"
            return f"Bullish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})"

        # %K moved from above %D to below it.
        if prev_k > prev_d and k_value < d_value:
            if k_value > overbought:
                return f"Strong bearish signal (%K crossed below %D in overbought zone: {k_value:.2f})"
            return f"Bearish crossover (%K: {k_value:.2f}, %D: {d_value:.2f})"

    # No crossover: classify by the zone %K currently sits in.
    if k_value > overbought:
        return f"Overbought (%K: {k_value:.2f}, %D: {d_value:.2f})"
    if k_value < oversold:
        return f"Oversold (%K: {k_value:.2f}, %D: {d_value:.2f})"

    trend = "bearish"
    if k_value > d_value:
        trend = "bullish"
    return f"Neutral ({trend}, %K: {k_value:.2f}, %D: {d_value:.2f})"


def find_stochastic_crossovers(
    k_series: pd.Series,
    d_series: pd.Series,
) -> dict:
    """
    Locate the bars where %K crosses %D.

    Args:
        k_series: %K series
        d_series: %D series

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer positions
        (0-based offsets into the series) at which a crossover completed
    """
    k_vals = k_series.values
    d_vals = d_series.values
    crossings = {"bullish": [], "bearish": []}

    for pos in range(1, len(k_vals)):
        # A crossover needs all four readings (current and previous bar).
        if (
            np.isnan(k_vals[pos])
            or np.isnan(d_vals[pos])
            or np.isnan(k_vals[pos - 1])
            or np.isnan(d_vals[pos - 1])
        ):
            continue

        k_before, k_now = k_vals[pos - 1], k_vals[pos]
        d_before, d_now = d_vals[pos - 1], d_vals[pos]

        if k_before < d_before and k_now > d_now:
            # %K moved from below %D to above it.
            crossings["bullish"].append(pos)
        elif k_before > d_before and k_now < d_now:
            # %K moved from above %D to below it.
            crossings["bearish"].append(pos)

    return crossings


def find_stochastic_divergence(
    df: pd.DataFrame,
    k_series: pd.Series,
    window: int = 14,
) -> dict:
    """
    Detect bullish and bearish Stochastic divergences.

    A bar is a local low (high) when its close equals the minimum (maximum)
    close in the half-open range ``[bar - window, bar + window)``, truncated
    at the start of the data. For each local extreme at position ``i`` the
    earliest local extreme of the same kind in ``[i - 2 * window, i - window)``
    is used as the comparison point:

    - Bullish divergence: price makes a lower low while %K makes a higher low
    - Bearish divergence: price makes a higher high while %K makes a lower high

    Args:
        df: OHLC DataFrame (only the "close" column is read)
        k_series: %K series, positionally aligned with ``df``
        window: Window for finding local extrema (must be >= 1)

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer positions
        (0-based row offsets into ``df``) where a divergence was found

    Raises:
        ValueError: If ``window`` is less than 1
    """
    if window < 1:
        raise ValueError(f"window must be a positive integer, got {window}")

    prices = df["close"].values
    k = k_series.values

    def is_local_extreme(idx, reducer):
        # Clamp the left edge at 0: a negative slice start would wrap around
        # to the end of the array and typically produce an empty slice, on
        # which np.min / np.max raise ValueError.
        start = max(0, idx - window)
        return prices[idx] == reducer(prices[start : idx + window])

    bullish = []
    bearish = []
    # (reducer that defines the extreme, output list, whether it is the bullish case)
    scans = ((np.min, bullish, True), (np.max, bearish, False))

    for i in range(window, len(prices) - window):
        for reducer, hits, is_bullish in scans:
            if not is_local_extreme(i, reducer):
                continue

            # Search the earlier look-back band for a matching extreme.
            for j in range(max(0, i - 2 * window), i - window):
                if not is_local_extreme(j, reducer):
                    continue

                if not (np.isnan(k[i]) or np.isnan(k[j])):
                    if is_bullish:
                        diverges = prices[i] < prices[j] and k[i] > k[j]
                    else:
                        diverges = prices[i] > prices[j] and k[i] < k[j]
                    if diverges:
                        hits.append(i)
                # Only the first earlier extreme is compared.
                break

    return {
        "bullish": bullish,
        "bearish": bearish,
    }