ebook2audiobook / lib /classes /subprocess_pipe.py
workmin
initial deploy
954a7df
import os, re, queue, threading, subprocess, multiprocessing, sys, gradio as gr
from collections.abc import Callable
class SubprocessPipe:
def __init__(self, cmd:list[str], is_gui_process:bool, total_duration:float, msg:str='Processing', on_progress:Callable[[float], None]|None=None)->None:
self.cmd = cmd
self.is_gui_process = is_gui_process
self.total_duration = total_duration
self.msg = msg
self.process = None
self._stop_requested = False
self.on_progress = on_progress
self.progress_bar = False
if self.is_gui_process:
self.progress_bar = gr.Progress(track_tqdm=False)
self.result = self._run_process()
def _emit_progress(self, percent:float)->None:
if self.on_progress is not None:
self.on_progress(percent)
elif self.progress_bar:
self.progress_bar(percent / 100.0, desc=self.msg)
sys.stdout.write(f"\r{self.msg} - {percent:.1f}%")
sys.stdout.flush()
def _on_complete(self)->None:
msg = f"\n{self.msg} completed!"
print(msg)
if self.progress_bar:
self.progress_bar(1.0, desc=msg)
def _on_error(self, err:Exception)->None:
error = f"{self.msg} failed! {err}"
print(error)
if self.progress_bar:
self.progress_bar(0.0, desc=error)
def _run_process(self)->bool:
try:
is_ffmpeg = "ffmpeg" in os.path.basename(self.cmd[0])
if is_ffmpeg:
self.process = subprocess.Popen(
self.cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
bufsize=0
)
else:
if self.progress_bar:
self.process = subprocess.Popen(
self.cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0
)
else:
self.process = subprocess.Popen(
self.cmd,
stdout=None,
stderr=None
)
if is_ffmpeg:
time_pattern = re.compile(rb'out_time_ms=(\d+)')
last_percent = 0.0
stderr_queue = queue.Queue()
def read_stderr():
try:
buffer = b''
while True:
chunk = self.process.stderr.read(4096)
if not chunk:
break
buffer += chunk
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
stderr_queue.put(line)
except Exception:
pass
finally:
stderr_queue.put(None)
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
stderr_thread.start()
while True:
try:
line = stderr_queue.get(timeout=0.1)
except queue.Empty:
if self.process.poll() is not None:
break
continue
if line is None: # sentinel = stderr closed
break
match = time_pattern.search(line)
if match and self.total_duration > 0:
current_time = int(match.group(1)) / 1_000_000
percent = min((current_time / self.total_duration) * 100, 100)
if abs(percent - last_percent) >= 0.5:
self._emit_progress(percent)
last_percent = percent
elif b'progress=end' in line:
self._emit_progress(100.0)
stderr_thread.join()
else:
if self.progress_bar:
tqdm_re = re.compile(rb'(\d{1,3})%\|')
buffer = b''
last_percent = 0.0
while True:
chunk = self.process.stdout.read(1024)
if not chunk:
break
buffer += chunk
if b'\r' in buffer:
parts = buffer.split(b'\r')
buffer = parts[-1]
for part in parts[:-1]:
match = tqdm_re.search(part)
if match:
percent = min(float(match.group(1)), 100.0)
if percent - last_percent >= 0.5:
self._emit_progress(percent)
last_percent = percent
self.process.wait()
if self._stop_requested:
return False
elif self.process.returncode==0:
self._on_complete()
return True
else:
self._on_error(self.process.returncode)
return False
except Exception as e:
self._on_error(e)
return False
def stop(self)->bool:
self._stop_requested=True
if self.process and self.process.poll() is None:
try:
self.process.terminate()
except Exception:
pass
return False