| | import torch |
| | import time |
| | import argparse |
| | from threading import Thread |
| |
|
def gpu_worker(gpu_id, duration, tensor_size):
    """Keep one GPU busy with square matrix multiplications for a fixed time.

    Written to be used as a thread target: any exception is reported on
    stdout instead of being propagated, and nothing is returned.

    Args:
        gpu_id: Index of the CUDA device to load (addressed as
            ``cuda:{gpu_id}``).
        duration: How long to keep issuing work, in seconds.
        tensor_size: Side length of the two square operand matrices.
    """
    # Bind the names up front so the cleanup in ``finally`` is valid even
    # when device setup fails before any tensor has been created.
    tensor_a = tensor_b = result = None
    try:
        device = torch.device(f"cuda:{gpu_id}")
        torch.cuda.set_device(device)

        gpu_name = torch.cuda.get_device_name(device)
        print(f"GPU {gpu_id} 启动: {gpu_name}")

        tensor_a = torch.randn(tensor_size, tensor_size, device=device)
        tensor_b = torch.randn(tensor_size, tensor_size, device=device)

        # Warm-up multiplications, fully drained before the timer starts so
        # that one-time start-up cost is not counted in the measurement.
        for _ in range(10):
            result = torch.matmul(tensor_a, tensor_b)
        torch.cuda.synchronize(device)

        # A monotonic clock is used for the interval: unlike time.time() it
        # cannot jump when the system clock is adjusted during a long run.
        start_time = time.monotonic()
        iterations = 0

        while time.monotonic() - start_time < duration:
            result = torch.matmul(tensor_a, tensor_b)

            # Every 100 iterations, blend a little fresh noise into both
            # operands so the inputs do not stay constant for the whole run.
            if iterations % 100 == 0:
                tensor_a = 0.999 * tensor_a + 0.001 * torch.randn_like(tensor_a)
                tensor_b = 0.999 * tensor_b + 0.001 * torch.randn_like(tensor_b)

            iterations += 1

            # Progress report every 1000 iterations.
            if iterations % 1000 == 0:
                elapsed = time.monotonic() - start_time
                print(f"GPU {gpu_id}: 已运行 {elapsed:.1f} 秒, 完成 {iterations} 次迭代")

            # CUDA work is queued asynchronously; waiting every 100
            # iterations keeps the host from running far ahead of the device.
            if iterations % 100 == 0:
                torch.cuda.synchronize(device)

        # Drain whatever is still queued so the final figures only count
        # work that has actually completed on the device.
        torch.cuda.synchronize(device)
        elapsed = time.monotonic() - start_time
        # Guard the average against a zero-length run (e.g. duration <= 0).
        rate = iterations / elapsed if elapsed > 0 else 0.0
        print(f"GPU {gpu_id} 完成: 总时间 {elapsed:.1f} 秒, 总迭代 {iterations} 次, "
              f"平均每秒 {rate:.2f} 次")

    except Exception as e:
        # Best-effort worker: report the failure and let other GPUs continue.
        print(f"GPU {gpu_id} 出错: {str(e)}")

    finally:
        # empty_cache() can only release memory that is no longer referenced,
        # so drop the tensors first.
        tensor_a = tensor_b = result = None
        torch.cuda.empty_cache()
| |
|
def multi_gpu_stress_test(duration, tensor_size, use_gpus=None):
    """Run ``gpu_worker`` on several GPUs at once and wait for all of them.

    Args:
        duration: Seconds each worker should keep its GPU busy.
        tensor_size: Side length of the square matrices each worker uses.
        use_gpus: Optional iterable of GPU indices to use. Indices outside
            ``0 .. device_count - 1`` are dropped. ``None`` selects every
            detected GPU.

    Returns:
        None. Prints an error and returns early when no GPU is detected or
        when none of the requested indices is valid.
    """
    gpu_count = torch.cuda.device_count()
    if gpu_count == 0:
        print("错误: 未检测到可用GPU")
        return

    if use_gpus is not None:
        # Keep only the indices that refer to a GPU that actually exists.
        selected = [idx for idx in use_gpus if 0 <= idx < gpu_count]
        if not selected:
            print("错误: 没有有效的GPU ID")
            return
    else:
        selected = list(range(gpu_count))

    print(f"检测到 {gpu_count} 张GPU,将使用 {len(selected)} 张: {selected}")

    # One worker thread per selected GPU, each started as soon as it is built.
    workers = []
    for gpu_id in selected:
        worker = Thread(target=gpu_worker, args=(gpu_id, duration, tensor_size))
        workers.append(worker)
        worker.start()

    # Block until every worker has finished.
    for worker in workers:
        worker.join()

    print("所有GPU测试完成")
| |
|
def build_arg_parser():
    """Build the command-line parser for the stress-test script.

    Options:
        --duration: test length in seconds (int, default 6000000).
        --size: side length of the square tensors on each GPU
            (int, default 4096).
        --gpus: one or more GPU indices to use; when omitted the parsed
            value is ``None``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='多GPU压力测试程序')
    # The help texts interpolate %(default)s so the documented default can
    # never drift away from the value that is actually used.
    parser.add_argument('--duration', type=int, default=6000000,
                        help='测试持续时间(秒),默认%(default)s秒')
    parser.add_argument('--size', type=int, default=4096,
                        help='每张GPU上的张量大小,默认%(default)sx%(default)s')
    parser.add_argument('--gpus', type=int, nargs='+',
                        help='指定要使用的GPU ID,如 --gpus 0 1 2 3 4 5 6 7')
    return parser


if __name__ == "__main__":
    args = build_arg_parser().parse_args()

    multi_gpu_stress_test(args.duration, args.size, args.gpus)
| |
|