# Listing metadata: File size: 9,821 Bytes; 4bf4bf6
"""ctypes-based runtime dispatch for compiled agent C++.

Replaces the Hour 4-10 stubs in cpp_compiler._benchmark_cpp and verifier._exec_cpp_via_so
with real measurement.

Canonical agent function signature (system-prompted, enforced by all training data):

    extern "C" void agent_function(
        const double* in_ptr,    // flattened input (all args concatenated to float64)
        size_t in_n,             // total input length
        double* out_ptr,         // preallocated output buffer (caller-allocated, agent fills)
        size_t out_n             // output buffer size
    );

This uniform signature trades some type richness (everything's float64) for:
- Simple ctypes binding (no per-function ABI generation)
- Trivial for the agent to write
- Covers all numeric training functions (sklearn loops, NumPy ops, math kernels)

Inputs/outputs are float64 (8 bytes). For integer functions we cast at the
boundary; for the few bit-exact integer functions in the trap library, the
fuzzer's `rtol=0` semantics still catch divergence (e.g., int overflow modes
that propagate as different float values).
"""

from __future__ import annotations

import ctypes
import time
from typing import Any, Callable

import numpy as np


# ---------------------- Argument marshalling ----------------------

def _flatten_args(args: tuple) -> tuple[np.ndarray, list]:
    """Concatenate all args into one flat float64 array; remember per-arg shapes for the agent.

    Returns:
        flat: a single contiguous float64 array (the in_ptr buffer)
        shapes: list of (kind, shape, dtype) for each arg — informational, not used by the
                ABI itself but useful for debugging
    """
    flats: list[np.ndarray] = []
    shapes: list[tuple] = []
    for a in args:
        if isinstance(a, np.ndarray):
            shapes.append(("ndarray", a.shape, a.dtype))
            flats.append(np.ascontiguousarray(a, dtype=np.float64).ravel())
        elif isinstance(a, (int, float, np.integer, np.floating)):
            shapes.append(("scalar", (), type(a)))
            flats.append(np.array([float(a)], dtype=np.float64))
        elif isinstance(a, (list, tuple)):
            arr = np.array(a, dtype=np.float64)
            shapes.append(("list", arr.shape, np.float64))
            flats.append(arr.ravel())
        else:
            raise TypeError(f"unsupported arg type for agent_function: {type(a).__name__}")
    if not flats:
        return np.array([], dtype=np.float64), shapes
    return np.concatenate(flats).astype(np.float64, copy=False), shapes


def _infer_output_meta(py_fn: Callable, args: tuple) -> dict[str, Any]:
    """Run py_fn once to discover output shape + dtype. Used to size the C++ output buffer."""
    out = py_fn(*args)
    if isinstance(out, (int, np.integer)):
        return {"kind": "int", "size": 1, "shape": (), "dtype": int}
    if isinstance(out, (float, np.floating)):
        return {"kind": "float", "size": 1, "shape": (), "dtype": float}
    if isinstance(out, np.ndarray):
        return {"kind": "ndarray", "size": int(out.size), "shape": tuple(out.shape), "dtype": out.dtype}
    if isinstance(out, (list, tuple)):
        arr = np.array(out, dtype=np.float64)
        return {"kind": "list", "size": int(arr.size), "shape": tuple(arr.shape), "dtype": np.float64}
    raise TypeError(f"unsupported py_fn output type: {type(out).__name__}")


def _reshape_cpp_output(out_arr: np.ndarray, meta: dict[str, Any]) -> Any:
    """Reshape the flat output buffer back to py_fn's original output kind/shape."""
    if meta["kind"] == "int":
        return int(round(float(out_arr[0])))
    if meta["kind"] == "float":
        return float(out_arr[0])
    if meta["kind"] == "ndarray":
        return out_arr[: meta["size"]].reshape(meta["shape"]).astype(meta["dtype"], copy=False)
    if meta["kind"] == "list":
        return out_arr[: meta["size"]].reshape(meta["shape"]).tolist()
    return out_arr


# ---------------------- .so loader (cached) ----------------------

class _SOLoader:
    """Cache loaded ctypes libraries by path. Each .so loaded only once."""
    _cache: dict[str, ctypes.CDLL] = {}

    @classmethod
    def load(cls, so_path: str) -> ctypes.CDLL:
        if so_path in cls._cache:
            return cls._cache[so_path]
        lib = ctypes.CDLL(so_path)
        if not hasattr(lib, "agent_function"):
            raise RuntimeError(f"{so_path} does not export `agent_function`")
        lib.agent_function.argtypes = [
            ctypes.POINTER(ctypes.c_double),  # in_ptr
            ctypes.c_size_t,                  # in_n
            ctypes.POINTER(ctypes.c_double),  # out_ptr
            ctypes.c_size_t,                  # out_n
        ]
        lib.agent_function.restype = None
        cls._cache[so_path] = lib
        return lib

    @classmethod
    def clear(cls) -> None:
        cls._cache.clear()


