Deploy Bot
Deploy Trading Analysis Platform to HuggingFace Spaces
a1bf219
"""
MACD (Moving Average Convergence Divergence) calculation.
Provides MACD calculation using TA-Lib as primary library with pandas-ta fallback.
MACD shows the relationship between two moving averages of prices.
"""
from typing import Optional, Tuple
import numpy as np
import pandas as pd
class IndicatorCalculationError(Exception):
"""Raised when indicator calculation fails."""
pass
def calculate_macd(
df: pd.DataFrame,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9,
column: str = "close",
use_talib: bool = True,
) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""
Calculate MACD (Moving Average Convergence Divergence).
MACD consists of:
- MACD Line: (12-day EMA - 26-day EMA)
- Signal Line: 9-day EMA of MACD Line
- Histogram: MACD Line - Signal Line
Args:
df: DataFrame with OHLC data
fast_period: Fast EMA period (default: 12)
slow_period: Slow EMA period (default: 26)
signal_period: Signal line EMA period (default: 9)
column: Column to calculate MACD on (default: Close)
use_talib: Whether to try TA-Lib first (default: True)
Returns:
Tuple of (macd_line, signal_line, histogram) as Series
Raises:
IndicatorCalculationError: If calculation fails
"""
if column not in df.columns:
raise IndicatorCalculationError(f"Column '{column}' not found in DataFrame")
min_bars = slow_period + signal_period
if len(df) < min_bars:
raise IndicatorCalculationError(
f"Insufficient data for MACD calculation (need {min_bars} bars, got {len(df)})"
)
prices = df[column].values
# Try TA-Lib first if requested
if use_talib:
try:
import talib
macd, signal, hist = talib.MACD(
prices,
fastperiod=fast_period,
slowperiod=slow_period,
signalperiod=signal_period,
)
return (
pd.Series(macd, index=df.index, name="MACD"),
pd.Series(signal, index=df.index, name="MACD_Signal"),
pd.Series(hist, index=df.index, name="MACD_Hist"),
)
except ImportError:
pass # Fall back to pandas-ta
except Exception as e:
pass # TA-Lib error, fall back
# Fall back to pandas-ta
try:
import pandas_ta as ta
macd_df = ta.macd(
df[column],
fast=fast_period,
slow=slow_period,
signal=signal_period,
)
if macd_df is None or macd_df.empty:
raise IndicatorCalculationError(
"pandas_ta.macd returned None or empty DataFrame"
)
# pandas_ta returns DataFrame with columns: MACD_{fast}_{slow}_{signal}, etc.
macd_col = f"MACD_{fast_period}_{slow_period}_{signal_period}"
signal_col = f"MACDs_{fast_period}_{slow_period}_{signal_period}"
hist_col = f"MACDh_{fast_period}_{slow_period}_{signal_period}"
return (
macd_df[macd_col].rename("MACD"),
macd_df[signal_col].rename("MACD_Signal"),
macd_df[hist_col].rename("MACD_Hist"),
)
except ImportError as e:
raise IndicatorCalculationError(
"Neither TA-Lib nor pandas-ta is available. Install one: "
"pip install TA-Lib or pip install pandas-ta"
) from e
except Exception as e:
pass # Try manual calculation
# Manual MACD calculation as last resort
try:
macd, signal, hist = _calculate_macd_manual(
prices, fast_period, slow_period, signal_period
)
return (
pd.Series(macd, index=df.index, name="MACD"),
pd.Series(signal, index=df.index, name="MACD_Signal"),
pd.Series(hist, index=df.index, name="MACD_Hist"),
)
except Exception as e:
raise IndicatorCalculationError(f"MACD calculation failed: {str(e)}") from e
def _calculate_ema(prices: np.ndarray, period: int) -> np.ndarray:
"""
Calculate Exponential Moving Average.
Args:
prices: Price array
period: EMA period
Returns:
EMA values array
"""
ema = np.full(len(prices), np.nan)
multiplier = 2.0 / (period + 1)
# Initialize with SMA
ema[period - 1] = np.mean(prices[:period])
# Calculate EMA
for i in range(period, len(prices)):
ema[i] = (prices[i] - ema[i - 1]) * multiplier + ema[i - 1]
return ema
def _calculate_macd_manual(
prices: np.ndarray,
fast_period: int,
slow_period: int,
signal_period: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Manual MACD calculation.
Args:
prices: Price array
fast_period: Fast EMA period
slow_period: Slow EMA period
signal_period: Signal line period
Returns:
Tuple of (macd, signal, histogram) arrays
"""
# Calculate fast and slow EMAs
fast_ema = _calculate_ema(prices, fast_period)
slow_ema = _calculate_ema(prices, slow_period)
# MACD Line = Fast EMA - Slow EMA
macd = fast_ema - slow_ema
# Signal Line = EMA of MACD Line
# Need to handle NaN values for signal calculation
valid_idx = ~np.isnan(macd)
signal = np.full(len(prices), np.nan)
if valid_idx.sum() >= signal_period:
macd_valid = macd[valid_idx]
signal_valid = _calculate_ema(macd_valid, signal_period)
signal[valid_idx] = signal_valid
# Histogram = MACD - Signal
histogram = macd - signal
return macd, signal, histogram
def interpret_macd(
macd_value: float,
signal_value: float,
hist_value: float,
prev_hist_value: Optional[float] = None,
) -> str:
"""
Interpret MACD values.
Args:
macd_value: Current MACD line value
signal_value: Current signal line value
hist_value: Current histogram value
prev_hist_value: Previous histogram value for crossover detection
Returns:
Interpretation string
"""
if np.isnan(macd_value) or np.isnan(signal_value):
return "Insufficient data"
# Detect crossovers
if prev_hist_value is not None and not np.isnan(prev_hist_value):
if prev_hist_value < 0 and hist_value > 0:
return "Bullish crossover (MACD crossed above signal)"
elif prev_hist_value > 0 and hist_value < 0:
return "Bearish crossover (MACD crossed below signal)"
# General interpretation
if macd_value > signal_value:
if hist_value > 0:
return f"Bullish (MACD above signal, histogram: {hist_value:.4f})"
else:
return f"Weakening bullish momentum (histogram: {hist_value:.4f})"
else:
if hist_value < 0:
return f"Bearish (MACD below signal, histogram: {hist_value:.4f})"
else:
return f"Weakening bearish momentum (histogram: {hist_value:.4f})"
def find_macd_crossovers(
macd_series: pd.Series,
signal_series: pd.Series,
) -> dict:
"""
Find bullish and bearish MACD crossovers.
Args:
macd_series: MACD line series
signal_series: Signal line series
Returns:
Dict with 'bullish' and 'bearish' crossover indices
"""
bullish = []
bearish = []
hist = (macd_series - signal_series).values
for i in range(1, len(hist)):
if not np.isnan(hist[i]) and not np.isnan(hist[i - 1]):
# Bullish crossover: histogram crosses from negative to positive
if hist[i - 1] < 0 and hist[i] > 0:
bullish.append(i)
# Bearish crossover: histogram crosses from positive to negative
elif hist[i - 1] > 0 and hist[i] < 0:
bearish.append(i)
return {
"bullish": bullish,
"bearish": bearish,
}
def find_macd_divergence(
df: pd.DataFrame,
macd_series: pd.Series,
window: int = 14,
) -> dict:
"""
Detect bullish and bearish MACD divergences.
Args:
df: OHLC DataFrame
macd_series: MACD line series
window: Window for finding local extrema
Returns:
Dict with 'bullish' and 'bearish' divergence indices
"""
bullish = []
bearish = []
prices = df["close"].values
macd = macd_series.values
for i in range(window, len(df) - window):
# Bullish divergence
if prices[i] == np.min(prices[i - window : i + window]):
for j in range(i - 2 * window, i - window):
if j >= 0 and prices[j] == np.min(prices[j - window : j + window]):
if not np.isnan(macd[i]) and not np.isnan(macd[j]):
if prices[i] < prices[j] and macd[i] > macd[j]:
bullish.append(i)
break
# Bearish divergence
if prices[i] == np.max(prices[i - window : i + window]):
for j in range(i - 2 * window, i - window):
if j >= 0 and prices[j] == np.max(prices[j - window : j + window]):
if not np.isnan(macd[i]) and not np.isnan(macd[j]):
if prices[i] > prices[j] and macd[i] < macd[j]:
bearish.append(i)
break
return {
"bullish": bullish,
"bearish": bearish,
}