File size: 6,462 Bytes
36ab767
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# -*- coding: utf-8 -*-
import torch
import time
import datetime
import argparse

def occupy_gpu_memory(gpu_id, fraction, extra_reserve_gb):
    """Try to allocate a large float32 tensor on one GPU, retrying on OOM.

    Args:
        gpu_id: CUDA device index to allocate on.
        fraction: Fraction of the device's total memory to target.
        extra_reserve_gb: Extra headroom (in GB) subtracted from the target.

    Returns:
        The allocated tensor (caller must keep a reference to hold the
        memory), or None if allocation failed or was skipped.
    """
    try:
        torch.cuda.set_device(gpu_id)
        prop = torch.cuda.get_device_properties(gpu_id)
        total_memory = prop.total_memory
        total_gb = total_memory / 1024**3

        target_reserve_bytes = int(total_memory * fraction) - int(extra_reserve_gb * 1024**3)

        print("GPU {} ({}): total={:.2f} GB, initial target occupying ~= {:.2f} GB".format(
            gpu_id, prop.name, total_gb, target_reserve_bytes / 1024**3))

        if target_reserve_bytes <= 0:
            print("GPU {}: Target occupation is non-positive, skipping.".format(gpu_id))
            return None

        # Try to allocate; on OOM shrink the target and retry (at most 5 attempts).
        for attempt in range(5):
            try:
                # float32 elements are 4 bytes each.
                num_elems = target_reserve_bytes // 4
                if num_elems <= 0:
                    return None

                tensor = torch.randn(num_elems, dtype=torch.float32, device="cuda:{}".format(gpu_id))
                torch.cuda.synchronize(gpu_id)
                allocated_gb = tensor.element_size() * tensor.numel() / 1024**3
                print("GPU {}: Successfully occupied {:.2f} GB.".format(gpu_id, allocated_gb))
                return tensor
            except RuntimeError as e:
                if "out of memory" in str(e).lower():
                    print("GPU {}: OOM on attempt {}. Reducing target by 256 MB and retrying...".format(
                        gpu_id, attempt + 1))
                    # Release the caching allocator's unused cached blocks so
                    # the retry sees as much contiguous free memory as possible;
                    # without this, retries can keep failing on the same OOM.
                    torch.cuda.empty_cache()
                    target_reserve_bytes -= 256 * 1024 * 1024
                else:
                    print("GPU {}: A non-OOM runtime error occurred: {}".format(gpu_id, e))
                    return None

        print("GPU {}: Failed to allocate memory after all attempts.".format(gpu_id))
        return None

    except Exception as e:
        print("An unexpected error occurred while processing GPU {}: {}".format(gpu_id, e))
        return None

def parse_gpu_selection(gpu_arg, max_gpus):
    """Parse a GPU selection string such as '0,1' or 'cuda:0,cuda:1'.

    Args:
        gpu_arg: Comma-separated GPU identifiers, or None to select all GPUs.
        max_gpus: Number of visible GPUs; indices must lie in [0, max_gpus).

    Returns:
        Ordered list of unique GPU indices, preserving first-seen order.

    Raises:
        ValueError: On a malformed token, an out-of-range index, or an
            argument that yields no valid identifiers.
    """
    if gpu_arg is None:
        return list(range(max_gpus))

    chosen = []
    for raw in gpu_arg.split(","):
        item = raw.strip()
        if not item:
            continue
        # Accept both bare indices ('1') and device strings ('cuda:1').
        if item.lower().startswith("cuda:"):
            item = item.split(":", 1)[1]
        try:
            index = int(item)
        except ValueError:
            raise ValueError("Invalid GPU identifier '{}'.".format(item))
        if not (0 <= index < max_gpus):
            raise ValueError("GPU index {} is out of range [0, {}).".format(index, max_gpus))
        if index not in chosen:
            chosen.append(index)

    if not chosen:
        raise ValueError("No valid GPU identifiers were provided.")
    return chosen


