"""
Memory Benchmark Module

Memory performance tests: bandwidth, latency, and cache performance.
Optimized with ctypes for raw C-level performance.
"""

import ctypes
import ctypes.util
import mmap
import multiprocessing
import os
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Dict, Any

import numpy as np  # Keep for latency/cache tests


def _load_libc():
    """Load the C standard library and declare the prototypes used below.

    The glibc soname ``libc.so.6`` is tried first; if that cannot be loaded,
    whatever ``ctypes.util.find_library("c")`` reports is tried next.

    Returns:
        The ``ctypes.CDLL`` handle, or ``None`` when no usable libc is found.
    """
    prototypes = (
        ("memset", [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]),
        ("memcpy", [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]),
        ("memchr", [ctypes.c_void_p, ctypes.c_int, ctypes.c_size_t]),
    )
    # Callables so that find_library() only runs when the first name fails.
    locators = (lambda: "libc.so.6", lambda: ctypes.util.find_library("c"))

    for locate in locators:
        try:
            name = locate()
            if not name:
                continue
            lib = ctypes.CDLL(name)
            for func_name, argtypes in prototypes:
                func = getattr(lib, func_name)
                func.argtypes = argtypes
                # All three return a pointer; the ctypes default (c_int)
                # would truncate it.
                func.restype = ctypes.c_void_p
        except Exception:
            # Deliberately broad: a missing or odd libc must never break
            # importing this module; callers check ``libc`` for None.
            continue
        return lib
    return None


# Load C standard library
libc = _load_libc()

# --- C Extension Handling ---
C_LIB_PATH = os.path.join(os.path.dirname(__file__), "_memory_bench_c.so")
C_SRC_PATH = os.path.join(os.path.dirname(__file__), "memory_bench_c.c")


def _compile_c_helper():
    """Compile (if missing or outdated) and load the C helper library.

    The shared object is rebuilt when it does not exist or when the C source
    has a newer modification time.

    Returns:
        The loaded ``ctypes.CDLL`` with prototypes declared, or ``None`` when
        the source is missing, compilation fails, or loading fails.
    """
    if not os.path.exists(C_SRC_PATH):
        return None

    needs_compile = (
        not os.path.exists(C_LIB_PATH)
        or os.path.getmtime(C_SRC_PATH) > os.path.getmtime(C_LIB_PATH)
    )

    if needs_compile:
        # User requested max optimization.
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact.
        cmd = ["gcc", "-O3", "-shared", "-fPIC", "-o", C_LIB_PATH, C_SRC_PATH]
        try:
            compiled = subprocess.run(cmd, check=False).returncode == 0
        except OSError:
            # gcc is not installed or cannot be executed.
            compiled = False
        if not compiled:
            print("Failed to compile C helper.")
            return None

    try:
        lib = ctypes.CDLL(C_LIB_PATH)
        for func_name in (
            "measure_latency_random",
            "measure_latency_sequential",
            "measure_alloc_rate",
        ):
            func = getattr(lib, func_name)
            func.argtypes = [ctypes.c_size_t, ctypes.c_size_t]
            func.restype = ctypes.c_double
        return lib
    except (OSError, AttributeError) as e:
        print(f"Failed to load C helper: {e}")
        return None


c_lib = _compile_c_helper()


def _latency_ns(elapsed: float, iterations: int) -> float:
    """Convert a total time in seconds into nanoseconds per iteration."""
    return (elapsed / iterations) * 1e9


def _raw_memory_worker(args):
    """Worker process for the memory bandwidth test using raw C calls.

    Args:
        args: Tuple ``(block_size_mb, duration, mode)`` where ``mode`` is one
            of ``'read'``, ``'write'`` or ``'copy'`` and ``duration`` is the
            target run time in seconds.

    Returns:
        Tuple ``(iterations, elapsed)``: number of full passes over the block
        and the measured time in seconds.

    Raises:
        ValueError: If ``mode`` is not one of the supported modes.
    """
    block_size_mb, duration, mode = args
    if mode not in ('read', 'write', 'copy'):
        # Previously an unknown mode spun in an empty loop and reported
        # meaningless iteration counts.
        raise ValueError(f"Unknown memory benchmark mode: {mode!r}")
    block_size = block_size_mb * 1024 * 1024

    # Prepare C function calls
    memset = libc.memset
    memcpy = libc.memcpy
    memchr = libc.memchr

    # Use anonymous mmap for raw memory allocation (no Python object overhead)
    src_map = mmap.mmap(-1, block_size)
    dst_map = None
    try:
        # For copy mode, we need a destination
        if mode == 'copy':
            dst_map = mmap.mmap(-1, block_size)

        # Get raw pointers. The temporary ctypes objects are dropped right
        # away so the mmaps can be closed afterwards.
        src_addr = ctypes.addressof(ctypes.c_char.from_buffer(src_map))
        dst_addr = 0
        if dst_map is not None:
            dst_addr = ctypes.addressof(ctypes.c_char.from_buffer(dst_map))

        # Touch every page before timing so first-access page-fault cost is
        # not counted as memory bandwidth.
        memset(src_addr, 0, block_size)
        if dst_map is not None:
            memset(dst_addr, 0, block_size)

        # perf_counter is monotonic and high resolution, unlike time.time().
        start_time = time.perf_counter()
        iterations = 0
        while time.perf_counter() - start_time < duration:
            if mode == 'read':
                # The buffer is all zeros, so searching for byte 1 forces a
                # scan of the whole block.
                memchr(src_addr, 1, block_size)
            elif mode == 'write':
                memset(src_addr, 0, block_size)
            else:  # 'copy'
                memcpy(dst_addr, src_addr, block_size)
            iterations += 1
        elapsed = time.perf_counter() - start_time
    finally:
        # Always release the mappings, even if a C call raised.
        src_map.close()
        if dst_map is not None:
            dst_map.close()

    return iterations, elapsed


