HPLL-DataReview / training_engine.py
gionuibk's picture
fix: curriculum+classweight+metalearner+delay: training_engine.py
e81f323 verified
"""
Training Engine v1.0 β€” Complete T4 Training Pipeline
ChαΊ‘y trΓͺn Coordinator Space nhΖ° background thread.
GiαΊ£i quyαΊΏt:
1. Curriculum Scheduler β€” học cΓ³ thα»© tα»±, Γ΄n bΓ i yαΊΏu nhiều hΖ‘n
2. Experience Replay β€” buffer bΓ i cΕ©, khΓ΄ng quΓͺn
3. Anti-Catastrophic-Forgetting β€” luΓ΄n mix theory vΓ o fine-tune
4. Multi-strategy training β€” N strategies chαΊ‘y Δ‘α»™c lαΊ­p
5. Training progress log β€” log Δ‘αΊ§y Δ‘α»§ ra HF
"""
from __future__ import annotations
import os, json, logging, math, time, tempfile
from datetime import datetime, timezone
import numpy as np
import pandas as pd
# Module-level named logger used throughout this file.
log = logging.getLogger("training_engine")

# Hugging Face settings; each can be overridden through the environment.
# An unset HF_TOKEN falls back to the empty string.
HF_TOKEN = os.getenv("HF_TOKEN", "")
# Dataset repo read for theory/experience parquet files and written for training logs.
EXPERIENCE_REPO = os.getenv("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
# NOTE(review): not referenced in this file; presumably consumed by model_registry — confirm.
MODEL_REPO = os.getenv("MODEL_REPO", "gionuibk/aetheris-models")
# ─────────────────────────────────────────────────────────────────
# 1. CURRICULUM SCHEDULER
# ─────────────────────────────────────────────────────────────────
# Difficulty level (1 = easy ... 4 = expert) of each synthetic theory label.
# CurriculumScheduler looks levels up with THEORY_DIFFICULTY.get(theory, 2),
# so unlisted theories count as level 2; epoch e admits levels <= min(1 + e // 2, 4).
#
# Every key must appear exactly once: in a dict literal a repeated key silently
# overrides the earlier entry. This table used to list "macro_crowded_long"
# (2, then 3), "macro_funding_extreme_long" (2, then 4) and "ict_bear_order_block"
# (3 twice). Only the later values ever took effect, and those effective values
# are the ones kept below, so runtime behavior is unchanged.
# NOTE(review): confirm 3 / 4 (rather than 2) are the intended levels for those two macro theories.
THEORY_DIFFICULTY = {
    # ── Level 1: Easy (price-following, clear signals) ────────────────
    "mean_reversion": 1, "momentum_trend": 1,
    "kelly_risk_management": 1, "risk_no_edge_flat": 1,
    "meanrev_extreme_oversold": 1, "meanrev_extreme_overbought": 1,
    "micro_noise_flat": 1, "macro_healthy_bull": 1,
    "macro_healthy_bear": 1, "momentum_weak_no_trade": 1,
    # ── Level 2: Medium (confirmation patterns) ───────────────────────
    "microstructure_informed": 2, "microstructure_noise": 2,
    "price_action_bos_bull": 2, "price_action_bos_bear": 2,
    "vsa_no_supply": 2, "vsa_no_demand": 2,
    "dow_markup": 2, "dow_accumulation": 2,
    "dow_distribution": 2, "dow_panic_sell": 2,
    "scalping_orderflow": 2, "scalping_pullback": 2,
    "scalping_wide_spread_avoid": 2, "scalping_momentum_burst": 2,
    "micro_informed_buy": 2, "micro_informed_sell": 2,
    "micro_absorption_bull": 2, "micro_spread_spike": 2,
    "meanrev_hurst_confirms": 2, "meanrev_vwap_bounce_bull": 2,
    "momentum_macd_cross_bull": 2, "momentum_turtle_buy": 2,
    "momentum_turtle_sell": 2,
    "indicator_macd_bullish": 2, "indicator_macd_bearish": 2,
    "indicator_bb_squeeze_bull": 2, "indicator_bb_squeeze_bear": 2,
    "indicator_rsi_bull_divergence": 2, "indicator_rsi_bear_divergence": 2,
    "ta_bollinger_squeeze_breakout": 2, "ta_dow_trend_structure_bull": 2,
    "ta_golden_pocket_buy": 2, "ta_inside_bar_continuation": 2,
    "ta_pivot_support_bounce": 2,
    "pa_engulfing_reversal": 2, "pa_pin_bar_reversal": 2,
    "price_action_engulf_bull": 2, "price_action_engulf_bear": 2,
    "price_action_inside_cont": 2,
    "macro_crowded_short": 2,            # macro_crowded_long lives in level 3
    "macro_funding_extreme_short": 2,    # macro_funding_extreme_long lives in level 4
    "macro_funding_flip": 2, "macro_oi_trend_confirm": 2,
    "macro_oi_diverge_warning": 2,
    "fibonacci_618_retracement": 2, "gann_angle_support": 2,
    "swing_range_boundary": 2, "swing_rsi_divergence": 2,
    "swing_breakout_retest": 2, "swing_htf_pullback": 2,
    "dca_accumulate": 2, "dca_pause_strong_trend": 2,
    "grid_buy_dip": 2, "grid_sell_peak": 2,
    "grid_exit_trend_break": 2,
    "risk_drawdown_reduce": 2, "risk_volatile_spread_avoid": 2,
    "risk_of_ruin": 2,
    # ── Level 3: Hard (multi-factor, context-dependent) ───────────────
    "wyckoff_spring": 3, "wyckoff_UTAD": 3,
    "wyckoff_SOS": 3, "wyckoff_AR": 3,
    "wyckoff_LPS": 3, "wyckoff_ST": 3,
    "wyckoff_SC": 3, "wyckoff_PS": 3,
    "wyckoff_markup": 3, "wyckoff_markdown": 3,
    "wyckoff_upthrust": 3, "wyckoff_spring_buy": 3,
    "wyckoff_sos_buy": 3, "wyckoff_test_spring": 3,
    "wyckoff_absorption_bull": 3, "wyckoff_absorption_bear": 3,
    "wyckoff_upthrust_sell": 3,
    "wyckoff_SOW": 3, "wyckoff_LPSY": 3,
    "vsa_climax_buy": 3, "vsa_climax_sell": 3,
    "vsa_test_bar": 3, "vsa_upthrust_bar": 3,
    "volprofile_poc_bounce_bull": 3, "volprofile_poc_bounce_bear": 3,
    "volprofile_hvn_stall": 3, "volprofile_lvn_fast_move": 3,
    "volprofile_vah_rejection": 3, "volprofile_val_bounce": 3,
    "elliott_wave3_strong": 3, "elliott_wave1": 3,
    "elliott_wave2_retrace": 3, "elliott_wave5_exhaustion": 3,
    "elliott_waveA_correction": 3,
    "ict_bull_order_block": 3, "ict_bear_order_block": 3,
    "ict_bear_breaker": 3, "ict_bull_breaker": 3,
    "ict_fvg_fill": 3, "ict_fvg_fill_bull": 3,
    "ict_fvg_fill_bear": 3, "ict_bos_continuation": 3,
    "ict_optimal_entry": 3,
    "macro_long_liquidation": 3, "macro_short_squeeze": 3,
    "macro_crowded_long": 3,
    # ── Level 4: Expert (regime-aware, cascade, complex waves) ────────
    "regime_aware_volatile": 4, "regime_aware_ranging": 4,
    "elliott_waveC_completion": 4,
    "macro_rug_pull": 4, "macro_funding_extreme_long": 4,
}
class CurriculumScheduler:
    """
    Decide which theories to train on each epoch and how often to sample them.

    Policy: easy -> hard (each epoch admits theories up to a difficulty ceiling
    that grows with the epoch), and theories with lower recorded accuracy are
    sampled more often.
    """

    def __init__(self):
        self.theory_scores: dict[str, float] = {}  # theory -> most recent accuracy
        self.theory_seen: dict[str, int] = {}      # theory -> cumulative n_examples seen

    def update(self, theory_id: str, accuracy: float, n_examples: int):
        """Store the latest *accuracy* for *theory_id* and add *n_examples* to its seen count."""
        self.theory_scores[theory_id] = accuracy
        self.theory_seen[theory_id] = self.theory_seen.get(theory_id, 0) + n_examples

    def get_sample_weight(self, theory_id: str) -> float:
        """
        Return the relative sampling weight of *theory_id* (higher = drawn more often).

        weight = struggle * difficulty, where
          struggle   = 2 * max(0.5, 1 - accuracy): 1.0 for accuracy >= 0.5, up to
                       2.0 at accuracy 0; theories with no recorded score use 0.5.
          difficulty = 1 + 0.3 * (level - 1): 1.0 / 1.3 / 1.6 / 1.9 for levels 1-4;
                       theories missing from THEORY_DIFFICULTY count as level 2.
        """
        difficulty = THEORY_DIFFICULTY.get(theory_id, 2)
        accuracy = self.theory_scores.get(theory_id, 0.5)
        # Poor performers get more practice
        struggle_weight = max(0.5, 1.0 - accuracy) * 2.0
        difficulty_weight = 1.0 + (difficulty - 1) * 0.3
        return struggle_weight * difficulty_weight

    def schedule_epoch(self, df_theory: pd.DataFrame, epoch: int,
                       max_samples: int = 10_000) -> pd.DataFrame:
        """
        Build the curriculum-ordered, weight-sampled theory batch for *epoch*.

        Difficulty ceiling: ``min(1 + epoch // 2, 4)`` (epochs 0-1 -> level 1,
        2-3 -> 2, 4-5 -> 3, 6+ -> 4). Theories unknown to THEORY_DIFFICULTY are
        treated as level 2 (medium), NOT admitted early. If fewer than
        ``max(10, 5% of all rows)`` rows pass the ceiling, every row is used.

        Rows are then drawn with probability proportional to
        ``get_sample_weight(theory)`` using the global NumPy RNG:
          * more eligible rows than *max_samples*: *max_samples* distinct rows;
          * otherwise: ``len(eligible)`` rows drawn WITH replacement, so weak and
            hard theories are genuinely over-represented. (Drawing every eligible
            row without replacement is just a shuffle and ignores the weights.)

        Without a ``theory`` column the frame is only shuffled, seeded by *epoch*.
        An empty eligible set yields an empty frame. Returns a new DataFrame.
        """
        if "theory" not in df_theory.columns:
            return df_theory.sample(frac=1, random_state=epoch)

        max_difficulty = min(1 + epoch // 2, 4)  # 0→1, 2→2, 4→3, 6→4

        def get_diff(t: str) -> int:
            return THEORY_DIFFICULTY.get(t, 2)  # unknown → medium, NOT bypass

        mask = df_theory["theory"].map(get_diff) <= max_difficulty
        df_allowed = df_theory[mask]

        # Fall back to all data only when the admitted slice is really thin
        # (< 5% of all rows, at least 10 rows).
        min_rows = max(10, int(len(df_theory) * 0.05))
        if len(df_allowed) < min_rows:
            log.warning(f"Curriculum epoch {epoch}: only {len(df_allowed)} rows at "
                        f"max_diff={max_difficulty}, falling back to all data")
            df_allowed = df_theory

        n_rows = len(df_allowed)
        if n_rows == 0:
            # np.random.choice rejects an empty probability vector.
            return df_allowed.copy()

        weights = df_allowed["theory"].map(self.get_sample_weight).fillna(1.0)
        probs = (weights / weights.sum()).values
        n_sample = min(n_rows, max_samples)
        # With replacement whenever we draw as many rows as exist; otherwise
        # the weighted draw degenerates into a permutation.
        with_replacement = n_rows <= max_samples
        idx = np.random.choice(n_rows, size=n_sample, replace=with_replacement, p=probs)
        return df_allowed.iloc[idx].copy()
# ─────────────────────────────────────────────────────────────────
# 2. EXPERIENCE REPLAY BUFFER
# ─────────────────────────────────────────────────────────────────
class ExperienceReplayBuffer:
    """
    Bounded, priority-weighted store of past training examples.

    Samples from it are mixed into every training batch to counter catastrophic
    forgetting. Each example is a plain dict (one DataFrame record) kept
    alongside a float priority.
    """
    MAX_SIZE = 20_000  # max experiences in buffer

    def __init__(self):
        self._buffer: list[dict] = []
        self._priorities: list[float] = []

    def add(self, examples: list[dict], priority: float = 1.0):
        """
        Append *examples*, all with the same *priority*.

        When the buffer grows past MAX_SIZE only the MAX_SIZE highest-priority
        entries are kept. The sort is stable (also with ``reverse=True``), so
        among equal priorities the OLDEST entries survive and the newest are
        dropped.
        NOTE(review): once the buffer is full of equal-priority data, new
        examples at that priority are discarded immediately — confirm intended.
        """
        for ex in examples:
            self._buffer.append(ex)
            self._priorities.append(priority)
        # Trim to max size (keep highest priority)
        if len(self._buffer) > self.MAX_SIZE:
            combined = sorted(
                zip(self._priorities, self._buffer),
                key=lambda x: x[0], reverse=True
            )[:self.MAX_SIZE]
            self._priorities, self._buffer = map(list, zip(*combined)) if combined else ([], [])

    def sample(self, n: int) -> pd.DataFrame | None:
        """
        Draw up to *n* distinct examples (no replacement), weighted by priority.

        Returns None when fewer than 2 examples are stored. Zero-priority entries
        can never be drawn, so the result is capped at the number of non-zero
        priorities; if every priority is zero the draw is uniform instead of
        dividing by zero. Negative priorities are still rejected by numpy
        (ValueError). Uses the global NumPy RNG.
        """
        if len(self._buffer) < 2:
            return None
        if any(self._priorities):
            total = sum(self._priorities)
            probs = [p / total for p in self._priorities]
            # numpy raises if asked for more distinct rows than non-zero weights.
            n = min(n, sum(1 for p in self._priorities if p != 0))
        else:
            # All priorities are zero: no weighting information, sample uniformly.
            probs = None
            n = min(n, len(self._buffer))
        idx = np.random.choice(len(self._buffer), size=n, replace=False, p=probs)
        return pd.DataFrame([self._buffer[i] for i in idx])

    def add_theory_data(self, df_theory: pd.DataFrame):
        """Add every theory row at priority 2.0 (double real data) so theory is never forgotten."""
        rows = df_theory.to_dict("records")
        self.add(rows, priority=2.0)  # 2× priority vs real data

    def add_real_data(self, df_real: pd.DataFrame):
        """
        Add real market rows at priority 1.0.

        Rows with ``label == 0`` are skipped when a ``label`` column exists, and
        at most the first 1000 remaining rows are added per call.
        """
        df_good = df_real[df_real["label"] != 0] if "label" in df_real.columns else df_real
        rows = df_good.head(1000).to_dict("records")  # cap per call
        self.add(rows, priority=1.0)

    def __len__(self) -> int:
        return len(self._buffer)
# ─────────────────────────────────────────────────────────────────
# 3. TRAINING PROGRESS LOG
# ─────────────────────────────────────────────────────────────────
class TrainingLog:
    """Keep per-epoch training metrics in memory, in a local JSONL file, and on HF."""

    LOG_PATH = "training_log.jsonl"

    def __init__(self):
        self._entries: list[dict] = []

    def record(self, expert: str, epoch: int, metrics: dict,
               phase: str = "fine_tune", strategy: str = "default"):
        """
        Store one metrics entry.

        The entry (UTC ISO timestamp, expert, strategy, epoch, phase, then every
        key of *metrics*, which may override those fields) is kept in memory,
        appended as one JSON line to LOG_PATH, and summarized in an INFO log.
        """
        entry = dict(
            ts=datetime.now(timezone.utc).isoformat(),
            expert=expert,
            strategy=strategy,
            epoch=epoch,
            phase=phase,
        )
        entry.update(metrics)
        self._entries.append(entry)

        with open(self.LOG_PATH, "a") as fh:
            print(json.dumps(entry), file=fh)

        acc = metrics.get('accuracy', 0)
        sharpe = metrics.get('sharpe', 0)
        n_samples = metrics.get('n_samples', 0)
        log.info(f"πŸ“Š [{expert}] epoch={epoch} phase={phase} "
                 f"acc={acc:.3f} "
                 f"sharpe={sharpe:.3f} "
                 f"n={n_samples}")

    def push_to_hf(self):
        """
        Upload LOG_PATH to ``logs/training_log.jsonl`` in the EXPERIENCE_REPO dataset.

        Does nothing when the local file is missing. Best-effort: any failure is
        logged as a warning and not re-raised.
        """
        if os.path.exists(self.LOG_PATH):
            try:
                from huggingface_hub import HfApi
                stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M')
                HfApi(token=HF_TOKEN).upload_file(
                    path_or_fileobj=self.LOG_PATH,
                    path_in_repo="logs/training_log.jsonl",
                    repo_id=EXPERIENCE_REPO,
                    repo_type="dataset",
                    token=HF_TOKEN,
                    commit_message=f"Training log {stamp}",
                )
            except Exception as e:
                log.warning(f"Log push failed: {e}")

    def get_summary(self) -> dict:
        """
        Summarize recorded entries per expert.

        Returns ``{expert: {"n_epochs", "best_sharpe", "last_acc", "last_epoch"}}``
        (missing sharpe/accuracy count as 0); ``{}`` when nothing was recorded.
        """
        grouped: dict[str, list[dict]] = {}
        for entry in self._entries:
            grouped.setdefault(entry["expert"], []).append(entry)

        summary = {}
        for expert, runs in grouped.items():
            latest = runs[-1]
            summary[expert] = {
                "n_epochs": len(runs),
                "best_sharpe": max(r.get("sharpe", 0) for r in runs),
                "last_acc": latest.get("accuracy", 0),
                "last_epoch": latest["epoch"],
            }
        return summary
# ─────────────────────────────────────────────────────────────────
# 4. MULTI-STRATEGY DEFINITIONS
# ─────────────────────────────────────────────────────────────────
# Per-strategy training configuration.
#   description     — human-readable summary
#   expert_features — expert id ("E1".."E5") -> feature columns for that expert;
#                     TrainingEngine keeps only the columns present in the data and
#                     falls back to all numeric columns when fewer than 2 remain.
#   regime_filter   — market regimes the strategy targets
#   target_theories — theory labels the strategy focuses on
# NOTE(review): regime_filter and target_theories are not read anywhere in this
# file — confirm whether another module consumes them.
STRATEGIES = {
    "momentum": dict(
        description="Trend following β€” ADX, EMA, Momentum",
        expert_features=dict(
            E1=["obi", "vpin", "entropy", "cvd_norm", "absorption",
                "tape_speed", "large_trade"],
            E2=["ema_cross_fs", "ema_cross_sl", "adx", "plus_di", "minus_di",
                "momentum_zscore", "trend_strength", "vol_surge", "macd_hist"],
            E3=["zscore_60", "rsi", "hurst", "bb_pct_b", "mean_rev_signal"],
            E4=["market_structure", "bos", "wyckoff_phase", "fib_618_prox", "ob_score"],
            E5=["funding", "funding_trend", "oi_delta", "ls_ratio",
                "session_ny", "session_overlap"],
        ),
        regime_filter=["TRENDING"],
        target_theories=["momentum_trend", "elliott_wave3_strong",
                         "dow_markup", "swing_htf_pullback"],
    ),
    "mean_reversion": dict(
        description="Range trading β€” Z-score, RSI, Bollinger",
        expert_features=dict(
            E1=["obi", "spread_pct", "vpin", "entropy", "absorption",
                "poc_proximity", "vol_skew"],
            E2=["adx", "ema_cross_fs", "momentum_zscore", "trend_strength"],
            E3=["zscore_60", "zscore_300", "rsi", "hurst", "bb_pct_b",
                "bb_width", "vwap_dev", "mean_rev_signal"],
            E4=["market_structure", "bos", "pivot_dist_h", "pivot_dist_l",
                "wyckoff_phase", "fvg_score"],
            E5=["funding", "oi_delta", "ls_ratio", "session_asian", "session_london"],
        ),
        regime_filter=["RANGING", "QUIET"],
        target_theories=["mean_reversion", "vsa_no_supply", "vsa_no_demand",
                         "wyckoff_spring", "volprofile_poc_bounce_bull"],
    ),
    "smart_money": dict(
        description="Wyckoff + ICT β€” Order Blocks, Spring, FVG",
        expert_features=dict(
            E1=["obi", "vpin", "absorption", "cvd_norm", "large_trade", "vol_skew"],
            E2=["adx", "trend_strength", "ema_cross_sl", "momentum_zscore"],
            E3=["zscore_60", "hurst", "rsi", "bb_pct_b"],
            E4=["market_structure", "bos", "wyckoff_phase", "fib_618_prox",
                "fvg_score", "ob_score", "pin_bar", "engulfing"],
            E5=["funding", "oi_delta", "ls_ratio", "macro_squeeze_risk"],
        ),
        regime_filter=["RANGING", "VOLATILE"],
        target_theories=["wyckoff_spring", "wyckoff_SOS", "ict_bull_order_block",
                         "ict_fvg_fill_bull", "macro_short_squeeze"],
    ),
    "scalping": dict(
        description="Microstructure β€” Tape, OBI, VPIN, tight SL",
        expert_features=dict(
            E1=["obi", "spread_pct", "vpin", "entropy", "cvd_norm",
                "tape_speed", "large_trade", "vol_adjusted_spread"],
            E2=["adx", "momentum_zscore", "ema_cross_fs"],
            E3=["zscore_60", "rsi", "vwap_dev"],
            E4=["bos", "pin_bar", "fvg_score"],
            E5=["session_overlap", "session_ny", "session_london"],
        ),
        regime_filter=["VOLATILE", "TRENDING"],
        target_theories=["scalping_orderflow", "scalping_momentum_burst",
                         "micro_informed_buy", "micro_informed_sell"],
    ),
}
# ─────────────────────────────────────────────────────────────────
# 5. MAIN TRAINING ENGINE
# ─────────────────────────────────────────────────────────────────
class TrainingEngine:
    """
    Orchestrates the complete training pipeline for all strategies.

    Meant to run as a background thread in the Coordinator Space. For every
    strategy and every expert E1..E5 it:
      1. warm-starts from the best registered checkpoint (or builds a fresh model);
      2. trains for ``n_epochs`` on a mix of curriculum-scheduled theory data,
         real experiences (from epoch 2) and replay-buffer samples;
      3. feeds per-theory accuracy back into the curriculum;
      4. registers the final model with its in-sample metrics.
    """

    def __init__(self):
        self.curriculum = CurriculumScheduler()
        self.replay = ExperienceReplayBuffer()
        self.log = TrainingLog()
        self._theory_df = None
        self._real_df = None
        # Experience files whose rows are already in the replay buffer, so
        # repeated _load_real_data() calls do not keep re-adding the same rows.
        self._real_files_in_replay: set[str] = set()

    # ── Data loading ────────────────────────────────────────────────────

    def _load_theory_data(self) -> pd.DataFrame | None:
        """
        Return the synthetic theory dataset, downloading it on first use.

        The frame is cached on the instance and, on first load, added to the
        replay buffer at high priority. Returns None after a warning if the
        download/read fails (the next call retries).
        """
        if self._theory_df is not None:
            return self._theory_df
        try:
            from huggingface_hub import hf_hub_download
            local = hf_hub_download(
                repo_id=EXPERIENCE_REPO,
                filename="theory/synthetic_pretrain_v3.parquet",
                repo_type="dataset", token=HF_TOKEN,
                cache_dir="/tmp/theory_cache",
            )
            self._theory_df = pd.read_parquet(local)
            log.info(f"πŸ“š Theory data: {len(self._theory_df):,} examples")
            # Seed replay buffer with theory data (high priority)
            self.replay.add_theory_data(self._theory_df)
            return self._theory_df
        except Exception as e:
            log.warning(f"Cannot load theory data: {e}")
            return None

    def _load_real_data(self, max_files: int = 100) -> pd.DataFrame | None:
        """
        Load up to *max_files* ``experiences/*.parquet`` files from EXPERIENCE_REPO.

        The files are concatenated (NaN -> 0.0) into ``self._real_df``, which is
        returned. Rows from files not seen by this engine before are also added
        to the replay buffer. Unreadable files are skipped with a warning.
        Returns None when nothing could be loaded; errors are logged, not raised.
        """
        try:
            from huggingface_hub import list_repo_files, hf_hub_download
            files = [f for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                     if f.startswith("experiences/") and f.endswith(".parquet")]
            if not files:
                return None
            # NOTE(review): takes the first max_files paths in listing order; once the
            # repo holds more files, newer ones may never be loaded — confirm ordering.
            dfs, new_dfs, new_paths = [], [], []
            for fp in files[:max_files]:
                try:
                    local = hf_hub_download(repo_id=EXPERIENCE_REPO, filename=fp,
                                            repo_type="dataset", token=HF_TOKEN, cache_dir="/tmp/exp_cache")
                    frame = pd.read_parquet(local)
                except Exception as e:
                    log.warning(f"Skipping experience file {fp}: {e}")
                    continue
                dfs.append(frame)
                if fp not in self._real_files_in_replay:
                    new_dfs.append(frame)
                    new_paths.append(fp)
            if not dfs:
                return None
            self._real_df = pd.concat(dfs, ignore_index=True).fillna(0.0)
            if new_dfs:
                fresh = (self._real_df if len(new_dfs) == len(dfs)
                         else pd.concat(new_dfs, ignore_index=True).fillna(0.0))
                self.replay.add_real_data(fresh)
                self._real_files_in_replay.update(new_paths)
            log.info(f"πŸ“ˆ Real data: {len(self._real_df):,} rows")
            return self._real_df
        except Exception as e:
            log.warning(f"Cannot load real data: {e}")
            return None

    # ── Feature / batch helpers ─────────────────────────────────────────

    def _get_features_for_strategy(self, df: pd.DataFrame,
                                   expert_id: str, strategy_name: str) -> list[str]:
        """
        Return the feature columns one expert trains on for one strategy.

        Uses the strategy's configured list (unknown strategies fall back to
        "momentum"), restricted to columns present in *df*. If fewer than 2
        remain, falls back to every numeric column except label/actual_pnl/timestamp.
        """
        strat = STRATEGIES.get(strategy_name, STRATEGIES["momentum"])
        feat_cols = strat["expert_features"].get(expert_id, [])
        available = [c for c in feat_cols if c in df.columns]
        if len(available) < 2:
            available = [c for c in df.select_dtypes(include=[np.number]).columns
                         if c not in ("label", "actual_pnl", "timestamp")]
        return available

    @staticmethod
    def _with_features(df: pd.DataFrame, feat_cols: list) -> pd.DataFrame:
        """Return a copy of *df* with each column of *feat_cols* it lacks added as 0.0 (input untouched)."""
        missing = {c: 0.0 for c in feat_cols if c not in df.columns}
        return df.assign(**missing)

    @staticmethod
    def _confidence_score(model, X: np.ndarray) -> float:
        """
        Pseudo-"sharpe" of the model's top-class probability on *X*:
        ``(mean(max_proba) - 0.5) / std(max_proba) * sqrt(n)``.

        NOTE: a confidence statistic on the training rows, not a returns-based Sharpe ratio.
        """
        max_conf = np.max(model.predict_proba(X), axis=1)
        return (float(np.mean(max_conf)) - 0.5) / (float(np.std(max_conf)) + 1e-9) * math.sqrt(len(max_conf))

    def _build_epoch_batches(self, df_theory: pd.DataFrame | None, df_real: pd.DataFrame | None,
                             feat_cols: list, epoch: int) -> tuple[list, pd.DataFrame | None]:
        """
        Assemble the training pieces for one epoch.

        Returns ``(batches, theory_batch)``. *batches* are frames with columns
        ``feat_cols + ["label"]`` (NaN -> 0.0). *theory_batch* is the curriculum
        sample with all of its columns (including ``theory``) plus missing
        features as 0.0, or None when no theory data contributed this epoch.
        Sources lacking a ``label`` column are skipped.
        """
        cols = feat_cols + ["label"]
        batches = []
        theory_batch = None

        # (A) Curriculum-scheduled theory data
        if df_theory is not None and len(df_theory) > 0:
            scheduled = self.curriculum.schedule_epoch(df_theory, epoch)
            if "label" in scheduled.columns and any(c in scheduled.columns for c in feat_cols):
                theory_batch = self._with_features(scheduled, feat_cols)
                batches.append(theory_batch[cols].fillna(0.0))

        # (B) Real data fine-tuning (from epoch 2 onward); the cached frame is not mutated.
        if (epoch >= 2 and df_real is not None and len(df_real) > 1000
                and "label" in df_real.columns):
            real = self._with_features(df_real.head(5000), feat_cols)
            batches.append(real[cols].fillna(0.0))

        # (C) Replay buffer — anti-catastrophic-forgetting
        replay_sample = self.replay.sample(min(2000, len(self.replay)))
        if (replay_sample is not None and len(replay_sample) > 100
                and "label" in replay_sample.columns):
            replay_sample = self._with_features(replay_sample, feat_cols)
            batches.append(replay_sample[cols].fillna(0.0))

        return batches, theory_batch

    @staticmethod
    def _fit(model, X: np.ndarray, y: np.ndarray, epoch: int) -> bool:
        """
        Fit *model* with class-balanced sample weights (counters the label=0 bias).

        Retries without weights on TypeError (estimators lacking sample_weight).
        Returns False after logging any other fit error.
        """
        from sklearn.utils.class_weight import compute_sample_weight
        try:
            sw = compute_sample_weight("balanced", y)
        except Exception:
            sw = None
        try:
            if sw is not None:
                model.fit(X, y, sample_weight=sw)
            else:
                model.fit(X, y)
        except TypeError:  # some estimators don't support sample_weight
            model.fit(X, y)
        except Exception as e:
            log.warning(f"  ❌ Fit error epoch {epoch}: {e}")
            return False
        return True

    def _compute_accuracy_per_theory(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """
        Per-theory in-sample accuracy of *model* on *df*, also pushed into the curriculum.

        Needs ``theory`` and ``label`` columns plus every column in *feat_cols*.
        Theories with fewer than 10 rows are skipped. Returns ``{theory: accuracy}``.
        """
        results = {}
        if "theory" not in df.columns or "label" not in df.columns:
            return results
        for theory in df["theory"].unique():
            sub = df[df["theory"] == theory]
            if len(sub) < 10:
                continue
            X = sub[feat_cols].fillna(0).values
            y = sub["label"].values
            try:
                preds = model.predict(X)
            except Exception as e:
                log.debug(f"Per-theory accuracy failed for {theory}: {e}")
                continue
            acc = float(np.mean(preds == y))
            results[theory] = acc
            self.curriculum.update(theory, acc, len(sub))
        return results

    # ── Training ────────────────────────────────────────────────────────

    def train_strategy(self, strategy_name: str, n_epochs: int = 5) -> dict:
        """
        Full training pipeline for one strategy.

        Returns ``{"strategy", "experts", "summary"}`` where ``experts`` maps
        expert id -> {"sharpe", "accuracy", "accepted"} for each expert that had
        at least one successful fit, or ``{"error": ...}`` for an unknown strategy.
        """
        log.info(f"🎯 Training strategy: {strategy_name}")
        strat = STRATEGIES.get(strategy_name)
        if not strat:
            return {"error": f"Unknown strategy: {strategy_name}"}
        df_theory = self._load_theory_data()
        df_real = self._load_real_data()
        results = {}
        for expert_id in ["E1", "E2", "E3", "E4", "E5"]:
            log.info(f"  πŸ”· {strategy_name}/{expert_id}")
            expert_name = f"{strategy_name}_{expert_id.lower()}"
            feat_cols = self._get_features_for_strategy(
                df_theory if df_theory is not None else pd.DataFrame(),
                expert_id, strategy_name
            )
            if not feat_cols:
                log.warning(f"  ⚠️ No features for {expert_id}, skip")
                continue
            # Load warm-start checkpoint
            from model_registry import load_best_checkpoint, register_model
            model, _prev_version = load_best_checkpoint(expert_name)
            if model is None:
                model = self._build_model()

            # Last batch the model was successfully fitted on. Reset for every
            # expert; otherwise a stale batch from the previous expert (or an
            # unbound name for the first one) reached registration whenever no
            # epoch produced data or the first fit failed.
            fitted_batch = None
            # NOTE(review): each epoch refits from scratch for estimators without
            # warm_start, so the registered model reflects the last epoch's batch.
            for epoch in range(n_epochs):
                batches, theory_batch = self._build_epoch_batches(df_theory, df_real, feat_cols, epoch)
                if not batches:
                    log.warning(f"  ⚠️ No training data for {expert_id} epoch {epoch}")
                    break
                # Merge and shuffle
                df_batch = pd.concat(batches, ignore_index=True)
                df_batch = df_batch.sample(frac=1, random_state=epoch).reset_index(drop=True)
                X = df_batch[feat_cols].values.astype(np.float32)
                y = df_batch["label"].values.astype(np.int32)
                if not self._fit(model, X, y, epoch):
                    break
                fitted_batch = df_batch

                # In-sample metrics
                acc = float(np.mean(model.predict(X) == y))
                sharpe = self._confidence_score(model, X)
                # Curriculum feedback must use the theory batch: df_batch only has
                # feature + label columns, so it never carries "theory".
                if theory_batch is not None:
                    self._compute_accuracy_per_theory(model, theory_batch, feat_cols)
                self.log.record(expert_name, epoch,
                                {"accuracy": round(acc, 4), "sharpe": round(sharpe, 4),
                                 "n_samples": len(X), "n_feat": len(feat_cols)},
                                phase="theory" if epoch < 2 else "fine_tune",
                                strategy=strategy_name)

            if fitted_batch is None:
                log.warning(f"  ⚠️ {strategy_name}: no training data produced, skipping register")
                continue
            final_X = fitted_batch[feat_cols].values.astype(np.float32)
            final_sharpe = self._confidence_score(model, final_X)
            final_acc = float(np.mean(model.predict(final_X) == fitted_batch["label"].values))
            accepted = register_model(expert_name, model, {
                "sharpe": round(final_sharpe, 4),
                "accuracy": round(final_acc, 4),
                "n_samples": len(final_X),
            })
            results[expert_id] = {
                "sharpe": round(final_sharpe, 4),
                "accuracy": round(final_acc, 4),
                "accepted": accepted,
            }
        # Push log
        self.log.push_to_hf()
        summary = self.log.get_summary()
        log.info(f"βœ… Strategy {strategy_name} done: {results}")
        return {"strategy": strategy_name, "experts": results, "summary": summary}

    def _build_model(self):
        """Fresh classifier: GradientBoosting, or RandomForest if that import fails."""
        try:
            from sklearn.ensemble import GradientBoostingClassifier
            return GradientBoostingClassifier(
                n_estimators=100, max_depth=4, learning_rate=0.1,
                subsample=0.8, random_state=42,
            )
        except ImportError:
            from sklearn.ensemble import RandomForestClassifier
            return RandomForestClassifier(n_estimators=100, max_depth=5, n_jobs=-1, random_state=42)

    def run_all_strategies(self):
        """Train all strategies sequentially; one strategy's failure is logged and does not stop the rest."""
        log.info("🏭 Training Engine: starting all strategies")
        for strategy_name in STRATEGIES:
            try:
                result = self.train_strategy(strategy_name, n_epochs=5)
                log.info(f"βœ… {strategy_name}: {result.get('experts',{})}")
            except Exception as e:
                log.error(f"❌ {strategy_name}: {e}", exc_info=True)
# ─────────────────────────────────────────────────────────────────
# Background loop for Coordinator
# ─────────────────────────────────────────────────────────────────
def run_training_loop(check_interval: int = 3600):
    """
    Run a full training pass over all strategies forever, sleeping
    *check_interval* seconds between passes.

    One TrainingEngine is reused across passes, so curriculum scores, the
    replay buffer and the cached theory data persist between runs. An exception
    from a pass is logged and the loop continues.
    NOTE(review): passes are not gated on experience count or coordinator
    health; every pass runs unconditionally.
    """
    log.info(f"πŸŽ“ TrainingEngine loop started (interval={check_interval}s)")
    engine = TrainingEngine()
    while True:
        try:
            engine.run_all_strategies()
        except Exception as e:
            log.error(f"❌ Training loop error: {e}", exc_info=True)
        # Float hours plus seconds: integer division used to report "0h" for
        # any sub-hour interval.
        log.info(f"πŸ’€ Training done. Next in {check_interval / 3600:.2f}h ({check_interval}s)")
        time.sleep(check_interval)
if __name__ == "__main__":
    # Manual smoke run: train only the "momentum" strategy for 2 epochs and
    # print the resulting metrics dict as JSON.
    logging.basicConfig(level=logging.INFO)
    smoke_engine = TrainingEngine()
    smoke_result = smoke_engine.train_strategy("momentum", n_epochs=2)
    print(json.dumps(smoke_result, indent=2))