from __future__ import annotations import math import re from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd AI_SCORING_VERSION = "2026-05-05-path-direction-v2" TECHNICAL_SCORING_VERSION = "2026-05-18-performance-votes-v12" ANALYSIS_PRESENTATION_VERSION = "2026-05-16-gauge-view-v2" EMA_PAIR_SEQUENCE: List[Tuple[int, int]] = [ (1, 2), (2, 5), (5, 10), (10, 20), (20, 50), (50, 100), (100, 200), ] EMA_PAIR_THRESHOLD_PCT: Dict[str, float] = { "ema_1_2": 0.05, "ema_2_5": 0.08, "ema_5_10": 0.10, "ema_10_20": 0.14, "ema_20_50": 0.22, "ema_50_100": 0.35, "ema_100_200": 0.55, } ADX_GAUGE_PERIOD = 10 PERFORMANCE_LOOKBACKS: Tuple[int, ...] = (1, 2, 3, 5, 7, 30, 90, 180, 365) PERFORMANCE_VOTE_WEIGHTS: Dict[int, float] = { 2: 2.0, } PERFORMANCE_NEUTRAL_EPSILON_PCT = 1e-9 __all__ = [ "AI_SCORING_VERSION", "ANALYSIS_PRESENTATION_VERSION", "TECHNICAL_SCORING_VERSION", "_apply_forecast_amplitude_calibration", "_build_analysis_presentation", "_apply_forecast_path_texture", "_build_dashboard_payload", "_build_raw_ohlc4_bundle", "_build_regime_texture_template", "_build_trade_analysis", "_calc_ai_forecast_score", "_calc_ma_score", "_calc_performance_score", "_calc_price_action_levels", "_calc_pivot_points", "_calc_summary_score_v2", "_calc_technical_score_v2", "_calc_vote_gauge", "_clamp", "_derive_target_amplitude_profile", "_forecast_path_metrics", "_gauge_to_signal", "_momentum", "_osc_action", "_pct", "_score_forecast_context_candidate", "compute_indicators", ] def _ema(arr: np.ndarray, period: int) -> np.ndarray: """Vectorized EMA using NumPy (replaces loops).""" if len(arr) == 0: return np.array([], dtype=float) alpha = 2.0 / (period + 1.0) # Use pandas ewm for robust vectorized calculation (v6.0) return pd.Series(arr).ewm(alpha=alpha, adjust=False).mean().values def _rsi(close: np.ndarray, period: int = 14) -> np.ndarray: """Vectorized RSI using NumPy/Pandas.""" delta = np.diff(close) gain = np.where(delta > 0, delta, 0.0) loss = np.where(delta < 0, -delta, 0.0) avg_gain = pd.Series(gain).ewm(alpha=1.0/period, adjust=False).mean() avg_loss = pd.Series(loss).ewm(alpha=1.0/period, adjust=False).mean() rs = avg_gain / avg_loss.replace(0, np.nan) rsi = 100 - (100 / (1 + rs)) rsi = rsi.where(avg_loss > 0, 100.0) rsi = rsi.where(avg_gain > 0, 0.0) rsi = rsi.where(~((avg_gain == 0) & (avg_loss == 0)), 50.0) # Prepend NaN to match original array length return np.concatenate([[np.nan], rsi.values]) def _bollinger(close: np.ndarray, period=20, k=2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Vectorized Bollinger Bands.""" s = pd.Series(close) mid = s.rolling(window=period).mean() std = s.rolling(window=period).std(ddof=0) return (mid + k*std).values, mid.values, (mid - k*std).values def _macd(close: np.ndarray, fast=12, slow=26, signal=9 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: ema_fast = _ema(close, fast) ema_slow = _ema(close, slow) macd_line = ema_fast - ema_slow sig_line = _ema(np.where(np.isnan(macd_line), 0, macd_line), signal) histogram = macd_line - sig_line return macd_line, sig_line, histogram def _atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, period=14) -> np.ndarray: """Vectorized ATR.""" tr = np.maximum(high[1:] - low[1:], np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1]))) tr = np.concatenate([[np.nan], tr]) return pd.Series(tr).ewm(alpha=1.0/period, adjust=False).mean().values def _stoch_rsi(close: np.ndarray, rsi_period=14, stoch_period=14, smooth_k=3, smooth_d=3) -> Tuple[np.ndarray, np.ndarray]: """Vectorized Stochastic RSI.""" rsi_vals = pd.Series(_rsi(close, rsi_period)) roll_min = rsi_vals.rolling(window=stoch_period).min() roll_max = rsi_vals.rolling(window=stoch_period).max() k = 100 * (rsi_vals - roll_min) / (roll_max - roll_min).replace(0, np.inf) k_smooth = k.rolling(window=smooth_k).mean() d_smooth = k_smooth.rolling(window=smooth_d).mean() return k_smooth.values, d_smooth.values def _stoch_kd( high: np.ndarray, low: np.ndarray, close: np.ndarray, k_period: int = 14, smooth_k: int = 3, d_period: int = 3, ) -> Tuple[np.ndarray, np.ndarray]: """Standard stochastic oscillator computed from price high/low/close.""" high_s = pd.Series(high) low_s = pd.Series(low) close_s = pd.Series(close) hh = high_s.rolling(window=k_period).max() ll = low_s.rolling(window=k_period).min() raw_k = 100.0 * (close_s - ll) / (hh - ll).replace(0, np.nan) k = raw_k.rolling(window=smooth_k).mean() d = k.rolling(window=d_period).mean() return k.values, d.values def _sma(arr: np.ndarray, period: int) -> np.ndarray: """Vectorized Simple Moving Average.""" if len(arr) == 0: return np.array([], dtype=float) return pd.Series(arr).rolling(window=period).mean().values def _cci(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 20) -> np.ndarray: """Vectorized Commodity Channel Index.""" tp = (high + low + close) / 3.0 tp_series = pd.Series(tp) sma = tp_series.rolling(window=period).mean() # Optimized MAD calculation (Vectorized) mad = tp_series.rolling(window=period).apply(lambda x: np.abs(x - x.mean()).mean(), raw=True) return ((tp_series - sma) / (0.015 * mad.replace(0, np.inf))).values def _adx(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> tuple: """Vectorized Average Directional Index (v6.0).""" plus_dm = np.where((high[1:] - high[:-1] > low[:-1] - low[1:]) & (high[1:] - high[:-1] > 0), high[1:] - high[:-1], 0.0) minus_dm = np.where((low[:-1] - low[1:] > high[1:] - high[:-1]) & (low[:-1] - low[1:] > 0), low[:-1] - low[1:], 0.0) tr = np.maximum(high[1:] - low[1:], np.maximum(np.abs(high[1:] - close[:-1]), np.abs(low[1:] - close[:-1]))) # Pad first index plus_dm = np.concatenate([[0.0], plus_dm]) minus_dm = np.concatenate([[0.0], minus_dm]) tr = np.concatenate([[0.0], tr]) tr_sum = pd.Series(tr).ewm(alpha=1.0/period, adjust=False).mean() plus_di = 100 * pd.Series(plus_dm).ewm(alpha=1.0/period, adjust=False).mean() / tr_sum.replace(0, np.inf) minus_di = 100 * pd.Series(minus_dm).ewm(alpha=1.0/period, adjust=False).mean() / tr_sum.replace(0, np.inf) dx = 100 * np.abs(plus_di - minus_di) / (plus_di + minus_di).replace(0, np.inf) adx = dx.ewm(alpha=1.0/period, adjust=False).mean() return adx.values, plus_di.values, minus_di.values def _awesome_oscillator(high: np.ndarray, low: np.ndarray) -> np.ndarray: """Awesome Oscillator = SMA(5, median) - SMA(34, median).""" median = (high + low) / 2.0 sma5 = _sma(median, 5) sma34 = _sma(median, 34) return sma5 - sma34 def _momentum(close: np.ndarray, period: int = 10) -> np.ndarray: """Classic momentum oscillator centered at 0.""" base = pd.Series(close).shift(period) return (pd.Series(close) - base).values def _roc(close: np.ndarray, period: int = 12) -> np.ndarray: """Rate of Change in percentage.""" base = pd.Series(close).shift(period) return (pd.Series(close) / base.replace(0, np.nan) - 1.0).mul(100.0).values def _trix(close: np.ndarray, period: int = 18) -> np.ndarray: """Triple EMA oscillator in percentage.""" ema1 = pd.Series(_ema(close, period)) ema2 = ema1.ewm(span=period, adjust=False).mean() ema3 = ema2.ewm(span=period, adjust=False).mean() return ema3.pct_change().mul(100.0).values def _ppo(close: np.ndarray, fast: int = 12, slow: int = 26) -> np.ndarray: """Percentage Price Oscillator.""" ema_fast = _ema(close, fast) ema_slow = _ema(close, slow) return ((ema_fast - ema_slow) / np.where(np.abs(ema_slow) < 1e-8, np.nan, ema_slow) * 100.0) def _cmo(close: np.ndarray, period: int = 14) -> np.ndarray: """Chande Momentum Oscillator.""" delta = pd.Series(close).diff() up = delta.clip(lower=0.0).rolling(period).sum() down = (-delta.clip(upper=0.0)).rolling(period).sum() return ((up - down) / (up + down).replace(0, np.nan) * 100.0).values def _dpo(close: np.ndarray, period: int = 20) -> np.ndarray: """Detrended Price Oscillator.""" offset = int(period / 2) + 1 sma = pd.Series(close).rolling(period).mean() return (pd.Series(close) - sma.shift(offset)).values def _aroon_oscillator(high: np.ndarray, low: np.ndarray, period: int = 25) -> np.ndarray: """Aroon Oscillator = Aroon Up - Aroon Down.""" hs = pd.Series(high) ls = pd.Series(low) aroon_up = hs.rolling(period).apply(lambda x: ((period - 1 - (len(x) - 1 - int(np.argmax(x)))) / (period - 1)) * 100.0, raw=True) aroon_down = ls.rolling(period).apply(lambda x: ((period - 1 - (len(x) - 1 - int(np.argmin(x)))) / (period - 1)) * 100.0, raw=True) return (aroon_up - aroon_down).values def _tsi(close: np.ndarray, long_period: int = 25, short_period: int = 13) -> np.ndarray: """True Strength Index.""" delta = pd.Series(close).diff() abs_delta = delta.abs() ema1 = delta.ewm(span=long_period, adjust=False).mean() ema2 = ema1.ewm(span=short_period, adjust=False).mean() abs_ema1 = abs_delta.ewm(span=long_period, adjust=False).mean() abs_ema2 = abs_ema1.ewm(span=short_period, adjust=False).mean() return (ema2 / abs_ema2.replace(0, np.nan) * 100.0).values def _demarker(high: np.ndarray, low: np.ndarray, period: int = 14) -> np.ndarray: """DeMarker oscillator in range 0..100.""" high_delta = pd.Series(high).diff() low_delta = -pd.Series(low).diff() demax = high_delta.clip(lower=0.0) demin = low_delta.clip(lower=0.0) demax_avg = demax.rolling(period).sum() demin_avg = demin.rolling(period).sum() denom = (demax_avg + demin_avg).replace(0, np.nan) return (demax_avg / denom * 100.0).values def _williams_r(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> np.ndarray: """Vectorized Williams %R.""" s = pd.Series(close) hh = pd.Series(high).rolling(window=period).max() ll = pd.Series(low).rolling(window=period).min() wr = -100 * (hh - s) / (hh - ll).replace(0, np.inf) return wr.values def _bull_bear_power(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 13) -> Tuple[np.ndarray, np.ndarray]: """Return Bull Power and Bear Power separately for sign-based scoring.""" ema_val = _ema(close, period) return (high - ema_val), (low - ema_val) def _ultimate_oscillator(high: np.ndarray, low: np.ndarray, close: np.ndarray, p1: int = 7, p2: int = 14, p3: int = 28) -> np.ndarray: """Vectorized Ultimate Oscillator (v6.0).""" n = len(close) if n < p3 + 1: return np.full(n, np.nan) prev_close = pd.Series(close).shift(1) tl = np.minimum(low, prev_close.values) th = np.maximum(high, prev_close.values) bp = pd.Series(close - tl) tr = pd.Series(th - tl) def _avg(p): return bp.rolling(p).sum() / tr.rolling(p).sum().replace(0, np.inf) avg1 = _avg(p1) avg2 = _avg(p2) avg3 = _avg(p3) uo = 100 * (4 * avg1 + 2 * avg2 + avg3) / 7.0 return uo.values def _ichimoku_base(high: np.ndarray, low: np.ndarray, period: int = 26) -> np.ndarray: """Vectorized Ichimoku Base Line.""" hh = pd.Series(high).rolling(window=period).max() ll = pd.Series(low).rolling(window=period).min() return ((hh + ll) / 2.0).values def _ichimoku_cloud(high: np.ndarray, low: np.ndarray) -> Dict[str, np.ndarray]: """Basic Ichimoku lines for cloud-position scoring.""" hs = pd.Series(high) ls = pd.Series(low) tenkan = ((hs.rolling(9).max() + ls.rolling(9).min()) / 2.0).values kijun = ((hs.rolling(26).max() + ls.rolling(26).min()) / 2.0).values span_a = ((pd.Series(tenkan) + pd.Series(kijun)) / 2.0).values span_b = ((hs.rolling(52).max() + ls.rolling(52).min()) / 2.0).values return { "tenkan": tenkan, "kijun": kijun, "span_a": span_a, "span_b": span_b, } def _vwma(close: np.ndarray, volume: np.ndarray, period: int = 20) -> np.ndarray: """Vectorized Volume Weighted Moving Average.""" cv = pd.Series(close * volume) v = pd.Series(volume) return (cv.rolling(period).sum() / v.rolling(period).sum().replace(0, np.inf)).values def _wma(arr: np.ndarray, period: int) -> np.ndarray: """Weighted moving average used by Hull MA.""" if len(arr) == 0: return np.array([], dtype=float) weights = np.arange(1, period + 1, dtype=float) return pd.Series(arr).rolling(window=period).apply( lambda x: float(np.dot(x, weights) / weights.sum()), raw=True, ).values def _hull_ma(close: np.ndarray, period: int = 9) -> np.ndarray: """Hull Moving Average.""" half = max(period // 2, 1) sqrt_p = max(int(math.sqrt(period)), 1) wma_half = _wma(close, half) wma_full = _wma(close, period) diff = 2 * wma_half - wma_full hull = _wma(np.where(np.isnan(diff), close, diff), sqrt_p) return hull def _price_action_config(interval: str) -> Tuple[int, int]: config_map = { "1m": (2, 120), "5m": (2, 140), "15m": (3, 180), "1h": (4, 260), "4h": (4, 320), "1d": (5, 360), "1w": (5, 260), } return config_map.get(interval, (4, 240)) def _cluster_price_action_candidates( candidates: List[Tuple[float, float, int]], tolerance: float, ) -> List[Dict[str, Any]]: clusters: List[Dict[str, Any]] = [] for price, weight, candle_index in sorted(candidates, key=lambda item: item[0]): cluster = next( (item for item in clusters if abs(item["price"] - price) <= tolerance), None, ) if cluster is None: clusters.append( { "price": float(price), "weight": float(weight), "touches": 1, "last_index": int(candle_index), } ) continue total_weight = float(cluster["weight"]) + float(weight) cluster["price"] = ( (float(cluster["price"]) * float(cluster["weight"])) + (float(price) * float(weight)) ) / max(total_weight, 1e-8) cluster["weight"] = total_weight cluster["touches"] = int(cluster["touches"]) + 1 cluster["last_index"] = max(int(cluster["last_index"]), int(candle_index)) return clusters def _dedupe_sorted_levels( levels: List[Dict[str, Any]], *, tolerance: float, ) -> List[Dict[str, Any]]: deduped: List[Dict[str, Any]] = [] for level in levels: if deduped and abs(float(deduped[-1]["price"]) - float(level["price"])) <= tolerance: continue deduped.append(level) return deduped def _calc_price_action_levels( data: List[Dict[str, Any]], current_price: float, interval: str, ) -> Dict[str, Any]: if not data: rounded_price = round(float(current_price), 6) return { "method": "price_action", "timeframe": interval, "current_price": rounded_price, "summary": "Khong du du lieu de xac dinh cac moc khang cu va ho tro theo Price Action.", "data": [ {"level": "KC2", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)}, {"level": "KC1", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)}, {"level": "Giá hiện tại", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)}, {"level": "HT1", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)}, {"level": "HT2", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)}, ], } highs = np.array([float(row["high"]) for row in data], dtype=float) lows = np.array([float(row["low"]) for row in data], dtype=float) closes = np.array([float(row["close"]) for row in data], dtype=float) pivot_window, lookback = _price_action_config(interval) recent_start = max(0, len(closes) - min(len(closes), lookback)) recent_highs = highs[recent_start:] recent_lows = lows[recent_start:] recent_closes = closes[recent_start:] atr_value = _latest_finite_value(_atr(recent_highs, recent_lows, recent_closes, period=14)) median_range = float(np.nanmedian(recent_highs - recent_lows)) if len(recent_highs) else 0.0 cluster_tolerance = max( float(atr_value or 0.0) * 0.35, median_range * 0.22, abs(float(current_price)) * 0.0015, 1e-6, ) extension_step = max( float(atr_value or 0.0), median_range, abs(float(current_price)) * 0.003, cluster_tolerance * 1.5, ) resistance_candidates: List[Tuple[float, float, int]] = [] support_candidates: List[Tuple[float, float, int]] = [] recent_count = len(recent_closes) for idx in range(pivot_window, max(pivot_window, recent_count - pivot_window)): high_slice = recent_highs[idx - pivot_window : idx + pivot_window + 1] low_slice = recent_lows[idx - pivot_window : idx + pivot_window + 1] high_price = float(recent_highs[idx]) low_price = float(recent_lows[idx]) recency_weight = 1.0 + (idx / max(recent_count, 1)) if high_price >= float(np.max(high_slice)): resistance_candidates.append((high_price, recency_weight, recent_start + idx)) if low_price <= float(np.min(low_slice)): support_candidates.append((low_price, recency_weight, recent_start + idx)) fallback_spans = [10, 20, 50, 100, min(recent_count, lookback)] for span in fallback_spans: if span <= 0 or recent_count < span: continue resistance_candidates.append((float(np.max(recent_highs[-span:])), 0.75, len(data) - 1)) support_candidates.append((float(np.min(recent_lows[-span:])), 0.75, len(data) - 1)) resistance_clusters = _cluster_price_action_candidates(resistance_candidates, cluster_tolerance) support_clusters = _cluster_price_action_candidates(support_candidates, cluster_tolerance) resistance_levels = _dedupe_sorted_levels( sorted( [ cluster for cluster in resistance_clusters if float(cluster["price"]) > float(current_price) + (cluster_tolerance * 0.12) ], key=lambda item: float(item["price"]), ), tolerance=cluster_tolerance * 0.55, ) support_levels = _dedupe_sorted_levels( sorted( [ cluster for cluster in support_clusters if float(cluster["price"]) < float(current_price) - (cluster_tolerance * 0.12) ], key=lambda item: float(item["price"]), reverse=True, ), tolerance=cluster_tolerance * 0.55, ) while len(resistance_levels) < 2: base_price = float(resistance_levels[-1]["price"]) if resistance_levels else float(current_price) resistance_levels.append( { "price": base_price + extension_step, "weight": 0.0, "touches": 0, "last_index": len(data) - 1, } ) while len(support_levels) < 2: base_price = float(support_levels[-1]["price"]) if support_levels else float(current_price) support_levels.append( { "price": base_price - extension_step, "weight": 0.0, "touches": 0, "last_index": len(data) - 1, } ) kc1 = float(resistance_levels[0]["price"]) kc2 = float(resistance_levels[1]["price"]) ht1 = float(support_levels[0]["price"]) ht2 = float(support_levels[1]["price"]) if current_price >= kc1: summary = ( "Gia hien tai dang ap sat hoac vuot len tren KC1. " "Neu duy tri duoc dong luc, KC2 la moc can theo doi tiep theo." ) elif current_price <= ht1: summary = ( "Gia hien tai dang ap sat hoac nam duoi HT1. " "Neu ap luc ban tiep tuc, HT2 la vung ho tro can quan sat them." ) else: summary = ( "Gia hien tai dang nam giua HT1 va KC1. " "Day la vung can theo doi phan ung gia truoc khi xac nhan huong di tiep theo." ) def _row(level: str, price: float) -> Dict[str, Any]: rounded_price = round(float(price), 6) distance_pct = 0.0 if current_price == 0 else ((rounded_price - float(current_price)) / float(current_price)) * 100.0 return { "level": level, "price": rounded_price, "display_value": _format_analysis_value(rounded_price), "distance_pct": round(distance_pct, 2), } rows = [ _row("KC2", kc2), _row("KC1", kc1), _row("Giá hiện tại", float(current_price)), _row("HT1", ht1), _row("HT2", ht2), ] return { "method": "price_action", "timeframe": interval, "current_price": round(float(current_price), 6), "summary": summary, "data": rows, } def _calc_pivot_points( data: List[Dict[str, Any]], current_price: float, interval: str, ) -> Dict[str, Any]: """Compatibility wrapper for old callers. New logic uses Price Action only.""" return _calc_price_action_levels(data, current_price, interval) # ───────────────────────────────────────────────────────────────────────────── # TradingView-style action classification helpers # ───────────────────────────────────────────────────────────────────────────── def _latest_finite_value(values: Any) -> Optional[float]: """Return the latest finite float from a scalar or array-like input without rounding.""" if values is None: return None if isinstance(values, (float, int)): value = float(values) return None if math.isnan(value) else value if isinstance(values, pd.Series): values = values.values if not hasattr(values, "__len__") or len(values) == 0: return None value = float(values[-1]) return None if math.isnan(value) else value def _format_analysis_value(value: Optional[float]) -> str: """Format a technical value for UI without losing useful precision.""" if value is None or math.isnan(float(value)): return "—" abs_value = abs(float(value)) if abs_value >= 100: decimals = 2 elif abs_value >= 10: decimals = 3 elif abs_value >= 1: decimals = 4 elif abs_value >= 0.1: decimals = 6 elif abs_value >= 0.01: decimals = 7 else: decimals = 8 return f"{float(value):.{decimals}f}".rstrip("0").rstrip(".") def _osc_action(name: str, value: float, **kw) -> str: """Classify oscillator value as 'Mua' / 'Bán' / 'Trung lập'.""" if value is None or math.isnan(value): return "Trung lập" atr = max(float(kw.get("atr", 0.0) or 0.0), 1e-8) def _bounded_action( current: float, min_value: float, max_value: float, *, bullish_high: bool = True, ) -> str: span = max(max_value - min_value, 1e-8) ratio = _clamp((current - min_value) / span, 0.0, 1.0) if not bullish_high: ratio = 1.0 - ratio if ratio > (2.0 / 3.0): return "Mua" if ratio < (1.0 / 3.0): return "Bán" return "Trung lập" def _zero_line_action(current: float, epsilon: float) -> str: band = max(float(epsilon), 1e-8) if current > band: return "Mua" if current < -band: return "Bán" return "Trung lập" if name == "rsi": return _bounded_action(value, 0.0, 100.0) if name in {"stoch", "stoch_rsi", "ultimate", "demarker"}: return _bounded_action(value, 0.0, 100.0) if name == "williams": return _bounded_action(value, -100.0, 0.0, bullish_high=True) if name in {"aroon", "cmo", "tsi"}: return _bounded_action(value, -100.0, 100.0) if name == "cci": return "Mua" if value > 50 else "Bán" if value < -50 else "Trung lập" if name == "momentum": return _zero_line_action(value, kw.get("epsilon", atr * 0.025)) if name == "roc": return _zero_line_action(value, kw.get("epsilon", 0.01)) if name == "trix": return _zero_line_action(value, kw.get("epsilon", 0.01)) if name == "ppo": return _zero_line_action(value, kw.get("epsilon", 0.01)) if name == "dpo": return _zero_line_action(value, kw.get("epsilon", atr * 0.02)) if name == "adx": plus_di = kw.get("plus_di", 0) minus_di = kw.get("minus_di", 0) if value < 20: return "Trung lập" return "Mua" if plus_di > minus_di else "Bán" if name == "ao": return _zero_line_action(value, kw.get("epsilon", atr * 0.02)) if name == "macd": return _zero_line_action(value, kw.get("epsilon", atr * 0.02)) if name == "bbp": bull_power = kw.get("bull_power") bear_power = kw.get("bear_power") epsilon = max(float(kw.get("epsilon", atr * 0.02)), 1e-8) if bull_power is None or bear_power is None: return "Trung lập" if bull_power > epsilon and bear_power > epsilon: return "Mua" if bull_power < -epsilon and bear_power < -epsilon: return "Bán" return "Trung lập" return "Trung lập" def _ema_pair_action(fast_val: Optional[float], slow_val: Optional[float], pair_key: str = "") -> str: """EMA chain classification using raw spread instead of rounded display values.""" if fast_val is None or slow_val is None: return "Trung lập" if math.isnan(fast_val) or math.isnan(slow_val): return "Trung lập" spread_pct = _pct(fast_val, slow_val) neutral_band_pct = max(_ema_pair_threshold_pct(pair_key) * 0.08, 0.0005) if spread_pct > neutral_band_pct: return "Mua" if spread_pct < -neutral_band_pct: return "Bán" return "Trung lập" def _price_vs_level_action( price: Optional[float], level: Optional[float], *, neutral_band_pct: float = 0.08, ) -> str: """Simple trend classification from price relative to a pure-price reference line.""" if price is None or level is None: return "Trung lập" if math.isnan(price) or math.isnan(level): return "Trung lập" spread_pct = _pct(price, level) band = max(float(neutral_band_pct), 0.0005) if spread_pct > band: return "Mua" if spread_pct < -band: return "Bán" return "Trung lập" def _ema_pair_threshold_pct(pair_key: str) -> float: return EMA_PAIR_THRESHOLD_PCT.get(pair_key, 0.18) def compute_indicators(data: List[Dict[str, Any]]) -> Dict[str, Any]: """Compute a full suite of technical indicators on OHLCV data.""" if len(data) < 30: return {"error": "Insufficient data (need ≥30 candles)"} closes = np.array([d["close"] for d in data], dtype=float) highs = np.array([d["high"] for d in data], dtype=float) lows = np.array([d["low"] for d in data], dtype=float) vols = np.array([d["volume"]for d in data], dtype=float) times = [d["time"] for d in data] def _last(arr): v = arr[-1] return None if math.isnan(v) else round(float(v), 8) def _series(arr, n: int = 50): out = [] window = arr[-min(n, len(arr)):] offset = len(arr) - len(window) for i, v in enumerate(window): idx = offset + i out.append({"time": times[idx], "value": None if math.isnan(v) else round(float(v), 8)}) return out ema9 = _ema(closes, 9) ema21 = _ema(closes, 21) ema50 = _ema(closes, 50) ema200 = _ema(closes, 200) rsi14 = _rsi(closes, 14) macd_l, macd_s, macd_h = _macd(closes) bb_u, bb_m, bb_l = _bollinger(closes) atr14 = _atr(highs, lows, closes, 14) stoch_k, stoch_d = _stoch_rsi(closes) roc12 = _roc(closes, 12) trix18 = _trix(closes, 18) ppo12 = _ppo(closes, 12, 26) cmo14 = _cmo(closes, 14) dpo20 = _dpo(closes, 20) aroon25 = _aroon_oscillator(highs, lows, 25) tsi25 = _tsi(closes, 25, 13) # Volume SMA 20 (Vectorized v6.0) vol_sma = _sma(vols, 20) last_close = closes[-1] last_atr = _last(atr14) or 0 return { "ema": { "ema9": _last(ema9), "ema21": _last(ema21), "ema50": _last(ema50), "ema200": _last(ema200), }, "rsi": { "value": _last(rsi14), "signal": ( "overbought" if (_last(rsi14) or 50) > 70 else "oversold" if (_last(rsi14) or 50) < 30 else "neutral" ), }, "macd": { "macd": _last(macd_l), "signal": _last(macd_s), "histogram": _last(macd_h), "cross": ( "bullish" if (_last(macd_h) or 0) > 0 else "bearish" if (_last(macd_h) or 0) < 0 else "neutral" ), }, "bollinger": { "upper": _last(bb_u), "middle": _last(bb_m), "lower": _last(bb_l), "bandwidth": round(((_last(bb_u) or 0) - (_last(bb_l) or 0)) / ((_last(bb_m) or 1)), 6), }, "atr": { "value": _last(atr14), "pct": round(last_atr / last_close * 100, 4) if last_close else None, }, "stoch_rsi": { "k": _last(stoch_k), "d": _last(stoch_d), "signal": ( "overbought" if (_last(stoch_k) or 50) > 80 else "oversold" if (_last(stoch_k) or 50) < 20 else "neutral" ), }, "roc": { "value": _last(roc12), "signal": "bullish" if (_last(roc12) or 0) > 1.0 else "bearish" if (_last(roc12) or 0) < -1.0 else "neutral", }, "trix": { "value": _last(trix18), "signal": "bullish" if (_last(trix18) or 0) > 0 else "bearish" if (_last(trix18) or 0) < 0 else "neutral", }, "ppo": { "value": _last(ppo12), "signal": "bullish" if (_last(ppo12) or 0) > 0.35 else "bearish" if (_last(ppo12) or 0) < -0.35 else "neutral", }, "cmo": { "value": _last(cmo14), "signal": "bullish" if (_last(cmo14) or 0) > 20 else "bearish" if (_last(cmo14) or 0) < -20 else "neutral", }, "dpo": { "value": _last(dpo20), "signal": "bullish" if (_last(dpo20) or 0) > 0 else "bearish" if (_last(dpo20) or 0) < 0 else "neutral", }, "aroon": { "value": _last(aroon25), "signal": "bullish" if (_last(aroon25) or 0) > 25 else "bearish" if (_last(aroon25) or 0) < -25 else "neutral", }, "tsi": { "value": _last(tsi25), "signal": "bullish" if (_last(tsi25) or 0) > 5 else "bearish" if (_last(tsi25) or 0) < -5 else "neutral", }, "volume": { "last": round(float(vols[-1]), 2), "sma20": round(float(vol_sma[-1]), 2), "above_avg": bool(vols[-1] > vol_sma[-1]), }, "trend": { "ema_bullish_stack": bool( all(x is not None for x in [_last(ema9),_last(ema21),_last(ema50)]) and _last(ema9) > _last(ema21) > _last(ema50) # type: ignore ), "above_ema200": bool(_last(ema200) is not None and last_close > (_last(ema200) or 0)), "close": round(float(last_close), 8), "short_momentum_ref": float(closes[max(0, len(closes)-6)]) if len(closes) else float(last_close), }, "short_momentum_ref": float(closes[max(0, len(closes)-6)]) if len(closes) else float(last_close), "series": { "ema9": _series(ema9), "ema21": _series(ema21), "ema50": _series(ema50), "bb_upper": _series(bb_u), "bb_mid": _series(bb_m), "bb_lower": _series(bb_l), }, } # ───────────────────────────────────────────────────────────────────────────── # UTILITIES # ───────────────────────────────────────────────────────────────────────────── def _clamp(v: float, lo: float, hi: float) -> float: return max(lo, min(hi, v)) def _pct(a: float, b: float) -> float: """(a - b) / b * 100, an toàn với b = 0.""" return (a - b) / b * 100.0 if b != 0 else 0.0 def _safe(v: Optional[float], default: float = 0.0) -> float: if v is None or math.isnan(v) or math.isinf(v): return default return float(v) def _classify_regime(indicators: Dict[str, Any], last_close: float, atr_pct: float) -> Tuple[str, str]: """Phân loại trạng thái thị trường dựa trên biến động và xu hướng.""" rsi = _safe(indicators["rsi"].get("value"), 50.0) ema9 = _safe(indicators["ema"].get("ema9"), last_close) ema50 = _safe(indicators["ema"].get("ema50"), last_close) ema200 = _safe(indicators["ema"].get("ema200"), last_close) # Logic 8 trạng thái if atr_pct > 3.5: if rsi > 60: return "volatile_bull", "Tăng trưởng biến động cao" if rsi < 40: return "volatile_bear", "Sụt giảm biến động cao" return "high_chaos", "Thị trường hỗn loạn / Vol cao" if last_close > ema200 and ema9 > ema50: if rsi > 70: return "overextended_bull", "Tăng trưởng quá mức (Overbought)" return "stable_bull", "Xu hướng tăng ổn định" if last_close < ema200 and ema9 < ema50: if rsi < 30: return "capitulation", "Hoảng loạn / Quá bán (Oversold)" return "stable_bear", "Xu hướng giảm ổn định" if abs(_pct(ema9, ema50)) < 0.5: return "tight_range", "Tích lũy biên độ hẹp (Squeeze)" return "sideways", "Thị trường đi ngang (Sideway)" OSC_WEIGHTS = { "rsi": 2.1, "macd": 2.4, "stoch_rsi": 1.1, "stoch": 1.2, "cci": 1.7, "adx": 1.8, "williams": 1.5, "ultimate": 1.1, "bbp": 1.0, "ao": 0.9, "momentum": 1.6, "roc": 1.3, "trix": 1.4, "ppo": 1.4, "cmo": 1.2, "dpo": 0.8, "aroon": 1.3, "tsi": 1.4, "demarker": 1.2, } MA_WEIGHT_MAP = { "ema_1_2": 0.8, "ema_2_5": 1.0, "ema_5_10": 1.1, "ema_10_20": 1.35, "ema_20_50": 1.8, "ema_50_100": 2.2, "ema_100_200": 2.7, "ichimoku_base_26": 1.9, } def _extract_osc_key(label: str) -> str: l = label.lower() if "sức mạnh tương đối" in l or ("rsi" in l and "nhanh" not in l): return "rsi" if "macd" in l: return "macd" if "stochastic %k" in l: return "stoch" if "nhanh" in l or "stoch_rsi" in l: return "stoch_rsi" if "kênh hàng hóa" in l or "cci" in l: return "cci" if "định hướng" in l or "adx" in l: return "adx" if "williams" in l: return "williams" if "ultimate" in l: return "ultimate" if "bbp" in l or "sức mạnh giá" in l: return "bbp" if "ao" in l: return "ao" if "xung lượng" in l or "momentum" in l: return "momentum" if "roc" in l: return "roc" if "trix" in l: return "trix" if "ppo" in l: return "ppo" if "cmo" in l: return "cmo" if "dpo" in l: return "dpo" if "aroon" in l: return "aroon" if "tsi" in l: return "tsi" if "demarker" in l: return "demarker" return "unknown" def _get_ma_weight(label: str) -> float: l = label.lower() if l in MA_WEIGHT_MAP: return MA_WEIGHT_MAP[l] pair = re.findall(r"\d+", l) if len(pair) >= 2: return MA_WEIGHT_MAP.get(f"ema_{pair[0]}_{pair[1]}", 1.0) return 1.0 def _gauge_to_signal(gauge: float, interval: str = "1h") -> str: """Strict 3-level signal converter.""" if gauge >= 66.66: return "Mua" if gauge > 33.33: return "Trung lập" return "Bán" def _gauge_to_normalized_score(gauge: float) -> float: """Convert a 0..100 gauge into a -1..1 frontend-friendly scale.""" return _clamp((float(gauge) - 50.0) / 50.0, -1.0, 1.0) def _forecast_path_metrics(p50_path: np.ndarray, last_close: float) -> Dict[str, float]: """Measure forecast quality from the full path, not only the final endpoint.""" if len(p50_path) == 0 or abs(last_close) <= 1e-8: return { "weighted_return_pct": 0.0, "final_return_pct": 0.0, "path_consistency": 50.0, "monotonicity": 50.0, "max_adverse_excursion_pct": 0.0, "mean_step_return_pct": 0.0, } p50_safe = np.array(p50_path, dtype=float) if not np.isfinite(p50_safe).all(): last_valid = float(last_close) for i in range(len(p50_safe)): if not np.isfinite(p50_safe[i]): p50_safe[i] = last_valid else: last_valid = p50_safe[i] ret_path = ((p50_safe / last_close) - 1.0) * 100.0 step_weights = np.linspace(1.0, 0.65, len(ret_path)) weighted_ret_pct = float(np.average(ret_path, weights=step_weights)) final_ret_pct = float(ret_path[-1]) final_sign = 0 if abs(final_ret_pct) < 0.05 else (1 if final_ret_pct > 0 else -1) if len(ret_path) > 1 and final_sign != 0: signed_steps = [ 1.0 if np.sign(curr - prev) == final_sign else 0.0 for prev, curr in zip(ret_path[:-1], ret_path[1:]) if abs(curr - prev) >= 0.02 ] path_consistency = float(sum(signed_steps) / len(signed_steps)) if signed_steps else 0.5 else: path_consistency = 0.5 if len(ret_path) > 1: monotonicity = float(np.mean(np.diff(ret_path) >= 0)) if final_sign >= 0 else float(np.mean(np.diff(ret_path) <= 0)) else: monotonicity = 0.5 if final_sign > 0: adverse = abs(float(np.min(ret_path))) elif final_sign < 0: adverse = abs(float(np.max(ret_path))) else: adverse = max(abs(float(np.min(ret_path))), abs(float(np.max(ret_path)))) mean_step_return_pct = float(np.mean(np.diff(ret_path))) if len(ret_path) > 1 else final_ret_pct return { "weighted_return_pct": round(weighted_ret_pct, 2), "final_return_pct": round(final_ret_pct, 2), "path_consistency": round(path_consistency * 100.0, 1), "monotonicity": round(monotonicity * 100.0, 1), "max_adverse_excursion_pct": round(adverse, 2), "mean_step_return_pct": round(mean_step_return_pct, 3), } def _forecast_band_profile( p10_path: np.ndarray, p50_path: np.ndarray, p90_path: np.ndarray, ) -> Dict[str, float]: """Summarize uncertainty bands across the full forecast path.""" if len(p50_path) == 0: return { "avg_band_pct": 0.0, "end_band_pct": 0.0, "band_stability_pct": 0.0, "band_step_change_pct": 0.0, } safe_mid = np.maximum(np.abs(p50_path), 1e-8) band_pct = np.maximum(((p90_path - p10_path) / safe_mid) * 100.0, 0.0) band_step_changes = np.abs(np.diff(band_pct)) if len(band_pct) > 1 else np.array([0.0], dtype=float) return { "avg_band_pct": round(float(np.mean(band_pct)), 4), "end_band_pct": round(float(band_pct[-1]), 4), "band_stability_pct": round(float(np.std(band_pct)), 4), "band_step_change_pct": round(float(np.mean(band_step_changes)), 4), } def _forecast_path_responsiveness(p50_path: np.ndarray, last_close: float) -> Dict[str, float]: """Quantify how much the forecast path actually moves relative to the anchor.""" if len(p50_path) == 0 or abs(last_close) <= 1e-8: return { "range_pct": 0.0, "step_abs_mean_pct": 0.0, "step_abs_max_pct": 0.0, } ret_path = ((p50_path / last_close) - 1.0) * 100.0 diffs = np.diff(ret_path) if len(ret_path) > 1 else np.array([ret_path[-1]], dtype=float) return { "range_pct": round(float(np.max(ret_path) - np.min(ret_path)), 4), "step_abs_mean_pct": round(float(np.mean(np.abs(diffs))), 4), "step_abs_max_pct": round(float(np.max(np.abs(diffs))), 4), } def _recent_ohlc4_abs_step_pct(ohlc4: np.ndarray, window: int = 20) -> float: """Use recent OHLC4 movement as a volatility-aware floor for forecast responsiveness.""" if len(ohlc4) < 2: return 0.0 base = np.maximum(np.abs(ohlc4[:-1]), 1e-8) returns = np.diff(ohlc4) / base * 100.0 recent = returns[-min(window, len(returns)) :] return round(float(np.mean(np.abs(recent))), 4) if len(recent) else 0.0 def _future_window_path_stats( ohlc4: np.ndarray, anchor_index: int, horizon: int, ) -> Optional[Dict[str, float]]: """Measure the realized future path that followed a historical anchor point.""" if anchor_index < 0 or horizon <= 0: return None if anchor_index + horizon >= len(ohlc4): return None baseline = float(ohlc4[anchor_index]) future_path = np.asarray(ohlc4[anchor_index + 1 : anchor_index + 1 + horizon], dtype=float) responsiveness = _forecast_path_responsiveness(future_path, baseline) final_return_pct = ( ((future_path[-1] / max(abs(baseline), 1e-8)) - 1.0) * 100.0 if len(future_path) else 0.0 ) return { "range_pct": float(responsiveness["range_pct"]), "step_abs_mean_pct": float(responsiveness["step_abs_mean_pct"]), "step_abs_max_pct": float(responsiveness["step_abs_max_pct"]), "final_return_pct": float(final_return_pct), } def _rolling_future_path_targets( ohlc4: np.ndarray, horizon: int, max_windows: int = 48, ) -> Optional[Dict[str, float]]: """Summarize realized future-path amplitudes from recent historical anchors.""" if len(ohlc4) < horizon + 2: return None last_anchor = len(ohlc4) - horizon - 1 start_anchor = max(0, last_anchor - max_windows + 1) stats: List[Dict[str, float]] = [] for anchor_index in range(start_anchor, last_anchor + 1): path_stats = _future_window_path_stats(ohlc4, anchor_index, horizon) if path_stats is not None: stats.append(path_stats) if not stats: return None weights = np.linspace(0.6, 1.0, len(stats), dtype=float) range_vals = np.array([item["range_pct"] for item in stats], dtype=float) step_vals = np.array([item["step_abs_mean_pct"] for item in stats], dtype=float) return { "window_count": len(stats), "range_pct": round(float(np.average(range_vals, weights=weights)), 4), "step_abs_mean_pct": round(float(np.average(step_vals, weights=weights)), 4), } def _context_regime_signature(ohlc4_window: np.ndarray) -> Dict[str, float]: """Describe the local market regime of an OHLC4 window using scale-free features.""" if len(ohlc4_window) < 2: return { "net_change_pct": 0.0, "vol_pct": 0.0, "abs_step_mean_pct": 0.0, "range_pct": 0.0, } base = np.maximum(np.abs(ohlc4_window[:-1]), 1e-8) returns = np.diff(ohlc4_window) / base * 100.0 first = max(abs(float(ohlc4_window[0])), 1e-8) return { "net_change_pct": float(((ohlc4_window[-1] / first) - 1.0) * 100.0), "vol_pct": float(np.std(returns)) if len(returns) > 1 else 0.0, "abs_step_mean_pct": float(np.mean(np.abs(returns))) if len(returns) else 0.0, "range_pct": float(((np.max(ohlc4_window) - np.min(ohlc4_window)) / first) * 100.0), } def _regime_signature_distance(current: Dict[str, float], candidate: Dict[str, float]) -> float: """Weighted distance between two market-regime signatures.""" terms = ( ("net_change_pct", 0.34, 0.40), ("vol_pct", 0.28, 0.18), ("abs_step_mean_pct", 0.22, 0.12), ("range_pct", 0.16, 0.30), ) distance = 0.0 for key, weight, floor in terms: lhs = float(current.get(key, 0.0)) rhs = float(candidate.get(key, 0.0)) scale = max(abs(lhs), abs(rhs), floor) distance += weight * (abs(lhs - rhs) / scale) return float(distance) def _regime_matched_future_targets( ohlc4: np.ndarray, context_len: int, horizon: int, search_limit: int = 720, top_k: int = 18, ) -> Optional[Dict[str, float]]: """Find historical windows with a similar regime and measure their realized future amplitudes.""" max_anchor = len(ohlc4) - horizon - 1 if max_anchor < 32: return None signature_len = max(32, min(int(context_len), 96, max_anchor + 1)) if len(ohlc4) < signature_len + horizon + 1: return None current_signature = _context_regime_signature( np.asarray(ohlc4[-signature_len:], dtype=float) ) min_anchor = signature_len - 1 start_anchor = max(min_anchor, max_anchor - search_limit + 1) matches: List[Tuple[float, Dict[str, float]]] = [] for anchor_index in range(start_anchor, max_anchor + 1): context_window = np.asarray( ohlc4[anchor_index - signature_len + 1 : anchor_index + 1], dtype=float, ) candidate_signature = _context_regime_signature(context_window) path_stats = _future_window_path_stats(ohlc4, anchor_index, horizon) if path_stats is None: continue distance = _regime_signature_distance(current_signature, candidate_signature) matches.append((distance, path_stats)) if not matches: return None matches.sort(key=lambda item: item[0]) chosen = matches[: min(top_k, len(matches))] weights = np.array([1.0 / (item[0] + 0.08) for item in chosen], dtype=float) range_vals = np.array([item[1]["range_pct"] for item in chosen], dtype=float) step_vals = np.array([item[1]["step_abs_mean_pct"] for item in chosen], dtype=float) return { "match_count": len(chosen), "signature_len": signature_len, "range_pct": round(float(np.average(range_vals, weights=weights)), 4), "step_abs_mean_pct": round(float(np.average(step_vals, weights=weights)), 4), } def _build_regime_texture_template( ohlc4: np.ndarray, context_len: int, horizon: int, search_limit: int = 720, top_k: int = 18, ) -> Optional[Dict[str, Any]]: """Build a step-return template from regime-matched historical futures.""" max_anchor = len(ohlc4) - horizon - 1 if max_anchor < 32: return None signature_len = max(32, min(int(context_len), 96, max_anchor + 1)) if len(ohlc4) < signature_len + horizon + 1: return None current_signature = _context_regime_signature( np.asarray(ohlc4[-signature_len:], dtype=float) ) min_anchor = signature_len - 1 start_anchor = max(min_anchor, max_anchor - search_limit + 1) matches: List[Tuple[float, np.ndarray, Dict[str, float]]] = [] for anchor_index in range(start_anchor, max_anchor + 1): context_window = np.asarray( ohlc4[anchor_index - signature_len + 1 : anchor_index + 1], dtype=float, ) candidate_signature = _context_regime_signature(context_window) distance = _regime_signature_distance(current_signature, candidate_signature) baseline = float(ohlc4[anchor_index]) future_path = np.asarray( ohlc4[anchor_index + 1 : anchor_index + 1 + horizon], dtype=float, ) if len(future_path) != horizon: continue prev_values = np.concatenate(([baseline], future_path[:-1])) step_returns = ((future_path / np.maximum(np.abs(prev_values), 1e-8)) - 1.0) * 100.0 path_stats = _forecast_path_responsiveness(future_path, baseline) matches.append((distance, step_returns, path_stats)) if not matches: return None matches.sort(key=lambda item: item[0]) chosen = matches[: min(top_k, len(matches))] weights = np.array([1.0 / (distance + 0.08) for distance, _, _ in chosen], dtype=float) step_matrix = np.vstack([step_returns for _, step_returns, _ in chosen]) averaged_steps = np.average(step_matrix, axis=0, weights=weights) best_distance = float("inf") best_steps: Optional[np.ndarray] = None best_score = float("-inf") for distance, step_returns, path_stats in chosen[: min(6, len(chosen))]: step_abs_mean_pct = float(np.mean(np.abs(step_returns))) if len(step_returns) else 0.0 range_pct = float(path_stats.get("range_pct", 0.0)) texture_score = ( step_abs_mean_pct * 0.60 + range_pct * 0.40 - (distance * 0.12) ) if texture_score > best_score: best_score = texture_score best_distance = distance best_steps = step_returns if best_steps is None: best_steps = averaged_steps best_distance = float(chosen[0][0]) blended_steps = (best_steps * 0.72) + (averaged_steps * 0.28) step_abs_mean_pct = float(np.mean(np.abs(blended_steps))) if len(blended_steps) else 0.0 simulated_prices = [baseline] price = baseline for step_return in blended_steps: price = max(0.0, price * (1.0 + (float(step_return) / 100.0))) simulated_prices.append(price) simulated_path = np.asarray(simulated_prices[1:], dtype=float) range_pct = _forecast_path_responsiveness(simulated_path, baseline)["range_pct"] if len(simulated_path) else 0.0 return { "step_returns_pct": blended_steps, "step_abs_mean_pct": round(step_abs_mean_pct, 4), "range_pct": round(float(range_pct), 4), "match_count": len(chosen), "signature_len": signature_len, "best_match_distance": round(best_distance, 4), } def _derive_target_amplitude_profile( ohlc4: np.ndarray, context_len: int, horizon: int, ) -> Dict[str, Any]: """Blend recent and regime-matched history into a target amplitude profile.""" recent_abs_step_pct = _recent_ohlc4_abs_step_pct(ohlc4, window=20) rolling_targets = _rolling_future_path_targets( ohlc4, horizon=horizon, max_windows=max(24, min(72, horizon * 6)), ) regime_targets = _regime_matched_future_targets( ohlc4, context_len=context_len, horizon=horizon, ) step_terms: List[Tuple[float, float]] = [] if recent_abs_step_pct > 0: step_terms.append((recent_abs_step_pct, 0.24)) if rolling_targets is not None: step_terms.append((float(rolling_targets["step_abs_mean_pct"]), 0.31)) if regime_targets is not None: step_terms.append((float(regime_targets["step_abs_mean_pct"]), 0.45)) if step_terms: target_step_abs_mean_pct = sum(value * weight for value, weight in step_terms) / sum( weight for _, weight in step_terms ) else: target_step_abs_mean_pct = 0.0 range_terms: List[Tuple[float, float]] = [] if rolling_targets is not None: range_terms.append((float(rolling_targets["range_pct"]), 0.42)) if regime_targets is not None: range_terms.append((float(regime_targets["range_pct"]), 0.58)) if not range_terms and target_step_abs_mean_pct > 0: fallback_range = target_step_abs_mean_pct * max(2.0, min(math.sqrt(max(horizon, 1)) * 1.6, 4.5)) range_terms.append((fallback_range, 1.0)) target_range_pct = sum(value * weight for value, weight in range_terms) / sum( weight for _, weight in range_terms ) if range_terms else 0.0 return { "target_step_abs_mean_pct": round(float(target_step_abs_mean_pct), 4), "target_range_pct": round(float(target_range_pct), 4), "recent_abs_step_pct": round(float(recent_abs_step_pct), 4), "rolling_targets": rolling_targets, "regime_targets": regime_targets, } def _build_raw_ohlc4_bundle( model_output: Dict[str, Any], last_ohlc4: float, ) -> Dict[str, Any]: """Build the OHLC4-path bundle directly from raw TimesFM output.""" raw_p10 = np.array(model_output["p10"], dtype=float) raw_p50 = np.array(model_output["p50"], dtype=float) raw_p90 = np.array(model_output["p90"], dtype=float) path_metrics = _forecast_path_metrics(raw_p50, last_ohlc4) avg_band_pct = float( np.mean((raw_p90 - raw_p10) / np.maximum(np.abs(raw_p50), 1e-8)) * 100.0 ) if len(raw_p50) else 0.0 band_certainty = math.exp(-avg_band_pct / 4.0) path_consistency = path_metrics["path_consistency"] / 100.0 monotonicity = path_metrics["monotonicity"] / 100.0 move_pct = abs(path_metrics["final_return_pct"]) confidence = ( 28.0 + band_certainty * 34.0 + path_consistency * 18.0 + monotonicity * 12.0 + min(move_pct, 4.0) * 2.0 ) confidence = max(20.0, min(95.0, confidence)) final_sign = 0 if abs(path_metrics["final_return_pct"]) < 0.05 else (1 if path_metrics["final_return_pct"] > 0 else -1) weighted_sign = 0 if abs(path_metrics["weighted_return_pct"]) < 0.05 else (1 if path_metrics["weighted_return_pct"] > 0 else -1) agreement = final_sign == 0 or weighted_sign == 0 or final_sign == weighted_sign return { "p10": raw_p10, "p50": raw_p50, "p90": raw_p90, "model_weight": 1.0, "anchor_weight": 0.0, "agreement": agreement, "scale": 1.0, "confidence": round(confidence, 2), "model_bias_pct": 0.0, "path_metrics": path_metrics, "mode": "raw_timesfm_ohlc4", } def _apply_forecast_amplitude_calibration( raw_bundle: Dict[str, Any], last_ohlc4: float, target_profile: Dict[str, Any], ) -> Tuple[Dict[str, Any], Dict[str, Any]]: """Scale raw TimesFM OHLC4 paths so their step/range amplitude stays realistic.""" raw_p10 = np.asarray(raw_bundle.get("p10", []), dtype=float) raw_p50 = np.asarray(raw_bundle.get("p50", []), dtype=float) raw_p90 = np.asarray(raw_bundle.get("p90", []), dtype=float) raw_responsiveness = _forecast_path_responsiveness(raw_p50, last_ohlc4) raw_step = max(float(raw_responsiveness["step_abs_mean_pct"]), 1e-6) raw_range = max(float(raw_responsiveness["range_pct"]), 1e-6) target_step = max(float(target_profile.get("target_step_abs_mean_pct") or 0.0), 0.0) target_range = max(float(target_profile.get("target_range_pct") or 0.0), 0.0) ratios: List[Tuple[float, float]] = [] if target_step > 0: ratios.append((target_step / raw_step, 0.62)) if target_range > 0: ratios.append((target_range / raw_range, 0.38)) if ratios: log_scale = sum(weight * math.log(max(ratio, 1e-6)) for ratio, weight in ratios) / sum( weight for _, weight in ratios ) scale = math.exp(log_scale) else: scale = 1.0 scale = float(np.clip(scale, 0.82, 2.35)) if raw_step >= target_step * 0.92 and raw_range >= target_range * 0.88: scale = float(np.clip(scale, 0.9, 1.25)) calibrated_p10 = np.maximum(0.0, last_ohlc4 + ((raw_p10 - last_ohlc4) * scale)) calibrated_p50 = np.maximum(0.0, last_ohlc4 + ((raw_p50 - last_ohlc4) * scale)) calibrated_p90 = np.maximum(0.0, last_ohlc4 + ((raw_p90 - last_ohlc4) * scale)) calibrated_bundle = _build_raw_ohlc4_bundle( { "p10": calibrated_p10, "p50": calibrated_p50, "p90": calibrated_p90, }, last_ohlc4, ) calibrated_bundle["mode"] = "timesfm_ohlc4_vol_calibrated" calibration_meta = { "enabled": True, "version": "volatility_regime_v1", "scale": round(scale, 4), "raw_path": raw_responsiveness, "calibrated_path": _forecast_path_responsiveness(calibrated_p50, last_ohlc4), "targets": target_profile, } return calibrated_bundle, calibration_meta def _apply_forecast_path_texture( base_bundle: Dict[str, Any], last_ohlc4: float, target_profile: Dict[str, Any], texture_template: Optional[Dict[str, Any]], ) -> Tuple[Dict[str, Any], Dict[str, Any]]: """Inject regime-matched step texture so each future point is updated sequentially.""" base_p10 = np.asarray(base_bundle.get("p10", []), dtype=float) base_p50 = np.asarray(base_bundle.get("p50", []), dtype=float) base_p90 = np.asarray(base_bundle.get("p90", []), dtype=float) base_path = _forecast_path_responsiveness(base_p50, last_ohlc4) texture_meta: Dict[str, Any] = { "enabled": True, "version": "historical_step_texture_v2", "applied": False, "blend_alpha": 0.0, "base_path": base_path, "textured_path": base_path, "template": { "match_count": 0, "signature_len": 0, "step_abs_mean_pct": 0.0, "range_pct": 0.0, }, } if texture_template is None: return base_bundle, texture_meta template_steps = np.asarray(texture_template.get("step_returns_pct", []), dtype=float) if len(template_steps) != len(base_p50): return base_bundle, texture_meta template_step = max(float(texture_template.get("step_abs_mean_pct") or 0.0), 1e-6) template_range = max(float(texture_template.get("range_pct") or 0.0), 1e-6) target_step = max(float(target_profile.get("target_step_abs_mean_pct") or 0.0), 0.0) target_range = max(float(target_profile.get("target_range_pct") or 0.0), 0.0) base_step = float(base_path["step_abs_mean_pct"]) base_range = float(base_path["range_pct"]) base_monotonicity = float(base_bundle.get("path_metrics", {}).get("monotonicity", 50.0)) step_gap = max((target_step * 0.94) - base_step, 0.0) range_gap = max((target_range * 0.78) - base_range, 0.0) if base_monotonicity >= 88.0: step_gap = max(step_gap, target_step * 0.16) range_gap = max(range_gap, target_range * 0.10) if step_gap <= 0.0 and range_gap <= 0.0: texture_meta["template"] = { "match_count": int(texture_template.get("match_count") or 0), "signature_len": int(texture_template.get("signature_len") or 0), "step_abs_mean_pct": round(float(texture_template.get("step_abs_mean_pct") or 0.0), 4), "range_pct": round(float(texture_template.get("range_pct") or 0.0), 4), } return base_bundle, texture_meta ratios: List[float] = [] if step_gap > 0.0: ratios.append(step_gap / template_step) if range_gap > 0.0: ratios.append(range_gap / template_range) blend_alpha = float(np.clip(np.median(ratios) if ratios else 0.0, 0.0, 0.82)) def _count_direction_flips(series: np.ndarray) -> int: if len(series) < 2: return 0 signs = [int(np.sign(delta)) for delta in series if abs(delta) >= 0.015] return sum(1 for prev, curr in zip(signs[:-1], signs[1:]) if prev != curr) template_flips = _count_direction_flips(template_steps) if base_monotonicity >= 92.0 and template_flips >= 1: blend_alpha = max(blend_alpha, 0.28) def _step_returns_from_path(path: np.ndarray, anchor_price: float) -> np.ndarray: prev_values = np.concatenate(([anchor_price], path[:-1])) return ((path / np.maximum(np.abs(prev_values), 1e-8)) - 1.0) * 100.0 def _rebuild_from_step_returns(path: np.ndarray, step_returns_pct: np.ndarray) -> np.ndarray: rebuilt: List[float] = [] prev_price = float(last_ohlc4) for raw_price, textured_step in zip(path, step_returns_pct): raw_step = ((float(raw_price) / max(abs(prev_price), 1e-8)) - 1.0) * 100.0 raw_factor = max(1e-6, 1.0 + (raw_step / 100.0)) target_factor = max(1e-6, 1.0 + (float(textured_step) / 100.0)) factor_ratio = target_factor / raw_factor next_price = max(0.0, float(raw_price) * factor_ratio) rebuilt.append(next_price) prev_price = next_price return np.asarray(rebuilt, dtype=float) base_steps_p50 = _step_returns_from_path(base_p50, last_ohlc4) textured_steps_p50 = (base_steps_p50 * (1.0 - blend_alpha)) + (template_steps * blend_alpha) if base_monotonicity >= 92.0 and template_flips >= 1: textured_flips = _count_direction_flips(textured_steps_p50) while textured_flips < 1 and blend_alpha < 0.92: blend_alpha = min(blend_alpha * 1.16, 0.92) textured_steps_p50 = (base_steps_p50 * (1.0 - blend_alpha)) + (template_steps * blend_alpha) textured_flips = _count_direction_flips(textured_steps_p50) textured_p10 = _rebuild_from_step_returns(base_p10, textured_steps_p50) textured_p50 = _rebuild_from_step_returns(base_p50, textured_steps_p50) textured_p90 = _rebuild_from_step_returns(base_p90, textured_steps_p50) textured_bundle = _build_raw_ohlc4_bundle( { "p10": textured_p10, "p50": textured_p50, "p90": textured_p90, }, last_ohlc4, ) textured_path = _forecast_path_responsiveness( np.asarray(textured_bundle.get("p50", []), dtype=float), last_ohlc4, ) if target_step > 0.0 and float(textured_path["step_abs_mean_pct"]) > target_step * 1.28: shrink = (target_step * 1.28) / max(float(textured_path["step_abs_mean_pct"]), 1e-6) blend_alpha *= shrink textured_steps_p50 = (base_steps_p50 * (1.0 - blend_alpha)) + (template_steps * blend_alpha) textured_p10 = _rebuild_from_step_returns(base_p10, textured_steps_p50) textured_p50 = _rebuild_from_step_returns(base_p50, textured_steps_p50) textured_p90 = _rebuild_from_step_returns(base_p90, textured_steps_p50) textured_bundle = _build_raw_ohlc4_bundle( { "p10": textured_p10, "p50": textured_p50, "p90": textured_p90, }, last_ohlc4, ) textured_path = _forecast_path_responsiveness( np.asarray(textured_bundle.get("p50", []), dtype=float), last_ohlc4, ) textured_bundle["mode"] = "timesfm_ohlc4_textured" texture_meta.update( { "applied": blend_alpha > 0.0, "blend_alpha": round(blend_alpha, 4), "textured_path": textured_path, "template": { "match_count": int(texture_template.get("match_count") or 0), "signature_len": int(texture_template.get("signature_len") or 0), "step_abs_mean_pct": round(float(texture_template.get("step_abs_mean_pct") or 0.0), 4), "range_pct": round(float(texture_template.get("range_pct") or 0.0), 4), }, } ) return textured_bundle, texture_meta def _score_forecast_context_candidate( analysis_bundle: Dict[str, Any], last_ohlc4: float, recent_abs_step_pct: float, ) -> Dict[str, float]: """Prefer contexts that stay confident without collapsing into a near-flat path.""" p50_path = np.asarray(analysis_bundle.get("p50", []), dtype=float) responsiveness = _forecast_path_responsiveness(p50_path, last_ohlc4) step_abs_mean_pct = float(responsiveness["step_abs_mean_pct"]) range_pct = float(responsiveness["range_pct"]) step_floor = max(0.015, recent_abs_step_pct * 0.18) range_floor = max(0.05, recent_abs_step_pct * 0.70) step_bonus = min(step_abs_mean_pct / step_floor, 1.0) * 14.0 range_bonus = min(range_pct / range_floor, 1.0) * 10.0 flat_penalty = ( 6.0 if step_abs_mean_pct < (step_floor * 0.55) and range_pct < (range_floor * 0.55) else 0.0 ) confidence = float(analysis_bundle.get("confidence", 0.0)) score = confidence + step_bonus + range_bonus - flat_penalty return { "score": round(score, 4), "confidence": round(confidence, 4), "step_floor": round(step_floor, 4), "range_floor": round(range_floor, 4), **responsiveness, } def _calc_vote_gauge(buy: int, sell: int, neutral: int) -> float: """Equal-weight PTKT vote gauge from Buy / Sell / Neutral counts.""" buy_f = float(max(0, buy)) sell_f = float(max(0, sell)) neutral_f = float(max(0, neutral)) denominator = buy_f + sell_f + neutral_f if denominator <= 0: return 50.0 raw_gauge = 50.0 + 50.0 * ((buy_f - sell_f) / denominator) return max(0.0, min(100.0, raw_gauge)) def _calc_adx_trend_activation(adx_value: Optional[float] = None) -> float: """Sigmoid ADX factor that activates PTKT votes only when trend strength is present.""" adx_numeric = float(adx_value) if adx_value is not None else 0.0 if math.isnan(adx_numeric): adx_numeric = 0.0 adx_numeric = max(0.0, adx_numeric) exponent = -((adx_numeric - 20.0) / 8.0) return float(_clamp(1.0 / (1.0 + math.exp(exponent)), 0.0, 1.0)) def _calc_vote_participation_factor(buy: int, sell: int, neutral: int) -> float: """Reward PTKT gauge only when enough signals are directional instead of neutral.""" buy_f = float(max(0, buy)) sell_f = float(max(0, sell)) neutral_f = float(max(0, neutral)) denominator = buy_f + sell_f + neutral_f if denominator <= 0: return 0.0 return float(_clamp((buy_f + sell_f) / denominator, 0.0, 1.0)) def _calc_adx_weighted_technical_gauge( buy: int, sell: int, neutral: int, adx_value: Optional[float] = None, ) -> float: """Final PTKT gauge using the sigmoid ADX-weighted vote formula.""" buy_f = float(max(0, buy)) sell_f = float(max(0, sell)) neutral_f = float(max(0, neutral)) denominator = buy_f + sell_f + neutral_f if denominator <= 0: return 50.0 trend_activation = _calc_adx_trend_activation(adx_value) participation_factor = _calc_vote_participation_factor(buy, sell, neutral) vote_balance = (buy_f - sell_f) / denominator raw_gauge = 50.0 + (50.0 * vote_balance * participation_factor * trend_activation) return float(_clamp(raw_gauge, 0.0, 100.0)) def _vote_direction_from_counts(buy: int, sell: int) -> int: if buy > sell: return 1 if sell > buy: return -1 return 0 def _candle_ohlc4(candle: Dict[str, Any]) -> Optional[float]: try: open_value = float(candle["open"]) high_value = float(candle["high"]) low_value = float(candle["low"]) close_value = float(candle["close"]) except (KeyError, TypeError, ValueError): return None values = (open_value, high_value, low_value, close_value) if not all(math.isfinite(value) for value in values): return None return sum(values) / 4.0 def _performance_action(change_pct: Optional[float], *, included_in_vote: bool) -> str: if not included_in_vote or change_pct is None: return "N/A" if change_pct > PERFORMANCE_NEUTRAL_EPSILON_PCT: return "Mua" if change_pct < -PERFORMANCE_NEUTRAL_EPSILON_PCT: return "Bán" return "Trung lập" def _get_performance_vote_weight(lookback: Any) -> float: try: return float(PERFORMANCE_VOTE_WEIGHTS.get(int(lookback), 1.0)) except (TypeError, ValueError): return 1.0 def _build_performance_rows(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: latest_ohlc4 = _candle_ohlc4(data[-1]) if data else None rows: List[Dict[str, Any]] = [] for lookback in PERFORMANCE_LOOKBACKS: reference_index = len(data) - 1 - lookback reference_ohlc4 = _candle_ohlc4(data[reference_index]) if reference_index >= 0 else None included_in_vote = ( latest_ohlc4 is not None and reference_ohlc4 is not None and abs(reference_ohlc4) > 1e-12 ) change_pct = ( ((latest_ohlc4 / reference_ohlc4) - 1.0) * 100.0 if included_in_vote and latest_ohlc4 is not None and reference_ohlc4 is not None else None ) rows.append( { "key": f"performance_{lookback}", "name": f"Hiệu suất {lookback} nến", "lookback": lookback, "current_ohlc4": latest_ohlc4, "reference_ohlc4": reference_ohlc4, "change_pct": change_pct, "display_value": ( f"{float(latest_ohlc4):.4f} vs {float(reference_ohlc4):.4f} ({float(change_pct):+.2f}%)" if included_in_vote and change_pct is not None else "N/A" ), "action": _performance_action(change_pct, included_in_vote=included_in_vote), "included_in_vote": included_in_vote, } ) return rows def _calc_osc_score(osc_data: list, interval: str = "1h") -> dict: """Summarize oscillator votes for the PTKT classification table.""" buy = sell = neutral = 0 buy_weight = sell_weight = neutral_weight = 0.0 for item in osc_data: key = item.get("key") or _extract_osc_key(item["name"]) w = OSC_WEIGHTS.get(key, 1.0) act = str(item.get("action", "")).strip().lower() if act == "mua": buy += 1 buy_weight += w elif act == "bán": sell += 1 sell_weight += w else: neutral += 1 neutral_weight += w return { "signal": _gauge_to_signal(_calc_vote_gauge(buy, sell, neutral), interval), "buy": buy, "sell": sell, "neutral": neutral, "buy_weight": round(buy_weight, 2), "sell_weight": round(sell_weight, 2), "neutral_weight": round(neutral_weight, 2), "formula_version": TECHNICAL_SCORING_VERSION, } def _calc_ma_score(ma_data: list, closes: np.ndarray, interval: str = "1h") -> dict: """Summarize moving-average votes for the PTKT classification table.""" buy = sell = neutral = 0 buy_weight = sell_weight = neutral_weight = 0.0 for item in ma_data: w = _get_ma_weight(item.get("key") or item["name"]) act = str(item.get("action", "")).strip().lower() if act == "mua": buy += 1 buy_weight += w elif act == "bán": sell += 1 sell_weight += w else: neutral += 1 neutral_weight += w ema50 = _ema(closes, 50)[-1] if len(closes) >= 50 else float("nan") ema200 = _ema(closes, 200)[-1] if len(closes) >= 200 else float("nan") return { "signal": _gauge_to_signal(_calc_vote_gauge(buy, sell, neutral), interval), "buy": buy, "sell": sell, "neutral": neutral, "buy_weight": round(buy_weight, 2), "sell_weight": round(sell_weight, 2), "neutral_weight": round(neutral_weight, 2), "structure_bonus": 0.0, "golden_cross": bool(not any(math.isnan(x) for x in (ema50, ema200)) and ema50 > ema200), "death_cross": bool(not any(math.isnan(x) for x in (ema50, ema200)) and ema50 < ema200), "formula_version": TECHNICAL_SCORING_VERSION, } def _calc_performance_score(performance_data: list, interval: str = "1h") -> dict: """Summarize OHLC4 performance votes across configured lookback candles.""" buy = sell = neutral = 0 buy_weight = sell_weight = neutral_weight = 0.0 available = missing = 0 for item in performance_data: if not bool(item.get("included_in_vote")): missing += 1 continue available += 1 vote_weight = _get_performance_vote_weight(item.get("lookback")) act = str(item.get("action", "")).strip().lower() if act == "mua": buy += int(vote_weight) buy_weight += vote_weight elif act == "bán": sell += int(vote_weight) sell_weight += vote_weight else: neutral += int(vote_weight) neutral_weight += vote_weight return { "signal": _gauge_to_signal(_calc_vote_gauge(buy, sell, neutral), interval), "buy": buy, "sell": sell, "neutral": neutral, "buy_weight": round(buy_weight, 2), "sell_weight": round(sell_weight, 2), "neutral_weight": round(neutral_weight, 2), "available": available, "missing": missing, "lookbacks": list(PERFORMANCE_LOOKBACKS), "formula_version": TECHNICAL_SCORING_VERSION, } def _calc_ai_forecast_score( blended: dict, forecast_rows: List[Dict[str, Any]], last_close: float, indicators: dict, horizon: int, interval: str, model_reference_price: Optional[float] = None, ) -> dict: """MODULE 3: Score one AI forecast from the full path and band discipline.""" atr_pct = max(float(indicators.get("atr", {}).get("pct") or 0.0), 0.1) p50_path = np.array(blended.get("p50", []), dtype=float) p10_path = np.array(blended.get("p10", []), dtype=float) p90_path = np.array(blended.get("p90", []), dtype=float) if not len(p50_path): p50_path = np.array([last_close], dtype=float) if not len(p10_path): p10_path = np.array([last_close], dtype=float) if not len(p90_path): p90_path = np.array([last_close], dtype=float) series_reference_price = float(model_reference_price or last_close) path_metrics = _forecast_path_metrics(p50_path, series_reference_price) final_ret_pct = path_metrics["final_return_pct"] weighted_ret_pct = path_metrics["weighted_return_pct"] market_return_pct = _pct(float(p50_path[-1]), last_close) if len(p50_path) else 0.0 path_consistency = _clamp(path_metrics["path_consistency"] / 100.0, 0.0, 1.0) monotonicity = _clamp(path_metrics["monotonicity"] / 100.0, 0.0, 1.0) adverse_excursion_pct = path_metrics["max_adverse_excursion_pct"] band_profile = _forecast_band_profile(p10_path, p50_path, p90_path) direction_vote_threshold = max(atr_pct * 0.04, 0.04) def _direction_vote(value_pct: float) -> int: if abs(value_pct) < direction_vote_threshold: return 0 return 1 if value_pct > 0 else -1 direction_votes = [ _direction_vote(weighted_ret_pct), _direction_vote(final_ret_pct), _direction_vote(market_return_pct), ] bullish_votes = sum(1 for vote in direction_votes if vote > 0) bearish_votes = sum(1 for vote in direction_votes if vote < 0) direction_vote_balance = ( (bullish_votes - bearish_votes) / max(1, len(direction_votes)) ) direction_bias_sign = 0.0 if direction_vote_balance > 0: direction_bias_sign = 1.0 elif direction_vote_balance < 0: direction_bias_sign = -1.0 elif weighted_ret_pct > 0: direction_bias_sign = 1.0 elif weighted_ret_pct < 0: direction_bias_sign = -1.0 avg_band_tightness = math.exp( -float(band_profile["avg_band_pct"]) / max(atr_pct * 3.0, 0.75) ) end_band_tightness = math.exp( -float(band_profile["end_band_pct"]) / max(atr_pct * 3.4, 0.85) ) band_stability_score = math.exp( -( (float(band_profile["band_stability_pct"]) / max(atr_pct * 1.8, 0.35)) + (float(band_profile["band_step_change_pct"]) / max(atr_pct * 1.2, 0.25)) ) * 0.75 ) adverse_control_score = math.exp( -adverse_excursion_pct / max(atr_pct * 2.5, 0.8) ) certainty_score = ( avg_band_tightness * 0.38 + end_band_tightness * 0.16 + band_stability_score * 0.16 + path_consistency * 0.15 + monotonicity * 0.10 + adverse_control_score * 0.05 ) certainty_score = _clamp(certainty_score, 0.08, 0.98) directional_edge_pct = (weighted_ret_pct * 0.72) + (final_ret_pct * 0.28) direction_norm = np.tanh(directional_edge_pct / max(atr_pct * 1.20, 0.28)) weighted_move_in_atr = abs(weighted_ret_pct) / max(atr_pct, 0.1) final_move_in_atr = abs(final_ret_pct) / max(atr_pct, 0.1) path_move_score = np.tanh(((weighted_move_in_atr * 0.65) + (final_move_in_atr * 0.35)) / 1.7) path_quality = 0.48 + 0.30 * path_consistency + 0.22 * monotonicity certainty_factor = 0.72 + 0.28 * certainty_score direction_consensus = _clamp( (abs(direction_vote_balance) * 0.42) + (path_consistency * 0.36) + (monotonicity * 0.22), 0.0, 1.0, ) direction_core_push = ( direction_norm * (18.0 + 16.0 * path_move_score) * (0.60 + 0.40 * path_quality) ) micro_consensus_push = 0.0 if direction_bias_sign != 0.0 and abs(direction_norm) < 0.18 and abs(direction_vote_balance) >= 0.66: remaining_room = (0.18 - abs(direction_norm)) / 0.18 micro_consensus_push = ( direction_bias_sign * (2.4 + 5.6 * direction_consensus) * remaining_room * (0.44 + 0.56 * certainty_score) ) direction_gauge = max( 8.0, min(92.0, 50.0 + direction_core_push + micro_consensus_push), ) directional_push = (direction_gauge - 50.0) * certainty_factor conviction_gauge = max(8.0, min(92.0, 50.0 + directional_push)) gauge = direction_gauge confidence_pct = _clamp((certainty_score * 0.65 + path_quality * 0.35) * 100.0, 12.0, 98.0) direction_label = "bullish" if gauge >= 58 else "bearish" if gauge <= 42 else "neutral" return { "gauge": round(gauge, 1), "direction_gauge": round(direction_gauge, 1), "conviction_gauge": round(conviction_gauge, 1), "normalized_score": round(_gauge_to_normalized_score(gauge), 4), "confidence_pct": round(confidence_pct, 1), "forecast_return_pct": round(market_return_pct, 2), "model_reference_return_pct": round(final_ret_pct, 2), "weighted_return_pct": round(weighted_ret_pct, 2), "direction": direction_label, "direction_consensus": round(direction_consensus * 100.0, 1), "direction_strength_pct": round(abs(direction_gauge - 50.0) * 2.0, 1), "magnitude_vs_atr": round(weighted_move_in_atr, 2), "band_uncertainty_pct": round(float(band_profile["avg_band_pct"]), 2), "band_uncertainty_end_pct": round(float(band_profile["end_band_pct"]), 2), "band_stability_pct": round(float(band_profile["band_stability_pct"]), 2), "certainty": round(certainty_score * 100.0, 1), "path_consistency": round(path_consistency * 100.0, 1), "monotonicity": round(monotonicity * 100.0, 1), "max_adverse_excursion_pct": round(adverse_excursion_pct, 2), "path_metrics": path_metrics, "signal": _gauge_to_signal(gauge, interval), "formula": "full_path_direction_gauge", "formula_version": AI_SCORING_VERSION, } def _calc_technical_score_v2( osc_score: dict, ma_score: dict, performance_score: dict, interval: str = "1h", adx_value: Optional[float] = None, ) -> dict: """Gauge PTKT follows the sigmoid ADX-weighted vote formula on total technical votes.""" buy = osc_score["buy"] + ma_score["buy"] + performance_score["buy"] sell = osc_score["sell"] + ma_score["sell"] + performance_score["sell"] neutral = osc_score["neutral"] + ma_score["neutral"] + performance_score["neutral"] adx_numeric = float(adx_value) if adx_value is not None else 0.0 if math.isnan(adx_numeric): adx_numeric = 0.0 adx_numeric = max(0.0, adx_numeric) trend_activation = _calc_adx_trend_activation(adx_numeric) vote_participation = _calc_vote_participation_factor(buy, sell, neutral) final_gauge = _calc_adx_weighted_technical_gauge(buy, sell, neutral, adx_numeric) osc_dir = _vote_direction_from_counts(int(osc_score["buy"]), int(osc_score["sell"])) ma_dir = _vote_direction_from_counts(int(ma_score["buy"]), int(ma_score["sell"])) performance_dir = _vote_direction_from_counts( int(performance_score["buy"]), int(performance_score["sell"]), ) active_group_directions = [ direction for direction in (osc_dir, ma_dir, performance_dir) if direction != 0 ] return { "gauge": round(final_gauge, 1), "normalized_score": round(_gauge_to_normalized_score(final_gauge), 4), "signal": _gauge_to_signal(final_gauge, interval), "buy": buy, "sell": sell, "neutral": neutral, "buy_weight": round( osc_score.get("buy_weight", 0.0) + ma_score.get("buy_weight", 0.0) + performance_score.get("buy_weight", 0.0), 2, ), "sell_weight": round( osc_score.get("sell_weight", 0.0) + ma_score.get("sell_weight", 0.0) + performance_score.get("sell_weight", 0.0), 2, ), "neutral_weight": round( osc_score.get("neutral_weight", 0.0) + ma_score.get("neutral_weight", 0.0) + performance_score.get("neutral_weight", 0.0), 2, ), "adx_period": ADX_GAUGE_PERIOD, "trend_strength_adx": round(adx_numeric, 2), "trend_activation": round(trend_activation, 4), "trend_multiplier": round(trend_activation, 4), "vote_participation": round(vote_participation, 4), "alignment": len(active_group_directions) >= 2 and len(set(active_group_directions)) == 1, "performance_available": int(performance_score.get("available", 0)), "performance_missing": int(performance_score.get("missing", 0)), "components": { "formula": "max(0, min(100, 50 + 50 * ((buy - sell) / total_votes) * ((buy + sell) / total_votes) * (1 / (1 + exp(-(adx10 - 20) / 8)))))", "formula_version": TECHNICAL_SCORING_VERSION, "categories": { "oscillators": osc_score.get("signal", "Trung lập"), "moving_averages": ma_score.get("signal", "Trung lập"), }, } } def _calc_technical_score_v2( osc_score: dict, ma_score: dict, performance_score: dict, interval: str = "1h", adx_value: Optional[float] = None, ) -> dict: """Gauge PTKT follows the sigmoid ADX-weighted vote formula on total technical votes.""" buy = osc_score["buy"] + ma_score["buy"] + performance_score["buy"] sell = osc_score["sell"] + ma_score["sell"] + performance_score["sell"] neutral = osc_score["neutral"] + ma_score["neutral"] + performance_score["neutral"] adx_numeric = float(adx_value) if adx_value is not None else 0.0 if math.isnan(adx_numeric): adx_numeric = 0.0 adx_numeric = max(0.0, adx_numeric) trend_activation = _calc_adx_trend_activation(adx_numeric) vote_participation = _calc_vote_participation_factor(buy, sell, neutral) final_gauge = _calc_adx_weighted_technical_gauge(buy, sell, neutral, adx_numeric) osc_dir = _vote_direction_from_counts(int(osc_score["buy"]), int(osc_score["sell"])) ma_dir = _vote_direction_from_counts(int(ma_score["buy"]), int(ma_score["sell"])) performance_dir = _vote_direction_from_counts( int(performance_score["buy"]), int(performance_score["sell"]), ) active_group_directions = [ direction for direction in (osc_dir, ma_dir, performance_dir) if direction != 0 ] return { "gauge": round(final_gauge, 1), "normalized_score": round(_gauge_to_normalized_score(final_gauge), 4), "signal": _gauge_to_signal(final_gauge, interval), "buy": buy, "sell": sell, "neutral": neutral, "buy_weight": round( osc_score.get("buy_weight", 0.0) + ma_score.get("buy_weight", 0.0) + performance_score.get("buy_weight", 0.0), 2, ), "sell_weight": round( osc_score.get("sell_weight", 0.0) + ma_score.get("sell_weight", 0.0) + performance_score.get("sell_weight", 0.0), 2, ), "neutral_weight": round( osc_score.get("neutral_weight", 0.0) + ma_score.get("neutral_weight", 0.0) + performance_score.get("neutral_weight", 0.0), 2, ), "adx_period": ADX_GAUGE_PERIOD, "trend_strength_adx": round(adx_numeric, 2), "trend_activation": round(trend_activation, 4), "trend_multiplier": round(trend_activation, 4), "vote_participation": round(vote_participation, 4), "alignment": len(active_group_directions) >= 2 and len(set(active_group_directions)) == 1, "performance_available": int(performance_score.get("available", 0)), "performance_missing": int(performance_score.get("missing", 0)), "components": { "formula": "max(0, min(100, 50 + 50 * ((buy - sell) / total_votes) * ((buy + sell) / total_votes) * (1 / (1 + exp(-(adx10 - 20) / 8)))))", "formula_version": TECHNICAL_SCORING_VERSION, "categories": { "oscillators": osc_score.get("signal", "Trung lập"), "moving_averages": ma_score.get("signal", "Trung lập"), "performance": performance_score.get("signal", "Trung lập"), }, } } def _calc_summary_score_v2(tech_score: dict, ai_score: dict, interval: str = "1h") -> dict: """Final decision score = arithmetic mean of technical and AI gauges.""" tech_gauge = float(tech_score["gauge"]) ai_gauge = float(ai_score["gauge"]) tech_weight = 0.50 ai_weight = 0.50 final_gauge = max(5.0, min(95.0, (ai_gauge * ai_weight) + (tech_gauge * tech_weight))) tech_delta = tech_gauge - 50.0 ai_delta = ai_gauge - 50.0 tech_dir = 0 if abs(tech_delta) < 2.0 else (1 if tech_delta > 0 else -1) ai_dir = 0 if abs(ai_delta) < 2.0 else (1 if ai_delta > 0 else -1) dist = abs(final_gauge - 50.0) conviction = "R?t m?nh" if dist >= 25 else "M?nh" if dist >= 15 else "Trung b?nh" if dist >= 8 else "Y?u" bias = "neutral" if final_gauge >= 58: bias = "bullish" elif final_gauge <= 42: bias = "bearish" return { "gauge": round(final_gauge, 1), "normalized_score": round(_gauge_to_normalized_score(final_gauge), 4), "signal": _gauge_to_signal(final_gauge, interval), "conviction": conviction, "bias": bias, "agreement": tech_dir != 0 and tech_dir == ai_dir, "buy": tech_score["buy"], "sell": tech_score["sell"], "neutral": tech_score["neutral"], "buy_weight": tech_score.get("buy_weight", 0.0), "sell_weight": tech_score.get("sell_weight", 0.0), "neutral_weight": tech_score.get("neutral_weight", 0.0), "components": { "technical": round(tech_gauge, 1), "ai_forecast": round(ai_gauge, 1), "ai_weight": round(ai_weight, 2), "technical_weight": round(tech_weight, 2), "certainty_pct": round(float(ai_score.get("certainty", 0.0)), 1), "formula": "(technical_gauge + ai_gauge) / 2", "formula_version": AI_SCORING_VERSION, } } def _build_dashboard_payload( last_close: float, forecast_rows: List[Dict[str, Any]], technical_score: Dict[str, Any], ai_score: Dict[str, Any], summary: Dict[str, Any], ) -> Dict[str, Any]: """Single source of truth for hero gauges consumed by the frontend.""" forecast_end = last_close if forecast_rows: forecast_end = float(forecast_rows[-1].get("p50") or last_close) return { "technical": { "gauge": technical_score["gauge"], "normalized_score": technical_score["normalized_score"], "signal": technical_score["signal"], "buy": technical_score["buy"], "sell": technical_score["sell"], "neutral": technical_score["neutral"], "buy_weight": technical_score.get("buy_weight", 0.0), "sell_weight": technical_score.get("sell_weight", 0.0), "neutral_weight": technical_score.get("neutral_weight", 0.0), }, "ai": { "gauge": ai_score["gauge"], "direction_gauge": ai_score.get("direction_gauge", ai_score["gauge"]), "conviction_gauge": ai_score.get("conviction_gauge", ai_score["gauge"]), "normalized_score": ai_score["normalized_score"], "signal": ai_score["signal"], "forecast_return_pct": ai_score["forecast_return_pct"], "weighted_return_pct": ai_score["weighted_return_pct"], "confidence_pct": ai_score["confidence_pct"], "certainty": ai_score["certainty"], "direction_consensus": ai_score.get("direction_consensus", 0.0), "direction_strength_pct": ai_score.get("direction_strength_pct", 0.0), "path_consistency": ai_score["path_consistency"], "monotonicity": ai_score.get("monotonicity", 50.0), "max_adverse_excursion_pct": ai_score.get("max_adverse_excursion_pct", 0.0), "current_price": round(float(last_close), 6), "forecast_price": round(float(forecast_end), 6), }, "summary": { "gauge": summary["gauge"], "normalized_score": summary["normalized_score"], "signal": summary["signal"], "conviction": summary["conviction"], "bias": summary["bias"], "agreement": summary.get("agreement", False), "buy_weight": summary.get("buy_weight", 0.0), "sell_weight": summary.get("sell_weight", 0.0), "neutral_weight": summary.get("neutral_weight", 0.0), "components": summary.get("components", {}), }, } def _normalize_presentation_card_payload( card: Optional[Dict[str, Any]], *, prefer_direction_gauge: bool = False, ) -> Dict[str, Any]: source = dict(card or {}) gauge_candidates: List[Any] = [] if prefer_direction_gauge: gauge_candidates.append(source.get("direction_gauge")) gauge_candidates.append(source.get("gauge")) gauge_value = 50.0 for candidate in gauge_candidates: try: numeric = float(candidate) except (TypeError, ValueError): continue if math.isfinite(numeric): gauge_value = numeric break source["gauge"] = round(float(gauge_value), 1) signal = source.get("signal") if not isinstance(signal, str) or not signal.strip(): source["signal"] = "--" return source def _build_ai_aggregation_message( requested_count: int, ready_count: int, failed_model_keys: List[str], complete: bool, *, ai_available: bool, summary_available: bool, ) -> str: if requested_count <= 0: if ai_available and summary_available: return "AI forecast san sang" return "Can chay du bao AI de tong hop" if failed_model_keys: failed_labels = ", ".join(str(model_key) for model_key in failed_model_keys) return f"Dang tong hop {ready_count}/{requested_count} model - loi: {failed_labels}" if not complete: return f"Dang tong hop {ready_count}/{requested_count} model AI" if requested_count == 1 and ai_available and summary_available: return "AI forecast san sang" return f"Trung binh cong {requested_count} model AI" def _build_analysis_presentation( technical_score: Optional[Dict[str, Any]], ai_score: Optional[Dict[str, Any]], summary: Optional[Dict[str, Any]], *, requested_models: Optional[Dict[str, bool]] = None, active_model_keys: Optional[List[str]] = None, failed_model_keys: Optional[List[str]] = None, pending_model_keys: Optional[List[str]] = None, message: Optional[str] = None, ) -> Dict[str, Any]: technical_card = _normalize_presentation_card_payload(technical_score) ai_card = _normalize_presentation_card_payload( ai_score, prefer_direction_gauge=True, ) summary_card = _normalize_presentation_card_payload(summary) requested_payload = ( {model_key: bool(enabled) for model_key, enabled in requested_models.items()} if isinstance(requested_models, dict) else {} ) enabled_keys = [ model_key for model_key, enabled in requested_payload.items() if enabled ] ready_keys = list(active_model_keys or []) failed_keys = list(failed_model_keys or []) if pending_model_keys is None: pending_keys = [ model_key for model_key in enabled_keys if model_key not in ready_keys and model_key not in failed_keys ] else: pending_keys = list(pending_model_keys) ai_available = isinstance(ai_score, dict) and bool(ai_score) summary_available = isinstance(summary, dict) and bool(summary) if enabled_keys: requested_count = len(enabled_keys) elif ai_available or summary_available: requested_count = max(1, len(ready_keys) + len(failed_keys) + len(pending_keys)) else: requested_count = 0 ready_count = len(ready_keys) if ready_keys else (1 if ai_available and summary_available else 0) finalized = not pending_keys complete = bool( ai_available and summary_available and not failed_keys and not pending_keys and ( requested_count == 0 or ready_count >= requested_count ) ) commit_ready = bool(finalized and ready_count > 0) ai_ready = commit_ready and ai_available summary_ready = commit_ready and summary_available combo_active = bool( ai_ready and summary_ready and ( summary_card.get("agreement") or ( abs(float(technical_card.get("gauge", 50.0)) - 50.0) > 6.0 and abs(float(ai_card.get("gauge", 50.0)) - 50.0) > 6.0 and math.copysign(1.0, float(technical_card.get("gauge", 50.0)) - 50.0) == math.copysign(1.0, float(ai_card.get("gauge", 50.0)) - 50.0) ) ) ) message_text = message or _build_ai_aggregation_message( requested_count, ready_count, failed_keys, complete, ai_available=ai_available, summary_available=summary_available, ) if ai_ready: ai_state = "ready" elif failed_keys and ready_count <= 0: ai_state = "error" elif requested_count > 0: ai_state = "loading" else: ai_state = "idle" if summary_ready: summary_state = "ready" elif failed_keys and ready_count <= 0: summary_state = "error" elif requested_count > 0: summary_state = "loading" else: summary_state = "idle" return { "version": ANALYSIS_PRESENTATION_VERSION, "technical": technical_card, "ai": ai_card, "summary": summary_card, "aggregation": { "requested": requested_payload, "enabled_keys": enabled_keys, "ready_keys": ready_keys, "failed_keys": failed_keys, "pending_keys": pending_keys, "complete": complete, "finalized": finalized, "commit_ready": commit_ready, "ready_count": ready_count, "requested_count": requested_count, }, "card_states": { "technical": "ready" if technical_score else "idle", "ai": ai_state, "summary": summary_state, }, "ai_ready": ai_ready, "summary_ready": summary_ready, "combo_active": combo_active, "message": message_text, } def _build_trade_analysis( symbol: str, interval: str, data: List[Dict[str, Any]], indicators: Dict[str, Any], forecast_rows: List[Dict[str, Any]], confidence: float, source: str, blended: Optional[Dict[str, Any]] = None, forecast_reference_price: Optional[float] = None, include_ai: bool = True, active_model_keys: Optional[List[str]] = None, failed_model_keys: Optional[List[str]] = None, requested_models: Optional[Dict[str, bool]] = None, ) -> Dict[str, Any]: """ TradingView-style technical analysis dashboard (v6.1 Rework). PTKT is one vote-only gauge across oscillator + performance + moving-average rows, while AI keeps full-path scoring. """ if not data or len(data) < 30: return { "oscillators": { "signal": "Trung lập", "buy": 0, "sell": 0, "neutral": 0, "buy_weight": 0.0, "sell_weight": 0.0, "neutral_weight": 0.0, "data": [], }, "performance": { "signal": "Trung lập", "buy": 0, "sell": 0, "neutral": 0, "buy_weight": 0.0, "sell_weight": 0.0, "neutral_weight": 0.0, "available": 0, "missing": len(PERFORMANCE_LOOKBACKS), "lookbacks": list(PERFORMANCE_LOOKBACKS), "data": [], }, "moving_averages": { "signal": "Trung lập", "buy": 0, "sell": 0, "neutral": 0, "buy_weight": 0.0, "sell_weight": 0.0, "neutral_weight": 0.0, "golden_cross": False, "death_cross": False, "data": [], }, "summary": {"signal": "Trung lập", "buy": 0, "sell": 0, "neutral": 0}, } closes = np.array([float(d["close"]) for d in data], dtype=float) highs = np.array([float(d["high"]) for d in data], dtype=float) lows = np.array([float(d["low"]) for d in data], dtype=float) last_close = closes[-1] model_reference_price = float(forecast_reference_price or last_close) def _lv(arr): return _latest_finite_value(arr) def _fmt(value: Optional[float]) -> str: return _format_analysis_value(value) ema50_arr = _ema(closes, 50) ema50_current = _lv(ema50_arr) ema200_current = _lv(indicators["ema"].get("ema200")) ema50_prev = ( float(ema50_arr[-6]) if len(ema50_arr) >= 6 and not math.isnan(float(ema50_arr[-6])) else (ema50_current or last_close) ) regime, regime_label = _classify_regime(indicators, last_close, _safe(indicators.get("atr", {}).get("pct"), 1.0)) # ── 1. Oscillators ── osc_data = [] atr_scale = max(float(indicators.get("atr", {}).get("value") or last_close * 0.01), max(last_close * 0.0015, 1e-6)) def _add_osc(label, val, action_name, key=None, **kw): if isinstance(val, (np.ndarray, pd.Series, list)): v = _lv(val) else: v = float(val) if val is not None else None osc_key = key or action_name osc_kwargs = { "last_close": last_close, "ema50": ema50_current or last_close, "ema200": ema200_current or last_close, "ema50_prev": ema50_prev, "atr": atr_scale, **kw, } act = _osc_action(action_name, v if v is not None else 0, **osc_kwargs) osc_data.append({ "key": osc_key, "name": label, "value": v, "display_value": _fmt(v), "action": act, }) stoch_price_k, stoch_price_d = _stoch_kd(highs, lows, closes, 14, 3, 3) stoch_rsi_k, stoch_rsi_d = _stoch_rsi(closes, 14, 14, 3, 3) demarker14 = _demarker(highs, lows, 14) _add_osc("Chỉ số Sức mạnh tương đối (14)", _rsi(closes, 14), "rsi", key="rsi") _add_osc("Stochastic %K (14, 3, 3)", stoch_price_k, "stoch", key="stoch") _add_osc("Chỉ số Kênh hàng hóa (20)", _cci(highs, lows, closes, 20), "cci", key="cci") adx_vals = _adx(highs, lows, closes, ADX_GAUGE_PERIOD) _add_osc( f"Chỉ số Định hướng Trung bình ({ADX_GAUGE_PERIOD})", adx_vals[0], "adx", key="adx", plus_di=_lv(adx_vals[1]) or 0, minus_di=_lv(adx_vals[2]) or 0, ) ao_vals = _awesome_oscillator(highs, lows) _add_osc("Chỉ số Dao động AO", ao_vals, "ao", key="ao", epsilon=atr_scale * 0.02) _add_osc("Xung lượng (10)", _momentum(closes, 10), "momentum", key="momentum", scale=atr_scale * 1.15) macd_vals = _macd(closes, 12, 26, 9) _add_osc("Cấp độ MACD (12, 26)", macd_vals[0], "macd", key="macd", epsilon=atr_scale * 0.02) _add_osc("Đường RSI Nhanh (3, 3, 14, 14)", stoch_rsi_k, "stoch_rsi", key="stoch_rsi") _add_osc("Vùng Phần trăm Williams (14)", _williams_r(highs, lows, closes, 14), "williams", key="williams") bull_power_vals, bear_power_vals = _bull_bear_power(highs, lows, closes, 13) bull_power = _lv(bull_power_vals) bear_power = _lv(bear_power_vals) osc_data.append( { "key": "bbp", "name": "Sức Mạnh Giá Lên và Giá Xuống", "value": {"bull": bull_power, "bear": bear_power}, "display_value": f"{_fmt(bull_power)} / {_fmt(bear_power)}", "action": _osc_action( "bbp", bull_power if bull_power is not None else 0.0, atr=atr_scale, bull_power=bull_power, bear_power=bear_power, epsilon=atr_scale * 0.02, ), } ) _add_osc("Dao động Ultimate (7, 14, 28)", _ultimate_oscillator(highs, lows, closes, 7, 14, 28), "ultimate", key="ultimate") _add_osc("Tốc độ biến động ROC (12)", _roc(closes, 12), "roc", key="roc") _add_osc("TRIX (18)", _trix(closes, 18), "trix", key="trix") _add_osc("PPO (12, 26)", _ppo(closes, 12, 26), "ppo", key="ppo") _add_osc("CMO (14)", _cmo(closes, 14), "cmo", key="cmo") _add_osc("DPO (20)", _dpo(closes, 20), "dpo", key="dpo", scale=atr_scale * 0.75) _add_osc("Aroon Oscillator (25)", _aroon_oscillator(highs, lows, 25), "aroon", key="aroon") _add_osc("TSI (25, 13)", _tsi(closes, 25, 13), "tsi", key="tsi") _add_osc("DeMarker (14)", demarker14, "demarker", key="demarker") osc_score = _calc_osc_score(osc_data, interval) # ── 2. Moving Averages ── ma_data = [] ema_periods = sorted({period for pair in EMA_PAIR_SEQUENCE for period in pair}) ema_map: Dict[int, np.ndarray] = { period: closes.copy() if period == 1 else _ema(closes, period) for period in ema_periods } def _add_ema_pair_row(fast_period: int, slow_period: int) -> None: fast_arr = ema_map[fast_period] slow_arr = ema_map[slow_period] fast_value = _lv(fast_arr) slow_value = _lv(slow_arr) pair_key = f"ema_{fast_period}_{slow_period}" act = _ema_pair_action(fast_value, slow_value, pair_key) ma_data.append( { "key": pair_key, "name": f"EMA{fast_period} / EMA{slow_period}", "display_value": f"{_fmt(fast_value)} / {_fmt(slow_value)}", "value": f"{fast_value if fast_value is not None else '—'} / {slow_value if slow_value is not None else '—'}", "action": act, "fast": fast_value, "slow": slow_value, } ) for fast_period, slow_period in EMA_PAIR_SEQUENCE: _add_ema_pair_row(fast_period, slow_period) ichimoku_base = _latest_finite_value(_ichimoku_base(highs, lows, 26)) ma_data.append( { "key": "ichimoku_base_26", "name": "Giá / Ichimoku Base (26)", "display_value": f"{_fmt(last_close)} / {_fmt(ichimoku_base)}", "value": f"{last_close if last_close is not None else '—'} / {ichimoku_base if ichimoku_base is not None else '—'}", "action": _price_vs_level_action(last_close, ichimoku_base, neutral_band_pct=0.08), "fast": last_close, "slow": ichimoku_base, } ) ma_score = _calc_ma_score(ma_data, closes, interval) performance_data = _build_performance_rows(data) performance_score = _calc_performance_score(performance_data, interval) technical_score = _calc_technical_score_v2( osc_score, ma_score, performance_score, interval, adx_value=_lv(adx_vals[0]), ) support_resistance = _calc_price_action_levels(data, last_close, interval) if not include_ai: presentation = _build_analysis_presentation( technical_score, None, None, message="Can chay du bao AI de tong hop", ) return { "style": "tradingview", "regime": {"key": regime, "label": regime_label}, "dashboard": { "technical": { "gauge": technical_score["gauge"], "normalized_score": technical_score["normalized_score"], "signal": technical_score["signal"], "buy": technical_score["buy"], "sell": technical_score["sell"], "neutral": technical_score["neutral"], "buy_weight": technical_score.get("buy_weight", 0.0), "sell_weight": technical_score.get("sell_weight", 0.0), "neutral_weight": technical_score.get("neutral_weight", 0.0), }, "ai": None, "summary": None, }, "summary": None, "presentation": presentation, "technicals": technical_score, "oscillators": { "signal": osc_score["signal"], "buy": osc_score["buy"], "sell": osc_score["sell"], "neutral": osc_score["neutral"], "buy_weight": osc_score.get("buy_weight", 0.0), "sell_weight": osc_score.get("sell_weight", 0.0), "neutral_weight": osc_score.get("neutral_weight", 0.0), "data": osc_data }, "performance": { "signal": performance_score["signal"], "buy": performance_score["buy"], "sell": performance_score["sell"], "neutral": performance_score["neutral"], "buy_weight": performance_score.get("buy_weight", 0.0), "sell_weight": performance_score.get("sell_weight", 0.0), "neutral_weight": performance_score.get("neutral_weight", 0.0), "available": performance_score.get("available", 0), "missing": performance_score.get("missing", 0), "lookbacks": performance_score.get("lookbacks", list(PERFORMANCE_LOOKBACKS)), "data": performance_data, }, "moving_averages": { "signal": ma_score["signal"], "buy": ma_score["buy"], "sell": ma_score["sell"], "neutral": ma_score["neutral"], "buy_weight": ma_score.get("buy_weight", 0.0), "sell_weight": ma_score.get("sell_weight", 0.0), "neutral_weight": ma_score.get("neutral_weight", 0.0), "golden_cross": ma_score["golden_cross"], "death_cross": ma_score["death_cross"], "data": ma_data }, "ai_gauge": None, "support_resistance": support_resistance, "pivot_points": support_resistance, } # ── 3. AI Forecast Gauge ── if not blended: # Fallback to simple blended if no forecast available blended = { "p50": [last_close * (1 + (confidence-50)/1000)], "p10": [last_close * 0.98], "p90": [last_close * 1.02], "agreement": True, "scale": 1.0 } ai_score = _calc_ai_forecast_score( blended, forecast_rows, last_close, indicators, len(forecast_rows) or 10, interval, model_reference_price=model_reference_price, ) # ── 4. Summary ── summary = _calc_summary_score_v2(technical_score, ai_score, interval) dashboard = _build_dashboard_payload(last_close, forecast_rows, technical_score, ai_score, summary) presentation = _build_analysis_presentation( technical_score, ai_score, summary, active_model_keys=active_model_keys or ["combined"], failed_model_keys=failed_model_keys, requested_models=requested_models, ) return { "style": "tradingview", "regime": {"key": regime, "label": regime_label}, "dashboard": dashboard, "summary": summary, "presentation": presentation, "technicals": technical_score, "oscillators": { "signal": osc_score["signal"], "buy": osc_score["buy"], "sell": osc_score["sell"], "neutral": osc_score["neutral"], "buy_weight": osc_score.get("buy_weight", 0.0), "sell_weight": osc_score.get("sell_weight", 0.0), "neutral_weight": osc_score.get("neutral_weight", 0.0), "data": osc_data }, "performance": { "signal": performance_score["signal"], "buy": performance_score["buy"], "sell": performance_score["sell"], "neutral": performance_score["neutral"], "buy_weight": performance_score.get("buy_weight", 0.0), "sell_weight": performance_score.get("sell_weight", 0.0), "neutral_weight": performance_score.get("neutral_weight", 0.0), "available": performance_score.get("available", 0), "missing": performance_score.get("missing", 0), "lookbacks": performance_score.get("lookbacks", list(PERFORMANCE_LOOKBACKS)), "data": performance_data, }, "moving_averages": { "signal": ma_score["signal"], "buy": ma_score["buy"], "sell": ma_score["sell"], "neutral": ma_score["neutral"], "buy_weight": ma_score.get("buy_weight", 0.0), "sell_weight": ma_score.get("sell_weight", 0.0), "neutral_weight": ma_score.get("neutral_weight", 0.0), "golden_cross": ma_score["golden_cross"], "death_cross": ma_score["death_cross"], "data": ma_data }, "ai_gauge": ai_score, "support_resistance": support_resistance, "pivot_points": support_resistance, }