# Swastikr's picture
# Upload folder using huggingface_hub
# 4bf4bf6 verified
"""ctypes-based runtime dispatch for compiled agent C++.
Replaces the Hour 4-10 stubs in cpp_compiler._benchmark_cpp and verifier._exec_cpp_via_so
with real measurement.
Canonical agent function signature (system-prompted, enforced by all training data):
extern "C" void agent_function(
const double* in_ptr, // flattened input (all args concatenated to float64)
size_t in_n, // total input length
double* out_ptr, // preallocated output buffer (caller-allocated, agent fills)
size_t out_n // output buffer size
);
This uniform signature trades some type richness (everything's float64) for:
- Simple ctypes binding (no per-function ABI generation)
- Trivial for the agent to write
- Covers all numeric training functions (sklearn loops, NumPy ops, math kernels)
Inputs/outputs are float64 (8 bytes). For integer functions we cast at the
boundary; for the few bit-exact integer functions in the trap library, the
fuzzer's `rtol=0` semantics still catch divergence (e.g., int overflow modes
that propagate as different float values).
"""
from __future__ import annotations
import ctypes
import time
from typing import Any, Callable
import numpy as np
# ---------------------- Argument marshalling ----------------------
def _flatten_args(args: tuple) -> tuple[np.ndarray, list]:
    """Pack every positional argument into one 1-D float64 buffer.

    Each argument is converted to float64 and flattened; the pieces are laid
    out back to back, in argument order, to form the ``in_ptr`` buffer.

    Args:
        args: positional arguments destined for ``agent_function``. Accepted
            kinds are NumPy arrays, Python/NumPy int and float scalars, and
            lists or tuples that ``np.array(..., dtype=float64)`` can convert.

    Returns:
        A ``(flat, shapes)`` pair. ``flat`` is the concatenated float64 array
        (empty when ``args`` is empty). ``shapes`` holds one
        ``(kind, shape, dtype)`` record per argument; the ABI itself does not
        consume it, it is kept for debugging.

    Raises:
        TypeError: if an argument is none of the accepted kinds.
    """
    scalar_types = (int, float, np.integer, np.floating)
    pieces: list[np.ndarray] = []
    records: list[tuple] = []

    for arg in args:
        if isinstance(arg, np.ndarray):
            # Record the caller's dtype before it is widened to float64.
            record = ("ndarray", arg.shape, arg.dtype)
            piece = np.ascontiguousarray(arg, dtype=np.float64).ravel()
        elif isinstance(arg, scalar_types):
            record = ("scalar", (), type(arg))
            piece = np.array([float(arg)], dtype=np.float64)
        elif isinstance(arg, (list, tuple)):
            converted = np.array(arg, dtype=np.float64)
            record = ("list", converted.shape, np.float64)
            piece = converted.ravel()
        else:
            raise TypeError(f"unsupported arg type for agent_function: {type(arg).__name__}")
        records.append(record)
        pieces.append(piece)

    if pieces:
        return np.concatenate(pieces).astype(np.float64, copy=False), records
    return np.array([], dtype=np.float64), records
def _infer_output_meta(py_fn: Callable, args: tuple) -> dict[str, Any]:
    """Call ``py_fn(*args)`` once and describe what it returned.

    The description is what lets the caller size the C++ output buffer and
    later convert the flat buffer back into the same kind of value.

    Args:
        py_fn: the reference Python implementation; it is executed here.
        args: positional arguments passed to ``py_fn``.

    Returns:
        A dict with keys ``kind`` (``"int"``, ``"float"``, ``"ndarray"`` or
        ``"list"``), ``size`` (element count), ``shape`` and ``dtype``.

    Raises:
        TypeError: if the result is not an int, float, ndarray, list or tuple.
    """

    def record(kind: str, size: int, shape: tuple, dtype: Any) -> dict[str, Any]:
        return {"kind": kind, "size": size, "shape": shape, "dtype": dtype}

    result = py_fn(*args)

    if isinstance(result, (int, np.integer)):
        return record("int", 1, (), int)
    if isinstance(result, (float, np.floating)):
        return record("float", 1, (), float)
    if isinstance(result, np.ndarray):
        return record("ndarray", int(result.size), tuple(result.shape), result.dtype)
    if isinstance(result, (list, tuple)):
        # Sequences are measured through their float64 array form.
        as_array = np.array(result, dtype=np.float64)
        return record("list", int(as_array.size), tuple(as_array.shape), np.float64)

    raise TypeError(f"unsupported py_fn output type: {type(result).__name__}")
