Spaces:
Running
Running
| """ | |
T5 — Validation Layer: Walk-forward OOS, Regime-specific, Benchmark
T6 — Strategy Research Lab: Hypothesis generation + graveyard
T7 — Forge Controller: Closed-loop brain running on Coordinator Space
| """ | |
from __future__ import annotations

import json
import logging
import math
import os
import random
import time
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
import pandas as pd
# Module-level logger shared by every component in this file.
log = logging.getLogger("forge_controller")

# Deployment settings; each one can be overridden through the environment.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
EXPERIENCE_REPO = os.environ.get("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
MODEL_REPO = os.environ.get("MODEL_REPO", "gionuibk/aetheris-models")
COORD_URL = os.environ.get("COORDINATOR_URL", "https://gionuibk-hpll-datareview.hf.space")

# Space ids of all 40 workers: HPLL-Worker-00 .. HPLL-Worker-39.
WORKER_SPACES = [f"gionuibk/HPLL-Worker-{i:02d}" for i in range(40)]
# ──────────────────────────────────────────────────────────────────
# T5 — EXAM ENGINE (Walk-forward validation)
# ──────────────────────────────────────────────────────────────────
class ExamEngine:
    """
    Score an already-fitted classifier on slices of a labelled frame.

    The "exam": the model is graded on rows taken from later time windows.
    Nothing in this class refits the model; every method only calls
    ``model.predict`` (and ``model.predict_proba`` for the walk-forward run).
    """

    N_WINDOWS = 5          # number of walk-forward test windows
    MIN_PASS_SHARPE = 0.8  # minimum average score for a "pass" verdict

    @staticmethod
    def _feature_matrix(df: pd.DataFrame, feat_cols: list) -> np.ndarray:
        """Return ``feat_cols`` of ``df`` as float32; NaNs and absent columns become 0."""
        # reindex() creates absent columns as NaN in one step, so the frame
        # is never copied once per missing column.
        return df.reindex(columns=feat_cols).fillna(0).values.astype(np.float32)

    def walk_forward_validate(self, model, df: pd.DataFrame,
                              feat_cols: list, name: str = "") -> dict:
        """
        Score ``model`` on ``N_WINDOWS`` consecutive, time-ordered windows.

        The frame is sorted by ``timestamp`` when that column exists and cut
        into ``N_WINDOWS + 1`` equal windows. The first window is skipped and
        each later window is scored once. The model is not retrained between
        windows.

        The per-fold value stored under ``"sharpe"`` is derived from the
        predicted-class confidence (row-wise max of ``predict_proba``):
        ``(mean(conf) - 0.5) / (std(conf) + 1e-9) * sqrt(n)``. It is not
        computed from returns.

        Args:
            model: object exposing ``predict(X)`` and ``predict_proba(X)``.
            df: frame with a ``label`` column and, optionally, ``timestamp``.
            feat_cols: feature column names; columns absent from ``df`` are
                fed to the model as zeros.
            name: free-form model name echoed back in the result.

        Returns:
            ``{"status": "insufficient_data", "n": ...}`` when a window would
            hold fewer than 50 rows, ``{"status": "no_folds"}`` when every
            fold failed, otherwise a summary dict with ``status``
            ("pass"/"fail"), averages, and per-fold details.

        Raises:
            KeyError: if ``df`` has no ``label`` column.
        """
        if "timestamp" in df.columns:
            df = df.sort_values("timestamp")
        n = len(df)
        window_size = n // (self.N_WINDOWS + 1)
        if window_size < 50:
            return {"status": "insufficient_data", "n": n}

        fold_metrics = []
        for i in range(self.N_WINDOWS):
            start = window_size * (i + 1)
            stop = min(start + window_size, n)
            df_test = df.iloc[start:stop]
            if len(df_test) < 20:
                continue
            X_test = self._feature_matrix(df_test, feat_cols)
            y_test = df_test["label"].values.astype(np.int32)
            try:
                preds = np.asarray(model.predict(X_test))
                probs = np.asarray(model.predict_proba(X_test))
                acc = float(np.mean(preds == y_test))
                conf = np.max(probs, axis=1)
                sharpe = ((float(np.mean(conf)) - 0.5)
                          / (float(np.std(conf)) + 1e-9)
                          * math.sqrt(len(conf)))
                # Win rate only over non-flat predictions (label 0 means flat).
                traded = preds != 0
                wins = int(np.sum(preds[traded] == y_test[traded]))
                win_rate = wins / max(int(np.sum(traded)), 1)
                fold_metrics.append({"fold": i, "acc": acc, "sharpe": sharpe,
                                     "win_rate": win_rate, "n": len(y_test)})
            except Exception as e:
                # A failing fold is skipped, not fatal for the whole exam.
                log.warning(f"Fold {i} error: {e}")

        if not fold_metrics:
            return {"status": "no_folds"}

        n_folds = len(fold_metrics)
        avg_sharpe = sum(m["sharpe"] for m in fold_metrics) / n_folds
        avg_acc = sum(m["acc"] for m in fold_metrics) / n_folds
        avg_wr = sum(m["win_rate"] for m in fold_metrics) / n_folds
        passed = avg_sharpe >= self.MIN_PASS_SHARPE
        return {
            "status": "pass" if passed else "fail",
            "avg_sharpe": round(avg_sharpe, 4),
            "avg_accuracy": round(avg_acc, 4),
            "avg_win_rate": round(avg_wr, 4),
            "n_folds": n_folds,
            "fold_details": fold_metrics,
            "model_name": name,
        }

    def benchmark_vs_random(self, model, X_test, y_test, seed: int | None = None) -> dict:
        """
        Compare model accuracy with a uniformly random {-1, 0, 1} guesser.

        Args:
            model: object exposing ``predict(X)``.
            X_test: feature matrix passed to ``model.predict``.
            y_test: true labels, same length as ``X_test``.
            seed: optional seed for the random baseline. ``None`` (default)
                draws from NumPy's global random state, as before.

        Returns:
            ``{"model_acc", "random_acc", "edge"}`` rounded to 4 decimals, or
            ``{"error": message}`` if prediction or comparison fails.
        """
        try:
            y_true = np.asarray(y_test)
            preds = model.predict(X_test)
            acc = float(np.mean(preds == y_true))
            # Random baseline over the three label values.
            rng = np.random.default_rng(seed) if seed is not None else np.random
            random_preds = rng.choice([-1, 0, 1], size=len(y_true))
            random_acc = float(np.mean(random_preds == y_true))
            edge = acc - random_acc
            return {"model_acc": round(acc, 4),
                    "random_acc": round(random_acc, 4),
                    "edge": round(edge, 4)}
        except Exception as e:
            return {"error": str(e)}

    def regime_validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """
        Report model accuracy separately for TRENDING and RANGING rows.

        Rows count as TRENDING when ``adx > 25`` and RANGING when
        ``adx < 20``. When ``df`` has no ``adx`` column, every row is
        assigned to both regimes. A regime with fewer than 30 rows is left
        out of the result.

        Args:
            model: object exposing ``predict(X)``.
            df: frame with a ``label`` column and, optionally, ``adx``.
            feat_cols: feature column names; absent columns are fed as zeros.

        Returns:
            ``{regime_name: {"acc": float, "n": int}}`` for each regime that
            could be evaluated.
        """
        regime_rules = (
            ("TRENDING", lambda adx: adx > 25),
            ("RANGING", lambda adx: adx < 20),
        )
        results = {}
        for regime_name, selects in regime_rules:
            try:
                if "adx" in df.columns:
                    mask = selects(df["adx"])
                else:
                    # Build the all-True mask on df's own index; a mask with a
                    # fresh RangeIndex cannot be aligned with a frame whose
                    # index differs.
                    mask = pd.Series(True, index=df.index)
                sub = df[mask]
                if len(sub) < 30:
                    continue
                X = self._feature_matrix(sub, feat_cols)
                y = sub["label"].values
                preds = model.predict(X)
                acc = float(np.mean(preds == y))
                results[regime_name] = {"acc": round(acc, 4), "n": len(y)}
            except Exception as e:
                # Best effort per regime, but leave a trace, not silence.
                log.warning(f"Regime {regime_name} validation failed: {e}")
        return results
# ──────────────────────────────────────────────────────────────────
# T6 — STRATEGY RESEARCH LAB (Full version)
# ──────────────────────────────────────────────────────────────────
GRAVEYARD_FILE = "strategy_graveyard.json"
REGISTRY_FILE = "strategy_registry.json"
CANDIDATE_LIMIT = 50  # max candidates in registry


class StrategyResearchLab:
    """
    Generate, score and file two-rule strategy hypotheses.

    Hypotheses are built by pairing rules from the ``theory_registry``
    module. Each one gets a quick heuristic score. Winners are kept in a
    score-sorted registry file; losers are recorded in a "graveyard" file so
    they are not proposed again.
    """

    def __init__(self):
        self._graveyard: dict = self._load_json(GRAVEYARD_FILE, {"failed": {}, "count": 0})
        self._registry: list = self._load_json(REGISTRY_FILE, [])

    def _load_json(self, path: str, default):
        """Return the parsed JSON at ``path``, or ``default`` if it cannot be read."""
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            # First run: nothing persisted yet.
            return default
        except (OSError, ValueError) as e:
            log.warning(f"Could not read {path}: {e}; using defaults")
            return default

    def _save_json(self, path: str, data):
        """Write ``data`` to ``path`` as JSON via a temp file and ``os.replace``."""
        # Writing to a sibling file first means an interrupted write cannot
        # leave a truncated JSON document at ``path``.
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)

    def _in_graveyard(self, strategy_id: str) -> bool:
        """Return True if ``strategy_id`` has already been rejected."""
        return strategy_id in self._graveyard.get("failed", {})

    def _bury(self, strategy_id: str, reason: str):
        """Record ``strategy_id`` as failed (with reason and UTC time) and persist."""
        self._graveyard.setdefault("failed", {})[strategy_id] = {
            "reason": reason,
            "ts": datetime.now(timezone.utc).isoformat(),
        }
        self._graveyard["count"] = len(self._graveyard["failed"])
        self._save_json(GRAVEYARD_FILE, self._graveyard)

    def _register(self, strategy: dict):
        """
        Add ``strategy`` to the registry and persist the top candidates.

        An existing entry with the same ``strategy_id`` is replaced, so the
        registry never holds the same strategy twice. The list is kept
        sorted by ``eval_score`` (descending) and cut to ``CANDIDATE_LIMIT``.
        """
        sid = strategy.get("strategy_id")
        if sid is not None:
            self._registry = [s for s in self._registry if s.get("strategy_id") != sid]
        self._registry.append(strategy)
        self._registry.sort(key=lambda s: s.get("eval_score", 0), reverse=True)
        del self._registry[CANDIDATE_LIMIT:]
        self._save_json(REGISTRY_FILE, self._registry)

    def generate_hypotheses(self, n: int = 20) -> list[dict]:
        """
        Generate up to ``n`` candidate strategies by pairing theory rules.

        Two different schools are sampled, one directional rule (label != 0)
        is drawn from each, and the pair is kept only when both rules point
        the same way. Pairs already in the graveyard, already in the
        registry, or already produced in this call are skipped. At most
        ``3 * n`` draws are made.

        Returns:
            A list of hypothesis dicts. It is empty when ``theory_registry``
            cannot be imported or offers fewer than two usable schools.
        """
        try:
            from theory_registry import get_all_rules, get_schools
        except ImportError:
            return []

        rules = get_all_rules()
        schools = [s for s in get_schools() if s != "risk_management"]
        if len(schools) < 2:
            # random.sample() needs two distinct schools to pick from.
            return []

        # Group the directional rules by school once, not once per draw.
        usable = set(schools)
        by_school: dict = {}
        for rule in rules:
            if rule["school"] in usable and rule["label"] != 0:
                by_school.setdefault(rule["school"], []).append(rule)

        seen = {s.get("strategy_id") for s in self._registry}
        hypotheses = []
        for _ in range(n * 3):  # draw 3× more to absorb rejects
            if len(hypotheses) >= n:
                break
            s1, s2 = random.sample(schools, 2)
            rules_s1 = by_school.get(s1)
            rules_s2 = by_school.get(s2)
            if not rules_s1 or not rules_s2:
                continue
            r1 = random.choice(rules_s1)
            r2 = random.choice(rules_s2)
            # Only combine rules with the same direction.
            if r1["label"] != r2["label"]:
                continue
            sid = f"{r1['theory_id']}__{r2['theory_id']}"
            if sid in seen or self._in_graveyard(sid):
                continue
            seen.add(sid)
            # Regimes both rules agree on; fall back to both known regimes.
            regimes = set(r1.get("regime", ["ANY"])) & set(r2.get("regime", ["ANY"]))
            if not regimes:
                regimes = {"TRENDING", "RANGING"}
            avg_conf = (r1["confidence"] + r2["confidence"]) / 2
            hypotheses.append({
                "strategy_id": sid,
                "label": r1["label"],
                "schools": [s1, s2],
                "rules": [r1["theory_id"], r2["theory_id"]],
                "regime": sorted(regimes),  # sorted for a stable order
                "confidence": round(avg_conf, 3),
                "timeframe": r1.get("timeframe", "any"),
                "created_at": datetime.now(timezone.utc).isoformat(),
                "status": "hypothesis",
            })
        return hypotheses

    def quick_eval(self, hypothesis: dict) -> float:
        """
        Score a hypothesis heuristically, without any backtest.

        score = confidence × (1 + 0.1·(rules − 1)) × (1 + 0.15·(schools − 1)),
        rounded to 4 decimals and capped at 1.0.
        """
        conf = hypothesis.get("confidence", 0.5)
        n_rules = len(hypothesis.get("rules", []))
        n_schools = len(set(hypothesis.get("schools", [])))
        score = conf * (1 + (n_rules - 1) * 0.1) * (1 + (n_schools - 1) * 0.15)
        return min(round(score, 4), 1.0)

    def run_research_cycle(self, n_hypotheses: int = 20) -> dict:
        """
        Run one research cycle: generate → evaluate → register or bury.

        Hypotheses scoring at least 0.60 become registry candidates; the rest
        go to the graveyard.

        Returns:
            Counts of accepted and rejected hypotheses plus the current
            registry and graveyard sizes.
        """
        threshold = 0.60
        accepted = []
        rejected = []
        for hyp in self.generate_hypotheses(n_hypotheses):
            score = self.quick_eval(hyp)
            hyp["eval_score"] = score
            if score >= threshold:
                hyp["status"] = "candidate"
                self._register(hyp)
                accepted.append(hyp["strategy_id"])
            else:
                self._bury(hyp["strategy_id"], f"low_score={score:.3f}")
                rejected.append(hyp["strategy_id"])

        graveyard_size = self._graveyard.get("count", 0)
        # NOTE(review): the leading glyphs in this log string look mis-encoded
        # (probably an emoji) — confirm the file's encoding.
        log.info(f"π¬ Research: {len(accepted)}/{n_hypotheses} accepted | "
                 f"registry={len(self._registry)} | graveyard={graveyard_size}")
        return {
            "accepted": len(accepted),
            "rejected": len(rejected),
            "registry_size": len(self._registry),
            "graveyard_size": graveyard_size,
        }

    def get_best_candidates(self, n: int = 5) -> list[dict]:
        """Return the ``n`` highest-scoring registry entries."""
        return self._registry[:n]
