# -*- coding: utf-8 -*- """ MelodyDeterminism - Canonical Determinism Demo (NumPy-only, CPU) Esteso con: - PRNG switch: philox (veloce, GPU-like) / sha256 (indipendente) - Softmax canonica con riduzioni pairwise: max tree + sum (kahan/tree) - Edge test: maschere, ±inf, nan, invariance a shift, idempotenza - dtype selezionabile, benchmark parametrico """ import os os.environ.setdefault("OMP_NUM_THREADS", "1") os.environ.setdefault("MKL_NUM_THREADS", "1") import hashlib import json import time import tempfile from typing import Any, Dict, List, Tuple import numpy as np import gradio as gr # ========================== Utils ========================== def sha256_ndarray(a: np.ndarray) -> str: return hashlib.sha256(np.ascontiguousarray(a).tobytes()).hexdigest() def tol_stats(ref: np.ndarray, got: np.ndarray, eps: float = 1e-12) -> Dict[str, float]: ref64 = np.asarray(ref, dtype=np.float64) got64 = np.asarray(got, dtype=np.float64) diff = got64 - ref64 mae = float(np.max(np.abs(diff))) denom = float(max(np.max(np.abs(ref64)), eps)) mre = float(np.max(np.abs(diff) / denom)) return {"max_abs_err": mae, "max_rel_err": mre} # ========================== PRNG config ========================== PRNG_MODE = "philox" # "philox" (consigliato) oppure "sha256" def _philox_random(seed: int, shape): g = np.random.Generator(np.random.Philox(int(seed))) return g.random(shape, dtype=np.float64).astype(np.float32, copy=False) # ====================== Deterministic ops ======================== class D: @staticmethod def counter_prng(seed: int, counter: int, shape: Tuple[int, ...]) -> np.ndarray: """ PRNG dichiarativo: - philox: veloce, counter-based, vicino all'ambiente GPU - sha256: indipendente da NumPy, più lento """ if PRNG_MODE == "philox": return _philox_random(seed + counter, shape) # Fallback SHA256 (deterministico, ma lento) total = int(np.prod(shape)) if len(shape) else 1 vals: List[float] = [] i = 0 while len(vals) < total: payload = f"{seed}:{counter}:{i}".encode("utf-8") h = hashlib.sha256(payload).digest() # 32 bytes for k in range(0, 32, 8): chunk = int.from_bytes(h[k:k+8], "big", signed=False) vals.append((chunk % (1 << 53)) / float(1 << 53)) # [0,1) if len(vals) >= total: break i += 1 arr = np.array(vals, dtype=np.float64).reshape(shape) return arr.astype(np.float32, copy=False) @staticmethod def _tree_sum_row(vec64: np.ndarray) -> float: v = np.asarray(vec64, dtype=np.float64) n = v.size m = 1 << (n - 1).bit_length() if m != n: v = np.pad(v, (0, m - n), constant_values=0.0) while v.size > 1: v = v[0::2] + v[1::2] return float(v[0]) @staticmethod def _tree_max_row(vec64: np.ndarray) -> float: """Riduzione deterministica del massimo (pairwise, GPU-like).""" v = np.asarray(vec64, dtype=np.float64) n = v.size m = 1 << (n - 1).bit_length() if m != n: v = np.pad(v, (0, m - n), constant_values=-np.inf) while v.size > 1: v = np.maximum(v[0::2], v[1::2]) return float(v[0]) @staticmethod def tree_fixed_reduce(x: np.ndarray) -> np.float64: """Somma pairwise deterministica su vettore intero.""" y = np.asarray(x, dtype=np.float64).reshape(-1) if y.size == 0: return np.float64(0.0) n = y.size m = 1 << (n - 1).bit_length() if m != n: y = np.pad(y, (0, m - n), constant_values=0.0) while y.size > 1: y = y[0::2] + y[1::2] return np.float64(y[0]) @staticmethod def kahan_sum(x: np.ndarray) -> np.float64: y = np.asarray(x, dtype=np.float64).reshape(-1) s = np.float64(0.0) c = np.float64(0.0) for v in y: yk = v - c t = s + yk c = (t - s) - yk s = t return s @staticmethod def deterministic_softmax(x: np.ndarray, axis: int = -1, mask: np.ndarray = None, sum_mode: str = "kahan") -> np.ndarray: """ Softmax stabile e deterministica. - max pairwise (tree) per asse scelto (GPU-like) - sum_mode: 'kahan' (più precisa) | 'tree' (pairwise GPU-like) - mask: True = valido, False = mascherato a -inf """ x64 = np.asarray(x, dtype=np.float64) if mask is not None: x64 = np.where(mask.astype(bool), x64, -np.inf) if axis < 0: axis = x64.ndim + axis # --- max deterministico pairwise lungo axis --- x_move = np.moveaxis(x64, axis, -1) # [..., L] flatx = x_move.reshape(-1, x_move.shape[-1]) m_rows = np.array([D._tree_max_row(flatx[i]) for i in range(flatx.shape[0])], dtype=np.float64) m = np.moveaxis(m_rows.reshape(x_move.shape[:-1] + (1,)), -1, axis) z = np.exp(x64 - m) # --- sum deterministica (kahan/tree) lungo axis --- z_move = np.moveaxis(z, axis, -1) # [..., L] flat = z_move.reshape(-1, z_move.shape[-1]) if sum_mode == "tree": sums = np.array([D._tree_sum_row(flat[i]) for i in range(flat.shape[0])], dtype=np.float64) else: sums = np.zeros((flat.shape[0],), dtype=np.float64) comp = np.zeros((flat.shape[0],), dtype=np.float64) for j in range(flat.shape[-1]): yj = flat[:, j] - comp tj = sums + yj comp = (tj - sums) - yj sums = tj sums = sums.reshape(z_move.shape[:-1]) denom = np.expand_dims(sums, axis=-1) out = (z_move / denom) out = np.where(np.isfinite(out), out, 0.0) # sicurezza in caso di tutti -inf out = out.astype(x.dtype, copy=False) return np.moveaxis(out.reshape(z_move.shape), -1, axis) @staticmethod def deterministic_categorical(logits: np.ndarray, num_samples: int, seed: int, sum_mode: str = "kahan") -> np.ndarray: """ Sampling deterministico (vectorizzato): - softmax canonica (max tree, sum kahan/tree) - CDF una volta - U in blocco con PRNG dichiarativo (philox/sha256) - searchsorted(..., 'left') ⇒ tie-break deterministico (min indice) """ x = np.asarray(logits, dtype=np.float64) single = False if x.ndim == 1: x = x[None, :] single = True B, V = x.shape probs = D.deterministic_softmax(x, axis=-1, sum_mode=sum_mode).astype(np.float64) cdf = np.cumsum(probs, axis=-1) # clamp robusto per chiusura [0,1] np.clip(cdf, 0.0, 1.0, out=cdf) cdf[:, -1] = 1.0 U = D.counter_prng(seed, 0, (B, num_samples)).astype(np.float64) # tie-break deterministico: side='left' idx_rows = [np.searchsorted(cdf[b], U[b], side="left") for b in range(B)] out = np.stack(idx_rows, axis=0).astype(np.int64, copy=False) if single: out = out[0] return out # ======================= Standard refs ======================= def standard_sum(x: np.ndarray) -> Tuple[np.float64, float, str]: t0 = time.perf_counter() y = np.sum(x.astype(np.float64)) dt = (time.perf_counter() - t0) * 1000.0 return np.float64(y), dt, hashlib.sha256(np.ascontiguousarray(np.array([y], dtype=np.float64)).tobytes()).hexdigest() def standard_softmax(x: np.ndarray, axis: int = -1) -> Tuple[np.ndarray, float, str]: t0 = time.perf_counter() xx = x.astype(np.float64) m = np.max(xx, axis=axis, keepdims=True) z = np.exp(xx - m) s = np.sum(z, axis=axis, keepdims=True) y = (z / s).astype(x.dtype, copy=False) dt = (time.perf_counter() - t0) * 1000.0 return y, dt, sha256_ndarray(y) def standard_categorical(logits: np.ndarray, num_samples: int, seed: int) -> Tuple[np.ndarray, float, str]: rng = np.random.default_rng(seed) t0 = time.perf_counter() x = logits.astype(np.float64) if x.ndim == 1: probs = np.exp(x - np.max(x)); probs /= probs.sum() y = rng.choice(len(x), size=num_samples, replace=True, p=probs) else: B, V = x.shape probs = np.exp(x - np.max(x, axis=1, keepdims=True)) probs /= probs.sum(axis=1, keepdims=True) y = np.stack([rng.choice(V, size=num_samples, replace=True, p=probs[b]) for b in range(B)], axis=0) dt = (time.perf_counter() - t0) * 1000.0 return y.astype(np.int64, copy=False), dt, sha256_ndarray(y.astype(np.int64, copy=False)) # ======================== Suite helpers ======================== def gen_demo(seed: int, shape: Tuple[int, ...], dtype: str = "float32") -> np.ndarray: arr = D.counter_prng(seed, 0, shape).astype(np.float64, copy=False) return arr.astype(np.float32 if dtype == "float32" else np.float64, copy=False) def compare_close(a: np.ndarray, b: np.ndarray, atol=1e-9, rtol=1e-9) -> bool: return np.allclose(a.astype(np.float64), b.astype(np.float64), atol=atol, rtol=rtol) def run_full_suite(seed: int, n: int, v: int, dtype: str, sum_mode: str) -> Dict[str, Any]: rep: Dict[str, Any] = {} # Reduce x = gen_demo(seed, (n, v), dtype=dtype).reshape(-1) s_std, ms_std, h_std = standard_sum(x) s_tree = D.tree_fixed_reduce(x) s_kah = D.kahan_sum(x) rep["reduce"] = { "ms": {"standard": round(ms_std, 3)}, "values": {"standard": float(s_std), "tree": float(s_tree), "kahan": float(s_kah)}, "tolerance_vs_standard": { "tree": tol_stats(np.array([s_std]), np.array([s_tree])), "kahan": tol_stats(np.array([s_std]), np.array([s_kah])), }, "hash": { "standard": h_std, "tree": hashlib.sha256(np.ascontiguousarray(np.array([s_tree], dtype=np.float64)).tobytes()).hexdigest(), "kahan": hashlib.sha256(np.ascontiguousarray(np.array([s_kah], dtype=np.float64)).tobytes()).hexdigest(), }, "equalities": { "standard_vs_tree_equal": bool(abs(s_std - s_tree) < 1e-12), "standard_vs_kahan_equal": bool(abs(s_std - s_kah) < 1e-12), } } # Softmax logits = gen_demo(seed + 1, (n, v), dtype=dtype) sm_std, ms_sm_std, h_sm_std = standard_softmax(logits, axis=-1) t0 = time.perf_counter() sm_can = D.deterministic_softmax(logits, axis=-1, sum_mode=sum_mode) ms_sm_can = (time.perf_counter() - t0) * 1000.0 rep["softmax"] = { "ms": {"standard": round(ms_sm_std, 3), "canonical": round(ms_sm_can, 3)}, "allclose": bool(compare_close(sm_std, sm_can, 1e-9, 1e-9)), "tolerance_vs_standard": tol_stats(sm_std, sm_can), "hash": {"standard": h_sm_std, "canonical": sha256_ndarray(sm_can)}, "sum_mode": sum_mode, } # Sampling logits1 = gen_demo(seed + 2, (v,), dtype=dtype) samp_std, ms_samp_std, h_samp_std = standard_categorical(logits1, num_samples=16, seed=seed) t0 = time.perf_counter() samp_det = D.deterministic_categorical(logits1, num_samples=16, seed=seed, sum_mode=sum_mode) ms_samp_det = (time.perf_counter() - t0) * 1000.0 samp_det2 = D.deterministic_categorical(logits1, num_samples=16, seed=seed, sum_mode=sum_mode) rep["sampling"] = { "ms": {"standard": round(ms_samp_std, 3), "deterministic": round(ms_samp_det, 3)}, "standard_vs_deterministic_equal": bool(np.array_equal(samp_std, samp_det)), "deterministic_stable": bool(np.array_equal(samp_det, samp_det2)), "hash": { "standard": h_samp_std, "deterministic": sha256_ndarray(samp_det), "deterministic_again": sha256_ndarray(samp_det2), }, "samples": {"standard": samp_std.tolist(), "deterministic": samp_det.tolist()}, "sum_mode": sum_mode, } rep["meta"] = { "backend": "numpy", "seed": seed, "shape": [n, v], "dtype": dtype, "prng": PRNG_MODE, "note": "NumPy-only canonical ops; philox/sha256 PRNG; softmax max/sum pairwise deterministici.", } return rep def run_edge_softmax(seed: int, n: int, v: int, dtype: str, sum_mode: str) -> Dict[str, Any]: """ Edge cases: ±inf, nan, mask, invariance a shift, idempotenza. """ rng = np.random.default_rng(seed) x = rng.standard_normal((n, v)).astype(np.float64) # Estremi nella prima riga x[0, 0] = np.inf x[0, 1] = -np.inf x[0, 2] = np.nan # Mask: ~80% valido mask = rng.random((n, v)) > 0.2 x = x.astype(np.float32 if dtype == "float32" else np.float64, copy=False) p1 = D.deterministic_softmax(x, axis=-1, mask=mask, sum_mode=sum_mode) # invariance a shift c = 123.456 p2 = D.deterministic_softmax(x + c, axis=-1, mask=mask, sum_mode=sum_mode) inv_shift = bool(np.allclose(p1, p2)) # idempotenza p3 = D.deterministic_softmax(p1, axis=-1, mask=np.ones_like(p1, dtype=bool), sum_mode=sum_mode) idempotent = bool(np.allclose(p1, p3)) # conserva probabilità conserve = bool(np.allclose(np.sum(p1, axis=-1), 1.0)) return { "sum_mode": sum_mode, "dtype": dtype, "mask_ratio": float(np.mean(mask)), "invariance_shift": inv_shift, "idempotent": idempotent, "conserve_prob": conserve, "finite": bool(np.isfinite(p1).all()), "tolerance_against_self": tol_stats(p1, p2), } # ========================= Gradio callbacks ========================= def run_single_test(test_kind: str, seed: float, n: float, v: float, dtype: str, sum_mode: str, prng_choice: str): global PRNG_MODE PRNG_MODE = prng_choice # aggiorna PRNG globale seed, n, v = int(seed), int(n), int(v) if test_kind == "Full suite": rep = run_full_suite(seed, n, v, dtype=dtype, sum_mode=sum_mode) text_lines = [ "== MelodyDeterminism - Full suite (NumPy) ==", f"Seed: {seed} Shape: ({n},{v}) dtype={dtype} PRNG={PRNG_MODE} sum={sum_mode}", "", "[Reduce]", f" values: {rep['reduce']['values']}", f" tol(tree): {rep['reduce']['tolerance_vs_standard']['tree']}", f" tol(kahan): {rep['reduce']['tolerance_vs_standard']['kahan']}", "", "[Softmax]", f" allclose(standard, canonical): {rep['softmax']['allclose']}", f" tol: {rep['softmax']['tolerance_vs_standard']}", f" ms: std={rep['softmax']['ms']['standard']} canonical={rep['softmax']['ms']['canonical']} (sum={sum_mode})", "", "[Sampling]", f" deterministic_stable (two runs): {rep['sampling']['deterministic_stable']}", f" ms: std={rep['sampling']['ms']['standard']} deterministic={rep['sampling']['ms']['deterministic']} (PRNG={PRNG_MODE})", f" samples_deterministic: {rep['sampling']['samples']['deterministic']}", ] return "\n".join(text_lines), _save_json(rep) elif test_kind == "Softmax (edge: mask ±inf/nan)": rep = run_edge_softmax(seed, n, v, dtype=dtype, sum_mode=sum_mode) text = [ "== Softmax Edge ==", f"Seed: {seed} Shape: ({n},{v}) dtype={dtype} PRNG={PRNG_MODE} sum={sum_mode}", f"invariance_shift: {rep['invariance_shift']}", f"idempotent: {rep['idempotent']}", f"conserve_prob (≈1): {rep['conserve_prob']}", f"finite: {rep['finite']}", f"mask_ratio: {rep['mask_ratio']:.2f}", f"tolerance_against_self: {rep['tolerance_against_self']}", ] return "\n".join(text), _save_json(rep) elif test_kind == "Reduce (tree vs kahan vs standard)": x = gen_demo(seed, (n, v), dtype=dtype).reshape(-1) s_std, ms_std, h_std = standard_sum(x) s_tree = D.tree_fixed_reduce(x) s_kah = D.kahan_sum(x) rep = { "seed": seed, "N": int(x.size), "values": {"standard": float(s_std), "tree": float(s_tree), "kahan": float(s_kah)}, "tolerance_vs_standard": { "tree": tol_stats(np.array([s_std]), np.array([s_tree])), "kahan": tol_stats(np.array([s_std]), np.array([s_kah])), }, "ms": {"standard": round(ms_std, 3)}, } text = [ "== Reduce ==", f"seed={seed} len={x.size} dtype={dtype}", f"standard: {float(s_std)} ms={round(ms_std,3)}", f"tree: {float(s_tree)} tol={rep['tolerance_vs_standard']['tree']}", f"kahan: {float(s_kah)} tol={rep['tolerance_vs_standard']['kahan']}", ] return "\n".join(text), _save_json(rep) elif test_kind == "Softmax (canonical vs standard)": logits = gen_demo(seed, (n, v), dtype=dtype) sm_std, ms_std, h_std = standard_softmax(logits, axis=-1) t0 = time.perf_counter(); sm_can = D.deterministic_softmax(logits, axis=-1, sum_mode=sum_mode) ms_can = (time.perf_counter() - t0) * 1000.0 rep = { "seed": seed, "shape": [n, v], "dtype": dtype, "sum_mode": sum_mode, "ms": {"standard": round(ms_std,3), "canonical": round(ms_can,3)}, "tolerance_vs_standard": tol_stats(sm_std, sm_can), "hash": {"standard": h_std, "canonical": sha256_ndarray(sm_can)}, } text = [ "== Softmax ==", f"seed={seed} shape=({n},{v}) dtype={dtype} sum={sum_mode}", f"tol: {rep['tolerance_vs_standard']}", f"ms: standard={round(ms_std,3)} canonical={round(ms_can,3)}", ] return "\n".join(text), _save_json(rep) elif test_kind == "Categorical sampling (deterministic)": logits = gen_demo(seed, (v,), dtype=dtype) samp_std, ms_std, h_std = standard_categorical(logits, num_samples=16, seed=seed) t0 = time.perf_counter(); det1 = D.deterministic_categorical(logits, num_samples=16, seed=seed, sum_mode=sum_mode) ms_det = (time.perf_counter() - t0) * 1000.0 det2 = D.deterministic_categorical(logits, num_samples=16, seed=seed, sum_mode=sum_mode) rep = { "seed": seed, "vocab": v, "samples": 16, "dtype": dtype, "prng": PRNG_MODE, "sum_mode": sum_mode, "standard_samples": samp_std.tolist(), "deterministic_samples": det1.tolist(), "deterministic_stable": bool(np.array_equal(det1, det2)), "ms": {"standard": round(ms_std,3), "deterministic": round(ms_det,3)}, } text = [ "== Categorical sampling ==", f"seed={seed} vocab={v} samples=16 dtype={dtype} PRNG={PRNG_MODE} sum={sum_mode}", f"deterministic_stable: {rep['deterministic_stable']}", f"ms: std={rep['ms']['standard']} deterministic={rep['ms']['deterministic']}", f"deterministic samples (first 16): {det1.tolist()}", ] return "\n".join(text), _save_json(rep) else: rep = {"error": "unknown test"} return "Unknown test.", _save_json(rep) def _save_json(payload: Dict[str, Any]) -> str: json_bytes = json.dumps(payload, indent=2).encode("utf-8") tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json") try: tmp.write(json_bytes); tmp.flush() path = tmp.name finally: tmp.close() return path # =========================== Benchmark =========================== def _timed(fn, repeats: int = 10, warmup: int = 3) -> float: for _ in range(warmup): fn() t0 = time.perf_counter() for _ in range(repeats): fn() t1 = time.perf_counter() return (t1 - t0) / repeats def bench_suite(ns=(1, 8, 32), vs=(128, 512, 1024), dtype="float32", sum_mode="kahan"): results = [] for n in ns: for v in vs: x = np.random.standard_normal((n, v)).astype(np.float32 if dtype=="float32" else np.float64) def f_std(): p = np.exp(x - np.max(x, axis=1, keepdims=True)) p = p / np.sum(p, axis=1, keepdims=True) _ = np.argmax(p, axis=1) def f_can(): for i in range(n): _ = D.deterministic_categorical(x[i], num_samples=1, seed=42, sum_mode=sum_mode) t_std = _timed(f_std); t_can = _timed(f_can) results.append({ "n": int(n), "v": int(v), "t_std_ms": round(1000.0 * t_std, 3), "t_can_ms": round(1000.0 * t_can, 3), "overhead_pct": round(100.0 * (t_can - t_std) / max(t_std, 1e-9), 2), "dtype": dtype, "prng": PRNG_MODE, "sum": sum_mode, }) return results def run_benchmark_and_save(dtype: str, sum_mode: str, prng_choice: str): global PRNG_MODE PRNG_MODE = prng_choice res = bench_suite(dtype=dtype, sum_mode=sum_mode) headers = ["n","v","t_std_ms","t_can_ms","overhead_pct","dtype","prng","sum"] table = [[r[h] for h in headers] for r in res] tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", encoding="utf-8") try: tmp.write(",".join(headers) + "\n") for row in table: tmp.write(",".join(str(x) for x in row) + "\n") tmp.flush() path = tmp.name finally: tmp.close() jtmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") try: json.dump(res, jtmp, indent=2) jtmp.flush() jpath = jtmp.name finally: jtmp.close() return table, path, jpath # =========================== Gradio UI =========================== with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# MelodyDeterminism - Canonical Determinism Demo (NumPy / CPU)") gr.Markdown( "Deterministic ops: reduce (Kahan/Tree), softmax canonica (max tree + sum kahan/tree), sampling RNG dichiarativo. " "PRNG: Philox (GPU-like) o SHA256 (indipendente). Edge: maschera, ±inf, nan, shift, idempotenza. " "Benchmark parametrico con overhead%." ) with gr.Tabs(): with gr.Tab("Suite"): with gr.Row(): with gr.Column(scale=1): test_kind = gr.Dropdown( label="Select test", choices=[ "Full suite", "Reduce (tree vs kahan vs standard)", "Softmax (canonical vs standard)", "Softmax (edge: mask ±inf/nan)", "Categorical sampling (deterministic)", ], value="Full suite", ) seed = gr.Number(value=42, precision=0, label="Seed") n = gr.Slider(1, 256, step=1, value=8, label="Rows / Batch (n)") v = gr.Slider(2, 4096, step=1, value=32, label="Width / Vocab (v)") dtype = gr.Radio(["float32","float64"], value="float32", label="dtype") sum_mode = gr.Radio(["kahan","tree"], value="tree", label="Softmax sum") # default GPU-like prng_choice = gr.Radio(["philox","sha256"], value="philox", label="PRNG") run_btn = gr.Button("Run") with gr.Column(scale=2): report = gr.Textbox(label="Report", lines=24) download = gr.File(label="Download JSON report") run_btn.click( run_single_test, inputs=[test_kind, seed, n, v, dtype, sum_mode, prng_choice], outputs=[report, download] ) with gr.Tab("Benchmark"): gr.Markdown("Confronto standard vs deterministico (sampling) con le scelte sotto.") dtype_b = gr.Radio(["float32","float64"], value="float32", label="dtype") sum_mode_b = gr.Radio(["kahan","tree"], value="tree", label="Softmax sum") # default GPU-like prng_b = gr.Radio(["philox","sha256"], value="philox", label="PRNG") bench_btn = gr.Button("Esegui benchmark") bench_table = gr.Dataframe( headers=["n","v","t_std_ms","t_can_ms","overhead_pct","dtype","prng","sum"], label="Latenze (ms) e overhead (%)", wrap=True, ) bench_csv = gr.File(label="Scarica CSV") bench_json = gr.File(label="Scarica JSON") bench_btn.click(run_benchmark_and_save, inputs=[dtype_b, sum_mode_b, prng_b], outputs=[bench_table, bench_csv, bench_json]) if __name__ == "__main__": demo.queue().launch()