File size: 6,462 Bytes
36ab767 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# -*- coding: utf-8 -*-
import torch
import time
import datetime
import argparse
def occupy_gpu_memory(gpu_id, fraction, extra_reserve_gb, max_attempts=5, retry_step_mb=256):
    """Allocate and hold a large block of memory on a single GPU, with retries.

    The initial target is ``fraction`` of the device's total memory minus
    ``extra_reserve_gb`` GiB. When the running PyTorch exposes
    ``torch.cuda.mem_get_info``, the target is additionally capped at the
    memory that is currently free, so a partially used GPU does not burn
    every retry on a size that can never fit. On an out-of-memory error the
    target is reduced by ``retry_step_mb`` MiB and the allocation is retried.

    Args:
        gpu_id: Index of the CUDA device to allocate on.
        fraction: Fraction of the device's total memory to aim for.
        extra_reserve_gb: Amount (in GiB) subtracted from the target.
        max_attempts: Maximum number of allocation attempts.
        retry_step_mb: Size (in MiB) by which the target shrinks after an OOM.

    Returns:
        The float32 tensor holding the memory (the caller must keep a
        reference to it), or ``None`` if nothing could be allocated.
        Errors are printed rather than raised.
    """
    device = "cuda:{}".format(gpu_id)
    try:
        torch.cuda.set_device(gpu_id)
        prop = torch.cuda.get_device_properties(gpu_id)
        total_memory = prop.total_memory
        total_gb = total_memory / 1024**3
        target_reserve_bytes = int(total_memory * fraction) - int(extra_reserve_gb * 1024**3)
        print("GPU {} ({}): total={:.2f} GB, initial target occupying ~= {:.2f} GB".format(
            gpu_id, prop.name, total_gb, target_reserve_bytes / 1024**3))
        if target_reserve_bytes <= 0:
            print("GPU {}: Target occupation is non-positive, skipping.".format(gpu_id))
            return None

        # Never aim above what is actually free right now; the retry loop
        # below then only has to absorb allocator overhead.
        mem_get_info = getattr(torch.cuda, "mem_get_info", None)
        if mem_get_info is not None:
            free_bytes = mem_get_info(gpu_id)[0]
            if free_bytes < target_reserve_bytes:
                print("GPU {}: only {:.2f} GB free, lowering target accordingly.".format(
                    gpu_id, free_bytes / 1024**3))
                target_reserve_bytes = free_bytes

        step_bytes = int(retry_step_mb * 1024 * 1024)
        for attempt in range(max_attempts):
            num_elems = target_reserve_bytes // 4  # float32 -> 4 bytes per element
            if num_elems <= 0:
                print("GPU {}: Target shrank to zero, nothing left to allocate.".format(gpu_id))
                return None
            try:
                # The contents are never read, so uninitialised memory is
                # enough; this avoids generating gigabytes of random numbers.
                tensor = torch.empty(num_elems, dtype=torch.float32, device=device)
                torch.cuda.synchronize(gpu_id)
            except RuntimeError as e:
                if "out of memory" not in str(e).lower():
                    print("GPU {}: A non-OOM runtime error occurred: {}".format(gpu_id, e))
                    return None
                print("GPU {}: OOM on attempt {}. Reducing target by {} MB and retrying...".format(
                    gpu_id, attempt + 1, retry_step_mb))
                target_reserve_bytes -= step_bytes
                continue
            allocated_gb = tensor.element_size() * tensor.numel() / 1024**3
            print("GPU {}: Successfully occupied {:.2f} GB.".format(gpu_id, allocated_gb))
            return tensor

        print("GPU {}: Failed to allocate memory after all attempts.".format(gpu_id))
        return None
    except Exception as e:
        # Best-effort by design: one broken GPU must not abort the others.
        print("An unexpected error occurred while processing GPU {}: {}".format(gpu_id, e))
        return None
def parse_gpu_selection(gpu_arg, max_gpus):
    """Turn a GPU selection string into an ordered list of unique indices.

    Accepts comma-separated entries such as ``'0,1'`` or
    ``'cuda:0,cuda:1'`` (the ``cuda:`` prefix is case-insensitive). Empty
    entries are ignored and duplicates keep their first position. When
    ``gpu_arg`` is ``None`` every index in ``range(max_gpus)`` is returned.

    Raises:
        ValueError: If an entry is not an integer, lies outside
            ``[0, max_gpus)``, or no usable entry was given.
    """
    if gpu_arg is None:
        return list(range(max_gpus))

    chosen = []
    for raw_entry in gpu_arg.split(","):
        spec = raw_entry.strip()
        if spec == "":
            continue

        # Drop an optional "cuda:" prefix, keeping whatever follows it.
        if spec.lower().startswith("cuda:"):
            spec = spec.partition(":")[2]

        try:
            device_index = int(spec)
        except ValueError:
            raise ValueError("Invalid GPU identifier '{}'.".format(spec))

        if not 0 <= device_index < max_gpus:
            raise ValueError("GPU index {} is out of range [0, {}).".format(device_index, max_gpus))

        if device_index in chosen:
            continue
        chosen.append(device_index)

    if len(chosen) == 0:
        raise ValueError("No valid GPU identifiers were provided.")
    return chosen
