SuperAI_Forecast / backend /analysis_engine.py
Thang6822's picture
Upload folder using huggingface_hub
7d63c70 verified
from __future__ import annotations
import math
import re
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
AI_SCORING_VERSION = "2026-05-05-path-direction-v2"
TECHNICAL_SCORING_VERSION = "2026-05-18-performance-votes-v12"
ANALYSIS_PRESENTATION_VERSION = "2026-05-16-gauge-view-v2"
EMA_PAIR_SEQUENCE: List[Tuple[int, int]] = [
(1, 2),
(2, 5),
(5, 10),
(10, 20),
(20, 50),
(50, 100),
(100, 200),
]
EMA_PAIR_THRESHOLD_PCT: Dict[str, float] = {
"ema_1_2": 0.05,
"ema_2_5": 0.08,
"ema_5_10": 0.10,
"ema_10_20": 0.14,
"ema_20_50": 0.22,
"ema_50_100": 0.35,
"ema_100_200": 0.55,
}
ADX_GAUGE_PERIOD = 10
PERFORMANCE_LOOKBACKS: Tuple[int, ...] = (1, 2, 3, 5, 7, 30, 90, 180, 365)
PERFORMANCE_VOTE_WEIGHTS: Dict[int, float] = {
2: 2.0,
}
PERFORMANCE_NEUTRAL_EPSILON_PCT = 1e-9
__all__ = [
"AI_SCORING_VERSION",
"ANALYSIS_PRESENTATION_VERSION",
"TECHNICAL_SCORING_VERSION",
"_apply_forecast_amplitude_calibration",
"_build_analysis_presentation",
"_apply_forecast_path_texture",
"_build_dashboard_payload",
"_build_raw_ohlc4_bundle",
"_build_regime_texture_template",
"_build_trade_analysis",
"_calc_ai_forecast_score",
"_calc_ma_score",
"_calc_performance_score",
"_calc_price_action_levels",
"_calc_pivot_points",
"_calc_summary_score_v2",
"_calc_technical_score_v2",
"_calc_vote_gauge",
"_clamp",
"_derive_target_amplitude_profile",
"_forecast_path_metrics",
"_gauge_to_signal",
"_momentum",
"_osc_action",
"_pct",
"_score_forecast_context_candidate",
"compute_indicators",
]
def _ema(arr: np.ndarray, period: int) -> np.ndarray:
"""Vectorized EMA using NumPy (replaces loops)."""
if len(arr) == 0: return np.array([], dtype=float)
alpha = 2.0 / (period + 1.0)
# Use pandas ewm for robust vectorized calculation (v6.0)
return pd.Series(arr).ewm(alpha=alpha, adjust=False).mean().values
def _rsi(close: np.ndarray, period: int = 14) -> np.ndarray:
"""Vectorized RSI using NumPy/Pandas."""
delta = np.diff(close)
gain = np.where(delta > 0, delta, 0.0)
loss = np.where(delta < 0, -delta, 0.0)
avg_gain = pd.Series(gain).ewm(alpha=1.0/period, adjust=False).mean()
avg_loss = pd.Series(loss).ewm(alpha=1.0/period, adjust=False).mean()
rs = avg_gain / avg_loss.replace(0, np.nan)
rsi = 100 - (100 / (1 + rs))
rsi = rsi.where(avg_loss > 0, 100.0)
rsi = rsi.where(avg_gain > 0, 0.0)
rsi = rsi.where(~((avg_gain == 0) & (avg_loss == 0)), 50.0)
# Prepend NaN to match original array length
return np.concatenate([[np.nan], rsi.values])
def _bollinger(close: np.ndarray, period=20, k=2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Vectorized Bollinger Bands."""
s = pd.Series(close)
mid = s.rolling(window=period).mean()
std = s.rolling(window=period).std(ddof=0)
return (mid + k*std).values, mid.values, (mid - k*std).values
def _macd(close: np.ndarray, fast=12, slow=26, signal=9
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
ema_fast = _ema(close, fast)
ema_slow = _ema(close, slow)
macd_line = ema_fast - ema_slow
sig_line = _ema(np.where(np.isnan(macd_line), 0, macd_line), signal)
histogram = macd_line - sig_line
return macd_line, sig_line, histogram
def _atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, period=14) -> np.ndarray:
"""Vectorized ATR."""
tr = np.maximum(high[1:] - low[1:],
np.maximum(np.abs(high[1:] - close[:-1]),
np.abs(low[1:] - close[:-1])))
tr = np.concatenate([[np.nan], tr])
return pd.Series(tr).ewm(alpha=1.0/period, adjust=False).mean().values
def _stoch_rsi(close: np.ndarray, rsi_period=14, stoch_period=14,
smooth_k=3, smooth_d=3) -> Tuple[np.ndarray, np.ndarray]:
"""Vectorized Stochastic RSI."""
rsi_vals = pd.Series(_rsi(close, rsi_period))
roll_min = rsi_vals.rolling(window=stoch_period).min()
roll_max = rsi_vals.rolling(window=stoch_period).max()
k = 100 * (rsi_vals - roll_min) / (roll_max - roll_min).replace(0, np.inf)
k_smooth = k.rolling(window=smooth_k).mean()
d_smooth = k_smooth.rolling(window=smooth_d).mean()
return k_smooth.values, d_smooth.values
def _stoch_kd(
high: np.ndarray,
low: np.ndarray,
close: np.ndarray,
k_period: int = 14,
smooth_k: int = 3,
d_period: int = 3,
) -> Tuple[np.ndarray, np.ndarray]:
"""Standard stochastic oscillator computed from price high/low/close."""
high_s = pd.Series(high)
low_s = pd.Series(low)
close_s = pd.Series(close)
hh = high_s.rolling(window=k_period).max()
ll = low_s.rolling(window=k_period).min()
raw_k = 100.0 * (close_s - ll) / (hh - ll).replace(0, np.nan)
k = raw_k.rolling(window=smooth_k).mean()
d = k.rolling(window=d_period).mean()
return k.values, d.values
def _sma(arr: np.ndarray, period: int) -> np.ndarray:
"""Vectorized Simple Moving Average."""
if len(arr) == 0: return np.array([], dtype=float)
return pd.Series(arr).rolling(window=period).mean().values
def _cci(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 20) -> np.ndarray:
"""Vectorized Commodity Channel Index."""
tp = (high + low + close) / 3.0
tp_series = pd.Series(tp)
sma = tp_series.rolling(window=period).mean()
# Optimized MAD calculation (Vectorized)
mad = tp_series.rolling(window=period).apply(lambda x: np.abs(x - x.mean()).mean(), raw=True)
return ((tp_series - sma) / (0.015 * mad.replace(0, np.inf))).values
def _adx(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> tuple:
"""Vectorized Average Directional Index (v6.0)."""
plus_dm = np.where((high[1:] - high[:-1] > low[:-1] - low[1:]) & (high[1:] - high[:-1] > 0), high[1:] - high[:-1], 0.0)
minus_dm = np.where((low[:-1] - low[1:] > high[1:] - high[:-1]) & (low[:-1] - low[1:] > 0), low[:-1] - low[1:], 0.0)
tr = np.maximum(high[1:] - low[1:], np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1])))
# Pad first index
plus_dm = np.concatenate([[0.0], plus_dm])
minus_dm = np.concatenate([[0.0], minus_dm])
tr = np.concatenate([[0.0], tr])
tr_sum = pd.Series(tr).ewm(alpha=1.0/period, adjust=False).mean()
plus_di = 100 * pd.Series(plus_dm).ewm(alpha=1.0/period, adjust=False).mean() / tr_sum.replace(0, np.inf)
minus_di = 100 * pd.Series(minus_dm).ewm(alpha=1.0/period, adjust=False).mean() / tr_sum.replace(0, np.inf)
dx = 100 * np.abs(plus_di - minus_di) / (plus_di + minus_di).replace(0, np.inf)
adx = dx.ewm(alpha=1.0/period, adjust=False).mean()
return adx.values, plus_di.values, minus_di.values
def _awesome_oscillator(high: np.ndarray, low: np.ndarray) -> np.ndarray:
"""Awesome Oscillator = SMA(5, median) - SMA(34, median)."""
median = (high + low) / 2.0
sma5 = _sma(median, 5)
sma34 = _sma(median, 34)
return sma5 - sma34
def _momentum(close: np.ndarray, period: int = 10) -> np.ndarray:
"""Classic momentum oscillator centered at 0."""
base = pd.Series(close).shift(period)
return (pd.Series(close) - base).values
def _roc(close: np.ndarray, period: int = 12) -> np.ndarray:
"""Rate of Change in percentage."""
base = pd.Series(close).shift(period)
return (pd.Series(close) / base.replace(0, np.nan) - 1.0).mul(100.0).values
def _trix(close: np.ndarray, period: int = 18) -> np.ndarray:
"""Triple EMA oscillator in percentage."""
ema1 = pd.Series(_ema(close, period))
ema2 = ema1.ewm(span=period, adjust=False).mean()
ema3 = ema2.ewm(span=period, adjust=False).mean()
return ema3.pct_change().mul(100.0).values
def _ppo(close: np.ndarray, fast: int = 12, slow: int = 26) -> np.ndarray:
"""Percentage Price Oscillator."""
ema_fast = _ema(close, fast)
ema_slow = _ema(close, slow)
return ((ema_fast - ema_slow) / np.where(np.abs(ema_slow) < 1e-8, np.nan, ema_slow) * 100.0)
def _cmo(close: np.ndarray, period: int = 14) -> np.ndarray:
"""Chande Momentum Oscillator."""
delta = pd.Series(close).diff()
up = delta.clip(lower=0.0).rolling(period).sum()
down = (-delta.clip(upper=0.0)).rolling(period).sum()
return ((up - down) / (up + down).replace(0, np.nan) * 100.0).values
def _dpo(close: np.ndarray, period: int = 20) -> np.ndarray:
"""Detrended Price Oscillator."""
offset = int(period / 2) + 1
sma = pd.Series(close).rolling(period).mean()
return (pd.Series(close) - sma.shift(offset)).values
def _aroon_oscillator(high: np.ndarray, low: np.ndarray, period: int = 25) -> np.ndarray:
"""Aroon Oscillator = Aroon Up - Aroon Down."""
hs = pd.Series(high)
ls = pd.Series(low)
aroon_up = hs.rolling(period).apply(lambda x: ((period - 1 - (len(x) - 1 - int(np.argmax(x)))) / (period - 1)) * 100.0, raw=True)
aroon_down = ls.rolling(period).apply(lambda x: ((period - 1 - (len(x) - 1 - int(np.argmin(x)))) / (period - 1)) * 100.0, raw=True)
return (aroon_up - aroon_down).values
def _tsi(close: np.ndarray, long_period: int = 25, short_period: int = 13) -> np.ndarray:
"""True Strength Index."""
delta = pd.Series(close).diff()
abs_delta = delta.abs()
ema1 = delta.ewm(span=long_period, adjust=False).mean()
ema2 = ema1.ewm(span=short_period, adjust=False).mean()
abs_ema1 = abs_delta.ewm(span=long_period, adjust=False).mean()
abs_ema2 = abs_ema1.ewm(span=short_period, adjust=False).mean()
return (ema2 / abs_ema2.replace(0, np.nan) * 100.0).values
def _demarker(high: np.ndarray, low: np.ndarray, period: int = 14) -> np.ndarray:
"""DeMarker oscillator in range 0..100."""
high_delta = pd.Series(high).diff()
low_delta = -pd.Series(low).diff()
demax = high_delta.clip(lower=0.0)
demin = low_delta.clip(lower=0.0)
demax_avg = demax.rolling(period).sum()
demin_avg = demin.rolling(period).sum()
denom = (demax_avg + demin_avg).replace(0, np.nan)
return (demax_avg / denom * 100.0).values
def _williams_r(high: np.ndarray, low: np.ndarray, close: np.ndarray,
period: int = 14) -> np.ndarray:
"""Vectorized Williams %R."""
s = pd.Series(close)
hh = pd.Series(high).rolling(window=period).max()
ll = pd.Series(low).rolling(window=period).min()
wr = -100 * (hh - s) / (hh - ll).replace(0, np.inf)
return wr.values
def _bull_bear_power(high: np.ndarray, low: np.ndarray, close: np.ndarray,
period: int = 13) -> Tuple[np.ndarray, np.ndarray]:
"""Return Bull Power and Bear Power separately for sign-based scoring."""
ema_val = _ema(close, period)
return (high - ema_val), (low - ema_val)
def _ultimate_oscillator(high: np.ndarray, low: np.ndarray, close: np.ndarray,
p1: int = 7, p2: int = 14, p3: int = 28) -> np.ndarray:
"""Vectorized Ultimate Oscillator (v6.0)."""
n = len(close)
if n < p3 + 1: return np.full(n, np.nan)
prev_close = pd.Series(close).shift(1)
tl = np.minimum(low, prev_close.values)
th = np.maximum(high, prev_close.values)
bp = pd.Series(close - tl)
tr = pd.Series(th - tl)
def _avg(p):
return bp.rolling(p).sum() / tr.rolling(p).sum().replace(0, np.inf)
avg1 = _avg(p1)
avg2 = _avg(p2)
avg3 = _avg(p3)
uo = 100 * (4 * avg1 + 2 * avg2 + avg3) / 7.0
return uo.values
def _ichimoku_base(high: np.ndarray, low: np.ndarray, period: int = 26) -> np.ndarray:
"""Vectorized Ichimoku Base Line."""
hh = pd.Series(high).rolling(window=period).max()
ll = pd.Series(low).rolling(window=period).min()
return ((hh + ll) / 2.0).values
def _ichimoku_cloud(high: np.ndarray, low: np.ndarray) -> Dict[str, np.ndarray]:
"""Basic Ichimoku lines for cloud-position scoring."""
hs = pd.Series(high)
ls = pd.Series(low)
tenkan = ((hs.rolling(9).max() + ls.rolling(9).min()) / 2.0).values
kijun = ((hs.rolling(26).max() + ls.rolling(26).min()) / 2.0).values
span_a = ((pd.Series(tenkan) + pd.Series(kijun)) / 2.0).values
span_b = ((hs.rolling(52).max() + ls.rolling(52).min()) / 2.0).values
return {
"tenkan": tenkan,
"kijun": kijun,
"span_a": span_a,
"span_b": span_b,
}
def _vwma(close: np.ndarray, volume: np.ndarray, period: int = 20) -> np.ndarray:
"""Vectorized Volume Weighted Moving Average."""
cv = pd.Series(close * volume)
v = pd.Series(volume)
return (cv.rolling(period).sum() / v.rolling(period).sum().replace(0, np.inf)).values
def _wma(arr: np.ndarray, period: int) -> np.ndarray:
"""Weighted moving average used by Hull MA."""
if len(arr) == 0:
return np.array([], dtype=float)
weights = np.arange(1, period + 1, dtype=float)
return pd.Series(arr).rolling(window=period).apply(
lambda x: float(np.dot(x, weights) / weights.sum()),
raw=True,
).values
def _hull_ma(close: np.ndarray, period: int = 9) -> np.ndarray:
"""Hull Moving Average."""
half = max(period // 2, 1)
sqrt_p = max(int(math.sqrt(period)), 1)
wma_half = _wma(close, half)
wma_full = _wma(close, period)
diff = 2 * wma_half - wma_full
hull = _wma(np.where(np.isnan(diff), close, diff), sqrt_p)
return hull
def _price_action_config(interval: str) -> Tuple[int, int]:
config_map = {
"1m": (2, 120),
"5m": (2, 140),
"15m": (3, 180),
"1h": (4, 260),
"4h": (4, 320),
"1d": (5, 360),
"1w": (5, 260),
}
return config_map.get(interval, (4, 240))
def _cluster_price_action_candidates(
candidates: List[Tuple[float, float, int]],
tolerance: float,
) -> List[Dict[str, Any]]:
clusters: List[Dict[str, Any]] = []
for price, weight, candle_index in sorted(candidates, key=lambda item: item[0]):
cluster = next(
(item for item in clusters if abs(item["price"] - price) <= tolerance),
None,
)
if cluster is None:
clusters.append(
{
"price": float(price),
"weight": float(weight),
"touches": 1,
"last_index": int(candle_index),
}
)
continue
total_weight = float(cluster["weight"]) + float(weight)
cluster["price"] = (
(float(cluster["price"]) * float(cluster["weight"])) + (float(price) * float(weight))
) / max(total_weight, 1e-8)
cluster["weight"] = total_weight
cluster["touches"] = int(cluster["touches"]) + 1
cluster["last_index"] = max(int(cluster["last_index"]), int(candle_index))
return clusters
def _dedupe_sorted_levels(
levels: List[Dict[str, Any]],
*,
tolerance: float,
) -> List[Dict[str, Any]]:
deduped: List[Dict[str, Any]] = []
for level in levels:
if deduped and abs(float(deduped[-1]["price"]) - float(level["price"])) <= tolerance:
continue
deduped.append(level)
return deduped
def _calc_price_action_levels(
data: List[Dict[str, Any]],
current_price: float,
interval: str,
) -> Dict[str, Any]:
if not data:
rounded_price = round(float(current_price), 6)
return {
"method": "price_action",
"timeframe": interval,
"current_price": rounded_price,
"summary": "Khong du du lieu de xac dinh cac moc khang cu va ho tro theo Price Action.",
"data": [
{"level": "KC2", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
{"level": "KC1", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
{"level": "GiΓ‘ hiện tαΊ‘i", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
{"level": "HT1", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
{"level": "HT2", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
],
}
highs = np.array([float(row["high"]) for row in data], dtype=float)
lows = np.array([float(row["low"]) for row in data], dtype=float)
closes = np.array([float(row["close"]) for row in data], dtype=float)
pivot_window, lookback = _price_action_config(interval)
recent_start = max(0, len(closes) - min(len(closes), lookback))
recent_highs = highs[recent_start:]
recent_lows = lows[recent_start:]
recent_closes = closes[recent_start:]
atr_value = _latest_finite_value(_atr(recent_highs, recent_lows, recent_closes, period=14))
median_range = float(np.nanmedian(recent_highs - recent_lows)) if len(recent_highs) else 0.0
cluster_tolerance = max(
float(atr_value or 0.0) * 0.35,
median_range * 0.22,
abs(float(current_price)) * 0.0015,
1e-6,
)
extension_step = max(
float(atr_value or 0.0),
median_range,
abs(float(current_price)) * 0.003,
cluster_tolerance * 1.5,
)
resistance_candidates: List[Tuple[float, float, int]] = []
support_candidates: List[Tuple[float, float, int]] = []
recent_count = len(recent_closes)
for idx in range(pivot_window, max(pivot_window, recent_count - pivot_window)):
high_slice = recent_highs[idx - pivot_window : idx + pivot_window + 1]
low_slice = recent_lows[idx - pivot_window : idx + pivot_window + 1]
high_price = float(recent_highs[idx])
low_price = float(recent_lows[idx])
recency_weight = 1.0 + (idx / max(recent_count, 1))
if high_price >= float(np.max(high_slice)):
resistance_candidates.append((high_price, recency_weight, recent_start + idx))
if low_price <= float(np.min(low_slice)):
support_candidates.append((low_price, recency_weight, recent_start + idx))
fallback_spans = [10, 20, 50, 100, min(recent_count, lookback)]
for span in fallback_spans:
if span <= 0 or recent_count < span:
continue
resistance_candidates.append((float(np.max(recent_highs[-span:])), 0.75, len(data) - 1))
support_candidates.append((float(np.min(recent_lows[-span:])), 0.75, len(data) - 1))
resistance_clusters = _cluster_price_action_candidates(resistance_candidates, cluster_tolerance)
support_clusters = _cluster_price_action_candidates(support_candidates, cluster_tolerance)
resistance_levels = _dedupe_sorted_levels(
sorted(
[
cluster
for cluster in resistance_clusters
if float(cluster["price"]) > float(current_price) + (cluster_tolerance * 0.12)
],
key=lambda item: float(item["price"]),
),
tolerance=cluster_tolerance * 0.55,
)
support_levels = _dedupe_sorted_levels(
sorted(
[
cluster
for cluster in support_clusters
if float(cluster["price"]) < float(current_price) - (cluster_tolerance * 0.12)
],
key=lambda item: float(item["price"]),
reverse=True,
),
tolerance=cluster_tolerance * 0.55,
)
while len(resistance_levels) < 2:
base_price = float(resistance_levels[-1]["price"]) if resistance_levels else float(current_price)
resistance_levels.append(
{
"price": base_price + extension_step,
"weight": 0.0,
"touches": 0,
"last_index": len(data) - 1,
}
)
while len(support_levels) < 2:
base_price = float(support_levels[-1]["price"]) if support_levels else float(current_price)
support_levels.append(
{
"price": base_price - extension_step,
"weight": 0.0,
"touches": 0,
"last_index": len(data) - 1,
}
)
kc1 = float(resistance_levels[0]["price"])
kc2 = float(resistance_levels[1]["price"])
ht1 = float(support_levels[0]["price"])
ht2 = float(support_levels[1]["price"])
if current_price >= kc1:
summary = (
"Gia hien tai dang ap sat hoac vuot len tren KC1. "
"Neu duy tri duoc dong luc, KC2 la moc can theo doi tiep theo."
)
elif current_price <= ht1:
summary = (
"Gia hien tai dang ap sat hoac nam duoi HT1. "
"Neu ap luc ban tiep tuc, HT2 la vung ho tro can quan sat them."
)
else:
summary = (
"Gia hien tai dang nam giua HT1 va KC1. "
"Day la vung can theo doi phan ung gia truoc khi xac nhan huong di tiep theo."
)
def _row(level: str, price: float) -> Dict[str, Any]:
rounded_price = round(float(price), 6)
distance_pct = 0.0 if current_price == 0 else ((rounded_price - float(current_price)) / float(current_price)) * 100.0
return {
"level": level,
"price": rounded_price,
"display_value": _format_analysis_value(rounded_price),
"distance_pct": round(distance_pct, 2),
}
rows = [
_row("KC2", kc2),
_row("KC1", kc1),
_row("GiΓ‘ hiện tαΊ‘i", float(current_price)),
_row("HT1", ht1),
_row("HT2", ht2),
]
return {
"method": "price_action",
"timeframe": interval,
"current_price": round(float(current_price), 6),
"summary": summary,
"data": rows,
}
def _calc_pivot_points(
data: List[Dict[str, Any]],
current_price: float,
interval: str,
) -> Dict[str, Any]:
"""Compatibility wrapper for old callers. New logic uses Price Action only."""
return _calc_price_action_levels(data, current_price, interval)
# ─────────────────────────────────────────────────────────────────────────────
# TradingView-style action classification helpers
# ─────────────────────────────────────────────────────────────────────────────
def _latest_finite_value(values: Any) -> Optional[float]:
"""Return the latest finite float from a scalar or array-like input without rounding."""
if values is None:
return None
if isinstance(values, (float, int)):
value = float(values)
return None if math.isnan(value) else value
if isinstance(values, pd.Series):
values = values.values
if not hasattr(values, "__len__") or len(values) == 0:
return None
value = float(values[-1])
return None if math.isnan(value) else value
def _format_analysis_value(value: Optional[float]) -> str:
"""Format a technical value for UI without losing useful precision."""
if value is None or math.isnan(float(value)):
return "Ò€”"
abs_value = abs(float(value))
if abs_value >= 100:
decimals = 2
elif abs_value >= 10:
decimals = 3
elif abs_value >= 1:
decimals = 4
elif abs_value >= 0.1:
decimals = 6
elif abs_value >= 0.01:
decimals = 7
else:
decimals = 8
return f"{float(value):.{decimals}f}".rstrip("0").rstrip(".")
def _osc_action(name: str, value: float, **kw) -> str:
"""Classify oscillator value as 'Mua' / 'BΓ‘n' / 'Trung lαΊ­p'."""
if value is None or math.isnan(value):
return "Trung lαΊ­p"
atr = max(float(kw.get("atr", 0.0) or 0.0), 1e-8)
def _bounded_action(
current: float,
min_value: float,
max_value: float,
*,
bullish_high: bool = True,
) -> str:
span = max(max_value - min_value, 1e-8)
ratio = _clamp((current - min_value) / span, 0.0, 1.0)
if not bullish_high:
ratio = 1.0 - ratio
if ratio > (2.0 / 3.0):
return "Mua"
if ratio < (1.0 / 3.0):
return "BΓ‘n"
return "Trung lαΊ­p"
def _zero_line_action(current: float, epsilon: float) -> str:
band = max(float(epsilon), 1e-8)
if current > band:
return "Mua"
if current < -band:
return "BΓ‘n"
return "Trung lαΊ­p"
if name == "rsi":
return _bounded_action(value, 0.0, 100.0)
if name in {"stoch", "stoch_rsi", "ultimate", "demarker"}:
return _bounded_action(value, 0.0, 100.0)
if name == "williams":
return _bounded_action(value, -100.0, 0.0, bullish_high=True)
if name in {"aroon", "cmo", "tsi"}:
return _bounded_action(value, -100.0, 100.0)
if name == "cci":
return "Mua" if value > 50 else "BΓ‘n" if value < -50 else "Trung lαΊ­p"
if name == "momentum":
return _zero_line_action(value, kw.get("epsilon", atr * 0.025))
if name == "roc":
return _zero_line_action(value, kw.get("epsilon", 0.01))
if name == "trix":
return _zero_line_action(value, kw.get("epsilon", 0.01))
if name == "ppo":
return _zero_line_action(value, kw.get("epsilon", 0.01))
if name == "dpo":
return _zero_line_action(value, kw.get("epsilon", atr * 0.02))
if name == "adx":
plus_di = kw.get("plus_di", 0)
minus_di = kw.get("minus_di", 0)
if value < 20:
return "Trung lαΊ­p"
return "Mua" if plus_di > minus_di else "BΓ‘n"
if name == "ao":
return _zero_line_action(value, kw.get("epsilon", atr * 0.02))
if name == "macd":
return _zero_line_action(value, kw.get("epsilon", atr * 0.02))
if name == "bbp":
bull_power = kw.get("bull_power")
bear_power = kw.get("bear_power")
epsilon = max(float(kw.get("epsilon", atr * 0.02)), 1e-8)
if bull_power is None or bear_power is None:
return "Trung lαΊ­p"
if bull_power > epsilon and bear_power > epsilon:
return "Mua"
if bull_power < -epsilon and bear_power < -epsilon:
return "BΓ‘n"
return "Trung lαΊ­p"
return "Trung lαΊ­p"
def _ema_pair_action(fast_val: Optional[float], slow_val: Optional[float], pair_key: str = "") -> str:
"""EMA chain classification using raw spread instead of rounded display values."""
if fast_val is None or slow_val is None:
return "Trung lαΊ­p"
if math.isnan(fast_val) or math.isnan(slow_val):
return "Trung lαΊ­p"
spread_pct = _pct(fast_val, slow_val)
neutral_band_pct = max(_ema_pair_threshold_pct(pair_key) * 0.08, 0.0005)
if spread_pct > neutral_band_pct:
return "Mua"
if spread_pct < -neutral_band_pct:
return "BΓ‘n"
return "Trung lαΊ­p"
def _price_vs_level_action(
price: Optional[float],
level: Optional[float],
*,
neutral_band_pct: float = 0.08,
) -> str:
"""Simple trend classification from price relative to a pure-price reference line."""
if price is None or level is None:
return "Trung lαΊ­p"
if math.isnan(price) or math.isnan(level):
return "Trung lαΊ­p"
spread_pct = _pct(price, level)
band = max(float(neutral_band_pct), 0.0005)
if spread_pct > band:
return "Mua"
if spread_pct < -band:
return "BΓ‘n"
return "Trung lαΊ­p"
def _ema_pair_threshold_pct(pair_key: str) -> float:
return EMA_PAIR_THRESHOLD_PCT.get(pair_key, 0.18)
def compute_indicators(data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Compute a full suite of technical indicators on OHLCV data."""
if len(data) < 30:
return {"error": "Insufficient data (need β‰₯30 candles)"}
closes = np.array([d["close"] for d in data], dtype=float)
highs = np.array([d["high"] for d in data], dtype=float)
lows = np.array([d["low"] for d in data], dtype=float)
vols = np.array([d["volume"]for d in data], dtype=float)
times = [d["time"] for d in data]
def _last(arr):
v = arr[-1]
return None if math.isnan(v) else round(float(v), 8)
def _series(arr, n: int = 50):
out = []
window = arr[-min(n, len(arr)):]
offset = len(arr) - len(window)
for i, v in enumerate(window):
idx = offset + i
out.append({"time": times[idx], "value": None if math.isnan(v) else round(float(v), 8)})
return out
ema9 = _ema(closes, 9)
ema21 = _ema(closes, 21)
ema50 = _ema(closes, 50)
ema200 = _ema(closes, 200)
rsi14 = _rsi(closes, 14)
macd_l, macd_s, macd_h = _macd(closes)
bb_u, bb_m, bb_l = _bollinger(closes)
atr14 = _atr(highs, lows, closes, 14)
stoch_k, stoch_d = _stoch_rsi(closes)
roc12 = _roc(closes, 12)
trix18 = _trix(closes, 18)
ppo12 = _ppo(closes, 12, 26)
cmo14 = _cmo(closes, 14)
dpo20 = _dpo(closes, 20)
aroon25 = _aroon_oscillator(highs, lows, 25)
tsi25 = _tsi(closes, 25, 13)
# Volume SMA 20 (Vectorized v6.0)
vol_sma = _sma(vols, 20)
last_close = closes[-1]
last_atr = _last(atr14) or 0
return {
"ema": {
"ema9": _last(ema9), "ema21": _last(ema21),
"ema50": _last(ema50), "ema200": _last(ema200),
},
"rsi": {
"value": _last(rsi14),
"signal": (
"overbought" if (_last(rsi14) or 50) > 70
else "oversold" if (_last(rsi14) or 50) < 30
else "neutral"
),
},
"macd": {
"macd": _last(macd_l),
"signal": _last(macd_s),
"histogram": _last(macd_h),
"cross": (
"bullish" if (_last(macd_h) or 0) > 0
else "bearish" if (_last(macd_h) or 0) < 0
else "neutral"
),
},
"bollinger": {
"upper": _last(bb_u),
"middle": _last(bb_m),
"lower": _last(bb_l),
"bandwidth": round(((_last(bb_u) or 0) - (_last(bb_l) or 0)) / ((_last(bb_m) or 1)), 6),
},
"atr": {
"value": _last(atr14),
"pct": round(last_atr / last_close * 100, 4) if last_close else None,
},
"stoch_rsi": {
"k": _last(stoch_k),
"d": _last(stoch_d),
"signal": (
"overbought" if (_last(stoch_k) or 50) > 80
else "oversold" if (_last(stoch_k) or 50) < 20
else "neutral"
),
},
"roc": {
"value": _last(roc12),
"signal": "bullish" if (_last(roc12) or 0) > 1.0 else "bearish" if (_last(roc12) or 0) < -1.0 else "neutral",
},
"trix": {
"value": _last(trix18),
"signal": "bullish" if (_last(trix18) or 0) > 0 else "bearish" if (_last(trix18) or 0) < 0 else "neutral",
},
"ppo": {
"value": _last(ppo12),
"signal": "bullish" if (_last(ppo12) or 0) > 0.35 else "bearish" if (_last(ppo12) or 0) < -0.35 else "neutral",
},
"cmo": {
"value": _last(cmo14),
"signal": "bullish" if (_last(cmo14) or 0) > 20 else "bearish" if (_last(cmo14) or 0) < -20 else "neutral",
},
"dpo": {
"value": _last(dpo20),
"signal": "bullish" if (_last(dpo20) or 0) > 0 else "bearish" if (_last(dpo20) or 0) < 0 else "neutral",
},
"aroon": {
"value": _last(aroon25),
"signal": "bullish" if (_last(aroon25) or 0) > 25 else "bearish" if (_last(aroon25) or 0) < -25 else "neutral",
},
"tsi": {
"value": _last(tsi25),
"signal": "bullish" if (_last(tsi25) or 0) > 5 else "bearish" if (_last(tsi25) or 0) < -5 else "neutral",
},
"volume": {
"last": round(float(vols[-1]), 2),
"sma20": round(float(vol_sma[-1]), 2),
"above_avg": bool(vols[-1] > vol_sma[-1]),
},
"trend": {
"ema_bullish_stack": bool(
all(x is not None for x in [_last(ema9),_last(ema21),_last(ema50)]) and
_last(ema9) > _last(ema21) > _last(ema50) # type: ignore
),
"above_ema200": bool(_last(ema200) is not None and last_close > (_last(ema200) or 0)),
"close": round(float(last_close), 8),
"short_momentum_ref": float(closes[max(0, len(closes)-6)]) if len(closes) else float(last_close),
},
"short_momentum_ref": float(closes[max(0, len(closes)-6)]) if len(closes) else float(last_close),
"series": {
"ema9": _series(ema9),
"ema21": _series(ema21),
"ema50": _series(ema50),
"bb_upper": _series(bb_u),
"bb_mid": _series(bb_m),
"bb_lower": _series(bb_l),
},
}
# ─────────────────────────────────────────────────────────────────────────────
# UTILITIES
# ─────────────────────────────────────────────────────────────────────────────
def _clamp(v: float, lo: float, hi: float) -> float:
return max(lo, min(hi, v))
def _pct(a: float, b: float) -> float:
"""(a - b) / b * 100, an toΓ n vα»›i b = 0."""
return (a - b) / b * 100.0 if b != 0 else 0.0
def _safe(v: Optional[float], default: float = 0.0) -> float:
if v is None or math.isnan(v) or math.isinf(v):
return default
return float(v)
def _classify_regime(indicators: Dict[str, Any], last_close: float, atr_pct: float) -> Tuple[str, str]:
"""PhΓ’n loαΊ‘i trαΊ‘ng thΓ‘i thα»‹ trường dα»±a trΓͺn biαΊΏn Δ‘α»™ng vΓ  xu hΖ°α»›ng."""
rsi = _safe(indicators["rsi"].get("value"), 50.0)
ema9 = _safe(indicators["ema"].get("ema9"), last_close)
ema50 = _safe(indicators["ema"].get("ema50"), last_close)
ema200 = _safe(indicators["ema"].get("ema200"), last_close)
# Logic 8 trαΊ‘ng thΓ‘i
if atr_pct > 3.5:
if rsi > 60: return "volatile_bull", "TΔƒng trưởng biαΊΏn Δ‘α»™ng cao"
if rsi < 40: return "volatile_bear", "Sα»₯t giαΊ£m biαΊΏn Δ‘α»™ng cao"
return "high_chaos", "Thα»‹ trường hα»—n loαΊ‘n / Vol cao"
if last_close > ema200 and ema9 > ema50:
if rsi > 70: return "overextended_bull", "TΔƒng trưởng quΓ‘ mα»©c (Overbought)"
return "stable_bull", "Xu hướng tăng ổn định"
if last_close < ema200 and ema9 < ema50:
if rsi < 30: return "capitulation", "HoαΊ£ng loαΊ‘n / QuΓ‘ bΓ‘n (Oversold)"
return "stable_bear", "Xu hΖ°α»›ng giαΊ£m α»•n Δ‘α»‹nh"
if abs(_pct(ema9, ema50)) < 0.5:
return "tight_range", "TΓ­ch lΕ©y biΓͺn Δ‘α»™ hαΊΉp (Squeeze)"
return "sideways", "Thα»‹ trường Δ‘i ngang (Sideway)"
OSC_WEIGHTS = {
"rsi": 2.1,
"macd": 2.4,
"stoch_rsi": 1.1,
"stoch": 1.2,
"cci": 1.7,
"adx": 1.8,
"williams": 1.5,
"ultimate": 1.1,
"bbp": 1.0,
"ao": 0.9,
"momentum": 1.6,
"roc": 1.3,
"trix": 1.4,
"ppo": 1.4,
"cmo": 1.2,
"dpo": 0.8,
"aroon": 1.3,
"tsi": 1.4,
"demarker": 1.2,
}
MA_WEIGHT_MAP = {
"ema_1_2": 0.8,
"ema_2_5": 1.0,
"ema_5_10": 1.1,
"ema_10_20": 1.35,
"ema_20_50": 1.8,
"ema_50_100": 2.2,
"ema_100_200": 2.7,
"ichimoku_base_26": 1.9,
}
def _extract_osc_key(label: str) -> str:
l = label.lower()
if "sα»©c mαΊ‘nh tΖ°Ζ‘ng Δ‘α»‘i" in l or ("rsi" in l and "nhanh" not in l): return "rsi"
if "macd" in l: return "macd"
if "stochastic %k" in l: return "stoch"
if "nhanh" in l or "stoch_rsi" in l: return "stoch_rsi"
if "kΓͺnh hΓ ng hΓ³a" in l or "cci" in l: return "cci"
if "Δ‘α»‹nh hΖ°α»›ng" in l or "adx" in l: return "adx"
if "williams" in l: return "williams"
if "ultimate" in l: return "ultimate"
if "bbp" in l or "sα»©c mαΊ‘nh giΓ‘" in l: return "bbp"
if "ao" in l: return "ao"
if "xung lượng" in l or "momentum" in l: return "momentum"
if "roc" in l: return "roc"
if "trix" in l: return "trix"
if "ppo" in l: return "ppo"
if "cmo" in l: return "cmo"
if "dpo" in l: return "dpo"
if "aroon" in l: return "aroon"
if "tsi" in l: return "tsi"
if "demarker" in l: return "demarker"
return "unknown"
def _get_ma_weight(label: str) -> float:
l = label.lower()
if l in MA_WEIGHT_MAP:
return MA_WEIGHT_MAP[l]
pair = re.findall(r"\d+", l)
if len(pair) >= 2:
return MA_WEIGHT_MAP.get(f"ema_{pair[0]}_{pair[1]}", 1.0)
return 1.0
def _gauge_to_signal(gauge: float, interval: str = "1h") -> str:
"""Strict 3-level signal converter."""
if gauge >= 66.66: return "Mua"
if gauge > 33.33: return "Trung lαΊ­p"
return "BΓ‘n"
def _gauge_to_normalized_score(gauge: float) -> float:
"""Convert a 0..100 gauge into a -1..1 frontend-friendly scale."""
return _clamp((float(gauge) - 50.0) / 50.0, -1.0, 1.0)
def _forecast_path_metrics(p50_path: np.ndarray, last_close: float) -> Dict[str, float]:
"""Measure forecast quality from the full path, not only the final endpoint."""
if len(p50_path) == 0 or abs(last_close) <= 1e-8:
return {
"weighted_return_pct": 0.0,
"final_return_pct": 0.0,
"path_consistency": 50.0,
"monotonicity": 50.0,
"max_adverse_excursion_pct": 0.0,
"mean_step_return_pct": 0.0,
}
p50_safe = np.array(p50_path, dtype=float)
if not np.isfinite(p50_safe).all():
last_valid = float(last_close)
for i in range(len(p50_safe)):
if not np.isfinite(p50_safe[i]):
p50_safe[i] = last_valid
else:
last_valid = p50_safe[i]
ret_path = ((p50_safe / last_close) - 1.0) * 100.0
step_weights = np.linspace(1.0, 0.65, len(ret_path))
weighted_ret_pct = float(np.average(ret_path, weights=step_weights))
final_ret_pct = float(ret_path[-1])
final_sign = 0 if abs(final_ret_pct) < 0.05 else (1 if final_ret_pct > 0 else -1)
if len(ret_path) > 1 and final_sign != 0:
signed_steps = [
1.0 if np.sign(curr - prev) == final_sign else 0.0
for prev, curr in zip(ret_path[:-1], ret_path[1:])
if abs(curr - prev) >= 0.02
]
path_consistency = float(sum(signed_steps) / len(signed_steps)) if signed_steps else 0.5
else:
path_consistency = 0.5
if len(ret_path) > 1:
monotonicity = float(np.mean(np.diff(ret_path) >= 0)) if final_sign >= 0 else float(np.mean(np.diff(ret_path) <= 0))
else:
monotonicity = 0.5
if final_sign > 0:
adverse = abs(float(np.min(ret_path)))
elif final_sign < 0:
adverse = abs(float(np.max(ret_path)))
else:
adverse = max(abs(float(np.min(ret_path))), abs(float(np.max(ret_path))))
mean_step_return_pct = float(np.mean(np.diff(ret_path))) if len(ret_path) > 1 else final_ret_pct
return {
"weighted_return_pct": round(weighted_ret_pct, 2),
"final_return_pct": round(final_ret_pct, 2),
"path_consistency": round(path_consistency * 100.0, 1),
"monotonicity": round(monotonicity * 100.0, 1),
"max_adverse_excursion_pct": round(adverse, 2),
"mean_step_return_pct": round(mean_step_return_pct, 3),
}
def _forecast_band_profile(
p10_path: np.ndarray,
p50_path: np.ndarray,
p90_path: np.ndarray,
) -> Dict[str, float]:
"""Summarize uncertainty bands across the full forecast path."""
if len(p50_path) == 0:
return {
"avg_band_pct": 0.0,
"end_band_pct": 0.0,
"band_stability_pct": 0.0,
"band_step_change_pct": 0.0,
}
safe_mid = np.maximum(np.abs(p50_path), 1e-8)
band_pct = np.maximum(((p90_path - p10_path) / safe_mid) * 100.0, 0.0)
band_step_changes = np.abs(np.diff(band_pct)) if len(band_pct) > 1 else np.array([0.0], dtype=float)
return {
"avg_band_pct": round(float(np.mean(band_pct)), 4),
"end_band_pct": round(float(band_pct[-1]), 4),
"band_stability_pct": round(float(np.std(band_pct)), 4),
"band_step_change_pct": round(float(np.mean(band_step_changes)), 4),
}
def _forecast_path_responsiveness(p50_path: np.ndarray, last_close: float) -> Dict[str, float]:
"""Quantify how much the forecast path actually moves relative to the anchor."""
if len(p50_path) == 0 or abs(last_close) <= 1e-8:
return {
"range_pct": 0.0,
"step_abs_mean_pct": 0.0,
"step_abs_max_pct": 0.0,
}
ret_path = ((p50_path / last_close) - 1.0) * 100.0
diffs = np.diff(ret_path) if len(ret_path) > 1 else np.array([ret_path[-1]], dtype=float)
return {
"range_pct": round(float(np.max(ret_path) - np.min(ret_path)), 4),
"step_abs_mean_pct": round(float(np.mean(np.abs(diffs))), 4),
"step_abs_max_pct": round(float(np.max(np.abs(diffs))), 4),
}
def _recent_ohlc4_abs_step_pct(ohlc4: np.ndarray, window: int = 20) -> float:
"""Use recent OHLC4 movement as a volatility-aware floor for forecast responsiveness."""
if len(ohlc4) < 2:
return 0.0
base = np.maximum(np.abs(ohlc4[:-1]), 1e-8)
returns = np.diff(ohlc4) / base * 100.0
recent = returns[-min(window, len(returns)) :]
return round(float(np.mean(np.abs(recent))), 4) if len(recent) else 0.0
def _future_window_path_stats(
ohlc4: np.ndarray,
anchor_index: int,
horizon: int,
) -> Optional[Dict[str, float]]:
"""Measure the realized future path that followed a historical anchor point."""
if anchor_index < 0 or horizon <= 0:
return None
if anchor_index + horizon >= len(ohlc4):
return None
baseline = float(ohlc4[anchor_index])
future_path = np.asarray(ohlc4[anchor_index + 1 : anchor_index + 1 + horizon], dtype=float)
responsiveness = _forecast_path_responsiveness(future_path, baseline)
final_return_pct = (
((future_path[-1] / max(abs(baseline), 1e-8)) - 1.0) * 100.0
if len(future_path)
else 0.0
)
return {
"range_pct": float(responsiveness["range_pct"]),
"step_abs_mean_pct": float(responsiveness["step_abs_mean_pct"]),
"step_abs_max_pct": float(responsiveness["step_abs_max_pct"]),
"final_return_pct": float(final_return_pct),
}
def _rolling_future_path_targets(
ohlc4: np.ndarray,
horizon: int,
max_windows: int = 48,
) -> Optional[Dict[str, float]]:
"""Summarize realized future-path amplitudes from recent historical anchors."""
if len(ohlc4) < horizon + 2:
return None
last_anchor = len(ohlc4) - horizon - 1
start_anchor = max(0, last_anchor - max_windows + 1)
stats: List[Dict[str, float]] = []
for anchor_index in range(start_anchor, last_anchor + 1):
path_stats = _future_window_path_stats(ohlc4, anchor_index, horizon)
if path_stats is not None:
stats.append(path_stats)
if not stats:
return None
weights = np.linspace(0.6, 1.0, len(stats), dtype=float)
range_vals = np.array([item["range_pct"] for item in stats], dtype=float)
step_vals = np.array([item["step_abs_mean_pct"] for item in stats], dtype=float)
return {
"window_count": len(stats),
"range_pct": round(float(np.average(range_vals, weights=weights)), 4),
"step_abs_mean_pct": round(float(np.average(step_vals, weights=weights)), 4),
}
def _context_regime_signature(ohlc4_window: np.ndarray) -> Dict[str, float]:
"""Describe the local market regime of an OHLC4 window using scale-free features."""
if len(ohlc4_window) < 2:
return {
"net_change_pct": 0.0,
"vol_pct": 0.0,
"abs_step_mean_pct": 0.0,
"range_pct": 0.0,
}
base = np.maximum(np.abs(ohlc4_window[:-1]), 1e-8)
returns = np.diff(ohlc4_window) / base * 100.0
first = max(abs(float(ohlc4_window[0])), 1e-8)
return {
"net_change_pct": float(((ohlc4_window[-1] / first) - 1.0) * 100.0),
"vol_pct": float(np.std(returns)) if len(returns) > 1 else 0.0,
"abs_step_mean_pct": float(np.mean(np.abs(returns))) if len(returns) else 0.0,
"range_pct": float(((np.max(ohlc4_window) - np.min(ohlc4_window)) / first) * 100.0),
}
def _regime_signature_distance(current: Dict[str, float], candidate: Dict[str, float]) -> float:
"""Weighted distance between two market-regime signatures."""
terms = (
("net_change_pct", 0.34, 0.40),
("vol_pct", 0.28, 0.18),
("abs_step_mean_pct", 0.22, 0.12),
("range_pct", 0.16, 0.30),
)
distance = 0.0
for key, weight, floor in terms:
lhs = float(current.get(key, 0.0))
rhs = float(candidate.get(key, 0.0))
scale = max(abs(lhs), abs(rhs), floor)
distance += weight * (abs(lhs - rhs) / scale)
return float(distance)
def _regime_matched_future_targets(
ohlc4: np.ndarray,
context_len: int,
horizon: int,
search_limit: int = 720,
top_k: int = 18,
) -> Optional[Dict[str, float]]:
"""Find historical windows with a similar regime and measure their realized future amplitudes."""
max_anchor = len(ohlc4) - horizon - 1
if max_anchor < 32:
return None
signature_len = max(32, min(int(context_len), 96, max_anchor + 1))
if len(ohlc4) < signature_len + horizon + 1:
return None
current_signature = _context_regime_signature(
np.asarray(ohlc4[-signature_len:], dtype=float)
)
min_anchor = signature_len - 1
start_anchor = max(min_anchor, max_anchor - search_limit + 1)
matches: List[Tuple[float, Dict[str, float]]] = []
for anchor_index in range(start_anchor, max_anchor + 1):
context_window = np.asarray(
ohlc4[anchor_index - signature_len + 1 : anchor_index + 1],
dtype=float,
)
candidate_signature = _context_regime_signature(context_window)
path_stats = _future_window_path_stats(ohlc4, anchor_index, horizon)
if path_stats is None:
continue
distance = _regime_signature_distance(current_signature, candidate_signature)
matches.append((distance, path_stats))
if not matches:
return None
matches.sort(key=lambda item: item[0])
chosen = matches[: min(top_k, len(matches))]
weights = np.array([1.0 / (item[0] + 0.08) for item in chosen], dtype=float)
range_vals = np.array([item[1]["range_pct"] for item in chosen], dtype=float)
step_vals = np.array([item[1]["step_abs_mean_pct"] for item in chosen], dtype=float)
return {
"match_count": len(chosen),
"signature_len": signature_len,
"range_pct": round(float(np.average(range_vals, weights=weights)), 4),
"step_abs_mean_pct": round(float(np.average(step_vals, weights=weights)), 4),
}
def _build_regime_texture_template(
ohlc4: np.ndarray,
context_len: int,
horizon: int,
search_limit: int = 720,
top_k: int = 18,
) -> Optional[Dict[str, Any]]:
"""Build a step-return template from regime-matched historical futures."""
max_anchor = len(ohlc4) - horizon - 1
if max_anchor < 32:
return None
signature_len = max(32, min(int(context_len), 96, max_anchor + 1))
if len(ohlc4) < signature_len + horizon + 1:
return None
current_signature = _context_regime_signature(
np.asarray(ohlc4[-signature_len:], dtype=float)
)
min_anchor = signature_len - 1
start_anchor = max(min_anchor, max_anchor - search_limit + 1)
matches: List[Tuple[float, np.ndarray, Dict[str, float]]] = []
for anchor_index in range(start_anchor, max_anchor + 1):
context_window = np.asarray(
ohlc4[anchor_index - signature_len + 1 : anchor_index + 1],
dtype=float,
)
candidate_signature = _context_regime_signature(context_window)
distance = _regime_signature_distance(current_signature, candidate_signature)
baseline = float(ohlc4[anchor_index])
future_path = np.asarray(
ohlc4[anchor_index + 1 : anchor_index + 1 + horizon],
dtype=float,
)
if len(future_path) != horizon:
continue
prev_values = np.concatenate(([baseline], future_path[:-1]))
step_returns = ((future_path / np.maximum(np.abs(prev_values), 1e-8)) - 1.0) * 100.0
path_stats = _forecast_path_responsiveness(future_path, baseline)
matches.append((distance, step_returns, path_stats))
if not matches:
return None
matches.sort(key=lambda item: item[0])
chosen = matches[: min(top_k, len(matches))]
weights = np.array([1.0 / (distance + 0.08) for distance, _, _ in chosen], dtype=float)
step_matrix = np.vstack([step_returns for _, step_returns, _ in chosen])
averaged_steps = np.average(step_matrix, axis=0, weights=weights)
best_distance = float("inf")
best_steps: Optional[np.ndarray] = None
best_score = float("-inf")
for distance, step_returns, path_stats in chosen[: min(6, len(chosen))]:
step_abs_mean_pct = float(np.mean(np.abs(step_returns))) if len(step_returns) else 0.0
range_pct = float(path_stats.get("range_pct", 0.0))
texture_score = (
step_abs_mean_pct * 0.60
+ range_pct * 0.40
- (distance * 0.12)
)
if texture_score > best_score:
best_score = texture_score
best_distance = distance
best_steps = step_returns
if best_steps is None:
best_steps = averaged_steps
best_distance = float(chosen[0][0])
blended_steps = (best_steps * 0.72) + (averaged_steps * 0.28)
step_abs_mean_pct = float(np.mean(np.abs(blended_steps))) if len(blended_steps) else 0.0
simulated_prices = [baseline]
price = baseline
for step_return in blended_steps:
price = max(0.0, price * (1.0 + (float(step_return) / 100.0)))
simulated_prices.append(price)
simulated_path = np.asarray(simulated_prices[1:], dtype=float)
range_pct = _forecast_path_responsiveness(simulated_path, baseline)["range_pct"] if len(simulated_path) else 0.0
return {
"step_returns_pct": blended_steps,
"step_abs_mean_pct": round(step_abs_mean_pct, 4),
"range_pct": round(float(range_pct), 4),
"match_count": len(chosen),
"signature_len": signature_len,
"best_match_distance": round(best_distance, 4),
}
def _derive_target_amplitude_profile(
ohlc4: np.ndarray,
context_len: int,
horizon: int,
) -> Dict[str, Any]:
"""Blend recent and regime-matched history into a target amplitude profile."""
recent_abs_step_pct = _recent_ohlc4_abs_step_pct(ohlc4, window=20)
rolling_targets = _rolling_future_path_targets(
ohlc4,
horizon=horizon,
max_windows=max(24, min(72, horizon * 6)),
)
regime_targets = _regime_matched_future_targets(
ohlc4,
context_len=context_len,
horizon=horizon,
)
step_terms: List[Tuple[float, float]] = []
if recent_abs_step_pct > 0:
step_terms.append((recent_abs_step_pct, 0.24))
if rolling_targets is not None:
step_terms.append((float(rolling_targets["step_abs_mean_pct"]), 0.31))
if regime_targets is not None:
step_terms.append((float(regime_targets["step_abs_mean_pct"]), 0.45))
if step_terms:
target_step_abs_mean_pct = sum(value * weight for value, weight in step_terms) / sum(
weight for _, weight in step_terms
)
else:
target_step_abs_mean_pct = 0.0
range_terms: List[Tuple[float, float]] = []
if rolling_targets is not None:
range_terms.append((float(rolling_targets["range_pct"]), 0.42))
if regime_targets is not None:
range_terms.append((float(regime_targets["range_pct"]), 0.58))
if not range_terms and target_step_abs_mean_pct > 0:
fallback_range = target_step_abs_mean_pct * max(2.0, min(math.sqrt(max(horizon, 1)) * 1.6, 4.5))
range_terms.append((fallback_range, 1.0))
target_range_pct = sum(value * weight for value, weight in range_terms) / sum(
weight for _, weight in range_terms
) if range_terms else 0.0
return {
"target_step_abs_mean_pct": round(float(target_step_abs_mean_pct), 4),
"target_range_pct": round(float(target_range_pct), 4),
"recent_abs_step_pct": round(float(recent_abs_step_pct), 4),
"rolling_targets": rolling_targets,
"regime_targets": regime_targets,
}
def _build_raw_ohlc4_bundle(
model_output: Dict[str, Any],
last_ohlc4: float,
) -> Dict[str, Any]:
"""Build the OHLC4-path bundle directly from raw TimesFM output."""
raw_p10 = np.array(model_output["p10"], dtype=float)
raw_p50 = np.array(model_output["p50"], dtype=float)
raw_p90 = np.array(model_output["p90"], dtype=float)
path_metrics = _forecast_path_metrics(raw_p50, last_ohlc4)
avg_band_pct = float(
np.mean((raw_p90 - raw_p10) / np.maximum(np.abs(raw_p50), 1e-8)) * 100.0
) if len(raw_p50) else 0.0
band_certainty = math.exp(-avg_band_pct / 4.0)
path_consistency = path_metrics["path_consistency"] / 100.0
monotonicity = path_metrics["monotonicity"] / 100.0
move_pct = abs(path_metrics["final_return_pct"])
confidence = (
28.0
+ band_certainty * 34.0
+ path_consistency * 18.0
+ monotonicity * 12.0
+ min(move_pct, 4.0) * 2.0
)
confidence = max(20.0, min(95.0, confidence))
final_sign = 0 if abs(path_metrics["final_return_pct"]) < 0.05 else (1 if path_metrics["final_return_pct"] > 0 else -1)
weighted_sign = 0 if abs(path_metrics["weighted_return_pct"]) < 0.05 else (1 if path_metrics["weighted_return_pct"] > 0 else -1)
agreement = final_sign == 0 or weighted_sign == 0 or final_sign == weighted_sign
return {
"p10": raw_p10,
"p50": raw_p50,
"p90": raw_p90,
"model_weight": 1.0,
"anchor_weight": 0.0,
"agreement": agreement,
"scale": 1.0,
"confidence": round(confidence, 2),
"model_bias_pct": 0.0,
"path_metrics": path_metrics,
"mode": "raw_timesfm_ohlc4",
}
def _apply_forecast_amplitude_calibration(
raw_bundle: Dict[str, Any],
last_ohlc4: float,
target_profile: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Scale raw TimesFM OHLC4 paths so their step/range amplitude stays realistic."""
raw_p10 = np.asarray(raw_bundle.get("p10", []), dtype=float)
raw_p50 = np.asarray(raw_bundle.get("p50", []), dtype=float)
raw_p90 = np.asarray(raw_bundle.get("p90", []), dtype=float)
raw_responsiveness = _forecast_path_responsiveness(raw_p50, last_ohlc4)
raw_step = max(float(raw_responsiveness["step_abs_mean_pct"]), 1e-6)
raw_range = max(float(raw_responsiveness["range_pct"]), 1e-6)
target_step = max(float(target_profile.get("target_step_abs_mean_pct") or 0.0), 0.0)
target_range = max(float(target_profile.get("target_range_pct") or 0.0), 0.0)
ratios: List[Tuple[float, float]] = []
if target_step > 0:
ratios.append((target_step / raw_step, 0.62))
if target_range > 0:
ratios.append((target_range / raw_range, 0.38))
if ratios:
log_scale = sum(weight * math.log(max(ratio, 1e-6)) for ratio, weight in ratios) / sum(
weight for _, weight in ratios
)
scale = math.exp(log_scale)
else:
scale = 1.0
scale = float(np.clip(scale, 0.82, 2.35))
if raw_step >= target_step * 0.92 and raw_range >= target_range * 0.88:
scale = float(np.clip(scale, 0.9, 1.25))
calibrated_p10 = np.maximum(0.0, last_ohlc4 + ((raw_p10 - last_ohlc4) * scale))
calibrated_p50 = np.maximum(0.0, last_ohlc4 + ((raw_p50 - last_ohlc4) * scale))
calibrated_p90 = np.maximum(0.0, last_ohlc4 + ((raw_p90 - last_ohlc4) * scale))
calibrated_bundle = _build_raw_ohlc4_bundle(
{
"p10": calibrated_p10,
"p50": calibrated_p50,
"p90": calibrated_p90,
},
last_ohlc4,
)
calibrated_bundle["mode"] = "timesfm_ohlc4_vol_calibrated"
calibration_meta = {
"enabled": True,
"version": "volatility_regime_v1",
"scale": round(scale, 4),
"raw_path": raw_responsiveness,
"calibrated_path": _forecast_path_responsiveness(calibrated_p50, last_ohlc4),
"targets": target_profile,
}
return calibrated_bundle, calibration_meta
def _apply_forecast_path_texture(
base_bundle: Dict[str, Any],
last_ohlc4: float,
target_profile: Dict[str, Any],
texture_template: Optional[Dict[str, Any]],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Inject regime-matched step texture so each future point is updated sequentially."""
base_p10 = np.asarray(base_bundle.get("p10", []), dtype=float)
base_p50 = np.asarray(base_bundle.get("p50", []), dtype=float)
base_p90 = np.asarray(base_bundle.get("p90", []), dtype=float)
base_path = _forecast_path_responsiveness(base_p50, last_ohlc4)
texture_meta: Dict[str, Any] = {
"enabled": True,
"version": "historical_step_texture_v2",
"applied": False,
"blend_alpha": 0.0,
"base_path": base_path,
"textured_path": base_path,
"template": {
"match_count": 0,
"signature_len": 0,
"step_abs_mean_pct": 0.0,
"range_pct": 0.0,
},
}
if texture_template is None:
return base_bundle, texture_meta
template_steps = np.asarray(texture_template.get("step_returns_pct", []), dtype=float)
if len(template_steps) != len(base_p50):
return base_bundle, texture_meta
template_step = max(float(texture_template.get("step_abs_mean_pct") or 0.0), 1e-6)
template_range = max(float(texture_template.get("range_pct") or 0.0), 1e-6)
target_step = max(float(target_profile.get("target_step_abs_mean_pct") or 0.0), 0.0)
target_range = max(float(target_profile.get("target_range_pct") or 0.0), 0.0)
base_step = float(base_path["step_abs_mean_pct"])
base_range = float(base_path["range_pct"])
base_monotonicity = float(base_bundle.get("path_metrics", {}).get("monotonicity", 50.0))
step_gap = max((target_step * 0.94) - base_step, 0.0)
range_gap = max((target_range * 0.78) - base_range, 0.0)
if base_monotonicity >= 88.0:
step_gap = max(step_gap, target_step * 0.16)
range_gap = max(range_gap, target_range * 0.10)
if step_gap <= 0.0 and range_gap <= 0.0:
texture_meta["template"] = {
"match_count": int(texture_template.get("match_count") or 0),
"signature_len": int(texture_template.get("signature_len") or 0),
"step_abs_mean_pct": round(float(texture_template.get("step_abs_mean_pct") or 0.0), 4),
"range_pct": round(float(texture_template.get("range_pct") or 0.0), 4),
}
return base_bundle, texture_meta
ratios: List[float] = []
if step_gap > 0.0:
ratios.append(step_gap / template_step)
if range_gap > 0.0:
ratios.append(range_gap / template_range)
blend_alpha = float(np.clip(np.median(ratios) if ratios else 0.0, 0.0, 0.82))
def _count_direction_flips(series: np.ndarray) -> int:
if len(series) < 2:
return 0
signs = [int(np.sign(delta)) for delta in series if abs(delta) >= 0.015]
return sum(1 for prev, curr in zip(signs[:-1], signs[1:]) if prev != curr)
template_flips = _count_direction_flips(template_steps)
if base_monotonicity >= 92.0 and template_flips >= 1:
blend_alpha = max(blend_alpha, 0.28)
def _step_returns_from_path(path: np.ndarray, anchor_price: float) -> np.ndarray:
prev_values = np.concatenate(([anchor_price], path[:-1]))
return ((path / np.maximum(np.abs(prev_values), 1e-8)) - 1.0) * 100.0
def _rebuild_from_step_returns(path: np.ndarray, step_returns_pct: np.ndarray) -> np.ndarray:
rebuilt: List[float] = []
prev_price = float(last_ohlc4)
for raw_price, textured_step in zip(path, step_returns_pct):
raw_step = ((float(raw_price) / max(abs(prev_price), 1e-8)) - 1.0) * 100.0
raw_factor = max(1e-6, 1.0 + (raw_step / 100.0))
target_factor = max(1e-6, 1.0 + (float(textured_step) / 100.0))
factor_ratio = target_factor / raw_factor
next_price = max(0.0, float(raw_price) * factor_ratio)
rebuilt.append(next_price)
prev_price = next_price
return np.asarray(rebuilt, dtype=float)
base_steps_p50 = _step_returns_from_path(base_p50, last_ohlc4)
textured_steps_p50 = (base_steps_p50 * (1.0 - blend_alpha)) + (template_steps * blend_alpha)
if base_monotonicity >= 92.0 and template_flips >= 1:
textured_flips = _count_direction_flips(textured_steps_p50)
while textured_flips < 1 and blend_alpha < 0.92:
blend_alpha = min(blend_alpha * 1.16, 0.92)
textured_steps_p50 = (base_steps_p50 * (1.0 - blend_alpha)) + (template_steps * blend_alpha)
textured_flips = _count_direction_flips(textured_steps_p50)
textured_p10 = _rebuild_from_step_returns(base_p10, textured_steps_p50)
textured_p50 = _rebuild_from_step_returns(base_p50, textured_steps_p50)
textured_p90 = _rebuild_from_step_returns(base_p90, textured_steps_p50)
textured_bundle = _build_raw_ohlc4_bundle(
{
"p10": textured_p10,
"p50": textured_p50,
"p90": textured_p90,
},
last_ohlc4,
)
textured_path = _forecast_path_responsiveness(
np.asarray(textured_bundle.get("p50", []), dtype=float),
last_ohlc4,
)
if target_step > 0.0 and float(textured_path["step_abs_mean_pct"]) > target_step * 1.28:
shrink = (target_step * 1.28) / max(float(textured_path["step_abs_mean_pct"]), 1e-6)
blend_alpha *= shrink
textured_steps_p50 = (base_steps_p50 * (1.0 - blend_alpha)) + (template_steps * blend_alpha)
textured_p10 = _rebuild_from_step_returns(base_p10, textured_steps_p50)
textured_p50 = _rebuild_from_step_returns(base_p50, textured_steps_p50)
textured_p90 = _rebuild_from_step_returns(base_p90, textured_steps_p50)
textured_bundle = _build_raw_ohlc4_bundle(
{
"p10": textured_p10,
"p50": textured_p50,
"p90": textured_p90,
},
last_ohlc4,
)
textured_path = _forecast_path_responsiveness(
np.asarray(textured_bundle.get("p50", []), dtype=float),
last_ohlc4,
)
textured_bundle["mode"] = "timesfm_ohlc4_textured"
texture_meta.update(
{
"applied": blend_alpha > 0.0,
"blend_alpha": round(blend_alpha, 4),
"textured_path": textured_path,
"template": {
"match_count": int(texture_template.get("match_count") or 0),
"signature_len": int(texture_template.get("signature_len") or 0),
"step_abs_mean_pct": round(float(texture_template.get("step_abs_mean_pct") or 0.0), 4),
"range_pct": round(float(texture_template.get("range_pct") or 0.0), 4),
},
}
)
return textured_bundle, texture_meta
def _score_forecast_context_candidate(
analysis_bundle: Dict[str, Any],
last_ohlc4: float,
recent_abs_step_pct: float,
) -> Dict[str, float]:
"""Prefer contexts that stay confident without collapsing into a near-flat path."""
p50_path = np.asarray(analysis_bundle.get("p50", []), dtype=float)
responsiveness = _forecast_path_responsiveness(p50_path, last_ohlc4)
step_abs_mean_pct = float(responsiveness["step_abs_mean_pct"])
range_pct = float(responsiveness["range_pct"])
step_floor = max(0.015, recent_abs_step_pct * 0.18)
range_floor = max(0.05, recent_abs_step_pct * 0.70)
step_bonus = min(step_abs_mean_pct / step_floor, 1.0) * 14.0
range_bonus = min(range_pct / range_floor, 1.0) * 10.0
flat_penalty = (
6.0
if step_abs_mean_pct < (step_floor * 0.55) and range_pct < (range_floor * 0.55)
else 0.0
)
confidence = float(analysis_bundle.get("confidence", 0.0))
score = confidence + step_bonus + range_bonus - flat_penalty
return {
"score": round(score, 4),
"confidence": round(confidence, 4),
"step_floor": round(step_floor, 4),
"range_floor": round(range_floor, 4),
**responsiveness,
}
def _calc_vote_gauge(buy: int, sell: int, neutral: int) -> float:
"""Equal-weight PTKT vote gauge from Buy / Sell / Neutral counts."""
buy_f = float(max(0, buy))
sell_f = float(max(0, sell))
neutral_f = float(max(0, neutral))
denominator = buy_f + sell_f + neutral_f
if denominator <= 0:
return 50.0
raw_gauge = 50.0 + 50.0 * ((buy_f - sell_f) / denominator)
return max(0.0, min(100.0, raw_gauge))
def _calc_adx_trend_activation(adx_value: Optional[float] = None) -> float:
"""Sigmoid ADX factor that activates PTKT votes only when trend strength is present."""
adx_numeric = float(adx_value) if adx_value is not None else 0.0
if math.isnan(adx_numeric):
adx_numeric = 0.0
adx_numeric = max(0.0, adx_numeric)
exponent = -((adx_numeric - 20.0) / 8.0)
return float(_clamp(1.0 / (1.0 + math.exp(exponent)), 0.0, 1.0))
def _calc_vote_participation_factor(buy: int, sell: int, neutral: int) -> float:
"""Reward PTKT gauge only when enough signals are directional instead of neutral."""
buy_f = float(max(0, buy))
sell_f = float(max(0, sell))
neutral_f = float(max(0, neutral))
denominator = buy_f + sell_f + neutral_f
if denominator <= 0:
return 0.0
return float(_clamp((buy_f + sell_f) / denominator, 0.0, 1.0))
def _calc_adx_weighted_technical_gauge(
buy: int,
sell: int,
neutral: int,
adx_value: Optional[float] = None,
) -> float:
"""Final PTKT gauge using the sigmoid ADX-weighted vote formula."""
buy_f = float(max(0, buy))
sell_f = float(max(0, sell))
neutral_f = float(max(0, neutral))
denominator = buy_f + sell_f + neutral_f
if denominator <= 0:
return 50.0
trend_activation = _calc_adx_trend_activation(adx_value)
participation_factor = _calc_vote_participation_factor(buy, sell, neutral)
vote_balance = (buy_f - sell_f) / denominator
raw_gauge = 50.0 + (50.0 * vote_balance * participation_factor * trend_activation)
return float(_clamp(raw_gauge, 0.0, 100.0))
def _vote_direction_from_counts(buy: int, sell: int) -> int:
if buy > sell:
return 1
if sell > buy:
return -1
return 0
def _candle_ohlc4(candle: Dict[str, Any]) -> Optional[float]:
try:
open_value = float(candle["open"])
high_value = float(candle["high"])
low_value = float(candle["low"])
close_value = float(candle["close"])
except (KeyError, TypeError, ValueError):
return None
values = (open_value, high_value, low_value, close_value)
if not all(math.isfinite(value) for value in values):
return None
return sum(values) / 4.0
def _performance_action(change_pct: Optional[float], *, included_in_vote: bool) -> str:
if not included_in_vote or change_pct is None:
return "N/A"
if change_pct > PERFORMANCE_NEUTRAL_EPSILON_PCT:
return "Mua"
if change_pct < -PERFORMANCE_NEUTRAL_EPSILON_PCT:
return "BΓ‘n"
return "Trung lαΊ­p"
def _get_performance_vote_weight(lookback: Any) -> float:
try:
return float(PERFORMANCE_VOTE_WEIGHTS.get(int(lookback), 1.0))
except (TypeError, ValueError):
return 1.0
def _build_performance_rows(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
latest_ohlc4 = _candle_ohlc4(data[-1]) if data else None
rows: List[Dict[str, Any]] = []
for lookback in PERFORMANCE_LOOKBACKS:
reference_index = len(data) - 1 - lookback
reference_ohlc4 = _candle_ohlc4(data[reference_index]) if reference_index >= 0 else None
included_in_vote = (
latest_ohlc4 is not None
and reference_ohlc4 is not None
and abs(reference_ohlc4) > 1e-12
)
change_pct = (
((latest_ohlc4 / reference_ohlc4) - 1.0) * 100.0
if included_in_vote and latest_ohlc4 is not None and reference_ohlc4 is not None
else None
)
rows.append(
{
"key": f"performance_{lookback}",
"name": f"Hiệu suαΊ₯t {lookback} nαΊΏn",
"lookback": lookback,
"current_ohlc4": latest_ohlc4,
"reference_ohlc4": reference_ohlc4,
"change_pct": change_pct,
"display_value": (
f"{float(latest_ohlc4):.4f} vs {float(reference_ohlc4):.4f} ({float(change_pct):+.2f}%)"
if included_in_vote and change_pct is not None
else "N/A"
),
"action": _performance_action(change_pct, included_in_vote=included_in_vote),
"included_in_vote": included_in_vote,
}
)
return rows
def _calc_osc_score(osc_data: list, interval: str = "1h") -> dict:
"""Summarize oscillator votes for the PTKT classification table."""
buy = sell = neutral = 0
buy_weight = sell_weight = neutral_weight = 0.0
for item in osc_data:
key = item.get("key") or _extract_osc_key(item["name"])
w = OSC_WEIGHTS.get(key, 1.0)
act = str(item.get("action", "")).strip().lower()
if act == "mua":
buy += 1
buy_weight += w
elif act == "bΓ‘n":
sell += 1
sell_weight += w
else:
neutral += 1
neutral_weight += w
return {
"signal": _gauge_to_signal(_calc_vote_gauge(buy, sell, neutral), interval),
"buy": buy, "sell": sell, "neutral": neutral,
"buy_weight": round(buy_weight, 2),
"sell_weight": round(sell_weight, 2),
"neutral_weight": round(neutral_weight, 2),
"formula_version": TECHNICAL_SCORING_VERSION,
}
def _calc_ma_score(ma_data: list, closes: np.ndarray, interval: str = "1h") -> dict:
"""Summarize moving-average votes for the PTKT classification table."""
buy = sell = neutral = 0
buy_weight = sell_weight = neutral_weight = 0.0
for item in ma_data:
w = _get_ma_weight(item.get("key") or item["name"])
act = str(item.get("action", "")).strip().lower()
if act == "mua":
buy += 1
buy_weight += w
elif act == "bΓ‘n":
sell += 1
sell_weight += w
else:
neutral += 1
neutral_weight += w
ema50 = _ema(closes, 50)[-1] if len(closes) >= 50 else float("nan")
ema200 = _ema(closes, 200)[-1] if len(closes) >= 200 else float("nan")
return {
"signal": _gauge_to_signal(_calc_vote_gauge(buy, sell, neutral), interval),
"buy": buy, "sell": sell, "neutral": neutral,
"buy_weight": round(buy_weight, 2),
"sell_weight": round(sell_weight, 2),
"neutral_weight": round(neutral_weight, 2),
"structure_bonus": 0.0,
"golden_cross": bool(not any(math.isnan(x) for x in (ema50, ema200)) and ema50 > ema200),
"death_cross": bool(not any(math.isnan(x) for x in (ema50, ema200)) and ema50 < ema200),
"formula_version": TECHNICAL_SCORING_VERSION,
}
def _calc_performance_score(performance_data: list, interval: str = "1h") -> dict:
"""Summarize OHLC4 performance votes across configured lookback candles."""
buy = sell = neutral = 0
buy_weight = sell_weight = neutral_weight = 0.0
available = missing = 0
for item in performance_data:
if not bool(item.get("included_in_vote")):
missing += 1
continue
available += 1
vote_weight = _get_performance_vote_weight(item.get("lookback"))
act = str(item.get("action", "")).strip().lower()
if act == "mua":
buy += int(vote_weight)
buy_weight += vote_weight
elif act == "bΓ‘n":
sell += int(vote_weight)
sell_weight += vote_weight
else:
neutral += int(vote_weight)
neutral_weight += vote_weight
return {
"signal": _gauge_to_signal(_calc_vote_gauge(buy, sell, neutral), interval),
"buy": buy,
"sell": sell,
"neutral": neutral,
"buy_weight": round(buy_weight, 2),
"sell_weight": round(sell_weight, 2),
"neutral_weight": round(neutral_weight, 2),
"available": available,
"missing": missing,
"lookbacks": list(PERFORMANCE_LOOKBACKS),
"formula_version": TECHNICAL_SCORING_VERSION,
}
def _calc_ai_forecast_score(
blended: dict,
forecast_rows: List[Dict[str, Any]],
last_close: float,
indicators: dict,
horizon: int,
interval: str,
model_reference_price: Optional[float] = None,
) -> dict:
"""MODULE 3: Score one AI forecast from the full path and band discipline."""
atr_pct = max(float(indicators.get("atr", {}).get("pct") or 0.0), 0.1)
p50_path = np.array(blended.get("p50", []), dtype=float)
p10_path = np.array(blended.get("p10", []), dtype=float)
p90_path = np.array(blended.get("p90", []), dtype=float)
if not len(p50_path):
p50_path = np.array([last_close], dtype=float)
if not len(p10_path):
p10_path = np.array([last_close], dtype=float)
if not len(p90_path):
p90_path = np.array([last_close], dtype=float)
series_reference_price = float(model_reference_price or last_close)
path_metrics = _forecast_path_metrics(p50_path, series_reference_price)
final_ret_pct = path_metrics["final_return_pct"]
weighted_ret_pct = path_metrics["weighted_return_pct"]
market_return_pct = _pct(float(p50_path[-1]), last_close) if len(p50_path) else 0.0
path_consistency = _clamp(path_metrics["path_consistency"] / 100.0, 0.0, 1.0)
monotonicity = _clamp(path_metrics["monotonicity"] / 100.0, 0.0, 1.0)
adverse_excursion_pct = path_metrics["max_adverse_excursion_pct"]
band_profile = _forecast_band_profile(p10_path, p50_path, p90_path)
direction_vote_threshold = max(atr_pct * 0.04, 0.04)
def _direction_vote(value_pct: float) -> int:
if abs(value_pct) < direction_vote_threshold:
return 0
return 1 if value_pct > 0 else -1
direction_votes = [
_direction_vote(weighted_ret_pct),
_direction_vote(final_ret_pct),
_direction_vote(market_return_pct),
]
bullish_votes = sum(1 for vote in direction_votes if vote > 0)
bearish_votes = sum(1 for vote in direction_votes if vote < 0)
direction_vote_balance = (
(bullish_votes - bearish_votes) / max(1, len(direction_votes))
)
direction_bias_sign = 0.0
if direction_vote_balance > 0:
direction_bias_sign = 1.0
elif direction_vote_balance < 0:
direction_bias_sign = -1.0
elif weighted_ret_pct > 0:
direction_bias_sign = 1.0
elif weighted_ret_pct < 0:
direction_bias_sign = -1.0
avg_band_tightness = math.exp(
-float(band_profile["avg_band_pct"]) / max(atr_pct * 3.0, 0.75)
)
end_band_tightness = math.exp(
-float(band_profile["end_band_pct"]) / max(atr_pct * 3.4, 0.85)
)
band_stability_score = math.exp(
-(
(float(band_profile["band_stability_pct"]) / max(atr_pct * 1.8, 0.35))
+ (float(band_profile["band_step_change_pct"]) / max(atr_pct * 1.2, 0.25))
)
* 0.75
)
adverse_control_score = math.exp(
-adverse_excursion_pct / max(atr_pct * 2.5, 0.8)
)
certainty_score = (
avg_band_tightness * 0.38 +
end_band_tightness * 0.16 +
band_stability_score * 0.16 +
path_consistency * 0.15 +
monotonicity * 0.10 +
adverse_control_score * 0.05
)
certainty_score = _clamp(certainty_score, 0.08, 0.98)
directional_edge_pct = (weighted_ret_pct * 0.72) + (final_ret_pct * 0.28)
direction_norm = np.tanh(directional_edge_pct / max(atr_pct * 1.20, 0.28))
weighted_move_in_atr = abs(weighted_ret_pct) / max(atr_pct, 0.1)
final_move_in_atr = abs(final_ret_pct) / max(atr_pct, 0.1)
path_move_score = np.tanh(((weighted_move_in_atr * 0.65) + (final_move_in_atr * 0.35)) / 1.7)
path_quality = 0.48 + 0.30 * path_consistency + 0.22 * monotonicity
certainty_factor = 0.72 + 0.28 * certainty_score
direction_consensus = _clamp(
(abs(direction_vote_balance) * 0.42) +
(path_consistency * 0.36) +
(monotonicity * 0.22),
0.0,
1.0,
)
direction_core_push = (
direction_norm
* (18.0 + 16.0 * path_move_score)
* (0.60 + 0.40 * path_quality)
)
micro_consensus_push = 0.0
if direction_bias_sign != 0.0 and abs(direction_norm) < 0.18 and abs(direction_vote_balance) >= 0.66:
remaining_room = (0.18 - abs(direction_norm)) / 0.18
micro_consensus_push = (
direction_bias_sign
* (2.4 + 5.6 * direction_consensus)
* remaining_room
* (0.44 + 0.56 * certainty_score)
)
direction_gauge = max(
8.0,
min(92.0, 50.0 + direction_core_push + micro_consensus_push),
)
directional_push = (direction_gauge - 50.0) * certainty_factor
conviction_gauge = max(8.0, min(92.0, 50.0 + directional_push))
gauge = direction_gauge
confidence_pct = _clamp((certainty_score * 0.65 + path_quality * 0.35) * 100.0, 12.0, 98.0)
direction_label = "bullish" if gauge >= 58 else "bearish" if gauge <= 42 else "neutral"
return {
"gauge": round(gauge, 1),
"direction_gauge": round(direction_gauge, 1),
"conviction_gauge": round(conviction_gauge, 1),
"normalized_score": round(_gauge_to_normalized_score(gauge), 4),
"confidence_pct": round(confidence_pct, 1),
"forecast_return_pct": round(market_return_pct, 2),
"model_reference_return_pct": round(final_ret_pct, 2),
"weighted_return_pct": round(weighted_ret_pct, 2),
"direction": direction_label,
"direction_consensus": round(direction_consensus * 100.0, 1),
"direction_strength_pct": round(abs(direction_gauge - 50.0) * 2.0, 1),
"magnitude_vs_atr": round(weighted_move_in_atr, 2),
"band_uncertainty_pct": round(float(band_profile["avg_band_pct"]), 2),
"band_uncertainty_end_pct": round(float(band_profile["end_band_pct"]), 2),
"band_stability_pct": round(float(band_profile["band_stability_pct"]), 2),
"certainty": round(certainty_score * 100.0, 1),
"path_consistency": round(path_consistency * 100.0, 1),
"monotonicity": round(monotonicity * 100.0, 1),
"max_adverse_excursion_pct": round(adverse_excursion_pct, 2),
"path_metrics": path_metrics,
"signal": _gauge_to_signal(gauge, interval),
"formula": "full_path_direction_gauge",
"formula_version": AI_SCORING_VERSION,
}
def _calc_technical_score_v2(
osc_score: dict,
ma_score: dict,
performance_score: dict,
interval: str = "1h",
adx_value: Optional[float] = None,
) -> dict:
"""Gauge PTKT follows the sigmoid ADX-weighted vote formula on total technical votes."""
buy = osc_score["buy"] + ma_score["buy"] + performance_score["buy"]
sell = osc_score["sell"] + ma_score["sell"] + performance_score["sell"]
neutral = osc_score["neutral"] + ma_score["neutral"] + performance_score["neutral"]
adx_numeric = float(adx_value) if adx_value is not None else 0.0
if math.isnan(adx_numeric):
adx_numeric = 0.0
adx_numeric = max(0.0, adx_numeric)
trend_activation = _calc_adx_trend_activation(adx_numeric)
vote_participation = _calc_vote_participation_factor(buy, sell, neutral)
final_gauge = _calc_adx_weighted_technical_gauge(buy, sell, neutral, adx_numeric)
osc_dir = _vote_direction_from_counts(int(osc_score["buy"]), int(osc_score["sell"]))
ma_dir = _vote_direction_from_counts(int(ma_score["buy"]), int(ma_score["sell"]))
performance_dir = _vote_direction_from_counts(
int(performance_score["buy"]),
int(performance_score["sell"]),
)
active_group_directions = [
direction
for direction in (osc_dir, ma_dir, performance_dir)
if direction != 0
]
return {
"gauge": round(final_gauge, 1),
"normalized_score": round(_gauge_to_normalized_score(final_gauge), 4),
"signal": _gauge_to_signal(final_gauge, interval),
"buy": buy,
"sell": sell,
"neutral": neutral,
"buy_weight": round(
osc_score.get("buy_weight", 0.0)
+ ma_score.get("buy_weight", 0.0)
+ performance_score.get("buy_weight", 0.0),
2,
),
"sell_weight": round(
osc_score.get("sell_weight", 0.0)
+ ma_score.get("sell_weight", 0.0)
+ performance_score.get("sell_weight", 0.0),
2,
),
"neutral_weight": round(
osc_score.get("neutral_weight", 0.0)
+ ma_score.get("neutral_weight", 0.0)
+ performance_score.get("neutral_weight", 0.0),
2,
),
"adx_period": ADX_GAUGE_PERIOD,
"trend_strength_adx": round(adx_numeric, 2),
"trend_activation": round(trend_activation, 4),
"trend_multiplier": round(trend_activation, 4),
"vote_participation": round(vote_participation, 4),
"alignment": len(active_group_directions) >= 2 and len(set(active_group_directions)) == 1,
"performance_available": int(performance_score.get("available", 0)),
"performance_missing": int(performance_score.get("missing", 0)),
"components": {
"formula": "max(0, min(100, 50 + 50 * ((buy - sell) / total_votes) * ((buy + sell) / total_votes) * (1 / (1 + exp(-(adx10 - 20) / 8)))))",
"formula_version": TECHNICAL_SCORING_VERSION,
"categories": {
"oscillators": osc_score.get("signal", "Trung lαΊ­p"),
"moving_averages": ma_score.get("signal", "Trung lαΊ­p"),
},
}
}
def _calc_technical_score_v2(
osc_score: dict,
ma_score: dict,
performance_score: dict,
interval: str = "1h",
adx_value: Optional[float] = None,
) -> dict:
"""Gauge PTKT follows the sigmoid ADX-weighted vote formula on total technical votes."""
buy = osc_score["buy"] + ma_score["buy"] + performance_score["buy"]
sell = osc_score["sell"] + ma_score["sell"] + performance_score["sell"]
neutral = osc_score["neutral"] + ma_score["neutral"] + performance_score["neutral"]
adx_numeric = float(adx_value) if adx_value is not None else 0.0
if math.isnan(adx_numeric):
adx_numeric = 0.0
adx_numeric = max(0.0, adx_numeric)
trend_activation = _calc_adx_trend_activation(adx_numeric)
vote_participation = _calc_vote_participation_factor(buy, sell, neutral)
final_gauge = _calc_adx_weighted_technical_gauge(buy, sell, neutral, adx_numeric)
osc_dir = _vote_direction_from_counts(int(osc_score["buy"]), int(osc_score["sell"]))
ma_dir = _vote_direction_from_counts(int(ma_score["buy"]), int(ma_score["sell"]))
performance_dir = _vote_direction_from_counts(
int(performance_score["buy"]),
int(performance_score["sell"]),
)
active_group_directions = [
direction
for direction in (osc_dir, ma_dir, performance_dir)
if direction != 0
]
return {
"gauge": round(final_gauge, 1),
"normalized_score": round(_gauge_to_normalized_score(final_gauge), 4),
"signal": _gauge_to_signal(final_gauge, interval),
"buy": buy,
"sell": sell,
"neutral": neutral,
"buy_weight": round(
osc_score.get("buy_weight", 0.0)
+ ma_score.get("buy_weight", 0.0)
+ performance_score.get("buy_weight", 0.0),
2,
),
"sell_weight": round(
osc_score.get("sell_weight", 0.0)
+ ma_score.get("sell_weight", 0.0)
+ performance_score.get("sell_weight", 0.0),
2,
),
"neutral_weight": round(
osc_score.get("neutral_weight", 0.0)
+ ma_score.get("neutral_weight", 0.0)
+ performance_score.get("neutral_weight", 0.0),
2,
),
"adx_period": ADX_GAUGE_PERIOD,
"trend_strength_adx": round(adx_numeric, 2),
"trend_activation": round(trend_activation, 4),
"trend_multiplier": round(trend_activation, 4),
"vote_participation": round(vote_participation, 4),
"alignment": len(active_group_directions) >= 2 and len(set(active_group_directions)) == 1,
"performance_available": int(performance_score.get("available", 0)),
"performance_missing": int(performance_score.get("missing", 0)),
"components": {
"formula": "max(0, min(100, 50 + 50 * ((buy - sell) / total_votes) * ((buy + sell) / total_votes) * (1 / (1 + exp(-(adx10 - 20) / 8)))))",
"formula_version": TECHNICAL_SCORING_VERSION,
"categories": {
"oscillators": osc_score.get("signal", "Trung lαΊ­p"),
"moving_averages": ma_score.get("signal", "Trung lαΊ­p"),
"performance": performance_score.get("signal", "Trung lαΊ­p"),
},
}
}
def _calc_summary_score_v2(tech_score: dict, ai_score: dict, interval: str = "1h") -> dict:
"""Final decision score = arithmetic mean of technical and AI gauges."""
tech_gauge = float(tech_score["gauge"])
ai_gauge = float(ai_score["gauge"])
tech_weight = 0.50
ai_weight = 0.50
final_gauge = max(5.0, min(95.0, (ai_gauge * ai_weight) + (tech_gauge * tech_weight)))
tech_delta = tech_gauge - 50.0
ai_delta = ai_gauge - 50.0
tech_dir = 0 if abs(tech_delta) < 2.0 else (1 if tech_delta > 0 else -1)
ai_dir = 0 if abs(ai_delta) < 2.0 else (1 if ai_delta > 0 else -1)
dist = abs(final_gauge - 50.0)
conviction = "R?t m?nh" if dist >= 25 else "M?nh" if dist >= 15 else "Trung b?nh" if dist >= 8 else "Y?u"
bias = "neutral"
if final_gauge >= 58:
bias = "bullish"
elif final_gauge <= 42:
bias = "bearish"
return {
"gauge": round(final_gauge, 1),
"normalized_score": round(_gauge_to_normalized_score(final_gauge), 4),
"signal": _gauge_to_signal(final_gauge, interval),
"conviction": conviction,
"bias": bias,
"agreement": tech_dir != 0 and tech_dir == ai_dir,
"buy": tech_score["buy"],
"sell": tech_score["sell"],
"neutral": tech_score["neutral"],
"buy_weight": tech_score.get("buy_weight", 0.0),
"sell_weight": tech_score.get("sell_weight", 0.0),
"neutral_weight": tech_score.get("neutral_weight", 0.0),
"components": {
"technical": round(tech_gauge, 1),
"ai_forecast": round(ai_gauge, 1),
"ai_weight": round(ai_weight, 2),
"technical_weight": round(tech_weight, 2),
"certainty_pct": round(float(ai_score.get("certainty", 0.0)), 1),
"formula": "(technical_gauge + ai_gauge) / 2",
"formula_version": AI_SCORING_VERSION,
}
}
def _build_dashboard_payload(
last_close: float,
forecast_rows: List[Dict[str, Any]],
technical_score: Dict[str, Any],
ai_score: Dict[str, Any],
summary: Dict[str, Any],
) -> Dict[str, Any]:
"""Single source of truth for hero gauges consumed by the frontend."""
forecast_end = last_close
if forecast_rows:
forecast_end = float(forecast_rows[-1].get("p50") or last_close)
return {
"technical": {
"gauge": technical_score["gauge"],
"normalized_score": technical_score["normalized_score"],
"signal": technical_score["signal"],
"buy": technical_score["buy"],
"sell": technical_score["sell"],
"neutral": technical_score["neutral"],
"buy_weight": technical_score.get("buy_weight", 0.0),
"sell_weight": technical_score.get("sell_weight", 0.0),
"neutral_weight": technical_score.get("neutral_weight", 0.0),
},
"ai": {
"gauge": ai_score["gauge"],
"direction_gauge": ai_score.get("direction_gauge", ai_score["gauge"]),
"conviction_gauge": ai_score.get("conviction_gauge", ai_score["gauge"]),
"normalized_score": ai_score["normalized_score"],
"signal": ai_score["signal"],
"forecast_return_pct": ai_score["forecast_return_pct"],
"weighted_return_pct": ai_score["weighted_return_pct"],
"confidence_pct": ai_score["confidence_pct"],
"certainty": ai_score["certainty"],
"direction_consensus": ai_score.get("direction_consensus", 0.0),
"direction_strength_pct": ai_score.get("direction_strength_pct", 0.0),
"path_consistency": ai_score["path_consistency"],
"monotonicity": ai_score.get("monotonicity", 50.0),
"max_adverse_excursion_pct": ai_score.get("max_adverse_excursion_pct", 0.0),
"current_price": round(float(last_close), 6),
"forecast_price": round(float(forecast_end), 6),
},
"summary": {
"gauge": summary["gauge"],
"normalized_score": summary["normalized_score"],
"signal": summary["signal"],
"conviction": summary["conviction"],
"bias": summary["bias"],
"agreement": summary.get("agreement", False),
"buy_weight": summary.get("buy_weight", 0.0),
"sell_weight": summary.get("sell_weight", 0.0),
"neutral_weight": summary.get("neutral_weight", 0.0),
"components": summary.get("components", {}),
},
}
def _normalize_presentation_card_payload(
card: Optional[Dict[str, Any]],
*,
prefer_direction_gauge: bool = False,
) -> Dict[str, Any]:
source = dict(card or {})
gauge_candidates: List[Any] = []
if prefer_direction_gauge:
gauge_candidates.append(source.get("direction_gauge"))
gauge_candidates.append(source.get("gauge"))
gauge_value = 50.0
for candidate in gauge_candidates:
try:
numeric = float(candidate)
except (TypeError, ValueError):
continue
if math.isfinite(numeric):
gauge_value = numeric
break
source["gauge"] = round(float(gauge_value), 1)
signal = source.get("signal")
if not isinstance(signal, str) or not signal.strip():
source["signal"] = "--"
return source
def _build_ai_aggregation_message(
requested_count: int,
ready_count: int,
failed_model_keys: List[str],
complete: bool,
*,
ai_available: bool,
summary_available: bool,
) -> str:
if requested_count <= 0:
if ai_available and summary_available:
return "AI forecast san sang"
return "Can chay du bao AI de tong hop"
if failed_model_keys:
failed_labels = ", ".join(str(model_key) for model_key in failed_model_keys)
return f"Dang tong hop {ready_count}/{requested_count} model - loi: {failed_labels}"
if not complete:
return f"Dang tong hop {ready_count}/{requested_count} model AI"
if requested_count == 1 and ai_available and summary_available:
return "AI forecast san sang"
return f"Trung binh cong {requested_count} model AI"
def _build_analysis_presentation(
technical_score: Optional[Dict[str, Any]],
ai_score: Optional[Dict[str, Any]],
summary: Optional[Dict[str, Any]],
*,
requested_models: Optional[Dict[str, bool]] = None,
active_model_keys: Optional[List[str]] = None,
failed_model_keys: Optional[List[str]] = None,
pending_model_keys: Optional[List[str]] = None,
message: Optional[str] = None,
) -> Dict[str, Any]:
technical_card = _normalize_presentation_card_payload(technical_score)
ai_card = _normalize_presentation_card_payload(
ai_score,
prefer_direction_gauge=True,
)
summary_card = _normalize_presentation_card_payload(summary)
requested_payload = (
{model_key: bool(enabled) for model_key, enabled in requested_models.items()}
if isinstance(requested_models, dict)
else {}
)
enabled_keys = [
model_key for model_key, enabled in requested_payload.items() if enabled
]
ready_keys = list(active_model_keys or [])
failed_keys = list(failed_model_keys or [])
if pending_model_keys is None:
pending_keys = [
model_key
for model_key in enabled_keys
if model_key not in ready_keys and model_key not in failed_keys
]
else:
pending_keys = list(pending_model_keys)
ai_available = isinstance(ai_score, dict) and bool(ai_score)
summary_available = isinstance(summary, dict) and bool(summary)
if enabled_keys:
requested_count = len(enabled_keys)
elif ai_available or summary_available:
requested_count = max(1, len(ready_keys) + len(failed_keys) + len(pending_keys))
else:
requested_count = 0
ready_count = len(ready_keys) if ready_keys else (1 if ai_available and summary_available else 0)
finalized = not pending_keys
complete = bool(
ai_available
and summary_available
and not failed_keys
and not pending_keys
and (
requested_count == 0
or ready_count >= requested_count
)
)
commit_ready = bool(finalized and ready_count > 0)
ai_ready = commit_ready and ai_available
summary_ready = commit_ready and summary_available
combo_active = bool(
ai_ready
and summary_ready
and (
summary_card.get("agreement")
or (
abs(float(technical_card.get("gauge", 50.0)) - 50.0) > 6.0
and abs(float(ai_card.get("gauge", 50.0)) - 50.0) > 6.0
and math.copysign(1.0, float(technical_card.get("gauge", 50.0)) - 50.0)
== math.copysign(1.0, float(ai_card.get("gauge", 50.0)) - 50.0)
)
)
)
message_text = message or _build_ai_aggregation_message(
requested_count,
ready_count,
failed_keys,
complete,
ai_available=ai_available,
summary_available=summary_available,
)
if ai_ready:
ai_state = "ready"
elif failed_keys and ready_count <= 0:
ai_state = "error"
elif requested_count > 0:
ai_state = "loading"
else:
ai_state = "idle"
if summary_ready:
summary_state = "ready"
elif failed_keys and ready_count <= 0:
summary_state = "error"
elif requested_count > 0:
summary_state = "loading"
else:
summary_state = "idle"
return {
"version": ANALYSIS_PRESENTATION_VERSION,
"technical": technical_card,
"ai": ai_card,
"summary": summary_card,
"aggregation": {
"requested": requested_payload,
"enabled_keys": enabled_keys,
"ready_keys": ready_keys,
"failed_keys": failed_keys,
"pending_keys": pending_keys,
"complete": complete,
"finalized": finalized,
"commit_ready": commit_ready,
"ready_count": ready_count,
"requested_count": requested_count,
},
"card_states": {
"technical": "ready" if technical_score else "idle",
"ai": ai_state,
"summary": summary_state,
},
"ai_ready": ai_ready,
"summary_ready": summary_ready,
"combo_active": combo_active,
"message": message_text,
}
def _build_trade_analysis(
symbol: str, interval: str, data: List[Dict[str, Any]], indicators: Dict[str, Any],
forecast_rows: List[Dict[str, Any]], confidence: float, source: str,
blended: Optional[Dict[str, Any]] = None,
forecast_reference_price: Optional[float] = None,
include_ai: bool = True,
active_model_keys: Optional[List[str]] = None,
failed_model_keys: Optional[List[str]] = None,
requested_models: Optional[Dict[str, bool]] = None,
) -> Dict[str, Any]:
"""
TradingView-style technical analysis dashboard (v6.1 Rework).
PTKT is one vote-only gauge across oscillator + performance + moving-average rows,
while AI keeps full-path scoring.
"""
if not data or len(data) < 30:
return {
"oscillators": {
"signal": "Trung lαΊ­p",
"buy": 0,
"sell": 0,
"neutral": 0,
"buy_weight": 0.0,
"sell_weight": 0.0,
"neutral_weight": 0.0,
"data": [],
},
"performance": {
"signal": "Trung lαΊ­p",
"buy": 0,
"sell": 0,
"neutral": 0,
"buy_weight": 0.0,
"sell_weight": 0.0,
"neutral_weight": 0.0,
"available": 0,
"missing": len(PERFORMANCE_LOOKBACKS),
"lookbacks": list(PERFORMANCE_LOOKBACKS),
"data": [],
},
"moving_averages": {
"signal": "Trung lαΊ­p",
"buy": 0,
"sell": 0,
"neutral": 0,
"buy_weight": 0.0,
"sell_weight": 0.0,
"neutral_weight": 0.0,
"golden_cross": False,
"death_cross": False,
"data": [],
},
"summary": {"signal": "Trung lαΊ­p", "buy": 0, "sell": 0, "neutral": 0},
}
closes = np.array([float(d["close"]) for d in data], dtype=float)
highs = np.array([float(d["high"]) for d in data], dtype=float)
lows = np.array([float(d["low"]) for d in data], dtype=float)
last_close = closes[-1]
model_reference_price = float(forecast_reference_price or last_close)
def _lv(arr):
return _latest_finite_value(arr)
def _fmt(value: Optional[float]) -> str:
return _format_analysis_value(value)
ema50_arr = _ema(closes, 50)
ema50_current = _lv(ema50_arr)
ema200_current = _lv(indicators["ema"].get("ema200"))
ema50_prev = (
float(ema50_arr[-6])
if len(ema50_arr) >= 6 and not math.isnan(float(ema50_arr[-6]))
else (ema50_current or last_close)
)
regime, regime_label = _classify_regime(indicators, last_close, _safe(indicators.get("atr", {}).get("pct"), 1.0))
# ── 1. Oscillators ──
osc_data = []
atr_scale = max(float(indicators.get("atr", {}).get("value") or last_close * 0.01), max(last_close * 0.0015, 1e-6))
def _add_osc(label, val, action_name, key=None, **kw):
if isinstance(val, (np.ndarray, pd.Series, list)): v = _lv(val)
else: v = float(val) if val is not None else None
osc_key = key or action_name
osc_kwargs = {
"last_close": last_close,
"ema50": ema50_current or last_close,
"ema200": ema200_current or last_close,
"ema50_prev": ema50_prev,
"atr": atr_scale,
**kw,
}
act = _osc_action(action_name, v if v is not None else 0, **osc_kwargs)
osc_data.append({
"key": osc_key,
"name": label,
"value": v,
"display_value": _fmt(v),
"action": act,
})
stoch_price_k, stoch_price_d = _stoch_kd(highs, lows, closes, 14, 3, 3)
stoch_rsi_k, stoch_rsi_d = _stoch_rsi(closes, 14, 14, 3, 3)
demarker14 = _demarker(highs, lows, 14)
_add_osc("Chỉ sα»‘ Sα»©c mαΊ‘nh tΖ°Ζ‘ng Δ‘α»‘i (14)", _rsi(closes, 14), "rsi", key="rsi")
_add_osc("Stochastic %K (14, 3, 3)", stoch_price_k, "stoch", key="stoch")
_add_osc("Chỉ sα»‘ KΓͺnh hΓ ng hΓ³a (20)", _cci(highs, lows, closes, 20), "cci", key="cci")
adx_vals = _adx(highs, lows, closes, ADX_GAUGE_PERIOD)
_add_osc(
f"Chỉ sα»‘ Định hΖ°α»›ng Trung bΓ¬nh ({ADX_GAUGE_PERIOD})",
adx_vals[0],
"adx",
key="adx",
plus_di=_lv(adx_vals[1]) or 0,
minus_di=_lv(adx_vals[2]) or 0,
)
ao_vals = _awesome_oscillator(highs, lows)
_add_osc("Chỉ sα»‘ Dao Δ‘α»™ng AO", ao_vals, "ao", key="ao", epsilon=atr_scale * 0.02)
_add_osc("Xung lượng (10)", _momentum(closes, 10), "momentum", key="momentum", scale=atr_scale * 1.15)
macd_vals = _macd(closes, 12, 26, 9)
_add_osc("CαΊ₯p Δ‘α»™ MACD (12, 26)", macd_vals[0], "macd", key="macd", epsilon=atr_scale * 0.02)
_add_osc("Đường RSI Nhanh (3, 3, 14, 14)", stoch_rsi_k, "stoch_rsi", key="stoch_rsi")
_add_osc("Vùng Phần trăm Williams (14)", _williams_r(highs, lows, closes, 14), "williams", key="williams")
bull_power_vals, bear_power_vals = _bull_bear_power(highs, lows, closes, 13)
bull_power = _lv(bull_power_vals)
bear_power = _lv(bear_power_vals)
osc_data.append(
{
"key": "bbp",
"name": "Sα»©c MαΊ‘nh GiΓ‘ LΓͺn vΓ  GiΓ‘ Xuα»‘ng",
"value": {"bull": bull_power, "bear": bear_power},
"display_value": f"{_fmt(bull_power)} / {_fmt(bear_power)}",
"action": _osc_action(
"bbp",
bull_power if bull_power is not None else 0.0,
atr=atr_scale,
bull_power=bull_power,
bear_power=bear_power,
epsilon=atr_scale * 0.02,
),
}
)
_add_osc("Dao Δ‘α»™ng Ultimate (7, 14, 28)", _ultimate_oscillator(highs, lows, closes, 7, 14, 28), "ultimate", key="ultimate")
_add_osc("Tα»‘c Δ‘α»™ biαΊΏn Δ‘α»™ng ROC (12)", _roc(closes, 12), "roc", key="roc")
_add_osc("TRIX (18)", _trix(closes, 18), "trix", key="trix")
_add_osc("PPO (12, 26)", _ppo(closes, 12, 26), "ppo", key="ppo")
_add_osc("CMO (14)", _cmo(closes, 14), "cmo", key="cmo")
_add_osc("DPO (20)", _dpo(closes, 20), "dpo", key="dpo", scale=atr_scale * 0.75)
_add_osc("Aroon Oscillator (25)", _aroon_oscillator(highs, lows, 25), "aroon", key="aroon")
_add_osc("TSI (25, 13)", _tsi(closes, 25, 13), "tsi", key="tsi")
_add_osc("DeMarker (14)", demarker14, "demarker", key="demarker")
osc_score = _calc_osc_score(osc_data, interval)
# ── 2. Moving Averages ──
ma_data = []
ema_periods = sorted({period for pair in EMA_PAIR_SEQUENCE for period in pair})
ema_map: Dict[int, np.ndarray] = {
period: closes.copy() if period == 1 else _ema(closes, period)
for period in ema_periods
}
def _add_ema_pair_row(fast_period: int, slow_period: int) -> None:
fast_arr = ema_map[fast_period]
slow_arr = ema_map[slow_period]
fast_value = _lv(fast_arr)
slow_value = _lv(slow_arr)
pair_key = f"ema_{fast_period}_{slow_period}"
act = _ema_pair_action(fast_value, slow_value, pair_key)
ma_data.append(
{
"key": pair_key,
"name": f"EMA{fast_period} / EMA{slow_period}",
"display_value": f"{_fmt(fast_value)} / {_fmt(slow_value)}",
"value": f"{fast_value if fast_value is not None else 'β€”'} / {slow_value if slow_value is not None else 'β€”'}",
"action": act,
"fast": fast_value,
"slow": slow_value,
}
)
for fast_period, slow_period in EMA_PAIR_SEQUENCE:
_add_ema_pair_row(fast_period, slow_period)
ichimoku_base = _latest_finite_value(_ichimoku_base(highs, lows, 26))
ma_data.append(
{
"key": "ichimoku_base_26",
"name": "GiΓ‘ / Ichimoku Base (26)",
"display_value": f"{_fmt(last_close)} / {_fmt(ichimoku_base)}",
"value": f"{last_close if last_close is not None else 'β€”'} / {ichimoku_base if ichimoku_base is not None else 'β€”'}",
"action": _price_vs_level_action(last_close, ichimoku_base, neutral_band_pct=0.08),
"fast": last_close,
"slow": ichimoku_base,
}
)
ma_score = _calc_ma_score(ma_data, closes, interval)
performance_data = _build_performance_rows(data)
performance_score = _calc_performance_score(performance_data, interval)
technical_score = _calc_technical_score_v2(
osc_score,
ma_score,
performance_score,
interval,
adx_value=_lv(adx_vals[0]),
)
support_resistance = _calc_price_action_levels(data, last_close, interval)
if not include_ai:
presentation = _build_analysis_presentation(
technical_score,
None,
None,
message="Can chay du bao AI de tong hop",
)
return {
"style": "tradingview",
"regime": {"key": regime, "label": regime_label},
"dashboard": {
"technical": {
"gauge": technical_score["gauge"],
"normalized_score": technical_score["normalized_score"],
"signal": technical_score["signal"],
"buy": technical_score["buy"],
"sell": technical_score["sell"],
"neutral": technical_score["neutral"],
"buy_weight": technical_score.get("buy_weight", 0.0),
"sell_weight": technical_score.get("sell_weight", 0.0),
"neutral_weight": technical_score.get("neutral_weight", 0.0),
},
"ai": None,
"summary": None,
},
"summary": None,
"presentation": presentation,
"technicals": technical_score,
"oscillators": {
"signal": osc_score["signal"],
"buy": osc_score["buy"],
"sell": osc_score["sell"],
"neutral": osc_score["neutral"],
"buy_weight": osc_score.get("buy_weight", 0.0),
"sell_weight": osc_score.get("sell_weight", 0.0),
"neutral_weight": osc_score.get("neutral_weight", 0.0),
"data": osc_data
},
"performance": {
"signal": performance_score["signal"],
"buy": performance_score["buy"],
"sell": performance_score["sell"],
"neutral": performance_score["neutral"],
"buy_weight": performance_score.get("buy_weight", 0.0),
"sell_weight": performance_score.get("sell_weight", 0.0),
"neutral_weight": performance_score.get("neutral_weight", 0.0),
"available": performance_score.get("available", 0),
"missing": performance_score.get("missing", 0),
"lookbacks": performance_score.get("lookbacks", list(PERFORMANCE_LOOKBACKS)),
"data": performance_data,
},
"moving_averages": {
"signal": ma_score["signal"],
"buy": ma_score["buy"],
"sell": ma_score["sell"],
"neutral": ma_score["neutral"],
"buy_weight": ma_score.get("buy_weight", 0.0),
"sell_weight": ma_score.get("sell_weight", 0.0),
"neutral_weight": ma_score.get("neutral_weight", 0.0),
"golden_cross": ma_score["golden_cross"],
"death_cross": ma_score["death_cross"],
"data": ma_data
},
"ai_gauge": None,
"support_resistance": support_resistance,
"pivot_points": support_resistance,
}
# ── 3. AI Forecast Gauge ──
if not blended:
# Fallback to simple blended if no forecast available
blended = {
"p50": [last_close * (1 + (confidence-50)/1000)],
"p10": [last_close * 0.98], "p90": [last_close * 1.02],
"agreement": True, "scale": 1.0
}
ai_score = _calc_ai_forecast_score(
blended,
forecast_rows,
last_close,
indicators,
len(forecast_rows) or 10,
interval,
model_reference_price=model_reference_price,
)
# ── 4. Summary ──
summary = _calc_summary_score_v2(technical_score, ai_score, interval)
dashboard = _build_dashboard_payload(last_close, forecast_rows, technical_score, ai_score, summary)
presentation = _build_analysis_presentation(
technical_score,
ai_score,
summary,
active_model_keys=active_model_keys or ["combined"],
failed_model_keys=failed_model_keys,
requested_models=requested_models,
)
return {
"style": "tradingview",
"regime": {"key": regime, "label": regime_label},
"dashboard": dashboard,
"summary": summary,
"presentation": presentation,
"technicals": technical_score,
"oscillators": {
"signal": osc_score["signal"],
"buy": osc_score["buy"],
"sell": osc_score["sell"],
"neutral": osc_score["neutral"],
"buy_weight": osc_score.get("buy_weight", 0.0),
"sell_weight": osc_score.get("sell_weight", 0.0),
"neutral_weight": osc_score.get("neutral_weight", 0.0),
"data": osc_data
},
"performance": {
"signal": performance_score["signal"],
"buy": performance_score["buy"],
"sell": performance_score["sell"],
"neutral": performance_score["neutral"],
"buy_weight": performance_score.get("buy_weight", 0.0),
"sell_weight": performance_score.get("sell_weight", 0.0),
"neutral_weight": performance_score.get("neutral_weight", 0.0),
"available": performance_score.get("available", 0),
"missing": performance_score.get("missing", 0),
"lookbacks": performance_score.get("lookbacks", list(PERFORMANCE_LOOKBACKS)),
"data": performance_data,
},
"moving_averages": {
"signal": ma_score["signal"],
"buy": ma_score["buy"],
"sell": ma_score["sell"],
"neutral": ma_score["neutral"],
"buy_weight": ma_score.get("buy_weight", 0.0),
"sell_weight": ma_score.get("sell_weight", 0.0),
"neutral_weight": ma_score.get("neutral_weight", 0.0),
"golden_cross": ma_score["golden_cross"],
"death_cross": ma_score["death_cross"],
"data": ma_data
},
"ai_gauge": ai_score,
"support_resistance": support_resistance,
"pivot_points": support_resistance,
}