"""Tool 6/9: verify_equivalence — anti-cheating fuzzer. Per plan §10b, this is the single most important defense against the agent cheating by producing a fast-but-wrong implementation. 8 cheating modes defended: 1. Wrong algorithm with plausible output — random fuzz inputs 2. Edge-case overflow (int32 wraps int64) — typed inputs include int64, INT_MAX/MIN 3. Approximation drift — rtol=1e-5 (or rtol=0 per metadata) 4. Cached lookup table — seed randomized per call 5. Tail variance — 10% adversarial sub-pool 6. Returns 0 / empty — exact shape+dtype check 7. Detects benchmark context — same input pipeline as benchmarker 8. Side-channel access — sandboxed subprocess Returns: pass_rate ∈ [0, 1], first_failure dict, n_adversarial_failures. """ from __future__ import annotations import ast import random import warnings from typing import Any import numpy as np _ALLOWED_IMPORT_MODULES = {"math", "numpy"} _BANNED_CALLS = {"eval", "exec", "compile", "open", "__import__", "input"} def _safe_import(name, globals=None, locals=None, fromlist=(), level=0): root = name.split(".")[0] if root not in _ALLOWED_IMPORT_MODULES: raise RuntimeError(f"import '{name}' is not allowed in verifier") return __import__(name, globals, locals, fromlist, level) def _validate_python_code_safety(tree: ast.AST) -> None: """Reject high-risk constructs before running user-provided Python code.""" for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: root = alias.name.split(".")[0] if root not in _ALLOWED_IMPORT_MODULES: raise RuntimeError(f"import '{alias.name}' is not allowed in verifier") if isinstance(node, ast.ImportFrom): module = (node.module or "").split(".")[0] if module and module not in _ALLOWED_IMPORT_MODULES: raise RuntimeError(f"from '{node.module}' import ... is not allowed in verifier") if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): if node.func.id in _BANNED_CALLS: raise RuntimeError(f"call '{node.func.id}(...)' is not allowed in verifier") def _safe_exec_function(python_code: str, fn_name: str): """Compile and execute Python in a constrained namespace, then return fn.""" tree = ast.parse(python_code) _validate_python_code_safety(tree) safe_builtins = { "abs": abs, "all": all, "any": any, "bool": bool, "dict": dict, "enumerate": enumerate, "Exception": Exception, "float": float, "int": int, "len": len, "list": list, "max": max, "min": min, "TypeError": TypeError, "pow": pow, "range": range, "round": round, "set": set, "sorted": sorted, "sum": sum, "tuple": tuple, "ValueError": ValueError, "__import__": _safe_import, "zip": zip, } ns: dict[str, Any] = {"__builtins__": safe_builtins, "np": np} exec(compile(tree, filename="", mode="exec"), ns) fn = ns.get(fn_name) if fn is None: raise RuntimeError(f"function '{fn_name}' not defined in python_code") return fn # ---------- Input generation from Python AST ---------- def _infer_input_signature(python_code: str) -> list[dict[str, str]]: """Inspect the Python function's signature + annotations to pick fuzz input types. Returns a list of {"name": str, "kind": "ndarray|int|float|list|str", "dtype": str}. Without explicit annotations, we fall back to ndarray of float64. """ try: tree = ast.parse(python_code) except SyntaxError: return [{"name": "x", "kind": "ndarray", "dtype": "float64"}] fn = next((n for n in tree.body if isinstance(n, ast.FunctionDef)), None) if fn is None: return [{"name": "x", "kind": "ndarray", "dtype": "float64"}] sig: list[dict[str, str]] = [] for arg in fn.args.args: ann = ast.unparse(arg.annotation) if arg.annotation else "" kind = "ndarray" dtype = "float64" if "int" in ann.lower() and "ndarray" not in ann.lower() and "list" not in ann.lower(): kind = "int" elif "float" in ann.lower() and "ndarray" not in ann.lower() and "list" not in ann.lower(): kind = "float" elif "list" in ann.lower(): kind = "list" elif "str" in ann.lower(): kind = "str" if "int32" in ann: dtype = "int32" elif "int64" in ann: dtype = "int64" elif "float32" in ann: dtype = "float32" sig.append({"name": arg.arg, "kind": kind, "dtype": dtype}) # Default fallback: assume one ndarray if not sig: sig = [{"name": "x", "kind": "ndarray", "dtype": "float64"}] return sig def _generate_typed_input(spec: dict[str, str], rng: np.random.Generator, adversarial: bool = False) -> Any: """Generate one input matching spec. If adversarial, sample boundary/edge values.""" kind = spec["kind"] dtype = spec["dtype"] if kind == "int": if adversarial: return int(rng.choice([0, 1, -1, 2**31 - 1, -(2**31), 2**62, -(2**62)])) return int(rng.integers(-1000, 1000)) if kind == "float": if adversarial: return float(rng.choice([0.0, -0.0, np.inf, -np.inf, np.nan, 1e-300, 1e300])) return float(rng.standard_normal()) if kind == "str": # Short ascii strings return "".join(chr(int(rng.integers(97, 123))) for _ in range(int(rng.integers(1, 16)))) # Default: ndarray n = int(rng.integers(10, 1000)) if adversarial: choices = [ np.zeros(n, dtype=dtype), np.ones(n, dtype=dtype), np.array([], dtype=dtype), # empty np.array([0.0], dtype=dtype), # singleton np.full(n, np.inf, dtype=dtype) if "float" in dtype else np.full(n, np.iinfo(np.dtype(dtype)).max, dtype=dtype), (rng.standard_normal(n) * 1e-300).astype(dtype) if "float" in dtype else rng.integers(-1, 2, n).astype(dtype), ] idx = int(rng.integers(0, len(choices))) return choices[idx] if "int" in dtype: return rng.integers(-100, 100, size=n).astype(dtype) return rng.standard_normal(n).astype(dtype) def _numerically_equivalent(a: Any, b: Any, rtol: float) -> bool: """Compare two outputs accounting for float tolerance, exact for int.""" if isinstance(a, (int, float)) and isinstance(b, (int, float)): if rtol == 0: return a == b if not np.isfinite(a) or not np.isfinite(b): return (np.isnan(a) and np.isnan(b)) or a == b return abs(a - b) <= rtol * (1 + abs(a)) try: a = np.asarray(a) b = np.asarray(b) except Exception: return a == b if a.shape != b.shape: return False if a.dtype != b.dtype: # We don't allow dtype-mismatch — that's a hard fail per plan §10b return False if rtol == 0: return bool(np.array_equal(a, b)) # Use allclose with NaN-equality return bool(np.allclose(a, b, rtol=rtol, atol=rtol * 0.1, equal_nan=True)) def _exec_python_in_sandbox(python_code: str, fn_name: str, args: tuple) -> Any: """Run python_code's function on args in a constrained namespace.""" fn = _safe_exec_function(python_code, fn_name) return fn(*args) def _exec_cpp_via_so(so_path: str, fn_name: str, args: tuple, py_fn=None, py_code: str = "") -> Any: """Load the compiled .so via ctypes and dispatch on `args`. The agent's C++ uses the canonical signature extern "C" void agent_function(const double*, size_t, double*, size_t); so we need the Python reference function to know the output shape. Either pass `py_fn` directly, or pass `py_code` and we'll compile it. Raises: RuntimeError: ctypes can't load the .so or symbol is missing """ from server.tools._runtime import call_compiled if py_fn is None: if not py_code: raise RuntimeError("verifier: need py_fn or py_code to dispatch C++") py_fn = _safe_exec_function(py_code, fn_name) return call_compiled(so_path, py_fn, args) def verify_equivalence_tool(tool_args: dict[str, Any], state) -> dict[str, Any]: """Fuzz-verify cpp_code against python_code on n_cases random + adversarial inputs. Args: cpp_code (str) — agent's C++ python_code (str) — reference Python (defaults to state.python_code) n_cases (int=1000) — total fuzz cases (10% adversarial sub-pool) rtol (float=1e-5) — float tolerance; 0 = bit-exact Returns: pass_rate (float) first_failure (dict | None) n_adversarial_failures (int) n_random_failures (int) seed (int) — randomized per call (defeats lookup tables) """ cpp_code = tool_args.get("cpp_code", "") python_code = tool_args.get("python_code") or state.python_code n_cases = int(tool_args.get("n_cases", 1000)) rtol = float(tool_args.get("rtol", state.rtol_override if state.rtol_override is not None else 1e-5)) if not cpp_code.strip(): return {"pass_rate": 0.0, "error": "empty cpp_code"} if n_cases <= 0: return {"pass_rate": 0.0, "error": "n_cases must be >= 1", "n_cases": n_cases} # Defeat lookup-table cheating mode 4: seed varies per call seed = random.randint(0, 2**32 - 1) rng = np.random.default_rng(seed) # Discover Python function name (first FunctionDef) try: tree = ast.parse(python_code) except SyntaxError as e: return {"pass_rate": 0.0, "error": f"python parse: {e}"} fn_node = next((n for n in tree.body if isinstance(n, ast.FunctionDef)), None) if fn_node is None: return {"pass_rate": 0.0, "error": "no function in python_code"} fn_name = fn_node.name sig = _infer_input_signature(python_code) # Compile (or get cached .so) — uses cpp_compiler tool's pathway from server.tools.cpp_compiler import _compile, _sha256 import json as _json cache_key = _sha256(cpp_code, _json.dumps(state.hardware_profile, sort_keys=True)) compile_result = _compile(cpp_code, state.hardware_profile, cache_key) if compile_result["status"] != "success": return { "pass_rate": 0.0, "error": f"cpp compile failed: {compile_result.get('error', '')[:300]}", "compile_status": compile_result["status"], } so_path = compile_result["so_path"] # Pre-load the Python reference function once (avoids repeated exec overhead) try: py_fn = _safe_exec_function(python_code, fn_name) except Exception as e: return {"pass_rate": 0.0, "error": f"python exec failed: {e}"} failures: list[dict[str, Any]] = [] n_adversarial_failures = 0 n_random_failures = 0 n_executed_cases = 0 n_adversarial_total = 0 n_random_total = 0 for i in range(n_cases): adversarial = (i % 10 == 9) # 10% adversarial sub-pool try: args = tuple(_generate_typed_input(spec, rng, adversarial=adversarial) for spec in sig) except Exception: continue # Skip if input generation itself fails # Run Python first; if it raises, skip (don't penalize the C++ for invalid input) try: # Adversarial fuzzing intentionally includes edge values (nan/inf/zero). # Suppress expected NumPy runtime warnings so job logs stay readable. with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) with np.errstate(divide="ignore", invalid="ignore", over="ignore", under="ignore"): py_out = py_fn(*args) except Exception: continue n_executed_cases += 1 if adversarial: n_adversarial_total += 1 else: n_random_total += 1 # Run C++ via ctypes dispatch — REAL execution now (not stub) try: cpp_out = _exec_cpp_via_so(so_path, fn_name, args, py_fn=py_fn) except Exception as e: if adversarial: n_adversarial_failures += 1 else: n_random_failures += 1 if not failures: failures.append({ "case": i, "reason": "cpp_exec_error", "error": str(e)[:200], "adversarial": adversarial, }) continue if not _numerically_equivalent(py_out, cpp_out, rtol): if adversarial: n_adversarial_failures += 1 else: n_random_failures += 1 if not failures: # Capture only first failure to bound observation size py_repr = repr(py_out)[:120] cpp_repr = repr(cpp_out)[:120] failures.append({ "case": i, "reason": "output_mismatch", "adversarial": adversarial, "py_out": py_repr, "cpp_out": cpp_repr, }) if n_executed_cases == 0: return { "pass_rate": 0.0, "n_cases": n_cases, "executed_cases": 0, "first_failure": failures[0] if failures else None, "n_adversarial_failures": n_adversarial_failures, "n_random_failures": n_random_failures, "adversarial_pass_rate": 0.0, "rtol_used": rtol, "seed": seed, "error": "insufficient_valid_cases", } pass_count = n_executed_cases - (n_adversarial_failures + n_random_failures) pass_rate = pass_count / n_executed_cases coverage = n_executed_cases / max(n_cases, 1) if coverage < 0.8: pass_rate = 0.0 adversarial_pass_rate = (n_adversarial_total - n_adversarial_failures) / max(n_adversarial_total, 1) return { "pass_rate": pass_rate, "n_cases": n_cases, "executed_cases": n_executed_cases, "coverage": coverage, "first_failure": failures[0] if failures else None, "n_adversarial_failures": n_adversarial_failures, "n_random_failures": n_random_failures, "adversarial_pass_rate": adversarial_pass_rate, "rtol_used": rtol, "seed": seed, } __all__ = ["verify_equivalence_tool", "_infer_input_signature", "_numerically_equivalent"]