import os, re, queue, threading, subprocess, multiprocessing, sys, gradio as gr from collections.abc import Callable class SubprocessPipe: def __init__(self, cmd:list[str], is_gui_process:bool, total_duration:float, msg:str='Processing', on_progress:Callable[[float], None]|None=None)->None: self.cmd = cmd self.is_gui_process = is_gui_process self.total_duration = total_duration self.msg = msg self.process = None self._stop_requested = False self.on_progress = on_progress self.progress_bar = False if self.is_gui_process: self.progress_bar = gr.Progress(track_tqdm=False) self.result = self._run_process() def _emit_progress(self, percent:float)->None: if self.on_progress is not None: self.on_progress(percent) elif self.progress_bar: self.progress_bar(percent / 100.0, desc=self.msg) sys.stdout.write(f"\r{self.msg} - {percent:.1f}%") sys.stdout.flush() def _on_complete(self)->None: msg = f"\n{self.msg} completed!" print(msg) if self.progress_bar: self.progress_bar(1.0, desc=msg) def _on_error(self, err:Exception)->None: error = f"{self.msg} failed! {err}" print(error) if self.progress_bar: self.progress_bar(0.0, desc=error) def _run_process(self)->bool: try: is_ffmpeg = "ffmpeg" in os.path.basename(self.cmd[0]) if is_ffmpeg: self.process = subprocess.Popen( self.cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, bufsize=0 ) else: if self.progress_bar: self.process = subprocess.Popen( self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0 ) else: self.process = subprocess.Popen( self.cmd, stdout=None, stderr=None ) if is_ffmpeg: time_pattern = re.compile(rb'out_time_ms=(\d+)') last_percent = 0.0 stderr_queue = queue.Queue() def read_stderr(): try: buffer = b'' while True: chunk = self.process.stderr.read(4096) if not chunk: break buffer += chunk while b'\n' in buffer: line, buffer = buffer.split(b'\n', 1) stderr_queue.put(line) except Exception: pass finally: stderr_queue.put(None) stderr_thread = threading.Thread(target=read_stderr, daemon=True) stderr_thread.start() while True: try: line = stderr_queue.get(timeout=0.1) except queue.Empty: if self.process.poll() is not None: break continue if line is None: # sentinel = stderr closed break match = time_pattern.search(line) if match and self.total_duration > 0: current_time = int(match.group(1)) / 1_000_000 percent = min((current_time / self.total_duration) * 100, 100) if abs(percent - last_percent) >= 0.5: self._emit_progress(percent) last_percent = percent elif b'progress=end' in line: self._emit_progress(100.0) stderr_thread.join() else: if self.progress_bar: tqdm_re = re.compile(rb'(\d{1,3})%\|') buffer = b'' last_percent = 0.0 while True: chunk = self.process.stdout.read(1024) if not chunk: break buffer += chunk if b'\r' in buffer: parts = buffer.split(b'\r') buffer = parts[-1] for part in parts[:-1]: match = tqdm_re.search(part) if match: percent = min(float(match.group(1)), 100.0) if percent - last_percent >= 0.5: self._emit_progress(percent) last_percent = percent self.process.wait() if self._stop_requested: return False elif self.process.returncode==0: self._on_complete() return True else: self._on_error(self.process.returncode) return False except Exception as e: self._on_error(e) return False def stop(self)->bool: self._stop_requested=True if self.process and self.process.poll() is None: try: self.process.terminate() except Exception: pass return False