File size: 7,721 Bytes
7169bc5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
"""
Technical analysis module for Financial Market Data MCP Server.
Calculates technical indicators: RSI, MACD, Bollinger Bands, Moving Averages.
"""

import pandas as pd
import numpy as np
from typing import Dict
from .data_fetcher import fetch_historical_data
from .validators import validate_ticker, validate_period
from .config import logger


def calculate_sma(prices: pd.Series, window: int) -> pd.Series:
    """Return the rolling arithmetic mean of ``prices`` over ``window`` rows.

    The result is a Series aligned with ``prices``; how incomplete leading
    windows are filled is left to pandas' rolling defaults.
    """
    rolling_view = prices.rolling(window=window)
    return rolling_view.mean()


def calculate_ema(prices: pd.Series, window: int) -> pd.Series:
    """Return the exponentially weighted mean of ``prices``.

    ``window`` is passed to pandas as the EWM ``span``; ``adjust=False``
    selects the recursive form of the average.
    """
    weighted = prices.ewm(span=window, adjust=False)
    return weighted.mean()


def calculate_rsi(prices: pd.Series, period: int = 14) -> float:
    """Return the latest Relative Strength Index value for ``prices``.

    Gains and losses are averaged with a plain rolling mean over ``period``
    price changes. The result is rounded to two decimals, except for the
    fixed fallbacks: 50.0 when there is too little data or the value is
    undefined, 100.0 when the window has gains only, 0.0 when it has losses
    only.
    """
    # A window of `period` changes needs period + 1 prices.
    if period + 1 > len(prices):
        return 50.0  # Neutral RSI if insufficient data

    change = prices.diff()
    up_moves = change.where(change > 0, 0)
    down_moves = -change.where(change < 0, 0)

    # Only the most recent window matters for the returned value.
    mean_up = up_moves.rolling(window=period).mean().iloc[-1]
    mean_down = down_moves.rolling(window=period).mean().iloc[-1]

    if mean_down == 0 and mean_up > 0:
        return 100.0  # All gains, no losses
    if mean_up == 0 and mean_down > 0:
        return 0.0  # All losses, no gains

    # 0/0 (flat window) yields NaN here; it is mapped to neutral below.
    with np.errstate(divide='ignore', invalid='ignore'):
        strength = mean_up / mean_down
        latest = 100 - (100 / (1 + strength))

    if pd.isna(latest) or np.isinf(latest):
        return 50.0

    return round(float(latest), 2)


