"""
MACD (Moving Average Convergence Divergence) calculation.

Provides MACD calculation using TA-Lib as the primary library, pandas-ta as
a fallback, and a pure-NumPy implementation as the last resort.
MACD shows the relationship between two moving averages of prices.
"""

import logging
from typing import Optional, Tuple

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class IndicatorCalculationError(Exception):
    """Raised when indicator calculation fails."""


def _as_macd_series(macd, signal, hist, index) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Wrap raw MACD arrays into the three named Series returned to callers."""
    return (
        pd.Series(macd, index=index, name="MACD"),
        pd.Series(signal, index=index, name="MACD_Signal"),
        pd.Series(hist, index=index, name="MACD_Hist"),
    )


def calculate_macd(
    df: pd.DataFrame,
    fast_period: int = 12,
    slow_period: int = 26,
    signal_period: int = 9,
    column: str = "close",
    use_talib: bool = True,
) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Calculate MACD (Moving Average Convergence Divergence).

    MACD consists of:
    - MACD Line: fast EMA - slow EMA
    - Signal Line: EMA of the MACD Line
    - Histogram: MACD Line - Signal Line

    Backends are tried in order: TA-Lib (when ``use_talib`` is true),
    pandas-ta, then the built-in NumPy implementation. A backend that is not
    installed or that raises is logged and skipped, so the calculation still
    succeeds when neither optional library is available.

    Args:
        df: DataFrame containing the price column
        fast_period: Fast EMA period (default: 12)
        slow_period: Slow EMA period (default: 26)
        signal_period: Signal line EMA period (default: 9)
        column: Column to calculate MACD on (default: "close")
        use_talib: Whether to try TA-Lib first (default: True)

    Returns:
        Tuple of (macd_line, signal_line, histogram) as Series named
        "MACD", "MACD_Signal" and "MACD_Hist"

    Raises:
        IndicatorCalculationError: If the column is missing or non-numeric,
            a period is not positive, there are too few rows, or every
            backend fails
    """
    if column not in df.columns:
        raise IndicatorCalculationError(f"Column '{column}' not found in DataFrame")

    for label, period in (
        ("fast_period", fast_period),
        ("slow_period", slow_period),
        ("signal_period", signal_period),
    ):
        if period < 1:
            raise IndicatorCalculationError(
                f"{label} must be a positive integer, got {period}"
            )

    # The MACD line only becomes valid once the longer of the two EMAs is
    # seeded, so size the requirement on whichever period is larger.
    min_bars = max(fast_period, slow_period) + signal_period
    if len(df) < min_bars:
        raise IndicatorCalculationError(
            f"Insufficient data for MACD calculation (need {min_bars} bars, got {len(df)})"
        )

    # Convert once to float so integer-typed price columns are accepted by
    # every backend.
    try:
        prices = df[column].to_numpy(dtype=float)
    except (TypeError, ValueError) as e:
        raise IndicatorCalculationError(f"MACD calculation failed: {e}") from e

    # Try TA-Lib first if requested
    if use_talib:
        try:
            import talib

            macd, signal, hist = talib.MACD(
                prices,
                fastperiod=fast_period,
                slowperiod=slow_period,
                signalperiod=signal_period,
            )
            return _as_macd_series(macd, signal, hist, df.index)
        except ImportError:
            logger.debug("TA-Lib is not installed; falling back to pandas-ta")
        except Exception as e:  # best-effort backend: log and try the next one
            logger.warning("TA-Lib MACD failed (%s); falling back to pandas-ta", e)

    # Fall back to pandas-ta
    try:
        import pandas_ta as ta

        macd_df = ta.macd(
            df[column],
            fast=fast_period,
            slow=slow_period,
            signal=signal_period,
        )
        if macd_df is None or macd_df.empty:
            raise IndicatorCalculationError(
                "pandas_ta.macd returned None or empty DataFrame"
            )

        # pandas_ta returns DataFrame with columns: MACD_{fast}_{slow}_{signal}, etc.
        suffix = f"{fast_period}_{slow_period}_{signal_period}"
        return (
            macd_df[f"MACD_{suffix}"].rename("MACD"),
            macd_df[f"MACDs_{suffix}"].rename("MACD_Signal"),
            macd_df[f"MACDh_{suffix}"].rename("MACD_Hist"),
        )
    except ImportError:
        # The manual implementation below needs neither library, so a missing
        # optional dependency is not fatal.
        logger.debug("pandas-ta is not installed; falling back to manual MACD")
    except Exception as e:  # best-effort backend: log and try the next one
        logger.warning("pandas-ta MACD failed (%s); falling back to manual MACD", e)

    # Manual MACD calculation as last resort
    try:
        macd, signal, hist = _calculate_macd_manual(
            prices, fast_period, slow_period, signal_period
        )
    except Exception as e:
        raise IndicatorCalculationError(f"MACD calculation failed: {str(e)}") from e
    return _as_macd_series(macd, signal, hist, df.index)


