# test / occupy.py
# azzzacs's picture
# Upload occupy.py with huggingface_hub
# 36ab767 verified
# -*- coding: utf-8 -*-
import torch
import time
import datetime
import argparse
def occupy_gpu_memory(gpu_id, fraction, extra_reserve_gb):
    """Try to allocate a large tensor on a single GPU, retrying on OOM.

    The requested size is ``total_memory * fraction`` minus
    ``extra_reserve_gb`` GiB. Each out-of-memory failure shrinks the
    request by 256 MB before the next attempt (at most 5 attempts).

    Args:
        gpu_id: CUDA device index to allocate on.
        fraction: Fraction of the device's total memory to target.
        extra_reserve_gb: Amount (in GiB) subtracted from the target.

    Returns:
        The allocated float32 tensor, or ``None`` when the target is
        non-positive, every attempt fails, or any other error occurs
        (errors are printed, not raised).
    """
    bytes_per_gib = 1024**3
    shrink_step_bytes = 256 * 1024 * 1024
    max_attempts = 5

    try:
        torch.cuda.set_device(gpu_id)
        device_props = torch.cuda.get_device_properties(gpu_id)
        capacity_bytes = device_props.total_memory
        capacity_gb = capacity_bytes / bytes_per_gib
        target_reserve_bytes = int(capacity_bytes * fraction) - int(extra_reserve_gb * bytes_per_gib)
        print("GPU {} ({}): total={:.2f} GB, initial target occupying ~= {:.2f} GB".format(
            gpu_id, device_props.name, capacity_gb, target_reserve_bytes / bytes_per_gib))

        if target_reserve_bytes <= 0:
            print("GPU {}: Target occupation is non-positive, skipping.".format(gpu_id))
            return None

        # Attempt the allocation; on OOM shrink the request and try again.
        for attempt_no in range(1, max_attempts + 1):
            try:
                # float32 elements are 4 bytes each.
                element_count = target_reserve_bytes // 4
                if element_count <= 0:
                    return None
                block = torch.randn(
                    element_count, dtype=torch.float32, device="cuda:{}".format(gpu_id))
                torch.cuda.synchronize(gpu_id)
                occupied_gb = block.element_size() * block.numel() / bytes_per_gib
                print("GPU {}: Successfully occupied {:.2f} GB.".format(gpu_id, occupied_gb))
                return block
            except RuntimeError as e:
                if "out of memory" not in str(e).lower():
                    print("GPU {}: A non-OOM runtime error occurred: {}".format(gpu_id, e))
                    return None
                print("GPU {}: OOM on attempt {}. Reducing target by 256 MB and retrying...".format(
                    gpu_id, attempt_no))
                target_reserve_bytes -= shrink_step_bytes

        print("GPU {}: Failed to allocate memory after all attempts.".format(gpu_id))
        return None
    except Exception as e:
        print("An unexpected error occurred while processing GPU {}: {}".format(gpu_id, e))
        return None
def parse_gpu_selection(gpu_arg, max_gpus):
    """Turn a GPU selection string into an ordered list of unique indices.

    Accepts comma-separated entries such as ``'0,1'`` or
    ``'cuda:0,cuda:1'``; blank entries are ignored and duplicates are
    dropped while keeping first-seen order.

    Args:
        gpu_arg: Selection string, or ``None`` to select every GPU.
        max_gpus: Number of available GPUs; valid indices are
            ``0 .. max_gpus - 1``.

    Returns:
        List of selected GPU indices.

    Raises:
        ValueError: If an entry is not an integer, is out of range, or
            no entries remain after parsing.
    """
    if gpu_arg is None:
        return list(range(max_gpus))

    chosen = []
    for raw_entry in gpu_arg.split(","):
        entry = raw_entry.strip()
        if entry == "":
            continue
        # Drop an optional, case-insensitive "cuda:" prefix.
        if entry.lower().startswith("cuda:"):
            entry = entry.partition(":")[2]
        try:
            device_index = int(entry)
        except ValueError:
            raise ValueError("Invalid GPU identifier '{}'.".format(entry))
        if not 0 <= device_index < max_gpus:
            raise ValueError(
                "GPU index {} is out of range [0, {}).".format(device_index, max_gpus))
        if device_index in chosen:
            continue
        chosen.append(device_index)

    if len(chosen) == 0:
        raise ValueError("No valid GPU identifiers were provided.")
    return chosen
