| """
|
| evaluator.py — Benchmark & Statistical Analysis Pipeline (DAHS_2)
|
|
|
| Port from DAHS_1 evaluator.py + extensions:
|
| - 300 test seeds (99000-99299) × 9 methods
|
| - Statistical tests: Friedman, Nemenyi, Wilcoxon, Cohen's d, Bootstrap CI
|
| - NEW: Switching analysis (evaluations, switches, hysteresis rate, distribution)
|
| - NEW: JSON export for frontend Results page
|
| - 11 dark-theme plots
|
|
|
| Statistical Methodology References
|
| -----------------------------------
|
| - Friedman non-parametric test for k ≥ 3 related samples:
|
| Friedman, M. (1940). A comparison of alternative tests of significance
|
| for the problem of m rankings. Annals of Mathematical Statistics, 11(1), 86-92.
|
| Recommended protocol for ML comparison:
|
| Demsar, J. (2006). Statistical comparisons of classifiers over multiple
|
| data sets. Journal of Machine Learning Research, 7, 1-30.
|
|
|
| - Nemenyi post-hoc pairwise test (Critical Difference diagram):
|
| Nemenyi, P. (1963). Distribution-free multiple comparisons.
|
| PhD thesis, Princeton University.
|
| Applied per: Demsar (2006), JMLR 7:1-30.
|
|
|
| - Wilcoxon signed-rank test (pairwise DAHS vs each baseline):
|
| Wilcoxon, F. (1945). Individual comparisons by ranking methods.
|
| Biometrics Bulletin, 1(6), 80-83. doi:10.2307/3001968.
|
|
|
| - Cohen's d effect size:
|
| Cohen, J. (1988). Statistical Power Analysis for the Behavioral
|
| Sciences. Lawrence Erlbaum Associates (2nd ed.).
|
| d > 0.2 small, d > 0.5 medium, d > 0.8 large.
|
|
|
| - Holm-Bonferroni multiple comparison correction:
|
| Holm, S. (1979). A simple sequentially rejective multiple test
|
| procedure. Scandinavian Journal of Statistics, 6(2), 65-70.
|
|
|
| - Bootstrap 95% CI (5,000 resamples):
|
| Efron, B. & Tibshirani, R.J. (1993). An Introduction to the
|
| Bootstrap. Chapman & Hall.
|
| """
|
|
|
| from __future__ import annotations
|
|
|
| import json
|
| import logging
|
| import math
|
| import warnings
|
| from pathlib import Path
|
| from typing import Any, Dict, List, Optional, Tuple
|
|
|
| import matplotlib
|
| matplotlib.use("Agg")
|
| import matplotlib.pyplot as plt
|
| import numpy as np
|
| import pandas as pd
|
| from scipy import stats
|
|
|
# Suppress every warning category process-wide for the lifetime of this module.
# NOTE(review): this also hides SciPy/matplotlib warnings raised by the
# statistics and plotting code below — confirm that is intended.
warnings.filterwarnings("ignore")

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Output/model directories, resolved relative to the parent of this file's directory.
RESULTS_DIR = Path(__file__).parent.parent / "results"
PLOTS_DIR = RESULTS_DIR / "plots"
MODELS_DIR = Path(__file__).parent.parent / "models"

# Baseline heuristic identifiers and their display labels, listed in the same order.
HEURISTIC_NAMES = ["fifo", "priority_edd", "critical_ratio", "atc", "wspt", "slack"]
HEURISTIC_LABELS = ["FIFO", "Priority-EDD", "Critical-Ratio", "ATC", "WSPT", "Slack"]

# Dark-theme colours: figure background, axes background, and text/tick colour.
DARK_BG = "#0f1117"
DARK_AX = "#1a1d27"
TEXT_COL = "#e0e0e0"

# Series palette; plotting code indexes it modulo its length.
COLORS = ["#4fc3f7", "#81c784", "#ffb74d", "#e57373", "#ce93d8", "#80cbc4",
          "#fff176", "#ff8a65", "#90caf9", "#f48fb1"]
|
|
|
|
|
def _dark_fig(figsize=(12, 7)):
    """Create a single-axes matplotlib figure styled with the dark theme.

    Args:
        figsize: Figure size forwarded to ``plt.subplots``.

    Returns:
        The ``(fig, ax)`` pair produced by ``plt.subplots``.
    """
    figure, axis = plt.subplots(figsize=figsize)
    figure.patch.set_facecolor(DARK_BG)
    axis.set_facecolor(DARK_AX)
    axis.tick_params(colors=TEXT_COL)
    # Axis labels and the title all use the theme's text colour.
    for text_artist in (axis.xaxis.label, axis.yaxis.label, axis.title):
        text_artist.set_color(TEXT_COL)
    for border in axis.spines.values():
        border.set_color("#333344")
    return figure, axis
|
|
|
|
|
def _dark_fig_multi(rows=1, cols=2, figsize=(16, 7)):
    """Create a ``rows`` x ``cols`` grid of axes styled with the dark theme.

    Args:
        rows: Number of subplot rows.
        cols: Number of subplot columns.
        figsize: Figure size forwarded to ``plt.subplots``.

    Returns:
        ``(fig, axes)`` exactly as returned by ``plt.subplots``.
    """
    figure, axes = plt.subplots(rows, cols, figsize=figsize)
    figure.patch.set_facecolor(DARK_BG)
    # Flatten so one loop handles a single Axes as well as 1-D / 2-D grids.
    flat_axes = np.array(axes).flatten()
    for axis in flat_axes:
        axis.set_facecolor(DARK_AX)
        axis.tick_params(colors=TEXT_COL)
        for text_artist in (axis.xaxis.label, axis.yaxis.label, axis.title):
            text_artist.set_color(TEXT_COL)
        for border in axis.spines.values():
            border.set_color("#333344")
    return figure, axes
