# deepseek_auto_trader / hl_indicators.py
# Uploaded by samsonleegh ("Upload 4 files", commit cab57ec, verified)
# hl_indicators2.py
from __future__ import annotations
import time
import numpy as np
import pandas as pd
from datetime import datetime, timezone
from typing import Dict, Any, List, Tuple, Literal, Iterable, Union, Optional
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Candle intervals accepted by the Hyperliquid candles API.
Interval = Literal["1m", "5m", "15m", "1h", "4h", "1d"]
# Interval -> duration in milliseconds; used to size snapshot time windows.
_MS = {"1m": 60_000, "5m": 5*60_000, "15m": 15*60_000, "1h": 60*60_000, "4h": 4*60*60_000, "1d": 24*60*60_000}
def _now_ms() -> int:
return int(time.time() * 1000)
def _start_end_from_limit(interval: Interval, limit: int, end_ms: int | None = None) -> tuple[int, int, str, str]:
    """
    Compute the millisecond window spanning `limit` candles of `interval`,
    ending at `end_ms` (falsy -> now). Includes a 2-candle buffer so
    smoothing indicators have warm-up data.

    Returns (start_ms, end_ms, start_utc_str, end_utc_str).
    """
    final_ms = end_ms or _now_ms()
    window_ms = _MS[interval] * (limit + 2)  # 2 extra bars as smoothing buffer
    first_ms = final_ms - window_ms
    if first_ms < 0:
        first_ms = 0

    def _fmt(ms: int) -> str:
        return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")

    return first_ms, final_ms, _fmt(first_ms), _fmt(final_ms)
def fetch_candles(
    name: str,
    interval: Interval = "1h",
    limit: int = 300,
    testnet: bool = False,
    end_ms: int | None = None,
) -> pd.DataFrame:
    """
    Fetch OHLCV candles via Info.candles_snapshot(name, interval, startTime, endTime).

    Args:
        name: Coin/market name understood by the Hyperliquid API.
        interval: Candle interval (see `Interval`).
        limit: Maximum number of most-recent candles to return.
        testnet: Use the testnet API endpoint when True.
        end_ms: Window end in epoch milliseconds (defaults to now).

    Returns:
        DataFrame with ['timestamp','open','high','low','close','volume'],
        sorted by time, at most `limit` rows.

    Raises:
        ValueError: If the API returns no candles or the payload lacks a
            required column.
    """
    api_url = constants.TESTNET_API_URL if testnet else constants.MAINNET_API_URL
    info = Info(api_url, skip_ws=True)
    start_ms, end_ms, _, _ = _start_end_from_limit(interval, limit, end_ms)
    raw = info.candles_snapshot(name, interval, start_ms, end_ms)
    if not raw:
        raise ValueError(f"No candles returned for {name} {interval}")
    df = pd.DataFrame(raw).rename(columns={
        "t": "timestamp", "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume",
        "T": "close_time", "i": "interval", "s": "symbol", "n": "trades",
    })
    needed = ["timestamp", "open", "high", "low", "close", "volume"]
    for k in needed:
        if k not in df.columns:
            raise ValueError(f"Missing '{k}' in candles_snapshot payload. Got: {list(df.columns)}")
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", errors="coerce", utc=True)
    for k in ["open","high","low","close","volume"]:
        df[k] = pd.to_numeric(df[k], errors="coerce")
    # Single cleanup pass. (The original ran dropna/sort twice; doing it once
    # after numeric coercion is sufficient and also catches values that only
    # become NaN during coercion.)
    df = df.dropna(subset=["timestamp","close"]).sort_values("timestamp").reset_index(drop=True)
    if len(df) > limit:
        df = df.iloc[-limit:].reset_index(drop=True)
    return df
# ---------------- Base indicators ---------------- #
def ema(series: pd.Series, period: int) -> pd.Series:
    """Exponential moving average with span=`period` (recursive, not bias-adjusted)."""
    smoothed = series.ewm(span=period, adjust=False)
    return smoothed.mean()