# ---------------------- Public dispatch API ----------------------

def call_compiled(so_path: str, py_fn: Callable, args: tuple) -> Any:
    """Run the compiled `agent_function` from `so_path` on `args`.

    `py_fn` is executed once on the same args to learn the size and kind of
    the output; the compiled result is converted back to that kind/shape.

    Raises:
        RuntimeError: if the library does not export `agent_function`.
        Errors raised while opening the library, flattening `args`, or
        running `py_fn` propagate unchanged.
    """
    library = _SOLoader.load(so_path)
    double_ptr = ctypes.POINTER(ctypes.c_double)

    # Flatten the inputs before py_fn is invoked for output sizing.
    packed_inputs, _unused_shapes = _flatten_args(args)
    input_buffer = np.ascontiguousarray(packed_inputs, dtype=np.float64)

    meta = _infer_output_meta(py_fn, args)
    n_out = meta["size"]
    # Caller-allocated, zero-initialised buffer for the compiled function to fill.
    output_buffer = np.zeros(n_out, dtype=np.float64)

    library.agent_function(
        input_buffer.ctypes.data_as(double_ptr),
        ctypes.c_size_t(input_buffer.size),
        output_buffer.ctypes.data_as(double_ptr),
        ctypes.c_size_t(n_out),
    )

    return _reshape_cpp_output(output_buffer, meta)


def benchmark_python_vs_cpp(
    so_path: str,
    py_fn: Callable,
    args: tuple,
    n_per_repeat: int = 5,
    repeats: int = 3,
) -> dict[str, float]:
    """Time `py_fn` and the compiled `agent_function` on the SAME args.

    Each of the `repeats` rounds times `n_per_repeat` back-to-back calls and
    records the mean wall time per call; the reported figure is the median of
    those per-round means.

    Args:
        so_path: path of the shared library exporting `agent_function`.
        py_fn: Python reference implementation.
        args: positional arguments for `py_fn`; flattened once for the C++ side.
        n_per_repeat: calls per timed round (must be >= 1).
        repeats: number of timed rounds (must be >= 1).

    Returns:
        py_median_ms: float — median ms per Python call
        cpp_median_ms: float — median ms per C++ call (via ctypes)
        speedup: float — py_median_ms / cpp_median_ms (denominator floored at 1e-6 ms)
        n_per_repeat, repeats: the settings used, echoed back

    Raises:
        ValueError: if `n_per_repeat` or `repeats` is less than 1.
        RuntimeError: if the library does not export `agent_function`.
    """
    # Fail early with a clear message instead of a ZeroDivisionError / IndexError
    # after the (possibly expensive) setup work.
    if n_per_repeat < 1 or repeats < 1:
        raise ValueError("n_per_repeat and repeats must both be >= 1")

    lib = _SOLoader.load(so_path)

    # Pre-flatten inputs ONCE — re-flattening would pollute timing
    in_flat, _ = _flatten_args(args)
    in_arr = np.ascontiguousarray(in_flat, dtype=np.float64)
    in_ptr = in_arr.ctypes.data_as(ctypes.POINTER(ctypes.c_double))

    # Runs py_fn once to size the output buffer (this also serves as its warm-up call).
    out_meta = _infer_output_meta(py_fn, args)
    out_arr = np.zeros(out_meta["size"], dtype=np.float64)
    out_ptr = out_arr.ctypes.data_as(ctypes.POINTER(ctypes.c_double))

    in_n = ctypes.c_size_t(in_arr.size)
    out_n = ctypes.c_size_t(out_meta["size"])

    # ---- Python timing ----
    # Loops are kept inline (no wrapper callable) so no extra call overhead is timed.
    py_times: list[float] = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        for _ in range(n_per_repeat):
            py_fn(*args)
        elapsed = time.perf_counter() - t0
        py_times.append((elapsed / n_per_repeat) * 1000)
    # np.median averages the two middle rounds when `repeats` is even.
    py_median = float(np.median(py_times))

    # ---- C++ timing ----
    # One untimed call first: py_fn already ran once above, so give the compiled
    # side the same treatment and keep first-call costs out of the measurement.
    lib.agent_function(in_ptr, in_n, out_ptr, out_n)

    cpp_times: list[float] = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        for _ in range(n_per_repeat):
            lib.agent_function(in_ptr, in_n, out_ptr, out_n)
        elapsed = time.perf_counter() - t0
        cpp_times.append((elapsed / n_per_repeat) * 1000)
    cpp_median = float(np.median(cpp_times))

    return {
        "py_median_ms": py_median,
        "cpp_median_ms": cpp_median,
        # Floor the denominator so a sub-nanosecond reading cannot divide by zero.
        "speedup": py_median / max(cpp_median, 1e-6),
        "n_per_repeat": n_per_repeat,
        "repeats": repeats,
    }


