"""Run child processes and stream their output as Server-Sent-Events frames."""

import codecs
import json
import os
import signal
import subprocess
from pathlib import Path

# Registry of running child processes, keyed by the caller-supplied job id.
# Entries are added by stream_process() and removed when its stream ends.
ACTIVE_PROCESSES = {}

# Upper bound for a single read from the child's pipe. The pipe is opened
# unbuffered, so a read returns as soon as *any* data is available; this only
# caps how much is taken per call.
_READ_CHUNK_SIZE = 4096

# How long to wait for a terminated child to exit before killing it.
_SHUTDOWN_TIMEOUT_SECONDS = 5


def stop_process(job_id: str) -> bool:
    """Ask the process registered under *job_id* to stop.

    Returns:
        ``False`` when no process is registered for *job_id* or it has already
        exited; ``True`` once a stop signal has been sent.
    """
    process = ACTIVE_PROCESSES.get(job_id)
    if not process or process.poll() is not None:
        return False

    try:
        if os.name == "nt":
            # The child is started in its own process group (see
            # stream_process), which is what allows CTRL_BREAK_EVENT to be
            # delivered to it.
            process.send_signal(signal.CTRL_BREAK_EVENT)
        else:
            process.terminate()
    except Exception:
        # Best effort: if the polite signal could not be delivered, fall back
        # to a plain terminate.
        process.terminate()
    return True


def _shutdown_process(process) -> None:
    """Make sure *process* is stopped and reaped and its stdout pipe is closed."""
    try:
        if process.poll() is None:
            process.terminate()
            try:
                process.wait(timeout=_SHUTDOWN_TIMEOUT_SECONDS)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
    finally:
        if process.stdout is not None:
            process.stdout.close()


def stream_process(command, cwd: Path, job_id=None):
    """Run *command* and yield its combined stdout/stderr as SSE-style frames.

    Args:
        command: Passed unchanged as the first argument to ``subprocess.Popen``.
        cwd: Working directory for the child process.
        job_id: Optional key; when truthy the process is registered in
            ``ACTIVE_PROCESSES`` for the lifetime of the stream so that
            ``stop_process`` can reach it.

    Yields:
        ``"data: <json string>\\n\\n"`` frames carrying decoded output, then a
        frame with the exit-code message, then a final
        ``"event: done"`` (exit code 0) or ``"event: failed"`` frame.

    If the generator is closed or fails before the child has exited, the child
    is terminated and its pipe closed so that nothing is left running
    unreachable once the registry entry has been removed.
    """
    env = os.environ.copy()
    # Ask Python children not to buffer their output so it arrives promptly.
    env["PYTHONUNBUFFERED"] = "1"

    popen_kwargs = {
        "cwd": cwd,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,  # interleave stderr into the same stream
        "bufsize": 0,  # raw, unbuffered pipe: reads return partial data
        "env": env,
    }
    if os.name == "nt":
        # Needed so stop_process can send CTRL_BREAK_EVENT to the child.
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP

    process = subprocess.Popen(command, **popen_kwargs)
    if job_id:
        ACTIVE_PROCESSES[job_id] = process

    try:
        stdout = process.stdout
        assert stdout is not None  # guaranteed by stdout=PIPE above

        # Incremental decoding keeps multi-byte UTF-8 sequences intact when
        # they are split across reads; invalid bytes become U+FFFD.
        decoder = codecs.getincrementaldecoder("utf-8")("replace")

        while True:
            raw_chunk = stdout.read(_READ_CHUNK_SIZE)
            if not raw_chunk:
                # End of stream. Do not poll in a loop here: if the child
                # closed stdout but is still running, wait() below blocks
                # instead of spinning.
                break
            chunk = decoder.decode(raw_chunk)
            if chunk:  # empty while the decoder holds a partial character
                yield f"data: {json.dumps(chunk)}\n\n"

        exit_code = process.wait()

        trailing_chunk = decoder.decode(b"", final=True)
        if trailing_chunk:
            yield f"data: {json.dumps(trailing_chunk)}\n\n"

        exit_message = f"\nProcess exited with code {exit_code}\n"
        yield f"data: {json.dumps(exit_message)}\n\n"

        event_name = "done" if exit_code == 0 else "failed"
        yield f"event: {event_name}\ndata: {{}}\n\n"
    finally:
        try:
            _shutdown_process(process)
        finally:
            if job_id:
                ACTIVE_PROCESSES.pop(job_id, None)