# darkmedia-x-api/engine/stop_engine.py
# cybermedia's picture
# Upload folder using huggingface_hub
# 343eed9 verified
import os
import signal
import subprocess
import sys
# Script names that must survive the Windows cleanup; compared against the
# lower-cased command line of each python.exe process.
_WINDOWS_PROTECTED_SCRIPTS = ("00_master_pipeline.py", "dashboard_server.py")

# Substrings that mark a python.exe command line as an engine process.
_WINDOWS_ENGINE_KEYWORDS = ("content creator", "engine", "pipeline")

# Script names that must survive the Linux cleanup (case-sensitive match).
_POSIX_PROTECTED_SCRIPTS = ("00_master_pipeline.py", "main.py")


def _select_wmic_pids(output, protected_pids):
    """Return the PIDs of engine processes found in a ``wmic`` report.

    Args:
        output: Decoded text produced by
            ``wmic process ... get ProcessId,CommandLine``.
        protected_pids: PIDs that must never be returned (this process and
            its parent).

    Returns:
        A list of integer PIDs, in report order.
    """
    pids = []
    for raw_line in output.split('\n'):
        line = raw_line.strip()
        # Drop blank lines and the column-header line.
        if not line or "CommandLine" in line:
            continue
        lower_line = line.lower()
        # Never touch the master pipeline or the dashboard server.
        if any(name in lower_line for name in _WINDOWS_PROTECTED_SCRIPTS):
            continue
        # Only consider processes that look like part of this project.
        if not any(word in lower_line for word in _WINDOWS_ENGINE_KEYWORDS):
            continue
        # The PID is expected to be the last whitespace-separated field.
        parts = line.rsplit(None, 1)
        if len(parts) < 2 or not parts[-1].isdigit():
            continue
        pid = int(parts[-1])
        if pid in protected_pids:
            print(f"ℹ️ Skipping protected PID {pid} (Self/Parent)")
        else:
            pids.append(pid)
    return pids