def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """
    Wilder-style RSI using an EWM with alpha=1/period.

    Fix: when there are no down-moves (avg_down == 0) a one-sided uptrend now
    correctly yields RSI = 100; the original collapsed that case to 0 via
    fillna, which reads as extreme *oversold*. Warm-up values (undefined RSI)
    are still reported as 0.

    Args:
        series: Price series (typically close).
        period: Smoothing period (default 14).

    Returns:
        RSI series in [0, 100].
    """
    delta = series.diff()
    up = delta.clip(lower=0.0)
    down = (-delta).clip(lower=0.0)
    avg_up = up.ewm(alpha=1/period, adjust=False).mean()
    avg_down = down.ewm(alpha=1/period, adjust=False).mean()
    rs = avg_up / avg_down.replace(0, np.nan)
    out = 100 - (100 / (1 + rs))
    # Pure uptrend: losses are exactly zero -> RSI saturates at 100.
    out = out.mask((avg_down == 0) & (avg_up > 0), 100.0)
    return out.fillna(0)
def true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series:
    """Wilder's true range: per-bar max of (H-L), |H-prevC|, |L-prevC|."""
    ref = close.shift(1)
    candidates = pd.DataFrame({
        "hl": (high - low).abs(),
        "hc": (high - ref).abs(),
        "lc": (low - ref).abs(),
    })
    return candidates.max(axis=1)
def atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series:
    """Average True Range: Wilder smoothing (EWM alpha=1/period) of the true range."""
    tr = true_range(high, low, close)
    return tr.ewm(alpha=1/period, adjust=False).mean()
