| |
"""
S11 Experiment Runner — Isolated Evaluation Server
==================================================
Turns S11 from a clone of S10 into a dedicated experiment server.
Agents (Eve, CrewAI, etc.) submit experiments to a Supabase queue.
S11 polls the queue, evaluates in isolation (walk-forward backtest),
and stores results back. S10's population is NEVER touched.

Experiment Types:
- feature_test: Test specific feature mask → evaluate with walk-forward
- model_test: Test specific model_type + hyperparams → evaluate
- calibration_test: Test calibration method on current best features
- config_change: Test GA config by running mini-evolution (5 gens)

Queue: Supabase table `nba_experiments`
    status: pending → running → completed | failed
"""
|
|
| import os |
| import sys |
| import json |
| import time |
| import random |
| import traceback |
| import threading |
| import numpy as np |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Optional, Dict, Any |
|
|
| |
| |
| |
| |
| |
|
|
| def _default_log(msg, level="INFO"): |
| print(f"[{level}] {msg}") |
|
|
# --- References into the app module, populated by init_from_app(). ---
# Defaults are None / safe fallbacks so importing this module never fails
# before initialization.
Individual = None                   # GA individual class (from app)
evaluate = None                     # walk-forward evaluation function (from app)
_build = None                       # model factory: hyperparams dict -> estimator
_prune_correlated_features = None   # drops highly-correlated feature columns
_log_loss_score = None              # log-loss metric helper (from app)
_ece = None                         # expected calibration error helper (from app)
_evaluate_stacking = None           # stacking-model evaluation helper (from app)
load_all_games = None               # loads historical games from disk
build_features = None               # builds (X, y, feature_names) from games
pull_seasons = None                 # downloads/refreshes season data
log = _default_log                  # replaced by the app's logger at init
live = {}                           # shared live-status dict exposed by the app
FAST_EVAL_GAMES = 7000              # game cap used by fast evaluation
# Prefer the mounted /data volume when present (container), else local ./data.
DATA_DIR = Path("/data") if Path("/data").exists() else Path("data")
HIST_DIR = DATA_DIR / "historical"          # raw historical game data
STATE_DIR = DATA_DIR / "evolution-state"    # S10's saved GA state (read here, never written)
RESULTS_DIR = DATA_DIR / "results"          # experiment result artifacts
|
|
def init_from_app():
    """Grab references from the already-loaded app module. Call once at startup.

    S11 shares its code with the main app module (Individual, evaluate,
    feature builders, logger, ...). Rather than importing `app` (which would
    re-run its module-level side effects), this pulls the live objects out of
    sys.modules and rebinds them into this module's globals.

    Raises:
        RuntimeError: if neither 'app' nor '__main__' is in sys.modules.
        AttributeError: if a required symbol is missing from the app module
            (the getattr calls without defaults are intentional fail-fast).
    """
    import sys as _sys
    app_mod = _sys.modules.get('app') or _sys.modules.get('__main__')
    if app_mod is None:
        raise RuntimeError("app module not loaded")

    global Individual, evaluate, _build, _prune_correlated_features
    global _log_loss_score, _ece, _evaluate_stacking
    global load_all_games, build_features, pull_seasons, log, live
    global FAST_EVAL_GAMES, DATA_DIR, HIST_DIR, STATE_DIR, RESULTS_DIR

    # Required symbols — no defaults, so a missing one raises immediately.
    Individual = getattr(app_mod, 'Individual')
    evaluate = getattr(app_mod, 'evaluate')
    _build = getattr(app_mod, '_build')
    _prune_correlated_features = getattr(app_mod, '_prune_correlated_features')
    _log_loss_score = getattr(app_mod, '_log_loss_score')
    _ece = getattr(app_mod, '_ece')
    _evaluate_stacking = getattr(app_mod, '_evaluate_stacking')
    load_all_games = getattr(app_mod, 'load_all_games')
    build_features = getattr(app_mod, 'build_features')
    pull_seasons = getattr(app_mod, 'pull_seasons')
    log = getattr(app_mod, 'log')
    live = getattr(app_mod, 'live')
    # Optional symbols — keep the module-level defaults when absent.
    FAST_EVAL_GAMES = getattr(app_mod, 'FAST_EVAL_GAMES', 7000)
    DATA_DIR = getattr(app_mod, 'DATA_DIR', DATA_DIR)
    HIST_DIR = getattr(app_mod, 'HIST_DIR', HIST_DIR)
    STATE_DIR = getattr(app_mod, 'STATE_DIR', STATE_DIR)
    RESULTS_DIR = getattr(app_mod, 'RESULTS_DIR', RESULTS_DIR)
|
|
| |
# --- Runner tuning knobs ---
POLL_INTERVAL = 60          # seconds between Supabase queue polls
MAX_EVAL_GAMES = 7000       # cap on games used for evaluation (OOM protection)
MINI_EVO_GENS = 5           # default generations for a config_change mini-evolution
MINI_EVO_POP = 20           # default population size for config_change
EXPERIMENT_TIMEOUT = 1800   # seconds; NOTE(review): declared but not enforced anywhere in this file

