Spaces:
Running
Running
| from __future__ import annotations | |
| import math | |
| import re | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
# Version identifier strings; all three are exported through __all__ below.
# NOTE(review): presumably bumped whenever the matching scoring/presentation
# logic changes - confirm with the consumers of these tags.
AI_SCORING_VERSION = "2026-05-05-path-direction-v2"
TECHNICAL_SCORING_VERSION = "2026-05-18-performance-votes-v12"
ANALYSIS_PRESENTATION_VERSION = "2026-05-16-gauge-view-v2"

# (fast, slow) EMA period pairs. Each pair (a, b) has a matching "ema_a_b"
# entry in EMA_PAIR_THRESHOLD_PCT.
EMA_PAIR_SEQUENCE: List[Tuple[int, int]] = [
    (1, 2),
    (2, 5),
    (5, 10),
    (10, 20),
    (20, 50),
    (50, 100),
    (100, 200),
]

# Per-pair threshold values, read through _ema_pair_threshold_pct(), which
# falls back to 0.18 for keys that are not listed here.
EMA_PAIR_THRESHOLD_PCT: Dict[str, float] = {
    "ema_1_2": 0.05,
    "ema_2_5": 0.08,
    "ema_5_10": 0.10,
    "ema_10_20": 0.14,
    "ema_20_50": 0.22,
    "ema_50_100": 0.35,
    "ema_100_200": 0.55,
}

# NOTE(review): the constants below are not referenced in the visible part of
# the file; their meaning is inferred from the names only - confirm at use site.
ADX_GAUGE_PERIOD = 10
PERFORMANCE_LOOKBACKS: Tuple[int, ...] = (1, 2, 3, 5, 7, 30, 90, 180, 365)
PERFORMANCE_VOTE_WEIGHTS: Dict[int, float] = {
    2: 2.0,
}
PERFORMANCE_NEUTRAL_EPSILON_PCT = 1e-9

# Names exposed by `from <module> import *`. Underscore-prefixed helpers are
# listed on purpose so star-imports pick them up.
# NOTE(review): several entries are defined outside the visible part of this
# file - verify each one still exists.
__all__ = [
    "AI_SCORING_VERSION",
    "ANALYSIS_PRESENTATION_VERSION",
    "TECHNICAL_SCORING_VERSION",
    "_apply_forecast_amplitude_calibration",
    "_build_analysis_presentation",
    "_apply_forecast_path_texture",
    "_build_dashboard_payload",
    "_build_raw_ohlc4_bundle",
    "_build_regime_texture_template",
    "_build_trade_analysis",
    "_calc_ai_forecast_score",
    "_calc_ma_score",
    "_calc_performance_score",
    "_calc_price_action_levels",
    "_calc_pivot_points",
    "_calc_summary_score_v2",
    "_calc_technical_score_v2",
    "_calc_vote_gauge",
    "_clamp",
    "_derive_target_amplitude_profile",
    "_forecast_path_metrics",
    "_gauge_to_signal",
    "_momentum",
    "_osc_action",
    "_pct",
    "_score_forecast_context_candidate",
    "compute_indicators",
]
def _ema(arr: np.ndarray, period: int) -> np.ndarray:
    """Exponential moving average with smoothing factor ``2 / (period + 1)``.

    Uses the recursive (``adjust=False``) pandas form, so the first output
    equals the first input. An empty input yields an empty float array.
    """
    if len(arr) == 0:
        return np.array([], dtype=float)
    smoothing = 2.0 / (period + 1.0)
    smoothed = pd.Series(arr).ewm(alpha=smoothing, adjust=False).mean()
    return smoothed.values
def _rsi(close: np.ndarray, period: int = 14) -> np.ndarray:
    """Relative Strength Index with exponential smoothing ``alpha = 1 / period``.

    Args:
        close: Closing prices, oldest first.
        period: Smoothing period.

    Returns:
        Array with the same length as ``close``. Index 0 is always NaN because
        the first bar has no price change. An empty input returns an empty
        array.
    """
    # Without this guard the NaN pad at the end would turn an empty input into
    # a one-element output, breaking the "same length as input" contract.
    if len(close) == 0:
        return np.array([], dtype=float)

    delta = np.diff(close)
    gain = np.where(delta > 0, delta, 0.0)
    loss = np.where(delta < 0, -delta, 0.0)
    alpha = 1.0 / period
    avg_gain = pd.Series(gain).ewm(alpha=alpha, adjust=False).mean()
    avg_loss = pd.Series(loss).ewm(alpha=alpha, adjust=False).mean()

    # A zero average loss would divide by zero; mask it here and patch the
    # affected positions explicitly below.
    rs = avg_gain / avg_loss.replace(0, np.nan)
    rsi = 100 - (100 / (1 + rs))
    rsi = rsi.where(avg_loss > 0, 100.0)  # no losses at all -> 100
    rsi = rsi.where(avg_gain > 0, 0.0)  # no gains at all -> 0
    rsi = rsi.where(~((avg_gain == 0) & (avg_loss == 0)), 50.0)  # flat -> 50

    # Prepend NaN so the output lines up with the input bars.
    return np.concatenate([[np.nan], rsi.values])