def _reshape_cpp_output(out_arr: np.ndarray, meta: dict[str, Any]) -> Any:
    """Convert the flat float64 output buffer back to py_fn's output kind/shape.

    Args:
        out_arr: the 1-D float64 buffer that ``agent_function`` filled.
        meta: a description with keys ``kind``, ``size``, ``shape`` and
            ``dtype``, as produced by ``_infer_output_meta``.

    Returns:
        * ``kind == "int"``: the first element, rounded to the nearest int.
        * ``kind == "float"``: the first element as a Python float.
        * ``kind == "ndarray"``: the first ``size`` elements reshaped to
          ``shape`` and cast to ``dtype``; integer dtypes are rounded to the
          nearest integer before the cast.
        * ``kind == "list"``: the first ``size`` elements reshaped to
          ``shape`` and converted with ``tolist()``.
        * any other kind: ``out_arr`` unchanged.
    """
    kind = meta["kind"]
    if kind == "int":
        return int(round(float(out_arr[0])))
    if kind == "float":
        return float(out_arr[0])
    if kind == "ndarray":
        shaped = out_arr[: meta["size"]].reshape(meta["shape"])
        if np.issubdtype(meta["dtype"], np.integer):
            # astype() truncates toward zero, so a value such as 2.9999999999
            # would become 2 here while the scalar "int" path above yields 3.
            # Round first so both integer paths agree.
            shaped = np.rint(shaped)
        return shaped.astype(meta["dtype"], copy=False)
    if kind == "list":
        return out_arr[: meta["size"]].reshape(meta["shape"]).tolist()
    return out_arr
# ---------------------- .so loader (cached) ----------------------
class _SOLoader:
    """Class-level cache of ctypes library handles, keyed by the path string.

    A given ``so_path`` string is passed to ``ctypes.CDLL`` at most once;
    later requests for the same string return the stored handle.
    """

    # Shared across all callers: exact ``so_path`` string -> configured library.
    _cache: dict[str, ctypes.CDLL] = {}

    @classmethod
    def load(cls, so_path: str) -> ctypes.CDLL:
        """Return the library for ``so_path``, loading and configuring it if needed.

        On first load the ``agent_function`` symbol is given the canonical
        ``(const double*, size_t, double*, size_t) -> void`` prototype.

        Raises:
            RuntimeError: if the library does not export ``agent_function``.
            OSError: raised by ``ctypes.CDLL`` when the library cannot be loaded.
        """
        try:
            return cls._cache[so_path]
        except KeyError:
            pass

        handle = ctypes.CDLL(so_path)
        if not hasattr(handle, "agent_function"):
            raise RuntimeError(f"{so_path} does not export `agent_function`")

        double_ptr = ctypes.POINTER(ctypes.c_double)
        entry_point = handle.agent_function
        # Order: in_ptr, in_n, out_ptr, out_n.
        entry_point.argtypes = [double_ptr, ctypes.c_size_t, double_ptr, ctypes.c_size_t]
        entry_point.restype = None

        # Only cache once the handle is fully configured.
        cls._cache[so_path] = handle
        return handle

    @classmethod
    def clear(cls) -> None:
        """Forget every cached handle (the dict is emptied in place)."""
        cls._cache.clear()
