Spaces:
Sleeping
Sleeping
| """Tool 6/9: verify_equivalence — anti-cheating fuzzer. | |
| Per plan §10b, this is the single most important defense against the agent | |
| cheating by producing a fast-but-wrong implementation. | |
| 8 cheating modes defended: | |
| 1. Wrong algorithm with plausible output — random fuzz inputs | |
| 2. Edge-case overflow (int32 wraps int64) — typed inputs include int64, INT_MAX/MIN | |
| 3. Approximation drift — rtol=1e-5 (or rtol=0 per metadata) | |
| 4. Cached lookup table — seed randomized per call | |
| 5. Tail variance — 10% adversarial sub-pool | |
| 6. Returns 0 / empty — exact shape+dtype check | |
| 7. Detects benchmark context — same input pipeline as benchmarker | |
| 8. Side-channel access — sandboxed subprocess | |
| Returns: pass_rate ∈ [0, 1], first_failure dict, n_adversarial_failures. | |
| """ | |
| from __future__ import annotations | |
| import ast | |
| import random | |
| import warnings | |
| from typing import Any | |
| import numpy as np | |
| _ALLOWED_IMPORT_MODULES = {"math", "numpy"} | |
| _BANNED_CALLS = {"eval", "exec", "compile", "open", "__import__", "input"} | |
def _safe_import(name, globals=None, locals=None, fromlist=(), level=0):
    """Stand-in for ``__import__`` that only resolves allow-listed packages.

    The decision is made on the first dotted component of ``name``; when that
    component is in ``_ALLOWED_IMPORT_MODULES`` the request is forwarded
    unchanged to the real ``__import__``.

    Raises:
        RuntimeError: the top-level package is not allow-listed.
    """
    top_level_package = name.partition(".")[0]
    if top_level_package in _ALLOWED_IMPORT_MODULES:
        return __import__(name, globals, locals, fromlist, level)
    raise RuntimeError(f"import '{name}' is not allowed in verifier")
def _validate_python_code_safety(tree: ast.AST) -> None:
    """Statically reject disallowed imports and banned calls in ``tree``.

    Every node of the tree is visited. Three constructs are rejected:
      * ``import x`` where the top-level package of ``x`` is not allow-listed,
      * ``from x import ...`` where the top-level package of ``x`` is not
        allow-listed (a ``from`` import without a module name is not checked
        here),
      * a call whose callee is a bare name listed in ``_BANNED_CALLS``
        (calls through attributes or other expressions are not inspected).

    Raises:
        RuntimeError: on the first offending node encountered.
    """
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            offending = next(
                (
                    alias.name
                    for alias in node.names
                    if alias.name.split(".")[0] not in _ALLOWED_IMPORT_MODULES
                ),
                None,
            )
            if offending is not None:
                raise RuntimeError(f"import '{offending}' is not allowed in verifier")
        elif isinstance(node, ast.ImportFrom):
            top_level = (node.module or "").split(".")[0]
            if top_level and top_level not in _ALLOWED_IMPORT_MODULES:
                raise RuntimeError(f"from '{node.module}' import ... is not allowed in verifier")
        elif isinstance(node, ast.Call):
            callee = node.func
            if isinstance(callee, ast.Name) and callee.id in _BANNED_CALLS:
                raise RuntimeError(f"call '{callee.id}(...)' is not allowed in verifier")
def _safe_exec_function(python_code: str, fn_name: str):
    """Execute ``python_code`` with restricted builtins and return ``fn_name``.

    The source is parsed, passed through ``_validate_python_code_safety``, and
    then executed in a fresh namespace that exposes only a small set of
    builtins, the guarded ``__import__`` replacement, and ``np``.

    Args:
        python_code: Python source expected to define ``fn_name`` at top level.
        fn_name: Name to look up in the namespace after execution.

    Returns:
        The object bound to ``fn_name`` by the executed code.

    Raises:
        RuntimeError: the safety pass rejected the code, or ``fn_name`` is not
            defined (or is bound to ``None``) after execution.
    """
    parsed = ast.parse(python_code)
    _validate_python_code_safety(parsed)

    # Builtins are registered under their own names; anything not listed here
    # is simply absent from the executed code's builtins.
    exposed = (
        abs, all, any, bool, dict, enumerate, Exception, float, int, len,
        list, max, min, TypeError, pow, range, round, set, sorted, sum,
        tuple, ValueError,
    )
    restricted_builtins = {obj.__name__: obj for obj in exposed}
    restricted_builtins["__import__"] = _safe_import
    restricted_builtins["zip"] = zip

    namespace: dict[str, Any] = {"__builtins__": restricted_builtins, "np": np}
    code_object = compile(parsed, filename="<verifier_python>", mode="exec")
    exec(code_object, namespace)

    target = namespace.get(fn_name)
    if target is None:
        raise RuntimeError(f"function '{fn_name}' not defined in python_code")
    return target