# Lazily-created psycopg2 connection pool (see _get_pg()).
_pg_pool = None
|
|
|
|
def _get_pg():
    """Lazy PostgreSQL connection pool for Supabase.

    On first use, builds a small SimpleConnectionPool (1-3 connections) from
    the DATABASE_URL env var, validates it with a SELECT 1 round-trip, and
    caches it in the module-level _pg_pool.

    Returns:
        The cached/new pool, or None when DATABASE_URL is unset or the
        connection attempt fails (error is logged, never raised).
    """
    global _pg_pool
    if _pg_pool is not None:
        return _pg_pool
    db_url = os.environ.get("DATABASE_URL", "")
    if not db_url:
        log("[EXPERIMENT] DATABASE_URL not set β cannot connect to Supabase", "ERROR")
        return None
    try:
        import psycopg2
        from psycopg2 import pool as pg_pool
        # Pin search_path so unqualified table names resolve to `public`.
        _pg_pool = pg_pool.SimpleConnectionPool(1, 3, db_url, options="-c search_path=public")
        # Smoke-test one connection before declaring the pool good.
        conn = _pg_pool.getconn()
        with conn.cursor() as cur:
            cur.execute("SELECT 1")
        _pg_pool.putconn(conn)
        log("[EXPERIMENT] PostgreSQL connected to Supabase OK")
        return _pg_pool
    except Exception as e:
        log(f"[EXPERIMENT] PostgreSQL connection failed: {e}", "ERROR")
        _pg_pool = None
        return None
|
|
|
|
def _exec_sql(sql, params=None, fetch=True):
    """Execute SQL on Supabase. Returns rows on SELECT, True on INSERT/UPDATE, None on failure.

    Args:
        sql: SQL statement text (may contain %s placeholders).
        params: optional parameter tuple for the placeholders.
        fetch: when True, attempt cursor.fetchall() after commit; a statement
            without a result set makes fetchall() raise, which is mapped to
            the True sentinel ("executed, nothing to fetch").

    Returns:
        list of row tuples, True (executed without rows / fetch=False),
        or None when the pool is unavailable or the statement failed.
    """
    pool = _get_pg()
    if not pool:
        return None
    conn = None
    try:
        conn = pool.getconn()
        with conn.cursor() as cur:
            cur.execute(sql, params)
            conn.commit()
            if fetch:
                try:
                    # psycopg2 keeps the cursor's result set readable after commit.
                    return cur.fetchall()
                except Exception:
                    # No result set (e.g. UPDATE without RETURNING).
                    return True
            return True
    except Exception as e:
        log(f"[EXPERIMENT] SQL error: {e}", "ERROR")
        if conn:
            try:
                conn.rollback()
            except Exception:
                pass
        return None
    finally:
        # Always hand the connection back to the pool, even on failure.
        if conn and pool:
            try:
                pool.putconn(conn)
            except Exception:
                pass
|
|
|
|
def _reconnect_pg():
    """Drop the cached pool so the next _get_pg() call reconnects.

    Used after repeated SQL errors (e.g. a server-side connection timeout).
    Closing failures are ignored — the pool is discarded either way.
    """
    global _pg_pool
    stale = _pg_pool
    _pg_pool = None
    if stale:
        try:
            stale.closeall()
        except Exception:
            pass
|
|
|
|
| |
| |
| |
|
|
def fetch_next_experiment() -> Optional[Dict[str, Any]]:
    """Fetch AND atomically claim the next pending experiment from Supabase.

    Returns dict with all columns, or None if queue empty.
    Uses CTE with FOR UPDATE SKIP LOCKED to prevent double-pickup.
    """
    # Single statement: pick the highest-priority pending row targeted at S11
    # (or untargeted), lock it, flip it to 'running', and return its columns.
    rows = _exec_sql("""
        WITH next_exp AS (
            SELECT id FROM public.nba_experiments
            WHERE status = 'pending'
            AND (target_space IS NULL OR target_space = 'S11' OR target_space = 'any')
            ORDER BY priority DESC, created_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        UPDATE public.nba_experiments e
        SET status = 'running', started_at = NOW()
        FROM next_exp
        WHERE e.id = next_exp.id
        RETURNING e.id, e.experiment_id, e.agent_name, e.experiment_type,
                  e.description, e.hypothesis, e.params, e.priority,
                  e.status, e.target_space, e.baseline_brier, e.created_at
    """)
    # _exec_sql returns True when there was no result set and None on failure.
    if not rows or rows is True or len(rows) == 0:
        return None
    row = rows[0]
    return {
        "id": row[0],
        "experiment_id": row[1],
        "agent_name": row[2],
        "experiment_type": row[3],
        "description": row[4],
        "hypothesis": row[5],
        # params may arrive as jsonb (already a dict) or as a JSON string.
        "params": row[6] if isinstance(row[6], dict) else json.loads(row[6]) if row[6] else {},
        "priority": row[7],
        "status": row[8],
        "target_space": row[9],
        "baseline_brier": row[10],
        "created_at": str(row[11]) if row[11] else None,
    }
|
|
|
|
def claim_experiment(exp_id: int) -> bool:
    """No-op kept for backward compatibility.

    Claiming now happens atomically inside fetch_next_experiment()
    (FOR UPDATE SKIP LOCKED), so this always reports success.
    """
    return True
|
|
|
|
| def _sanitize_for_json(obj): |
| """Convert numpy types to native Python for JSON serialization.""" |
| if isinstance(obj, dict): |
| return {k: _sanitize_for_json(v) for k, v in obj.items()} |
| if isinstance(obj, (list, tuple)): |
| return [_sanitize_for_json(v) for v in obj] |
| if hasattr(obj, 'item'): |
| return obj.item() |
| return obj |
|
|
|
|
def complete_experiment(exp_id: int, brier: float, accuracy: float,
                        log_loss_val: float, details: dict, status: str = "completed"):
    """Write results back to Supabase.

    Args:
        exp_id: numeric primary key of the experiment row.
        brier / accuracy / log_loss_val: headline metrics for the result columns.
        details: full result dict, stored as JSON in result_details.
        status: final row status (default "completed").
    """
    # Strip numpy scalar/array types so json.dumps cannot fail on them.
    clean_details = _sanitize_for_json(details)
    _exec_sql("""
        UPDATE public.nba_experiments
        SET status = %s,
            result_brier = %s,
            result_accuracy = %s,
            result_log_loss = %s,
            result_details = %s,
            feature_engine_version = %s,
            completed_at = NOW()
        WHERE id = %s
    """, (status, float(brier), float(accuracy), float(log_loss_val),
          json.dumps(clean_details), "v3.0-35cat-6000feat", int(exp_id)), fetch=False)
