File size: 11,001 Bytes
0404756
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
"""
Memory Benchmark Module
Memory performance tests: bandwidth, latency, and cache performance
Optimized with ctypes for raw C-level performance
"""

import time
import ctypes
import multiprocessing
import mmap
import os
import numpy as np  # Keep for latency/cache tests
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, Any

# Load the C standard library and declare the prototypes of the routines the
# bandwidth benchmark calls through ctypes.
try:
    libc = ctypes.CDLL("libc.so.6")
    # memset, memcpy and memchr all return a pointer. ctypes assumes a C `int`
    # return type unless told otherwise, which truncates pointer values on
    # 64-bit platforms, so restype is declared alongside argtypes.
    libc.memset.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]
    libc.memset.restype = ctypes.c_void_p
    libc.memcpy.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]
    libc.memcpy.restype = ctypes.c_void_p
    libc.memchr.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]
    libc.memchr.restype = ctypes.c_void_p
except (OSError, AttributeError):
    # OSError: the shared object could not be loaded (e.g. not a glibc system).
    # AttributeError: the library loaded but lacks one of the symbols.
    # Either way the module still imports; callers check `libc` for None.
    libc = None

# --- C extension handling ---
# The compiled helper library and its C source are both looked up in the
# directory that contains this module.
C_LIB_PATH, C_SRC_PATH = (
    os.path.join(os.path.dirname(__file__), file_name)
    for file_name in ("_memory_bench_c.so", "memory_bench_c.c")
)

def _compile_c_helper():
    """Build (when missing or stale) and load the C helper shared library.

    The library at ``C_LIB_PATH`` is rebuilt from ``C_SRC_PATH`` with gcc when
    it does not exist or when the source file has a newer modification time.

    Returns:
        The loaded ``ctypes.CDLL`` with ``argtypes``/``restype`` declared for
        ``measure_latency_random``, ``measure_latency_sequential`` and
        ``measure_alloc_rate``; or ``None`` when the source file is absent,
        compilation fails, or the library (or one of its symbols) cannot be
        loaded.
    """
    if not os.path.exists(C_SRC_PATH):
        return None

    needs_compile = (
        not os.path.exists(C_LIB_PATH)
        or os.path.getmtime(C_SRC_PATH) > os.path.getmtime(C_LIB_PATH)
    )

    if needs_compile:
        # Local import: only the rebuild path needs it.
        import subprocess

        # Pass an argument list rather than a shell string so that paths
        # containing spaces or shell metacharacters are handled correctly.
        cmd = ["gcc", "-O3", "-shared", "-fPIC", "-o", C_LIB_PATH, C_SRC_PATH]
        try:
            compiled = subprocess.run(cmd, check=False).returncode == 0
        except OSError:
            # gcc is not installed or is not executable.
            compiled = False
        if not compiled:
            print("Failed to compile C helper.")
            return None

    try:
        lib = ctypes.CDLL(C_LIB_PATH)
        # All three entry points share one prototype:
        #   double f(size_t, size_t)
        for symbol in (
            "measure_latency_random",
            "measure_latency_sequential",
            "measure_alloc_rate",
        ):
            func = getattr(lib, symbol)
            func.argtypes = [ctypes.c_size_t, ctypes.c_size_t]
            func.restype = ctypes.c_double
    except (OSError, AttributeError) as e:
        # OSError: the library failed to load; AttributeError: missing symbol.
        print(f"Failed to load C helper: {e}")
        return None
    return lib


# Built and loaded once at import time; None when the helper is unavailable.
c_lib = _compile_c_helper()