|
|
|
|
|
def _norm_min_max(arr: np.ndarray) -> np.ndarray:
    """Linearly rescale *arr* so its minimum maps to 0 and its maximum to 1.

    Args:
        arr: Numeric array (or array-like); it is converted to ``float``
            before any arithmetic.

    Returns:
        A float array with the same shape as *arr*. An empty input yields an
        empty array, and an input whose range is below ``1e-10`` (constant
        data) yields all zeros.
    """
    # Work in float so small integer dtypes cannot overflow in ``max - min``
    # and so every branch returns the same dtype.
    values = np.asarray(arr, dtype=float)
    if values.size == 0:
        # ``min``/``max`` raise on empty arrays; there is nothing to scale.
        return values.copy()
    lowest = values.min()
    span = values.max() - lowest
    if span < 1e-10:
        return np.zeros_like(values)
    return (values - lowest) / span
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_benchmark(
    seeds: Optional[List[int]] = None,
    n_workers: int = 4,
    save_csv: bool = True,
) -> pd.DataFrame:
    """Run every scheduling method on every seed and collect one row per run.

    Each seed is handed to ``_benchmark_single_seed`` in a ``spawn``
    multiprocessing pool. That worker decides which methods run: the six
    fixed heuristics, a ``best_fixed_oracle`` row derived from them, and
    whichever model-backed methods have a saved model file available.

    Args:
        seeds: Seeds to evaluate. Defaults to 99000-99299 (300 seeds).
        n_workers: Number of worker processes; values below 1 are treated as 1.
        save_csv: When true, write ``benchmark_results.csv`` into ``RESULTS_DIR``.

    Returns:
        DataFrame of metric rows, sorted by ``seed`` then ``method`` so the
        result (and the CSV) is identical from run to run.
    """
    import multiprocessing as mp
    from tqdm import tqdm

    if seeds is None:
        seeds = list(range(99000, 99300))

    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    PLOTS_DIR.mkdir(parents=True, exist_ok=True)

    logger.info("Running benchmark: %d seeds × 9 methods", len(seeds))

    all_args = [(seed,) for seed in seeds]

    rows: List[Dict[str, Any]] = []
    ctx = mp.get_context("spawn")
    # Pool() raises ValueError for processes < 1, so clamp instead of crashing.
    with ctx.Pool(processes=max(1, int(n_workers))) as pool:
        for result in tqdm(
            pool.imap_unordered(_benchmark_single_seed, all_args),
            total=len(all_args),
            desc="Benchmark",
        ):
            rows.extend(result)

    df = pd.DataFrame(rows)
    if not df.empty:
        # imap_unordered yields seeds in completion order; sort so the frame
        # and the saved CSV do not depend on worker scheduling.
        df = df.sort_values(["seed", "method"], kind="stable").reset_index(drop=True)
    logger.info("Benchmark complete: %s rows", len(df))

    if save_csv:
        path = RESULTS_DIR / "benchmark_results.csv"
        df.to_csv(path, index=False)
        logger.info("Saved -> %s", path)

    return df
|
|
|
|
|
def _row(seed: int, method: str, m: Any, elapsed: float) -> Dict[str, Any]:
    """Flatten one simulation result into a benchmark-table row.

    Args:
        seed: Seed the simulation was run with.
        method: Method label stored in the ``method`` column.
        m: Metrics object exposing ``zone_utilization`` (a mapping) plus the
            scalar attributes copied below.
        elapsed: Wall-clock seconds for the run; stored rounded to 4 decimals
            so runtime-cost claims can be backed by data on every row.

    Returns:
        Dict with the row's columns; ``zone_utilization_avg`` is the mean of
        the per-zone values, or ``0.0`` when there are none.
    """
    zone_values = list(m.zone_utilization.values())
    mean_utilization = float(np.mean(zone_values)) if zone_values else 0.0

    record: Dict[str, Any] = {"seed": seed, "method": method}
    for field in ("makespan", "total_tardiness", "sla_breach_rate", "avg_cycle_time"):
        record[field] = getattr(m, field)
    record["zone_utilization_avg"] = mean_utilization
    for field in ("throughput", "queue_max", "completed_jobs"):
        record[field] = getattr(m, field)
    record["elapsed_seconds"] = round(float(elapsed), 4)
    return record