# ---------------------- Public dispatch API ----------------------
def call_compiled(so_path: str, py_fn: Callable, args: tuple) -> Any:
    """Run ``agent_function`` from the shared library on ``args``.

    ``py_fn`` is executed once on the same ``args`` to learn the size, shape
    and kind of the expected output; the C++ result is then converted back to
    that form.

    Args:
        so_path: path of the compiled shared library.
        py_fn: reference Python implementation (called once here).
        args: positional arguments; flattened to one float64 input buffer.

    Returns:
        The C++ result, shaped like ``py_fn``'s return value.

    Raises:
        RuntimeError: if the library does not export ``agent_function``.
        OSError: raised by ``ctypes.CDLL`` when the library cannot be loaded.
    """
    double_ptr = ctypes.POINTER(ctypes.c_double)
    library = _SOLoader.load(so_path)

    flat_inputs, _shapes = _flatten_args(args)
    inputs = np.ascontiguousarray(flat_inputs, dtype=np.float64)

    meta = _infer_output_meta(py_fn, args)
    n_out = meta["size"]
    # Caller-allocated output buffer; agent_function writes into it.
    outputs = np.zeros(n_out, dtype=np.float64)

    library.agent_function(
        inputs.ctypes.data_as(double_ptr),
        ctypes.c_size_t(inputs.size),
        outputs.ctypes.data_as(double_ptr),
        ctypes.c_size_t(n_out),
    )
    return _reshape_cpp_output(outputs, meta)
