"""Parallel experiment driver. Produces results.json + convergence.png.""" import json, time, sys, os; import numpy as np from multiprocessing import Pool, cpu_count from scipy.stats import wilcoxon import runner, convergence as cv N_RUNS = int(sys.argv[1]) if len(sys.argv) > 1 else 30 N_STEPS = 15; ITERS = 40 TOPOS = [10, 25, 50]; LAMBDAS = [10, 50, 100, 200] BASELINES = ["RoundRobin", "LeastConn", "ACO", "PSO", "WOA", "AdaptiSwarm"] METRICS = ["latency_ms", "utilization", "energy", "jain", "A_deadline", "all_deadline", "conv_iter"] def worker(args): name, nn, lam, seed = args return runner.run_episode(name, nn, lam, seed, n_steps=N_STEPS, iters=ITERS, probe_step=8) def run_all(nn, lam, names, n_runs): tasks = [(n, nn, lam, s) for s in range(n_runs) for n in names] with Pool(processes=min(8, cpu_count())) as p: results = p.map(worker, tasks) out = {n: {k: [] for k in METRICS} for n in names} for r, (n, _, _, _) in zip(results, tasks): for k in METRICS: out[n][k].append(r[k]) return out def mean_std(v): a = np.array(v, dtype=float); return [float(a.mean()), float(a.std())] t0 = time.time() grid = {} for nn in TOPOS: grid[nn] = {} for lam in LAMBDAS: out = run_all(nn, lam, BASELINES, N_RUNS) grid[nn][lam] = {m: {k: out[m][k] for k in METRICS} for m in BASELINES} print(f"[grid {time.time()-t0:.0f}s] nodes={nn} lam={lam}", flush=True) conv_methods = ["ACO", "PSO", "WOA", "AdaptiSwarm"] conv_iters = {m: [] for m in conv_methods}; curve_acc = {m: [] for m in conv_methods} for s in range(N_RUNS): curves, cc, tgt = cv.run_curves(25, 100, seed=s, iters=ITERS, methods=conv_methods) for m in conv_methods: conv_iters[m].append(cc[m]); curve_acc[m].append(curves[m]) conv_curve_mean = {m: np.mean(np.stack(curve_acc[m]), axis=0).tolist() for m in conv_methods} abl_ep_names = ["Abl_ACOonly", "Abl_PSOrand", "Abl_PSOseed", "AdaptiSwarm"] abl_cv_names = ["Abl_ACOonly", "Abl_PSOrand", "Abl_PSOseed", "AdaptiSwarm"] abl_ep = {m: {k: [] for k in ["latency_ms","utilization","jain"]} for m in abl_ep_names} abl_cv = {m: [] for m in abl_cv_names} for s in range(N_RUNS): for m in abl_ep_names: r = runner.run_episode(m, 50, 200, seed=s, n_steps=N_STEPS, iters=ITERS, probe_step=8) for k in abl_ep[m]: abl_ep[m][k].append(r[k]) _, cc, _ = cv.run_curves(50, 200, seed=s, iters=ITERS, methods=abl_cv_names) for m in abl_cv_names: abl_cv[m].append(cc[m]) W = {} for nn in TOPOS: W[nn] = {} for lam in LAMBDAS: W[nn][lam] = {} a_lat = np.array(grid[nn][lam]["AdaptiSwarm"]["latency_ms"]) for m in BASELINES: if m == "AdaptiSwarm": continue try: _, p = wilcoxon(a_lat, np.array(grid[nn][lam][m]["latency_ms"])) except ValueError: p = float("nan") W[nn][lam][m] = float(p) S = {} for nn in TOPOS: S[nn] = {} for lam in LAMBDAS: S[nn][lam] = {m: {k: mean_std(grid[nn][lam][m][k]) for k in METRICS} for m in BASELINES} out = dict(config=dict(n_runs=N_RUNS, n_steps=N_STEPS, iters=ITERS, topos=TOPOS, lambdas=LAMBDAS, baselines=BASELINES), summary=S, wilcoxon=W, convergence=dict(iters={m: mean_std(conv_iters[m]) for m in conv_methods}, curves=conv_curve_mean), ablation=dict(episode={m: {k: mean_std(abl_ep[m][k]) for k in abl_ep[m]} for m in abl_ep}, convergence={m: mean_std(abl_cv[m]) for m in abl_cv})) with open("results.json", "w") as f: json.dump(out, f, indent=2) print(f"[json {time.time()-t0:.0f}s]", flush=True) import matplotlib; matplotlib.use("Agg"); import matplotlib.pyplot as plt plt.figure(figsize=(6, 4)) styles = {"ACO": "C0--", "PSO": "C1-.", "WOA": "C2:", "AdaptiSwarm": "C3-"} for m in conv_methods: plt.plot(range(1, ITERS+1), conv_curve_mean[m], styles[m], label=m, linewidth=1.8) plt.xlabel("Iteration"); plt.ylabel("Best fitness $F(\\mathcal{A})$") plt.title("Convergence (25-node, $\\lambda=100$ tasks/s)"); plt.legend() plt.grid(True, alpha=0.3); plt.yscale("log"); plt.tight_layout() plt.savefig("convergence.png", dpi=160) print(f"[done {time.time()-t0:.0f}s]", flush=True)