finance-data-mcp / src /technical_analysis.py
dlrklc's picture
Initial commit: Gradio MCP app for real-time financial data
7169bc5
"""
Technical analysis module for Financial Market Data MCP Server.
Calculates technical indicators: RSI, MACD, Bollinger Bands, Moving Averages.
"""
import pandas as pd
import numpy as np
from typing import Dict
from .data_fetcher import fetch_historical_data
from .validators import validate_ticker, validate_period
from .config import logger
def calculate_sma(prices: pd.Series, window: int) -> pd.Series:
"""Calculate Simple Moving Average."""
return prices.rolling(window=window).mean()
def calculate_ema(prices: pd.Series, window: int) -> pd.Series:
"""Calculate Exponential Moving Average."""
return prices.ewm(span=window, adjust=False).mean()
def calculate_rsi(prices: pd.Series, period: int = 14) -> float:
"""Calculate Relative Strength Index."""
if len(prices) < period + 1:
return 50.0 # Neutral RSI if insufficient data
delta = prices.diff()
gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0)
avg_gain = gains.rolling(window=period).mean()
avg_loss = losses.rolling(window=period).mean()
with np.errstate(divide='ignore', invalid='ignore'):
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
if avg_loss.iloc[-1] == 0 and avg_gain.iloc[-1] > 0:
return 100.0 # All gains, no losses
if avg_gain.iloc[-1] == 0 and avg_loss.iloc[-1] > 0:
return 0.0 # All losses, no gains
last_value = rsi.iloc[-1]
if pd.isna(last_value) or np.isinf(last_value):
return 50.0
return round(float(last_value), 2)
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict:
"""Calculate MACD (Moving Average Convergence Divergence)."""
if len(prices) < slow + signal:
return {
"macd": 0.0,
"signal": 0.0,
"histogram": 0.0,
"trend": "neutral"
}
ema_fast = prices.ewm(span=fast, adjust=False).mean()
ema_slow = prices.ewm(span=slow, adjust=False).mean()
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal, adjust=False).mean()
histogram = macd_line - signal_line
return {
"macd": round(macd_line.iloc[-1], 2),
"signal": round(signal_line.iloc[-1], 2),
"histogram": round(histogram.iloc[-1], 2),
"trend": "bullish" if histogram.iloc[-1] > 0 else "bearish"
}
def calculate_bollinger_bands(prices: pd.Series, window: int = 20, num_std: int = 2) -> Dict:
"""Calculate Bollinger Bands."""
if len(prices) < window:
current_price = prices.iloc[-1] if not prices.empty else 0
return {
"upper": round(current_price, 2),
"middle": round(current_price, 2),
"lower": round(current_price, 2),
"current_price": round(current_price, 2),
"position": "neutral"
}
sma = prices.rolling(window=window).mean()
std = prices.rolling(window=window).std()
upper_band = sma + (std * num_std)
lower_band = sma - (std * num_std)
current_price = prices.iloc[-1]
return {
"upper": round(upper_band.iloc[-1], 2),
"middle": round(sma.iloc[-1], 2),
"lower": round(lower_band.iloc[-1], 2),
"current_price": round(current_price, 2),
"position": "overbought" if current_price > upper_band.iloc[-1]
else "oversold" if current_price < lower_band.iloc[-1]
else "neutral"
}
def get_technical_analysis(ticker: str, period: str = "3mo") -> Dict:
"""
Perform comprehensive technical analysis.
Args:
ticker: Stock ticker symbol
period: Historical data period for analysis
Returns:
Dictionary with multiple technical indicators and signals
"""
from datetime import datetime
# Validate inputs
is_valid_ticker, sanitized_ticker, error = validate_ticker(ticker)
if not is_valid_ticker:
return {"error": error, "error_code": "INVALID_TICKER"}
is_valid_period, sanitized_period, error = validate_period(period)
if not is_valid_period:
return {"error": error, "error_code": "INVALID_PERIOD"}
logger.info(f"Performing technical analysis: {sanitized_ticker}, {sanitized_period}")
try:
hist = fetch_historical_data(sanitized_ticker, sanitized_period, "1d")
if hist.empty or len(hist) < 50:
return {
"error": f"Insufficient data for technical analysis of {sanitized_ticker}",
"error_code": "INSUFFICIENT_DATA"
}
prices = hist['Close']
# Calculate indicators
sma_20 = calculate_sma(prices, 20)
sma_50 = calculate_sma(prices, 50)
sma_200 = calculate_sma(prices, 200) if len(hist) >= 200 else None
ema_12 = calculate_ema(prices, 12)
ema_26 = calculate_ema(prices, 26)
rsi = calculate_rsi(prices)
macd = calculate_macd(prices)
bollinger = calculate_bollinger_bands(prices)
current_price = prices.iloc[-1]
# Generate trading signals
signals = []
if rsi < 30:
signals.append("🟒 RSI oversold - potential BUY signal")
elif rsi > 70:
signals.append("πŸ”΄ RSI overbought - potential SELL signal")
else:
signals.append("🟑 RSI neutral")
if macd['trend'] == 'bullish' and macd['histogram'] > 0:
signals.append("🟒 MACD bullish crossover")
elif macd['trend'] == 'bearish' and macd['histogram'] < 0:
signals.append("πŸ”΄ MACD bearish crossover")
if sma_200 is not None and current_price > sma_50.iloc[-1] > sma_200.iloc[-1]:
signals.append("🟒 Golden Cross formation (bullish)")
elif sma_200 is not None and current_price < sma_50.iloc[-1] < sma_200.iloc[-1]:
signals.append("πŸ”΄ Death Cross formation (bearish)")
if bollinger['position'] == 'oversold':
signals.append("🟒 Price at lower Bollinger Band (potential bounce)")
elif bollinger['position'] == 'overbought':
signals.append("πŸ”΄ Price at upper Bollinger Band (potential pullback)")
# Overall recommendation
bullish_count = sum(1 for s in signals if '🟒' in s)
bearish_count = sum(1 for s in signals if 'πŸ”΄' in s)
if bullish_count > bearish_count:
overall = "BUY"
elif bearish_count > bullish_count:
overall = "SELL"
else:
overall = "HOLD"
return {
"ticker": sanitized_ticker,
"current_price": round(current_price, 2),
"indicators": {
"sma_20": round(sma_20.iloc[-1], 2),
"sma_50": round(sma_50.iloc[-1], 2),
"sma_200": round(sma_200.iloc[-1], 2) if sma_200 is not None else None,
"ema_12": round(ema_12.iloc[-1], 2),
"ema_26": round(ema_26.iloc[-1], 2),
"rsi": rsi,
"macd": macd,
"bollinger_bands": bollinger
},
"signals": signals,
"recommendation": overall,
"confidence": f"{max(bullish_count, bearish_count)}/{len(signals)}",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error in technical analysis for {sanitized_ticker}: {e}")
return {
"error": "Unable to perform technical analysis",
"error_code": "ANALYSIS_ERROR"
}