# HPLL-DataReview / forge_controller.py
# gionuibk's picture
# T6+T7 complete: forge_controller.py
# e5be8c9 verified
"""
T5 — Validation layer: walk-forward out-of-sample (OOS) validation, regime-specific checks, random-baseline benchmark
T6 — Strategy Research Lab: hypothesis generation + graveyard of rejected strategies
T7 — Forge Controller: closed-loop brain running on the Coordinator Space
"""
from __future__ import annotations
import os, json, math, logging, time
from datetime import datetime, timezone
from pathlib import Path
import numpy as np
import pandas as pd
log = logging.getLogger("forge_controller")

# Hugging Face access; each value may be overridden through the environment.
HF_TOKEN = os.getenv("HF_TOKEN", "")
EXPERIENCE_REPO = os.getenv("EXPERIENCE_REPO", "gionuibk/aetheris-experiences")
MODEL_REPO = os.getenv("MODEL_REPO", "gionuibk/aetheris-models")
COORD_URL = os.getenv("COORDINATOR_URL", "https://gionuibk-hpll-datareview.hf.space")

# Space ids of the whole worker fleet: gionuibk/HPLL-Worker-00 ... gionuibk/HPLL-Worker-39.
WORKER_SPACES = ["gionuibk/HPLL-Worker-%02d" % idx for idx in range(40)]
# ══════════════════════════════════════════════════════════════════
# T5 — EXAM ENGINE (walk-forward validation)
# ══════════════════════════════════════════════════════════════════
class ExamEngine:
    """
    Out-of-sample "exam" for an already-trained classifier.

    (Original note: "Thi cử — thi trên data chưa thấy bao giờ", i.e. "the exam:
    tested on data never seen before".)

    Nothing here fits or retrains the model. Every method only calls
    ``model.predict``, and walk-forward validation also calls
    ``model.predict_proba``. Whether the scored rows are truly unseen depends
    on how the caller trained the model. NOTE(review): confirm the model was
    not trained on the same *df*.

    Inputs, as used below:
      * *df* has a ``label`` column of integer classes, where ``0`` is treated
        as "flat" by the win-rate metric.
      * *df* has the feature columns. Missing feature columns are filled with
        ``0.0`` on a copy; the caller's frame is never modified.
    """
    N_WINDOWS = 5  # number of scored windows; the data is cut into N_WINDOWS + 1
    MIN_PASS_SHARPE = 0.8  # average per-fold "sharpe" score needed for status "pass"

    @staticmethod
    def _with_features(df: pd.DataFrame, feat_cols: list) -> pd.DataFrame:
        """Return *df* with every column of *feat_cols* present.

        Missing columns are added with ``0.0``. The frame is copied at most once,
        and only when a column actually has to be added.
        """
        missing = [c for c in feat_cols if c not in df.columns]
        if not missing:
            return df
        df = df.copy()
        for c in missing:
            df[c] = 0.0
        return df

    def walk_forward_validate(self, model, df: pd.DataFrame,
                              feat_cols: list, name: str = "") -> dict:
        """
        Score *model* on consecutive, time-ordered test windows.

        The rows are sorted by ``timestamp`` when that column exists. They are
        then cut into ``N_WINDOWS + 1`` windows of ``len(df) // (N_WINDOWS + 1)``
        rows each. Window 0 is never scored, and fold ``i`` scores window
        ``i + 1``. Any remainder rows at the end are not scored. The model is
        NOT retrained per fold.

        Args:
            model: object with ``predict(X)`` and ``predict_proba(X)``.
            df: evaluation data (see class docstring).
            feat_cols: feature columns, in the order the model expects.
            name: label copied into the result as ``model_name``.

        Returns:
            * ``{"status": "insufficient_data", "n": n}`` when a window would
              have fewer than 50 rows.
            * ``{"status": "no_folds"}`` when no fold could be scored.
            * Otherwise a dict with ``status`` ("pass"/"fail"), ``avg_sharpe``,
              ``avg_accuracy``, ``avg_win_rate`` (rounded to 4 places),
              ``n_folds``, ``fold_details`` and ``model_name``.

        Raises:
            KeyError: if *df* has no ``label`` column.
        """
        df = df.sort_values("timestamp") if "timestamp" in df.columns else df
        n = len(df)
        window_size = n // (self.N_WINDOWS + 1)
        if window_size < 50:
            return {"status": "insufficient_data", "n": n}

        fold_metrics = []
        for i in range(self.N_WINDOWS):
            test_start = window_size * (i + 1)
            test_end = min(test_start + window_size, n)
            df_test = df.iloc[test_start:test_end]
            if len(df_test) < 20:
                continue
            df_test = self._with_features(df_test, feat_cols)
            X_test = df_test[feat_cols].fillna(0).values.astype(np.float32)
            y_test = df_test["label"].values.astype(np.int32)
            try:
                preds = np.asarray(model.predict(X_test))
                probs = model.predict_proba(X_test)
                acc = float(np.mean(preds == y_test))
                conf = np.max(probs, axis=1)
                # NOTE(review): despite the name, this is not a return-based
                # Sharpe ratio. It is a t-statistic-like score of how far the
                # mean top-class probability sits above 0.5. Because it grows
                # with sqrt(fold size), MIN_PASS_SHARPE is easier to clear on
                # large folds. Confirm this is the intended pass criterion.
                sharpe = (float(np.mean(conf)) - 0.5) / (float(np.std(conf)) + 1e-9) * math.sqrt(len(conf))
                # Win rate counts only non-flat (non-zero) predictions.
                non_flat = preds != 0
                win_rate = float(np.mean(preds[non_flat] == y_test[non_flat])) if non_flat.any() else 0.0
                fold_metrics.append({"fold": i, "acc": acc, "sharpe": sharpe,
                                     "win_rate": win_rate, "n": len(y_test)})
            except Exception as e:
                log.warning(f"Fold {i} error: {e}")

        if not fold_metrics:
            return {"status": "no_folds"}

        def _avg(key: str) -> float:
            return sum(m[key] for m in fold_metrics) / len(fold_metrics)

        avg_sharpe = _avg("sharpe")
        passed = avg_sharpe >= self.MIN_PASS_SHARPE
        return {
            "status": "pass" if passed else "fail",
            "avg_sharpe": round(avg_sharpe, 4),
            "avg_accuracy": round(_avg("acc"), 4),
            "avg_win_rate": round(_avg("win_rate"), 4),
            "n_folds": len(fold_metrics),
            "fold_details": fold_metrics,
            "model_name": name,
        }

    def benchmark_vs_random(self, model, X_test, y_test, seed=None) -> dict:
        """
        Compare the model's accuracy with uniformly random ``{-1, 0, 1}`` guesses.

        Args:
            model: object with ``predict(X)``.
            X_test: feature matrix to predict on.
            y_test: true labels.
            seed: optional seed for the random baseline. With ``None`` (the
                default), NumPy's global RNG is used, as before. With an int, the
                baseline is reproducible and global RNG state is left untouched.

        Returns:
            ``{"model_acc", "random_acc", "edge"}`` rounded to 4 places, where
            ``edge = model_acc - random_acc``. Returns ``{"error": str}`` if
            anything raises. The baseline is a single random draw, so
            ``random_acc`` is noisy for small *y_test*.
        """
        try:
            preds = model.predict(X_test)
            acc = float(np.mean(preds == y_test))
            if seed is None:
                random_preds = np.random.choice([-1, 0, 1], size=len(y_test))
            else:
                random_preds = np.random.default_rng(seed).choice([-1, 0, 1], size=len(y_test))
            random_acc = float(np.mean(random_preds == y_test))
            edge = acc - random_acc
            return {"model_acc": round(acc, 4), "random_acc": round(random_acc, 4), "edge": round(edge, 4)}
        except Exception as e:
            return {"error": str(e)}

    def regime_validate(self, model, df: pd.DataFrame, feat_cols: list) -> dict:
        """
        Report model accuracy separately on trending and ranging rows.

        Regimes are read from the ``adx`` column:
          * TRENDING is ``adx > 25``.
          * RANGING is ``adx < 20``.
          * Rows with ``20 <= adx <= 25``, or a NaN ``adx``, belong to neither.

        If ``adx`` is missing, both regimes fall back to *all* rows, so the two
        entries are identical. Regimes with fewer than 30 rows are skipped. A
        regime whose evaluation raises is logged and skipped.

        Returns:
            ``{regime_name: {"acc": float rounded to 4 places, "n": int}}`` for
            each evaluated regime.
        """
        def _all_rows(d: pd.DataFrame) -> pd.Series:
            # Built on d's own index so boolean indexing aligns for any index.
            # A default RangeIndex raises "Unalignable boolean Series" on frames
            # with other index labels (e.g. slices or samples).
            return pd.Series(True, index=d.index)

        regimes = [
            ("TRENDING", lambda d: d["adx"] > 25 if "adx" in d.columns else _all_rows(d)),
            ("RANGING", lambda d: d["adx"] < 20 if "adx" in d.columns else _all_rows(d)),
        ]
        results = {}
        for regime_name, condition_fn in regimes:
            try:
                sub = df[condition_fn(df)]
                if len(sub) < 30:
                    continue
                sub = self._with_features(sub, feat_cols)
                X = sub[feat_cols].fillna(0).values.astype(np.float32)
                y = sub["label"].values
                preds = model.predict(X)
                acc = float(np.mean(preds == y))
                results[regime_name] = {"acc": round(acc, 4), "n": len(y)}
            except Exception as e:
                log.warning(f"Regime {regime_name} validation error: {e}")
        return results
