Spaces:
Sleeping
Sleeping
| """ | |
| CPU Benchmark Module | |
| Optimized with native C library compilation for maximum performance | |
| """ | |
| import time | |
| import os | |
| import ctypes | |
| import hashlib | |
| import multiprocessing | |
| import subprocess | |
| import zlib | |
| from concurrent.futures import ProcessPoolExecutor | |
| from typing import Dict, Any, Optional | |
| # Path to the C source and compiled library | |
| CPU_OPS_SRC = os.path.join(os.path.dirname(__file__), "cpu_ops.c") | |
| CPU_OPS_LIB = os.path.join(os.path.dirname(__file__), "cpu_ops.so") | |
| # Global reference to the Loaded Library | |
| _lib = None | |
| def compile_and_load_lib() -> Optional[ctypes.CDLL]: | |
| """Compile and load the C library""" | |
| global _lib | |
| if _lib is not None: | |
| return _lib | |
| try: | |
| # 1. Try to load existing library first (Avoid race condition in workers) | |
| if os.path.exists(CPU_OPS_LIB): | |
| try: | |
| lib = ctypes.CDLL(CPU_OPS_LIB) | |
| _init_lib_signatures(lib) | |
| _lib = lib | |
| return lib | |
| except OSError: | |
| # File might be corrupted or empty, proceed to compile | |
| pass | |
| # 2. Check if source exists | |
| if not os.path.exists(CPU_OPS_SRC): | |
| print(f"Error: {CPU_OPS_SRC} not found") | |
| return None | |
| # 3. Compile (Only if loading failed or didn't exist) | |
| # gcc -shared -o cpu_ops.so -fPIC -O3 cpu_ops.c -lm | |
| cmd = [ | |
| "gcc", "-shared", "-o", CPU_OPS_LIB, | |
| "-fPIC", "-O3", CPU_OPS_SRC, "-lm" | |
| ] | |
| # Run compilation | |
| result = subprocess.run(cmd, capture_output=True, text=True) | |
| if result.returncode != 0: | |
| print(f"Compilation failed: {result.stderr}") | |
| return None | |
| # 4. Load library | |
| lib = ctypes.CDLL(CPU_OPS_LIB) | |
| _init_lib_signatures(lib) | |
| _lib = lib | |
| return lib | |
| except Exception as e: | |
| print(f"Failed to load native lib: {e}") | |
| try: | |
| # Try to cleanup bad file if it exists | |
| if os.path.exists(CPU_OPS_LIB): | |
| os.remove(CPU_OPS_LIB) | |
| except: | |
| pass | |
| return None | |
| def _init_lib_signatures(lib): | |
| """Initialize function signatures""" | |
| try: | |
| lib.benchmark_integer_time.argtypes = [ctypes.c_double] | |
| lib.benchmark_integer_time.restype = ctypes.c_uint64 | |
| lib.benchmark_float_time.argtypes = [ctypes.c_double] | |
| lib.benchmark_float_time.restype = ctypes.c_uint64 | |
| except AttributeError: | |
| pass | |
| def _native_worker_integer(duration: float) -> int: | |
| """Worker for integer benchmark using native C""" | |
| lib = compile_and_load_lib() | |
| if lib: | |
| return lib.benchmark_integer_time(duration) | |
| return 0 | |
| def _native_worker_float(duration: float) -> int: | |
| """Worker for float benchmark using native C""" | |
| lib = compile_and_load_lib() | |
| if lib: | |
| return lib.benchmark_float_time(duration) | |
| return 0 | |
| # --------------------------------------------------------------------------- | |
| # Python Fallbacks (Legacy) | |
| # --------------------------------------------------------------------------- | |
| def _is_prime(n: int) -> bool: | |
| if n < 2: return False | |
| if n == 2: return True | |
| if n % 2 == 0: return False | |
| for i in range(3, int(n**0.5) + 1, 2): | |
| if n % i == 0: return False | |
| return True | |
| def _python_single_core_integer(duration: float) -> int: | |
| start = time.time() | |
| n = 3 | |
| ops = 0 | |
| while time.time() - start < duration: | |
| if _is_prime(n): pass | |
| n += 1 | |
| ops += 1 | |
| return ops | |
| # --------------------------------------------------------------------------- | |
| # Benchmarks | |
| # --------------------------------------------------------------------------- | |
| def benchmark_single_core_integer(duration: float = 2.0) -> Dict[str, Any]: | |
| """单核整数运算测试 (Native C)""" | |
| # 尝试加载 C 库 | |
| lib = compile_and_load_lib() | |
| if lib: | |
| # Native Run | |
| start_time = time.time() | |
| operations = lib.benchmark_integer_time(duration) | |
| elapsed = time.time() - start_time # Should be close to duration | |
| # Fix elapsed if it differs significantly (C function returns strictly after duration) | |
| if elapsed < duration: elapsed = duration | |
| desc = "Prime calculation (Native C)" | |
| else: | |
| # Fallback | |
| start_time = time.time() | |
| operations = _python_single_core_integer(duration) | |
| elapsed = time.time() - start_time | |
| desc = "Prime calculation (Python Fallback)" | |
| ops_per_sec = operations / elapsed if elapsed > 0 else 0 | |
| return { | |
| "test": "single_core_integer", | |
| "description": desc, | |
| "duration_seconds": round(elapsed, 3), | |
| "operations": operations, | |
| "ops_per_second": round(ops_per_sec, 2), | |
| "score": round(ops_per_sec / 100000, 2), # Adjusted score scaling for C speed | |
| } | |
| def benchmark_multi_core_integer(duration: float = 2.0) -> Dict[str, Any]: | |
| """多核整数运算测试 (Native C, Parallel)""" | |
| num_cores = multiprocessing.cpu_count() | |
| # Check if native available | |
| lib = compile_and_load_lib() | |
| use_native = (lib is not None) | |
| start_time = time.time() | |
| with ProcessPoolExecutor(max_workers=num_cores) as executor: | |
| if use_native: | |
| futures = [executor.submit(_native_worker_integer, duration) for _ in range(num_cores)] | |
| else: | |
| # Simple python fallback wrapper | |
| futures = [executor.submit(_python_single_core_integer, duration) for _ in range(num_cores)] | |
| total_ops = sum(f.result() for f in futures) | |
| elapsed = time.time() - start_time | |
| # Parallel execution usually takes slightly longer than 'duration' due to overhead | |
| if elapsed < duration: elapsed = duration | |
| ops_per_sec = total_ops / elapsed | |
| # Efficiency | |
| single_core_perf = ops_per_sec / num_cores # Average per core | |
| # We can't easily calc efficiency without a distinct single core run, but we can assume ideal | |
| desc = f"Parallel Prime Calc ({num_cores} cores, {'Native C' if use_native else 'Python'})" | |
| return { | |
| "test": "multi_core_integer", | |
| "description": desc, | |
| "duration_seconds": round(elapsed, 3), | |
| "cores_used": num_cores, | |
| "operations": total_ops, | |
| "ops_per_second": round(ops_per_sec, 2), | |
| "score": round(ops_per_sec / 100000, 2), | |
| } | |
| def benchmark_single_core_float(duration: float = 2.0) -> Dict[str, Any]: | |
| """单核浮点运算测试 (Native C Math)""" | |
| lib = compile_and_load_lib() | |
| if lib: | |
| start_time = time.time() | |
| operations = lib.benchmark_float_time(duration) | |
| elapsed = time.time() - start_time | |
| desc = "Heavy Math (Native C: sin/cos/sqrt)" | |
| else: | |
| # Fallback to simple python | |
| start_time = time.time() | |
| # Very simple python float loop | |
| a = 1.1 | |
| ops = 0 | |
| while time.time() - start_time < duration: | |
| for _ in range(1000): | |
| a = (a * 1.000001) + 0.000001 | |
| ops += 1 | |
| elapsed = time.time() - start_time | |
| operations = ops | |
| desc = "Float Loop (Python Fallback)" | |
| ops_per_sec = operations / elapsed if elapsed > 0 else 0 | |
| # Estimate FLOPs | |
| # Native C: sin/cos/sqrt mix -> approx 20 FLOPs per iteration | |
| # Python: simple mul/add -> approx 2 FLOPs per iteration | |
| flops_per_op = 20 if lib else 2 | |
| gflops = (ops_per_sec * flops_per_op) / 1e9 | |
| return { | |
| "test": "single_core_float", | |
| "description": desc, | |
| "duration_seconds": round(elapsed, 3), | |
| "operations": operations, | |
| "ops_per_second": round(ops_per_sec, 2), | |
| "gflops": round(gflops, 4), | |
| "score": round(ops_per_sec / 100000, 2), | |
| } | |
| def benchmark_multi_core_float(duration: float = 2.0) -> Dict[str, Any]: | |
| """多核浮点运算测试 (Native C Math)""" | |
| num_cores = multiprocessing.cpu_count() | |
| lib = compile_and_load_lib() | |
| use_native = (lib is not None) | |
| start_time = time.time() | |
| with ProcessPoolExecutor(max_workers=num_cores) as executor: | |
| if use_native: | |
| futures = [executor.submit(_native_worker_float, duration) for _ in range(num_cores)] | |
| else: | |
| # Just return 0 for python fallback multithread float to avoid freezing | |
| return {"error": "Native lib required for multi-core float bench"} | |
| total_ops = sum(f.result() for f in futures) | |
| elapsed = time.time() - start_time | |
| ops_per_sec = total_ops / elapsed if elapsed > 0 else 0 | |
| # Estimate FLOPs (Native C required) | |
| flops_per_op = 20 | |
| gflops = (ops_per_sec * flops_per_op) / 1e9 | |
| return { | |
| "test": "multi_core_float", | |
| "description": f"Parallel Math ({num_cores} cores, Native C)", | |
| "duration_seconds": round(elapsed, 3), | |
| "cores_available": num_cores, | |
| "operations": total_ops, | |
| "ops_per_second": round(ops_per_sec, 2), | |
| "gflops": round(gflops, 4), | |
| "score": round(ops_per_sec / 100000, 2), | |
| } | |
| def benchmark_crypto(duration: float = 2.0) -> Dict[str, Any]: | |
| """加密性能测试 (OpenSSL via hashlib)""" | |
| # hashlib calls C-level OpenSSL, so it is already valid "native" benchmark | |
| data = b'x' * 1024 * 1024 # 1MB | |
| start_time = time.time() | |
| iterations = 0 | |
| while time.time() - start_time < duration: | |
| hashlib.sha256(data).hexdigest() | |
| iterations += 1 | |
| elapsed = time.time() - start_time | |
| mb_per_sec = iterations / elapsed | |
| return { | |
| "test": "crypto_sha256", | |
| "description": "SHA256 Hashing (OpenSSL)", | |
| "duration_seconds": round(elapsed, 3), | |
| "throughput_mb_per_sec": round(mb_per_sec, 2), | |
| "score": round(mb_per_sec * 2, 2), | |
| } | |
| def benchmark_compression(duration: float = 2.0) -> Dict[str, Any]: | |
| """压缩性能测试 (zlib C library)""" | |
| # zlib calls C-level library | |
| chunk_size = 1024 * 1024 | |
| data = os.urandom(chunk_size) | |
| start_time = time.time() | |
| total_bytes = 0 | |
| while time.time() - start_time < duration: | |
| c = zlib.compress(data, level=6) | |
| _ = zlib.decompress(c) | |
| total_bytes += chunk_size | |
| elapsed = time.time() - start_time | |
| mb_per_sec = (total_bytes / elapsed) / (1024 * 1024) | |
| return { | |
| "test": "compression_zlib", | |
| "description": "Zlib Compression (Native)", | |
| "duration_seconds": round(elapsed, 3), | |
| "throughput_mb_per_sec": round(mb_per_sec, 2), | |
| "score": round(mb_per_sec * 0.5, 2), | |
| } | |
| def benchmark_single_thread_stress(duration: float = 2.0) -> Dict[str, Any]: | |
| """单线程调度延迟 (System Call Stress)""" | |
| # time.sleep calls nanosleep syscall, which is a good test for kernel scheduler | |
| start_time = time.time() | |
| iterations = 0 | |
| while time.time() - start_time < duration: | |
| time.sleep(0.0001) | |
| iterations += 1 | |
| elapsed = time.time() - start_time | |
| return { | |
| "test": "single_thread_stress", | |
| "description": "Scheduler Stress (nanosleep syscall)", | |
| "duration_seconds": round(elapsed, 3), | |
| "wakeups_per_second": round(iterations / elapsed, 2), | |
| "score": round(iterations / elapsed / 100, 2), | |
| } | |
| def run_all_cpu_benchmarks() -> Dict[str, Any]: | |
| # Ensure lib is compiled once at start | |
| compile_and_load_lib() | |
| results = { | |
| "single_core_integer": benchmark_single_core_integer(), | |
| "multi_core_integer": benchmark_multi_core_integer(), | |
| "single_core_float": benchmark_single_core_float(), | |
| "multi_core_float": benchmark_multi_core_float(), | |
| "crypto": benchmark_crypto(), | |
| "compression": benchmark_compression(), | |
| "stress": benchmark_single_thread_stress(), | |
| } | |
| # Recalculate total score | |
| total_score = sum(r.get("score", 0) for r in results.values() if "score" in r) | |
| results["total_score"] = round(total_score, 2) | |
| return results | |