|
|
|
|
|
def _benchmark_single_seed(args: Tuple) -> List[Dict[str, Any]]:
    """Pool worker: simulate every available method for one seed.

    Args:
        args: 1-tuple ``(seed,)`` (tuple form so it can be mapped over a pool).

    Returns:
        Metric rows in execution order: the six fixed heuristics, a
        ``best_fixed_oracle`` row (a copy of the fixed heuristic with the
        lowest total tardiness, with elapsed 0.0 and a ``best_fixed_winner``
        column), then for each of ``rf``/``xgb`` whose selector file exists
        a ``dahs_<model>`` and a ``dahs_hybrid_<model>`` row, then
        ``dahs_oracle``, and finally ``hybrid_priority`` when its model file
        exists. A method that raises is logged and skipped.
    """
    (seed,) = args
    import time as _time
    from src.heuristics import (
        fifo_dispatch, priority_edd_dispatch, critical_ratio_dispatch,
        atc_dispatch, wspt_dispatch, slack_dispatch,
    )
    from src.simulator import WarehouseSimulator
    from src.features import FeatureExtractor

    def _timed_run(simulator):
        """Run ``simulator`` with duration=600.0; return (metrics, seconds)."""
        started = _time.perf_counter()
        metrics = simulator.run(duration=600.0)
        return metrics, _time.perf_counter() - started

    def _refreshing_dispatch(policy, simulator):
        """Dispatch callback that pushes a fresh state snapshot to ``policy`` first."""
        def _dispatch(jobs, t, zone_id):
            policy.update_state(simulator.get_state_snapshot())
            return policy.dispatch(jobs, t, zone_id)
        return _dispatch

    rows: List[Dict[str, Any]] = []
    fixed_rules = (
        ("fifo", fifo_dispatch),
        ("priority_edd", priority_edd_dispatch),
        ("critical_ratio", critical_ratio_dispatch),
        ("atc", atc_dispatch),
        ("wspt", wspt_dispatch),
        ("slack", slack_dispatch),
    )

    # --- Fixed heuristics -------------------------------------------------
    baseline_metrics: Dict[str, Any] = {}
    for rule_name, rule_fn in fixed_rules:
        try:
            extractor = FeatureExtractor()
            simulator = WarehouseSimulator(
                seed=seed, heuristic_fn=rule_fn, feature_extractor=extractor
            )
            metrics, seconds = _timed_run(simulator)
            rows.append(_row(seed, rule_name, metrics, seconds))
            baseline_metrics[rule_name] = metrics
        except Exception as err:
            logger.warning("[%s] %s failed: %s", seed, rule_name, err)

    # --- Best fixed heuristic for this seed (no extra simulation) ----------
    if baseline_metrics:
        winner_name = min(
            baseline_metrics,
            key=lambda name: baseline_metrics[name].total_tardiness,
        )
        oracle_row = _row(seed, "best_fixed_oracle", baseline_metrics[winner_name], 0.0)
        oracle_row["best_fixed_winner"] = winner_name
        rows.append(oracle_row)

    # --- Learned selectors: plain and rolling-horizon hybrid ---------------
    for model_name in ("rf", "xgb"):
        model_path = MODELS_DIR / f"selector_{model_name}.joblib"
        if model_path.exists():
            try:
                import joblib
                from src.hybrid_scheduler import BatchwiseSelector, RollingHorizonOracle

                model = joblib.load(model_path)

                selector_fe = FeatureExtractor()
                selector = BatchwiseSelector(model=model, feature_extractor=selector_fe)
                selector_sim = WarehouseSimulator(
                    seed=seed, heuristic_fn=fifo_dispatch, feature_extractor=selector_fe
                )
                # The simulator is built with FIFO, then its dispatcher is swapped.
                selector_sim.heuristic_fn = _refreshing_dispatch(selector, selector_sim)
                metrics, seconds = _timed_run(selector_sim)
                rows.append(_row(seed, f"dahs_{model_name}", metrics, seconds))

                hybrid_fe = FeatureExtractor()
                hybrid = RollingHorizonOracle(ml_model=model, feature_extractor=hybrid_fe)
                hybrid_sim = WarehouseSimulator(
                    seed=seed, heuristic_fn=fifo_dispatch, feature_extractor=hybrid_fe
                )
                hybrid.attach_simulator(hybrid_sim)

                def _hybrid_dispatch(jobs, t, z):
                    return hybrid.dispatch(jobs, t, z)

                hybrid_sim.heuristic_fn = _hybrid_dispatch
                metrics, seconds = _timed_run(hybrid_sim)
                rows.append(_row(seed, f"dahs_hybrid_{model_name}", metrics, seconds))
            except Exception as err:
                logger.warning("[%s] dahs_%s failed: %s", seed, model_name, err)

    # --- Rolling-horizon oracle constructed without a model ----------------
    try:
        from src.hybrid_scheduler import RollingHorizonOracle

        oracle_fe = FeatureExtractor()
        pure_oracle = RollingHorizonOracle(ml_model=None, feature_extractor=None)
        oracle_sim = WarehouseSimulator(
            seed=seed, heuristic_fn=fifo_dispatch, feature_extractor=oracle_fe
        )
        pure_oracle.attach_simulator(oracle_sim)

        def _oracle_dispatch(jobs, t, z):
            return pure_oracle.dispatch(jobs, t, z)

        oracle_sim.heuristic_fn = _oracle_dispatch
        metrics, seconds = _timed_run(oracle_sim)
        rows.append(_row(seed, "dahs_oracle", metrics, seconds))
    except Exception as err:
        logger.warning("[%s] dahs_oracle failed: %s", seed, err)

    # --- Hybrid priority model ---------------------------------------------
    priority_path = MODELS_DIR / "priority_gbr.joblib"
    if priority_path.exists():
        try:
            import joblib
            from src.hybrid_scheduler import HybridPriority

            priority_fe = FeatureExtractor()
            priority = HybridPriority(model_path=priority_path, feature_extractor=priority_fe)
            priority_sim = WarehouseSimulator(
                seed=seed, heuristic_fn=fifo_dispatch, feature_extractor=priority_fe
            )

            def _priority_dispatch(jobs, t, zone_id):
                priority.update_state(priority_sim.get_state_snapshot())
                return priority(jobs, t, zone_id)

            priority_sim.heuristic_fn = _priority_dispatch
            metrics, seconds = _timed_run(priority_sim)
            rows.append(_row(seed, "hybrid_priority", metrics, seconds))
        except Exception as err:
            logger.warning("[%s] hybrid_priority failed: %s", seed, err)

    return rows
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Optimisation direction per metric column: "lower" means smaller values are
# better, "higher" means larger values are better. The statistical analysis
# iterates this mapping to pick the one-sided alternative for each metric.
METRIC_DIRECTIONS: Dict[str, str] = {
    "total_tardiness": "lower",
    "sla_breach_rate": "lower",
    "avg_cycle_time": "lower",
    "makespan": "lower",
    "throughput": "higher",
    "zone_utilization_avg": "higher",
}
|
|
|
|
|
def _wilcoxon_for_metric(
    pivot: pd.DataFrame,
    available_methods: List[str],
    dahs_col: str,
    metric: str,
    direction: str,
) -> List[Dict[str, Any]]:
    """One-sided paired Wilcoxon tests of DAHS against every other method.

    For ``direction == "lower"`` the alternative is H1: baseline > DAHS, so a
    small p-value means DAHS is significantly lower (better). Any other
    direction tests H1: DAHS > baseline. ``diff`` is always oriented so that
    positive values favour DAHS, which makes Cohen's d positive when DAHS
    wins and negative when it loses.

    Args:
        pivot: Seeds x methods table of ``metric`` values without NaNs.
        available_methods: Method columns to compare against ``dahs_col``.
        dahs_col: Column holding the headline method.
        metric: Metric name, copied into each result row.
        direction: ``"lower"`` for lower-is-better; anything else is treated
            as higher-is-better.

    Returns:
        One dict per successfully tested baseline with the test statistic,
        raw p-value, Holm-Bonferroni significance at alpha=0.05 (computed
        here, across the baselines of this metric), Cohen's d of the paired
        differences (population SD), and a 95% percentile-bootstrap CI of the
        mean difference from 5,000 resamples. Empty when ``dahs_col`` is not
        a column of ``pivot``. Baselines whose test raises are logged and
        omitted.
    """
    rows: List[Dict[str, Any]] = []
    if dahs_col not in pivot.columns:
        return rows
    dahs_vals = pivot[dahs_col].values
    lower_is_better = direction == "lower"
    for method in available_methods:
        if method == dahs_col:
            continue
        try:
            base_vals = pivot[method].values
            if lower_is_better:
                stat, p = stats.wilcoxon(base_vals, dahs_vals, alternative="greater")
                diff = base_vals - dahs_vals
            else:
                stat, p = stats.wilcoxon(dahs_vals, base_vals, alternative="greater")
                diff = dahs_vals - base_vals
            # The epsilon keeps d finite when all differences are identical.
            d = float(np.mean(diff) / (np.std(diff) + 1e-10))
            # Draw all 5,000 bootstrap resamples in one call (one per row).
            resamples = np.random.choice(diff, size=(5000, len(diff)), replace=True)
            ci_lo, ci_hi = np.percentile(resamples.mean(axis=1), [2.5, 97.5])
            rows.append({
                "metric": metric,
                "direction": direction,
                "baseline": method,
                "dahs": dahs_col,
                "statistic": round(float(stat), 4),
                "p_value": float(p),
                "significant_holm": False,
                "cohens_d": round(d, 4),
                "ci_95_lo": round(float(ci_lo), 4),
                "ci_95_hi": round(float(ci_hi), 4),
            })
        except Exception as exc:
            logger.warning("Wilcoxon failed for %s on %s: %s", method, metric, exc)
    if rows:
        ps = [r["p_value"] for r in rows]
        n = len(ps)
        # Holm is a step-down procedure: walk p-values in ascending order and
        # stop rejecting at the first one that misses its threshold; every
        # larger p-value is then non-significant too.
        still_rejecting = True
        for rank, idx in enumerate(np.argsort(ps)):
            still_rejecting = still_rejecting and ps[idx] < (0.05 / (n - rank))
            rows[idx]["significant_holm"] = bool(still_rejecting)
    return rows
