"""
Training Engine v1.0 — Complete T4 Training Pipeline

Runs on the Coordinator Space as a background thread.

Addresses:
  1. Curriculum Scheduler — learn in order, review weak topics more often
  2. Experience Replay — buffer of past examples so nothing is forgotten
  3. Anti-Catastrophic-Forgetting — always mix theory into fine-tuning
  4. Multi-strategy training — N strategies run independently
  5. Training progress log — full log pushed to HF
"""
from __future__ import annotations

import os
import json
import logging
import math
import time
import tempfile
from datetime import datetime, timezone

import numpy as np
import pandas as pd

log = logging.getLogger("training_engine")

HF_TOKEN = os.environ.get("HF_TOKEN", "")
EXPERIENCE_REPO = os.environ.get("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
MODEL_REPO = os.environ.get("MODEL_REPO", "gionuibk/aetheris-models")

# ─────────────────────────────────────────────────────────────────
# 1. CURRICULUM SCHEDULER
# ─────────────────────────────────────────────────────────────────

# Difficulty level (1 = easy … 4 = expert) per theory id.
# NOTE: each key appears exactly once. Earlier revisions listed
# "macro_crowded_long" (2 and 3), "macro_funding_extreme_long" (2 and 4) and
# "ict_bear_order_block" (3 twice); Python silently keeps the last value of a
# repeated dict key, so the effective values (3, 4 and 3) are the ones kept.
THEORY_DIFFICULTY = {
    # ── Level 1: Easy (price-following, clear signals) ────────────────
    "mean_reversion": 1,
    "momentum_trend": 1,
    "kelly_risk_management": 1,
    "risk_no_edge_flat": 1,
    "meanrev_extreme_oversold": 1,
    "meanrev_extreme_overbought": 1,
    "micro_noise_flat": 1,
    "macro_healthy_bull": 1,
    "macro_healthy_bear": 1,
    "momentum_weak_no_trade": 1,
    # ── Level 2: Medium (confirmation patterns) ───────────────────────
    "microstructure_informed": 2,
    "microstructure_noise": 2,
    "price_action_bos_bull": 2,
    "price_action_bos_bear": 2,
    "vsa_no_supply": 2,
    "vsa_no_demand": 2,
    "dow_markup": 2,
    "dow_accumulation": 2,
    "dow_distribution": 2,
    "dow_panic_sell": 2,
    "scalping_orderflow": 2,
    "scalping_pullback": 2,
    "scalping_wide_spread_avoid": 2,
    "scalping_momentum_burst": 2,
    "micro_informed_buy": 2,
    "micro_informed_sell": 2,
    "micro_absorption_bull": 2,
    "micro_spread_spike": 2,
    "meanrev_hurst_confirms": 2,
    "meanrev_vwap_bounce_bull": 2,
    "momentum_macd_cross_bull": 2,
    "momentum_turtle_buy": 2,
    "momentum_turtle_sell": 2,
    "indicator_macd_bullish": 2,
    "indicator_macd_bearish": 2,
    "indicator_bb_squeeze_bull": 2,
    "indicator_bb_squeeze_bear": 2,
    "indicator_rsi_bull_divergence": 2,
    "indicator_rsi_bear_divergence": 2,
    "ta_bollinger_squeeze_breakout": 2,
    "ta_dow_trend_structure_bull": 2,
    "ta_golden_pocket_buy": 2,
    "ta_inside_bar_continuation": 2,
    "ta_pivot_support_bounce": 2,
    "pa_engulfing_reversal": 2,
    "pa_pin_bar_reversal": 2,
    "price_action_engulf_bull": 2,
    "price_action_engulf_bear": 2,
    "price_action_inside_cont": 2,
    "macro_crowded_short": 2,
    "macro_funding_extreme_short": 2,
    "macro_funding_flip": 2,
    "macro_oi_trend_confirm": 2,
    "macro_oi_diverge_warning": 2,
    "fibonacci_618_retracement": 2,
    "gann_angle_support": 2,
    "swing_range_boundary": 2,
    "swing_rsi_divergence": 2,
    "swing_breakout_retest": 2,
    "swing_htf_pullback": 2,
    "dca_accumulate": 2,
    "dca_pause_strong_trend": 2,
    "grid_buy_dip": 2,
    "grid_sell_peak": 2,
    "grid_exit_trend_break": 2,
    "risk_drawdown_reduce": 2,
    "risk_volatile_spread_avoid": 2,
    "risk_of_ruin": 2,
    # ── Level 3: Hard (multi-factor, context-dependent) ───────────────
    "wyckoff_spring": 3,
    "wyckoff_UTAD": 3,
    "wyckoff_SOS": 3,
    "wyckoff_AR": 3,
    "wyckoff_LPS": 3,
    "wyckoff_ST": 3,
    "wyckoff_SC": 3,
    "wyckoff_PS": 3,
    "wyckoff_markup": 3,
    "wyckoff_markdown": 3,
    "wyckoff_upthrust": 3,
    "wyckoff_spring_buy": 3,
    "wyckoff_sos_buy": 3,
    "wyckoff_test_spring": 3,
    "wyckoff_absorption_bull": 3,
    "wyckoff_absorption_bear": 3,
    "wyckoff_upthrust_sell": 3,
    "wyckoff_SOW": 3,
    "wyckoff_LPSY": 3,
    "vsa_climax_buy": 3,
    "vsa_climax_sell": 3,
    "vsa_test_bar": 3,
    "vsa_upthrust_bar": 3,
    "volprofile_poc_bounce_bull": 3,
    "volprofile_poc_bounce_bear": 3,
    "volprofile_hvn_stall": 3,
    "volprofile_lvn_fast_move": 3,
    "volprofile_vah_rejection": 3,
    "volprofile_val_bounce": 3,
    "elliott_wave3_strong": 3,
    "elliott_wave1": 3,
    "elliott_wave2_retrace": 3,
    "elliott_wave5_exhaustion": 3,
    "elliott_waveA_correction": 3,
    "ict_bull_order_block": 3,
    "ict_bear_order_block": 3,
    "ict_bear_breaker": 3,
    "ict_bull_breaker": 3,
    "ict_fvg_fill": 3,
    "ict_fvg_fill_bull": 3,
    "ict_fvg_fill_bear": 3,
    "ict_bos_continuation": 3,
    "ict_optimal_entry": 3,
    "macro_long_liquidation": 3,
    "macro_short_squeeze": 3,
    "macro_crowded_long": 3,
    # ── Level 4: Expert (regime-aware, cascade, complex waves) ────────
    "regime_aware_volatile": 4,
    "regime_aware_ranging": 4,
    "elliott_waveC_completion": 4,
    "macro_rug_pull": 4,
    "macro_funding_extreme_long": 4,
}


class CurriculumScheduler:
    """
    Decides which theory rows are studied in an epoch and how they are weighted.

    Logic: easy → hard as epochs advance; low accuracy → larger sample weight.
    """

    def __init__(self):
        self.theory_scores: dict[str, float] = {}  # theory → latest accuracy
        self.theory_seen: dict[str, int] = {}      # theory → cumulative n_examples

    def update(self, theory_id: str, accuracy: float, n_examples: int):
        """Store the latest accuracy for a theory and add to its seen counter."""
        self.theory_scores[theory_id] = accuracy
        self.theory_seen[theory_id] = self.theory_seen.get(theory_id, 0) + n_examples

    def get_sample_weight(self, theory_id: str) -> float:
        """
        Return the sampling weight for one theory.

        The weight is ``struggle_weight * difficulty_weight`` where

        * ``struggle_weight = max(0.5, 1 - accuracy) * 2`` — ranges from 1.0
          (accuracy ≥ 0.5, or no score recorded yet) up to 2.0 (accuracy 0);
        * ``difficulty_weight = 1 + (level - 1) * 0.3`` — 1.0 / 1.3 / 1.6 / 1.9
          for levels 1–4; theories missing from THEORY_DIFFICULTY count as level 2.
        """
        difficulty = THEORY_DIFFICULTY.get(theory_id, 2)
        accuracy = self.theory_scores.get(theory_id, 0.5)
        # Poor performers get more practice
        struggle_weight = max(0.5, 1.0 - accuracy) * 2.0
        difficulty_weight = 1.0 + (difficulty - 1) * 0.3
        return struggle_weight * difficulty_weight

    def schedule_epoch(self, df_theory: pd.DataFrame, epoch: int) -> pd.DataFrame:
        """
        Select the theory rows to train on for ``epoch``.

        Curriculum: epoch 0 uses only easy theories (level 1); the allowed
        level rises by one every two epochs, capped at 4. Unknown theories are
        treated as level 2 (medium) — they do NOT bypass the curriculum.

        Args:
            df_theory: theory examples; the ``theory`` column drives the
                curriculum. Without that column the frame is only shuffled.
            epoch: zero-based epoch index.

        Returns:
            A new DataFrame with at most 10,000 rows drawn without replacement.
            An empty input yields an empty copy instead of raising.
        """
        if "theory" not in df_theory.columns:
            return df_theory.sample(frac=1, random_state=epoch)
        if df_theory.empty:
            # Nothing to weight or draw; np.random.choice would reject the
            # empty probability vector.
            return df_theory.copy()

        max_difficulty = min(1 + epoch // 2, 4)  # 0→1, 2→2, 4→3, 6→4

        # unknown → medium, NOT bypass
        levels = df_theory["theory"].map(lambda t: THEORY_DIFFICULTY.get(t, 2))
        df_allowed = df_theory[levels <= max_difficulty]

        # Fall back only when there is REALLY no data
        # (< 5% of the total, minimum 10 rows)
        min_rows = max(10, int(len(df_theory) * 0.05))
        if len(df_allowed) < min_rows:
            log.warning(f"Curriculum epoch {epoch}: only {len(df_allowed)} rows at "
                        f"max_diff={max_difficulty}, falling back to all data")
            df_allowed = df_theory

        weights = df_allowed["theory"].map(self.get_sample_weight).fillna(1.0)
        n_sample = min(len(df_allowed), 10_000)
        probs = (weights / weights.sum()).values
        # NOTE(review): n_sample never exceeds len(df_allowed), so the draw is
        # always without replacement. For frames of ≤ 10,000 rows every row is
        # returned and the weights only influence row order.
        idx = np.random.choice(len(df_allowed), size=n_sample, replace=False, p=probs)
        return df_allowed.iloc[idx].copy()


# ─────────────────────────────────────────────────────────────────
# 2. EXPERIENCE REPLAY BUFFER
# ─────────────────────────────────────────────────────────────────

class ExperienceReplayBuffer:
    """
    Keeps a priority buffer of important past experiences.

    Mix into every training batch to prevent catastrophic forgetting.
    """

    MAX_SIZE = 20_000  # max experiences in buffer

    def __init__(self):
        self._buffer: list[dict] = []
        self._priorities: list[float] = []

    def add(self, examples: list[dict], priority: float = 1.0):
        """
        Add examples with a priority score.

        When the buffer grows past MAX_SIZE it is trimmed to the MAX_SIZE
        highest-priority entries; the sort is stable, so among equal
        priorities the earliest-added entries are kept.
        """
        for ex in examples:
            self._buffer.append(ex)
            self._priorities.append(priority)

        if len(self._buffer) > self.MAX_SIZE:
            ranked = sorted(
                zip(self._priorities, self._buffer),
                key=lambda pair: pair[0],
                reverse=True,
            )[: self.MAX_SIZE]
            self._priorities = [prio for prio, _ in ranked]
            self._buffer = [ex for _, ex in ranked]

    def sample(self, n: int) -> pd.DataFrame | None:
        """
        Sample up to ``n`` distinct examples from the buffer, weighted by priority.

        Returns ``None`` when the buffer holds fewer than two examples.
        Non-positive (or NaN) priorities get zero probability. If no entry has
        a positive priority the draw is uniform; otherwise ``n`` is capped at
        the number of positive-priority entries, because a draw without
        replacement cannot return more rows than have non-zero probability.
        """
        size = len(self._buffer)
        if size < 2:
            return None

        weights = np.asarray(self._priorities, dtype=float)
        weights = np.where(weights > 0, weights, 0.0)
        total = float(weights.sum())
        if total > 0:
            probs = weights / total
            drawable = int(np.count_nonzero(weights))
        else:
            probs = None  # uniform fallback instead of dividing by zero
            drawable = size

        n = min(n, drawable)
        idx = np.random.choice(size, size=n, replace=False, p=probs)
        return pd.DataFrame([self._buffer[i] for i in idx])

    def add_theory_data(self, df_theory: pd.DataFrame):
        """Theory data always has high priority — never forget."""
        rows = df_theory.to_dict("records")
        self.add(rows, priority=2.0)  # 2× priority vs real data

    def add_real_data(self, df_real: pd.DataFrame):
        """Real market data at normal priority (at most 1,000 rows per call)."""
        # Only keep non-flat examples (label != 0) when a label column exists
        df_good = df_real[df_real["label"] != 0] if "label" in df_real.columns else df_real
        rows = df_good.head(1000).to_dict("records")  # cap per call
        self.add(rows, priority=1.0)

    def __len__(self) -> int:
        return len(self._buffer)


# ─────────────────────────────────────────────────────────────────
# 3.
# TRAINING PROGRESS LOG
# ─────────────────────────────────────────────────────────────────

class TrainingLog:
    """Log training progress locally (JSON lines) and push the file to HF."""

    LOG_PATH = "training_log.jsonl"

    def __init__(self):
        self._entries: list[dict] = []

    @staticmethod
    def _json_default(value):
        """
        JSON fallback for values the encoder cannot serialize natively.

        Objects exposing ``tolist()`` (e.g. numpy scalars and arrays) are
        converted to plain Python values; anything else raises ``TypeError``
        exactly like the default encoder.
        """
        to_list = getattr(value, "tolist", None)
        if callable(to_list):
            return to_list()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def record(self, expert: str, epoch: int, metrics: dict,
               phase: str = "fine_tune", strategy: str = "default"):
        """
        Record one epoch's metrics in memory and append them to LOG_PATH.

        Args:
            expert: expert/model name the metrics belong to.
            epoch: epoch index.
            metrics: metric name → value; merged into the entry last, so a
                metric key that collides with a fixed field overrides it.
            phase: training phase label.
            strategy: strategy label.

        The file append is best-effort: an I/O or serialization failure is
        logged as a warning and the in-memory entry is kept, so a logging
        problem does not abort the training epoch.
        """
        entry = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "expert": expert,
            "strategy": strategy,
            "epoch": epoch,
            "phase": phase,
            **metrics,
        }
        self._entries.append(entry)

        # Write to local file
        try:
            with open(self.LOG_PATH, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry, default=self._json_default) + "\n")
        except (OSError, TypeError, ValueError) as e:
            log.warning(f"Local training log write failed: {e}")

        log.info(f"📊 [{expert}] epoch={epoch} phase={phase} "
                 f"acc={metrics.get('accuracy',0):.3f} "
                 f"sharpe={metrics.get('sharpe',0):.3f} "
                 f"n={metrics.get('n_samples',0)}")

    def push_to_hf(self):
        """
        Upload LOG_PATH to ``logs/training_log.jsonl`` in EXPERIENCE_REPO.

        Does nothing when the local file does not exist. Any failure (missing
        package, network, auth) is logged as a warning and swallowed.
        """
        if not os.path.exists(self.LOG_PATH):
            return
        try:
            from huggingface_hub import HfApi
            api = HfApi(token=HF_TOKEN)
            api.upload_file(
                path_or_fileobj=self.LOG_PATH,
                path_in_repo="logs/training_log.jsonl",
                repo_id=EXPERIENCE_REPO,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message=f"Training log {datetime.now(timezone.utc).strftime('%Y%m%d_%H%M')}",
            )
        except Exception as e:
            log.warning(f"Log push failed: {e}")

    def get_summary(self) -> dict:
        """
        Summarize the in-memory entries per expert.

        Returns:
            ``{expert: {"n_epochs", "best_sharpe", "last_acc", "last_epoch"}}``,
            or ``{}`` when nothing has been recorded. Missing ``sharpe`` /
            ``accuracy`` values count as 0.
        """
        if not self._entries:
            return {}

        by_expert: dict[str, list[dict]] = {}
        for entry in self._entries:
            by_expert.setdefault(entry["expert"], []).append(entry)

        return {
            expert: {
                "n_epochs": len(runs),
                "best_sharpe": max(r.get("sharpe", 0) for r in runs),
                "last_acc": runs[-1].get("accuracy", 0),
                "last_epoch": runs[-1]["epoch"],
            }
            for expert, runs in by_expert.items()
        }


