""" Tool: compute_technical_indicators Pure numpy/pandas implementations of standard technical indicators. No external TA library is required. Supported indicators: - RSI (Relative Strength Index) -- 14-period default - MACD (Moving Average Convergence/Divergence) -- (12, 26, 9) - Bollinger Bands -- 20-period, 2 std - VWAP (Volume Weighted Average Price) - SMA (Simple Moving Average) -- configurable - EMA (Exponential Moving Average) -- configurable - ATR (Average True Range) -- 14-period default """ from __future__ import annotations import logging from typing import Any import numpy as np import pandas as pd from tools.base import BaseTool, ToolResult logger = logging.getLogger(__name__) # ====================================================================== # # Indicator functions (standalone, testable) # ====================================================================== # def compute_rsi(closes: np.ndarray, period: int = 14) -> np.ndarray: """Relative Strength Index. RSI = 100 - 100 / (1 + RS) RS = avg_gain / avg_loss (Wilder smoothing) Returns an array the same length as *closes* with NaN for the first *period* entries where the indicator is undefined. """ deltas = np.diff(closes) gains = np.where(deltas > 0, deltas, 0.0) losses = np.where(deltas < 0, -deltas, 0.0) rsi = np.full(len(closes), np.nan) if len(gains) < period: return rsi # Seed with simple average. avg_gain = np.mean(gains[:period]) avg_loss = np.mean(losses[:period]) for i in range(period, len(gains)): avg_gain = (avg_gain * (period - 1) + gains[i]) / period avg_loss = (avg_loss * (period - 1) + losses[i]) / period if avg_loss == 0: rsi[i + 1] = 100.0 else: rs = avg_gain / avg_loss rsi[i + 1] = 100.0 - 100.0 / (1.0 + rs) # Fill the first valid value. if avg_loss == 0: rsi[period] = 100.0 else: rs = np.mean(gains[:period]) / np.mean(losses[:period]) if np.mean(losses[:period]) != 0 else float("inf") rsi[period] = 100.0 - 100.0 / (1.0 + rs) return rsi def compute_ema(values: np.ndarray, span: int) -> np.ndarray: """Exponential Moving Average. EMA_t = alpha * x_t + (1 - alpha) * EMA_{t-1} alpha = 2 / (span + 1) """ series = pd.Series(values) return series.ewm(span=span, adjust=False).mean().values def compute_sma(values: np.ndarray, window: int) -> np.ndarray: """Simple Moving Average with NaN for the warm-up period.""" series = pd.Series(values) return series.rolling(window=window).mean().values def compute_macd( closes: np.ndarray, fast: int = 12, slow: int = 26, signal: int = 9, ) -> dict[str, np.ndarray]: """Moving Average Convergence/Divergence. MACD Line = EMA(fast) - EMA(slow) Signal Line = EMA(MACD Line, signal) Histogram = MACD Line - Signal Line """ ema_fast = compute_ema(closes, fast) ema_slow = compute_ema(closes, slow) macd_line = ema_fast - ema_slow signal_line = compute_ema(macd_line, signal) histogram = macd_line - signal_line return { "macd": macd_line, "signal": signal_line, "histogram": histogram, } def compute_bollinger_bands( closes: np.ndarray, window: int = 20, num_std: float = 2.0 ) -> dict[str, np.ndarray]: """Bollinger Bands. Middle = SMA(window) Upper = Middle + num_std * rolling_std Lower = Middle - num_std * rolling_std """ series = pd.Series(closes) middle = series.rolling(window=window).mean().values std = series.rolling(window=window).std(ddof=0).values upper = middle + num_std * std lower = middle - num_std * std return {"upper": upper, "middle": middle, "lower": lower} def compute_vwap( highs: np.ndarray, lows: np.ndarray, closes: np.ndarray, volumes: np.ndarray, ) -> np.ndarray: """Volume Weighted Average Price. VWAP = cumsum(Typical_Price * Volume) / cumsum(Volume) Typical_Price = (High + Low + Close) / 3 """ typical = (highs + lows + closes) / 3.0 cum_tp_vol = np.cumsum(typical * volumes) cum_vol = np.cumsum(volumes.astype(float)) # Avoid division by zero. cum_vol = np.where(cum_vol == 0, np.nan, cum_vol) return cum_tp_vol / cum_vol def compute_atr( highs: np.ndarray, lows: np.ndarray, closes: np.ndarray, period: int = 14, ) -> np.ndarray: """Average True Range. TR = max(High-Low, |High-PrevClose|, |Low-PrevClose|) ATR = Wilder-smoothed TR over *period* bars. """ prev_close = np.roll(closes, 1) prev_close[0] = closes[0] tr1 = highs - lows tr2 = np.abs(highs - prev_close) tr3 = np.abs(lows - prev_close) true_range = np.maximum(tr1, np.maximum(tr2, tr3)) atr = np.full(len(closes), np.nan) if len(closes) < period: return atr atr[period - 1] = np.mean(true_range[:period]) for i in range(period, len(closes)): atr[i] = (atr[i - 1] * (period - 1) + true_range[i]) / period return atr # ====================================================================== # # Tool class # ====================================================================== # class ComputeTechnicalIndicatorsTool(BaseTool): name = "compute_technical_indicators" description = ( "Compute technical indicators (RSI, MACD, Bollinger Bands, VWAP, " "SMA, EMA, ATR) from OHLCV data. Provide either raw data arrays " "or a ticker (data will be fetched automatically)." ) parameters = { "type": "object", "properties": { "ticker": { "type": "string", "description": "Ticker symbol. If provided, OHLCV data is fetched automatically.", }, "period": { "type": "string", "description": "Lookback period when fetching data (e.g. 3mo, 1y).", "default": "3mo", }, "interval": { "type": "string", "description": "Bar interval when fetching data.", "default": "1d", }, "indicators": { "type": "array", "description": ( "Which indicators to compute. " "Options: rsi, macd, bollinger, vwap, sma, ema, atr, all. " "Defaults to all." ), "default": ["all"], }, "sma_period": { "type": "integer", "description": "SMA window.", "default": 20, }, "ema_period": { "type": "integer", "description": "EMA span.", "default": 20, }, "ohlcv_data": { "type": "object", "description": "Pre-fetched OHLCV data dict (optional).", }, }, "required": ["ticker"], } async def execute(self, **kwargs: Any) -> ToolResult: ticker: str = kwargs["ticker"].upper() period: str = kwargs.get("period", "3mo") interval: str = kwargs.get("interval", "1d") indicator_names: list[str] = kwargs.get("indicators", ["all"]) sma_period: int = kwargs.get("sma_period", 20) ema_period: int = kwargs.get("ema_period", 20) ohlcv: dict | None = kwargs.get("ohlcv_data") # Fetch data if not provided. if ohlcv is None: from tools.market_data import FetchMarketDataTool md_tool = FetchMarketDataTool() md_result = await md_tool.execute( ticker=ticker, interval=interval, period=period ) if not md_result.success: return ToolResult( success=False, error=f"Could not fetch data for {ticker}: {md_result.error}", ) ohlcv = md_result.data closes = np.array(ohlcv["close"], dtype=float) highs = np.array(ohlcv["high"], dtype=float) lows = np.array(ohlcv["low"], dtype=float) volumes = np.array(ohlcv["volume"], dtype=float) want_all = "all" in indicator_names results: dict[str, Any] = {"ticker": ticker} if want_all or "rsi" in indicator_names: rsi = compute_rsi(closes) current_rsi = float(rsi[~np.isnan(rsi)][-1]) if np.any(~np.isnan(rsi)) else None results["rsi"] = { "values": _to_list(rsi), "current": round(current_rsi, 2) if current_rsi is not None else None, "interpretation": _interpret_rsi(current_rsi), } if want_all or "macd" in indicator_names: macd = compute_macd(closes) results["macd"] = { "macd_line": _to_list(macd["macd"]), "signal_line": _to_list(macd["signal"]), "histogram": _to_list(macd["histogram"]), "current_macd": round(float(macd["macd"][-1]), 4), "current_signal": round(float(macd["signal"][-1]), 4), "interpretation": _interpret_macd( float(macd["macd"][-1]), float(macd["signal"][-1]) ), } if want_all or "bollinger" in indicator_names: bb = compute_bollinger_bands(closes) results["bollinger_bands"] = { "upper": _to_list(bb["upper"]), "middle": _to_list(bb["middle"]), "lower": _to_list(bb["lower"]), "current_price": round(float(closes[-1]), 2), "bandwidth": round( float((bb["upper"][-1] - bb["lower"][-1]) / bb["middle"][-1] * 100), 2 ) if not np.isnan(bb["middle"][-1]) else None, "interpretation": _interpret_bollinger( float(closes[-1]), float(bb["upper"][-1]), float(bb["lower"][-1]), ), } if want_all or "vwap" in indicator_names: vwap = compute_vwap(highs, lows, closes, volumes) results["vwap"] = { "values": _to_list(vwap), "current": round(float(vwap[-1]), 2) if not np.isnan(vwap[-1]) else None, "price_vs_vwap": "above" if closes[-1] > vwap[-1] else "below", } if want_all or "sma" in indicator_names: sma = compute_sma(closes, sma_period) results["sma"] = { "period": sma_period, "values": _to_list(sma), "current": round(float(sma[-1]), 2) if not np.isnan(sma[-1]) else None, } if want_all or "ema" in indicator_names: ema = compute_ema(closes, ema_period) results["ema"] = { "period": ema_period, "values": _to_list(ema), "current": round(float(ema[-1]), 2), } if want_all or "atr" in indicator_names: atr = compute_atr(highs, lows, closes) current_atr = float(atr[~np.isnan(atr)][-1]) if np.any(~np.isnan(atr)) else None results["atr"] = { "values": _to_list(atr), "current": round(current_atr, 2) if current_atr is not None else None, "atr_percent": ( round(current_atr / closes[-1] * 100, 2) if current_atr is not None else None ), } return ToolResult(success=True, data=results) # ====================================================================== # # Interpretation helpers # ====================================================================== # def _interpret_rsi(rsi_val: float | None) -> str: if rsi_val is None: return "Insufficient data" if rsi_val >= 70: return "Overbought (RSI >= 70) -- potential bearish reversal signal" if rsi_val <= 30: return "Oversold (RSI <= 30) -- potential bullish reversal signal" if rsi_val >= 60: return "Approaching overbought territory" if rsi_val <= 40: return "Approaching oversold territory" return "Neutral" def _interpret_macd(macd_val: float, signal_val: float) -> str: if macd_val > signal_val and macd_val > 0: return "Bullish -- MACD above signal and positive" if macd_val > signal_val: return "Bullish crossover -- MACD crossed above signal" if macd_val < signal_val and macd_val < 0: return "Bearish -- MACD below signal and negative" return "Bearish crossover -- MACD crossed below signal" def _interpret_bollinger(price: float, upper: float, lower: float) -> str: if np.isnan(upper) or np.isnan(lower): return "Insufficient data" if price >= upper: return "Price at/above upper band -- potentially overbought" if price <= lower: return "Price at/below lower band -- potentially oversold" mid = (upper + lower) / 2 if price > mid: return "Price in upper half of bands -- mild bullish bias" return "Price in lower half of bands -- mild bearish bias" def _to_list(arr: np.ndarray) -> list: """Convert numpy array to JSON-safe list, replacing NaN with None.""" return [None if np.isnan(v) else round(float(v), 4) for v in arr]