# agentic-market-analyzer/tools/technical_indicators.py
# Uploaded by WolfDavid using huggingface_hub (commit 75418e4, verified).
"""
Tool: compute_technical_indicators
Pure numpy/pandas implementations of standard technical indicators.
No external TA library is required.
Supported indicators:
- RSI (Relative Strength Index) -- 14-period default
- MACD (Moving Average Convergence/Divergence) -- (12, 26, 9)
- Bollinger Bands -- 20-period, 2 std
- VWAP (Volume Weighted Average Price)
- SMA (Simple Moving Average) -- configurable
- EMA (Exponential Moving Average) -- configurable
- ATR (Average True Range) -- 14-period default
"""
from __future__ import annotations
import logging
from typing import Any
import numpy as np
import pandas as pd
from tools.base import BaseTool, ToolResult
logger = logging.getLogger(__name__)
# ====================================================================== #
# Indicator functions (standalone, testable)
# ====================================================================== #
def compute_rsi(closes: np.ndarray, period: int = 14) -> np.ndarray:
    """Relative Strength Index with Wilder smoothing.

    RSI = 100 - 100 / (1 + RS)
    RS  = avg_gain / avg_loss

    The averages are seeded with the simple mean of the first *period*
    gains/losses and then updated recursively:
    ``avg = (avg * (period - 1) + x) / period``.

    Args:
        closes: Closing prices in chronological order.
        period: Smoothing length (must be >= 1).

    Returns:
        An array the same length as *closes*.  Entries ``0 .. period - 1``
        are NaN; if fewer than ``period + 1`` closes are supplied the whole
        array is NaN.  When the average loss is zero the RSI is 100.

    Raises:
        ValueError: If *period* is smaller than 1.
    """
    if period < 1:
        raise ValueError(f"period must be a positive integer, got {period}")

    closes = np.asarray(closes, dtype=float)
    deltas = np.diff(closes)
    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)

    rsi = np.full(len(closes), np.nan)
    if len(gains) < period:
        return rsi

    def _rsi_from(avg_gain: float, avg_loss: float) -> float:
        # No losses at all means RS is unbounded, i.e. RSI saturates at 100.
        if avg_loss == 0:
            return 100.0
        return 100.0 - 100.0 / (1.0 + avg_gain / avg_loss)

    # Seed with the simple average and emit the first valid value from the
    # seed itself (not from whatever the averages look like after smoothing).
    avg_gain = np.mean(gains[:period])
    avg_loss = np.mean(losses[:period])
    rsi[period] = _rsi_from(avg_gain, avg_loss)

    # gains[i] is the change into closes[i + 1], hence the index shift.
    for i in range(period, len(gains)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period
        rsi[i + 1] = _rsi_from(avg_gain, avg_loss)

    return rsi
def compute_ema(values: np.ndarray, span: int) -> np.ndarray:
    """Exponential Moving Average.

    Uses the recursive form (pandas ``adjust=False``):

        EMA_0 = x_0
        EMA_t = alpha * x_t + (1 - alpha) * EMA_{t-1},  alpha = 2 / (span + 1)

    Returns an array with one value per input element.
    """
    smoothed = pd.Series(values).ewm(span=span, adjust=False).mean()
    return smoothed.values
def compute_sma(values: np.ndarray, window: int) -> np.ndarray:
    """Simple Moving Average.

    The first ``window - 1`` entries are NaN because a full window is not
    yet available there.
    """
    rolling_mean = pd.Series(values).rolling(window=window).mean()
    return rolling_mean.values
def compute_macd(
    closes: np.ndarray,
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> dict[str, np.ndarray]:
    """Moving Average Convergence/Divergence.

    MACD line   = EMA(closes, fast) - EMA(closes, slow)
    Signal line = EMA(MACD line, signal)
    Histogram   = MACD line - Signal line

    Returns a dict with the keys ``"macd"``, ``"signal"`` and
    ``"histogram"``, each an array aligned with *closes*.
    """
    macd_line = compute_ema(closes, fast) - compute_ema(closes, slow)
    signal_line = compute_ema(macd_line, signal)
    return {
        "macd": macd_line,
        "signal": signal_line,
        "histogram": macd_line - signal_line,
    }
def compute_bollinger_bands(
    closes: np.ndarray, window: int = 20, num_std: float = 2.0
) -> dict[str, np.ndarray]:
    """Bollinger Bands.

    Middle = rolling mean over *window* bars
    Upper  = Middle + num_std * rolling population std (ddof=0)
    Lower  = Middle - num_std * rolling population std (ddof=0)

    Returns a dict with the keys ``"upper"``, ``"middle"`` and ``"lower"``;
    all three are NaN until a full window is available.
    """
    windowed = pd.Series(closes).rolling(window=window)
    middle = windowed.mean().values
    band_offset = num_std * windowed.std(ddof=0).values
    return {
        "upper": middle + band_offset,
        "middle": middle,
        "lower": middle - band_offset,
    }
def compute_vwap(
    highs: np.ndarray,
    lows: np.ndarray,
    closes: np.ndarray,
    volumes: np.ndarray,
) -> np.ndarray:
    """Volume Weighted Average Price, accumulated over the whole input.

    VWAP          = cumsum(Typical_Price * Volume) / cumsum(Volume)
    Typical_Price = (High + Low + Close) / 3

    Args:
        highs, lows, closes, volumes: Equal-length array-likes (NumPy
            arrays, lists, ...) in chronological order.

    Returns:
        An array with one VWAP value per bar.  Bars where the cumulative
        volume is still zero yield NaN instead of a division-by-zero result.
    """
    # Coerce up front so plain lists behave like arrays ("+" would otherwise
    # concatenate them) and integer volumes are accumulated as floats.
    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)
    volumes = np.asarray(volumes, dtype=float)

    typical = (highs + lows + closes) / 3.0
    cum_tp_vol = np.cumsum(typical * volumes)
    cum_vol = np.cumsum(volumes)

    # Avoid division by zero: a NaN denominator propagates as NaN.
    cum_vol = np.where(cum_vol == 0, np.nan, cum_vol)
    return cum_tp_vol / cum_vol
def compute_atr(
    highs: np.ndarray,
    lows: np.ndarray,
    closes: np.ndarray,
    period: int = 14,
) -> np.ndarray:
    """Average True Range.

    TR  = max(High - Low, |High - PrevClose|, |Low - PrevClose|)
    ATR = Wilder-smoothed TR over *period* bars.

    The first bar has no previous close, so its own close is used instead.
    The ATR is seeded at index ``period - 1`` with the simple mean of the
    first *period* true ranges.

    Args:
        highs, lows, closes: Equal-length array-likes in chronological order.
        period: Smoothing length (must be >= 1).

    Returns:
        An array the same length as *closes*; entries before index
        ``period - 1`` are NaN, and the whole array is NaN (or empty) when
        fewer than *period* bars are supplied.

    Raises:
        ValueError: If *period* is smaller than 1.
    """
    if period < 1:
        raise ValueError(f"period must be a positive integer, got {period}")

    highs = np.asarray(highs, dtype=float)
    lows = np.asarray(lows, dtype=float)
    closes = np.asarray(closes, dtype=float)

    n_bars = len(closes)
    atr = np.full(n_bars, np.nan)
    # Also covers empty input, which must not reach the closes[0] access.
    if n_bars < period:
        return atr

    prev_close = np.empty_like(closes)
    prev_close[0] = closes[0]
    prev_close[1:] = closes[:-1]

    true_range = np.maximum(
        highs - lows,
        np.maximum(np.abs(highs - prev_close), np.abs(lows - prev_close)),
    )

    atr[period - 1] = np.mean(true_range[:period])
    for i in range(period, n_bars):
        atr[i] = (atr[i - 1] * (period - 1) + true_range[i]) / period
    return atr
# ====================================================================== #
# Tool class
# ====================================================================== #
class ComputeTechnicalIndicatorsTool(BaseTool):
    """Tool that computes technical indicators from OHLCV data.

    OHLCV data is taken from the ``ohlcv_data`` argument when supplied;
    otherwise it is fetched through ``FetchMarketDataTool`` using the
    ``ticker``, ``interval`` and ``period`` arguments.
    """

    name = "compute_technical_indicators"
    description = (
        "Compute technical indicators (RSI, MACD, Bollinger Bands, VWAP, "
        "SMA, EMA, ATR) from OHLCV data. Provide either raw data arrays "
        "or a ticker (data will be fetched automatically)."
    )
    parameters = {
        "type": "object",
        "properties": {
            "ticker": {
                "type": "string",
                "description": "Ticker symbol. If provided, OHLCV data is fetched automatically.",
            },
            "period": {
                "type": "string",
                "description": "Lookback period when fetching data (e.g. 3mo, 1y).",
                "default": "3mo",
            },
            "interval": {
                "type": "string",
                "description": "Bar interval when fetching data.",
                "default": "1d",
            },
            "indicators": {
                "type": "array",
                # Declaring the element type keeps the schema acceptable to
                # validators that insist on "items" for array parameters.
                "items": {"type": "string"},
                "description": (
                    "Which indicators to compute. "
                    "Options: rsi, macd, bollinger, vwap, sma, ema, atr, all. "
                    "Defaults to all."
                ),
                "default": ["all"],
            },
            "sma_period": {
                "type": "integer",
                "description": "SMA window.",
                "default": 20,
            },
            "ema_period": {
                "type": "integer",
                "description": "EMA span.",
                "default": 20,
            },
            "ohlcv_data": {
                "type": "object",
                "description": "Pre-fetched OHLCV data dict (optional).",
            },
        },
        "required": ["ticker"],
    }

    @staticmethod
    def _finite_round(value: Any, digits: int) -> float | None:
        """Round *value* to *digits* places, or return None for NaN/inf."""
        number = float(value)
        return round(number, digits) if np.isfinite(number) else None

    async def execute(self, **kwargs: Any) -> ToolResult:
        """Compute the requested indicators and return them in a ToolResult.

        Keyword Args:
            ticker: Ticker symbol (required; upper-cased before use).
            period: Lookback period used when fetching data (default "3mo").
            interval: Bar interval used when fetching data (default "1d").
            indicators: Indicator names to compute, or "all" (default).
                A single string is accepted as a one-element list.
            sma_period: SMA window (default 20).
            ema_period: EMA span (default 20).
            ohlcv_data: Optional mapping with "close", "high", "low" and
                "volume" sequences; skips the fetch when given.

        Returns:
            ``ToolResult(success=True, data=...)`` where ``data`` holds the
            ticker plus one entry per computed indicator, or
            ``ToolResult(success=False, error=...)`` when data cannot be
            fetched or is unusable (missing keys, non-numeric values,
            mismatched lengths, no bars).
        """
        ticker: str = kwargs["ticker"].upper()
        period: str = kwargs.get("period", "3mo")
        interval: str = kwargs.get("interval", "1d")
        sma_period: int = kwargs.get("sma_period", 20)
        ema_period: int = kwargs.get("ema_period", 20)
        ohlcv: dict | None = kwargs.get("ohlcv_data")

        # Normalise the indicator selection: tolerate None, a bare string,
        # and differences in case/whitespace.
        requested = kwargs.get("indicators")
        if requested is None:
            requested = ["all"]
        elif isinstance(requested, str):
            requested = [requested]
        selected = {str(item).strip().lower() for item in requested}
        want_all = "all" in selected

        def wants(indicator: str) -> bool:
            return want_all or indicator in selected

        # Fetch data if not provided.
        if ohlcv is None:
            from tools.market_data import FetchMarketDataTool

            md_tool = FetchMarketDataTool()
            md_result = await md_tool.execute(
                ticker=ticker, interval=interval, period=period
            )
            if not md_result.success:
                return ToolResult(
                    success=False,
                    error=f"Could not fetch data for {ticker}: {md_result.error}",
                )
            ohlcv = md_result.data

        try:
            closes = np.array(ohlcv["close"], dtype=float)
            highs = np.array(ohlcv["high"], dtype=float)
            lows = np.array(ohlcv["low"], dtype=float)
            volumes = np.array(ohlcv["volume"], dtype=float)
        except (KeyError, TypeError, ValueError) as exc:
            logger.warning("Unusable OHLCV data for %s: %r", ticker, exc)
            return ToolResult(
                success=False,
                error=f"Invalid OHLCV data for {ticker}: {exc!r}",
            )

        if not (len(closes) == len(highs) == len(lows) == len(volumes)):
            return ToolResult(
                success=False,
                error=f"Invalid OHLCV data for {ticker}: series lengths differ",
            )
        # Every "current" value below indexes the last bar, so an empty
        # series cannot be processed.
        if len(closes) == 0:
            return ToolResult(
                success=False,
                error=f"No OHLCV data available for {ticker}",
            )

        last_close = float(closes[-1])
        results: dict[str, Any] = {"ticker": ticker}

        if wants("rsi"):
            rsi = compute_rsi(closes)
            rsi_valid = rsi[~np.isnan(rsi)]
            current_rsi = float(rsi_valid[-1]) if rsi_valid.size else None
            results["rsi"] = {
                "values": _to_list(rsi),
                "current": round(current_rsi, 2) if current_rsi is not None else None,
                "interpretation": _interpret_rsi(current_rsi),
            }

        if wants("macd"):
            macd = compute_macd(closes)
            last_macd = float(macd["macd"][-1])
            last_signal = float(macd["signal"][-1])
            macd_defined = np.isfinite(last_macd) and np.isfinite(last_signal)
            results["macd"] = {
                "macd_line": _to_list(macd["macd"]),
                "signal_line": _to_list(macd["signal"]),
                "histogram": _to_list(macd["histogram"]),
                "current_macd": self._finite_round(last_macd, 4),
                "current_signal": self._finite_round(last_signal, 4),
                "interpretation": (
                    _interpret_macd(last_macd, last_signal)
                    if macd_defined
                    else "Insufficient data"
                ),
            }

        if wants("bollinger"):
            bb = compute_bollinger_bands(closes)
            upper = float(bb["upper"][-1])
            middle = float(bb["middle"][-1])
            lower = float(bb["lower"][-1])
            # Bandwidth is a percentage of the middle band, so it is only
            # defined when that band exists and is non-zero.
            if np.isfinite(middle) and middle != 0:
                bandwidth = self._finite_round((upper - lower) / middle * 100, 2)
            else:
                bandwidth = None
            results["bollinger_bands"] = {
                "upper": _to_list(bb["upper"]),
                "middle": _to_list(bb["middle"]),
                "lower": _to_list(bb["lower"]),
                "current_price": round(last_close, 2),
                "bandwidth": bandwidth,
                "interpretation": _interpret_bollinger(last_close, upper, lower),
            }

        if wants("vwap"):
            vwap = compute_vwap(highs, lows, closes, volumes)
            last_vwap = float(vwap[-1])
            if np.isnan(last_vwap) or np.isnan(last_close):
                # No meaningful comparison without both numbers.
                price_vs_vwap = None
            else:
                price_vs_vwap = "above" if last_close > last_vwap else "below"
            results["vwap"] = {
                "values": _to_list(vwap),
                "current": self._finite_round(last_vwap, 2),
                "price_vs_vwap": price_vs_vwap,
            }

        if wants("sma"):
            sma = compute_sma(closes, sma_period)
            results["sma"] = {
                "period": sma_period,
                "values": _to_list(sma),
                "current": self._finite_round(sma[-1], 2),
            }

        if wants("ema"):
            ema = compute_ema(closes, ema_period)
            results["ema"] = {
                "period": ema_period,
                "values": _to_list(ema),
                "current": self._finite_round(ema[-1], 2),
            }

        if wants("atr"):
            atr = compute_atr(highs, lows, closes)
            atr_valid = atr[~np.isnan(atr)]
            current_atr = float(atr_valid[-1]) if atr_valid.size else None
            if current_atr is not None and np.isfinite(last_close) and last_close != 0:
                atr_percent = self._finite_round(current_atr / last_close * 100, 2)
            else:
                atr_percent = None
            results["atr"] = {
                "values": _to_list(atr),
                "current": round(current_atr, 2) if current_atr is not None else None,
                "atr_percent": atr_percent,
            }

        return ToolResult(success=True, data=results)
# ====================================================================== #
# Interpretation helpers
# ====================================================================== #
def _interpret_rsi(rsi_val: float | None) -> str:
if rsi_val is None:
return "Insufficient data"
if rsi_val >= 70:
return "Overbought (RSI >= 70) -- potential bearish reversal signal"
if rsi_val <= 30:
return "Oversold (RSI <= 30) -- potential bullish reversal signal"
if rsi_val >= 60:
return "Approaching overbought territory"
if rsi_val <= 40:
return "Approaching oversold territory"
return "Neutral"
def _interpret_macd(macd_val: float, signal_val: float) -> str:
    """Describe the relationship between the MACD line and its signal line.

    Args:
        macd_val: Latest MACD line value.
        signal_val: Latest signal line value.

    Returns:
        "Insufficient data" when either value is NaN, a neutral label when
        the two lines are equal, otherwise a bullish/bearish label depending
        on which line is higher and on the sign of the MACD value.
    """
    # NaN compares False against everything and would otherwise be reported
    # as a bearish signal.
    if np.isnan(macd_val) or np.isnan(signal_val):
        return "Insufficient data"
    if macd_val > signal_val:
        if macd_val > 0:
            return "Bullish -- MACD above signal and positive"
        return "Bullish crossover -- MACD crossed above signal"
    if macd_val < signal_val:
        if macd_val < 0:
            return "Bearish -- MACD below signal and negative"
        return "Bearish crossover -- MACD crossed below signal"
    # Equal lines (e.g. the very first bar, where both are 0) carry no
    # directional information.
    return "Neutral -- MACD equal to signal"
def _interpret_bollinger(price: float, upper: float, lower: float) -> str:
    """Describe where *price* sits relative to the Bollinger Bands.

    Returns "Insufficient data" when either band is NaN.  Otherwise the
    price is classified as touching/outside a band, or as lying in the
    upper or lower half of the channel (split at the band midpoint).
    """
    bands_missing = np.isnan(upper) or np.isnan(lower)
    if bands_missing:
        verdict = "Insufficient data"
    elif price >= upper:
        verdict = "Price at/above upper band -- potentially overbought"
    elif price <= lower:
        verdict = "Price at/below lower band -- potentially oversold"
    elif price > (upper + lower) / 2:
        verdict = "Price in upper half of bands -- mild bullish bias"
    else:
        verdict = "Price in lower half of bands -- mild bearish bias"
    return verdict
def _to_list(arr: np.ndarray) -> list:
    """Return *arr* as a JSON-safe list.

    NaN entries become ``None``; every other entry is converted to a Python
    float rounded to 4 decimal places.
    """
    return [
        round(float(item), 4) if not np.isnan(item) else None
        for item in arr
    ]