| # ---------- Input generation from Python AST ---------- | |
| def _infer_input_signature(python_code: str) -> list[dict[str, str]]: | |
| """Inspect the Python function's signature + annotations to pick fuzz input types. | |
| Returns a list of {"name": str, "kind": "ndarray|int|float|list|str", "dtype": str}. | |
| Without explicit annotations, we fall back to ndarray of float64. | |
| """ | |
| try: | |
| tree = ast.parse(python_code) | |
| except SyntaxError: | |
| return [{"name": "x", "kind": "ndarray", "dtype": "float64"}] | |
| fn = next((n for n in tree.body if isinstance(n, ast.FunctionDef)), None) | |
| if fn is None: | |
| return [{"name": "x", "kind": "ndarray", "dtype": "float64"}] | |
| sig: list[dict[str, str]] = [] | |
| for arg in fn.args.args: | |
| ann = ast.unparse(arg.annotation) if arg.annotation else "" | |
| kind = "ndarray" | |
| dtype = "float64" | |
| if "int" in ann.lower() and "ndarray" not in ann.lower() and "list" not in ann.lower(): | |
| kind = "int" | |
| elif "float" in ann.lower() and "ndarray" not in ann.lower() and "list" not in ann.lower(): | |
| kind = "float" | |
| elif "list" in ann.lower(): | |
| kind = "list" | |
| elif "str" in ann.lower(): | |
| kind = "str" | |
| if "int32" in ann: | |
| dtype = "int32" | |
| elif "int64" in ann: | |
| dtype = "int64" | |
| elif "float32" in ann: | |
| dtype = "float32" | |
| sig.append({"name": arg.arg, "kind": kind, "dtype": dtype}) | |
| # Default fallback: assume one ndarray | |
| if not sig: | |
| sig = [{"name": "x", "kind": "ndarray", "dtype": "float64"}] | |
| return sig | |
| def _generate_typed_input(spec: dict[str, str], rng: np.random.Generator, adversarial: bool = False) -> Any: | |
| """Generate one input matching spec. If adversarial, sample boundary/edge values.""" | |
| kind = spec["kind"] | |
| dtype = spec["dtype"] | |
| if kind == "int": | |
| if adversarial: | |
| return int(rng.choice([0, 1, -1, 2**31 - 1, -(2**31), 2**62, -(2**62)])) | |
| return int(rng.integers(-1000, 1000)) | |
| if kind == "float": | |
| if adversarial: | |
| return float(rng.choice([0.0, -0.0, np.inf, -np.inf, np.nan, 1e-300, 1e300])) | |
| return float(rng.standard_normal()) | |
| if kind == "str": | |
| # Short ascii strings | |
| return "".join(chr(int(rng.integers(97, 123))) for _ in range(int(rng.integers(1, 16)))) | |
| # Default: ndarray | |
| n = int(rng.integers(10, 1000)) | |
| if adversarial: | |
| choices = [ | |
| np.zeros(n, dtype=dtype), | |
| np.ones(n, dtype=dtype), | |
| np.array([], dtype=dtype), # empty | |
| np.array([0.0], dtype=dtype), # singleton | |
| np.full(n, np.inf, dtype=dtype) if "float" in dtype else np.full(n, np.iinfo(np.dtype(dtype)).max, dtype=dtype), | |
| (rng.standard_normal(n) * 1e-300).astype(dtype) if "float" in dtype else rng.integers(-1, 2, n).astype(dtype), | |
| ] | |
| idx = int(rng.integers(0, len(choices))) | |
| return choices[idx] | |
| if "int" in dtype: | |
| return rng.integers(-100, 100, size=n).astype(dtype) | |
| return rng.standard_normal(n).astype(dtype) | |
| def _numerically_equivalent(a: Any, b: Any, rtol: float) -> bool: | |
| """Compare two outputs accounting for float tolerance, exact for int.""" | |
| if isinstance(a, (int, float)) and isinstance(b, (int, float)): | |
| if rtol == 0: | |
| return a == b | |
| if not np.isfinite(a) or not np.isfinite(b): | |
| return (np.isnan(a) and np.isnan(b)) or a == b | |
| return abs(a - b) <= rtol * (1 + abs(a)) | |
| try: | |
| a = np.asarray(a) | |
| b = np.asarray(b) | |
| except Exception: | |
| return a == b | |
| if a.shape != b.shape: | |
| return False | |
| if a.dtype != b.dtype: | |
| # We don't allow dtype-mismatch — that's a hard fail per plan §10b | |
| return False | |
| if rtol == 0: | |
| return bool(np.array_equal(a, b)) | |
| # Use allclose with NaN-equality | |
| return bool(np.allclose(a, b, rtol=rtol, atol=rtol * 0.1, equal_nan=True)) | |
def _exec_python_in_sandbox(python_code: str, fn_name: str, args: tuple) -> Any:
    """Load ``fn_name`` from ``python_code`` and call it with ``args``.

    The function is obtained through ``_safe_exec_function`` (restricted
    builtins, static safety pass) and invoked in the current process.
    """
    return _safe_exec_function(python_code, fn_name)(*args)
