# interactive_trading_bot / hl_indicators.py
# (uploaded by samsonleegh, "Upload 10 files", commit 80fa9cc verified)
# hl_indicators.py
# pip install: hyperliquid-python-sdk pandas numpy
from __future__ import annotations
from typing import Dict, Any, List, Tuple, Literal, Iterable
import time
import numpy as np
import pandas as pd
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Candle intervals accepted by the Hyperliquid candles_snapshot endpoint (as used here).
Interval = Literal["1m", "5m", "15m", "1h", "4h", "1d"]
# Interval -> milliseconds per candle; used to size the fetch window from a bar count.
_MS = {"1m": 60_000, "5m": 5*60_000, "15m": 15*60_000, "1h": 60*60_000, "4h": 4*60*60_000, "1d": 24*60*60_000}
def _now_ms() -> int:
return int(time.time() * 1000)
def _start_end_from_limit(interval: Interval, limit: int, end_ms: int | None = None) -> tuple[int, int]:
    """Derive a [start_ms, end_ms] window wide enough to hold `limit` candles.

    A two-candle buffer is added so smoothing windows have warm-up data.
    When `end_ms` is falsy (None/0), the current time is used.
    """
    if not end_ms:
        end_ms = _now_ms()
    window = _MS[interval] * (limit + 2)  # small buffer for smoothing windows
    return max(0, end_ms - window), end_ms
# ---------------- Data fetch via candles_snapshot ---------------- #
def fetch_candles(
    name: str,
    interval: Interval = "1h",
    limit: int = 600,
    testnet: bool = True,
    end_ms: int | None = None,
) -> pd.DataFrame:
    """
    Fetch OHLCV candles via Info.candles_snapshot(name, interval, startTime, endTime).
    Returns DataFrame with ['timestamp','open','high','low','close','volume'] sorted by time.
    Raises ValueError when the API returns no candles or a required column is absent.
    """
    base_url = constants.TESTNET_API_URL if testnet else constants.MAINNET_API_URL
    client = Info(base_url, skip_ws=True)
    start_ms, end_ms = _start_end_from_limit(interval, limit, end_ms)
    payload = client.candles_snapshot(name, interval, start_ms, end_ms)
    if not payload:
        raise ValueError(f"No candles returned for {name} {interval}")
    # Map the API's terse single-letter keys onto readable column names.
    rename_map = {
        "t": "timestamp", "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume",
        "T": "close_time", "i": "interval", "s": "symbol", "n": "trades",
    }
    frame = pd.DataFrame(payload).rename(columns=rename_map)
    for col in ["timestamp", "open", "high", "low", "close", "volume"]:
        if col not in frame.columns:
            raise ValueError(f"Missing '{col}' in candles_snapshot payload. Got: {list(frame.columns)}")
    frame["timestamp"] = pd.to_datetime(frame["timestamp"], unit="ms", errors="coerce")
    for col in ["open","high","low","close","volume"]:
        frame[col] = pd.to_numeric(frame[col], errors="coerce")
    frame = frame.dropna(subset=["timestamp","close"]).sort_values("timestamp").reset_index(drop=True)
    # Trim to the requested number of most-recent candles (buffer may over-fetch).
    if len(frame) > limit:
        frame = frame.iloc[-limit:].reset_index(drop=True)
    return frame
# ---------------- Base indicators ---------------- #
def ema(series: pd.Series, period: int) -> pd.Series:
    """Exponential moving average with span=period (alpha = 2/(period+1), adjust=False)."""
    window = series.ewm(span=period, adjust=False)
    return window.mean()
def macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Return (macd_line, signal_line, histogram) for the given close series."""
    line = ema(series, fast) - ema(series, slow)
    sig = ema(line, signal)
    return line, sig, line - sig
def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """
    Wilder-style RSI via EMA smoothing (alpha=1/period) of gains and losses.

    Bug fix: when the average loss is 0 (a pure uptrend), RS is undefined and the
    previous NaN->0 fill reported RSI 0 — the most *oversold* reading — for the
    most *overbought* condition. By definition RSI is 100 in that case.
    Warm-up NaNs are still filled with 0.
    """
    delta = series.diff()
    up = pd.Series(np.where(delta > 0, delta, 0.0), index=series.index)
    down = pd.Series(np.where(delta < 0, -delta, 0.0), index=series.index)
    avg_up = up.ewm(alpha=1/period, adjust=False).mean()
    avg_down = down.ewm(alpha=1/period, adjust=False).mean()
    rs = avg_up / avg_down.replace(0, np.nan)
    out = 100 - (100 / (1 + rs))
    # Zero average loss with positive gains => RSI = 100 by definition.
    out = out.mask((avg_down == 0) & (avg_up > 0), 100.0)
    return out.fillna(0)
def stoch_rsi(series: pd.Series, rsi_length: int = 14, stoch_length: int = 14, k_smooth: int = 3, d_smooth: int = 3
              ) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """Stochastic RSI: raw stochastic of RSI scaled to 0-100, plus smoothed %K and %D."""
    rsi_vals = rsi(series, rsi_length)
    lowest = rsi_vals.rolling(stoch_length).min()
    highest = rsi_vals.rolling(stoch_length).max()
    raw = (rsi_vals - lowest) / (highest - lowest)
    k = raw.rolling(k_smooth).mean() * 100.0
    d = k.rolling(d_smooth).mean()
    return raw * 100.0, k, d
# ---------------- Volume/volatility family ---------------- #
def adl(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series:
    """
    Chaikin Accumulation/Distribution Line.
    Money-flow multiplier ((close-low)-(high-close))/(high-low), with zero-range
    bars contributing 0; ADL is the cumulative sum of multiplier * volume.
    """
    span = (high - low).replace(0, np.nan)
    multiplier = (((close - low) - (high - close)) / span).fillna(0.0)
    flow = multiplier * volume
    return flow.cumsum()
def obv(close: pd.Series, volume: pd.Series) -> pd.Series:
    """On-Balance Volume: cumulative volume signed by the bar-to-bar close change."""
    direction = np.sign(close.diff()).fillna(0)
    signed_volume = volume * direction
    return signed_volume.cumsum()
def true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
    """True range: max of |H-L|, |H-prevC|, |L-prevC| (NaN candidates skipped)."""
    prev = close.shift(1)
    candidates = pd.DataFrame({
        "hl": (high - low).abs(),
        "hc": (high - prev).abs(),
        "lc": (low - prev).abs(),
    })
    return candidates.max(axis=1)
def atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
    """Average True Range: Wilder smoothing (EMA, alpha=1/period) of the true range."""
    return true_range(high, low, close).ewm(alpha=1/period, adjust=False).mean()
def di_adx(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14
           ) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Directional indicators (+DI, -DI) and ADX with Wilder smoothing (alpha=1/period).
    Returns (+DI, -DI, ADX), each with warm-up/undefined values filled with 0.
    """
    upward = high.diff()
    downward = -low.diff()
    plus_dm = pd.Series(np.where((upward > downward) & (upward > 0), upward, 0.0), index=high.index)
    minus_dm = pd.Series(np.where((downward > upward) & (downward > 0), downward, 0.0), index=high.index)
    # Guard zero ATR so the DI ratios become NaN rather than inf.
    tr_smooth = atr(high, low, close, period).replace(0, np.nan)
    plus_di = 100 * (plus_dm.ewm(alpha=1/period, adjust=False).mean() / tr_smooth)
    minus_di = 100 * (minus_dm.ewm(alpha=1/period, adjust=False).mean() / tr_smooth)
    dx = (100 * (plus_di - minus_di).abs() / (plus_di + minus_di).replace(0, np.nan)).fillna(0)
    adx = dx.ewm(alpha=1/period, adjust=False).mean()
    return plus_di.fillna(0), minus_di.fillna(0), adx.fillna(0)
def bbands(series: pd.Series, period: int = 20, std_mult: float = 2.0
           ) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
    """Bollinger Bands: returns (basis, upper, lower, %b, bandwidth).

    basis = rolling mean, bands = basis +/- std_mult * population std (ddof=0),
    %b = position of price within the bands, bandwidth = band width / basis.
    """
    basis = series.rolling(period).mean()
    half_width = std_mult * series.rolling(period).std(ddof=0)
    upper = basis + half_width
    lower = basis - half_width
    pct_b = (series - lower) / (upper - lower)
    bandwidth = (upper - lower) / basis
    return basis, upper, lower, pct_b, bandwidth