|
|
|
|
|
def _nemenyi_critical_difference(k: int, n: int, alpha: float = 0.05) -> float:
    """Nemenyi critical difference for k methods compared over n datasets.

    CD = q_alpha * sqrt(k*(k+1) / (6*n)) per Demsar (2006), JMLR 7:1-30.

    ``q_alpha`` is taken from Demsar's tabulated values for alpha 0.05 / 0.10
    and 2 <= k <= 10. For any other combination it is computed as the
    studentized-range quantile with infinite degrees of freedom divided by
    sqrt(2), via ``scipy.stats.studentized_range``.

    Args:
        k: Number of methods being compared (>= 2).
        n: Number of datasets / seeds (>= 1).
        alpha: Significance level, strictly between 0 and 1.

    Returns:
        The critical difference in mean-rank units.

    Raises:
        ValueError: If ``k``, ``n`` or ``alpha`` is out of range, or if the
            requested quantile cannot be computed.
    """
    if k < 2:
        raise ValueError(f"Nemenyi test needs k >= 2 methods, got {k}")
    if n < 1:
        raise ValueError(f"Nemenyi test needs n >= 1 datasets, got {n}")
    if not 0.0 < alpha < 1.0:
        raise ValueError(f"alpha must lie in (0, 1), got {alpha}")

    q_table = {
        0.05: {
            2: 1.960, 3: 2.343, 4: 2.569, 5: 2.728, 6: 2.850, 7: 2.949,
            8: 3.031, 9: 3.102, 10: 3.164,
        },
        0.10: {
            2: 1.645, 3: 2.052, 4: 2.291, 5: 2.459, 6: 2.589, 7: 2.693,
            8: 2.780, 9: 2.855, 10: 2.920,
        },
    }

    q = None
    for level, by_k in q_table.items():
        if math.isclose(alpha, level, rel_tol=0.0, abs_tol=1e-12):
            q = by_k.get(k)
            break

    if q is None:
        try:
            q = float(stats.studentized_range.ppf(1.0 - alpha, k, math.inf)) / math.sqrt(2.0)
        except AttributeError:
            # SciPy without studentized_range: only the historical alpha=0.05
            # linear extrapolation beyond k=10 can be offered.
            if not math.isclose(alpha, 0.05, rel_tol=0.0, abs_tol=1e-12):
                raise ValueError(
                    f"no critical value available for alpha={alpha}, k={k}"
                ) from None
            q = q_table[0.05][10] + 0.05 * (k - 10)
        if not math.isfinite(q):
            raise ValueError(f"could not compute critical value for alpha={alpha}, k={k}")

    return float(q * math.sqrt(k * (k + 1) / (6.0 * n)))
|
|
|
|
|
def _nemenyi_pairwise(pivot: pd.DataFrame, available_methods: List[str]) -> Dict[str, Any]:
    """Compare every pair of methods by mean rank against the Nemenyi CD.

    Methods are ranked within each row of ``pivot`` (ties get the average
    rank); a pair is flagged significant when its mean-rank gap exceeds the
    critical difference at alpha=0.05.

    Args:
        pivot: Seeds x methods table of the metric being ranked.
        available_methods: Columns of ``pivot`` to include.

    Returns:
        ``{"available": False, "reason": ...}`` with fewer than 3 methods or
        fewer than 2 rows; otherwise a dict with ``alpha``, ``k``,
        ``n_seeds``, ``critical_difference``, ``mean_ranks`` and the
        ``pairwise`` comparison list (each unordered pair once).
    """
    method_count = len(available_methods)
    if method_count < 3 or pivot.shape[0] < 2:
        return {"available": False, "reason": "need >=3 methods and >=2 seeds"}

    rank_table = pivot[available_methods].rank(axis=1, method="average")
    avg_rank = rank_table.mean(axis=0).to_dict()
    seed_count = rank_table.shape[0]
    critical = _nemenyi_critical_difference(method_count, seed_count)

    def _compare(first: str, second: str) -> Dict[str, Any]:
        gap = abs(avg_rank[first] - avg_rank[second])
        return {
            "method_a": first,
            "method_b": second,
            "rank_a": round(float(avg_rank[first]), 4),
            "rank_b": round(float(avg_rank[second]), 4),
            "rank_diff": round(float(gap), 4),
            "significant": bool(gap > critical),
        }

    # Each method is paired only with the methods listed after it.
    comparisons: List[Dict[str, Any]] = [
        _compare(first, second)
        for position, first in enumerate(available_methods)
        for second in available_methods[position + 1:]
    ]

    rounded_ranks = {name: round(float(rank), 4) for name, rank in avg_rank.items()}
    return {
        "available": True,
        "alpha": 0.05,
        "k": method_count,
        "n_seeds": seed_count,
        "critical_difference": round(critical, 4),
        "mean_ranks": rounded_ranks,
        "pairwise": comparisons,
    }
