""" T5 — Validation Layer: Walk-forward OOS, Regime-specific, Benchmark T6 — Strategy Research Lab: Hypothesis generation + graveyard T7 — Forge Controller: Closed-loop brain running on Coordinator Space """ from __future__ import annotations import os, json, math, logging, time from datetime import datetime, timezone from pathlib import Path import numpy as np import pandas as pd log = logging.getLogger("forge_controller") HF_TOKEN = os.environ.get("HF_TOKEN", "") EXPERIENCE_REPO = os.environ.get("EXPERIENCE_REPO", "gionuibk/aetheris-experiences") MODEL_REPO = os.environ.get("MODEL_REPO", "gionuibk/aetheris-models") COORD_URL = os.environ.get("COORDINATOR_URL", "https://gionuibk-hpll-datareview.hf.space") WORKER_SPACES = [f"gionuibk/HPLL-Worker-{i:02d}" for i in range(40)] # all 40 workers # ══════════════════════════════════════════════════════════════════ # T5 — EXAM ENGINE (Walk-forward validation) # ══════════════════════════════════════════════════════════════════ class ExamEngine: """ Validates a model using walk-forward OOS windows. 'Thi cử' — thi trên data chưa thấy bao giờ. """ N_WINDOWS = 5 # 5-fold walk-forward MIN_PASS_SHARPE = 0.8 def walk_forward_validate(self, model, df: pd.DataFrame, feat_cols: list, name: str = "") -> dict: """ Split data into N time-ordered folds. Each fold: train on first k, test on k+1. No lookahead bias. 
""" df = df.sort_values("timestamp") if "timestamp" in df.columns else df n = len(df) window_size = n // (self.N_WINDOWS + 1) if window_size < 50: return {"status": "insufficient_data", "n": n} fold_metrics = [] for i in range(self.N_WINDOWS): train_end = window_size * (i + 1) test_end = min(train_end + window_size, n) df_test = df.iloc[train_end:test_end] if len(df_test) < 20: continue for c in feat_cols: if c not in df_test.columns: df_test = df_test.copy() df_test[c] = 0.0 X_test = df_test[feat_cols].fillna(0).values.astype(np.float32) y_test = df_test["label"].values.astype(np.int32) try: preds = model.predict(X_test) probs = model.predict_proba(X_test) acc = float(np.mean(preds == y_test)) conf = np.max(probs, axis=1) sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf)) # Win rate on non-flat predictions non_flat = [(t, p) for t, p in zip(y_test, preds) if p != 0] win_rate = sum(t == p for t, p in non_flat) / max(len(non_flat), 1) fold_metrics.append({"fold": i, "acc": acc, "sharpe": sharpe, "win_rate": win_rate, "n": len(y_test)}) except Exception as e: log.warning(f"Fold {i} error: {e}") if not fold_metrics: return {"status": "no_folds"} avg_sharpe = sum(m["sharpe"] for m in fold_metrics) / len(fold_metrics) avg_acc = sum(m["acc"] for m in fold_metrics) / len(fold_metrics) avg_wr = sum(m["win_rate"] for m in fold_metrics) / len(fold_metrics) passed = avg_sharpe >= self.MIN_PASS_SHARPE return { "status": "pass" if passed else "fail", "avg_sharpe": round(avg_sharpe, 4), "avg_accuracy":round(avg_acc, 4), "avg_win_rate":round(avg_wr, 4), "n_folds": len(fold_metrics), "fold_details":fold_metrics, "model_name": name, } def benchmark_vs_random(self, model, X_test, y_test) -> dict: """Compare model Sharpe vs random baseline.""" try: preds = model.predict(X_test) acc = float(np.mean(preds == y_test)) # Random baseline random_preds = np.random.choice([-1, 0, 1], size=len(y_test)) random_acc = float(np.mean(random_preds == 
y_test)) edge = acc - random_acc return {"model_acc": round(acc,4), "random_acc": round(random_acc,4), "edge": round(edge,4)} except Exception as e: return {"error": str(e)} def regime_validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict: """Test model performance in each detected regime.""" results = {} # Detect regime from features for regime_name, condition_fn in [ ("TRENDING", lambda d: d["adx"] > 25 if "adx" in d.columns else pd.Series([True]*len(d))), ("RANGING", lambda d: d["adx"] < 20 if "adx" in d.columns else pd.Series([True]*len(d))), ]: try: mask = condition_fn(df) sub = df[mask] if len(sub) < 30: continue for c in feat_cols: if c not in sub.columns: sub = sub.copy(); sub[c] = 0.0 X = sub[feat_cols].fillna(0).values.astype(np.float32) y = sub["label"].values preds = model.predict(X) acc = float(np.mean(preds == y)) results[regime_name] = {"acc": round(acc,4), "n": len(y)} except Exception: pass return results # ══════════════════════════════════════════════════════════════════ # T6 — STRATEGY RESEARCH LAB (Full version) # ══════════════════════════════════════════════════════════════════ GRAVEYARD_FILE = "strategy_graveyard.json" REGISTRY_FILE = "strategy_registry.json" CANDIDATE_LIMIT = 50 # max candidates in registry class StrategyResearchLab: """ Auto-discovers new strategy combinations. Generates hypotheses from Theory Registry rules. Tests them quickly. Saves winners. Remembers losers. 
""" def __init__(self): self._graveyard: dict = self._load_json(GRAVEYARD_FILE, {"failed": {}, "count": 0}) self._registry: list = self._load_json(REGISTRY_FILE, []) def _load_json(self, path: str, default): try: with open(path) as f: return json.load(f) except Exception: return default def _save_json(self, path: str, data): with open(path, "w") as f: json.dump(data, f, indent=2) def _in_graveyard(self, strategy_id: str) -> bool: return strategy_id in self._graveyard.get("failed", {}) def _bury(self, strategy_id: str, reason: str): self._graveyard.setdefault("failed", {})[strategy_id] = { "reason": reason, "ts": datetime.now(timezone.utc).isoformat() } self._graveyard["count"] = len(self._graveyard["failed"]) self._save_json(GRAVEYARD_FILE, self._graveyard) def _register(self, strategy: dict): self._registry.append(strategy) # Keep only top candidates by score self._registry.sort(key=lambda x: x.get("eval_score", 0), reverse=True) self._registry = self._registry[:CANDIDATE_LIMIT] self._save_json(REGISTRY_FILE, self._registry) def generate_hypotheses(self, n: int = 20) -> list[dict]: """Generate n candidate strategies by combining theory rules.""" try: from theory_registry import get_all_rules, get_schools except ImportError: return [] import random rules = get_all_rules() schools = [s for s in get_schools() if s != "risk_management"] hypotheses = [] for _ in range(n * 3): # generate 3× more to account for graveyard rejects if len(hypotheses) >= n: break # Pick 2 complementary schools s1, s2 = random.sample(schools, 2) rules_s1 = [r for r in rules if r["school"] == s1 and r["label"] != 0] rules_s2 = [r for r in rules if r["school"] == s2 and r["label"] != 0] if not rules_s1 or not rules_s2: continue r1 = random.choice(rules_s1) r2 = random.choice(rules_s2) # Only combine rules with same direction if r1["label"] != r2["label"]: continue sid = f"{r1['theory_id']}__{r2['theory_id']}" if self._in_graveyard(sid): continue # Find common regimes regimes = 
set(r1.get("regime", ["ANY"])) & set(r2.get("regime", ["ANY"])) if not regimes: regimes = {"TRENDING", "RANGING"} avg_conf = (r1["confidence"] + r2["confidence"]) / 2 hypotheses.append({ "strategy_id": sid, "label": r1["label"], "schools": [s1, s2], "rules": [r1["theory_id"], r2["theory_id"]], "regime": list(regimes), "confidence": round(avg_conf, 3), "timeframe": r1.get("timeframe", "any"), "created_at": datetime.now(timezone.utc).isoformat(), "status": "hypothesis", }) return hypotheses def quick_eval(self, hypothesis: dict) -> float: """ Fast heuristic evaluation without backtesting. Uses: confidence, number of confirming conditions, school diversity. """ conf = hypothesis.get("confidence", 0.5) n_rules = len(hypothesis.get("rules", [])) n_schools = len(set(hypothesis.get("schools", []))) # Score: confidence × rule depth × school diversity score = conf * (1 + (n_rules - 1) * 0.1) * (1 + (n_schools - 1) * 0.15) return min(round(score, 4), 1.0) def run_research_cycle(self, n_hypotheses: int = 20) -> dict: """One research cycle: generate → evaluate → register or bury.""" hypotheses = self.generate_hypotheses(n_hypotheses) accepted = [] rejected = [] threshold = 0.60 for hyp in hypotheses: score = self.quick_eval(hyp) hyp["eval_score"] = score if score >= threshold: hyp["status"] = "candidate" self._register(hyp) accepted.append(hyp["strategy_id"]) else: self._bury(hyp["strategy_id"], f"low_score={score:.3f}") rejected.append(hyp["strategy_id"]) graveyard_size = self._graveyard.get("count", 0) log.info(f"🔬 Research: {len(accepted)}/{n_hypotheses} accepted | " f"registry={len(self._registry)} | graveyard={graveyard_size}") return { "accepted": len(accepted), "rejected": len(rejected), "registry_size": len(self._registry), "graveyard_size": graveyard_size, } def get_best_candidates(self, n: int = 5) -> list[dict]: return self._registry[:n] # ══════════════════════════════════════════════════════════════════ # T7 — FORGE CONTROLLER (Closed-loop brain) # 
# ══════════════════════════════════════════════════════════════════
# T7 — FORGE CONTROLLER (Closed-loop brain)
# ══════════════════════════════════════════════════════════════════


class ForgeController:
    """
    Closed-loop controller ("brain") of the training forge (lò luyện).

    Designed to run 24/7 in the Coordinator Space via ``run_forge_controller``.
    Each ``assess_and_act`` call reads the experience dataset and the model
    registry and advances the pipeline without manual intervention:
    collect data → train experts → train meta → deep research.
    """

    THRESHOLDS = {
        "min_experiences_for_training": 20_000,
        "min_sharpe_expert": 0.5,
        "min_ready_experts": 5,  # keep seeding expert training until this many pass
        "min_experts_for_meta": 3,
        "min_sharpe_meta": 1.5,  # NOTE(review): not used by any visible code
        "reseed_cooldown_s": 4 * 3600,  # tunable: min gap between seeds of one task type
    }
    EXPERT_STRATEGIES = ("momentum", "mean_reversion", "smart_money", "scalping")
    EXPERT_IDS = ("E1", "E2", "E3", "E4", "E5")
    ROWS_PER_EXPERIENCE_FILE = 200  # rough estimate used to turn a file count into rows
    DEAD_AFTER_MIN = 30  # a worker silent for longer than this is restarted

    def __init__(self):
        self.exam = ExamEngine()
        self.lab = StrategyResearchLab()
        self._state_file = "forge_state.json"
        self._state = self._load_state()

    @staticmethod
    def _default_state() -> dict:
        return {"phase": "collecting", "ready_experts": [], "meta_ready": False,
                "last_checked": None, "cycle": 0}

    def _load_state(self) -> dict:
        """Load persisted controller state, or a fresh default if absent/unreadable."""
        try:
            with open(self._state_file) as f:
                return json.load(f)
        except FileNotFoundError:
            return self._default_state()
        except (OSError, ValueError) as e:
            log.warning(f"Could not read {self._state_file} ({e}); starting fresh")
            return self._default_state()

    def _save_state(self):
        """Stamp ``last_checked`` (UTC) and persist state atomically (temp file + replace)."""
        self._state["last_checked"] = datetime.now(timezone.utc).isoformat()
        tmp_path = f"{self._state_file}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(self._state, f, indent=2)
        os.replace(tmp_path, self._state_file)

    def _count_experiences(self) -> int:
        """Estimate stored experience rows from the parquet file count; 0 on failure."""
        try:
            from huggingface_hub import list_repo_files
            files = list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
        except Exception as e:
            log.warning(f"Count experiences failed: {e}")
            return 0
        n_files = sum(1 for f in files
                      if f.startswith("experiences/") and f.endswith(".parquet"))
        return n_files * self.ROWS_PER_EXPERIENCE_FILE

    def _get_ready_experts(self) -> list[str]:
        """
        Return expert names (``<strategy>_e<k>``) whose best Sharpe meets the bar.

        Returns ``[]`` if ``model_registry`` is unavailable or a lookup raises.
        """
        try:
            from model_registry import get_best_sharpe
        except ImportError:
            return []
        min_sharpe = self.THRESHOLDS["min_sharpe_expert"]
        ready = []
        try:
            for strat in self.EXPERT_STRATEGIES:
                for eid in self.EXPERT_IDS:
                    name = f"{strat}_{eid.lower()}"
                    sharpe = get_best_sharpe(name)
                    # No recorded score (None) just means "not ready yet".
                    if sharpe is not None and sharpe >= min_sharpe:
                        ready.append(name)
        except Exception as e:
            log.warning(f"Ready-expert lookup failed: {e}")
            return []
        return ready

    def _seed_task(self, task_type: str, params: dict | None = None) -> bool:
        """POST a task to the coordinator queue; return True on an HTTP 2xx reply."""
        try:
            import requests
            stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M')
            payload = {
                "task_type": task_type,
                "priority": 3,
                "params": params or {},
                "task_id": f"auto_{task_type.lower()}_{stamp}",
            }
            r = requests.post(f"{COORD_URL}/admin/add_task", json=payload, timeout=10)
            log.info(f"📤 Seeded {task_type}: HTTP {r.status_code}")
            return 200 <= r.status_code < 300
        except Exception as e:
            log.warning(f"Seed {task_type} failed: {e}")
            return False

    def _seed_if_due(self, task_type: str, params: dict | None = None) -> bool:
        """
        Seed *task_type* unless it was successfully seeded within the cooldown.

        Without this, every cycle queues a fresh copy of the same training task
        (task ids are minute-stamped, so they never collide). Returns True only
        when a seed was sent and accepted.
        """
        last_seeded = self._state.setdefault("last_seeded", {})
        now = time.time()
        if now - last_seeded.get(task_type, 0) < self.THRESHOLDS["reseed_cooldown_s"]:
            return False
        ok = self._seed_task(task_type, params)
        if ok:
            last_seeded[task_type] = now
        return ok

    def _restart_dead_worker(self, space_name: str) -> bool:
        """Ask the Hugging Face API to restart Space *space_name*; True on HTTP 2xx."""
        try:
            import requests
            headers = {"Authorization": f"Bearer {HF_TOKEN}"}
            r = requests.post(
                f"https://huggingface.co/api/spaces/{space_name}/restart",
                headers=headers, timeout=15,
            )
            log.info(f"  🔄 Restart {space_name}: HTTP {r.status_code}")
            return 200 <= r.status_code < 300
        except Exception as e:
            log.warning(f"  ⚠️ Restart {space_name} failed: {e}")
            return False

    def self_heal(self) -> dict:
        """
        Restart workers whose coordinator heartbeat is older than ``DEAD_AFTER_MIN``.

        Reads ``/worker_status`` from the coordinator; ``last_seen`` is treated as
        epoch seconds (missing → 0, i.e. considered dead). A malformed entry is
        logged and skipped instead of aborting the sweep.

        Returns:
            ``{"status": "coord_unreachable"}`` on a non-200 reply, otherwise
            ``{"healed": [worker ids], "n_healed": int}``.
        """
        healed = []
        try:
            import requests
            r = requests.get(f"{COORD_URL}/worker_status", timeout=10)
            if r.status_code != 200:
                return {"status": "coord_unreachable"}
            workers = r.json().get("workers", {})
        except Exception as e:
            log.warning(f"Self-heal check failed: {e}")
            return {"healed": healed, "n_healed": 0}

        now = time.time()
        for wid, info in workers.items():
            try:
                age_min = (now - float(info.get("last_seen", 0))) / 60
            except (TypeError, ValueError, AttributeError) as e:
                log.warning(f"  Worker {wid}: unreadable heartbeat ({e}) — skipped")
                continue
            if age_min <= self.DEAD_AFTER_MIN:
                continue
            space = f"gionuibk/HPLL-Worker-{str(wid).zfill(2)}"
            log.warning(f"  🚨 Worker {wid} dead ({age_min:.0f}min) — restarting {space}")
            if self._restart_dead_worker(space):
                healed.append(wid)

        if healed:
            log.info(f"  ✅ Healed {len(healed)} workers: {healed}")
        return {"healed": healed, "n_healed": len(healed)}

    def _load_recent_experiences(self, max_rows: int = 5000,
                                 n_files: int = 20) -> pd.DataFrame | None:
        """
        Load a random sample (at most *max_rows* rows, seed 42) of recent experiences.

        "Recent" = the last *n_files* ``experiences/*.parquet`` paths in sorted
        order. NOTE(review): that is chronological only if file names sort by
        time — confirm against the workers' naming scheme. NaNs become 0.0.
        Returns ``None`` when nothing could be loaded.
        """
        if n_files <= 0:
            return None  # a [-0:] slice would select every file
        try:
            from huggingface_hub import list_repo_files, hf_hub_download
            files = sorted(
                f for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                if f.startswith("experiences/") and f.endswith(".parquet")
            )[-n_files:]
            dfs = []
            for fp in files:
                local = hf_hub_download(repo_id=EXPERIENCE_REPO, filename=fp,
                                        repo_type="dataset", token=HF_TOKEN,
                                        cache_dir="/tmp/exp_cache")
                dfs.append(pd.read_parquet(local))
            if not dfs:
                return None
            df = pd.concat(dfs, ignore_index=True).fillna(0.0)
            return df.sample(min(max_rows, len(df)), random_state=42)
        except Exception as e:
            log.warning(f"Load experiences: {e}")
            return None

    def _run_deep_research(self) -> None:
        """Run one ``hypothesis_backtester`` deep-research cycle on recent experiences."""
        df_exp = self._load_recent_experiences()
        if df_exp is None or len(df_exp) <= 200:
            return
        try:
            from hypothesis_backtester import run_deep_research_cycle
            res = run_deep_research_cycle(df_exp)
            log.info(f"  📊 Research: {res}")
        except Exception as e:
            log.warning(f"  Research failed: {e}")

    def assess_and_act(self) -> dict:
        """
        Run one control-loop iteration: read state, pick a phase, act, persist.

        Phases, checked in order:
          * ``collecting``       — fewer experiences than the training minimum;
          * ``training_experts`` — fewer than ``min_ready_experts`` experts pass;
          * ``training_meta``    — enough experts and ``meta_ready`` not set;
          * ``running``          — deep research on recent experiences.
        Training tasks are re-seeded at most once per ``reseed_cooldown_s``.
        Worker self-healing runs on every cycle, whatever the phase.

        Returns:
            ``{"action", "n_exp", "ready", "healed"}``.
        """
        self._state["cycle"] = self._state.get("cycle", 0) + 1
        n_exp = self._count_experiences()
        ready = self._get_ready_experts()
        min_exp = self.THRESHOLDS["min_experiences_for_training"]
        target_ready = self.THRESHOLDS["min_ready_experts"]
        log.info(f"🔄 Cycle {self._state['cycle']} | exp={n_exp:,} | ready_experts={len(ready)}")

        if n_exp < min_exp:
            log.info(f"  ⏳ Collecting data ({n_exp:,}/{min_exp:,})")
            action = "collecting"
        elif len(ready) < target_ready:
            log.info(f"  🎓 Triggering expert training ({len(ready)}/{target_ready} ready)")
            for strat in self.EXPERT_STRATEGIES:
                self._seed_if_due(f"TRAIN_EXPERT_{strat}", {"strategy": strat})
            action = "training_experts"
        elif (len(ready) >= self.THRESHOLDS["min_experts_for_meta"]
              and not self._state.get("meta_ready")):
            # NOTE(review): nothing visible sets state["meta_ready"]; until something
            # does, the controller stays in this phase once enough experts are ready.
            log.info(f"  🧠 Triggering Meta training ({len(ready)} experts ready)")
            self._seed_if_due("TRAIN_META")
            action = "training_meta"
        else:
            log.info("  🔬 Running: deep research")
            self._run_deep_research()
            action = "running"

        # Dead workers stall data collection and training just as much as research,
        # so healing cannot wait for the "running" phase.
        heal = self.self_heal()

        self._state["phase"] = action
        self._state["ready_experts"] = ready
        self._state["n_experiences"] = n_exp
        self._save_state()
        return {"action": action, "n_exp": n_exp, "ready": len(ready),
                "healed": heal.get("n_healed", 0)}


def run_forge_controller(interval: int = 900, max_cycles: int | None = None):
    """
    Main loop for the Coordinator Space: one ``assess_and_act`` every *interval* s.

    Runs forever by default; pass *max_cycles* to stop after that many cycles
    (no sleep after the last one). A failing cycle is logged and the loop goes on.
    """
    log.info(f"🏭 ForgeController started | interval={interval}s")
    controller = ForgeController()
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            result = controller.assess_and_act()
            log.info(f"✅ Cycle done: {result}")
        except Exception as e:
            log.error(f"❌ Forge error: {e}", exc_info=True)
        cycles += 1
        if max_cycles is None or cycles < max_cycles:
            time.sleep(interval)