# ─────────────────────────────────────────────────────────────────
# 4.
# MULTI-STRATEGY DEFINITIONS
# ─────────────────────────────────────────────────────────────────

# Per strategy: a description, the feature columns each expert (E1–E5) uses,
# plus regime_filter and target_theories lists (declared here; not read by
# the code in this section).
STRATEGIES = {
    "momentum": {
        "description": "Trend following — ADX, EMA, Momentum",
        "expert_features": {
            "E1": ["obi", "vpin", "entropy", "cvd_norm", "absorption", "tape_speed", "large_trade"],
            "E2": ["ema_cross_fs", "ema_cross_sl", "adx", "plus_di", "minus_di",
                   "momentum_zscore", "trend_strength", "vol_surge", "macd_hist"],
            "E3": ["zscore_60", "rsi", "hurst", "bb_pct_b", "mean_rev_signal"],
            "E4": ["market_structure", "bos", "wyckoff_phase", "fib_618_prox", "ob_score"],
            "E5": ["funding", "funding_trend", "oi_delta", "ls_ratio", "session_ny", "session_overlap"],
        },
        "regime_filter": ["TRENDING"],
        "target_theories": ["momentum_trend", "elliott_wave3_strong", "dow_markup", "swing_htf_pullback"],
    },
    "mean_reversion": {
        "description": "Range trading — Z-score, RSI, Bollinger",
        "expert_features": {
            "E1": ["obi", "spread_pct", "vpin", "entropy", "absorption", "poc_proximity", "vol_skew"],
            "E2": ["adx", "ema_cross_fs", "momentum_zscore", "trend_strength"],
            "E3": ["zscore_60", "zscore_300", "rsi", "hurst", "bb_pct_b", "bb_width",
                   "vwap_dev", "mean_rev_signal"],
            "E4": ["market_structure", "bos", "pivot_dist_h", "pivot_dist_l", "wyckoff_phase", "fvg_score"],
            "E5": ["funding", "oi_delta", "ls_ratio", "session_asian", "session_london"],
        },
        "regime_filter": ["RANGING", "QUIET"],
        "target_theories": ["mean_reversion", "vsa_no_supply", "vsa_no_demand",
                            "wyckoff_spring", "volprofile_poc_bounce_bull"],
    },
    "smart_money": {
        "description": "Wyckoff + ICT — Order Blocks, Spring, FVG",
        "expert_features": {
            "E1": ["obi", "vpin", "absorption", "cvd_norm", "large_trade", "vol_skew"],
            "E2": ["adx", "trend_strength", "ema_cross_sl", "momentum_zscore"],
            "E3": ["zscore_60", "hurst", "rsi", "bb_pct_b"],
            "E4": ["market_structure", "bos", "wyckoff_phase", "fib_618_prox", "fvg_score",
                   "ob_score", "pin_bar", "engulfing"],
            "E5": ["funding", "oi_delta", "ls_ratio", "macro_squeeze_risk"],
        },
        "regime_filter": ["RANGING", "VOLATILE"],
        "target_theories": ["wyckoff_spring", "wyckoff_SOS", "ict_bull_order_block",
                            "ict_fvg_fill_bull", "macro_short_squeeze"],
    },
    "scalping": {
        "description": "Microstructure — Tape, OBI, VPIN, tight SL",
        "expert_features": {
            "E1": ["obi", "spread_pct", "vpin", "entropy", "cvd_norm", "tape_speed",
                   "large_trade", "vol_adjusted_spread"],
            "E2": ["adx", "momentum_zscore", "ema_cross_fs"],
            "E3": ["zscore_60", "rsi", "vwap_dev"],
            "E4": ["bos", "pin_bar", "fvg_score"],
            "E5": ["session_overlap", "session_ny", "session_london"],
        },
        "regime_filter": ["VOLATILE", "TRENDING"],
        "target_theories": ["scalping_orderflow", "scalping_momentum_burst",
                            "micro_informed_buy", "micro_informed_sell"],
    },
}


