trading_agent_v2 / hl_indicators_server.py
samsonleegh's picture
Update hl_indicators_server.py
3fa618d verified
from __future__ import annotations
from typing import Dict, Any, List, Literal, Iterable, Optional
import numpy as np
import pandas as pd
from mcp.server.fastmcp import FastMCP
import hl_indicators as hi # your indicators module
Interval = Literal["1m", "5m", "15m", "1h", "4h", "1d"]
mcp = FastMCP("hl_indicators_server")
# ------------------ Health check ------------------ #
@mcp.tool()
async def ping() -> str:
"""Health check for MCP server.
Returns "pong" if the server is up and responding.
"""
return "pong"
# ------------------ Helpers ------------------ #
def _df_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]:
"""Convert a pandas DataFrame to JSON records.
Falls back to returning the object unchanged if already list[dict].
"""
if isinstance(df, pd.DataFrame):
return df.to_dict(orient="records")
return df # type: ignore[return-value]
# ------------------ Indicator tools ------------------ #
# ------------------ Bundle tool ------------------ #
PROFILES: Dict[str, List[str]] = {
"default": ["ema","stoch_rsi","macd","adl","volume","atr_adx","bbands"],
"trend": ["ema","macd","atr_adx","adl","volume"],
"momentum": ["rsi","stoch_rsi","macd","volume"],
"volatility": ["bbands","atr_adx","volume"],
"scalper": ["stoch_rsi","macd","volume","adl"],
"intraday": ["ema","vwap","adl","volume"],
}
@mcp.tool()
def bundle(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
include: Optional[Iterable[str]] = None,
profile: str = "default",
include_signals: bool = True,
exclude_current_bar: bool = True,
return_last_only: bool = False,
indicator_params: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Bundle multiple indicators into one JSON payload.
Profiles available:
- "default": ["ema","stoch_rsi","macd","adl","volume","atr_adx","bbands"]
- "trend": ["ema","macd","atr_adx","adl","volume"]
- "momentum": ["rsi","stoch_rsi","macd","volume"]
- "volatility": ["bbands","atr_adx","volume"]
- "scalper": ["stoch_rsi","macd","volume","adl"]
- "intraday": ["ema","vwap","adl","volume"]
Choose a preset with `profile` or pass `include` explicitly. Merges
`indicator_params` into per-indicator calls.
"""
indicator_params = indicator_params or {}
include_list = list(include) if include else PROFILES.get(profile, PROFILES["default"])
out: Dict[str, Any] = {"coin": name, "interval": interval}
last_ts: List[str] = []
def _run(ind_name: str, fn) -> None:
params = dict(
name=name, interval=interval, lookback_period=lookback_period, limit=limit,
testnet=testnet, include_signals=include_signals, exclude_current_bar=exclude_current_bar
)
params.update(indicator_params.get(ind_name, {}))
df = fn(**params)
if isinstance(df, pd.DataFrame):
recs = df.to_dict(orient="records")
out[ind_name] = recs[-1] if (return_last_only and recs) else recs
if len(recs):
last_ts.append(recs[-1].get("utc_timestamp"))
else:
out[ind_name] = df[-1] if (return_last_only and isinstance(df, list) and df) else df
mapping = {
"ema": hi.get_ema,
"rsi": hi.get_rsi,
"stoch_rsi": hi.get_stoch_rsi,
"macd": hi.get_macd,
"bbands": hi.get_bbands,
"atr_adx": hi.get_atr_adx,
"obv": hi.get_obv,
"adl": hi.get_adl,
"vwap": hi.get_vwap,
"volume": hi.get_volume,
}
for ind in include_list:
if ind not in mapping:
continue
_run(ind, mapping[ind])
ts_clean = [t for t in last_ts if t]
out["asof_utc"] = max(ts_clean) if ts_clean else None
out["profile"] = profile
out["included"] = include_list
return out
@mcp.tool()
def ema(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
include_signals: bool = True,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""EMA-9/20/200 with optional crossovers & trend.
Excludes the current forming bar if exclude_current_bar=True.
Returns last `lookback_period` rows with utc_timestamp, close, ema_9/20/200,
and when include_signals=True: ema relationships, crossover labels, trend.
"""
df = hi.get_ema(
name=name, interval=interval, lookback_period=lookback_period, limit=limit,
testnet=testnet, include_signals=include_signals, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def rsi(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
period: int = 14,
include_signals: bool = True,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""RSI(14) with optional state/cross/slope signals.
Excludes the current forming bar if exclude_current_bar=True.
Returns utc_timestamp, close, rsi; plus overbought/oversold, 50-line cross,
and slope when include_signals=True.
"""
df = hi.get_rsi(
name=name, interval=interval, lookback_period=lookback_period, limit=limit,
testnet=testnet, period=period, include_signals=include_signals,
exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def stoch_rsi(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
rsi_length: int = 14,
stoch_length: int = 14,
k_smooth: int = 3,
d_smooth: int = 3,
include_signals: bool = True,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""StochRSI(14,14,3,3) with zone/cross/bias signals.
Uses RSI normalization over `stoch_length`; smooths to %K/%D.
Returns utc_timestamp, close, stoch_rsi, %K, %D; plus zone, cross, bias when
include_signals=True. Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_stoch_rsi(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
rsi_length=rsi_length, stoch_length=stoch_length, k_smooth=k_smooth, d_smooth=d_smooth,
include_signals=include_signals, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def macd(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
fast: int = 12,
slow: int = 26,
signal_len: int = 9,
include_signals: bool = True,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""MACD(fast, slow, signal_len) with cross/zero/hist trend.
Returns utc_timestamp, close, macd, signal, hist; plus cross labels,
above_zero flag, and histogram slope when include_signals=True.
Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_macd(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
fast=fast, slow=slow, signal=signal_len,
include_signals=include_signals, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def bbands(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
period: int = 20,
std_mult: float = 2.0,
include_signals: bool = True,
squeeze_lookback: int = 180,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""Bollinger Bands with squeeze/touch flags.
Returns utc_timestamp, close, basis/upper/lower, %b, bandwidth; plus
squeeze_percentile, in_squeeze (bottom 10%), touches, and above_basis when
include_signals=True. Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_bbands(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
period=period, std_mult=std_mult, include_signals=include_signals,
squeeze_lookback=squeeze_lookback, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def atr_adx(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
period: int = 14,
include_signals: bool = True,
adx_threshold: float = 25.0,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""+DI/−DI, ADX, and ATR with direction/strength flags.
Returns utc_timestamp, +DI, −DI, ADX, ATR; plus direction (up/down) and
trend_ok (ADX ≥ threshold) when include_signals=True.
Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_atr_adx(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
period=period, include_signals=include_signals, adx_threshold=adx_threshold,
exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def obv(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
include_signals: bool = True,
slope_lookback: int = 20,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""On-Balance Volume with slope/trend.
Returns utc_timestamp, obv; plus slope metrics and obv_trend (up/down/flat)
when include_signals=True. Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_obv(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
include_signals=include_signals, slope_lookback=slope_lookback, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def adl(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
include_signals: bool = True,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""Accumulation/Distribution Line with change/direction.
Returns utc_timestamp, adl; plus adl_change, adl_change_pct, and
adl_direction (up/down/flat) when include_signals=True.
Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_adl(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
include_signals=include_signals, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def vwap(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
daily_reset: bool = False,
include_signals: bool = True,
exclude_current_bar: bool = True,
) -> List[Dict[str, Any]]:
"""VWAP (cumulative or daily) with distance flags.
Returns utc_timestamp, close, vwap; plus dist_to_vwap_pct and above_vwap
when include_signals=True. Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_vwap(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
daily_reset=daily_reset, include_signals=include_signals, exclude_current_bar=exclude_current_bar
)
return _df_to_records(df)
@mcp.tool()
def volume(
name: str,
interval: Interval = "1h",
lookback_period: int = 6,
limit: int = 600,
testnet: bool = False,
include_signals: bool = True,
exclude_current_bar: bool = True,
high_ratio: float = 1.5,
low_ratio: float = 0.7,
z_lookback: int = 100,
z_hi: float = 2.0,
z_lo: float = -2.0,
) -> List[Dict[str, Any]]:
"""Volume features with avg-24, ratio, change, z-score, and signal.
Returns utc_timestamp, volume, avg_past_24_session_volume; plus
volume_ratio, volume_change, zscore_volume, and volume_signal
when include_signals=True. Excludes current bar if exclude_current_bar=True.
"""
df = hi.get_volume(
name=name, interval=interval, lookback_period=lookback_period, limit=limit, testnet=testnet,
include_signals=include_signals, exclude_current_bar=exclude_current_bar,
high_ratio=high_ratio, low_ratio=low_ratio, z_lookback=z_lookback, z_hi=z_hi, z_lo=z_lo
)
return _df_to_records(df)
# ------------------ Entry point ------------------ #
if __name__ == "__main__":
# Run as a standalone process (stdio can conflict with notebooks).
mcp.run(transport="stdio")