def main(args):
    """Occupy memory on the selected GPUs, then run a compute/sleep duty cycle.

    Stage 1 asks ``occupy_gpu_memory`` for a large tensor on every
    selected GPU. Stage 2 repeatedly multiplies a square matrix on each
    GPU for ``compute_sec`` seconds, then sleeps for ``sleep_sec``
    seconds, until interrupted with Ctrl+C.

    Args:
        args: Parsed command-line namespace. Reads ``gpus``, ``fraction``,
            ``extra_reserve_gb``, ``matrix_size``, ``compute_sec`` and
            ``sleep_sec``.

    Raises:
        RuntimeError: If no GPU is detected or the ``gpus`` selection is
            invalid.
    """
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        raise RuntimeError("No GPU detected.")
    print("Detected {} GPUs.".format(num_gpus))

    try:
        gpu_ids = parse_gpu_selection(args.gpus, num_gpus)
    except ValueError as parse_error:
        # Surface selection problems as RuntimeError, keeping the original
        # ValueError attached as the explicit cause.
        raise RuntimeError(str(parse_error)) from parse_error

    gpu_label = ", ".join(["cuda:{}".format(idx) for idx in gpu_ids])
    print("Using GPUs: {}".format(gpu_label))

    # --- Stage 1: memory occupation ---
    print("\n--- Stage 1: Allocating memory on all GPUs ---")
    # Never read again, but the references must stay alive for the whole
    # run so the allocations are not freed.
    tensors = [occupy_gpu_memory(gpu_id, args.fraction, args.extra_reserve_gb) for gpu_id in gpu_ids]

    # --- Stage 2: compute keep-alive ---
    print("\n--- Stage 2: Starting keep-alive compute task ---")
    compute_tensors = []
    for gpu_id in gpu_ids:
        try:
            torch.cuda.set_device(gpu_id)
            compute_tensors.append(
                torch.randn(args.matrix_size, args.matrix_size, device="cuda:{}".format(gpu_id)))
        except Exception as e:
            # Best effort: keep going without compute on this GPU, but say
            # so instead of failing silently.
            print("GPU {}: Could not allocate keep-alive matrix, skipping compute: {}".format(gpu_id, e))
            compute_tensors.append(None)

    print("Holding memory with a compute duty cycle of {}s work / {}s sleep.".format(
        args.compute_sec, args.sleep_sec))
    print("Press Ctrl+C to exit.")

    try:
        while True:
            start_burst_time = time.time()

            # Compute phase
            while time.time() - start_burst_time < args.compute_sec:
                did_work = False
                for idx, gpu_id in enumerate(gpu_ids):
                    matrix = compute_tensors[idx]
                    if matrix is None:
                        continue
                    try:
                        torch.cuda.set_device(gpu_id)
                        product = torch.matmul(matrix, matrix.T)
                        # Rescale by the norm so values stay bounded across iterations.
                        compute_tensors[idx] = product / (product.norm() + 1e-6)
                        did_work = True
                    except Exception as e:
                        print("Error during keep-alive on GPU {}: {}".format(gpu_id, e))
                        compute_tensors[idx] = None  # stop computing on this GPU after an error
                if not did_work:
                    # No GPU has a usable compute tensor: wait out the rest
                    # of the burst window instead of busy-spinning the CPU.
                    remaining = args.compute_sec - (time.time() - start_burst_time)
                    if remaining > 0:
                        time.sleep(remaining)
                    break

            # Synchronize and report elapsed time
            for idx, gpu_id in enumerate(gpu_ids):
                if compute_tensors[idx] is not None:
                    torch.cuda.synchronize(gpu_id)
            actual_compute_time = time.time() - start_burst_time
            print("[{}] Compute burst finished in {:.2f}s.".format(
                datetime.datetime.now(), actual_compute_time), flush=True)

            # Sleep phase
            time.sleep(args.sleep_sec)
    except KeyboardInterrupt:
        print("\nExiting and releasing memory...")
if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    parser = argparse.ArgumentParser(description="Occupy GPU memory and maintain a specified utilization duty cycle.")
    parser.add_argument("--fraction", type=float, default=0.95, help="Fraction of total GPU memory to try to occupy.")
    # float (not int) so fractional reserves such as 0.5 GB are accepted;
    # every integer value is still valid.
    parser.add_argument("--extra_reserve_gb", type=float, default=2.0, help="Additional memory to reserve in GB.")
    parser.add_argument("--matrix_size", type=int, default=4096, help="Matrix size for keep-alive computation (e.g., 2048, 4096).")
    parser.add_argument("--compute_sec", type=float, default=5.0, help="Target duration (in seconds) for the computation burst.")
    parser.add_argument("--sleep_sec", type=float, default=3.0, help="Duration (in seconds) to sleep after each burst.")
    parser.add_argument("--gpus", type=str, default=None, help="Comma-separated GPU ids to occupy, e.g. '0,1' or 'cuda:0,cuda:1'. Default uses all GPUs.")
    args = parser.parse_args()
    main(args)