def benchmark_memory_bandwidth(block_size_mb: int = 4) -> Dict[str, Any]:
    """Memory bandwidth test (raw C performance).

    Runs one worker process per CPU core for each of the read, write and copy
    modes, using multiprocessing + ctypes to bypass Python overhead.

    Args:
        block_size_mb: Size in MiB of the buffer each worker operates on.

    Returns:
        Result dict with per-mode bandwidth in GiB/s and a combined score, or
        ``{"error": ...}`` when libc could not be loaded.
    """
    if not libc:
        return {"error": "libc not found, cannot run optimized benchmark"}

    num_cores = multiprocessing.cpu_count()
    duration = 3.0

    # sysbench defaults to 1KB-1MB blocks. The default of 4MB per worker is
    # intended to be large enough that the test reaches RAM.
    modes = ['read', 'write', 'copy']
    results = {}
    bytes_per_iter = block_size_mb * 1024 * 1024

    with ProcessPoolExecutor(max_workers=num_cores) as executor:
        for mode in modes:
            futures = [
                executor.submit(_raw_memory_worker, (block_size_mb, duration, mode))
                for _ in range(num_cores)
            ]

            total_iterations = 0
            max_elapsed = 0
            for f in futures:
                iters, elapsed = f.result()
                total_iterations += iters
                max_elapsed = max(max_elapsed, elapsed)

            # Data transferred per iteration = block_size
            total_bytes = total_iterations * bytes_per_iter

            # A copy reads the block and writes it again, so count the bytes
            # moved in both directions (matches the previous implementation).
            if mode == 'copy':
                total_bytes *= 2

            # Avoid division by zero
            if max_elapsed > 0:
                bandwidth_gb_s = total_bytes / max_elapsed / (1024**3)
            else:
                bandwidth_gb_s = 0

            results[f"{mode}_bandwidth_gb_s"] = round(bandwidth_gb_s, 3)

    return {
        "test": "memory_bandwidth",
        "description": f"Memory bandwidth test (Multi-core C-level, {num_cores} threads)",
        "block_size_mb": block_size_mb,
        "read_bandwidth_gb_s": results['read_bandwidth_gb_s'],
        "write_bandwidth_gb_s": results['write_bandwidth_gb_s'],
        "copy_bandwidth_gb_s": results['copy_bandwidth_gb_s'],
        "score": round((results['read_bandwidth_gb_s'] +
                        results['write_bandwidth_gb_s'] +
                        results['copy_bandwidth_gb_s']) * 10, 2),
    }


def benchmark_memory_latency(iterations: int = 10000000) -> Dict[str, Any]:
    """Memory latency test (random access).

    Delegates to the C helper's ``measure_latency_random`` on a 64MB block.

    Args:
        iterations: Number of accesses requested from the C helper.

    Returns:
        Result dict with total time, average latency in ns and a score, or
        ``{"error": ...}`` when the helper is unavailable or reports a
        non-positive time.
    """
    if not c_lib:
        return {"error": "C helper not available"}

    # Test random access latency on a large block (64MB) to hit RAM
    array_size_bytes = 64 * 1024 * 1024

    elapsed = c_lib.measure_latency_random(array_size_bytes, iterations)
    if elapsed <= 0:
        return {"error": "Benchmark failed"}

    latency_ns = _latency_ns(elapsed, iterations)

    return {
        "test": "memory_latency_random",
        "description": "Random access latency (64MB working set, Pointer Chasing)",
        "iterations": iterations,
        "total_time_seconds": round(elapsed, 4),
        "average_latency_ns": round(latency_ns, 2),
        "score": round(100 / latency_ns * 1000, 2),  # Adjusted score scale
    }