def _raw_memory_worker(args):
    """Run one memory-bandwidth loop in a worker process using raw libc calls.

    Args:
        args: Tuple ``(block_size_mb, duration, mode)``:
            block_size_mb: Size of the working buffer in MiB.
            duration: How long to keep looping, in seconds.
            mode: ``'read'`` (memchr scan), ``'write'`` (memset) or
                ``'copy'`` (memcpy into a second buffer).

    Returns:
        Tuple ``(iterations, elapsed)``: the number of full-buffer passes
        completed and the measured wall time in seconds.

    Raises:
        ValueError: If ``mode`` is not one of the supported modes (an unknown
            mode would otherwise spin for ``duration`` doing no work and
            report a meaningless iteration count).
    """
    block_size_mb, duration, mode = args
    if mode not in ('read', 'write', 'copy'):
        raise ValueError(f"Unknown memory benchmark mode: {mode!r}")
    block_size = block_size_mb * 1024 * 1024

    # Anonymous mmap gives raw memory without Python object overhead.
    src_map = mmap.mmap(-1, block_size)
    dst_map = None
    try:
        # Copy mode needs a destination buffer as well.
        if mode == 'copy':
            dst_map = mmap.mmap(-1, block_size)

        # Raw addresses of the buffers. The temporary ctypes objects are
        # dropped right away, so they do not keep the mmaps from being closed.
        src_addr = ctypes.addressof(ctypes.c_char.from_buffer(src_map))
        dst_addr = (
            ctypes.addressof(ctypes.c_char.from_buffer(dst_map))
            if dst_map is not None else 0
        )

        # Pre-fault the buffers before timing. Untouched anonymous pages are
        # typically not yet backed by physical memory (on Linux, reads of such
        # pages are served from a shared zero page), so without this the
        # read/copy loops would not actually stream the whole buffer from RAM
        # and the first pass would pay page-fault costs inside the timed loop.
        libc.memset(src_addr, 0, block_size)
        if dst_map is not None:
            libc.memset(dst_addr, 0, block_size)

        # Resolve the operation once so the timed loop contains no branching.
        if mode == 'read':
            # Search for a byte value that is not present (the buffer is all
            # zeros) to force a scan of the full block.
            operation, op_args = libc.memchr, (src_addr, 1, block_size)
        elif mode == 'write':
            operation, op_args = libc.memset, (src_addr, 0, block_size)
        else:
            operation, op_args = libc.memcpy, (dst_addr, src_addr, block_size)

        # perf_counter is monotonic; time.time() can jump if the system clock
        # is adjusted while the benchmark runs.
        clock = time.perf_counter
        iterations = 0
        start_time = clock()
        while clock() - start_time < duration:
            operation(*op_args)
            iterations += 1
        elapsed = clock() - start_time
    finally:
        # Release the mappings even if allocation or a libc call failed.
        src_map.close()
        if dst_map is not None:
            dst_map.close()

    return iterations, elapsed

def benchmark_memory_bandwidth(block_size_mb: int = 4, duration: float = 3.0) -> Dict[str, Any]:
    """Measure read, write and copy memory bandwidth across all CPU cores.

    One worker process per core (``multiprocessing.cpu_count()``) runs
    ``_raw_memory_worker`` for each mode in turn; the per-mode bandwidth is
    the total number of bytes processed by all workers divided by the longest
    worker elapsed time.

    Args:
        block_size_mb: Per-worker buffer size in MiB. Must be positive.
        duration: Seconds each worker loops per mode. Must be positive.

    Returns:
        A result dict with ``read_bandwidth_gb_s``, ``write_bandwidth_gb_s``,
        ``copy_bandwidth_gb_s`` (GiB/s, rounded to 3 decimals) and ``score``;
        or ``{"error": ...}`` when the arguments are invalid or libc could not
        be loaded.
    """
    # Validate first: a non-positive block size would otherwise surface as an
    # mmap error raised from inside a worker process.
    if block_size_mb <= 0 or duration <= 0:
        return {"error": "block_size_mb and duration must be positive"}
    if not libc:
        return {"error": "libc not found, cannot run optimized benchmark"}

    num_cores = multiprocessing.cpu_count()

    # Each iteration of a worker processes one full buffer. The default of
    # 4 MiB per worker is intended to be large enough to spill out of cache.
    bytes_per_iter = block_size_mb * 1024 * 1024

    results = {}

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        for mode in ('read', 'write', 'copy'):
            futures = [
                executor.submit(_raw_memory_worker, (block_size_mb, duration, mode))
                for _ in range(num_cores)
            ]

            total_iterations = 0
            max_elapsed = 0.0
            for future in futures:
                iters, elapsed = future.result()
                total_iterations += iters
                max_elapsed = max(max_elapsed, elapsed)

            total_bytes = total_iterations * bytes_per_iter

            # A copy both reads and writes every byte, so count the traffic in
            # both directions rather than just the payload size.
            if mode == 'copy':
                total_bytes *= 2

            # Guard against division by zero if no time was measured.
            if max_elapsed > 0:
                bandwidth_gb_s = total_bytes / max_elapsed / (1024**3)
            else:
                bandwidth_gb_s = 0

            results[f"{mode}_bandwidth_gb_s"] = round(bandwidth_gb_s, 3)

    read_bw = results['read_bandwidth_gb_s']
    write_bw = results['write_bandwidth_gb_s']
    copy_bw = results['copy_bandwidth_gb_s']

    return {
        "test": "memory_bandwidth",
        # NOTE: the workers are processes; the wording of this string is kept
        # unchanged for consumers of the report.
        "description": f"Memory bandwidth test (Multi-core C-level, {num_cores} threads)",
        "block_size_mb": block_size_mb,
        "read_bandwidth_gb_s": read_bw,
        "write_bandwidth_gb_s": write_bw,
        "copy_bandwidth_gb_s": copy_bw,
        "score": round((read_bw + write_bw + copy_bw) * 10, 2),
    }


