|
|
""" |
|
|
Technical analysis module for Financial Market Data MCP Server. |
|
|
Calculates technical indicators: RSI, MACD, Bollinger Bands, Moving Averages. |
|
|
""" |
|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from typing import Dict |
|
|
from .data_fetcher import fetch_historical_data |
|
|
from .validators import validate_ticker, validate_period |
|
|
from .config import logger |
|
|
|
|
|
|
|
|
def calculate_sma(prices: pd.Series, window: int) -> pd.Series: |
|
|
"""Calculate Simple Moving Average.""" |
|
|
return prices.rolling(window=window).mean() |
|
|
|
|
|
|
|
|
def calculate_ema(prices: pd.Series, window: int) -> pd.Series: |
|
|
"""Calculate Exponential Moving Average.""" |
|
|
return prices.ewm(span=window, adjust=False).mean() |
|
|
|
|
|
|
|
|
def calculate_rsi(prices: pd.Series, period: int = 14) -> float: |
|
|
"""Calculate Relative Strength Index.""" |
|
|
if len(prices) < period + 1: |
|
|
return 50.0 |
|
|
|
|
|
delta = prices.diff() |
|
|
gains = delta.where(delta > 0, 0) |
|
|
losses = -delta.where(delta < 0, 0) |
|
|
avg_gain = gains.rolling(window=period).mean() |
|
|
avg_loss = losses.rolling(window=period).mean() |
|
|
|
|
|
with np.errstate(divide='ignore', invalid='ignore'): |
|
|
rs = avg_gain / avg_loss |
|
|
rsi = 100 - (100 / (1 + rs)) |
|
|
|
|
|
if avg_loss.iloc[-1] == 0 and avg_gain.iloc[-1] > 0: |
|
|
return 100.0 |
|
|
if avg_gain.iloc[-1] == 0 and avg_loss.iloc[-1] > 0: |
|
|
return 0.0 |
|
|
|
|
|
last_value = rsi.iloc[-1] |
|
|
if pd.isna(last_value) or np.isinf(last_value): |
|
|
return 50.0 |
|
|
|
|
|
return round(float(last_value), 2) |
|
|
|
|
|
|
|
|
def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict: |
|
|
"""Calculate MACD (Moving Average Convergence Divergence).""" |
|
|
if len(prices) < slow + signal: |
|
|
return { |
|
|
"macd": 0.0, |
|
|
"signal": 0.0, |
|
|
"histogram": 0.0, |
|
|
"trend": "neutral" |
|
|
} |
|
|
|
|
|
ema_fast = prices.ewm(span=fast, adjust=False).mean() |
|
|
ema_slow = prices.ewm(span=slow, adjust=False).mean() |
|
|
|
|
|
macd_line = ema_fast - ema_slow |
|
|
signal_line = macd_line.ewm(span=signal, adjust=False).mean() |
|
|
histogram = macd_line - signal_line |
|
|
|
|
|
return { |
|
|
"macd": round(macd_line.iloc[-1], 2), |
|
|
"signal": round(signal_line.iloc[-1], 2), |
|
|
"histogram": round(histogram.iloc[-1], 2), |
|
|
"trend": "bullish" if histogram.iloc[-1] > 0 else "bearish" |
|
|
} |
|
|
|
|
|
|
|
|
def calculate_bollinger_bands(prices: pd.Series, window: int = 20, num_std: int = 2) -> Dict: |
|
|
"""Calculate Bollinger Bands.""" |
|
|
if len(prices) < window: |
|
|
current_price = prices.iloc[-1] if not prices.empty else 0 |
|
|
return { |
|
|
"upper": round(current_price, 2), |
|
|
"middle": round(current_price, 2), |
|
|
"lower": round(current_price, 2), |
|
|
"current_price": round(current_price, 2), |
|
|
"position": "neutral" |
|
|
} |
|
|
|
|
|
sma = prices.rolling(window=window).mean() |
|
|
std = prices.rolling(window=window).std() |
|
|
|
|
|
upper_band = sma + (std * num_std) |
|
|
lower_band = sma - (std * num_std) |
|
|
|
|
|
current_price = prices.iloc[-1] |
|
|
|
|
|
return { |
|
|
"upper": round(upper_band.iloc[-1], 2), |
|
|
"middle": round(sma.iloc[-1], 2), |
|
|
"lower": round(lower_band.iloc[-1], 2), |
|
|
"current_price": round(current_price, 2), |
|
|
"position": "overbought" if current_price > upper_band.iloc[-1] |
|
|
else "oversold" if current_price < lower_band.iloc[-1] |
|
|
else "neutral" |
|
|
} |
|
|
|
|
|
|
|
|
def get_technical_analysis(ticker: str, period: str = "3mo") -> Dict: |
|
|
""" |
|
|
Perform comprehensive technical analysis. |
|
|
|
|
|
Args: |
|
|
ticker: Stock ticker symbol |
|
|
period: Historical data period for analysis |
|
|
|
|
|
Returns: |
|
|
Dictionary with multiple technical indicators and signals |
|
|
""" |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
is_valid_ticker, sanitized_ticker, error = validate_ticker(ticker) |
|
|
if not is_valid_ticker: |
|
|
return {"error": error, "error_code": "INVALID_TICKER"} |
|
|
|
|
|
is_valid_period, sanitized_period, error = validate_period(period) |
|
|
if not is_valid_period: |
|
|
return {"error": error, "error_code": "INVALID_PERIOD"} |
|
|
|
|
|
logger.info(f"Performing technical analysis: {sanitized_ticker}, {sanitized_period}") |
|
|
|
|
|
try: |
|
|
hist = fetch_historical_data(sanitized_ticker, sanitized_period, "1d") |
|
|
|
|
|
if hist.empty or len(hist) < 50: |
|
|
return { |
|
|
"error": f"Insufficient data for technical analysis of {sanitized_ticker}", |
|
|
"error_code": "INSUFFICIENT_DATA" |
|
|
} |
|
|
|
|
|
prices = hist['Close'] |
|
|
|
|
|
|
|
|
sma_20 = calculate_sma(prices, 20) |
|
|
sma_50 = calculate_sma(prices, 50) |
|
|
sma_200 = calculate_sma(prices, 200) if len(hist) >= 200 else None |
|
|
ema_12 = calculate_ema(prices, 12) |
|
|
ema_26 = calculate_ema(prices, 26) |
|
|
rsi = calculate_rsi(prices) |
|
|
macd = calculate_macd(prices) |
|
|
bollinger = calculate_bollinger_bands(prices) |
|
|
|
|
|
current_price = prices.iloc[-1] |
|
|
|
|
|
|
|
|
signals = [] |
|
|
|
|
|
if rsi < 30: |
|
|
signals.append("π’ RSI oversold - potential BUY signal") |
|
|
elif rsi > 70: |
|
|
signals.append("π΄ RSI overbought - potential SELL signal") |
|
|
else: |
|
|
signals.append("π‘ RSI neutral") |
|
|
|
|
|
if macd['trend'] == 'bullish' and macd['histogram'] > 0: |
|
|
signals.append("π’ MACD bullish crossover") |
|
|
elif macd['trend'] == 'bearish' and macd['histogram'] < 0: |
|
|
signals.append("π΄ MACD bearish crossover") |
|
|
|
|
|
if sma_200 is not None and current_price > sma_50.iloc[-1] > sma_200.iloc[-1]: |
|
|
signals.append("π’ Golden Cross formation (bullish)") |
|
|
elif sma_200 is not None and current_price < sma_50.iloc[-1] < sma_200.iloc[-1]: |
|
|
signals.append("π΄ Death Cross formation (bearish)") |
|
|
|
|
|
if bollinger['position'] == 'oversold': |
|
|
signals.append("π’ Price at lower Bollinger Band (potential bounce)") |
|
|
elif bollinger['position'] == 'overbought': |
|
|
signals.append("π΄ Price at upper Bollinger Band (potential pullback)") |
|
|
|
|
|
|
|
|
bullish_count = sum(1 for s in signals if 'π’' in s) |
|
|
bearish_count = sum(1 for s in signals if 'π΄' in s) |
|
|
|
|
|
if bullish_count > bearish_count: |
|
|
overall = "BUY" |
|
|
elif bearish_count > bullish_count: |
|
|
overall = "SELL" |
|
|
else: |
|
|
overall = "HOLD" |
|
|
|
|
|
return { |
|
|
"ticker": sanitized_ticker, |
|
|
"current_price": round(current_price, 2), |
|
|
"indicators": { |
|
|
"sma_20": round(sma_20.iloc[-1], 2), |
|
|
"sma_50": round(sma_50.iloc[-1], 2), |
|
|
"sma_200": round(sma_200.iloc[-1], 2) if sma_200 is not None else None, |
|
|
"ema_12": round(ema_12.iloc[-1], 2), |
|
|
"ema_26": round(ema_26.iloc[-1], 2), |
|
|
"rsi": rsi, |
|
|
"macd": macd, |
|
|
"bollinger_bands": bollinger |
|
|
}, |
|
|
"signals": signals, |
|
|
"recommendation": overall, |
|
|
"confidence": f"{max(bullish_count, bearish_count)}/{len(signals)}", |
|
|
"timestamp": datetime.now().isoformat() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error in technical analysis for {sanitized_ticker}: {e}") |
|
|
return { |
|
|
"error": "Unable to perform technical analysis", |
|
|
"error_code": "ANALYSIS_ERROR" |
|
|
} |
|
|
|
|
|
|