File size: 5,760 Bytes
954a7df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import os, re, queue, threading, subprocess, multiprocessing, sys, gradio as gr

from collections.abc import Callable

class SubprocessPipe:

    def __init__(self, cmd:list[str], is_gui_process:bool, total_duration:float, msg:str='Processing', on_progress:Callable[[float], None]|None=None)->None:
        self.cmd = cmd
        self.is_gui_process = is_gui_process
        self.total_duration = total_duration
        self.msg = msg
        self.process = None
        self._stop_requested = False
        self.on_progress = on_progress
        self.progress_bar = False
        if self.is_gui_process:
            self.progress_bar = gr.Progress(track_tqdm=False)
        self.result = self._run_process()
        
    def _emit_progress(self, percent:float)->None:
        if self.on_progress is not None:
            self.on_progress(percent)
        elif self.progress_bar:
            self.progress_bar(percent / 100.0, desc=self.msg)
        sys.stdout.write(f"\r{self.msg} - {percent:.1f}%")
        sys.stdout.flush()

    def _on_complete(self)->None:
        msg = f"\n{self.msg} completed!"
        print(msg)
        if self.progress_bar:
            self.progress_bar(1.0, desc=msg)

    def _on_error(self, err:Exception)->None:
        error = f"{self.msg} failed! {err}"
        print(error)
        if self.progress_bar:
            self.progress_bar(0.0, desc=error)

    def _run_process(self)->bool:
        try:
            is_ffmpeg = "ffmpeg" in os.path.basename(self.cmd[0])
            if is_ffmpeg:
                self.process = subprocess.Popen(
                    self.cmd,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                    bufsize=0
                )
            else:
                if self.progress_bar:
                    self.process = subprocess.Popen(
                        self.cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        bufsize=0
                    )
                else:
                    self.process = subprocess.Popen(
                        self.cmd,
                        stdout=None,
                        stderr=None
                    )
            if is_ffmpeg:
                time_pattern = re.compile(rb'out_time_ms=(\d+)')
                last_percent = 0.0
                stderr_queue = queue.Queue()

                def read_stderr():
                    try:
                        buffer = b''
                        while True:
                            chunk = self.process.stderr.read(4096)
                            if not chunk:
                                break
                            buffer += chunk
                            while b'\n' in buffer:
                                line, buffer = buffer.split(b'\n', 1)
                                stderr_queue.put(line)
                    except Exception:
                        pass
                    finally:
                        stderr_queue.put(None)

                stderr_thread = threading.Thread(target=read_stderr, daemon=True)
                stderr_thread.start()

                while True:
                    try:
                        line = stderr_queue.get(timeout=0.1)
                    except queue.Empty:
                        if self.process.poll() is not None:
                            break
                        continue

                    if line is None:  # sentinel = stderr closed
                        break
                    match = time_pattern.search(line)
                    if match and self.total_duration > 0:
                        current_time = int(match.group(1)) / 1_000_000
                        percent = min((current_time / self.total_duration) * 100, 100)
                        if abs(percent - last_percent) >= 0.5:
                            self._emit_progress(percent)
                            last_percent = percent
                    elif b'progress=end' in line:
                        self._emit_progress(100.0)
                stderr_thread.join()
            else:
                if self.progress_bar:
                    tqdm_re = re.compile(rb'(\d{1,3})%\|')
                    buffer = b''
                    last_percent = 0.0
                    while True:
                        chunk = self.process.stdout.read(1024)
                        if not chunk:
                            break
                        buffer += chunk
                        if b'\r' in buffer:
                            parts = buffer.split(b'\r')
                            buffer = parts[-1]
                            for part in parts[:-1]:
                                match = tqdm_re.search(part)
                                if match:
                                    percent = min(float(match.group(1)), 100.0)
                                    if percent - last_percent >= 0.5:
                                        self._emit_progress(percent)
                                        last_percent = percent
            self.process.wait()
            if self._stop_requested:
                return False
            elif self.process.returncode==0:
                self._on_complete()
                return True
            else:
                self._on_error(self.process.returncode)
                return False
        except Exception as e:
            self._on_error(e)
            return False

    def stop(self)->bool:
        self._stop_requested=True
        if self.process and self.process.poll() is None:
            try:
                self.process.terminate()
            except Exception:
                pass
        return False