def benchmark_python_vs_cpp(
    so_path: str,
    py_fn: Callable,
    args: tuple,
    n_per_repeat: int = 5,
    repeats: int = 3,
) -> dict[str, float]:
    """Time ``py_fn`` and the compiled ``agent_function`` on the same ``args``.

    For each implementation, ``repeats`` batches of ``n_per_repeat``
    back-to-back calls are timed. Every batch yields one average ms-per-call
    figure, and the reported value is the middle element of those sorted
    averages.

    Args:
        so_path: path of the compiled shared library.
        py_fn: reference Python implementation.
        args: positional arguments used for both implementations.
        n_per_repeat: calls per timed batch.
        repeats: number of timed batches.

    Returns:
        A dict with:
            py_median_ms: ms per Python call.
            cpp_median_ms: ms per C++ call (through ctypes).
            speedup: ``py_median_ms / max(cpp_median_ms, 1e-6)``.
            n_per_repeat, repeats: the settings used, echoed back.
    """
    double_ptr = ctypes.POINTER(ctypes.c_double)
    library = _SOLoader.load(so_path)

    # Marshal inputs and allocate the output once, outside the timed regions,
    # so that conversion cost is not counted against the C++ side.
    flat_inputs, _shapes = _flatten_args(args)
    inputs = np.ascontiguousarray(flat_inputs, dtype=np.float64)
    meta = _infer_output_meta(py_fn, args)
    outputs = np.zeros(meta["size"], dtype=np.float64)

    in_ptr = inputs.ctypes.data_as(double_ptr)
    out_ptr = outputs.ctypes.data_as(double_ptr)
    in_n = ctypes.c_size_t(inputs.size)
    out_n = ctypes.c_size_t(meta["size"])

    # Python side: one average per batch.
    py_samples: list[float] = []
    for _ in range(repeats):
        started = time.perf_counter()
        for _ in range(n_per_repeat):
            py_fn(*args)
        batch_seconds = time.perf_counter() - started
        py_samples.append((batch_seconds / n_per_repeat) * 1000)
    py_median = sorted(py_samples)[len(py_samples) // 2]

    # C++ side: same scheme, calling straight through the ctypes handle.
    cpp_samples: list[float] = []
    for _ in range(repeats):
        started = time.perf_counter()
        for _ in range(n_per_repeat):
            library.agent_function(in_ptr, in_n, out_ptr, out_n)
        batch_seconds = time.perf_counter() - started
        cpp_samples.append((batch_seconds / n_per_repeat) * 1000)
    cpp_median = sorted(cpp_samples)[len(cpp_samples) // 2]

    # The floor on the denominator guards against division by zero.
    speedup = py_median / max(cpp_median, 1e-6)
    return {
        "py_median_ms": py_median,
        "cpp_median_ms": cpp_median,
        "speedup": speedup,
        "n_per_repeat": n_per_repeat,
        "repeats": repeats,
    }
def time_python_only(py_fn: Callable, args: tuple, n_per_repeat: int = 5, repeats: int = 3) -> float:
    """Time ``py_fn(*args)`` alone; no shared library is involved.

    ``repeats`` batches of ``n_per_repeat`` back-to-back calls are timed.
    Every batch yields one average ms-per-call figure.

    Returns:
        The middle element of the sorted per-batch averages, in milliseconds.
    """

    def batch_ms() -> float:
        # Average wall time per call over one batch, in milliseconds.
        started = time.perf_counter()
        for _ in range(n_per_repeat):
            py_fn(*args)
        return (time.perf_counter() - started) / n_per_repeat * 1000

    samples = sorted(batch_ms() for _ in range(repeats))
    return samples[len(samples) // 2]
# ---------------------- Sample-input synthesizer ----------------------
def make_default_args_for(py_fn: Callable, n: int = 1024, seed: int = 0) -> tuple:
    """Build a default positional-argument tuple for ``py_fn`` from its signature.

    Used for the benchmark baseline when no specific input is provided. The
    choice per parameter is a substring match on the lower-cased annotation:

    * contains ``int`` (and neither ``ndarray`` nor ``list``): the parameter's
      int default if it has one, otherwise a random int in ``[2, 16)``.
    * contains ``float`` (and neither ``ndarray`` nor ``list``): the
      parameter's float default if it has one, otherwise a standard-normal
      draw.
    * contains ``list`` or ``ndarray``, or no annotation: an ``n``-element
      standard-normal float64 array.
    * contains ``str``: the string ``"hello world"``.
    * anything else: an ``n``-element standard-normal float64 array.

    Keyword-only parameters and ``**kwargs`` are skipped: the result is meant
    to be splatted as ``py_fn(*args)``, and a positional value can never bind
    to them. A keyword-only parameter without a default therefore stays
    unsatisfied.

    Args:
        py_fn: the function whose signature is inspected.
        n: length of every generated array.
        seed: seed for the NumPy random generator, making the result
            reproducible.

    Returns:
        A tuple with one value per positionally-bindable parameter. If the
        signature cannot be introspected, a 1-tuple holding a single
        ``n``-element float64 array.
    """
    import inspect

    rng = np.random.default_rng(seed)
    try:
        params = list(inspect.signature(py_fn).parameters.values())
    except (ValueError, TypeError):
        return (rng.standard_normal(n).astype(np.float64),)

    empty = inspect.Parameter.empty
    # Kinds that cannot be filled from a positional tuple.
    not_positional = (inspect.Parameter.KEYWORD_ONLY, inspect.Parameter.VAR_KEYWORD)

    out = []
    for p in params:
        if p.kind in not_positional:
            continue
        ann = str(p.annotation).lower() if p.annotation is not empty else ""
        default = p.default if p.default is not empty else None
        is_container = "ndarray" in ann or "list" in ann
        if "int" in ann and not is_container:
            out.append(default if isinstance(default, int) else int(rng.integers(2, 16)))
        elif "float" in ann and not is_container:
            out.append(default if isinstance(default, float) else float(rng.standard_normal()))
        elif is_container or ann == "":
            out.append(rng.standard_normal(n).astype(np.float64))
        elif "str" in ann:
            out.append("hello world")
        else:
            out.append(rng.standard_normal(n).astype(np.float64))
    return tuple(out)
# Names exported by a star-import of this module. `_SOLoader` is listed
# explicitly because star-imports otherwise skip underscore-prefixed names.
# NOTE(review): presumably exported so callers can call `_SOLoader.clear()` — confirm.
__all__ = [
"call_compiled",
"benchmark_python_vs_cpp",
"time_python_only",
"make_default_args_for",
"_SOLoader",
]