#!/usr/bin/env python3
"""
一键清空 GPU 显存的 Python 脚本
支持 CUDA 和 ROCm
"""
import os
import sys
import subprocess
import signal
def get_gpu_processes():
    """Find the processes that currently hold a GPU.

    ``nvidia-smi`` is tried first.  If it is missing or exits with an error,
    the function falls back to ``fuser`` on ``/dev/kfd`` and
    ``/dev/dri/renderD128`` .. ``renderD135`` and resolves each PID's command
    name with ``ps``.

    Returns:
        A ``(processes, gpu_type)`` tuple.  ``processes`` is a list of dicts
        with the keys ``'pid'`` (int), ``'name'`` (str) and ``'memory'``
        (str, ``'N/A'`` when unknown), one dict per distinct PID.
        ``gpu_type`` is ``'nvidia'``, ``'rocm'`` or ``'unknown'``.
    """
    try:
        result = subprocess.run(
            ['nvidia-smi', '--query-compute-apps=pid,process_name,used_memory',
             '--format=csv,noheader'],
            capture_output=True, text=True, check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass  # no usable nvidia-smi; try the device-node fallback below
    else:
        return _parse_nvidia_apps(result.stdout), 'nvidia'

    device_nodes = ['/dev/kfd'] + [f'/dev/dri/renderD{128 + i}' for i in range(8)]
    try:
        # No check=True: a non-zero exit from fuser simply yields no PIDs.
        result = subprocess.run(
            ['fuser'] + device_nodes, capture_output=True, text=True,
        )
        # One process may hold several of the device nodes and would then be
        # listed once per node; dict.fromkeys keeps the first occurrence of
        # each PID and preserves order.
        pids = list(dict.fromkeys(
            int(token) for token in result.stdout.split() if token.isdigit()
        ))
        processes = []
        for pid in pids:
            try:
                ps_result = subprocess.run(
                    ['ps', '-p', str(pid), '-o', 'comm='],
                    capture_output=True, text=True, check=True,
                )
            except subprocess.CalledProcessError:
                continue  # ps could not report on this PID; skip it
            processes.append(
                {'pid': pid, 'name': ps_result.stdout.strip(), 'memory': 'N/A'}
            )
        return processes, 'rocm'
    except FileNotFoundError:
        pass  # fuser or ps is not installed

    return [], 'unknown'


def _parse_nvidia_apps(output):
    """Convert ``pid, process_name, used_memory`` CSV rows into process dicts.

    Rows without a comma or without an integer PID are skipped.  When a PID
    occurs on more than one row (e.g. a process using more than one GPU), a
    single record is kept and the memory values are joined with ``' + '``.
    """
    by_pid = {}
    for line in output.splitlines():
        pid_field, sep, rest = line.partition(',')
        if not sep:
            continue  # blank line or a row with no name column
        try:
            pid = int(pid_field.strip())
        except ValueError:
            continue  # not a data row
        # The memory column is the last one, so split from the right: a comma
        # inside the process name then stays part of the name.
        name, sep, memory = rest.rpartition(',')
        if not sep:
            name, memory = rest, 'N/A'
        name, memory = name.strip(), memory.strip()
        known = by_pid.get(pid)
        if known is None:
            by_pid[pid] = {'pid': pid, 'name': name, 'memory': memory}
        else:
            known['memory'] = f"{known['memory']} + {memory}"
    return list(by_pid.values())
def kill_processes(processes, force=True):
    """Send a termination signal to every process record in *processes*.

    Args:
        processes: iterable of dicts with the keys ``'pid'``, ``'name'`` and
            ``'memory'``.
        force: send ``SIGKILL`` when true, ``SIGTERM`` otherwise.

    Returns:
        A ``(killed, failed)`` tuple of lists of the input records.  A PID
        that no longer exists is reported but lands in neither list.
    """
    succeeded, errored = [], []
    for entry in processes:
        target = entry['pid']
        try:
            # The signal is chosen inside the try so that any error raised
            # here is handled exactly like an error from os.kill itself.
            os.kill(target, signal.SIGKILL if force else signal.SIGTERM)
            succeeded.append(entry)
            print(f"✅ Killed PID {target} ({entry['name']}) - Memory: {entry['memory']}")
        except ProcessLookupError:
            print(f"⚠️ PID {target} already dead")
        except PermissionError:
            errored.append(entry)
            print(f"❌ Permission denied for PID {target} ({entry['name']})")
        except Exception as e:
            errored.append(entry)
            print(f"❌ Failed to kill PID {target}: {e}")
    return succeeded, errored
def show_gpu_status(gpu_type):
    """Print a status header, then run the vendor tool for *gpu_type*.

    Args:
        gpu_type: ``'nvidia'`` runs ``nvidia-smi`` and ``'rocm'`` runs
            ``rocm-smi``; any other value prints a notice instead.

    A tool that is not installed is reported with a message rather than an
    exception, so the status display cannot abort the caller.
    """
    print("\n📊 GPU Memory Status:")
    print("=" * 60)
    if gpu_type == 'nvidia':
        tool = 'nvidia-smi'
    elif gpu_type == 'rocm':
        tool = 'rocm-smi'
    else:
        print("No GPU monitoring tool available")
        return
    try:
        # Output is not captured: the tool writes straight to the terminal.
        subprocess.run([tool])
    except FileNotFoundError:
        print(f"⚠️ {tool} not found; cannot show GPU status")
def main():
    """List the GPU processes, confirm with the user, then kill them.

    The confirmation prompt is skipped when ``--force`` or ``-f`` is present
    in ``sys.argv``.

    Returns:
        ``0`` when no GPU process was found or none failed to be killed,
        ``1`` when the user declined or at least one kill failed.
    """
    print("🔍 Scanning GPU processes...\n")
    processes, gpu_type = get_gpu_processes()
    if not processes:
        print("✅ No GPU processes found!")
        show_gpu_status(gpu_type)
        return 0

    print(f"Found {len(processes)} GPU process(es):")
    print("-" * 60)
    for proc in processes:
        print(f" PID: {proc['pid']:6d} | {proc['name']:30s} | Memory: {proc['memory']}")
    print("-" * 60)

    # Ask for confirmation unless a force flag was given.
    if '--force' not in sys.argv and '-f' not in sys.argv:
        try:
            response = input("\n🔪 Kill all these processes? [y/N]: ").strip().lower()
        except EOFError:
            # stdin is closed or not interactive: there is nobody to confirm,
            # so take the prompt's default answer ("no") instead of crashing.
            print()
            response = ''
        if response not in ('y', 'yes'):
            print("❌ Cancelled")
            return 1

    print("\n🔪 Killing processes...")
    killed, failed = kill_processes(processes, force=True)
    print(f"\n✅ Killed {len(killed)} process(es)")
    if failed:
        print(f"❌ Failed to kill {len(failed)} process(es) (may need sudo)")
    show_gpu_status(gpu_type)
    return 0 if not failed else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())