def calculate_macd(prices: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Dict:
    """Calculate MACD (Moving Average Convergence Divergence).

    Args:
        prices: Price series, oldest first.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to the MACD line.

    Returns:
        Dictionary with the latest "macd", "signal" and "histogram" values
        (plain Python floats rounded to two decimals) and a "trend" of
        "bullish", "bearish" or "neutral". With fewer than ``slow + signal``
        prices every value is 0.0 and the trend is "neutral".
    """
    if len(prices) < slow + signal:
        return {
            "macd": 0.0,
            "signal": 0.0,
            "histogram": 0.0,
            "trend": "neutral"
        }

    ema_fast = prices.ewm(span=fast, adjust=False).mean()
    ema_slow = prices.ewm(span=slow, adjust=False).mean()

    macd_line = ema_fast - ema_slow
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()

    # Convert to built-in floats so callers get the same scalar type that
    # calculate_rsi returns, rather than numpy scalars.
    macd_value = float(macd_line.iloc[-1])
    signal_value = float(signal_line.iloc[-1])
    histogram_value = macd_value - signal_value

    # The trend is decided on the unrounded histogram. A histogram of exactly
    # zero (e.g. flat prices) or NaN carries no direction, so it is reported
    # as "neutral" instead of falling through to "bearish".
    if histogram_value > 0:
        trend = "bullish"
    elif histogram_value < 0:
        trend = "bearish"
    else:
        trend = "neutral"

    return {
        "macd": round(macd_value, 2),
        "signal": round(signal_value, 2),
        "histogram": round(histogram_value, 2),
        "trend": trend
    }


def calculate_bollinger_bands(prices: pd.Series, window: int = 20, num_std: int = 2) -> Dict:
    """Return the latest Bollinger Band levels and where the price sits.

    The middle band is the rolling mean over ``window`` prices; the upper and
    lower bands sit ``num_std`` rolling standard deviations above and below
    it. "position" is "overbought" when the latest price is above the upper
    band, "oversold" when it is below the lower band, otherwise "neutral".

    With fewer than ``window`` prices, all bands collapse to the latest price
    (0 for an empty series) and the position is "neutral".
    """
    if len(prices) < window:
        fallback = 0 if prices.empty else prices.iloc[-1]
        level = round(fallback, 2)
        return {
            "upper": level,
            "middle": level,
            "lower": level,
            "current_price": level,
            "position": "neutral"
        }

    # One rolling view serves both the mean and the standard deviation.
    rolling_view = prices.rolling(window=window)
    middle = rolling_view.mean().iloc[-1]
    spread = rolling_view.std().iloc[-1] * num_std
    upper = middle + spread
    lower = middle - spread

    last_price = prices.iloc[-1]

    if last_price > upper:
        position = "overbought"
    elif last_price < lower:
        position = "oversold"
    else:
        position = "neutral"

    return {
        "upper": round(upper, 2),
        "middle": round(middle, 2),
        "lower": round(lower, 2),
        "current_price": round(last_price, 2),
        "position": position
    }


def get_technical_analysis(ticker: str, period: str = "3mo") -> Dict:
    """
    Run the indicator suite for a ticker and condense it into a recommendation.

    Args:
        ticker: Stock ticker symbol; checked with validate_ticker
        period: Historical data period; checked with validate_period

    Returns:
        On success, a dictionary with the ticker, the latest close, an
        "indicators" sub-dictionary (SMA 20/50/200, EMA 12/26, RSI, MACD,
        Bollinger Bands), the list of signal strings, a BUY/SELL/HOLD
        recommendation, a "confidence" ratio string and an ISO timestamp.
        On failure, a dictionary with "error" and "error_code" keys.
    """
    from datetime import datetime

    # Reject bad input before fetching any data.
    ticker_ok, sanitized_ticker, error = validate_ticker(ticker)
    if not ticker_ok:
        return {"error": error, "error_code": "INVALID_TICKER"}

    period_ok, sanitized_period, error = validate_period(period)
    if not period_ok:
        return {"error": error, "error_code": "INVALID_PERIOD"}

    logger.info(f"Performing technical analysis: {sanitized_ticker}, {sanitized_period}")

    try:
        hist = fetch_historical_data(sanitized_ticker, sanitized_period, "1d")

        # The 50-row SMA is the longest mandatory indicator, so fewer rows
        # than that are treated as insufficient.
        if hist.empty or len(hist) < 50:
            return {
                "error": f"Insufficient data for technical analysis of {sanitized_ticker}",
                "error_code": "INSUFFICIENT_DATA"
            }

        closes = hist['Close']
        last_close = closes.iloc[-1]

        # Indicators
        sma_20 = calculate_sma(closes, 20)
        sma_50 = calculate_sma(closes, 50)
        # The 200-row SMA is optional: only computed when enough rows exist.
        sma_200 = calculate_sma(closes, 200) if len(hist) >= 200 else None
        ema_12 = calculate_ema(closes, 12)
        ema_26 = calculate_ema(closes, 26)
        rsi = calculate_rsi(closes)
        macd = calculate_macd(closes)
        bollinger = calculate_bollinger_bands(closes)

        # Signals: the RSI always contributes exactly one entry.
        if rsi < 30:
            rsi_signal = "🟒 RSI oversold - potential BUY signal"
        elif rsi > 70:
            rsi_signal = "πŸ”΄ RSI overbought - potential SELL signal"
        else:
            rsi_signal = "🟑 RSI neutral"
        signals = [rsi_signal]

        macd_trend = macd['trend']
        macd_histogram = macd['histogram']
        if macd_trend == 'bullish' and macd_histogram > 0:
            signals.append("🟒 MACD bullish crossover")
        elif macd_trend == 'bearish' and macd_histogram < 0:
            signals.append("πŸ”΄ MACD bearish crossover")

        # Price / SMA-50 / SMA-200 ordering, only when the SMA-200 exists.
        if sma_200 is not None:
            last_sma_50 = sma_50.iloc[-1]
            last_sma_200 = sma_200.iloc[-1]
            if last_close > last_sma_50 > last_sma_200:
                signals.append("🟒 Golden Cross formation (bullish)")
            elif last_close < last_sma_50 < last_sma_200:
                signals.append("πŸ”΄ Death Cross formation (bearish)")

        band_position = bollinger['position']
        if band_position == 'oversold':
            signals.append("🟒 Price at lower Bollinger Band (potential bounce)")
        elif band_position == 'overbought':
            signals.append("πŸ”΄ Price at upper Bollinger Band (potential pullback)")

        # Tally signals by their leading marker to pick the recommendation.
        bullish_count = sum('🟒' in entry for entry in signals)
        bearish_count = sum('πŸ”΄' in entry for entry in signals)

        overall = "HOLD"
        if bullish_count > bearish_count:
            overall = "BUY"
        elif bearish_count > bullish_count:
            overall = "SELL"

        indicators = {
            "sma_20": round(sma_20.iloc[-1], 2),
            "sma_50": round(sma_50.iloc[-1], 2),
            "sma_200": round(sma_200.iloc[-1], 2) if sma_200 is not None else None,
            "ema_12": round(ema_12.iloc[-1], 2),
            "ema_26": round(ema_26.iloc[-1], 2),
            "rsi": rsi,
            "macd": macd,
            "bollinger_bands": bollinger
        }

        return {
            "ticker": sanitized_ticker,
            "current_price": round(last_close, 2),
            "indicators": indicators,
            "signals": signals,
            "recommendation": overall,
            "confidence": f"{max(bullish_count, bearish_count)}/{len(signals)}",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        # Any failure in fetching or computing is logged and reported to the
        # caller as a generic analysis error.
        logger.error(f"Error in technical analysis for {sanitized_ticker}: {e}")
        return {
            "error": "Unable to perform technical analysis",
            "error_code": "ANALYSIS_ERROR"
        }