|
|
|
|
def fail_experiment(exp_id: int, error_msg: str):
    """Mark experiment as failed with error details.

    The error text is truncated to 2000 characters and stored, together with
    a UTC failure timestamp, in the row's result_details JSON column.
    """
    details = {"error": error_msg[:2000], "failed_at": datetime.now(timezone.utc).isoformat()}
    _exec_sql("""
        UPDATE public.nba_experiments
        SET status = 'failed',
            result_details = %s,
            completed_at = NOW()
        WHERE id = %s
    """, (json.dumps(details), exp_id), fetch=False)
|
|
|
|
| |
| |
| |
|
|
def _make_individual(n_features: int, params: dict) -> Individual:
    """Create an Individual from experiment params.

    params can contain:
    - features: list of ints (feature mask) or list of feature indices
    - feature_indices: list of ints (indices to enable)
    - hyperparams: dict of hyperparams (merged with defaults)
    - model_type: str (shortcut for hyperparams.model_type)
    - calibration: str (shortcut for hyperparams.calibration)
    """
    ind = Individual(n_features, target=params.get("target_features", 80))

    if "features" in params:
        feat = params["features"]
        if len(feat) == n_features:
            # Full-length list: treat it as a 0/1 mask.
            ind.features = [int(f) for f in feat]
        else:
            # Shorter (or longer) list: treat entries as indices to enable;
            # out-of-range indices are silently ignored.
            ind.features = [0] * n_features
            for idx in feat:
                if 0 <= idx < n_features:
                    ind.features[idx] = 1

    # feature_indices takes precedence over features when both are given.
    if "feature_indices" in params:
        ind.features = [0] * n_features
        for idx in params["feature_indices"]:
            if 0 <= idx < n_features:
                ind.features[idx] = 1

    # Merge hyperparams, but only for keys the Individual already defines.
    if "hyperparams" in params:
        for k, v in params["hyperparams"].items():
            if k in ind.hyperparams:
                ind.hyperparams[k] = v

    # Shortcut keys override anything set via the hyperparams dict above.
    if "model_type" in params:
        ind.hyperparams["model_type"] = params["model_type"]
    if "calibration" in params:
        ind.hyperparams["calibration"] = params["calibration"]

    # Keep the cached selected-feature count in sync with the mask.
    ind.n_features = sum(ind.features)
    return ind
|
|
|
|
def run_feature_test(experiment: dict, X: np.ndarray, y: np.ndarray,
                     feature_names: list) -> dict:
    """Test specific feature configuration with walk-forward evaluation.

    params should contain:
    - features or feature_indices: which features to enable
    - hyperparams (optional): model hyperparams
    - model_type (optional): which model to use (default: xgboost)
    - n_splits (optional): number of walk-forward splits (default: 3)

    Returns a dict of fitness metrics plus the configuration evaluated.
    """
    params = experiment["params"]
    n_features = X.shape[1]

    ind = _make_individual(n_features, params)

    # No explicit feature selection in params: fall back to S10's saved best
    # feature mask, so only the model/hyperparams are being varied.
    if "features" not in params and "feature_indices" not in params:
        best = _load_best_individual(n_features)
        if best:
            ind.features = list(best.features)
            ind.n_features = sum(ind.features)

    n_splits = params.get("n_splits", 3)
    fast = params.get("fast", False)

    log(f"[EXPERIMENT] feature_test: {ind.n_features} features, model={ind.hyperparams['model_type']}, splits={n_splits}")

    # evaluate() fills ind.fitness in place.
    evaluate(ind, X, y, n_splits=n_splits, fast=fast)

    return {
        "brier": ind.fitness.get("brier", 1.0),
        "roi": ind.fitness.get("roi", 0.0),
        "sharpe": ind.fitness.get("sharpe", 0.0),
        "calibration": ind.fitness.get("calibration", 1.0),
        "composite": ind.fitness.get("composite", 0.0),
        "features_pruned": ind.fitness.get("features_pruned", 0),
        "n_features_selected": ind.n_features,
        "model_type": ind.hyperparams["model_type"],
        "hyperparams": ind.hyperparams,
    }
|
|
|
|
def run_model_test(experiment: dict, X: np.ndarray, y: np.ndarray,
                   feature_names: list) -> dict:
    """Test specific model type and hyperparams.

    params should contain:
    - model_type: str (xgboost, lightgbm, catboost, random_forest, extra_trees, stacking, mlp)
    - hyperparams (optional): full or partial hyperparams dict
    - features or feature_indices (optional): use specific features

    Raises:
        ValueError: if params lacks the required 'model_type'.
    """
    params = experiment["params"]
    n_features = X.shape[1]

    # Default to S10's saved best feature mask unless the experiment pins
    # its own features.
    best = _load_best_individual(n_features)
    ind = _make_individual(n_features, params)

    if best and "features" not in params and "feature_indices" not in params:
        ind.features = list(best.features)
        ind.n_features = sum(ind.features)

    if "model_type" not in params:
        raise ValueError("model_test requires 'model_type' in params")

    ind.hyperparams["model_type"] = params["model_type"]
    n_splits = params.get("n_splits", 3)
    fast = params.get("fast", False)

    log(f"[EXPERIMENT] model_test: model={ind.hyperparams['model_type']}, "
        f"{ind.n_features} features, splits={n_splits}")

    # evaluate() fills ind.fitness in place.
    evaluate(ind, X, y, n_splits=n_splits, fast=fast)

    return {
        "brier": ind.fitness.get("brier", 1.0),
        "roi": ind.fitness.get("roi", 0.0),
        "sharpe": ind.fitness.get("sharpe", 0.0),
        "calibration": ind.fitness.get("calibration", 1.0),
        "composite": ind.fitness.get("composite", 0.0),
        "features_pruned": ind.fitness.get("features_pruned", 0),
        "n_features_selected": ind.n_features,
        "model_type": ind.hyperparams["model_type"],
        "hyperparams": ind.hyperparams,
    }
