Spaces:
Running
Running
File size: 5,760 Bytes
954a7df | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | import os, re, queue, threading, subprocess, multiprocessing, sys, gradio as gr
from collections.abc import Callable
class SubprocessPipe:
def __init__(self, cmd:list[str], is_gui_process:bool, total_duration:float, msg:str='Processing', on_progress:Callable[[float], None]|None=None)->None:
self.cmd = cmd
self.is_gui_process = is_gui_process
self.total_duration = total_duration
self.msg = msg
self.process = None
self._stop_requested = False
self.on_progress = on_progress
self.progress_bar = False
if self.is_gui_process:
self.progress_bar = gr.Progress(track_tqdm=False)
self.result = self._run_process()
def _emit_progress(self, percent:float)->None:
if self.on_progress is not None:
self.on_progress(percent)
elif self.progress_bar:
self.progress_bar(percent / 100.0, desc=self.msg)
sys.stdout.write(f"\r{self.msg} - {percent:.1f}%")
sys.stdout.flush()
def _on_complete(self)->None:
msg = f"\n{self.msg} completed!"
print(msg)
if self.progress_bar:
self.progress_bar(1.0, desc=msg)
def _on_error(self, err:Exception)->None:
error = f"{self.msg} failed! {err}"
print(error)
if self.progress_bar:
self.progress_bar(0.0, desc=error)
def _run_process(self)->bool:
try:
is_ffmpeg = "ffmpeg" in os.path.basename(self.cmd[0])
if is_ffmpeg:
self.process = subprocess.Popen(
self.cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
bufsize=0
)
else:
if self.progress_bar:
self.process = subprocess.Popen(
self.cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0
)
else:
self.process = subprocess.Popen(
self.cmd,
stdout=None,
stderr=None
)
if is_ffmpeg:
time_pattern = re.compile(rb'out_time_ms=(\d+)')
last_percent = 0.0
stderr_queue = queue.Queue()
def read_stderr():
try:
buffer = b''
while True:
chunk = self.process.stderr.read(4096)
if not chunk:
break
buffer += chunk
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
stderr_queue.put(line)
except Exception:
pass
finally:
stderr_queue.put(None)
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
stderr_thread.start()
while True:
try:
line = stderr_queue.get(timeout=0.1)
except queue.Empty:
if self.process.poll() is not None:
break
continue
if line is None: # sentinel = stderr closed
break
match = time_pattern.search(line)
if match and self.total_duration > 0:
current_time = int(match.group(1)) / 1_000_000
percent = min((current_time / self.total_duration) * 100, 100)
if abs(percent - last_percent) >= 0.5:
self._emit_progress(percent)
last_percent = percent
elif b'progress=end' in line:
self._emit_progress(100.0)
stderr_thread.join()
else:
if self.progress_bar:
tqdm_re = re.compile(rb'(\d{1,3})%\|')
buffer = b''
last_percent = 0.0
while True:
chunk = self.process.stdout.read(1024)
if not chunk:
break
buffer += chunk
if b'\r' in buffer:
parts = buffer.split(b'\r')
buffer = parts[-1]
for part in parts[:-1]:
match = tqdm_re.search(part)
if match:
percent = min(float(match.group(1)), 100.0)
if percent - last_percent >= 0.5:
self._emit_progress(percent)
last_percent = percent
self.process.wait()
if self._stop_requested:
return False
elif self.process.returncode==0:
self._on_complete()
return True
else:
self._on_error(self.process.returncode)
return False
except Exception as e:
self._on_error(e)
return False
def stop(self)->bool:
self._stop_requested=True
if self.process and self.process.poll() is None:
try:
self.process.terminate()
except Exception:
pass
return False |