# space-fetch / benchmarks / memory_bench.py (commit 0404756, author: Orion-zhen)
"""
Memory Benchmark Module
内存性能测试:带宽测试、延迟测试、缓存性能
Optimized with ctypes for raw C-level performance
"""
import ctypes
import mmap
import multiprocessing
import os
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, Any

import numpy as np  # Keep for latency/cache tests
# Load C standard library
# Load the C standard library for raw memset/memcpy/memchr calls.
# On non-glibc platforms (macOS, Windows) the load fails and the
# optimized bandwidth benchmark reports an error instead of running.
try:
    libc = ctypes.CDLL("libc.so.6")
    libc.memset.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]
    # All three functions return a pointer. Without an explicit restype,
    # ctypes defaults to c_int, which truncates the 64-bit pointer return
    # value; declare c_void_p so any caller inspecting the result is safe.
    libc.memset.restype = ctypes.c_void_p
    libc.memcpy.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]
    libc.memcpy.restype = ctypes.c_void_p
    libc.memchr.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]
    libc.memchr.restype = ctypes.c_void_p  # pointer to match, or NULL
except Exception:
    libc = None
# --- C Extension Handling ---
# Paths for the optional compiled C helper (latency / alloc-rate kernels).
# The shared object lives next to this module and is rebuilt on demand
# by _compile_c_helper() below.
C_LIB_PATH = os.path.join(os.path.dirname(__file__), "_memory_bench_c.so")
C_SRC_PATH = os.path.join(os.path.dirname(__file__), "memory_bench_c.c")
def _compile_c_helper():
    """Compile (if needed) and load the C helper shared library.

    Recompiles when the .so is missing or older than its source file.

    Returns:
        A ctypes.CDLL handle with argtypes/restypes configured, or None
        when the source is absent or compilation/loading fails.
    """
    if not os.path.exists(C_SRC_PATH):
        return None
    # Rebuild when the library is missing or the source is newer.
    needs_compile = (
        not os.path.exists(C_LIB_PATH)
        or os.path.getmtime(C_SRC_PATH) > os.path.getmtime(C_LIB_PATH)
    )
    if needs_compile:
        # User requested max optimization (-O3). Use an argument list with
        # no shell so paths containing spaces or shell metacharacters
        # cannot break or inject into the command (unlike os.system).
        cmd = ["gcc", "-O3", "-shared", "-fPIC", "-o", C_LIB_PATH, C_SRC_PATH]
        try:
            proc = subprocess.run(cmd)
        except OSError:
            # gcc itself is not installed / not executable.
            print("Failed to compile C helper.")
            return None
        if proc.returncode != 0:
            print("Failed to compile C helper.")
            return None
    try:
        lib = ctypes.CDLL(C_LIB_PATH)
        # Each kernel takes (size_bytes, iterations) and returns elapsed seconds.
        lib.measure_latency_random.argtypes = [ctypes.c_size_t, ctypes.c_size_t]
        lib.measure_latency_random.restype = ctypes.c_double
        lib.measure_latency_sequential.argtypes = [ctypes.c_size_t, ctypes.c_size_t]
        lib.measure_latency_sequential.restype = ctypes.c_double
        lib.measure_alloc_rate.argtypes = [ctypes.c_size_t, ctypes.c_size_t]
        lib.measure_alloc_rate.restype = ctypes.c_double
        return lib
    except Exception as e:
        print(f"Failed to load C helper: {e}")
        return None