def benchmark_sequential_latency(iterations: int = 10000000) -> Dict[str, Any]:
    """Memory latency test (sequential access).

    Delegates to the C helper's ``measure_latency_sequential`` on a 64MB
    block.

    Args:
        iterations: Number of accesses requested from the C helper.

    Returns:
        Result dict with total time, average latency in ns and a score, or
        ``{"error": ...}`` when the helper is unavailable or reports a
        non-positive time.
    """
    if not c_lib:
        return {"error": "C helper not available"}

    # Same 64MB block
    array_size_bytes = 64 * 1024 * 1024

    elapsed = c_lib.measure_latency_sequential(array_size_bytes, iterations)
    if elapsed <= 0:
        return {"error": "Benchmark failed"}

    latency_ns = _latency_ns(elapsed, iterations)

    return {
        "test": "memory_latency_sequential",
        "description": "Sequential access latency (64MB working set, Strided Read)",
        "iterations": iterations,
        "total_time_seconds": round(elapsed, 4),
        "average_latency_ns": round(latency_ns, 2),
        "score": round(100 / latency_ns * 1000, 2),
    }


def benchmark_alloc_rate(iterations: int = 1000000) -> Dict[str, Any]:
    """Memory allocation/free rate test.

    Delegates to the C helper's ``measure_alloc_rate`` with 1KB allocations.

    Args:
        iterations: Number of operations requested from the C helper.

    Returns:
        Result dict with operations per second and a score, or
        ``{"error": ...}`` when the helper is unavailable or reports a
        non-positive time.
    """
    if not c_lib:
        return {"error": "C helper not available"}

    # Test small allocations (e.g. 1KB) which are common
    alloc_size = 1024

    elapsed = c_lib.measure_alloc_rate(alloc_size, iterations)
    if elapsed <= 0:
        return {"error": "Benchmark failed"}

    ops_per_sec = iterations / elapsed

    return {
        "test": "memory_alloc_rate",
        "description": f"Malloc/Free rate (Size: {alloc_size} bytes)",
        "iterations": iterations,
        "ops_per_sec": round(ops_per_sec, 2),
        "score": round(ops_per_sec / 10000, 2),
    }


def benchmark_cache_latency() -> Dict[str, Any]:
    """Cache hierarchy latency test (L1/L2/L3).

    Runs the C helper's random-access latency measurement once per working
    set size. The score is derived from the L1 latency only.

    Returns:
        Result dict with a ``levels`` mapping (size and latency per level)
        and a score, or ``{"error": ...}`` when the helper is unavailable.
        A level whose measurement reports a non-positive time gets a latency
        of 0.0.
    """
    if not c_lib:
        return {"error": "C helper not available"}

    # Approximate sizes: small enough to fit in the cache level, large enough
    # to measure.
    # Typical: L1=32KB(use 16KB), L2=256KB(use 128KB), L3=8MB+(use 4MB)
    levels = [
        ("L1", 16 * 1024),
        ("L2", 128 * 1024),
        ("L3", 4 * 1024 * 1024),
    ]
    iterations = 10000000  # 10M iterations

    results = {}
    # Each level is measured exactly once; the earlier duplicated loop re-ran
    # every measurement and threw the results away.
    for name, size in levels:
        elapsed = c_lib.measure_latency_random(size, iterations)
        latency_ns = _latency_ns(elapsed, iterations) if elapsed > 0 else 0.0
        results[name] = {
            "size_bytes": size,
            "latency_ns": round(latency_ns, 2),
        }

    l1_lat = results["L1"]["latency_ns"]
    score = 0
    if l1_lat > 0:
        score = round(100 / l1_lat * 500, 2)

    return {
        "test": "cache_latency",
        "description": "Cache hierarchy latency (Pointer Chasing)",
        "levels": results,
        "score": score,
    }


def run_all_memory_benchmarks() -> Dict[str, Any]:
    """Run all memory benchmarks and add their combined score.

    Returns:
        Dict mapping benchmark name to its result dict, plus ``total_score``,
        the sum of every available ``score`` (results without one count as 0).
    """
    results = {
        "bandwidth": benchmark_memory_bandwidth(),
        "latency_random": benchmark_memory_latency(),
        "latency_sequential": benchmark_sequential_latency(),
        "cache_latency": benchmark_cache_latency(),
        "alloc_rate": benchmark_alloc_rate(),
    }

    # Compute the total score
    total_score = sum(r.get("score", 0) for r in results.values())
    results["total_score"] = round(total_score, 2)

    return results