Spaces:
Sleeping
Sleeping
| # hl_indicators2.py | |
| from __future__ import annotations | |
| import time | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List, Tuple, Literal, Iterable, Union, Optional | |
| from hyperliquid.info import Info | |
| from hyperliquid.utils import constants | |
| Interval = Literal["1m", "5m", "15m", "1h", "4h", "1d"] | |
| _MS = {"1m": 60_000, "5m": 5*60_000, "15m": 15*60_000, "1h": 60*60_000, "4h": 4*60*60_000, "1d": 24*60*60_000} | |
| def _now_ms() -> int: | |
| return int(time.time() * 1000) | |
| def _start_end_from_limit(interval: Interval, limit: int, end_ms: int | None = None) -> tuple[int, int, str, str]: | |
| end_ms = end_ms or _now_ms() | |
| span = (limit + 2) * _MS[interval] # small buffer for smoothing windows | |
| start_ms = max(0, end_ms - span) | |
| start_utc = datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M") | |
| end_utc = datetime.fromtimestamp(end_ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M") | |
| return start_ms, end_ms, start_utc, end_utc | |
| def fetch_candles( | |
| name: str, | |
| interval: Interval = "1h", | |
| limit: int = 300, | |
| testnet: bool = False, | |
| end_ms: int | None = None, | |
| ) -> pd.DataFrame: | |
| """ | |
| Fetch OHLCV candles via Info.candles_snapshot(name, interval, startTime, endTime). | |
| Returns DataFrame with ['timestamp','open','high','low','close','volume'] sorted by time. | |
| """ | |
| api_url = constants.TESTNET_API_URL if testnet else constants.MAINNET_API_URL | |
| info = Info(api_url, skip_ws=True) | |
| start_ms, end_ms, _, _ = _start_end_from_limit(interval, limit, end_ms) | |
| raw = info.candles_snapshot(name, interval, start_ms, end_ms) | |
| if not raw: | |
| raise ValueError(f"No candles returned for {name} {interval}") | |
| df = pd.DataFrame(raw).rename(columns={ | |
| "t": "timestamp", "o": "open", "h": "high", "l": "low", "c": "close", "v": "volume", | |
| "T": "close_time", "i": "interval", "s": "symbol", "n": "trades", | |
| }) | |
| needed = ["timestamp", "open", "high", "low", "close", "volume"] | |
| for k in needed: | |
| if k not in df.columns: | |
| raise ValueError(f"Missing '{k}' in candles_snapshot payload. Got: {list(df.columns)}") | |
| df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms", errors="coerce", utc=True) | |
| df = df.dropna(subset=["timestamp","close"]).sort_values("timestamp").reset_index(drop=True) | |
| for k in ["open","high","low","close","volume"]: | |
| df[k] = pd.to_numeric(df[k], errors="coerce") | |
| df = df.dropna(subset=["timestamp","close"]).sort_values("timestamp").reset_index(drop=True) | |
| if len(df) > limit: | |
| df = df.iloc[-limit:].reset_index(drop=True) | |
| return df | |
| # ---------------- Base indicators ---------------- # | |
| def ema(series: pd.Series, period: int) -> pd.Series: | |
| return series.ewm(span=period, adjust=False).mean() | |
| def rsi(series: pd.Series, period: int = 14) -> pd.Series: | |
| delta = series.diff() | |
| up = delta.clip(lower=0.0) | |
| down = (-delta).clip(lower=0.0) | |
| avg_up = up.ewm(alpha=1/period, adjust=False).mean() | |
| avg_down = down.ewm(alpha=1/period, adjust=False).mean() | |
| rs = avg_up / avg_down.replace(0, np.nan) | |
| return (100 - (100 / (1 + rs))).fillna(0) | |
| def true_range(high: pd.Series, low: pd.Series, close: pd.Series) -> pd.Series: | |
| prev_close = close.shift(1) | |
| tr = pd.concat([(high - low).abs(), | |
| (high - prev_close).abs(), | |
| (low - prev_close).abs()], axis=1).max(axis=1) | |
| return tr | |
| def atr(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> pd.Series: | |
| return true_range(high, low, close).ewm(alpha=1/period, adjust=False).mean() | |
| def di_adx(high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14) -> Tuple[pd.Series, pd.Series, pd.Series]: | |
| up_move = high.diff() | |
| down_move = -low.diff() | |
| plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0) | |
| minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0) | |
| atr_s = atr(high, low, close, period) | |
| plus_di = 100 * pd.Series(plus_dm, index=high.index).ewm(alpha=1/period, adjust=False).mean() / atr_s.replace(0, np.nan) | |
| minus_di = 100 * pd.Series(minus_dm, index=high.index).ewm(alpha=1/period, adjust=False).mean() / atr_s.replace(0, np.nan) | |
| dx = (100 * (plus_di - minus_di).abs() / (plus_di + minus_di).replace(0, np.nan)).fillna(0) | |
| adx = dx.ewm(alpha=1/period, adjust=False).mean() | |
| return plus_di.fillna(0), minus_di.fillna(0), adx.fillna(0) | |
| def bbands(series: pd.Series, period: int = 20, std_mult: float = 2.0) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]: | |
| ma = series.rolling(period, min_periods=period).mean() | |
| sd = series.rolling(period, min_periods=period).std(ddof=0) | |
| upper = ma + std_mult * sd | |
| lower = ma - std_mult * sd | |
| width = (upper - lower) | |
| pct_b = ((series - lower) / width).clip(lower=0, upper=1) | |
| bandwidth = (width / ma.replace(0, np.nan)).replace([np.inf, -np.inf], np.nan) | |
| return ma, upper, lower, pct_b, bandwidth | |
| def obv(close: pd.Series, volume: pd.Series) -> pd.Series: | |
| sign = np.sign(close.diff()).fillna(0.0) | |
| return (volume * sign).cumsum() | |
| def adl(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series: | |
| rng = (high - low).replace(0, np.nan) | |
| mfm = ((close - low) - (high - close)) / rng | |
| return (mfm.fillna(0.0) * volume).cumsum() | |
| def macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> Tuple[pd.Series, pd.Series, pd.Series]: | |
| fast_ema = series.ewm(span=fast, adjust=False).mean() | |
| slow_ema = series.ewm(span=slow, adjust=False).mean() | |
| line = fast_ema - slow_ema | |
| sig = line.ewm(span=signal, adjust=False).mean() | |
| hist = line - sig | |
| return line, sig, hist | |
| def stoch_rsi(series: pd.Series, rsi_length: int = 14, stoch_length: int = 14, | |
| k_smooth: int = 3, d_smooth: int = 3) -> Tuple[pd.Series, pd.Series, pd.Series]: | |
| # Base RSI | |
| delta = series.diff() | |
| up = delta.clip(lower=0.0) | |
| down = (-delta).clip(lower=0.0) | |
| avg_up = up.ewm(alpha=1/rsi_length, adjust=False).mean() | |
| avg_down = down.ewm(alpha=1/rsi_length, adjust=False).mean() | |
| rs = avg_up / avg_down.replace(0, np.nan) | |
| rsi = (100 - (100 / (1 + rs))).fillna(0) | |
| # Stoch over RSI | |
| r_low = rsi.rolling(stoch_length, min_periods=stoch_length).min() | |
| r_high = rsi.rolling(stoch_length, min_periods=stoch_length).max() | |
| base = (rsi - r_low) / (r_high - r_low) | |
| k = base.rolling(k_smooth, min_periods=k_smooth).mean() * 100.0 | |
| d = k.rolling(d_smooth, min_periods=d_smooth).mean() | |
| return (base * 100.0).fillna(0), k.fillna(0), d.fillna(0) | |
| def vwap_cumulative(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series) -> pd.Series: | |
| tp = (high + low + close) / 3.0 | |
| cum_v = volume.cumsum().replace(0, np.nan) | |
| return (tp * volume).cumsum() / cum_v | |
| def vwap_daily(high: pd.Series, low: pd.Series, close: pd.Series, volume: pd.Series, ts: pd.Series) -> pd.Series: | |
| tp = (high + low + close) / 3.0 | |
| df = pd.DataFrame({"tp": tp, "v": volume, "date": pd.to_datetime(ts, utc=True).dt.date}) | |
| df["tpv"] = df["tp"] * df["v"] | |
| cum = df.groupby("date")[["tpv", "v"]].cumsum() | |
| vw = (cum["tpv"] / cum["v"].replace(0, np.nan)).values | |
| return pd.Series(vw, index=tp.index) | |
| def _utc_str_col(series: pd.Series) -> pd.Series: | |
| return pd.to_datetime(series, utc=True).dt.strftime("%Y-%m-%d %H:%M:%S") | |
| def _slope_last_n(s: pd.Series, n: int = 20) -> float: | |
| n = min(n, len(s)) | |
| if n < 2: return 0.0 | |
| y = s.iloc[-n:].astype(float).values | |
| x = np.arange(n, dtype=float) | |
| xm, ym = x.mean(), y.mean() | |
| denom = ((x - xm) ** 2).sum() | |
| return 0.0 if denom == 0 else float(((x - xm) * (y - ym)).sum() / denom) | |
| # ---------------- MCP-friendly functions (per indicator) ---------------- # | |
| def get_ema( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, # NEW | |
| ) -> pd.DataFrame: | |
| if lookback_period < 1: | |
| raise ValueError("lookback_period must be >= 1") | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| if df.empty: | |
| raise ValueError(f"No candles returned for {name} {interval}") | |
| df = df.copy() | |
| df["close"] = pd.to_numeric(df["close"], errors="coerce") | |
| df = df.dropna(subset=["close"]).reset_index(drop=True) | |
| df["ema_9"] = ema(df["close"], 9) | |
| df["ema_20"] = ema(df["close"], 20) | |
| df["ema_200"] = ema(df["close"], 200) | |
| if include_signals: | |
| df["ema9_gt_ema20"] = df["ema_9"] > df["ema_20"] | |
| df["ema20_gt_ema200"] = df["ema_20"] > df["ema_200"] | |
| def _cross(now: pd.Series, prev: pd.Series) -> pd.Series: | |
| return np.where((prev == False) & (now == True), "bullish_cross", | |
| np.where((prev == True) & (now == False), "bearish_cross", "none")) | |
| prev_9_gt_20 = df["ema9_gt_ema20"].shift(1) | |
| prev_20_gt_200 = df["ema20_gt_ema200"].shift(1) | |
| df["cross_9_20"] = _cross(df["ema9_gt_ema20"], prev_9_gt_20) | |
| df["cross_20_200"] = _cross(df["ema20_gt_ema200"], prev_20_gt_200) | |
| def _trend_row(r) -> str: | |
| if r["ema_9"] > r["ema_20"] > r["ema_200"]: | |
| return "bullish" | |
| if r["ema_9"] < r["ema_20"] < r["ema_200"]: | |
| return "bearish" | |
| return "neutral" | |
| df["trend"] = df[["ema_9","ema_20","ema_200"]].apply(_trend_row, axis=1) | |
| # Apply the exclusion consistently at the end | |
| df_ex = df.iloc[:-1] if exclude_current_bar else df | |
| cols = ["timestamp", "close", "ema_9", "ema_20", "ema_200"] | |
| if include_signals: | |
| cols += ["ema9_gt_ema20", "ema20_gt_ema200", "cross_9_20", "cross_20_200", "trend"] | |
| result = df_ex[cols].tail(lookback_period).reset_index(drop=True) | |
| result = result.rename(columns={"timestamp": "utc_timestamp"}) | |
| result["utc_timestamp"] = result["utc_timestamp"].dt.strftime("%Y-%m-%d %H:%M:%S") | |
| return result | |
| def get_volume( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| high_ratio: float = 1.5, | |
| low_ratio: float = 0.7, | |
| z_lookback: int = 100, | |
| z_hi: float = 2.0, | |
| z_lo: float = -2.0, | |
| ) -> pd.DataFrame: | |
| """ | |
| Returns the last `lookback_period` rows with volume-based features: | |
| - volume | |
| - avg_past_24_session_volume | |
| - volume_ratio, volume_change, zscore_volume, volume_signal (if include_signals=True) | |
| """ | |
| # --- fetch and basic prep --- | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| if df.empty: | |
| raise ValueError(f"No candles returned for {name} {interval}") | |
| df = df[["timestamp", "volume"]].copy() | |
| # 24-session average (optionally exclude current bar from the average computation) | |
| vol_for_avg = df["volume"].shift(1) if exclude_current_bar else df["volume"] | |
| df["avg_past_24_session_volume"] = vol_for_avg.rolling(window=24, min_periods=24).mean() | |
| # Conditionally exclude the current (possibly still-forming) bar from all downstream calcs | |
| df_ex = df.iloc[:-1] if exclude_current_bar else df | |
| # Slice tail window & format timestamp | |
| out = df_ex.tail(lookback_period).reset_index(drop=True).copy() | |
| out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S") | |
| # --- signals --- | |
| if include_signals: | |
| # Ratio: current volume vs 24-period average | |
| out["volume_ratio"] = out["volume"] / out["avg_past_24_session_volume"] | |
| # Percent change vs previous bar | |
| out["volume_change"] = out["volume"].pct_change() | |
| # Z-score vs rolling mean/std (computed on full excluded dataset) | |
| base_ex = df_ex.copy() | |
| roll_mean = base_ex["volume"].rolling(z_lookback, min_periods=z_lookback).mean() | |
| roll_std = base_ex["volume"].rolling(z_lookback, min_periods=z_lookback).std(ddof=0) | |
| base_ex["zscore_volume"] = (base_ex["volume"] - roll_mean) / roll_std | |
| out["zscore_volume"] = base_ex["zscore_volume"].tail(lookback_period).to_numpy() | |
| # Categorical signal from ratio thresholds | |
| ratio = out["volume_ratio"] | |
| conds = [ | |
| (ratio.notna()) & (ratio > high_ratio), | |
| (ratio.notna()) & (ratio < low_ratio) | |
| ] | |
| out["volume_signal"] = np.select(conds, ["high", "low"], default="normal") | |
| # --- clean integers for readability --- | |
| for col in ["volume", "avg_past_24_session_volume"]: | |
| out[col] = out[col].round(0).astype("Int64") | |
| cols = ["utc_timestamp", "volume", "avg_past_24_session_volume"] | |
| if include_signals: | |
| cols += ["volume_ratio", "volume_change", "zscore_volume", "volume_signal"] | |
| return out[cols] | |
| # ======= RSI ======= | |
| def get_rsi( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| period: int = 14, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| if df.empty: | |
| raise ValueError(f"No candles returned for {name} {interval}") | |
| close = df["close"].copy() | |
| r = rsi(close, period) | |
| # Apply exclusion ONCE, then use the sliced frame for all outputs | |
| if exclude_current_bar: | |
| df = df.iloc[:-1] | |
| r = r.iloc[:-1] | |
| close = close.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "close": close.tail(lookback_period).values, | |
| "rsi": r.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| out["rsi_state"] = np.select( | |
| [out["rsi"] >= 70, out["rsi"] <= 30], | |
| ["overbought", "oversold"], default="neutral" | |
| ) | |
| prev = out["rsi"].shift(1) | |
| out["rsi_cross_50"] = ((prev < 50) & (out["rsi"] >= 50)) | ((prev > 50) & (out["rsi"] <= 50)) | |
| out["rsi_slope"] = out["rsi"].diff() | |
| out["utc_timestamp"] = _utc_str_col(out["timestamp"]) | |
| return out.drop(columns=["timestamp"]) | |
| # ======= Bollinger Bands ======= | |
| def get_bbands( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| period: int = 20, | |
| std_mult: float = 2.0, | |
| include_signals: bool = True, | |
| squeeze_lookback: int = 180, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| ma, up, lo, pctb, bw = bbands(df["close"], period, std_mult) | |
| if exclude_current_bar: | |
| df, ma, up, lo, pctb, bw = df.iloc[:-1], ma.iloc[:-1], up.iloc[:-1], lo.iloc[:-1], pctb.iloc[:-1], bw.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "close": df["close"].tail(lookback_period).values, | |
| "bb_basis": ma.tail(lookback_period).values, | |
| "bb_upper": up.tail(lookback_period).values, | |
| "bb_lower": lo.tail(lookback_period).values, | |
| "pct_b": pctb.tail(lookback_period).values, | |
| "bandwidth": bw.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| # rolling percentile rank for squeeze (lower percentile = tighter bands) | |
| bw_roll = bw.rolling(squeeze_lookback, min_periods=10) | |
| # percentile of current bandwidth within window | |
| def _pr(x): | |
| s = pd.Series(x).dropna() | |
| if len(s) == 0: return np.nan | |
| return (s.rank(pct=True).iloc[-1]) | |
| squeeze_p = bw_roll.apply(_pr, raw=False) | |
| squeeze_p = squeeze_p.tail(lookback_period).values | |
| out["squeeze_percentile"] = squeeze_p | |
| out["in_squeeze"] = pd.Series(squeeze_p).le(0.1) # bottom 10% | |
| out["above_basis"] = out["close"] > out["bb_basis"] | |
| out["touching_upper"] = out["close"] >= out["bb_upper"] | |
| out["touching_lower"] = out["close"] <= out["bb_lower"] | |
| out["utc_timestamp"] = _utc_str_col(out["timestamp"]) | |
| return out.drop(columns=["timestamp"]) | |
| # ======= ATR + ADX ======= | |
| def get_atr_adx( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| period: int = 14, | |
| include_signals: bool = True, | |
| adx_threshold: float = 25.0, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| plus_di, minus_di, adx_s = di_adx(df["high"], df["low"], df["close"], period) | |
| atr_s = atr(df["high"], df["low"], df["close"], period) | |
| if exclude_current_bar: | |
| df, plus_di, minus_di, adx_s, atr_s = df.iloc[:-1], plus_di.iloc[:-1], minus_di.iloc[:-1], adx_s.iloc[:-1], atr_s.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "+DI": plus_di.tail(lookback_period).values, | |
| "-DI": minus_di.tail(lookback_period).values, | |
| "ADX": adx_s.tail(lookback_period).values, | |
| "ATR": atr_s.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| out["trend_ok"] = out["ADX"] >= adx_threshold | |
| out["direction"] = np.where(out["+DI"] > out["-DI"], "up", "down") | |
| out["utc_timestamp"] = _utc_str_col(out["timestamp"]) | |
| return out.drop(columns=["timestamp"]) | |
| # ======= OBV ======= | |
| def get_obv( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| include_signals: bool = True, | |
| slope_lookback: int = 20, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| series = obv(df["close"], df["volume"]) | |
| if exclude_current_bar: | |
| df, series = df.iloc[:-1], series.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "obv": series.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| slope = _slope_last_n(series, slope_lookback) | |
| out["obv_slope_full"] = slope | |
| # local slope over the displayed window: | |
| out["obv_slope_window"] = _slope_last_n(pd.Series(out["obv"]), min(slope_lookback, lookback_period)) | |
| out["obv_trend"] = np.where(out["obv_slope_window"] > 0, "up", np.where(out["obv_slope_window"] < 0, "down", "flat")) | |
| out["utc_timestamp"] = _utc_str_col(out["timestamp"]) | |
| return out.drop(columns=["timestamp"]) | |
| # ======= ADL ======= | |
| def get_adl( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| """ | |
| Accumulation/Distribution Line (ADL) with simple change/direction signals. | |
| Signals are computed on float values; the displayed ADL is rounded to Int64. | |
| """ | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| if df.empty: | |
| raise ValueError(f"No candles returned for {name} {interval}") | |
| adl_series = adl(df["high"], df["low"], df["close"], df["volume"]) | |
| if exclude_current_bar: | |
| df, adl_series = df.iloc[:-1], adl_series.iloc[:-1] | |
| if df.empty: | |
| raise ValueError("Not enough completed candles after excluding the current bar.") | |
| # Assemble last window (float first) | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "adl": adl_series.tail(lookback_period).astype(float).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| eps = 1e-9 | |
| chg = out["adl"].diff() | |
| prev = out["adl"].shift(1) | |
| out["adl_change"] = chg | |
| out["adl_change_pct"] = np.where(prev.ne(0), chg / prev, np.nan) | |
| gt = chg.gt(eps).fillna(False) | |
| lt = chg.lt(-eps).fillna(False) | |
| out["adl_direction"] = np.where(gt, "up", np.where(lt, "down", "flat")) | |
| # After computing signals, round ADL for display (keeps nullable Int64) | |
| out["adl"] = out["adl"].round(0).astype("Int64") | |
| out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S") | |
| return out.drop(columns=["timestamp"]) | |
| # ======= VWAP ======= | |
| def get_vwap( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| daily_reset: bool = False, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| series = vwap_daily(df["high"], df["low"], df["close"], df["volume"], df["timestamp"]) if daily_reset \ | |
| else vwap_cumulative(df["high"], df["low"], df["close"], df["volume"]) | |
| if exclude_current_bar: | |
| df, series = df.iloc[:-1], series.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "close": df["close"].tail(lookback_period).values, | |
| "vwap": series.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| out["dist_to_vwap_pct"] = 100.0 * (out["close"] - out["vwap"]) / out["vwap"].replace(0, np.nan) | |
| out["above_vwap"] = out["close"] > out["vwap"] | |
| out["utc_timestamp"] = _utc_str_col(out["timestamp"]) | |
| return out.drop(columns=["timestamp"]) | |
| # ======= STOCH RSI ======= | |
| def get_stoch_rsi( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| rsi_length: int = 14, | |
| stoch_length: int = 14, | |
| k_smooth: int = 3, | |
| d_smooth: int = 3, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| stoch, K, D = stoch_rsi(df["close"], rsi_length, stoch_length, k_smooth, d_smooth) | |
| if exclude_current_bar: | |
| df, stoch, K, D = df.iloc[:-1], stoch.iloc[:-1], K.iloc[:-1], D.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "close": df["close"].tail(lookback_period).values, | |
| "stoch_rsi": stoch.tail(lookback_period).values, | |
| "%K": K.tail(lookback_period).values, | |
| "%D": D.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| # Zones & crosses | |
| out["zone"] = np.select( | |
| [out["stoch_rsi"] >= 80, out["stoch_rsi"] <= 20], | |
| ["overbought", "oversold"], default="mid" | |
| ) | |
| prevK, prevD = out["%K"].shift(1), out["%D"].shift(1) | |
| out["cross"] = np.where( | |
| (prevK < prevD) & (out["%K"] > out["%D"]), "bull_cross", | |
| np.where((prevK > prevD) & (out["%K"] < out["%D"]), "bear_cross", "none") | |
| ) | |
| # Optional momentum bias (fast) | |
| out["bias"] = np.where(out["%K"] > out["%D"], "up", "down") | |
| out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S") | |
| return out.drop(columns=["timestamp"]) | |
| # ======= MACD ======= | |
| def get_macd( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| fast: int = 12, | |
| slow: int = 26, | |
| signal: int = 9, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| ) -> pd.DataFrame: | |
| df = fetch_candles(name, interval=interval, limit=limit, testnet=testnet) | |
| line, sig, hist = macd(df["close"], fast, slow, signal) | |
| if exclude_current_bar: | |
| df, line, sig, hist = df.iloc[:-1], line.iloc[:-1], sig.iloc[:-1], hist.iloc[:-1] | |
| out = pd.DataFrame({ | |
| "timestamp": df["timestamp"].tail(lookback_period).values, | |
| "close": df["close"].tail(lookback_period).values, | |
| "macd": line.tail(lookback_period).values, | |
| "signal": sig.tail(lookback_period).values, | |
| "hist": hist.tail(lookback_period).values, | |
| }).reset_index(drop=True) | |
| if include_signals: | |
| # Cross of MACD line vs signal line | |
| prev_macd, prev_sig = out["macd"].shift(1), out["signal"].shift(1) | |
| out["macd_cross"] = np.where( | |
| (prev_macd < prev_sig) & (out["macd"] > out["signal"]), "bull_cross", | |
| np.where((prev_macd > prev_sig) & (out["macd"] < out["signal"]), "bear_cross", "none") | |
| ) | |
| # Zero-line context (momentum regime) | |
| out["above_zero"] = out["macd"] > 0 | |
| # Histogram momentum bias | |
| out["hist_trend"] = np.where(out["hist"].diff() > 0, "up", np.where(out["hist"].diff() < 0, "down", "flat")) | |
| out["utc_timestamp"] = pd.to_datetime(out["timestamp"], utc=True).dt.strftime("%Y-%m-%d %H:%M:%S") | |
| return out.drop(columns=["timestamp"]) | |
| def get_bundle( | |
| name: str, | |
| interval: Interval = "1h", | |
| lookback_period: int = 6, | |
| limit: int = 600, | |
| testnet: bool = False, | |
| include: Optional[Iterable[str]] = None, | |
| include_signals: bool = True, | |
| exclude_current_bar: bool = True, | |
| indicator_params: Optional[Dict[str, Dict[str, Any]]] = None, | |
| return_last_only: bool = False, | |
| profile: str = "default", # NEW: optional profiles | |
| ) -> Dict[str, Any]: | |
| """ | |
| Bundle multiple indicators into one JSON-serializable dict. | |
| - Use `profile` to select a preset, or pass `include` explicitly. | |
| - Merges `indicator_params` per-indicator (e.g., {"macd": {"fast": 8}}). | |
| - If `return_last_only=True`, returns the latest completed row per indicator. | |
| Returns: | |
| { | |
| "coin": str, | |
| "interval": str, | |
| "asof_utc": "YYYY-MM-DD HH:MM:SS" | None, | |
| "included": [str, ...], | |
| "<indicator>": list[dict] | dict | |
| } | |
| """ | |
| indicator_params = indicator_params or {} | |
| # Presets (match your curated bundle) | |
| PROFILES: Dict[str, List[str]] = { | |
| "default": ["ema","stoch_rsi","macd","adl","volume","atr_adx","bbands"], | |
| "trend": ["ema","macd","atr_adx","adl","volume"], | |
| "momentum": ["rsi","stoch_rsi","macd","volume"], | |
| "volatility": ["bbands","atr_adx","volume"], | |
| "scalper": ["stoch_rsi","macd","volume","adl"], | |
| "intraday": ["ema","vwap","adl","volume"], | |
| } | |
| # If include is provided, it overrides profile | |
| include_list = list(include) if include is not None else PROFILES.get(profile, PROFILES["default"]) | |
| # Deduplicate & keep order | |
| seen = set() | |
| include_list = [x for x in include_list if not (x in seen or seen.add(x))] | |
| out: Dict[str, Any] = {"coin": name, "interval": interval, "included": include_list} | |
| ts_list: List[str] = [] | |
| # Map names to local getters (adjust to hi.get_* if this lives outside hl_indicators2.py) | |
| mapping = { | |
| "ema": get_ema, | |
| "rsi": get_rsi, | |
| "stoch_rsi": get_stoch_rsi, | |
| "macd": get_macd, | |
| "bbands": get_bbands, | |
| "atr_adx": get_atr_adx, | |
| "obv": get_obv, | |
| "adl": get_adl, | |
| "vwap": get_vwap, | |
| "volume": get_volume, | |
| } | |
| def _run(ind_name: str): | |
| fn = mapping.get(ind_name) | |
| if fn is None: | |
| # unknown indicator name; skip safely | |
| return | |
| # common params | |
| params = dict( | |
| name=name, | |
| interval=interval, | |
| lookback_period=lookback_period, | |
| limit=limit, | |
| testnet=testnet, | |
| include_signals=include_signals, | |
| exclude_current_bar=exclude_current_bar, | |
| ) | |
| # merge per-indicator overrides | |
| params.update(indicator_params.get(ind_name, {})) | |
| try: | |
| df = fn(**params) | |
| except Exception as e: | |
| # fail-soft: surface error text, don't kill the whole bundle | |
| out[ind_name] = {"error": str(e)} | |
| return | |
| # Normalize to records | |
| if isinstance(df, pd.DataFrame): | |
| recs = df.to_dict(orient="records") | |
| out[ind_name] = recs[-1] if (return_last_only and recs) else recs | |
| if recs and "utc_timestamp" in recs[-1]: | |
| ts_list.append(recs[-1]["utc_timestamp"]) | |
| elif isinstance(df, list): | |
| out[ind_name] = df[-1] if (return_last_only and df) else df | |
| if df and isinstance(df[-1], dict) and "utc_timestamp" in df[-1]: | |
| ts_list.append(df[-1]["utc_timestamp"]) | |
| else: | |
| # unexpected type; just return it | |
| out[ind_name] = df | |
| for ind in include_list: | |
| _run(ind) | |
| # As-of time = max completed timestamp (string in UTC format is lexicographically safe) | |
| ts_clean = [t for t in ts_list if t] | |
| out["asof_utc"] = max(ts_clean) if ts_clean else None | |
| out["profile"] = profile | |
| return out | |