def mfi(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, period: int = 14) -> pd.Series:
    """
    Money Flow Index over a rolling window of `period` bars.

    Bug fix: when the window has no negative money flow, the money ratio is
    undefined and the previous NaN->0 fill reported MFI 0 for what is by
    definition an MFI of 100 (all flow positive). Warm-up NaNs remain 0.
    """
    tp = (high + low + close) / 3.0
    rmf = tp * volume  # raw money flow per bar
    pos_flow = pd.Series(np.where(tp > tp.shift(1), rmf, 0.0), index=tp.index).rolling(period).sum()
    neg_flow = pd.Series(np.where(tp < tp.shift(1), rmf, 0.0), index=tp.index).rolling(period).sum()
    money_ratio = pos_flow / neg_flow.replace(0, np.nan)
    out = 100 - (100 / (1 + money_ratio))
    # All-positive flow in the window => MFI = 100 by definition.
    out = out.mask((neg_flow == 0) & (pos_flow > 0), 100.0)
    return out.fillna(0)
def vwap_cumulative(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series:
    """
    Cumulative VWAP over the full series: sum(TP*V)/sum(V) where TP=(H+L+C)/3.
    Resets only at the beginning (not each day). Leading zero-volume bars are
    backfilled from the first defined value, then any remainder filled with 0.
    """
    tp = (high + low + close) / 3.0
    cum_v = volume.cumsum().replace(0, np.nan)
    cum_tp_v = (tp * volume).cumsum()
    # .bfill() replaces fillna(method="bfill"), deprecated in pandas 2.1 and
    # removed in pandas 3.0; behavior is identical.
    return (cum_tp_v / cum_v).bfill().fillna(0)
def vwap_daily(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, timestamps: pd.Series) -> pd.Series:
    """
    Session VWAP that resets daily (by calendar date of 'timestamps').
    Zero-volume prefixes within the series are backfilled, remainder filled with 0.
    """
    tp = (high + low + close) / 3.0
    dates = pd.to_datetime(timestamps).dt.date
    df = pd.DataFrame({"tp": tp, "v": volume, "date": dates})
    df["tpv"] = df["tp"] * df["v"]
    # Per-day running sums of price*volume and volume.
    cum = df.groupby("date")[["tpv", "v"]].cumsum()
    vwap = (cum["tpv"] / cum["v"].replace(0, np.nan)).values
    # .bfill() replaces fillna(method="bfill"), deprecated in pandas 2.1 and
    # removed in pandas 3.0; behavior is identical.
    return pd.Series(vwap, index=high.index).bfill().fillna(0)
# ---------------- JSON helpers ---------------- #
def _pts(ts: pd.Series, vals: pd.Series) -> List[Dict[str, float]]:
out: List[Dict[str, float]] = []
for t, v in zip(ts, vals):
if pd.isna(t) or pd.isna(v):
continue
out.append({"t": int(pd.Timestamp(t).timestamp() * 1000), "v": float(v)})
return out
def _tail_pts(ts: pd.Series, vals: pd.Series, n: int) -> List[Dict[str, float]]:
    """Return only the last n timestamp/value points (safe if n > len).

    A falsy or non-positive n disables trimming and returns every point.
    """
    if not n or n <= 0:
        return _pts(ts, vals)
    t_slice = ts if len(ts) <= n else ts.iloc[-n:]
    v_slice = vals if len(vals) <= n else vals.iloc[-n:]
    return _pts(t_slice, v_slice)
# ---------------- MCP-friendly functions (per indicator) ---------------- #
def get_ema(
    name: str,
    periods: List[int] | None = None,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """EMA(s) of close for `name`; every series is trimmed to the last `output_tail` points."""
    wanted = periods or [20, 200]
    candles = fetch_candles(name, interval, limit, testnet)
    ts = candles["timestamp"]
    payload: Dict[str, Any] = {
        "coin": name,
        "interval": interval,
        "ema": {},
        "close": _tail_pts(ts, candles["close"], output_tail),
        "last": {"close": float(candles["close"].iloc[-1])},
    }
    for p in wanted:
        line = ema(candles["close"], p)
        payload["ema"][str(p)] = _tail_pts(ts, line, output_tail)
        payload["last"][f"ema_{p}"] = float(line.iloc[-1])
    return payload
def get_macd(
    name: str,
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """MACD line/signal/histogram for `name`, trimmed to the last `output_tail` points."""
    candles = fetch_candles(name, interval, limit, testnet)
    ts = candles["timestamp"]
    line, sig, hist = macd(candles["close"], fast, slow, signal)
    payload: Dict[str, Any] = {
        "coin": name,
        "interval": interval,
        "params": {"fast": fast, "slow": slow, "signal": signal},
        "macd_line": _tail_pts(ts, line, output_tail),
        "signal": _tail_pts(ts, sig, output_tail),
        "histogram": _tail_pts(ts, hist, output_tail),
    }
    payload["last"] = {
        "macd_line": float(line.iloc[-1]),
        "signal": float(sig.iloc[-1]),
        "histogram": float(hist.iloc[-1]),
        "close": float(candles["close"].iloc[-1]),
    }
    return payload
def get_stoch_rsi(
    name: str,
    rsi_length: int = 14,
    stoch_length: int = 14,
    k_smooth: int = 3,
    d_smooth: int = 3,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """Stochastic RSI (raw, %K, %D) for `name`, trimmed to the last `output_tail` points."""
    candles = fetch_candles(name, interval, limit, testnet)
    ts = candles["timestamp"]
    raw, k_line, d_line = stoch_rsi(candles["close"], rsi_length, stoch_length, k_smooth, d_smooth)
    return {
        "coin": name,
        "interval": interval,
        "params": {
            "rsi_length": rsi_length,
            "stoch_length": stoch_length,
            "k_smooth": k_smooth,
            "d_smooth": d_smooth,
        },
        "stoch_rsi": _tail_pts(ts, raw, output_tail),
        "%K": _tail_pts(ts, k_line, output_tail),
        "%D": _tail_pts(ts, d_line, output_tail),
        "last": {
            "stoch_rsi": float(raw.iloc[-1]),
            "k": float(k_line.iloc[-1]),
            "d": float(d_line.iloc[-1]),
            "close": float(candles["close"].iloc[-1]),
        },
    }
def get_adl(
    name: str,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """Accumulation/Distribution line for `name` (last `output_tail` points)."""
    candles = fetch_candles(name, interval, limit, testnet)
    line = adl(candles["high"], candles["low"], candles["close"], candles["volume"])
    return {
        "coin": name,
        "interval": interval,
        "adl": _tail_pts(candles["timestamp"], line, output_tail),
        "last": {"adl": float(line.iloc[-1])},
    }
def get_obv(
    name: str,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """On-Balance Volume for `name` (last `output_tail` points)."""
    candles = fetch_candles(name, interval, limit, testnet)
    line = obv(candles["close"], candles["volume"])
    return {
        "coin": name,
        "interval": interval,
        "obv": _tail_pts(candles["timestamp"], line, output_tail),
        "last": {"obv": float(line.iloc[-1])},
    }
def get_atr_adx(
    name: str,
    period: int = 14,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """+DI/-DI/ADX and ATR for `name` (last `output_tail` points)."""
    candles = fetch_candles(name, interval, limit, testnet)
    ts = candles["timestamp"]
    hi_s, lo_s, cl_s = candles["high"], candles["low"], candles["close"]
    plus_di, minus_di, adx_series = di_adx(hi_s, lo_s, cl_s, period)
    atr_series = atr(hi_s, lo_s, cl_s, period)
    return {
        "coin": name,
        "interval": interval,
        "params": {"period": period},
        "+DI": _tail_pts(ts, plus_di, output_tail),
        "-DI": _tail_pts(ts, minus_di, output_tail),
        "ADX": _tail_pts(ts, adx_series, output_tail),
        "ATR": _tail_pts(ts, atr_series, output_tail),
        "last": {
            "+DI": float(plus_di.iloc[-1]),
            "-DI": float(minus_di.iloc[-1]),
            "ADX": float(adx_series.iloc[-1]),
            "ATR": float(atr_series.iloc[-1]),
        },
    }
def get_bbands(
    name: str,
    period: int = 20,
    std_mult: float = 2.0,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """Bollinger Bands (basis/upper/lower/%b/bandwidth) for `name`."""
    candles = fetch_candles(name, interval, limit, testnet)
    ts = candles["timestamp"]
    basis, upper, lower, pct_b, bw = bbands(candles["close"], period, std_mult)
    series_map = {"basis": basis, "upper": upper, "lower": lower, "%b": pct_b, "bandwidth": bw}
    payload: Dict[str, Any] = {
        "coin": name,
        "interval": interval,
        "params": {"period": period, "std_mult": std_mult},
    }
    for key, line in series_map.items():
        payload[key] = _tail_pts(ts, line, output_tail)
    payload["last"] = {key: float(line.iloc[-1]) for key, line in series_map.items()}
    return payload
def get_mfi(
    name: str,
    period: int = 14,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """Money Flow Index for `name` (last `output_tail` points)."""
    candles = fetch_candles(name, interval, limit, testnet)
    line = mfi(candles["high"], candles["low"], candles["close"], candles["volume"], period)
    return {
        "coin": name,
        "interval": interval,
        "params": {"period": period},
        "mfi": _tail_pts(candles["timestamp"], line, output_tail),
        "last": {"mfi": float(line.iloc[-1])},
    }
def get_vwap(
    name: str,
    daily_reset: bool = False,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """VWAP for `name`: cumulative by default, or daily-resetting session VWAP."""
    candles = fetch_candles(name, interval, limit, testnet)
    if daily_reset:
        line = vwap_daily(candles["high"], candles["low"], candles["close"], candles["volume"], candles["timestamp"])
    else:
        line = vwap_cumulative(candles["high"], candles["low"], candles["close"], candles["volume"])
    return {
        "coin": name,
        "interval": interval,
        "params": {"daily_reset": bool(daily_reset)},
        "vwap": _tail_pts(candles["timestamp"], line, output_tail),
        "last": {"vwap": float(line.iloc[-1])},
    }
def get_volume(
    name: str,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    output_tail: int = 30,
) -> Dict[str, Any]:
    """Raw per-candle volume for `name` (last `output_tail` points)."""
    candles = fetch_candles(name, interval, limit, testnet)
    return {
        "coin": name,
        "interval": interval,
        "volume": _tail_pts(candles["timestamp"], candles["volume"], output_tail),
        "last": {"volume": float(candles["volume"].iloc[-1])},
    }
def get_bundle(
    name: str,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    include: Iterable[str] = ("ema","macd","stoch_rsi","adl","obv","atr_adx","bbands","mfi","vwap","volume"),
    ema_periods: List[int] | None = None,
    macd_fast: int = 12, macd_slow: int = 26, macd_signal: int = 9,
    stoch_rsi_len: int = 14, stoch_len: int = 14, k_smooth: int = 3, d_smooth: int = 3,
    bb_period: int = 20, bb_std: float = 2.0,
    mfi_period: int = 14,
    vwap_daily_reset: bool = False,
    output_tail: int = 30,
    atr_adx_period: int = 14,  # NEW: was implicitly fixed to the di_adx/atr defaults
) -> Dict[str, Any]:
    """
    Compute several indicators for `name` from a single candle fetch.

    `include` selects which indicator groups appear in the result; every series
    is trimmed to the last `output_tail` points. `atr_adx_period` parameterizes
    the ATR/ADX smoothing period (default 14, matching the previous hard-coded
    behavior) for consistency with get_atr_adx and the other configurable
    indicators. Raises ValueError if no candles are returned.
    """
    df = fetch_candles(name, interval, limit, testnet)
    out: Dict[str, Any] = {
        "coin": name,
        "interval": interval,
        "close": _tail_pts(df["timestamp"], df["close"], output_tail),
        "last": {"close": float(df["close"].iloc[-1])},
    }
    if "ema" in include:
        ema_periods = ema_periods or [20, 200]
        out["ema"] = {}
        for p in ema_periods:
            e = ema(df["close"], p)
            out["ema"][str(p)] = _tail_pts(df["timestamp"], e, output_tail)
            out["last"][f"ema_{p}"] = float(e.iloc[-1])
    if "macd" in include:
        line, sig, hist = macd(df["close"], macd_fast, macd_slow, macd_signal)
        out["macd"] = {
            "params": {"fast": macd_fast, "slow": macd_slow, "signal": macd_signal},
            "macd_line": _tail_pts(df["timestamp"], line, output_tail),
            "signal": _tail_pts(df["timestamp"], sig, output_tail),
            "histogram": _tail_pts(df["timestamp"], hist, output_tail),
            "last": {"macd_line": float(line.iloc[-1]), "signal": float(sig.iloc[-1]), "histogram": float(hist.iloc[-1])},
        }
    if "stoch_rsi" in include:
        st, k, d = stoch_rsi(df["close"], stoch_rsi_len, stoch_len, k_smooth, d_smooth)
        out["stoch_rsi"] = {
            "params": {"rsi_length": stoch_rsi_len, "stoch_length": stoch_len, "k_smooth": k_smooth, "d_smooth": d_smooth},
            "stoch_rsi": _tail_pts(df["timestamp"], st, output_tail),
            "%K": _tail_pts(df["timestamp"], k, output_tail),
            "%D": _tail_pts(df["timestamp"], d, output_tail),
            "last": {"stoch_rsi": float(st.iloc[-1]), "k": float(k.iloc[-1]), "d": float(d.iloc[-1])},
        }
    if "adl" in include:
        series = adl(df["high"], df["low"], df["close"], df["volume"])
        out["adl"] = {"series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])}
    if "obv" in include:
        series = obv(df["close"], df["volume"])
        out["obv"] = {"series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])}
    if "atr_adx" in include:
        # Pass the configurable period instead of silently using the defaults.
        plus_di, minus_di, adx_series = di_adx(df["high"], df["low"], df["close"], atr_adx_period)
        atr_series = atr(df["high"], df["low"], df["close"], atr_adx_period)
        out["atr_adx"] = {
            "params": {"period": atr_adx_period},
            "+DI": _tail_pts(df["timestamp"], plus_di, output_tail),
            "-DI": _tail_pts(df["timestamp"], minus_di, output_tail),
            "ADX": _tail_pts(df["timestamp"], adx_series, output_tail),
            "ATR": _tail_pts(df["timestamp"], atr_series, output_tail),
            "last": {"+DI": float(plus_di.iloc[-1]), "-DI": float(minus_di.iloc[-1]), "ADX": float(adx_series.iloc[-1]), "ATR": float(atr_series.iloc[-1])},
        }
    if "bbands" in include:
        ma, up, lo, pct_b, bw = bbands(df["close"], bb_period, bb_std)
        out["bbands"] = {
            "params": {"period": bb_period, "std_mult": bb_std},
            "basis": _tail_pts(df["timestamp"], ma, output_tail),
            "upper": _tail_pts(df["timestamp"], up, output_tail),
            "lower": _tail_pts(df["timestamp"], lo, output_tail),
            "%b": _tail_pts(df["timestamp"], pct_b, output_tail),
            "bandwidth": _tail_pts(df["timestamp"], bw, output_tail),
            "last": {"basis": float(ma.iloc[-1]), "upper": float(up.iloc[-1]), "lower": float(lo.iloc[-1]), "%b": float(pct_b.iloc[-1]), "bandwidth": float(bw.iloc[-1])},
        }
    if "mfi" in include:
        series = mfi(df["high"], df["low"], df["close"], df["volume"], mfi_period)
        out["mfi"] = {"params": {"period": mfi_period}, "series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])}
    if "vwap" in include:
        series = vwap_daily(df["high"], df["low"], df["close"], df["volume"], df["timestamp"]) if vwap_daily_reset else \
                 vwap_cumulative(df["high"], df["low"], df["close"], df["volume"])
        out["vwap"] = {"params": {"daily_reset": bool(vwap_daily_reset)}, "series": _tail_pts(df["timestamp"], series, output_tail), "last": float(series.iloc[-1])}
    if "volume" in include:
        out["volume"] = {"series": _tail_pts(df["timestamp"], df["volume"], output_tail), "last": float(df["volume"].iloc[-1])}
    return out