|
|
|
|
def run_calibration_test(experiment: dict, X: np.ndarray, y: np.ndarray,
                         feature_names: list) -> dict:
    """Test calibration method on current best features.

    params should contain:
    - calibration: str (isotonic, sigmoid, none), or "all" to compare all three
    - model_type (optional): override model type

    Raises:
        ValueError: if S10's saved best individual cannot be loaded (there is
            no baseline feature set to calibrate against).
    """
    params = experiment["params"]
    n_features = X.shape[1]

    best = _load_best_individual(n_features)
    if not best:
        raise ValueError("No best individual found β cannot run calibration_test without baseline features")

    calibration = params.get("calibration", "isotonic")
    methods_to_test = ["isotonic", "sigmoid", "none"] if calibration == "all" else [calibration]

    results = {}
    best_brier = 1.0
    # NOTE(review): stays None if no method scores a brier below 1.0.
    best_method = None

    for method in methods_to_test:
        # Clone the saved best by hand (bypasses __init__ and whatever
        # randomized setup it performs) so the feature mask is preserved.
        ind = Individual.__new__(Individual)
        ind.features = list(best.features)
        ind.hyperparams = dict(best.hyperparams)
        ind.fitness = {"brier": 1.0, "roi": 0.0, "sharpe": 0.0, "calibration": 1.0, "composite": 0.0}
        ind.generation = 0
        ind.n_features = sum(ind.features)
        ind.hyperparams["calibration"] = method

        if "model_type" in params:
            ind.hyperparams["model_type"] = params["model_type"]

        n_splits = params.get("n_splits", 3)
        log(f"[EXPERIMENT] calibration_test: method={method}, model={ind.hyperparams['model_type']}")

        # Full (non-fast) evaluation: calibration differences can be subtle.
        evaluate(ind, X, y, n_splits=n_splits, fast=False)

        results[method] = {
            "brier": ind.fitness.get("brier", 1.0),
            "roi": ind.fitness.get("roi", 0.0),
            "sharpe": ind.fitness.get("sharpe", 0.0),
            "calibration": ind.fitness.get("calibration", 1.0),
            "composite": ind.fitness.get("composite", 0.0),
        }

        # Track the best (lowest-brier) method seen so far.
        if ind.fitness.get("brier", 1.0) < best_brier:
            best_brier = ind.fitness["brier"]
            best_method = method

    return {
        "brier": best_brier,
        "best_method": best_method,
        "all_results": results,
        "n_features_selected": best.n_features,
        "model_type": best.hyperparams.get("model_type", "unknown"),
    }