def time_python_only(py_fn: Callable, args: tuple, n_per_repeat: int = 5, repeats: int = 3) -> float:
    """Pure Python baseline timing (no .so needed).

    Runs `repeats` rounds of `n_per_repeat` back-to-back `py_fn(*args)` calls,
    records the mean wall time per call for each round, and returns the median
    of those means in milliseconds.

    Raises:
        ValueError: if `n_per_repeat` or `repeats` is less than 1.
    """
    # Explicit check instead of a ZeroDivisionError (n_per_repeat=0) or
    # IndexError (repeats=0) further down.
    if n_per_repeat < 1 or repeats < 1:
        raise ValueError("n_per_repeat and repeats must both be >= 1")

    per_call_ms: list[float] = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        for _ in range(n_per_repeat):
            py_fn(*args)
        per_call_ms.append((time.perf_counter() - t0) / n_per_repeat * 1000)
    # np.median averages the two middle rounds when `repeats` is even.
    return float(np.median(per_call_ms))


# ---------------------- Sample-input synthesizer ----------------------

def make_default_args_for(py_fn: Callable, n: int = 1024, seed: int = 0) -> tuple:
    """Construct a default (numeric ndarray + scalars) arg tuple for py_fn from its signature.

    Used for the benchmark baseline when no specific input is provided.

    One value is produced per parameter that can be bound positionally, chosen
    from the lower-cased text of its annotation:
      - mentions "int" (but not "ndarray"/"list"): the int default if there is
        one, else a random int in [2, 16)
      - mentions "float" (but not "ndarray"/"list"): the float default if there
        is one, else a standard-normal draw
      - mentions "list"/"ndarray", or has no annotation: an `n`-element float64 array
      - mentions "str": the string "hello world"
      - anything else: an `n`-element float64 array

    Keyword-only and `**kwargs` parameters are skipped: the result is a tuple
    meant for `py_fn(*args)`, so a value emitted for them would land in the
    wrong slot or raise TypeError.

    Args:
        py_fn: function whose signature is inspected.
        n: length of each generated array.
        seed: seed for the NumPy random generator, making the output reproducible.

    Returns:
        Tuple of positional arguments. If the signature cannot be introspected,
        a 1-tuple holding a single `n`-element float64 array.
    """
    import inspect
    rng = np.random.default_rng(seed)
    try:
        sig = inspect.signature(py_fn)
        params = list(sig.parameters.values())
    except (ValueError, TypeError):
        return (rng.standard_normal(n).astype(np.float64),)

    not_positional = (inspect.Parameter.KEYWORD_ONLY, inspect.Parameter.VAR_KEYWORD)

    out = []
    for p in params:
        if p.kind in not_positional:
            # Cannot be supplied through *args; leave it to its default.
            continue
        ann = str(p.annotation).lower() if p.annotation is not inspect.Parameter.empty else ""
        default = p.default if p.default is not inspect.Parameter.empty else None
        # NOTE: annotation matching is by substring, so unrelated annotations
        # whose text happens to contain "int"/"float" take the scalar branches.
        if "int" in ann and "ndarray" not in ann and "list" not in ann:
            out.append(default if isinstance(default, int) else int(rng.integers(2, 16)))
        elif "float" in ann and "ndarray" not in ann and "list" not in ann:
            out.append(default if isinstance(default, float) else float(rng.standard_normal()))
        elif "list" in ann or "ndarray" in ann or ann == "":
            out.append(rng.standard_normal(n).astype(np.float64))
        elif "str" in ann:
            out.append("hello world")
        else:
            out.append(rng.standard_normal(n).astype(np.float64))
    return tuple(out)


# Names exported by a star-import of this module.
# NOTE(review): `_SOLoader` carries a leading underscore yet is exported here —
# presumably so callers can reach `_SOLoader.clear()`; confirm this is intentional.
__all__ = [
    "call_compiled",
    "benchmark_python_vs_cpp",
    "time_python_only",
    "make_default_args_for",
    "_SOLoader",
]