# -*- coding: utf-8 -*-
"""Occupy GPU memory and keep the selected GPUs busy with a compute duty cycle.

Stage 1 allocates one large tensor per selected GPU so the memory stays
reserved by this process.  Stage 2 alternates between a burst of matrix
multiplications and a sleep so the devices also show periodic utilization.
"""
import argparse
import datetime
import time

import torch

# Amount by which the allocation target shrinks after each out-of-memory failure.
_OOM_BACKOFF_BYTES = 256 * 1024 * 1024
# Number of allocation attempts made per GPU before giving up.
_MAX_ALLOC_ATTEMPTS = 5
_BYTES_PER_GB = 1024 ** 3


def occupy_gpu_memory(gpu_id, fraction, extra_reserve_gb):
    """Try to allocate a large float32 tensor on one GPU, retrying on OOM.

    The target size is ``total_memory * fraction - extra_reserve_gb`` GiB.
    Each out-of-memory failure reduces the target by 256 MB; at most five
    attempts are made.

    Args:
        gpu_id: CUDA device index to allocate on.
        fraction: Fraction of the device's total memory to aim for.
        extra_reserve_gb: GiB subtracted from the target (may be fractional).

    Returns:
        The allocated tensor, or ``None`` when the target is non-positive,
        every attempt failed, or any other error occurred.  The caller must
        keep a reference to the tensor for the memory to stay occupied.
    """
    try:
        torch.cuda.set_device(gpu_id)
        prop = torch.cuda.get_device_properties(gpu_id)
        total_memory = prop.total_memory
        total_gb = total_memory / _BYTES_PER_GB
        target_reserve_bytes = int(total_memory * fraction) - int(extra_reserve_gb * _BYTES_PER_GB)

        print("GPU {} ({}): total={:.2f} GB, initial target occupying ~= {:.2f} GB".format(
            gpu_id, prop.name, total_gb, target_reserve_bytes / _BYTES_PER_GB))

        if target_reserve_bytes <= 0:
            print("GPU {}: Target occupation is non-positive, skipping.".format(gpu_id))
            return None

        # Try to allocate; on OOM shrink the request and retry.
        for attempt in range(_MAX_ALLOC_ATTEMPTS):
            try:
                # float32 elements are 4 bytes each.
                num_elems = target_reserve_bytes // 4
                if num_elems <= 0:
                    return None
                tensor = torch.randn(num_elems, dtype=torch.float32,
                                     device="cuda:{}".format(gpu_id))
                torch.cuda.synchronize(gpu_id)
                allocated_gb = tensor.element_size() * tensor.numel() / _BYTES_PER_GB
                print("GPU {}: Successfully occupied {:.2f} GB.".format(gpu_id, allocated_gb))
                return tensor
            except RuntimeError as e:
                if "out of memory" in str(e).lower():
                    print("GPU {}: OOM on attempt {}. Reducing target by 256 MB and retrying...".format(
                        gpu_id, attempt + 1))
                    target_reserve_bytes -= _OOM_BACKOFF_BYTES
                else:
                    print("GPU {}: A non-OOM runtime error occurred: {}".format(gpu_id, e))
                    return None

        print("GPU {}: Failed to allocate memory after all attempts.".format(gpu_id))
        return None
    except Exception as e:
        # Best-effort per GPU: a failure on one device must not stop the others.
        print("An unexpected error occurred while processing GPU {}: {}".format(gpu_id, e))
        return None


def parse_gpu_selection(gpu_arg, max_gpus):
    """Parse a GPU selection string like '0,1' or 'cuda:0,cuda:1'.

    Args:
        gpu_arg: Comma-separated device identifiers, or ``None`` for all GPUs.
        max_gpus: Number of available GPUs; valid indices are ``[0, max_gpus)``.

    Returns:
        A list of unique device indices in the order they were first given.

    Raises:
        ValueError: If a token is not an integer, an index is out of range,
            or no identifiers were provided.
    """
    if gpu_arg is None:
        return list(range(max_gpus))

    selected = []
    for token in gpu_arg.split(","):
        token = token.strip()
        if not token:
            continue
        if token.lower().startswith("cuda:"):
            token = token.split(":", 1)[1]
        try:
            idx = int(token)
        except ValueError:
            raise ValueError("Invalid GPU identifier '{}'.".format(token)) from None
        if idx < 0 or idx >= max_gpus:
            raise ValueError("GPU index {} is out of range [0, {}).".format(idx, max_gpus))
        if idx not in selected:
            selected.append(idx)

    if not selected:
        raise ValueError("No valid GPU identifiers were provided.")
    return selected