|
|
|
|
|
def _plot_critical_difference_diagram(nemenyi: Dict[str, Any]) -> None:
    """Draw the critical-difference diagram and save it as ``cd_diagram.png``.

    Args:
        nemenyi: Result dict from ``_nemenyi_pairwise``. Nothing is drawn
            unless its ``available`` entry is truthy; ``mean_ranks`` and
            ``critical_difference`` are read from it.
    """
    if not nemenyi.get("available"):
        return

    mean_ranks: Dict[str, float] = nemenyi["mean_ranks"]
    cd: float = nemenyi["critical_difference"]
    # Best (lowest) mean rank first; it is drawn on the top row.
    ordered = sorted(mean_ranks.keys(), key=lambda name: mean_ranks[name])
    rank_values = [mean_ranks[name] for name in ordered]
    n_methods = len(ordered)
    best_rank = min(rank_values)

    PLOTS_DIR.mkdir(parents=True, exist_ok=True)
    fig, ax = _dark_fig(figsize=(12, 4 + 0.3 * n_methods))
    axis_lo = best_rank - 0.5
    axis_hi = max(rank_values) + 0.5
    ax.set_xlim(axis_lo, axis_hi)
    ax.set_ylim(0, n_methods + 1)
    ax.invert_xaxis()
    ax.get_yaxis().set_visible(False)
    for edge in ("left", "right", "top"):
        ax.spines[edge].set_visible(False)

    label_x = axis_lo - 0.05 * (axis_hi - axis_lo)
    for position, name in enumerate(ordered):
        row_y = n_methods - position
        rank = mean_ranks[name]
        ax.plot([axis_lo, rank], [row_y, row_y], color="#445", linewidth=0.75)
        ax.plot([rank], [row_y], "o", color=COLORS[position % len(COLORS)], markersize=8)
        ax.text(label_x, row_y,
                f"{name} (rank {rank:.2f})",
                ha="right", va="center", color=TEXT_COL, fontsize=10)

    # Reference bar one critical difference long, anchored at the best rank.
    bar_y = 0.5
    ax.plot([best_rank, best_rank + cd], [bar_y, bar_y], color="#e57373", linewidth=2.5)
    ax.text(best_rank + cd / 2, bar_y - 0.25,
            f"CD = {cd:.3f} (Nemenyi, α=0.05)",
            ha="center", va="top", color="#e57373", fontsize=10)

    ax.set_xlabel("Mean rank (lower = better)")
    ax.set_title("Critical-Difference Diagram — total_tardiness", color=TEXT_COL, fontsize=13)
    plt.tight_layout()
    plt.savefig(PLOTS_DIR / "cd_diagram.png", dpi=150, facecolor=DARK_BG)
    plt.close()
