Spaces:
Running
Running
| import os, re, queue, threading, subprocess, multiprocessing, sys, gradio as gr | |
| from collections.abc import Callable | |
| class SubprocessPipe: | |
| def __init__(self, cmd:list[str], is_gui_process:bool, total_duration:float, msg:str='Processing', on_progress:Callable[[float], None]|None=None)->None: | |
| self.cmd = cmd | |
| self.is_gui_process = is_gui_process | |
| self.total_duration = total_duration | |
| self.msg = msg | |
| self.process = None | |
| self._stop_requested = False | |
| self.on_progress = on_progress | |
| self.progress_bar = False | |
| if self.is_gui_process: | |
| self.progress_bar = gr.Progress(track_tqdm=False) | |
| self.result = self._run_process() | |
| def _emit_progress(self, percent:float)->None: | |
| if self.on_progress is not None: | |
| self.on_progress(percent) | |
| elif self.progress_bar: | |
| self.progress_bar(percent / 100.0, desc=self.msg) | |
| sys.stdout.write(f"\r{self.msg} - {percent:.1f}%") | |
| sys.stdout.flush() | |
| def _on_complete(self)->None: | |
| msg = f"\n{self.msg} completed!" | |
| print(msg) | |
| if self.progress_bar: | |
| self.progress_bar(1.0, desc=msg) | |
| def _on_error(self, err:Exception)->None: | |
| error = f"{self.msg} failed! {err}" | |
| print(error) | |
| if self.progress_bar: | |
| self.progress_bar(0.0, desc=error) | |
| def _run_process(self)->bool: | |
| try: | |
| is_ffmpeg = "ffmpeg" in os.path.basename(self.cmd[0]) | |
| if is_ffmpeg: | |
| self.process = subprocess.Popen( | |
| self.cmd, | |
| stdout=subprocess.DEVNULL, | |
| stderr=subprocess.PIPE, | |
| bufsize=0 | |
| ) | |
| else: | |
| if self.progress_bar: | |
| self.process = subprocess.Popen( | |
| self.cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, | |
| bufsize=0 | |
| ) | |
| else: | |
| self.process = subprocess.Popen( | |
| self.cmd, | |
| stdout=None, | |
| stderr=None | |
| ) | |
| if is_ffmpeg: | |
| time_pattern = re.compile(rb'out_time_ms=(\d+)') | |
| last_percent = 0.0 | |
| stderr_queue = queue.Queue() | |
| def read_stderr(): | |
| try: | |
| buffer = b'' | |
| while True: | |
| chunk = self.process.stderr.read(4096) | |
| if not chunk: | |
| break | |
| buffer += chunk | |
| while b'\n' in buffer: | |
| line, buffer = buffer.split(b'\n', 1) | |
| stderr_queue.put(line) | |
| except Exception: | |
| pass | |
| finally: | |
| stderr_queue.put(None) | |
| stderr_thread = threading.Thread(target=read_stderr, daemon=True) | |
| stderr_thread.start() | |
| while True: | |
| try: | |
| line = stderr_queue.get(timeout=0.1) | |
| except queue.Empty: | |
| if self.process.poll() is not None: | |
| break | |
| continue | |
| if line is None: # sentinel = stderr closed | |
| break | |
| match = time_pattern.search(line) | |
| if match and self.total_duration > 0: | |
| current_time = int(match.group(1)) / 1_000_000 | |
| percent = min((current_time / self.total_duration) * 100, 100) | |
| if abs(percent - last_percent) >= 0.5: | |
| self._emit_progress(percent) | |
| last_percent = percent | |
| elif b'progress=end' in line: | |
| self._emit_progress(100.0) | |
| stderr_thread.join() | |
| else: | |
| if self.progress_bar: | |
| tqdm_re = re.compile(rb'(\d{1,3})%\|') | |
| buffer = b'' | |
| last_percent = 0.0 | |
| while True: | |
| chunk = self.process.stdout.read(1024) | |
| if not chunk: | |
| break | |
| buffer += chunk | |
| if b'\r' in buffer: | |
| parts = buffer.split(b'\r') | |
| buffer = parts[-1] | |
| for part in parts[:-1]: | |
| match = tqdm_re.search(part) | |
| if match: | |
| percent = min(float(match.group(1)), 100.0) | |
| if percent - last_percent >= 0.5: | |
| self._emit_progress(percent) | |
| last_percent = percent | |
| self.process.wait() | |
| if self._stop_requested: | |
| return False | |
| elif self.process.returncode==0: | |
| self._on_complete() | |
| return True | |
| else: | |
| self._on_error(self.process.returncode) | |
| return False | |
| except Exception as e: | |
| self._on_error(e) | |
| return False | |
| def stop(self)->bool: | |
| self._stop_requested=True | |
| if self.process and self.process.poll() is None: | |
| try: | |
| self.process.terminate() | |
| except Exception: | |
| pass | |
| return False |