mastermap-cleaner / src /process_runner.py
andrewbejjani's picture
Added functional doc in README.md and added basic
c6a3f44
import codecs
import json
import os
import signal
import subprocess
from pathlib import Path
ACTIVE_PROCESSES = {}
def stop_process(job_id: str) -> bool:
"""Stop a tracked subprocess by frontend job id."""
process = ACTIVE_PROCESSES.get(job_id)
if not process or process.poll() is not None:
return False
try:
if os.name == "nt":
process.send_signal(signal.CTRL_BREAK_EVENT)
else:
process.terminate()
except Exception:
process.terminate()
return True
def stream_process(command, cwd: Path, job_id=None):
"""Run a command and yield stdout/stderr as server-sent event chunks."""
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
popen_kwargs = {
"cwd": cwd,
"stdout": subprocess.PIPE,
"stderr": subprocess.STDOUT,
"bufsize": 0,
"env": env,
}
if os.name == "nt":
# Required so CTRL_BREAK can stop child Python processes on Windows.
popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
process = subprocess.Popen(
command,
**popen_kwargs,
)
if job_id:
# The UI can later call /stop with this id.
ACTIVE_PROCESSES[job_id] = process
try:
assert process.stdout is not None
decoder = codecs.getincrementaldecoder("utf-8")("replace")
while True:
raw_chunk = process.stdout.read(1)
if raw_chunk == b"" and process.poll() is not None:
break
if raw_chunk:
chunk = decoder.decode(raw_chunk)
yield f"data: {json.dumps(chunk)}\n\n"
exit_code = process.wait()
trailing_chunk = decoder.decode(b"", final=True)
if trailing_chunk:
yield f"data: {json.dumps(trailing_chunk)}\n\n"
# The frontend distinguishes a real success from a crashed subprocess.
yield f"data: {json.dumps(chr(10) + f'Process exited with code {exit_code}' + chr(10))}\n\n"
event_name = "done" if exit_code == 0 else "failed"
yield f"event: {event_name}\ndata: {{}}\n\n"
finally:
if job_id:
ACTIVE_PROCESSES.pop(job_id, None)