# ══════════════════════════════════════════════════════════════════
# T6 — STRATEGY RESEARCH LAB (full version)
# ══════════════════════════════════════════════════════════════════
# On-disk JSON stores used by StrategyResearchLab (paths relative to the working
# directory): buried strategy ids with reason/timestamp, and the scored candidates.
GRAVEYARD_FILE, REGISTRY_FILE = "strategy_graveyard.json", "strategy_registry.json"
# Upper bound on how many candidates the registry keeps (best eval_score first).
CANDIDATE_LIMIT = 50
class StrategyResearchLab:
    """
    Discover candidate strategies by pairing rules from the theory registry.

    A research cycle works in three steps:
      1. Generate hypotheses, each pairing two same-direction rules from two
         different schools.
      2. Score each one with :meth:`quick_eval`.
      3. Keep those that clear the threshold in a bounded registry
         (REGISTRY_FILE, at most CANDIDATE_LIMIT entries, best score first).
         Bury the rest in a graveyard (GRAVEYARD_FILE) so the same pairing is
         not proposed again.
    """

    def __init__(self):
        self._graveyard: dict = self._load_json(GRAVEYARD_FILE, {"failed": {}, "count": 0})
        self._registry: list = self._load_json(REGISTRY_FILE, [])

    def _load_json(self, path: str, default):
        """Load JSON from *path*, falling back to *default*.

        A missing file silently yields *default*. An unreadable or corrupt file,
        or one whose top-level type differs from *default*'s, is logged before
        falling back, because the next save will overwrite it.
        """
        try:
            with open(path) as f:
                data = json.load(f)
        except FileNotFoundError:
            return default
        except Exception as e:
            log.warning(f"Could not load {path}: {e} - using default")
            return default
        if not isinstance(data, type(default)):
            log.warning(f"Unexpected JSON type in {path}: {type(data).__name__} - using default")
            return default
        return data

    def _save_json(self, path: str, data):
        """Write *data* to *path* as indented JSON.

        The JSON is first written to ``<path>.tmp`` and then moved over *path*
        with ``os.replace``. An interrupted write therefore leaves the previous
        file intact instead of a truncated one.
        """
        tmp_path = f"{path}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)

    def _in_graveyard(self, strategy_id: str) -> bool:
        """Return True if *strategy_id* has been buried."""
        return strategy_id in self._graveyard.get("failed", {})

    def _bury(self, strategy_id: str, reason: str, save: bool = True):
        """Record *strategy_id* as failed with *reason* and a UTC timestamp.

        Pass ``save=False`` to defer writing GRAVEYARD_FILE, e.g. when burying many ids.
        """
        self._graveyard.setdefault("failed", {})[strategy_id] = {
            "reason": reason, "ts": datetime.now(timezone.utc).isoformat()
        }
        self._graveyard["count"] = len(self._graveyard["failed"])
        if save:
            self._save_json(GRAVEYARD_FILE, self._graveyard)

    @staticmethod
    def _upsert_candidate(registry: list, strategy: dict, limit: int) -> list:
        """Return a new registry list containing *strategy*.

        Any existing entry with the same ``strategy_id`` is replaced, so repeated
        cycles cannot fill the registry with copies. The result is sorted by
        ``eval_score`` descending (stable, so ties keep their order) and cut to
        *limit* entries.
        """
        sid = strategy.get("strategy_id")
        kept = [s for s in registry if sid is None or s.get("strategy_id") != sid]
        kept.append(strategy)
        kept.sort(key=lambda x: x.get("eval_score", 0), reverse=True)
        return kept[:limit]

    def _register(self, strategy: dict, save: bool = True):
        """Insert or replace *strategy* in the registry.

        Only the top CANDIDATE_LIMIT entries by score are kept. Pass
        ``save=False`` to defer writing REGISTRY_FILE.
        """
        self._registry = self._upsert_candidate(self._registry, strategy, CANDIDATE_LIMIT)
        if save:
            self._save_json(REGISTRY_FILE, self._registry)

    def generate_hypotheses(self, n: int = 20) -> list[dict]:
        """Generate up to *n* candidate strategies from the theory registry.

        Returns ``[]`` if ``theory_registry`` cannot be imported. The
        ``risk_management`` school is never used as one of the paired schools.
        """
        try:
            from theory_registry import get_all_rules, get_schools
        except ImportError:
            return []
        rules = get_all_rules()
        schools = [s for s in get_schools() if s != "risk_management"]
        return self._combine_rules(rules, schools, n)

    def _combine_rules(self, rules: list, schools: list, n: int, rng=None) -> list[dict]:
        """Randomly pair rules from two distinct *schools* into at most *n* hypotheses.

        Each rule dict is read for ``school``, ``label``, ``theory_id``,
        ``confidence`` and, optionally, ``regime`` and ``timeframe``. Rules with
        label 0 are ignored, and only same-label pairs are kept. A pairing is
        skipped if it was already produced in this call, or if either ordering
        of its id (``A__B`` / ``B__A``) is in the graveyard.

        *rng* is anything with ``sample``/``choice``; by default it is the
        ``random`` module.
        """
        import random
        rng = random if rng is None else rng
        if len(schools) < 2:
            return []  # random.sample(schools, 2) would raise ValueError

        # Group the non-flat rules by school once, instead of re-filtering per draw.
        by_school: dict = {}
        for r in rules:
            if r.get("label", 0) != 0:
                by_school.setdefault(r["school"], []).append(r)

        hypotheses = []
        seen = set()
        for _ in range(n * 3):  # oversample: graveyard hits and direction mismatches are skipped
            if len(hypotheses) >= n:
                break
            s1, s2 = rng.sample(schools, 2)
            rules_s1 = by_school.get(s1, [])
            rules_s2 = by_school.get(s2, [])
            if not rules_s1 or not rules_s2:
                continue
            r1 = rng.choice(rules_s1)
            r2 = rng.choice(rules_s2)
            # Only combine rules with the same direction.
            if r1["label"] != r2["label"]:
                continue
            sid = f"{r1['theory_id']}__{r2['theory_id']}"
            reverse_sid = f"{r2['theory_id']}__{r1['theory_id']}"
            pair_key = frozenset((r1["theory_id"], r2["theory_id"]))
            if pair_key in seen or self._in_graveyard(sid) or self._in_graveyard(reverse_sid):
                continue
            seen.add(pair_key)
            # NOTE(review): "ANY" is intersected literally, so ["ANY"] & ["TRENDING"]
            # is empty and falls back to both regimes. Confirm whether ANY is
            # meant to act as a wildcard.
            regimes = set(r1.get("regime", ["ANY"])) & set(r2.get("regime", ["ANY"]))
            if not regimes:
                regimes = {"TRENDING", "RANGING"}
            avg_conf = (r1["confidence"] + r2["confidence"]) / 2
            hypotheses.append({
                "strategy_id": sid,
                "label": r1["label"],
                "schools": [s1, s2],
                "rules": [r1["theory_id"], r2["theory_id"]],
                "regime": sorted(regimes),  # sorted for a deterministic order
                "confidence": round(avg_conf, 3),
                "timeframe": r1.get("timeframe", "any"),
                "created_at": datetime.now(timezone.utc).isoformat(),
                "status": "hypothesis",
            })
        return hypotheses

    def quick_eval(self, hypothesis: dict) -> float:
        """
        Fast heuristic score without backtesting, capped at 1.0.

        The score is ``confidence × (1 + 0.1·(rules − 1)) × (1 + 0.15·(schools − 1))``.
        Here ``rules`` is the number of rules and ``schools`` the number of
        distinct schools. The result is rounded to 4 places.
        """
        conf = hypothesis.get("confidence", 0.5)
        n_rules = len(hypothesis.get("rules", []))
        n_schools = len(set(hypothesis.get("schools", [])))
        score = conf * (1 + (n_rules - 1) * 0.1) * (1 + (n_schools - 1) * 0.15)
        return min(round(score, 4), 1.0)

    def run_research_cycle(self, n_hypotheses: int = 20, threshold: float = 0.60) -> dict:
        """One research cycle: generate → evaluate → register or bury.

        Hypotheses scoring at least *threshold* become candidates, and the rest
        are buried. Each store is written at most once per cycle, and only if it
        changed.
        """
        hypotheses = self.generate_hypotheses(n_hypotheses)
        accepted = []
        rejected = []
        for hyp in hypotheses:
            score = self.quick_eval(hyp)
            hyp["eval_score"] = score
            if score >= threshold:
                hyp["status"] = "candidate"
                self._register(hyp, save=False)
                accepted.append(hyp["strategy_id"])
            else:
                self._bury(hyp["strategy_id"], f"low_score={score:.3f}", save=False)
                rejected.append(hyp["strategy_id"])
        if accepted:
            self._save_json(REGISTRY_FILE, self._registry)
        if rejected:
            self._save_json(GRAVEYARD_FILE, self._graveyard)
        graveyard_size = self._graveyard.get("count", 0)
        log.info(f"πŸ”¬ Research: {len(accepted)}/{len(hypotheses)} accepted | "
                 f"registry={len(self._registry)} | graveyard={graveyard_size}")
        return {
            "accepted": len(accepted),
            "rejected": len(rejected),
            "registry_size": len(self._registry),
            "graveyard_size": graveyard_size,
        }

    def get_best_candidates(self, n: int = 5) -> list[dict]:
        """Return the *n* highest-scoring registered candidates."""
        return self._registry[:n]