def benchmark_memory_latency(iterations: int = 10000000) -> Dict[str, Any]:
    """Measure average random-access memory latency over a 64 MiB working set.

    Delegates the measurement to ``c_lib.measure_latency_random`` and treats
    its return value as the total elapsed time in seconds.

    Args:
        iterations: Number of accesses to time. Must be positive.

    Returns:
        A result dict with ``total_time_seconds``, ``average_latency_ns`` and
        ``score`` (higher for lower latency); or ``{"error": ...}`` when
        ``iterations`` is not positive, the C helper is unavailable, or the
        helper reports a non-positive elapsed time.
    """
    # A zero count would divide by zero below, and a negative count would wrap
    # to a huge unsigned value when converted to size_t by ctypes.
    if iterations <= 0:
        return {"error": "iterations must be positive"}
    if not c_lib:
        return {"error": "C helper not available"}

    # 64 MiB working set, chosen to be larger than typical CPU caches.
    array_size_bytes = 64 * 1024 * 1024

    elapsed = c_lib.measure_latency_random(array_size_bytes, iterations)
    if elapsed <= 0:
        return {"error": "Benchmark failed"}

    latency_ns = (elapsed / iterations) * 1e9

    return {
        "test": "memory_latency_random",
        "description": "Random access latency (64MB working set, Pointer Chasing)",
        "iterations": iterations,
        "total_time_seconds": round(elapsed, 4),
        "average_latency_ns": round(latency_ns, 2),
        # Inverse of latency, scaled so that lower latency gives a higher score.
        "score": round(100 / latency_ns * 1000, 2),
    }

def benchmark_sequential_latency(iterations: int = 10000000) -> Dict[str, Any]:
    """Measure average sequential-access memory latency over a 64 MiB working set.

    Delegates the measurement to ``c_lib.measure_latency_sequential`` and
    treats its return value as the total elapsed time in seconds.

    Args:
        iterations: Number of accesses to time. Must be positive.

    Returns:
        A result dict with ``total_time_seconds``, ``average_latency_ns`` and
        ``score`` (higher for lower latency); or ``{"error": ...}`` when
        ``iterations`` is not positive, the C helper is unavailable, or the
        helper reports a non-positive elapsed time.
    """
    # A zero count would divide by zero below, and a negative count would wrap
    # to a huge unsigned value when converted to size_t by ctypes.
    if iterations <= 0:
        return {"error": "iterations must be positive"}
    if not c_lib:
        return {"error": "C helper not available"}

    # Same 64 MiB working set as the random-access latency test.
    array_size_bytes = 64 * 1024 * 1024

    elapsed = c_lib.measure_latency_sequential(array_size_bytes, iterations)
    if elapsed <= 0:
        return {"error": "Benchmark failed"}

    latency_ns = (elapsed / iterations) * 1e9

    return {
        "test": "memory_latency_sequential",
        "description": "Sequential access latency (64MB working set, Strided Read)",
        "iterations": iterations,
        "total_time_seconds": round(elapsed, 4),
        "average_latency_ns": round(latency_ns, 2),
        # Inverse of latency, scaled so that lower latency gives a higher score.
        "score": round(100 / latency_ns * 1000, 2),
    }