def di_adx(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Directional indicators (+DI, -DI) and ADX, all Wilder-smoothed with
    alpha=1/period. Undefined ratios (zero ATR / zero DI sum) become 0.
    """
    dh = high.diff()
    dl = -low.diff()
    plus_raw = np.where((dh > dl) & (dh > 0), dh, 0.0)
    minus_raw = np.where((dl > dh) & (dl > 0), dl, 0.0)
    rng = atr(high, low, close, period).replace(0, np.nan)

    def _smooth(values) -> pd.Series:
        return pd.Series(values, index=high.index).ewm(alpha=1/period, adjust=False).mean()

    plus_di = 100 * _smooth(plus_raw) / rng
    minus_di = 100 * _smooth(minus_raw) / rng
    di_sum = (plus_di + minus_di).replace(0, np.nan)
    dx = (100 * (plus_di - minus_di).abs() / di_sum).fillna(0)
    adx_val = dx.ewm(alpha=1/period, adjust=False).mean()
    return plus_di.fillna(0), minus_di.fillna(0), adx_val.fillna(0)
def bbands(series: pd.Series, period: int = 20, std_mult: float = 2.0) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
    """
    Bollinger bands over a rolling window (population std, ddof=0).
    Returns (basis, upper, lower, %B clipped to [0,1], bandwidth = width/basis).
    """
    basis = series.rolling(period, min_periods=period).mean()
    dev = series.rolling(period, min_periods=period).std(ddof=0) * std_mult
    hi_band = basis + dev
    lo_band = basis - dev
    span = hi_band - lo_band
    pct_b = ((series - lo_band) / span).clip(lower=0, upper=1)
    bw = (span / basis.replace(0, np.nan)).replace([np.inf, -np.inf], np.nan)
    return basis, hi_band, lo_band, pct_b, bw
def obv(close: pd.Series, volume: pd.Series) -> pd.Series:
    """On-Balance Volume: running sum of volume signed by each bar's close change."""
    direction = np.sign(close.diff()).fillna(0.0)
    signed_volume = volume * direction
    return signed_volume.cumsum()
def adl(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series:
    """Accumulation/Distribution Line: cumulative money-flow-multiplier * volume; flat bars (H==L) contribute 0."""
    bar_range = (high - low).replace(0, np.nan)
    multiplier = (2 * close - high - low) / bar_range
    flow = multiplier.fillna(0.0) * volume
    return flow.cumsum()
def macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """MACD line (EMA fast - EMA slow), its signal-line EMA, and the histogram (line - signal)."""
    def _span_ema(span: int) -> pd.Series:
        return series.ewm(span=span, adjust=False).mean()

    macd_line = _span_ema(fast) - _span_ema(slow)
    signal_line = macd_line.ewm(span=signal, adjust=False).mean()
    return macd_line, signal_line, macd_line - signal_line
def stoch_rsi(series: pd.Series, rsi_length: int = 14, stoch_length: int = 14,
              k_smooth: int = 3, d_smooth: int = 3) -> Tuple[pd.Series, pd.Series, pd.Series]:
    """
    Stochastic RSI: a stochastic oscillator applied to the RSI of `series`.

    Returns (stoch_rsi, %K, %D), all scaled to 0..100; warm-up NaNs become 0.

    Fix (mirrors `rsi`): when losses are exactly zero during a one-sided
    uptrend, the base RSI now saturates at 100 instead of collapsing to 0,
    which previously skewed the rolling min/max normalization.
    """
    # Base RSI (Wilder smoothing, alpha = 1/rsi_length)
    delta = series.diff()
    gains = delta.clip(lower=0.0)
    losses = (-delta).clip(lower=0.0)
    avg_gain = gains.ewm(alpha=1/rsi_length, adjust=False).mean()
    avg_loss = losses.ewm(alpha=1/rsi_length, adjust=False).mean()
    rs = avg_gain / avg_loss.replace(0, np.nan)
    base_rsi = 100 - (100 / (1 + rs))
    # Pure uptrend: zero losses -> RSI = 100 (not 0).
    base_rsi = base_rsi.mask((avg_loss == 0) & (avg_gain > 0), 100.0)
    base_rsi = base_rsi.fillna(0)
    # Stochastic of the RSI over `stoch_length` bars
    r_low = base_rsi.rolling(stoch_length, min_periods=stoch_length).min()
    r_high = base_rsi.rolling(stoch_length, min_periods=stoch_length).max()
    raw = (base_rsi - r_low) / (r_high - r_low)
    k = raw.rolling(k_smooth, min_periods=k_smooth).mean() * 100.0
    d = k.rolling(d_smooth, min_periods=d_smooth).mean()
    return (raw * 100.0).fillna(0), k.fillna(0), d.fillna(0)
def vwap_cumulative(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series:
    """Running VWAP over the whole series: cum(TP*V)/cum(V); NaN while cumulative volume is 0."""
    typical = (high + low + close) / 3.0
    weighted = (typical * volume).cumsum()
    total_volume = volume.cumsum().replace(0, np.nan)
    return weighted / total_volume
def vwap_daily(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, ts: pd.Series) -> pd.Series:
    """VWAP that resets at each UTC calendar-day boundary of `ts`."""
    typical = (high + low + close) / 3.0
    frame = pd.DataFrame({
        "tpv": typical * volume,
        "v": volume,
        "date": pd.to_datetime(ts, utc=True).dt.date,
    })
    per_day = frame.groupby("date")[["tpv", "v"]].cumsum()
    ratio = per_day["tpv"] / per_day["v"].replace(0, np.nan)
    return pd.Series(ratio.values, index=typical.index)
def _utc_str_col(series: pd.Series) -> pd.Series:
return pd.to_datetime(series, utc=True).dt.strftime("%Y-%m-%d %H:%M:%S")
def _slope_last_n(s: pd.Series, n: int = 20) -> float:
n = min(n, len(s))
if n < 2: return 0.0
y = s.iloc[-n:].astype(float).values
x = np.arange(n, dtype=float)
xm, ym = x.mean(), y.mean()
denom = ((x - xm) ** 2).sum()
return 0.0 if denom == 0 else float(((x - xm) * (y - ym)).sum() / denom)
# ---------------- MCP-friendly functions (per indicator) ---------------- #
def get_ema(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    include_signals: bool = True,
    exclude_current_bar: bool = True,  # drop the still-forming candle before slicing
) -> pd.DataFrame:
    """
    EMA(9/20/200) snapshot for the last `lookback_period` completed bars.

    Fetches `limit` candles, computes three EMAs on close and, when
    `include_signals` is True, adds cross labels (9v20, 20v200) and a
    row-wise trend classification.

    Returns:
        DataFrame with 'utc_timestamp', 'close', ema_9/20/200 and, if
        `include_signals`, the boolean/categorical signal columns.

    Raises:
        ValueError: if `lookback_period` < 1 or no candles are returned.
    """
    if lookback_period < 1:
        raise ValueError("lookback_period must be >= 1")
    df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    if df.empty:
        raise ValueError(f"No candles returned for {name} {interval}")
    df = df.copy()
    df["close"] = pd.to_numeric(df["close"], errors="coerce")
    df = df.dropna(subset=["close"]).reset_index(drop=True)
    df["ema_9"] = ema(df["close"], 9)
    df["ema_20"] = ema(df["close"], 20)
    df["ema_200"] = ema(df["close"], 200)
    if include_signals:
        df["ema9_gt_ema20"] = df["ema_9"] > df["ema_20"]
        df["ema20_gt_ema200"] = df["ema_20"] > df["ema_200"]
        # Label transitions of the boolean relation. The first row's shifted
        # value is NaN, which compares unequal to both True and False, so it
        # falls through to "none".
        def _cross(now: pd.Series, prev: pd.Series) -> pd.Series:
            return np.where((prev == False) & (now == True), "bullish_cross",
                            np.where((prev == True) & (now == False), "bearish_cross", "none"))
        prev_9_gt_20 = df["ema9_gt_ema20"].shift(1)
        prev_20_gt_200 = df["ema20_gt_ema200"].shift(1)
        df["cross_9_20"] = _cross(df["ema9_gt_ema20"], prev_9_gt_20)
        df["cross_20_200"] = _cross(df["ema20_gt_ema200"], prev_20_gt_200)
        # Row-wise trend: full EMA stack alignment -> bullish/bearish, else neutral.
        def _trend_row(r) -> str:
            if r["ema_9"] > r["ema_20"] > r["ema_200"]:
                return "bullish"
            if r["ema_9"] < r["ema_20"] < r["ema_200"]:
                return "bearish"
            return "neutral"
        df["trend"] = df[["ema_9","ema_20","ema_200"]].apply(_trend_row, axis=1)
    # Apply the exclusion consistently at the end
    df_ex = df.iloc[:-1] if exclude_current_bar else df
    cols = ["timestamp", "close", "ema_9", "ema_20", "ema_200"]
    if include_signals:
        cols += ["ema9_gt_ema20", "ema20_gt_ema200", "cross_9_20", "cross_20_200", "trend"]
    result = df_ex[cols].tail(lookback_period).reset_index(drop=True)
    result = result.rename(columns={"timestamp": "utc_timestamp"})
    result["utc_timestamp"] = result["utc_timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S")
    return result
def get_volume(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
    high_ratio: float = 1.5,   # volume_ratio above this -> "high"
    low_ratio: float = 0.7,    # volume_ratio below this -> "low"
    z_lookback: int = 100,     # rolling window for the volume z-score
    z_hi: float = 2.0,         # NOTE(review): currently unused in the body
    z_lo: float = -2.0,        # NOTE(review): currently unused in the body
) -> pd.DataFrame:
    """
    Returns the last `lookback_period` rows with volume-based features:
    - volume
    - avg_past_24_session_volume
    - volume_ratio, volume_change, zscore_volume, volume_signal (if include_signals=True)

    Raises:
        ValueError: if no candles are returned for `name`/`interval`.
    """
    # --- fetch and basic prep ---
    df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    if df.empty:
        raise ValueError(f"No candles returned for {name} {interval}")
    df = df[["timestamp", "volume"]].copy()
    # 24-session average (optionally exclude current bar from the average computation)
    # shift(1) keeps each bar's own volume out of its 24-bar average.
    vol_for_avg = df["volume"].shift(1) if exclude_current_bar else df["volume"]
    df["avg_past_24_session_volume"] = vol_for_avg.rolling(window=24, min_periods=24).mean()
    # Conditionally exclude the current (possibly still-forming) bar from all downstream calcs
    df_ex = df.iloc[:-1] if exclude_current_bar else df
    # Slice tail window & format timestamp
    out = df_ex.tail(lookback_period).reset_index(drop=True).copy()
    out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S")
    # --- signals ---
    if include_signals:
        # Ratio: current volume vs 24-period average
        out["volume_ratio"] = out["volume"] / out["avg_past_24_session_volume"]
        # Percent change vs previous bar (first displayed row is NaN by construction)
        out["volume_change"] = out["volume"].pct_change()
        # Z-score vs rolling mean/std (computed on full excluded dataset)
        base_ex = df_ex.copy()
        roll_mean = base_ex["volume"].rolling(z_lookback, min_periods=z_lookback).mean()
        roll_std = base_ex["volume"].rolling(z_lookback, min_periods=z_lookback).std(ddof=0)
        base_ex["zscore_volume"] = (base_ex["volume"] - roll_mean) / roll_std
        out["zscore_volume"] = base_ex["zscore_volume"].tail(lookback_period).to_numpy()
        # Categorical signal from ratio thresholds
        ratio = out["volume_ratio"]
        conds = [
            (ratio.notna()) & (ratio > high_ratio),
            (ratio.notna()) & (ratio < low_ratio)
        ]
        out["volume_signal"] = np.select(conds, ["high", "low"], default="normal")
    # --- clean integers for readability ---
    for col in ["volume", "avg_past_24_session_volume"]:
        out[col] = out[col].round(0).astype("Int64")
    cols = ["utc_timestamp", "volume", "avg_past_24_session_volume"]
    if include_signals:
        cols += ["volume_ratio", "volume_change", "zscore_volume", "volume_signal"]
    return out[cols]
# ======= RSI =======
def get_rsi(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    period: int = 14,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    RSI for the last `lookback_period` completed bars, with optional
    overbought/oversold state, 50-line cross flag and bar-to-bar slope.
    """
    candles = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    if candles.empty:
        raise ValueError(f"No candles returned for {name} {interval}")
    closes = candles["close"].copy()
    rsi_series = rsi(closes, period)
    # Drop the still-forming bar once, before any windowing.
    if exclude_current_bar:
        candles = candles.iloc[:-1]
        rsi_series = rsi_series.iloc[:-1]
        closes = closes.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": candles["timestamp"].tail(lookback_period).values,
        "close": closes.tail(lookback_period).values,
        "rsi": rsi_series.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        out["rsi_state"] = np.select(
            [out["rsi"] >= 70, out["rsi"] <= 30],
            ["overbought", "oversold"], default="neutral"
        )
        previous = out["rsi"].shift(1)
        crossed_up = (previous < 50) & (out["rsi"] >= 50)
        crossed_down = (previous > 50) & (out["rsi"] <= 50)
        out["rsi_cross_50"] = crossed_up | crossed_down
        out["rsi_slope"] = out["rsi"].diff()
    out["utc_timestamp"] = _utc_str_col(out["timestamp"])
    return out.drop(columns=["timestamp"])
# ======= Bollinger Bands =======
def get_bbands(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    period: int = 20,
    std_mult: float = 2.0,
    include_signals: bool = True,
    squeeze_lookback: int = 180,  # window for the bandwidth percentile rank
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    Bollinger Bands for the last `lookback_period` completed bars.

    Columns: close, bb_basis/bb_upper/bb_lower, pct_b, bandwidth; when
    `include_signals` also squeeze_percentile (rank of the current bandwidth
    within `squeeze_lookback` bars), in_squeeze (bottom 10% of that window),
    above_basis, touching_upper, touching_lower.
    """
    df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    ma, up, lo, pctb, bw = bbands(df["close"], period, std_mult)
    if exclude_current_bar:
        # Drop the still-forming candle from every series in lockstep.
        df, ma, up, lo, pctb, bw = df.iloc[:-1], ma.iloc[:-1], up.iloc[:-1], lo.iloc[:-1], pctb.iloc[:-1], bw.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": df["timestamp"].tail(lookback_period).values,
        "close": df["close"].tail(lookback_period).values,
        "bb_basis": ma.tail(lookback_period).values,
        "bb_upper": up.tail(lookback_period).values,
        "bb_lower": lo.tail(lookback_period).values,
        "pct_b": pctb.tail(lookback_period).values,
        "bandwidth": bw.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        # rolling percentile rank for squeeze (lower percentile = tighter bands)
        bw_roll = bw.rolling(squeeze_lookback, min_periods=10)
        # percentile of current bandwidth within window
        def _pr(x):
            # Rank the window's last non-NaN value against the whole window.
            s = pd.Series(x).dropna()
            if len(s) == 0: return np.nan
            return (s.rank(pct=True).iloc[-1])
        squeeze_p = bw_roll.apply(_pr, raw=False)
        squeeze_p = squeeze_p.tail(lookback_period).values
        out["squeeze_percentile"] = squeeze_p
        out["in_squeeze"] = pd.Series(squeeze_p).le(0.1) # bottom 10%
        out["above_basis"] = out["close"] > out["bb_basis"]
        out["touching_upper"] = out["close"] >= out["bb_upper"]
        out["touching_lower"] = out["close"] <= out["bb_lower"]
    out["utc_timestamp"] = _utc_str_col(out["timestamp"])
    return out.drop(columns=["timestamp"])
# ======= ATR + ADX =======
def get_atr_adx(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    period: int = 14,
    include_signals: bool = True,
    adx_threshold: float = 25.0,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    ATR plus directional indicators (+DI/-DI/ADX) for the last
    `lookback_period` completed bars, with optional trend-strength signals.
    """
    candles = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    pdi, mdi, adx_series = di_adx(candles["high"], candles["low"], candles["close"], period)
    atr_series = atr(candles["high"], candles["low"], candles["close"], period)
    if exclude_current_bar:
        # Trim the still-forming candle from every series in lockstep.
        candles = candles.iloc[:-1]
        pdi = pdi.iloc[:-1]
        mdi = mdi.iloc[:-1]
        adx_series = adx_series.iloc[:-1]
        atr_series = atr_series.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": candles["timestamp"].tail(lookback_period).values,
        "+DI": pdi.tail(lookback_period).values,
        "-DI": mdi.tail(lookback_period).values,
        "ADX": adx_series.tail(lookback_period).values,
        "ATR": atr_series.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        out["trend_ok"] = out["ADX"] >= adx_threshold
        out["direction"] = np.where(out["+DI"] > out["-DI"], "up", "down")
    out["utc_timestamp"] = _utc_str_col(out["timestamp"])
    return out.drop(columns=["timestamp"])
# ======= OBV =======
def get_obv(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    include_signals: bool = True,
    slope_lookback: int = 20,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    On-Balance Volume for the last `lookback_period` completed bars, with
    optional regression-slope trend signals (full history and window-local).
    """
    candles = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    obv_series = obv(candles["close"], candles["volume"])
    if exclude_current_bar:
        candles, obv_series = candles.iloc[:-1], obv_series.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": candles["timestamp"].tail(lookback_period).values,
        "obv": obv_series.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        # Slope over the full (trimmed) history vs. just the displayed rows.
        out["obv_slope_full"] = _slope_last_n(obv_series, slope_lookback)
        window_n = min(slope_lookback, lookback_period)
        out["obv_slope_window"] = _slope_last_n(pd.Series(out["obv"]), window_n)
        out["obv_trend"] = np.where(out["obv_slope_window"] > 0, "up", np.where(out["obv_slope_window"] < 0, "down", "flat"))
    out["utc_timestamp"] = _utc_str_col(out["timestamp"])
    return out.drop(columns=["timestamp"])
# ======= ADL =======
def get_adl(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    Accumulation/Distribution Line (ADL) with simple change/direction signals.
    Signals are computed on float values; the displayed ADL is rounded to Int64.

    Returns the last `lookback_period` completed rows: adl (Int64) and, when
    `include_signals`, adl_change, adl_change_pct and adl_direction
    ('up'/'down'/'flat'), plus utc_timestamp.

    Raises:
        ValueError: if no candles are returned, or none remain after
            excluding the still-forming bar.
    """
    df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    if df.empty:
        raise ValueError(f"No candles returned for {name} {interval}")
    adl_series = adl(df["high"], df["low"], df["close"], df["volume"])
    if exclude_current_bar:
        df, adl_series = df.iloc[:-1], adl_series.iloc[:-1]
    if df.empty:
        raise ValueError("Not enough completed candles after excluding the current bar.")
    # Assemble last window (float first)
    out = pd.DataFrame({
        "timestamp": df["timestamp"].tail(lookback_period).values,
        "adl": adl_series.tail(lookback_period).astype(float).values,
    }).reset_index(drop=True)
    if include_signals:
        eps = 1e-9  # dead-band so float noise doesn't flip 'flat' to 'up'/'down'
        chg = out["adl"].diff()
        prev = out["adl"].shift(1)
        out["adl_change"] = chg
        out["adl_change_pct"] = np.where(prev.ne(0), chg / prev, np.nan)
        gt = chg.gt(eps).fillna(False)
        lt = chg.lt(-eps).fillna(False)
        out["adl_direction"] = np.where(gt, "up", np.where(lt, "down", "flat"))
    # After computing signals, round ADL for display (keeps nullable Int64)
    out["adl"] = out["adl"].round(0).astype("Int64")
    out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S")
    return out.drop(columns=["timestamp"])
# ======= VWAP =======
def get_vwap(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    daily_reset: bool = False,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    VWAP (cumulative, or resetting per UTC day when `daily_reset`) for the
    last `lookback_period` completed bars, with optional distance signals.
    """
    candles = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    if daily_reset:
        vwap_series = vwap_daily(candles["high"], candles["low"], candles["close"], candles["volume"], candles["timestamp"])
    else:
        vwap_series = vwap_cumulative(candles["high"], candles["low"], candles["close"], candles["volume"])
    if exclude_current_bar:
        candles, vwap_series = candles.iloc[:-1], vwap_series.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": candles["timestamp"].tail(lookback_period).values,
        "close": candles["close"].tail(lookback_period).values,
        "vwap": vwap_series.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        safe_vwap = out["vwap"].replace(0, np.nan)
        out["dist_to_vwap_pct"] = 100.0 * (out["close"] - out["vwap"]) / safe_vwap
        out["above_vwap"] = out["close"] > out["vwap"]
    out["utc_timestamp"] = _utc_str_col(out["timestamp"])
    return out.drop(columns=["timestamp"])
# ======= STOCH RSI =======
def get_stoch_rsi(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    rsi_length: int = 14,
    stoch_length: int = 14,
    k_smooth: int = 3,
    d_smooth: int = 3,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    Stochastic RSI (%K/%D) for the last `lookback_period` completed bars.

    When `include_signals` is True, adds:
      - zone: 'overbought' (>=80) / 'oversold' (<=20) / 'mid'
      - cross: 'bull_cross'/'bear_cross'/'none' from %K vs %D transitions
      - bias: 'up' when %K > %D else 'down'
    """
    df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    stoch, K, D = stoch_rsi(df["close"], rsi_length, stoch_length, k_smooth, d_smooth)
    if exclude_current_bar:
        # Drop the still-forming candle from every series in lockstep.
        df, stoch, K, D = df.iloc[:-1], stoch.iloc[:-1], K.iloc[:-1], D.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": df["timestamp"].tail(lookback_period).values,
        "close": df["close"].tail(lookback_period).values,
        "stoch_rsi": stoch.tail(lookback_period).values,
        "%K": K.tail(lookback_period).values,
        "%D": D.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        # Zones & crosses
        out["zone"] = np.select(
            [out["stoch_rsi"] >= 80, out["stoch_rsi"] <= 20],
            ["overbought", "oversold"], default="mid"
        )
        # A cross requires strict inequality on both sides of the transition;
        # the first displayed row (NaN prev) always labels "none".
        prevK, prevD = out["%K"].shift(1), out["%D"].shift(1)
        out["cross"] = np.where(
            (prevK < prevD) & (out["%K"] > out["%D"]), "bull_cross",
            np.where((prevK > prevD) & (out["%K"] < out["%D"]), "bear_cross", "none")
        )
        # Optional momentum bias (fast)
        out["bias"] = np.where(out["%K"] > out["%D"], "up", "down")
    out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S")
    return out.drop(columns=["timestamp"])
# ======= MACD =======
def get_macd(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
) -> pd.DataFrame:
    """
    MACD (line/signal/histogram) for the last `lookback_period` completed bars.

    `fast`/`slow`/`signal` are the EMA spans. When `include_signals` is True,
    adds:
      - macd_cross: 'bull_cross'/'bear_cross'/'none' on line-vs-signal crosses
      - above_zero: MACD line above the zero line (momentum regime)
      - hist_trend: 'up'/'down'/'flat' from the histogram's bar-to-bar change
    """
    df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet)
    line, sig, hist = macd(df["close"], fast, slow, signal)
    if exclude_current_bar:
        df, line, sig, hist = df.iloc[:-1], line.iloc[:-1], sig.iloc[:-1], hist.iloc[:-1]
    out = pd.DataFrame({
        "timestamp": df["timestamp"].tail(lookback_period).values,
        "close": df["close"].tail(lookback_period).values,
        "macd": line.tail(lookback_period).values,
        "signal": sig.tail(lookback_period).values,
        "hist": hist.tail(lookback_period).values,
    }).reset_index(drop=True)
    if include_signals:
        # Cross of MACD line vs signal line
        prev_macd, prev_sig = out["macd"].shift(1), out["signal"].shift(1)
        out["macd_cross"] = np.where(
            (prev_macd < prev_sig) & (out["macd"] > out["signal"]), "bull_cross",
            np.where((prev_macd > prev_sig) & (out["macd"] < out["signal"]), "bear_cross", "none")
        )
        # Zero-line context (momentum regime)
        out["above_zero"] = out["macd"] > 0
        # Histogram momentum bias (diff computed once instead of twice)
        hist_delta = out["hist"].diff()
        out["hist_trend"] = np.where(hist_delta > 0, "up", np.where(hist_delta < 0, "down", "flat"))
    # Consistency with the other get_* helpers: format via the shared helper.
    out["utc_timestamp"] = _utc_str_col(out["timestamp"])
    return out.drop(columns=["timestamp"])
def get_bundle(
    name: str,
    interval: Interval = "1h",
    lookback_period: int = 6,
    limit: int = 600,
    testnet: bool = False,
    include: Optional[Iterable[str]] = None,
    include_signals: bool = True,
    exclude_current_bar: bool = True,
    indicator_params: Optional[Dict[str, Dict[str, Any]]] = None,
    return_last_only: bool = False,
    profile: str = "default",  # preset indicator set; ignored when `include` is given
) -> Dict[str, Any]:
    """
    Bundle multiple indicators into one JSON-serializable dict.
    - Use `profile` to select a preset, or pass `include` explicitly.
    - Merges `indicator_params` per-indicator (e.g., {"macd": {"fast": 8}}).
    - If `return_last_only=True`, returns the latest completed row per indicator.
    Returns:
        {
          "coin": str,
          "interval": str,
          "asof_utc": "YYYY-MM-DD HH:MM:SS" | None,
          "included": [str, ...],
          "<indicator>": list[dict] | dict
        }
    """
    indicator_params = indicator_params or {}
    # Presets (match your curated bundle)
    PROFILES: Dict[str, List[str]] = {
        "default": ["ema","stoch_rsi","macd","adl","volume","atr_adx","bbands"],
        "trend": ["ema","macd","atr_adx","adl","volume"],
        "momentum": ["rsi","stoch_rsi","macd","volume"],
        "volatility": ["bbands","atr_adx","volume"],
        "scalper": ["stoch_rsi","macd","volume","adl"],
        "intraday": ["ema","vwap","adl","volume"],
    }
    # If include is provided, it overrides profile
    include_list = list(include) if include is not None else PROFILES.get(profile, PROFILES["default"])
    # Deduplicate & keep order (set.add returns None, so the filter keeps first occurrences)
    seen = set()
    include_list = [x for x in include_list if not (x in seen or seen.add(x))]
    out: Dict[str, Any] = {"coin": name, "interval": interval, "included": include_list}
    ts_list: List[str] = []
    # Map names to local getters (adjust to hi.get_* if this lives outside hl_indicators2.py)
    mapping = {
        "ema": get_ema,
        "rsi": get_rsi,
        "stoch_rsi": get_stoch_rsi,
        "macd": get_macd,
        "bbands": get_bbands,
        "atr_adx": get_atr_adx,
        "obv": get_obv,
        "adl": get_adl,
        "vwap": get_vwap,
        "volume": get_volume,
    }
    def _run(ind_name: str):
        # Run one indicator getter with merged params, writing into `out`
        # and collecting the latest timestamp into `ts_list` (closure state).
        fn = mapping.get(ind_name)
        if fn is None:
            # unknown indicator name; skip safely
            return
        # common params
        params = dict(
            name=name,
            interval=interval,
            lookback_period=lookback_period,
            limit=limit,
            testnet=testnet,
            include_signals=include_signals,
            exclude_current_bar=exclude_current_bar,
        )
        # merge per-indicator overrides
        params.update(indicator_params.get(ind_name, {}))
        try:
            df = fn(**params)
        except Exception as e:
            # fail-soft: surface error text, don't kill the whole bundle
            out[ind_name] = {"error": str(e)}
            return
        # Normalize to records
        if isinstance(df, pd.DataFrame):
            recs = df.to_dict(orient="records")
            out[ind_name] = recs[-1] if (return_last_only and recs) else recs
            if recs and "utc_timestamp" in recs[-1]:
                ts_list.append(recs[-1]["utc_timestamp"])
        elif isinstance(df, list):
            out[ind_name] = df[-1] if (return_last_only and df) else df
            if df and isinstance(df[-1], dict) and "utc_timestamp" in df[-1]:
                ts_list.append(df[-1]["utc_timestamp"])
        else:
            # unexpected type; just return it
            out[ind_name] = df
    for ind in include_list:
        _run(ind)
    # As-of time = max completed timestamp (string in UTC format is lexicographically safe)
    ts_clean = [t for t in ts_list if t]
    out["asof_utc"] = max(ts_clean) if ts_clean else None
    out["profile"] = profile
    return out