# ─────────────────────────────────────────────────────────────────
# 5. MAIN TRAINING ENGINE
# ─────────────────────────────────────────────────────────────────

class TrainingEngine:
    """
    Orchestrates the complete training pipeline for all strategies.

    Runs as background thread in Coordinator Space.
    """

    def __init__(self):
        self.curriculum = CurriculumScheduler()
        self.replay = ExperienceReplayBuffer()
        self.log = TrainingLog()
        self._theory_df = None
        self._real_df = None

    def _load_theory_data(self) -> pd.DataFrame | None:
        """
        Download (once) and return the synthetic theory dataset.

        The frame is cached on the instance; on first load it also seeds the
        replay buffer. Returns ``None`` when the download or read fails.
        """
        if self._theory_df is not None:
            return self._theory_df
        try:
            from huggingface_hub import hf_hub_download
            local = hf_hub_download(
                repo_id=EXPERIENCE_REPO,
                filename="theory/synthetic_pretrain_v3.parquet",
                repo_type="dataset",
                token=HF_TOKEN,
                cache_dir="/tmp/theory_cache",
            )
            self._theory_df = pd.read_parquet(local)
            log.info(f"📚 Theory data: {len(self._theory_df):,} examples")
            # Seed replay buffer with theory data (high priority)
            self.replay.add_theory_data(self._theory_df)
            return self._theory_df
        except Exception as e:
            log.warning(f"Cannot load theory data: {e}")
            return None

    def _load_real_data(self, max_files: int = 100) -> pd.DataFrame | None:
        """
        Download up to ``max_files`` experience parquet files and concatenate them.

        NaNs are filled with 0.0 and the result is fed to the replay buffer.
        Files that fail to download or parse are skipped with a warning.
        Returns ``None`` when nothing could be loaded.

        NOTE(review): this runs on every train_strategy() call, so the replay
        buffer receives these rows again each time — confirm that is intended.
        """
        try:
            from huggingface_hub import list_repo_files, hf_hub_download
            files = [f for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                     if f.startswith("experiences/") and f.endswith(".parquet")]
            if not files:
                return None

            dfs = []
            for fp in files[:max_files]:
                try:
                    local = hf_hub_download(repo_id=EXPERIENCE_REPO, filename=fp,
                                            repo_type="dataset", token=HF_TOKEN,
                                            cache_dir="/tmp/exp_cache")
                    dfs.append(pd.read_parquet(local))
                except Exception as e:
                    # Best-effort per file, but leave a trace instead of hiding it
                    log.warning(f"Skipping experience file {fp}: {e}")
            if not dfs:
                return None

            self._real_df = pd.concat(dfs, ignore_index=True).fillna(0.0)
            self.replay.add_real_data(self._real_df)
            log.info(f"📈 Real data: {len(self._real_df):,} rows")
            return self._real_df
        except Exception as e:
            log.warning(f"Cannot load real data: {e}")
            return None

    def _get_features_for_strategy(self, df: pd.DataFrame, expert_id: str,
                                   strategy_name: str) -> list[str]:
        """
        Extract feature columns for a specific expert × strategy.

        Unknown strategies fall back to the "momentum" definition. If fewer
        than two configured features exist in ``df``, every numeric column
        except ``label``, ``actual_pnl`` and ``timestamp`` is used instead.
        """
        strat = STRATEGIES.get(strategy_name, STRATEGIES["momentum"])
        feat_cols = strat["expert_features"].get(expert_id, [])
        # Keep only columns that exist in df
        available = [c for c in feat_cols if c in df.columns]
        if len(available) < 2:
            # Fallback: use all numeric columns
            available = [c for c in df.select_dtypes(include=[np.number]).columns
                         if c not in ("label", "actual_pnl", "timestamp")]
        return available

    def _compute_accuracy_per_theory(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """
        Per-theory accuracy for curriculum updates.

        Evaluates ``model`` on each theory with at least 10 rows in ``df`` and
        forwards the result to the curriculum scheduler. Rows whose ``theory``
        is missing (NaN) are ignored. Returns ``{theory: accuracy}``; empty
        when ``df`` has no ``theory`` column.
        """
        results = {}
        if "theory" not in df.columns:
            return results
        for theory in df["theory"].dropna().unique():
            sub = df[df["theory"] == theory]
            if len(sub) < 10:
                continue
            X = sub[feat_cols].fillna(0).values
            y = sub["label"].values
            try:
                preds = model.predict(X)
            except Exception as e:
                log.warning(f"Per-theory accuracy failed for {theory}: {e}")
                continue
            acc = float(np.mean(preds == y))
            results[theory] = acc
            self.curriculum.update(theory, acc, len(sub))
        return results

    @staticmethod
    def _project_batch(df: pd.DataFrame, feat_cols: list) -> pd.DataFrame:
        """
        Return a new frame with columns ``feat_cols + ["label"]`` (NaN → 0.0).

        Features absent from ``df`` are created as 0.0 without mutating
        ``df``. The ``theory`` column is carried along when present so
        per-theory accuracy can be computed on the merged batch.

        Raises:
            KeyError: if ``df`` has no ``label`` column.
        """
        if "label" not in df.columns:
            raise KeyError("label")
        projected = df.reindex(columns=list(feat_cols) + ["label"]).fillna(0.0)
        if "theory" in df.columns:
            projected["theory"] = df["theory"]
        return projected

    @staticmethod
    def _confidence_sharpe(conf) -> float:
        """Score from predicted probabilities: (mean(max_p) - 0.5) / std(max_p) * sqrt(n)."""
        max_conf = np.max(conf, axis=1)
        return ((float(np.mean(max_conf)) - 0.5)
                / (float(np.std(max_conf)) + 1e-9)
                * math.sqrt(len(max_conf)))

    def _build_epoch_batch(self, df_theory, df_real, feat_cols: list,
                           epoch: int) -> pd.DataFrame | None:
        """Assemble and shuffle one epoch's batch; ``None`` if no source has data."""
        parts = []

        # (A) Curriculum-scheduled theory data
        if df_theory is not None and len(df_theory) > 0:
            scheduled = self.curriculum.schedule_epoch(df_theory, epoch)
            has_feature = any(c in scheduled.columns for c in feat_cols)
            if has_feature and "label" in scheduled.columns:
                parts.append(self._project_batch(scheduled, feat_cols))

        # (B) Real data fine-tuning (from epoch 2 onward)
        if (epoch >= 2 and df_real is not None and len(df_real) > 1000
                and "label" in df_real.columns):
            parts.append(self._project_batch(df_real.head(5000), feat_cols))

        # (C) Replay buffer — anti-catastrophic-forgetting
        replay_sample = self.replay.sample(min(2000, len(self.replay)))
        if (replay_sample is not None and len(replay_sample) > 100
                and "label" in replay_sample.columns):
            parts.append(self._project_batch(replay_sample, feat_cols))

        if not parts:
            return None
        merged = pd.concat(parts, ignore_index=True)
        return merged.sample(frac=1, random_state=epoch).reset_index(drop=True)

    def _fit_model(self, model, X, y, epoch: int) -> bool:
        """Fit ``model`` with class-balanced weights when supported; False on failure."""
        # Class-balanced sample weights (fix bias toward label=0)
        from sklearn.utils.class_weight import compute_sample_weight
        try:
            sw = compute_sample_weight("balanced", y)
        except Exception:
            sw = None

        try:
            if sw is None:
                model.fit(X, y)
            else:
                try:
                    model.fit(X, y, sample_weight=sw)
                except TypeError:
                    # some estimators don't support sample_weight
                    model.fit(X, y)
        except Exception as e:
            log.warning(f" ❌ Fit error epoch {epoch}: {e}")
            return False
        return True

    def train_strategy(self, strategy_name: str, n_epochs: int = 5) -> dict:
        """
        Full training pipeline for one strategy.

        For each expert E1–E5: pick features, warm-start from the best
        checkpoint (or build a fresh model), run ``n_epochs`` epochs over a
        mix of curriculum theory data, real data (epoch ≥ 2) and replay
        samples, then register the model with the metrics of the last
        successfully fitted epoch. Experts for which no epoch was fitted are
        skipped and not registered.

        Returns:
            ``{"strategy", "experts", "summary"}`` with metrics per expert, or
            ``{"error": ...}`` for an unknown strategy.

        NOTE(review): ``model.fit`` is called once per epoch; whether that
        continues from or replaces the previous epoch's fit depends on the
        estimator (e.g. its warm-start configuration) — confirm.
        """
        log.info(f"🎯 Training strategy: {strategy_name}")
        strat = STRATEGIES.get(strategy_name)
        if not strat:
            return {"error": f"Unknown strategy: {strategy_name}"}

        df_theory = self._load_theory_data()
        df_real = self._load_real_data()
        feature_source = df_theory if df_theory is not None else pd.DataFrame()

        results = {}
        for expert_id in ["E1", "E2", "E3", "E4", "E5"]:
            log.info(f" 🔷 {strategy_name}/{expert_id}")
            expert_name = f"{strategy_name}_{expert_id.lower()}"
            feat_cols = self._get_features_for_strategy(feature_source, expert_id, strategy_name)
            if not feat_cols:
                log.warning(f" ⚠️ No features for {expert_id}, skip")
                continue

            # Load warm-start checkpoint
            from model_registry import load_best_checkpoint, register_model
            model, _ = load_best_checkpoint(expert_name)
            if model is None:
                model = self._build_model()

            # Reset per expert so a skipped expert can neither hit an unbound
            # name nor reuse the previous expert's batch.
            final_metrics = None

            # ── TRAINING LOOP ────────────────────────────────
            for epoch in range(n_epochs):
                df_batch = self._build_epoch_batch(df_theory, df_real, feat_cols, epoch)
                if df_batch is None or len(df_batch) == 0:
                    log.warning(f" ⚠️ No training data for {expert_id} epoch {epoch}")
                    break

                X = df_batch[feat_cols].values.astype(np.float32)
                y = df_batch["label"].values.astype(np.int32)

                if not self._fit_model(model, X, y, epoch):
                    break

                # Metrics (measured on the training batch itself)
                acc = float(np.mean(model.predict(X) == y))
                sharpe = self._confidence_sharpe(model.predict_proba(X))
                final_metrics = {
                    "sharpe": round(sharpe, 4),
                    "accuracy": round(acc, 4),
                    "n_samples": len(X),
                }

                # Theory accuracy for curriculum
                if df_theory is not None:
                    self._compute_accuracy_per_theory(model, df_batch, feat_cols)

                # Log
                self.log.record(expert_name, epoch,
                                {"accuracy": round(acc, 4), "sharpe": round(sharpe, 4),
                                 "n_samples": len(X), "n_feat": len(feat_cols)},
                                phase="theory" if epoch < 2 else "fine_tune",
                                strategy=strategy_name)

            # ── Final metrics — guard: no epoch may have been fitted ──
            if final_metrics is None:
                log.warning(f" ⚠️ {strategy_name}: no training data produced, skipping register")
                continue

            accepted = register_model(expert_name, model, dict(final_metrics))
            results[expert_id] = {
                "sharpe": final_metrics["sharpe"],
                "accuracy": final_metrics["accuracy"],
                "accepted": accepted,
            }

        # Push log
        self.log.push_to_hf()
        summary = self.log.get_summary()
        log.info(f"✅ Strategy {strategy_name} done: {results}")
        return {"strategy": strategy_name, "experts": results, "summary": summary}

    def _build_model(self):
        """Build a fresh classifier (gradient boosting; random forest on ImportError)."""
        try:
            from sklearn.ensemble import GradientBoostingClassifier
            return GradientBoostingClassifier(
                n_estimators=100, max_depth=4, learning_rate=0.1,
                subsample=0.8, random_state=42,
            )
        except ImportError:
            from sklearn.ensemble import RandomForestClassifier
            return RandomForestClassifier(n_estimators=100, max_depth=5,
                                          n_jobs=-1, random_state=42)

    def run_all_strategies(self):
        """Train all strategies sequentially; a failing strategy is logged and skipped."""
        log.info("🏭 Training Engine: starting all strategies")
        for strategy_name in STRATEGIES:
            try:
                result = self.train_strategy(strategy_name, n_epochs=5)
                log.info(f"✅ {strategy_name}: {result.get('experts',{})}")
            except Exception as e:
                log.error(f"❌ {strategy_name}: {e}", exc_info=True)


# ─────────────────────────────────────────────────────────────────
# Background loop for Coordinator
# ─────────────────────────────────────────────────────────────────

def run_training_loop(check_interval: int = 3600):
    """
    Run training periodically, forever.

    Trains all strategies, then sleeps ``check_interval`` seconds. Errors in a
    round are logged and the loop continues.
    """
    log.info(f"🎓 TrainingEngine loop started (interval={check_interval}s)")
    engine = TrainingEngine()
    while True:
        try:
            engine.run_all_strategies()
        except Exception as e:
            log.error(f"❌ Training loop error: {e}", exc_info=True)
        log.info(f"💤 Training done. Next in {check_interval//3600}h")
        time.sleep(check_interval)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    engine = TrainingEngine()
    result = engine.train_strategy("momentum", n_epochs=2)
    print(json.dumps(result, indent=2))