def _bollinger(close: np.ndarray, period=20, k=2.0) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Bollinger Bands as ``(upper, middle, lower)`` arrays.

    The middle band is the rolling mean over ``period`` bars; the outer bands
    sit ``k`` population standard deviations (``ddof=0``) away from it.
    """
    window = pd.Series(close).rolling(window=period)
    middle = window.mean()
    offset = k * window.std(ddof=0)
    upper = middle + offset
    lower = middle - offset
    return upper.values, middle.values, lower.values
def _macd(close: np.ndarray, fast=12, slow=26, signal=9
          ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """MACD as ``(macd_line, signal_line, histogram)``.

    The MACD line is EMA(fast) - EMA(slow); the signal line is an EMA of the
    MACD line with NaNs replaced by 0; the histogram is their difference.
    """
    line = _ema(close, fast) - _ema(close, slow)
    # NaNs are zero-filled before smoothing the signal line.
    cleaned = np.where(np.isnan(line), 0, line)
    trigger = _ema(cleaned, signal)
    return line, trigger, line - trigger
def _atr(high: np.ndarray, low: np.ndarray, close: np.ndarray, period=14) -> np.ndarray:
    """Average True Range smoothed with ``alpha = 1 / period``.

    The true range of each bar (from the second one on) is the largest of
    high-low, |high - previous close| and |low - previous close|. The first
    output element is NaN because the first bar has no previous close.
    """
    prev_close = close[:-1]
    cur_high = high[1:]
    cur_low = low[1:]
    gap_up = np.abs(cur_high - prev_close)
    gap_down = np.abs(cur_low - prev_close)
    true_range = np.maximum(cur_high - cur_low, np.maximum(gap_up, gap_down))
    padded = np.concatenate([[np.nan], true_range])
    return pd.Series(padded).ewm(alpha=1.0 / period, adjust=False).mean().values
def _stoch_rsi(close: np.ndarray, rsi_period=14, stoch_period=14,
               smooth_k=3, smooth_d=3) -> Tuple[np.ndarray, np.ndarray]:
    """Stochastic RSI as smoothed ``(%K, %D)`` arrays on a 0..100 scale.

    The RSI is positioned inside its own rolling min/max range over
    ``stoch_period`` bars, then smoothed twice with simple moving averages.
    """
    rsi_series = pd.Series(_rsi(close, rsi_period))
    window = rsi_series.rolling(window=stoch_period)
    lowest = window.min()
    highest = window.max()
    # A zero-width range is replaced by +inf so the raw value becomes 0
    # instead of dividing by zero.
    span = (highest - lowest).replace(0, np.inf)
    raw_k = 100 * (rsi_series - lowest) / span
    percent_k = raw_k.rolling(window=smooth_k).mean()
    percent_d = percent_k.rolling(window=smooth_d).mean()
    return percent_k.values, percent_d.values
def _stoch_kd(
    high: np.ndarray,
    low: np.ndarray,
    close: np.ndarray,
    k_period: int = 14,
    smooth_k: int = 3,
    d_period: int = 3,
) -> Tuple[np.ndarray, np.ndarray]:
    """Price-based stochastic oscillator, returned as ``(%K, %D)`` arrays.

    Raw %K places the close inside the rolling high/low range of the last
    ``k_period`` bars; %K is its ``smooth_k``-bar mean and %D the
    ``d_period``-bar mean of %K. A zero-width range yields NaN.
    """
    top = pd.Series(high).rolling(window=k_period).max()
    bottom = pd.Series(low).rolling(window=k_period).min()
    width = (top - bottom).replace(0, np.nan)
    fast_k = 100.0 * (pd.Series(close) - bottom) / width
    slow_k = fast_k.rolling(window=smooth_k).mean()
    slow_d = slow_k.rolling(window=d_period).mean()
    return slow_k.values, slow_d.values
def _sma(arr: np.ndarray, period: int) -> np.ndarray:
    """Simple moving average over ``period`` values (NaN until the window fills).

    An empty input yields an empty float array.
    """
    if len(arr) == 0:
        return np.array([], dtype=float)
    rolling_mean = pd.Series(arr).rolling(window=period).mean()
    return rolling_mean.values
def _cci(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 20) -> np.ndarray:
    """Commodity Channel Index over ``period`` bars.

    Computed as (typical price - rolling mean) / (0.015 * mean absolute
    deviation). A zero deviation is replaced by +inf, giving a CCI of 0.
    """
    def _mean_abs_dev(window: np.ndarray) -> float:
        return np.abs(window - window.mean()).mean()

    typical = pd.Series((high + low + close) / 3.0)
    rolling = typical.rolling(window=period)
    center = rolling.mean()
    deviation = rolling.apply(_mean_abs_dev, raw=True)
    scaled_dev = 0.015 * deviation.replace(0, np.inf)
    return ((typical - center) / scaled_dev).values
def _adx(high: np.ndarray, low: np.ndarray, close: np.ndarray, period: int = 14) -> tuple:
    """Average Directional Index, returned as ``(adx, plus_di, minus_di)``.

    Directional movement and true range are smoothed with
    ``alpha = 1 / period``. The first bar has no predecessor and is padded
    with zeros. Zero denominators are replaced by +inf so the affected ratios
    become 0 instead of raising or producing NaN.
    """
    up_move = high[1:] - high[:-1]
    down_move = low[:-1] - low[1:]
    prev_close = close[:-1]

    plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0)
    minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0)
    true_range = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - prev_close), np.abs(low[1:] - prev_close)),
    )

    def _smooth(values: np.ndarray) -> pd.Series:
        # Zero-pad the first bar, then apply the shared smoothing.
        padded = np.concatenate([[0.0], values])
        return pd.Series(padded).ewm(alpha=1.0 / period, adjust=False).mean()

    tr_smoothed = _smooth(true_range)
    plus_di = 100 * _smooth(plus_dm) / tr_smoothed.replace(0, np.inf)
    minus_di = 100 * _smooth(minus_dm) / tr_smoothed.replace(0, np.inf)
    di_total = (plus_di + minus_di).replace(0, np.inf)
    dx = 100 * np.abs(plus_di - minus_di) / di_total
    adx = dx.ewm(alpha=1.0 / period, adjust=False).mean()
    return adx.values, plus_di.values, minus_di.values
def _awesome_oscillator(high: np.ndarray, low: np.ndarray) -> np.ndarray:
    """Awesome Oscillator: SMA(5) minus SMA(34) of the bar midpoint (high+low)/2."""
    midpoint = (high + low) / 2.0
    fast_avg = _sma(midpoint, 5)
    slow_avg = _sma(midpoint, 34)
    return fast_avg - slow_avg
def _momentum(close: np.ndarray, period: int = 10) -> np.ndarray:
    """Momentum: close minus the close ``period`` bars earlier (NaN while unavailable)."""
    prices = pd.Series(close)
    earlier = prices.shift(period)
    return (prices - earlier).values
def _roc(close: np.ndarray, period: int = 12) -> np.ndarray:
    """Rate of change in percent versus the close ``period`` bars earlier.

    A zero reference price is treated as missing, so the result is NaN there.
    """
    prices = pd.Series(close)
    reference = prices.shift(period).replace(0, np.nan)
    ratio = prices / reference - 1.0
    return ratio.mul(100.0).values
def _trix(close: np.ndarray, period: int = 18) -> np.ndarray:
    """TRIX: one-bar percentage change of a triple-smoothed EMA of ``close``."""
    smoothed = pd.Series(_ema(close, period))
    # Two further EMA passes on top of the first give the triple smoothing.
    for _ in range(2):
        smoothed = smoothed.ewm(span=period, adjust=False).mean()
    return smoothed.pct_change().mul(100.0).values
def _ppo(close: np.ndarray, fast: int = 12, slow: int = 26) -> np.ndarray:
    """Percentage Price Oscillator: (EMA(fast) - EMA(slow)) / EMA(slow) * 100.

    Positions where |EMA(slow)| is below 1e-8 produce NaN instead of dividing
    by a near-zero value.
    """
    fast_line = _ema(close, fast)
    slow_line = _ema(close, slow)
    denominator = np.where(np.abs(slow_line) < 1e-8, np.nan, slow_line)
    return (fast_line - slow_line) / denominator * 100.0
def _cmo(close: np.ndarray, period: int = 14) -> np.ndarray:
    """Chande Momentum Oscillator in the range -100..100.

    Sums up-moves and down-moves over ``period`` bars and returns
    (up - down) / (up + down) * 100; a zero total yields NaN.
    """
    change = pd.Series(close).diff()
    gains = change.clip(lower=0.0).rolling(period).sum()
    losses = (-change.clip(upper=0.0)).rolling(period).sum()
    total = (gains + losses).replace(0, np.nan)
    return ((gains - losses) / total * 100.0).values
def _dpo(close: np.ndarray, period: int = 20) -> np.ndarray:
    """Detrended Price Oscillator.

    Subtracts the ``period``-bar SMA, shifted forward by
    ``int(period / 2) + 1`` bars, from the current close.
    """
    prices = pd.Series(close)
    shift_by = int(period / 2) + 1
    displaced_sma = prices.rolling(period).mean().shift(shift_by)
    return (prices - displaced_sma).values
def _aroon_oscillator(high: np.ndarray, low: np.ndarray, period: int = 25) -> np.ndarray:
    """Aroon Oscillator (Aroon Up minus Aroon Down) over a ``period``-bar window.

    Each Aroon line is 100 when the extreme is on the newest bar of the window
    and 0 when it is on the oldest. Ties resolve to the earliest occurrence
    (``np.argmax`` / ``np.argmin`` semantics).
    """
    def _aroon(window: np.ndarray, locate) -> float:
        bars_since = len(window) - 1 - int(locate(window))
        return ((period - 1 - bars_since) / (period - 1)) * 100.0

    up_line = pd.Series(high).rolling(period).apply(lambda w: _aroon(w, np.argmax), raw=True)
    down_line = pd.Series(low).rolling(period).apply(lambda w: _aroon(w, np.argmin), raw=True)
    return (up_line - down_line).values
def _tsi(close: np.ndarray, long_period: int = 25, short_period: int = 13) -> np.ndarray:
    """True Strength Index in percent.

    Ratio of the double-smoothed price change to the double-smoothed absolute
    price change (EMA span ``long_period`` then ``short_period``). A zero
    denominator yields NaN.
    """
    def _double_smooth(series: pd.Series) -> pd.Series:
        first_pass = series.ewm(span=long_period, adjust=False).mean()
        return first_pass.ewm(span=short_period, adjust=False).mean()

    change = pd.Series(close).diff()
    numerator = _double_smooth(change)
    denominator = _double_smooth(change.abs()).replace(0, np.nan)
    return (numerator / denominator * 100.0).values
def _demarker(high: np.ndarray, low: np.ndarray, period: int = 14) -> np.ndarray:
    """DeMarker oscillator on a 0..100 scale.

    Compares summed rises in the highs with summed falls in the lows over
    ``period`` bars: DeMax / (DeMax + DeMin) * 100. A zero total yields NaN.
    """
    rises = pd.Series(high).diff().clip(lower=0.0)
    falls = (-pd.Series(low).diff()).clip(lower=0.0)
    rise_sum = rises.rolling(period).sum()
    fall_sum = falls.rolling(period).sum()
    total = (rise_sum + fall_sum).replace(0, np.nan)
    return (rise_sum / total * 100.0).values
def _williams_r(high: np.ndarray, low: np.ndarray, close: np.ndarray,
                period: int = 14) -> np.ndarray:
    """Williams %R in the range -100..0 over a ``period``-bar window.

    A zero-width high/low range is replaced by +inf so the result is 0
    (sign aside) rather than a division error.
    """
    top = pd.Series(high).rolling(window=period).max()
    bottom = pd.Series(low).rolling(window=period).min()
    width = (top - bottom).replace(0, np.inf)
    percent_r = -100 * (top - pd.Series(close)) / width
    return percent_r.values
def _bull_bear_power(high: np.ndarray, low: np.ndarray, close: np.ndarray,
                     period: int = 13) -> Tuple[np.ndarray, np.ndarray]:
    """Bull/Bear Power as ``(high - EMA(close), low - EMA(close))``.

    The two components are returned separately so callers can score their
    signs independently.
    """
    baseline = _ema(close, period)
    bull_power = high - baseline
    bear_power = low - baseline
    return bull_power, bear_power
def _ultimate_oscillator(high: np.ndarray, low: np.ndarray, close: np.ndarray,
                         p1: int = 7, p2: int = 14, p3: int = 28) -> np.ndarray:
    """Ultimate Oscillator: 4:2:1 weighted blend of three buying-pressure ratios.

    Returns an all-NaN array when fewer than ``p3 + 1`` bars are available.
    A zero true-range sum is replaced by +inf so that ratio becomes 0.
    """
    bar_count = len(close)
    if bar_count < p3 + 1:
        return np.full(bar_count, np.nan)

    previous = pd.Series(close).shift(1).values
    true_low = np.minimum(low, previous)
    true_high = np.maximum(high, previous)
    buying_pressure = pd.Series(close - true_low)
    true_range = pd.Series(true_high - true_low)

    short_avg, mid_avg, long_avg = (
        buying_pressure.rolling(span).sum() / true_range.rolling(span).sum().replace(0, np.inf)
        for span in (p1, p2, p3)
    )
    blended = 100 * (4 * short_avg + 2 * mid_avg + long_avg) / 7.0
    return blended.values
def _ichimoku_base(high: np.ndarray, low: np.ndarray, period: int = 26) -> np.ndarray:
    """Ichimoku base line: midpoint of the rolling highest high and lowest low."""
    highest = pd.Series(high).rolling(window=period).max()
    lowest = pd.Series(low).rolling(window=period).min()
    midpoint = (highest + lowest) / 2.0
    return midpoint.values
def _ichimoku_cloud(high: np.ndarray, low: np.ndarray) -> Dict[str, np.ndarray]:
    """Ichimoku lines used for cloud-position scoring.

    Returns a dict with ``tenkan`` (9-bar midpoint), ``kijun`` (26-bar
    midpoint), ``span_a`` (mean of tenkan and kijun) and ``span_b`` (52-bar
    midpoint). No forward displacement is applied here.
    """
    high_series = pd.Series(high)
    low_series = pd.Series(low)

    def _range_midpoint(window: int) -> np.ndarray:
        top = high_series.rolling(window).max()
        bottom = low_series.rolling(window).min()
        return ((top + bottom) / 2.0).values

    tenkan = _range_midpoint(9)
    kijun = _range_midpoint(26)
    lines = {
        "tenkan": tenkan,
        "kijun": kijun,
        "span_a": ((pd.Series(tenkan) + pd.Series(kijun)) / 2.0).values,
        "span_b": _range_midpoint(52),
    }
    return lines
def _vwma(close: np.ndarray, volume: np.ndarray, period: int = 20) -> np.ndarray:
    """Volume-weighted moving average over ``period`` bars.

    A zero volume sum is replaced by +inf, so the average becomes 0 for that
    window instead of dividing by zero.
    """
    weighted_sum = pd.Series(close * volume).rolling(period).sum()
    volume_sum = pd.Series(volume).rolling(period).sum().replace(0, np.inf)
    return (weighted_sum / volume_sum).values
def _wma(arr: np.ndarray, period: int) -> np.ndarray:
    """Linearly weighted moving average (weights 1..period, newest heaviest).

    Used by the Hull MA. An empty input yields an empty float array.
    """
    if len(arr) == 0:
        return np.array([], dtype=float)

    weights = np.arange(1, period + 1, dtype=float)

    def _weighted_mean(window: np.ndarray) -> float:
        return float(np.dot(window, weights) / weights.sum())

    averaged = pd.Series(arr).rolling(window=period).apply(_weighted_mean, raw=True)
    return averaged.values
def _hull_ma(close: np.ndarray, period: int = 9) -> np.ndarray:
    """Hull Moving Average: WMA(2 * WMA(period // 2) - WMA(period), int(sqrt(period))).

    Both the half period and the final smoothing length are floored and
    clamped to at least 1. Where the intermediate series is NaN (warm-up),
    the raw close is substituted before the final smoothing pass.
    """
    half_length = max(period // 2, 1)
    final_length = max(int(math.sqrt(period)), 1)
    raw_hull = 2 * _wma(close, half_length) - _wma(close, period)
    filled = np.where(np.isnan(raw_hull), close, raw_hull)
    return _wma(filled, final_length)
def _price_action_config(interval: str) -> Tuple[int, int]:
    """Return ``(pivot_window, lookback)`` for a candle interval.

    Intervals not listed fall back to ``(4, 240)``.
    """
    presets = {
        "1m": (2, 120),
        "5m": (2, 140),
        "15m": (3, 180),
        "1h": (4, 260),
        "4h": (4, 320),
        "1d": (5, 360),
        "1w": (5, 260),
    }
    try:
        return presets[interval]
    except KeyError:
        return (4, 240)
def _cluster_price_action_candidates(
    candidates: List[Tuple[float, float, int]],
    tolerance: float,
) -> List[Dict[str, Any]]:
    """Merge nearby ``(price, weight, candle_index)`` candidates into clusters.

    Candidates are visited in ascending price order. Each one joins the first
    existing cluster whose price is within ``tolerance``; otherwise it starts
    a new cluster. A cluster tracks the weight-averaged ``price``, summed
    ``weight``, number of ``touches`` and the highest ``last_index`` seen.
    """
    clusters: List[Dict[str, Any]] = []
    ordered = sorted(candidates, key=lambda item: item[0])
    for price, weight, candle_index in ordered:
        for bucket in clusters:
            if abs(bucket["price"] - price) <= tolerance:
                break
        else:
            # No cluster is close enough: open a new one.
            clusters.append(
                {
                    "price": float(price),
                    "weight": float(weight),
                    "touches": 1,
                    "last_index": int(candle_index),
                }
            )
            continue

        old_weight = float(bucket["weight"])
        merged_weight = old_weight + float(weight)
        weighted_total = (float(bucket["price"]) * old_weight) + (float(price) * float(weight))
        bucket["price"] = weighted_total / max(merged_weight, 1e-8)
        bucket["weight"] = merged_weight
        bucket["touches"] = int(bucket["touches"]) + 1
        bucket["last_index"] = max(int(bucket["last_index"]), int(candle_index))
    return clusters
def _dedupe_sorted_levels(
    levels: List[Dict[str, Any]],
    *,
    tolerance: float,
) -> List[Dict[str, Any]]:
    """Drop levels whose price is within ``tolerance`` of the last kept level.

    ``levels`` is expected to be sorted by price already; the first level of
    each run of near-duplicates is the one retained.
    """
    kept: List[Dict[str, Any]] = []
    for level in levels:
        too_close = bool(kept) and (
            abs(float(kept[-1]["price"]) - float(level["price"])) <= tolerance
        )
        if not too_close:
            kept.append(level)
    return kept
def _calc_price_action_levels(
    data: List[Dict[str, Any]],
    current_price: float,
    interval: str,
) -> Dict[str, Any]:
    """Derive two resistance (KC1/KC2) and two support (HT1/HT2) levels from recent candles.

    Args:
        data: Candle dicts; the "high", "low" and "close" keys are read.
        current_price: Reference price the levels are positioned around.
        interval: Candle interval; selects the pivot window and lookback via
            ``_price_action_config`` and is echoed back as "timeframe".

    Returns:
        Dict with "method", "timeframe", "current_price", "summary" and
        "data" (five rows ordered KC2, KC1, current price, HT1, HT2). With
        empty ``data`` every row carries the current price and no
        "distance_pct" field.
    """
    if not data:
        # No candles: return a placeholder table where every level equals the price.
        rounded_price = round(float(current_price), 6)
        return {
            "method": "price_action",
            "timeframe": interval,
            "current_price": rounded_price,
            "summary": "Khong du du lieu de xac dinh cac moc khang cu va ho tro theo Price Action.",
            "data": [
                {"level": "KC2", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
                {"level": "KC1", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
                {"level": "GiΓ‘ hiα»n tαΊ‘i", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
                {"level": "HT1", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
                {"level": "HT2", "price": rounded_price, "display_value": _format_analysis_value(rounded_price)},
            ],
        }
    highs = np.array([float(row["high"]) for row in data], dtype=float)
    lows = np.array([float(row["low"]) for row in data], dtype=float)
    closes = np.array([float(row["close"]) for row in data], dtype=float)
    pivot_window, lookback = _price_action_config(interval)
    # Restrict the analysis to the most recent `lookback` candles.
    recent_start = max(0, len(closes) - min(len(closes), lookback))
    recent_highs = highs[recent_start:]
    recent_lows = lows[recent_start:]
    recent_closes = closes[recent_start:]
    atr_value = _latest_finite_value(_atr(recent_highs, recent_lows, recent_closes, period=14))
    median_range = float(np.nanmedian(recent_highs - recent_lows)) if len(recent_highs) else 0.0
    # Distance within which candidate prices are merged into one level:
    # the largest of an ATR share, a median-range share and a price share.
    cluster_tolerance = max(
        float(atr_value or 0.0) * 0.35,
        median_range * 0.22,
        abs(float(current_price)) * 0.0015,
        1e-6,
    )
    # Step used to synthesize levels when fewer than two real ones are found.
    extension_step = max(
        float(atr_value or 0.0),
        median_range,
        abs(float(current_price)) * 0.003,
        cluster_tolerance * 1.5,
    )
    resistance_candidates: List[Tuple[float, float, int]] = []
    support_candidates: List[Tuple[float, float, int]] = []
    recent_count = len(recent_closes)
    # Swing pivots: a bar whose high (low) is the extreme of the window of
    # `pivot_window` bars on each side. Later bars get a larger weight.
    for idx in range(pivot_window, max(pivot_window, recent_count - pivot_window)):
        high_slice = recent_highs[idx - pivot_window : idx + pivot_window + 1]
        low_slice = recent_lows[idx - pivot_window : idx + pivot_window + 1]
        high_price = float(recent_highs[idx])
        low_price = float(recent_lows[idx])
        recency_weight = 1.0 + (idx / max(recent_count, 1))
        if high_price >= float(np.max(high_slice)):
            resistance_candidates.append((high_price, recency_weight, recent_start + idx))
        if low_price <= float(np.min(low_slice)):
            support_candidates.append((low_price, recency_weight, recent_start + idx))
    # Fallback candidates: plain max/min over several trailing spans, added
    # with a fixed lower weight (0.75) and attributed to the last candle.
    fallback_spans = [10, 20, 50, 100, min(recent_count, lookback)]
    for span in fallback_spans:
        if span <= 0 or recent_count < span:
            continue
        resistance_candidates.append((float(np.max(recent_highs[-span:])), 0.75, len(data) - 1))
        support_candidates.append((float(np.min(recent_lows[-span:])), 0.75, len(data) - 1))
    resistance_clusters = _cluster_price_action_candidates(resistance_candidates, cluster_tolerance)
    support_clusters = _cluster_price_action_candidates(support_candidates, cluster_tolerance)
    # Keep only resistance strictly above the price (plus a small margin),
    # nearest first.
    resistance_levels = _dedupe_sorted_levels(
        sorted(
            [
                cluster
                for cluster in resistance_clusters
                if float(cluster["price"]) > float(current_price) + (cluster_tolerance * 0.12)
            ],
            key=lambda item: float(item["price"]),
        ),
        tolerance=cluster_tolerance * 0.55,
    )
    # Keep only support strictly below the price (minus the same margin),
    # nearest first (hence the descending sort).
    support_levels = _dedupe_sorted_levels(
        sorted(
            [
                cluster
                for cluster in support_clusters
                if float(cluster["price"]) < float(current_price) - (cluster_tolerance * 0.12)
            ],
            key=lambda item: float(item["price"]),
            reverse=True,
        ),
        tolerance=cluster_tolerance * 0.55,
    )
    # Pad each side to two levels by stepping away from the last known level
    # (or from the current price when none exists).
    while len(resistance_levels) < 2:
        base_price = float(resistance_levels[-1]["price"]) if resistance_levels else float(current_price)
        resistance_levels.append(
            {
                "price": base_price + extension_step,
                "weight": 0.0,
                "touches": 0,
                "last_index": len(data) - 1,
            }
        )
    while len(support_levels) < 2:
        base_price = float(support_levels[-1]["price"]) if support_levels else float(current_price)
        support_levels.append(
            {
                "price": base_price - extension_step,
                "weight": 0.0,
                "touches": 0,
                "last_index": len(data) - 1,
            }
        )
    kc1 = float(resistance_levels[0]["price"])
    kc2 = float(resistance_levels[1]["price"])
    ht1 = float(support_levels[0]["price"])
    ht2 = float(support_levels[1]["price"])
    # NOTE(review): kc1 is always built strictly above current_price and ht1
    # strictly below it (see the filters and padding above), so for finite
    # inputs the first two branches appear unreachable and the summary is
    # always the "between HT1 and KC1" text - confirm the intended proximity
    # test.
    if current_price >= kc1:
        summary = (
            "Gia hien tai dang ap sat hoac vuot len tren KC1. "
            "Neu duy tri duoc dong luc, KC2 la moc can theo doi tiep theo."
        )
    elif current_price <= ht1:
        summary = (
            "Gia hien tai dang ap sat hoac nam duoi HT1. "
            "Neu ap luc ban tiep tuc, HT2 la vung ho tro can quan sat them."
        )
    else:
        summary = (
            "Gia hien tai dang nam giua HT1 va KC1. "
            "Day la vung can theo doi phan ung gia truoc khi xac nhan huong di tiep theo."
        )

    def _row(level: str, price: float) -> Dict[str, Any]:
        # One output row; distance_pct is relative to current_price (0 when it is 0).
        rounded_price = round(float(price), 6)
        distance_pct = 0.0 if current_price == 0 else ((rounded_price - float(current_price)) / float(current_price)) * 100.0
        return {
            "level": level,
            "price": rounded_price,
            "display_value": _format_analysis_value(rounded_price),
            "distance_pct": round(distance_pct, 2),
        }

    rows = [
        _row("KC2", kc2),
        _row("KC1", kc1),
        _row("GiΓ‘ hiα»n tαΊ‘i", float(current_price)),
        _row("HT1", ht1),
        _row("HT2", ht2),
    ]
    return {
        "method": "price_action",
        "timeframe": interval,
        "current_price": round(float(current_price), 6),
        "summary": summary,
        "data": rows,
    }
def _calc_pivot_points(
    data: List[Dict[str, Any]],
    current_price: float,
    interval: str,
) -> Dict[str, Any]:
    """Legacy entry point kept for older callers.

    Delegates unchanged to ``_calc_price_action_levels``; the result is
    price-action based, not classic pivot points.
    """
    return _calc_price_action_levels(
        data=data,
        current_price=current_price,
        interval=interval,
    )
# -----------------------------------------------------------------------------
# TradingView-style action classification helpers
# -----------------------------------------------------------------------------
def _latest_finite_value(values: Any) -> Optional[float]:
    """Return the most recent value as an unrounded float, or ``None`` if it is not finite.

    Args:
        values: ``None``, a Python or NumPy scalar, a ``pandas.Series``, or
            any sized, indexable sequence.

    Returns:
        The scalar itself, or the LAST element of a sequence, converted to
        ``float``. ``None`` is returned for ``None`` input, for empty or
        unsized input, and when the inspected value is NaN or infinite. Only
        the final element is inspected; earlier elements are never used as a
        fallback.
    """
    if values is None:
        return None
    if isinstance(values, pd.Series):
        values = values.values

    # NumPy scalars such as np.float32 / np.int64 do not subclass float / int,
    # so they are listed explicitly instead of falling through to "unsized".
    if isinstance(values, (float, int, np.floating, np.integer)):
        candidate = float(values)
    elif hasattr(values, "__len__") and len(values) > 0:
        candidate = float(values[-1])
    else:
        return None

    # isfinite rejects +/-inf as well as NaN, matching the "finite" contract.
    return candidate if math.isfinite(candidate) else None
def _format_analysis_value(value: Optional[float]) -> str:
    """Format a technical value for the UI with magnitude-dependent precision.

    Larger magnitudes get fewer decimals (2 for >= 100 up to 8 for < 0.01).
    Trailing zeros and a dangling decimal point are stripped.

    Args:
        value: Number to format; ``None`` and NaN produce the placeholder.

    Returns:
        The formatted string. Values that round to zero are rendered as "0",
        never "-0".
    """
    if value is None or math.isnan(float(value)):
        # NOTE(review): this placeholder literal looks mis-encoded (probably
        # meant to be a dash); kept unchanged because it is runtime output.
        return "Γ’β¬β"

    number = float(value)
    magnitude = abs(number)
    decimals = 8
    for floor, places in ((100, 2), (10, 3), (1, 4), (0.1, 6), (0.01, 7)):
        if magnitude >= floor:
            decimals = places
            break

    text = f"{number:.{decimals}f}".rstrip("0").rstrip(".")
    # Tiny negatives (and -0.0) format as "-0.00000000", which strips to "-0".
    return "0" if text == "-0" else text
def _osc_action(name: str, value: float, **kw) -> str:
    """Classify an oscillator reading as buy / sell / neutral.

    Args:
        name: Oscillator key (e.g. "rsi", "macd", "adx", "bbp").
        value: Latest reading; ``None`` or NaN is neutral.
        **kw: Optional context: ``atr`` (scales the zero-line bands),
            ``epsilon`` (overrides the neutral band), ``plus_di`` /
            ``minus_di`` (for "adx"), ``bull_power`` / ``bear_power``
            (for "bbp").

    Returns:
        One of the three label strings defined at the top of the body.
        Unknown names are neutral.
    """
    # NOTE(review): the sell / neutral literals look mis-encoded; they are
    # reproduced exactly because other code may compare against them.
    buy, sell, neutral = "Mua", "BΓ‘n", "Trung lαΊp"

    if value is None or math.isnan(value):
        return neutral

    atr = max(float(kw.get("atr", 0.0) or 0.0), 1e-8)

    def _by_range(
        current: float,
        min_value: float,
        max_value: float,
        *,
        bullish_high: bool = True,
    ) -> str:
        # Position inside [min, max]: top third buys, bottom third sells.
        width = max(max_value - min_value, 1e-8)
        position = _clamp((current - min_value) / width, 0.0, 1.0)
        if not bullish_high:
            position = 1.0 - position
        if position > (2.0 / 3.0):
            return buy
        if position < (1.0 / 3.0):
            return sell
        return neutral

    def _by_sign(current: float, epsilon: float) -> str:
        # Sign test with a symmetric neutral band around zero.
        band = max(float(epsilon), 1e-8)
        if current > band:
            return buy
        if current < -band:
            return sell
        return neutral

    if name in {"rsi", "stoch", "stoch_rsi", "ultimate", "demarker"}:
        return _by_range(value, 0.0, 100.0)
    if name == "williams":
        return _by_range(value, -100.0, 0.0, bullish_high=True)
    if name in {"aroon", "cmo", "tsi"}:
        return _by_range(value, -100.0, 100.0)

    if name == "cci":
        if value > 50:
            return buy
        if value < -50:
            return sell
        return neutral

    # Default neutral band per zero-line oscillator; "epsilon" overrides it.
    zero_line_bands = {
        "momentum": atr * 0.025,
        "roc": 0.01,
        "trix": 0.01,
        "ppo": 0.01,
        "dpo": atr * 0.02,
        "ao": atr * 0.02,
        "macd": atr * 0.02,
    }
    if name in zero_line_bands:
        return _by_sign(value, kw.get("epsilon", zero_line_bands[name]))

    if name == "adx":
        plus_di = kw.get("plus_di", 0)
        minus_di = kw.get("minus_di", 0)
        if value < 20:
            return neutral
        return buy if plus_di > minus_di else sell

    if name == "bbp":
        bull_power = kw.get("bull_power")
        bear_power = kw.get("bear_power")
        epsilon = max(float(kw.get("epsilon", atr * 0.02)), 1e-8)
        if bull_power is None or bear_power is None:
            return neutral
        if bull_power > epsilon and bear_power > epsilon:
            return buy
        if bull_power < -epsilon and bear_power < -epsilon:
            return sell
        return neutral

    return neutral
def _ema_pair_action(fast_val: Optional[float], slow_val: Optional[float], pair_key: str = "") -> str:
    """Classify a fast/slow EMA pair as buy / sell / neutral from the raw spread.

    The spread is the percentage gap of ``fast_val`` over ``slow_val``. The
    neutral band is 8% of the pair's configured threshold, but at least
    0.0005. Missing or NaN inputs are neutral.
    """
    # NOTE(review): the sell / neutral literals look mis-encoded; they are
    # reproduced exactly because other code may compare against them.
    buy, sell, neutral = "Mua", "BΓ‘n", "Trung lαΊp"

    if fast_val is None or slow_val is None:
        return neutral
    if math.isnan(fast_val) or math.isnan(slow_val):
        return neutral

    gap_pct = _pct(fast_val, slow_val)
    dead_zone = max(_ema_pair_threshold_pct(pair_key) * 0.08, 0.0005)
    if gap_pct > dead_zone:
        return buy
    if gap_pct < -dead_zone:
        return sell
    return neutral
def _price_vs_level_action(
    price: Optional[float],
    level: Optional[float],
    *,
    neutral_band_pct: float = 0.08,
) -> str:
    """Classify price versus a reference level as buy / sell / neutral.

    Buy when the price is more than the neutral band (in percent, at least
    0.0005) above the level, sell when it is that far below, neutral
    otherwise or when either input is missing / NaN.
    """
    # NOTE(review): the sell / neutral literals look mis-encoded; they are
    # reproduced exactly because other code may compare against them.
    buy, sell, neutral = "Mua", "BΓ‘n", "Trung lαΊp"

    if price is None or level is None:
        return neutral
    if math.isnan(price) or math.isnan(level):
        return neutral

    gap_pct = _pct(price, level)
    dead_zone = max(float(neutral_band_pct), 0.0005)
    if gap_pct > dead_zone:
        return buy
    if gap_pct < -dead_zone:
        return sell
    return neutral
def _ema_pair_threshold_pct(pair_key: str) -> float:
    """Look up the threshold for an EMA pair key such as ``"ema_5_10"``.

    Keys missing from ``EMA_PAIR_THRESHOLD_PCT`` fall back to 0.18.
    """
    fallback_pct = 0.18
    return EMA_PAIR_THRESHOLD_PCT.get(pair_key, fallback_pct)
def compute_indicators(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute a suite of technical indicators on OHLCV candles.

    Args:
        data: Candle dicts, oldest first; the "close", "high", "low",
            "volume" and "time" keys are read.

    Returns:
        ``{"error": ...}`` when fewer than 30 candles are supplied. Otherwise
        a dict with one section per indicator (latest value plus a coarse
        signal label), a "trend" section, and "series" holding the last 50
        points of the EMA and Bollinger lines as ``{"time", "value"}`` dicts.
    """
    if len(data) < 30:
        return {"error": "Insufficient data (need β₯30 candles)"}

    closes = np.array([d["close"] for d in data], dtype=float)
    highs = np.array([d["high"] for d in data], dtype=float)
    lows = np.array([d["low"] for d in data], dtype=float)
    vols = np.array([d["volume"] for d in data], dtype=float)
    times = [d["time"] for d in data]

    def _last(arr):
        # Latest value rounded to 8 decimals, or None when it is NaN.
        v = arr[-1]
        return None if math.isnan(v) else round(float(v), 8)

    def _series(arr, n: int = 50):
        # Last n points paired with their candle times (NaN becomes None).
        window = arr[-min(n, len(arr)):]
        offset = len(arr) - len(window)
        return [
            {"time": stamp, "value": None if math.isnan(v) else round(float(v), 8)}
            for stamp, v in zip(times[offset:], window)
        ]

    def _signal(value, upper, lower, *, default=0.0,
                high_label="bullish", low_label="bearish"):
        # Threshold classification. Only a missing value (None) falls back to
        # `default`; a real 0.0 reading must be classified as-is. The previous
        # `value or default` form turned an RSI / StochRSI of exactly 0 into
        # the neutral default of 50.
        ref = default if value is None else value
        if ref > upper:
            return high_label
        if ref < lower:
            return low_label
        return "neutral"

    ema9 = _ema(closes, 9)
    ema21 = _ema(closes, 21)
    ema50 = _ema(closes, 50)
    ema200 = _ema(closes, 200)
    rsi14 = _rsi(closes, 14)
    macd_l, macd_s, macd_h = _macd(closes)
    bb_u, bb_m, bb_l = _bollinger(closes)
    atr14 = _atr(highs, lows, closes, 14)
    stoch_k, stoch_d = _stoch_rsi(closes)
    roc12 = _roc(closes, 12)
    trix18 = _trix(closes, 18)
    ppo12 = _ppo(closes, 12, 26)
    cmo14 = _cmo(closes, 14)
    dpo20 = _dpo(closes, 20)
    aroon25 = _aroon_oscillator(highs, lows, 25)
    tsi25 = _tsi(closes, 25, 13)
    vol_sma = _sma(vols, 20)

    # Latest values, computed once and reused below.
    last_close = closes[-1]
    ema9_last, ema21_last = _last(ema9), _last(ema21)
    ema50_last, ema200_last = _last(ema50), _last(ema200)
    rsi_last = _last(rsi14)
    macd_hist_last = _last(macd_h)
    bb_upper_last, bb_mid_last, bb_lower_last = _last(bb_u), _last(bb_m), _last(bb_l)
    atr_last = _last(atr14)
    last_atr = atr_last or 0
    stoch_k_last = _last(stoch_k)
    roc_last = _last(roc12)
    trix_last = _last(trix18)
    ppo_last = _last(ppo12)
    cmo_last = _last(cmo14)
    dpo_last = _last(dpo20)
    aroon_last = _last(aroon25)
    tsi_last = _last(tsi25)
    short_momentum_ref = (
        float(closes[max(0, len(closes) - 6)]) if len(closes) else float(last_close)
    )

    return {
        "ema": {
            "ema9": ema9_last, "ema21": ema21_last,
            "ema50": ema50_last, "ema200": ema200_last,
        },
        "rsi": {
            "value": rsi_last,
            "signal": _signal(rsi_last, 70, 30, default=50,
                              high_label="overbought", low_label="oversold"),
        },
        "macd": {
            "macd": _last(macd_l),
            "signal": _last(macd_s),
            "histogram": macd_hist_last,
            "cross": _signal(macd_hist_last, 0, 0),
        },
        "bollinger": {
            "upper": bb_upper_last,
            "middle": bb_mid_last,
            "lower": bb_lower_last,
            # A missing or zero middle band falls back to 1 to avoid dividing by zero.
            "bandwidth": round(
                ((bb_upper_last or 0) - (bb_lower_last or 0)) / (bb_mid_last or 1), 6
            ),
        },
        "atr": {
            "value": atr_last,
            "pct": round(last_atr / last_close * 100, 4) if last_close else None,
        },
        "stoch_rsi": {
            "k": stoch_k_last,
            "d": _last(stoch_d),
            "signal": _signal(stoch_k_last, 80, 20, default=50,
                              high_label="overbought", low_label="oversold"),
        },
        "roc": {
            "value": roc_last,
            "signal": _signal(roc_last, 1.0, -1.0),
        },
        "trix": {
            "value": trix_last,
            "signal": _signal(trix_last, 0, 0),
        },
        "ppo": {
            "value": ppo_last,
            "signal": _signal(ppo_last, 0.35, -0.35),
        },
        "cmo": {
            "value": cmo_last,
            "signal": _signal(cmo_last, 20, -20),
        },
        "dpo": {
            "value": dpo_last,
            "signal": _signal(dpo_last, 0, 0),
        },
        "aroon": {
            "value": aroon_last,
            "signal": _signal(aroon_last, 25, -25),
        },
        "tsi": {
            "value": tsi_last,
            "signal": _signal(tsi_last, 5, -5),
        },
        "volume": {
            "last": round(float(vols[-1]), 2),
            "sma20": round(float(vol_sma[-1]), 2),
            "above_avg": bool(vols[-1] > vol_sma[-1]),
        },
        "trend": {
            "ema_bullish_stack": bool(
                all(x is not None for x in [ema9_last, ema21_last, ema50_last]) and
                ema9_last > ema21_last > ema50_last  # type: ignore
            ),
            "above_ema200": bool(ema200_last is not None and last_close > (ema200_last or 0)),
            "close": round(float(last_close), 8),
            "short_momentum_ref": short_momentum_ref,
        },
        # Also exposed at top level, duplicating trend["short_momentum_ref"].
        "short_momentum_ref": short_momentum_ref,
        "series": {
            "ema9": _series(ema9),
            "ema21": _series(ema21),
            "ema50": _series(ema50),
            "bb_upper": _series(bb_u),
            "bb_mid": _series(bb_m),
            "bb_lower": _series(bb_l),
        },
    }
# ─────────────────────────────────────────────────────────────────────────────
# UTILITIES
# ─────────────────────────────────────────────────────────────────────────────
| def _clamp(v: float, lo: float, hi: float) -> float: | |
| return max(lo, min(hi, v)) | |
| def _pct(a: float, b: float) -> float: | |
| """(a - b) / b * 100, an toΓ n vα»i b = 0.""" | |
| return (a - b) / b * 100.0 if b != 0 else 0.0 | |
| def _safe(v: Optional[float], default: float = 0.0) -> float: | |
| if v is None or math.isnan(v) or math.isinf(v): | |
| return default | |
| return float(v) | |
def _classify_regime(indicators: Dict[str, Any], last_close: float, atr_pct: float) -> Tuple[str, str]:
    """Classify the market regime from volatility and trend.

    Args:
        indicators: Indicator payload; ``indicators["rsi"]["value"]`` and
            ``indicators["ema"]["ema9" | "ema50" | "ema200"]`` are read.
            Missing or non-finite values fall back to RSI 50 and to
            ``last_close`` for the EMAs.
        last_close: Latest close price.
        atr_pct: ATR expressed as a percentage of price.

    Returns:
        ``(regime_key, description)`` where the description is the
        user-facing Vietnamese label for the regime.
    """
    rsi = _safe(indicators["rsi"].get("value"), 50.0)
    ema = indicators["ema"]
    ema9 = _safe(ema.get("ema9"), last_close)
    ema50 = _safe(ema.get("ema50"), last_close)
    ema200 = _safe(ema.get("ema200"), last_close)

    # High volatility overrides every trend-based state.
    if atr_pct > 3.5:
        if rsi > 60:
            return "volatile_bull", "Tăng trưởng biến động cao"
        if rsi < 40:
            return "volatile_bear", "Sụt giảm biến động cao"
        return "high_chaos", "Thị trường hỗn loạn / Vol cao"

    # Up-trend: price above the long EMA and the short EMA above the mid EMA.
    if last_close > ema200 and ema9 > ema50:
        if rsi > 70:
            return "overextended_bull", "Tăng trưởng quá mức (Overbought)"
        return "stable_bull", "Xu hướng tăng ổn định"

    # Down-trend: mirror image of the up-trend test.
    if last_close < ema200 and ema9 < ema50:
        if rsi < 30:
            return "capitulation", "Hoảng loạn / Quá bán (Oversold)"
        return "stable_bear", "Xu hướng giảm ổn định"

    # No clear trend: EMA9 and EMA50 within 0.5% of each other is a squeeze.
    if abs(_pct(ema9, ema50)) < 0.5:
        return "tight_range", "Tích lũy biên độ hẹp (Squeeze)"
    return "sideways", "Thị trường đi ngang (Sideway)"
# Per-oscillator weights, keyed by the short identifiers that
# `_extract_osc_key` returns for an oscillator label.
# NOTE(review): how these weights are combined is not visible in this part of
# the file - confirm against the scoring code before retuning.
OSC_WEIGHTS: Dict[str, float] = {
    "rsi": 2.1,
    "macd": 2.4,
    "stoch_rsi": 1.1,
    "stoch": 1.2,
    "cci": 1.7,
    "adx": 1.8,
    "williams": 1.5,
    "ultimate": 1.1,
    "bbp": 1.0,
    "ao": 0.9,
    "momentum": 1.6,
    "roc": 1.3,
    "trix": 1.4,
    "ppo": 1.4,
    "cmo": 1.2,
    "dpo": 0.8,
    "aroon": 1.3,
    "tsi": 1.4,
    "demarker": 1.2,
}
# Moving-average weights looked up by `_get_ma_weight`, which falls back to
# 1.0 for any key that is not listed here.
MA_WEIGHT_MAP: Dict[str, float] = {
    "ema_1_2": 0.8,
    "ema_2_5": 1.0,
    "ema_5_10": 1.1,
    "ema_10_20": 1.35,
    "ema_20_50": 1.8,
    "ema_50_100": 2.2,
    "ema_100_200": 2.7,
    "ichimoku_base_26": 1.9,
}
| def _extract_osc_key(label: str) -> str: | |
| l = label.lower() | |
| if "sα»©c mαΊ‘nh tΖ°Ζ‘ng Δα»i" in l or ("rsi" in l and "nhanh" not in l): return "rsi" | |
| if "macd" in l: return "macd" | |
| if "stochastic %k" in l: return "stoch" | |
| if "nhanh" in l or "stoch_rsi" in l: return "stoch_rsi" | |
| if "kΓͺnh hΓ ng hΓ³a" in l or "cci" in l: return "cci" | |
| if "Δα»nh hΖ°α»ng" in l or "adx" in l: return "adx" | |
| if "williams" in l: return "williams" | |
| if "ultimate" in l: return "ultimate" | |
| if "bbp" in l or "sα»©c mαΊ‘nh giΓ‘" in l: return "bbp" | |
| if "ao" in l: return "ao" | |
| if "xung lượng" in l or "momentum" in l: return "momentum" | |
| if "roc" in l: return "roc" | |
| if "trix" in l: return "trix" | |
| if "ppo" in l: return "ppo" | |
| if "cmo" in l: return "cmo" | |
| if "dpo" in l: return "dpo" | |
| if "aroon" in l: return "aroon" | |
| if "tsi" in l: return "tsi" | |
| if "demarker" in l: return "demarker" | |
| return "unknown" | |
def _get_ma_weight(label: str) -> float:
    """Return the weight configured for a moving-average label.

    The lower-cased label is first tried as a direct ``MA_WEIGHT_MAP`` key.
    Otherwise the first two integers found in it are treated as an EMA pair
    (``ema_<a>_<b>``). Anything unresolved weighs ``1.0``.
    """
    key = label.lower()
    try:
        return MA_WEIGHT_MAP[key]
    except KeyError:
        pass
    numbers = re.findall(r"\d+", key)
    if len(numbers) < 2:
        return 1.0
    fast, slow = numbers[0], numbers[1]
    return MA_WEIGHT_MAP.get(f"ema_{fast}_{slow}", 1.0)
| def _gauge_to_signal(gauge: float, interval: str = "1h") -> str: | |
| """Strict 3-level signal converter.""" | |
| if gauge >= 66.66: return "Mua" | |
| if gauge > 33.33: return "Trung lαΊp" | |
| return "BΓ‘n" | |
def _gauge_to_normalized_score(gauge: float) -> float:
    """Rescale a 0..100 gauge to the -1..1 range, clamping out-of-range input."""
    centered = float(gauge) - 50.0
    return _clamp(centered / 50.0, -1.0, 1.0)
| def _forecast_path_metrics(p50_path: np.ndarray, last_close: float) -> Dict[str, float]: | |
| """Measure forecast quality from the full path, not only the final endpoint.""" | |
| if len(p50_path) == 0 or abs(last_close) <= 1e-8: | |
| return { | |
| "weighted_return_pct": 0.0, | |
| "final_return_pct": 0.0, | |
| "path_consistency": 50.0, | |
| "monotonicity": 50.0, | |
| "max_adverse_excursion_pct": 0.0, | |
| "mean_step_return_pct": 0.0, | |
| } | |
| p50_safe = np.array(p50_path, dtype=float) | |
| if not np.isfinite(p50_safe).all(): | |
| last_valid = float(last_close) | |
| for i in range(len(p50_safe)): | |
| if not np.isfinite(p50_safe[i]): | |
| p50_safe[i] = last_valid | |
| else: | |
| last_valid = p50_safe[i] | |
| ret_path = ((p50_safe / last_close) - 1.0) * 100.0 | |
| step_weights = np.linspace(1.0, 0.65, len(ret_path)) | |
| weighted_ret_pct = float(np.average(ret_path, weights=step_weights)) | |
| final_ret_pct = float(ret_path[-1]) | |
| final_sign = 0 if abs(final_ret_pct) < 0.05 else (1 if final_ret_pct > 0 else -1) | |
| if len(ret_path) > 1 and final_sign != 0: | |
| signed_steps = [ | |
| 1.0 if np.sign(curr - prev) == final_sign else 0.0 | |
| for prev, curr in zip(ret_path[:-1], ret_path[1:]) | |
| if abs(curr - prev) >= 0.02 | |
| ] | |
| path_consistency = float(sum(signed_steps) / len(signed_steps)) if signed_steps else 0.5 | |
| else: | |
| path_consistency = 0.5 | |
| if len(ret_path) > 1: | |
| monotonicity = float(np.mean(np.diff(ret_path) >= 0)) if final_sign >= 0 else float(np.mean(np.diff(ret_path) <= 0)) | |
| else: | |
| monotonicity = 0.5 | |
| if final_sign > 0: | |
| adverse = abs(float(np.min(ret_path))) | |
| elif final_sign < 0: | |
| adverse = abs(float(np.max(ret_path))) | |
| else: | |
| adverse = max(abs(float(np.min(ret_path))), abs(float(np.max(ret_path)))) | |
| mean_step_return_pct = float(np.mean(np.diff(ret_path))) if len(ret_path) > 1 else final_ret_pct | |
| return { | |
| "weighted_return_pct": round(weighted_ret_pct, 2), | |
| "final_return_pct": round(final_ret_pct, 2), | |
| "path_consistency": round(path_consistency * 100.0, 1), | |
| "monotonicity": round(monotonicity * 100.0, 1), | |
| "max_adverse_excursion_pct": round(adverse, 2), | |
| "mean_step_return_pct": round(mean_step_return_pct, 3), | |
| } | |
| def _forecast_band_profile( | |
| p10_path: np.ndarray, | |
| p50_path: np.ndarray, | |
| p90_path: np.ndarray, | |
| ) -> Dict[str, float]: | |
| """Summarize uncertainty bands across the full forecast path.""" | |
| if len(p50_path) == 0: | |
| return { | |
| "avg_band_pct": 0.0, | |
| "end_band_pct": 0.0, | |
| "band_stability_pct": 0.0, | |
| "band_step_change_pct": 0.0, | |
| } | |
| safe_mid = np.maximum(np.abs(p50_path), 1e-8) | |
| band_pct = np.maximum(((p90_path - p10_path) / safe_mid) * 100.0, 0.0) | |
| band_step_changes = np.abs(np.diff(band_pct)) if len(band_pct) > 1 else np.array([0.0], dtype=float) | |
| return { | |
| "avg_band_pct": round(float(np.mean(band_pct)), 4), | |
| "end_band_pct": round(float(band_pct[-1]), 4), | |
| "band_stability_pct": round(float(np.std(band_pct)), 4), | |
| "band_step_change_pct": round(float(np.mean(band_step_changes)), 4), | |
| } | |
| def _forecast_path_responsiveness(p50_path: np.ndarray, last_close: float) -> Dict[str, float]: | |
| """Quantify how much the forecast path actually moves relative to the anchor.""" | |
| if len(p50_path) == 0 or abs(last_close) <= 1e-8: | |
| return { | |
| "range_pct": 0.0, | |
| "step_abs_mean_pct": 0.0, | |
| "step_abs_max_pct": 0.0, | |
| } | |
| ret_path = ((p50_path / last_close) - 1.0) * 100.0 | |
| diffs = np.diff(ret_path) if len(ret_path) > 1 else np.array([ret_path[-1]], dtype=float) | |
| return { | |
| "range_pct": round(float(np.max(ret_path) - np.min(ret_path)), 4), | |
| "step_abs_mean_pct": round(float(np.mean(np.abs(diffs))), 4), | |
| "step_abs_max_pct": round(float(np.max(np.abs(diffs))), 4), | |
| } | |
| def _recent_ohlc4_abs_step_pct(ohlc4: np.ndarray, window: int = 20) -> float: | |
| """Use recent OHLC4 movement as a volatility-aware floor for forecast responsiveness.""" | |
| if len(ohlc4) < 2: | |
| return 0.0 | |
| base = np.maximum(np.abs(ohlc4[:-1]), 1e-8) | |
| returns = np.diff(ohlc4) / base * 100.0 | |
| recent = returns[-min(window, len(returns)) :] | |
| return round(float(np.mean(np.abs(recent))), 4) if len(recent) else 0.0 | |
def _future_window_path_stats(
    ohlc4: np.ndarray,
    anchor_index: int,
    horizon: int,
) -> Optional[Dict[str, float]]:
    """Measure the realized path that followed a historical anchor point.

    Returns ``None`` when the anchor is negative, the horizon is not
    positive, or fewer than ``horizon`` prices exist after the anchor.
    Otherwise returns the responsiveness figures of the following window
    plus its final return relative to the anchor price.
    """
    if anchor_index < 0 or horizon <= 0 or anchor_index + horizon >= len(ohlc4):
        return None

    baseline = float(ohlc4[anchor_index])
    window_start = anchor_index + 1
    future_path = np.asarray(ohlc4[window_start : window_start + horizon], dtype=float)
    responsiveness = _forecast_path_responsiveness(future_path, baseline)

    if len(future_path):
        final_return_pct = ((future_path[-1] / max(abs(baseline), 1e-8)) - 1.0) * 100.0
    else:
        final_return_pct = 0.0

    stats = {
        key: float(responsiveness[key])
        for key in ("range_pct", "step_abs_mean_pct", "step_abs_max_pct")
    }
    stats["final_return_pct"] = float(final_return_pct)
    return stats
def _rolling_future_path_targets(
    ohlc4: np.ndarray,
    horizon: int,
    max_windows: int = 48,
) -> Optional[Dict[str, float]]:
    """Summarize realized future-path amplitudes from the latest historical anchors.

    Up to ``max_windows`` consecutive anchors ending at the last one that
    still has a full ``horizon`` of data are measured. Newer anchors weigh
    more (linear ramp from 0.6 to 1.0). Returns ``None`` when there is not
    enough history.
    """
    if len(ohlc4) < horizon + 2:
        return None

    last_anchor = len(ohlc4) - horizon - 1
    first_anchor = max(0, last_anchor - max_windows + 1)
    measured = (
        _future_window_path_stats(ohlc4, anchor, horizon)
        for anchor in range(first_anchor, last_anchor + 1)
    )
    stats: List[Dict[str, float]] = [entry for entry in measured if entry is not None]
    if not stats:
        return None

    weights = np.linspace(0.6, 1.0, len(stats), dtype=float)

    def _weighted(field: str) -> float:
        values = np.array([entry[field] for entry in stats], dtype=float)
        return round(float(np.average(values, weights=weights)), 4)

    return {
        "window_count": len(stats),
        "range_pct": _weighted("range_pct"),
        "step_abs_mean_pct": _weighted("step_abs_mean_pct"),
    }
| def _context_regime_signature(ohlc4_window: np.ndarray) -> Dict[str, float]: | |
| """Describe the local market regime of an OHLC4 window using scale-free features.""" | |
| if len(ohlc4_window) < 2: | |
| return { | |
| "net_change_pct": 0.0, | |
| "vol_pct": 0.0, | |
| "abs_step_mean_pct": 0.0, | |
| "range_pct": 0.0, | |
| } | |
| base = np.maximum(np.abs(ohlc4_window[:-1]), 1e-8) | |
| returns = np.diff(ohlc4_window) / base * 100.0 | |
| first = max(abs(float(ohlc4_window[0])), 1e-8) | |
| return { | |
| "net_change_pct": float(((ohlc4_window[-1] / first) - 1.0) * 100.0), | |
| "vol_pct": float(np.std(returns)) if len(returns) > 1 else 0.0, | |
| "abs_step_mean_pct": float(np.mean(np.abs(returns))) if len(returns) else 0.0, | |
| "range_pct": float(((np.max(ohlc4_window) - np.min(ohlc4_window)) / first) * 100.0), | |
| } | |
| def _regime_signature_distance(current: Dict[str, float], candidate: Dict[str, float]) -> float: | |
| """Weighted distance between two market-regime signatures.""" | |
| terms = ( | |
| ("net_change_pct", 0.34, 0.40), | |
| ("vol_pct", 0.28, 0.18), | |
| ("abs_step_mean_pct", 0.22, 0.12), | |
| ("range_pct", 0.16, 0.30), | |
| ) | |
| distance = 0.0 | |
| for key, weight, floor in terms: | |
| lhs = float(current.get(key, 0.0)) | |
| rhs = float(candidate.get(key, 0.0)) | |
| scale = max(abs(lhs), abs(rhs), floor) | |
| distance += weight * (abs(lhs - rhs) / scale) | |
| return float(distance) | |
def _regime_matched_future_targets(
    ohlc4: np.ndarray,
    context_len: int,
    horizon: int,
    search_limit: int = 720,
    top_k: int = 18,
) -> Optional[Dict[str, float]]:
    """Measure realized future amplitudes of historical windows in a similar regime.

    The signature of the latest ``signature_len`` prices is compared with the
    signature ending at each of the last ``search_limit`` usable anchors. The
    ``top_k`` closest anchors are averaged, weighted by inverse distance.
    Returns ``None`` when history is too short or nothing matches.
    """
    max_anchor = len(ohlc4) - horizon - 1
    if max_anchor < 32:
        return None
    # Between 32 and 96 points, capped by what history allows.
    signature_len = max(32, min(int(context_len), 96, max_anchor + 1))
    if len(ohlc4) < signature_len + horizon + 1:
        return None

    current_signature = _context_regime_signature(
        np.asarray(ohlc4[-signature_len:], dtype=float)
    )
    first_anchor = max(signature_len - 1, max_anchor - search_limit + 1)

    scored: List[Tuple[float, Dict[str, float]]] = []
    for anchor in range(first_anchor, max_anchor + 1):
        outcome = _future_window_path_stats(ohlc4, anchor, horizon)
        if outcome is None:
            continue
        window = np.asarray(ohlc4[anchor - signature_len + 1 : anchor + 1], dtype=float)
        gap = _regime_signature_distance(current_signature, _context_regime_signature(window))
        scored.append((gap, outcome))
    if not scored:
        return None

    ranked = sorted(scored, key=lambda pair: pair[0])
    chosen = ranked[: min(top_k, len(ranked))]
    # The 0.08 offset keeps a perfect match from getting infinite weight.
    weights = np.array([1.0 / (gap + 0.08) for gap, _ in chosen], dtype=float)

    def _weighted(field: str) -> float:
        values = np.array([outcome[field] for _, outcome in chosen], dtype=float)
        return round(float(np.average(values, weights=weights)), 4)

    range_pct = _weighted("range_pct")
    step_abs_mean_pct = _weighted("step_abs_mean_pct")
    return {
        "match_count": len(chosen),
        "signature_len": signature_len,
        "range_pct": range_pct,
        "step_abs_mean_pct": step_abs_mean_pct,
    }
def _build_regime_texture_template(
    ohlc4: np.ndarray,
    context_len: int,
    horizon: int,
    search_limit: int = 720,
    top_k: int = 18,
) -> Optional[Dict[str, Any]]:
    """Build a step-return template from regime-matched historical futures.

    Historical anchors whose preceding window resembles the current regime
    are ranked by signature distance. The template blends the most
    "textured" of the six closest matches (72%) with the inverse-distance
    weighted average of the ``top_k`` closest matches (28%).

    Args:
        ohlc4: Historical OHLC4 prices, oldest first.
        context_len: Requested context length; the signature window is
            clamped to 32..96 points and to the available history.
        horizon: Number of future steps the template must cover.
        search_limit: Maximum number of most recent anchors to inspect.
        top_k: Number of closest matches to average.

    Returns:
        Dict with ``step_returns_pct`` (array of length ``horizon``),
        ``step_abs_mean_pct``, ``range_pct``, ``match_count``,
        ``signature_len`` and ``best_match_distance``; ``None`` when the
        horizon is not positive, history is too short or nothing matches.
    """
    if horizon <= 0:
        return None
    max_anchor = len(ohlc4) - horizon - 1
    if max_anchor < 32:
        return None
    signature_len = max(32, min(int(context_len), 96, max_anchor + 1))
    if len(ohlc4) < signature_len + horizon + 1:
        return None

    current_signature = _context_regime_signature(
        np.asarray(ohlc4[-signature_len:], dtype=float)
    )
    min_anchor = signature_len - 1
    start_anchor = max(min_anchor, max_anchor - search_limit + 1)

    matches: List[Tuple[float, np.ndarray, Dict[str, float]]] = []
    for anchor_index in range(start_anchor, max_anchor + 1):
        context_window = np.asarray(
            ohlc4[anchor_index - signature_len + 1 : anchor_index + 1],
            dtype=float,
        )
        candidate_signature = _context_regime_signature(context_window)
        distance = _regime_signature_distance(current_signature, candidate_signature)
        anchor_price = float(ohlc4[anchor_index])
        future_path = np.asarray(
            ohlc4[anchor_index + 1 : anchor_index + 1 + horizon],
            dtype=float,
        )
        if len(future_path) != horizon:
            continue
        # Step-over-step returns, the first one measured from the anchor price.
        prev_values = np.concatenate(([anchor_price], future_path[:-1]))
        step_returns = ((future_path / np.maximum(np.abs(prev_values), 1e-8)) - 1.0) * 100.0
        path_stats = _forecast_path_responsiveness(future_path, anchor_price)
        matches.append((distance, step_returns, path_stats))
    if not matches:
        return None

    matches.sort(key=lambda item: item[0])
    chosen = matches[: min(top_k, len(matches))]
    if not chosen:
        # Only reachable with a non-positive top_k; nothing to average.
        return None

    # The 0.08 offset keeps a perfect match from getting infinite weight.
    weights = np.array([1.0 / (distance + 0.08) for distance, _, _ in chosen], dtype=float)
    step_matrix = np.vstack([step_returns for _, step_returns, _ in chosen])
    averaged_steps = np.average(step_matrix, axis=0, weights=weights)

    # Among the six closest matches prefer the liveliest path, with a mild
    # penalty for being a worse regime match.
    best_distance = float("inf")
    best_steps: Optional[np.ndarray] = None
    best_score = float("-inf")
    for distance, step_returns, path_stats in chosen[: min(6, len(chosen))]:
        step_abs_mean_pct = float(np.mean(np.abs(step_returns))) if len(step_returns) else 0.0
        range_pct = float(path_stats.get("range_pct", 0.0))
        texture_score = (
            step_abs_mean_pct * 0.60
            + range_pct * 0.40
            - (distance * 0.12)
        )
        if texture_score > best_score:
            best_score = texture_score
            best_distance = distance
            best_steps = step_returns
    if best_steps is None:
        # Every score was NaN; fall back to the plain average.
        best_steps = averaged_steps
        best_distance = float(chosen[0][0])

    blended_steps = (best_steps * 0.72) + (averaged_steps * 0.28)
    step_abs_mean_pct = float(np.mean(np.abs(blended_steps))) if len(blended_steps) else 0.0

    # Replay the blended steps from a fixed reference price to obtain the
    # template's range. The range is a relative figure, so the reference
    # level is arbitrary; a constant avoids depending on whichever anchor
    # the search loop happened to visit last.
    reference_price = 100.0
    price = reference_price
    simulated_prices: List[float] = []
    for step_return in blended_steps:
        price = max(0.0, price * (1.0 + (float(step_return) / 100.0)))
        simulated_prices.append(price)
    simulated_path = np.asarray(simulated_prices, dtype=float)
    range_pct = (
        _forecast_path_responsiveness(simulated_path, reference_price)["range_pct"]
        if len(simulated_path)
        else 0.0
    )

    return {
        "step_returns_pct": blended_steps,
        "step_abs_mean_pct": round(step_abs_mean_pct, 4),
        "range_pct": round(float(range_pct), 4),
        "match_count": len(chosen),
        "signature_len": signature_len,
        "best_match_distance": round(best_distance, 4),
    }
def _derive_target_amplitude_profile(
    ohlc4: np.ndarray,
    context_len: int,
    horizon: int,
) -> Dict[str, Any]:
    """Blend recent and regime-matched history into a target amplitude profile.

    The target step size mixes the recent mean absolute step (weight 0.24),
    the rolling realized futures (0.31) and the regime-matched futures
    (0.45), renormalized over whichever sources are available. The target
    range mixes the rolling (0.42) and regime-matched (0.58) ranges, or is
    derived from the target step when neither exists.
    """

    def _weighted_mean(terms: List[Tuple[float, float]]) -> float:
        if not terms:
            return 0.0
        total_weight = sum(weight for _, weight in terms)
        return sum(value * weight for value, weight in terms) / total_weight

    recent_abs_step_pct = _recent_ohlc4_abs_step_pct(ohlc4, window=20)
    rolling_targets = _rolling_future_path_targets(
        ohlc4,
        horizon=horizon,
        max_windows=max(24, min(72, horizon * 6)),
    )
    regime_targets = _regime_matched_future_targets(
        ohlc4,
        context_len=context_len,
        horizon=horizon,
    )
    has_rolling = rolling_targets is not None
    has_regime = regime_targets is not None

    step_terms: List[Tuple[float, float]] = []
    if recent_abs_step_pct > 0:
        step_terms.append((recent_abs_step_pct, 0.24))
    if has_rolling:
        step_terms.append((float(rolling_targets["step_abs_mean_pct"]), 0.31))
    if has_regime:
        step_terms.append((float(regime_targets["step_abs_mean_pct"]), 0.45))
    target_step_abs_mean_pct = _weighted_mean(step_terms)

    range_terms: List[Tuple[float, float]] = []
    if has_rolling:
        range_terms.append((float(rolling_targets["range_pct"]), 0.42))
    if has_regime:
        range_terms.append((float(regime_targets["range_pct"]), 0.58))
    if not range_terms and target_step_abs_mean_pct > 0:
        # No realized ranges: scale the step by a sqrt-of-horizon multiplier
        # kept between 2.0 and 4.5.
        multiplier = max(2.0, min(math.sqrt(max(horizon, 1)) * 1.6, 4.5))
        range_terms.append((target_step_abs_mean_pct * multiplier, 1.0))
    target_range_pct = _weighted_mean(range_terms)

    return {
        "target_step_abs_mean_pct": round(float(target_step_abs_mean_pct), 4),
        "target_range_pct": round(float(target_range_pct), 4),
        "recent_abs_step_pct": round(float(recent_abs_step_pct), 4),
        "rolling_targets": rolling_targets,
        "regime_targets": regime_targets,
    }
def _build_raw_ohlc4_bundle(
    model_output: Dict[str, Any],
    last_ohlc4: float,
) -> Dict[str, Any]:
    """Build the OHLC4-path bundle directly from raw TimesFM output.

    Args:
        model_output: Mapping with ``"p10"``, ``"p50"`` and ``"p90"``
            quantile paths of equal length.
        last_ohlc4: Anchor price the path metrics are measured against.

    Returns:
        Bundle holding the three paths as float arrays, the path metrics, a
        ``confidence`` in 20..95 and fixed bookkeeping fields (``mode`` is
        ``"raw_timesfm_ohlc4"``).

    Raises:
        KeyError: If one of the quantile paths is missing.
    """
    raw_p10 = np.array(model_output["p10"], dtype=float)
    raw_p50 = np.array(model_output["p50"], dtype=float)
    raw_p90 = np.array(model_output["p90"], dtype=float)
    path_metrics = _forecast_path_metrics(raw_p50, last_ohlc4)

    if len(raw_p50):
        # Floor each step's band at zero so crossed quantiles (p90 < p10)
        # cannot produce a negative width and a certainty above 1.
        relative_band = np.maximum(
            (raw_p90 - raw_p10) / np.maximum(np.abs(raw_p50), 1e-8),
            0.0,
        )
        avg_band_pct = float(np.mean(relative_band) * 100.0)
    else:
        avg_band_pct = 0.0

    # A NaN/inf band means the quantiles are unusable. Give it no certainty
    # instead of letting NaN flow into the clamp below, where
    # max(20, min(95, nan)) would evaluate to 95.
    band_certainty = math.exp(-avg_band_pct / 4.0) if math.isfinite(avg_band_pct) else 0.0

    path_consistency = path_metrics["path_consistency"] / 100.0
    monotonicity = path_metrics["monotonicity"] / 100.0
    final_return_pct = path_metrics["final_return_pct"]
    weighted_return_pct = path_metrics["weighted_return_pct"]
    move_pct = abs(final_return_pct)

    confidence = (
        28.0
        + band_certainty * 34.0
        + path_consistency * 18.0
        + monotonicity * 12.0
        + min(move_pct, 4.0) * 2.0
    )
    confidence = max(20.0, min(95.0, confidence))

    # Returns below 0.05% are directionless and never count as disagreement.
    final_sign = 0 if abs(final_return_pct) < 0.05 else (1 if final_return_pct > 0 else -1)
    weighted_sign = 0 if abs(weighted_return_pct) < 0.05 else (1 if weighted_return_pct > 0 else -1)
    agreement = final_sign == 0 or weighted_sign == 0 or final_sign == weighted_sign

    return {
        "p10": raw_p10,
        "p50": raw_p50,
        "p90": raw_p90,
        "model_weight": 1.0,
        "anchor_weight": 0.0,
        "agreement": agreement,
        "scale": 1.0,
        "confidence": round(confidence, 2),
        "model_bias_pct": 0.0,
        "path_metrics": path_metrics,
        "mode": "raw_timesfm_ohlc4",
    }
def _apply_forecast_amplitude_calibration(
    raw_bundle: Dict[str, Any],
    last_ohlc4: float,
    target_profile: Dict[str, Any],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Scale raw TimesFM OHLC4 paths so their step/range amplitude stays realistic.

    A single scale factor is the weighted geometric mean of the
    target-to-raw step ratio (0.62) and range ratio (0.38), clipped to
    0.82..2.35, and to 0.9..1.25 when the raw path already nearly meets both
    targets. Every quantile path is stretched around ``last_ohlc4`` by that
    factor and floored at zero.

    Returns:
        ``(calibrated_bundle, calibration_meta)``.
    """
    raw_p10, raw_p50, raw_p90 = (
        np.asarray(raw_bundle.get(name, []), dtype=float)
        for name in ("p10", "p50", "p90")
    )
    raw_responsiveness = _forecast_path_responsiveness(raw_p50, last_ohlc4)
    raw_step = max(float(raw_responsiveness["step_abs_mean_pct"]), 1e-6)
    raw_range = max(float(raw_responsiveness["range_pct"]), 1e-6)
    target_step = max(float(target_profile.get("target_step_abs_mean_pct") or 0.0), 0.0)
    target_range = max(float(target_profile.get("target_range_pct") or 0.0), 0.0)

    # (log ratio, weight) for every target that is actually defined.
    log_terms: List[Tuple[float, float]] = []
    if target_step > 0:
        log_terms.append((math.log(max(target_step / raw_step, 1e-6)), 0.62))
    if target_range > 0:
        log_terms.append((math.log(max(target_range / raw_range, 1e-6)), 0.38))

    if log_terms:
        weighted_log = sum(weight * log_ratio for log_ratio, weight in log_terms)
        total_weight = sum(weight for _, weight in log_terms)
        scale = math.exp(weighted_log / total_weight)
    else:
        scale = 1.0
    scale = float(np.clip(scale, 0.82, 2.35))

    already_close = raw_step >= target_step * 0.92 and raw_range >= target_range * 0.88
    if already_close:
        scale = float(np.clip(scale, 0.9, 1.25))

    def _stretch(path: np.ndarray) -> np.ndarray:
        return np.maximum(0.0, last_ohlc4 + ((path - last_ohlc4) * scale))

    calibrated_p10 = _stretch(raw_p10)
    calibrated_p50 = _stretch(raw_p50)
    calibrated_p90 = _stretch(raw_p90)

    calibrated_bundle = _build_raw_ohlc4_bundle(
        {"p10": calibrated_p10, "p50": calibrated_p50, "p90": calibrated_p90},
        last_ohlc4,
    )
    calibrated_bundle["mode"] = "timesfm_ohlc4_vol_calibrated"

    calibration_meta = {
        "enabled": True,
        "version": "volatility_regime_v1",
        "scale": round(scale, 4),
        "raw_path": raw_responsiveness,
        "calibrated_path": _forecast_path_responsiveness(calibrated_p50, last_ohlc4),
        "targets": target_profile,
    }
    return calibrated_bundle, calibration_meta
def _apply_forecast_path_texture(
    base_bundle: Dict[str, Any],
    last_ohlc4: float,
    target_profile: Dict[str, Any],
    texture_template: Optional[Dict[str, Any]],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Inject regime-matched step texture into a forecast bundle.

    The median path's step returns are blended with the template's step
    returns and compounded sequentially from ``last_ohlc4``. The p10/p90
    paths are then rescaled by the same per-step ratio as the median, so the
    relative width of the uncertainty band is preserved. (Rebuilding all
    three quantiles from the same step returns would collapse them onto one
    path and inflate the confidence derived from the band width.)

    Args:
        base_bundle: Bundle with ``p10``/``p50``/``p90`` paths and optional
            ``path_metrics``.
        last_ohlc4: Anchor price the path starts from.
        target_profile: Target amplitudes (``target_step_abs_mean_pct`` and
            ``target_range_pct``).
        texture_template: Template with ``step_returns_pct`` and summary
            fields, or ``None``.

    Returns:
        ``(bundle, texture_meta)``. The original bundle is returned
        untouched when there is no template, its length does not match the
        path, or the base path already meets the targets.
    """
    base_p10 = np.asarray(base_bundle.get("p10", []), dtype=float)
    base_p50 = np.asarray(base_bundle.get("p50", []), dtype=float)
    base_p90 = np.asarray(base_bundle.get("p90", []), dtype=float)
    base_path = _forecast_path_responsiveness(base_p50, last_ohlc4)
    texture_meta: Dict[str, Any] = {
        "enabled": True,
        "version": "historical_step_texture_v2",
        "applied": False,
        "blend_alpha": 0.0,
        "base_path": base_path,
        "textured_path": base_path,
        "template": {
            "match_count": 0,
            "signature_len": 0,
            "step_abs_mean_pct": 0.0,
            "range_pct": 0.0,
        },
    }
    if texture_template is None:
        return base_bundle, texture_meta
    template_steps = np.asarray(texture_template.get("step_returns_pct", []), dtype=float)
    if len(template_steps) != len(base_p50):
        return base_bundle, texture_meta

    template_summary = {
        "match_count": int(texture_template.get("match_count") or 0),
        "signature_len": int(texture_template.get("signature_len") or 0),
        "step_abs_mean_pct": round(float(texture_template.get("step_abs_mean_pct") or 0.0), 4),
        "range_pct": round(float(texture_template.get("range_pct") or 0.0), 4),
    }

    template_step = max(float(texture_template.get("step_abs_mean_pct") or 0.0), 1e-6)
    template_range = max(float(texture_template.get("range_pct") or 0.0), 1e-6)
    target_step = max(float(target_profile.get("target_step_abs_mean_pct") or 0.0), 0.0)
    target_range = max(float(target_profile.get("target_range_pct") or 0.0), 0.0)
    base_step = float(base_path["step_abs_mean_pct"])
    base_range = float(base_path["range_pct"])
    base_monotonicity = float(base_bundle.get("path_metrics", {}).get("monotonicity", 50.0))

    # How far the base path falls short of (slightly relaxed) targets.
    step_gap = max((target_step * 0.94) - base_step, 0.0)
    range_gap = max((target_range * 0.78) - base_range, 0.0)
    if base_monotonicity >= 88.0:
        # Near-straight paths always receive a minimum amount of texture.
        step_gap = max(step_gap, target_step * 0.16)
        range_gap = max(range_gap, target_range * 0.10)
    if step_gap <= 0.0 and range_gap <= 0.0:
        texture_meta["template"] = template_summary
        return base_bundle, texture_meta

    ratios: List[float] = []
    if step_gap > 0.0:
        ratios.append(step_gap / template_step)
    if range_gap > 0.0:
        ratios.append(range_gap / template_range)
    blend_alpha = float(np.clip(np.median(ratios) if ratios else 0.0, 0.0, 0.82))

    def _count_direction_flips(series: np.ndarray) -> int:
        """Count sign changes among steps of at least 0.015%."""
        if len(series) < 2:
            return 0
        signs = [int(np.sign(delta)) for delta in series if abs(delta) >= 0.015]
        return sum(1 for prev, curr in zip(signs[:-1], signs[1:]) if prev != curr)

    def _step_returns_from_path(path: np.ndarray, anchor_price: float) -> np.ndarray:
        """Step-over-step percentage returns, the first measured from the anchor."""
        prev_values = np.concatenate(([anchor_price], path[:-1]))
        return ((path / np.maximum(np.abs(prev_values), 1e-8)) - 1.0) * 100.0

    def _compound_steps(step_returns_pct: np.ndarray) -> np.ndarray:
        """Compound percentage steps sequentially, starting at the anchor price."""
        rebuilt: List[float] = []
        price = float(last_ohlc4)
        for step in step_returns_pct:
            factor = max(1e-6, 1.0 + (float(step) / 100.0))
            price = max(0.0, price * factor)
            rebuilt.append(price)
        return np.asarray(rebuilt, dtype=float)

    def _rescale_quantile(quantile_path: np.ndarray, textured_mid: np.ndarray) -> np.ndarray:
        """Move a quantile path by the same per-step ratio as the median."""
        ratio = textured_mid / np.maximum(np.abs(base_p50), 1e-8)
        return np.maximum(0.0, quantile_path * ratio)

    base_steps_p50 = _step_returns_from_path(base_p50, last_ohlc4)

    def _blend_steps(alpha: float) -> np.ndarray:
        return (base_steps_p50 * (1.0 - alpha)) + (template_steps * alpha)

    def _textured(alpha: float) -> Tuple[Dict[str, Any], Dict[str, float]]:
        """Build the textured bundle and its responsiveness for one blend weight."""
        mid = _compound_steps(_blend_steps(alpha))
        bundle = _build_raw_ohlc4_bundle(
            {
                "p10": _rescale_quantile(base_p10, mid),
                "p50": mid,
                "p90": _rescale_quantile(base_p90, mid),
            },
            last_ohlc4,
        )
        path = _forecast_path_responsiveness(
            np.asarray(bundle.get("p50", []), dtype=float),
            last_ohlc4,
        )
        return bundle, path

    template_flips = _count_direction_flips(template_steps)
    if base_monotonicity >= 92.0 and template_flips >= 1:
        # A one-directional base path should pick up at least one reversal
        # from the template: raise the blend until it does, capped at 0.92.
        blend_alpha = max(blend_alpha, 0.28)
        while _count_direction_flips(_blend_steps(blend_alpha)) < 1 and blend_alpha < 0.92:
            blend_alpha = min(blend_alpha * 1.16, 0.92)

    textured_bundle, textured_path = _textured(blend_alpha)

    if target_step > 0.0 and float(textured_path["step_abs_mean_pct"]) > target_step * 1.28:
        # Overshoot: shrink the blend so steps stay within 128% of the target.
        shrink = (target_step * 1.28) / max(float(textured_path["step_abs_mean_pct"]), 1e-6)
        blend_alpha *= shrink
        textured_bundle, textured_path = _textured(blend_alpha)

    textured_bundle["mode"] = "timesfm_ohlc4_textured"
    texture_meta.update(
        {
            "applied": blend_alpha > 0.0,
            "blend_alpha": round(blend_alpha, 4),
            "textured_path": textured_path,
            "template": template_summary,
        }
    )
    return textured_bundle, texture_meta
def _score_forecast_context_candidate(
    analysis_bundle: Dict[str, Any],
    last_ohlc4: float,
    recent_abs_step_pct: float,
) -> Dict[str, float]:
    """Score a forecast context, favouring confident paths that are not near-flat.

    The score is the bundle's confidence plus up to 14 points for step size
    and up to 10 for range (each relative to a floor derived from recent
    volatility), minus 6 when both fall below 55% of their floors.
    """
    median_path = np.asarray(analysis_bundle.get("p50", []), dtype=float)
    responsiveness = _forecast_path_responsiveness(median_path, last_ohlc4)
    step_abs_mean_pct = float(responsiveness["step_abs_mean_pct"])
    range_pct = float(responsiveness["range_pct"])

    step_floor = max(0.015, recent_abs_step_pct * 0.18)
    range_floor = max(0.05, recent_abs_step_pct * 0.70)
    step_bonus = min(step_abs_mean_pct / step_floor, 1.0) * 14.0
    range_bonus = min(range_pct / range_floor, 1.0) * 10.0

    is_flat = step_abs_mean_pct < (step_floor * 0.55) and range_pct < (range_floor * 0.55)
    flat_penalty = 6.0 if is_flat else 0.0

    confidence = float(analysis_bundle.get("confidence", 0.0))
    score = confidence + step_bonus + range_bonus - flat_penalty

    result = {
        "score": round(score, 4),
        "confidence": round(confidence, 4),
        "step_floor": round(step_floor, 4),
        "range_floor": round(range_floor, 4),
    }
    result.update(responsiveness)
    return result
| def _calc_vote_gauge(buy: int, sell: int, neutral: int) -> float: | |
| """Equal-weight PTKT vote gauge from Buy / Sell / Neutral counts.""" | |
| buy_f = float(max(0, buy)) | |
| sell_f = float(max(0, sell)) | |
| neutral_f = float(max(0, neutral)) | |
| denominator = buy_f + sell_f + neutral_f | |
| if denominator <= 0: | |
| return 50.0 | |
| raw_gauge = 50.0 + 50.0 * ((buy_f - sell_f) / denominator) | |
| return max(0.0, min(100.0, raw_gauge)) | |
def _calc_adx_trend_activation(adx_value: Optional[float] = None) -> float:
    """Return a sigmoid activation in [0, 1] derived from the ADX reading.

    The sigmoid is centred on ADX 20 with a scale of 8. A missing or NaN ADX is
    treated as 0, and negative readings are floored at 0.
    """
    adx = 0.0 if adx_value is None else float(adx_value)
    if math.isnan(adx):
        adx = 0.0
    adx = max(0.0, adx)
    sigmoid = 1.0 / (1.0 + math.exp(-((adx - 20.0) / 8.0)))
    return float(_clamp(sigmoid, 0.0, 1.0))
def _calc_vote_participation_factor(buy: int, sell: int, neutral: int) -> float:
    """Return the share of votes that are directional (Buy or Sell) in [0, 1].

    Negative counts are treated as zero; with no votes the factor is 0.
    """
    buys, sells, neutrals = (
        float(max(0, count)) for count in (buy, sell, neutral)
    )
    total_votes = buys + sells + neutrals
    if total_votes <= 0:
        return 0.0
    directional_share = (buys + sells) / total_votes
    return float(_clamp(directional_share, 0.0, 1.0))
def _calc_adx_weighted_technical_gauge(
    buy: int,
    sell: int,
    neutral: int,
    adx_value: Optional[float] = None,
) -> float:
    """Compute the final PTKT gauge from vote counts damped by ADX and participation.

    gauge = 50 + 50 * vote_balance * participation_factor * trend_activation,
    clamped to [0, 100]. With no votes the neutral midpoint (50) is returned.
    """
    buys, sells, neutrals = (
        float(max(0, count)) for count in (buy, sell, neutral)
    )
    total_votes = buys + sells + neutrals
    if total_votes <= 0:
        return 50.0

    activation = _calc_adx_trend_activation(adx_value)
    participation = _calc_vote_participation_factor(buy, sell, neutral)
    balance = (buys - sells) / total_votes
    gauge = 50.0 + (50.0 * balance * participation * activation)
    return float(_clamp(gauge, 0.0, 100.0))
| def _vote_direction_from_counts(buy: int, sell: int) -> int: | |
| if buy > sell: | |
| return 1 | |
| if sell > buy: | |
| return -1 | |
| return 0 | |
| def _candle_ohlc4(candle: Dict[str, Any]) -> Optional[float]: | |
| try: | |
| open_value = float(candle["open"]) | |
| high_value = float(candle["high"]) | |
| low_value = float(candle["low"]) | |
| close_value = float(candle["close"]) | |
| except (KeyError, TypeError, ValueError): | |
| return None | |
| values = (open_value, high_value, low_value, close_value) | |
| if not all(math.isfinite(value) for value in values): | |
| return None | |
| return sum(values) / 4.0 | |
def _performance_action(change_pct: Optional[float], *, included_in_vote: bool) -> str:
    """Translate a percentage change into the vote label shown for a performance row.

    Rows excluded from the vote, or without a change value, yield ``"N/A"``.
    Changes within ``PERFORMANCE_NEUTRAL_EPSILON_PCT`` of zero count as neutral.
    """
    if change_pct is None or not included_in_vote:
        return "N/A"
    threshold = PERFORMANCE_NEUTRAL_EPSILON_PCT
    if change_pct > threshold:
        return "Mua"
    if change_pct < -threshold:
        return "BΓ‘n"
    return "Trung lαΊp"
def _get_performance_vote_weight(lookback: Any) -> float:
    """Look up the vote weight configured for a lookback, defaulting to 1.0.

    Any value that cannot be converted (``TypeError`` / ``ValueError``) also
    falls back to 1.0.
    """
    try:
        candle_count = int(lookback)
        weight = float(PERFORMANCE_VOTE_WEIGHTS.get(candle_count, 1.0))
    except (TypeError, ValueError):
        weight = 1.0
    return weight
def _build_performance_rows(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build one performance row per configured lookback from OHLC4 prices.

    Each row compares the latest candle's OHLC4 with the candle ``lookback``
    positions earlier. A row takes part in the vote only when both prices are
    available and the reference price is not (near) zero.
    """
    current_price = _candle_ohlc4(data[-1]) if data else None
    rows: List[Dict[str, Any]] = []
    for lookback in PERFORMANCE_LOOKBACKS:
        ref_position = len(data) - 1 - lookback
        # A negative position would wrap around, so treat it as "no history".
        if ref_position >= 0:
            reference_price = _candle_ohlc4(data[ref_position])
        else:
            reference_price = None

        usable = (
            current_price is not None
            and reference_price is not None
            and abs(reference_price) > 1e-12
        )
        if usable and current_price is not None and reference_price is not None:
            change = ((current_price / reference_price) - 1.0) * 100.0
        else:
            change = None

        if usable and change is not None:
            display = (
                f"{float(current_price):.4f} vs {float(reference_price):.4f} ({float(change):+.2f}%)"
            )
        else:
            display = "N/A"

        rows.append(
            {
                "key": f"performance_{lookback}",
                "name": f"Hiα»u suαΊ₯t {lookback} nαΊΏn",
                "lookback": lookback,
                "current_ohlc4": current_price,
                "reference_ohlc4": reference_price,
                "change_pct": change,
                "display_value": display,
                "action": _performance_action(change, included_in_vote=usable),
                "included_in_vote": usable,
            }
        )
    return rows
def _calc_osc_score(osc_data: list, interval: str = "1h") -> dict:
    """Tally oscillator votes and weights for the PTKT classification table.

    The signal comes from the unweighted vote counts; per-oscillator weights
    from ``OSC_WEIGHTS`` are only accumulated into the ``*_weight`` totals.
    """
    vote_counts = {"buy": 0, "sell": 0, "neutral": 0}
    vote_weights = {"buy": 0.0, "sell": 0.0, "neutral": 0.0}
    side_by_action = {"mua": "buy", "bΓ‘n": "sell"}

    for entry in osc_data:
        osc_key = entry.get("key") or _extract_osc_key(entry["name"])
        weight = OSC_WEIGHTS.get(osc_key, 1.0)
        action = str(entry.get("action", "")).strip().lower()
        # Anything that is not an explicit buy/sell label counts as neutral.
        side = side_by_action.get(action, "neutral")
        vote_counts[side] += 1
        vote_weights[side] += weight

    gauge = _calc_vote_gauge(
        vote_counts["buy"], vote_counts["sell"], vote_counts["neutral"]
    )
    return {
        "signal": _gauge_to_signal(gauge, interval),
        "buy": vote_counts["buy"],
        "sell": vote_counts["sell"],
        "neutral": vote_counts["neutral"],
        "buy_weight": round(vote_weights["buy"], 2),
        "sell_weight": round(vote_weights["sell"], 2),
        "neutral_weight": round(vote_weights["neutral"], 2),
        "formula_version": TECHNICAL_SCORING_VERSION,
    }
def _calc_ma_score(ma_data: list, closes: np.ndarray, interval: str = "1h") -> dict:
    """Tally moving-average votes and report the EMA50/EMA200 cross state.

    The signal comes from the unweighted vote counts. ``golden_cross`` and
    ``death_cross`` compare the latest EMA50 and EMA200 of ``closes``; both are
    False when either EMA is unavailable (too few closes, or NaN).
    """
    vote_counts = {"buy": 0, "sell": 0, "neutral": 0}
    vote_weights = {"buy": 0.0, "sell": 0.0, "neutral": 0.0}
    side_by_action = {"mua": "buy", "bΓ‘n": "sell"}

    for entry in ma_data:
        weight = _get_ma_weight(entry.get("key") or entry["name"])
        action = str(entry.get("action", "")).strip().lower()
        side = side_by_action.get(action, "neutral")
        vote_counts[side] += 1
        vote_weights[side] += weight

    close_count = len(closes)
    ema50 = _ema(closes, 50)[-1] if close_count >= 50 else float("nan")
    ema200 = _ema(closes, 200)[-1] if close_count >= 200 else float("nan")
    emas_valid = not (math.isnan(ema50) or math.isnan(ema200))

    gauge = _calc_vote_gauge(
        vote_counts["buy"], vote_counts["sell"], vote_counts["neutral"]
    )
    return {
        "signal": _gauge_to_signal(gauge, interval),
        "buy": vote_counts["buy"],
        "sell": vote_counts["sell"],
        "neutral": vote_counts["neutral"],
        "buy_weight": round(vote_weights["buy"], 2),
        "sell_weight": round(vote_weights["sell"], 2),
        "neutral_weight": round(vote_weights["neutral"], 2),
        "structure_bonus": 0.0,
        "golden_cross": bool(emas_valid and ema50 > ema200),
        "death_cross": bool(emas_valid and ema50 < ema200),
        "formula_version": TECHNICAL_SCORING_VERSION,
    }
def _calc_performance_score(performance_data: list, interval: str = "1h") -> dict:
    """Tally OHLC4 performance votes across the configured lookback candles.

    Rows not flagged ``included_in_vote`` are counted as ``missing`` and skipped.
    Each remaining row adds ``int(weight)`` votes to its side, so a lookback with
    weight 2.0 counts as two votes.
    """
    vote_counts = {"buy": 0, "sell": 0, "neutral": 0}
    vote_weights = {"buy": 0.0, "sell": 0.0, "neutral": 0.0}
    side_by_action = {"mua": "buy", "bΓ‘n": "sell"}
    rows_used = 0
    rows_skipped = 0

    for row in performance_data:
        if not row.get("included_in_vote"):
            rows_skipped += 1
            continue
        rows_used += 1
        weight = _get_performance_vote_weight(row.get("lookback"))
        action = str(row.get("action", "")).strip().lower()
        side = side_by_action.get(action, "neutral")
        # Counts take the truncated integer weight; weight totals keep the float.
        vote_counts[side] += int(weight)
        vote_weights[side] += weight

    gauge = _calc_vote_gauge(
        vote_counts["buy"], vote_counts["sell"], vote_counts["neutral"]
    )
    return {
        "signal": _gauge_to_signal(gauge, interval),
        "buy": vote_counts["buy"],
        "sell": vote_counts["sell"],
        "neutral": vote_counts["neutral"],
        "buy_weight": round(vote_weights["buy"], 2),
        "sell_weight": round(vote_weights["sell"], 2),
        "neutral_weight": round(vote_weights["neutral"], 2),
        "available": rows_used,
        "missing": rows_skipped,
        "lookbacks": list(PERFORMANCE_LOOKBACKS),
        "formula_version": TECHNICAL_SCORING_VERSION,
    }
def _calc_ai_forecast_score(
    blended: dict,
    forecast_rows: List[Dict[str, Any]],
    last_close: float,
    indicators: dict,
    horizon: int,
    interval: str,
    model_reference_price: Optional[float] = None,
) -> dict:
    """MODULE 3: Score one AI forecast from the full path and band discipline.

    Args:
        blended: Forecast bundle; its ``"p10"``, ``"p50"`` and ``"p90"`` paths
            are read. An empty or missing path falls back to ``[last_close]``.
        forecast_rows: Not read by this function body.
        last_close: Latest market close; used as the fallback path value and as
            the base for ``forecast_return_pct``.
        indicators: Indicator dict; only ``indicators["atr"]["pct"]`` is read.
        horizon: Not read by this function body.
        interval: Passed to ``_gauge_to_signal`` for the final signal label.
        model_reference_price: Base price for the path metrics; a falsy value
            falls back to ``last_close``.

    Returns:
        Dict with the direction gauge (also reported as ``gauge``), a
        certainty-scaled ``conviction_gauge``, confidence, return, band and
        path statistics, and the signal label.
    """
    # ATR% is floored at 0.1 so the divisions below never see a zero scale.
    atr_pct = max(float(indicators.get("atr", {}).get("pct") or 0.0), 0.1)
    p50_path = np.array(blended.get("p50", []), dtype=float)
    p10_path = np.array(blended.get("p10", []), dtype=float)
    p90_path = np.array(blended.get("p90", []), dtype=float)
    # Empty paths degrade to a single flat point at the last close.
    if not len(p50_path):
        p50_path = np.array([last_close], dtype=float)
    if not len(p10_path):
        p10_path = np.array([last_close], dtype=float)
    if not len(p90_path):
        p90_path = np.array([last_close], dtype=float)
    series_reference_price = float(model_reference_price or last_close)
    path_metrics = _forecast_path_metrics(p50_path, series_reference_price)
    final_ret_pct = path_metrics["final_return_pct"]
    weighted_ret_pct = path_metrics["weighted_return_pct"]
    # Return of the path end versus the market close (not the model reference).
    market_return_pct = _pct(float(p50_path[-1]), last_close) if len(p50_path) else 0.0
    # Path metrics arrive on a 0-100 scale; rescale them to 0-1.
    path_consistency = _clamp(path_metrics["path_consistency"] / 100.0, 0.0, 1.0)
    monotonicity = _clamp(path_metrics["monotonicity"] / 100.0, 0.0, 1.0)
    adverse_excursion_pct = path_metrics["max_adverse_excursion_pct"]
    band_profile = _forecast_band_profile(p10_path, p50_path, p90_path)
    # Returns smaller than this (in percent) cast no directional vote.
    direction_vote_threshold = max(atr_pct * 0.04, 0.04)

    def _direction_vote(value_pct: float) -> int:
        # +1 / -1 / 0 vote for a return, with a dead zone around zero.
        if abs(value_pct) < direction_vote_threshold:
            return 0
        return 1 if value_pct > 0 else -1

    direction_votes = [
        _direction_vote(weighted_ret_pct),
        _direction_vote(final_ret_pct),
        _direction_vote(market_return_pct),
    ]
    bullish_votes = sum(1 for vote in direction_votes if vote > 0)
    bearish_votes = sum(1 for vote in direction_votes if vote < 0)
    direction_vote_balance = (
        (bullish_votes - bearish_votes) / max(1, len(direction_votes))
    )
    # Bias sign follows the vote balance; a tied vote falls back to the sign of
    # the weighted return.
    direction_bias_sign = 0.0
    if direction_vote_balance > 0:
        direction_bias_sign = 1.0
    elif direction_vote_balance < 0:
        direction_bias_sign = -1.0
    elif weighted_ret_pct > 0:
        direction_bias_sign = 1.0
    elif weighted_ret_pct < 0:
        direction_bias_sign = -1.0
    # Each score below decays exponentially as its band/excursion measure grows
    # relative to an ATR-scaled tolerance.
    avg_band_tightness = math.exp(
        -float(band_profile["avg_band_pct"]) / max(atr_pct * 3.0, 0.75)
    )
    end_band_tightness = math.exp(
        -float(band_profile["end_band_pct"]) / max(atr_pct * 3.4, 0.85)
    )
    band_stability_score = math.exp(
        -(
            (float(band_profile["band_stability_pct"]) / max(atr_pct * 1.8, 0.35))
            + (float(band_profile["band_step_change_pct"]) / max(atr_pct * 1.2, 0.25))
        )
        * 0.75
    )
    adverse_control_score = math.exp(
        -adverse_excursion_pct / max(atr_pct * 2.5, 0.8)
    )
    # Weighted blend of the six quality components (weights sum to 1.0).
    certainty_score = (
        avg_band_tightness * 0.38 +
        end_band_tightness * 0.16 +
        band_stability_score * 0.16 +
        path_consistency * 0.15 +
        monotonicity * 0.10 +
        adverse_control_score * 0.05
    )
    certainty_score = _clamp(certainty_score, 0.08, 0.98)
    directional_edge_pct = (weighted_ret_pct * 0.72) + (final_ret_pct * 0.28)
    # tanh squashes the ATR-normalised edge into (-1, 1).
    direction_norm = np.tanh(directional_edge_pct / max(atr_pct * 1.20, 0.28))
    weighted_move_in_atr = abs(weighted_ret_pct) / max(atr_pct, 0.1)
    final_move_in_atr = abs(final_ret_pct) / max(atr_pct, 0.1)
    path_move_score = np.tanh(((weighted_move_in_atr * 0.65) + (final_move_in_atr * 0.35)) / 1.7)
    path_quality = 0.48 + 0.30 * path_consistency + 0.22 * monotonicity
    certainty_factor = 0.72 + 0.28 * certainty_score
    direction_consensus = _clamp(
        (abs(direction_vote_balance) * 0.42) +
        (path_consistency * 0.36) +
        (monotonicity * 0.22),
        0.0,
        1.0,
    )
    direction_core_push = (
        direction_norm
        * (18.0 + 16.0 * path_move_score)
        * (0.60 + 0.40 * path_quality)
    )
    # Small extra push when the normalised edge is weak (< 0.18) but at least
    # two of the three direction votes agree.
    micro_consensus_push = 0.0
    if direction_bias_sign != 0.0 and abs(direction_norm) < 0.18 and abs(direction_vote_balance) >= 0.66:
        remaining_room = (0.18 - abs(direction_norm)) / 0.18
        micro_consensus_push = (
            direction_bias_sign
            * (2.4 + 5.6 * direction_consensus)
            * remaining_room
            * (0.44 + 0.56 * certainty_score)
        )
    direction_gauge = max(
        8.0,
        min(92.0, 50.0 + direction_core_push + micro_consensus_push),
    )
    directional_push = (direction_gauge - 50.0) * certainty_factor
    conviction_gauge = max(8.0, min(92.0, 50.0 + directional_push))
    # The headline gauge is the direction gauge; conviction is reported separately.
    gauge = direction_gauge
    confidence_pct = _clamp((certainty_score * 0.65 + path_quality * 0.35) * 100.0, 12.0, 98.0)
    direction_label = "bullish" if gauge >= 58 else "bearish" if gauge <= 42 else "neutral"
    return {
        "gauge": round(gauge, 1),
        "direction_gauge": round(direction_gauge, 1),
        "conviction_gauge": round(conviction_gauge, 1),
        "normalized_score": round(_gauge_to_normalized_score(gauge), 4),
        "confidence_pct": round(confidence_pct, 1),
        "forecast_return_pct": round(market_return_pct, 2),
        "model_reference_return_pct": round(final_ret_pct, 2),
        "weighted_return_pct": round(weighted_ret_pct, 2),
        "direction": direction_label,
        "direction_consensus": round(direction_consensus * 100.0, 1),
        "direction_strength_pct": round(abs(direction_gauge - 50.0) * 2.0, 1),
        "magnitude_vs_atr": round(weighted_move_in_atr, 2),
        "band_uncertainty_pct": round(float(band_profile["avg_band_pct"]), 2),
        "band_uncertainty_end_pct": round(float(band_profile["end_band_pct"]), 2),
        "band_stability_pct": round(float(band_profile["band_stability_pct"]), 2),
        "certainty": round(certainty_score * 100.0, 1),
        "path_consistency": round(path_consistency * 100.0, 1),
        "monotonicity": round(monotonicity * 100.0, 1),
        "max_adverse_excursion_pct": round(adverse_excursion_pct, 2),
        "path_metrics": path_metrics,
        "signal": _gauge_to_signal(gauge, interval),
        "formula": "full_path_direction_gauge",
        "formula_version": AI_SCORING_VERSION,
    }
def _calc_technical_score_v2(
    osc_score: dict,
    ma_score: dict,
    performance_score: dict,
    interval: str = "1h",
    adx_value: Optional[float] = None,
) -> dict:
    """Gauge PTKT follows the sigmoid ADX-weighted vote formula on total technical votes.

    This function used to be defined twice in a row; the first copy was rebound
    by the second at import time and lacked the ``performance`` category. Only
    the effective definition is kept.

    Args:
        osc_score: Oscillator summary with ``buy`` / ``sell`` / ``neutral``
            counts and optional ``*_weight`` totals and ``signal``.
        ma_score: Moving-average summary with the same fields.
        performance_score: Performance summary with the same fields plus
            optional ``available`` / ``missing`` row counts.
        interval: Passed to ``_gauge_to_signal`` for the signal label.
        adx_value: ADX reading; ``None`` or NaN is treated as 0 and negative
            readings are floored at 0.

    Returns:
        Dict with the final gauge, signal, summed votes and weights, the ADX and
        participation factors, group alignment, and per-category signals.

    Raises:
        KeyError: If a score dict lacks ``buy``, ``sell`` or ``neutral``.
    """
    buy = osc_score["buy"] + ma_score["buy"] + performance_score["buy"]
    sell = osc_score["sell"] + ma_score["sell"] + performance_score["sell"]
    neutral = osc_score["neutral"] + ma_score["neutral"] + performance_score["neutral"]

    # Normalise ADX here as well so the reported trend strength matches the
    # value that drives the gauge.
    adx_numeric = float(adx_value) if adx_value is not None else 0.0
    if math.isnan(adx_numeric):
        adx_numeric = 0.0
    adx_numeric = max(0.0, adx_numeric)

    trend_activation = _calc_adx_trend_activation(adx_numeric)
    vote_participation = _calc_vote_participation_factor(buy, sell, neutral)
    final_gauge = _calc_adx_weighted_technical_gauge(buy, sell, neutral, adx_numeric)

    osc_dir = _vote_direction_from_counts(int(osc_score["buy"]), int(osc_score["sell"]))
    ma_dir = _vote_direction_from_counts(int(ma_score["buy"]), int(ma_score["sell"]))
    performance_dir = _vote_direction_from_counts(
        int(performance_score["buy"]),
        int(performance_score["sell"]),
    )
    # Groups with a tied vote do not take part in the alignment check.
    active_group_directions = [
        direction
        for direction in (osc_dir, ma_dir, performance_dir)
        if direction != 0
    ]

    return {
        "gauge": round(final_gauge, 1),
        "normalized_score": round(_gauge_to_normalized_score(final_gauge), 4),
        "signal": _gauge_to_signal(final_gauge, interval),
        "buy": buy,
        "sell": sell,
        "neutral": neutral,
        "buy_weight": round(
            osc_score.get("buy_weight", 0.0)
            + ma_score.get("buy_weight", 0.0)
            + performance_score.get("buy_weight", 0.0),
            2,
        ),
        "sell_weight": round(
            osc_score.get("sell_weight", 0.0)
            + ma_score.get("sell_weight", 0.0)
            + performance_score.get("sell_weight", 0.0),
            2,
        ),
        "neutral_weight": round(
            osc_score.get("neutral_weight", 0.0)
            + ma_score.get("neutral_weight", 0.0)
            + performance_score.get("neutral_weight", 0.0),
            2,
        ),
        "adx_period": ADX_GAUGE_PERIOD,
        "trend_strength_adx": round(adx_numeric, 2),
        "trend_activation": round(trend_activation, 4),
        # Same value as trend_activation, emitted under a second key.
        "trend_multiplier": round(trend_activation, 4),
        "vote_participation": round(vote_participation, 4),
        # Aligned = at least two groups have a direction and they all agree.
        "alignment": len(active_group_directions) >= 2 and len(set(active_group_directions)) == 1,
        "performance_available": int(performance_score.get("available", 0)),
        "performance_missing": int(performance_score.get("missing", 0)),
        "components": {
            "formula": "max(0, min(100, 50 + 50 * ((buy - sell) / total_votes) * ((buy + sell) / total_votes) * (1 / (1 + exp(-(adx10 - 20) / 8)))))",
            "formula_version": TECHNICAL_SCORING_VERSION,
            "categories": {
                "oscillators": osc_score.get("signal", "Trung lαΊp"),
                "moving_averages": ma_score.get("signal", "Trung lαΊp"),
                "performance": performance_score.get("signal", "Trung lαΊp"),
            },
        }
    }
def _calc_summary_score_v2(tech_score: dict, ai_score: dict, interval: str = "1h") -> dict:
    """Combine the technical and AI gauges into the final decision score.

    The final gauge is the equal-weight mean of both gauges, clamped to
    [5, 95]. ``agreement`` is True only when both gauges sit at least 2 points
    from the midpoint on the same side.
    """
    tech_gauge = float(tech_score["gauge"])
    ai_gauge = float(ai_score["gauge"])
    tech_weight = 0.50
    ai_weight = 0.50

    blended = (ai_gauge * ai_weight) + (tech_gauge * tech_weight)
    final_gauge = max(5.0, min(95.0, blended))

    def _side(delta: float) -> int:
        # Deviations under 2 gauge points count as "no direction".
        return 0 if abs(delta) < 2.0 else (1 if delta > 0 else -1)

    tech_dir = _side(tech_gauge - 50.0)
    ai_dir = _side(ai_gauge - 50.0)

    # NOTE(review): these labels contain literal '?' characters and look like
    # diacritics lost to an encoding problem; kept as-is because consumers may
    # depend on the exact text - confirm before repairing.
    distance = abs(final_gauge - 50.0)
    if distance >= 25:
        conviction = "R?t m?nh"
    elif distance >= 15:
        conviction = "M?nh"
    elif distance >= 8:
        conviction = "Trung b?nh"
    else:
        conviction = "Y?u"

    if final_gauge >= 58:
        bias = "bullish"
    elif final_gauge <= 42:
        bias = "bearish"
    else:
        bias = "neutral"

    return {
        "gauge": round(final_gauge, 1),
        "normalized_score": round(_gauge_to_normalized_score(final_gauge), 4),
        "signal": _gauge_to_signal(final_gauge, interval),
        "conviction": conviction,
        "bias": bias,
        "agreement": tech_dir != 0 and tech_dir == ai_dir,
        "buy": tech_score["buy"],
        "sell": tech_score["sell"],
        "neutral": tech_score["neutral"],
        "buy_weight": tech_score.get("buy_weight", 0.0),
        "sell_weight": tech_score.get("sell_weight", 0.0),
        "neutral_weight": tech_score.get("neutral_weight", 0.0),
        "components": {
            "technical": round(tech_gauge, 1),
            "ai_forecast": round(ai_gauge, 1),
            "ai_weight": round(ai_weight, 2),
            "technical_weight": round(tech_weight, 2),
            "certainty_pct": round(float(ai_score.get("certainty", 0.0)), 1),
            "formula": "(technical_gauge + ai_gauge) / 2",
            "formula_version": AI_SCORING_VERSION,
        }
    }
| def _build_dashboard_payload( | |
| last_close: float, | |
| forecast_rows: List[Dict[str, Any]], | |
| technical_score: Dict[str, Any], | |
| ai_score: Dict[str, Any], | |
| summary: Dict[str, Any], | |
| ) -> Dict[str, Any]: | |
| """Single source of truth for hero gauges consumed by the frontend.""" | |
| forecast_end = last_close | |
| if forecast_rows: | |
| forecast_end = float(forecast_rows[-1].get("p50") or last_close) | |
| return { | |
| "technical": { | |
| "gauge": technical_score["gauge"], | |
| "normalized_score": technical_score["normalized_score"], | |
| "signal": technical_score["signal"], | |
| "buy": technical_score["buy"], | |
| "sell": technical_score["sell"], | |
| "neutral": technical_score["neutral"], | |
| "buy_weight": technical_score.get("buy_weight", 0.0), | |
| "sell_weight": technical_score.get("sell_weight", 0.0), | |
| "neutral_weight": technical_score.get("neutral_weight", 0.0), | |
| }, | |
| "ai": { | |
| "gauge": ai_score["gauge"], | |
| "direction_gauge": ai_score.get("direction_gauge", ai_score["gauge"]), | |
| "conviction_gauge": ai_score.get("conviction_gauge", ai_score["gauge"]), | |
| "normalized_score": ai_score["normalized_score"], | |
| "signal": ai_score["signal"], | |
| "forecast_return_pct": ai_score["forecast_return_pct"], | |
| "weighted_return_pct": ai_score["weighted_return_pct"], | |
| "confidence_pct": ai_score["confidence_pct"], | |
| "certainty": ai_score["certainty"], | |
| "direction_consensus": ai_score.get("direction_consensus", 0.0), | |
| "direction_strength_pct": ai_score.get("direction_strength_pct", 0.0), | |
| "path_consistency": ai_score["path_consistency"], | |
| "monotonicity": ai_score.get("monotonicity", 50.0), | |
| "max_adverse_excursion_pct": ai_score.get("max_adverse_excursion_pct", 0.0), | |
| "current_price": round(float(last_close), 6), | |
| "forecast_price": round(float(forecast_end), 6), | |
| }, | |
| "summary": { | |
| "gauge": summary["gauge"], | |
| "normalized_score": summary["normalized_score"], | |
| "signal": summary["signal"], | |
| "conviction": summary["conviction"], | |
| "bias": summary["bias"], | |
| "agreement": summary.get("agreement", False), | |
| "buy_weight": summary.get("buy_weight", 0.0), | |
| "sell_weight": summary.get("sell_weight", 0.0), | |
| "neutral_weight": summary.get("neutral_weight", 0.0), | |
| "components": summary.get("components", {}), | |
| }, | |
| } | |
| def _normalize_presentation_card_payload( | |
| card: Optional[Dict[str, Any]], | |
| *, | |
| prefer_direction_gauge: bool = False, | |
| ) -> Dict[str, Any]: | |
| source = dict(card or {}) | |
| gauge_candidates: List[Any] = [] | |
| if prefer_direction_gauge: | |
| gauge_candidates.append(source.get("direction_gauge")) | |
| gauge_candidates.append(source.get("gauge")) | |
| gauge_value = 50.0 | |
| for candidate in gauge_candidates: | |
| try: | |
| numeric = float(candidate) | |
| except (TypeError, ValueError): | |
| continue | |
| if math.isfinite(numeric): | |
| gauge_value = numeric | |
| break | |
| source["gauge"] = round(float(gauge_value), 1) | |
| signal = source.get("signal") | |
| if not isinstance(signal, str) or not signal.strip(): | |
| source["signal"] = "--" | |
| return source | |
| def _build_ai_aggregation_message( | |
| requested_count: int, | |
| ready_count: int, | |
| failed_model_keys: List[str], | |
| complete: bool, | |
| *, | |
| ai_available: bool, | |
| summary_available: bool, | |
| ) -> str: | |
| if requested_count <= 0: | |
| if ai_available and summary_available: | |
| return "AI forecast san sang" | |
| return "Can chay du bao AI de tong hop" | |
| if failed_model_keys: | |
| failed_labels = ", ".join(str(model_key) for model_key in failed_model_keys) | |
| return f"Dang tong hop {ready_count}/{requested_count} model - loi: {failed_labels}" | |
| if not complete: | |
| return f"Dang tong hop {ready_count}/{requested_count} model AI" | |
| if requested_count == 1 and ai_available and summary_available: | |
| return "AI forecast san sang" | |
| return f"Trung binh cong {requested_count} model AI" | |
def _build_analysis_presentation(
    technical_score: Optional[Dict[str, Any]],
    ai_score: Optional[Dict[str, Any]],
    summary: Optional[Dict[str, Any]],
    *,
    requested_models: Optional[Dict[str, bool]] = None,
    active_model_keys: Optional[List[str]] = None,
    failed_model_keys: Optional[List[str]] = None,
    pending_model_keys: Optional[List[str]] = None,
    message: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the presentation payload: normalized cards, aggregation state and card states.

    Args:
        technical_score: Technical score dict, or ``None``.
        ai_score: AI score dict, or ``None``; the card prefers ``direction_gauge``.
        summary: Summary score dict, or ``None``.
        requested_models: Model key -> enabled flag; non-dict values are ignored.
        active_model_keys: Models that produced a result.
        failed_model_keys: Models that failed.
        pending_model_keys: Models still running. When ``None`` it is derived as
            the enabled models that are neither ready nor failed.
        message: Overrides the generated aggregation message when truthy.

    Returns:
        Dict with the three cards, aggregation bookkeeping, per-card states,
        readiness flags, ``combo_active`` and the message text.
    """
    technical_card = _normalize_presentation_card_payload(technical_score)
    ai_card = _normalize_presentation_card_payload(
        ai_score,
        prefer_direction_gauge=True,
    )
    summary_card = _normalize_presentation_card_payload(summary)

    requested_payload: Dict[str, bool] = {}
    if isinstance(requested_models, dict):
        requested_payload = {
            model_key: bool(enabled) for model_key, enabled in requested_models.items()
        }
    enabled_keys = [
        model_key for model_key, enabled in requested_payload.items() if enabled
    ]
    ready_keys = list(active_model_keys or [])
    failed_keys = list(failed_model_keys or [])
    if pending_model_keys is not None:
        pending_keys = list(pending_model_keys)
    else:
        settled_keys = ready_keys + failed_keys
        pending_keys = [
            model_key for model_key in enabled_keys if model_key not in settled_keys
        ]

    ai_available = isinstance(ai_score, dict) and bool(ai_score)
    summary_available = isinstance(summary, dict) and bool(summary)
    scores_available = ai_available and summary_available

    if enabled_keys:
        requested_count = len(enabled_keys)
    elif ai_available or summary_available:
        # No explicit request list: infer the count from the known model keys.
        requested_count = max(1, len(ready_keys) + len(failed_keys) + len(pending_keys))
    else:
        requested_count = 0

    # Without explicit ready keys, available scores count as one ready model.
    ready_count = len(ready_keys) or int(scores_available)

    finalized = not pending_keys
    complete = bool(
        scores_available
        and not failed_keys
        and finalized
        and (requested_count == 0 or ready_count >= requested_count)
    )
    commit_ready = bool(finalized and ready_count > 0)
    ai_ready = commit_ready and ai_available
    summary_ready = commit_ready and summary_available

    combo_active = False
    if ai_ready and summary_ready:
        if summary_card.get("agreement"):
            combo_active = True
        else:
            # Otherwise both gauges must sit more than 6 points from the
            # midpoint on the same side.
            technical_offset = float(technical_card.get("gauge", 50.0)) - 50.0
            ai_offset = float(ai_card.get("gauge", 50.0)) - 50.0
            combo_active = bool(
                abs(technical_offset) > 6.0
                and abs(ai_offset) > 6.0
                and math.copysign(1.0, technical_offset) == math.copysign(1.0, ai_offset)
            )

    message_text = message or _build_ai_aggregation_message(
        requested_count,
        ready_count,
        failed_keys,
        complete,
        ai_available=ai_available,
        summary_available=summary_available,
    )

    def _card_state(is_ready: bool) -> str:
        # AI and summary cards share the same ready/error/loading/idle ladder.
        if is_ready:
            return "ready"
        if failed_keys and ready_count <= 0:
            return "error"
        if requested_count > 0:
            return "loading"
        return "idle"

    aggregation = {
        "requested": requested_payload,
        "enabled_keys": enabled_keys,
        "ready_keys": ready_keys,
        "failed_keys": failed_keys,
        "pending_keys": pending_keys,
        "complete": complete,
        "finalized": finalized,
        "commit_ready": commit_ready,
        "ready_count": ready_count,
        "requested_count": requested_count,
    }
    card_states = {
        "technical": "ready" if technical_score else "idle",
        "ai": _card_state(ai_ready),
        "summary": _card_state(summary_ready),
    }
    return {
        "version": ANALYSIS_PRESENTATION_VERSION,
        "technical": technical_card,
        "ai": ai_card,
        "summary": summary_card,
        "aggregation": aggregation,
        "card_states": card_states,
        "ai_ready": ai_ready,
        "summary_ready": summary_ready,
        "combo_active": combo_active,
        "message": message_text,
    }
# Neutral signal label ("Trung lập" is Vietnamese for "Neutral") used by the
# placeholder payload returned when there is not enough price history.
_TRADE_ANALYSIS_NEUTRAL_LABEL = "Trung lập"


def _trade_analysis_empty_section(extra: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return a neutral, zero-vote section with an empty ``data`` list.

    ``extra`` keys are inserted between the weight fields and ``data`` so the
    key order matches the populated sections.
    """
    section: Dict[str, Any] = {
        "signal": _TRADE_ANALYSIS_NEUTRAL_LABEL,
        "buy": 0,
        "sell": 0,
        "neutral": 0,
        "buy_weight": 0.0,
        "sell_weight": 0.0,
        "neutral_weight": 0.0,
    }
    if extra:
        section.update(extra)
    section["data"] = []
    return section


def _trade_analysis_vote_section(
    score: Dict[str, Any],
    rows: List[Dict[str, Any]],
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Flatten one vote score plus its detail rows into a response section.

    ``signal``/``buy``/``sell``/``neutral`` are required keys of ``score``;
    the weight fields default to ``0.0`` when absent. ``extra`` keys are
    inserted between the weight fields and ``data``.
    """
    section: Dict[str, Any] = {
        "signal": score["signal"],
        "buy": score["buy"],
        "sell": score["sell"],
        "neutral": score["neutral"],
        "buy_weight": score.get("buy_weight", 0.0),
        "sell_weight": score.get("sell_weight", 0.0),
        "neutral_weight": score.get("neutral_weight", 0.0),
    }
    if extra:
        section.update(extra)
    section["data"] = rows
    return section


def _build_trade_analysis(
    symbol: str, interval: str, data: List[Dict[str, Any]], indicators: Dict[str, Any],
    forecast_rows: List[Dict[str, Any]], confidence: float, source: str,
    blended: Optional[Dict[str, Any]] = None,
    forecast_reference_price: Optional[float] = None,
    include_ai: bool = True,
    active_model_keys: Optional[List[str]] = None,
    failed_model_keys: Optional[List[str]] = None,
    requested_models: Optional[Dict[str, bool]] = None,
) -> Dict[str, Any]:
    """
    TradingView-style technical analysis dashboard (v6.1 Rework).

    The technical gauge is one vote-only gauge across oscillator +
    performance + moving-average rows, while AI keeps full-path scoring.

    Args:
        symbol: Accepted for interface compatibility; not read by this function.
        interval: Timeframe identifier forwarded to the scoring helpers.
        data: Price bars; each row must provide ``"close"``, ``"high"`` and
            ``"low"``. The rows are also forwarded to the performance and
            price-action helpers.
        indicators: Precomputed indicator bundle. This function reads
            ``indicators["ema"]["ema200"]`` and ``indicators["atr"]`` (keys
            ``"pct"`` and ``"value"``) and forwards the bundle to the regime
            classifier and the AI scorer.
        forecast_rows: Forecast rows forwarded to the AI scorer and dashboard
            builder; their count is used as the horizon (10 when empty).
        confidence: Only used to synthesize a fallback ``blended`` forecast.
        source: Accepted for interface compatibility; not read by this function.
        blended: Blended forecast (``p50``/``p10``/``p90``/``agreement``/
            ``scale``). A single-step fallback around the last close is
            synthesized when it is missing or empty.
        forecast_reference_price: Reference price for the AI score; falls back
            to the last close when missing or zero.
        include_ai: When ``False`` the AI gauge and summary are skipped and
            reported as ``None``.
        active_model_keys: Forwarded to the presentation builder
            (``["combined"]`` when missing or empty).
        failed_model_keys: Forwarded to the presentation builder.
        requested_models: Forwarded to the presentation builder.

    Returns:
        The analysis payload. With fewer than 30 bars only a reduced neutral
        payload is returned, containing just the ``oscillators``,
        ``performance``, ``moving_averages`` and ``summary`` keys.
    """
    if not data or len(data) < 30:
        return {
            "oscillators": _trade_analysis_empty_section(),
            "performance": _trade_analysis_empty_section(
                {
                    "available": 0,
                    "missing": len(PERFORMANCE_LOOKBACKS),
                    "lookbacks": list(PERFORMANCE_LOOKBACKS),
                }
            ),
            "moving_averages": _trade_analysis_empty_section(
                {"golden_cross": False, "death_cross": False}
            ),
            "summary": {
                "signal": _TRADE_ANALYSIS_NEUTRAL_LABEL,
                "buy": 0,
                "sell": 0,
                "neutral": 0,
            },
        }

    closes = np.array([float(d["close"]) for d in data], dtype=float)
    highs = np.array([float(d["high"]) for d in data], dtype=float)
    lows = np.array([float(d["low"]) for d in data], dtype=float)
    last_close = closes[-1]
    model_reference_price = float(forecast_reference_price or last_close)

    # Short local aliases for the two helpers used on almost every row.
    _lv = _latest_finite_value
    _fmt = _format_analysis_value

    # Read the indicator sub-dicts defensively so a missing/None entry falls
    # back to the close-based defaults below instead of raising.
    ema_info = indicators.get("ema") or {}
    atr_info = indicators.get("atr") or {}

    ema50_arr = _ema(closes, 50)
    ema50_current = _lv(ema50_arr)
    ema200_current = _lv(ema_info.get("ema200"))
    # EMA50 five bars back; falls back to the current EMA50 (or the last
    # close) when that sample is unavailable.
    ema50_prev = (
        float(ema50_arr[-6])
        if len(ema50_arr) >= 6 and not math.isnan(float(ema50_arr[-6]))
        else (ema50_current or last_close)
    )
    regime, regime_label = _classify_regime(indicators, last_close, _safe(atr_info.get("pct"), 1.0))

    # ── 1. Oscillators ──
    osc_data: List[Dict[str, Any]] = []
    # Scale for the price-denominated oscillators: the supplied ATR value (or
    # 1% of the close), floored at 0.15% of the close.
    atr_scale = max(
        float(atr_info.get("value") or last_close * 0.01),
        max(last_close * 0.0015, 1e-6),
    )

    def _add_osc(label, val, action_name, key=None, **kw):
        """Append one oscillator row; ``val`` may be a series or a scalar."""
        if isinstance(val, (np.ndarray, pd.Series, list)):
            v = _lv(val)
        else:
            v = float(val) if val is not None else None
        osc_kwargs = {
            "last_close": last_close,
            "ema50": ema50_current or last_close,
            "ema200": ema200_current or last_close,
            "ema50_prev": ema50_prev,
            "atr": atr_scale,
            **kw,
        }
        # NOTE(review): a missing value is scored as 0, which may read as an
        # extreme for bounded oscillators — confirm _osc_action handles this.
        act = _osc_action(action_name, v if v is not None else 0, **osc_kwargs)
        osc_data.append({
            "key": key or action_name,
            "name": label,
            "value": v,
            "display_value": _fmt(v),
            "action": act,
        })

    stoch_price_k, stoch_price_d = _stoch_kd(highs, lows, closes, 14, 3, 3)
    stoch_rsi_k, stoch_rsi_d = _stoch_rsi(closes, 14, 14, 3, 3)
    demarker14 = _demarker(highs, lows, 14)

    _add_osc("Chỉ số Sức mạnh tương đối (14)", _rsi(closes, 14), "rsi", key="rsi")
    _add_osc("Stochastic %K (14, 3, 3)", stoch_price_k, "stoch", key="stoch")
    _add_osc("Chỉ số Kênh hàng hóa (20)", _cci(highs, lows, closes, 20), "cci", key="cci")
    adx_vals = _adx(highs, lows, closes, ADX_GAUGE_PERIOD)
    _add_osc(
        f"Chỉ số Định hướng Trung bình ({ADX_GAUGE_PERIOD})",
        adx_vals[0],
        "adx",
        key="adx",
        plus_di=_lv(adx_vals[1]) or 0,
        minus_di=_lv(adx_vals[2]) or 0,
    )
    ao_vals = _awesome_oscillator(highs, lows)
    _add_osc("Chỉ số Dao động AO", ao_vals, "ao", key="ao", epsilon=atr_scale * 0.02)
    _add_osc("Xung lượng (10)", _momentum(closes, 10), "momentum", key="momentum", scale=atr_scale * 1.15)
    macd_vals = _macd(closes, 12, 26, 9)
    _add_osc("Cấp độ MACD (12, 26)", macd_vals[0], "macd", key="macd", epsilon=atr_scale * 0.02)
    _add_osc("Đường RSI Nhanh (3, 3, 14, 14)", stoch_rsi_k, "stoch_rsi", key="stoch_rsi")
    _add_osc("Vùng Phần trăm Williams (14)", _williams_r(highs, lows, closes, 14), "williams", key="williams")

    # Bull/Bear power is one row carrying two values, so it is appended
    # directly rather than through _add_osc.
    bull_power_vals, bear_power_vals = _bull_bear_power(highs, lows, closes, 13)
    bull_power = _lv(bull_power_vals)
    bear_power = _lv(bear_power_vals)
    osc_data.append(
        {
            "key": "bbp",
            "name": "Sức Mạnh Giá Lên và Giá Xuống",
            "value": {"bull": bull_power, "bear": bear_power},
            "display_value": f"{_fmt(bull_power)} / {_fmt(bear_power)}",
            "action": _osc_action(
                "bbp",
                bull_power if bull_power is not None else 0.0,
                atr=atr_scale,
                bull_power=bull_power,
                bear_power=bear_power,
                epsilon=atr_scale * 0.02,
            ),
        }
    )
    _add_osc("Dao động Ultimate (7, 14, 28)", _ultimate_oscillator(highs, lows, closes, 7, 14, 28), "ultimate", key="ultimate")
    _add_osc("Tốc độ biến động ROC (12)", _roc(closes, 12), "roc", key="roc")
    _add_osc("TRIX (18)", _trix(closes, 18), "trix", key="trix")
    _add_osc("PPO (12, 26)", _ppo(closes, 12, 26), "ppo", key="ppo")
    _add_osc("CMO (14)", _cmo(closes, 14), "cmo", key="cmo")
    _add_osc("DPO (20)", _dpo(closes, 20), "dpo", key="dpo", scale=atr_scale * 0.75)
    _add_osc("Aroon Oscillator (25)", _aroon_oscillator(highs, lows, 25), "aroon", key="aroon")
    _add_osc("TSI (25, 13)", _tsi(closes, 25, 13), "tsi", key="tsi")
    _add_osc("DeMarker (14)", demarker14, "demarker", key="demarker")

    osc_score = _calc_osc_score(osc_data, interval)

    # ── 2. Moving Averages ──
    ma_data: List[Dict[str, Any]] = []
    ema_periods = sorted({period for pair in EMA_PAIR_SEQUENCE for period in pair})
    # "EMA1" is the raw close series, so it is copied instead of smoothed.
    ema_map: Dict[int, np.ndarray] = {
        period: closes.copy() if period == 1 else _ema(closes, period)
        for period in ema_periods
    }

    def _pair_text(left, right) -> str:
        """Render 'left / right', using an em dash for a missing side."""
        # The placeholder was garbled in the original source; an em dash is
        # the presumed intent — TODO confirm against the UI.
        left_text = left if left is not None else "—"
        right_text = right if right is not None else "—"
        return f"{left_text} / {right_text}"

    for fast_period, slow_period in EMA_PAIR_SEQUENCE:
        fast_value = _lv(ema_map[fast_period])
        slow_value = _lv(ema_map[slow_period])
        pair_key = f"ema_{fast_period}_{slow_period}"
        ma_data.append(
            {
                "key": pair_key,
                "name": f"EMA{fast_period} / EMA{slow_period}",
                "display_value": f"{_fmt(fast_value)} / {_fmt(slow_value)}",
                "value": _pair_text(fast_value, slow_value),
                "action": _ema_pair_action(fast_value, slow_value, pair_key),
                "fast": fast_value,
                "slow": slow_value,
            }
        )

    ichimoku_base = _lv(_ichimoku_base(highs, lows, 26))
    ma_data.append(
        {
            "key": "ichimoku_base_26",
            "name": "Giá / Ichimoku Base (26)",
            "display_value": f"{_fmt(last_close)} / {_fmt(ichimoku_base)}",
            "value": _pair_text(last_close, ichimoku_base),
            "action": _price_vs_level_action(last_close, ichimoku_base, neutral_band_pct=0.08),
            "fast": last_close,
            "slow": ichimoku_base,
        }
    )

    ma_score = _calc_ma_score(ma_data, closes, interval)
    performance_data = _build_performance_rows(data)
    performance_score = _calc_performance_score(performance_data, interval)
    technical_score = _calc_technical_score_v2(
        osc_score,
        ma_score,
        performance_score,
        interval,
        adx_value=_lv(adx_vals[0]),
    )
    support_resistance = _calc_price_action_levels(data, last_close, interval)

    # Sections shared by both return paths (with and without AI).
    regime_payload = {"key": regime, "label": regime_label}
    vote_sections = {
        "oscillators": _trade_analysis_vote_section(osc_score, osc_data),
        "performance": _trade_analysis_vote_section(
            performance_score,
            performance_data,
            {
                "available": performance_score.get("available", 0),
                "missing": performance_score.get("missing", 0),
                "lookbacks": performance_score.get("lookbacks", list(PERFORMANCE_LOOKBACKS)),
            },
        ),
        "moving_averages": _trade_analysis_vote_section(
            ma_score,
            ma_data,
            {
                "golden_cross": ma_score["golden_cross"],
                "death_cross": ma_score["death_cross"],
            },
        ),
    }

    if not include_ai:
        presentation = _build_analysis_presentation(
            technical_score,
            None,
            None,
            message="Can chay du bao AI de tong hop",
        )
        return {
            "style": "tradingview",
            "regime": regime_payload,
            "dashboard": {
                "technical": {
                    "gauge": technical_score["gauge"],
                    "normalized_score": technical_score["normalized_score"],
                    "signal": technical_score["signal"],
                    "buy": technical_score["buy"],
                    "sell": technical_score["sell"],
                    "neutral": technical_score["neutral"],
                    "buy_weight": technical_score.get("buy_weight", 0.0),
                    "sell_weight": technical_score.get("sell_weight", 0.0),
                    "neutral_weight": technical_score.get("neutral_weight", 0.0),
                },
                "ai": None,
                "summary": None,
            },
            "summary": None,
            "presentation": presentation,
            "technicals": technical_score,
            **vote_sections,
            "ai_gauge": None,
            "support_resistance": support_resistance,
            # Same object as "support_resistance", exposed under a second key.
            "pivot_points": support_resistance,
        }

    # ── 3. AI Forecast Gauge ──
    if not blended:
        # Fallback to a simple single-step blend if no forecast is available:
        # the median drifts by (confidence - 50) / 10 percent inside a ±2% band.
        blended = {
            "p50": [last_close * (1 + (confidence - 50) / 1000)],
            "p10": [last_close * 0.98],
            "p90": [last_close * 1.02],
            "agreement": True,
            "scale": 1.0,
        }

    ai_score = _calc_ai_forecast_score(
        blended,
        forecast_rows,
        last_close,
        indicators,
        len(forecast_rows) or 10,
        interval,
        model_reference_price=model_reference_price,
    )

    # ── 4. Summary ──
    summary = _calc_summary_score_v2(technical_score, ai_score, interval)
    dashboard = _build_dashboard_payload(last_close, forecast_rows, technical_score, ai_score, summary)
    presentation = _build_analysis_presentation(
        technical_score,
        ai_score,
        summary,
        active_model_keys=active_model_keys or ["combined"],
        failed_model_keys=failed_model_keys,
        requested_models=requested_models,
    )

    return {
        "style": "tradingview",
        "regime": regime_payload,
        "dashboard": dashboard,
        "summary": summary,
        "presentation": presentation,
        "technicals": technical_score,
        **vote_sections,
        "ai_gauge": ai_score,
        "support_resistance": support_resistance,
        # Same object as "support_resistance", exposed under a second key.
        "pivot_points": support_resistance,
    }