def _create_keepalive_tensors(gpu_ids, matrix_size):
    """Create one square random matrix per GPU; a failed GPU gets ``None``."""
    compute_tensors = []
    for gpu_id in gpu_ids:
        try:
            torch.cuda.set_device(gpu_id)
            compute_tensors.append(
                torch.randn(matrix_size, matrix_size, device="cuda:{}".format(gpu_id)))
        except Exception as e:
            # Report the failure instead of hiding it; the GPU is simply
            # left out of the compute loop.
            print("GPU {}: Could not create keep-alive tensor, compute disabled: {}".format(gpu_id, e))
            compute_tensors.append(None)
    return compute_tensors


def _run_compute_burst(gpu_ids, compute_tensors, compute_sec):
    """Run matmuls on every active GPU for ``compute_sec`` seconds; return the elapsed time."""
    start_burst_time = time.time()
    while True:
        remaining = compute_sec - (time.time() - start_burst_time)
        if remaining <= 0:
            break
        active = [(idx, gpu_id) for idx, gpu_id in enumerate(gpu_ids)
                  if compute_tensors[idx] is not None]
        if not active:
            # No GPU left to drive: wait out the burst window instead of
            # spinning the CPU at 100% doing nothing.
            time.sleep(remaining)
            break
        for idx, gpu_id in active:
            try:
                torch.cuda.set_device(gpu_id)
                compute_tensors[idx] = torch.matmul(compute_tensors[idx], compute_tensors[idx].T)
                # Renormalise so repeated products neither overflow nor vanish.
                compute_tensors[idx] = compute_tensors[idx] / (compute_tensors[idx].norm() + 1e-6)
            except Exception as e:
                print("Error during keep-alive on GPU {}: {}".format(gpu_id, e))
                compute_tensors[idx] = None  # stop computing on this GPU after an error

    # Wait for queued kernels so the reported time covers the real work.
    for idx, gpu_id in enumerate(gpu_ids):
        if compute_tensors[idx] is not None:
            torch.cuda.synchronize(gpu_id)
    return time.time() - start_burst_time


def main(args):
    """Occupy memory on the selected GPUs and keep them busy until Ctrl+C.

    Stage 1 allocates a large tensor on every selected GPU. Stage 2 loops
    forever, alternating a burst of matrix multiplications lasting
    ``args.compute_sec`` seconds with a sleep of ``args.sleep_sec`` seconds.

    Args:
        args: Namespace providing ``gpus``, ``fraction``,
            ``extra_reserve_gb``, ``matrix_size``, ``compute_sec`` and
            ``sleep_sec``.

    Raises:
        RuntimeError: If no GPU is detected or the GPU selection is invalid.
    """
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        raise RuntimeError("No GPU detected.")
    print("Detected {} GPUs.".format(num_gpus))
    try:
        gpu_ids = parse_gpu_selection(args.gpus, num_gpus)
    except ValueError as parse_error:
        raise RuntimeError(str(parse_error)) from parse_error
    gpu_label = ", ".join(["cuda:{}".format(idx) for idx in gpu_ids])
    print("Using GPUs: {}".format(gpu_label))

    # --- Stage 1: memory occupation ---
    print("\n--- Stage 1: Allocating memory on all GPUs ---")
    # The list must stay referenced for the lifetime of the loop below;
    # dropping it would release the memory.
    tensors = [occupy_gpu_memory(gpu_id, args.fraction, args.extra_reserve_gb) for gpu_id in gpu_ids]
    print("Memory held on {} of {} selected GPUs.".format(
        sum(1 for t in tensors if t is not None), len(gpu_ids)))

    # --- Stage 2: keep-alive compute ---
    print("\n--- Stage 2: Starting keep-alive compute task ---")
    compute_tensors = _create_keepalive_tensors(gpu_ids, args.matrix_size)
    print("Holding memory with a compute duty cycle of {}s work / {}s sleep.".format(
        args.compute_sec, args.sleep_sec))
    print("Press Ctrl+C to exit.")
    try:
        while True:
            # Compute phase
            actual_compute_time = _run_compute_burst(gpu_ids, compute_tensors, args.compute_sec)
            print("[{}] Compute burst finished in {:.2f}s.".format(
                datetime.datetime.now(), actual_compute_time), flush=True)
            # Sleep phase
            time.sleep(args.sleep_sec)
    except KeyboardInterrupt:
        print("\nExiting and releasing memory...")
def build_parser():
    """Build the command-line parser for the GPU occupy / keep-alive script."""
    parser = argparse.ArgumentParser(description="Occupy GPU memory and maintain a specified utilization duty cycle.")
    parser.add_argument("--fraction", type=float, default=0.95, help="Fraction of total GPU memory to try to occupy.")
    # Parsed as float so fractional reserves such as 0.5 GB are accepted.
    parser.add_argument("--extra_reserve_gb", type=float, default=2.0, help="Additional memory to reserve in GB.")
    parser.add_argument("--matrix_size", type=int, default=4096, help="Matrix size for keep-alive computation (e.g., 2048, 4096).")
    parser.add_argument("--compute_sec", type=float, default=5.0, help="Target duration (in seconds) for the computation burst.")
    parser.add_argument("--sleep_sec", type=float, default=3.0, help="Duration (in seconds) to sleep after each burst.")
    parser.add_argument("--gpus", type=str, default=None, help="Comma-separated GPU ids to occupy, e.g. '0,1' or 'cuda:0,cuda:1'. Default uses all GPUs.")
    return parser


if __name__ == "__main__":
    parser = build_parser()
    args = parser.parse_args()
    main(args)
|