c_lib = _compile_c_helper()
def _raw_memory_worker(args):
    """
    Worker process for the memory bandwidth test using raw libc calls.
    Equivalent to `sysbench memory`.

    Args:
        args: (block_size_mb, duration_seconds, mode) where mode is one
            of 'read', 'write', 'copy'.

    Returns:
        (iterations, elapsed_seconds); each iteration touched the whole
        block exactly once.
    """
    block_size_mb, duration, mode = args
    block_size = block_size_mb * 1024 * 1024
    # Anonymous mmap gives page-aligned raw memory with no Python object
    # overhead inside the timed loop.
    src_map = mmap.mmap(-1, block_size)
    dst_map = mmap.mmap(-1, block_size) if mode == 'copy' else None
    try:
        # Only addressof() is kept; the temporary ctypes objects release
        # their buffer exports immediately, so the mmaps can be closed.
        src_addr = ctypes.addressof(ctypes.c_char.from_buffer(src_map))
        dst_addr = ctypes.addressof(ctypes.c_char.from_buffer(dst_map)) if dst_map else 0
        # Hoist attribute lookups out of the hot loop.
        memset = libc.memset
        memcpy = libc.memcpy
        memchr = libc.memchr
        # perf_counter is monotonic and high-resolution; time.time() can
        # jump when the wall clock is adjusted, corrupting the measurement.
        perf = time.perf_counter
        start_time = perf()
        iterations = 0
        while perf() - start_time < duration:
            if mode == 'read':
                # Full read scan: the map is zero-filled, so byte 1 is
                # absent and memchr must traverse the entire block.
                memchr(src_addr, 1, block_size)
            elif mode == 'write':
                memset(src_addr, 0, block_size)
            elif mode == 'copy':
                memcpy(dst_addr, src_addr, block_size)
            iterations += 1
        elapsed = perf() - start_time
    finally:
        # Release the mappings even if a ctypes call raised.
        src_map.close()
        if dst_map:
            dst_map.close()
    return iterations, elapsed
def benchmark_memory_bandwidth(block_size_mb: int = 4, duration: float = 3.0) -> Dict[str, Any]:
    """
    Memory bandwidth test (raw C performance).

    Spawns one worker per CPU core; each worker hammers a private
    `block_size_mb` buffer with libc memchr/memset/memcpy for `duration`
    seconds per mode, bypassing Python interpreter overhead.

    Args:
        block_size_mb: per-worker buffer size; 4MB spills typical L1/L2
            so the test stresses L3/RAM rather than core-private caches.
        duration: seconds to run each mode (read, write, copy).

    Returns:
        Dict with per-mode bandwidth in GB/s and an aggregate score, or
        an error dict when libc is unavailable.
    """
    if not libc:
        return {"error": "libc not found, cannot run optimized benchmark"}
    num_cores = multiprocessing.cpu_count()
    modes = ['read', 'write', 'copy']
    results = {}
    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        for mode in modes:
            futures = [
                executor.submit(_raw_memory_worker, (block_size_mb, duration, mode))
                for _ in range(num_cores)
            ]
            total_iterations = 0
            max_elapsed = 0.0
            for f in futures:
                iters, elapsed = f.result()
                total_iterations += iters
                # Use the slowest worker's elapsed time as the wall time.
                max_elapsed = max(max_elapsed, elapsed)
            bytes_per_iter = block_size_mb * 1024 * 1024
            total_bytes = total_iterations * bytes_per_iter
            # 'copy' moves every byte over the bus twice (one read + one
            # write), so count both directions for bus bandwidth.
            if mode == 'copy':
                total_bytes *= 2
            # Avoid division by zero (e.g. duration <= 0).
            if max_elapsed > 0:
                bandwidth_gb_s = total_bytes / max_elapsed / (1024 ** 3)
            else:
                bandwidth_gb_s = 0
            results[f"{mode}_bandwidth_gb_s"] = round(bandwidth_gb_s, 3)
    return {
        "test": "memory_bandwidth",
        "description": f"Memory bandwidth test (Multi-core C-level, {num_cores} threads)",
        "block_size_mb": block_size_mb,
        "read_bandwidth_gb_s": results['read_bandwidth_gb_s'],
        "write_bandwidth_gb_s": results['write_bandwidth_gb_s'],
        "copy_bandwidth_gb_s": results['copy_bandwidth_gb_s'],
        "score": round(
            (results['read_bandwidth_gb_s']
             + results['write_bandwidth_gb_s']
             + results['copy_bandwidth_gb_s']) * 10,
            2,
        ),
    }
def benchmark_memory_latency(iterations: int = 10000000) -> Dict[str, Any]:
    """
    Random-access memory latency test.

    Delegates to the C helper, which pointer-chases through a 64MB
    working set so every load depends on the previous one.
    """
    if not c_lib:
        return {"error": "C helper not available"}
    # 64MB working set: large enough to miss typical caches and hit RAM.
    working_set = 64 * 1024 * 1024
    total_seconds = c_lib.measure_latency_random(working_set, iterations)
    if total_seconds <= 0:
        return {"error": "Benchmark failed"}
    avg_latency_ns = (total_seconds / iterations) * 1e9
    return {
        "test": "memory_latency_random",
        "description": "Random access latency (64MB working set, Pointer Chasing)",
        "iterations": iterations,
        "total_time_seconds": round(total_seconds, 4),
        "average_latency_ns": round(avg_latency_ns, 2),
        "score": round(100 / avg_latency_ns * 1000, 2),  # Adjusted score scale
    }
