File size: 7,721 Bytes
7169bc5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
"""
Technical analysis module for Financial Market Data MCP Server.
Calculates technical indicators: RSI, MACD, Bollinger Bands, Moving Averages.
"""
import pandas as pd
import numpy as np
from typing import Dict
from .data_fetcher import fetch_historical_data
from .validators import validate_ticker, validate_period
from .config import logger
def calculate_sma(prices: pd.Series, window: int) -> pd.Series:
"""Calculate Simple Moving Average."""
return prices.rolling(window=window).mean()
def calculate_ema(prices: pd.Series, window: int) -> pd.Series:
"""Calculate Exponential Moving Average."""
return prices.ewm(span=window, adjust=False).mean()
def calculate_rsi(prices: pd.Series, period: int = 14) -> float:
"""Calculate Relative Strength Index."""
if len(prices) < period + 1:
return 50.0 # Neutral RSI if insufficient data
delta = prices.diff()
gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0)
avg_gain = gains.rolling(window=period).mean()
avg_loss = losses.rolling(window=period).mean()
with np.errstate(divide='ignore', invalid='ignore'):
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
if avg_loss.iloc[-1] == 0 and avg_gain.iloc[-1] > 0:
return 100.0 # All gains, no losses
if avg_gain.iloc[-1] == 0 and avg_loss.iloc[-1] > 0:
return 0.0 # All losses, no gains
last_value = rsi.iloc[-1]
if pd.isna(last_value) or np.isinf(last_value):
return 50.0
return round(float(last_value), 2)
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict:
"""Calculate MACD (Moving Average Convergence Divergence)."""
if len(prices) < slow + signal:
return {
"macd": 0.0,
"signal": 0.0,
"histogram": 0.0,
"trend": "neutral"
}
ema_fast = prices.ewm(span=fast, adjust=False).mean()
ema_slow = prices.ewm(span=slow, adjust=False).mean()
macd_line = ema_fast - ema_slow
signal_line = macd_line.ewm(span=signal, adjust=False).mean()
histogram = macd_line - signal_line
return {
"macd": round(macd_line.iloc[-1], 2),
"signal": round(signal_line.iloc[-1], 2),
"histogram": round(histogram.iloc[-1], 2),
"trend": "bullish" if histogram.iloc[-1] > 0 else "bearish"
}
def calculate_bollinger_bands(prices: pd.Series, window: int = 20, num_std: int = 2) -> Dict:
"""Calculate Bollinger Bands."""
if len(prices) < window:
current_price = prices.iloc[-1] if not prices.empty else 0
return {
"upper": round(current_price, 2),
"middle": round(current_price, 2),
"lower": round(current_price, 2),
"current_price": round(current_price, 2),
"position": "neutral"
}
sma = prices.rolling(window=window).mean()
std = prices.rolling(window=window).std()
upper_band = sma + (std * num_std)
lower_band = sma - (std * num_std)
current_price = prices.iloc[-1]
return {
"upper": round(upper_band.iloc[-1], 2),
"middle": round(sma.iloc[-1], 2),
"lower": round(lower_band.iloc[-1], 2),
"current_price": round(current_price, 2),
"position": "overbought" if current_price > upper_band.iloc[-1]
else "oversold" if current_price < lower_band.iloc[-1]
else "neutral"
}
def get_technical_analysis(ticker: str, period: str = "3mo") -> Dict:
"""
Perform comprehensive technical analysis.
Args:
ticker: Stock ticker symbol
period: Historical data period for analysis
Returns:
Dictionary with multiple technical indicators and signals
"""
from datetime import datetime
# Validate inputs
is_valid_ticker, sanitized_ticker, error = validate_ticker(ticker)
if not is_valid_ticker:
return {"error": error, "error_code": "INVALID_TICKER"}
is_valid_period, sanitized_period, error = validate_period(period)
if not is_valid_period:
return {"error": error, "error_code": "INVALID_PERIOD"}
logger.info(f"Performing technical analysis: {sanitized_ticker}, {sanitized_period}")
try:
hist = fetch_historical_data(sanitized_ticker, sanitized_period, "1d")
if hist.empty or len(hist) < 50:
return {
"error": f"Insufficient data for technical analysis of {sanitized_ticker}",
"error_code": "INSUFFICIENT_DATA"
}
prices = hist['Close']
# Calculate indicators
sma_20 = calculate_sma(prices, 20)
sma_50 = calculate_sma(prices, 50)
sma_200 = calculate_sma(prices, 200) if len(hist) >= 200 else None
ema_12 = calculate_ema(prices, 12)
ema_26 = calculate_ema(prices, 26)
rsi = calculate_rsi(prices)
macd = calculate_macd(prices)
bollinger = calculate_bollinger_bands(prices)
current_price = prices.iloc[-1]
# Generate trading signals
signals = []
if rsi < 30:
signals.append("π’ RSI oversold - potential BUY signal")
elif rsi > 70:
signals.append("π΄ RSI overbought - potential SELL signal")
else:
signals.append("π‘ RSI neutral")
if macd['trend'] == 'bullish' and macd['histogram'] > 0:
signals.append("π’ MACD bullish crossover")
elif macd['trend'] == 'bearish' and macd['histogram'] < 0:
signals.append("π΄ MACD bearish crossover")
if sma_200 is not None and current_price > sma_50.iloc[-1] > sma_200.iloc[-1]:
signals.append("π’ Golden Cross formation (bullish)")
elif sma_200 is not None and current_price < sma_50.iloc[-1] < sma_200.iloc[-1]:
signals.append("π΄ Death Cross formation (bearish)")
if bollinger['position'] == 'oversold':
signals.append("π’ Price at lower Bollinger Band (potential bounce)")
elif bollinger['position'] == 'overbought':
signals.append("π΄ Price at upper Bollinger Band (potential pullback)")
# Overall recommendation
bullish_count = sum(1 for s in signals if 'π’' in s)
bearish_count = sum(1 for s in signals if 'π΄' in s)
if bullish_count > bearish_count:
overall = "BUY"
elif bearish_count > bullish_count:
overall = "SELL"
else:
overall = "HOLD"
return {
"ticker": sanitized_ticker,
"current_price": round(current_price, 2),
"indicators": {
"sma_20": round(sma_20.iloc[-1], 2),
"sma_50": round(sma_50.iloc[-1], 2),
"sma_200": round(sma_200.iloc[-1], 2) if sma_200 is not None else None,
"ema_12": round(ema_12.iloc[-1], 2),
"ema_26": round(ema_26.iloc[-1], 2),
"rsi": rsi,
"macd": macd,
"bollinger_bands": bollinger
},
"signals": signals,
"recommendation": overall,
"confidence": f"{max(bullish_count, bearish_count)}/{len(signals)}",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error in technical analysis for {sanitized_ticker}: {e}")
return {
"error": "Unable to perform technical analysis",
"error_code": "ANALYSIS_ERROR"
}
|