def main(args):
    """Occupy memory on the selected GPUs, then run a compute-burst / sleep
    duty cycle forever until the user interrupts with Ctrl+C.

    Raises:
        RuntimeError: If no GPU is detected or the --gpus argument is invalid.
    """
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        raise RuntimeError("No GPU detected.")
    print("Detected {} GPUs.".format(num_gpus))

    try:
        gpu_ids = parse_gpu_selection(args.gpus, num_gpus)
    except ValueError as parse_error:
        raise RuntimeError(str(parse_error))

    gpu_label = ", ".join(["cuda:{}".format(idx) for idx in gpu_ids])
    print("Using GPUs: {}".format(gpu_label))

    # --- Stage 1: memory occupation ---
    # The returned tensors are bound to `tensors` solely to keep references
    # alive for the lifetime of the process (otherwise the memory is freed).
    print("\n--- Stage 1: Allocating memory on all GPUs ---")
    tensors = [occupy_gpu_memory(gpu_id, args.fraction, args.extra_reserve_gb) for gpu_id in gpu_ids]

    # --- Stage 2: compute keep-alive ---
    print("\n--- Stage 2: Starting keep-alive compute task ---")
    compute_tensors = []
    for gpu_id in gpu_ids:
        try:
            torch.cuda.set_device(gpu_id)
            compute_tensors.append(torch.randn(args.matrix_size, args.matrix_size, device="cuda:{}".format(gpu_id)))
        except Exception:
            # A failed allocation disables keep-alive compute on this GPU only;
            # the list stays index-aligned with gpu_ids via the None entry.
            compute_tensors.append(None)

    print("Holding memory with a compute duty cycle of {}s work / {}s sleep.".format(
        args.compute_sec, args.sleep_sec))
    print("Press Ctrl+C to exit.")

    try:
        while True:
            start_burst_time = time.time()

            # Compute phase: issue matmuls until compute_sec wall time elapses.
            while time.time() - start_burst_time < args.compute_sec:
                for idx, gpu_id in enumerate(gpu_ids):
                    if compute_tensors[idx] is not None:
                        try:
                            torch.cuda.set_device(gpu_id)
                            compute_tensors[idx] = torch.matmul(compute_tensors[idx], compute_tensors[idx].T)
                            # Normalize so repeated matmuls don't overflow to inf/nan.
                            compute_tensors[idx] = compute_tensors[idx] / (compute_tensors[idx].norm() + 1e-6)
                        except Exception as e:
                            print("Error during keep-alive on GPU {}: {}".format(gpu_id, e))
                            compute_tensors[idx] = None # Stop computing on this GPU after an error.

            # Synchronize each active GPU, then report the true burst duration
            # (CUDA calls above are asynchronous until synchronized).
            for idx, gpu_id in enumerate(gpu_ids):
                if compute_tensors[idx] is not None: torch.cuda.synchronize(gpu_id)
            actual_compute_time = time.time() - start_burst_time
            print("[{}] Compute burst finished in {:.2f}s.".format(
                datetime.datetime.now(), actual_compute_time), flush=True)

            # Sleep phase: idle between bursts to hit the requested duty cycle.
            time.sleep(args.sleep_sec)

    except KeyboardInterrupt:
        print("\nExiting and releasing memory...")

if __name__ == "__main__":
    # Command-line entry point: build the option parser and hand off to main().
    arg_parser = argparse.ArgumentParser(description="Occupy GPU memory and maintain a specified utilization duty cycle.")
    arg_parser.add_argument("--fraction", type=float, default=0.95, help="Fraction of total GPU memory to try to occupy.")
    arg_parser.add_argument("--extra_reserve_gb", type=int, default=2, help="Additional memory to reserve in GB.")
    arg_parser.add_argument("--matrix_size", type=int, default=4096, help="Matrix size for keep-alive computation (e.g., 2048, 4096).")
    arg_parser.add_argument("--compute_sec", type=float, default=5.0, help="Target duration (in seconds) for the computation burst.")
    arg_parser.add_argument("--sleep_sec", type=float, default=3.0, help="Duration (in seconds) to sleep after each burst.")
    arg_parser.add_argument("--gpus", type=str, default=None, help="Comma-separated GPU ids to occupy, e.g. '0,1' or 'cuda:0,cuda:1'. Default uses all GPUs.")

    main(arg_parser.parse_args())