def _exec_cpp_via_so(so_path: str, fn_name: str, args: tuple, py_fn=None, py_code: str = "") -> Any:
    """Dispatch ``args`` to the compiled shared object at ``so_path``.

    The agent's C++ is expected to expose the canonical signature
        extern "C" void agent_function(const double*, size_t, double*, size_t);
    so the Python reference function is handed to ``call_compiled`` alongside
    the arguments. Supply it directly as ``py_fn``, or give ``py_code`` and it
    is loaded through ``_safe_exec_function`` under the name ``fn_name``.

    Raises:
        RuntimeError: neither ``py_fn`` nor ``py_code`` was provided; errors
            raised by ``call_compiled`` itself propagate unchanged.
    """
    from server.tools._runtime import call_compiled

    reference = py_fn
    if reference is None:
        if py_code:
            reference = _safe_exec_function(py_code, fn_name)
        else:
            raise RuntimeError("verifier: need py_fn or py_code to dispatch C++")

    return call_compiled(so_path, reference, args)
def verify_equivalence_tool(tool_args: dict[str, Any], state) -> dict[str, Any]:
    """Fuzz-verify cpp_code against python_code on n_cases random + adversarial inputs.

    Args (read from ``tool_args``):
        cpp_code (str) — agent's C++
        python_code (str) — reference Python (defaults to state.python_code)
        n_cases (int=1000) — total fuzz cases (every 10th case is adversarial)
        rtol (float) — float tolerance; 0 = exact. Defaults to
            state.rtol_override when set, else 1e-5. Must be finite and >= 0.

    Returns:
        A dict. Early failures (empty code, bad n_cases/rtol, parse, compile
        or exec errors) carry ``pass_rate == 0.0`` and an ``error`` message.
        A completed run carries:
            pass_rate (float) — forced to 0.0 when fewer than 80% of the
                requested cases could be executed by the reference
            n_cases, executed_cases, coverage
            first_failure (dict | None)
            n_adversarial_failures (int)
            n_random_failures (int)
            adversarial_pass_rate (float)
            rtol_used (float)
            seed (int) — randomized per call (defeats lookup tables)
    """
    cpp_code = tool_args.get("cpp_code", "")
    # Fall back to an empty string so a missing reference is reported as
    # "no function in python_code" instead of crashing inside ast.parse.
    python_code = tool_args.get("python_code") or state.python_code or ""
    n_cases = int(tool_args.get("n_cases", 1000))
    rtol = float(tool_args.get("rtol", state.rtol_override if state.rtol_override is not None else 1e-5))

    if not cpp_code.strip():
        return {"pass_rate": 0.0, "error": "empty cpp_code"}
    if n_cases <= 0:
        return {"pass_rate": 0.0, "error": "n_cases must be >= 1", "n_cases": n_cases}
    # An infinite tolerance would accept any float output and a NaN/negative
    # one would reject everything; neither is a meaningful verification.
    if not np.isfinite(rtol) or rtol < 0:
        return {"pass_rate": 0.0, "error": f"rtol must be finite and >= 0 (got {rtol!r})"}

    # Defeat lookup-table cheating mode 4: seed varies per call
    seed = random.randint(0, 2**32 - 1)
    rng = np.random.default_rng(seed)

    # Discover Python function name (first FunctionDef)
    try:
        tree = ast.parse(python_code)
    except (SyntaxError, ValueError) as e:
        return {"pass_rate": 0.0, "error": f"python parse: {e}"}
    fn_node = next((n for n in tree.body if isinstance(n, ast.FunctionDef)), None)
    if fn_node is None:
        return {"pass_rate": 0.0, "error": "no function in python_code"}
    fn_name = fn_node.name
    sig = _infer_input_signature(python_code)

    # Compile (or get cached .so) — uses cpp_compiler tool's pathway
    from server.tools.cpp_compiler import _compile, _sha256
    import json as _json

    cache_key = _sha256(cpp_code, _json.dumps(state.hardware_profile, sort_keys=True))
    compile_result = _compile(cpp_code, state.hardware_profile, cache_key)
    if compile_result["status"] != "success":
        # The error entry may be absent or None; normalise before slicing.
        compile_error = str(compile_result.get("error") or "")
        return {
            "pass_rate": 0.0,
            "error": f"cpp compile failed: {compile_error[:300]}",
            "compile_status": compile_result["status"],
        }
    so_path = compile_result["so_path"]

    # Pre-load the Python reference function once (avoids repeated exec overhead)
    try:
        py_fn = _safe_exec_function(python_code, fn_name)
    except Exception as e:
        return {"pass_rate": 0.0, "error": f"python exec failed: {e}"}

    first_failure: dict[str, Any] | None = None
    n_adversarial_failures = 0
    n_random_failures = 0
    n_executed_cases = 0
    n_adversarial_total = 0
    n_random_total = 0

    for i in range(n_cases):
        adversarial = (i % 10 == 9)  # 10% adversarial sub-pool
        try:
            args = tuple(_generate_typed_input(spec, rng, adversarial=adversarial) for spec in sig)
        except Exception:
            continue  # Skip if input generation itself fails

        # The reference gets private copies of array inputs: a reference that
        # works in place must not alter what the C++ side receives afterwards.
        py_args = tuple(a.copy() if isinstance(a, np.ndarray) else a for a in args)

        # Run Python first; if it raises, skip (don't penalize the C++ for invalid input)
        try:
            # Adversarial fuzzing intentionally includes edge values (nan/inf/zero).
            # Suppress expected NumPy runtime warnings so job logs stay readable.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", RuntimeWarning)
                with np.errstate(divide="ignore", invalid="ignore", over="ignore", under="ignore"):
                    py_out = py_fn(*py_args)
        except Exception:
            continue

        n_executed_cases += 1
        if adversarial:
            n_adversarial_total += 1
        else:
            n_random_total += 1

        # Run C++ via ctypes dispatch — REAL execution now (not stub)
        try:
            cpp_out = _exec_cpp_via_so(so_path, fn_name, args, py_fn=py_fn)
        except Exception as e:
            if adversarial:
                n_adversarial_failures += 1
            else:
                n_random_failures += 1
            if first_failure is None:
                first_failure = {
                    "case": i, "reason": "cpp_exec_error", "error": str(e)[:200],
                    "adversarial": adversarial,
                }
            continue

        # Outputs the comparator cannot handle count as a mismatch for this
        # case rather than aborting the whole verification run.
        try:
            equivalent = _numerically_equivalent(py_out, cpp_out, rtol)
        except Exception:
            equivalent = False

        if not equivalent:
            if adversarial:
                n_adversarial_failures += 1
            else:
                n_random_failures += 1
            if first_failure is None:
                # Capture only first failure to bound observation size
                first_failure = {
                    "case": i, "reason": "output_mismatch",
                    "adversarial": adversarial,
                    "py_out": repr(py_out)[:120], "cpp_out": repr(cpp_out)[:120],
                }

    if n_executed_cases == 0:
        return {
            "pass_rate": 0.0,
            "n_cases": n_cases,
            "executed_cases": 0,
            "first_failure": first_failure,
            "n_adversarial_failures": n_adversarial_failures,
            "n_random_failures": n_random_failures,
            "adversarial_pass_rate": 0.0,
            "rtol_used": rtol,
            "seed": seed,
            "error": "insufficient_valid_cases",
        }

    pass_count = n_executed_cases - (n_adversarial_failures + n_random_failures)
    pass_rate = pass_count / n_executed_cases
    coverage = n_executed_cases / max(n_cases, 1)
    # Too many skipped cases means the run says little about equivalence.
    if coverage < 0.8:
        pass_rate = 0.0
    adversarial_pass_rate = (n_adversarial_total - n_adversarial_failures) / max(n_adversarial_total, 1)

    return {
        "pass_rate": pass_rate,
        "n_cases": n_cases,
        "executed_cases": n_executed_cases,
        "coverage": coverage,
        "first_failure": first_failure,
        "n_adversarial_failures": n_adversarial_failures,
        "n_random_failures": n_random_failures,
        "adversarial_pass_rate": adversarial_pass_rate,
        "rtol_used": rtol,
        "seed": seed,
    }
# Names exported by `from <module> import *`.
__all__ = [
    "verify_equivalence_tool",
    "_infer_input_signature",
    "_numerically_equivalent",
]