|
|
|
|
def run_config_change(experiment: dict, X: np.ndarray, y: np.ndarray,
                      feature_names: list) -> dict:
    """Test GA config change by running a mini-evolution.

    params should contain any of:
    - pop_size: int (default 20)
    - mutation_rate: float
    - crossover_rate: float
    - tournament_size: int
    - target_features: int
    - n_generations: int (default 5)
    - elite_size: int

    Runs a small, capped GA with the proposed configuration and reports the
    best fitness reached plus a per-generation history.
    """
    params = experiment["params"]
    n_features = X.shape[1]

    # Cap sizes so a single experiment cannot monopolize the box.
    pop_size = min(params.get("pop_size", MINI_EVO_POP), 30)
    n_gens = min(params.get("n_generations", MINI_EVO_GENS), 10)
    mutation_rate = params.get("mutation_rate", 0.04)
    crossover_rate = params.get("crossover_rate", 0.80)
    # NOTE(review): a submitted pop_size <= 1 would make tournament_size 0
    # and break the selection below — assumed not to occur; confirm upstream
    # validation if agents can submit arbitrary values.
    tournament_size = min(params.get("tournament_size", 4), pop_size // 2)
    target_features = params.get("target_features", 80)
    elite_size = min(params.get("elite_size", 3), pop_size // 3)

    log(f"[EXPERIMENT] config_change: pop={pop_size}, gens={n_gens}, "
        f"mut={mutation_rate}, cx={crossover_rate}, target_feat={target_features}")

    # Seed a fresh population, cycling through every supported model type.
    population = []
    model_types = ["xgboost", "lightgbm", "catboost", "random_forest", "extra_trees", "stacking", "mlp"]
    for i in range(pop_size):
        ind = Individual(n_features, target=target_features)
        ind.hyperparams["model_type"] = model_types[i % len(model_types)]
        population.append(ind)

    # Inject S10's saved best as a strong starting point (the state file
    # itself is only read, never written).
    best = _load_best_individual(n_features)
    if best and len(population) > 0:
        population[0] = best

    history = []
    best_ever_brier = 1.0
    best_ever_composite = 0.0

    for gen in range(n_gens):
        # Cheap evaluation inside the loop (2 splits, fast mode) for speed.
        for ind in population:
            evaluate(ind, X, y, n_splits=2, fast=True)

        # Rank by composite fitness, best first.
        population.sort(key=lambda x: x.fitness.get("composite", 0), reverse=True)
        gen_best = population[0]

        gen_brier = gen_best.fitness.get("brier", 1.0)
        gen_composite = gen_best.fitness.get("composite", 0.0)
        if gen_brier < best_ever_brier:
            best_ever_brier = gen_brier
        if gen_composite > best_ever_composite:
            best_ever_composite = gen_composite

        history.append({
            "generation": gen,
            "best_brier": gen_brier,
            "best_composite": gen_composite,
            "avg_brier": round(np.mean([p.fitness.get("brier", 1.0) for p in population]), 5),
            "avg_composite": round(np.mean([p.fitness.get("composite", 0) for p in population]), 5),
        })

        log(f"[EXPERIMENT] config_change gen {gen}: best_brier={gen_brier:.4f}, "
            f"composite={gen_composite:.4f}")

        # Elitism: carry the top individuals forward unchanged.
        elites = population[:elite_size]
        new_pop = list(elites)

        while len(new_pop) < pop_size:
            # Tournament selection for each parent.
            candidates = random.sample(population, min(tournament_size, len(population)))
            p1 = max(candidates, key=lambda x: x.fitness.get("composite", 0))
            candidates = random.sample(population, min(tournament_size, len(population)))
            p2 = max(candidates, key=lambda x: x.fitness.get("composite", 0))

            if random.random() < crossover_rate:
                child = Individual.crossover(p1, p2)
            else:
                # No crossover: inject a brand-new random individual instead.
                child = Individual(n_features, target=target_features)

            child.mutate(rate=mutation_rate)
            new_pop.append(child)

        population = new_pop[:pop_size]

    # Final, thorough (non-fast, 3-split) evaluation of the winner.
    final_best = max(population, key=lambda x: x.fitness.get("composite", 0))
    evaluate(final_best, X, y, n_splits=3, fast=False)

    return {
        "brier": final_best.fitness.get("brier", 1.0),
        "roi": final_best.fitness.get("roi", 0.0),
        "sharpe": final_best.fitness.get("sharpe", 0.0),
        "calibration": final_best.fitness.get("calibration", 1.0),
        "composite": final_best.fitness.get("composite", 0.0),
        "best_ever_brier": best_ever_brier,
        "best_ever_composite": best_ever_composite,
        "n_generations": n_gens,
        "pop_size": pop_size,
        "mutation_rate": mutation_rate,
        "crossover_rate": crossover_rate,
        "target_features": target_features,
        "history": history,
        "final_model_type": final_best.hyperparams.get("model_type", "unknown"),
        "final_n_features": final_best.n_features,
    }
|
|
|
|
| |
| |
| |
|
|
def _load_best_individual(n_features: int) -> Optional[Individual]:
    """Load the best individual from S10's saved state (read-only, never modifies).

    Returns None when the state file is missing, has no best_ever entry, or
    cannot be parsed. The stored feature mask is padded/truncated to the
    current n_features, since the feature engine may have changed size after
    the state was saved.
    """
    state_file = STATE_DIR / "population.json"
    if not state_file.exists():
        return None
    try:
        st = json.loads(state_file.read_text())
        if not st.get("best_ever"):
            return None
        be = st["best_ever"]
        # Hydrate fields directly from the saved dict, bypassing __init__.
        ind = Individual.__new__(Individual)
        ind.features = be["features"]
        ind.hyperparams = be["hyperparams"]
        ind.fitness = be["fitness"]
        ind.generation = be.get("generation", 0)
        ind.n_features = sum(ind.features)

        # Reconcile the mask length with the current feature count.
        if len(ind.features) < n_features:
            ind.features.extend([0] * (n_features - len(ind.features)))
        elif len(ind.features) > n_features:
            ind.features = ind.features[:n_features]
        ind.n_features = sum(ind.features)

        return ind
    except Exception as e:
        log(f"[EXPERIMENT] Failed to load best individual: {e}", "WARN")
        return None
|
|
|
|
def _compute_accuracy(ind: Individual, X: np.ndarray, y: np.ndarray) -> float:
    """Compute accuracy for an individual (separate from evaluate's fitness).

    Runs a 3-split time-series CV using the individual's model and
    calibration settings and returns the mean fold accuracy rounded to 4 dp.
    Best-effort by design: returns 0.0 whenever accuracy cannot be computed
    (too few selected features, model build failure, or any exception) so
    this metric can never crash an experiment run.
    """
    try:
        from sklearn.model_selection import TimeSeriesSplit
        from sklearn.metrics import accuracy_score
        from sklearn.base import clone
        from sklearn.calibration import CalibratedClassifierCV

        selected = ind.selected_indices()
        # Too few features to produce a meaningful score.
        if len(selected) < 15:
            return 0.0

        # Clean NaN/inf values, then drop near-duplicate columns.
        X_sub = np.nan_to_num(X[:, selected], nan=0.0, posinf=1e6, neginf=-1e6)
        X_sub, _ = _prune_correlated_features(X_sub, threshold=0.95)

        hp = ind.hyperparams
        model = _build(hp)
        if model is None:
            return 0.0

        tscv = TimeSeriesSplit(n_splits=3)
        accs = []
        for ti, vi in tscv.split(X_sub):
            try:
                m = clone(model)
                # Wrap in a calibrator when the individual requests one.
                if hp.get("calibration", "none") != "none":
                    m = CalibratedClassifierCV(m, method=hp["calibration"], cv=2)
                m.fit(X_sub[ti], y[ti])
                preds = m.predict(X_sub[vi])
                accs.append(accuracy_score(y[vi], preds))
            except Exception:
                # Skip folds that fail; average over the rest.
                pass
        return round(float(np.mean(accs)), 4) if accs else 0.0
    except Exception:
        return 0.0
|
|
|
|
| |
| |
| |
|
|
| |
# --- Runner state (module-level, surfaced by the /api/experiment/status endpoint) ---
_current_experiment = None    # experiment dict while one is running, else None
_queue_depth = 0              # pending count, sampled when an experiment is picked up
_experiments_completed = 0    # successful runs since process start
_experiments_failed = 0       # failed runs since process start
_last_result = None           # results dict of the most recent completed experiment

# Dispatch table: experiment_type -> executor function.
EXECUTORS = {
    "feature_test": run_feature_test,
    "model_test": run_model_test,
    "calibration_test": run_calibration_test,
    "config_change": run_config_change,
}
|
|
|
|
def run_experiment(experiment: dict, X: np.ndarray, y: np.ndarray,
                   feature_names: list) -> dict:
    """Route an experiment to the correct executor and return results.

    Looks up the executor by experiment_type, runs it, augments the result
    dict with timing/metadata, writes results (or the failure) back to
    Supabase, and maintains the module-level runner counters.

    Raises:
        ValueError: unknown experiment_type (row is marked failed first).
        RuntimeError: experiment could not be claimed (legacy path).
        Exception: re-raised from the executor after marking the row failed.
    """
    global _current_experiment, _experiments_completed, _experiments_failed, _last_result

    exp_type = experiment["experiment_type"]
    exp_id = experiment["id"]
    _current_experiment = experiment

    log(f"[EXPERIMENT] === Starting: {experiment['experiment_id']} ===")
    log(f"[EXPERIMENT] Type: {exp_type} | Agent: {experiment['agent_name']} | Priority: {experiment['priority']}")
    log(f"[EXPERIMENT] Description: {experiment['description'][:200]}")

    executor = EXECUTORS.get(exp_type)
    if not executor:
        error_msg = f"Unknown experiment type: {exp_type}. Valid: {list(EXECUTORS.keys())}"
        fail_experiment(exp_id, error_msg)
        _experiments_failed += 1
        _current_experiment = None
        raise ValueError(error_msg)

    # Legacy no-op: claiming now happens inside fetch_next_experiment().
    if not claim_experiment(exp_id):
        _current_experiment = None
        raise RuntimeError(f"Experiment {exp_id} already claimed by another runner")

    start_time = time.time()
    try:
        results = executor(experiment, X, y, feature_names)
        elapsed = time.time() - start_time

        brier = float(results.get("brier", 1.0))
        # NOTE(review): accuracy is recomputed on a *fresh* individual built
        # straight from params — for executors that substituted the saved
        # best feature mask, this measures a different feature set than the
        # reported brier. Confirm this is intentional.
        accuracy = float(_compute_accuracy(
            _make_individual(X.shape[1], experiment["params"]), X, y
        )) if exp_type != "config_change" else float(results.get("accuracy", 0.0))

        # Executors currently do not populate "log_loss"; defaults to 0.0.
        log_loss_val = float(results.get("log_loss", 0.0))

        results["elapsed_seconds"] = round(elapsed, 1)
        results["games_evaluated"] = min(X.shape[0], MAX_EVAL_GAMES)
        results["feature_candidates"] = X.shape[1]
        results["experiment_id"] = experiment["experiment_id"]
        results["agent_name"] = experiment["agent_name"]

        # NOTE(review): truthiness check skips a 0.0 baseline as well as None.
        baseline = experiment.get("baseline_brier")
        if baseline:
            results["improvement"] = round(baseline - brier, 5)
            results["improved"] = brier < baseline

        complete_experiment(exp_id, brier, accuracy, log_loss_val, results)
        _experiments_completed += 1
        _last_result = results

        log(f"[EXPERIMENT] === Completed: {experiment['experiment_id']} ===")
        log(f"[EXPERIMENT] Brier: {brier:.4f} | Elapsed: {elapsed:.1f}s")
        if baseline:
            delta = baseline - brier
            log(f"[EXPERIMENT] vs baseline {baseline:.4f}: {'BETTER' if delta > 0 else 'WORSE'} by {abs(delta):.4f}")

        return results

    except Exception as e:
        elapsed = time.time() - start_time
        # Persist a truncated error plus traceback tail so agents can inspect it.
        error_msg = f"{str(e)[:500]}\n{traceback.format_exc()[-1000:]}"
        fail_experiment(exp_id, error_msg)
        _experiments_failed += 1
        log(f"[EXPERIMENT] === FAILED: {experiment['experiment_id']} ({elapsed:.1f}s) ===", "ERROR")
        log(f"[EXPERIMENT] Error: {str(e)[:300]}", "ERROR")
        raise
    finally:
        _current_experiment = None
|
|
|
|
| |
| |
| |
|
|
def experiment_loop():
    """Main experiment polling loop — runs in background thread on S11.

    1. Load game data + build features (same as evolution_loop init)
    2. Poll Supabase every POLL_INTERVAL seconds for pending experiments
    3. Execute one at a time, write results back
    4. Never modifies S10's population or state
    """
    global _queue_depth

    # Bind app-module symbols (Individual, evaluate, log, ...) into globals.
    init_from_app()

    log("=" * 60)
    log("S11 EXPERIMENT RUNNER β STARTING")
    log("=" * 60)

    # --- Data load (same pipeline the evolution loop uses) ---
    live["status"] = "EXPERIMENT: LOADING DATA"
    pull_seasons()
    games = load_all_games()
    live["games"] = len(games)
    log(f"[EXPERIMENT] Games loaded: {len(games)}")

    if len(games) < 500:
        log("[EXPERIMENT] NOT ENOUGH GAMES β aborting", "ERROR")
        live["status"] = "EXPERIMENT: ERROR (no data)"
        return

    live["status"] = "EXPERIMENT: BUILDING FEATURES"
    X, y, feature_names = build_features(games)
    log(f"[EXPERIMENT] Raw feature matrix: {X.shape}")

    # Drop constant (near-zero-variance) columns — they carry no signal.
    variances = np.var(X, axis=0)
    valid_mask = variances > 1e-10
    n_removed = int((~valid_mask).sum())
    if n_removed > 0:
        X = X[:, valid_mask]
        feature_names = [f for f, v in zip(feature_names, valid_mask) if v]
        log(f"[EXPERIMENT] Noise filter: removed {n_removed} zero-variance features")

    n_feat = X.shape[1]
    live["feature_candidates"] = n_feat
    log(f"[EXPERIMENT] Clean feature matrix: {X.shape} ({n_feat} usable features)")

    # Keep only the most recent games to bound memory usage.
    if X.shape[0] > MAX_EVAL_GAMES:
        log(f"[EXPERIMENT] Capping to {MAX_EVAL_GAMES} most recent games (OOM protection)")
        X = X[-MAX_EVAL_GAMES:]
        y = y[-MAX_EVAL_GAMES:]

    # --- Poll loop ---
    live["status"] = "EXPERIMENT: READY (polling)"
    log(f"[EXPERIMENT] Ready β polling every {POLL_INTERVAL}s for experiments")
    consecutive_errors = 0

    while True:
        try:
            experiment = fetch_next_experiment()

            if experiment:
                _queue_depth = _count_pending()
                live["status"] = f"EXPERIMENT: RUNNING ({experiment['experiment_type']})"
                log(f"[EXPERIMENT] Found experiment: {experiment['experiment_id']} "
                    f"(type={experiment['experiment_type']}, queue={_queue_depth})")

                try:
                    run_experiment(experiment, X, y, feature_names)
                except Exception as e:
                    # run_experiment already marked the row failed; keep polling.
                    log(f"[EXPERIMENT] Experiment failed: {e}", "ERROR")

                live["status"] = "EXPERIMENT: READY (polling)"
                consecutive_errors = 0

                # Refresh game data every 10 completed+failed experiments,
                # only adopting the new matrix when more games are available.
                if (_experiments_completed + _experiments_failed) % 10 == 0:
                    try:
                        log("[EXPERIMENT] Refreshing game data...")
                        new_games = load_all_games()
                        if len(new_games) > len(games):
                            games = new_games
                            X_new, y_new, fn_new = build_features(games)
                            # Re-apply the same zero-variance filter and cap.
                            variances = np.var(X_new, axis=0)
                            valid_mask = variances > 1e-10
                            X_new = X_new[:, valid_mask]
                            fn_new = [f for f, v in zip(fn_new, valid_mask) if v]
                            if X_new.shape[0] > MAX_EVAL_GAMES:
                                X_new = X_new[-MAX_EVAL_GAMES:]
                                y_new = y_new[-MAX_EVAL_GAMES:]
                            X, y, feature_names = X_new, y_new, fn_new
                            n_feat = X.shape[1]
                            log(f"[EXPERIMENT] Data refreshed: {X.shape}")
                    except Exception as e:
                        log(f"[EXPERIMENT] Data refresh failed (continuing with old): {e}", "WARN")

            else:
                # Queue empty: wait before polling again.
                time.sleep(POLL_INTERVAL)

        except Exception as e:
            consecutive_errors += 1
            log(f"[EXPERIMENT] Poll error #{consecutive_errors}: {e}", "ERROR")

            # Likely a dead DB connection: rebuild the pool and back off.
            if consecutive_errors >= 5:
                log("[EXPERIMENT] 5 consecutive errors β reconnecting to Supabase", "WARN")
                _reconnect_pg()
                consecutive_errors = 0

            time.sleep(POLL_INTERVAL * 2)
|
|
|
|
def _count_pending() -> int:
    """Number of experiments currently pending for S11 in the Supabase queue."""
    result = _exec_sql("""
        SELECT COUNT(*) FROM public.nba_experiments
        WHERE status = 'pending'
        AND (target_space IS NULL OR target_space = 'S11' OR target_space = 'any')
    """)
    # _exec_sql yields None on failure, True when no result set came back,
    # or a list of rows; only a non-empty row list carries a count.
    if not result or result is True:
        return 0
    return result[0][0]
|
|
|
|
| |
| |
| |
|
|
def register_experiment_endpoints(api):
    """Register experiment-related FastAPI endpoints on the control_api.

    Adds:
        GET  /api/experiment/status  — runner status + current/last experiment
        POST /api/experiment/run     — insert an experiment (picked up on next poll)
        POST /api/experiment/submit  — queue an experiment (non-blocking)
        GET  /api/experiment/results — the 20 most recent experiment rows
    """
    from fastapi import Request
    from fastapi.responses import JSONResponse

    @api.get("/api/experiment/status")
    async def experiment_status():
        """Current experiment runner status."""
        return JSONResponse({
            "mode": "experiment_runner",
            "status": live.get("status", "unknown"),
            "current_experiment": {
                "experiment_id": _current_experiment["experiment_id"],
                "type": _current_experiment["experiment_type"],
                "agent": _current_experiment["agent_name"],
                "description": _current_experiment["description"][:200],
            } if _current_experiment else None,
            "queue_depth": _queue_depth,
            "experiments_completed": _experiments_completed,
            "experiments_failed": _experiments_failed,
            "last_result": {
                "experiment_id": _last_result.get("experiment_id"),
                "brier": _last_result.get("brier"),
                "improvement": _last_result.get("improvement"),
                "elapsed_seconds": _last_result.get("elapsed_seconds"),
            } if _last_result else None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    @api.post("/api/experiment/run")
    async def experiment_run_direct(request: Request):
        """Submit and immediately run an experiment (bypasses queue).

        Body: {
            "experiment_type": "feature_test|model_test|calibration_test|config_change",
            "description": "...",
            "params": { ... },
            "agent_name": "direct_api"
        }

        NOTE(review): despite the name, this no longer runs synchronously —
        it inserts a 'pending' row that the poll loop picks up.
        """
        try:
            body = await request.json()

            exp_type = body.get("experiment_type")
            if exp_type not in EXECUTORS:
                return JSONResponse(
                    {"error": f"Invalid experiment_type. Valid: {list(EXECUTORS.keys())}"},
                    status_code=400
                )

            # Insert as pending with an elevated default priority (10).
            exp_id_str = f"direct-{int(time.time())}"
            _exec_sql("""
                INSERT INTO public.nba_experiments
                (experiment_id, agent_name, experiment_type, description, hypothesis,
                 params, priority, status, target_space, feature_engine_version)
                VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', 'S11', %s)
                RETURNING id
            """, (
                exp_id_str,
                body.get("agent_name", "direct_api"),
                exp_type,
                body.get("description", "Direct API submission"),
                body.get("hypothesis", ""),
                json.dumps(body.get("params", {})),
                body.get("priority", 10),
                "v3.0-35cat-6000feat",
            ))

            # Read the row back to confirm the insert landed.
            rows = _exec_sql("""
                SELECT id, experiment_id, agent_name, experiment_type, description,
                       hypothesis, params, priority, status, target_space,
                       baseline_brier, created_at
                FROM public.nba_experiments
                WHERE experiment_id = %s
                ORDER BY id DESC LIMIT 1
            """, (exp_id_str,))

            if not rows or rows is True:
                return JSONResponse({"error": "Failed to insert experiment"}, status_code=500)

            row = rows[0]
            # NOTE(review): this dict is built but only the insert confirmation
            # is returned below — presumably a remnant of a synchronous path.
            experiment = {
                "id": row[0], "experiment_id": row[1], "agent_name": row[2],
                "experiment_type": row[3], "description": row[4], "hypothesis": row[5],
                "params": row[6] if isinstance(row[6], dict) else json.loads(row[6]) if row[6] else {},
                "priority": row[7], "status": row[8], "target_space": row[9],
                "baseline_brier": row[10], "created_at": str(row[11]) if row[11] else None,
            }

            return JSONResponse({
                "status": "queued_for_poll",
                "experiment_id": exp_id_str,
                "message": "Experiment inserted into queue. S11 will pick it up on next poll cycle.",
            })

        except Exception as e:
            return JSONResponse({"error": str(e)[:500]}, status_code=500)

    @api.post("/api/experiment/submit")
    async def experiment_submit(request: Request):
        """Submit an experiment to the Supabase queue (non-blocking).

        Body: {
            "experiment_id": "optional-custom-id",
            "agent_name": "eve|crew-research|crew-feature|manual",
            "experiment_type": "feature_test|model_test|calibration_test|config_change",
            "description": "What this experiment tests",
            "hypothesis": "Expected outcome",
            "params": { ... },
            "priority": 5,
            "baseline_brier": 0.2205
        }
        """
        try:
            body = await request.json()

            exp_type = body.get("experiment_type")
            if exp_type not in EXECUTORS:
                return JSONResponse(
                    {"error": f"Invalid experiment_type. Valid: {list(EXECUTORS.keys())}"},
                    status_code=400
                )

            if not body.get("description"):
                return JSONResponse({"error": "description is required"}, status_code=400)

            exp_id_str = body.get("experiment_id", f"submit-{int(time.time())}")

            result = _exec_sql("""
                INSERT INTO public.nba_experiments
                (experiment_id, agent_name, experiment_type, description, hypothesis,
                 params, priority, status, target_space, baseline_brier, feature_engine_version)
                VALUES (%s, %s, %s, %s, %s, %s, %s, 'pending', %s, %s, %s)
                RETURNING id
            """, (
                exp_id_str,
                body.get("agent_name", "unknown"),
                exp_type,
                body["description"],
                body.get("hypothesis", ""),
                json.dumps(body.get("params", {})),
                body.get("priority", 5),
                body.get("target_space", "S11"),
                body.get("baseline_brier"),
                body.get("feature_engine_version", "v3.0-35cat-6000feat"),
            ))

            if result is None:
                return JSONResponse({"error": "Failed to insert β Supabase error"}, status_code=500)

            # RETURNING id comes back as [(id,)] on success.
            db_id = result[0][0] if result and result is not True else None

            return JSONResponse({
                "status": "queued",
                "id": db_id,
                "experiment_id": exp_id_str,
                "message": f"Experiment queued. S11 polls every {POLL_INTERVAL}s.",
            })

        except Exception as e:
            return JSONResponse({"error": str(e)[:500]}, status_code=500)

    @api.get("/api/experiment/results")
    async def experiment_results():
        """Fetch recent experiment results from Supabase."""
        rows = _exec_sql("""
            SELECT experiment_id, agent_name, experiment_type, status,
                   result_brier, result_accuracy, result_details,
                   created_at, completed_at
            FROM public.nba_experiments
            ORDER BY id DESC
            LIMIT 20
        """)
        if not rows or rows is True:
            return JSONResponse({"experiments": [], "count": 0})

        experiments = []
        for row in rows:
            experiments.append({
                "experiment_id": row[0],
                "agent_name": row[1],
                "experiment_type": row[2],
                "status": row[3],
                "result_brier": row[4],
                "result_accuracy": row[5],
                "result_details": row[6],
                "created_at": str(row[7]) if row[7] else None,
                "completed_at": str(row[8]) if row[8] else None,
            })

        return JSONResponse({"experiments": experiments, "count": len(experiments)})

    log("[EXPERIMENT] FastAPI endpoints registered: /api/experiment/{status,run,submit,results}")
|
|