File size: 9,412 Bytes
a1bf219
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
"""
MACD (Moving Average Convergence Divergence) calculation.

Provides MACD calculation using TA-Lib as the primary library, pandas-ta as the
fallback, and a manual NumPy implementation as a last resort if the pandas-ta
call fails.
MACD shows the relationship between two moving averages of prices.
"""

from typing import Optional, Tuple

import numpy as np
import pandas as pd


class IndicatorCalculationError(Exception):
    """Error signalling that an indicator could not be calculated.

    Adds nothing to ``Exception``; it exists so callers can catch indicator
    failures specifically.
    """


def calculate_macd(
    df: pd.DataFrame,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9,
    column: str = "close",
    use_talib: bool = True,
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Calculate MACD (Moving Average Convergence Divergence).

    MACD consists of:
    - MACD Line: fast EMA - slow EMA (12 and 26 bars by default)
    - Signal Line: EMA of the MACD Line (9 bars by default)
    - Histogram: MACD Line - Signal Line

    Backends are tried in order: TA-Lib (only when ``use_talib`` is true),
    then pandas-ta, then the manual NumPy implementation in this module.
    A backend that is installed but raises at runtime is logged as a warning
    and the next backend is tried. If pandas-ta cannot be imported at all the
    function raises instead of reaching the manual implementation.

    Args:
        df: DataFrame with OHLC data
        fast_period: Fast EMA period (default: 12)
        slow_period: Slow EMA period (default: 26)
        signal_period: Signal line EMA period (default: 9)
        column: Column to calculate MACD on (default: "close")
        use_talib: Whether to try TA-Lib first (default: True)

    Returns:
        Tuple of (macd_line, signal_line, histogram) as Series named
        "MACD", "MACD_Signal" and "MACD_Hist"

    Raises:
        IndicatorCalculationError: If the column is missing or not numeric,
            a period is not positive, there are fewer than
            ``slow_period + signal_period`` rows, pandas-ta is needed but not
            importable, or every backend fails
    """
    # Local import keeps this block self-contained; the module's other
    # optional dependencies are imported locally as well.
    import logging

    logger = logging.getLogger(__name__)

    if column not in df.columns:
        raise IndicatorCalculationError(f"Column '{column}' not found in DataFrame")

    # Reject non-positive periods up front: the libraries reject or silently
    # replace them, and the manual path would otherwise return garbage.
    for label, period in (
        ("fast_period", fast_period),
        ("slow_period", slow_period),
        ("signal_period", signal_period),
    ):
        try:
            invalid = period < 1
        except TypeError:
            invalid = True
        if invalid:
            raise IndicatorCalculationError(
                f"{label} must be a positive integer, got {period!r}"
            )

    min_bars = slow_period + signal_period
    if len(df) < min_bars:
        raise IndicatorCalculationError(
            f"Insufficient data for MACD calculation (need {min_bars} bars, got {len(df)})"
        )

    # TA-Lib only accepts float64 ("double") arrays; without this coercion an
    # integer price column makes TA-Lib raise and the primary backend is
    # never actually used.
    try:
        prices = np.asarray(df[column].values, dtype=np.float64)
    except (TypeError, ValueError) as exc:
        raise IndicatorCalculationError(
            f"Column '{column}' could not be converted to float: {exc}"
        ) from exc

    # Try TA-Lib first if requested
    if use_talib:
        try:
            import talib

            macd, signal, hist = talib.MACD(
                prices,
                fastperiod=fast_period,
                slowperiod=slow_period,
                signalperiod=signal_period,
            )
            return (
                pd.Series(macd, index=df.index, name="MACD"),
                pd.Series(signal, index=df.index, name="MACD_Signal"),
                pd.Series(hist, index=df.index, name="MACD_Hist"),
            )
        except ImportError:
            pass  # TA-Lib not installed: expected, fall back to pandas-ta
        except Exception as exc:
            logger.warning("TA-Lib MACD failed, falling back to pandas-ta: %s", exc)

    # Fall back to pandas-ta
    try:
        import pandas_ta as ta

        macd_df = ta.macd(
            df[column],
            fast=fast_period,
            slow=slow_period,
            signal=signal_period,
        )
        if macd_df is not None and not macd_df.empty:
            # pandas_ta returns DataFrame with columns: MACD_{fast}_{slow}_{signal}, etc.
            suffix = f"{fast_period}_{slow_period}_{signal_period}"
            return (
                macd_df[f"MACD_{suffix}"].rename("MACD"),
                macd_df[f"MACDs_{suffix}"].rename("MACD_Signal"),
                macd_df[f"MACDh_{suffix}"].rename("MACD_Hist"),
            )
        logger.warning(
            "pandas_ta.macd returned None or empty DataFrame, "
            "falling back to manual calculation"
        )
    except ImportError as e:
        raise IndicatorCalculationError(
            "Neither TA-Lib nor pandas-ta is available. Install one: "
            "pip install TA-Lib or pip install pandas-ta"
        ) from e
    except Exception as exc:
        logger.warning(
            "pandas-ta MACD failed, falling back to manual calculation: %s", exc
        )

    # Manual MACD calculation as last resort
    try:
        macd, signal, hist = _calculate_macd_manual(
            prices, fast_period, slow_period, signal_period
        )
        return (
            pd.Series(macd, index=df.index, name="MACD"),
            pd.Series(signal, index=df.index, name="MACD_Signal"),
            pd.Series(hist, index=df.index, name="MACD_Hist"),
        )
    except Exception as e:
        raise IndicatorCalculationError(f"MACD calculation failed: {str(e)}") from e


def _calculate_ema(prices: np.ndarray, period: int) -> np.ndarray:
    """
    Calculate Exponential Moving Average.

    The first ``period - 1`` entries are NaN, entry ``period - 1`` is seeded
    with the simple mean of the first ``period`` prices, and every later
    entry follows ``ema[i] = (price[i] - ema[i - 1]) * k + ema[i - 1]`` with
    ``k = 2 / (period + 1)``.

    Args:
        prices: One-dimensional price array
        period: EMA period (must be >= 1)

    Returns:
        EMA values array with the same length as ``prices``; all NaN when
        fewer than ``period`` prices are supplied

    Raises:
        ValueError: If ``period`` is less than 1
    """
    if period < 1:
        raise ValueError(f"EMA period must be >= 1, got {period}")

    values = np.asarray(prices, dtype=float)
    count = len(values)
    ema = np.full(count, np.nan)

    # Not enough data to seed the average: report "no value" everywhere
    # instead of failing with an IndexError on the seed assignment.
    if count < period:
        return ema

    multiplier = 2.0 / (period + 1)

    # Initialize with SMA
    ema[period - 1] = np.mean(values[:period])

    # Each value depends on the previous one, so this stays a sequential loop
    for i in range(period, count):
        ema[i] = (values[i] - ema[i - 1]) * multiplier + ema[i - 1]

    return ema


def _calculate_macd_manual(
    prices: np.ndarray,
    fast_period: int,
    slow_period: int,
    signal_period: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Compute MACD, signal line and histogram with plain NumPy.

    Args:
        prices: Price array
        fast_period: Period of the fast EMA
        slow_period: Period of the slow EMA
        signal_period: Period of the EMA applied to the MACD line

    Returns:
        Tuple of (macd, signal, histogram) arrays, each as long as ``prices``
    """
    # MACD line: fast EMA minus slow EMA (NaN wherever either EMA is NaN)
    macd_line = _calculate_ema(prices, fast_period) - _calculate_ema(
        prices, slow_period
    )

    # The signal EMA is computed over the non-NaN MACD values only and then
    # written back to the positions those values came from.
    signal_line = np.full(len(prices), np.nan)
    has_value = ~np.isnan(macd_line)
    if has_value.sum() >= signal_period:
        signal_line[has_value] = _calculate_ema(macd_line[has_value], signal_period)

    # Histogram: distance between the MACD line and its signal line
    return macd_line, signal_line, macd_line - signal_line


def interpret_macd(
    macd_value: float,
    signal_value: float,
    hist_value: float,
    prev_hist_value: Optional[float] = None,
) -> str:
    """
    Turn a MACD reading into a short human-readable description.

    Checks are made in this order: missing data, a histogram sign change
    against the previous bar (crossover), then the position of the MACD line
    relative to the signal line.

    Args:
        macd_value: Current MACD line value
        signal_value: Current signal line value
        hist_value: Current histogram value
        prev_hist_value: Previous histogram value; when it is None or NaN
            no crossover check is made

    Returns:
        Interpretation string
    """
    if any(np.isnan(level) for level in (macd_value, signal_value)):
        return "Insufficient data"

    # A crossover is a strict sign change of the histogram between two bars
    can_compare = prev_hist_value is not None and not np.isnan(prev_hist_value)
    if can_compare:
        if prev_hist_value < 0 < hist_value:
            return "Bullish crossover (MACD crossed above signal)"
        if prev_hist_value > 0 > hist_value:
            return "Bearish crossover (MACD crossed below signal)"

    # No crossover: describe where the MACD line sits relative to the signal
    if macd_value > signal_value:
        if hist_value > 0:
            return f"Bullish (MACD above signal, histogram: {hist_value:.4f})"
        return f"Weakening bullish momentum (histogram: {hist_value:.4f})"

    if hist_value < 0:
        return f"Bearish (MACD below signal, histogram: {hist_value:.4f})"
    return f"Weakening bearish momentum (histogram: {hist_value:.4f})"


def find_macd_crossovers(
    macd_series: pd.Series,
    signal_series: pd.Series,
) -> dict:
    """
    Locate the bars where the MACD line crosses its signal line.

    A crossover is a strict sign change of ``macd - signal`` between two
    consecutive bars; bars where either difference is NaN are ignored.

    Args:
        macd_series: MACD line series
        signal_series: Signal line series

    Returns:
        Dict with 'bullish' and 'bearish' lists holding the integer positions
        (not index labels) of the bar that completes each crossover
    """
    spread = (macd_series - signal_series).values
    before, after = spread[:-1], spread[1:]

    # Comparisons against NaN are always False, so gaps never count as a cross
    crossed_up = (before < 0) & (after > 0)
    crossed_down = (before > 0) & (after < 0)

    # +1 converts the pair offset into the position of the later bar
    return {
        "bullish": (np.flatnonzero(crossed_up) + 1).tolist(),
        "bearish": (np.flatnonzero(crossed_down) + 1).tolist(),
    }


def find_macd_divergence(
    df: pd.DataFrame,
    macd_series: pd.Series,
    window: int = 14,
    column: str = "close",
) -> dict:
    """
    Detect bullish and bearish MACD divergences.

    A bar ``k`` is a pivot low (high) when its price equals the minimum
    (maximum) of ``prices[k - window : k + window]``. For every pivot at
    position ``i`` the earliest pivot of the same kind in
    ``[i - 2 * window, i - window)`` is looked up and compared:

    - bullish: price makes a lower low while MACD makes a higher low
    - bearish: price makes a higher high while MACD makes a lower high

    Only bars with a full ``window`` of history on the left qualify as
    pivots, for the current bar and for the earlier pivot alike.

    Args:
        df: OHLC DataFrame
        macd_series: MACD line series, aligned with ``df`` by position
        window: Window for finding local extrema (must be >= 1)
        column: Price column to analyse (default: "close")

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer bar positions
        (not index labels) at which a divergence completes

    Raises:
        ValueError: If ``window`` is less than 1
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    prices = df[column].values
    macd = macd_series.values
    total = len(prices)

    bullish = []
    bearish = []

    # Classify every eligible bar once instead of re-scanning its window for
    # each later bar that looks back at it.
    is_low = np.zeros(total, dtype=bool)
    is_high = np.zeros(total, dtype=bool)
    for k in range(window, total - window):
        neighbourhood = prices[k - window : k + window]
        is_low[k] = prices[k] == np.min(neighbourhood)
        is_high[k] = prices[k] == np.max(neighbourhood)

    def _earliest_prior_pivot(flags: np.ndarray, current: int) -> Optional[int]:
        """Return the first flagged position in the look-back range, if any."""
        # Clamp to `window`: earlier bars have no full look-back window. The
        # unclamped slice start would go negative and select an empty range.
        start = max(window, current - 2 * window)
        hits = np.flatnonzero(flags[start : current - window])
        return start + int(hits[0]) if hits.size else None

    for i in range(window, total - window):
        # Bullish divergence
        if is_low[i]:
            j = _earliest_prior_pivot(is_low, i)
            if (
                j is not None
                and not np.isnan(macd[i])
                and not np.isnan(macd[j])
                and prices[i] < prices[j]
                and macd[i] > macd[j]
            ):
                bullish.append(i)

        # Bearish divergence
        if is_high[i]:
            j = _earliest_prior_pivot(is_high, i)
            if (
                j is not None
                and not np.isnan(macd[i])
                and not np.isnan(macd[j])
                and prices[i] > prices[j]
                and macd[i] < macd[j]
            ):
                bearish.append(i)

    return {
        "bullish": bullish,
        "bearish": bearish,
    }