def _calculate_ema(prices: np.ndarray, period: int) -> np.ndarray:
    """
    Calculate Exponential Moving Average.

    The first ``period - 1`` entries are NaN; entry ``period - 1`` is seeded
    with the simple mean of the first ``period`` prices, and later entries
    use the smoothing factor ``2 / (period + 1)``.

    Args:
        prices: Price array
        period: EMA period (must be >= 1)

    Returns:
        EMA values array, same length as ``prices``; all NaN when fewer than
        ``period`` prices are supplied

    Raises:
        ValueError: If ``period`` is less than 1
    """
    if period < 1:
        raise ValueError(f"EMA period must be >= 1, got {period}")

    prices = np.asarray(prices, dtype=float)
    ema = np.full(len(prices), np.nan)
    if len(prices) < period:
        # Not enough data to seed the average; there is nothing to compute.
        return ema

    multiplier = 2.0 / (period + 1)

    # Initialize with SMA
    ema[period - 1] = np.mean(prices[:period])

    # Each value depends on the previous one, so this stays a sequential loop.
    for i in range(period, len(prices)):
        ema[i] = (prices[i] - ema[i - 1]) * multiplier + ema[i - 1]

    return ema


def _calculate_macd_manual(
    prices: np.ndarray,
    fast_period: int,
    slow_period: int,
    signal_period: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Manual MACD calculation.

    Args:
        prices: Price array
        fast_period: Fast EMA period
        slow_period: Slow EMA period
        signal_period: Signal line period

    Returns:
        Tuple of (macd, signal, histogram) arrays, each the same length as
        ``prices`` with NaN where a value is not yet defined
    """
    # MACD Line = Fast EMA - Slow EMA
    fast_ema = _calculate_ema(prices, fast_period)
    slow_ema = _calculate_ema(prices, slow_period)
    macd = fast_ema - slow_ema

    # Signal Line = EMA of MACD Line, computed over the non-NaN MACD values
    # only and written back to their original positions.
    valid_idx = ~np.isnan(macd)
    signal = np.full(len(prices), np.nan)

    if valid_idx.sum() >= signal_period:
        signal[valid_idx] = _calculate_ema(macd[valid_idx], signal_period)

    # Histogram = MACD - Signal
    histogram = macd - signal

    return macd, signal, histogram


def interpret_macd(
    macd_value: float,
    signal_value: float,
    hist_value: float,
    prev_hist_value: Optional[float] = None,
) -> str:
    """
    Interpret MACD values.

    Args:
        macd_value: Current MACD line value
        signal_value: Current signal line value
        hist_value: Current histogram value
        prev_hist_value: Previous histogram value for crossover detection

    Returns:
        Interpretation string; "Insufficient data" when any of the current
        values is NaN
    """
    if np.isnan(macd_value) or np.isnan(signal_value) or np.isnan(hist_value):
        return "Insufficient data"

    # Detect crossovers: the histogram changes sign between two readings.
    if prev_hist_value is not None and not np.isnan(prev_hist_value):
        if prev_hist_value < 0 and hist_value > 0:
            return "Bullish crossover (MACD crossed above signal)"
        if prev_hist_value > 0 and hist_value < 0:
            return "Bearish crossover (MACD crossed below signal)"

    # General interpretation
    if macd_value > signal_value:
        if hist_value > 0:
            return f"Bullish (MACD above signal, histogram: {hist_value:.4f})"
        return f"Weakening bullish momentum (histogram: {hist_value:.4f})"

    if hist_value < 0:
        return f"Bearish (MACD below signal, histogram: {hist_value:.4f})"
    return f"Weakening bearish momentum (histogram: {hist_value:.4f})"


def find_macd_crossovers(
    macd_series: pd.Series,
    signal_series: pd.Series,
) -> dict:
    """
    Find bullish and bearish MACD crossovers.

    A crossover is reported at position ``i`` when the histogram
    (MACD - signal) has opposite strict signs at ``i - 1`` and ``i``.
    Positions where either reading is NaN are never reported.

    Args:
        macd_series: MACD line series
        signal_series: Signal line series

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer positions
    """
    hist = (macd_series - signal_series).to_numpy(dtype=float)
    if hist.size < 2:
        return {"bullish": [], "bearish": []}

    previous, current = hist[:-1], hist[1:]

    # Comparisons against NaN are False, which excludes NaN readings without
    # an explicit mask.
    with np.errstate(invalid="ignore"):
        went_positive = (previous < 0) & (current > 0)
        went_negative = (previous > 0) & (current < 0)

    # +1 because element k of the pairwise arrays describes position k + 1.
    return {
        "bullish": (np.flatnonzero(went_positive) + 1).tolist(),
        "bearish": (np.flatnonzero(went_negative) + 1).tolist(),
    }


def _has_divergence(
    prices: np.ndarray,
    macd: np.ndarray,
    i: int,
    window: int,
    bullish: bool,
) -> bool:
    """Return True when position ``i`` diverges from an earlier local extreme."""
    extreme = np.min if bullish else np.max

    if np.isnan(macd[i]) or prices[i] != extreme(prices[i - window : i + window]):
        return False

    # Compare against extremes found one to two windows earlier.
    for j in range(max(0, i - 2 * window), i - window):
        if np.isnan(macd[j]):
            continue
        # Clamp the start: a negative slice start would wrap around to the
        # end of the array and produce an empty (or wrong) window.
        if prices[j] != extreme(prices[max(0, j - window) : j + window]):
            continue
        if bullish:
            if prices[i] < prices[j] and macd[i] > macd[j]:
                return True
        elif prices[i] > prices[j] and macd[i] < macd[j]:
            return True
    return False


def find_macd_divergence(
    df: pd.DataFrame,
    macd_series: pd.Series,
    window: int = 14,
    column: str = "close",
) -> dict:
    """
    Detect bullish and bearish MACD divergences.

    A bullish divergence is reported at position ``i`` when the price there
    is the minimum of its surrounding window, an earlier local minimum one
    to two windows back had a higher price, and MACD at ``i`` is higher than
    at that earlier minimum. Bearish divergence mirrors this with maxima.

    Args:
        df: OHLC DataFrame
        macd_series: MACD line series, positionally aligned with ``df``
        window: Window for finding local extrema (must be >= 1)
        column: Price column to inspect (default: "close")

    Returns:
        Dict with 'bullish' and 'bearish' lists of integer positions

    Raises:
        ValueError: If ``window`` is less than 1
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    prices = df[column].values
    macd = macd_series.values

    bullish = []
    bearish = []

    for i in range(window, len(df) - window):
        if _has_divergence(prices, macd, i, window, bullish=True):
            bullish.append(i)
        if _has_divergence(prices, macd, i, window, bullish=False):
            bearish.append(i)

    return {
        "bullish": bullish,
        "bearish": bearish,
    }