# hl_indicators.py # pip install: hyperliquid-python-sdk pandas numpy from __future__ import annotations from typing import Dict, Any, List, Tuple, Literal, Iterable import time import numpy as np import pandas as pd from hyperliquid.info import Info from hyperliquid.utils import constants Interval = Literal["1m", "5m", "15m", "1h", "4h", "1d"] _MS = {"1m": 60_000, "5m": 5*60_000, "15m": 15*60_000, "1h": 60*60_000, "4h": 4*60*60_000, "1d": 24*60*60_000} def _now_ms() -> int: return int(time.time() * 1000) def _start_end_from_limit(interval: Interval, limit: int, end_ms: int | None = None) -> tuple[int, int]: end_ms = end_ms or _now_ms() span = (limit + 2) * _MS[interval] # small buffer for smoothing windows start_ms = max(0, end_ms - span) return start_ms, end_ms # ---------------- Data fetch via candles_snapshot ---------------- # def fetch_candles( name: str, interval: Interval = "1h", limit: int = 600, testnet: bool = True, end_ms: int | None = None, ) -> pd.DataFrame: """ Fetch OHLCV candles via Info.candles_snapshot(name, interval, startTime, endTime). Returns DataFrame with ['timestamp','open','high','low','close','volume'] sorted by time. """ api_url = constants.TESTNET_API_URL if testnet else constants.MAINNET_API_URL info = Info(api_url, skip_ws=True) start_ms, end_ms = _start_end_from_limit(interval, limit, end_ms) raw = info.candles_snapshot(name, interval, start_ms, end_ms) if not raw: raise ValueError(f"No candles returned for {name} {interval}") df = pd.DataFrame(raw).rename(columns={ "t": "timestamp", "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume", "T": "close_time", "i": "interval", "s": "symbol", "n": "trades", }) needed = ["timestamp", "open", "high", "low", "close", "volume"] for k in needed: if k not in df.columns: raise ValueError(f"Missing '{k}' in candles_snapshot payload. Got: {list(df.columns)}") df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", errors="coerce") for k in ["open","high","low","close","volume"]: df[k] = pd.to_numeric(df[k], errors="coerce") df = df.dropna(subset=["timestamp","close"]).sort_values("timestamp").reset_index(drop=True) if len(df) > limit: df = df.iloc[-limit:].reset_index(drop=True) return df # ---------------- Base indicators ---------------- # def ema(series: pd.Series, period: int) -> pd.Series: return series.ewm(span=period, adjust=False).mean() def macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]: fast_ema, slow_ema = ema(series, fast), ema(series, slow) line = fast_ema - slow_ema sig = ema(line, signal) hist = line - sig return line, sig, hist def rsi(series: pd.Series, period: int = 14) -> pd.Series: delta = series.diff() up = pd.Series(np.where(delta > 0, delta, 0.0), index=series.index) down = pd.Series(np.where(delta < 0, -delta, 0.0), index=series.index) avg_up = up.ewm(alpha=1/period, adjust=False).mean() avg_down = down.ewm(alpha=1/period, adjust=False).mean() rs = avg_up / avg_down.replace(0, np.nan) return (100 - (100 / (1 + rs))).fillna(0) def stoch_rsi(series: pd.Series, rsi_length: int = 14, stoch_length: int = 14, k_smooth: int = 3, d_smooth: int = 3 ) -> Tuple[pd.Series, pd.Series, pd.Series]: r = rsi(series, rsi_length) r_low, r_high = r.rolling(stoch_length).min(), r.rolling(stoch_length).max() base = (r - r_low) / (r_high - r_low) k = base.rolling(k_smooth).mean() * 100.0 d = k.rolling(d_smooth).mean() return base * 100.0, k, d # ---------------- Volume/volatility family ---------------- # def adl(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series: """ Chaikin Accumulation/Distribution Line. mfm = ((close - low) - (high - close)) / (high - low), guarded for zero range. ADL = cumulative sum(mfm * volume) """ hl_range = (high - low).replace(0, np.nan) mfm = ((close - low) - (high - close)) / hl_range mfm = mfm.fillna(0.0) mfv = mfm * volume return mfv.cumsum() def obv(close: pd.Series, volume: pd.Series) -> pd.Series: """ On-Balance Volume. """ sign = np.sign(close.diff()).fillna(0) return (volume * sign).cumsum() def true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series: prev_close = close.shift(1) tr = pd.concat([(high - low).abs(), (high - prev_close).abs(), (low - prev_close).abs()], axis=1).max(axis=1) return tr def atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: tr = true_range(high, low, close) return tr.ewm(alpha=1/period, adjust=False).mean() def di_adx(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14 ) -> Tuple[pd.Series, pd.Series, pd.Series]: up_move = high.diff() down_move = -low.diff() plus_dm = pd.Series(np.where((up_move > down_move) & (up_move > 0), up_move, 0.0), index=high.index) minus_dm = pd.Series(np.where((down_move > up_move) & (down_move > 0), down_move, 0.0), index=high.index) atr_series = atr(high, low, close, period) plus_di = 100 * (plus_dm.ewm(alpha=1/period, adjust=False).mean() / atr_series.replace(0, np.nan)) minus_di = 100 * (minus_dm.ewm(alpha=1/period, adjust=False).mean() / atr_series.replace(0, np.nan)) dx = (100 * (plus_di - minus_di).abs() / (plus_di + minus_di).replace(0, np.nan)).fillna(0) adx = dx.ewm(alpha=1/period, adjust=False).mean() return plus_di.fillna(0), minus_di.fillna(0), adx.fillna(0) def bbands(series: pd.Series, period: int = 20, std_mult: float = 2.0 ) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]: ma = series.rolling(period).mean() sd = series.rolling(period).std(ddof=0) upper = ma + std_mult * sd lower = ma - std_mult * sd pct_b = (series - lower) / (upper - lower) bandwidth = (upper - lower) / ma return ma, upper, lower, pct_b, bandwidth def mfi(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, period: int = 14) -> pd.Series: tp = (high + low + close) / 3.0 rmf = tp * volume pos_flow = pd.Series(np.where(tp > tp.shift(1), rmf, 0.0), index=tp.index).rolling(period).sum() neg_flow = pd.Series(np.where(tp < tp.shift(1), rmf, 0.0), index=tp.index).rolling(period).sum() money_ratio = pos_flow / neg_flow.replace(0, np.nan) out = 100 - (100 / (1 + money_ratio)) return out.fillna(0) def vwap_cumulative(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series: """ Cumulative VWAP over the full series: sum(TP*V)/sum(V) where TP=(H+L+C)/3. Resets only at the beginning (not each day). """ tp = (high + low + close) / 3.0 cum_v = volume.cumsum().replace(0, np.nan) cum_tp_v = (tp * volume).cumsum() return (cum_tp_v / cum_v).fillna(method="bfill").fillna(0) def vwap_daily(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, timestamps: pd.Series) -> pd.Series: """ Session VWAP that resets daily (by calendar date of 'timestamps'). """ tp = (high + low + close) / 3.0 dates = pd.to_datetime(timestamps).dt.date df = pd.DataFrame({"tp": tp, "v": volume, "date": dates}) df["tpv"] = df["tp"] * df["v"] cum = df.groupby("date")[["tpv", "v"]].cumsum() vwap = (cum["tpv"] / cum["v"].replace(0, np.nan)).values return pd.Series(vwap, index=high.index).fillna(method="bfill").fillna(0) # ---------------- JSON helpers ---------------- # def _pts(ts: pd.Series, vals: pd.Series) -> List[Dict[str, float]]: out: List[Dict[str, float]] = [] for t, v in zip(ts, vals): if pd.isna(t) or pd.isna(v): continue out.append({"t": int(pd.Timestamp(t).timestamp() * 1000), "v": float(v)}) return out def _tail_pts(ts: pd.Series, vals: pd.Series, n: int) -> List[Dict[str, float]]: """Return only the last n timestamp/value points (safe if n > len).""" if n is None or n <= 0: return _pts(ts, vals) tail_ts = ts.iloc[-n:] if len(ts) > n else ts tail_vals = vals.iloc[-n:] if len(vals) > n else vals return _pts(tail_ts, tail_vals) # ---------------- MCP-friendly functions (per indicator) ---------------- # def get_ema( name: str, periods: List[int] | None = None, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: periods = periods or [20, 200] df = fetch_candles(name, interval, limit, testnet) res: Dict[str, Any] = { "coin": name, "interval": interval, "ema": {}, "close": _tail_pts(df["timestamp"], df["close"], output_tail), # sliced "last": {"close": float(df["close"].iloc[-1])}, } for p in periods: e = ema(df["close"], p) res["ema"][str(p)] = _tail_pts(df["timestamp"], e, output_tail) # sliced res["last"][f"ema_{p}"] = float(e.iloc[-1]) return res def get_macd( name: str, fast: int = 12, slow: int = 26, signal: int = 9, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) line, sig, hist = macd(df["close"], fast, slow, signal) return { "coin": name, "interval": interval, "params": {"fast": fast, "slow": slow, "signal": signal}, "macd_line": _tail_pts(df["timestamp"], line, output_tail), # sliced "signal": _tail_pts(df["timestamp"], sig, output_tail), # sliced "histogram": _tail_pts(df["timestamp"], hist, output_tail), # sliced "last": { "macd_line": float(line.iloc[-1]), "signal": float(sig.iloc[-1]), "histogram": float(hist.iloc[-1]), "close": float(df["close"].iloc[-1]), }, } def get_stoch_rsi( name: str, rsi_length: int = 14, stoch_length: int = 14, k_smooth: int = 3, d_smooth: int = 3, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) stoch, k, d = stoch_rsi(df["close"], rsi_length, stoch_length, k_smooth, d_smooth) return { "coin": name, "interval": interval, "params": { "rsi_length": rsi_length, "stoch_length": stoch_length, "k_smooth": k_smooth, "d_smooth": d_smooth, }, "stoch_rsi": _tail_pts(df["timestamp"], stoch, output_tail), # sliced "%K": _tail_pts(df["timestamp"], k, output_tail), # sliced "%D": _tail_pts(df["timestamp"], d, output_tail), # sliced "last": { "stoch_rsi": float(stoch.iloc[-1]), "k": float(k.iloc[-1]), "d": float(d.iloc[-1]), "close": float(df["close"].iloc[-1]), }, } def get_adl( name: str, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) series = adl(df["high"], df["low"], df["close"], df["volume"]) return { "coin": name, "interval": interval, "adl": _tail_pts(df["timestamp"], series, output_tail), # sliced "last": {"adl": float(series.iloc[-1])}, } def get_obv( name: str, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) series = obv(df["close"], df["volume"]) return { "coin": name, "interval": interval, "obv": _tail_pts(df["timestamp"], series, output_tail), # sliced "last": {"obv": float(series.iloc[-1])}, } def get_atr_adx( name: str, period: int = 14, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) plus_di, minus_di, adx_series = di_adx(df["high"], df["low"], df["close"], period) atr_series = atr(df["high"], df["low"], df["close"], period) return { "coin": name, "interval": interval, "params": {"period": period}, "+DI": _tail_pts(df["timestamp"], plus_di, output_tail), # sliced "-DI": _tail_pts(df["timestamp"], minus_di, output_tail), # sliced "ADX": _tail_pts(df["timestamp"], adx_series, output_tail),# sliced "ATR": _tail_pts(df["timestamp"], atr_series, output_tail),# sliced "last": { "+DI": float(plus_di.iloc[-1]), "-DI": float(minus_di.iloc[-1]), "ADX": float(adx_series.iloc[-1]), "ATR": float(atr_series.iloc[-1]), }, } def get_bbands( name: str, period: int = 20, std_mult: float = 2.0, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) ma, upper, lower, pct_b, bandwidth = bbands(df["close"], period, std_mult) return { "coin": name, "interval": interval, "params": {"period": period, "std_mult": std_mult}, "basis": _tail_pts(df["timestamp"], ma, output_tail), # sliced "upper": _tail_pts(df["timestamp"], upper, output_tail), # sliced "lower": _tail_pts(df["timestamp"], lower, output_tail), # sliced "%b": _tail_pts(df["timestamp"], pct_b, output_tail), # sliced "bandwidth": _tail_pts(df["timestamp"], bandwidth, output_tail), # sliced "last": { "basis": float(ma.iloc[-1]), "upper": float(upper.iloc[-1]), "lower": float(lower.iloc[-1]), "%b": float(pct_b.iloc[-1]), "bandwidth": float(bandwidth.iloc[-1]), }, } def get_mfi( name: str, period: int = 14, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) series = mfi(df["high"], df["low"], df["close"], df["volume"], period) return { "coin": name, "interval": interval, "params": {"period": period}, "mfi": _tail_pts(df["timestamp"], series, output_tail), # sliced "last": {"mfi": float(series.iloc[-1])}, } def get_vwap( name: str, daily_reset: bool = False, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) series = ( vwap_daily(df["high"], df["low"], df["close"], df["volume"], df["timestamp"]) if daily_reset else vwap_cumulative(df["high"], df["low"], df["close"], df["volume"]) ) return { "coin": name, "interval": interval, "params": {"daily_reset": bool(daily_reset)}, "vwap": _tail_pts(df["timestamp"], series, output_tail), # sliced "last": {"vwap": float(series.iloc[-1])}, } def get_volume( name: str, interval: Interval = "1h", limit: int = 300, testnet: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) return { "coin": name, "interval": interval, "volume": _tail_pts(df["timestamp"], df["volume"], output_tail), # sliced "last": {"volume": float(df["volume"].iloc[-1])}, } def get_bundle( name: str, interval: Interval = "1h", limit: int = 300, testnet: bool = False, include: Iterable[str] = ("ema","macd","stoch_rsi","adl","obv","atr_adx","bbands","mfi","vwap","volume"), ema_periods: List[int] | None = None, macd_fast: int = 12, macd_slow: int = 26, macd_signal: int = 9, stoch_rsi_len: int = 14, stoch_len: int = 14, k_smooth: int = 3, d_smooth: int = 3, bb_period: int = 20, bb_std: float = 2.0, mfi_period: int = 14, vwap_daily_reset: bool = False, output_tail: int = 30, # NEW ) -> Dict[str, Any]: df = fetch_candles(name, interval, limit, testnet) out: Dict[str, Any] = { "coin": name, "interval": interval, "close": _tail_pts(df["timestamp"], df["close"], output_tail), # sliced "last": {"close": float(df["close"].iloc[-1])}, } if "ema" in include: ema_periods = ema_periods or [20, 200] out["ema"] = {} for p in ema_periods: e = ema(df["close"], p) out["ema"][str(p)] = _tail_pts(df["timestamp"], e, output_tail) # sliced out["last"][f"ema_{p}"] = float(e.iloc[-1]) if "macd" in include: line, sig, hist = macd(df["close"], macd_fast, macd_slow, macd_signal) out["macd"] = { "params": {"fast": macd_fast, "slow": macd_slow, "signal": macd_signal}, "macd_line": _tail_pts(df["timestamp"], line, output_tail), # sliced "signal": _tail_pts(df["timestamp"], sig, output_tail), # sliced "histogram": _tail_pts(df["timestamp"], hist, output_tail), # sliced "last": {"macd_line": float(line.iloc[-1]), "signal": float(sig.iloc[-1]), "histogram": float(hist.iloc[-1])}, } if "stoch_rsi" in include: st, k, d = stoch_rsi(df["close"], stoch_rsi_len, stoch_len, k_smooth, d_smooth) out["stoch_rsi"] = { "params": {"rsi_length": stoch_rsi_len, "stoch_length": stoch_len, "k_smooth": k_smooth, "d_smooth": d_smooth}, "stoch_rsi": _tail_pts(df["timestamp"], st, output_tail), # sliced "%K": _tail_pts(df["timestamp"], k, output_tail), # sliced "%D": _tail_pts(df["timestamp"], d, output_tail), # sliced "last": {"stoch_rsi": float(st.iloc[-1]), "k": float(k.iloc[-1]), "d": float(d.iloc[-1])}, } if "adl" in include: series = adl(df["high"], df["low"], df["close"], df["volume"]) out["adl"] = {"series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])} if "obv" in include: series = obv(df["close"], df["volume"]) out["obv"] = {"series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])} if "atr_adx" in include: plus_di, minus_di, adx_series = di_adx(df["high"], df["low"], df["close"]) atr_series = atr(df["high"], df["low"], df["close"]) out["atr_adx"] = { "+DI": _tail_pts(df["timestamp"], plus_di, output_tail), # sliced "-DI": _tail_pts(df["timestamp"], minus_di, output_tail), # sliced "ADX": _tail_pts(df["timestamp"], adx_series, output_tail), # sliced "ATR": _tail_pts(df["timestamp"], atr_series, output_tail), # sliced "last": {"+DI": float(plus_di.iloc[-1]), "-DI": float(minus_di.iloc[-1]), "ADX": float(adx_series.iloc[-1]), "ATR": float(atr_series.iloc[-1])}, } if "bbands" in include: ma, up, lo, pct_b, bw = bbands(df["close"], bb_period, bb_std) out["bbands"] = { "params": {"period": bb_period, "std_mult": bb_std}, "basis": _tail_pts(df["timestamp"], ma, output_tail), # sliced "upper": _tail_pts(df["timestamp"], up, output_tail), # sliced "lower": _tail_pts(df["timestamp"], lo, output_tail), # sliced "%b": _tail_pts(df["timestamp"], pct_b, output_tail),# sliced "bandwidth": _tail_pts(df["timestamp"], bw, output_tail), # sliced "last": {"basis": float(ma.iloc[-1]), "upper": float(up.iloc[-1]), "lower": float(lo.iloc[-1]), "%b": float(pct_b.iloc[-1]), "bandwidth": float(bw.iloc[-1])}, } if "mfi" in include: series = mfi(df["high"], df["low"], df["close"], df["volume"], mfi_period) out["mfi"] = {"params": {"period": mfi_period}, "series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])} if "vwap" in include: series = vwap_daily(df["high"], df["low"], df["close"], df["volume"], df["timestamp"]) if vwap_daily_reset else \ vwap_cumulative(df["high"], df["low"], df["close"], df["volume"]) out["vwap"] = {"params": {"daily_reset": bool(vwap_daily_reset)}, "series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])} if "volume" in include: out["volume"] = {"series": _tail_pts(df["timestamp"], df["volume"], output_tail), "last": float(df["volume"].iloc[-1])} return out