| """ |
| π± Leviathan v2 β EVM Exploit Topology Classifier |
| Pure NumPy inference. Zero PyTorch dependency. |
| |
| Architecture: |
| EVM Trace β Hilbert Encoder β 256Γ256 Manifold |
| β Downsample 20Γ20 β Conv2d(2β16) β Conv2d(16β16) |
| β FC(4096β64) β FC(64β1) β Raw Score |
| β PRIME Bistable Attractor Refinement β THREAT/CLEAN |
| |
| Channels: |
| 0: Opcode energy density (H activator field) |
| 1: Stack depth / state mutation intensity (V inhibitor field) |
| |
| Usage: |
| from leviathan import Leviathan |
| model = Leviathan.from_safetensors("leviathan_v2_session_trained.safetensors") |
| score = model.predict_manifold(H_256, V_256) |
| verdict = model.prime_refine(score) |
| """ |
| from __future__ import annotations |
| import numpy as np |
| from pathlib import Path |
|
|
| CNN_SPATIAL = 20 |
| PRIME_ETA = 3.50 |
| PRIME_GAMMA = 0.30 |
| PRIME_BETA = 0.10 |
| PRIME_SIGMA = 0.05 |
| PRIME_T = 256 |
| THREAT_THRESHOLD = 0.90 |
| BENIGN_THRESHOLD = 0.10 |
|
|
|
|
| def _conv2d(x, weight, bias): |
| B, C_in, H, W = x.shape |
| C_out, _, kH, kW = weight.shape |
| oH, oW = H - kH + 1, W - kW + 1 |
| cols = np.zeros((B, C_in, kH, kW, oH, oW), dtype=x.dtype) |
| for i in range(kH): |
| for j in range(kW): |
| cols[:, :, i, j, :, :] = x[:, :, i:i+oH, j:j+oW] |
| cols_flat = cols.reshape(B, C_in * kH * kW, oH * oW) |
| w_flat = weight.reshape(C_out, C_in * kH * kW) |
| out = (w_flat @ cols_flat).reshape(B, C_out, oH, oW) |
| out += bias[None, :, None, None] |
| return out |
|
|
|
|
| def _relu(x): |
| return np.maximum(x, 0) |
|
|
|
|
| def _linear(x, weight, bias): |
| return x @ weight.T + bias[None, :] |
|
|
|
|
| def _downsample(field, target): |
| H, W = field.shape |
| if H == target and W == target: |
| return field |
| bh, bw = H // target, W // target |
| if bh < 1 or bw < 1: |
| xi = np.round(np.linspace(0, H-1, target)).astype(int) |
| yi = np.round(np.linspace(0, W-1, target)).astype(int) |
| return field[np.ix_(xi, yi)] |
| cropped = field[:bh*target, :bw*target] |
| return cropped.reshape(target, bh, target, bw).mean(axis=(1, 3)) |
|
|
|
|
| def _sigmoid(x, gamma): |
| clamped = max(-500.0, min(500.0, -gamma * x)) |
| return 1.0 / (1.0 + np.exp(clamped)) |
|
|
|
|
| class Leviathan: |
| """ |
| Leviathan v2 β EVM Exploit Topology Classifier. |
| |
| Two-channel CNN on Hamiltonian energy manifolds from EVM traces. |
| 256x256 manifold -> downsample 20x20 -> CNN -> score -> PRIME refine. |
| """ |
|
|
| def __init__(self, weights): |
| self.conv0_w = weights["conv_net.0.weight"] |
| self.conv0_b = weights["conv_net.0.bias"] |
| self.conv1_w = weights["conv_net.2.weight"] |
| self.conv1_b = weights["conv_net.2.bias"] |
| self.fc1_w = weights["fc.1.weight"] |
| self.fc1_b = weights["fc.1.bias"] |
| self.fc2_w = weights["fc.3.weight"] |
| self.fc2_b = weights["fc.3.bias"] |
|
|
| @classmethod |
| def from_safetensors(cls, path): |
| from safetensors.numpy import load_file |
| return cls(load_file(str(path))) |
|
|
| @classmethod |
| def from_dict(cls, weights): |
| return cls(weights) |
|
|
| def forward(self, x): |
| if x.ndim == 3: |
| x = x[None] |
| B = x.shape[0] |
| h = _relu(_conv2d(x, self.conv0_w, self.conv0_b)) |
| h = _relu(_conv2d(h, self.conv1_w, self.conv1_b)) |
| h = h.reshape(B, -1) |
| h = _relu(_linear(h, self.fc1_w, self.fc1_b)) |
| h = _linear(h, self.fc2_w, self.fc2_b) |
| return h |
|
|
| def predict(self, H_field, V_field): |
| """Score from 20x20 field patches (CNN native resolution).""" |
| x = np.stack([H_field, V_field])[None].astype(np.float32) |
| return float(self.forward(x)[0, 0]) |
|
|
| def predict_manifold(self, H_256, V_256): |
| """Score from full 256x256 EVM execution manifold. Auto-downsamples.""" |
| H_down = _downsample(H_256.astype(np.float32), CNN_SPATIAL) |
| V_down = _downsample(V_256.astype(np.float32), CNN_SPATIAL) |
| return self.predict(H_down, V_down) |
|
|
| def predict_region(self, H_field, V_field, region=None): |
| """Score a region of any-sized energy landscape.""" |
| s = CNN_SPATIAL |
| if region is not None: |
| rx, ry, rw, rh = region |
| H_crop = H_field[rx:rx+rw, ry:ry+rh] |
| V_crop = V_field[rx:rx+rw, ry:ry+rh] |
| else: |
| cx = max(0, (H_field.shape[0] - s) // 2) |
| cy = max(0, (H_field.shape[1] - s) // 2) |
| H_crop = H_field[cx:cx+s, cy:cy+s] |
| V_crop = V_field[cx:cx+s, cy:cy+s] |
| H_down = _downsample(H_crop.astype(np.float32), s) |
| V_down = _downsample(V_crop.astype(np.float32), s) |
| return self.predict(H_down, V_down) |
|
|
| @staticmethod |
| def prime_refine(raw_score, seed=None): |
| """ |
| PRIME bistable attractor refinement. |
| Evolves raw CNN score through recursive Hamiltonian dynamics. |
| H* = -3.054 (BENIGN attractor), positive (THREAT attractor). |
| """ |
| rng = np.random.default_rng(seed) |
| H = (raw_score - 0.5) * 4.0 |
| trajectory = [H] |
|
|
| for t in range(PRIME_T): |
| sig = _sigmoid(H, PRIME_GAMMA) |
| noise = rng.normal(0, 1 + PRIME_BETA * abs(H)) |
| H = H + PRIME_ETA * H * sig + PRIME_SIGMA * noise |
| H = max(-10.0, min(10.0, H)) |
| trajectory.append(H) |
|
|
| refined = 1.0 / (1.0 + np.exp(-H)) |
|
|
| if refined > THREAT_THRESHOLD: |
| verdict = "THREAT" |
| confidence = refined |
| elif refined < BENIGN_THRESHOLD: |
| verdict = "BENIGN" |
| confidence = 1.0 - refined |
| else: |
| verdict = "UNCERTAIN" |
| confidence = 1.0 - 2.0 * abs(refined - 0.5) |
|
|
| return { |
| "verdict": verdict, |
| "confidence": round(float(confidence), 4), |
| "refined_score": round(float(refined), 6), |
| "raw_score": round(float(raw_score), 6), |
| "prime_iterations": PRIME_T, |
| "prime_params": {"eta": PRIME_ETA, "gamma": PRIME_GAMMA, |
| "beta": PRIME_BETA, "sigma": PRIME_SIGMA}, |
| "trajectory_final_10": [round(float(x), 4) for x in trajectory[-10:]], |
| } |
|
|
| def audit(self, H_manifold, V_manifold, seed=42): |
| """Full pipeline: manifold -> CNN -> PRIME refinement -> verdict.""" |
| if H_manifold.shape[0] > CNN_SPATIAL: |
| raw_score = self.predict_manifold(H_manifold, V_manifold) |
| else: |
| raw_score = self.predict(H_manifold.astype(np.float32), |
| V_manifold.astype(np.float32)) |
| result = self.prime_refine(raw_score, seed=seed) |
| result["manifold_size"] = list(H_manifold.shape) |
| result["cnn_input_size"] = [CNN_SPATIAL, CNN_SPATIAL] |
| return result |
|
|
| def __repr__(self): |
| return (f"Leviathan(channels=2, conv=[2->16->16], fc=[4096->64->1], " |
| f"params=264,897, manifold=256x256->{CNN_SPATIAL}x{CNN_SPATIAL}, " |
| f"PRIME=[eta={PRIME_ETA},gamma={PRIME_GAMMA},T={PRIME_T}])") |
|
|
|
|
| if __name__ == "__main__": |
| import sys |
| path = sys.argv[1] if len(sys.argv) > 1 else "leviathan_v2_session_trained.safetensors" |
| model = Leviathan.from_safetensors(path) |
| print(model) |
|
|
| print("\n=== 256x256 Manifold Audit ===") |
| H_256 = np.random.randn(256, 256).astype(np.float32) |
| V_256 = np.random.randn(256, 256).astype(np.float32) |
| result = model.audit(H_256, V_256) |
| print(f" Verdict: {result['verdict']}") |
| print(f" Raw: {result['raw_score']}") |
| print(f" Refined: {result['refined_score']}") |
| print(f" Confidence: {result['confidence']}") |
|
|
| print("\n=== 20x20 Direct ===") |
| score = model.predict(np.random.randn(20, 20).astype(np.float32), |
| np.zeros((20, 20), dtype=np.float32)) |
| print(f" Score: {score:.6f}") |
|
|
| print("\n=== PRIME Refinement Sweep ===") |
| for s in [0.0, 0.3, 0.5, 0.7, 1.0]: |
| r = Leviathan.prime_refine(s, seed=42) |
| print(f" input={s:.1f} -> {r['verdict']:>9} " |
| f"(refined={r['refined_score']:.4f}, conf={r['confidence']:.4f})") |
|
|