# ══════════════════════════════════════════════════════════════════
# T7 — FORGE CONTROLLER (closed-loop brain)
# ══════════════════════════════════════════════════════════════════
class ForgeController:
    """
    Closed-loop controller for the training forge ("lò luyện").

    Each :meth:`assess_and_act` call is one control-loop iteration:
      1. Count the collected experiences.
      2. Find the expert models whose best Sharpe meets the threshold.
      3. Do exactly one of: keep collecting, seed expert training, seed meta
         training, or (once ``meta_ready`` is set in the state) run deep research.

    Worker self-healing runs on every iteration, whatever the phase. The state
    is written to ``forge_state.json`` after each iteration.
    ``run_forge_controller`` calls this in an endless loop.
    """
    THRESHOLDS = {
        "min_experiences_for_training": 20_000,
        "min_sharpe_expert": 0.5,
        # Expert training keeps being seeded until this many experts are ready.
        "target_ready_experts": 5,
        # NOTE(review): the meta branch is only reached once target_ready_experts
        # experts are ready, so this lower bound never rejects anything.
        # Confirm which of the two numbers is intended.
        "min_experts_for_meta": 3,
        # NOTE(review): not read by any code visible here.
        "min_sharpe_meta": 1.5,
    }
    # Expert names are f"{strategy}_{expert_id.lower()}", e.g. "momentum_e1".
    STRATEGIES = ("momentum", "mean_reversion", "smart_money", "scalping")
    EXPERT_IDS = ("E1", "E2", "E3", "E4", "E5")

    def __init__(self):
        # NOTE(review): exam and lab are created (lab reads its JSON files on
        # construction), but nothing in this class uses them.
        self.exam = ExamEngine()
        self.lab = StrategyResearchLab()
        self._state_file = "forge_state.json"
        self._state = self._load_state()

    def _load_state(self) -> dict:
        """Load the persisted controller state, or return a fresh "collecting" state.

        A missing file is normal on first start. Any other failure, or a file
        whose top level is not a JSON object, is logged before falling back.
        """
        try:
            with open(self._state_file) as f:
                data = json.load(f)
            if isinstance(data, dict):
                return data
            log.warning(f"Ignoring {self._state_file}: top-level JSON is not an object")
        except FileNotFoundError:
            pass
        except Exception as e:
            log.warning(f"Could not load {self._state_file}: {e}; starting from a fresh state")
        return {"phase": "collecting", "ready_experts": [], "meta_ready": False,
                "last_checked": None, "cycle": 0}

    def _save_state(self):
        """Stamp ``last_checked`` (UTC, ISO-8601) and write the state file.

        The JSON goes to ``<state_file>.tmp`` and is moved into place with
        ``os.replace``. A crash mid-write therefore leaves the previous state intact.
        """
        self._state["last_checked"] = datetime.now(timezone.utc).isoformat()
        tmp_path = f"{self._state_file}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(self._state, f, indent=2)
        os.replace(tmp_path, self._state_file)

    def _count_experiences(self) -> int:
        """Estimate how many experiences exist from the parquet file count.

        Counts the ``experiences/*.parquet`` files in EXPERIENCE_REPO and
        multiplies by 200. NOTE(review): 200 rows per file is an estimate;
        confirm the batch size. Returns 0 (and logs a warning) if the repo
        cannot be listed.
        """
        try:
            from huggingface_hub import list_repo_files
            files = [f for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                     if f.startswith("experiences/") and f.endswith(".parquet")]
            return len(files) * 200  # estimate
        except Exception as e:
            log.warning(f"Count experiences failed: {e}")
            return 0

    def _get_ready_experts(self) -> list[str]:
        """Return the expert names whose best Sharpe meets ``min_sharpe_expert``.

        Candidate names are every ``f"{strategy}_{expert_id.lower()}"``
        combination. A failing lookup, or a ``None`` Sharpe, skips only that
        expert instead of discarding every ready one. If ``model_registry``
        cannot be imported, the result is empty.
        """
        try:
            from model_registry import get_best_sharpe
        except Exception as e:
            log.warning(f"model_registry unavailable: {e}")
            return []
        ready = []
        for strat in self.STRATEGIES:
            for eid in self.EXPERT_IDS:
                name = f"{strat}_{eid.lower()}"
                try:
                    sh = get_best_sharpe(name)
                except Exception as e:
                    log.debug(f"get_best_sharpe({name}) failed: {e}")
                    continue
                if sh is not None and sh >= self.THRESHOLDS["min_sharpe_expert"]:
                    ready.append(name)
        return ready

    def _seed_task(self, task_type: str, params: dict | None = None):
        """POST a priority-3 task to the coordinator's ``/admin/add_task`` endpoint.

        The task id is ``auto_<task_type lowercased>_<YYYYmmdd_HHMM>``. The
        timestamp is taken in UTC, like the other timestamps in this module.
        Failures are logged, never raised.
        """
        try:
            import requests
            stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M')
            payload = {
                "task_type": task_type, "priority": 3,
                "params": params or {},
                "task_id": f"auto_{task_type.lower()}_{stamp}",
            }
            r = requests.post(f"{COORD_URL}/admin/add_task", json=payload, timeout=10)
            log.info(f"πŸ“€ Seeded {task_type}: HTTP {r.status_code}")
        except Exception as e:
            log.warning(f"Seed {task_type} failed: {e}")

    def _restart_dead_worker(self, space_name: str):
        """Ask the Hugging Face Hub API to restart Space *space_name*.

        Returns True only on HTTP 200. Request errors are logged and return False.
        """
        try:
            import requests
            headers = {"Authorization": f"Bearer {HF_TOKEN}"}
            r = requests.post(
                f"https://huggingface.co/api/spaces/{space_name}/restart",
                headers=headers, timeout=15
            )
            log.info(f" πŸ”„ Restart {space_name}: HTTP {r.status_code}")
            return r.status_code == 200
        except Exception as e:
            log.warning(f" ⚠️ Restart {space_name} failed: {e}")
            return False

    def self_heal(self) -> dict:
        """
        Restart workers whose coordinator heartbeat is more than 30 minutes old.

        Reads ``{COORD_URL}/worker_status`` and compares each worker's
        ``last_seen`` against ``time.time()``, i.e. as epoch seconds. A missing
        or null heartbeat counts as never seen. Stale workers are restarted as
        Space ``gionuibk/HPLL-Worker-<id zero-padded to 2>``. NOTE(review):
        this assumes worker ids are numeric strings.

        Returns:
            ``{"status": "coord_unreachable"}`` on a non-200 coordinator reply.
            Otherwise ``{"healed": [worker ids], "n_healed": int}``.
        """
        healed = []
        try:
            import requests
            r = requests.get(f"{COORD_URL}/worker_status", timeout=10)
            if r.status_code != 200:
                return {"status": "coord_unreachable"}
            workers = r.json().get("workers", {})
            now = time.time()
            for wid, info in workers.items():
                last_seen = info.get("last_seen") or 0
                age_min = (now - last_seen) / 60
                if age_min > 30:  # dead if no heartbeat for 30+ min
                    space = f"gionuibk/HPLL-Worker-{wid.zfill(2)}"
                    log.warning(f" 🚨 Worker {wid} dead ({age_min:.0f}min) β€” restarting {space}")
                    if self._restart_dead_worker(space):
                        healed.append(wid)
        except Exception as e:
            log.warning(f"Self-heal check failed: {e}")
        if healed:
            log.info(f" βœ… Healed {len(healed)} workers: {healed}")
        return {"healed": healed, "n_healed": len(healed)}

    def _load_recent_experiences(self, max_rows: int = 5000) -> pd.DataFrame | None:
        """Download the last 20 experience parquet files and return a row sample.

        "Last" means the last 20 ``experiences/*.parquet`` paths in sorted
        order. NOTE(review): this assumes file names sort chronologically.
        NaNs are filled with 0.0, and up to *max_rows* rows are sampled with a
        fixed seed (42). Returns None if no files were found or anything fails.
        """
        try:
            from huggingface_hub import list_repo_files, hf_hub_download
            files = sorted(
                f for f in list_repo_files(EXPERIENCE_REPO, repo_type="dataset", token=HF_TOKEN)
                if f.startswith("experiences/") and f.endswith(".parquet")
            )[-20:]  # last 20 files
            dfs = []
            for fp in files:
                local = hf_hub_download(repo_id=EXPERIENCE_REPO, filename=fp,
                                        repo_type="dataset", token=HF_TOKEN, cache_dir="/tmp/exp_cache")
                dfs.append(pd.read_parquet(local))
            if not dfs:
                return None
            df = pd.concat(dfs, ignore_index=True).fillna(0.0)
            return df.sample(min(max_rows, len(df)), random_state=42)
        except Exception as e:
            log.warning(f"Load experiences: {e}")
            return None

    def assess_and_act(self) -> dict:
        """
        Run one control-loop iteration: assess, act, self-heal, persist.

        Phases (the first match wins):
          * ``collecting``: fewer than ``min_experiences_for_training``
            experiences.
          * ``training_experts``: fewer than ``target_ready_experts`` ready
            experts. Seeds one ``TRAIN_EXPERT_<strategy>`` task per strategy.
          * ``training_meta``: enough experts and ``meta_ready`` not set.
            Seeds ``TRAIN_META``.
          * ``running``: otherwise. Runs deep research on recent experiences.

        Returns:
            ``{"action", "n_exp", "ready", "n_healed"}``.
        """
        self._state["cycle"] = self._state.get("cycle", 0) + 1
        n_exp = self._count_experiences()
        ready = self._get_ready_experts()
        min_exp = self.THRESHOLDS["min_experiences_for_training"]
        target = self.THRESHOLDS["target_ready_experts"]
        log.info(f"πŸ”„ Cycle {self._state['cycle']} | exp={n_exp:,} | ready_experts={len(ready)}")

        if n_exp < min_exp:
            log.info(f" ⏳ Collecting data ({n_exp:,}/{min_exp:,})")
            action = "collecting"
        elif len(ready) < target:
            # NOTE(review): these tasks are re-seeded on every cycle until enough
            # experts are ready. Check that the coordinator de-duplicates them.
            log.info(f" πŸŽ“ Triggering expert training ({len(ready)}/{target} ready)")
            for strat in self.STRATEGIES:
                self._seed_task(f"TRAIN_EXPERT_{strat}", {"strategy": strat})
            action = "training_experts"
        elif len(ready) >= self.THRESHOLDS["min_experts_for_meta"] and not self._state.get("meta_ready"):
            # NOTE(review): nothing in this file ever sets meta_ready to True, and
            # _save_state overwrites forge_state.json every cycle. TRAIN_META is
            # therefore re-seeded on every cycle, and "running" is unreachable,
            # unless the file already had meta_ready=true at startup.
            log.info(f" 🧠 Triggering Meta training ({len(ready)} experts ready)")
            self._seed_task("TRAIN_META")
            action = "training_meta"
        else:
            log.info(f" πŸ”¬ Running: deep research + self-heal")
            df_exp = self._load_recent_experiences()
            if df_exp is not None and len(df_exp) > 200:
                try:
                    from hypothesis_backtester import run_deep_research_cycle
                    res = run_deep_research_cycle(df_exp)
                    log.info(f" πŸ“Š Research: {res}")
                except Exception as e:
                    log.warning(f" Research failed: {e}")
            action = "running"

        # Self-heal on every cycle, not just in "running" (which may be
        # unreachable, see above). Workers presumably produce the experiences
        # and run the seeded tasks, so dead ones matter in every phase.
        heal = self.self_heal()

        self._state["phase"] = action
        self._state["ready_experts"] = ready
        self._state["n_experiences"] = n_exp
        self._save_state()
        return {"action": action, "n_exp": n_exp, "ready": len(ready),
                "n_healed": heal.get("n_healed", 0)}
def run_forge_controller(interval: int = 900):
    """
    Drive a ForgeController forever, one ``assess_and_act`` cycle per iteration.

    After every cycle, successful or not, the loop sleeps *interval* seconds
    (default 900 s = 15 minutes). The time between cycle starts is therefore
    *interval* plus the cycle's own run time. An exception raised by a cycle
    is logged with its traceback and never stops the loop, so this function
    does not return.
    """
    log.info(f"🏭 ForgeController started | interval={interval}s")
    forge = ForgeController()
    while True:
        try:
            outcome = forge.assess_and_act()
            log.info(f"βœ… Cycle done: {outcome}")
        except Exception as err:
            # logger.exception == logger.error(..., exc_info=True)
            log.exception(f"❌ Forge error: {err}")
        time.sleep(interval)