""" Technical analysis module for Financial Market Data MCP Server. Calculates technical indicators: RSI, MACD, Bollinger Bands, Moving Averages. """ import pandas as pd import numpy as np from typing import Dict from .data_fetcher import fetch_historical_data from .validators import validate_ticker, validate_period from .config import logger def calculate_sma(prices: pd.Series, window: int) -> pd.Series: """Calculate Simple Moving Average.""" return prices.rolling(window=window).mean() def calculate_ema(prices: pd.Series, window: int) -> pd.Series: """Calculate Exponential Moving Average.""" return prices.ewm(span=window, adjust=False).mean() def calculate_rsi(prices: pd.Series, period: int = 14) -> float: """Calculate Relative Strength Index.""" if len(prices) < period + 1: return 50.0 # Neutral RSI if insufficient data delta = prices.diff() gains = delta.where(delta > 0, 0) losses = -delta.where(delta < 0, 0) avg_gain = gains.rolling(window=period).mean() avg_loss = losses.rolling(window=period).mean() with np.errstate(divide='ignore', invalid='ignore'): rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) if avg_loss.iloc[-1] == 0 and avg_gain.iloc[-1] > 0: return 100.0 # All gains, no losses if avg_gain.iloc[-1] == 0 and avg_loss.iloc[-1] > 0: return 0.0 # All losses, no gains last_value = rsi.iloc[-1] if pd.isna(last_value) or np.isinf(last_value): return 50.0 return round(float(last_value), 2) def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict: """Calculate MACD (Moving Average Convergence Divergence).""" if len(prices) < slow + signal: return { "macd": 0.0, "signal": 0.0, "histogram": 0.0, "trend": "neutral" } ema_fast = prices.ewm(span=fast, adjust=False).mean() ema_slow = prices.ewm(span=slow, adjust=False).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=signal, adjust=False).mean() histogram = macd_line - signal_line return { "macd": round(macd_line.iloc[-1], 2), "signal": round(signal_line.iloc[-1], 2), "histogram": round(histogram.iloc[-1], 2), "trend": "bullish" if histogram.iloc[-1] > 0 else "bearish" } def calculate_bollinger_bands(prices: pd.Series, window: int = 20, num_std: int = 2) -> Dict: """Calculate Bollinger Bands.""" if len(prices) < window: current_price = prices.iloc[-1] if not prices.empty else 0 return { "upper": round(current_price, 2), "middle": round(current_price, 2), "lower": round(current_price, 2), "current_price": round(current_price, 2), "position": "neutral" } sma = prices.rolling(window=window).mean() std = prices.rolling(window=window).std() upper_band = sma + (std * num_std) lower_band = sma - (std * num_std) current_price = prices.iloc[-1] return { "upper": round(upper_band.iloc[-1], 2), "middle": round(sma.iloc[-1], 2), "lower": round(lower_band.iloc[-1], 2), "current_price": round(current_price, 2), "position": "overbought" if current_price > upper_band.iloc[-1] else "oversold" if current_price < lower_band.iloc[-1] else "neutral" } def get_technical_analysis(ticker: str, period: str = "3mo") -> Dict: """ Perform comprehensive technical analysis. Args: ticker: Stock ticker symbol period: Historical data period for analysis Returns: Dictionary with multiple technical indicators and signals """ from datetime import datetime # Validate inputs is_valid_ticker, sanitized_ticker, error = validate_ticker(ticker) if not is_valid_ticker: return {"error": error, "error_code": "INVALID_TICKER"} is_valid_period, sanitized_period, error = validate_period(period) if not is_valid_period: return {"error": error, "error_code": "INVALID_PERIOD"} logger.info(f"Performing technical analysis: {sanitized_ticker}, {sanitized_period}") try: hist = fetch_historical_data(sanitized_ticker, sanitized_period, "1d") if hist.empty or len(hist) < 50: return { "error": f"Insufficient data for technical analysis of {sanitized_ticker}", "error_code": "INSUFFICIENT_DATA" } prices = hist['Close'] # Calculate indicators sma_20 = calculate_sma(prices, 20) sma_50 = calculate_sma(prices, 50) sma_200 = calculate_sma(prices, 200) if len(hist) >= 200 else None ema_12 = calculate_ema(prices, 12) ema_26 = calculate_ema(prices, 26) rsi = calculate_rsi(prices) macd = calculate_macd(prices) bollinger = calculate_bollinger_bands(prices) current_price = prices.iloc[-1] # Generate trading signals signals = [] if rsi < 30: signals.append("🟢 RSI oversold - potential BUY signal") elif rsi > 70: signals.append("🔴 RSI overbought - potential SELL signal") else: signals.append("🟡 RSI neutral") if macd['trend'] == 'bullish' and macd['histogram'] > 0: signals.append("🟢 MACD bullish crossover") elif macd['trend'] == 'bearish' and macd['histogram'] < 0: signals.append("🔴 MACD bearish crossover") if sma_200 is not None and current_price > sma_50.iloc[-1] > sma_200.iloc[-1]: signals.append("🟢 Golden Cross formation (bullish)") elif sma_200 is not None and current_price < sma_50.iloc[-1] < sma_200.iloc[-1]: signals.append("🔴 Death Cross formation (bearish)") if bollinger['position'] == 'oversold': signals.append("🟢 Price at lower Bollinger Band (potential bounce)") elif bollinger['position'] == 'overbought': signals.append("🔴 Price at upper Bollinger Band (potential pullback)") # Overall recommendation bullish_count = sum(1 for s in signals if '🟢' in s) bearish_count = sum(1 for s in signals if '🔴' in s) if bullish_count > bearish_count: overall = "BUY" elif bearish_count > bullish_count: overall = "SELL" else: overall = "HOLD" return { "ticker": sanitized_ticker, "current_price": round(current_price, 2), "indicators": { "sma_20": round(sma_20.iloc[-1], 2), "sma_50": round(sma_50.iloc[-1], 2), "sma_200": round(sma_200.iloc[-1], 2) if sma_200 is not None else None, "ema_12": round(ema_12.iloc[-1], 2), "ema_26": round(ema_26.iloc[-1], 2), "rsi": rsi, "macd": macd, "bollinger_bands": bollinger }, "signals": signals, "recommendation": overall, "confidence": f"{max(bullish_count, bearish_count)}/{len(signals)}", "timestamp": datetime.now().isoformat() } except Exception as e: logger.error(f"Error in technical analysis for {sanitized_ticker}: {e}") return { "error": "Unable to perform technical analysis", "error_code": "ANALYSIS_ERROR" }