| |
| """ |
| 一键清空 GPU 显存的 Python 脚本 |
| 支持 CUDA 和 ROCm |
| """ |
|
|
| import os |
| import sys |
| import subprocess |
| import signal |
|
|
def _parse_nvidia_rows(csv_text):
    """Turn ``nvidia-smi --query-compute-apps`` CSV output into process dicts."""
    rows = []
    for line in csv_text.splitlines():
        fields = line.split(',')
        if len(fields) < 2:
            # Blank line or a free-form message rather than a CSV row.
            continue
        try:
            pid = int(fields[0].strip())
        except ValueError:
            # A non-numeric PID column must not abort the whole scan.
            continue
        if len(fields) == 2:
            name, memory = fields[1].strip(), 'N/A'
        else:
            # The CSV is unquoted: the PID is the first column and the memory
            # is the last, so any extra commas belong to the process name.
            name = ','.join(fields[1:-1]).strip()
            memory = fields[-1].strip()
        rows.append({'pid': pid, 'name': name, 'memory': memory})
    return rows


def _query_rocm_processes():
    """List processes holding ``/dev/kfd`` or a DRM render node, via ``fuser``."""
    device_nodes = ['/dev/kfd'] + [f'/dev/dri/renderD{128 + i}' for i in range(8)]
    # No check=True: a non-zero exit status is treated as "no PIDs reported".
    fuser = subprocess.run(['fuser'] + device_nodes, capture_output=True, text=True)

    # fuser reports a PID once per node it has open; keep the first occurrence
    # only so that each process is listed (and later signalled) once.
    unique_pids = {}
    for token in fuser.stdout.split():
        try:
            unique_pids.setdefault(int(token), None)
        except ValueError:
            continue

    found = []
    for pid in unique_pids:
        try:
            ps = subprocess.run(
                ['ps', '-p', str(pid), '-o', 'comm='],
                capture_output=True, text=True, check=True
            )
        except subprocess.CalledProcessError:
            # ps could not report on this PID; leave it out of the listing.
            continue
        found.append({'pid': pid, 'name': ps.stdout.strip(), 'memory': 'N/A'})
    return found


def get_gpu_processes():
    """Collect the processes currently using a GPU.

    ``nvidia-smi`` is tried first. If it is missing or exits with an error,
    the function falls back to asking ``fuser`` which processes hold
    ``/dev/kfd`` or ``/dev/dri/renderD128`` .. ``renderD135``.

    Returns:
        A ``(processes, gpu_type)`` tuple. ``processes`` is a list of dicts
        with the keys ``'pid'`` (int), ``'name'`` (str) and ``'memory'``
        (str, ``'N/A'`` when unknown). ``gpu_type`` is ``'nvidia'``,
        ``'rocm'``, or ``'unknown'`` when neither tool could be run (in which
        case the list is empty).
    """
    try:
        nvidia = subprocess.run(
            ['nvidia-smi', '--query-compute-apps=pid,process_name,used_memory', '--format=csv,noheader'],
            capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        pass
    else:
        return _parse_nvidia_rows(nvidia.stdout), 'nvidia'

    try:
        return _query_rocm_processes(), 'rocm'
    except (subprocess.CalledProcessError, FileNotFoundError):
        # fuser (or ps) is not installed.
        pass

    return [], 'unknown'
|
|
def kill_processes(processes, force=True):
    """Signal each listed process to terminate and report the outcome.

    Args:
        processes: Iterable of dicts carrying at least the keys ``'pid'``,
            ``'name'`` and ``'memory'``.
        force: When true (the default) send ``SIGKILL``; otherwise ``SIGTERM``.

    Returns:
        A ``(killed, failed)`` tuple of lists holding the input dicts.
        Entries whose process no longer exists are reported on stdout but
        placed in neither list. A PID that appears more than once is
        signalled (and counted) only the first time.
    """
    killed = []
    failed = []
    handled_pids = set()

    for proc in processes:
        pid = proc['pid']

        # The same PID can be listed several times (e.g. once per device it
        # holds); signalling it again would only inflate the counts.
        if pid in handled_pids:
            continue
        handled_pids.add(pid)

        # os.kill() with pid 0 or a negative pid targets a process group or
        # every process the caller may signal -- never do that from here.
        if isinstance(pid, int) and pid <= 0:
            failed.append(proc)
            print(f"❌ Failed to kill PID {pid}: refusing to signal a non-positive PID")
            continue

        try:
            os.kill(pid, signal.SIGKILL if force else signal.SIGTERM)
        except ProcessLookupError:
            print(f"⚠️ PID {pid} already dead")
        except PermissionError:
            failed.append(proc)
            print(f"❌ Permission denied for PID {pid} ({proc['name']})")
        except Exception as e:
            # Best effort: record any other failure and carry on with the
            # remaining processes instead of aborting the whole run.
            failed.append(proc)
            print(f"❌ Failed to kill PID {pid}: {e}")
        else:
            killed.append(proc)
            print(f"✅ Killed PID {pid} ({proc['name']}) - Memory: {proc['memory']}")

    return killed, failed
|
|
def show_gpu_status(gpu_type):
    """Print a status header and run the vendor monitoring tool for *gpu_type*.

    Args:
        gpu_type: ``'nvidia'`` runs ``nvidia-smi`` and ``'rocm'`` runs
            ``rocm-smi``; any other value prints a notice instead.

    The tool writes straight to this process's stdout/stderr. If the tool
    cannot be started (for example it is not installed) a notice is printed
    rather than letting the error propagate.
    """
    print("\n📊 GPU Memory Status:")
    print("=" * 60)

    tool = {'nvidia': 'nvidia-smi', 'rocm': 'rocm-smi'}.get(gpu_type)
    if tool is None:
        print("No GPU monitoring tool available")
        return

    try:
        subprocess.run([tool])
    except OSError as exc:
        # The GPU type may have been detected without this tool being present,
        # so a missing or non-executable binary must not crash the script.
        print(f"No GPU monitoring tool available ({tool} could not be run: {exc})")
|
|
def main():
    """Scan for GPU processes, confirm with the user, kill them, show status.

    The confirmation prompt is skipped when ``--force`` or ``-f`` appears in
    ``sys.argv``. Processes are always killed with ``force=True``.

    Returns:
        ``0`` when no GPU processes were found or every kill succeeded;
        ``1`` when the user declined or at least one process could not be
        killed.
    """
    print("🔍 Scanning GPU processes...\n")

    processes, gpu_type = get_gpu_processes()

    if not processes:
        print("✅ No GPU processes found!")
        show_gpu_status(gpu_type)
        return 0

    print(f"Found {len(processes)} GPU process(es):")
    print("-" * 60)
    for proc in processes:
        print(f" PID: {proc['pid']:6d} | {proc['name']:30s} | Memory: {proc['memory']}")
    print("-" * 60)

    if '--force' not in sys.argv and '-f' not in sys.argv:
        try:
            response = input("\n🔪 Kill all these processes? [y/N]: ").strip().lower()
        except EOFError:
            # stdin is closed or not interactive: fall back to the default
            # answer ("No") instead of crashing with a traceback.
            print()
            response = ''
        if response not in ['y', 'yes']:
            print("❌ Cancelled")
            return 1

    print("\n🔪 Killing processes...")
    killed, failed = kill_processes(processes, force=True)

    print(f"\n✅ Killed {len(killed)} process(es)")
    if failed:
        print(f"❌ Failed to kill {len(failed)} process(es) (may need sudo)")

    show_gpu_status(gpu_type)

    return 0 if not failed else 1
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|
|