def benchmark_alloc_rate(iterations: int = 1000000) -> Dict[str, Any]:
    """Measure the rate of small (1 KiB) memory allocate/free operations.

    Delegates the measurement to ``c_lib.measure_alloc_rate`` and treats its
    return value as the total elapsed time in seconds.

    Args:
        iterations: Number of allocate/free operations to time. Must be
            positive.

    Returns:
        A result dict with ``ops_per_sec`` and ``score``; or
        ``{"error": ...}`` when ``iterations`` is not positive, the C helper
        is unavailable, or the helper reports a non-positive elapsed time.
    """
    # A negative count would wrap to a huge unsigned value when converted to
    # size_t by ctypes, and a zero count measures nothing.
    if iterations <= 0:
        return {"error": "iterations must be positive"}
    if not c_lib:
        return {"error": "C helper not available"}

    # Small allocations (1 KiB) are the common case worth measuring.
    alloc_size = 1024

    elapsed = c_lib.measure_alloc_rate(alloc_size, iterations)
    if elapsed <= 0:
        return {"error": "Benchmark failed"}

    ops_per_sec = iterations / elapsed

    return {
        "test": "memory_alloc_rate",
        "description": f"Malloc/Free rate (Size: {alloc_size} bytes)",
        "iterations": iterations,
        "ops_per_sec": round(ops_per_sec, 2),
        "score": round(ops_per_sec / 10000, 2),
    }



def benchmark_cache_latency() -> Dict[str, Any]:
    """Measure random-access latency at working-set sizes aimed at L1/L2/L3.

    Runs ``c_lib.measure_latency_random`` once per level with a working set
    sized to fit the corresponding cache on typical hardware, treating the
    return value as total elapsed seconds.

    Returns:
        A result dict whose ``levels`` maps ``"L1"``/``"L2"``/``"L3"`` to
        ``{"size_bytes", "latency_ns"}`` and whose ``score`` is derived from
        the L1 latency (0 when that measurement failed); or
        ``{"error": ...}`` when the C helper is unavailable.
    """
    if not c_lib:
        return {"error": "C helper not available"}

    # Approximate sizes: small enough to fit the cache level, large enough to
    # measure. Typical: L1=32KB (use 16KB), L2=256KB (use 128KB),
    # L3=8MB+ (use 4MB).
    levels = [
        ("L1", 16 * 1024),
        ("L2", 128 * 1024),
        ("L3", 4 * 1024 * 1024),
    ]

    iterations = 10000000  # 10M accesses per level

    results = {}
    # Each level is measured exactly once; a duplicated loop that measured
    # every level and then discarded the result has been removed.
    for name, size in levels:
        elapsed = c_lib.measure_latency_random(size, iterations)
        # A non-positive elapsed time signals a failed measurement; report 0.0
        # for that level instead of aborting the whole test.
        latency_ns = (elapsed / iterations) * 1e9 if elapsed > 0 else 0.0

        results[name] = {
            "size_bytes": size,
            "latency_ns": round(latency_ns, 2)
        }

    l1_lat = results["L1"]["latency_ns"]
    score = 0
    if l1_lat > 0:
        score = round(100 / l1_lat * 500, 2)

    return {
        "test": "cache_latency",
        "description": "Cache hierarchy latency (Pointer Chasing)",
        "levels": results,
        "score": score
    }



def run_all_memory_benchmarks() -> Dict[str, Any]:
    """Run every memory benchmark in this module and add an aggregate score.

    Returns:
        A dict mapping each benchmark label to its result dict, plus
        ``total_score``: the sum of the individual ``score`` values (a result
        without a score, such as an error result, contributes 0), rounded to
        two decimals.
    """
    # Executed in this order; the labels become the keys of the report.
    suite = (
        ("bandwidth", benchmark_memory_bandwidth),
        ("latency_random", benchmark_memory_latency),
        ("latency_sequential", benchmark_sequential_latency),
        ("cache_latency", benchmark_cache_latency),
        ("alloc_rate", benchmark_alloc_rate),
    )

    report: Dict[str, Any] = {}
    for label, run_benchmark in suite:
        report[label] = run_benchmark()

    # Compute the total score.
    combined = 0
    for outcome in report.values():
        combined += outcome.get("score", 0)
    report["total_score"] = round(combined, 2)

    return report