|
|
|
|
|
def run_statistical_analysis(df: pd.DataFrame) -> Dict[str, Any]:
    """Run the significance-testing protocol and save ``statistical_tests.json``.

    Follows Demsar (2006) JMLR 7:1-30 on ``total_tardiness``: a Friedman
    omnibus test, Nemenyi pairwise comparisons (plus the CD diagram), then
    one-sided Wilcoxon tests of a single headline method against every other
    method. The Wilcoxon tests are direction-aware (H1: baseline > DAHS for
    lower-is-better metrics, H1: DAHS > baseline otherwise), Cohen's d is
    signed so positive always favours DAHS, and Holm-Bonferroni is applied
    within each metric. The same Wilcoxon analysis is repeated for every
    other metric in ``METRIC_DIRECTIONS``.

    Args:
        df: Benchmark rows with ``seed``, ``method`` and the metric columns.

    Returns:
        Results dict (also written to disk) with keys ``primary_metric``,
        ``friedman``, ``nemenyi``, ``headline_method``, ``wilcoxon``,
        ``per_seed_dominance``, ``wilcoxon_secondary`` and ``summary``.
    """
    methods = sorted(df["method"].unique())

    primary_metric = "total_tardiness"
    # Only seeds for which every method produced a value are kept.
    pivot = df.pivot_table(index="seed", columns="method", values=primary_metric)
    pivot.dropna(inplace=True)
    available_methods = [name for name in methods if name in pivot.columns]
    seed_total = pivot.shape[0]

    results: Dict[str, Any] = {"primary_metric": primary_metric}

    # --- Friedman omnibus test ---------------------------------------------
    try:
        stat, p = stats.friedmanchisquare(
            *(pivot[name].values for name in available_methods)
        )
        results["friedman"] = {
            "statistic": round(float(stat), 4),
            "p_value": float(p),
            "significant": bool(p < 0.05),
            "metric": primary_metric,
        }
        logger.info("Friedman test: chi2=%.4f, p=%.6f", stat, p)
    except Exception as err:
        results["friedman"] = {"error": str(err)}

    # --- Nemenyi post-hoc + critical-difference diagram ----------------------
    try:
        nemenyi = _nemenyi_pairwise(pivot, available_methods)
        results["nemenyi"] = nemenyi
        if nemenyi.get("available"):
            _plot_critical_difference_diagram(nemenyi)
            logger.info("Nemenyi: CD=%.4f over k=%d methods, n=%d seeds",
                        nemenyi["critical_difference"], nemenyi["k"], nemenyi["n_seeds"])
    except Exception as err:
        results["nemenyi"] = {"error": str(err)}

    # --- Headline method: first entry of this preference list that was run ---
    headline_preference = (
        "dahs_hybrid_xgb", "dahs_hybrid_rf",
        "dahs_oracle",
        "dahs_xgb", "dahs_rf",
        "hybrid_priority",
    )
    dahs_col = None
    for candidate in headline_preference:
        if candidate in available_methods:
            dahs_col = candidate
            break
    results["headline_method"] = dahs_col

    if dahs_col is None:
        results["wilcoxon"] = []
        results["wilcoxon_secondary"] = {}
        results["per_seed_dominance"] = {}
    else:
        results["wilcoxon"] = _wilcoxon_for_metric(
            pivot, available_methods, dahs_col,
            primary_metric, METRIC_DIRECTIONS[primary_metric],
        )

        # --- Per-seed win/tie/loss counts (strictly lower tardiness wins) ----
        headline_scores = pivot[dahs_col]
        denominator = max(seed_total, 1)
        dominance: Dict[str, Any] = {"n_seeds": int(seed_total)}
        per_baseline: Dict[str, Dict[str, Any]] = {}
        for rival in available_methods:
            if rival == dahs_col:
                continue
            wins = int((headline_scores < pivot[rival]).sum())
            ties = int((headline_scores == pivot[rival]).sum())
            per_baseline[rival] = {
                "wins": wins,
                "ties": ties,
                "losses": int(seed_total - wins - ties),
                "win_rate": round(wins / denominator, 4),
            }

        # Fixed heuristics only: learned, oracle and derived methods are excluded.
        not_fixed = (
            "dahs_xgb", "dahs_rf",
            "dahs_hybrid_xgb", "dahs_hybrid_rf",
            "dahs_oracle", "hybrid_priority",
            "best_fixed_oracle",
        )
        baseline_only = [name for name in available_methods if name not in not_fixed]
        if baseline_only:
            best_per_seed = pivot[baseline_only].min(axis=1)
            beats_strongest_seeds = int((headline_scores < best_per_seed).sum())
            dominance["wins_vs_best_fixed_per_seed"] = beats_strongest_seeds
            dominance["win_rate_vs_best_fixed_per_seed"] = round(
                beats_strongest_seeds / denominator, 4
            )
        dominance["per_baseline"] = per_baseline
        results["per_seed_dominance"] = dominance

        # --- Same Wilcoxon analysis for every non-primary metric -------------
        secondary: Dict[str, List[Dict[str, Any]]] = {}
        for metric, direction in METRIC_DIRECTIONS.items():
            if metric == primary_metric:
                continue
            metric_pivot = df.pivot_table(
                index="seed", columns="method", values=metric
            ).dropna()
            metric_methods = [name for name in methods if name in metric_pivot.columns]
            if dahs_col in metric_methods:
                secondary[metric] = _wilcoxon_for_metric(
                    metric_pivot, metric_methods, dahs_col, metric, direction
                )
        results["wilcoxon_secondary"] = secondary

    # --- Descriptive summary per method (all rows of df, not just the pivot) ---
    summary = []
    for name in available_methods:
        subset = df[df["method"] == name]
        summary.append({
            "method": name,
            "n": len(subset),
            "makespan_mean": round(float(subset["makespan"].mean()), 2),
            "makespan_std": round(float(subset["makespan"].std()), 2),
            "tardiness_mean": round(float(subset["total_tardiness"].mean()), 2),
            "tardiness_std": round(float(subset["total_tardiness"].std()), 2),
            "sla_mean": round(float(subset["sla_breach_rate"].mean()), 4),
            "cycle_mean": round(float(subset["avg_cycle_time"].mean()), 2),
            "throughput_mean": round(float(subset["throughput"].mean()), 2),
        })
    results["summary"] = summary

    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    with open(RESULTS_DIR / "statistical_tests.json", "w") as handle:
        json.dump(results, handle, indent=2)
    logger.info("Saved statistical_tests.json")

    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_switching_analysis(
    df: pd.DataFrame,
    sample_seeds: Optional[List[int]] = None,
) -> Dict[str, Any]:
    """Summarise how often the learned selectors switch heuristics.

    For each selector model (``rf``, ``xgb``) whose file exists in
    ``MODELS_DIR``, re-runs the sample seeds with a ``BatchwiseSelector`` and
    aggregates the selector's switching-log summary. The result is written to
    ``switching_analysis.json`` in ``RESULTS_DIR``.

    Args:
        df: Benchmark results. Not read here; kept so the pipeline stages
            share a call shape.
        sample_seeds: Seeds to replay. Defaults to 99000-99009.

    Returns:
        Dict with a ``description`` plus one ``dahs_<model>`` entry per
        available model. Per-run averages are taken over the runs that
        completed (``successful_runs``), not over all attempted seeds, so a
        failed seed no longer drags the averages towards zero.
    """
    from src.heuristics import fifo_dispatch
    from src.simulator import WarehouseSimulator
    from src.features import FeatureExtractor
    from src.hybrid_scheduler import BatchwiseSelector
    import joblib as _joblib

    RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    if sample_seeds is None:
        sample_seeds = list(range(99000, 99010))
    per_model: Dict[str, Any] = {}

    def _make_dispatch(sel, s):
        """Dispatch callback that refreshes the selector's state before each call."""
        def _d(jobs, t, zone_id):
            sel.update_state(s.get_state_snapshot())
            return sel.dispatch(jobs, t, zone_id)
        return _d

    for model_name in ("rf", "xgb"):
        model_path = MODELS_DIR / f"selector_{model_name}.joblib"
        if not model_path.exists():
            logger.warning("Model not found: %s", model_path)
            continue

        model = _joblib.load(model_path)
        total_evals = 0
        total_switches = 0
        total_hysteresis = 0
        total_guardrails = 0
        successful_runs = 0
        heuristic_counts: Dict[str, int] = {}

        for seed in sample_seeds:
            try:
                fe = FeatureExtractor()
                selector = BatchwiseSelector(model=model, feature_extractor=fe)
                sim = WarehouseSimulator(seed=seed, heuristic_fn=fifo_dispatch, feature_extractor=fe)
                sim.heuristic_fn = _make_dispatch(selector, sim)
                sim.run(duration=600.0)

                # Read everything first so a failure cannot leave the totals
                # half-updated for this seed.
                summary = selector.switching_log.summary()
                n_evals = summary.get("totalEvaluations", 0)
                n_switches = summary.get("switchCount", 0)
                n_hysteresis = summary.get("hysteresisBlocked", 0)
                n_guardrails = summary.get("guardrailActivations", 0)
                # The log reports fractions; convert back to approximate counts
                # so runs with more evaluations weigh more.
                run_counts = {
                    h: int(round(n_evals * frac))
                    for h, frac in summary.get("distribution", {}).items()
                }
            except Exception as e:
                logger.warning("Switching analysis seed %d (%s) failed: %s", seed, model_name, e)
                continue

            total_evals += n_evals
            total_switches += n_switches
            total_hysteresis += n_hysteresis
            total_guardrails += n_guardrails
            for h, count in run_counts.items():
                heuristic_counts[h] = heuristic_counts.get(h, 0) + count
            successful_runs += 1

        runs = max(successful_runs, 1)
        total_h = sum(heuristic_counts.values())
        per_model[f"dahs_{model_name}"] = {
            "sample_seeds": len(sample_seeds),
            "successful_runs": successful_runs,
            "avg_evaluations_per_run": round(total_evals / runs, 1),
            "avg_switches_per_run": round(total_switches / runs, 1),
            "avg_hysteresis_blocked_per_run": round(total_hysteresis / runs, 1),
            "avg_guardrail_activations_per_run": round(total_guardrails / runs, 1),
            # One evaluation per completed run is subtracted from the
            # denominator (the first evaluation of a run is not counted as an
            # interval in which a switch could occur).
            "switching_rate_per_interval": round(
                total_switches / max(total_evals - successful_runs, 1), 4
            ),
            "heuristic_selection_distribution": {
                h: round(c / max(total_h, 1), 4)
                for h, c in sorted(heuristic_counts.items())
            },
        }

    analysis = {
        "description": "DAHS_2 batch-wise switching analysis (15-min intervals)",
        **per_model,
    }

    with open(RESULTS_DIR / "switching_analysis.json", "w") as f:
        json.dump(analysis, f, indent=2)
    logger.info("Saved switching_analysis.json")

    return analysis
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_benchmark_json(df: pd.DataFrame) -> None:
    """Write per-method mean/std summaries to ``benchmark_summary.json``.

    The file is a JSON list with one object per method (sorted by name)
    holding the row count ``n`` and ``{"mean", "std"}`` pairs for tardiness,
    SLA breach rate, cycle time, throughput and makespan.

    Non-finite statistics (for example the sample std of a method with a
    single row, which is NaN) are written as ``null``. Python's default
    output for them is the bare token ``NaN``, which is not valid JSON and
    makes strict parsers such as a browser's ``JSON.parse`` reject the file.

    Args:
        df: Benchmark rows with a ``method`` column and the metric columns.
    """
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)

    def _finite_or_none(value: Any) -> Optional[float]:
        number = float(value)
        return number if math.isfinite(number) else None

    # (key in the exported JSON, column in the benchmark frame)
    exported = (
        ("tardiness", "total_tardiness"),
        ("sla", "sla_breach_rate"),
        ("cycle", "avg_cycle_time"),
        ("throughput", "throughput"),
        ("makespan", "makespan"),
    )

    summary = []
    for method in sorted(df["method"].unique()):
        mdf = df[df["method"] == method]
        entry: Dict[str, Any] = {"method": method, "n": len(mdf)}
        for key, column in exported:
            entry[key] = {
                "mean": _finite_or_none(mdf[column].mean()),
                "std": _finite_or_none(mdf[column].std()),
            }
        summary.append(entry)

    with open(RESULTS_DIR / "benchmark_summary.json", "w") as f:
        # allow_nan=False guarantees the output stays strict JSON.
        json.dump(summary, f, indent=2, allow_nan=False)
    logger.info("Saved benchmark_summary.json")
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_plots(df: pd.DataFrame) -> None:
    """Render the dark-theme benchmark plots into ``PLOTS_DIR``.

    Files written: ``benchmark_tardiness.png`` (box plot),
    ``sla_breach_bar.png``, ``zone_utilization_heatmap.png``,
    ``radar_chart.png``, ``pareto_front.png`` and
    ``throughput_comparison.png``. The heatmap and radar chart are
    best-effort: a failure in either is logged and the remaining plots are
    still produced.

    Args:
        df: Benchmark rows with a ``method`` column and the metric columns.
    """
    PLOTS_DIR.mkdir(parents=True, exist_ok=True)

    methods = sorted(df["method"].unique())
    if not methods:
        logger.warning("generate_plots: no benchmark rows, nothing to plot")
        return
    method_colors = {m: COLORS[i % len(COLORS)] for i, m in enumerate(methods)}
    bar_colors = [method_colors[m] for m in methods]
    by_method = {m: df[df["method"] == m] for m in methods}

    # 1. Tardiness distribution per method.
    fig, ax = _dark_fig(figsize=(14, 7))
    data_by_method = [by_method[m]["total_tardiness"].dropna().values for m in methods]
    bp = ax.boxplot(data_by_method, patch_artist=True)
    # Label the ticks explicitly instead of boxplot(labels=...), which newer
    # matplotlib releases deprecate; boxes sit at positions 1..len(methods).
    ax.set_xticks(range(1, len(methods) + 1))
    ax.set_xticklabels(methods)
    for patch, method in zip(bp["boxes"], methods):
        patch.set_facecolor(method_colors[method])
        patch.set_alpha(0.75)
    ax.set_title("Total Tardiness — All Methods", fontsize=14)
    ax.set_xlabel("Method")
    ax.set_ylabel("Total Tardiness (min)")
    ax.tick_params(axis="x", rotation=35)
    plt.tight_layout()
    plt.savefig(PLOTS_DIR / "benchmark_tardiness.png", dpi=150, facecolor=DARK_BG)
    plt.close()

    # 2. Mean SLA breach rate, in percent.
    fig, ax = _dark_fig(figsize=(12, 6))
    sla_means = [by_method[m]["sla_breach_rate"].mean() * 100 for m in methods]
    bars = ax.bar(methods, sla_means, color=bar_colors, alpha=0.85)
    ax.set_title("Average SLA Breach Rate", fontsize=14)
    ax.set_ylabel("SLA Breach Rate (%)")
    ax.tick_params(axis="x", rotation=35)
    for bar, val in zip(bars, sla_means):
        ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.3,
                f"{val:.1f}%", ha="center", va="bottom", color=TEXT_COL, fontsize=9)
    plt.tight_layout()
    plt.savefig(PLOTS_DIR / "sla_breach_bar.png", dpi=150, facecolor=DARK_BG)
    plt.close()

    # 3. Mean zone utilisation as a one-row heatmap (best-effort).
    fig = None
    try:
        fig, ax = _dark_fig(figsize=(10, 6))
        util_means = [by_method[m]["zone_utilization_avg"].mean() for m in methods]
        try:
            import seaborn as sns
        except ImportError:
            # seaborn only contributes a global style; the plot works without it.
            logger.debug("seaborn not installed; skipping sns.set_style")
        else:
            sns.set_style("dark")
        hm = ax.imshow([util_means], aspect="auto", cmap="coolwarm")
        ax.set_xticks(range(len(methods)))
        ax.set_xticklabels(methods, rotation=35)
        # Pin the single row's tick so the label lands on it.
        ax.set_yticks([0])
        ax.set_yticklabels(["Avg Util"])
        plt.colorbar(hm, ax=ax, label="Zone Utilization")
        ax.set_title("Zone Utilization Heatmap", fontsize=14)
        plt.tight_layout()
        plt.savefig(PLOTS_DIR / "zone_utilization_heatmap.png", dpi=150, facecolor=DARK_BG)
    except Exception as exc:
        logger.warning("Zone utilization heatmap failed: %s", exc)
    finally:
        # Close on success and on failure so a broken plot cannot leak a figure.
        if fig is not None:
            plt.close(fig)

    # 4. Radar chart of normalised scores (best-effort).
    fig = None
    try:
        categories = ["Tardiness↓", "SLA↓", "Cycle Time↓", "Throughput↑", "Utilization"]
        n_cats = len(categories)
        angles = np.linspace(0, 2 * np.pi, n_cats, endpoint=False).tolist()
        angles += angles[:1]

        fig = plt.figure(figsize=(10, 10))
        fig.patch.set_facecolor(DARK_BG)
        ax = fig.add_subplot(111, polar=True)
        ax.set_facecolor(DARK_AX)

        # Guard every denominator so an all-zero column cannot produce NaN/inf.
        tardiness_scale = max(df["total_tardiness"].max(), 1e-9)
        cycle_scale = max(df["avg_cycle_time"].max(), 1e-9)
        throughput_scale = max(df["throughput"].max(), 1e-9)

        # NOTE(review): only the first six methods in sorted-name order are
        # drawn — confirm this is the intended subset.
        for i, method in enumerate(methods[:6]):
            mdf = by_method[method]
            values = [
                1 - float(np.clip(mdf["total_tardiness"].mean() / tardiness_scale, 0, 1)),
                1 - float(mdf["sla_breach_rate"].mean()),
                1 - float(np.clip(mdf["avg_cycle_time"].mean() / cycle_scale, 0, 1)),
                float(np.clip(mdf["throughput"].mean() / throughput_scale, 0, 1)),
                float(mdf["zone_utilization_avg"].mean()),
            ]
            values += values[:1]  # close the polygon
            ax.plot(angles, values, color=COLORS[i], linewidth=2, label=method)
            ax.fill(angles, values, color=COLORS[i], alpha=0.1)

        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(categories, color=TEXT_COL)
        ax.legend(loc="upper right", bbox_to_anchor=(1.3, 1.1))
        ax.set_title("Performance Radar Chart", color=TEXT_COL, fontsize=14, pad=20)
        plt.tight_layout()
        plt.savefig(PLOTS_DIR / "radar_chart.png", dpi=150, facecolor=DARK_BG)
    except Exception as exc:
        logger.warning("Radar chart failed: %s", exc)
    finally:
        if fig is not None:
            plt.close(fig)

    # 5. Mean makespan vs mean tardiness, one point per method.
    fig, ax = _dark_fig(figsize=(10, 7))
    for method in methods:
        mdf = by_method[method]
        ax.scatter(
            mdf["makespan"].mean(),
            mdf["total_tardiness"].mean(),
            color=method_colors[method],
            s=120, label=method, zorder=5,
        )
    ax.set_title("Pareto Front: Makespan vs Tardiness", fontsize=14)
    ax.set_xlabel("Mean Makespan (min)")
    ax.set_ylabel("Mean Total Tardiness (min)")
    ax.legend(facecolor=DARK_AX, labelcolor=TEXT_COL)
    plt.tight_layout()
    plt.savefig(PLOTS_DIR / "pareto_front.png", dpi=150, facecolor=DARK_BG)
    plt.close()

    # 6. Mean throughput per method.
    fig, ax = _dark_fig(figsize=(12, 6))
    thru_means = [by_method[m]["throughput"].mean() for m in methods]
    ax.bar(methods, thru_means, color=bar_colors, alpha=0.85)
    ax.set_title("Average Throughput (jobs/hour)", fontsize=14)
    ax.set_ylabel("Throughput (jobs/hr)")
    ax.tick_params(axis="x", rotation=35)
    plt.tight_layout()
    plt.savefig(PLOTS_DIR / "throughput_comparison.png", dpi=150, facecolor=DARK_BG)
    plt.close()

    logger.info("Generated plots in %s", PLOTS_DIR)
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_full_evaluation(
    seeds: Optional[List[int]] = None,
    n_workers: int = 4,
) -> Dict[str, Any]:
    """Run the whole pipeline: benchmark, statistics, switching analysis, export, plots.

    Args:
        seeds: Seeds forwarded to ``run_benchmark`` (``None`` uses its default).
        n_workers: Worker-process count forwarded to ``run_benchmark``.

    Returns:
        Dict with the benchmark DataFrame under ``benchmark``, the
        statistical-test results under ``stats`` and the switching analysis
        under ``switching``.
    """
    benchmark_df = run_benchmark(seeds=seeds, n_workers=n_workers)

    outcome: Dict[str, Any] = {"benchmark": benchmark_df}
    outcome["stats"] = run_statistical_analysis(benchmark_df)
    outcome["switching"] = run_switching_analysis(benchmark_df)

    # These two stages only write files; they contribute nothing to the result.
    export_benchmark_json(benchmark_df)
    generate_plots(benchmark_df)

    return outcome
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: configure INFO-level logging, then run a reduced
    # evaluation over 20 seeds (99000-99019) with 2 worker processes.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    run_full_evaluation(seeds=list(range(99000, 99020)), n_workers=2)
|
|
|