Spaces:
Sleeping
Sleeping
| """ | |
| Tool: compute_technical_indicators | |
| Pure numpy/pandas implementations of standard technical indicators. | |
| No external TA library is required. | |
| Supported indicators: | |
| - RSI (Relative Strength Index) -- 14-period default | |
| - MACD (Moving Average Convergence/Divergence) -- (12, 26, 9) | |
| - Bollinger Bands -- 20-period, 2 std | |
| - VWAP (Volume Weighted Average Price) | |
| - SMA (Simple Moving Average) -- configurable | |
| - EMA (Exponential Moving Average) -- configurable | |
| - ATR (Average True Range) -- 14-period default | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| from tools.base import BaseTool, ToolResult | |
| logger = logging.getLogger(__name__) | |
| # ====================================================================== # | |
| # Indicator functions (standalone, testable) | |
| # ====================================================================== # | |
def compute_rsi(closes: np.ndarray, period: int = 14) -> np.ndarray:
    """Relative Strength Index (Wilder smoothing).

    RSI = 100 - 100 / (1 + RS)
    RS = avg_gain / avg_loss

    The first averages are the simple means of the first *period* gains and
    losses; every later average is ``(prev * (period - 1) + x) / period``.

    Args:
        closes: 1-D array-like of closing prices.
        period: Smoothing length in bars.

    Returns:
        Float array the same length as *closes*. The first *period* entries
        are NaN. The whole array is NaN when there are fewer than *period*
        price changes or when *period* is below 1. When the average loss is
        zero the RSI is reported as 100.
    """
    closes = np.asarray(closes, dtype=float)
    rsi = np.full(len(closes), np.nan)
    deltas = np.diff(closes)
    if period < 1 or len(deltas) < period:
        return rsi

    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)

    def _rsi_from(avg_up: float, avg_down: float) -> float:
        # A zero average loss would divide by zero; RSI saturates at 100.
        if avg_down == 0:
            return 100.0
        return 100.0 - 100.0 / (1.0 + avg_up / avg_down)

    # Seed with the simple average and emit the first valid value right away,
    # before the running averages are updated by the smoothing loop.
    avg_gain = gains[:period].mean()
    avg_loss = losses[:period].mean()
    rsi[period] = _rsi_from(avg_gain, avg_loss)

    # deltas[i] is the change into closes[i + 1], hence the index shift.
    for i in range(period, len(gains)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period
        rsi[i + 1] = _rsi_from(avg_gain, avg_loss)
    return rsi
def compute_ema(values: np.ndarray, span: int) -> np.ndarray:
    """Exponentially weighted moving average of *values*.

    Uses the recursive (``adjust=False``) form:
        EMA_t = alpha * x_t + (1 - alpha) * EMA_{t-1}
        alpha = 2 / (span + 1)

    Returns an array with one entry per input value.
    """
    smoothed = pd.Series(values).ewm(span=span, adjust=False).mean()
    return smoothed.values
def compute_sma(values: np.ndarray, window: int) -> np.ndarray:
    """Rolling arithmetic mean over *window* bars.

    Positions that do not yet have a full window (the warm-up period) are NaN.
    """
    rolling_mean = pd.Series(values).rolling(window=window).mean()
    return rolling_mean.values
def compute_macd(
    closes: np.ndarray,
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> dict[str, np.ndarray]:
    """Moving Average Convergence/Divergence.

    MACD Line   = EMA(closes, fast) - EMA(closes, slow)
    Signal Line = EMA(MACD Line, signal)
    Histogram   = MACD Line - Signal Line

    Returns a dict with the keys ``"macd"``, ``"signal"`` and ``"histogram"``,
    each an array aligned with *closes*.
    """
    macd_line = compute_ema(closes, fast) - compute_ema(closes, slow)
    signal_line = compute_ema(macd_line, signal)
    return {
        "macd": macd_line,
        "signal": signal_line,
        "histogram": macd_line - signal_line,
    }
def compute_bollinger_bands(
    closes: np.ndarray, window: int = 20, num_std: float = 2.0
) -> dict[str, np.ndarray]:
    """Bollinger Bands around a rolling mean.

    Middle = rolling mean over *window* bars
    Upper  = Middle + num_std * rolling standard deviation (ddof=0)
    Lower  = Middle - num_std * rolling standard deviation (ddof=0)

    Returns a dict with the keys ``"upper"``, ``"middle"`` and ``"lower"``;
    entries without a full window are NaN.
    """
    roller = pd.Series(closes).rolling(window=window)
    middle = roller.mean().values
    offset = num_std * roller.std(ddof=0).values
    return {
        "upper": middle + offset,
        "middle": middle,
        "lower": middle - offset,
    }
def compute_vwap(
    highs: np.ndarray,
    lows: np.ndarray,
    closes: np.ndarray,
    volumes: np.ndarray,
) -> np.ndarray:
    """Volume Weighted Average Price.

    VWAP = cumsum(Typical_Price * Volume) / cumsum(Volume)
    Typical_Price = (High + Low + Close) / 3

    The sums are cumulative from the first bar of the input; nothing is reset
    inside the series.

    Args:
        highs, lows, closes, volumes: Equal-length 1-D array-likes (numpy
            arrays or plain sequences of numbers).

    Returns:
        Float array aligned with the inputs. Entries where the cumulative
        volume is still zero are NaN.
    """
    # Coerce up front so plain lists work: list "+" would concatenate instead
    # of adding element-wise, and lists have no ``astype``.
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)
    volumes = np.asarray(volumes, dtype=float)

    typical = (highs + lows + closes) / 3.0
    cum_tp_vol = np.cumsum(typical * volumes)
    cum_vol = np.cumsum(volumes)
    # Avoid division by zero: a zero denominator becomes NaN in the result.
    cum_vol = np.where(cum_vol == 0, np.nan, cum_vol)
    return cum_tp_vol / cum_vol
def compute_atr(
    highs: np.ndarray,
    lows: np.ndarray,
    closes: np.ndarray,
    period: int = 14,
) -> np.ndarray:
    """Average True Range.

    TR = max(High-Low, |High-PrevClose|, |Low-PrevClose|)
    ATR = Wilder-smoothed TR over *period* bars.

    The first bar has no previous close, so its own close is used in its
    place. The first ATR value (index ``period - 1``) is the simple mean of
    the first *period* true ranges; later values are
    ``(prev * (period - 1) + TR) / period``.

    Args:
        highs, lows, closes: Equal-length 1-D array-likes of prices.
        period: Smoothing length in bars.

    Returns:
        Float array aligned with *closes*. Entries before index
        ``period - 1`` are NaN. The whole array is NaN (or empty, for empty
        input) when there are fewer than *period* bars or *period* is below 1.
    """
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)

    n_bars = len(closes)
    atr = np.full(n_bars, np.nan)
    # Bail out before touching closes[0]: an empty series has no first bar.
    if period < 1 or n_bars < period:
        return atr

    # Shift closes right by one bar; bar 0 falls back to its own close.
    prev_close = np.empty_like(closes)
    prev_close[0] = closes[0]
    prev_close[1:] = closes[:-1]

    true_range = np.maximum(
        highs - lows,
        np.maximum(np.abs(highs - prev_close), np.abs(lows - prev_close)),
    )

    atr[period - 1] = true_range[:period].mean()
    for i in range(period, n_bars):
        atr[i] = (atr[i - 1] * (period - 1) + true_range[i]) / period
    return atr
| # ====================================================================== # | |
| # Tool class | |
| # ====================================================================== # | |
class ComputeTechnicalIndicatorsTool(BaseTool):
    """Tool wrapper that computes technical indicators for one ticker.

    OHLCV data is taken from the ``ohlcv_data`` argument when supplied,
    otherwise it is fetched through ``FetchMarketDataTool``. Each requested
    indicator contributes one entry to the result dict.
    """

    name = "compute_technical_indicators"
    description = (
        "Compute technical indicators (RSI, MACD, Bollinger Bands, VWAP, "
        "SMA, EMA, ATR) from OHLCV data. Provide either raw data arrays "
        "or a ticker (data will be fetched automatically)."
    )
    parameters = {
        "type": "object",
        "properties": {
            "ticker": {
                "type": "string",
                "description": "Ticker symbol. If provided, OHLCV data is fetched automatically.",
            },
            "period": {
                "type": "string",
                "description": "Lookback period when fetching data (e.g. 3mo, 1y).",
                "default": "3mo",
            },
            "interval": {
                "type": "string",
                "description": "Bar interval when fetching data.",
                "default": "1d",
            },
            "indicators": {
                "type": "array",
                # Element type declared so schema consumers know what the
                # array holds; the code below only ever matches strings.
                "items": {"type": "string"},
                "description": (
                    "Which indicators to compute. "
                    "Options: rsi, macd, bollinger, vwap, sma, ema, atr, all. "
                    "Defaults to all."
                ),
                "default": ["all"],
            },
            "sma_period": {
                "type": "integer",
                "description": "SMA window.",
                "default": 20,
            },
            "ema_period": {
                "type": "integer",
                "description": "EMA span.",
                "default": 20,
            },
            "ohlcv_data": {
                "type": "object",
                "description": "Pre-fetched OHLCV data dict (optional).",
            },
        },
        "required": ["ticker"],
    }

    async def execute(self, **kwargs: Any) -> ToolResult:
        """Compute the requested indicators and return them in a ToolResult.

        Keyword Args:
            ticker: Ticker symbol (required); upper-cased before use.
            period: Lookback passed to the data fetcher (default ``"3mo"``).
            interval: Bar interval passed to the data fetcher (default ``"1d"``).
            indicators: Indicator names to compute; ``"all"`` selects every
                indicator. Missing or ``None`` means ``["all"]``.
            sma_period: SMA window (default 20).
            ema_period: EMA span (default 20).
            ohlcv_data: Optional pre-fetched data, indexed by the keys
                ``"close"``, ``"high"``, ``"low"`` and ``"volume"``.

        Returns:
            ``ToolResult(success=True, data=...)`` where ``data`` holds the
            ticker plus one entry per computed indicator, or
            ``ToolResult(success=False, error=...)`` when fetching fails or
            the data contains no bars.

        Raises:
            KeyError: If ``ticker`` is missing from the arguments (a missing
                OHLCV key raises the same way when the data is a dict).
        """
        ticker: str = kwargs["ticker"].upper()
        period: str = kwargs.get("period", "3mo")
        interval: str = kwargs.get("interval", "1d")
        indicator_names = kwargs.get("indicators")
        if indicator_names is None:
            # An explicit null from the caller means "use the default".
            indicator_names = ["all"]
        sma_period: int = kwargs.get("sma_period", 20)
        ema_period: int = kwargs.get("ema_period", 20)
        ohlcv: dict | None = kwargs.get("ohlcv_data")

        # Fetch data if not provided.
        if ohlcv is None:
            # Imported lazily: only needed on the fetch path.
            from tools.market_data import FetchMarketDataTool

            md_result = await FetchMarketDataTool().execute(
                ticker=ticker, interval=interval, period=period
            )
            if not md_result.success:
                return ToolResult(
                    success=False,
                    error=f"Could not fetch data for {ticker}: {md_result.error}",
                )
            # NOTE(review): assumes md_result.data has the same
            # close/high/low/volume layout as ohlcv_data -- confirm.
            ohlcv = md_result.data

        closes = np.array(ohlcv["close"], dtype=float)
        highs = np.array(ohlcv["high"], dtype=float)
        lows = np.array(ohlcv["low"], dtype=float)
        volumes = np.array(ohlcv["volume"], dtype=float)

        # Every "current" value below indexes the last bar, so an empty
        # series cannot be processed; report it instead of raising IndexError.
        if closes.size == 0:
            return ToolResult(
                success=False,
                error=f"No OHLCV data available for {ticker}",
            )

        last_close = float(closes[-1])
        want_all = "all" in indicator_names

        def wanted(key: str) -> bool:
            return want_all or key in indicator_names

        results: dict[str, Any] = {"ticker": ticker}

        if wanted("rsi"):
            rsi = compute_rsi(closes)
            valid_rsi = rsi[~np.isnan(rsi)]
            current_rsi = float(valid_rsi[-1]) if valid_rsi.size else None
            results["rsi"] = {
                "values": _to_list(rsi),
                "current": round(current_rsi, 2) if current_rsi is not None else None,
                "interpretation": _interpret_rsi(current_rsi),
            }

        if wanted("macd"):
            macd = compute_macd(closes)
            macd_now = float(macd["macd"][-1])
            signal_now = float(macd["signal"][-1])
            results["macd"] = {
                "macd_line": _to_list(macd["macd"]),
                "signal_line": _to_list(macd["signal"]),
                "histogram": _to_list(macd["histogram"]),
                "current_macd": round(macd_now, 4),
                "current_signal": round(signal_now, 4),
                "interpretation": _interpret_macd(macd_now, signal_now),
            }

        if wanted("bollinger"):
            bb = compute_bollinger_bands(closes)
            upper_now = float(bb["upper"][-1])
            middle_now = float(bb["middle"][-1])
            lower_now = float(bb["lower"][-1])
            # Bandwidth is a percentage of the middle band, so it is
            # undefined while the band is NaN or exactly zero.
            if np.isnan(middle_now) or middle_now == 0:
                bandwidth = None
            else:
                bandwidth = round((upper_now - lower_now) / middle_now * 100, 2)
            results["bollinger_bands"] = {
                "upper": _to_list(bb["upper"]),
                "middle": _to_list(bb["middle"]),
                "lower": _to_list(bb["lower"]),
                "current_price": round(last_close, 2),
                "bandwidth": bandwidth,
                "interpretation": _interpret_bollinger(
                    last_close, upper_now, lower_now
                ),
            }

        if wanted("vwap"):
            vwap = compute_vwap(highs, lows, closes, volumes)
            vwap_now = float(vwap[-1])
            if np.isnan(vwap_now):
                # No volume traded: neither "above" nor "below" is meaningful.
                current_vwap = None
                price_vs_vwap = None
            else:
                current_vwap = round(vwap_now, 2)
                price_vs_vwap = "above" if last_close > vwap_now else "below"
            results["vwap"] = {
                "values": _to_list(vwap),
                "current": current_vwap,
                "price_vs_vwap": price_vs_vwap,
            }

        if wanted("sma"):
            sma = compute_sma(closes, sma_period)
            results["sma"] = {
                "period": sma_period,
                "values": _to_list(sma),
                "current": round(float(sma[-1]), 2) if not np.isnan(sma[-1]) else None,
            }

        if wanted("ema"):
            ema = compute_ema(closes, ema_period)
            results["ema"] = {
                "period": ema_period,
                "values": _to_list(ema),
                "current": round(float(ema[-1]), 2),
            }

        if wanted("atr"):
            atr = compute_atr(highs, lows, closes)
            valid_atr = atr[~np.isnan(atr)]
            current_atr = float(valid_atr[-1]) if valid_atr.size else None
            # ATR as a percentage of the last close; undefined for a zero close.
            if current_atr is None or last_close == 0:
                atr_percent = None
            else:
                atr_percent = round(current_atr / last_close * 100, 2)
            results["atr"] = {
                "values": _to_list(atr),
                "current": round(current_atr, 2) if current_atr is not None else None,
                "atr_percent": atr_percent,
            }

        return ToolResult(success=True, data=results)
| # ====================================================================== # | |
| # Interpretation helpers | |
| # ====================================================================== # | |
| def _interpret_rsi(rsi_val: float | None) -> str: | |
| if rsi_val is None: | |
| return "Insufficient data" | |
| if rsi_val >= 70: | |
| return "Overbought (RSI >= 70) -- potential bearish reversal signal" | |
| if rsi_val <= 30: | |
| return "Oversold (RSI <= 30) -- potential bullish reversal signal" | |
| if rsi_val >= 60: | |
| return "Approaching overbought territory" | |
| if rsi_val <= 40: | |
| return "Approaching oversold territory" | |
| return "Neutral" | |
def _interpret_macd(macd_val: float, signal_val: float) -> str:
    """Label the MACD line's position relative to its signal line.

    MACD above the signal is bullish, split by whether MACD itself is
    positive. MACD below the signal and negative is bearish. Every remaining
    case, including MACD equal to the signal, gets the bearish-crossover label.
    """
    if macd_val > signal_val:
        if macd_val > 0:
            return "Bullish -- MACD above signal and positive"
        return "Bullish crossover -- MACD crossed above signal"
    below_and_negative = macd_val < signal_val and macd_val < 0
    if below_and_negative:
        return "Bearish -- MACD below signal and negative"
    return "Bearish crossover -- MACD crossed below signal"
def _interpret_bollinger(price: float, upper: float, lower: float) -> str:
    """Describe where *price* sits relative to the Bollinger Bands.

    A NaN band means the bands are not available yet. Otherwise the price is
    classified as at/above the upper band, at/below the lower band, or in the
    upper or lower half of the channel (split at the bands' midpoint).
    """
    bands_missing = np.isnan(upper) or np.isnan(lower)
    if bands_missing:
        return "Insufficient data"
    if price >= upper:
        return "Price at/above upper band -- potentially overbought"
    if price <= lower:
        return "Price at/below lower band -- potentially oversold"
    midpoint = (upper + lower) / 2
    return (
        "Price in upper half of bands -- mild bullish bias"
        if price > midpoint
        else "Price in lower half of bands -- mild bearish bias"
    )
def _to_list(arr: np.ndarray) -> list:
    """Convert a numpy array to a JSON-safe list.

    Finite values are converted to Python floats rounded to 4 decimals.
    NaN and +/-infinity have no standard JSON representation, so they are
    replaced with None.
    """
    return [round(float(v), 4) if np.isfinite(v) else None for v in arr]