def benchmark_sequential_latency(iterations: int = 10000000) -> Dict[str, Any]:
    """
    Sequential-access memory latency test.

    Delegates to the C helper's strided sequential read over the same
    64MB working set used by the random-access test.
    """
    if not c_lib:
        return {"error": "C helper not available"}
    working_set = 64 * 1024 * 1024  # same 64MB block as the random test
    total_seconds = c_lib.measure_latency_sequential(working_set, iterations)
    if total_seconds <= 0:
        return {"error": "Benchmark failed"}
    avg_latency_ns = (total_seconds / iterations) * 1e9
    return {
        "test": "memory_latency_sequential",
        "description": "Sequential access latency (64MB working set, Strided Read)",
        "iterations": iterations,
        "total_time_seconds": round(total_seconds, 4),
        "average_latency_ns": round(avg_latency_ns, 2),
        "score": round(100 / avg_latency_ns * 1000, 2),
    }
def benchmark_alloc_rate(iterations: int = 1000000) -> Dict[str, Any]:
    """
    Memory allocation/free rate test.

    Measures malloc/free throughput in the C helper for small (1KB)
    allocations, a common allocation size in practice.
    """
    if not c_lib:
        return {"error": "C helper not available"}
    alloc_size = 1024  # 1KB blocks
    total_seconds = c_lib.measure_alloc_rate(alloc_size, iterations)
    if total_seconds <= 0:
        return {"error": "Benchmark failed"}
    rate = iterations / total_seconds
    return {
        "test": "memory_alloc_rate",
        "description": f"Malloc/Free rate (Size: {alloc_size} bytes)",
        "iterations": iterations,
        "ops_per_sec": round(rate, 2),
        "score": round(rate / 10000, 2),
    }
def benchmark_cache_latency() -> Dict[str, Any]:
    """
    Cache hierarchy latency test (L1/L2/L3).

    Uses the C helper's random pointer chasing with working sets sized
    to sit inside each cache level.

    Returns:
        Dict with per-level size/latency and a score derived from L1
        latency, or an error dict when the C helper is unavailable.
    """
    if not c_lib:
        return {"error": "C helper not available"}
    results = {}
    # Working sets chosen to fit comfortably inside each level while
    # staying large enough to measure:
    # typical L1=32KB -> use 16KB, L2=256KB -> use 128KB, L3=8MB+ -> use 4MB.
    levels = [
        ("L1", 16 * 1024),
        ("L2", 128 * 1024),
        ("L3", 4 * 1024 * 1024),
    ]
    iterations = 10000000  # 10M dependent loads per level
    # Fix: the original ran this measurement loop twice, discarding the
    # first pass's results — doubling runtime for nothing. Measure once.
    for name, size in levels:
        elapsed = c_lib.measure_latency_random(size, iterations)
        if elapsed <= 0:
            # Helper failed for this level; record 0 rather than crash.
            latency_ns = 0.0
        else:
            latency_ns = (elapsed / iterations) * 1e9
        results[name] = {
            "size_bytes": size,
            "latency_ns": round(latency_ns, 2),
        }
    l1_lat = results["L1"]["latency_ns"]
    score = 0
    if l1_lat > 0:
        score = round(100 / l1_lat * 500, 2)
    return {
        "test": "cache_latency",
        "description": "Cache hierarchy latency (Pointer Chasing)",
        "levels": results,
        "score": score,
    }
def run_all_memory_benchmarks() -> Dict[str, Any]:
    """Run every memory benchmark and aggregate the scores."""
    suite = {
        "bandwidth": benchmark_memory_bandwidth,
        "latency_random": benchmark_memory_latency,
        "latency_sequential": benchmark_sequential_latency,
        "cache_latency": benchmark_cache_latency,
        "alloc_rate": benchmark_alloc_rate,
    }
    results = {name: bench() for name, bench in suite.items()}
    # Sum per-test scores; tests that errored carry no "score" key and
    # contribute 0.
    results["total_score"] = round(
        sum(r.get("score", 0) for r in results.values()), 2
    )
    return results