def _force_kill_windows(pid):
    """Force-terminate *pid* and its child processes with ``taskkill``."""
    try:
        result = subprocess.run(
            ["taskkill", "/F", "/T", "/PID", str(pid)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError as e:
        print(f" X Force kill unavailable for PID {pid}: {e}")
        return
    if result.returncode == 0:
        print(f" FIRE Force killed PID {pid}")
    else:
        print(f" X Force kill failed for PID {pid} (taskkill exit code {result.returncode})")


def _cleanup_windows():
    """Find and terminate engine-related ``python.exe`` processes on Windows."""
    my_pid = os.getpid()
    parent_pid = os.getppid()

    # Fast scan using wmic (avoids CIM/PowerShell hangs).
    cmd = 'wmic process where "name=\'python.exe\'" get ProcessId,CommandLine'
    try:
        output = subprocess.check_output(cmd, shell=True, timeout=10).decode('utf-8', errors='ignore')
    except subprocess.TimeoutExpired:
        print("⚠️ [TIMEOUT] WMIC scan took too long. Force killing all python.exe...")
        # Exclude this process and its parent through PID filters; a blanket
        # image-name kill would otherwise terminate this script as well.
        subprocess.run(
            [
                "taskkill", "/F", "/T", "/IM", "python.exe",
                "/FI", f"PID ne {my_pid}",
                "/FI", f"PID ne {parent_pid}",
            ],
            check=False,
        )
        return

    pids = _select_wmic_pids(output, {my_pid, parent_pid})
    if not pids:
        print("CHECK No orphaned Engine processes found.")
        return

    print(f"WARNING Found {len(pids)} processes to terminate: {pids}")
    for pid in pids:
        try:
            # On Windows, os.kill with SIGTERM ends the process through
            # TerminateProcess.
            os.kill(pid, signal.SIGTERM)
            print("X Terminated PID", pid)
        except OSError as e:
            print(f" X Failed to kill {pid}: {e}")
            # signal.SIGKILL is not defined on Windows, so the forced
            # fallback goes through taskkill instead.
            _force_kill_windows(pid)


def _select_ps_pids(output, protected_pids):
    """Return the PIDs to kill from filtered ``ps -ef`` output.

    Args:
        output: Decoded ``ps -ef`` lines; the PID is read from the second
            column and the command line from the eighth column onwards.
        protected_pids: PIDs that must never be returned (this process and
            its parent).

    Returns:
        A list of integer PIDs, in listing order.
    """
    pids = []
    for line in output.split('\n'):
        parts = line.split()
        # Skip blank or malformed lines instead of aborting the whole scan.
        if len(parts) < 2 or not parts[1].isdigit():
            continue
        pid = int(parts[1])
        # Do not kill ourselves or the parent process.
        if pid in protected_pids:
            continue
        cmdline = " ".join(parts[7:])
        # Explicit protection for the master pipeline and main.py
        # (presumably the dashboard server entry point; confirm).
        if any(name in cmdline for name in _POSIX_PROTECTED_SCRIPTS):
            continue
        pids.append(pid)
    return pids


def _find_blender_pids():
    """Return the PIDs reported by ``pgrep blender`` (empty when none)."""
    try:
        found = subprocess.check_output(["pgrep", "blender"]).decode().split()
    except (subprocess.CalledProcessError, OSError):
        # pgrep exits non-zero when nothing matches, and may be missing
        # altogether; both cases simply mean there is nothing to add.
        return []
    return [int(token) for token in found if token.isdigit()]


def _cleanup_posix():
    """Find and SIGKILL engine-related Python and Blender processes."""
    print(" 🔍 Scan des processus orphelins (Linux)...")
    try:
        # Look for python processes whose command line contains one of the
        # project keywords.
        cmd = "ps -ef | grep python | grep -E 'engine|pipeline|backend' | grep -v grep"
        try:
            output = subprocess.check_output(cmd, shell=True).decode('utf-8', errors='ignore')
        except subprocess.CalledProcessError:
            # The pipeline exits non-zero when grep matches nothing. Treat
            # that as an empty result so the Blender cleanup still runs.
            output = ""

        pids_to_kill = _select_ps_pids(output, {os.getpid(), os.getppid()})
        # Blender processes are cleaned up as well.
        pids_to_kill.extend(_find_blender_pids())

        if not pids_to_kill:
            print(" ✅ Aucun processus orphelin détecté.")
            return

        print(f" ⚠️ Nettoyage de {len(pids_to_kill)} processus : {pids_to_kill}")
        for pid in pids_to_kill:
            try:
                os.kill(pid, signal.SIGKILL)
                print(f" ✅ tué PID {pid}")
            except ProcessLookupError:
                # The process exited between the scan and the kill.
                print(f" ℹ️ PID {pid} déjà terminé")
            except OSError as e:
                # Typically a permission error; keep going with the rest.
                print(f" ⚠️ Impossible de tuer PID {pid}: {e}")
    except Exception as e:
        print(f" ℹ️ Rien à nettoyer ou erreur légère: {e}")


def kill_engine_processes():
    """Terminate leftover DarkMedia Engine processes on this machine.

    On Windows, ``python.exe`` processes are listed with ``wmic`` and those
    whose command line matches the engine keywords are terminated. On other
    platforms, matching Python processes found through ``ps`` and every
    process reported by ``pgrep blender`` are sent ``SIGKILL``.

    The current process and its parent are never targeted, and the protected
    scripts of each platform are skipped. The cleanup is best-effort:
    failures are printed rather than raised.

    Returns:
        None.
    """
    print("SEARCHING Searching for DarkMedia Engine processes...")
    try:
        if sys.platform == "win32":
            _cleanup_windows()
        else:
            _cleanup_posix()
    except Exception as e:
        print(f"❌ Error during cleanup: {e}")
if __name__ == "__main__":
    # Script entry point: run the cleanup once, then report completion.
    kill_engine_processes()
    print("\n✨ Cleanup finished.")