# readctrl/code/old/readability_controlv2.py
# Uploaded by shahidul034 via the upload-large-folder tool (commit 1db7196, verified).
import os
# Enumerate GPUs in physical PCI bus order, then expose only GPU #2 to this
# process, so every torch.cuda call below lands on that single device.
# NOTE(review): these must be set before torch initializes CUDA — hence
# before `import torch` here; moving them later may silently have no effect.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "2"
import torch
import time
import random
def initialize_and_touch(tensor):
    """Zero-fill *tensor* in place and block until the device has finished.

    Writing to every element forces the CUDA driver to actually back the
    allocation with physical VRAM rather than leaving it lazily reserved.
    """
    tensor.fill_(0)
    torch.cuda.synchronize()
def dummy_compute(tensor):
    """Run one throwaway matmul on *tensor* to generate visible GPU load.

    Returns the product (tensor @ tensor.T) so the work cannot be elided;
    synchronizes so the kernel has actually executed before returning.
    """
    product = tensor @ tensor.t()
    torch.cuda.synchronize()
    return product
# Select the (single visible) CUDA device and report its total capacity.
device = torch.device("cuda")
total_memory = torch.cuda.get_device_properties(device).total_memory
print(f"Total VRAM: {total_memory / (1024**3):.2f} GB")
allocated_tensors = []  # holds references so the chunks are never freed
chunk_size_bytes = 4 * 1024**3 # 4 GiB nominal size per chunk
element_size = torch.tensor([], dtype=torch.float32).element_size()  # 4 bytes for float32
chunk_elements = chunk_size_bytes // element_size
# Make the chunk roughly square; side is rounded DOWN, so each actual chunk
# (side * side * 4 bytes) is slightly smaller than chunk_size_bytes.
side = int(chunk_elements ** 0.5)
allocated = 0
target = total_memory * 0.95  # leave ~5% headroom rather than filling VRAM completely
print("Allocating and initializing memory...")
# Grab ~4 GiB chunks until ~95% of VRAM is held or the allocator refuses.
while allocated < target:
    try:
        # Allocate a 2D tensor and touch every element so the VRAM is
        # physically committed, not just reserved.
        chunk = torch.empty((side, side), dtype=torch.float32, device=device)
        initialize_and_touch(chunk)
        allocated_tensors.append(chunk)
        # FIX: count the bytes actually allocated (side*side*4), not the
        # nominal chunk_size_bytes — `side` was rounded down, so each chunk
        # is slightly smaller and the old counter overstated usage.
        allocated += chunk.nelement() * chunk.element_size()
        print(f"Allocated: {allocated / (1024**3):.2f} GB", end='\r')
    except RuntimeError as e:
        # Only swallow genuine OOM; anything else is a real error.
        if 'out of memory' in str(e).lower():
            print(f"\nOut of memory after {allocated / (1024**3):.2f} GB")
            break
        else:
            raise
print(f"\nHolding {allocated / (1024**3):.2f} GB in {len(allocated_tensors)} chunks.")
print("Running dummy compute every 30 seconds to show GPU utilization...")
compute_interval = 30  # seconds between load spikes
last_compute = time.time()
# Hold the VRAM forever; once per interval run a single capped matmul on a
# randomly chosen chunk so monitoring tools see nonzero GPU utilization.
while True:
    now = time.time()
    if now - last_compute >= compute_interval:
        if allocated_tensors:
            victim = random.choice(allocated_tensors)
            try:
                # Cap the operand at 8000x8000 so the matmul stays affordable.
                dim = min(victim.shape[0], 8000)
                _ = dummy_compute(victim[:dim, :dim])
                print(f"[{time.strftime('%H:%M:%S')}] GPU compute spike (util ↑)")
            except Exception as exc:
                print(f"Compute failed (expected if chunk too big): {exc}")
        last_compute = now
    time.sleep(1)