from __future__ import annotations from typing import Dict, Any, List, Literal, Iterable, Optional import numpy as np import pandas as pd from mcp.server.fastmcp import FastMCP import hl_indicators as hi # your indicators module Interval = Literal["1m", "5m", "15m", "1h", "4h", "1d"] mcp = FastMCP("hl_indicators_server") # ------------------ Health check ------------------ # @mcp.tool() async def ping() -> str: """Health check for MCP server. Returns "pong" if the server is up and responding. """ return "pong" # ------------------ Helpers ------------------ # def _df_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]: """Convert a pandas DataFrame to JSON records. Falls back to returning the object unchanged if already list[dict]. """ if isinstance(df, pd.DataFrame): return df.to_dict(orient="records") return df # type: ignore[return-value] # ------------------ Indicator tools ------------------ # # ------------------ Bundle tool ------------------ # PROFILES: Dict[str, List[str]] = { "default": ["ema","stoch_rsi","macd","adl","volume","atr_adx","bbands"], "trend": ["ema","macd","atr_adx","adl","volume"], "momentum": ["rsi","stoch_rsi","macd","volume"], "volatility": ["bbands","atr_adx","volume"], "scalper": ["stoch_rsi","macd","volume","adl"], "intraday": ["ema","vwap","adl","volume"], } @mcp.tool() def bundle( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, include: Optional[Iterable[str]] = None, profile: str = "default", include_signals: bool = True, exclude_current_bar: bool = True, return_last_only: bool = False, indicator_params: Optional[Dict[str, Dict[str, Any]]] = None, ) -> Dict[str, Any]: """Bundle multiple indicators into one JSON payload. Profiles available: - "default": ["ema","stoch_rsi","macd","adl","volume","atr_adx","bbands"] - "trend": ["ema","macd","atr_adx","adl","volume"] - "momentum": ["rsi","stoch_rsi","macd","volume"] - "volatility": ["bbands","atr_adx","volume"] - "scalper": ["stoch_rsi","macd","volume","adl"] - "intraday": ["ema","vwap","adl","volume"] Choose a preset with `profile` or pass `include` explicitly. Merges `indicator_params` into per-indicator calls. """ indicator_params = indicator_params or {} include_list = list(include) if include else PROFILES.get(profile, PROFILES["default"]) out: Dict[str, Any] = {"coin": name, "interval": interval} last_ts: List[str] = [] def _run(ind_name: str, fn) -> None: params = dict( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) params.update(indicator_params.get(ind_name, {})) df = fn(**params) if isinstance(df, pd.DataFrame): recs = df.to_dict(orient="records") out[ind_name] = recs[-1] if (return_last_only and recs) else recs if len(recs): last_ts.append(recs[-1].get("utc_timestamp")) else: out[ind_name] = df[-1] if (return_last_only and isinstance(df, list) and df) else df mapping = { "ema": hi.get_ema, "rsi": hi.get_rsi, "stoch_rsi": hi.get_stoch_rsi, "macd": hi.get_macd, "bbands": hi.get_bbands, "atr_adx": hi.get_atr_adx, "obv": hi.get_obv, "adl": hi.get_adl, "vwap": hi.get_vwap, "volume": hi.get_volume, } for ind in include_list: if ind not in mapping: continue _run(ind, mapping[ind]) ts_clean = [t for t in last_ts if t] out["asof_utc"] = max(ts_clean) if ts_clean else None out["profile"] = profile out["included"] = include_list return out @mcp.tool() def ema( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, include_signals: bool = True, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """EMA-9/20/200 with optional crossovers & trend. Excludes the current forming bar if exclude_current_bar=True. Returns last `lookback_period` rows with utc_timestamp, close, ema_9/20/200, and when include_signals=True: ema relationships, crossover labels, trend. """ df = hi.get_ema( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def rsi( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, period: int = 14, include_signals: bool = True, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """RSI(14) with optional state/cross/slope signals. Excludes the current forming bar if exclude_current_bar=True. Returns utc_timestamp, close, rsi; plus overbought/oversold, 50-line cross, and slope when include_signals=True. """ df = hi.get_rsi( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, period=period, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def stoch_rsi( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, rsi_length: int = 14, stoch_length: int = 14, k_smooth: int = 3, d_smooth: int = 3, include_signals: bool = True, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """StochRSI(14,14,3,3) with zone/cross/bias signals. Uses RSI normalization over `stoch_length`; smooths to %K/%D. Returns utc_timestamp, close, stoch_rsi, %K, %D; plus zone, cross, bias when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_stoch_rsi( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, rsi_length=rsi_length, stoch_length=stoch_length, k_smooth=k_smooth, d_smooth=d_smooth, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def macd( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, fast: int = 12, slow: int = 26, signal_len: int = 9, include_signals: bool = True, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """MACD(fast, slow, signal_len) with cross/zero/hist trend. Returns utc_timestamp, close, macd, signal, hist; plus cross labels, above_zero flag, and histogram slope when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_macd( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, fast=fast, slow=slow, signal=signal_len, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def bbands( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, period: int = 20, std_mult: float = 2.0, include_signals: bool = True, squeeze_lookback: int = 180, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """Bollinger Bands with squeeze/touch flags. Returns utc_timestamp, close, basis/upper/lower, %b, bandwidth; plus squeeze_percentile, in_squeeze (bottom 10%), touches, and above_basis when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_bbands( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, period=period, std_mult=std_mult, include_signals=include_signals, squeeze_lookback=squeeze_lookback, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def atr_adx( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, period: int = 14, include_signals: bool = True, adx_threshold: float = 25.0, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """+DI/−DI, ADX, and ATR with direction/strength flags. Returns utc_timestamp, +DI, −DI, ADX, ATR; plus direction (up/down) and trend_ok (ADX ≥ threshold) when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_atr_adx( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, period=period, include_signals=include_signals, adx_threshold=adx_threshold, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def obv( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, include_signals: bool = True, slope_lookback: int = 20, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """On-Balance Volume with slope/trend. Returns utc_timestamp, obv; plus slope metrics and obv_trend (up/down/flat) when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_obv( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, include_signals=include_signals, slope_lookback=slope_lookback, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def adl( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, include_signals: bool = True, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """Accumulation/Distribution Line with change/direction. Returns utc_timestamp, adl; plus adl_change, adl_change_pct, and adl_direction (up/down/flat) when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_adl( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def vwap( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, daily_reset: bool = False, include_signals: bool = True, exclude_current_bar: bool = True, ) -> List[Dict[str, Any]]: """VWAP (cumulative or daily) with distance flags. Returns utc_timestamp, close, vwap; plus dist_to_vwap_pct and above_vwap when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_vwap( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, daily_reset=daily_reset, include_signals=include_signals, exclude_current_bar=exclude_current_bar ) return _df_to_records(df) @mcp.tool() def volume( name: str, interval: Interval = "1h", lookback_period: int = 6, limit: int = 600, testnet: bool = False, include_signals: bool = True, exclude_current_bar: bool = True, high_ratio: float = 1.5, low_ratio: float = 0.7, z_lookback: int = 100, z_hi: float = 2.0, z_lo: float = -2.0, ) -> List[Dict[str, Any]]: """Volume features with avg-24, ratio, change, z-score, and signal. Returns utc_timestamp, volume, avg_past_24_session_volume; plus volume_ratio, volume_change, zscore_volume, and volume_signal when include_signals=True. Excludes current bar if exclude_current_bar=True. """ df = hi.get_volume( name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet, include_signals=include_signals, exclude_current_bar=exclude_current_bar, high_ratio=high_ratio, low_ratio=low_ratio, z_lookback=z_lookback, z_hi=z_hi, z_lo=z_lo ) return _df_to_records(df) # ------------------ Entry point ------------------ # if __name__ == "__main__": # Run as a standalone process (stdio can conflict with notebooks). mcp.run(transport="stdio")