Spaces:
Sleeping
Sleeping
| """ | |
| MACD (Moving Average Convergence Divergence) calculation. | |
| Provides MACD calculation using TA-Lib as primary library with pandas-ta fallback. | |
| MACD shows the relationship between two moving averages of prices. | |
| """ | |
| from typing import Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| class IndicatorCalculationError(Exception): | |
| """Raised when indicator calculation fails.""" | |
| pass | |
| def calculate_macd( | |
| df: pd.DataFrame, | |
| fast_period: int = 12, | |
| slow_period: int = 26, | |
| signal_period: int = 9, | |
| column: str = "close", | |
| use_talib: bool = True, | |
| ) -> Tuple[pd.Series, pd.Series, pd.Series]: | |
| """ | |
| Calculate MACD (Moving Average Convergence Divergence). | |
| MACD consists of: | |
| - MACD Line: (12-day EMA - 26-day EMA) | |
| - Signal Line: 9-day EMA of MACD Line | |
| - Histogram: MACD Line - Signal Line | |
| Args: | |
| df: DataFrame with OHLC data | |
| fast_period: Fast EMA period (default: 12) | |
| slow_period: Slow EMA period (default: 26) | |
| signal_period: Signal line EMA period (default: 9) | |
| column: Column to calculate MACD on (default: Close) | |
| use_talib: Whether to try TA-Lib first (default: True) | |
| Returns: | |
| Tuple of (macd_line, signal_line, histogram) as Series | |
| Raises: | |
| IndicatorCalculationError: If calculation fails | |
| """ | |
| if column not in df.columns: | |
| raise IndicatorCalculationError(f"Column '{column}' not found in DataFrame") | |
| min_bars = slow_period + signal_period | |
| if len(df) < min_bars: | |
| raise IndicatorCalculationError( | |
| f"Insufficient data for MACD calculation (need {min_bars} bars, got {len(df)})" | |
| ) | |
| prices = df[column].values | |
| # Try TA-Lib first if requested | |
| if use_talib: | |
| try: | |
| import talib | |
| macd, signal, hist = talib.MACD( | |
| prices, | |
| fastperiod=fast_period, | |
| slowperiod=slow_period, | |
| signalperiod=signal_period, | |
| ) | |
| return ( | |
| pd.Series(macd, index=df.index, name="MACD"), | |
| pd.Series(signal, index=df.index, name="MACD_Signal"), | |
| pd.Series(hist, index=df.index, name="MACD_Hist"), | |
| ) | |
| except ImportError: | |
| pass # Fall back to pandas-ta | |
| except Exception as e: | |
| pass # TA-Lib error, fall back | |
| # Fall back to pandas-ta | |
| try: | |
| import pandas_ta as ta | |
| macd_df = ta.macd( | |
| df[column], | |
| fast=fast_period, | |
| slow=slow_period, | |
| signal=signal_period, | |
| ) | |
| if macd_df is None or macd_df.empty: | |
| raise IndicatorCalculationError( | |
| "pandas_ta.macd returned None or empty DataFrame" | |
| ) | |
| # pandas_ta returns DataFrame with columns: MACD_{fast}_{slow}_{signal}, etc. | |
| macd_col = f"MACD_{fast_period}_{slow_period}_{signal_period}" | |
| signal_col = f"MACDs_{fast_period}_{slow_period}_{signal_period}" | |
| hist_col = f"MACDh_{fast_period}_{slow_period}_{signal_period}" | |
| return ( | |
| macd_df[macd_col].rename("MACD"), | |
| macd_df[signal_col].rename("MACD_Signal"), | |
| macd_df[hist_col].rename("MACD_Hist"), | |
| ) | |
| except ImportError as e: | |
| raise IndicatorCalculationError( | |
| "Neither TA-Lib nor pandas-ta is available. Install one: " | |
| "pip install TA-Lib or pip install pandas-ta" | |
| ) from e | |
| except Exception as e: | |
| pass # Try manual calculation | |
| # Manual MACD calculation as last resort | |
| try: | |
| macd, signal, hist = _calculate_macd_manual( | |
| prices, fast_period, slow_period, signal_period | |
| ) | |
| return ( | |
| pd.Series(macd, index=df.index, name="MACD"), | |
| pd.Series(signal, index=df.index, name="MACD_Signal"), | |
| pd.Series(hist, index=df.index, name="MACD_Hist"), | |
| ) | |
| except Exception as e: | |
| raise IndicatorCalculationError(f"MACD calculation failed: {str(e)}") from e | |
| def _calculate_ema(prices: np.ndarray, period: int) -> np.ndarray: | |
| """ | |
| Calculate Exponential Moving Average. | |
| Args: | |
| prices: Price array | |
| period: EMA period | |
| Returns: | |
| EMA values array | |
| """ | |
| ema = np.full(len(prices), np.nan) | |
| multiplier = 2.0 / (period + 1) | |
| # Initialize with SMA | |
| ema[period - 1] = np.mean(prices[:period]) | |
| # Calculate EMA | |
| for i in range(period, len(prices)): | |
| ema[i] = (prices[i] - ema[i - 1]) * multiplier + ema[i - 1] | |
| return ema | |
| def _calculate_macd_manual( | |
| prices: np.ndarray, | |
| fast_period: int, | |
| slow_period: int, | |
| signal_period: int, | |
| ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: | |
| """ | |
| Manual MACD calculation. | |
| Args: | |
| prices: Price array | |
| fast_period: Fast EMA period | |
| slow_period: Slow EMA period | |
| signal_period: Signal line period | |
| Returns: | |
| Tuple of (macd, signal, histogram) arrays | |
| """ | |
| # Calculate fast and slow EMAs | |
| fast_ema = _calculate_ema(prices, fast_period) | |
| slow_ema = _calculate_ema(prices, slow_period) | |
| # MACD Line = Fast EMA - Slow EMA | |
| macd = fast_ema - slow_ema | |
| # Signal Line = EMA of MACD Line | |
| # Need to handle NaN values for signal calculation | |
| valid_idx = ~np.isnan(macd) | |
| signal = np.full(len(prices), np.nan) | |
| if valid_idx.sum() >= signal_period: | |
| macd_valid = macd[valid_idx] | |
| signal_valid = _calculate_ema(macd_valid, signal_period) | |
| signal[valid_idx] = signal_valid | |
| # Histogram = MACD - Signal | |
| histogram = macd - signal | |
| return macd, signal, histogram | |
| def interpret_macd( | |
| macd_value: float, | |
| signal_value: float, | |
| hist_value: float, | |
| prev_hist_value: Optional[float] = None, | |
| ) -> str: | |
| """ | |
| Interpret MACD values. | |
| Args: | |
| macd_value: Current MACD line value | |
| signal_value: Current signal line value | |
| hist_value: Current histogram value | |
| prev_hist_value: Previous histogram value for crossover detection | |
| Returns: | |
| Interpretation string | |
| """ | |
| if np.isnan(macd_value) or np.isnan(signal_value): | |
| return "Insufficient data" | |
| # Detect crossovers | |
| if prev_hist_value is not None and not np.isnan(prev_hist_value): | |
| if prev_hist_value < 0 and hist_value > 0: | |
| return "Bullish crossover (MACD crossed above signal)" | |
| elif prev_hist_value > 0 and hist_value < 0: | |
| return "Bearish crossover (MACD crossed below signal)" | |
| # General interpretation | |
| if macd_value > signal_value: | |
| if hist_value > 0: | |
| return f"Bullish (MACD above signal, histogram: {hist_value:.4f})" | |
| else: | |
| return f"Weakening bullish momentum (histogram: {hist_value:.4f})" | |
| else: | |
| if hist_value < 0: | |
| return f"Bearish (MACD below signal, histogram: {hist_value:.4f})" | |
| else: | |
| return f"Weakening bearish momentum (histogram: {hist_value:.4f})" | |
| def find_macd_crossovers( | |
| macd_series: pd.Series, | |
| signal_series: pd.Series, | |
| ) -> dict: | |
| """ | |
| Find bullish and bearish MACD crossovers. | |
| Args: | |
| macd_series: MACD line series | |
| signal_series: Signal line series | |
| Returns: | |
| Dict with 'bullish' and 'bearish' crossover indices | |
| """ | |
| bullish = [] | |
| bearish = [] | |
| hist = (macd_series - signal_series).values | |
| for i in range(1, len(hist)): | |
| if not np.isnan(hist[i]) and not np.isnan(hist[i - 1]): | |
| # Bullish crossover: histogram crosses from negative to positive | |
| if hist[i - 1] < 0 and hist[i] > 0: | |
| bullish.append(i) | |
| # Bearish crossover: histogram crosses from positive to negative | |
| elif hist[i - 1] > 0 and hist[i] < 0: | |
| bearish.append(i) | |
| return { | |
| "bullish": bullish, | |
| "bearish": bearish, | |
| } | |
| def find_macd_divergence( | |
| df: pd.DataFrame, | |
| macd_series: pd.Series, | |
| window: int = 14, | |
| ) -> dict: | |
| """ | |
| Detect bullish and bearish MACD divergences. | |
| Args: | |
| df: OHLC DataFrame | |
| macd_series: MACD line series | |
| window: Window for finding local extrema | |
| Returns: | |
| Dict with 'bullish' and 'bearish' divergence indices | |
| """ | |
| bullish = [] | |
| bearish = [] | |
| prices = df["close"].values | |
| macd = macd_series.values | |
| for i in range(window, len(df) - window): | |
| # Bullish divergence | |
| if prices[i] == np.min(prices[i - window : i + window]): | |
| for j in range(i - 2 * window, i - window): | |
| if j >= 0 and prices[j] == np.min(prices[j - window : j + window]): | |
| if not np.isnan(macd[i]) and not np.isnan(macd[j]): | |
| if prices[i] < prices[j] and macd[i] > macd[j]: | |
| bullish.append(i) | |
| break | |
| # Bearish divergence | |
| if prices[i] == np.max(prices[i - window : i + window]): | |
| for j in range(i - 2 * window, i - window): | |
| if j >= 0 and prices[j] == np.max(prices[j - window : j + window]): | |
| if not np.isnan(macd[i]) and not np.isnan(macd[j]): | |
| if prices[i] > prices[j] and macd[i] < macd[j]: | |
| bearish.append(i) | |
| break | |
| return { | |
| "bullish": bullish, | |
| "bearish": bearish, | |
| } | |