# ──────────────────────────────────────────────────────────────────
# T7 — FORGE CONTROLLER (Closed-loop brain)
# ──────────────────────────────────────────────────────────────────
class ForgeController:
    """
    Closed-loop controller — the brain of the forge.

    Each call to :meth:`assess_and_act` estimates how much experience data
    exists, asks the model registry which experts are ready, and then either
    waits, seeds training tasks on the coordinator, or runs deep research
    plus worker self-healing. Progress is persisted to ``forge_state.json``
    so no manual intervention is needed between cycles.

    NOTE(review): the leading glyphs in several log strings below look
    mis-encoded (probably emoji) — confirm the file's encoding. They are kept
    exactly as found.
    """

    THRESHOLDS = {
        "min_experiences_for_training": 20_000,
        "min_sharpe_expert": 0.5,
        "min_experts_for_meta": 3,
        "min_sharpe_meta": 1.5,  # NOTE(review): not read anywhere in this class
    }

    # Strategy families and expert slots; expert model names are
    # "<strategy>_<slot lower-cased>", e.g. "momentum_e1".
    STRATEGIES = ("momentum", "mean_reversion", "smart_money", "scalping")
    EXPERT_IDS = ("E1", "E2", "E3", "E4", "E5")

    def __init__(self):
        self.exam = ExamEngine()
        self.lab = StrategyResearchLab()
        self._state_file = "forge_state.json"
        self._state = self._load_state()

    def _load_state(self) -> dict:
        """Return the persisted state, or a fresh default if none is readable."""
        try:
            with open(self._state_file, encoding="utf-8") as f:
                state = json.load(f)
            if isinstance(state, dict):
                return state
        except (OSError, ValueError):
            pass  # missing or corrupt file: fall through to the default
        return {"phase": "collecting", "ready_experts": [], "meta_ready": False,
                "last_checked": None, "cycle": 0}

    def _save_state(self):
        """Stamp ``last_checked`` (UTC) and write the state file via ``os.replace``."""
        self._state["last_checked"] = datetime.now(timezone.utc).isoformat()
        # Temp file + replace: an interrupted write cannot truncate the state.
        tmp_path = f"{self._state_file}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self._state, f, indent=2)
        os.replace(tmp_path, self._state_file)

    def _count_experiences(self) -> int:
        """
        Estimate the number of stored experiences.

        Counts ``experiences/*.parquet`` files in the experience dataset and
        multiplies by 200 (a per-file estimate). Returns 0 on any failure.
        """
        try:
            from huggingface_hub import list_repo_files
            n_files = sum(
                1
                for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                if f.startswith("experiences/") and f.endswith(".parquet")
            )
            return n_files * 200  # estimate
        except Exception as e:
            log.warning(f"Count experiences failed: {e}")
            return 0

    def _get_ready_experts(self) -> list[str]:
        """
        Return names of experts whose best Sharpe meets ``min_sharpe_expert``.

        A lookup that raises, or yields ``None``, marks only that expert as
        not ready; the other experts are still evaluated.
        """
        try:
            from model_registry import get_best_sharpe
        except ImportError as e:
            log.warning(f"Model registry unavailable: {e}")
            return []

        min_sharpe = self.THRESHOLDS["min_sharpe_expert"]
        ready = []
        for strat in self.STRATEGIES:
            for eid in self.EXPERT_IDS:
                name = f"{strat}_{eid.lower()}"
                try:
                    sharpe = get_best_sharpe(name)
                    if sharpe is not None and sharpe >= min_sharpe:
                        ready.append(name)
                except Exception as e:
                    log.warning(f"Sharpe lookup failed for {name}: {e}")
        return ready

    def _seed_task(self, task_type: str, params: dict | None = None):
        """POST a new task to the coordinator's ``/admin/add_task`` endpoint (best effort)."""
        try:
            import requests
            # UTC stamp, matching every other timestamp written by this file.
            stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M")
            payload = {
                "task_type": task_type,
                "priority": 3,
                "params": params or {},
                "task_id": f"auto_{task_type.lower()}_{stamp}",
            }
            r = requests.post(f"{COORD_URL}/admin/add_task", json=payload, timeout=10)
            log.info(f"π€ Seeded {task_type}: HTTP {r.status_code}")
        except Exception as e:
            log.warning(f"Seed {task_type} failed: {e}")

    def _restart_dead_worker(self, space_name: str):
        """Ask the Hugging Face API to restart ``space_name``; return True on HTTP 200."""
        try:
            import requests
            headers = {"Authorization": f"Bearer {HF_TOKEN}"}
            r = requests.post(
                f"https://huggingface.co/api/spaces/{space_name}/restart",
                headers=headers, timeout=15,
            )
            log.info(f" π Restart {space_name}: HTTP {r.status_code}")
            return r.status_code == 200
        except Exception as e:
            log.warning(f" β οΈ Restart {space_name} failed: {e}")
            return False

    def self_heal(self) -> dict:
        """
        Restart workers that have not sent a heartbeat for more than 30 minutes.

        Worker status comes from the coordinator's ``/worker_status``
        endpoint. A worker entry without ``last_seen`` is treated as never
        seen, so it is restarted. An entry whose ``last_seen`` is not numeric
        is skipped with a warning; the remaining workers are still checked.

        Returns:
            ``{"healed": [...], "n_healed": int}``. When the coordinator does
            not answer with HTTP 200, the dict also carries
            ``"status": "coord_unreachable"``.
        """
        healed = []
        workers = {}
        try:
            import requests
            r = requests.get(f"{COORD_URL}/worker_status", timeout=10)
            if r.status_code != 200:
                return {"status": "coord_unreachable", "healed": [], "n_healed": 0}
            workers = r.json().get("workers", {})
        except Exception as e:
            log.warning(f"Self-heal check failed: {e}")

        if not isinstance(workers, dict):
            log.warning(f"Self-heal check failed: unexpected workers payload {type(workers).__name__}")
            workers = {}

        now = time.time()
        for wid, info in workers.items():
            try:
                last_seen = float(info.get("last_seen", 0))
            except (AttributeError, TypeError, ValueError) as e:
                log.warning(f"Self-heal: skipping worker {wid}: {e}")
                continue
            age_min = (now - last_seen) / 60
            if age_min <= 30:  # heartbeat seen within the last 30 minutes
                continue
            space = f"gionuibk/HPLL-Worker-{str(wid).zfill(2)}"
            log.warning(f" π¨ Worker {wid} dead ({age_min:.0f}min) β restarting {space}")
            if self._restart_dead_worker(space):
                healed.append(wid)

        if healed:
            log.info(f" β Healed {len(healed)} workers: {healed}")
        return {"healed": healed, "n_healed": len(healed)}

    def _load_recent_experiences(self, max_rows: int = 5000) -> pd.DataFrame | None:
        """
        Load a sample of recent experience rows for deep research.

        Downloads the last 20 ``experiences/*.parquet`` files (by sorted
        name), concatenates them, fills NaNs with 0.0 and returns at most
        ``max_rows`` rows sampled with a fixed seed. Returns ``None`` when
        nothing could be loaded.
        """
        try:
            from huggingface_hub import list_repo_files, hf_hub_download
            files = sorted(
                f for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                if f.startswith("experiences/") and f.endswith(".parquet")
            )[-20:]  # last 20 files
            dfs = []
            for fp in files:
                local = hf_hub_download(repo_id=EXPERIENCE_REPO, filename=fp,
                                        repo_type="dataset", token=HF_TOKEN,
                                        cache_dir="/tmp/exp_cache")
                dfs.append(pd.read_parquet(local))
            if not dfs:
                return None
            df = pd.concat(dfs, ignore_index=True).fillna(0.0)
            return df.sample(min(max_rows, len(df)), random_state=42)
        except Exception as e:
            log.warning(f"Load experiences: {e}")
            return None

    def assess_and_act(self) -> dict:
        """
        Run one iteration of the control loop: check state → decide → execute.

        Phases, in order of precedence:
          * ``collecting``       – fewer experiences than the training threshold;
          * ``training_experts`` – fewer than 5 experts ready: seed one
            training task per strategy;
          * ``training_meta``    – enough experts and ``meta_ready`` not set:
            seed the meta-model training task;
          * ``running``          – otherwise: deep research plus self-heal.

        Returns:
            ``{"action": phase, "n_exp": int, "ready": int}``.
        """
        self._state["cycle"] = self._state.get("cycle", 0) + 1
        n_exp = self._count_experiences()
        ready = self._get_ready_experts()
        action = "idle"
        log.info(f"π Cycle {self._state['cycle']} | exp={n_exp:,} | ready_experts={len(ready)}")

        if n_exp < self.THRESHOLDS["min_experiences_for_training"]:
            # Not enough data yet.
            log.info(f" β³ Collecting data ({n_exp:,}/{self.THRESHOLDS['min_experiences_for_training']:,})")
            action = "collecting"
        elif len(ready) < 5:
            # NOTE(review): this hard-coded 5 means the meta branch below is
            # only reached with >= 5 experts, although
            # THRESHOLDS["min_experts_for_meta"] is 3 — confirm which is intended.
            log.info(f" π Triggering expert training ({len(ready)}/5 ready)")
            for strat in self.STRATEGIES:
                self._seed_task(f"TRAIN_EXPERT_{strat}", {"strategy": strat})
            action = "training_experts"
        elif len(ready) >= self.THRESHOLDS["min_experts_for_meta"] and not self._state.get("meta_ready"):
            # NOTE(review): nothing in this class ever sets state["meta_ready"],
            # so this branch repeats every cycle unless the state file is
            # changed elsewhere — confirm.
            log.info(f" π§ Triggering Meta training ({len(ready)} experts ready)")
            self._seed_task("TRAIN_META")
            action = "training_meta"
        else:
            log.info(" π¬ Running: deep research + self-heal")
            df_exp = self._load_recent_experiences()
            if df_exp is not None and len(df_exp) > 200:
                try:
                    from hypothesis_backtester import run_deep_research_cycle
                    res = run_deep_research_cycle(df_exp)
                    log.info(f" π Research: {res}")
                except Exception as e:
                    log.warning(f" Research failed: {e}")
            # self_heal() logs its own outcome; the return value is not needed.
            self.self_heal()
            action = "running"

        self._state["phase"] = action
        self._state["ready_experts"] = ready
        self._state["n_experiences"] = n_exp
        self._save_state()
        return {"action": action, "n_exp": n_exp, "ready": len(ready)}
def run_forge_controller(interval: int = 900, max_cycles: int | None = None):
    """
    Main loop: run one controller cycle, then sleep ``interval`` seconds.

    Args:
        interval: pause between cycles in seconds (default 900 = 15 minutes).
        max_cycles: stop after this many cycles. ``None`` (default) loops
            forever, which is the original behaviour.

    An exception raised by a cycle is logged with its traceback and the loop
    carries on with the next cycle.
    """
    log.info(f"π ForgeController started | interval={interval}s")
    controller = ForgeController()
    cycles_done = 0
    while max_cycles is None or cycles_done < max_cycles:
        try:
            result = controller.assess_and_act()
            log.info(f"β Cycle done: {result}")
        except Exception as e:
            log.error(f"β Forge error: {e}", exc_info=True)
        cycles_done += 1
        if max_cycles is not None and cycles_done >= max_cycles:
            break  # bounded run: no need to sleep after the final cycle
        time.sleep(interval)