def _keep_alive_step(gpu_ids, compute_tensors):
    """Run one matmul + normalize step on every active GPU and wait for it.

    Work is launched on all active GPUs first and synchronized afterwards so
    the devices compute concurrently.  Synchronizing every step keeps the
    amount of queued work bounded, which lets the caller's time check end the
    burst close to the requested duration.  A GPU that raises is disabled by
    setting its entry in ``compute_tensors`` to ``None``.

    Returns:
        ``True`` if work was launched on at least one GPU, else ``False``.
    """
    launched = []
    for idx, gpu_id in enumerate(gpu_ids):
        work = compute_tensors[idx]
        if work is None:
            continue
        try:
            torch.cuda.set_device(gpu_id)
            work = torch.matmul(work, work.T)
            # Normalize so repeated multiplication neither overflows nor underflows.
            compute_tensors[idx] = work / (work.norm() + 1e-6)
            launched.append((idx, gpu_id))
        except Exception as e:
            print("Error during keep-alive on GPU {}: {}".format(gpu_id, e))
            compute_tensors[idx] = None  # stop computing on this GPU after an error

    for idx, gpu_id in launched:
        try:
            torch.cuda.synchronize(gpu_id)
        except Exception as e:
            # Asynchronous CUDA errors surface here; disable only the failing GPU.
            print("Error during keep-alive on GPU {}: {}".format(gpu_id, e))
            compute_tensors[idx] = None
    return bool(launched)


def main(args):
    """Occupy memory on the selected GPUs, then run the keep-alive duty cycle.

    Args:
        args: Parsed command-line namespace providing ``gpus``, ``fraction``,
            ``extra_reserve_gb``, ``matrix_size``, ``compute_sec`` and
            ``sleep_sec``.

    Raises:
        RuntimeError: If no GPU is detected or the GPU selection is invalid.
    """
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        raise RuntimeError("No GPU detected.")
    print("Detected {} GPUs.".format(num_gpus))

    try:
        gpu_ids = parse_gpu_selection(args.gpus, num_gpus)
    except ValueError as parse_error:
        raise RuntimeError(str(parse_error)) from parse_error

    gpu_label = ", ".join(["cuda:{}".format(idx) for idx in gpu_ids])
    print("Using GPUs: {}".format(gpu_label))

    # --- Stage 1: memory occupation ---
    print("\n--- Stage 1: Allocating memory on all GPUs ---")
    # Never read again, but the references must stay alive for the whole run:
    # dropping them would let the allocator release the memory.
    held_tensors = [occupy_gpu_memory(gpu_id, args.fraction, args.extra_reserve_gb)
                    for gpu_id in gpu_ids]

    # --- Stage 2: compute keep-alive ---
    print("\n--- Stage 2: Starting keep-alive compute task ---")
    compute_tensors = []
    for gpu_id in gpu_ids:
        try:
            torch.cuda.set_device(gpu_id)
            compute_tensors.append(torch.randn(args.matrix_size, args.matrix_size,
                                               device="cuda:{}".format(gpu_id)))
        except Exception as e:
            # Keep holding memory on this GPU even if the compute matrix does not fit.
            print("GPU {}: Could not allocate keep-alive matrix: {}".format(gpu_id, e))
            compute_tensors.append(None)

    print("Holding memory with a compute duty cycle of {}s work / {}s sleep.".format(
        args.compute_sec, args.sleep_sec))
    print("Press Ctrl+C to exit.")

    try:
        while True:
            start_burst_time = time.monotonic()

            # Compute phase
            while time.monotonic() - start_burst_time < args.compute_sec:
                if not _keep_alive_step(gpu_ids, compute_tensors):
                    # No GPU can compute any more: wait out the burst instead
                    # of busy-spinning a CPU core.
                    remaining = args.compute_sec - (time.monotonic() - start_burst_time)
                    if remaining > 0:
                        time.sleep(remaining)
                    break

            actual_compute_time = time.monotonic() - start_burst_time
            print("[{}] Compute burst finished in {:.2f}s.".format(
                datetime.datetime.now(), actual_compute_time), flush=True)

            # Sleep phase
            time.sleep(args.sleep_sec)

    except KeyboardInterrupt:
        print("\nExiting and releasing memory...")
    finally:
        # Drop the references explicitly so the memory is released on exit.
        del held_tensors
        del compute_tensors


def build_arg_parser():
    """Build the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description="Occupy GPU memory and maintain a specified utilization duty cycle.")
    parser.add_argument("--fraction", type=float, default=0.95,
                        help="Fraction of total GPU memory to try to occupy.")
    # float (not int) so fractional reserves such as 0.5 GB are accepted.
    parser.add_argument("--extra_reserve_gb", type=float, default=2.0,
                        help="Additional memory to reserve in GB.")
    parser.add_argument("--matrix_size", type=int, default=4096,
                        help="Matrix size for keep-alive computation (e.g., 2048, 4096).")
    parser.add_argument("--compute_sec", type=float, default=5.0,
                        help="Target duration (in seconds) for the computation burst.")
    parser.add_argument("--sleep_sec", type=float, default=3.0,
                        help="Duration (in seconds) to sleep after each burst.")
    parser.add_argument("--gpus", type=str, default=None,
                        help="Comma-separated GPU ids to occupy, e.g. '0,1' or 'cuda:0,cuda:1'. Default uses all GPUs.")
    return parser


if __name__ == "__main__":
    main(build_arg_parser().parse_args())